From 3b4e1bbb48606efa1458b606cdc8877c4f41005e Mon Sep 17 00:00:00 2001 From: Ankush Menat Date: Mon, 17 Feb 2025 09:57:12 +0530 Subject: [PATCH] perf: Support running RQ worker without forking Frappe does everything required to run jobs without forking. - We clean up locals - Global caches are invalidated appropriately - We destroy connections and open a new one for job/request So we don't get much safety benefit from forking for every job. There can still be some code out in the wild that relies on this behaviour though! Pros: - Almost zero-overhead background jobs, just like web requests. Cons: - Timeouts need to be implemented separately now - Unknown side effects! - Some features like "stop job" will need separate implementations. --- frappe/utils/background_jobs.py | 24 ++++++++++++++++++++++-- 1 file changed, 22 insertions(+), 2 deletions(-) diff --git a/frappe/utils/background_jobs.py b/frappe/utils/background_jobs.py index 86b71c91a7..d1fb1428da 100644 --- a/frappe/utils/background_jobs.py +++ b/frappe/utils/background_jobs.py @@ -14,10 +14,11 @@ import redis import setproctitle from redis.exceptions import BusyLoadingError, ConnectionError from rq import Callback, Queue, Worker +from rq.defaults import DEFAULT_WORKER_TTL from rq.exceptions import NoSuchJobError from rq.job import Job, JobStatus from rq.logutils import setup_loghandlers -from rq.worker import DequeueStrategy +from rq.worker import DequeueStrategy, WorkerStatus from rq.worker_pool import WorkerPool from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_fixed @@ -332,7 +333,8 @@ def start_worker( class FrappeWorker(Worker): - def work(self, *args, **kwargs): + def work(self, *args, disable_forking=True, **kwargs): + self.disable_forking = disable_forking self.start_frappe_scheduler() kwargs["with_scheduler"] = False # Always disable RQ scheduler return super().work(*args, **kwargs) @@ -367,6 +369,24 @@ class FrappeWorker(Worker): 
self.pubsub.subscribe(**{self.pubsub_channel_name: self.handle_payload}) self.pubsub_thread = self.pubsub.run_in_thread(sleep_time=2, daemon=True) + def execute_job(self, job: "Job", queue: "Queue"): + """Execute job in same thread/process, do not fork()""" + if not self.disable_forking: + return super().execute_job(job, queue) + + self.prepare_execution(job) + self.perform_job(job, queue) + self.set_state(WorkerStatus.IDLE) + + def get_heartbeat_ttl(self, job: "Job") -> int: + if not self.disable_forking: + return super().get_heartbeat_ttl(job) + + if job.timeout == -1: + return DEFAULT_WORKER_TTL + else: + return int(job.timeout or DEFAULT_WORKER_TTL) + 60 + def start_worker_pool( queue: str | None = None,