102 lines
2.8 KiB
Python
102 lines
2.8 KiB
Python
# Copyright (c) 2013, Web Notes Technologies Pvt. Ltd. and Contributors
|
|
# MIT License. See license.txt
|
|
|
|
from __future__ import unicode_literals
|
|
import frappe
|
|
from frappe.utils.scheduler import enqueue_events
|
|
from frappe.celery_app import get_celery, celery_task, task_logger, LONGJOBS_PREFIX
|
|
from frappe.cli import get_sites
|
|
from frappe.utils.file_lock import delete_lock
|
|
|
|
@celery_task()
|
|
def sync_queues():
|
|
"""notifies workers to monitor newly added sites"""
|
|
app = get_celery()
|
|
shortjob_workers, longjob_workers = get_workers(app)
|
|
|
|
if shortjob_workers:
|
|
for worker in shortjob_workers:
|
|
sync_worker(app, worker)
|
|
|
|
if longjob_workers:
|
|
for worker in longjob_workers:
|
|
sync_worker(app, worker, prefix=LONGJOBS_PREFIX)
|
|
|
|
def get_workers(app):
|
|
longjob_workers = []
|
|
shortjob_workers = []
|
|
|
|
active_queues = app.control.inspect().active_queues()
|
|
for worker in active_queues:
|
|
if worker.startswith(LONGJOBS_PREFIX):
|
|
longjob_workers.append(worker)
|
|
else:
|
|
shortjob_workers.append(worker)
|
|
|
|
return shortjob_workers, longjob_workers
|
|
|
|
def sync_worker(app, worker, prefix=''):
|
|
active_queues = set(get_active_queues(app, worker))
|
|
required_queues = set(get_required_queues(app, prefix=prefix))
|
|
to_add = required_queues - active_queues
|
|
to_remove = active_queues - required_queues
|
|
for queue in to_add:
|
|
app.control.broadcast('add_consumer', arguments={
|
|
'queue': queue
|
|
}, reply=True, destination=[worker])
|
|
for queue in to_remove:
|
|
app.control.broadcast('cancel_consumer', arguments={
|
|
'queue': queue
|
|
}, reply=True, destination=[worker])
|
|
|
|
|
|
def get_active_queues(app, worker):
|
|
active_queues = app.control.inspect().active_queues()
|
|
return [queue['name'] for queue in active_queues[worker]]
|
|
|
|
def get_required_queues(app, prefix=''):
|
|
ret = []
|
|
for site in get_sites():
|
|
ret.append('{}{}'.format(prefix, site))
|
|
ret.append(app.conf['CELERY_DEFAULT_QUEUE'])
|
|
return ret
|
|
|
|
@celery_task()
|
|
def scheduler_task(site, event, handler, now=False):
|
|
from frappe.utils.scheduler import log
|
|
traceback = ""
|
|
task_logger.info('running {handler} for {site} for event: {event}'.format(handler=handler, site=site, event=event))
|
|
try:
|
|
if not now:
|
|
frappe.connect(site=site)
|
|
frappe.get_attr(handler)()
|
|
|
|
except Exception:
|
|
frappe.db.rollback()
|
|
traceback = log(handler, "Method: {event}, Handler: {handler}".format(event=event, handler=handler))
|
|
task_logger.warn(traceback)
|
|
raise
|
|
|
|
else:
|
|
frappe.db.commit()
|
|
|
|
finally:
|
|
delete_lock(handler)
|
|
|
|
if not now:
|
|
frappe.destroy()
|
|
|
|
task_logger.info('ran {handler} for {site} for event: {event}'.format(handler=handler, site=site, event=event))
|
|
|
|
@celery_task()
|
|
def enqueue_scheduler_events():
|
|
for site in get_sites():
|
|
enqueue_events_for_site.delay(site=site)
|
|
|
|
@celery_task()
|
|
def enqueue_events_for_site(site):
|
|
try:
|
|
frappe.connect(site=site)
|
|
enqueue_events(site)
|
|
finally:
|
|
frappe.destroy()
|