diff --git a/frappe/core/doctype/data_import/data_import.py b/frappe/core/doctype/data_import/data_import.py index 45fdad8290..b03f90584f 100644 --- a/frappe/core/doctype/data_import/data_import.py +++ b/frappe/core/doctype/data_import/data_import.py @@ -7,7 +7,7 @@ import frappe from frappe import _ from frappe.core.doctype.data_import.exporter import Exporter from frappe.core.doctype.data_import.importer import Importer -from frappe.core.doctype.rq_job.rq_job import get_all_queued_job +from frappe.core.doctype.rq_job.rq_job import get_all_queued_jobs from frappe.model.document import Document from frappe.modules.import_file import import_file_by_path from frappe.utils.background_jobs import enqueue @@ -65,7 +65,7 @@ class DataImport(Document): if is_scheduler_inactive() and not frappe.flags.in_test: frappe.throw(_("Scheduler is inactive. Cannot import data."), title=_("Scheduler Inactive")) - enqueued_jobs = {job.get("job_name", "") for job in get_all_queued_job()} + enqueued_jobs = {job.get("job_name", "") for job in get_all_queued_jobs()} if self.name not in enqueued_jobs: enqueue( diff --git a/frappe/core/doctype/rq_job/rq_job.js b/frappe/core/doctype/rq_job/rq_job.js index 06ef3a3ad3..3f7a1a15b7 100644 --- a/frappe/core/doctype/rq_job/rq_job.js +++ b/frappe/core/doctype/rq_job/rq_job.js @@ -5,5 +5,26 @@ frappe.ui.form.on("RQ Job", { refresh: function (frm) { // Nothing in this form is supposed to be editable. frm.disable_form(); + frm.dashboard.set_headline_alert( + "This is a virtual doctype and data is cleared periodically." + ); + + if (["started", "queued"].includes(frm.doc.status)) { + frm.add_custom_button(__("Force Stop job"), () => { + frappe.confirm( + "This will terminate the job immediately and might be dangerous, are you sure? ", + () => { + frappe + .xcall("frappe.core.doctype.rq_job.rq_job.stop_job", { + job_id: frm.doc.name, + }) + .then((r) => { + frappe.show_alert("Job Stopped Succefully"); + frm.reload_doc(); + }); + } + ); + }); + } }, }); diff --git a/frappe/core/doctype/rq_job/rq_job.py b/frappe/core/doctype/rq_job/rq_job.py index 92bab4f521..bceb93c0a4 100644 --- a/frappe/core/doctype/rq_job/rq_job.py +++ b/frappe/core/doctype/rq_job/rq_job.py @@ -1,6 +1,9 @@ # Copyright (c) 2022, Frappe Technologies and contributors # For license information, please see license.txt +import functools + +from rq.command import send_stop_job_command from rq.job import Job from rq.queue import Queue @@ -19,6 +22,19 @@ QUEUES = ["default", "long", "short"] JOB_STATUSES = ["queued", "started", "failed", "finished", "deferred", "scheduled", "canceled"] +def check_permissions(method): + @functools.wraps(method) + def wrapper(*args, **kwargs): + frappe.only_for("System Manager") + job = args[0].job + if not for_current_site(job): + raise frappe.PermissionError + + return method(*args, **kwargs) + + return wrapper + + class RQJob(Document): def load_from_db(self): job = Job.fetch(self.name, connection=get_redis_conn()) @@ -70,10 +86,13 @@ class RQJob(Document): return matched_job_ids + @check_permissions def delete(self): - frappe.only_for("System Manager") - if self._job_obj and for_current_site(self._job_obj): - self._job_obj.delete() + self.job.delete() + + @check_permissions + def stop_job(self): + send_stop_job_command(connection=get_redis_conn(), job_id=self.job_id) @staticmethod def get_count(args) -> int: @@ -93,9 +112,6 @@ class RQJob(Document): def serialize_job(job: Job) -> frappe._dict: - if not for_current_site(job): - return frappe._dict() - modified = job.last_heartbeat or job.ended_at or job.started_at or job.created_at return frappe._dict( @@ -103,7 +119,7 @@ def serialize_job(job: Job) -> frappe._dict: job_id=job.id, queue=job.origin.rsplit(":", 1)[1], job_name=job.kwargs.get("kwargs", {}).get("job_type") or str(job.kwargs.get("job_name")), - status=job.get_status(refresh=job.get_status() == "queued"), + status=job.get_status(), started_at=convert_utc_to_user_timezone(job.started_at) if job.started_at else "", ended_at=convert_utc_to_user_timezone(job.ended_at) if job.ended_at else "", time_taken=(job.ended_at - job.started_at).total_seconds() if job.ended_at else "", @@ -127,7 +143,7 @@ def _eval_filters(filter, values: list[str]) -> list[str]: return values -def fetch_job_ids(queue: Queue, status: str) -> list[str | None]: +def fetch_job_ids(queue: Queue, status: str) -> list[str]: registry_map = { "queued": queue, # self "started": queue.started_job_registry, @@ -157,9 +173,14 @@ def remove_failed_jobs(): fail_registry.remove(job, delete_job=True) -def get_all_queued_job(): +def get_all_queued_jobs(): jobs = [] for q in get_queues(): jobs.extend(q.get_jobs()) return [job for job in jobs if for_current_site(job)] + + +@frappe.whitelist() +def stop_job(job_id): + frappe.get_doc("RQ Job", job_id).stop_job() diff --git a/frappe/core/doctype/rq_job/test_rq_job.py b/frappe/core/doctype/rq_job/test_rq_job.py index c9dac27f47..4a51f54c0d 100644 --- a/frappe/core/doctype/rq_job/test_rq_job.py +++ b/frappe/core/doctype/rq_job/test_rq_job.py @@ -8,7 +8,7 @@ from rq import exceptions as rq_exc from rq.job import Job import frappe -from frappe.core.doctype.rq_job.rq_job import RQJob, remove_failed_jobs +from frappe.core.doctype.rq_job.rq_job import RQJob, remove_failed_jobs, stop_job from frappe.tests.utils import FrappeTestCase, timeout @@ -21,7 +21,7 @@ class TestRQJob(FrappeTestCase): if wait: while True: if job.is_queued or job.is_started: - time.sleep(0.5) + time.sleep(0.2) else: break self.assertEqual(frappe.get_doc("RQ Job", job.id).status, status) @@ -65,10 +65,11 @@ class TestRQJob(FrappeTestCase): self.assertGreaterEqual(len(non_failed_jobs), 1) # Create a slow job and check if it's stuck in "Started" - job = frappe.enqueue(method=self.BG_JOB, queue="short", sleep=10) + job = frappe.enqueue(method=self.BG_JOB, queue="short", sleep=1000) time.sleep(3) self.check_status(job, "started", wait=False) - self.check_status(job, "finished", wait=True) + stop_job(job_id=job.id) + self.check_status(job, "stopped") def test_delete_doc(self): job = frappe.enqueue(method=self.BG_JOB, queue="short") diff --git a/frappe/utils/background_jobs.py b/frappe/utils/background_jobs.py index 48f26574f4..aced7d4994 100755 --- a/frappe/utils/background_jobs.py +++ b/frappe/utils/background_jobs.py @@ -9,6 +9,7 @@ from uuid import uuid4 import redis from redis.exceptions import BusyLoadingError, ConnectionError from rq import Connection, Queue, Worker +from rq.command import send_stop_job_command from rq.logutils import setup_loghandlers from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_fixed