Merge pull request #31489 from ankush/backpressure

fix: Implement backpressure for background jobs
This commit is contained in:
Ankush Menat 2025-03-05 11:18:39 +05:30 committed by GitHub
commit 6ea1d2c7cb
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 37 additions and 3 deletions

View file

@ -67,8 +67,8 @@ frappe.ui.form.on("System Health Report", {
const style = document.createElement("style");
style.innerText = `.health-check-failed {
font-weight: bold;
color: var(--text-colour);
background-color: var(--bg-red);
color: var(--text-colour) !important;
background-color: var(--bg-red) !important;
}`;
document.head.appendChild(style);

View file

@ -263,6 +263,10 @@ class SessionBootFailed(ValidationError):
http_status_code = 500
class QueueOverloaded(ValidationError):
http_status_code = 503
class PrintFormatError(ValidationError):
pass

View file

@ -27,7 +27,8 @@ from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_fi
import frappe
import frappe.monitor
from frappe import _
from frappe.utils import CallbackManager, cint, get_bench_id
from frappe.utils import CallbackManager, cint, get_bench_id, get_sites
from frappe.utils.caching import site_cache
from frappe.utils.commands import log
from frappe.utils.data import sbool
from frappe.utils.redis_queue import RedisQueue
@ -40,6 +41,8 @@ RQ_RESULTS_TTL = 10 * 60
RQ_MAX_JOBS = 5000 # Restart NOFORK workers after every N number of jobs
RQ_MAX_JOBS_JITTER = 50 # Random difference in max jobs to avoid restarting at same time
MAX_QUEUED_JOBS = 500 # frappe.enqueue will start failing when these many jobs exist in queue.
_redis_queue_conn = None
@ -154,6 +157,8 @@ def enqueue(
raise
_check_queue_size(q)
if not timeout:
timeout = get_queues_timeout().get(queue) or 300
@ -723,6 +728,31 @@ def flush_telemetry():
ph and ph.flush()
def _check_queue_size(q: Queue):
max_jobs = cint(frappe.conf.max_queued_jobs) or MAX_QUEUED_JOBS
# Workaround for arbitrarily sized benches,
# TODO: Some concept of site-based fairness on consumption of queue
max_jobs += _site_count() * 50
if cint(q.count) >= max_jobs:
primary_action = {
"label": "Monitor System Health",
"client_action": "frappe.set_route",
"args": ["Form", "System Health Report"],
}
frappe.throw(
_("Too many queued background jobs ({0}). Please retry after some time.").format(max_jobs),
title=_("Queue Overloaded"),
exc=frappe.QueueOverloaded,
primary_action=primary_action if frappe.has_permission("System Health Report") else None,
)
@site_cache(ttl=10 * 60)
def _site_count() -> int:
return len(get_sites())
def _start_sentry():
sentry_dsn = os.getenv("FRAPPE_SENTRY_DSN")
if not sentry_dsn: