diff --git a/frappe/core/doctype/prepared_report/prepared_report.py b/frappe/core/doctype/prepared_report/prepared_report.py index b54bfcc430..32e5f3ea40 100644 --- a/frappe/core/doctype/prepared_report/prepared_report.py +++ b/frappe/core/doctype/prepared_report/prepared_report.py @@ -80,6 +80,7 @@ class PreparedReport(Document): prepared_report=self.name, timeout=timeout or REPORT_TIMEOUT, enqueue_after_commit=True, + at_front_when_starved=True, ) def get_prepared_data(self, with_file_name=False): diff --git a/frappe/desk/page/setup_wizard/setup_wizard.py b/frappe/desk/page/setup_wizard/setup_wizard.py index 588bfe2da9..ce4db123c5 100755 --- a/frappe/desk/page/setup_wizard/setup_wizard.py +++ b/frappe/desk/page/setup_wizard/setup_wizard.py @@ -55,7 +55,7 @@ def setup_complete(args): is_background_task = frappe.conf.get("trigger_site_setup_in_background") if is_background_task: - process_setup_stages.enqueue(stages=stages, user_input=args, is_background_task=True) + process_setup_stages.enqueue(stages=stages, user_input=args, is_background_task=True, at_front=True) return {"status": "registered"} else: return process_setup_stages(stages, args) diff --git a/frappe/model/workflow.py b/frappe/model/workflow.py index 1c490cfeb7..69eafe6c6b 100644 --- a/frappe/model/workflow.py +++ b/frappe/model/workflow.py @@ -248,6 +248,7 @@ def bulk_workflow_approval(docnames, doctype, action): action=action, queue="short", timeout=1000, + at_front_when_starved=True, ) else: frappe.throw(_("Bulk approval only support up to 500 documents."), title=_("Too Many Documents")) diff --git a/frappe/utils/background_jobs.py b/frappe/utils/background_jobs.py index 480e634d97..f42fef8032 100644 --- a/frappe/utils/background_jobs.py +++ b/frappe/utils/background_jobs.py @@ -41,6 +41,9 @@ RQ_MAX_JOBS = 5000 # Restart NOFORK workers after every N number of jobs RQ_MAX_JOBS_JITTER = 50 # Random difference in max jobs to avoid restarting at same time MAX_QUEUED_JOBS = 500 # frappe.enqueue will start failing when these many jobs exist in queue. +# When too many jobs are pending in queue, order can be selectively flipped to LIFO to give better +# response latencies to interactive jobs. +QUEUE_STARVATION_THRESHOLD = 16 _redis_queue_conn = None @@ -84,6 +87,7 @@ def enqueue( at_front: bool = False, job_id: str | None = None, deduplicate=False, + at_front_when_starved=False, **kwargs, ) -> Job | Any: """ @@ -105,6 +109,8 @@ def enqueue( :param kwargs: keyword arguments to be passed to the method :param deduplicate: do not re-queue job if it's already queued, requires job_id. :param job_id: Assigning unique job id, which can be checked using `is_job_enqueued` + :param at_front_when_starved: If the queue appears to be starved then new jobs are + automatically inserted in LIFO fashion. """ # To handle older implementations is_async = kwargs.pop("async", is_async) @@ -179,6 +185,9 @@ def enqueue( on_failure = on_failure or truncate_failed_registry + if at_front_when_starved and q.count > QUEUE_STARVATION_THRESHOLD: + at_front = True + def enqueue_call(): return q.enqueue_call( "frappe.utils.background_jobs.execute_job", diff --git a/frappe/utils/print_format.py b/frappe/utils/print_format.py index e64ad10545..379c037e03 100644 --- a/frappe/utils/print_format.py +++ b/frappe/utils/print_format.py @@ -64,6 +64,7 @@ def download_multi_pdf_async( letterhead=letterhead, options=options, queue="long" if doc_count > 20 else "short", + at_front_when_starved=True, ) frappe.local.response["http_status_code"] = http.HTTPStatus.CREATED return {"task_id": task_id}