diff --git a/frappe/celery_app.py b/frappe/celery_app.py index 25fdd96cb1..78ba1b5083 100644 --- a/frappe/celery_app.py +++ b/frappe/celery_app.py @@ -19,6 +19,7 @@ SITES_PATH = os.environ.get('SITES_PATH', '.') DEFAULT_CELERY_BROKER = "redis://localhost" DEFAULT_CELERY_BACKEND = None DEFAULT_SCHEDULER_INTERVAL = 300 +LONGJOBS_PREFIX = "longjobs@" _app = None def get_celery(): @@ -31,37 +32,49 @@ def get_celery(): backend=conf.celery_result_backend or DEFAULT_CELERY_BACKEND) setup_celery(_app, conf) - + return _app def setup_celery(app, conf): app.autodiscover_tasks(frappe.get_all_apps(with_frappe=True, with_internal_apps=False, sites_path=SITES_PATH)) app.conf.CELERY_TASK_SERIALIZER = 'json' app.conf.CELERY_ACCEPT_CONTENT = ['json'] - app.conf.CELERY_ROUTES = (SiteRouter(),) - app.conf.CELERYBEAT_SCHEDULE = get_beat_schedule(conf) app.conf.CELERY_TIMEZONE = 'UTC' + if conf.celery_queue_per_site: + app.conf.CELERY_ROUTES = (SiteRouter(),) + + app.conf.CELERYBEAT_SCHEDULE = get_beat_schedule(conf) + class SiteRouter(object): def route_for_task(self, task, args=None, kwargs=None): if hasattr(frappe.local, 'site'): - return { - 'queue': frappe.local.site - } + if kwargs and kwargs.get("event", "").endswith("_long"): + get_queue(frappe.local.site, LONGJOBS_PREFIX) + else: + get_queue(frappe.local.site) + return None - + +def get_queue(site, prefix=None): + return {'queue': "{}{}".format(prefix or "", site)} + def get_beat_schedule(conf): - return { - 'scheduler': { - 'task': 'frappe.tasks.enqueue_scheduler_events', - 'schedule': timedelta(seconds=conf.scheduler_interval or DEFAULT_SCHEDULER_INTERVAL) - }, - 'sync_queues': { - 'task': 'frappe.tasks.sync_queues', - 'schedule': timedelta(seconds=conf.scheduler_interval or DEFAULT_SCHEDULER_INTERVAL) - } + schedule = { + 'scheduler': { + 'task': 'frappe.tasks.enqueue_scheduler_events', + 'schedule': timedelta(seconds=conf.scheduler_interval or DEFAULT_SCHEDULER_INTERVAL) + }, } + if conf.celery_queue_per_site: + schedule['sync_queues'] = { + 'task': 'frappe.tasks.sync_queues', + 'schedule': timedelta(seconds=conf.scheduler_interval or DEFAULT_SCHEDULER_INTERVAL) + } + + return schedule + def celery_task(*args, **kwargs): return get_celery().task(*args, **kwargs) diff --git a/frappe/cli.py b/frappe/cli.py index 0f8b39e187..797558ab63 100755 --- a/frappe/cli.py +++ b/frappe/cli.py @@ -115,6 +115,8 @@ def setup_install(parser): help="path to directory with sites") parser.add_argument("--install_app", metavar="APP-NAME", nargs=1, help="Install a new app") + parser.add_argument("--add_to_installed_apps", metavar="APP-NAME", nargs="*", + help="Add these app(s) to Installed Apps") parser.add_argument("--root-password", nargs=1, help="Root password for new app") parser.add_argument("--reinstall", default=False, action="store_true", @@ -265,6 +267,16 @@ def install_app(app_name, verbose=False): install_app(app_name, verbose=verbose) frappe.destroy() +@cmd +def add_to_installed_apps(*apps): + from frappe.installer import add_to_installed_apps + frappe.connect() + all_apps = frappe.get_all_apps(with_frappe=True) + for each in apps: + if each in all_apps: + add_to_installed_apps(each) + frappe.destroy() + @cmd def reinstall(verbose=True): install(db_name=frappe.conf.db_name, verbose=verbose, force=True, reinstall=True) @@ -491,10 +503,10 @@ def celery(arg): frappe.destroy() @cmd -def run_scheduler_event(event): +def run_scheduler_event(event, force=False): import frappe.utils.scheduler frappe.connect() - result = frappe.utils.scheduler.trigger(frappe.local.site, event, now=True) + frappe.utils.scheduler.trigger(frappe.local.site, event, now=force) frappe.destroy() # replace diff --git a/frappe/hooks.txt b/frappe/hooks.txt index f51a8b4973..8888baacc9 100644 --- a/frappe/hooks.txt +++ b/frappe/hooks.txt @@ -23,11 +23,11 @@ website_clear_cache = frappe.templates.generators.website_group.clear_cache get_desktop_icons = frappe.manage.get_desktop_icons notification_config = frappe.core.notifications.get_notification_config -scheduler_event = all:frappe.utils.email_lib.bulk.flush -scheduler_event = daily:frappe.utils.email_lib.bulk.clear_outbox -scheduler_event = daily:frappe.core.doctype.notification_count.notification_count.delete_event_notification_count -scheduler_event = daily:frappe.core.doctype.event.event.send_event_digest -scheduler_event = hourly:frappe.templates.generator.website_group.clear_event_cache +scheduler_event:all = frappe.utils.email_lib.bulk.flush +scheduler_event:daily = frappe.utils.email_lib.bulk.clear_outbox +scheduler_event:daily = frappe.core.doctype.notification_count.notification_count.delete_event_notification_count +scheduler_event:daily = frappe.core.doctype.event.event.send_event_digest +scheduler_event:hourly = frappe.templates.generators.website_group.clear_event_cache # TODO # on_session_creation = frappe.auth.notify_administrator_login diff --git a/frappe/modules/patch_handler.py b/frappe/modules/patch_handler.py index c3e0629601..0ff2978246 100644 --- a/frappe/modules/patch_handler.py +++ b/frappe/modules/patch_handler.py @@ -104,7 +104,7 @@ def executed(patchmodule): else: done = frappe.db.get_value("Patch Log", {"patch": patchmodule}) if done: - print "Patch %s executed in %s" % (patchmodule, frappe.db.cur_db_name) + print "Patch %s already executed in %s" % (patchmodule, frappe.db.cur_db_name) return done def block_user(block): diff --git a/frappe/tasks.py b/frappe/tasks.py index 8a495102b9..0d87929412 100644 --- a/frappe/tasks.py +++ b/frappe/tasks.py @@ -4,17 +4,37 @@ from __future__ import unicode_literals import frappe from frappe.utils.scheduler import enqueue_events -from frappe.celery_app import get_celery, celery_task, task_logger +from frappe.celery_app import get_celery, celery_task, task_logger, get_queue, LONGJOBS_PREFIX from frappe.cli import get_sites from frappe.utils.file_lock import delete_lock @celery_task() def sync_queues(): """notifies workers to monitor newly added sites""" - sites = get_sites() app = get_celery() - for site in sites: - app.control.broadcast('add_consumer', arguments={'queue': site}, reply=True) + shortjob_workers, longjob_workers = get_workers(app) + + for site in get_sites(): + if shortjob_workers: + app.control.broadcast('add_consumer', arguments=get_queue(site), + reply=True, destination=shortjob_workers) + + if longjob_workers: + app.control.broadcast('add_consumer', arguments=get_queue(site, LONGJOBS_PREFIX), + reply=True, destination=longjob_workers) + +def get_workers(app): + longjob_workers = [] + shortjob_workers = [] + + active_queues = app.control.inspect().active_queues() + for worker in active_queues.keys(): + if worker.startswith(LONGJOBS_PREFIX): + longjob_workers.append(worker) + else: + shortjob_workers.append(worker) + + return shortjob_workers, longjob_workers @celery_task() def scheduler_task(site, event, handler, now=False): diff --git a/frappe/utils/__init__.py b/frappe/utils/__init__.py index 69a09d6cbf..b868ea14b9 100644 --- a/frappe/utils/__init__.py +++ b/frappe/utils/__init__.py @@ -817,7 +817,7 @@ def get_files_path(): return get_site_path("public", "files") def get_backups_path(): - return get_site_path("public", "backup") + return get_site_path("private", "backups") def get_url(uri=None): url = get_request_site_address() diff --git a/frappe/utils/scheduler.py b/frappe/utils/scheduler.py index 1d59dac2e8..aa2db5608a 100644 --- a/frappe/utils/scheduler.py +++ b/frappe/utils/scheduler.py @@ -55,13 +55,16 @@ def enqueue_applicable_events(site, nowtime, last): if nowtime.day != last.day: # if first task of the day execute daily tasks - trigger(site, 'daily') and _log("daily") + trigger(site, "daily") and _log("daily") + trigger(site, "daily_long") and _log("daily_long") if nowtime.month != last.month: trigger(site, "monthly") and _log("monthly") + trigger(site, "monthly_long") and _log("monthly_long") if nowtime.weekday()==0: trigger(site, "weekly") and _log("weekly") + trigger(site, "weekly_long") and _log("weekly_long") if nowtime.hour != last.hour: trigger(site, "hourly") and _log("hourly") @@ -75,19 +78,16 @@ def trigger(site, event, now=False): from frappe.tasks import scheduler_task sites_path = unicode(frappe.local.sites_path) - for scheduler_event in frappe.get_hooks().scheduler_event: - event_name, handler = scheduler_event.split(":") - if event==event_name and not check_lock(handler): + for handler in frappe.get_hooks("scheduler_event:{}".format(event)): + if not check_lock(handler): if not now: - result = scheduler_task.delay(site, event, handler) + scheduler_task.delay(site=site, event=event, handler=handler) create_lock(handler) else: frappe.init(site, sites_path=sites_path) create_lock(handler) - result = scheduler_task(site, event, handler) + scheduler_task(site=site, event=event, handler=handler) - return result - def log(method, message=None): """log error in patch_log""" message = frappe.utils.cstr(message) + "\n" if message else "" diff --git a/frappe/website/js/website.js b/frappe/website/js/website.js index af57bc8ac6..2c57f5c2a3 100644 --- a/frappe/website/js/website.js +++ b/frappe/website/js/website.js @@ -236,7 +236,7 @@ $.extend(frappe, { return // our custom logic - if (link.href.indexOf("cmd=")!==-1 || link.hasAttribute("dont-use-ajax")) + if (link.href.indexOf("cmd=")!==-1 || link.hasAttribute("no-pjax")) return event.preventDefault() @@ -461,7 +461,7 @@ $(document).ready(function() { // switch to app link if(getCookie("system_user")==="yes") { $("#website-post-login .dropdown-menu").append('
  • \ -
  • Switch To App
  • '); +
  • Switch To App
  • '); } frappe.render_user();