feat: burst mode in workers

https://python-rq.org/docs/workers/#burst-mode
This commit is contained in:
Ankush Menat 2022-11-24 16:02:01 +05:30
parent 0ebd3945ff
commit aece93fbc5
3 changed files with 16 additions and 4 deletions

View file

@ -186,11 +186,12 @@ def start_scheduler():
@click.option("--quiet", is_flag=True, default=False, help="Hide Log Outputs")
@click.option("-u", "--rq-username", default=None, help="Redis ACL user")
@click.option("-p", "--rq-password", default=None, help="Redis ACL user password")
def start_worker(queue, quiet=False, rq_username=None, rq_password=None):
@click.option("--burst", is_flag=True, default=False, help="Run Worker in Burst mode.")
def start_worker(queue, quiet=False, rq_username=None, rq_password=None, burst=False):
"""Site is used to find redis credentials."""
from frappe.utils.background_jobs import start_worker
start_worker(queue, quiet=quiet, rq_username=rq_username, rq_password=rq_password)
start_worker(queue, quiet=quiet, rq_username=rq_username, rq_password=rq_password, burst=burst)
@click.command("ready-for-migration")

View file

@ -10,6 +10,7 @@ from rq.job import Job
import frappe
from frappe.core.doctype.rq_job.rq_job import RQJob, remove_failed_jobs, stop_job
from frappe.tests.utils import FrappeTestCase, timeout
from frappe.utils import cstr, execute_in_shell
from frappe.utils.background_jobs import is_job_queued
@ -92,6 +93,15 @@ class TestRQJob(FrappeTestCase):
self.check_status(actual_job, "finished")
self.assertFalse(is_job_queued(job_name))
@timeout(20)
def test_multi_queue_burst_consumption(self):
for _ in range(3):
for q in ["default", "short"]:
frappe.enqueue(self.BG_JOB, sleep=1, queue=q)
_, stderr = execute_in_shell("bench worker --queue short,default --burst", check_exit_code=True)
self.assertIn("quitting", cstr(stderr))
def test_func(fail=False, sleep=0):
if fail:

View file

@ -216,7 +216,8 @@ def start_worker(
quiet: bool = False,
rq_username: str | None = None,
rq_password: str | None = None,
) -> NoReturn:
burst: bool = False,
) -> NoReturn | None:
"""Wrapper to start rq worker. Connects to redis and monitors these queues."""
with frappe.init_site():
# empty init is required to get redis_queue from common_site_config.json
@ -234,7 +235,7 @@ def start_worker(
logging_level = "INFO"
if quiet:
logging_level = "WARNING"
Worker(queues, name=get_worker_name(queue_name)).work(logging_level=logging_level)
Worker(queues, name=get_worker_name(queue_name)).work(logging_level=logging_level, burst=burst)
def get_worker_name(queue):