import json
import os
import re
import sys
import time
import unittest

import click
import frappe
import requests

from .test_runner import (SLOW_TEST_THRESHOLD, make_test_records,
    set_test_email_config)

# Force coloured output when running inside a click command context.
click_ctx = click.get_current_context(True)
if click_ctx:
    click_ctx.color = True


class ParallelTestRunner:
    """Run one build's share of an app's test files.

    The list returned by ``get_all_tests`` is cut into ``total_builds``
    contiguous chunks and this instance runs the chunk selected by
    ``build_number`` (1-based). Constructing the runner sets up the site
    and runs the tests immediately.
    """

    def __init__(self, app, site, build_number=1, total_builds=1):
        self.app = app
        self.site = site
        self.build_number = frappe.utils.cint(build_number) or 1
        # Default to a single build, like build_number does, so that a
        # missing/zero value cannot cause a division by zero later on.
        self.total_builds = frappe.utils.cint(total_builds) or 1
        self.setup_test_site()
        self.run_tests()

    def setup_test_site(self):
        """Initialise frappe for ``self.site`` and put it into test mode."""
        frappe.init(site=self.site)
        if not frappe.db:
            frappe.connect()

        frappe.flags.in_test = True
        frappe.clear_cache()
        frappe.utils.scheduler.disable_scheduler()
        set_test_email_config()
        self.before_test_setup()

    def before_test_setup(self):
        """Run ``before_tests`` hooks and create global test dependencies.

        The elapsed time of the whole step is echoed afterwards.
        """
        start_time = time.time()
        for fn in frappe.get_hooks("before_tests", app_name=self.app):
            frappe.get_attr(fn)()

        test_module = frappe.get_module(f'{self.app}.tests')

        if hasattr(test_module, "global_test_dependencies"):
            for doctype in test_module.global_test_dependencies:
                make_test_records(doctype)

        elapsed = time.time() - start_time
        elapsed = click.style(f' ({elapsed:.03}s)', fg='red')
        click.echo(f'Before Test {elapsed}')

    def run_tests(self):
        """Run every assigned test file into one shared result, then report."""
        self.test_result = ParallelTestResult(stream=sys.stderr, descriptions=True, verbosity=2)

        for test_file_info in self.get_test_file_list():
            self.run_tests_for_file(test_file_info)

        self.print_result()

    def run_tests_for_file(self, file_info):
        """Load and run the tests of a single ``(path, filename)`` pair.

        A falsy ``file_info`` is ignored.
        """
        if not file_info:
            return

        frappe.set_user('Administrator')
        path, filename = file_info
        module = self.get_module(path, filename)
        self.create_test_dependency_records(module, path, filename)
        test_suite = unittest.TestSuite()
        module_test_cases = unittest.TestLoader().loadTestsFromModule(module)
        test_suite.addTest(module_test_cases)
        test_suite(self.test_result)

    def create_test_dependency_records(self, module, path, filename):
        """Create the test records a test module depends on.

        Records are made for every entry of the module's optional
        ``test_dependencies`` attribute. In addition, when the test file
        sits in a ``doctype/<name>/`` folder and a matching JSON file
        exists next to it, records are made for the ``name`` stored in
        that JSON file.
        """
        if hasattr(module, "test_dependencies"):
            for doctype in module.test_dependencies:
                make_test_records(doctype)

        if os.path.basename(os.path.dirname(path)) == "doctype":
            # test_data_migration_connector.py > data_migration_connector.json
            test_record_filename = re.sub('^test_', '', filename).replace(".py", ".json")
            test_record_file_path = os.path.join(path, test_record_filename)
            if os.path.exists(test_record_file_path):
                with open(test_record_file_path, 'r') as f:
                    doc = json.load(f)
                doctype = doc["name"]
                make_test_records(doctype)

    def get_module(self, path, filename):
        """Return the imported module for the test file ``filename`` in ``path``."""
        app_path = frappe.get_pymodule_path(self.app)
        relative_path = os.path.relpath(path, app_path)
        if relative_path == '.':
            module_name = self.app
        else:
            # relpath() yields the platform separator, so convert that one
            # (get_all_tests also relies on os.path.sep).
            relative_path = relative_path.replace(os.path.sep, '.')
            module_name = os.path.splitext(filename)[0]
            module_name = f'{self.app}.{relative_path}.{module_name}'

        return frappe.get_module(module_name)

    def print_result(self):
        """Print errors and the summary; exit with status 1 on CI if any test failed."""
        self.test_result.printErrors()
        click.echo(self.test_result)
        has_problems = self.test_result.failures or self.test_result.errors
        if has_problems and os.environ.get('CI'):
            sys.exit(1)

    def get_test_file_list(self):
        """Return the ``[path, filename]`` entries assigned to this build.

        Returns an empty list when there are no test files at all, or
        when rounding up the chunk size left fewer chunks than builds and
        this build's chunk therefore does not exist.
        """
        test_list = get_all_tests(self.app)
        if not test_list:
            # A chunk size of 0 would make range() raise ValueError.
            return []

        total_builds = max(self.total_builds, 1)
        split_size = frappe.utils.ceil(len(test_list) / total_builds)
        # [1,2,3,4,5,6] to [[1,2], [3,4], [5,6]] if split_size is 2
        test_chunks = [test_list[x:x + split_size] for x in range(0, len(test_list), split_size)]

        chunk_index = self.build_number - 1
        if chunk_index >= len(test_chunks):
            # e.g. 5 files over 4 builds gives only 3 chunks of 2.
            return []
        return test_chunks[chunk_index]


