# Copyright (c) 2022, Frappe Technologies Pvt. Ltd. and Contributors # License: MIT. See LICENSE # imports - standard imports import gzip import importlib import json import os import shlex import shutil import subprocess import unittest from contextlib import contextmanager from functools import wraps from glob import glob from typing import List, Optional from unittest.case import skipIf from unittest.mock import patch # imports - third party imports import click from click import Command from click.testing import CliRunner, Result # imports - module imports import frappe import frappe.commands.site import frappe.commands.utils import frappe.recorder from frappe.installer import add_to_installed_apps, remove_app from frappe.utils import add_to_date, get_bench_path, get_bench_relative_path, now from frappe.utils.backups import fetch_latest_backups _result: Optional[Result] = None TEST_SITE = "commands-site-O4PN2QKA.test" # added random string tag to avoid collisions CLI_CONTEXT = frappe._dict(sites=[TEST_SITE]) def clean(value) -> str: """Strips and converts bytes to str Args: value ([type]): [description] Returns: [type]: [description] """ if isinstance(value, bytes): value = value.decode() if isinstance(value, str): value = value.strip() return value def missing_in_backup(doctypes: List, file: os.PathLike) -> List: """Returns list of missing doctypes in the backup. Args: doctypes (list): List of DocTypes to be checked file (str): Path of the database file Returns: doctypes(list): doctypes that are missing in backup """ predicate = 'COPY public."tab{}"' if frappe.conf.db_type == "postgres" else "CREATE TABLE `tab{}`" with gzip.open(file, "rb") as f: content = f.read().decode("utf8").lower() return [doctype for doctype in doctypes if predicate.format(doctype).lower() not in content] def exists_in_backup(doctypes: List, file: os.PathLike) -> bool: """Checks if the list of doctypes exist in the database.sql.gz file supplied Args: doctypes (list): List of DocTypes to be checked file (str): Path of the database file Returns: bool: True if all tables exist """ missing_doctypes = missing_in_backup(doctypes, file) return len(missing_doctypes) == 0 @contextmanager def maintain_locals(): pre_site = frappe.local.site pre_flags = frappe.local.flags.copy() pre_db = frappe.local.db try: yield finally: post_site = getattr(frappe.local, "site", None) if not post_site or post_site != pre_site: frappe.init(site=pre_site) frappe.local.db = pre_db frappe.local.flags.update(pre_flags) def pass_test_context(f): @wraps(f) def decorated_function(*args, **kwargs): return f(CLI_CONTEXT, *args, **kwargs) return decorated_function @contextmanager def cli(cmd: Command, args: Optional[List] = None): with maintain_locals(): global _result patch_ctx = patch("frappe.commands.pass_context", pass_test_context) _module = cmd.callback.__module__ _cmd = cmd.callback.__qualname__ __module = importlib.import_module(_module) patch_ctx.start() importlib.reload(__module) click_cmd = getattr(__module, _cmd) try: _result = CliRunner().invoke(click_cmd, args=args) _result.command = str(cmd) yield _result finally: patch_ctx.stop() __module = importlib.import_module(_module) importlib.reload(__module) importlib.invalidate_caches() class BaseTestCommands(unittest.TestCase): @classmethod def setUpClass(cls) -> None: cls.setup_test_site() return super().setUpClass() @classmethod def execute(self, command, kwargs=None): site = {"site": frappe.local.site} cmd_input = None if kwargs: cmd_input = kwargs.get("cmd_input", None) if cmd_input: if not isinstance(cmd_input, bytes): raise Exception(f"The input should be of type bytes, not {type(cmd_input).__name__}") del kwargs["cmd_input"] kwargs.update(site) else: kwargs = site self.command = " ".join(command.split()).format(**kwargs) click.secho(self.command, fg="bright_black") command = shlex.split(self.command) self._proc = subprocess.run( command, input=cmd_input, stdout=subprocess.PIPE, stderr=subprocess.PIPE ) self.stdout = clean(self._proc.stdout) self.stderr = clean(self._proc.stderr) self.returncode = clean(self._proc.returncode) @classmethod def setup_test_site(cls): cmd_config = { "test_site": TEST_SITE, "admin_password": frappe.conf.admin_password, "root_login": frappe.conf.root_login, "root_password": frappe.conf.root_password, "db_type": frappe.conf.db_type, } if not os.path.exists(os.path.join(TEST_SITE, "site_config.json")): cls.execute( "bench new-site {test_site} --admin-password {admin_password} --db-type" " {db_type}", cmd_config, ) def _formatMessage(self, msg, standardMsg): output = super(BaseTestCommands, self)._formatMessage(msg, standardMsg) if not hasattr(self, "command") and _result: command = _result.command stdout = _result.stdout_bytes.decode() if _result.stdout_bytes else None stderr = _result.stderr_bytes.decode() if _result.stderr_bytes else None returncode = _result.exit_code else: command = self.command stdout = self.stdout stderr = self.stderr returncode = self.returncode cmd_execution_summary = "\n".join( [ "-" * 70, "Last Command Execution Summary:", "Command: {}".format(command) if command else "", "Standard Output: {}".format(stdout) if stdout else "", "Standard Error: {}".format(stderr) if stderr else "", "Return Code: {}".format(returncode) if returncode else "", ] ).strip() return "{}\n\n{}".format(output, cmd_execution_summary) class TestCommands(BaseTestCommands): def test_execute(self): # test 1: execute a command expecting a numeric output self.execute("bench --site {site} execute frappe.db.get_database_size") self.assertEqual(self.returncode, 0) self.assertIsInstance(float(self.stdout), float) # test 2: execute a command expecting an errored output as local won't exist self.execute("bench --site {site} execute frappe.local.site") self.assertEqual(self.returncode, 1) self.assertIsNotNone(self.stderr) # test 3: execute a command with kwargs # Note: # terminal command has been escaped to avoid .format string replacement # The returned value has quotes which have been trimmed for the test self.execute("""bench --site {site} execute frappe.bold --kwargs '{{"text": "DocType"}}'""") self.assertEqual(self.returncode, 0) self.assertEqual(self.stdout[1:-1], frappe.bold(text="DocType")) @unittest.skip def test_restore(self): # step 0: create a site to run the test on global_config = { "admin_password": frappe.conf.admin_password, "root_login": frappe.conf.root_login, "root_password": frappe.conf.root_password, "db_type": frappe.conf.db_type, } site_data = {"test_site": TEST_SITE, **global_config} for key, value in global_config.items(): if value: self.execute(f"bench set-config {key} {value} -g") # test 1: bench restore from full backup self.execute("bench --site {test_site} backup --ignore-backup-conf", site_data) self.execute( "bench --site {test_site} execute frappe.utils.backups.fetch_latest_backups", site_data, ) site_data.update({"database": json.loads(self.stdout)["database"]}) self.execute("bench --site {test_site} restore {database}", site_data) # test 2: restore from partial backup self.execute("bench --site {test_site} backup --exclude 'ToDo'", site_data) site_data.update({"kw": "\"{'partial':True}\""}) self.execute( "bench --site {test_site} execute" " frappe.utils.backups.fetch_latest_backups --kwargs {kw}", site_data, ) site_data.update({"database": json.loads(self.stdout)["database"]}) self.execute("bench --site {test_site} restore {database}", site_data) self.assertEqual(self.returncode, 1) def test_partial_restore(self): _now = now() for num in range(10): frappe.get_doc( { "doctype": "ToDo", "date": add_to_date(_now, days=num), "description": frappe.mock("paragraph"), } ).insert() frappe.db.commit() todo_count = frappe.db.count("ToDo") # check if todos exist, create a partial backup and see if the state is the same after restore self.assertIsNot(todo_count, 0) self.execute("bench --site {site} backup --only 'ToDo'") db_path = fetch_latest_backups(partial=True)["database"] self.assertTrue("partial" in db_path) frappe.db.sql_ddl("DROP TABLE IF EXISTS `tabToDo`") frappe.db.commit() self.execute("bench --site {site} partial-restore {path}", {"path": db_path}) self.assertEqual(self.returncode, 0) self.assertEqual(frappe.db.count("ToDo"), todo_count) def test_recorder(self): frappe.recorder.stop() self.execute("bench --site {site} start-recording") frappe.local.cache = {} self.assertEqual(frappe.recorder.status(), True) self.execute("bench --site {site} stop-recording") frappe.local.cache = {} self.assertEqual(frappe.recorder.status(), False) def test_remove_from_installed_apps(self): app = "test_remove_app" add_to_installed_apps(app) # check: confirm that add_to_installed_apps added the app in the default self.execute("bench --site {site} list-apps") self.assertIn(app, self.stdout) # test 1: remove app from installed_apps global default self.execute("bench --site {site} remove-from-installed-apps {app}", {"app": app}) self.assertEqual(self.returncode, 0) self.execute("bench --site {site} list-apps") self.assertNotIn(app, self.stdout) def test_list_apps(self): # test 1: sanity check for command self.execute("bench --site all list-apps") self.assertIsNotNone(self.returncode) self.assertIsInstance(self.stdout or self.stderr, str) # test 2: bare functionality for single site self.execute("bench --site {site} list-apps") self.assertEqual(self.returncode, 0) list_apps = set(_x.split()[0] for _x in self.stdout.split("\n")) doctype = frappe.get_single("Installed Applications").installed_applications if doctype: installed_apps = set(x.app_name for x in doctype) else: installed_apps = set(frappe.get_installed_apps()) self.assertSetEqual(list_apps, installed_apps) # test 3: parse json format self.execute("bench --site {site} list-apps --format json") self.assertEqual(self.returncode, 0) self.assertIsInstance(json.loads(self.stdout), dict) self.execute("bench --site {site} list-apps -f json") self.assertEqual(self.returncode, 0) self.assertIsInstance(json.loads(self.stdout), dict) def test_show_config(self): # test 1: sanity check for command self.execute("bench --site all show-config") self.assertEqual(self.returncode, 0) # test 2: test keys in table text self.execute( "bench --site {site} set-config test_key '{second_order}' --parse", {"second_order": json.dumps({"test_key": "test_value"})}, ) self.execute("bench --site {site} show-config") self.assertEqual(self.returncode, 0) self.assertIn("test_key.test_key", self.stdout.split()) self.assertIn("test_value", self.stdout.split()) # test 3: parse json format self.execute("bench --site all show-config --format json") self.assertEqual(self.returncode, 0) self.assertIsInstance(json.loads(self.stdout), dict) self.execute("bench --site {site} show-config --format json") self.assertIsInstance(json.loads(self.stdout), dict) self.execute("bench --site {site} show-config -f json") self.assertIsInstance(json.loads(self.stdout), dict) def test_get_bench_relative_path(self): bench_path = get_bench_path() test1_path = os.path.join(bench_path, "test1.txt") test2_path = os.path.join(bench_path, "sites", "test2.txt") with open(test1_path, "w+") as test1: test1.write("asdf") with open(test2_path, "w+") as test2: test2.write("asdf") self.assertTrue("test1.txt" in get_bench_relative_path("test1.txt")) self.assertTrue("sites/test2.txt" in get_bench_relative_path("test2.txt")) with self.assertRaises(SystemExit): get_bench_relative_path("test3.txt") os.remove(test1_path) os.remove(test2_path) def test_frappe_site_env(self): os.putenv("FRAPPE_SITE", frappe.local.site) self.execute("bench execute frappe.ping") self.assertEqual(self.returncode, 0) self.assertIn("pong", self.stdout) def test_version(self): self.execute("bench version") self.assertEqual(self.returncode, 0) for output in ["legacy", "plain", "table", "json"]: self.execute(f"bench version -f {output}") self.assertEqual(self.returncode, 0) self.execute("bench version -f invalid") self.assertEqual(self.returncode, 2) def test_set_password(self): from frappe.utils.password import check_password self.execute("bench --site {site} set-password Administrator test1") self.assertEqual(self.returncode, 0) self.assertEqual(check_password("Administrator", "test1"), "Administrator") # to release the lock taken by check_password frappe.db.commit() self.execute("bench --site {site} set-admin-password test2") self.assertEqual(self.returncode, 0) self.assertEqual(check_password("Administrator", "test2"), "Administrator") @skipIf( not ( frappe.conf.root_password and frappe.conf.admin_password and frappe.conf.db_type == "mariadb" ), "DB Root password and Admin password not set in config", ) def test_bench_drop_site_should_archive_site(self): # TODO: Make this test postgres compatible site = TEST_SITE self.execute( f"bench new-site {site} --force --verbose " f"--admin-password {frappe.conf.admin_password} " f"--mariadb-root-password {frappe.conf.root_password} " f"--db-type {frappe.conf.db_type or 'mariadb'} " ) self.assertEqual(self.returncode, 0) self.execute(f"bench drop-site {site} --force --root-password {frappe.conf.root_password}") self.assertEqual(self.returncode, 0) bench_path = get_bench_path() site_directory = os.path.join(bench_path, f"sites/{site}") self.assertFalse(os.path.exists(site_directory)) archive_directory = os.path.join(bench_path, f"archived/sites/{site}") self.assertTrue(os.path.exists(archive_directory)) class TestBackups(BaseTestCommands): backup_map = { "includes": { "includes": [ "ToDo", "Note", ] }, "excludes": {"excludes": ["Activity Log", "Access Log", "Error Log"]}, } home = os.path.expanduser("~") site_backup_path = frappe.utils.get_site_path("private", "backups") def setUp(self): self.files_to_trash = [] def tearDown(self): if self._testMethodName == "test_backup": for file in self.files_to_trash: os.remove(file) try: os.rmdir(os.path.dirname(file)) except OSError: pass def test_backup_no_options(self): """Take a backup without any options""" before_backup = fetch_latest_backups(partial=True) self.execute("bench --site {site} backup") after_backup = fetch_latest_backups(partial=True) self.assertEqual(self.returncode, 0) self.assertIn("successfully completed", self.stdout) self.assertNotEqual(before_backup["database"], after_backup["database"]) def test_backup_with_files(self): """Take a backup with files (--with-files)""" before_backup = fetch_latest_backups() self.execute("bench --site {site} backup --with-files") after_backup = fetch_latest_backups() self.assertEqual(self.returncode, 0) self.assertIn("successfully completed", self.stdout) self.assertIn("with files", self.stdout) self.assertNotEqual(before_backup, after_backup) self.assertIsNotNone(after_backup["public"]) self.assertIsNotNone(after_backup["private"]) def test_backup_with_custom_path(self): """Backup to a custom path (--backup-path)""" backup_path = os.path.join(self.home, "backups") self.execute( "bench --site {site} backup --backup-path {backup_path}", {"backup_path": backup_path} ) self.assertEqual(self.returncode, 0) self.assertTrue(os.path.exists(backup_path)) self.assertGreaterEqual(len(os.listdir(backup_path)), 2) def test_backup_with_different_file_paths(self): """Backup with different file paths (--backup-path-db, --backup-path-files, --backup-path-private-files, --backup-path-conf)""" kwargs = { key: os.path.join(self.home, key, value) for key, value in { "db_path": "database.sql.gz", "files_path": "public.tar", "private_path": "private.tar", "conf_path": "config.json", }.items() } self.execute( """bench --site {site} backup --with-files --backup-path-db {db_path} --backup-path-files {files_path} --backup-path-private-files {private_path} --backup-path-conf {conf_path}""", kwargs, ) self.assertEqual(self.returncode, 0) for path in kwargs.values(): self.assertTrue(os.path.exists(path)) def test_backup_compress_files(self): """Take a compressed backup (--compress)""" self.execute("bench --site {site} backup --with-files --compress") self.assertEqual(self.returncode, 0) compressed_files = glob(f"{self.site_backup_path}/*.tgz") self.assertGreater(len(compressed_files), 0) def test_backup_verbose(self): """Take a verbose backup (--verbose)""" self.execute("bench --site {site} backup --verbose") self.assertEqual(self.returncode, 0) def test_backup_only_specific_doctypes(self): """Take a backup with (include) backup options set in the site config `frappe.conf.backup.includes`""" self.execute( "bench --site {site} set-config backup '{includes}' --parse", {"includes": json.dumps(self.backup_map["includes"])}, ) self.execute("bench --site {site} backup --verbose") self.assertEqual(self.returncode, 0) database = fetch_latest_backups(partial=True)["database"] self.assertEqual([], missing_in_backup(self.backup_map["includes"]["includes"], database)) def test_backup_excluding_specific_doctypes(self): """Take a backup with (exclude) backup options set (`frappe.conf.backup.excludes`, `--exclude`)""" # test 1: take a backup with frappe.conf.backup.excludes self.execute( "bench --site {site} set-config backup '{excludes}' --parse", {"excludes": json.dumps(self.backup_map["excludes"])}, ) self.execute("bench --site {site} backup --verbose") self.assertEqual(self.returncode, 0) database = fetch_latest_backups(partial=True)["database"] self.assertFalse(exists_in_backup(self.backup_map["excludes"]["excludes"], database)) self.assertEqual([], missing_in_backup(self.backup_map["includes"]["includes"], database)) # test 2: take a backup with --exclude self.execute( "bench --site {site} backup --exclude '{exclude}'", {"exclude": ",".join(self.backup_map["excludes"]["excludes"])}, ) self.assertEqual(self.returncode, 0) database = fetch_latest_backups(partial=True)["database"] self.assertFalse(exists_in_backup(self.backup_map["excludes"]["excludes"], database)) def test_selective_backup_priority_resolution(self): """Take a backup with conflicting backup options set (`frappe.conf.excludes`, `--include`)""" self.execute( "bench --site {site} backup --include '{include}'", {"include": ",".join(self.backup_map["includes"]["includes"])}, ) self.assertEqual(self.returncode, 0) database = fetch_latest_backups(partial=True)["database"] self.assertEqual([], missing_in_backup(self.backup_map["includes"]["includes"], database)) def test_dont_backup_conf(self): """Take a backup ignoring frappe.conf.backup settings (with --ignore-backup-conf option)""" self.execute("bench --site {site} backup --ignore-backup-conf") self.assertEqual(self.returncode, 0) database = fetch_latest_backups()["database"] self.assertEqual([], missing_in_backup(self.backup_map["excludes"]["excludes"], database)) class TestRemoveApp(unittest.TestCase): def test_delete_modules(self): from frappe.installer import ( _delete_doctypes, _delete_modules, _get_module_linked_doctype_field_map, ) test_module = frappe.new_doc("Module Def") test_module.update({"module_name": "RemoveThis", "app_name": "frappe"}) test_module.save() module_def_linked_doctype = frappe.get_doc( { "doctype": "DocType", "name": "Doctype linked with module def", "module": "RemoveThis", "custom": 1, "fields": [ {"label": "Modulen't", "fieldname": "notmodule", "fieldtype": "Link", "options": "Module Def"} ], } ).insert() doctype_to_link_field_map = _get_module_linked_doctype_field_map() self.assertIn("Report", doctype_to_link_field_map) self.assertIn(module_def_linked_doctype.name, doctype_to_link_field_map) self.assertEqual(doctype_to_link_field_map[module_def_linked_doctype.name], "notmodule") self.assertNotIn("DocType", doctype_to_link_field_map) doctypes_to_delete = _delete_modules([test_module.module_name], dry_run=False) self.assertEqual(len(doctypes_to_delete), 1) _delete_doctypes(doctypes_to_delete, dry_run=False) self.assertFalse(frappe.db.exists("Module Def", test_module.module_name)) self.assertFalse(frappe.db.exists("DocType", module_def_linked_doctype.name)) def test_dry_run(self): """Check if dry run in not destructive.""" # nothing to assert, if this fails rest of the test suite will crumble. remove_app("frappe", dry_run=True, yes=True, no_backup=True) class TestSiteMigration(BaseTestCommands): def test_migrate_cli(self): with cli(frappe.commands.site.migrate) as result: self.assertTrue(TEST_SITE in result.stdout) self.assertEqual(result.exit_code, 0) self.assertEqual(result.exception, None) class TestBenchBuild(BaseTestCommands): def test_build_assets(self): with cli(frappe.commands.utils.build) as result: self.assertEqual(result.exit_code, 0) self.assertEqual(result.exception, None)