feat: RQ WorkerPool

RQ now has experimental support for workerpools.

When to use this?

Roughly, when you have more than 2 workers a workerpool might make
sense; below 2 it's overhead, as a master "pool" process will need to run
to manage the workerpool itself.

Why is it any better?

Currently we just let supervisor duplicate the worker process N
times. This is inefficient from a shared-memory POV. Forking the
original process to create workers enables sharing of more memory, thus
leading to an upwards of 60-70% reduction in memory usage with a pool size
of 8 workers.
This commit is contained in:
Ankush Menat 2023-06-26 17:59:28 +05:30
parent 7fbc6e8175
commit 73bca16d77
3 changed files with 76 additions and 3 deletions

View file

@ -215,6 +215,27 @@ def start_worker(
)
@click.command("worker-pool")
@click.option(
	"--queue",
	type=str,
	help="Queue to consume from. Multiple queues can be specified using comma-separated string. If not specified all queues are consumed.",
)
@click.option("--num-workers", type=int, default=2, help="Number of workers to spawn in pool.")
@click.option("--quiet", is_flag=True, default=False, help="Hide Log Outputs")
@click.option("--burst", is_flag=True, default=False, help="Run Worker in Burst mode.")
def start_worker_pool(queue, quiet=False, num_workers=2, burst=False):
	"""Start a background worker pool."""
	# Import lazily so merely loading the CLI doesn't pull in the
	# background-jobs machinery (and its rq/redis imports).
	from frappe.utils.background_jobs import start_worker_pool

	start_worker_pool(
		queue=queue,
		quiet=quiet,
		burst=burst,
		num_workers=num_workers,
	)
@click.command("ready-for-migration")
@click.option("--site", help="site name")
@pass_context
@ -251,5 +272,6 @@ commands = [
show_pending_jobs,
start_scheduler,
start_worker,
start_worker_pool,
trigger_scheduler_event,
]

View file

@ -96,6 +96,17 @@ class TestRQJob(FrappeTestCase):
_, stderr = execute_in_shell("bench worker --queue short,default --burst", check_exit_code=True)
self.assertIn("quitting", cstr(stderr))
@timeout(20)
def test_multi_queue_burst_consumption_worker_pool(self):
	"""A burst-mode worker pool should drain both queues and exit on its own."""
	# Seed a backlog: three slow jobs on each of the two queues,
	# interleaved default/short exactly as before.
	for target_queue in ("default", "short") * 3:
		frappe.enqueue(self.BG_JOB, sleep=1, queue=target_queue)

	# Run a 4-worker pool over both queues in burst mode; it must
	# finish the backlog and quit, logging "quitting" on the way out.
	_, stderr = execute_in_shell(
		"bench worker-pool --queue short,default --burst --num-workers=4", check_exit_code=True
	)
	self.assertIn("quitting", cstr(stderr))
@timeout(20)
def test_job_id_dedup(self):
job_id = "test_dedup"

View file

@ -14,6 +14,7 @@ from rq.exceptions import NoSuchJobError
from rq.job import Job, JobStatus
from rq.logutils import setup_loghandlers
from rq.worker import DequeueStrategy
from rq.worker_pool import WorkerPool
from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_fixed
import frappe
@ -237,9 +238,7 @@ def start_worker(
if not strategy:
strategy = DequeueStrategy.DEFAULT
if frappe._tune_gc:
gc.collect()
gc.freeze()
_freeze_gc()
with frappe.init_site():
# empty init is required to get redis_queue from common_site_config.json
@ -267,6 +266,47 @@ def start_worker(
)
def start_worker_pool(
	queue: str | None = None,
	num_workers: int = 1,
	quiet: bool = False,
	burst: bool = False,
) -> NoReturn:
	"""Start worker pool with specified number of workers.

	WARNING: This feature is considered "EXPERIMENTAL".

	:param queue: comma-separated queue names to consume from; all queues when falsy.
	:param num_workers: number of workers the pool manages.
	:param quiet: log at WARNING instead of the default INFO level.
	:param burst: exit once the queues are drained instead of waiting for new jobs.
	"""
	# Collect + freeze GC before the pool forks workers, so live objects
	# are shared with children instead of duplicated (see _freeze_gc).
	_freeze_gc()

	with frappe.init_site():
		# empty init is required to get redis_queue from common_site_config.json
		redis_connection = get_redis_conn()
		if queue:
			# "short,default" -> ["short", "default"]; param is rebound to a list here
			queue = [q.strip() for q in queue.split(",")]
		# Expand to the concrete RQ queue names to listen on
		# (build_queue_name=True — presumably site-prefixed; matches start_worker).
		queues = get_queue_list(queue, build_queue_name=True)
		if os.environ.get("CI"):
			# Keep CI output terse: errors only from the log handlers.
			setup_loghandlers("ERROR")
		logging_level = "INFO"
		if quiet:
			logging_level = "WARNING"
		pool = WorkerPool(
			queues=queues,
			connection=redis_connection,
			num_workers=num_workers,
		)
		# Blocks serving jobs; with burst=True it returns once queues are empty.
		pool.start(logging_level=logging_level, burst=burst)
def _freeze_gc():
	"""Collect then freeze the GC's surviving objects, when GC tuning is enabled."""
	if not frappe._tune_gc:
		return
	# Collect first so only genuinely live objects remain, then move them
	# to the permanent generation so later collections skip them — useful
	# before forking workers (frozen objects stay untouched by the GC).
	gc.collect()
	gc.freeze()
def get_worker_name(queue):
"""When limiting worker to a specific queue, also append queue name to default worker name"""
name = None