Merge pull request #11643 from Alchez/feat-remove-failed-queue

feat: Clear failed jobs queue
This commit is contained in:
mergify[bot] 2021-03-10 05:20:53 +00:00 committed by GitHub
commit 7eee5c1a35
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 105 additions and 32 deletions

View file

@ -28,6 +28,16 @@ class BackgroundJobs {
}
});
// add a "Remove Failed Jobs button"
this.remove_failed_button = this.page.add_inner_button(__("Remove Failed Jobs"), () => {
frappe.call({
method: 'frappe.core.page.background_jobs.background_jobs.remove_failed_jobs',
callback: () => {
this.refresh_jobs();
}
});
});
$(frappe.render_template('background_jobs_outer')).appendTo(this.page.body);
this.content = $(this.page.body).find('.table-area');
}
@ -62,4 +72,4 @@ class BackgroundJobs {
}
});
}
}
}

View file

@ -1,58 +1,88 @@
# Copyright (c) 2015, Frappe Technologies Pvt. Ltd. and Contributors
# MIT License. See license.txt
from __future__ import unicode_literals
import frappe
import json
from typing import TYPE_CHECKING, Dict, List
from rq import Queue, Worker
from frappe.utils.background_jobs import get_redis_conn
from frappe.utils import format_datetime, cint, convert_utc_to_user_timezone
from frappe.utils.scheduler import is_scheduler_inactive
from frappe import _
colors = {
import frappe
from frappe import _
from frappe.utils import convert_utc_to_user_timezone, format_datetime
from frappe.utils.background_jobs import get_redis_conn
from frappe.utils.scheduler import is_scheduler_inactive
if TYPE_CHECKING:
from rq.job import Job
JOB_COLORS = {
'queued': 'orange',
'failed': 'red',
'started': 'blue',
'finished': 'green'
}
@frappe.whitelist()
def get_info(show_failed=False):
def get_info(show_failed=False) -> List[Dict]:
if isinstance(show_failed, str):
show_failed = json.loads(show_failed)
conn = get_redis_conn()
queues = Queue.all(conn)
workers = Worker.all(conn)
jobs = []
def add_job(j, name):
if j.kwargs.get('site')==frappe.local.site:
jobs.append({
'job_name': j.kwargs.get('kwargs', {}).get('playbook_method') \
or j.kwargs.get('kwargs', {}).get('job_type') \
or str(j.kwargs.get('job_name')),
'status': j.get_status(), 'queue': name,
'creation': format_datetime(convert_utc_to_user_timezone(j.created_at)),
'color': colors[j.get_status()]
})
if j.exc_info:
jobs[-1]['exc_info'] = j.exc_info
def add_job(job: 'Job', name: str) -> None:
if job.kwargs.get('site') == frappe.local.site:
job_info = {
'job_name': job.kwargs.get('kwargs', {}).get('playbook_method')
or job.kwargs.get('kwargs', {}).get('job_type')
or str(job.kwargs.get('job_name')),
'status': job.get_status(),
'queue': name,
'creation': format_datetime(convert_utc_to_user_timezone(job.created_at)),
'color': JOB_COLORS[job.get_status()]
}
for w in workers:
j = w.get_current_job()
if j:
add_job(j, w.name)
if job.exc_info:
job_info['exc_info'] = job.exc_info
for q in queues:
if q.name != 'failed':
for j in q.get_jobs(): add_job(j, q.name)
jobs.append(job_info)
if cint(show_failed):
for q in queues:
if q.name == 'failed':
for j in q.get_jobs()[:10]: add_job(j, q.name)
# show worker jobs
for worker in workers:
job = worker.get_current_job()
if job:
add_job(job, worker.name)
for queue in queues:
# show active queued jobs
if queue.name != 'failed':
for job in queue.jobs:
add_job(job, queue.name)
# show failed jobs, if requested
if show_failed:
fail_registry = queue.failed_job_registry
for job_id in fail_registry.get_job_ids():
job = queue.fetch_job(job_id)
add_job(job, queue.name)
return jobs
@frappe.whitelist()
def remove_failed_jobs():
conn = get_redis_conn()
queues = Queue.all(conn)
for queue in queues:
fail_registry = queue.failed_job_registry
for job_id in fail_registry.get_job_ids():
job = queue.fetch_job(job_id)
fail_registry.remove(job, delete_job=True)
@frappe.whitelist()
def get_scheduler_status():
if is_scheduler_inactive():

View file

@ -0,0 +1,33 @@
import unittest
from rq import Queue
import frappe
from frappe.core.page.background_jobs.background_jobs import remove_failed_jobs
from frappe.utils.background_jobs import get_redis_conn
import time
class TestBackgroundJobs(unittest.TestCase):
def test_remove_failed_jobs(self):
frappe.enqueue(method="frappe.tests.test_background_jobs.fail_function", queue="short")
# wait for enqueued job to execute
time.sleep(2)
conn = get_redis_conn()
queues = Queue.all(conn)
for queue in queues:
if queue.name == "short":
fail_registry = queue.failed_job_registry
self.assertGreater(fail_registry.count, 0)
remove_failed_jobs()
for queue in queues:
if queue.name == "short":
fail_registry = queue.failed_job_registry
self.assertEqual(fail_registry.count, 0)
def fail_function():
return 1 / 0