feat: support dequeuing strategies for worker

This commit is contained in:
Ankush Menat 2022-11-24 16:27:21 +05:30
parent aece93fbc5
commit a8bf86ef75
3 changed files with 29 additions and 6 deletions

View file

@ -187,11 +187,25 @@ def start_scheduler():
@click.option("-u", "--rq-username", default=None, help="Redis ACL user")
@click.option("-p", "--rq-password", default=None, help="Redis ACL user password")
@click.option("--burst", is_flag=True, default=False, help="Run Worker in Burst mode.")
def start_worker(queue, quiet=False, rq_username=None, rq_password=None, burst=False):
"""Site is used to find redis credentials."""
@click.option(
"--strategy",
required=False,
type=click.Choice(["round_robbin", "random"]),
help="dequeuing strategy to use.",
)
def start_worker(
queue, quiet=False, rq_username=None, rq_password=None, burst=False, strategy=None
):
from frappe.utils.background_jobs import start_worker
start_worker(queue, quiet=quiet, rq_username=rq_username, rq_password=rq_password, burst=burst)
start_worker(
queue,
quiet=quiet,
rq_username=rq_username,
rq_password=rq_password,
burst=burst,
strategy=strategy,
)
@click.command("ready-for-migration")

View file

@ -1119,7 +1119,7 @@ def build_search_index(context):
@click.command("clear-log-table")
@click.option("--doctype", default="text", type=click.Choice(LOG_DOCTYPES), help="Log DocType")
@click.option("--doctype", required=True, type=click.Choice(LOG_DOCTYPES), help="Log DocType")
@click.option("--days", type=int, help="Keep records for days")
@click.option("--no-backup", is_flag=True, default=False, help="Do not backup the table")
@pass_context

View file

@ -3,13 +3,14 @@ import socket
import time
from collections import defaultdict
from functools import lru_cache
from typing import TYPE_CHECKING, Any, NoReturn, Union
from typing import TYPE_CHECKING, Any, Literal, NoReturn, Union
from uuid import uuid4
import redis
from redis.exceptions import BusyLoadingError, ConnectionError
from rq import Connection, Queue, Worker
from rq.logutils import setup_loghandlers
from rq.worker import RandomWorker, RoundRobinWorker
from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_fixed
import frappe
@ -217,8 +218,11 @@ def start_worker(
rq_username: str | None = None,
rq_password: str | None = None,
burst: bool = False,
strategy: Literal["round_robbin", "random"] | None = None,
) -> NoReturn | None:
"""Wrapper to start rq worker. Connects to redis and monitors these queues."""
DEQUEUE_STRATEGIES = {"round_robbin": RoundRobinWorker, "random": RandomWorker}
with frappe.init_site():
# empty init is required to get redis_queue from common_site_config.json
redis_connection = get_redis_conn(username=rq_username, password=rq_password)
@ -231,11 +235,16 @@ def start_worker(
if os.environ.get("CI"):
setup_loghandlers("ERROR")
WorkerKlass = Worker
if strategy and strategy in DEQUEUE_STRATEGIES:
WorkerKlass = DEQUEUE_STRATEGIES[strategy]
with Connection(redis_connection):
logging_level = "INFO"
if quiet:
logging_level = "WARNING"
Worker(queues, name=get_worker_name(queue_name)).work(logging_level=logging_level, burst=burst)
worker = WorkerKlass(queues, name=get_worker_name(queue_name))
worker.work(logging_level=logging_level, burst=burst)
def get_worker_name(queue):