From ca95b591ae8581a48dfc28f3560670da36f8da0e Mon Sep 17 00:00:00 2001 From: Ankush Menat Date: Mon, 26 Jun 2023 17:36:53 +0530 Subject: [PATCH 1/3] refactor: Pass redis connection directly --- frappe/utils/background_jobs.py | 23 +++++++++++------------ 1 file changed, 11 insertions(+), 12 deletions(-) diff --git a/frappe/utils/background_jobs.py b/frappe/utils/background_jobs.py index 6b0249e720..b2a3fbab24 100755 --- a/frappe/utils/background_jobs.py +++ b/frappe/utils/background_jobs.py @@ -9,7 +9,7 @@ from uuid import uuid4 import redis from redis.exceptions import BusyLoadingError, ConnectionError -from rq import Connection, Queue, Worker +from rq import Queue, Worker from rq.exceptions import NoSuchJobError from rq.job import Job, JobStatus from rq.logutils import setup_loghandlers @@ -253,17 +253,16 @@ def start_worker( WorkerKlass = DEQUEUE_STRATEGIES.get(strategy, Worker) - with Connection(redis_connection): - logging_level = "INFO" - if quiet: - logging_level = "WARNING" - worker = WorkerKlass(queues, name=get_worker_name(queue_name)) - worker.work( - logging_level=logging_level, - burst=burst, - date_format="%Y-%m-%d %H:%M:%S", - log_format="%(asctime)s,%(msecs)03d %(message)s", - ) + logging_level = "INFO" + if quiet: + logging_level = "WARNING" + worker = WorkerKlass(queues, name=get_worker_name(queue_name), connection=redis_connection) + worker.work( + logging_level=logging_level, + burst=burst, + date_format="%Y-%m-%d %H:%M:%S", + log_format="%(asctime)s,%(msecs)03d %(message)s", + ) def get_worker_name(queue): From 7fbc6e8175da239a230428d936b2af7f58ceeff9 Mon Sep 17 00:00:00 2001 From: Ankush Menat Date: Mon, 26 Jun 2023 17:42:18 +0530 Subject: [PATCH 2/3] refactor: Simplify dequeue_strategy selection Classes arent required anymore, it can just be a parm to worker class isntead. --- frappe/utils/background_jobs.py | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/frappe/utils/background_jobs.py b/frappe/utils/background_jobs.py index b2a3fbab24..a713163a7b 100755 --- a/frappe/utils/background_jobs.py +++ b/frappe/utils/background_jobs.py @@ -4,7 +4,7 @@ import socket import time from collections import defaultdict from functools import lru_cache -from typing import Any, Callable, Literal, NoReturn +from typing import Any, Callable, NoReturn from uuid import uuid4 import redis @@ -13,7 +13,7 @@ from rq import Queue, Worker from rq.exceptions import NoSuchJobError from rq.job import Job, JobStatus from rq.logutils import setup_loghandlers -from rq.worker import RandomWorker, RoundRobinWorker +from rq.worker import DequeueStrategy from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_fixed import frappe @@ -230,10 +230,12 @@ def start_worker( rq_username: str | None = None, rq_password: str | None = None, burst: bool = False, - strategy: Literal["round_robin", "random"] | None = None, + strategy: DequeueStrategy | None = DequeueStrategy.DEFAULT, ) -> NoReturn | None: # pragma: no cover """Wrapper to start rq worker. Connects to redis and monitors these queues.""" - DEQUEUE_STRATEGIES = {"round_robin": RoundRobinWorker, "random": RandomWorker} + + if not strategy: + strategy = DequeueStrategy.DEFAULT if frappe._tune_gc: gc.collect() @@ -251,17 +253,17 @@ def start_worker( if os.environ.get("CI"): setup_loghandlers("ERROR") - WorkerKlass = DEQUEUE_STRATEGIES.get(strategy, Worker) - logging_level = "INFO" if quiet: logging_level = "WARNING" - worker = WorkerKlass(queues, name=get_worker_name(queue_name), connection=redis_connection) + + worker = Worker(queues, name=get_worker_name(queue_name), connection=redis_connection) worker.work( logging_level=logging_level, burst=burst, date_format="%Y-%m-%d %H:%M:%S", log_format="%(asctime)s,%(msecs)03d %(message)s", + dequeue_strategy=strategy, ) From 73bca16d77acbdf9c78eb38f0e4132a0518094ff Mon Sep 17 00:00:00 2001 From: Ankush Menat Date: Mon, 26 Jun 2023 17:59:28 +0530 Subject: [PATCH 3/3] feat: RQ `WorkerPool` RQ now has experimental support for workerpools. When to use this? Roughly when you have more than 2 workers a workerpool might make sense, below 2 it's overhead as master "pool" process will need to run to manager workerpool itself. Why is it any better? Currently we just let supervisor duplicate the worker process N number of times. This is inefficient from shared memory POV. Forking the original process to create workers enables sharing of more memory thus leading upwards of 60-70% reduction in memory usage with pool size of 8 workers. --- frappe/commands/scheduler.py | 22 +++++++++++ frappe/core/doctype/rq_job/test_rq_job.py | 11 ++++++ frappe/utils/background_jobs.py | 46 +++++++++++++++++++++-- 3 files changed, 76 insertions(+), 3 deletions(-) diff --git a/frappe/commands/scheduler.py b/frappe/commands/scheduler.py index 36fa81f8a5..6af3a2403e 100755 --- a/frappe/commands/scheduler.py +++ b/frappe/commands/scheduler.py @@ -215,6 +215,27 @@ def start_worker( ) +@click.command("worker-pool") +@click.option( + "--queue", + type=str, + help="Queue to consume from. Multiple queues can be specified using comma-separated string. If not specified all queues are consumed.", +) +@click.option("--num-workers", type=int, default=2, help="Number of workers to spawn in pool.") +@click.option("--quiet", is_flag=True, default=False, help="Hide Log Outputs") +@click.option("--burst", is_flag=True, default=False, help="Run Worker in Burst mode.") +def start_worker_pool(queue, quiet=False, num_workers=2, burst=False): + """Start a backgrond worker""" + from frappe.utils.background_jobs import start_worker_pool + + start_worker_pool( + queue=queue, + quiet=quiet, + burst=burst, + num_workers=num_workers, + ) + + @click.command("ready-for-migration") @click.option("--site", help="site name") @pass_context @@ -251,5 +272,6 @@ commands = [ show_pending_jobs, start_scheduler, start_worker, + start_worker_pool, trigger_scheduler_event, ] diff --git a/frappe/core/doctype/rq_job/test_rq_job.py b/frappe/core/doctype/rq_job/test_rq_job.py index c39717cfd8..6512902fb3 100644 --- a/frappe/core/doctype/rq_job/test_rq_job.py +++ b/frappe/core/doctype/rq_job/test_rq_job.py @@ -96,6 +96,17 @@ class TestRQJob(FrappeTestCase): _, stderr = execute_in_shell("bench worker --queue short,default --burst", check_exit_code=True) self.assertIn("quitting", cstr(stderr)) + @timeout(20) + def test_multi_queue_burst_consumption_worker_pool(self): + for _ in range(3): + for q in ["default", "short"]: + frappe.enqueue(self.BG_JOB, sleep=1, queue=q) + + _, stderr = execute_in_shell( + "bench worker-pool --queue short,default --burst --num-workers=4", check_exit_code=True + ) + self.assertIn("quitting", cstr(stderr)) + @timeout(20) def test_job_id_dedup(self): job_id = "test_dedup" diff --git a/frappe/utils/background_jobs.py b/frappe/utils/background_jobs.py index a713163a7b..9008ba00ee 100755 --- a/frappe/utils/background_jobs.py +++ b/frappe/utils/background_jobs.py @@ -14,6 +14,7 @@ from rq.exceptions import NoSuchJobError from rq.job import Job, JobStatus from rq.logutils import setup_loghandlers from rq.worker import DequeueStrategy +from rq.worker_pool import WorkerPool from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_fixed import frappe @@ -237,9 +238,7 @@ def start_worker( if not strategy: strategy = DequeueStrategy.DEFAULT - if frappe._tune_gc: - gc.collect() - gc.freeze() + _freeze_gc() with frappe.init_site(): # empty init is required to get redis_queue from common_site_config.json @@ -267,6 +266,47 @@ def start_worker( ) +def start_worker_pool( + queue: str | None = None, + num_workers: int = 1, + quiet: bool = False, + burst: bool = False, +) -> NoReturn: + """Start worker pool with specified number of workers. + + WARNING: This feature is considered "EXPERIMENTAL". + """ + + _freeze_gc() + + with frappe.init_site(): + redis_connection = get_redis_conn() + + if queue: + queue = [q.strip() for q in queue.split(",")] + queues = get_queue_list(queue, build_queue_name=True) + + if os.environ.get("CI"): + setup_loghandlers("ERROR") + + logging_level = "INFO" + if quiet: + logging_level = "WARNING" + + pool = WorkerPool( + queues=queues, + connection=redis_connection, + num_workers=num_workers, + ) + pool.start(logging_level=logging_level, burst=burst) + + +def _freeze_gc(): + if frappe._tune_gc: + gc.collect() + gc.freeze() + + def get_worker_name(queue): """When limiting worker to a specific queue, also append queue name to default worker name""" name = None