feat: RQ job doctype

This commit is contained in:
Ankush Menat 2022-09-11 01:52:17 +05:30
parent 330bd08210
commit fc0ff7bd2e
7 changed files with 319 additions and 3 deletions

View file

View file

@ -0,0 +1,9 @@
// Copyright (c) 2022, Frappe Technologies and contributors
// For license information, please see license.txt
frappe.ui.form.on("RQ Job", {
	refresh(frm) {
		// RQ Job is a read-only view over redis-backed job data;
		// no field on this form should ever be editable.
		frm.disable_form();
	},
});

View file

@ -0,0 +1,152 @@
{
"actions": [],
"allow_copy": 1,
"autoname": "field:job_id",
"creation": "2022-09-10 16:19:37.934903",
"doctype": "DocType",
"editable_grid": 1,
"engine": "InnoDB",
"field_order": [
"job_info_section",
"job_id",
"job_name",
"queue",
"timeout",
"column_break_5",
"arguments",
"job_status_section",
"status",
"time_taken",
"column_break_11",
"started_at",
"ended_at",
"exception_section",
"exc_info"
],
"fields": [
{
"fieldname": "queue",
"fieldtype": "Select",
"in_list_view": 1,
"in_standard_filter": 1,
"label": "Queue",
"options": "default\nshort\nlong"
},
{
"fieldname": "status",
"fieldtype": "Select",
"in_list_view": 1,
"in_standard_filter": 1,
"label": "Status",
"options": "queued\nstarted\nfinished\nfailed\ndeferred\nscheduled\ncanceled"
},
{
"fieldname": "job_id",
"fieldtype": "Data",
"label": "Job ID",
"unique": 1
},
{
"fieldname": "exc_info",
"fieldtype": "Code",
"label": "Exception"
},
{
"fieldname": "job_name",
"fieldtype": "Data",
"label": "Job Name"
},
{
"fieldname": "arguments",
"fieldtype": "Code",
"label": "Arguments"
},
{
"fieldname": "timeout",
"fieldtype": "Duration",
"label": "Timeout"
},
{
"fieldname": "time_taken",
"fieldtype": "Duration",
"label": "Time Taken"
},
{
"fieldname": "started_at",
"fieldtype": "Datetime",
"label": "Started At"
},
{
"fieldname": "ended_at",
"fieldtype": "Datetime",
"label": "Ended At"
},
{
"fieldname": "job_info_section",
"fieldtype": "Section Break",
"label": "Job Info"
},
{
"fieldname": "job_status_section",
"fieldtype": "Section Break",
"label": "Job Status"
},
{
"fieldname": "column_break_5",
"fieldtype": "Column Break"
},
{
"fieldname": "column_break_11",
"fieldtype": "Column Break"
},
{
"fieldname": "exception_section",
"fieldtype": "Section Break"
}
],
"in_create": 1,
"is_virtual": 1,
"links": [],
"modified": "2022-09-11 04:39:35.771198",
"modified_by": "Administrator",
"module": "Core",
"name": "RQ Job",
"naming_rule": "By fieldname",
"owner": "Administrator",
"permissions": [
{
"email": 1,
"export": 1,
"print": 1,
"read": 1,
"report": 1,
"role": "System Manager",
"share": 1
}
],
"sort_field": "modified",
"sort_order": "DESC",
"states": [
{
"color": "Yellow",
"title": "queued"
},
{
"color": "Blue",
"title": "started"
},
{
"color": "Red",
"title": "failed"
},
{
"color": "Green",
"title": "finished"
},
{
"color": "Orange",
"title": "canceled"
}
],
"title_field": "job_name"
}

View file

@ -0,0 +1,135 @@
# Copyright (c) 2022, Frappe Technologies and contributors
# For license information, please see license.txt
from rq.job import Job
from rq.queue import Queue
import frappe
from frappe.model.document import Document
from frappe.utils import (
cint,
compare,
convert_utc_to_user_timezone,
create_batch,
make_filter_dict,
)
from frappe.utils.background_jobs import get_queues, get_redis_conn
QUEUES = ["default", "long", "short"]
JOB_STATUSES = ["queued", "started", "failed", "finished", "deferred", "scheduled", "canceled"]
class RQJob(Document):
	"""Virtual doctype exposing RQ (redis queue) jobs as read-only documents."""

	def load_from_db(self):
		"""Populate this document from the redis-backed RQ job it names."""
		job = Job.fetch(self.name, connection=get_redis_conn())
		# Skip Document.__init__ on purpose: fields come straight from the
		# serialized job dict, not from the database.
		super(Document, self).__init__(serialize_job(job))

	@staticmethod
	def get_list(args):
		"""Return one page of serialized jobs belonging to the current site."""
		start = cint(args.get("start"))
		page_length = cint(args.get("page_length"))
		descending = "desc" in args.get("order_by", "")

		collected = []
		for id_batch in create_batch(RQJob.get_matching_job_ids(args), 100):
			fetched = Job.fetch_many(job_ids=id_batch, connection=get_redis_conn())
			collected.extend(
				serialize_job(job) for job in fetched if job and for_current_site(job)
			)
			# Site filtering forces over-fetching; stop as soon as the
			# requested page is covered. Inefficient, but There Is No Alternative.
			if len(collected) > start + page_length:
				break

		collected.sort(key=lambda job: job.modified, reverse=descending)
		return collected[start : start + page_length]

	@staticmethod
	def get_matching_job_ids(args):
		"""Collect job ids from every queue/status registry matching the filters."""
		active_filters = make_filter_dict(args.get("filters"))
		wanted_queues = tuple(_eval_filters(active_filters.get("queue"), QUEUES))
		wanted_statuses = _eval_filters(active_filters.get("status"), JOB_STATUSES)

		return [
			job_id
			for queue in get_queues()
			if queue.name.endswith(wanted_queues)
			for status in wanted_statuses
			for job_id in fetch_job_ids(queue, status)
		]

	@staticmethod
	def get_count(args) -> int:
		# Cannot be implemented efficiently because of per-site filtering,
		# hence a constant 0 is reported.
		return 0

	# None of the following apply to a virtual job doctype; they are
	# overridden as no-ops for sanity.

	@staticmethod
	def get_stats(args):
		return {}

	def db_insert(self, *args, **kwargs):
		pass

	def db_update(self, *args, **kwargs):
		pass

	def delete(self):
		pass
def serialize_job(job: Job) -> frappe._dict:
	"""Flatten an RQ job into a dict usable as a virtual "RQ Job" document.

	Returns an empty dict for jobs that belong to a different site.
	"""
	if not for_current_site(job):
		return frappe._dict()

	# Best available "last activity" timestamp, used as `modified` for sorting.
	modified = job.last_heartbeat or job.ended_at or job.started_at or job.created_at

	return frappe._dict(
		name=job.id,
		job_id=job.id,
		# origin is a colon-separated redis key ending in the queue name;
		# [-1] (instead of [1]) avoids an IndexError if no ":" is present.
		queue=job.origin.rsplit(":", 1)[-1],
		job_name=job.kwargs.get("kwargs", {}).get("job_type") or str(job.kwargs.get("job_name")),
		# Only refresh the status from redis when the cached status is "queued".
		status=job.get_status(refresh=job.get_status() == "queued"),
		started_at=convert_utc_to_user_timezone(job.started_at) if job.started_at else "",
		ended_at=convert_utc_to_user_timezone(job.ended_at) if job.ended_at else "",
		time_taken=(job.ended_at - job.started_at).total_seconds() if job.ended_at else "",
		exc_info=job.exc_info,
		arguments=frappe.as_json(job.kwargs),
		timeout=job.timeout,
		creation=convert_utc_to_user_timezone(job.created_at),
		modified=convert_utc_to_user_timezone(modified),
		_comment_count=0,
	)
def for_current_site(job: Job) -> bool:
	"""True when the job was enqueued by the site currently being served."""
	return frappe.local.site == job.kwargs.get("site")
def _eval_filters(filter, values: list[str]) -> list[str]:
	"""Return the subset of `values` matching an (operator, operand) filter.

	A falsy filter means "no filtering": all values are returned.
	"""
	if not filter:
		return values
	operator, operand = filter
	return [value for value in values if compare(value, operator, operand)]
def fetch_job_ids(queue: Queue, status: str) -> list[str]:
	"""Return the non-empty job ids registered under `status` in `queue`.

	Statuses without a backing registry (e.g. "canceled") yield an empty list.
	Note: falsy ids are filtered out, so the return type is list[str], not
	list[str | None] as previously annotated.
	"""
	registry_map = {
		"queued": queue,  # the queue itself acts as the "queued" registry
		"started": queue.started_job_registry,
		"finished": queue.finished_job_registry,
		"failed": queue.failed_job_registry,
		"deferred": queue.deferred_job_registry,
		"scheduled": queue.scheduled_job_registry,
	}

	registry = registry_map.get(status)
	# `is None` instead of a truthiness check: registries/queues implement
	# __len__, so `if registry:` would trigger an extra count round-trip to
	# redis just to skip an empty registry that returns [] anyway.
	if registry is None:
		return []
	return [job_id for job_id in registry.get_job_ids() if job_id]

View file

@ -0,0 +1,9 @@
# Copyright (c) 2022, Frappe Technologies and Contributors
# See license.txt
import frappe
from frappe.tests.utils import FrappeTestCase
class TestRQJob(FrappeTestCase):
	# Placeholder test case for the RQ Job virtual doctype.
	# TODO(review): add coverage for RQJob.get_list / serialize_job once a
	# deterministic way to enqueue and inspect test jobs is available.
	pass

View file

@ -106,7 +106,7 @@
"in_create": 1,
"is_virtual": 1,
"links": [],
"modified": "2022-09-10 16:09:14.720291",
"modified": "2022-09-11 04:38:30.606019",
"modified_by": "Administrator",
"module": "Core",
"name": "RQ Worker",
@ -124,5 +124,14 @@
],
"sort_field": "modified",
"sort_order": "DESC",
"states": []
}
"states": [
{
"color": "Blue",
"title": "idle"
},
{
"color": "Yellow",
"title": "busy"
}
]
}

View file

@ -62,4 +62,6 @@ def serialize_worker(worker: Worker) -> frappe._dict:
failed_job_count=worker.failed_job_count,
total_working_time=worker.total_working_time,
_comment_count=0,
modified=convert_utc_to_user_timezone(worker.last_heartbeat),
creation=convert_utc_to_user_timezone(worker.birth_date),
)