diff --git a/frappe/utils/background_jobs.py b/frappe/utils/background_jobs.py index 2306bc9280..a9d65e9f39 100644 --- a/frappe/utils/background_jobs.py +++ b/frappe/utils/background_jobs.py @@ -1,4 +1,5 @@ import os +import random import signal import socket import time @@ -36,6 +37,9 @@ RQ_JOB_FAILURE_TTL = 7 * 24 * 60 * 60 # 7 days instead of 1 year (default) RQ_FAILED_JOBS_LIMIT = 1000 # Only keep these many recent failed jobs around RQ_RESULTS_TTL = 10 * 60 +RQ_MAX_JOBS = 5000 # Restart NOFORK workers after every N number of jobs +RQ_MAX_JOBS_JITTER = 50 # Random difference in max jobs to avoid restarting at same time + _redis_queue_conn = None @@ -376,6 +380,10 @@ class FrappeWorkerNoFork(FrappeWorker): super().__init__(*args, **kwargs) self.push_exc_handler(self.no_fork_exception_handler) + def work(self, *args, **kwargs): + kwargs["max_jobs"] = RQ_MAX_JOBS + random.randint(0, RQ_MAX_JOBS_JITTER) + return super().work(*args, **kwargs) + def execute_job(self, job: "Job", queue: "Queue"): """Execute job in same thread/process, do not fork()""" self.prepare_execution(job) @@ -437,13 +445,10 @@ def start_worker_pool( if quiet: logging_level = "WARNING" - worker_klass = ( - FrappeWorkerNoFork - # TODO: Make this true by default eventually. It's limited to RQ WorkerPool - if sbool(os.environ.get("FRAPPE_BACKGROUND_WORKERS_NOFORK", False)) - else FrappeWorker - ) + # TODO: Make this true by default eventually. It's limited to RQ WorkerPool + no_fork = sbool(os.environ.get("FRAPPE_BACKGROUND_WORKERS_NOFORK", False)) + worker_klass = FrappeWorkerNoFork if no_fork else FrappeWorker pool = WorkerPool( queues=queues, connection=redis_connection,