perf: Faster scheduled job deduplication

Reduce scheduled-job deduplication from O(n) to O(1) by giving each scheduled RQ job a unique job ID, so an existing job can be looked up directly by ID.
This commit is contained in:
Ankush Menat 2023-05-08 18:28:09 +05:30
parent cf53b1a58e
commit 4bd02a4ed1
4 changed files with 29 additions and 6 deletions

View file

@ -5,7 +5,7 @@ import functools
import re
from rq.command import send_stop_job_command
from rq.exceptions import InvalidJobOperation
from rq.exceptions import InvalidJobOperation, NoSuchJobError
from rq.job import Job
from rq.queue import Queue
@ -40,9 +40,14 @@ def check_permissions(method):
class RQJob(Document):
def load_from_db(self):
job = Job.fetch(self.name, connection=get_redis_conn())
try:
job = Job.fetch(self.name, connection=get_redis_conn())
except NoSuchJobError:
raise frappe.DoesNotExistError
if not for_current_site(job):
raise frappe.PermissionError
super(Document, self).__init__(serialize_job(job))
self._job_obj = job

View file

@ -30,6 +30,7 @@ class ScheduledJobType(Document):
"frappe.core.doctype.scheduled_job_type.scheduled_job_type.run_scheduled_job",
queue=self.get_queue_name(),
job_type=self.method,
job_id=self.rq_job_id,
)
return True
return False
@ -39,9 +40,17 @@ class ScheduledJobType(Document):
# if the next scheduled event is before NOW, then its due!
return self.get_next_execution() <= (current_time or now_datetime())
# Report whether this job type's method is already among the queued jobs.
# Calls get_jobs(site=frappe.local.site, key="job_type"), indexes the result
# by the current site, and tests whether self.method is in that collection.
# NOTE(review): this looks like the pre-change side of the diff hunk; the
# typed is_job_in_queue definition that follows appears to replace it — confirm.
def is_job_in_queue(self):
queued_jobs = get_jobs(site=frappe.local.site, key="job_type")[frappe.local.site]
return self.method in queued_jobs
def is_job_in_queue(self) -> bool:
    """Return True if an RQ job for this scheduled job is queued or started.

    Loads the "RQ Job" document named by ``self.rq_job_id`` and checks its
    status. Returns False when the lookup raises ``frappe.DoesNotExistError``.
    """
    active_statuses = ("queued", "started")
    try:
        return frappe.get_doc("RQ Job", self.rq_job_id).status in active_statuses
    except frappe.DoesNotExistError:
        return False
@property
def rq_job_id(self):
    """Unique ID created to deduplicate jobs with single RQ call.

    Built from the current site (``frappe.local.site``) and ``self.method``,
    so the same method on the same site always yields the same ID.
    """
    site = frappe.local.site
    return f"scheduled_job::{site}::{self.method}"
@property
def next_execution(self):

View file

@ -1302,6 +1302,7 @@ def enqueue_jobs_after_commit():
kwargs=job.get("queue_args"),
failure_ttl=frappe.conf.get("rq_job_failure_ttl") or RQ_JOB_FAILURE_TTL,
result_ttl=frappe.conf.get("rq_results_ttl") or RQ_RESULTS_TTL,
job_id=job.get("job_id"),
)
frappe.flags.enqueue_after_commit = []

View file

@ -64,6 +64,7 @@ def enqueue(
enqueue_after_commit=False,
*,
at_front=False,
job_id=None,
**kwargs,
) -> Union["Job", Any]:
"""
@ -118,7 +119,13 @@ def enqueue(
frappe.flags.enqueue_after_commit = []
frappe.flags.enqueue_after_commit.append(
{"queue": queue, "is_async": is_async, "timeout": timeout, "queue_args": queue_args}
{
"queue": queue,
"is_async": is_async,
"timeout": timeout,
"queue_args": queue_args,
"job_id": job_id,
}
)
return frappe.flags.enqueue_after_commit
@ -131,6 +138,7 @@ def enqueue(
at_front=at_front,
failure_ttl=frappe.conf.get("rq_job_failure_ttl") or RQ_JOB_FAILURE_TTL,
result_ttl=frappe.conf.get("rq_results_ttl") or RQ_RESULTS_TTL,
job_id=job_id,
)