diff --git a/frappe/tasks.py b/frappe/tasks.py index df6e72eee3..b313078bd8 100644 --- a/frappe/tasks.py +++ b/frappe/tasks.py @@ -17,11 +17,11 @@ def sync_queues(): if shortjob_workers: for worker in shortjob_workers: sync_worker(app, worker) - + if longjob_workers: for worker in longjob_workers: sync_worker(app, worker, prefix=LONGJOBS_PREFIX) - + def get_workers(app): longjob_workers = [] shortjob_workers = [] @@ -32,7 +32,7 @@ def get_workers(app): longjob_workers.append(worker) else: shortjob_workers.append(worker) - + return shortjob_workers, longjob_workers def sync_worker(app, worker, prefix=''): @@ -70,19 +70,19 @@ def scheduler_task(site, event, handler, now=False): if not now: frappe.connect(site=site) frappe.get_attr(handler)() - + except Exception: frappe.db.rollback() traceback = log(handler, "Method: {event}, Handler: {handler}".format(event=event, handler=handler)) task_logger.warn(traceback) raise - + else: frappe.db.commit() finally: delete_lock(handler) - + if not now: frappe.destroy() @@ -91,8 +91,12 @@ def scheduler_task(site, event, handler, now=False): @celery_task() def enqueue_scheduler_events(): for site in get_sites(): + enqueue_events_for_site.delay(site=site) + +@celery_task() +def enqueue_events_for_site(site): + try: frappe.connect(site=site) - try: - enqueue_events(site) - finally: - frappe.destroy() + enqueue_events(site) + finally: + frappe.destroy()