seitime-frappe/frappe/utils/scheduler.py
Ankush Menat 5a5ad3032c
fix: procline crash (#31845)
* fix: Skip rollback if no DB

* fix: drop proctitle from dependency

- While it improves our own processes, it makes gunicorn process titles worse
- It also crashes on mac because of some unknown reason related to fork-safety.

Best to not use this.
2025-03-21 12:30:19 +05:30

266 lines
6.8 KiB
Python

# Copyright (c) 2015, Frappe Technologies Pvt. Ltd. and Contributors
# License: MIT. See LICENSE
"""
Events:
always
daily
monthly
weekly
"""
import datetime
import os
import random
import time
from typing import NoReturn
from croniter import CroniterBadCronError
from filelock import FileLock, Timeout
import frappe
from frappe.utils import cint, get_bench_path, get_datetime, get_sites, now_datetime
from frappe.utils.background_jobs import set_niceness
from frappe.utils.caching import redis_cache
DATETIME_FORMAT = "%Y-%m-%d %H:%M:%S"
DEFAULT_SCHEDULER_TICK = 4 * 60
def cprint(*args, **kwargs):
"""Prints only if called from STDOUT"""
try:
os.get_terminal_size()
print(*args, **kwargs)
except Exception:
pass
def start_scheduler() -> NoReturn:
"""Run enqueue_events_for_all_sites based on scheduler tick.
Specify scheduler_interval in seconds in common_site_config.json"""
tick = get_scheduler_tick()
set_niceness()
lock_path = _get_scheduler_lock_file()
try:
lock = FileLock(lock_path)
lock.acquire(blocking=False)
except Timeout:
frappe.logger("scheduler").debug("Scheduler already running")
return
while True:
time.sleep(sleep_duration(tick))
enqueue_events_for_all_sites()
def _get_scheduler_lock_file() -> True:
return os.path.abspath(os.path.join(get_bench_path(), "config", "scheduler_process"))
def is_schduler_process_running() -> bool:
"""Checks if any other process is holding the lock.
Note: FLOCK is held by process until it exits, this function just checks if process is
running or not. We can't determine if process is stuck somehwere.
"""
try:
lock = FileLock(_get_scheduler_lock_file())
lock.acquire(blocking=False)
lock.release()
return False
except Timeout:
return True
def sleep_duration(tick):
if tick != DEFAULT_SCHEDULER_TICK:
# Assuming user knows what they want.
return tick
# Sleep until next multiple of tick.
# This makes scheduler aligned with real clock,
# so event scheduled at 12:00 happen at 12:00 and not 12:00:35.
minutes = tick // 60
now = datetime.datetime.now(datetime.timezone.utc)
left_minutes = minutes - now.minute % minutes
next_execution = now.replace(second=0) + datetime.timedelta(minutes=left_minutes)
return (next_execution - now).total_seconds()
def enqueue_events_for_all_sites() -> None:
"""Loop through sites and enqueue events that are not already queued"""
with frappe.init_site():
sites = get_sites()
# Sites are sorted in alphabetical order, shuffle to randomize priorities
random.shuffle(sites)
for site in sites:
try:
enqueue_events_for_site(site=site)
except Exception:
frappe.logger("scheduler").debug(f"Failed to enqueue events for site: {site}", exc_info=True)
def enqueue_events_for_site(site: str) -> None:
def log_exc():
frappe.logger("scheduler").error(f"Exception in Enqueue Events for Site {site}", exc_info=True)
try:
frappe.init(site)
frappe.connect()
if is_scheduler_inactive():
return
enqueue_events()
frappe.logger("scheduler").debug(f"Queued events for site {site}")
except Exception as e:
if frappe.db.is_access_denied(e):
frappe.logger("scheduler").debug(f"Access denied for site {site}")
log_exc()
finally:
frappe.destroy()
def enqueue_events() -> list[str] | None:
if schedule_jobs_based_on_activity():
enqueued_jobs = []
all_jobs = frappe.get_all("Scheduled Job Type", filters={"stopped": 0}, fields="*")
random.shuffle(all_jobs)
for job_type in all_jobs:
job_type = frappe.get_doc(doctype="Scheduled Job Type", **job_type)
try:
if job_type.enqueue():
enqueued_jobs.append(job_type.method)
except CroniterBadCronError:
frappe.logger("scheduler").error(
f"Invalid Job on {frappe.local.site} - {job_type.name}", exc_info=True
)
return enqueued_jobs
def is_scheduler_inactive(verbose=True) -> bool:
if frappe.local.conf.maintenance_mode:
if verbose:
cprint(f"{frappe.local.site}: Maintenance mode is ON")
return True
if frappe.local.conf.pause_scheduler:
if verbose:
cprint(f"{frappe.local.site}: frappe.conf.pause_scheduler is SET")
return True
if is_scheduler_disabled(verbose=verbose):
return True
return False
def is_scheduler_disabled(verbose=True) -> bool:
if frappe.conf.disable_scheduler:
if verbose:
cprint(f"{frappe.local.site}: frappe.conf.disable_scheduler is SET")
return True
scheduler_disabled = not frappe.utils.cint(
frappe.db.get_single_value("System Settings", "enable_scheduler")
)
if scheduler_disabled:
if verbose:
cprint(f"{frappe.local.site}: SystemSettings.enable_scheduler is UNSET")
return scheduler_disabled
def toggle_scheduler(enable):
frappe.db.set_single_value("System Settings", "enable_scheduler", int(enable))
def enable_scheduler():
toggle_scheduler(True)
def disable_scheduler():
toggle_scheduler(False)
@redis_cache(ttl=60 * 60)
def schedule_jobs_based_on_activity(check_time=None):
"""Return True for active sites as defined by `Activity Log`.
Also return True for inactive sites once every 24 hours based on `Scheduled Job Log`."""
if is_dormant(check_time=check_time):
# ensure last job is one day old
last_job_timestamp = _get_last_creation_timestamp("Scheduled Job Log")
if not last_job_timestamp:
return True
else:
if ((check_time or now_datetime()) - last_job_timestamp).total_seconds() >= 86400:
# one day is passed since jobs are run, so lets do this
return True
else:
# schedulers run in the last 24 hours, do nothing
return False
else:
# site active, lets run the jobs
return True
@redis_cache(ttl=60 * 60)
def is_dormant(check_time=None):
from frappe.utils.frappecloud import on_frappecloud
if frappe.conf.developer_mode or not on_frappecloud():
return False
threshold = cint(frappe.get_system_settings("dormant_days")) * 86400
if not threshold:
return False
last_activity = frappe.db.get_value(
"User", filters={}, fieldname="last_active", order_by="last_active desc"
)
if not last_activity:
return True
if ((check_time or now_datetime()) - last_activity).total_seconds() >= threshold:
return True
return False
def _get_last_creation_timestamp(doctype):
timestamp = frappe.db.get_value(doctype, filters={}, fieldname="creation", order_by="creation desc")
if timestamp:
return get_datetime(timestamp)
@frappe.whitelist()
def activate_scheduler():
from frappe.installer import update_site_config
frappe.only_for("Administrator")
if frappe.local.conf.maintenance_mode:
frappe.throw(frappe._("Scheduler can not be re-enabled when maintenance mode is active."))
if is_scheduler_disabled():
enable_scheduler()
if frappe.conf.pause_scheduler:
update_site_config("pause_scheduler", 0)
@frappe.whitelist()
def get_scheduler_status():
if is_scheduler_inactive():
return {"status": "inactive"}
return {"status": "active"}
def get_scheduler_tick() -> int:
conf = frappe.get_conf()
return cint(conf.scheduler_tick_interval) or DEFAULT_SCHEDULER_TICK