diff --git a/frappe/deprecation_dumpster.py b/frappe/deprecation_dumpster.py index 3a5d837e04..48f76bbb44 100644 --- a/frappe/deprecation_dumpster.py +++ b/frappe/deprecation_dumpster.py @@ -117,7 +117,7 @@ def deprecation_warning(marked: str, graduation: str, msg: str): Color.RED, ) + colorize(f"{msg}\n", Color.YELLOW), - category=DeprecationWarning, + category=FrappeDeprecationWarning, stacklevel=2, ) @@ -475,61 +475,282 @@ def tests_timeout(*args, **kwargs): return timeout(*args, **kwargs) -def get_tests_FrappeTestCase(): - class CompatFrappeTestCase: +def get_tests_CompatFrappeTestCase(): + """Unfortunately, due to circular imports, we just have to copy the entire old implementation here, even though IntegrationTestCase is overwhelmingly api-compatible.""" + import copy + import datetime + import unittest + from collections.abc import Sequence + from contextlib import contextmanager + + import frappe + from frappe.model.base_document import BaseDocument + from frappe.utils import cint + + datetime_like_types = (datetime.datetime, datetime.date, datetime.time, datetime.timedelta) + + def _commit_watcher(): + import traceback + + print("Warning:, transaction committed during tests.") + traceback.print_stack(limit=10) + + def _rollback_db(): + frappe.db.value_cache = {} + frappe.db.rollback() + + def _restore_thread_locals(flags): + frappe.local.flags = flags + frappe.local.error_log = [] + frappe.local.message_log = [] + frappe.local.debug_log = [] + frappe.local.conf = frappe._dict(frappe.get_site_config()) + frappe.local.cache = {} + frappe.local.lang = "en" + frappe.local.preload_assets = {"style": [], "script": [], "icons": []} + + if hasattr(frappe.local, "request"): + delattr(frappe.local, "request") + + class FrappeTestCase(unittest.TestCase): + """Base test class for Frappe tests. + + + If you specify `setUpClass` then make sure to call `super().setUpClass` + otherwise this class will become ineffective. + """ + + @deprecated( + "frappe.tests.utils.FrappeTestCase", + "2024-20-08", + "v17", + "Import `frappe.tests.UnitTestCase` or `frappe.tests.IntegrationTestCase` respectively instead of `frappe.tests.utils.FrappeTestCase` - also see wiki for more info: https://github.com/frappe/frappe/wiki#testing-guide", + ) def __new__(cls, *args, **kwargs): - from frappe.tests import IntegrationTestCase + return super().__new__(cls) - class _CompatFrappeTestCase(IntegrationTestCase): - def __init__(self, *args, **kwargs): - deprecation_warning( - "2024-20-08", - "v17", - "Import `frappe.tests.UnitTestCase` or `frappe.tests.IntegrationTestCase` respectively instead of `frappe.tests.utils.FrappeTestCase`", - ) - super().__init__(*args, **kwargs) + TEST_SITE = "test_site" - return _CompatFrappeTestCase(*args, **kwargs) + SHOW_TRANSACTION_COMMIT_WARNINGS = False + maxDiff = 10_000 # prints long diffs but useful in CI - return CompatFrappeTestCase + @classmethod + def setUpClass(cls) -> None: + cls.TEST_SITE = getattr(frappe.local, "site", None) or cls.TEST_SITE + frappe.init(cls.TEST_SITE) + cls.ADMIN_PASSWORD = frappe.get_conf(cls.TEST_SITE).admin_password + cls._primary_connection = frappe.local.db + cls._secondary_connection = None + # flush changes done so far to avoid flake + frappe.db.commit() # nosemgrep + if cls.SHOW_TRANSACTION_COMMIT_WARNINGS: + frappe.db.before_commit.add(_commit_watcher) + # enqueue teardown actions (executed in LIFO order) + cls.addClassCleanup(_restore_thread_locals, copy.deepcopy(frappe.local.flags)) + cls.addClassCleanup(_rollback_db) -def get_tests_IntegrationTestCase(): - class CompatIntegrationTestCase: - def __new__(cls, *args, **kwargs): - from frappe.tests import IntegrationTestCase + return super().setUpClass() - class _CompatIntegrationTestCase(IntegrationTestCase): - def __init__(self, *args, **kwargs): - deprecation_warning( - "2024-20-08", - "v17", - "Import `frappe.tests.IntegrationTestCase` instead of `frappe.tests.utils.IntegrationTestCase`", - ) - super().__init__(*args, **kwargs) + def _apply_debug_decorator(self, exceptions=()): + from frappe.tests.utils import debug_on - return _CompatIntegrationTestCase(*args, **kwargs) + setattr(self, self._testMethodName, debug_on(*exceptions)(getattr(self, self._testMethodName))) - return CompatIntegrationTestCase + def assertSequenceSubset(self, larger: Sequence, smaller: Sequence, msg=None): + """Assert that `expected` is a subset of `actual`.""" + self.assertTrue(set(smaller).issubset(set(larger)), msg=msg) + # --- Frappe Framework specific assertions + def assertDocumentEqual(self, expected, actual): + """Compare a (partial) expected document with actual Document.""" -def get_tests_UnitTestCase(): - class CompatUnitTestCase: - def __new__(cls, *args, **kwargs): - from frappe.tests import UnitTestCase + if isinstance(expected, BaseDocument): + expected = expected.as_dict() - class _CompatUnitTestCase(UnitTestCase): - def __init__(self, *args, **kwargs): - deprecation_warning( - "2024-20-08", - "v17", - "Import `frappe.tests.UnitTestCase` instead of `frappe.tests.utils.UnitTestCase`", - ) - super().__init__(*args, **kwargs) + for field, value in expected.items(): + if isinstance(value, list): + actual_child_docs = actual.get(field) + self.assertEqual(len(value), len(actual_child_docs), msg=f"{field} length should be same") + for exp_child, actual_child in zip(value, actual_child_docs, strict=False): + self.assertDocumentEqual(exp_child, actual_child) + else: + self._compare_field(value, actual.get(field), actual, field) - return _CompatUnitTestCase(*args, **kwargs) + def _compare_field(self, expected, actual, doc: BaseDocument, field: str): + msg = f"{field} should be same." - return CompatUnitTestCase + if isinstance(expected, float): + precision = doc.precision(field) + self.assertAlmostEqual( + expected, actual, places=precision, msg=f"{field} should be same to {precision} digits" + ) + elif isinstance(expected, bool | int): + self.assertEqual(expected, cint(actual), msg=msg) + elif isinstance(expected, datetime_like_types) or isinstance(actual, datetime_like_types): + self.assertEqual(str(expected), str(actual), msg=msg) + else: + self.assertEqual(expected, actual, msg=msg) + + def normalize_html(self, code: str) -> str: + """Formats HTML consistently so simple string comparisons can work on them.""" + from bs4 import BeautifulSoup + + return BeautifulSoup(code, "html.parser").prettify(formatter=None) + + def normalize_sql(self, query: str) -> str: + """Formats SQL consistently so simple string comparisons can work on them.""" + import sqlparse + + return sqlparse.format(query.strip(), keyword_case="upper", reindent=True, strip_comments=True) + + @contextmanager + def primary_connection(self): + """Switch to primary DB connection + + This is used for simulating multiple users performing actions by simulating two DB connections""" + try: + current_conn = frappe.local.db + frappe.local.db = self._primary_connection + yield + finally: + frappe.local.db = current_conn + + @contextmanager + def secondary_connection(self): + """Switch to secondary DB connection.""" + if self._secondary_connection is None: + frappe.connect() # get second connection + self._secondary_connection = frappe.local.db + + try: + current_conn = frappe.local.db + frappe.local.db = self._secondary_connection + yield + finally: + frappe.local.db = current_conn + self.addCleanup(self._rollback_connections) + + def _rollback_connections(self): + self._primary_connection.rollback() + self._secondary_connection.rollback() + + def assertQueryEqual(self, first: str, second: str): + self.assertEqual(self.normalize_sql(first), self.normalize_sql(second)) + + @contextmanager + def assertQueryCount(self, count): + queries = [] + + def _sql_with_count(*args, **kwargs): + ret = orig_sql(*args, **kwargs) + queries.append(args[0].last_query) + return ret + + try: + orig_sql = frappe.db.__class__.sql + frappe.db.__class__.sql = _sql_with_count + yield + self.assertLessEqual(len(queries), count, msg="Queries executed: \n" + "\n\n".join(queries)) + finally: + frappe.db.__class__.sql = orig_sql + + @contextmanager + def assertRedisCallCounts(self, count): + commands = [] + + def execute_command_and_count(*args, **kwargs): + ret = orig_execute(*args, **kwargs) + key_len = 2 + if "H" in args[0]: + key_len = 3 + commands.append((args)[:key_len]) + return ret + + try: + orig_execute = frappe.cache.execute_command + frappe.cache.execute_command = execute_command_and_count + yield + self.assertLessEqual( + len(commands), count, msg="commands executed: \n" + "\n".join(str(c) for c in commands) + ) + finally: + frappe.cache.execute_command = orig_execute + + @contextmanager + def assertRowsRead(self, count): + rows_read = 0 + + def _sql_with_count(*args, **kwargs): + nonlocal rows_read + + ret = orig_sql(*args, **kwargs) + # count of last touched rows as per DB-API 2.0 https://peps.python.org/pep-0249/#rowcount + rows_read += cint(frappe.db._cursor.rowcount) + return ret + + try: + orig_sql = frappe.db.sql + frappe.db.sql = _sql_with_count + yield + self.assertLessEqual(rows_read, count, msg="Queries read more rows than expected") + finally: + frappe.db.sql = orig_sql + + @classmethod + def enable_safe_exec(cls) -> None: + """Enable safe exec and disable them after test case is completed.""" + from frappe.installer import update_site_config + from frappe.utils.safe_exec import SAFE_EXEC_CONFIG_KEY + + cls._common_conf = os.path.join(frappe.local.sites_path, "common_site_config.json") + update_site_config(SAFE_EXEC_CONFIG_KEY, 1, validate=False, site_config_path=cls._common_conf) + + cls.addClassCleanup( + lambda: update_site_config( + SAFE_EXEC_CONFIG_KEY, 0, validate=False, site_config_path=cls._common_conf + ) + ) + + @contextmanager + def set_user(self, user: str): + try: + old_user = frappe.session.user + frappe.set_user(user) + yield + finally: + frappe.set_user(old_user) + + @contextmanager + def switch_site(self, site: str): + """Switch connection to different site. + Note: Drops current site connection completely.""" + + try: + old_site = frappe.local.site + frappe.init(site, force=True) + frappe.connect() + yield + finally: + frappe.init(old_site, force=True) + frappe.connect() + + @contextmanager + def freeze_time(self, time_to_freeze, is_utc=False, *args, **kwargs): + import pytz + from freezegun import freeze_time + + from frappe.utils.data import convert_utc_to_timezone, get_datetime, get_system_timezone + + if not is_utc: + # Freeze time expects UTC or tzaware objects. We have neither, so convert to UTC. + timezone = pytz.timezone(get_system_timezone()) + time_to_freeze = timezone.localize(get_datetime(time_to_freeze)).astimezone(pytz.utc) + + with freeze_time(time_to_freeze, *args, **kwargs): + yield + + return FrappeTestCase @deprecated( diff --git a/frappe/testing/discovery.py b/frappe/testing/discovery.py index d29b1824c3..caba5522cd 100644 --- a/frappe/testing/discovery.py +++ b/frappe/testing/discovery.py @@ -27,7 +27,7 @@ from pathlib import Path from typing import TYPE_CHECKING import frappe -from frappe.tests import IntegrationTestCase +from frappe.tests import IntegrationTestCase, UnitTestCase from .utils import debug_timer @@ -128,7 +128,20 @@ def _add_module_tests(runner, app: str, module: str): for test in runner._iterate_suite(test_suite): if runner.cfg.tests and test._testMethodName not in runner.cfg.tests: continue - category = "integration" if isinstance(test, IntegrationTestCase) else "unit" + match test: + case IntegrationTestCase(): + category = "integration" + case UnitTestCase(): + category = "unit" + case _: + from frappe.deprecation_dumpster import deprecation_warning + + deprecation_warning( + "2024-20-08", + "v17", + "discovery and categorization of FrappeTestCase will be removed from this runner", + ) + category = "deprecated-old-style-unspecified" if runner.cfg.selected_categories and category not in runner.cfg.selected_categories: continue runner.per_app_categories[app][category].addTest(test) diff --git a/frappe/tests/utils/__init__.py b/frappe/tests/utils/__init__.py index da82b6a97a..1d1192eb6c 100644 --- a/frappe/tests/utils/__init__.py +++ b/frappe/tests/utils/__init__.py @@ -27,9 +27,7 @@ def check_orpahned_doctypes(): from frappe.deprecation_dumpster import ( - get_tests_FrappeTestCase, - get_tests_IntegrationTestCase, - get_tests_UnitTestCase, + get_tests_CompatFrappeTestCase, ) from frappe.deprecation_dumpster import ( tests_change_settings as change_settings, @@ -38,9 +36,7 @@ from frappe.deprecation_dumpster import ( tests_debug_on as debug_on, ) -FrappeTestCase = get_tests_FrappeTestCase() -IntegrationTestCase = get_tests_IntegrationTestCase() -UnitTestCase = get_tests_UnitTestCase() +FrappeTestCase = get_tests_CompatFrappeTestCase() from frappe.deprecation_dumpster import ( tests_patch_hooks as patch_hooks,