fix: handle expired jobs while deduping (#31803)
* fix: Race condition between job fetch and expiry * fix: don't refetch status
This commit is contained in:
parent
6be2ada30c
commit
acbac07d01
1 changed files with 4 additions and 4 deletions
|
|
@ -16,7 +16,7 @@ import setproctitle
|
|||
from redis.exceptions import BusyLoadingError, ConnectionError
|
||||
from rq import Callback, Queue, Worker
|
||||
from rq.defaults import DEFAULT_WORKER_TTL
|
||||
from rq.exceptions import NoSuchJobError
|
||||
from rq.exceptions import InvalidJobOperation, NoSuchJobError
|
||||
from rq.job import Job, JobStatus
|
||||
from rq.logutils import setup_loghandlers
|
||||
from rq.timeouts import JobTimeoutException
|
||||
|
|
@ -114,7 +114,7 @@ def enqueue(
|
|||
if not job_id:
|
||||
frappe.throw(_("`job_id` paramater is required for deduplication."))
|
||||
job = get_job(job_id)
|
||||
if job and job.get_status() in (JobStatus.QUEUED, JobStatus.STARTED):
|
||||
if job and job.get_status(refresh=False) in (JobStatus.QUEUED, JobStatus.STARTED):
|
||||
frappe.logger().debug(f"Not queueing job {job.id} because it is in queue already")
|
||||
return
|
||||
elif job:
|
||||
|
|
@ -672,13 +672,13 @@ def is_job_enqueued(job_id: str) -> bool:
|
|||
def get_job_status(job_id: str) -> JobStatus | None:
|
||||
"""Get RQ job status, returns None if job is not found."""
|
||||
if job := get_job(job_id):
|
||||
return job.get_status()
|
||||
return job.get_status(refresh=False)
|
||||
|
||||
|
||||
def get_job(job_id: str) -> Job | None:
|
||||
try:
|
||||
return Job.fetch(create_job_id(job_id), connection=get_redis_conn())
|
||||
except NoSuchJobError:
|
||||
except (NoSuchJobError, InvalidJobOperation):
|
||||
return None
|
||||
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue