diff --git a/frappe/celery.py b/frappe/celery_app.py similarity index 72% rename from frappe/celery.py rename to frappe/celery_app.py index eee71b48bd..b9b0534706 100644 --- a/frappe/celery.py +++ b/frappe/celery_app.py @@ -7,6 +7,15 @@ import os SITES_PATH = os.environ.get('SITES_PATH', '.') +class SiteRouter(object): + + def route_for_task(self, task, args=None, kwargs=None): + if hasattr(frappe.local, 'site'): + return { + 'queue': frappe.local.site + } + return None + def get_conf(): if hasattr(frappe.local, 'initialised'): return frappe.local.conf @@ -22,11 +31,16 @@ def get_app(): app.autodiscover_tasks(frappe.get_all_apps(with_frappe=True, with_internal_apps=False)) app.conf.CELERY_TASK_SERIALIZER = 'json' app.conf.CELERY_ACCEPT_CONTENT = ['json'] + app.conf.CELERY_ROUTES = (SiteRouter(),) app.conf.CELERYBEAT_SCHEDULE = { 'scheduler': { 'task': 'frappe.tasks.enqueue_scheduler_events', 'schedule': timedelta(seconds=conf['scheduler_interval']) }, + 'sync_queues': { + 'task': 'frappe.tasks.sync_queues', + 'schedule': timedelta(seconds=conf['scheduler_interval']) + }, } app.conf.CELERY_TIMEZONE = 'UTC' diff --git a/frappe/tasks.py b/frappe/tasks.py index 72581d6aa5..949586466e 100644 --- a/frappe/tasks.py +++ b/frappe/tasks.py @@ -1,29 +1,41 @@ from __future__ import unicode_literals -from frappe.celery import app +from frappe.celery_app import app import frappe from frappe.cli import get_sites from frappe.utils.file_lock import delete_lock +from celery.utils.log import get_task_logger + +logger = get_task_logger(__name__) + +@app.task() +def sync_queues(): + sites = get_sites() + for site in sites: + app.control.broadcast('add_consumer', arguments={'queue': site}, reply=True) + @app.task() def scheduler_task(site, event, handler): from frappe.utils.scheduler import log traceback = "" try: frappe.init(site=site) + frappe.connect() frappe.get_attr(handler)() frappe.conn.commit() delete_lock(handler) except Exception: traceback += log("Method: {event}, Handler: {handler}".format(event=event, handler=handler)) traceback += log(frappe.get_traceback()) + logger.warn(traceback) frappe.conn.rollback() frappe.destroy() - # TODO: use logger - print 'ran {handler} for {site} for event: {event}'.format(handler=handler, site=site, event=event) + logger.info('ran {handler} for {site} for event: {event}'.format(handler=handler, site=site, event=event)) @app.task() def enqueue_scheduler_events(): import frappe from frappe.utils.scheduler import execute + logger.info('ran {handler} for {site} for event: {event}') for site in get_sites(): execute(site) diff --git a/frappe/utils/scheduler.py b/frappe/utils/scheduler.py index 2ad40dcaf1..6c094d3bc8 100644 --- a/frappe/utils/scheduler.py +++ b/frappe/utils/scheduler.py @@ -2,6 +2,8 @@ # MIT License. See license.txt from __future__ import unicode_literals +import logging + """ Scheduler will call the following events from the module `startup.schedule_handler` and Control Panel (for server scripts) @@ -17,7 +19,7 @@ on the need. import frappe import frappe.utils -from frappe.utils.file_lock import create_lock, check_lock, delete_lock +from frappe.utils.file_lock import create_lock, check_lock, delete_lock, LockTimeoutError def execute(site): """ @@ -26,10 +28,13 @@ def execute(site): Database connection: Ideally it should be connected from outside, if there is no connection, it will connect from defs.py """ + logging.info('executing %', site) frappe.init(site=site) - if check_lock('scheduler'): + try: + create_lock('scheduler') + except LockTimeoutError: + frappe.destroy() return - create_lock('scheduler') from datetime import datetime format = '%Y-%m-%d %H:%M:%S'