diff --git a/frappe/core/doctype/rq_job/rq_job.py b/frappe/core/doctype/rq_job/rq_job.py index 7e1c35a0e6..f05611fe7d 100644 --- a/frappe/core/doctype/rq_job/rq_job.py +++ b/frappe/core/doctype/rq_job/rq_job.py @@ -5,10 +5,12 @@ import functools import re from rq.command import send_stop_job_command +from rq.exceptions import InvalidJobOperation from rq.job import Job from rq.queue import Queue import frappe +from frappe import _ from frappe.model.document import Document from frappe.utils import ( cint, @@ -93,7 +95,10 @@ class RQJob(Document): @check_permissions def stop_job(self): - send_stop_job_command(connection=get_redis_conn(), job_id=self.job_id) + try: + send_stop_job_command(connection=get_redis_conn(), job_id=self.job_id) + except InvalidJobOperation: + frappe.msgprint(_("Job is not running."), title=_("Invalid Operation")) @staticmethod def get_count(args) -> int: diff --git a/frappe/core/doctype/rq_job/test_rq_job.py b/frappe/core/doctype/rq_job/test_rq_job.py index ae0691fa61..460aa08941 100644 --- a/frappe/core/doctype/rq_job/test_rq_job.py +++ b/frappe/core/doctype/rq_job/test_rq_job.py @@ -19,12 +19,11 @@ class TestRQJob(FrappeTestCase): @timeout(seconds=20) def check_status(self, job: Job, status, wait=True): - if wait: - while True: - if job.is_queued or job.is_started: - time.sleep(0.2) - else: - break + while wait: + if not (job.is_queued or job.is_started): + break + time.sleep(0.2) + self.assertEqual(frappe.get_doc("RQ Job", job.id).status, status) def test_serialization(self): @@ -69,7 +68,7 @@ class TestRQJob(FrappeTestCase): self.assertGreaterEqual(len(non_failed_jobs), 1) # Create a slow job and check if it's stuck in "Started" - job = frappe.enqueue(method=self.BG_JOB, queue="short", sleep=1000) + job = frappe.enqueue(method=self.BG_JOB, queue="short", sleep=10) time.sleep(3) self.check_status(job, "started", wait=False) stop_job(job_id=job.id) @@ -84,8 +83,8 @@ class TestRQJob(FrappeTestCase): def test_is_enqueued(self): + dummy_job = frappe.enqueue(self.BG_JOB, sleep=10, queue="short") job_name = "uniq_test_job" - dummy_job = frappe.enqueue(self.BG_JOB, sleep=100, queue="short") actual_job = frappe.enqueue(self.BG_JOB, job_name=job_name, queue="short") self.assertTrue(is_job_queued(job_name)) diff --git a/frappe/test_runner.py b/frappe/test_runner.py index 1e3573336a..7e2c7e724f 100644 --- a/frappe/test_runner.py +++ b/frappe/test_runner.py @@ -86,7 +86,6 @@ def main( frappe.utils.scheduler.disable_scheduler() set_test_email_config() - frappe.conf.update({"bench_id": "test_bench", "use_rq_auth": False}) if not frappe.flags.skip_before_tests: if verbose: diff --git a/frappe/tests/test_redis.py b/frappe/tests/test_redis.py index ca59af98b0..19001028f6 100644 --- a/frappe/tests/test_redis.py +++ b/frappe/tests/test_redis.py @@ -1,4 +1,5 @@ import functools +from unittest.mock import patch import redis @@ -30,12 +31,14 @@ def skip_if_redis_version_lt(version): class TestRedisAuth(FrappeTestCase): @skip_if_redis_version_lt("6.0") + @patch.dict(frappe.conf, {"bench_id": "test_bench", "use_rq_auth": False}) def test_rq_gen_acllist(self): """Make sure that ACL list is genrated""" acl_list = RedisQueue.gen_acl_list() self.assertEqual(acl_list[1]["bench"][0], get_bench_id()) @skip_if_redis_version_lt("6.0") + @patch.dict(frappe.conf, {"bench_id": "test_bench", "use_rq_auth": False}) def test_adding_redis_user(self): acl_list = RedisQueue.gen_acl_list() username, password = acl_list[1]["bench"] @@ -47,6 +50,7 @@ class TestRedisAuth(FrappeTestCase): conn.acl_deluser(username) @skip_if_redis_version_lt("6.0") + @patch.dict(frappe.conf, {"bench_id": "test_bench", "use_rq_auth": False}) def test_rq_namespace(self): """Make sure that user can access only their respective namespace.""" # Current bench ID diff --git a/frappe/tests/test_zbg_job_sanity_test.py b/frappe/tests/test_zbg_job_sanity_test.py new file mode 100644 index 0000000000..19dc168c04 --- /dev/null +++ b/frappe/tests/test_zbg_job_sanity_test.py @@ -0,0 +1,27 @@ +""" smoak tests to check that all registered background jobs execute without error. + +Note: Filename is intentional to run this test roughly at end. Don't change.""" + +import time + +import frappe +from frappe.core.doctype.rq_job.rq_job import RQJob, remove_failed_jobs +from frappe.tests.utils import FrappeTestCase, timeout + + +class TestScheduledJobSanity(FrappeTestCase): + def setUp(self): + remove_failed_jobs() + + @timeout(90) + def test_bg_jobs_run(self): + """Enqueue all scheduled jobs, wait for finish and verify that none failed.""" + for scheduled_job_type in frappe.get_all("Scheduled Job Type", pluck="name"): + frappe.get_doc("Scheduled Job Type", scheduled_job_type).enqueue(force=True) + + while RQJob.get_list({"filters": [["RQ Job", "status", "in", ("queued", "started")]]}): + time.sleep(0.5) + + # Check no failed, if failed print full details + failed_jobs = RQJob.get_list({"filters": [["RQ Job", "status", "=", "failed"]]}) + self.assertEqual(len(failed_jobs), 0, "Jobs failed: " + str(failed_jobs)) diff --git a/pyproject.toml b/pyproject.toml index 6f744ca186..348353003c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -57,7 +57,7 @@ dependencies = [ "hiredis~=2.0.0", "requests-oauthlib~=1.3.0", "requests~=2.27.1", - "rq~=1.10.1", + "rq~=1.11.1", "rsa>=4.1", "semantic-version~=2.10.0", "sqlparse~=0.4.1",