diff --git a/frappe/core/doctype/rq_job/rq_job.py b/frappe/core/doctype/rq_job/rq_job.py index af34955de8..391ccd8dfd 100644 --- a/frappe/core/doctype/rq_job/rq_job.py +++ b/frappe/core/doctype/rq_job/rq_job.py @@ -5,7 +5,7 @@ import functools import re from rq.command import send_stop_job_command -from rq.exceptions import InvalidJobOperation +from rq.exceptions import InvalidJobOperation, NoSuchJobError from rq.job import Job from rq.queue import Queue @@ -40,9 +40,14 @@ def check_permissions(method): class RQJob(Document): def load_from_db(self): - job = Job.fetch(self.name, connection=get_redis_conn()) + try: + job = Job.fetch(self.name, connection=get_redis_conn()) + except NoSuchJobError: + raise frappe.DoesNotExistError + if not for_current_site(job): raise frappe.PermissionError + super(Document, self).__init__(serialize_job(job)) self._job_obj = job diff --git a/frappe/core/doctype/scheduled_job_type/scheduled_job_type.py b/frappe/core/doctype/scheduled_job_type/scheduled_job_type.py index a8a2e25891..c789a0b6a4 100644 --- a/frappe/core/doctype/scheduled_job_type/scheduled_job_type.py +++ b/frappe/core/doctype/scheduled_job_type/scheduled_job_type.py @@ -30,6 +30,7 @@ class ScheduledJobType(Document): "frappe.core.doctype.scheduled_job_type.scheduled_job_type.run_scheduled_job", queue=self.get_queue_name(), job_type=self.method, + job_id=self.rq_job_id, ) return True return False @@ -39,9 +40,17 @@ class ScheduledJobType(Document): # if the next scheduled event is before NOW, then its due! return self.get_next_execution() <= (current_time or now_datetime()) - def is_job_in_queue(self): - queued_jobs = get_jobs(site=frappe.local.site, key="job_type")[frappe.local.site] - return self.method in queued_jobs + def is_job_in_queue(self) -> bool: + try: + job = frappe.get_doc("RQ Job", self.rq_job_id) + return job.status in ("queued", "started") + except frappe.DoesNotExistError: + return False + + @property + def rq_job_id(self): + """Unique ID created to deduplicate jobs with single RQ call.""" + return f"scheduled_job::{frappe.local.site}::{self.method}" @property def next_execution(self): diff --git a/frappe/database/database.py b/frappe/database/database.py index b4026e58c0..2d38a6dea8 100644 --- a/frappe/database/database.py +++ b/frappe/database/database.py @@ -1302,6 +1302,7 @@ def enqueue_jobs_after_commit(): kwargs=job.get("queue_args"), failure_ttl=frappe.conf.get("rq_job_failure_ttl") or RQ_JOB_FAILURE_TTL, result_ttl=frappe.conf.get("rq_results_ttl") or RQ_RESULTS_TTL, + job_id=job.get("job_id"), ) frappe.flags.enqueue_after_commit = [] diff --git a/frappe/utils/background_jobs.py b/frappe/utils/background_jobs.py index 613f9695dc..624cf8a6a8 100755 --- a/frappe/utils/background_jobs.py +++ b/frappe/utils/background_jobs.py @@ -64,6 +64,7 @@ def enqueue( enqueue_after_commit=False, *, at_front=False, + job_id=None, **kwargs, ) -> Union["Job", Any]: """ @@ -118,7 +119,13 @@ def enqueue( frappe.flags.enqueue_after_commit = [] frappe.flags.enqueue_after_commit.append( - {"queue": queue, "is_async": is_async, "timeout": timeout, "queue_args": queue_args} + { + "queue": queue, + "is_async": is_async, + "timeout": timeout, + "queue_args": queue_args, + "job_id": job_id, + } ) return frappe.flags.enqueue_after_commit @@ -131,6 +138,7 @@ def enqueue( at_front=at_front, failure_ttl=frappe.conf.get("rq_job_failure_ttl") or RQ_JOB_FAILURE_TTL, result_ttl=frappe.conf.get("rq_results_ttl") or RQ_RESULTS_TTL, + job_id=job_id, )