seitime-frappe/frappe/commands/scheduler.py

279 lines
7.7 KiB
Python
Executable file

import sys
import click
import frappe
from frappe.commands import get_site, pass_context
from frappe.exceptions import SiteNotSpecifiedError
from frappe.utils.bench_helper import CliCtxObj
@click.command("trigger-scheduler-event", help="Trigger a scheduler event")
@click.argument("event")
@pass_context
def trigger_scheduler_event(context: CliCtxObj, event):
import frappe.utils.scheduler
exit_code = 0
for site in context.sites:
try:
frappe.init(site)
frappe.connect()
try:
frappe.get_doc("Scheduled Job Type", {"method": event}).execute()
except frappe.DoesNotExistError:
click.secho(f"Event {event} does not exist!", fg="red")
exit_code = 1
finally:
frappe.destroy()
if not context.sites:
raise SiteNotSpecifiedError
sys.exit(exit_code)
@click.command("enable-scheduler")
@pass_context
def enable_scheduler(context: CliCtxObj):
"Enable scheduler"
import frappe.utils.scheduler
for site in context.sites:
try:
frappe.init(site)
frappe.connect()
frappe.utils.scheduler.enable_scheduler()
frappe.db.commit()
print("Enabled for", site)
finally:
frappe.destroy()
if not context.sites:
raise SiteNotSpecifiedError
@click.command("disable-scheduler")
@pass_context
def disable_scheduler(context: CliCtxObj):
"Disable scheduler"
import frappe.utils.scheduler
for site in context.sites:
try:
frappe.init(site)
frappe.connect()
frappe.utils.scheduler.disable_scheduler()
frappe.db.commit()
print("Disabled for", site)
finally:
frappe.destroy()
if not context.sites:
raise SiteNotSpecifiedError
@click.command("scheduler")
@click.option("--site", help="site name")
@click.argument("state", type=click.Choice(["pause", "resume", "disable", "enable", "status"]))
@click.option("--format", "-f", default="text", type=click.Choice(["json", "text"]), help="Output format")
@click.option("--verbose", "-v", is_flag=True, help="Verbose output")
@pass_context
def scheduler(context: CliCtxObj, state: str, format: str, verbose: bool = False, site: str | None = None):
"""Control scheduler state."""
import frappe
from frappe.utils.scheduler import is_scheduler_inactive, toggle_scheduler
site = site or get_site(context)
output = {
"text": "Scheduler is {status} for site {site}",
"json": '{{"status": "{status}", "site": "{site}"}}',
}
with frappe.init_site(site=site):
match state:
case "status":
frappe.connect()
status = "disabled" if is_scheduler_inactive(verbose=verbose) else "enabled"
return print(output[format].format(status=status, site=site))
case "pause" | "resume":
from frappe.installer import update_site_config
update_site_config("pause_scheduler", state == "pause")
case "enable" | "disable":
frappe.connect()
toggle_scheduler(state == "enable")
frappe.db.commit()
print(output[format].format(status=f"{state}d", site=site))
@click.command("set-maintenance-mode")
@click.option("--site", help="site name")
@click.argument("state", type=click.Choice(["on", "off"]))
@pass_context
def set_maintenance_mode(context: CliCtxObj, state, site=None):
"""Put the site in maintenance mode for upgrades."""
from frappe.installer import update_site_config
if not site:
site = get_site(context)
try:
frappe.init(site)
update_site_config("maintenance_mode", 1 if (state == "on") else 0)
finally:
frappe.destroy()
@click.command("doctor") # Passing context always gets a site and if there is no use site it breaks
@click.option("--site", help="site name")
@pass_context
def doctor(context: CliCtxObj, site=None):
"Get diagnostic info about background workers"
from frappe.utils.doctor import doctor as _doctor
if not site:
site = get_site(context, raise_err=False)
return _doctor(site=site)
@click.command("show-pending-jobs")
@click.option("--site", help="site name")
@pass_context
def show_pending_jobs(context: CliCtxObj, site=None):
"Get diagnostic info about background jobs"
from frappe.utils.doctor import pending_jobs as _pending_jobs
if not site:
site = get_site(context)
with frappe.init_site(site):
pending_jobs = _pending_jobs(site=site)
return pending_jobs
@click.command("purge-jobs")
@click.option("--site", help="site name")
@click.option("--queue", default=None, help='one of "low", "default", "high')
@click.option(
"--event",
default=None,
help='one of "all", "weekly", "monthly", "hourly", "daily", "weekly_long", "daily_long"',
)
def purge_jobs(site=None, queue=None, event=None):
"Purge any pending periodic tasks, if event option is not given, it will purge everything for the site"
from frappe.utils.doctor import purge_pending_jobs
frappe.init(site or "")
count = purge_pending_jobs(event=event, site=site, queue=queue)
print(f"Purged {count} jobs")
@click.command("schedule")
def start_scheduler():
"""Start scheduler process which is responsible for enqueueing the scheduled job types."""
import time
from frappe.utils.scheduler import start_scheduler
time.sleep(0.5) # Delayed start. TODO: find better way to handle this.
start_scheduler()
@click.command("worker")
@click.option(
"--queue",
type=str,
help="Queue to consume from. Multiple queues can be specified using comma-separated string. If not specified all queues are consumed.",
)
@click.option("--quiet", is_flag=True, default=False, help="Hide Log Outputs")
@click.option("-u", "--rq-username", default=None, help="Redis ACL user")
@click.option("-p", "--rq-password", default=None, help="Redis ACL user password")
@click.option("--burst", is_flag=True, default=False, help="Run Worker in Burst mode.")
@click.option(
"--strategy",
required=False,
type=click.Choice(["round_robin", "random"]),
help="Dequeuing strategy to use",
)
def start_worker(queue, quiet=False, rq_username=None, rq_password=None, burst=False, strategy=None):
"""Start a background worker"""
from frappe.utils.background_jobs import start_worker
start_worker(
queue,
quiet=quiet,
rq_username=rq_username,
rq_password=rq_password,
burst=burst,
strategy=strategy,
)
@click.command("worker-pool")
@click.option(
"--queue",
type=str,
help="Queue to consume from. Multiple queues can be specified using comma-separated string. If not specified all queues are consumed.",
)
@click.option("--num-workers", type=int, default=2, help="Number of workers to spawn in pool.")
@click.option("--quiet", is_flag=True, default=False, help="Hide Log Outputs")
@click.option("--burst", is_flag=True, default=False, help="Run Worker in Burst mode.")
def start_worker_pool(queue, quiet=False, num_workers=2, burst=False):
"""Start a pool of background workers"""
from frappe.utils.background_jobs import start_worker_pool
start_worker_pool(queue=queue, quiet=quiet, burst=burst, num_workers=num_workers)
@click.command("ready-for-migration")
@click.option("--site", help="site name")
@pass_context
def ready_for_migration(context: CliCtxObj, site=None):
import time
from frappe.utils.doctor import any_job_pending
if not site:
site = get_site(context)
try:
frappe.init(site)
pending_jobs = False
# HACK: Check at least 3 times, 1 second apart.
# Rare edge case: Scheduler hasn't seen 'maintenance_mode=1` yet
# and takes more than 3 second to schedule.
for _ in range(3):
pending_jobs |= any_job_pending(site=site)
time.sleep(1)
if pending_jobs:
print(f"NOT READY for migration: site {site} has pending background jobs")
sys.exit(1)
else:
print(f"READY for migration: site {site} does not have any background jobs")
return 0
finally:
frappe.destroy()
commands = [
disable_scheduler,
doctor,
enable_scheduler,
purge_jobs,
ready_for_migration,
scheduler,
set_maintenance_mode,
show_pending_jobs,
start_scheduler,
start_worker,
start_worker_pool,
trigger_scheduler_event,
]