class ParallelTestResult(unittest.TextTestResult):
    """Test result that reports each outcome through ``click.echo``.

    Every hook calls ``super(unittest.TextTestResult, self)``, i.e. it
    skips ``TextTestResult``'s own implementation and runs the next class
    in the MRO, so only the click output written here is emitted per test.
    """

    def _echo_outcome(self, marker, color, test, suffix=''):
        # Shared one-line formatter for all per-test outcome messages.
        click.echo(f" {click.style(marker, fg=color)} {self.getTestMethodName(test)}{suffix}")

    def startTest(self, test):
        """Record the start time and echo the class name when it changes."""
        self._started_at = time.time()
        super(unittest.TextTestResult, self).startTest(test)
        test_class = unittest.util.strclass(test.__class__)
        if getattr(self, 'current_test_class', None) != test_class:
            click.echo(f"\n{test_class}")
            self.current_test_class = test_class

    def getTestMethodName(self, test):
        """Return the test's method name, or ``str(test)`` if it has none."""
        return test._testMethodName if hasattr(test, '_testMethodName') else str(test)

    def addSuccess(self, test):
        """Echo a pass mark; append the duration for slow tests."""
        super(unittest.TextTestResult, self).addSuccess(test)
        elapsed = time.time() - self._started_at
        threshold_passed = elapsed >= SLOW_TEST_THRESHOLD
        elapsed = click.style(f' ({elapsed:.03}s)', fg='red') if threshold_passed else ''
        self._echo_outcome(' ✔ ', 'green', test, elapsed)

    def addError(self, test, err):
        super(unittest.TextTestResult, self).addError(test, err)
        self._echo_outcome(' ✖ ', 'red', test)

    def addFailure(self, test, err):
        super(unittest.TextTestResult, self).addFailure(test, err)
        self._echo_outcome(' ✖ ', 'red', test)

    def addSkip(self, test, reason):
        super(unittest.TextTestResult, self).addSkip(test, reason)
        self._echo_outcome(' = ', 'white', test)

    def addExpectedFailure(self, test, err):
        super(unittest.TextTestResult, self).addExpectedFailure(test, err)
        self._echo_outcome(' ✖ ', 'red', test)

    def addUnexpectedSuccess(self, test):
        super(unittest.TextTestResult, self).addUnexpectedSuccess(test)
        self._echo_outcome(' ✔ ', 'green', test)

    def printErrors(self):
        """Echo the details of all errors, then of all failures."""
        click.echo('\n')
        self.printErrorList(' ERROR ', self.errors, 'red')
        self.printErrorList(' FAIL ', self.failures, 'red')

    def printErrorList(self, flavour, errors, color):
        """Echo each ``(test, traceback text)`` pair under a coloured header."""
        for test, err in errors:
            click.echo(self.separator1)
            click.echo(f"{click.style(flavour, bg=color)} {self.getDescription(test)}")
            click.echo(self.separator2)
            click.echo(err)

    def __str__(self):
        return f"Tests: {self.testsRun}, Failing: {len(self.failures)}, Errors: {len(self.errors)}"


