diff --git a/frappe/commands/utils.py b/frappe/commands/utils.py index 0980b0a38f..8f593049db 100644 --- a/frappe/commands/utils.py +++ b/frappe/commands/utils.py @@ -822,7 +822,7 @@ def run_tests( click.secho("Simply remove the flag.", fg="green") return - unit_ret, integration_ret = frappe.test_runner.main( + frappe.test_runner.main( site, app, module, @@ -841,18 +841,6 @@ def run_tests( selected_categories=[] if test_category == "all" else test_category, ) - if ( - len(unit_ret.failures) == 0 - and len(unit_ret.errors) == 0 - and len(integration_ret.failures) == 0 - and len(integration_ret.errors) == 0 - ): - ret = 0 - else: - ret = (unit_ret, integration_ret) - if os.environ.get("CI"): - sys.exit(ret) - @click.command("run-parallel-tests") @click.option("--app", help="For App", default="frappe") diff --git a/frappe/test_runner.py b/frappe/test_runner.py index 19e2e3a7c5..c557273911 100644 --- a/frappe/test_runner.py +++ b/frappe/test_runner.py @@ -9,6 +9,7 @@ as well as functions for creating and managing test records. from __future__ import annotations +import contextlib import cProfile import importlib import json @@ -18,6 +19,7 @@ import pstats import sys import time import unittest +from collections import defaultdict from dataclasses import dataclass, field from functools import wraps from io import StringIO @@ -34,6 +36,15 @@ from frappe.utils import cint SLOW_TEST_THRESHOLD = 2 +# Define category priorities +CATEGORY_PRIORITIES = { + "unit": 1, + "integration": 2, + "functional": 3, + # Add more categories and their priorities as needed +} + + logger = logging.getLogger(__name__) @@ -49,6 +60,12 @@ def debug_timer(func): return wrapper +def iterOnFirstArg(func): + return lambda self, arg, *args, **kwargs: [ + func(self, a, *args, **kwargs) for a in ([arg] if isinstance(arg, str | tuple) else arg) + ][-1] + + class TestRunner(unittest.TextTestRunner): def __init__( self, @@ -61,161 +78,148 @@ class TestRunner(unittest.TextTestRunner): warnings=None, *, tb_locals=False, - junit_xml_output: bool = False, - profile: bool = False, + cfg: TestConfig, ): super().__init__( stream=stream, descriptions=descriptions, verbosity=verbosity, - failfast=failfast, + failfast=cfg.failfast, buffer=buffer, - resultclass=resultclass or TestResult, + resultclass=resultclass or (TestResult if not cfg.junit_xml_output else None), warnings=warnings, tb_locals=tb_locals, ) - self.junit_xml_output = junit_xml_output - self.profile = profile + self.cfg = cfg + self.per_app_categories = defaultdict(lambda: defaultdict(unittest.TestSuite)) logger.debug("TestRunner initialized") - def run( - self, test_suites: tuple[unittest.TestSuite, unittest.TestSuite] - ) -> tuple[unittest.TestResult, unittest.TestResult | None]: - unit_suite, integration_suite = test_suites + def run(self) -> list[unittest.TestResult]: + results = [] + for app, categories in self.per_app_categories.items(): + sorted_categories = sorted( + categories.items(), key=lambda x: CATEGORY_PRIORITIES.get(x[0], float("inf")) + ) + for category, suite in sorted_categories: + if not self._has_tests(suite): + continue - if self.profile: + self._prepare_category(category, suite, app) + self._apply_debug_decorators(suite) + + with self._profile(): + click.secho( + f"\nRunning {suite.countTestCases()} {category} tests for {app}", fg="cyan", bold=True + ) + result = super().run(suite) + results.append((app, category, result)) + if not result.wasSuccessful() and self.cfg.failfast: + break + return results + + def _has_tests(self, suite): + return next(self._iterate_suite(suite), None) is not None + + def _prepare_category(self, category, suite, app): + dispatcher = { + "integration": self._prepare_integration, + # Add other categories here as needed + } + prepare_method = dispatcher.get(category.lower()) + if prepare_method: + prepare_method(suite, app) + else: + logger.warning(f"Unknown test category: {category}. No specific preparation performed.") + + def _apply_debug_decorators(self, suite): + if self.cfg.pdb_on_exceptions: + for test in self._iterate_suite(suite): + if hasattr(test, "_apply_debug_decorator"): + test._apply_debug_decorator(self.cfg.pdb_on_exceptions) + + @contextlib.contextmanager + def _profile(self): + if self.cfg.profile: pr = cProfile.Profile() pr.enable() - - # Run unit tests - click.echo( - "\n" + click.style(f"Running {unit_suite.countTestCases()} unit tests", fg="cyan", bold=True) - ) - unit_result = super().run(unit_suite) - - # Run integration tests only if unit tests pass - integration_result = None - if unit_result.wasSuccessful(): - click.echo( - "\n" - + click.style( - f"Running {integration_suite.countTestCases()} integration tests", - fg="cyan", - bold=True, - ) - ) - integration_result = super().run(integration_suite) - - if self.profile: + yield + if self.cfg.profile: pr.disable() s = StringIO() ps = pstats.Stats(pr, stream=s).sort_stats("cumulative") ps.print_stats() print(s.getvalue()) - return unit_result, integration_result + @iterOnFirstArg + def discover_tests(self, app: str) -> TestRunner: + logger.debug(f"Discovering tests for app: {app}") + app_path = Path(frappe.get_app_path(app)) + for path, folders, files in os.walk(app_path): + folders[:] = [f for f in folders if not f.startswith(".")] + for dontwalk in ("node_modules", "locals", "public", "__pycache__"): + if dontwalk in folders: + folders.remove(dontwalk) + if os.path.sep.join(["doctype", "doctype", "boilerplate"]) in path: + continue + path = Path(path) + for file in [ + path.joinpath(filename) + for filename in files + if filename.startswith("test_") and filename.endswith(".py") and filename != "test_runner.py" + ]: + module_name = f"{'.'.join(file.relative_to(app_path.parent).parent.parts)}.{file.stem}" + self._add_module_tests(app, module_name) + return self - def discover_tests( - self, apps: list[str], config: TestConfig - ) -> tuple[unittest.TestSuite, unittest.TestSuite]: - logger.debug(f"Discovering tests for apps: {apps}") - unit_test_suite = unittest.TestSuite() - integration_test_suite = unittest.TestSuite() + @iterOnFirstArg + def discover_module_tests(self, spec: tuple[str, str]) -> TestRunner: + app, module = spec + self._add_module_tests(app, module) + return self - for app in apps: - app_path = Path(frappe.get_app_path(app)) - for path, folders, files in os.walk(app_path): - folders[:] = [f for f in folders if not f.startswith(".")] - for dontwalk in ("node_modules", "locals", "public", "__pycache__"): - if dontwalk in folders: - folders.remove(dontwalk) - if os.path.sep.join(["doctype", "doctype", "boilerplate"]) in path: - # in /doctype/doctype/boilerplate/ - continue - - path = Path(path) - for file in [ - path.joinpath(filename) - for filename in files - if filename.startswith("test_") - and filename.endswith(".py") - and filename != "test_runner.py" - ]: - module_name = f"{'.'.join(file.relative_to(app_path.parent).parent.parts)}.{file.stem}" - module = importlib.import_module(module_name) - self._add_module_tests(module, unit_test_suite, integration_test_suite, config) - - logger.debug( - f"Discovered {unit_test_suite.countTestCases()} unit tests and {integration_test_suite.countTestCases()} integration tests" - ) - return unit_test_suite, integration_test_suite - - def discover_doctype_tests( - self, doctypes: str | list[str], config: TestConfig, force: bool = False - ) -> tuple[unittest.TestSuite, unittest.TestSuite]: - unit_test_suite = unittest.TestSuite() - integration_test_suite = unittest.TestSuite() - - if isinstance(doctypes, str): - doctypes = [doctypes] - - for doctype in doctypes: - module = frappe.db.get_value("DocType", doctype, "module") - if not module: - raise TestRunnerError(f"Invalid doctype {doctype}") - - test_module = get_module_name(doctype, module, "test_") - if force: - frappe.db.delete(doctype) - - try: - module = importlib.import_module(test_module) - self._add_module_tests(module, unit_test_suite, integration_test_suite, config) - except ImportError: - logger.warning(f"No test module found for doctype {doctype}") - - return unit_test_suite, integration_test_suite - - def discover_module_tests( - self, modules, config: TestConfig - ) -> tuple[unittest.TestSuite, unittest.TestSuite]: - unit_test_suite = unittest.TestSuite() - integration_test_suite = unittest.TestSuite() - - modules = [modules] if not isinstance(modules, list | tuple) else modules - - for module in modules: - module = importlib.import_module(module) - self._add_module_tests(module, unit_test_suite, integration_test_suite, config) - - return unit_test_suite, integration_test_suite - - def _add_module_tests( - self, - module, - unit_test_suite: unittest.TestSuite, - integration_test_suite: unittest.TestSuite, - config: TestConfig, - ): - if config.case: - test_suite = unittest.TestLoader().loadTestsFromTestCase(getattr(module, config.case)) + def _add_module_tests(self, app: str, module: str): + module = importlib.import_module(module) + if self.cfg.case: + test_suite = unittest.TestLoader().loadTestsFromTestCase(getattr(module, self.cfg.case)) else: test_suite = unittest.TestLoader().loadTestsFromModule(module) for test in self._iterate_suite(test_suite): - if config.tests and test._testMethodName not in config.tests: + if self.cfg.tests and test._testMethodName not in self.cfg.tests: continue - category = "integration" if isinstance(test, IntegrationTestCase) else "unit" - - if config.selected_categories and category not in config.selected_categories: + if self.cfg.selected_categories and category not in self.cfg.selected_categories: continue + self.per_app_categories[app or "default"][category].addTest(test) - config.categories[category].append(test) - if category == "unit": - unit_test_suite.addTest(test) - else: - integration_test_suite.addTest(test) + def _prepare_integration(self, suite: unittest.TestSuite, app: str) -> None: + """Prepare the environment for integration tests.""" + if not self.cfg.skip_before_tests: + self._run_before_test_hooks(app) + else: + logger.debug("Skipping before_tests hooks: Explicitly skipped") + + if app: + self._create_global_test_record_dependencies(app) + + @staticmethod + @debug_timer + def _run_before_test_hooks(app: str | None): + """Run 'before_tests' hooks""" + logger.debug('Running "before_tests" hooks') + for hook_function in frappe.get_hooks("before_tests", app_name=app): + frappe.get_attr(hook_function)() + + @staticmethod + @debug_timer + def _create_global_test_record_dependencies(app: str | None): + """Create global test record dependencies""" + test_module = frappe.get_module(f"{app}.tests") + if hasattr(test_module, "global_test_dependencies"): + logger.info("Creating global test record dependencies ...") + for doctype in test_module.global_test_dependencies: + logger.debug(f"Creating global test records for {doctype}") + make_test_records(doctype, commit=True) @staticmethod def _iterate_suite(suite): @@ -317,7 +321,6 @@ class TestConfig: tests: tuple = () case: str | None = None pdb_on_exceptions: tuple | None = None - categories: dict = field(default_factory=lambda: {"unit": [], "integration": []}) selected_categories: list[str] = field(default_factory=list) skip_before_tests: bool = False @@ -401,39 +404,63 @@ def main( try: # Create TestRunner instance runner = TestRunner( - resultclass=TestResult if not test_config.junit_xml_output else None, verbosity=2 if logger.getEffectiveLevel() < logging.INFO else 1, - failfast=test_config.failfast, tb_locals=logger.getEffectiveLevel() <= logging.INFO, - junit_xml_output=test_config.junit_xml_output, - profile=test_config.profile, + cfg=test_config, ) if doctype or doctype_list_path: doctype = _load_doctype_list(doctype_list_path) if doctype_list_path else doctype - unit_result, integration_result = _run_doctype_tests(doctype, test_config, runner, force, app) + results = _run_doctype_tests(doctype, runner, force, app) elif module_def: - unit_result, integration_result = _run_module_def_tests( - app, module_def, test_config, runner, force - ) + results = _run_module_def_tests(app, module_def, runner, force) elif module: - unit_result, integration_result = _run_module_tests(module, test_config, runner, app) + results = _run_module_tests(module, runner, app) else: - unit_result, integration_result = _run_all_tests(app, test_config, runner) + apps = [app] if app else frappe.get_installed_apps() + results = _run_all_tests(apps, runner) - print_test_results(unit_result, integration_result) + # Determine overall success by checking if any test suite failed + success = all(result.wasSuccessful() for _, result in results) + click.secho("\nTest Results:", fg="cyan", bold=True) - # Determine overall success - success = unit_result.wasSuccessful() and ( - integration_result is None or integration_result.wasSuccessful() - ) + def _print_result(category, result): + tests_run = result.testsRun + failures = len(result.failures) + errors = len(result.errors) + click.echo( + f"\n{click.style(f'{category} Tests:', bold=True)}\n" + f" Ran: {click.style(f'{tests_run:<3}', fg='cyan')}" + f" Failures: {click.style(f'{failures:<3}', fg='red' if failures else 'green')}" + f" Errors: {click.style(f'{errors:<3}', fg='red' if errors else 'green')}" + ) + + if failures > 0: + click.echo(f"\n{click.style(category + ' Test Failures:', fg='red', bold=True)}") + for i, failure in enumerate(result.failures, 1): + click.echo(f" {i}. {click.style(str(failure[0]), fg='yellow')}") + + if errors > 0: + click.echo(f"\n{click.style(category + ' Test Errors:', fg='red', bold=True)}") + for i, error in enumerate(result.errors, 1): + click.echo(f" {i}. {click.style(str(error[0]), fg='yellow')}") + click.echo(click.style(" " + str(error[1]).split("\n")[-2], fg="red")) + + for category, result in results: + _print_result(category.title(), result) + + if success: + click.echo(f"\n{click.style('All tests passed successfully!', fg='green', bold=True)}") + else: + click.echo(f"\n{click.style('Some tests failed or encountered errors.', fg='red', bold=True)}") if not success: sys.exit(1) - return unit_result, integration_result + return results finally: + _cleanup_after_tests() if xml_output_file: xml_output_file.close() @@ -441,49 +468,6 @@ def main( logger.debug(f"Total test run time: {end_time - start_time:.3f} seconds") -def print_test_results(unit_result: unittest.TestResult, integration_result: unittest.TestResult | None): - """Print detailed test results including failures and errors""" - click.echo("\n" + click.style("Test Results:", fg="cyan", bold=True)) - - def _print_result(result, category): - tests_run = result.testsRun - failures = len(result.failures) - errors = len(result.errors) - click.echo( - f"\n{click.style(f'{category} Tests:', bold=True)}\n" - f" Ran: {click.style(f'{tests_run:<3}', fg='cyan')}" - f" Failures: {click.style(f'{failures:<3}', fg='red' if failures else 'green')}" - f" Errors: {click.style(f'{errors:<3}', fg='red' if errors else 'green')}" - ) - - if failures > 0: - click.echo(f"\n{click.style(category + ' Test Failures:', fg='red', bold=True)}") - for i, failure in enumerate(result.failures, 1): - click.echo(f" {i}. {click.style(str(failure[0]), fg='yellow')}") - - if errors > 0: - click.echo(f"\n{click.style(category + ' Test Errors:', fg='red', bold=True)}") - for i, error in enumerate(result.errors, 1): - click.echo(f" {i}. {click.style(str(error[0]), fg='yellow')}") - click.echo(click.style(" " + str(error[1]).split("\n")[-2], fg="red")) - - _print_result(unit_result, "Unit") - - if integration_result: - _print_result(integration_result, "Integration") - - # Print overall status - total_failures = len(unit_result.failures) + ( - len(integration_result.failures) if integration_result else 0 - ) - total_errors = len(unit_result.errors) + (len(integration_result.errors) if integration_result else 0) - - if total_failures == 0 and total_errors == 0: - click.echo(f"\n{click.style('All tests passed successfully!', fg='green', bold=True)}") - else: - click.echo(f"\n{click.style('Some tests failed or encountered errors.', fg='red', bold=True)}") - - @debug_timer def _initialize_test_environment(site, config: TestConfig): """Initialize the test environment""" @@ -527,11 +511,11 @@ def _load_doctype_list(doctype_list_path): def _run_module_def_tests( - app, module_def, config: TestConfig, runner: TestRunner, force + app, module_def, runner: TestRunner, force ) -> tuple[unittest.TestResult, unittest.TestResult | None]: """Run tests for the specified module definition""" doctypes = _get_doctypes_for_module_def(app, module_def) - return _run_doctype_tests(doctypes, config, runner, force, app) + return _run_doctype_tests(doctypes, runner, force, app) def _get_doctypes_for_module_def(app, module_def): @@ -577,125 +561,56 @@ def _cleanup_after_tests(): @debug_timer -def _run_all_tests( - app: str | None, config: TestConfig, runner: TestRunner -) -> tuple[unittest.TestResult, unittest.TestResult | None]: +def _run_all_tests(apps: list[str], runner: TestRunner) -> list[unittest.TestResult]: """Run all tests for the specified app or all installed apps""" - - apps = [app] if app else frappe.get_installed_apps() logger.debug(f"Running tests for apps: {apps}") try: - unit_test_suite, integration_test_suite = runner.discover_tests(apps, config) - if config.pdb_on_exceptions: - for test_suite in (unit_test_suite, integration_test_suite): - for test_case in runner._iterate_suite(test_suite): - if hasattr(test_case, "_apply_debug_decorator"): - test_case._apply_debug_decorator(config.pdb_on_exceptions) - - for app in apps: - _prepare_integration_tests(runner, integration_test_suite, config, app) - res = runner.run((unit_test_suite, integration_test_suite)) - _cleanup_after_tests() - return res + runner = runner.discover_tests(apps).run() except Exception as e: - logger.error(f"Error running all tests for {app or 'all apps'}: {e!s}") - raise TestRunnerError(f"Failed to run tests for {app or 'all apps'}: {e!s}") from e + logger.error(f"Error running all tests for {apps or 'all apps'}: {e!s}") + raise TestRunnerError(f"Failed to run tests for {apps or 'all apps'}: {e!s}") from e @debug_timer def _run_doctype_tests( - doctypes, config: TestConfig, runner: TestRunner, force=False, app: str | None = None -) -> tuple[unittest.TestResult, unittest.TestResult | None]: + doctypes: str | list[str], runner: TestRunner, force=False, app: str | None = None +) -> list[unittest.TestResult]: """Run tests for the specified doctype(s)""" + if isinstance(doctypes, str): + doctypes = [doctypes] + args = [] + for doctype in doctypes: + module = frappe.db.get_value("DocType", doctype, "module") + if not module: + raise TestRunnerError(f"Invalid doctype {doctype}") + + # Check if the DocType belongs to the specified app + doctype_app = frappe.db.get_value("Module Def", module, "app_name") + if app and doctype_app != app: + raise TestRunnerError(f"DocType {doctype} does not belong to app {app}") + elif not app: + app = doctype_app + test_module = frappe.modules.utils.get_module_name(doctype, module, "test_") + args.append((app, test_module)) + force and frappe.db.delete(doctype) try: - unit_test_suite, integration_test_suite = runner.discover_doctype_tests(doctypes, config, force) - - if config.pdb_on_exceptions: - for test_suite in (unit_test_suite, integration_test_suite): - for test_case in runner._iterate_suite(test_suite): - if hasattr(test_case, "_apply_debug_decorator"): - test_case._apply_debug_decorator(config.pdb_on_exceptions) - _prepare_integration_tests(runner, integration_test_suite, config, app) - res = runner.run((unit_test_suite, integration_test_suite)) - _cleanup_after_tests() - return res + return runner.discover_module_tests(args).run(app) except Exception as e: logger.error(f"Error running tests for doctypes {doctypes}: {e!s}") raise TestRunnerError(f"Failed to run tests for doctypes: {e!s}") from e @debug_timer -def _run_module_tests( - module, config: TestConfig, runner: TestRunner, app: str | None = None -) -> tuple[unittest.TestResult, unittest.TestResult | None]: - """Run tests for the specified module""" +def _run_module_tests(module, runner: TestRunner, app: str | None = None) -> list[unittest.TestResult]: + """Run tests for the specified module python test module""" try: - unit_test_suite, integration_test_suite = runner.discover_module_tests(module, config) - - if config.pdb_on_exceptions: - for test_suite in (unit_test_suite, integration_test_suite): - for test_case in runner._iterate_suite(test_suite): - if hasattr(test_case, "_apply_debug_decorator"): - test_case._apply_debug_decorator(config.pdb_on_exceptions) - - _prepare_integration_tests(runner, integration_test_suite, config, app) - res = runner.run((unit_test_suite, integration_test_suite)) - _cleanup_after_tests() - return res + return runner.discover_module_tests((app, module)).run(app) except Exception as e: logger.error(f"Error running tests for module {module}: {e!s}") raise TestRunnerError(f"Failed to run tests for module: {e!s}") from e -def _prepare_integration_tests( - runner: TestRunner, integration_test_suite: unittest.TestSuite, config: TestConfig, app: str -) -> None: - """Prepare the environment for integration tests.""" - if next(runner._iterate_suite(integration_test_suite), None) is not None: - # Explanatory comment - """ - We perform specific setup steps only for integration tests: - - 1. Before Tests Hooks: - - Executed only for integration tests unless explicitly skipped. - - Provides necessary environment setup for integration tests. - - Skipped for unit tests to maintain their independence and isolation. - - 2. Global Test Record Creation: - - Performed only for integration tests. - - Creates or modifies global per-app database records needed for integration tests. - - Skipped for unit tests to maintain their isolation and reproducibility. - """ - if not config.skip_before_tests: - _run_before_test_hooks(config, app) - else: - logger.debug("Skipping before_tests hooks: Explicitly skipped") - if app: - _run_global_test_records_dependencies_install(app) - else: - logger.debug("Skipping before_tests hooks and global test record creation: No integration tests") - - -@debug_timer -def _run_before_test_hooks(config: TestConfig, app: str | None): - """Run 'before_tests' hooks""" - logger.debug(f'Running "before_tests" hooks for {app}') - for hook_function in frappe.get_hooks("before_tests", app_name=app): - frappe.get_attr(hook_function)() - - -@debug_timer -def _run_global_test_records_dependencies_install(app: str): - """Run global test records dependencies install""" - test_module = frappe.get_module(f"{app}.tests") - logger.debug(f"Loading global tests records from {test_module.__name__}") - if hasattr(test_module, "global_test_dependencies"): - for doctype in test_module.global_test_dependencies: - logger.debug(f" Loading records for {doctype}") - make_test_records(doctype, commit=True) - - # Backwards-compatible aliases from frappe.tests.utils import ( TestRecordLog,