refactor: Simplify dequeue_strategy selection

Classes arent required anymore, it can just be a parm to worker class
isntead.
This commit is contained in:
Ankush Menat 2023-06-26 17:42:18 +05:30
parent ca95b591ae
commit 7fbc6e8175

View file

@ -4,7 +4,7 @@ import socket
import time
from collections import defaultdict
from functools import lru_cache
from typing import Any, Callable, Literal, NoReturn
from typing import Any, Callable, NoReturn
from uuid import uuid4
import redis
@ -13,7 +13,7 @@ from rq import Queue, Worker
from rq.exceptions import NoSuchJobError
from rq.job import Job, JobStatus
from rq.logutils import setup_loghandlers
from rq.worker import RandomWorker, RoundRobinWorker
from rq.worker import DequeueStrategy
from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_fixed
import frappe
@ -230,10 +230,12 @@ def start_worker(
rq_username: str | None = None,
rq_password: str | None = None,
burst: bool = False,
strategy: Literal["round_robin", "random"] | None = None,
strategy: DequeueStrategy | None = DequeueStrategy.DEFAULT,
) -> NoReturn | None: # pragma: no cover
"""Wrapper to start rq worker. Connects to redis and monitors these queues."""
DEQUEUE_STRATEGIES = {"round_robin": RoundRobinWorker, "random": RandomWorker}
if not strategy:
strategy = DequeueStrategy.DEFAULT
if frappe._tune_gc:
gc.collect()
@ -251,17 +253,17 @@ def start_worker(
if os.environ.get("CI"):
setup_loghandlers("ERROR")
WorkerKlass = DEQUEUE_STRATEGIES.get(strategy, Worker)
logging_level = "INFO"
if quiet:
logging_level = "WARNING"
worker = WorkerKlass(queues, name=get_worker_name(queue_name), connection=redis_connection)
worker = Worker(queues, name=get_worker_name(queue_name), connection=redis_connection)
worker.work(
logging_level=logging_level,
burst=burst,
date_format="%Y-%m-%d %H:%M:%S",
log_format="%(asctime)s,%(msecs)03d %(message)s",
dequeue_strategy=strategy,
)