From 1668ba7d9eb7aa4dcff3233e95195a8720cec162 Mon Sep 17 00:00:00 2001 From: Ankush Menat Date: Thu, 29 Jun 2023 16:35:10 +0530 Subject: [PATCH 1/6] feat: Namespace all RQ jobs to site --- frappe/utils/background_jobs.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/frappe/utils/background_jobs.py b/frappe/utils/background_jobs.py index 7de7ef6692..8254a6fb87 100755 --- a/frappe/utils/background_jobs.py +++ b/frappe/utils/background_jobs.py @@ -84,9 +84,8 @@ def enqueue( # To handle older implementations is_async = kwargs.pop("async", is_async) - if job_id: - # namespace job ids to sites - job_id = create_job_id(job_id) + # namespace job ids to sites + job_id = create_job_id(job_id) if job_name: deprecation_warning("Using enqueue with `job_name` is deprecated, use `job_id` instead.") @@ -481,6 +480,9 @@ def test_job(s): def create_job_id(job_id: str) -> str: """Generate unique job id for deduplication""" + + if not job_id: + job_id = str(uuid4()) return f"{frappe.local.site}::{job_id}" From 1092eef7bd8b71e741109db6f835af30af22a6c0 Mon Sep 17 00:00:00 2001 From: Ankush Menat Date: Thu, 29 Jun 2023 16:42:37 +0530 Subject: [PATCH 2/6] perf: faster pending jobs check --- frappe/commands/scheduler.py | 4 ++-- frappe/tests/test_commands.py | 15 +++++++++++++++ frappe/utils/doctor.py | 9 +++++++++ 3 files changed, 26 insertions(+), 2 deletions(-) diff --git a/frappe/commands/scheduler.py b/frappe/commands/scheduler.py index 6af3a2403e..5d453e3568 100755 --- a/frappe/commands/scheduler.py +++ b/frappe/commands/scheduler.py @@ -240,14 +240,14 @@ def start_worker_pool(queue, quiet=False, num_workers=2, burst=False): @click.option("--site", help="site name") @pass_context def ready_for_migration(context, site=None): - from frappe.utils.doctor import get_pending_jobs + from frappe.utils.doctor import any_job_pending if not site: site = get_site(context) try: frappe.init(site=site) - pending_jobs = get_pending_jobs(site=site) + pending_jobs = 
any_job_pending(site=site) if pending_jobs: print(f"NOT READY for migration: site {site} has pending background jobs") diff --git a/frappe/tests/test_commands.py b/frappe/tests/test_commands.py index 7bda577bb6..cdf93a1870 100644 --- a/frappe/tests/test_commands.py +++ b/frappe/tests/test_commands.py @@ -20,9 +20,11 @@ from unittest.mock import patch import click from click import Command from click.testing import CliRunner, Result +from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_fixed # imports - module imports import frappe +import frappe.commands.scheduler import frappe.commands.site import frappe.commands.utils import frappe.recorder @@ -760,6 +762,19 @@ class TestBenchBuild(BaseTestCommands): ) +class TestSchedulerUtils(BaseTestCommands): + # Retry just in case there are stuck queued jobs + @retry( + retry=retry_if_exception_type(AssertionError), + stop=stop_after_attempt(3), + wait=wait_fixed(3), + reraise=True, + ) + def test_ready_for_migrate(self): + with cli(frappe.commands.scheduler.ready_for_migration) as result: + self.assertEqual(result.exit_code, 0) + + class TestCommandUtils(FrappeTestCase): def test_bench_helper(self): from frappe.utils.bench_helper import get_app_groups diff --git a/frappe/utils/doctor.py b/frappe/utils/doctor.py index 5b12a01990..002fd8b154 100644 --- a/frappe/utils/doctor.py +++ b/frappe/utils/doctor.py @@ -79,6 +79,15 @@ def get_pending_jobs(site=None): return jobs_per_queue +def any_job_pending(site: str) -> bool: + for queue in get_queue_list(): + q = get_queue(queue) + for job_id in q.get_job_ids(): + if job_id.startswith(site): + return True + return False + + def check_number_of_workers(): return len(get_workers()) From 3ae2d19073598afad874344d35813b9937916889 Mon Sep 17 00:00:00 2001 From: Ankush Menat Date: Thu, 29 Jun 2023 16:56:59 +0530 Subject: [PATCH 3/6] perf: efficient RQ jobs filters Assuming site's job start with site prefix it's much easier to filter jobs by looking at job IDs 
instead of fetching entire job in memory. --- frappe/core/doctype/rq_job/rq_job.py | 37 ++++++++++++++-------------- 1 file changed, 19 insertions(+), 18 deletions(-) diff --git a/frappe/core/doctype/rq_job/rq_job.py b/frappe/core/doctype/rq_job/rq_job.py index 391ccd8dfd..9a67fae586 100644 --- a/frappe/core/doctype/rq_job/rq_job.py +++ b/frappe/core/doctype/rq_job/rq_job.py @@ -63,23 +63,15 @@ class RQJob(Document): order_desc = "desc" in args.get("order_by", "") - matched_job_ids = RQJob.get_matching_job_ids(args) + matched_job_ids = RQJob.get_matching_job_ids(args)[start : start + page_length] - jobs = [] - for job_ids in create_batch(matched_job_ids, 100): - jobs.extend( - serialize_job(job) - for job in Job.fetch_many(job_ids=job_ids, connection=get_redis_conn()) - if job and for_current_site(job) - ) - if len(jobs) > start + page_length: - # we have fetched enough. This is inefficient but because of site filtering TINA - break + conn = get_redis_conn() + jobs = [serialize_job(job) for job in Job.fetch_many(job_ids=matched_job_ids, connection=conn)] - return sorted(jobs, key=lambda j: j.modified, reverse=order_desc)[start : start + page_length] + return sorted(jobs, key=lambda j: j.modified, reverse=order_desc) @staticmethod - def get_matching_job_ids(args): + def get_matching_job_ids(args) -> list[str]: filters = make_filter_dict(args.get("filters")) queues = _eval_filters(filters.get("queue"), QUEUES) @@ -92,7 +84,7 @@ class RQJob(Document): for status in statuses: matched_job_ids.extend(fetch_job_ids(queue, status)) - return matched_job_ids + return filter_current_site_jobs(matched_job_ids) @check_permissions def delete(self): @@ -155,6 +147,12 @@ def for_current_site(job: Job) -> bool: return job.kwargs.get("site") == frappe.local.site +def filter_current_site_jobs(job_ids: list[str]) -> list[str]: + site = frappe.local.site + + return [j for j in job_ids if j.startswith(site)] + + def _eval_filters(filter, values: list[str]) -> list[str]: if filter: 
operator, operand = filter @@ -186,10 +184,13 @@ def remove_failed_jobs(): frappe.only_for("System Manager") for queue in get_queues(): fail_registry = queue.failed_job_registry - for job_ids in create_batch(fail_registry.get_job_ids(), 100): - for job in Job.fetch_many(job_ids=job_ids, connection=get_redis_conn()): - if job and for_current_site(job): - fail_registry.remove(job, delete_job=True) + failed_jobs = filter_current_site_jobs(fail_registry.get_job_ids()) + + # Delete in batches to avoid loading too many things in memory + conn = get_redis_conn() + for job_ids in create_batch(failed_jobs, 100): + for job in Job.fetch_many(job_ids=job_ids, connection=conn): + job and fail_registry.remove(job, delete_job=True) def get_all_queued_jobs(): From d57c552e26aba55e26701b20e52677e757e33811 Mon Sep 17 00:00:00 2001 From: Ankush Menat Date: Thu, 29 Jun 2023 17:20:48 +0530 Subject: [PATCH 4/6] feat: frappe.enqueue with deduplication use deduplicate=True and set job_id for automatic and mostly sane job deduplication. 
--- frappe/core/doctype/rq_job/test_rq_job.py | 16 +++++++++++++- frappe/utils/background_jobs.py | 26 ++++++++++++++++++++--- 2 files changed, 38 insertions(+), 4 deletions(-) diff --git a/frappe/core/doctype/rq_job/test_rq_job.py b/frappe/core/doctype/rq_job/test_rq_job.py index 6512902fb3..f5d5f89ed4 100644 --- a/frappe/core/doctype/rq_job/test_rq_job.py +++ b/frappe/core/doctype/rq_job/test_rq_job.py @@ -108,13 +108,27 @@ class TestRQJob(FrappeTestCase): self.assertIn("quitting", cstr(stderr)) @timeout(20) - def test_job_id_dedup(self): + def test_job_id_manual_dedup(self): job_id = "test_dedup" job = frappe.enqueue(self.BG_JOB, sleep=5, job_id=job_id) self.assertTrue(is_job_enqueued(job_id)) self.check_status(job, "finished") self.assertFalse(is_job_enqueued(job_id)) + @timeout(20) + def test_auto_job_dedup(self): + job_id = "test_dedup" + job1 = frappe.enqueue(self.BG_JOB, sleep=2, job_id=job_id, deduplicate=True) + job2 = frappe.enqueue(self.BG_JOB, sleep=5, job_id=job_id, deduplicate=True) + self.assertIsNone(job2) + self.check_status(job1, "finished") # wait + + # Failed jobs last longer, subsequent job should still pass with same ID. 
+ job3 = frappe.enqueue(self.BG_JOB, fail=True, job_id=job_id, deduplicate=True) + self.check_status(job3, "failed") + job4 = frappe.enqueue(self.BG_JOB, sleep=1, job_id=job_id, deduplicate=True) + self.check_status(job4, "finished") + @timeout(20) def test_enqueue_after_commit(self): job_id = frappe.generate_hash() diff --git a/frappe/utils/background_jobs.py b/frappe/utils/background_jobs.py index 8254a6fb87..be5291b771 100755 --- a/frappe/utils/background_jobs.py +++ b/frappe/utils/background_jobs.py @@ -66,6 +66,7 @@ def enqueue( on_failure: Callable = None, at_front: bool = False, job_id: str = None, + deduplicate=False, **kwargs, ) -> Job | Any: """ @@ -79,11 +80,26 @@ def enqueue( :param job_name: [DEPRECATED] can be used to name an enqueue call, which can be used to prevent duplicate calls :param now: if now=True, the method is executed via frappe.call :param kwargs: keyword arguments to be passed to the method + :param deduplicate: do not re-queue job if it's already queued, requires job_id. 
:param job_id: Assigning unique job id, which can be checked using `is_job_enqueued` """ # To handle older implementations is_async = kwargs.pop("async", is_async) + if deduplicate: + if not job_id: + frappe.throw(_("`job_id` parameter is required for deduplication.")) + job = get_job(job_id) + if job and job.get_status() in (JobStatus.QUEUED, JobStatus.STARTED): + frappe.logger().debug(f"Not queueing job {job.id} because it is in queue already") + return + elif job: + # delete job to avoid argument issues related to job args + # https://github.com/rq/rq/issues/793 + job.delete() + + # If job exists and is completed then delete it before re-queue + # namespace job ids to sites job_id = create_job_id(job_id) @@ -492,9 +508,13 @@ def is_job_enqueued(job_id: str) -> bool: def get_job_status(job_id: str) -> JobStatus | None: """Get RQ job status, returns None if job is not found.""" + job = get_job(job_id) + if job: + return job.get_status() + + +def get_job(job_id: str) -> Job: try: - job = Job.fetch(create_job_id(job_id), connection=get_redis_conn()) + return Job.fetch(create_job_id(job_id), connection=get_redis_conn()) except NoSuchJobError: return None - - return job.get_status() From 31d05b466a0b0cf0ae0b0bc4a107c1ca97e84395 Mon Sep 17 00:00:00 2001 From: Ankush Menat Date: Thu, 29 Jun 2023 17:14:53 +0530 Subject: [PATCH 5/6] perf: Email queue dedup using job id instead of name --- frappe/email/queue.py | 25 ++++++++----------------- 1 file changed, 8 insertions(+), 17 deletions(-) diff --git a/frappe/email/queue.py b/frappe/email/queue.py index 0df88ebd5c..500a126f72 100755 --- a/frappe/email/queue.py +++ b/frappe/email/queue.py @@ -132,7 +132,6 @@ def return_unsubscribed_page(email, doctype, name): def flush(from_test=False): """flush email queue, every time: called from scheduler""" from frappe.email.doctype.email_queue.email_queue import send_mail - from frappe.utils.background_jobs import get_jobs # To avoid running jobs inside unit tests if 
frappe.are_emails_muted(): @@ -142,24 +141,16 @@ if cint(frappe.db.get_default("suspend_email_queue")) == 1: return - try: - queued_jobs = set(get_jobs(site=frappe.local.site, key="job_name")[frappe.local.site]) - except Exception: - queued_jobs = set() - for row in get_queue(): try: - job_name = f"email_queue_sendmail_{row.name}" - if job_name not in queued_jobs: - frappe.enqueue( - method=send_mail, - email_queue_name=row.name, - now=from_test, - job_name=job_name, - queue="short", - ) - else: - frappe.logger().debug(f"Not queueing job {job_name} because it is in queue already") + frappe.enqueue( + method=send_mail, + email_queue_name=row.name, + now=from_test, + job_id=f"email_queue_sendmail_{row.name}", + queue="short", + deduplicate=True, + ) except Exception: frappe.get_doc("Email Queue", row.name).log_error() From a52485cc53269e0c5d73b1c3d174e6dad4fcb6fe Mon Sep 17 00:00:00 2001 From: Ankush Menat Date: Thu, 29 Jun 2023 17:34:29 +0530 Subject: [PATCH 6/6] feat: RQ jobs can show count --- frappe/core/doctype/rq_job/rq_job.py | 3 +-- frappe/core/doctype/rq_job/rq_job_list.js | 3 +-- frappe/tests/test_background_jobs.py | 6 ++++-- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/frappe/core/doctype/rq_job/rq_job.py b/frappe/core/doctype/rq_job/rq_job.py index 9a67fae586..659c81e5c4 100644 --- a/frappe/core/doctype/rq_job/rq_job.py +++ b/frappe/core/doctype/rq_job/rq_job.py @@ -99,8 +99,7 @@ class RQJob(Document): @staticmethod def get_count(args) -> int: - # Can not be implemented efficiently due to site filtering hence ignored. - return 0 + return len(RQJob.get_matching_job_ids(args)) # None of these methods apply to virtual job doctype, overriden for sanity. 
@staticmethod diff --git a/frappe/core/doctype/rq_job/rq_job_list.js b/frappe/core/doctype/rq_job/rq_job_list.js index aa05b411ba..7d140d668f 100644 --- a/frappe/core/doctype/rq_job/rq_job_list.js +++ b/frappe/core/doctype/rq_job/rq_job_list.js @@ -15,7 +15,6 @@ frappe.listview_settings["RQ Job"] = { ); if (listview.list_view_settings) { - listview.list_view_settings.disable_count = 1; listview.list_view_settings.disable_sidebar_stats = 1; } @@ -57,6 +56,6 @@ frappe.listview_settings["RQ Job"] = { } listview.refresh(); - }, 5000); + }, 15000); }, }; diff --git a/frappe/tests/test_background_jobs.py b/frappe/tests/test_background_jobs.py index b6c1a0d694..99373a84a6 100644 --- a/frappe/tests/test_background_jobs.py +++ b/frappe/tests/test_background_jobs.py @@ -10,6 +10,7 @@ from frappe.tests.utils import FrappeTestCase from frappe.utils.background_jobs import ( RQ_JOB_FAILURE_TTL, RQ_RESULTS_TTL, + create_job_id, execute_job, generate_qname, get_redis_conn, @@ -54,11 +55,12 @@ class TestBackgroundJobs(FrappeTestCase): def test_enqueue_call(self): with patch.object(Queue, "enqueue_call") as mock_enqueue_call: - frappe.enqueue( + job = frappe.enqueue( "frappe.handler.ping", queue="short", timeout=300, kwargs={"site": frappe.local.site}, + job_id="test", ) mock_enqueue_call.assert_called_once_with( @@ -78,7 +80,7 @@ class TestBackgroundJobs(FrappeTestCase): at_front=False, failure_ttl=RQ_JOB_FAILURE_TTL, result_ttl=RQ_RESULTS_TTL, - job_id=None, + job_id=create_job_id("test"), ) def test_job_hooks(self):