This commit is contained in:
Pratik Vyas 2014-02-25 17:44:44 +05:30 committed by Anand Doshi
parent fa4d495230
commit 950a621cab
3 changed files with 37 additions and 6 deletions

View file

@ -7,6 +7,15 @@ import os
SITES_PATH = os.environ.get('SITES_PATH', '.')
class SiteRouter(object):
def route_for_task(self, task, args=None, kwargs=None):
if hasattr(frappe.local, 'site'):
return {
'queue': frappe.local.site
}
return None
def get_conf():
if hasattr(frappe.local, 'initialised'):
return frappe.local.conf
@ -22,11 +31,16 @@ def get_app():
app.autodiscover_tasks(frappe.get_all_apps(with_frappe=True, with_internal_apps=False))
app.conf.CELERY_TASK_SERIALIZER = 'json'
app.conf.CELERY_ACCEPT_CONTENT = ['json']
app.conf.CELERY_ROUTES = (SiteRouter(),)
app.conf.CELERYBEAT_SCHEDULE = {
'scheduler': {
'task': 'frappe.tasks.enqueue_scheduler_events',
'schedule': timedelta(seconds=conf['scheduler_interval'])
},
'sync_queues': {
'task': 'frappe.tasks.sync_queues',
'schedule': timedelta(seconds=conf['scheduler_interval'])
},
}
app.conf.CELERY_TIMEZONE = 'UTC'

View file

@ -1,29 +1,41 @@
from __future__ import unicode_literals
from frappe.celery import app
from frappe.celery_app import app
import frappe
from frappe.cli import get_sites
from frappe.utils.file_lock import delete_lock
from celery.utils.log import get_task_logger
logger = get_task_logger(__name__)
@app.task()
def sync_queues():
sites = get_sites()
for site in sites:
app.control.broadcast('add_consumer', arguments={'queue': site}, reply=True)
@app.task()
def scheduler_task(site, event, handler):
from frappe.utils.scheduler import log
traceback = ""
try:
frappe.init(site=site)
frappe.connect()
frappe.get_attr(handler)()
frappe.conn.commit()
delete_lock(handler)
except Exception:
traceback += log("Method: {event}, Handler: {handler}".format(event=event, handler=handler))
traceback += log(frappe.get_traceback())
logger.warn(traceback)
frappe.conn.rollback()
frappe.destroy()
# TODO: use logger
print 'ran {handler} for {site} for event: {event}'.format(handler=handler, site=site, event=event)
logger.info('ran {handler} for {site} for event: {event}'.format(handler=handler, site=site, event=event))
@app.task()
def enqueue_scheduler_events():
import frappe
from frappe.utils.scheduler import execute
logger.info('ran {handler} for {site} for event: {event}')
for site in get_sites():
execute(site)

View file

@ -2,6 +2,8 @@
# MIT License. See license.txt
from __future__ import unicode_literals
import logging
"""
Scheduler will call the following events from the module
`startup.schedule_handler` and Control Panel (for server scripts)
@ -17,7 +19,7 @@ on the need.
import frappe
import frappe.utils
from frappe.utils.file_lock import create_lock, check_lock, delete_lock
from frappe.utils.file_lock import create_lock, check_lock, delete_lock, LockTimeoutError
def execute(site):
"""
@ -26,10 +28,13 @@ def execute(site):
Database connection: Ideally it should be connected from outside, if there is
no connection, it will connect from defs.py
"""
logging.info('executing %', site)
frappe.init(site=site)
if check_lock('scheduler'):
try:
create_lock('scheduler')
except LockTimeoutError:
frappe.destroy()
return
create_lock('scheduler')
from datetime import datetime
format = '%Y-%m-%d %H:%M:%S'