diff --git a/frappe/core/page/background_jobs/background_jobs.js b/frappe/core/page/background_jobs/background_jobs.js index cabe91375f..0b4d6792dc 100644 --- a/frappe/core/page/background_jobs/background_jobs.js +++ b/frappe/core/page/background_jobs/background_jobs.js @@ -28,6 +28,16 @@ class BackgroundJobs { } }); + // add a "Remove Failed Jobs button" + this.remove_failed_button = this.page.add_inner_button(__("Remove Failed Jobs"), () => { + frappe.call({ + method: 'frappe.core.page.background_jobs.background_jobs.remove_failed_jobs', + callback: () => { + this.refresh_jobs(); + } + }); + }); + $(frappe.render_template('background_jobs_outer')).appendTo(this.page.body); this.content = $(this.page.body).find('.table-area'); } @@ -62,4 +72,4 @@ class BackgroundJobs { } }); } -} \ No newline at end of file +} diff --git a/frappe/core/page/background_jobs/background_jobs.py b/frappe/core/page/background_jobs/background_jobs.py index 4a94de4ace..847b23bd3e 100644 --- a/frappe/core/page/background_jobs/background_jobs.py +++ b/frappe/core/page/background_jobs/background_jobs.py @@ -1,58 +1,88 @@ # Copyright (c) 2015, Frappe Technologies Pvt. Ltd. and Contributors # MIT License. See license.txt -from __future__ import unicode_literals -import frappe +import json +from typing import TYPE_CHECKING, Dict, List from rq import Queue, Worker -from frappe.utils.background_jobs import get_redis_conn -from frappe.utils import format_datetime, cint, convert_utc_to_user_timezone -from frappe.utils.scheduler import is_scheduler_inactive -from frappe import _ -colors = { +import frappe +from frappe import _ +from frappe.utils import convert_utc_to_user_timezone, format_datetime +from frappe.utils.background_jobs import get_redis_conn +from frappe.utils.scheduler import is_scheduler_inactive + +if TYPE_CHECKING: + from rq.job import Job + +JOB_COLORS = { 'queued': 'orange', 'failed': 'red', 'started': 'blue', 'finished': 'green' } + @frappe.whitelist() -def get_info(show_failed=False): +def get_info(show_failed=False) -> List[Dict]: + if isinstance(show_failed, str): + show_failed = json.loads(show_failed) + conn = get_redis_conn() queues = Queue.all(conn) workers = Worker.all(conn) jobs = [] - def add_job(j, name): - if j.kwargs.get('site')==frappe.local.site: - jobs.append({ - 'job_name': j.kwargs.get('kwargs', {}).get('playbook_method') \ - or j.kwargs.get('kwargs', {}).get('job_type') \ - or str(j.kwargs.get('job_name')), - 'status': j.get_status(), 'queue': name, - 'creation': format_datetime(convert_utc_to_user_timezone(j.created_at)), - 'color': colors[j.get_status()] - }) - if j.exc_info: - jobs[-1]['exc_info'] = j.exc_info + def add_job(job: 'Job', name: str) -> None: + if job.kwargs.get('site') == frappe.local.site: + job_info = { + 'job_name': job.kwargs.get('kwargs', {}).get('playbook_method') + or job.kwargs.get('kwargs', {}).get('job_type') + or str(job.kwargs.get('job_name')), + 'status': job.get_status(), + 'queue': name, + 'creation': format_datetime(convert_utc_to_user_timezone(job.created_at)), + 'color': JOB_COLORS[job.get_status()] + } - for w in workers: - j = w.get_current_job() - if j: - add_job(j, w.name) + if job.exc_info: + job_info['exc_info'] = job.exc_info - for q in queues: - if q.name != 'failed': - for j in q.get_jobs(): add_job(j, q.name) + jobs.append(job_info) - if cint(show_failed): - for q in queues: - if q.name == 'failed': - for j in q.get_jobs()[:10]: add_job(j, q.name) + # show worker jobs + for worker in workers: + job = worker.get_current_job() + if job: + add_job(job, worker.name) + + for queue in queues: + # show active queued jobs + if queue.name != 'failed': + for job in queue.jobs: + add_job(job, queue.name) + + # show failed jobs, if requested + if show_failed: + fail_registry = queue.failed_job_registry + for job_id in fail_registry.get_job_ids(): + job = queue.fetch_job(job_id) + add_job(job, queue.name) return jobs + +@frappe.whitelist() +def remove_failed_jobs(): + conn = get_redis_conn() + queues = Queue.all(conn) + for queue in queues: + fail_registry = queue.failed_job_registry + for job_id in fail_registry.get_job_ids(): + job = queue.fetch_job(job_id) + fail_registry.remove(job, delete_job=True) + + @frappe.whitelist() def get_scheduler_status(): if is_scheduler_inactive(): diff --git a/frappe/tests/test_background_jobs.py b/frappe/tests/test_background_jobs.py new file mode 100644 index 0000000000..88783f14f1 --- /dev/null +++ b/frappe/tests/test_background_jobs.py @@ -0,0 +1,33 @@ +import unittest + +from rq import Queue + +import frappe +from frappe.core.page.background_jobs.background_jobs import remove_failed_jobs +from frappe.utils.background_jobs import get_redis_conn +import time + + +class TestBackgroundJobs(unittest.TestCase): + def test_remove_failed_jobs(self): + frappe.enqueue(method="frappe.tests.test_background_jobs.fail_function", queue="short") + # wait for enqueued job to execute + time.sleep(2) + conn = get_redis_conn() + queues = Queue.all(conn) + + for queue in queues: + if queue.name == "short": + fail_registry = queue.failed_job_registry + self.assertGreater(fail_registry.count, 0) + + remove_failed_jobs() + + for queue in queues: + if queue.name == "short": + fail_registry = queue.failed_job_registry + self.assertEqual(fail_registry.count, 0) + + +def fail_function(): + return 1 / 0