diff --git a/frappe/utils/background_jobs.py b/frappe/utils/background_jobs.py index 86b71c91a7..d1fb1428da 100644 --- a/frappe/utils/background_jobs.py +++ b/frappe/utils/background_jobs.py @@ -14,10 +14,11 @@ import redis import setproctitle from redis.exceptions import BusyLoadingError, ConnectionError from rq import Callback, Queue, Worker +from rq.defaults import DEFAULT_WORKER_TTL from rq.exceptions import NoSuchJobError from rq.job import Job, JobStatus from rq.logutils import setup_loghandlers -from rq.worker import DequeueStrategy +from rq.worker import DequeueStrategy, WorkerStatus from rq.worker_pool import WorkerPool from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_fixed @@ -332,7 +333,8 @@ def start_worker( class FrappeWorker(Worker): - def work(self, *args, **kwargs): + def work(self, *args, disable_forking=True, **kwargs): + self.disable_forking = disable_forking self.start_frappe_scheduler() kwargs["with_scheduler"] = False # Always disable RQ scheduler return super().work(*args, **kwargs) @@ -367,6 +369,24 @@ class FrappeWorker(Worker): self.pubsub.subscribe(**{self.pubsub_channel_name: self.handle_payload}) self.pubsub_thread = self.pubsub.run_in_thread(sleep_time=2, daemon=True) + def execute_job(self, job: "Job", queue: "Queue"): + """Execute job in same thread/process, do not fork()""" + if not self.disable_forking: + return super().execute_job(job, queue) + + self.prepare_execution(job) + self.perform_job(job, queue) + self.set_state(WorkerStatus.IDLE) + + def get_heartbeat_ttl(self, job: "Job") -> int: + if not self.disable_forking: + return super().get_heartbeat_ttl(job) + + if job.timeout == -1: + return DEFAULT_WORKER_TTL + else: + return int(job.timeout or DEFAULT_WORKER_TTL) + 60 + def start_worker_pool( queue: str | None = None,