From 73bca16d77acbdf9c78eb38f0e4132a0518094ff Mon Sep 17 00:00:00 2001 From: Ankush Menat Date: Mon, 26 Jun 2023 17:59:28 +0530 Subject: [PATCH] feat: RQ `WorkerPool` RQ now has experimental support for workerpools. When to use this? Roughly when you have more than 2 workers a workerpool might make sense, below 2 it's overhead as master "pool" process will need to run to manager workerpool itself. Why is it any better? Currently we just let supervisor duplicate the worker process N number of times. This is inefficient from shared memory POV. Forking the original process to create workers enables sharing of more memory thus leading upwards of 60-70% reduction in memory usage with pool size of 8 workers. --- frappe/commands/scheduler.py | 22 +++++++++++ frappe/core/doctype/rq_job/test_rq_job.py | 11 ++++++ frappe/utils/background_jobs.py | 46 +++++++++++++++++++++-- 3 files changed, 76 insertions(+), 3 deletions(-) diff --git a/frappe/commands/scheduler.py b/frappe/commands/scheduler.py index 36fa81f8a5..6af3a2403e 100755 --- a/frappe/commands/scheduler.py +++ b/frappe/commands/scheduler.py @@ -215,6 +215,27 @@ def start_worker( ) +@click.command("worker-pool") +@click.option( + "--queue", + type=str, + help="Queue to consume from. Multiple queues can be specified using comma-separated string. If not specified all queues are consumed.", +) +@click.option("--num-workers", type=int, default=2, help="Number of workers to spawn in pool.") +@click.option("--quiet", is_flag=True, default=False, help="Hide Log Outputs") +@click.option("--burst", is_flag=True, default=False, help="Run Worker in Burst mode.") +def start_worker_pool(queue, quiet=False, num_workers=2, burst=False): + """Start a backgrond worker""" + from frappe.utils.background_jobs import start_worker_pool + + start_worker_pool( + queue=queue, + quiet=quiet, + burst=burst, + num_workers=num_workers, + ) + + @click.command("ready-for-migration") @click.option("--site", help="site name") @pass_context @@ -251,5 +272,6 @@ commands = [ show_pending_jobs, start_scheduler, start_worker, + start_worker_pool, trigger_scheduler_event, ] diff --git a/frappe/core/doctype/rq_job/test_rq_job.py b/frappe/core/doctype/rq_job/test_rq_job.py index c39717cfd8..6512902fb3 100644 --- a/frappe/core/doctype/rq_job/test_rq_job.py +++ b/frappe/core/doctype/rq_job/test_rq_job.py @@ -96,6 +96,17 @@ class TestRQJob(FrappeTestCase): _, stderr = execute_in_shell("bench worker --queue short,default --burst", check_exit_code=True) self.assertIn("quitting", cstr(stderr)) + @timeout(20) + def test_multi_queue_burst_consumption_worker_pool(self): + for _ in range(3): + for q in ["default", "short"]: + frappe.enqueue(self.BG_JOB, sleep=1, queue=q) + + _, stderr = execute_in_shell( + "bench worker-pool --queue short,default --burst --num-workers=4", check_exit_code=True + ) + self.assertIn("quitting", cstr(stderr)) + @timeout(20) def test_job_id_dedup(self): job_id = "test_dedup" diff --git a/frappe/utils/background_jobs.py b/frappe/utils/background_jobs.py index a713163a7b..9008ba00ee 100755 --- a/frappe/utils/background_jobs.py +++ b/frappe/utils/background_jobs.py @@ -14,6 +14,7 @@ from rq.exceptions import NoSuchJobError from rq.job import Job, JobStatus from rq.logutils import setup_loghandlers from rq.worker import DequeueStrategy +from rq.worker_pool import WorkerPool from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_fixed import frappe @@ -237,9 +238,7 @@ def start_worker( if not strategy: strategy = DequeueStrategy.DEFAULT - if frappe._tune_gc: - gc.collect() - gc.freeze() + _freeze_gc() with frappe.init_site(): # empty init is required to get redis_queue from common_site_config.json @@ -267,6 +266,47 @@ def start_worker( ) +def start_worker_pool( + queue: str | None = None, + num_workers: int = 1, + quiet: bool = False, + burst: bool = False, +) -> NoReturn: + """Start worker pool with specified number of workers. + + WARNING: This feature is considered "EXPERIMENTAL". + """ + + _freeze_gc() + + with frappe.init_site(): + redis_connection = get_redis_conn() + + if queue: + queue = [q.strip() for q in queue.split(",")] + queues = get_queue_list(queue, build_queue_name=True) + + if os.environ.get("CI"): + setup_loghandlers("ERROR") + + logging_level = "INFO" + if quiet: + logging_level = "WARNING" + + pool = WorkerPool( + queues=queues, + connection=redis_connection, + num_workers=num_workers, + ) + pool.start(logging_level=logging_level, burst=burst) + + +def _freeze_gc(): + if frappe._tune_gc: + gc.collect() + gc.freeze() + + def get_worker_name(queue): """When limiting worker to a specific queue, also append queue name to default worker name""" name = None