perf: Merge worker and scheduler

This commit is contained in:
Ankush Menat 2024-05-26 11:08:01 +05:30
parent 43c8ae0049
commit f7ff829ea7
2 changed files with 36 additions and 11 deletions

View file

@ -6,6 +6,7 @@ from collections import defaultdict
from collections.abc import Callable
from contextlib import suppress
from functools import lru_cache
from threading import Thread
from typing import Any, NoReturn
from uuid import uuid4
@ -276,6 +277,22 @@ def execute_job(site, method, event, job_name, kwargs, user=None, is_async=True,
frappe.destroy()
class FrappeWorker(Worker):
def work(self, *args, **kwargs):
self.start_frappe_scheduler()
return super().work(*args, **kwargs)
def run_maintenance_tasks(self, *args, **kwargs):
"""Attempt to start a scheduler in case the worker doing scheduling died."""
self.start_frappe_scheduler()
return super().run_maintenance_tasks(*args, **kwargs)
def start_frappe_scheduler(self):
from frappe.utils.scheduler import start_scheduler
Thread(target=start_scheduler, daemon=True).start()
def start_worker(
queue: str | None = None,
quiet: bool = False,
@ -283,6 +300,7 @@ def start_worker(
rq_password: str | None = None,
burst: bool = False,
strategy: DequeueStrategy | None = DequeueStrategy.DEFAULT,
no_scheduler=False,
) -> None: # pragma: no cover
"""Wrapper to start rq worker. Connects to redis and monitors these queues."""
@ -309,7 +327,7 @@ def start_worker(
if quiet:
logging_level = "WARNING"
worker = Worker(queues, connection=redis_connection)
worker = FrappeWorker(queues, connection=redis_connection)
worker.work(
logging_level=logging_level,
burst=burst,
@ -363,6 +381,7 @@ def start_worker_pool(
queues=queues,
connection=redis_connection,
num_workers=num_workers,
worker_class=FrappeWorker,
)
pool.start(logging_level=logging_level, burst=burst)

View file

@ -8,7 +8,6 @@ Events:
weekly
"""
# imports - standard imports
import os
import random
import time
@ -16,12 +15,11 @@ from typing import NoReturn
import setproctitle
from croniter import CroniterBadCronError
from filelock import FileLock, Timeout
# imports - module imports
import frappe
from frappe.utils import cint, get_datetime, get_sites, now_datetime
from frappe.utils import cint, get_bench_path, get_datetime, get_sites, now_datetime
from frappe.utils.background_jobs import set_niceness
from frappe.utils.synchronization import filelock
DATETIME_FORMAT = "%Y-%m-%d %H:%M:%S"
@ -36,7 +34,7 @@ def cprint(*args, **kwargs):
def _proctitle(message):
setproctitle.setproctitle(f"frappe-scheduler: {message}")
setproctitle.setthreadtitle(f"frappe-scheduler: {message}")
def start_scheduler() -> NoReturn:
@ -46,11 +44,19 @@ def start_scheduler() -> NoReturn:
tick = get_scheduler_tick()
set_niceness()
with filelock("scheduler_process", timeout=1, is_global=True):
while True:
_proctitle("idle")
time.sleep(tick)
enqueue_events_for_all_sites()
lock_path = os.path.abspath(os.path.join(get_bench_path(), "config", "scheduler_process"))
try:
lock = FileLock(lock_path)
lock.acquire(blocking=False)
except Timeout:
frappe.logger("scheduler").debug("Scheduler already running")
return
while True:
_proctitle("idle")
time.sleep(tick)
enqueue_events_for_all_sites()
def enqueue_events_for_all_sites() -> None: