diff --git a/frappe/core/doctype/rq_job/__init__.py b/frappe/core/doctype/rq_job/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/frappe/core/doctype/rq_job/rq_job.js b/frappe/core/doctype/rq_job/rq_job.js new file mode 100644 index 0000000000..06ef3a3ad3 --- /dev/null +++ b/frappe/core/doctype/rq_job/rq_job.js @@ -0,0 +1,9 @@ +// Copyright (c) 2022, Frappe Technologies and contributors +// For license information, please see license.txt + +frappe.ui.form.on("RQ Job", { + refresh: function (frm) { + // Nothing in this form is supposed to be editable. + frm.disable_form(); + }, +}); diff --git a/frappe/core/doctype/rq_job/rq_job.json b/frappe/core/doctype/rq_job/rq_job.json new file mode 100644 index 0000000000..00ad6aa58e --- /dev/null +++ b/frappe/core/doctype/rq_job/rq_job.json @@ -0,0 +1,152 @@ +{ + "actions": [], + "allow_copy": 1, + "autoname": "field:job_id", + "creation": "2022-09-10 16:19:37.934903", + "doctype": "DocType", + "editable_grid": 1, + "engine": "InnoDB", + "field_order": [ + "job_info_section", + "job_id", + "job_name", + "queue", + "timeout", + "column_break_5", + "arguments", + "job_status_section", + "status", + "time_taken", + "column_break_11", + "started_at", + "ended_at", + "exception_section", + "exc_info" + ], + "fields": [ + { + "fieldname": "queue", + "fieldtype": "Select", + "in_list_view": 1, + "in_standard_filter": 1, + "label": "Queue", + "options": "default\nshort\nlong" + }, + { + "fieldname": "status", + "fieldtype": "Select", + "in_list_view": 1, + "in_standard_filter": 1, + "label": "Status", + "options": "queued\nstarted\nfinished\nfailed\ndeferred\nscheduled\ncanceled" + }, + { + "fieldname": "job_id", + "fieldtype": "Data", + "label": "Job ID", + "unique": 1 + }, + { + "fieldname": "exc_info", + "fieldtype": "Code", + "label": "Exception" + }, + { + "fieldname": "job_name", + "fieldtype": "Data", + "label": "Job Name" + }, + { + "fieldname": "arguments", + "fieldtype": "Code", + "label": "Arguments" + }, + { + "fieldname": "timeout", + "fieldtype": "Duration", + "label": "Timeout" + }, + { + "fieldname": "time_taken", + "fieldtype": "Duration", + "label": "Time Taken" + }, + { + "fieldname": "started_at", + "fieldtype": "Datetime", + "label": "Started At" + }, + { + "fieldname": "ended_at", + "fieldtype": "Datetime", + "label": "Ended At" + }, + { + "fieldname": "job_info_section", + "fieldtype": "Section Break", + "label": "Job Info" + }, + { + "fieldname": "job_status_section", + "fieldtype": "Section Break", + "label": "Job Status" + }, + { + "fieldname": "column_break_5", + "fieldtype": "Column Break" + }, + { + "fieldname": "column_break_11", + "fieldtype": "Column Break" + }, + { + "fieldname": "exception_section", + "fieldtype": "Section Break" + } + ], + "in_create": 1, + "is_virtual": 1, + "links": [], + "modified": "2022-09-11 04:39:35.771198", + "modified_by": "Administrator", + "module": "Core", + "name": "RQ Job", + "naming_rule": "By fieldname", + "owner": "Administrator", + "permissions": [ + { + "email": 1, + "export": 1, + "print": 1, + "read": 1, + "report": 1, + "role": "System Manager", + "share": 1 + } + ], + "sort_field": "modified", + "sort_order": "DESC", + "states": [ + { + "color": "Yellow", + "title": "queued" + }, + { + "color": "Blue", + "title": "started" + }, + { + "color": "Red", + "title": "failed" + }, + { + "color": "Green", + "title": "finished" + }, + { + "color": "Orange", + "title": "cancelled" + } + ], + "title_field": "job_name" +} \ No newline at end of file diff --git a/frappe/core/doctype/rq_job/rq_job.py b/frappe/core/doctype/rq_job/rq_job.py new file mode 100644 index 0000000000..8623824b82 --- /dev/null +++ b/frappe/core/doctype/rq_job/rq_job.py @@ -0,0 +1,135 @@ +# Copyright (c) 2022, Frappe Technologies and contributors +# For license information, please see license.txt + +from rq.job import Job +from rq.queue import Queue + +import frappe +from frappe.model.document import Document +from frappe.utils import ( + cint, + compare, + convert_utc_to_user_timezone, + create_batch, + make_filter_dict, +) +from frappe.utils.background_jobs import get_queues, get_redis_conn + +QUEUES = ["default", "long", "short"] +JOB_STATUSES = ["queued", "started", "failed", "finished", "deferred", "scheduled", "canceled"] + + +class RQJob(Document): + def load_from_db(self): + job = Job.fetch(self.name, connection=get_redis_conn()) + super(Document, self).__init__(serialize_job(job)) + + @staticmethod + def get_list(args): + + start = cint(args.get("start")) + page_length = cint(args.get("page_length")) + + order_desc = "desc" in args.get("order_by", "") + + matched_job_ids = RQJob.get_matching_job_ids(args) + + jobs = [] + for job_ids in create_batch(matched_job_ids, 100): + jobs.extend( + serialize_job(job) + for job in Job.fetch_many(job_ids=job_ids, connection=get_redis_conn()) + if job and for_current_site(job) + ) + if len(jobs) > start + page_length: + # we have fetched enough. This is inefficient but because of site filtering TINA + break + + return sorted(jobs, key=lambda j: j.modified, reverse=order_desc)[start : start + page_length] + + @staticmethod + def get_matching_job_ids(args): + filters = make_filter_dict(args.get("filters")) + + queues = _eval_filters(filters.get("queue"), QUEUES) + statuses = _eval_filters(filters.get("status"), JOB_STATUSES) + + matched_job_ids = [] + for queue in get_queues(): + if not queue.name.endswith(tuple(queues)): + continue + for status in statuses: + matched_job_ids.extend(fetch_job_ids(queue, status)) + + return matched_job_ids + + @staticmethod + def get_count(args) -> int: + # Can not be implemented efficiently due to site filtering hence ignored. + return 0 + + # None of these methods apply to virtual job doctype, overriden for sanity. + @staticmethod + def get_stats(args): + return {} + + def db_insert(self, *args, **kwargs): + pass + + def db_update(self, *args, **kwargs): + pass + + def delete(self): + pass + + +def serialize_job(job: Job) -> frappe._dict: + if not for_current_site(job): + return frappe._dict() + + modified = job.last_heartbeat or job.ended_at or job.started_at or job.created_at + + return frappe._dict( + name=job.id, + job_id=job.id, + queue=job.origin.rsplit(":", 1)[1], + job_name=job.kwargs.get("kwargs", {}).get("job_type") or str(job.kwargs.get("job_name")), + status=job.get_status(refresh=job.get_status() == "queued"), + started_at=convert_utc_to_user_timezone(job.started_at) if job.started_at else "", + ended_at=convert_utc_to_user_timezone(job.ended_at) if job.ended_at else "", + time_taken=(job.ended_at - job.started_at).total_seconds() if job.ended_at else "", + exc_info=job.exc_info, + arguments=frappe.as_json(job.kwargs), + timeout=job.timeout, + creation=convert_utc_to_user_timezone(job.created_at), + modified=convert_utc_to_user_timezone(modified), + _comment_count=0, + ) + + +def for_current_site(job: Job) -> bool: + return job.kwargs.get("site") == frappe.local.site + + +def _eval_filters(filter, values: list[str]) -> list[str]: + if filter: + operator, operand = filter + return [val for val in values if compare(val, operator, operand)] + return values + + +def fetch_job_ids(queue: Queue, status: str) -> list[str | None]: + registry_map = { + "queued": queue, # self + "started": queue.started_job_registry, + "finished": queue.finished_job_registry, + "failed": queue.failed_job_registry, + "deferred": queue.deferred_job_registry, + "scheduled": queue.scheduled_job_registry, + } + + registry = registry_map.get(status) + if registry: + job_ids = registry.get_job_ids() + return [j for j in job_ids if j] + return [] diff --git a/frappe/core/doctype/rq_job/test_rq_job.py b/frappe/core/doctype/rq_job/test_rq_job.py new file mode 100644 index 0000000000..e29c94cec2 --- /dev/null +++ b/frappe/core/doctype/rq_job/test_rq_job.py @@ -0,0 +1,9 @@ +# Copyright (c) 2022, Frappe Technologies and Contributors +# See license.txt + +import frappe +from frappe.tests.utils import FrappeTestCase + + +class TestRQJob(FrappeTestCase): + pass diff --git a/frappe/core/doctype/rq_worker/rq_worker.json b/frappe/core/doctype/rq_worker/rq_worker.json index e44be0d6e3..d2e0a34bb6 100644 --- a/frappe/core/doctype/rq_worker/rq_worker.json +++ b/frappe/core/doctype/rq_worker/rq_worker.json @@ -106,7 +106,7 @@ "in_create": 1, "is_virtual": 1, "links": [], - "modified": "2022-09-10 16:09:14.720291", + "modified": "2022-09-11 04:38:30.606019", "modified_by": "Administrator", "module": "Core", "name": "RQ Worker", @@ -124,5 +124,14 @@ ], "sort_field": "modified", "sort_order": "DESC", - "states": [] -} + "states": [ + { + "color": "Blue", + "title": "idle" + }, + { + "color": "Yellow", + "title": "busy" + } + ] +} \ No newline at end of file diff --git a/frappe/core/doctype/rq_worker/rq_worker.py b/frappe/core/doctype/rq_worker/rq_worker.py index 7c8cca9de2..7141d1b693 100644 --- a/frappe/core/doctype/rq_worker/rq_worker.py +++ b/frappe/core/doctype/rq_worker/rq_worker.py @@ -62,4 +62,6 @@ def serialize_worker(worker: Worker) -> frappe._dict: failed_job_count=worker.failed_job_count, total_working_time=worker.total_working_time, _comment_count=0, + modified=convert_utc_to_user_timezone(worker.last_heartbeat), + creation=convert_utc_to_user_timezone(worker.birth_date), )