refactor: consider multi-queue consumption

This commit is contained in:
Ankush Menat 2022-11-24 14:59:05 +05:30
parent 40b2929c0d
commit 0ebd3945ff
3 changed files with 11 additions and 6 deletions

View file

@ -76,13 +76,13 @@
{
"fieldname": "queue",
"fieldtype": "Data",
"label": "Queue"
"label": "Queue(s)"
},
{
"fieldname": "queue_type",
"fieldtype": "Select",
"in_list_view": 1,
"label": "Queue Type",
"label": "Queue Type(s)",
"options": "default\nlong\nshort"
},
{
@ -113,7 +113,7 @@
"in_create": 1,
"is_virtual": 1,
"links": [],
"modified": "2022-11-14 15:35:32.786012",
"modified": "2022-11-24 14:50:48.511706",
"modified_by": "Administrator",
"module": "Core",
"name": "RQ Worker",

View file

@ -16,8 +16,10 @@ class RQWorker(Document):
def load_from_db(self):
all_workers = get_workers()
worker = [w for w in all_workers if w.pid == cint(self.name)][0]
d = serialize_worker(worker)
workers = [w for w in all_workers if w.pid == cint(self.name)]
if not workers:
raise frappe.DoesNotExistError
d = serialize_worker(workers[0])
super(Document, self).__init__(d)
@ -53,10 +55,11 @@ class RQWorker(Document):
def serialize_worker(worker: Worker) -> frappe._dict:
queue = ", ".join(worker.queue_names())
queue_types = ",".join(q.rsplit(":", 1)[1] for q in worker.queue_names())
return frappe._dict(
name=worker.pid,
queue=queue,
queue_type=queue.rsplit(":", 1)[1],
queue_type=queue_types,
worker_name=worker.name,
status=worker.get_state(),
pid=worker.pid,

View file

@ -377,6 +377,8 @@ def generate_qname(qtype: str) -> str:
qnames are useful to define namespaces of customers.
"""
if isinstance(qtype, list):
qtype = ",".join(qtype)
return f"{get_bench_id()}:{qtype}"