diff --git a/frappe/core/doctype/scheduled_job_type/scheduled_job_type.py b/frappe/core/doctype/scheduled_job_type/scheduled_job_type.py index 517f57469b..609cfd06a1 100644 --- a/frappe/core/doctype/scheduled_job_type/scheduled_job_type.py +++ b/frappe/core/doctype/scheduled_job_type/scheduled_job_type.py @@ -9,7 +9,7 @@ from frappe.model.document import Document from frappe.utils import now_datetime, get_datetime from datetime import datetime from croniter import croniter -from frappe.utils.background_jobs import enqueue +from frappe.utils.background_jobs import enqueue, get_jobs class ScheduledJobType(Document): def autoname(self): @@ -24,12 +24,18 @@ class ScheduledJobType(Document): # enqueue event if last execution is done if self.is_event_due(): self.update_last_execution() - frappe.flags.enqueued_jobs.append(self.method) - if frappe.flags.in_test: - self.execute() - else: - enqueue('frappe.core.doctype.scheduled_job_type.scheduled_job_type.run_scheduled_job', - job_type=self.method) + if frappe.flags.enqueued_jobs: + frappe.flags.enqueued_jobs.append(self.method) + + if frappe.flags.execute_job: + self.execute() + else: + if not self.is_job_in_queue(): + enqueue('frappe.core.doctype.scheduled_job_type.scheduled_job_type.run_scheduled_job', + queue = self.get_queue_name(), job_type=self.method) + return True + else: + return False def is_event_due(self, current_time = None): '''Return true if event is due based on time lapsed since last execution''' @@ -39,6 +45,10 @@ class ScheduledJobType(Document): # if the next scheduled event is before NOW, then its due! return self.last_execution <= (current_time or now_datetime()) + def is_job_in_queue(self): + queued_jobs = get_jobs(site=frappe.local.site, key='job_type')[frappe.local.site] + return self.method in queued_jobs + def get_next_execution(self): CRON_MAP = { "Yearly": "0 0 1 1 *", @@ -91,7 +101,7 @@ class ScheduledJobType(Document): frappe.db.commit() def get_queue_name(self): - return self.queue.replace(' ', '_').lower() + return 'long' if ('Long' in self.queue) else 'default' @frappe.whitelist() def execute_event(doc): @@ -143,4 +153,4 @@ def insert_single_event(queue, event, cron_format = None): def clear_events(all_events, scheduler_events): for event in frappe.get_all('Scheduled Job Type', ('name', 'method')): if event.method not in all_events: - frappe.db.delete_doc('Scheduled Job Type', event.name) + frappe.delete_doc('Scheduled Job Type', event.name) diff --git a/frappe/database/postgres/framework_postgres.sql b/frappe/database/postgres/framework_postgres.sql index 373a55279b..cd2f02d8e4 100644 --- a/frappe/database/postgres/framework_postgres.sql +++ b/frappe/database/postgres/framework_postgres.sql @@ -147,7 +147,7 @@ CREATE TABLE "tabDocType Link" ( "parentfield" varchar(255) DEFAULT NULL, "parenttype" varchar(255) DEFAULT NULL, "idx" bigint NOT NULL DEFAULT 0, - "label" varchar(140) NOT NULL, + "label" varchar(140) DEFAULT NULL, "group" varchar(140) DEFAULT NULL, "link_doctype" varchar(140) NOT NULL, "link_fieldname" varchar(140) NOT NULL, diff --git a/frappe/tests/test_scheduler.py b/frappe/tests/test_scheduler.py index 4e193354b0..c1b03f7251 100644 --- a/frappe/tests/test_scheduler.py +++ b/frappe/tests/test_scheduler.py @@ -3,7 +3,7 @@ from __future__ import unicode_literals from unittest import TestCase from dateutil.relativedelta import relativedelta from frappe.core.doctype.scheduled_job_type.scheduled_job_type import sync_jobs -from frappe.utils.background_jobs import enqueue +from frappe.utils.background_jobs import enqueue, get_jobs from frappe.utils.scheduler import enqueue_events import frappe @@ -20,12 +20,31 @@ class TestScheduler(TestCase): def test_enqueue_jobs(self): frappe.db.sql('update `tabScheduled Job Type` set last_execution = "2010-01-01 00:00:00"') + + frappe.flags.execute_job = True enqueue_events(site = frappe.local.site) + frappe.flags.execute_job = False self.assertTrue('frappe.email.queue.clear_outbox', frappe.flags.enqueued_jobs) self.assertTrue('frappe.utils.change_log.check_for_update', frappe.flags.enqueued_jobs) self.assertTrue('frappe.email.doctype.auto_email_report.auto_email_report.send_monthly', frappe.flags.enqueued_jobs) + def test_queue_peeking(self): + if not frappe.db.exists('Scheduled Job Type', 'test_scheduler.test_timeout'): + job = frappe.get_doc(dict( + doctype = 'Scheduled Job Type', + method = 'frappe.tests.test_scheduler.test_timeout', + last_execution = '2010-01-01 00:00:00', + queue = 'All' + )).insert() + else: + job = frappe.get_doc('Scheduled Job Type', 'test_scheduler.test_timeout') + + self.assertTrue(job.enqueue()) + print(get_jobs(site=frappe.local.site, key='job_type')) + self.assertFalse(job.enqueue()) + job.delete() + def test_job_timeout(self): return job = enqueue(test_timeout, timeout=10) diff --git a/frappe/utils/background_jobs.py b/frappe/utils/background_jobs.py index 61e866e2f8..7681e76646 100755 --- a/frappe/utils/background_jobs.py +++ b/frappe/utils/background_jobs.py @@ -160,18 +160,25 @@ def get_worker_name(queue): def get_jobs(site=None, queue=None, key='method'): '''Gets jobs per queue or per site or both''' jobs_per_site = defaultdict(list) + + def add_to_dict(job): + if key in job.kwargs: + jobs_per_site[job.kwargs['site']].append(job.kwargs[key]) + + elif key in job.kwargs.get('kwargs', {}): + # optional keyword arguments are stored in 'kwargs' of 'kwargs' + jobs_per_site[job.kwargs['site']].append(job.kwargs['kwargs'][key]) + for queue in get_queue_list(queue): q = get_queue(queue) for job in q.jobs: if job.kwargs.get('site'): if site is None: - # get jobs for all sites - jobs_per_site[job.kwargs['site']].append(job.kwargs[key]) + add_to_dict(job) elif job.kwargs['site'] == site: - # get jobs only for given site - jobs_per_site[site].append(job.kwargs[key]) + add_to_dict(job) else: print('No site found in job', job.__dict__)