diff --git a/frappe/commands/scheduler.py b/frappe/commands/scheduler.py index 6af3a2403e..5d453e3568 100755 --- a/frappe/commands/scheduler.py +++ b/frappe/commands/scheduler.py @@ -240,14 +240,14 @@ def start_worker_pool(queue, quiet=False, num_workers=2, burst=False): @click.option("--site", help="site name") @pass_context def ready_for_migration(context, site=None): - from frappe.utils.doctor import get_pending_jobs + from frappe.utils.doctor import any_job_pending if not site: site = get_site(context) try: frappe.init(site=site) - pending_jobs = get_pending_jobs(site=site) + pending_jobs = any_job_pending(site=site) if pending_jobs: print(f"NOT READY for migration: site {site} has pending background jobs") diff --git a/frappe/tests/test_commands.py b/frappe/tests/test_commands.py index 7bda577bb6..cdf93a1870 100644 --- a/frappe/tests/test_commands.py +++ b/frappe/tests/test_commands.py @@ -20,9 +20,11 @@ from unittest.mock import patch import click from click import Command from click.testing import CliRunner, Result +from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_fixed # imports - module imports import frappe +import frappe.commands.scheduler import frappe.commands.site import frappe.commands.utils import frappe.recorder @@ -760,6 +762,19 @@ class TestBenchBuild(BaseTestCommands): ) +class TestSchedulerUtils(BaseTestCommands): + # Retry just in case there are stuck queued jobs + @retry( + retry=retry_if_exception_type(AssertionError), + stop=stop_after_attempt(3), + wait=wait_fixed(3), + reraise=True, + ) + def test_ready_for_migrate(self): + with cli(frappe.commands.scheduler.ready_for_migration) as result: + self.assertEqual(result.exit_code, 0) + + class TestCommandUtils(FrappeTestCase): def test_bench_helper(self): from frappe.utils.bench_helper import get_app_groups diff --git a/frappe/utils/doctor.py b/frappe/utils/doctor.py index 5b12a01990..002fd8b154 100644 --- a/frappe/utils/doctor.py +++ b/frappe/utils/doctor.py @@ -79,6 +79,15 @@ def get_pending_jobs(site=None): return jobs_per_queue +def any_job_pending(site: str) -> bool: + for queue in get_queue_list(): + q = get_queue(queue) + for job_id in q.get_job_ids(): + if job_id.startswith(site): + return True + return False + + def check_number_of_workers(): return len(get_workers())