diff --git a/frappe/utils/background_jobs.py b/frappe/utils/background_jobs.py index d1fb1428da..c9cb306f55 100644 --- a/frappe/utils/background_jobs.py +++ b/frappe/utils/background_jobs.py @@ -1,4 +1,3 @@ -import gc import os import socket import time @@ -18,7 +17,8 @@ from rq.defaults import DEFAULT_WORKER_TTL from rq.exceptions import NoSuchJobError from rq.job import Job, JobStatus from rq.logutils import setup_loghandlers -from rq.worker import DequeueStrategy, WorkerStatus +from rq.timeouts import JobTimeoutException +from rq.worker import DequeueStrategy, StopRequested, WorkerStatus from rq.worker_pool import WorkerPool from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_fixed @@ -337,6 +337,7 @@ class FrappeWorker(Worker): self.disable_forking = disable_forking self.start_frappe_scheduler() kwargs["with_scheduler"] = False # Always disable RQ scheduler + self.push_exc_handler(self.no_fork_exception_handler) return super().work(*args, **kwargs) def run_maintenance_tasks(self, *args, **kwargs): @@ -387,6 +388,12 @@ class FrappeWorker(Worker): else: return int(job.timeout or DEFAULT_WORKER_TTL) + 60 + def no_fork_exception_handler(self, job, exc_type, exc_value, traceback): + if self.disable_forking and isinstance(exc_value, JobTimeoutException): + # This is done to avoid polluting global state from partial executions. + # More such cases MIGHT surface and this is where they should be handled. + raise StopRequested + def start_worker_pool( queue: str | None = None,