fix: Exit worker on timeout exception

- A timeout in the same process can leave behind bad state, so start a new worker instead (a minimal sketch of the pattern follows below).
- This isn't particularly costly; earlier we were forking for every job anyway.
- Timeouts, by definition, can't happen very frequently: a job has to run for its entire timeout before one can fire.
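The mechanism in the diff below is a custom RQ exception handler: when forking is disabled and a job hits its timeout, the handler raises StopRequested so the worker exits cleanly and the supervising pool replaces it with a fresh process. A minimal, Frappe-independent sketch of the same pattern, assuming rq's non-forking SimpleWorker; the ExitOnTimeoutWorker class and exit_on_timeout names are illustrative, only SimpleWorker, push_exc_handler, StopRequested and JobTimeoutException are rq's own:

# Minimal sketch, not Frappe's actual class; names other than the rq imports
# below are illustrative.
from rq.timeouts import JobTimeoutException
from rq.worker import SimpleWorker, StopRequested


class ExitOnTimeoutWorker(SimpleWorker):
	def work(self, *args, **kwargs):
		# Register the handler before entering the work loop.
		self.push_exc_handler(self.exit_on_timeout)
		return super().work(*args, **kwargs)

	def exit_on_timeout(self, job, exc_type, exc_value, traceback):
		# SimpleWorker runs jobs in the worker process itself, so a timeout
		# interrupts arbitrary code mid-execution and can leave module-level
		# state half-mutated. Raising StopRequested makes the work loop exit
		# as if a warm shutdown was requested; a supervising worker pool or
		# process manager then starts a fresh worker with clean state.
		if isinstance(exc_value, JobTimeoutException):
			raise StopRequested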
Ankush Menat 2025-02-17 17:57:21 +05:30
parent 3b4e1bbb48
commit 0cebc61de1

@@ -1,4 +1,3 @@
-import gc
 import os
 import socket
 import time
@@ -18,7 +17,8 @@ from rq.defaults import DEFAULT_WORKER_TTL
 from rq.exceptions import NoSuchJobError
 from rq.job import Job, JobStatus
 from rq.logutils import setup_loghandlers
-from rq.worker import DequeueStrategy, WorkerStatus
+from rq.timeouts import JobTimeoutException
+from rq.worker import DequeueStrategy, StopRequested, WorkerStatus
 from rq.worker_pool import WorkerPool
 from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_fixed
@@ -337,6 +337,7 @@ class FrappeWorker(Worker):
 		self.disable_forking = disable_forking
 		self.start_frappe_scheduler()
 		kwargs["with_scheduler"] = False  # Always disable RQ scheduler
+		self.push_exc_handler(self.no_fork_exception_handler)
 		return super().work(*args, **kwargs)

 	def run_maintenance_tasks(self, *args, **kwargs):
@@ -387,6 +388,12 @@ class FrappeWorker(Worker):
 		else:
 			return int(job.timeout or DEFAULT_WORKER_TTL) + 60

+	def no_fork_exception_handler(self, job, exc_type, exc_value, traceback):
+		if self.disable_forking and isinstance(exc_value, JobTimeoutException):
+			# This is done to avoid polluting global state from partial executions.
+			# More such cases MIGHT surface and this is where they should be handled.
+			raise StopRequested
+

 def start_worker_pool(
 	queue: str | None = None,