Merge pull request #21526 from ankush/namespace_all_jobs
refactor: Namespace all RQ jobs
This commit is contained in:
commit
810185d531
9 changed files with 102 additions and 50 deletions
|
|
@ -240,14 +240,14 @@ def start_worker_pool(queue, quiet=False, num_workers=2, burst=False):
|
|||
@click.option("--site", help="site name")
|
||||
@pass_context
|
||||
def ready_for_migration(context, site=None):
|
||||
from frappe.utils.doctor import get_pending_jobs
|
||||
from frappe.utils.doctor import any_job_pending
|
||||
|
||||
if not site:
|
||||
site = get_site(context)
|
||||
|
||||
try:
|
||||
frappe.init(site=site)
|
||||
pending_jobs = get_pending_jobs(site=site)
|
||||
pending_jobs = any_job_pending(site=site)
|
||||
|
||||
if pending_jobs:
|
||||
print(f"NOT READY for migration: site {site} has pending background jobs")
|
||||
|
|
|
|||
|
|
@ -63,23 +63,15 @@ class RQJob(Document):
|
|||
|
||||
order_desc = "desc" in args.get("order_by", "")
|
||||
|
||||
matched_job_ids = RQJob.get_matching_job_ids(args)
|
||||
matched_job_ids = RQJob.get_matching_job_ids(args)[start : start + page_length]
|
||||
|
||||
jobs = []
|
||||
for job_ids in create_batch(matched_job_ids, 100):
|
||||
jobs.extend(
|
||||
serialize_job(job)
|
||||
for job in Job.fetch_many(job_ids=job_ids, connection=get_redis_conn())
|
||||
if job and for_current_site(job)
|
||||
)
|
||||
if len(jobs) > start + page_length:
|
||||
# we have fetched enough. This is inefficient but because of site filtering TINA (there is no alternative)
|
||||
break
|
||||
conn = get_redis_conn()
|
||||
jobs = [serialize_job(job) for job in Job.fetch_many(job_ids=matched_job_ids, connection=conn)]
|
||||
|
||||
return sorted(jobs, key=lambda j: j.modified, reverse=order_desc)[start : start + page_length]
|
||||
return sorted(jobs, key=lambda j: j.modified, reverse=order_desc)
|
||||
|
||||
@staticmethod
|
||||
def get_matching_job_ids(args):
|
||||
def get_matching_job_ids(args) -> list[str]:
|
||||
filters = make_filter_dict(args.get("filters"))
|
||||
|
||||
queues = _eval_filters(filters.get("queue"), QUEUES)
|
||||
|
|
@ -92,7 +84,7 @@ class RQJob(Document):
|
|||
for status in statuses:
|
||||
matched_job_ids.extend(fetch_job_ids(queue, status))
|
||||
|
||||
return matched_job_ids
|
||||
return filter_current_site_jobs(matched_job_ids)
|
||||
|
||||
@check_permissions
|
||||
def delete(self):
|
||||
|
|
@ -107,8 +99,7 @@ class RQJob(Document):
|
|||
|
||||
@staticmethod
|
||||
def get_count(args) -> int:
|
||||
# Can not be implemented efficiently due to site filtering hence ignored.
|
||||
return 0
|
||||
return len(RQJob.get_matching_job_ids(args))
|
||||
|
||||
# None of these methods apply to virtual job doctype, overriden for sanity.
|
||||
@staticmethod
|
||||
|
|
@ -155,6 +146,12 @@ def for_current_site(job: Job) -> bool:
|
|||
return job.kwargs.get("site") == frappe.local.site
|
||||
|
||||
|
||||
def filter_current_site_jobs(job_ids: list[str]) -> list[str]:
|
||||
site = frappe.local.site
|
||||
|
||||
return [j for j in job_ids if j.startswith(site)]
|
||||
|
||||
|
||||
def _eval_filters(filter, values: list[str]) -> list[str]:
|
||||
if filter:
|
||||
operator, operand = filter
|
||||
|
|
@ -186,10 +183,13 @@ def remove_failed_jobs():
|
|||
frappe.only_for("System Manager")
|
||||
for queue in get_queues():
|
||||
fail_registry = queue.failed_job_registry
|
||||
for job_ids in create_batch(fail_registry.get_job_ids(), 100):
|
||||
for job in Job.fetch_many(job_ids=job_ids, connection=get_redis_conn()):
|
||||
if job and for_current_site(job):
|
||||
fail_registry.remove(job, delete_job=True)
|
||||
failed_jobs = filter_current_site_jobs(fail_registry.get_job_ids())
|
||||
|
||||
# Delete in batches to avoid loading too many things in memory
|
||||
conn = get_redis_conn()
|
||||
for job_ids in create_batch(failed_jobs, 100):
|
||||
for job in Job.fetch_many(job_ids=job_ids, connection=conn):
|
||||
job and fail_registry.remove(job, delete_job=True)
|
||||
|
||||
|
||||
def get_all_queued_jobs():
|
||||
|
|
|
|||
|
|
@ -15,7 +15,6 @@ frappe.listview_settings["RQ Job"] = {
|
|||
);
|
||||
|
||||
if (listview.list_view_settings) {
|
||||
listview.list_view_settings.disable_count = 1;
|
||||
listview.list_view_settings.disable_sidebar_stats = 1;
|
||||
}
|
||||
|
||||
|
|
@ -57,6 +56,6 @@ frappe.listview_settings["RQ Job"] = {
|
|||
}
|
||||
|
||||
listview.refresh();
|
||||
}, 5000);
|
||||
}, 15000);
|
||||
},
|
||||
};
|
||||
|
|
|
|||
|
|
@ -108,13 +108,27 @@ class TestRQJob(FrappeTestCase):
|
|||
self.assertIn("quitting", cstr(stderr))
|
||||
|
||||
@timeout(20)
|
||||
def test_job_id_dedup(self):
|
||||
def test_job_id_manual_dedup(self):
|
||||
job_id = "test_dedup"
|
||||
job = frappe.enqueue(self.BG_JOB, sleep=5, job_id=job_id)
|
||||
self.assertTrue(is_job_enqueued(job_id))
|
||||
self.check_status(job, "finished")
|
||||
self.assertFalse(is_job_enqueued(job_id))
|
||||
|
||||
@timeout(20)
|
||||
def test_auto_job_dedup(self):
|
||||
job_id = "test_dedup"
|
||||
job1 = frappe.enqueue(self.BG_JOB, sleep=2, job_id=job_id, deduplicate=True)
|
||||
job2 = frappe.enqueue(self.BG_JOB, sleep=5, job_id=job_id, deduplicate=True)
|
||||
self.assertIsNone(job2)
|
||||
self.check_status(job1, "finished") # wait
|
||||
|
||||
# Failed jobs last longer, subsequent job should still pass with same ID.
|
||||
job3 = frappe.enqueue(self.BG_JOB, fail=True, job_id=job_id, deduplicate=True)
|
||||
self.check_status(job3, "failed")
|
||||
job4 = frappe.enqueue(self.BG_JOB, sleep=1, job_id=job_id, deduplicate=True)
|
||||
self.check_status(job4, "finished")
|
||||
|
||||
@timeout(20)
|
||||
def test_enqueue_after_commit(self):
|
||||
job_id = frappe.generate_hash()
|
||||
|
|
|
|||
|
|
@ -132,7 +132,6 @@ def return_unsubscribed_page(email, doctype, name):
|
|||
def flush(from_test=False):
|
||||
"""flush email queue, every time: called from scheduler"""
|
||||
from frappe.email.doctype.email_queue.email_queue import send_mail
|
||||
from frappe.utils.background_jobs import get_jobs
|
||||
|
||||
# To avoid running jobs inside unit tests
|
||||
if frappe.are_emails_muted():
|
||||
|
|
@ -142,24 +141,16 @@ def flush(from_test=False):
|
|||
if cint(frappe.db.get_default("suspend_email_queue")) == 1:
|
||||
return
|
||||
|
||||
try:
|
||||
queued_jobs = set(get_jobs(site=frappe.local.site, key="job_name")[frappe.local.site])
|
||||
except Exception:
|
||||
queued_jobs = set()
|
||||
|
||||
for row in get_queue():
|
||||
try:
|
||||
job_name = f"email_queue_sendmail_{row.name}"
|
||||
if job_name not in queued_jobs:
|
||||
frappe.enqueue(
|
||||
method=send_mail,
|
||||
email_queue_name=row.name,
|
||||
now=from_test,
|
||||
job_name=job_name,
|
||||
queue="short",
|
||||
)
|
||||
else:
|
||||
frappe.logger().debug(f"Not queueing job {job_name} because it is in queue already")
|
||||
frappe.enqueue(
|
||||
method=send_mail,
|
||||
email_queue_name=row.name,
|
||||
now=from_test,
|
||||
job_id=f"email_queue_sendmail_{row.name}",
|
||||
queue="short",
|
||||
deduplicate=True,
|
||||
)
|
||||
except Exception:
|
||||
frappe.get_doc("Email Queue", row.name).log_error()
|
||||
|
||||
|
|
|
|||
|
|
@ -10,6 +10,7 @@ from frappe.tests.utils import FrappeTestCase
|
|||
from frappe.utils.background_jobs import (
|
||||
RQ_JOB_FAILURE_TTL,
|
||||
RQ_RESULTS_TTL,
|
||||
create_job_id,
|
||||
execute_job,
|
||||
generate_qname,
|
||||
get_redis_conn,
|
||||
|
|
@ -54,11 +55,12 @@ class TestBackgroundJobs(FrappeTestCase):
|
|||
|
||||
def test_enqueue_call(self):
|
||||
with patch.object(Queue, "enqueue_call") as mock_enqueue_call:
|
||||
frappe.enqueue(
|
||||
job = frappe.enqueue(
|
||||
"frappe.handler.ping",
|
||||
queue="short",
|
||||
timeout=300,
|
||||
kwargs={"site": frappe.local.site},
|
||||
job_id="test",
|
||||
)
|
||||
|
||||
mock_enqueue_call.assert_called_once_with(
|
||||
|
|
@ -78,7 +80,7 @@ class TestBackgroundJobs(FrappeTestCase):
|
|||
at_front=False,
|
||||
failure_ttl=RQ_JOB_FAILURE_TTL,
|
||||
result_ttl=RQ_RESULTS_TTL,
|
||||
job_id=None,
|
||||
job_id=create_job_id("test"),
|
||||
)
|
||||
|
||||
def test_job_hooks(self):
|
||||
|
|
|
|||
|
|
@ -20,9 +20,11 @@ from unittest.mock import patch
|
|||
import click
|
||||
from click import Command
|
||||
from click.testing import CliRunner, Result
|
||||
from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_fixed
|
||||
|
||||
# imports - module imports
|
||||
import frappe
|
||||
import frappe.commands.scheduler
|
||||
import frappe.commands.site
|
||||
import frappe.commands.utils
|
||||
import frappe.recorder
|
||||
|
|
@ -760,6 +762,19 @@ class TestBenchBuild(BaseTestCommands):
|
|||
)
|
||||
|
||||
|
||||
class TestSchedulerUtils(BaseTestCommands):
|
||||
# Retry just in case there are stuck queued jobs
|
||||
@retry(
|
||||
retry=retry_if_exception_type(AssertionError),
|
||||
stop=stop_after_attempt(3),
|
||||
wait=wait_fixed(3),
|
||||
reraise=True,
|
||||
)
|
||||
def test_ready_for_migrate(self):
|
||||
with cli(frappe.commands.scheduler.ready_for_migration) as result:
|
||||
self.assertEqual(result.exit_code, 0)
|
||||
|
||||
|
||||
class TestCommandUtils(FrappeTestCase):
|
||||
def test_bench_helper(self):
|
||||
from frappe.utils.bench_helper import get_app_groups
|
||||
|
|
|
|||
|
|
@ -66,6 +66,7 @@ def enqueue(
|
|||
on_failure: Callable = None,
|
||||
at_front: bool = False,
|
||||
job_id: str = None,
|
||||
deduplicate=False,
|
||||
**kwargs,
|
||||
) -> Job | Any:
|
||||
"""
|
||||
|
|
@ -79,14 +80,28 @@ def enqueue(
|
|||
:param job_name: [DEPRECATED] can be used to name an enqueue call, which can be used to prevent duplicate calls
|
||||
:param now: if now=True, the method is executed via frappe.call
|
||||
:param kwargs: keyword arguments to be passed to the method
|
||||
:param deduplicate: do not re-queue job if it's already queued, requires job_id.
|
||||
:param job_id: Assigning unique job id, which can be checked using `is_job_enqueued`
|
||||
"""
|
||||
# To handle older implementations
|
||||
is_async = kwargs.pop("async", is_async)
|
||||
|
||||
if job_id:
|
||||
# namespace job ids to sites
|
||||
job_id = create_job_id(job_id)
|
||||
if deduplicate:
|
||||
if not job_id:
|
||||
frappe.throw(_("`job_id` parameter is required for deduplication."))
|
||||
job = get_job(job_id)
|
||||
if job and job.get_status() in (JobStatus.QUEUED, JobStatus.STARTED):
|
||||
frappe.logger().debug(f"Not queueing job {job.id} because it is in queue already")
|
||||
return
|
||||
elif job:
|
||||
# delete job to avoid argument issues related to job args
|
||||
# https://github.com/rq/rq/issues/793
|
||||
job.delete()
|
||||
|
||||
# If job exists and is completed then delete it before re-queue
|
||||
|
||||
# namespace job ids to sites
|
||||
job_id = create_job_id(job_id)
|
||||
|
||||
if job_name:
|
||||
deprecation_warning("Using enqueue with `job_name` is deprecated, use `job_id` instead.")
|
||||
|
|
@ -481,6 +496,9 @@ def test_job(s):
|
|||
|
||||
def create_job_id(job_id: str) -> str:
|
||||
"""Generate unique job id for deduplication"""
|
||||
|
||||
if not job_id:
|
||||
job_id = str(uuid4())
|
||||
return f"{frappe.local.site}::{job_id}"
|
||||
|
||||
|
||||
|
|
@ -490,9 +508,13 @@ def is_job_enqueued(job_id: str) -> bool:
|
|||
|
||||
def get_job_status(job_id: str) -> JobStatus | None:
|
||||
"""Get RQ job status, returns None if job is not found."""
|
||||
job = get_job(job_id)
|
||||
if job:
|
||||
return job.get_status()
|
||||
|
||||
|
||||
def get_job(job_id: str) -> Job:
|
||||
try:
|
||||
job = Job.fetch(create_job_id(job_id), connection=get_redis_conn())
|
||||
return Job.fetch(create_job_id(job_id), connection=get_redis_conn())
|
||||
except NoSuchJobError:
|
||||
return None
|
||||
|
||||
return job.get_status()
|
||||
|
|
|
|||
|
|
@ -79,6 +79,15 @@ def get_pending_jobs(site=None):
|
|||
return jobs_per_queue
|
||||
|
||||
|
||||
def any_job_pending(site: str) -> bool:
|
||||
for queue in get_queue_list():
|
||||
q = get_queue(queue)
|
||||
for job_id in q.get_job_ids():
|
||||
if job_id.startswith(site):
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
def check_number_of_workers():
|
||||
return len(get_workers())
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue