feat: kill a running background job -f

Also refactor permission checks
This commit is contained in:
Ankush Menat 2022-09-11 18:53:08 +05:30
parent 2267d40420
commit 11936a76df
5 changed files with 59 additions and 15 deletions

View file

@ -7,7 +7,7 @@ import frappe
from frappe import _
from frappe.core.doctype.data_import.exporter import Exporter
from frappe.core.doctype.data_import.importer import Importer
from frappe.core.doctype.rq_job.rq_job import get_all_queued_job
from frappe.core.doctype.rq_job.rq_job import get_all_queued_jobs
from frappe.model.document import Document
from frappe.modules.import_file import import_file_by_path
from frappe.utils.background_jobs import enqueue
@ -65,7 +65,7 @@ class DataImport(Document):
if is_scheduler_inactive() and not frappe.flags.in_test:
frappe.throw(_("Scheduler is inactive. Cannot import data."), title=_("Scheduler Inactive"))
enqueued_jobs = {job.get("job_name", "") for job in get_all_queued_job()}
enqueued_jobs = {job.get("job_name", "") for job in get_all_queued_jobs()}
if self.name not in enqueued_jobs:
enqueue(

View file

@ -5,5 +5,26 @@ frappe.ui.form.on("RQ Job", {
refresh: function (frm) {
// Nothing in this form is supposed to be editable.
frm.disable_form();
frm.dashboard.set_headline_alert(
"This is a virtual doctype and data is cleared periodically."
);
if (["started", "queued"].includes(frm.doc.status)) {
frm.add_custom_button(__("Force Stop job"), () => {
frappe.confirm(
"This will terminate the job immediately and might be dangerous, are you sure? ",
() => {
frappe
.xcall("frappe.core.doctype.rq_job.rq_job.stop_job", {
job_id: frm.doc.name,
})
.then((r) => {
frappe.show_alert("Job Stopped Succefully");
frm.reload_doc();
});
}
);
});
}
},
});

View file

@ -1,6 +1,9 @@
# Copyright (c) 2022, Frappe Technologies and contributors
# For license information, please see license.txt
import functools
from rq.command import send_stop_job_command
from rq.job import Job
from rq.queue import Queue
@ -19,6 +22,19 @@ QUEUES = ["default", "long", "short"]
JOB_STATUSES = ["queued", "started", "failed", "finished", "deferred", "scheduled", "canceled"]
def check_permissions(method):
@functools.wraps(method)
def wrapper(*args, **kwargs):
frappe.only_for("System Manager")
job = args[0].job
if not for_current_site(job):
raise frappe.PermissionError
return method(*args, **kwargs)
return wrapper
class RQJob(Document):
def load_from_db(self):
job = Job.fetch(self.name, connection=get_redis_conn())
@ -70,10 +86,13 @@ class RQJob(Document):
return matched_job_ids
@check_permissions
def delete(self):
frappe.only_for("System Manager")
if self._job_obj and for_current_site(self._job_obj):
self._job_obj.delete()
self.job.delete()
@check_permissions
def stop_job(self):
send_stop_job_command(connection=get_redis_conn(), job_id=self.job_id)
@staticmethod
def get_count(args) -> int:
@ -93,9 +112,6 @@ class RQJob(Document):
def serialize_job(job: Job) -> frappe._dict:
if not for_current_site(job):
return frappe._dict()
modified = job.last_heartbeat or job.ended_at or job.started_at or job.created_at
return frappe._dict(
@ -103,7 +119,7 @@ def serialize_job(job: Job) -> frappe._dict:
job_id=job.id,
queue=job.origin.rsplit(":", 1)[1],
job_name=job.kwargs.get("kwargs", {}).get("job_type") or str(job.kwargs.get("job_name")),
status=job.get_status(refresh=job.get_status() == "queued"),
status=job.get_status(),
started_at=convert_utc_to_user_timezone(job.started_at) if job.started_at else "",
ended_at=convert_utc_to_user_timezone(job.ended_at) if job.ended_at else "",
time_taken=(job.ended_at - job.started_at).total_seconds() if job.ended_at else "",
@ -127,7 +143,7 @@ def _eval_filters(filter, values: list[str]) -> list[str]:
return values
def fetch_job_ids(queue: Queue, status: str) -> list[str | None]:
def fetch_job_ids(queue: Queue, status: str) -> list[str]:
registry_map = {
"queued": queue, # self
"started": queue.started_job_registry,
@ -157,9 +173,14 @@ def remove_failed_jobs():
fail_registry.remove(job, delete_job=True)
def get_all_queued_job():
def get_all_queued_jobs():
jobs = []
for q in get_queues():
jobs.extend(q.get_jobs())
return [job for job in jobs if for_current_site(job)]
@frappe.whitelist()
def stop_job(job_id):
frappe.get_doc("RQ Job", job_id).stop_job()

View file

@ -8,7 +8,7 @@ from rq import exceptions as rq_exc
from rq.job import Job
import frappe
from frappe.core.doctype.rq_job.rq_job import RQJob, remove_failed_jobs
from frappe.core.doctype.rq_job.rq_job import RQJob, remove_failed_jobs, stop_job
from frappe.tests.utils import FrappeTestCase, timeout
@ -21,7 +21,7 @@ class TestRQJob(FrappeTestCase):
if wait:
while True:
if job.is_queued or job.is_started:
time.sleep(0.5)
time.sleep(0.2)
else:
break
self.assertEqual(frappe.get_doc("RQ Job", job.id).status, status)
@ -65,10 +65,11 @@ class TestRQJob(FrappeTestCase):
self.assertGreaterEqual(len(non_failed_jobs), 1)
# Create a slow job and check if it's stuck in "Started"
job = frappe.enqueue(method=self.BG_JOB, queue="short", sleep=10)
job = frappe.enqueue(method=self.BG_JOB, queue="short", sleep=1000)
time.sleep(3)
self.check_status(job, "started", wait=False)
self.check_status(job, "finished", wait=True)
stop_job(job_id=job.id)
self.check_status(job, "stopped")
def test_delete_doc(self):
job = frappe.enqueue(method=self.BG_JOB, queue="short")

View file

@ -9,6 +9,7 @@ from uuid import uuid4
import redis
from redis.exceptions import BusyLoadingError, ConnectionError
from rq import Connection, Queue, Worker
from rq.command import send_stop_job_command
from rq.logutils import setup_loghandlers
from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_fixed