diff --git a/frappe/async.py b/frappe/async.py index 088407e287..7c77c799f6 100644 --- a/frappe/async.py +++ b/frappe/async.py @@ -17,7 +17,6 @@ END_LINE = '' TASK_LOG_MAX_AGE = 86400 # 1 day in seconds redis_server = None - def handler(f): cmd = f.__module__ + '.' + f.__name__ @@ -27,8 +26,8 @@ def handler(f): if frappe.conf.disable_async: return execute_cmd(cmd, from_async=True) args = frappe._dict(args) - task = run_async_task.delay(frappe.local.site, - (frappe.session and frappe.session.user) or 'Administrator', cmd, args) + task = run_async_task.delay(site=frappe.local.site, + user=(frappe.session and frappe.session.user) or 'Administrator', cmd=cmd, form_dict=args) if set_in_response: frappe.local.response['task_id'] = task.id return task.id @@ -39,9 +38,9 @@ def handler(f): from frappe.handler import execute_cmd if frappe.conf.disable_async: return execute_cmd(cmd, from_async=True) - task = run_async_task.delay(frappe.local.site, - (frappe.session and frappe.session.user) or 'Administrator', cmd, - frappe.local.form_dict) + task = run_async_task.delay(site=frappe.local.site, + user=(frappe.session and frappe.session.user) or 'Administrator', cmd=cmd, + form_dict=frappe.local.form_dict) frappe.local.response['task_id'] = task.id return { "status": "queued", @@ -55,19 +54,6 @@ def handler(f): return queue -def run_async_task(method, args, reference_doctype=None, reference_name=None, set_in_response=True): - if frappe.local.request and frappe.local.request.method == "GET": - frappe.throw("Cannot run task in a GET request") - task_id = method.run(args, set_in_response=set_in_response) - task = frappe.new_doc("Async Task") - task.celery_task_id = task_id - task.status = "Queued" - task.reference_doctype = reference_doctype - task.reference_name = reference_name - task.save() - return task_id - - @frappe.whitelist() def get_pending_tasks_for_doc(doctype, docname): return frappe.db.sql_list("select name from `tabAsync Task` where status in ('Queued', 'Running') and reference_doctype='%s' and reference_name='%s'" % (doctype, docname)) @@ -76,10 +62,9 @@ def get_pending_tasks_for_doc(doctype, docname): @handler def ping(): from time import sleep - sleep(6) + sleep(1) return "pong" - @frappe.whitelist() def get_task_status(task_id): from frappe.celery_app import get_celery @@ -93,7 +78,6 @@ def get_task_status(task_id): def set_task_status(task_id, status, response=None): - frappe.db.set_value("Async Task", task_id, "status", status) if not response: response = {} response.update({ diff --git a/frappe/celery_app.py b/frappe/celery_app.py index 1165898c66..49934a4dc2 100644 --- a/frappe/celery_app.py +++ b/frappe/celery_app.py @@ -10,8 +10,9 @@ task_logger = get_task_logger(__name__) from datetime import timedelta import frappe -import json import os +import threading +import time SITES_PATH = os.environ.get('SITES_PATH', '.') @@ -26,35 +27,36 @@ _app = None def get_celery(): global _app if not _app: - conf = frappe.get_site_config(sites_path=SITES_PATH) - - _app = Celery('frappe', - broker=conf.celery_broker or DEFAULT_CELERY_BROKER, - backend=conf.async_redis_server or DEFAULT_CELERY_BACKEND) - - setup_celery(_app, conf) - + _app = get_celery_app() return _app - -def setup_celery(app, conf): + +def get_celery_app(): + conf = frappe.get_site_config(sites_path=SITES_PATH) + app = Celery('frappe', + broker=conf.celery_broker or DEFAULT_CELERY_BROKER, + backend=conf.async_redis_server or DEFAULT_CELERY_BACKEND) + app.autodiscover_tasks(frappe.get_all_apps(with_frappe=True, with_internal_apps=False, sites_path=SITES_PATH)) + app.conf.CELERY_SEND_EVENTS = True app.conf.CELERY_TASK_SERIALIZER = 'json' app.conf.CELERY_ACCEPT_CONTENT = ['json'] app.conf.CELERY_TIMEZONE = 'UTC' app.conf.CELERY_RESULT_SERIALIZER = 'json' - app.CELERY_TASK_RESULT_EXPIRES = timedelta(0, 3600) - + app.conf.CELERY_TASK_RESULT_EXPIRES = timedelta(0, 3600) + if conf.celery_queue_per_site: app.conf.CELERY_ROUTES = (SiteRouter(), AsyncTaskRouter()) - + app.conf.CELERYBEAT_SCHEDULE = get_beat_schedule(conf) if conf.celery_error_emails: app.conf.CELERY_SEND_TASK_ERROR_EMAILS = True for k, v in conf.celery_error_emails.iteritems(): setattr(app.conf, k, v) - + + return app + class SiteRouter(object): def route_for_task(self, task, args=None, kwargs=None): if hasattr(frappe.local, 'site'): @@ -62,17 +64,17 @@ class SiteRouter(object): return get_queue(frappe.local.site, LONGJOBS_PREFIX) else: return get_queue(frappe.local.site) - + return None class AsyncTaskRouter(object): def route_for_task(self, task, args=None, kwargs=None): if task == "frappe.tasks.run_async_task" and hasattr(frappe.local, 'site'): return get_queue(frappe.local.site, ASYNC_TASKS_PREFIX) - + def get_queue(site, prefix=None): return {'queue': "{}{}".format(prefix or "", site)} - + def get_beat_schedule(conf): schedule = { 'scheduler': { @@ -80,17 +82,116 @@ def get_beat_schedule(conf): 'schedule': timedelta(seconds=conf.scheduler_interval or DEFAULT_SCHEDULER_INTERVAL) }, } - + if conf.celery_queue_per_site: schedule['sync_queues'] = { 'task': 'frappe.tasks.sync_queues', 'schedule': timedelta(seconds=conf.scheduler_interval or DEFAULT_SCHEDULER_INTERVAL) } - + return schedule - + def celery_task(*args, **kwargs): - return get_celery().task(*args, **kwargs) - + func = get_celery().task(*args, **kwargs) + return func + +def make_async_task(args): + task = frappe.new_doc("Async Task") + task.update(args) + task.status = "Queued" + task.set_docstatus_user_and_timestamp() + task.db_insert() + +def run_test(): + result = test.delay(site=frappe.local.site) + result = result.get(propagate=False) + print result + +@celery_task() +def test(site=None): + time.sleep(2) + raise Exception + print "task" + +class MonitorThread(object): + def __init__(self, celery_app, interval=1): + self.celery_app = celery_app + self.interval = interval + + self.state = self.celery_app.events.State() + + self.thread = threading.Thread(target=self.run, args=()) + self.thread.daemon = True + self.thread.start() + + def catchall(self, event): + if event['type'] != 'worker-heartbeat': + self.state.event(event) + + if not 'uuid' in event: + return + + task = self.state.tasks.get(event['uuid']) + info = task.info() + + if 'enqueue_events_for_site' in event['name']: + # don't log enqueue events + return + + try: + kwargs = eval(info.get('kwargs')) + + if 'site' in kwargs: + frappe.connect(kwargs['site']) + + if event['type']=='task-received': + make_async_task({'name': event['uuid'], 'task_name': event['name']}) + + if event['type']=='task-succeeded': + task = frappe.get_doc("Async Task", event['uuid']) + task.status = 'Succeeded' + task.result = info.get('result') + task.runtime = info.get('runtime') + task.set_docstatus_user_and_timestamp() + task.db_update() + + if event['type']=='task-failed': + task = frappe.get_doc("Async Task", event['uuid']) + task.status = 'Failed' + task.traceback = event.get('traceback') or event.get('exception') + task.traceback = frappe.as_json(info) + "\n\n" + task.traceback + task.runtime = info.get('runtime') + task.set_docstatus_user_and_timestamp() + task.db_update() + + frappe.db.commit() + except Exception: + print frappe.get_traceback() + finally: + frappe.destroy() + + + def run(self): + + while True: + try: + with self.celery_app.connection() as connection: + recv = self.celery_app.events.Receiver(connection, handlers={ + '*': self.catchall + }) + recv.capture(limit=None, timeout=None, wakeup=True) + + except (KeyboardInterrupt, SystemExit): + raise + + except Exception: + # unable to capture + pass + + time.sleep(self.interval) + + if __name__ == '__main__': - get_celery().start() + app = get_celery() + MonitorThread(app) + app.start() diff --git a/frappe/core/doctype/async_task/async_task.json b/frappe/core/doctype/async_task/async_task.json index 2223088826..330596ccd2 100644 --- a/frappe/core/doctype/async_task/async_task.json +++ b/frappe/core/doctype/async_task/async_task.json @@ -2,35 +2,13 @@ "allow_copy": 0, "allow_import": 0, "allow_rename": 0, - "autoname": "field:celery_task_id", + "autoname": "", "creation": "2015-07-03 11:28:03.496346", "custom": 0, "docstatus": 0, "doctype": "DocType", "document_type": "Document", "fields": [ - { - "allow_on_submit": 0, - "bold": 0, - "collapsible": 0, - "fieldname": "celery_task_id", - "fieldtype": "Data", - "hidden": 0, - "ignore_user_permissions": 0, - "in_filter": 0, - "in_list_view": 1, - "label": "Celery Task ID", - "no_copy": 0, - "permlevel": 0, - "precision": "", - "print_hide": 0, - "read_only": 0, - "report_hide": 0, - "reqd": 0, - "search_index": 0, - "set_only_once": 0, - "unique": 0 - }, { "allow_on_submit": 0, "bold": 0, @@ -43,7 +21,116 @@ "in_list_view": 0, "label": "Status", "no_copy": 0, - "options": "\nQueued\nRunning\nFinished\nFailed\n", + "options": "\nQueued\nRunning\nSucceeded\nFailed\n", + "permlevel": 0, + "precision": "", + "print_hide": 0, + "read_only": 1, + "report_hide": 0, + "reqd": 0, + "search_index": 0, + "set_only_once": 0, + "unique": 0 + }, + { + "allow_on_submit": 0, + "bold": 0, + "collapsible": 0, + "fieldname": "task_name", + "fieldtype": "Data", + "hidden": 0, + "ignore_user_permissions": 0, + "in_filter": 0, + "in_list_view": 0, + "label": "Task Name", + "no_copy": 0, + "permlevel": 0, + "precision": "", + "print_hide": 0, + "read_only": 1, + "report_hide": 0, + "reqd": 0, + "search_index": 0, + "set_only_once": 0, + "unique": 0 + }, + { + "allow_on_submit": 0, + "bold": 0, + "collapsible": 0, + "fieldname": "runtime", + "fieldtype": "Data", + "hidden": 0, + "ignore_user_permissions": 0, + "in_filter": 0, + "in_list_view": 1, + "label": "Runtime", + "no_copy": 0, + "permlevel": 0, + "precision": "", + "print_hide": 0, + "read_only": 1, + "report_hide": 0, + "reqd": 0, + "search_index": 0, + "set_only_once": 0, + "unique": 0 + }, + { + "allow_on_submit": 0, + "bold": 0, + "collapsible": 0, + "fieldname": "result", + "fieldtype": "Code", + "hidden": 0, + "ignore_user_permissions": 0, + "in_filter": 0, + "in_list_view": 0, + "label": "Result", + "no_copy": 0, + "permlevel": 0, + "precision": "", + "print_hide": 0, + "read_only": 1, + "report_hide": 0, + "reqd": 0, + "search_index": 0, + "set_only_once": 0, + "unique": 0 + }, + { + "allow_on_submit": 0, + "bold": 0, + "collapsible": 0, + "fieldname": "traceback", + "fieldtype": "Code", + "hidden": 0, + "ignore_user_permissions": 0, + "in_filter": 0, + "in_list_view": 0, + "label": "Traceback", + "no_copy": 0, + "permlevel": 0, + "precision": "", + "print_hide": 0, + "read_only": 1, + "report_hide": 0, + "reqd": 0, + "search_index": 0, + "set_only_once": 0, + "unique": 0 + }, + { + "allow_on_submit": 0, + "bold": 0, + "collapsible": 0, + "fieldname": "section_break_6", + "fieldtype": "Section Break", + "hidden": 0, + "ignore_user_permissions": 0, + "in_filter": 0, + "in_list_view": 0, + "no_copy": 0, "permlevel": 0, "precision": "", "print_hide": 0, @@ -54,50 +141,6 @@ "set_only_once": 0, "unique": 0 }, - { - "allow_on_submit": 0, - "bold": 0, - "collapsible": 0, - "fieldname": "stdout", - "fieldtype": "Long Text", - "hidden": 0, - "ignore_user_permissions": 0, - "in_filter": 0, - "in_list_view": 0, - "label": "stdout", - "no_copy": 0, - "permlevel": 0, - "precision": "", - "print_hide": 0, - "read_only": 1, - "report_hide": 0, - "reqd": 0, - "search_index": 0, - "set_only_once": 0, - "unique": 0 - }, - { - "allow_on_submit": 0, - "bold": 0, - "collapsible": 0, - "fieldname": "stderr", - "fieldtype": "Long Text", - "hidden": 0, - "ignore_user_permissions": 0, - "in_filter": 0, - "in_list_view": 0, - "label": "stderr", - "no_copy": 0, - "permlevel": 0, - "precision": "", - "print_hide": 0, - "read_only": 1, - "report_hide": 0, - "reqd": 0, - "search_index": 0, - "set_only_once": 0, - "unique": 0 - }, { "allow_on_submit": 0, "bold": 0, @@ -114,7 +157,7 @@ "permlevel": 0, "precision": "", "print_hide": 0, - "read_only": 0, + "read_only": 1, "report_hide": 0, "reqd": 0, "search_index": 0, @@ -137,7 +180,7 @@ "permlevel": 0, "precision": "", "print_hide": 0, - "read_only": 0, + "read_only": 1, "report_hide": 0, "reqd": 0, "search_index": 0, @@ -152,7 +195,7 @@ "is_submittable": 0, "issingle": 0, "istable": 0, - "modified": "2015-07-28 16:18:11.344349", + "modified": "2015-09-07 08:08:22.193911", "modified_by": "Administrator", "module": "Core", "name": "Async Task", @@ -183,5 +226,6 @@ "read_only": 0, "read_only_onload": 0, "sort_field": "modified", - "sort_order": "DESC" + "sort_order": "DESC", + "title_field": "task_name" } \ No newline at end of file diff --git a/frappe/core/doctype/async_task/async_task_list.js b/frappe/core/doctype/async_task/async_task_list.js new file mode 100644 index 0000000000..699da13084 --- /dev/null +++ b/frappe/core/doctype/async_task/async_task_list.js @@ -0,0 +1,10 @@ +frappe.listview_settings['Async Task'] = { + add_fields: ["status"], + get_indicator: function(doc) { + if(doc.status==="Succeeded") { + return [__("Succeeded"), "green", "status,=,Succeeded"]; + } else if(doc.status==="Failed") { + return [__("Failed"), "red", "status,=,Failed"]; + } + } +}; diff --git a/frappe/core/page/data_import_tool/importer.py b/frappe/core/page/data_import_tool/importer.py index 5bd5d73a91..e32fe9e1c2 100644 --- a/frappe/core/page/data_import_tool/importer.py +++ b/frappe/core/page/data_import_tool/importer.py @@ -15,8 +15,8 @@ from frappe.utils.dateutils import parse_date from frappe.utils import cint, cstr, flt from frappe.core.page.data_import_tool.data_import_tool import get_data_keys -#@frappe.async.handler -@frappe.whitelist() +@frappe.async.handler +#frappe.whitelist() def upload(rows = None, submit_after_import=None, ignore_encoding_errors=False, overwrite=None, ignore_links=False, pre_process=None): """upload data""" diff --git a/frappe/model/document.py b/frappe/model/document.py index c945c3394b..0e3ab3a72d 100644 --- a/frappe/model/document.py +++ b/frappe/model/document.py @@ -167,7 +167,7 @@ class Document(BaseDocument): self.check_permission("create") self._set_defaults() - self._set_docstatus_user_and_timestamp() + self.set_docstatus_user_and_timestamp() self.check_if_latest() self.run_method("before_insert") self.set_new_name() @@ -218,7 +218,7 @@ class Document(BaseDocument): self.check_permission("write", "save") - self._set_docstatus_user_and_timestamp() + self.set_docstatus_user_and_timestamp() self.check_if_latest() self.set_parent_in_children() self.validate_higher_perm_levels() @@ -298,7 +298,7 @@ class Document(BaseDocument): if self.doctype in frappe.db.value_cache: del frappe.db.value_cache[self.doctype] - def _set_docstatus_user_and_timestamp(self): + def set_docstatus_user_and_timestamp(self): self._original_modified = self.modified self.modified = now() self.modified_by = frappe.session.user diff --git a/frappe/public/js/frappe/socket.js b/frappe/public/js/frappe/socket.js index 511cf28e68..e088a9138d 100644 --- a/frappe/public/js/frappe/socket.js +++ b/frappe/public/js/frappe/socket.js @@ -127,5 +127,7 @@ frappe.socket = { frappe.provide("frappe.realtime"); frappe.realtime.on = function(event, callback) { - frappe.socket.socket.on(event, callback); + if(frappe.socket.socket) { + frappe.socket.socket.on(event, callback); + } } diff --git a/frappe/tasks.py b/frappe/tasks.py index 467415a93b..a3732c8da6 100644 --- a/frappe/tasks.py +++ b/frappe/tasks.py @@ -137,7 +137,7 @@ def pull_from_email_account(site, email_account): frappe.destroy() @celery_task(bind=True) -def run_async_task(self, site, user, cmd, form_dict): +def run_async_task(self, site=None, user=None, cmd=None, form_dict=None): ret = {} frappe.init(site) frappe.connect() diff --git a/frappe/templates/pages/desk.html b/frappe/templates/pages/desk.html index 6887deb16c..78e14b37bb 100644 --- a/frappe/templates/pages/desk.html +++ b/frappe/templates/pages/desk.html @@ -19,7 +19,7 @@ {%- endfor -%}
-