diff --git a/frappe/commands/scheduler.py b/frappe/commands/scheduler.py index be16450c54..ad1635cb57 100755 --- a/frappe/commands/scheduler.py +++ b/frappe/commands/scheduler.py @@ -187,11 +187,25 @@ def start_scheduler(): @click.option("-u", "--rq-username", default=None, help="Redis ACL user") @click.option("-p", "--rq-password", default=None, help="Redis ACL user password") @click.option("--burst", is_flag=True, default=False, help="Run Worker in Burst mode.") -def start_worker(queue, quiet=False, rq_username=None, rq_password=None, burst=False): - """Site is used to find redis credentials.""" +@click.option( + "--strategy", + required=False, + type=click.Choice(["round_robbin", "random"]), + help="dequeuing strategy to use.", +) +def start_worker( + queue, quiet=False, rq_username=None, rq_password=None, burst=False, strategy=None +): from frappe.utils.background_jobs import start_worker - start_worker(queue, quiet=quiet, rq_username=rq_username, rq_password=rq_password, burst=burst) + start_worker( + queue, + quiet=quiet, + rq_username=rq_username, + rq_password=rq_password, + burst=burst, + strategy=strategy, + ) @click.command("ready-for-migration") diff --git a/frappe/commands/site.py b/frappe/commands/site.py index 13f642ea76..37b1a36c6c 100644 --- a/frappe/commands/site.py +++ b/frappe/commands/site.py @@ -1119,7 +1119,7 @@ def build_search_index(context): @click.command("clear-log-table") -@click.option("--doctype", default="text", type=click.Choice(LOG_DOCTYPES), help="Log DocType") +@click.option("--doctype", required=True, type=click.Choice(LOG_DOCTYPES), help="Log DocType") @click.option("--days", type=int, help="Keep records for days") @click.option("--no-backup", is_flag=True, default=False, help="Do not backup the table") @pass_context diff --git a/frappe/utils/background_jobs.py b/frappe/utils/background_jobs.py index 1457a821ec..3d1c33d258 100755 --- a/frappe/utils/background_jobs.py +++ b/frappe/utils/background_jobs.py @@ -3,13 +3,14 @@ import socket import time from collections import defaultdict from functools import lru_cache -from typing import TYPE_CHECKING, Any, NoReturn, Union +from typing import TYPE_CHECKING, Any, Literal, NoReturn, Union from uuid import uuid4 import redis from redis.exceptions import BusyLoadingError, ConnectionError from rq import Connection, Queue, Worker from rq.logutils import setup_loghandlers +from rq.worker import RandomWorker, RoundRobinWorker from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_fixed import frappe @@ -217,8 +218,11 @@ def start_worker( rq_username: str | None = None, rq_password: str | None = None, burst: bool = False, + strategy: Literal["round_robbin", "random"] | None = None, ) -> NoReturn | None: """Wrapper to start rq worker. Connects to redis and monitors these queues.""" + DEQUEUE_STRATEGIES = {"round_robbin": RoundRobinWorker, "random": RandomWorker} + with frappe.init_site(): # empty init is required to get redis_queue from common_site_config.json redis_connection = get_redis_conn(username=rq_username, password=rq_password) @@ -231,11 +235,16 @@ def start_worker( if os.environ.get("CI"): setup_loghandlers("ERROR") + WorkerKlass = Worker + if strategy and strategy in DEQUEUE_STRATEGIES: + WorkerKlass = DEQUEUE_STRATEGIES[strategy] + with Connection(redis_connection): logging_level = "INFO" if quiet: logging_level = "WARNING" - Worker(queues, name=get_worker_name(queue_name)).work(logging_level=logging_level, burst=burst) + worker = WorkerKlass(queues, name=get_worker_name(queue_name)) + worker.work(logging_level=logging_level, burst=burst) def get_worker_name(queue):