use celery from scheduler events

This commit is contained in:
Pratik Vyas 2014-02-18 20:53:12 +05:30 committed by Anand Doshi
parent 934d9558d4
commit 2cee8905f2
3 changed files with 74 additions and 19 deletions

37
frappe/celery.py Normal file
View file

@ -0,0 +1,37 @@
from __future__ import absolute_import
from celery import Celery
from datetime import timedelta
import frappe
import json
import os
SITES_PATH = os.environ.get('SITES_PATH', '.')
def get_conf():
if hasattr(frappe.local, 'initialised'):
return frappe.local.conf
with open(os.path.join(SITES_PATH, 'site_config.json')) as f:
return json.load(f)
def get_app():
conf = get_conf()
frappe.local.sites_path = SITES_PATH
app = Celery('frappe',
broker=conf['celery_broker'],
backend=conf['celery_result_backend'])
app.autodiscover_tasks(frappe.get_all_apps(with_frappe=True, with_internal_apps=False))
app.conf.CELERY_TASK_SERIALIZER = 'json'
app.conf.CELERYBEAT_SCHEDULE = {
'scheduler': {
'task': 'frappe.tasks.enqueue_scheduler_events',
'schedule': timedelta(seconds=conf['scheduler_interval'])
},
}
app.conf.CELERY_TIMEZONE = 'UTC'
return app
app = get_app()
if __name__ == '__main__':
app.start()

26
frappe/tasks.py Normal file
View file

@ -0,0 +1,26 @@
from __future__ import unicode_literals
from frappe.celery import app
import frappe
from frappe.cli import get_sites
@app.task()
def scheduler_task(site, event, handler):
from frappe.utils.scheduler import log
traceback = ""
try:
frappe.init(site=site)
frappe.get_attr(handler)()
frappe.conn.commit()
except Exception:
traceback += log("Method: {event}, Handler: {handler}".format(event=event, handler=handler))
traceback += log(frappe.get_traceback())
frappe.conn.rollback()
# TODO: use logger
print 'ran {handler} for {site} for event: {event}'.format(handler=handler, site=site, event=event)
@app.task()
def enqueue_scheduler_events():
import frappe
from frappe.utils.scheduler import execute
for site in get_sites():
execute(site)

View file

@ -18,7 +18,7 @@ on the need.
import frappe
import frappe.utils
def execute(site=None):
def execute(site):
"""
execute jobs
this method triggers the other scheduler events
@ -47,37 +47,29 @@ def execute(site=None):
if nowtime.day != last.day:
# if first task of the day execute daily tasks
out.append(nowtime.strftime("%Y-%m-%d %H:%M:%S") + ' - daily:' + trigger('daily'))
out.append(nowtime.strftime("%Y-%m-%d %H:%M:%S") + ' - daily:' + trigger(site, 'daily'))
if nowtime.month != last.month:
out.append(nowtime.strftime("%Y-%m-%d %H:%M:%S") + ' - monthly:' + trigger('monthly'))
out.append(nowtime.strftime("%Y-%m-%d %H:%M:%S") + ' - monthly:' + trigger(site, 'monthly'))
if nowtime.weekday()==0:
out.append(nowtime.strftime("%Y-%m-%d %H:%M:%S") + ' - weekly:' + trigger('weekly'))
out.append(nowtime.strftime("%Y-%m-%d %H:%M:%S") + ' - weekly:' + trigger(site, 'weekly'))
if nowtime.hour != last.hour:
out.append(nowtime.strftime("%Y-%m-%d %H:%M:%S") + ' - hourly:' + trigger('hourly'))
out.append(nowtime.strftime("%Y-%m-%d %H:%M:%S") + ' - hourly:' + trigger(site, 'hourly'))
out.append(nowtime.strftime("%Y-%m-%d %H:%M:%S") + ' - all:' + trigger('all'))
out.append(nowtime.strftime("%Y-%m-%d %H:%M:%S") + ' - all:' + trigger(site, 'all'))
return '\n'.join(out)
def trigger(method):
def trigger(site, event):
"""trigger method in startup.schedule_handler"""
traceback = ""
from frappe.tasks import scheduler_task
for scheduler_event in frappe.get_hooks().scheduler_event:
event_name, handler = scheduler_event.split(":")
if method==event_name:
try:
frappe.get_attr(handler)()
frappe.db.commit()
except Exception:
traceback += log("Method: {method}, Handler: {handler}".format(method=method, handler=handler))
traceback += log(frappe.get_traceback())
frappe.db.rollback()
return traceback or 'ok'
if event==event_name:
scheduler_task.delay(site, event, handler)
return 'enqueued'
def log(method, message=None):
"""log error in patch_log"""