Merge pull request #26292 from ankush/scheduler_reliability

fix: changes for scheduler reliability
This commit is contained in:
Ankush Menat 2024-05-02 13:52:25 +05:30 committed by GitHub
commit 1948422b63
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 55 additions and 10 deletions

View file

@ -115,7 +115,7 @@ class ScheduledJobType(Document):
}
if not self.cron_format:
self.cron_format = CRON_MAP[self.frequency]
self.cron_format = CRON_MAP.get(self.frequency)
# If this is a cold start then last_execution will not be set.
# Creation is set as fallback because if very old fallback is set job might trigger
@ -157,9 +157,8 @@ class ScheduledJobType(Document):
def update_scheduler_log(self, status):
if not self.create_log:
# self.get_next_execution will work properly iff self.last_execution is properly set
if self.frequency == "All" and status == "Start":
self.db_set("last_execution", now_datetime(), update_modified=False)
frappe.db.commit()
self.db_set("last_execution", now_datetime(), update_modified=False)
frappe.db.commit()
return
if not self.scheduler_log:
self.scheduler_log = frappe.get_doc(

View file

@ -56,6 +56,7 @@ frappe.ui.form.on("System Health Report", {
val > 3 &&
frm.doc.total_outgoing_emails > 3 &&
val / frm.doc.total_outgoing_emails > 0.1,
oldest_unscheduled_job: (val) => !!val,
"queue_status.pending_jobs": (val) => val > 50,
"background_workers.utilization": (val) => val > 70,
"background_workers.failed_jobs": (val) => val > 50,
@ -72,6 +73,9 @@ frappe.ui.form.on("System Health Report", {
document.head.appendChild(style);
const update_fields = () => {
if (!frappe.get_route().includes(frm.doc.name)) {
clearInterval(interval);
}
Object.entries(conditions).forEach(([field, condition]) => {
try {
if (field.includes(".")) {
@ -93,6 +97,6 @@ frappe.ui.form.on("System Health Report", {
};
update_fields();
setInterval(update_fields, 1000);
const interval = setInterval(update_fields, 1000);
},
});

View file

@ -17,6 +17,9 @@
"background_workers",
"scheduler_section",
"scheduler_status",
"column_break_bxog",
"oldest_unscheduled_job",
"section_break_vpuw",
"failing_scheduled_jobs",
"database_section",
"database",
@ -368,6 +371,7 @@
{
"fieldname": "scheduler_section",
"fieldtype": "Section Break",
"hide_border": 1,
"label": "Scheduler"
},
{
@ -375,6 +379,20 @@
"fieldtype": "Table",
"label": "Failing Scheduled Jobs (last 7 days)",
"options": "System Health Report Failing Jobs"
},
{
"fieldname": "column_break_bxog",
"fieldtype": "Column Break"
},
{
"fieldname": "oldest_unscheduled_job",
"fieldtype": "Link",
"label": "Oldest Unscheduled Job",
"options": "Scheduled Job Type"
},
{
"fieldname": "section_break_vpuw",
"fieldtype": "Section Break"
}
],
"hide_toolbar": 1,
@ -382,7 +400,7 @@
"is_virtual": 1,
"issingle": 1,
"links": [],
"modified": "2024-04-22 11:47:52.194784",
"modified": "2024-05-02 13:32:16.495750",
"modified_by": "Administrator",
"module": "Desk",
"name": "System Health Report",

View file

@ -22,11 +22,12 @@ from collections.abc import Callable
from contextlib import contextmanager
import frappe
from frappe.core.doctype.scheduled_job_type.scheduled_job_type import ScheduledJobType
from frappe.model.document import Document
from frappe.utils.background_jobs import get_queue, get_queue_list, get_redis_conn
from frappe.utils.caching import redis_cache
from frappe.utils.data import add_to_date
from frappe.utils.scheduler import get_scheduler_status
from frappe.utils.scheduler import get_scheduler_status, get_scheduler_tick
@contextmanager
@ -103,6 +104,7 @@ class SystemHealthReport(Document):
handled_emails: DF.Int
last_10_active_users: DF.Code | None
new_users: DF.Int
oldest_unscheduled_job: DF.Link | None
onsite_backups: DF.Int
pending_emails: DF.Int
private_files_size: DF.Float
@ -204,6 +206,18 @@ class SystemHealthReport(Document):
for job in failing_jobs:
self.append("failing_scheduled_jobs", job)
threshold = add_to_date(None, seconds=-30 * get_scheduler_tick(), as_datetime=True)
for job_type in frappe.get_all(
"Scheduled Job Type",
filters={"stopped": 0, "last_execution": ("<", threshold)},
fields="*",
order_by="last_execution asc",
):
job_type: ScheduledJobType = frappe.get_doc(doctype="Scheduled Job Type", **job_type)
if job_type.is_event_due():
self.oldest_unscheduled_job = job_type.name
break
@health_check("Emails")
def fetch_email_stats(self):
threshold = add_to_date(None, days=-7, as_datetime=True)

View file

@ -15,6 +15,7 @@ import time
from typing import NoReturn
import setproctitle
from croniter import CroniterBadCronError
# imports - module imports
import frappe
@ -42,7 +43,7 @@ def start_scheduler() -> NoReturn:
"""Run enqueue_events_for_all_sites based on scheduler tick.
Specify scheduler_tick_interval in seconds in common_site_config.json"""
tick = cint(frappe.get_conf().scheduler_tick_interval) or 60
tick = get_scheduler_tick()
set_niceness()
with filelock("scheduler_process", timeout=1, is_global=True):
@ -100,8 +101,13 @@ def enqueue_events() -> list[str] | None:
enqueued_jobs = []
for job_type in frappe.get_all("Scheduled Job Type", filters={"stopped": 0}, fields="*"):
job_type = frappe.get_doc(doctype="Scheduled Job Type", **job_type)
if job_type.enqueue():
enqueued_jobs.append(job_type.method)
try:
if job_type.enqueue():
enqueued_jobs.append(job_type.method)
except CroniterBadCronError:
frappe.logger("scheduler").error(
f"Invalid Job on {frappe.local.site} - {job_type.name}", exc_info=True
)
return enqueued_jobs
@ -206,3 +212,7 @@ def get_scheduler_status():
if is_scheduler_inactive():
return {"status": "inactive"}
return {"status": "active"}
def get_scheduler_tick() -> int:
    """Return the scheduler tick interval.

    The value is read from ``scheduler_tick_interval`` on the configuration
    returned by ``frappe.get_conf()`` and coerced with ``cint``. When the
    coerced value is falsy (for example, the key is unset or zero), the
    default of 60 is returned instead.
    """
    configured_tick = cint(frappe.get_conf().scheduler_tick_interval)
    if configured_tick:
        return configured_tick
    return 60