From f7ff829ea797ed58a2071ef2341a441096f5deb4 Mon Sep 17 00:00:00 2001 From: Ankush Menat Date: Sun, 26 May 2024 11:08:01 +0530 Subject: [PATCH] perf: Merge worker and scheduler --- frappe/utils/background_jobs.py | 21 ++++++++++++++++++++- frappe/utils/scheduler.py | 26 ++++++++++++++++---------- 2 files changed, 36 insertions(+), 11 deletions(-) diff --git a/frappe/utils/background_jobs.py b/frappe/utils/background_jobs.py index eb505c9652..ff7968375e 100644 --- a/frappe/utils/background_jobs.py +++ b/frappe/utils/background_jobs.py @@ -6,6 +6,7 @@ from collections import defaultdict from collections.abc import Callable from contextlib import suppress from functools import lru_cache +from threading import Thread from typing import Any, NoReturn from uuid import uuid4 @@ -276,6 +277,22 @@ def execute_job(site, method, event, job_name, kwargs, user=None, is_async=True, frappe.destroy() +class FrappeWorker(Worker): + def work(self, *args, **kwargs): + self.start_frappe_scheduler() + return super().work(*args, **kwargs) + + def run_maintenance_tasks(self, *args, **kwargs): + """Attempt to start a scheduler in case the worker doing scheduling died.""" + self.start_frappe_scheduler() + return super().run_maintenance_tasks(*args, **kwargs) + + def start_frappe_scheduler(self): + from frappe.utils.scheduler import start_scheduler + + Thread(target=start_scheduler, daemon=True).start() + + def start_worker( queue: str | None = None, quiet: bool = False, @@ -283,6 +300,7 @@ def start_worker( rq_password: str | None = None, burst: bool = False, strategy: DequeueStrategy | None = DequeueStrategy.DEFAULT, + no_scheduler=False, ) -> None: # pragma: no cover """Wrapper to start rq worker. Connects to redis and monitors these queues.""" @@ -309,7 +327,7 @@ def start_worker( if quiet: logging_level = "WARNING" - worker = Worker(queues, connection=redis_connection) + worker = FrappeWorker(queues, connection=redis_connection) worker.work( logging_level=logging_level, burst=burst, @@ -363,6 +381,7 @@ def start_worker_pool( queues=queues, connection=redis_connection, num_workers=num_workers, + worker_class=FrappeWorker, ) pool.start(logging_level=logging_level, burst=burst) diff --git a/frappe/utils/scheduler.py b/frappe/utils/scheduler.py index 0f09f23033..09a3e399c5 100644 --- a/frappe/utils/scheduler.py +++ b/frappe/utils/scheduler.py @@ -8,7 +8,6 @@ Events: weekly """ -# imports - standard imports import os import random import time @@ -16,12 +15,11 @@ from typing import NoReturn import setproctitle from croniter import CroniterBadCronError +from filelock import FileLock, Timeout -# imports - module imports import frappe -from frappe.utils import cint, get_datetime, get_sites, now_datetime +from frappe.utils import cint, get_bench_path, get_datetime, get_sites, now_datetime from frappe.utils.background_jobs import set_niceness -from frappe.utils.synchronization import filelock DATETIME_FORMAT = "%Y-%m-%d %H:%M:%S" @@ -36,7 +34,7 @@ def cprint(*args, **kwargs): def _proctitle(message): - setproctitle.setproctitle(f"frappe-scheduler: {message}") + setproctitle.setthreadtitle(f"frappe-scheduler: {message}") def start_scheduler() -> NoReturn: @@ -46,11 +44,19 @@ def start_scheduler() -> NoReturn: tick = get_scheduler_tick() set_niceness() - with filelock("scheduler_process", timeout=1, is_global=True): - while True: - _proctitle("idle") - time.sleep(tick) - enqueue_events_for_all_sites() + lock_path = os.path.abspath(os.path.join(get_bench_path(), "config", "scheduler_process")) + + try: + lock = FileLock(lock_path) + lock.acquire(blocking=False) + except Timeout: + frappe.logger("scheduler").debug("Scheduler already running") + return + + while True: + _proctitle("idle") + time.sleep(tick) + enqueue_events_for_all_sites() def enqueue_events_for_all_sites() -> None: