From 2cee8905f2fd868b3b48af8d6766731d9c0df8c3 Mon Sep 17 00:00:00 2001 From: Pratik Vyas Date: Tue, 18 Feb 2014 20:53:12 +0530 Subject: [PATCH] use celery from scheduler events --- frappe/celery.py | 37 +++++++++++++++++++++++++++++++++++++ frappe/tasks.py | 26 ++++++++++++++++++++++++++ frappe/utils/scheduler.py | 30 +++++++++++------------------- 3 files changed, 74 insertions(+), 19 deletions(-) create mode 100644 frappe/celery.py create mode 100644 frappe/tasks.py diff --git a/frappe/celery.py b/frappe/celery.py new file mode 100644 index 0000000000..2596f1d15e --- /dev/null +++ b/frappe/celery.py @@ -0,0 +1,37 @@ +from __future__ import absolute_import +from celery import Celery +from datetime import timedelta +import frappe +import json +import os + +SITES_PATH = os.environ.get('SITES_PATH', '.') + +def get_conf(): + if hasattr(frappe.local, 'initialised'): + return frappe.local.conf + with open(os.path.join(SITES_PATH, 'site_config.json')) as f: + return json.load(f) + +def get_app(): + conf = get_conf() + frappe.local.sites_path = SITES_PATH + app = Celery('frappe', + broker=conf['celery_broker'], + backend=conf['celery_result_backend']) + app.autodiscover_tasks(frappe.get_all_apps(with_frappe=True, with_internal_apps=False)) + app.conf.CELERY_TASK_SERIALIZER = 'json' + app.conf.CELERYBEAT_SCHEDULE = { + 'scheduler': { + 'task': 'frappe.tasks.enqueue_scheduler_events', + 'schedule': timedelta(seconds=conf['scheduler_interval']) + }, + } + + app.conf.CELERY_TIMEZONE = 'UTC' + return app + +app = get_app() + +if __name__ == '__main__': + app.start() diff --git a/frappe/tasks.py b/frappe/tasks.py new file mode 100644 index 0000000000..2fffbabb98 --- /dev/null +++ b/frappe/tasks.py @@ -0,0 +1,26 @@ +from __future__ import unicode_literals +from frappe.celery import app +import frappe +from frappe.cli import get_sites + +@app.task() +def scheduler_task(site, event, handler): + from frappe.utils.scheduler import log + traceback = "" + try: + frappe.init(site=site) + frappe.get_attr(handler)() + frappe.conn.commit() + except Exception: + traceback += log("Method: {event}, Handler: {handler}".format(event=event, handler=handler)) + traceback += log(frappe.get_traceback()) + frappe.conn.rollback() + # TODO: use logger + print 'ran {handler} for {site} for event: {event}'.format(handler=handler, site=site, event=event) + +@app.task() +def enqueue_scheduler_events(): + import frappe + from frappe.utils.scheduler import execute + for site in get_sites(): + execute(site) diff --git a/frappe/utils/scheduler.py b/frappe/utils/scheduler.py index 0035a484ec..64b9f03b92 100644 --- a/frappe/utils/scheduler.py +++ b/frappe/utils/scheduler.py @@ -18,7 +18,7 @@ on the need. import frappe import frappe.utils -def execute(site=None): +def execute(site): """ execute jobs this method triggers the other scheduler events @@ -47,37 +47,29 @@ def execute(site=None): if nowtime.day != last.day: # if first task of the day execute daily tasks - out.append(nowtime.strftime("%Y-%m-%d %H:%M:%S") + ' - daily:' + trigger('daily')) + out.append(nowtime.strftime("%Y-%m-%d %H:%M:%S") + ' - daily:' + trigger(site, 'daily')) if nowtime.month != last.month: - out.append(nowtime.strftime("%Y-%m-%d %H:%M:%S") + ' - monthly:' + trigger('monthly')) + out.append(nowtime.strftime("%Y-%m-%d %H:%M:%S") + ' - monthly:' + trigger(site, 'monthly')) if nowtime.weekday()==0: - out.append(nowtime.strftime("%Y-%m-%d %H:%M:%S") + ' - weekly:' + trigger('weekly')) + out.append(nowtime.strftime("%Y-%m-%d %H:%M:%S") + ' - weekly:' + trigger(site, 'weekly')) if nowtime.hour != last.hour: - out.append(nowtime.strftime("%Y-%m-%d %H:%M:%S") + ' - hourly:' + trigger('hourly')) + out.append(nowtime.strftime("%Y-%m-%d %H:%M:%S") + ' - hourly:' + trigger(site, 'hourly')) - out.append(nowtime.strftime("%Y-%m-%d %H:%M:%S") + ' - all:' + trigger('all')) + out.append(nowtime.strftime("%Y-%m-%d %H:%M:%S") + ' - all:' + trigger(site, 'all')) return '\n'.join(out) -def trigger(method): +def trigger(site, event): """trigger method in startup.schedule_handler""" - traceback = "" + from frappe.tasks import scheduler_task for scheduler_event in frappe.get_hooks().scheduler_event: event_name, handler = scheduler_event.split(":") - - if method==event_name: - try: - frappe.get_attr(handler)() - frappe.db.commit() - except Exception: - traceback += log("Method: {method}, Handler: {handler}".format(method=method, handler=handler)) - traceback += log(frappe.get_traceback()) - frappe.db.rollback() - - return traceback or 'ok' + if event==event_name: + scheduler_task.delay(site, event, handler) + return 'enqueued' def log(method, message=None): """log error in patch_log"""