fix(scheduler): implement queue peeking

This commit is contained in:
Rushabh Mehta 2019-09-24 22:44:08 +05:30
parent 0035772f8f
commit c583be6f33
4 changed files with 51 additions and 15 deletions

View file

@ -9,7 +9,7 @@ from frappe.model.document import Document
from frappe.utils import now_datetime, get_datetime
from datetime import datetime
from croniter import croniter
from frappe.utils.background_jobs import enqueue
from frappe.utils.background_jobs import enqueue, get_jobs
class ScheduledJobType(Document):
def autoname(self):
@ -24,12 +24,18 @@ class ScheduledJobType(Document):
# enqueue event if last execution is done
if self.is_event_due():
self.update_last_execution()
frappe.flags.enqueued_jobs.append(self.method)
if frappe.flags.in_test:
self.execute()
else:
enqueue('frappe.core.doctype.scheduled_job_type.scheduled_job_type.run_scheduled_job',
job_type=self.method)
if frappe.flags.enqueued_jobs:
frappe.flags.enqueued_jobs.append(self.method)
if frappe.flags.execute_job:
self.execute()
else:
if not self.is_job_in_queue():
enqueue('frappe.core.doctype.scheduled_job_type.scheduled_job_type.run_scheduled_job',
queue = self.get_queue_name(), job_type=self.method)
return True
else:
return False
def is_event_due(self, current_time = None):
'''Return true if event is due based on time lapsed since last execution'''
@ -39,6 +45,10 @@ class ScheduledJobType(Document):
# if the next scheduled event is before NOW, then its due!
return self.last_execution <= (current_time or now_datetime())
def is_job_in_queue(self):
queued_jobs = get_jobs(site=frappe.local.site, key='job_type')[frappe.local.site]
return self.method in queued_jobs
def get_next_execution(self):
CRON_MAP = {
"Yearly": "0 0 1 1 *",
@ -91,7 +101,7 @@ class ScheduledJobType(Document):
frappe.db.commit()
def get_queue_name(self):
return self.queue.replace(' ', '_').lower()
return 'long' if ('Long' in self.queue) else 'default'
@frappe.whitelist()
def execute_event(doc):
@ -143,4 +153,4 @@ def insert_single_event(queue, event, cron_format = None):
def clear_events(all_events, scheduler_events):
for event in frappe.get_all('Scheduled Job Type', ('name', 'method')):
if event.method not in all_events:
frappe.db.delete_doc('Scheduled Job Type', event.name)
frappe.delete_doc('Scheduled Job Type', event.name)

View file

@ -147,7 +147,7 @@ CREATE TABLE "tabDocType Link" (
"parentfield" varchar(255) DEFAULT NULL,
"parenttype" varchar(255) DEFAULT NULL,
"idx" bigint NOT NULL DEFAULT 0,
"label" varchar(140) NOT NULL,
"label" varchar(140) DEFAULT NULL,
"group" varchar(140) DEFAULT NULL,
"link_doctype" varchar(140) NOT NULL,
"link_fieldname" varchar(140) NOT NULL,

View file

@ -3,7 +3,7 @@ from __future__ import unicode_literals
from unittest import TestCase
from dateutil.relativedelta import relativedelta
from frappe.core.doctype.scheduled_job_type.scheduled_job_type import sync_jobs
from frappe.utils.background_jobs import enqueue
from frappe.utils.background_jobs import enqueue, get_jobs
from frappe.utils.scheduler import enqueue_events
import frappe
@ -20,12 +20,31 @@ class TestScheduler(TestCase):
def test_enqueue_jobs(self):
frappe.db.sql('update `tabScheduled Job Type` set last_execution = "2010-01-01 00:00:00"')
frappe.flags.execute_job = True
enqueue_events(site = frappe.local.site)
frappe.flags.execute_job = False
self.assertTrue('frappe.email.queue.clear_outbox', frappe.flags.enqueued_jobs)
self.assertTrue('frappe.utils.change_log.check_for_update', frappe.flags.enqueued_jobs)
self.assertTrue('frappe.email.doctype.auto_email_report.auto_email_report.send_monthly', frappe.flags.enqueued_jobs)
def test_queue_peeking(self):
if not frappe.db.exists('Scheduled Job Type', 'test_scheduler.test_timeout'):
job = frappe.get_doc(dict(
doctype = 'Scheduled Job Type',
method = 'frappe.tests.test_scheduler.test_timeout',
last_execution = '2010-01-01 00:00:00',
queue = 'All'
)).insert()
else:
job = frappe.get_doc('Scheduled Job Type', 'test_scheduler.test_timeout')
self.assertTrue(job.enqueue())
print(get_jobs(site=frappe.local.site, key='job_type'))
self.assertFalse(job.enqueue())
job.delete()
def test_job_timeout(self):
return
job = enqueue(test_timeout, timeout=10)

View file

@ -160,18 +160,25 @@ def get_worker_name(queue):
def get_jobs(site=None, queue=None, key='method'):
'''Gets jobs per queue or per site or both'''
jobs_per_site = defaultdict(list)
def add_to_dict(job):
if key in job.kwargs:
jobs_per_site[job.kwargs['site']].append(job.kwargs[key])
elif key in job.kwargs.get('kwargs', {}):
# optional keyword arguments are stored in 'kwargs' of 'kwargs'
jobs_per_site[job.kwargs['site']].append(job.kwargs['kwargs'][key])
for queue in get_queue_list(queue):
q = get_queue(queue)
for job in q.jobs:
if job.kwargs.get('site'):
if site is None:
# get jobs for all sites
jobs_per_site[job.kwargs['site']].append(job.kwargs[key])
add_to_dict(job)
elif job.kwargs['site'] == site:
# get jobs only for given site
jobs_per_site[site].append(job.kwargs[key])
add_to_dict(job)
else:
print('No site found in job', job.__dict__)