From 1e2a74bcd9ee8d2ed2467fb93f6c47d6901874b0 Mon Sep 17 00:00:00 2001 From: David Arnold Date: Fri, 4 Oct 2024 20:27:46 +0200 Subject: [PATCH] refactor: Improve Test Runner 2a (#27984) * refactor: Improve readability and maintainability of main function * refactor: Simplify and optimize the run_all_tests function * refactor: simplify run_tests_for_doctype function * refactor: hint private methods * refactor: simplify _run_unittest function * refactor: simplify _add_test function * fix: Rename `iterate_suite` to `_iterate_suite` --- frappe/test_runner.py | 295 ++++++++++++++++++++++-------------------- 1 file changed, 154 insertions(+), 141 deletions(-) diff --git a/frappe/test_runner.py b/frappe/test_runner.py index 61c45acafb..5da704f688 100644 --- a/frappe/test_runner.py +++ b/frappe/test_runner.py @@ -53,6 +53,7 @@ class TestConfig: junit_xml_output: bool = False tests: tuple = () case: str | None = None + pdb_on_exceptions: tuple | None = None def xmlrunner_wrapper(output): @@ -92,15 +93,46 @@ def main( """Main function to run tests""" global unittest_runner - # Construct TestConfig object - test_config = TestConfig() - test_config.verbose = verbose - test_config.profile = profile - test_config.failfast = failfast - test_config.junit_xml_output = bool(junit_xml_output) - test_config.tests = tests - test_config.case = case + test_config = TestConfig( + verbose=verbose, + profile=profile, + failfast=failfast, + junit_xml_output=bool(junit_xml_output), + tests=tests, + case=case, + pdb_on_exceptions=pdb_on_exceptions, + ) + _initialize_test_environment(site, skip_before_tests, skip_test_records, test_config) + xml_output_file = _setup_xml_output(junit_xml_output) + + try: + scheduler_disabled_by_user = _disable_scheduler_if_needed() + + if not frappe.flags.skip_before_tests: + _run_before_test_hooks(test_config, app) + + if doctype or doctype_list_path: + doctype = _load_doctype_list(doctype_list_path) if doctype_list_path else doctype + test_result = _run_doctype_tests(doctype, test_config, force) + elif module_def: + test_result = _run_module_def_tests(app, module_def, test_config, force) + elif module: + test_result = _run_module_tests(module, test_config) + else: + test_result = _run_all_tests(app, test_config) + + _cleanup_after_tests(scheduler_disabled_by_user) + + return test_result + + finally: + if xml_output_file: + xml_output_file.close() + + +def _initialize_test_environment(site, skip_before_tests, skip_test_records, test_config): + """Initialize the test environment""" frappe.init(site) if not frappe.db: frappe.connect() @@ -108,74 +140,82 @@ def main( frappe.flags.skip_before_tests = skip_before_tests frappe.flags.skip_test_records = skip_test_records - if doctype_list_path: - app, doctype_list_path = doctype_list_path.split(os.path.sep, 1) - with open(frappe.get_app_path(app, doctype_list_path)) as f: - doctype = f.read().strip().splitlines() + # Set various test-related flags + frappe.flags.print_messages = test_config.verbose + frappe.flags.in_test = True + frappe.clear_cache() + + +def _setup_xml_output(junit_xml_output): + """Setup XML output for test results if specified""" + global unittest_runner - xmloutput_fh = None if junit_xml_output: - with open(junit_xml_output, "wb") as xmloutput_fh: - unittest_runner = xmlrunner_wrapper(xmloutput_fh) + xml_output_file = open(junit_xml_output, "wb") + unittest_runner = xmlrunner_wrapper(xml_output_file) + return xml_output_file else: unittest_runner = unittest.TextTestRunner + return None - try: - frappe.flags.print_messages = test_config.verbose - frappe.flags.in_test = True - frappe.flags.pdb_on_exceptions = pdb_on_exceptions - # workaround! since there is no separate test db - frappe.clear_cache() - scheduler_disabled_by_user = frappe.utils.scheduler.is_scheduler_disabled(verbose=False) - if not scheduler_disabled_by_user: - frappe.utils.scheduler.disable_scheduler() +def _disable_scheduler_if_needed(): + """Disable scheduler if it's not already disabled""" + scheduler_disabled_by_user = frappe.utils.scheduler.is_scheduler_disabled(verbose=False) + if not scheduler_disabled_by_user: + frappe.utils.scheduler.disable_scheduler() + return scheduler_disabled_by_user - if not frappe.flags.skip_before_tests: - if test_config.verbose: - print('Running "before_tests" hooks') - for fn in frappe.get_hooks("before_tests", app_name=app): - frappe.get_attr(fn)() - if doctype: - ret = run_tests_for_doctype(doctype, test_config, force) - elif module_def: - doctypes = [] - doctypes_ = frappe.get_list( - "DocType", - filters={"module": module_def, "istable": 0}, - fields=["name", "module"], - as_list=True, - ) - for doctype, module in doctypes_: - test_module = get_module_name(doctype, module, "test_", app=app) - try: - importlib.import_module(test_module) - except Exception: - pass - else: - doctypes.append(doctype) +def _run_before_test_hooks(test_config, app): + """Run 'before_tests' hooks if not skipped by the caller""" + if test_config.verbose: + print('Running "before_tests" hooks') + for hook_function in frappe.get_hooks("before_tests", app_name=app): + frappe.get_attr(hook_function)() - ret = run_tests_for_doctype(doctypes, test_config, force) - elif module: - ret = run_tests_for_module(module, test_config) - else: - ret = run_all_tests(app, test_config) - if not scheduler_disabled_by_user: - frappe.utils.scheduler.enable_scheduler() +def _load_doctype_list(doctype_list_path): + """Load the list of doctypes from the specified file""" + app, path = doctype_list_path.split(os.path.sep, 1) + with open(frappe.get_app_path(app, path)) as f: + return f.read().strip().splitlines() - if frappe.db: - frappe.db.commit() - # workaround! since there is no separate test db - frappe.clear_cache() - return ret +def _run_module_def_tests(app, module_def, test_config, force): + """Run tests for the specified module definition""" + doctypes = _get_doctypes_for_module_def(app, module_def) + return _run_doctype_tests(doctypes, test_config, force) - finally: - if xmloutput_fh: - xmloutput_fh.flush() - xmloutput_fh.close() + +def _get_doctypes_for_module_def(app, module_def): + """Get the list of doctypes for the specified module definition""" + doctypes = [] + doctypes_ = frappe.get_list( + "DocType", + filters={"module": module_def, "istable": 0}, + fields=["name", "module"], + as_list=True, + ) + for doctype, module in doctypes_: + test_module = get_module_name(doctype, module, "test_", app=app) + try: + importlib.import_module(test_module) + doctypes.append(doctype) + except Exception: + pass + return doctypes + + +def _cleanup_after_tests(scheduler_disabled_by_user): + """Perform cleanup operations after running tests""" + if not scheduler_disabled_by_user: + frappe.utils.scheduler.enable_scheduler() + + if frappe.db: + frappe.db.commit() + + frappe.clear_cache() class TimeLoggingTestResult(unittest.TextTestResult): @@ -193,36 +233,27 @@ class TimeLoggingTestResult(unittest.TextTestResult): super().addSuccess(test) -def run_all_tests(app: str | None, config: TestConfig) -> unittest.TestResult: +def _run_all_tests(app: str | None, config: TestConfig) -> unittest.TestResult: """Run all tests for the specified app or all installed apps""" - apps: list[str] = [app] if app else frappe.get_installed_apps() - + apps = [app] if app else frappe.get_installed_apps() test_suite = unittest.TestSuite() + for app in apps: for path, folders, files in os.walk(frappe.get_app_path(app)): - for dontwalk in ("locals", ".git", "public", "__pycache__"): - if dontwalk in folders: - folders.remove(dontwalk) - - # for predictability + folders[:] = [f for f in folders if f not in ("locals", ".git", "public", "__pycache__")] folders.sort() files.sort() - # print path for filename in files: if filename.startswith("test_") and filename.endswith(".py") and filename != "test_runner.py": - # print filename[:-3] _add_test(app, path, filename, config.verbose, test_suite) - if config.junit_xml_output: - runner = unittest_runner(verbosity=1 + cint(config.verbose), failfast=config.failfast) - else: - runner = unittest_runner( - resultclass=TimeLoggingTestResult, - verbosity=1 + cint(config.verbose), - failfast=config.failfast, - tb_locals=config.verbose, - ) + runner = unittest_runner( + resultclass=TimeLoggingTestResult if not config.junit_xml_output else None, + verbosity=1 + cint(config.verbose), + failfast=config.failfast, + tb_locals=config.verbose, + ) if config.profile: pr = cProfile.Profile() @@ -233,30 +264,26 @@ def run_all_tests(app: str | None, config: TestConfig) -> unittest.TestResult: if config.profile: pr.disable() s = StringIO() - ps = pstats.Stats(pr, stream=s).sort_stats("cumulative") - ps.print_stats() + pstats.Stats(pr, stream=s).sort_stats("cumulative").print_stats() print(s.getvalue()) return out -def run_tests_for_doctype(doctypes, config: TestConfig, force=False): +def _run_doctype_tests(doctypes, config: TestConfig, force=False): """Run tests for the specified doctype(s)""" try: modules = [] - if not isinstance(doctypes, list | tuple): - doctypes = [doctypes] + doctypes = [doctypes] if not isinstance(doctypes, list | tuple) else doctypes for doctype in doctypes: module = frappe.db.get_value("DocType", doctype, "module") if not module: - logger.error(f"Invalid doctype {doctype}") raise TestRunnerError(f"Invalid doctype {doctype}") test_module = get_module_name(doctype, module, "test_") if force: - for name in frappe.db.sql_list(f"select name from `tab{doctype}`"): - frappe.delete_doc(doctype, name, force=True) + frappe.db.delete(doctype) make_test_records(doctype, verbose=config.verbose, force=force, commit=True) modules.append(importlib.import_module(test_module)) @@ -266,7 +293,7 @@ def run_tests_for_doctype(doctypes, config: TestConfig, force=False): raise TestRunnerError(f"Failed to run tests for doctypes: {e!s}") from e -def run_tests_for_module(module, config: TestConfig): +def _run_module_tests(module, config: TestConfig): """Run tests for the specified module""" module = importlib.import_module(module) if hasattr(module, "test_dependencies"): @@ -280,52 +307,39 @@ def run_tests_for_module(module, config: TestConfig): def _run_unittest(modules, config: TestConfig): """Run unittest for the specified module(s)""" frappe.db.begin() - + modules = [modules] if not isinstance(modules, list | tuple) else modules final_test_suite = unittest.TestSuite() - if not isinstance(modules, list | tuple): - modules = [modules] - - def iterate_suite(suite): - for test in suite: - if isinstance(test, unittest.TestSuite): - yield from iterate_suite(test) - elif isinstance(test, unittest.TestCase): - yield test - for module in modules: + test_suite = unittest.TestLoader().loadTestsFromModule(module) if config.case: test_suite = unittest.TestLoader().loadTestsFromTestCase(getattr(module, config.case)) - else: - test_suite = unittest.TestLoader().loadTestsFromModule(module) + if config.tests: - for test_case in iterate_suite(test_suite): - if test_case._testMethodName in config.tests: - final_test_suite.addTest(test_case) + final_test_suite.addTests( + test for test in _iterate_suite(test_suite) if test._testMethodName in config.tests + ) else: final_test_suite.addTest(test_suite) - if frappe.flags.pdb_on_exceptions: - for test_case in iterate_suite(final_test_suite): - if hasattr(test_case, "_apply_debug_decorator"): - test_case._apply_debug_decorator(frappe.flags.pdb_on_exceptions) + if config.pdb_on_exceptions: + for test_case in _iterate_suite(final_test_suite): + if hasattr(test_case, "_apply_debug_decorator"): + test_case._apply_debug_decorator(config.pdb_on_exceptions) - if config.junit_xml_output: - runner = unittest_runner(verbosity=1 + cint(config.verbose), failfast=config.failfast) - else: - runner = unittest_runner( - resultclass=TimeLoggingTestResult, - verbosity=1 + cint(config.verbose), - failfast=config.failfast, - tb_locals=config.verbose, - ) + runner = unittest_runner( + resultclass=None if config.junit_xml_output else TimeLoggingTestResult, + verbosity=1 + cint(config.verbose), + failfast=config.failfast, + tb_locals=config.verbose, + ) + + frappe.flags.tests_verbose = config.verbose if config.profile: pr = cProfile.Profile() pr.enable() - frappe.flags.tests_verbose = config.verbose - out = runner.run(final_test_suite) if config.profile: @@ -338,37 +352,36 @@ def _run_unittest(modules, config: TestConfig): return out +def _iterate_suite(suite): + """Helper function to iterate through a test suite""" + for test in suite: + if isinstance(test, unittest.TestSuite): + yield from _iterate_suite(test) + elif isinstance(test, unittest.TestCase): + yield test + + def _add_test(app, path, filename, verbose, test_suite=None): - import os - - if os.path.sep.join(["doctype", "doctype", "boilerplate"]) in path: - # in /doctype/doctype/boilerplate/ - return - app_path = frappe.get_app_path(app) relative_path = os.path.relpath(path, app_path) - if relative_path == ".": - module_name = app - else: - module_name = "{app}.{relative_path}.{module_name}".format( - app=app, relative_path=relative_path.replace("/", "."), module_name=filename[:-3] - ) + if os.path.sep.join(["doctype", "doctype", "boilerplate"]) in path: + return # Skip boilerplate files + + module_name = f"{app}.{relative_path.replace('/', '.')}.{filename[:-3]}" if relative_path != "." else app module = importlib.import_module(module_name) if hasattr(module, "test_dependencies"): for doctype in module.test_dependencies: make_test_records(doctype, verbose=verbose, commit=True) - if not test_suite: - test_suite = unittest.TestSuite() + test_suite = test_suite or unittest.TestSuite() if os.path.basename(os.path.dirname(path)) == "doctype": - txt_file = os.path.join(path, filename[5:].replace(".py", ".json")) - if os.path.exists(txt_file): - with open(txt_file) as f: - doc = json.loads(f.read()) - doctype = doc["name"] + json_file = os.path.join(path, filename[5:].replace(".py", ".json")) + if os.path.exists(json_file): + with open(json_file) as f: + doctype = json.loads(f.read())["name"] make_test_records(doctype, verbose, commit=True) test_suite.addTest(unittest.TestLoader().loadTestsFromModule(module))