From 0cebc61de1b1e6805706fd0d30f97c4604aef00a Mon Sep 17 00:00:00 2001 From: Ankush Menat Date: Mon, 17 Feb 2025 17:57:21 +0530 Subject: [PATCH] fix: Exit worker on timeout exception - Timeout in same process can leave bad state, so start a new worker. - This isn't exactly costly, earlier we were forking all the time. - Timeouts by definition can't happen too frequently. --- frappe/utils/background_jobs.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/frappe/utils/background_jobs.py b/frappe/utils/background_jobs.py index d1fb1428da..c9cb306f55 100644 --- a/frappe/utils/background_jobs.py +++ b/frappe/utils/background_jobs.py @@ -1,4 +1,3 @@ -import gc import os import socket import time @@ -18,7 +17,8 @@ from rq.defaults import DEFAULT_WORKER_TTL from rq.exceptions import NoSuchJobError from rq.job import Job, JobStatus from rq.logutils import setup_loghandlers -from rq.worker import DequeueStrategy, WorkerStatus +from rq.timeouts import JobTimeoutException +from rq.worker import DequeueStrategy, StopRequested, WorkerStatus from rq.worker_pool import WorkerPool from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_fixed @@ -337,6 +337,7 @@ class FrappeWorker(Worker): self.disable_forking = disable_forking self.start_frappe_scheduler() kwargs["with_scheduler"] = False # Always disable RQ scheduler + self.push_exc_handler(self.no_fork_exception_handler) return super().work(*args, **kwargs) def run_maintenance_tasks(self, *args, **kwargs): @@ -387,6 +388,12 @@ class FrappeWorker(Worker): else: return int(job.timeout or DEFAULT_WORKER_TTL) + 60 + def no_fork_exception_handler(self, job, exc_type, exc_value, traceback): + if self.disable_forking and isinstance(exc_value, JobTimeoutException): + # This is done to avoid polluting global state from partial executions. + # More such cases MIGHT surface and this is where they should be handled. + raise StopRequested + def start_worker_pool( queue: str | None = None,