diff --git a/frappe/commands/scheduler.py b/frappe/commands/scheduler.py index d998cc85fe..be16450c54 100755 --- a/frappe/commands/scheduler.py +++ b/frappe/commands/scheduler.py @@ -186,11 +186,12 @@ def start_scheduler(): @click.option("--quiet", is_flag=True, default=False, help="Hide Log Outputs") @click.option("-u", "--rq-username", default=None, help="Redis ACL user") @click.option("-p", "--rq-password", default=None, help="Redis ACL user password") -def start_worker(queue, quiet=False, rq_username=None, rq_password=None): +@click.option("--burst", is_flag=True, default=False, help="Run Worker in Burst mode.") +def start_worker(queue, quiet=False, rq_username=None, rq_password=None, burst=False): """Site is used to find redis credentials.""" from frappe.utils.background_jobs import start_worker - start_worker(queue, quiet=quiet, rq_username=rq_username, rq_password=rq_password) + start_worker(queue, quiet=quiet, rq_username=rq_username, rq_password=rq_password, burst=burst) @click.command("ready-for-migration") diff --git a/frappe/core/doctype/rq_job/test_rq_job.py b/frappe/core/doctype/rq_job/test_rq_job.py index 460aa08941..654cdcd21f 100644 --- a/frappe/core/doctype/rq_job/test_rq_job.py +++ b/frappe/core/doctype/rq_job/test_rq_job.py @@ -10,6 +10,7 @@ from rq.job import Job import frappe from frappe.core.doctype.rq_job.rq_job import RQJob, remove_failed_jobs, stop_job from frappe.tests.utils import FrappeTestCase, timeout +from frappe.utils import cstr, execute_in_shell from frappe.utils.background_jobs import is_job_queued @@ -92,6 +93,15 @@ class TestRQJob(FrappeTestCase): self.check_status(actual_job, "finished") self.assertFalse(is_job_queued(job_name)) + @timeout(20) + def test_multi_queue_burst_consumption(self): + for _ in range(3): + for q in ["default", "short"]: + frappe.enqueue(self.BG_JOB, sleep=1, queue=q) + + _, stderr = execute_in_shell("bench worker --queue short,default --burst", check_exit_code=True) + self.assertIn("quitting", cstr(stderr)) + def test_func(fail=False, sleep=0): if fail: diff --git a/frappe/utils/background_jobs.py b/frappe/utils/background_jobs.py index 873a91dd14..1457a821ec 100755 --- a/frappe/utils/background_jobs.py +++ b/frappe/utils/background_jobs.py @@ -216,7 +216,8 @@ def start_worker( quiet: bool = False, rq_username: str | None = None, rq_password: str | None = None, -) -> NoReturn: + burst: bool = False, +) -> NoReturn | None: """Wrapper to start rq worker. Connects to redis and monitors these queues.""" with frappe.init_site(): # empty init is required to get redis_queue from common_site_config.json @@ -234,7 +235,7 @@ def start_worker( logging_level = "INFO" if quiet: logging_level = "WARNING" - Worker(queues, name=get_worker_name(queue_name)).work(logging_level=logging_level) + Worker(queues, name=get_worker_name(queue_name)).work(logging_level=logging_level, burst=burst) def get_worker_name(queue):