From 7fbc6e8175da239a230428d936b2af7f58ceeff9 Mon Sep 17 00:00:00 2001 From: Ankush Menat Date: Mon, 26 Jun 2023 17:42:18 +0530 Subject: [PATCH] refactor: Simplify dequeue_strategy selection Classes arent required anymore, it can just be a parm to worker class isntead. --- frappe/utils/background_jobs.py | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/frappe/utils/background_jobs.py b/frappe/utils/background_jobs.py index b2a3fbab24..a713163a7b 100755 --- a/frappe/utils/background_jobs.py +++ b/frappe/utils/background_jobs.py @@ -4,7 +4,7 @@ import socket import time from collections import defaultdict from functools import lru_cache -from typing import Any, Callable, Literal, NoReturn +from typing import Any, Callable, NoReturn from uuid import uuid4 import redis @@ -13,7 +13,7 @@ from rq import Queue, Worker from rq.exceptions import NoSuchJobError from rq.job import Job, JobStatus from rq.logutils import setup_loghandlers -from rq.worker import RandomWorker, RoundRobinWorker +from rq.worker import DequeueStrategy from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_fixed import frappe @@ -230,10 +230,12 @@ def start_worker( rq_username: str | None = None, rq_password: str | None = None, burst: bool = False, - strategy: Literal["round_robin", "random"] | None = None, + strategy: DequeueStrategy | None = DequeueStrategy.DEFAULT, ) -> NoReturn | None: # pragma: no cover """Wrapper to start rq worker. Connects to redis and monitors these queues.""" - DEQUEUE_STRATEGIES = {"round_robin": RoundRobinWorker, "random": RandomWorker} + + if not strategy: + strategy = DequeueStrategy.DEFAULT if frappe._tune_gc: gc.collect() @@ -251,17 +253,17 @@ def start_worker( if os.environ.get("CI"): setup_loghandlers("ERROR") - WorkerKlass = DEQUEUE_STRATEGIES.get(strategy, Worker) - logging_level = "INFO" if quiet: logging_level = "WARNING" - worker = WorkerKlass(queues, name=get_worker_name(queue_name), connection=redis_connection) + + worker = Worker(queues, name=get_worker_name(queue_name), connection=redis_connection) worker.work( logging_level=logging_level, burst=burst, date_format="%Y-%m-%d %H:%M:%S", log_format="%(asctime)s,%(msecs)03d %(message)s", + dequeue_strategy=strategy, )