build: bump RQ to latest (#26576)

* build: Bump RQ

Latest version simplifies workerpool extension a bit

* fix: Disable RQ's scheduler

It's now enabled by default with no easy way to disable it except
upstream change or overriding the run_worker method. So better to
disable it with custom worker class.
This commit is contained in:
Ankush Menat 2024-05-27 19:24:34 +05:30 committed by GitHub
parent 394227a90f
commit 148b3f771d
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 19 additions and 18 deletions

View file

@ -277,22 +277,6 @@ def execute_job(site, method, event, job_name, kwargs, user=None, is_async=True,
frappe.destroy() frappe.destroy()
class FrappeWorker(Worker):
def work(self, *args, **kwargs):
self.start_frappe_scheduler()
return super().work(*args, **kwargs)
def run_maintenance_tasks(self, *args, **kwargs):
"""Attempt to start a scheduler in case the worker doing scheduling died."""
self.start_frappe_scheduler()
return super().run_maintenance_tasks(*args, **kwargs)
def start_frappe_scheduler(self):
from frappe.utils.scheduler import start_scheduler
Thread(target=start_scheduler, daemon=True).start()
def start_worker( def start_worker(
queue: str | None = None, queue: str | None = None,
quiet: bool = False, quiet: bool = False,
@ -300,7 +284,7 @@ def start_worker(
rq_password: str | None = None, rq_password: str | None = None,
burst: bool = False, burst: bool = False,
strategy: DequeueStrategy | None = DequeueStrategy.DEFAULT, strategy: DequeueStrategy | None = DequeueStrategy.DEFAULT,
) -> None: # pragma: no cover ) -> NoReturn: # pragma: no cover
"""Wrapper to start rq worker. Connects to redis and monitors these queues.""" """Wrapper to start rq worker. Connects to redis and monitors these queues."""
if not strategy: if not strategy:
@ -336,6 +320,23 @@ def start_worker(
) )
class FrappeWorker(Worker):
def work(self, *args, **kwargs):
self.start_frappe_scheduler()
kwargs["with_scheduler"] = False # Always disable RQ scheduler
return super().work(*args, **kwargs)
def run_maintenance_tasks(self, *args, **kwargs):
"""Attempt to start a scheduler in case the worker doing scheduling died."""
self.start_frappe_scheduler()
return super().run_maintenance_tasks(*args, **kwargs)
def start_frappe_scheduler(self):
from frappe.utils.scheduler import start_scheduler
Thread(target=start_scheduler, daemon=True).start()
def start_worker_pool( def start_worker_pool(
queue: str | None = None, queue: str | None = None,
num_workers: int = 1, num_workers: int = 1,

View file

@ -63,7 +63,7 @@ dependencies = [
"setproctitle~=1.3.3", "setproctitle~=1.3.3",
"requests-oauthlib~=1.3.1", "requests-oauthlib~=1.3.1",
"requests~=2.32.0", "requests~=2.32.0",
"rq==1.15.1", "rq==1.16.2",
"rsa>=4.1", "rsa>=4.1",
"semantic-version~=2.10.0", "semantic-version~=2.10.0",
"sentry-sdk~=1.37.1", "sentry-sdk~=1.37.1",