Celery: Enqueue events for site

This commit is contained in:
Anand Doshi 2014-05-25 09:05:02 +05:30
parent e9d09a9d05
commit ceda8f2080

View file

@ -17,11 +17,11 @@ def sync_queues():
if shortjob_workers:
for worker in shortjob_workers:
sync_worker(app, worker)
if longjob_workers:
for worker in longjob_workers:
sync_worker(app, worker, prefix=LONGJOBS_PREFIX)
def get_workers(app):
longjob_workers = []
shortjob_workers = []
@ -32,7 +32,7 @@ def get_workers(app):
longjob_workers.append(worker)
else:
shortjob_workers.append(worker)
return shortjob_workers, longjob_workers
def sync_worker(app, worker, prefix=''):
@ -70,19 +70,19 @@ def scheduler_task(site, event, handler, now=False):
if not now:
frappe.connect(site=site)
frappe.get_attr(handler)()
except Exception:
frappe.db.rollback()
traceback = log(handler, "Method: {event}, Handler: {handler}".format(event=event, handler=handler))
task_logger.warn(traceback)
raise
else:
frappe.db.commit()
finally:
delete_lock(handler)
if not now:
frappe.destroy()
@ -91,8 +91,12 @@ def scheduler_task(site, event, handler, now=False):
@celery_task()
def enqueue_scheduler_events():
for site in get_sites():
enqueue_events_for_site.delay(site=site)
@celery_task()
def enqueue_events_for_site(site):
try:
frappe.connect(site=site)
try:
enqueue_events(site)
finally:
frappe.destroy()
enqueue_events(site)
finally:
frappe.destroy()