perf: Support running RQ worker without forking

Frappe does everything required to run jobs without forking.
- We cleanup locals
- Global caches are invalidated appropriately
- We destroy connections and open a new one for job/request

So we don't have any safety benefit much from forking for every job.
There can still be some code out in wild that relies on this behaviour
though!

Pros:
- Almost zero-overhead background jobs, just like web requests.

Cons:
- Timeout need to be implemented separately now
- Unknown side effects!
- Some features like "stop job" will need separate implementations.
This commit is contained in:
Ankush Menat 2025-02-17 09:57:12 +05:30
parent d0c3a8ee56
commit 3b4e1bbb48

View file

@ -14,10 +14,11 @@ import redis
import setproctitle
from redis.exceptions import BusyLoadingError, ConnectionError
from rq import Callback, Queue, Worker
from rq.defaults import DEFAULT_WORKER_TTL
from rq.exceptions import NoSuchJobError
from rq.job import Job, JobStatus
from rq.logutils import setup_loghandlers
from rq.worker import DequeueStrategy
from rq.worker import DequeueStrategy, WorkerStatus
from rq.worker_pool import WorkerPool
from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_fixed
@ -332,7 +333,8 @@ def start_worker(
class FrappeWorker(Worker):
def work(self, *args, **kwargs):
def work(self, *args, disable_forking=True, **kwargs):
self.disable_forking = disable_forking
self.start_frappe_scheduler()
kwargs["with_scheduler"] = False # Always disable RQ scheduler
return super().work(*args, **kwargs)
@ -367,6 +369,24 @@ class FrappeWorker(Worker):
self.pubsub.subscribe(**{self.pubsub_channel_name: self.handle_payload})
self.pubsub_thread = self.pubsub.run_in_thread(sleep_time=2, daemon=True)
def execute_job(self, job: "Job", queue: "Queue"):
"""Execute job in same thread/process, do not fork()"""
if not self.disable_forking:
return super().execute_job(job, queue)
self.prepare_execution(job)
self.perform_job(job, queue)
self.set_state(WorkerStatus.IDLE)
def get_heartbeat_ttl(self, job: "Job") -> int:
if not self.disable_forking:
return super().get_heartbeat_ttl(job)
if job.timeout == -1:
return DEFAULT_WORKER_TTL
else:
return int(job.timeout or DEFAULT_WORKER_TTL) + 60
def start_worker_pool(
queue: str | None = None,