test: deduplication with unique job id
This commit is contained in:
parent
bc3871b657
commit
ecce4ba5a7
2 changed files with 12 additions and 4 deletions
|
|
@ -11,7 +11,7 @@ import frappe
|
|||
from frappe.core.doctype.rq_job.rq_job import RQJob, remove_failed_jobs, stop_job
|
||||
from frappe.tests.utils import FrappeTestCase, timeout
|
||||
from frappe.utils import cstr, execute_in_shell
|
||||
from frappe.utils.background_jobs import is_job_queued
|
||||
from frappe.utils.background_jobs import is_job_enqueued, is_job_queued
|
||||
|
||||
|
||||
class TestRQJob(FrappeTestCase):
|
||||
|
|
@ -107,6 +107,13 @@ class TestRQJob(FrappeTestCase):
|
|||
_, stderr = execute_in_shell("bench worker --queue short,default --burst", check_exit_code=True)
|
||||
self.assertIn("quitting", cstr(stderr))
|
||||
|
||||
@timeout(20)
|
||||
def test_job_id_dedup(self):
|
||||
job_id = "test_dedup"
|
||||
job = frappe.enqueue(self.BG_JOB, sleep=10, job_id=job_id)
|
||||
self.assertTrue(is_job_enqueued(job_id))
|
||||
stop_job(job.id)
|
||||
|
||||
|
||||
def test_func(fail=False, sleep=0):
|
||||
if fail:
|
||||
|
|
|
|||
|
|
@ -19,7 +19,6 @@ import frappe.monitor
|
|||
from frappe import _
|
||||
from frappe.utils import cstr, get_bench_id
|
||||
from frappe.utils.commands import log
|
||||
from frappe.utils.deprecations import deprecation_warning
|
||||
from frappe.utils.redis_queue import RedisQueue
|
||||
|
||||
if TYPE_CHECKING:
|
||||
|
|
@ -452,9 +451,11 @@ def create_job_id(job_id: str) -> str:
|
|||
|
||||
|
||||
def is_job_enqueued(job_id: str) -> str:
|
||||
from rq.job import Job
|
||||
|
||||
try:
|
||||
job = Job.fetch(job_id, connection=get_redis_conn())
|
||||
job = Job.fetch(create_job_id(job_id), connection=get_redis_conn())
|
||||
except NoSuchJobError:
|
||||
return False
|
||||
|
||||
return job.status in ("queued", "started")
|
||||
return job.get_status() in ("queued", "started")
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue