From 70e56b2e145b160ac370cb23b23ee20b2c10b5bf Mon Sep 17 00:00:00 2001 From: Ankush Menat Date: Wed, 23 Apr 2025 11:54:21 +0530 Subject: [PATCH] perf: Selectively switch to LIFO ordering when queue is starved (#32226) When queue is overloaded every job gets delayed by size of the queue, this means even interactive jobs like prepared reports face significant wait times. This flag allows developer to selectively enable LIFO on such jobs where ordering doesn't matter. Any time we observe queue to be too large, we'll insert the job at front so it gets highest priority. This is a common strategy to deal with queue starvation, we are only applying it explicitly because job execution order matters for correctness in some cases. --- frappe/core/doctype/prepared_report/prepared_report.py | 1 + frappe/desk/page/setup_wizard/setup_wizard.py | 2 +- frappe/model/workflow.py | 1 + frappe/utils/background_jobs.py | 9 +++++++++ frappe/utils/print_format.py | 1 + 5 files changed, 13 insertions(+), 1 deletion(-) diff --git a/frappe/core/doctype/prepared_report/prepared_report.py b/frappe/core/doctype/prepared_report/prepared_report.py index b54bfcc430..32e5f3ea40 100644 --- a/frappe/core/doctype/prepared_report/prepared_report.py +++ b/frappe/core/doctype/prepared_report/prepared_report.py @@ -80,6 +80,7 @@ class PreparedReport(Document): prepared_report=self.name, timeout=timeout or REPORT_TIMEOUT, enqueue_after_commit=True, + at_front_when_starved=True, ) def get_prepared_data(self, with_file_name=False): diff --git a/frappe/desk/page/setup_wizard/setup_wizard.py b/frappe/desk/page/setup_wizard/setup_wizard.py index 588bfe2da9..ce4db123c5 100755 --- a/frappe/desk/page/setup_wizard/setup_wizard.py +++ b/frappe/desk/page/setup_wizard/setup_wizard.py @@ -55,7 +55,7 @@ def setup_complete(args): is_background_task = frappe.conf.get("trigger_site_setup_in_background") if is_background_task: - process_setup_stages.enqueue(stages=stages, user_input=args, is_background_task=True) + process_setup_stages.enqueue(stages=stages, user_input=args, is_background_task=True, at_front=True) return {"status": "registered"} else: return process_setup_stages(stages, args) diff --git a/frappe/model/workflow.py b/frappe/model/workflow.py index 1c490cfeb7..69eafe6c6b 100644 --- a/frappe/model/workflow.py +++ b/frappe/model/workflow.py @@ -248,6 +248,7 @@ def bulk_workflow_approval(docnames, doctype, action): action=action, queue="short", timeout=1000, + at_front_when_starved=True, ) else: frappe.throw(_("Bulk approval only support up to 500 documents."), title=_("Too Many Documents")) diff --git a/frappe/utils/background_jobs.py b/frappe/utils/background_jobs.py index 480e634d97..f42fef8032 100644 --- a/frappe/utils/background_jobs.py +++ b/frappe/utils/background_jobs.py @@ -41,6 +41,9 @@ RQ_MAX_JOBS = 5000 # Restart NOFORK workers after every N number of jobs RQ_MAX_JOBS_JITTER = 50 # Random difference in max jobs to avoid restarting at same time MAX_QUEUED_JOBS = 500 # frappe.enqueue will start failing when these many jobs exist in queue. +# When too many jobs are pending in queue, order can be selectively flipped to LIFO to give better +# response latencies to interactive jobs. +QUEUE_STARVATION_THRESHOLD = 16 _redis_queue_conn = None @@ -84,6 +87,7 @@ def enqueue( at_front: bool = False, job_id: str | None = None, deduplicate=False, + at_front_when_starved=False, **kwargs, ) -> Job | Any: """ @@ -105,6 +109,8 @@ def enqueue( :param kwargs: keyword arguments to be passed to the method :param deduplicate: do not re-queue job if it's already queued, requires job_id. :param job_id: Assigning unique job id, which can be checked using `is_job_enqueued` + :param at_front_when_starved: If the queue appears to be starved then new jobs are + automatically inserted in LIFO fashion. """ # To handle older implementations is_async = kwargs.pop("async", is_async) @@ -179,6 +185,9 @@ def enqueue( on_failure = on_failure or truncate_failed_registry + if at_front_when_starved and q.count > QUEUE_STARVATION_THRESHOLD: + at_front = True + def enqueue_call(): return q.enqueue_call( "frappe.utils.background_jobs.execute_job", diff --git a/frappe/utils/print_format.py b/frappe/utils/print_format.py index e64ad10545..379c037e03 100644 --- a/frappe/utils/print_format.py +++ b/frappe/utils/print_format.py @@ -64,6 +64,7 @@ def download_multi_pdf_async( letterhead=letterhead, options=options, queue="long" if doc_count > 20 else "short", + at_front_when_starved=True, ) frappe.local.response["http_status_code"] = http.HTTPStatus.CREATED return {"task_id": task_id}