Fixes in Celery and Scheduler Hooks
This commit is contained in:
parent
b18ab9c7df
commit
7abc5b5e41
8 changed files with 84 additions and 39 deletions
|
|
@ -19,6 +19,7 @@ SITES_PATH = os.environ.get('SITES_PATH', '.')
|
|||
DEFAULT_CELERY_BROKER = "redis://localhost"
|
||||
DEFAULT_CELERY_BACKEND = None
|
||||
DEFAULT_SCHEDULER_INTERVAL = 300
|
||||
LONGJOBS_PREFIX = "longjobs@"
|
||||
|
||||
_app = None
|
||||
def get_celery():
|
||||
|
|
@ -31,37 +32,49 @@ def get_celery():
|
|||
backend=conf.celery_result_backend or DEFAULT_CELERY_BACKEND)
|
||||
|
||||
setup_celery(_app, conf)
|
||||
|
||||
|
||||
return _app
|
||||
|
||||
def setup_celery(app, conf):
|
||||
app.autodiscover_tasks(frappe.get_all_apps(with_frappe=True, with_internal_apps=False, sites_path=SITES_PATH))
|
||||
app.conf.CELERY_TASK_SERIALIZER = 'json'
|
||||
app.conf.CELERY_ACCEPT_CONTENT = ['json']
|
||||
app.conf.CELERY_ROUTES = (SiteRouter(),)
|
||||
app.conf.CELERYBEAT_SCHEDULE = get_beat_schedule(conf)
|
||||
app.conf.CELERY_TIMEZONE = 'UTC'
|
||||
|
||||
if conf.celery_queue_per_site:
|
||||
app.conf.CELERY_ROUTES = (SiteRouter(),)
|
||||
|
||||
app.conf.CELERYBEAT_SCHEDULE = get_beat_schedule(conf)
|
||||
|
||||
class SiteRouter(object):
|
||||
def route_for_task(self, task, args=None, kwargs=None):
|
||||
if hasattr(frappe.local, 'site'):
|
||||
return {
|
||||
'queue': frappe.local.site
|
||||
}
|
||||
if kwargs and kwargs.get("event", "").endswith("_long"):
|
||||
get_queue(frappe.local.site, LONGJOBS_PREFIX)
|
||||
else:
|
||||
get_queue(frappe.local.site)
|
||||
|
||||
return None
|
||||
|
||||
|
||||
def get_queue(site, prefix=None):
|
||||
return {'queue': "{}{}".format(prefix or "", site)}
|
||||
|
||||
def get_beat_schedule(conf):
|
||||
return {
|
||||
'scheduler': {
|
||||
'task': 'frappe.tasks.enqueue_scheduler_events',
|
||||
'schedule': timedelta(seconds=conf.scheduler_interval or DEFAULT_SCHEDULER_INTERVAL)
|
||||
},
|
||||
'sync_queues': {
|
||||
'task': 'frappe.tasks.sync_queues',
|
||||
'schedule': timedelta(seconds=conf.scheduler_interval or DEFAULT_SCHEDULER_INTERVAL)
|
||||
}
|
||||
schedule = {
|
||||
'scheduler': {
|
||||
'task': 'frappe.tasks.enqueue_scheduler_events',
|
||||
'schedule': timedelta(seconds=conf.scheduler_interval or DEFAULT_SCHEDULER_INTERVAL)
|
||||
},
|
||||
}
|
||||
|
||||
if conf.celery_queue_per_site:
|
||||
schedule['sync_queues'] = {
|
||||
'task': 'frappe.tasks.sync_queues',
|
||||
'schedule': timedelta(seconds=conf.scheduler_interval or DEFAULT_SCHEDULER_INTERVAL)
|
||||
}
|
||||
|
||||
return schedule
|
||||
|
||||
def celery_task(*args, **kwargs):
|
||||
return get_celery().task(*args, **kwargs)
|
||||
|
||||
|
|
|
|||
|
|
@ -115,6 +115,8 @@ def setup_install(parser):
|
|||
help="path to directory with sites")
|
||||
parser.add_argument("--install_app", metavar="APP-NAME", nargs=1,
|
||||
help="Install a new app")
|
||||
parser.add_argument("--add_to_installed_apps", metavar="APP-NAME", nargs="*",
|
||||
help="Add these app(s) to Installed Apps")
|
||||
parser.add_argument("--root-password", nargs=1,
|
||||
help="Root password for new app")
|
||||
parser.add_argument("--reinstall", default=False, action="store_true",
|
||||
|
|
@ -265,6 +267,16 @@ def install_app(app_name, verbose=False):
|
|||
install_app(app_name, verbose=verbose)
|
||||
frappe.destroy()
|
||||
|
||||
@cmd
|
||||
def add_to_installed_apps(*apps):
|
||||
from frappe.installer import add_to_installed_apps
|
||||
frappe.connect()
|
||||
all_apps = frappe.get_all_apps(with_frappe=True)
|
||||
for each in apps:
|
||||
if each in all_apps:
|
||||
add_to_installed_apps(each)
|
||||
frappe.destroy()
|
||||
|
||||
@cmd
|
||||
def reinstall(verbose=True):
|
||||
install(db_name=frappe.conf.db_name, verbose=verbose, force=True, reinstall=True)
|
||||
|
|
@ -491,10 +503,10 @@ def celery(arg):
|
|||
frappe.destroy()
|
||||
|
||||
@cmd
|
||||
def run_scheduler_event(event):
|
||||
def run_scheduler_event(event, force=False):
|
||||
import frappe.utils.scheduler
|
||||
frappe.connect()
|
||||
result = frappe.utils.scheduler.trigger(frappe.local.site, event, now=True)
|
||||
frappe.utils.scheduler.trigger(frappe.local.site, event, now=force)
|
||||
frappe.destroy()
|
||||
|
||||
# replace
|
||||
|
|
|
|||
|
|
@ -23,11 +23,11 @@ website_clear_cache = frappe.templates.generators.website_group.clear_cache
|
|||
get_desktop_icons = frappe.manage.get_desktop_icons
|
||||
notification_config = frappe.core.notifications.get_notification_config
|
||||
|
||||
scheduler_event = all:frappe.utils.email_lib.bulk.flush
|
||||
scheduler_event = daily:frappe.utils.email_lib.bulk.clear_outbox
|
||||
scheduler_event = daily:frappe.core.doctype.notification_count.notification_count.delete_event_notification_count
|
||||
scheduler_event = daily:frappe.core.doctype.event.event.send_event_digest
|
||||
scheduler_event = hourly:frappe.templates.generator.website_group.clear_event_cache
|
||||
scheduler_event:all = frappe.utils.email_lib.bulk.flush
|
||||
scheduler_event:daily = frappe.utils.email_lib.bulk.clear_outbox
|
||||
scheduler_event:daily = frappe.core.doctype.notification_count.notification_count.delete_event_notification_count
|
||||
scheduler_event:daily = frappe.core.doctype.event.event.send_event_digest
|
||||
scheduler_event:hourly = frappe.templates.generators.website_group.clear_event_cache
|
||||
|
||||
# TODO
|
||||
# on_session_creation = frappe.auth.notify_administrator_login
|
||||
|
|
|
|||
|
|
@ -104,7 +104,7 @@ def executed(patchmodule):
|
|||
else:
|
||||
done = frappe.db.get_value("Patch Log", {"patch": patchmodule})
|
||||
if done:
|
||||
print "Patch %s executed in %s" % (patchmodule, frappe.db.cur_db_name)
|
||||
print "Patch %s already executed in %s" % (patchmodule, frappe.db.cur_db_name)
|
||||
return done
|
||||
|
||||
def block_user(block):
|
||||
|
|
|
|||
|
|
@ -4,17 +4,37 @@
|
|||
from __future__ import unicode_literals
|
||||
import frappe
|
||||
from frappe.utils.scheduler import enqueue_events
|
||||
from frappe.celery_app import get_celery, celery_task, task_logger
|
||||
from frappe.celery_app import get_celery, celery_task, task_logger, get_queue, LONGJOBS_PREFIX
|
||||
from frappe.cli import get_sites
|
||||
from frappe.utils.file_lock import delete_lock
|
||||
|
||||
@celery_task()
|
||||
def sync_queues():
|
||||
"""notifies workers to monitor newly added sites"""
|
||||
sites = get_sites()
|
||||
app = get_celery()
|
||||
for site in sites:
|
||||
app.control.broadcast('add_consumer', arguments={'queue': site}, reply=True)
|
||||
shortjob_workers, longjob_workers = get_workers(app)
|
||||
|
||||
for site in get_sites():
|
||||
if shortjob_workers:
|
||||
app.control.broadcast('add_consumer', arguments=get_queue(site),
|
||||
reply=True, destination=shortjob_workers)
|
||||
|
||||
if longjob_workers:
|
||||
app.control.broadcast('add_consumer', arguments=get_queue(site, LONGJOBS_PREFIX),
|
||||
reply=True, destination=longjob_workers)
|
||||
|
||||
def get_workers(app):
|
||||
longjob_workers = []
|
||||
shortjob_workers = []
|
||||
|
||||
active_queues = app.control.inspect().active_queues()
|
||||
for worker in active_queues.keys():
|
||||
if worker.startswith(LONGJOBS_PREFIX):
|
||||
longjob_workers.append(worker)
|
||||
else:
|
||||
shortjob_workers.append(worker)
|
||||
|
||||
return shortjob_workers, longjob_workers
|
||||
|
||||
@celery_task()
|
||||
def scheduler_task(site, event, handler, now=False):
|
||||
|
|
|
|||
|
|
@ -817,7 +817,7 @@ def get_files_path():
|
|||
return get_site_path("public", "files")
|
||||
|
||||
def get_backups_path():
|
||||
return get_site_path("public", "backup")
|
||||
return get_site_path("private", "backups")
|
||||
|
||||
def get_url(uri=None):
|
||||
url = get_request_site_address()
|
||||
|
|
|
|||
|
|
@ -55,13 +55,16 @@ def enqueue_applicable_events(site, nowtime, last):
|
|||
|
||||
if nowtime.day != last.day:
|
||||
# if first task of the day execute daily tasks
|
||||
trigger(site, 'daily') and _log("daily")
|
||||
trigger(site, "daily") and _log("daily")
|
||||
trigger(site, "daily_long") and _log("daily_long")
|
||||
|
||||
if nowtime.month != last.month:
|
||||
trigger(site, "monthly") and _log("monthly")
|
||||
trigger(site, "monthly_long") and _log("monthly_long")
|
||||
|
||||
if nowtime.weekday()==0:
|
||||
trigger(site, "weekly") and _log("weekly")
|
||||
trigger(site, "weekly_long") and _log("weekly_long")
|
||||
|
||||
if nowtime.hour != last.hour:
|
||||
trigger(site, "hourly") and _log("hourly")
|
||||
|
|
@ -75,19 +78,16 @@ def trigger(site, event, now=False):
|
|||
from frappe.tasks import scheduler_task
|
||||
sites_path = unicode(frappe.local.sites_path)
|
||||
|
||||
for scheduler_event in frappe.get_hooks().scheduler_event:
|
||||
event_name, handler = scheduler_event.split(":")
|
||||
if event==event_name and not check_lock(handler):
|
||||
for handler in frappe.get_hooks("scheduler_event:{}".format(event)):
|
||||
if not check_lock(handler):
|
||||
if not now:
|
||||
result = scheduler_task.delay(site, event, handler)
|
||||
scheduler_task.delay(site=site, event=event, handler=handler)
|
||||
create_lock(handler)
|
||||
else:
|
||||
frappe.init(site, sites_path=sites_path)
|
||||
create_lock(handler)
|
||||
result = scheduler_task(site, event, handler)
|
||||
scheduler_task(site=site, event=event, handler=handler)
|
||||
|
||||
return result
|
||||
|
||||
def log(method, message=None):
|
||||
"""log error in patch_log"""
|
||||
message = frappe.utils.cstr(message) + "\n" if message else ""
|
||||
|
|
|
|||
|
|
@ -236,7 +236,7 @@ $.extend(frappe, {
|
|||
return
|
||||
|
||||
// our custom logic
|
||||
if (link.href.indexOf("cmd=")!==-1 || link.hasAttribute("dont-use-ajax"))
|
||||
if (link.href.indexOf("cmd=")!==-1 || link.hasAttribute("no-pjax"))
|
||||
return
|
||||
|
||||
event.preventDefault()
|
||||
|
|
@ -461,7 +461,7 @@ $(document).ready(function() {
|
|||
// switch to app link
|
||||
if(getCookie("system_user")==="yes") {
|
||||
$("#website-post-login .dropdown-menu").append('<li class="divider"></li>\
|
||||
<li><a href="/app" dont-use-ajax><i class="icon-fixed-width icon-th-large"></i> Switch To App</a></li>');
|
||||
<li><a href="/app" no-pjax><i class="icon-fixed-width icon-th-large"></i> Switch To App</a></li>');
|
||||
}
|
||||
|
||||
frappe.render_user();
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue