perf: Selectively switch to LIFO ordering when queue is starved (#32226)

When queue is overloaded every job gets delayed by size of the queue,
this means even interactive jobs like prepared reports face significant
wait times.

This flag allows developer to selectively enable LIFO on such jobs where
ordering doesn't matter. Any time we observe queue to be too large,
we'll insert the job at front so it gets highest priority.

This is a common strategy to deal with queue starvation, we are only
applying it explicitly because job execution order matters for
correctness in some cases.
This commit is contained in:
Ankush Menat 2025-04-23 11:54:21 +05:30 committed by GitHub
parent 3cd9b577e2
commit 70e56b2e14
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 13 additions and 1 deletions

View file

@ -80,6 +80,7 @@ class PreparedReport(Document):
prepared_report=self.name,
timeout=timeout or REPORT_TIMEOUT,
enqueue_after_commit=True,
at_front_when_starved=True,
)
def get_prepared_data(self, with_file_name=False):

View file

@ -55,7 +55,7 @@ def setup_complete(args):
is_background_task = frappe.conf.get("trigger_site_setup_in_background")
if is_background_task:
process_setup_stages.enqueue(stages=stages, user_input=args, is_background_task=True)
process_setup_stages.enqueue(stages=stages, user_input=args, is_background_task=True, at_front=True)
return {"status": "registered"}
else:
return process_setup_stages(stages, args)

View file

@ -248,6 +248,7 @@ def bulk_workflow_approval(docnames, doctype, action):
action=action,
queue="short",
timeout=1000,
at_front_when_starved=True,
)
else:
frappe.throw(_("Bulk approval only support up to 500 documents."), title=_("Too Many Documents"))

View file

@ -41,6 +41,9 @@ RQ_MAX_JOBS = 5000 # Restart NOFORK workers after every N number of jobs
RQ_MAX_JOBS_JITTER = 50 # Random difference in max jobs to avoid restarting at same time
MAX_QUEUED_JOBS = 500 # frappe.enqueue will start failing when these many jobs exist in queue.
# When too many jobs are pending in queue, order can be selectively flipped to LIFO to give better
# response latencies to interactive jobs.
QUEUE_STARVATION_THRESHOLD = 16
_redis_queue_conn = None
@ -84,6 +87,7 @@ def enqueue(
at_front: bool = False,
job_id: str | None = None,
deduplicate=False,
at_front_when_starved=False,
**kwargs,
) -> Job | Any:
"""
@ -105,6 +109,8 @@ def enqueue(
:param kwargs: keyword arguments to be passed to the method
:param deduplicate: do not re-queue job if it's already queued, requires job_id.
:param job_id: Assigning unique job id, which can be checked using `is_job_enqueued`
:param at_front_when_starved: If the queue appears to be starved then new jobs are
automatically inserted in LIFO fashion.
"""
# To handle older implementations
is_async = kwargs.pop("async", is_async)
@ -179,6 +185,9 @@ def enqueue(
on_failure = on_failure or truncate_failed_registry
if at_front_when_starved and q.count > QUEUE_STARVATION_THRESHOLD:
at_front = True
def enqueue_call():
return q.enqueue_call(
"frappe.utils.background_jobs.execute_job",

View file

@ -64,6 +64,7 @@ def download_multi_pdf_async(
letterhead=letterhead,
options=options,
queue="long" if doc_count > 20 else "short",
at_front_when_starved=True,
)
frappe.local.response["http_status_code"] = http.HTTPStatus.CREATED
return {"task_id": task_id}