fix(CliRunner): Test suite for Cli

* Manage states
* Add formatting on test failures
* Patch pass_context
This commit is contained in:
Gavin D'souza 2022-02-04 12:47:58 +05:30
parent d18f4e717f
commit f343e77ecf

View file

@ -13,26 +13,29 @@ import unittest
from contextlib import contextmanager
from functools import wraps
from glob import glob
from typing import List
from typing import List, Optional
from unittest.case import skipIf
from unittest.mock import patch
# imports - third party imports
import click
from click.testing import CliRunner
from click.testing import CliRunner, Result
from click import Command
# imports - module imports
import frappe
import frappe.commands.site
import frappe.commands.utils
import frappe.recorder
from frappe.installer import add_to_installed_apps, remove_app
from frappe.utils import add_to_date, get_bench_path, get_bench_relative_path, now
from frappe.utils.backups import fetch_latest_backups
TEST_SITE = "commands-site.test"
_result: Optional[Result] = None
TEST_SITE = "commands-site-O4PN2QKA.test" # added random string tag to avoid collisions
CLI_CONTEXT = frappe._dict(sites=[TEST_SITE])
def clean(value) -> str:
"""Strips and converts bytes to str
@ -85,22 +88,20 @@ def exists_in_backup(doctypes: List, file: os.PathLike) -> bool:
return len(missing_doctypes) == 0
def restore_locals(f):
@wraps(f)
def decorated_function(*args, **kwargs):
pre_site = frappe.local.site
pre_flags = frappe.local.flags.copy()
pre_db = frappe.local.db
ret = f(*args, **kwargs)
@contextmanager
def maintain_locals():
pre_site = frappe.local.site
pre_flags = frappe.local.flags.copy()
pre_db = frappe.local.db
try:
yield
finally:
post_site = getattr(frappe.local, "site", None)
if not post_site or post_site != pre_site:
frappe.init(site=pre_site)
frappe.local.db = pre_db
frappe.local.flags = pre_flags
return ret
return decorated_function
frappe.local.flags.update(pre_flags)
def pass_test_context(f):
@ -111,22 +112,36 @@ def pass_test_context(f):
@contextmanager
def cli(cmd: Command):
patch_ctx = patch("frappe.commands.pass_context", pass_test_context)
_module = cmd.callback.__module__
_cmd = cmd.callback.__qualname__
def cli(cmd: Command, args: Optional[List] = None):
with maintain_locals():
global _result
patch_ctx.start()
importlib.reload(frappe.get_module(_module))
click_cmd = frappe.get_attr(f"{_module}.{_cmd}")
patch_ctx = patch("frappe.commands.pass_context", pass_test_context)
_module = cmd.callback.__module__
_cmd = cmd.callback.__qualname__
try:
yield CliRunner().invoke(click_cmd)
finally:
patch_ctx.stop()
__module = importlib.import_module(_module)
patch_ctx.start()
importlib.reload(__module)
click_cmd = getattr(__module, _cmd)
try:
_result = CliRunner().invoke(click_cmd, args=args)
_result.command = str(cmd)
yield _result
finally:
patch_ctx.stop()
__module = importlib.import_module(_module)
importlib.reload(__module)
importlib.invalidate_caches()
class BaseTestCommands(unittest.TestCase):
@classmethod
def setUpClass(cls) -> None:
cls.setup_test_site()
return super().setUpClass()
@classmethod
def execute(self, command, kwargs=None):
site = {"site": frappe.local.site}
@ -153,16 +168,48 @@ class BaseTestCommands(unittest.TestCase):
self.stderr = clean(self._proc.stderr)
self.returncode = clean(self._proc.returncode)
@classmethod
def setup_test_site(cls):
cmd_config = {
"test_site": TEST_SITE,
"admin_password": frappe.conf.admin_password,
"root_login": frappe.conf.root_login,
"root_password": frappe.conf.root_password,
"db_type": frappe.conf.db_type,
}
if not os.path.exists(
os.path.join(TEST_SITE, "site_config.json")
):
cls.execute(
"bench new-site {test_site} --admin-password {admin_password} --db-type"
" {db_type}",
cmd_config,
)
def _formatMessage(self, msg, standardMsg):
output = super(BaseTestCommands, self)._formatMessage(msg, standardMsg)
if not hasattr(self, "command") and _result:
command = _result.command
stdout = _result.stdout_bytes.decode() if _result.stdout_bytes else None
stderr = _result.stderr_bytes.decode() if _result.stderr_bytes else None
returncode = _result.exit_code
else:
command = self.command
stdout = self.stdout
stderr = self.stderr
returncode = self.returncode
cmd_execution_summary = "\n".join([
"-" * 70,
"Last Command Execution Summary:",
"Command: {}".format(self.command) if self.command else "",
"Standard Output: {}".format(self.stdout) if self.stdout else "",
"Standard Error: {}".format(self.stderr) if self.stderr else "",
"Return Code: {}".format(self.returncode) if self.returncode else "",
"Command: {}".format(command) if command else "",
"Standard Output: {}".format(stdout) if stdout else "",
"Standard Error: {}".format(stderr) if stderr else "",
"Return Code: {}".format(returncode) if returncode else "",
]).strip()
return "{}\n\n{}".format(output, cmd_execution_summary)
@ -198,11 +245,6 @@ class TestCommands(BaseTestCommands):
for key, value in global_config.items():
if value:
self.execute(f"bench set-config {key} {value} -g")
self.execute(
"bench new-site {test_site} --admin-password {admin_password} --db-type"
" {db_type}",
site_data,
)
# test 1: bench restore from full backup
self.execute("bench --site {test_site} backup --ignore-backup-conf", site_data)
@ -409,7 +451,7 @@ class TestCommands(BaseTestCommands):
)
def test_bench_drop_site_should_archive_site(self):
# TODO: Make this test postgres compatible
site = 'test_site.localhost'
site = TEST_SITE
self.execute(
f"bench new-site {site} --force --verbose "
@ -639,27 +681,6 @@ class TestRemoveApp(unittest.TestCase):
class TestSiteMigration(BaseTestCommands):
@classmethod
def setUpClass(cls) -> None:
cmd_config = {
"test_site": TEST_SITE,
"admin_password": frappe.conf.admin_password,
"root_login": frappe.conf.root_login,
"root_password": frappe.conf.root_password,
"db_type": frappe.conf.db_type,
}
if not os.path.exists(
os.path.join(TEST_SITE, "site_config.json")
):
cls.execute(
"bench new-site {test_site} --admin-password {admin_password} --db-type"
" {db_type}",
cmd_config,
)
return super().setUpClass()
@restore_locals
def test_migrate_cli(self):
with cli(frappe.commands.site.migrate) as result:
self.assertTrue(TEST_SITE in result.stdout)