def get_all_tests(app):
    """Return ``[path, filename]`` for every ``test_*.py`` file of ``app``.

    Walks the app's module path in sorted order, skipping the
    ``locals``, ``.git``, ``public`` and ``__pycache__`` folders, any
    path containing ``doctype/doctype/boilerplate``, and files named
    ``test_runner.py``.
    """
    test_file_list = []
    for path, folders, files in os.walk(frappe.get_pymodule_path(app)):
        # Pruning `folders` in place stops os.walk from descending into them.
        for dontwalk in ('locals', '.git', 'public', '__pycache__'):
            if dontwalk in folders:
                folders.remove(dontwalk)

        # for predictability
        folders.sort()
        files.sort()

        if os.path.sep.join(["doctype", "doctype", "boilerplate"]) in path:
            # in /doctype/doctype/boilerplate/
            continue

        for filename in files:
            is_test_file = filename.startswith("test_") and filename.endswith(".py")
            if is_test_file and filename != 'test_runner.py':
                test_file_list.append([path, filename])

    return test_file_list


class ParallelTestWithOrchestrator(ParallelTestRunner):
    '''
    This can be used to balance-out test time across multiple instances
    This is dependent on external orchestrator which returns next test to run

    Every request carries the build id and instance id as headers.

    orchestrator endpoints
    - register-instance (<build_id>, <instance_id>, test_spec_list)
    - get-next-test-spec (<build_id>, <instance_id>)
    - test-completed (<build_id>, <instance_id>)
    '''

    def __init__(self, app, site):
        """Read the orchestrator settings from the environment and start.

        Exits with status 1 when ``ORCHESTRATOR_URL`` or ``CI_BUILD_ID``
        is not set. ``CI_INSTANCE_ID`` falls back to a generated hash.
        """
        self.orchestrator_url = os.environ.get('ORCHESTRATOR_URL')
        if not self.orchestrator_url:
            click.echo('ORCHESTRATOR_URL environment variable not found!')
            click.echo('Pass public URL after hosting https://github.com/frappe/test-orchestrator')
            sys.exit(1)

        self.ci_build_id = os.environ.get('CI_BUILD_ID')
        self.ci_instance_id = os.environ.get('CI_INSTANCE_ID') or frappe.generate_hash(length=10)
        if not self.ci_build_id:
            click.echo('CI_BUILD_ID environment variable not found!')
            sys.exit(1)

        super().__init__(app, site)

    def run_tests(self):
        """Register this instance with the orchestrator, then run tests."""
        self.test_status = 'ongoing'
        self.register_instance()
        super().run_tests()

    def get_test_file_list(self):
        """Yield test specs from the orchestrator while status is 'ongoing'."""
        while self.test_status == 'ongoing':
            yield self.get_next_test()

    def register_instance(self):
        """Send the full test list to the orchestrator and store ``is_master``."""
        test_spec_list = get_all_tests(self.app)
        response_data = self.call_orchestrator('register-instance', data={
            'test_spec_list': test_spec_list
        })
        self.is_master = response_data.get('is_master')

    def get_next_test(self):
        """Fetch the next test spec and update ``self.test_status``."""
        response_data = self.call_orchestrator('get-next-test-spec')
        self.test_status = response_data.get('status')
        return response_data.get('next_test')

    def print_result(self):
        """Notify the orchestrator of completion, then print the result."""
        self.call_orchestrator('test-completed')
        return super().print_result()

    def call_orchestrator(self, endpoint, data=None):
        """GET ``<orchestrator_url>/<endpoint>`` with ``data`` as JSON body.

        Returns the decoded JSON response, or an empty dict when the
        response is not declared as ``application/json``. HTTP error
        statuses raise through ``raise_for_status()``.
        """
        if data is None:
            data = {}
        # add repo token header
        # build id in header
        headers = {
            'CI-BUILD-ID': self.ci_build_id,
            'CI-INSTANCE-ID': self.ci_instance_id,
            # NOTE(review): token is hard-coded in source; consider
            # reading it from the environment instead.
            'REPO-TOKEN': '2948288382838DE'
        }
        url = f'{self.orchestrator_url}/{endpoint}'
        res = requests.get(url, json=data, headers=headers)
        res.raise_for_status()

        # A response without a Content-Type header must not crash the
        # membership test below.
        content_type = res.headers.get('content-type') or ''
        if 'application/json' in content_type:
            return res.json()
        return {}