test: test without adding hacky flags

This commit is contained in:
Ankush Menat 2022-12-14 12:40:06 +05:30 committed by Ankush Menat
parent 82d699a801
commit 2b050f9fc3
3 changed files with 24 additions and 25 deletions

View file

@ -25,20 +25,13 @@ class ScheduledJobType(Document):
def enqueue(self, force=False):
# enqueue event if last execution is done
if self.is_event_due() or force:
if frappe.flags.enqueued_jobs:
frappe.flags.enqueued_jobs.append(self.method)
if frappe.flags.execute_job:
self.execute()
else:
if not self.is_job_in_queue():
enqueue(
"frappe.core.doctype.scheduled_job_type.scheduled_job_type.run_scheduled_job",
queue=self.get_queue_name(),
job_type=self.method,
)
return True
if not self.is_job_in_queue():
enqueue(
"frappe.core.doctype.scheduled_job_type.scheduled_job_type.run_scheduled_job",
queue=self.get_queue_name(),
job_type=self.method,
)
return True
return False
def is_event_due(self, current_time=None):

View file

@ -34,18 +34,19 @@ class TestScheduler(TestCase):
if not frappe.get_all("Scheduled Job Type", limit=1):
sync_jobs()
def tearDown(self):
purge_pending_jobs()
def test_enqueue_jobs(self):
frappe.db.sql("update `tabScheduled Job Type` set last_execution = '2010-01-01 00:00:00'")
frappe.flags.execute_job = True
enqueue_events(site=frappe.local.site)
frappe.flags.execute_job = False
enqueued_jobs = enqueue_events(site=frappe.local.site)
self.assertTrue("frappe.email.queue.set_expiry_for_email_queue", frappe.flags.enqueued_jobs)
self.assertTrue("frappe.utils.change_log.check_for_update", frappe.flags.enqueued_jobs)
self.assertTrue(
self.assertIn("frappe.email.queue.set_expiry_for_email_queue", enqueued_jobs)
self.assertIn("frappe.utils.change_log.check_for_update", enqueued_jobs)
self.assertIn(
"frappe.email.doctype.auto_email_report.auto_email_report.send_monthly",
frappe.flags.enqueued_jobs,
enqueued_jobs,
)
def test_queue_peeking(self):

View file

@ -81,14 +81,19 @@ def enqueue_events_for_site(site: str) -> None:
frappe.destroy()
def enqueue_events(site: str) -> None:
def enqueue_events(site: str) -> list[str] | None:
if schedule_jobs_based_on_activity():
frappe.flags.enqueued_jobs = []
queued_jobs = get_jobs(site=site, key="job_type").get(site) or []
for job_type in frappe.get_all("Scheduled Job Type", ("name", "method"), dict(stopped=0)):
enqueued_jobs = []
for job_type in frappe.get_all("Scheduled Job Type", ("name", "method"), {"stopped": 0}):
if not job_type.method in queued_jobs:
# don't add it to queue if still pending
frappe.get_cached_doc("Scheduled Job Type", job_type.name).enqueue()
job_type = frappe.get_cached_doc("Scheduled Job Type", job_type.name)
if job_type.enqueue():
enqueued_jobs.append(job_type.method)
return enqueued_jobs
def is_scheduler_inactive() -> bool: