test: bg jobs test cleanup (#18767)
* test: fix flaky RQ job tests Sometimes stop_job doesn't succeed and causes tests to timeout. Reduced sleep time to avoid this in tests. We are still testing all the important features - monitoring. * build(deps): Bump RQ to latest version Minor bugfixes that affect us ref: https://github.com/rq/rq/releases * test: sanity tests for scheduled job types * test(test_runner): dont set bench_id globally * refactor: stop_job shouldn't throw error The intention of use here is to stop stuck jobs or long running jobs, if for some reason they were stopped by the time command gets executed, there's no need to throw error.
This commit is contained in:
parent
ba25e97d4e
commit
032df946be
6 changed files with 45 additions and 11 deletions
|
|
@ -5,10 +5,12 @@ import functools
|
|||
import re
|
||||
|
||||
from rq.command import send_stop_job_command
|
||||
from rq.exceptions import InvalidJobOperation
|
||||
from rq.job import Job
|
||||
from rq.queue import Queue
|
||||
|
||||
import frappe
|
||||
from frappe import _
|
||||
from frappe.model.document import Document
|
||||
from frappe.utils import (
|
||||
cint,
|
||||
|
|
@ -93,7 +95,10 @@ class RQJob(Document):
|
|||
|
||||
@check_permissions
|
||||
def stop_job(self):
|
||||
send_stop_job_command(connection=get_redis_conn(), job_id=self.job_id)
|
||||
try:
|
||||
send_stop_job_command(connection=get_redis_conn(), job_id=self.job_id)
|
||||
except InvalidJobOperation:
|
||||
frappe.msgprint(_("Job is not running."), title=_("Invalid Operation"))
|
||||
|
||||
@staticmethod
|
||||
def get_count(args) -> int:
|
||||
|
|
|
|||
|
|
@ -19,12 +19,11 @@ class TestRQJob(FrappeTestCase):
|
|||
|
||||
@timeout(seconds=20)
|
||||
def check_status(self, job: Job, status, wait=True):
|
||||
if wait:
|
||||
while True:
|
||||
if job.is_queued or job.is_started:
|
||||
time.sleep(0.2)
|
||||
else:
|
||||
break
|
||||
while wait:
|
||||
if not (job.is_queued or job.is_started):
|
||||
break
|
||||
time.sleep(0.2)
|
||||
|
||||
self.assertEqual(frappe.get_doc("RQ Job", job.id).status, status)
|
||||
|
||||
def test_serialization(self):
|
||||
|
|
@ -69,7 +68,7 @@ class TestRQJob(FrappeTestCase):
|
|||
self.assertGreaterEqual(len(non_failed_jobs), 1)
|
||||
|
||||
# Create a slow job and check if it's stuck in "Started"
|
||||
job = frappe.enqueue(method=self.BG_JOB, queue="short", sleep=1000)
|
||||
job = frappe.enqueue(method=self.BG_JOB, queue="short", sleep=10)
|
||||
time.sleep(3)
|
||||
self.check_status(job, "started", wait=False)
|
||||
stop_job(job_id=job.id)
|
||||
|
|
@ -84,8 +83,8 @@ class TestRQJob(FrappeTestCase):
|
|||
|
||||
def test_is_enqueued(self):
|
||||
|
||||
dummy_job = frappe.enqueue(self.BG_JOB, sleep=10, queue="short")
|
||||
job_name = "uniq_test_job"
|
||||
dummy_job = frappe.enqueue(self.BG_JOB, sleep=100, queue="short")
|
||||
actual_job = frappe.enqueue(self.BG_JOB, job_name=job_name, queue="short")
|
||||
|
||||
self.assertTrue(is_job_queued(job_name))
|
||||
|
|
|
|||
|
|
@ -86,7 +86,6 @@ def main(
|
|||
frappe.utils.scheduler.disable_scheduler()
|
||||
|
||||
set_test_email_config()
|
||||
frappe.conf.update({"bench_id": "test_bench", "use_rq_auth": False})
|
||||
|
||||
if not frappe.flags.skip_before_tests:
|
||||
if verbose:
|
||||
|
|
|
|||
|
|
@ -1,4 +1,5 @@
|
|||
import functools
|
||||
from unittest.mock import patch
|
||||
|
||||
import redis
|
||||
|
||||
|
|
@ -30,12 +31,14 @@ def skip_if_redis_version_lt(version):
|
|||
|
||||
class TestRedisAuth(FrappeTestCase):
|
||||
@skip_if_redis_version_lt("6.0")
|
||||
@patch.dict(frappe.conf, {"bench_id": "test_bench", "use_rq_auth": False})
|
||||
def test_rq_gen_acllist(self):
|
||||
"""Make sure that ACL list is genrated"""
|
||||
acl_list = RedisQueue.gen_acl_list()
|
||||
self.assertEqual(acl_list[1]["bench"][0], get_bench_id())
|
||||
|
||||
@skip_if_redis_version_lt("6.0")
|
||||
@patch.dict(frappe.conf, {"bench_id": "test_bench", "use_rq_auth": False})
|
||||
def test_adding_redis_user(self):
|
||||
acl_list = RedisQueue.gen_acl_list()
|
||||
username, password = acl_list[1]["bench"]
|
||||
|
|
@ -47,6 +50,7 @@ class TestRedisAuth(FrappeTestCase):
|
|||
conn.acl_deluser(username)
|
||||
|
||||
@skip_if_redis_version_lt("6.0")
|
||||
@patch.dict(frappe.conf, {"bench_id": "test_bench", "use_rq_auth": False})
|
||||
def test_rq_namespace(self):
|
||||
"""Make sure that user can access only their respective namespace."""
|
||||
# Current bench ID
|
||||
|
|
|
|||
27
frappe/tests/test_zbg_job_sanity_test.py
Normal file
27
frappe/tests/test_zbg_job_sanity_test.py
Normal file
|
|
@ -0,0 +1,27 @@
|
|||
""" smoak tests to check that all registered background jobs execute without error.
|
||||
|
||||
Note: Filename is intentional to run this test roughly at end. Don't change."""
|
||||
|
||||
import time
|
||||
|
||||
import frappe
|
||||
from frappe.core.doctype.rq_job.rq_job import RQJob, remove_failed_jobs
|
||||
from frappe.tests.utils import FrappeTestCase, timeout
|
||||
|
||||
|
||||
class TestScheduledJobSanity(FrappeTestCase):
|
||||
def setUp(self):
|
||||
remove_failed_jobs()
|
||||
|
||||
@timeout(90)
|
||||
def test_bg_jobs_run(self):
|
||||
"""Enqueue all scheduled jobs, wait for finish and verify that none failed."""
|
||||
for scheduled_job_type in frappe.get_all("Scheduled Job Type", pluck="name"):
|
||||
frappe.get_doc("Scheduled Job Type", scheduled_job_type).enqueue(force=True)
|
||||
|
||||
while RQJob.get_list({"filters": [["RQ Job", "status", "in", ("queued", "started")]]}):
|
||||
time.sleep(0.5)
|
||||
|
||||
# Check no failed, if failed print full details
|
||||
failed_jobs = RQJob.get_list({"filters": [["RQ Job", "status", "=", "failed"]]})
|
||||
self.assertEqual(len(failed_jobs), 0, "Jobs failed: " + str(failed_jobs))
|
||||
|
|
@ -57,7 +57,7 @@ dependencies = [
|
|||
"hiredis~=2.0.0",
|
||||
"requests-oauthlib~=1.3.0",
|
||||
"requests~=2.27.1",
|
||||
"rq~=1.10.1",
|
||||
"rq~=1.11.1",
|
||||
"rsa>=4.1",
|
||||
"semantic-version~=2.10.0",
|
||||
"sqlparse~=0.4.1",
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue