[enhancement] add celery task to Async Task for logs, tracking

This commit is contained in:
Rushabh Mehta 2015-09-07 18:09:25 +05:30
parent cd53c77587
commit eb4e622067
10 changed files with 269 additions and 128 deletions

View file

@ -17,7 +17,6 @@ END_LINE = '<!-- frappe: end-file -->'
TASK_LOG_MAX_AGE = 86400 # 1 day in seconds
redis_server = None
def handler(f):
cmd = f.__module__ + '.' + f.__name__
@ -27,8 +26,8 @@ def handler(f):
if frappe.conf.disable_async:
return execute_cmd(cmd, from_async=True)
args = frappe._dict(args)
task = run_async_task.delay(frappe.local.site,
(frappe.session and frappe.session.user) or 'Administrator', cmd, args)
task = run_async_task.delay(site=frappe.local.site,
user=(frappe.session and frappe.session.user) or 'Administrator', cmd=cmd, form_dict=args)
if set_in_response:
frappe.local.response['task_id'] = task.id
return task.id
@ -39,9 +38,9 @@ def handler(f):
from frappe.handler import execute_cmd
if frappe.conf.disable_async:
return execute_cmd(cmd, from_async=True)
task = run_async_task.delay(frappe.local.site,
(frappe.session and frappe.session.user) or 'Administrator', cmd,
frappe.local.form_dict)
task = run_async_task.delay(site=frappe.local.site,
user=(frappe.session and frappe.session.user) or 'Administrator', cmd=cmd,
form_dict=frappe.local.form_dict)
frappe.local.response['task_id'] = task.id
return {
"status": "queued",
@ -55,19 +54,6 @@ def handler(f):
return queue
def run_async_task(method, args, reference_doctype=None, reference_name=None, set_in_response=True):
if frappe.local.request and frappe.local.request.method == "GET":
frappe.throw("Cannot run task in a GET request")
task_id = method.run(args, set_in_response=set_in_response)
task = frappe.new_doc("Async Task")
task.celery_task_id = task_id
task.status = "Queued"
task.reference_doctype = reference_doctype
task.reference_name = reference_name
task.save()
return task_id
@frappe.whitelist()
def get_pending_tasks_for_doc(doctype, docname):
return frappe.db.sql_list("select name from `tabAsync Task` where status in ('Queued', 'Running') and reference_doctype='%s' and reference_name='%s'" % (doctype, docname))
@ -76,10 +62,9 @@ def get_pending_tasks_for_doc(doctype, docname):
@handler
def ping():
from time import sleep
sleep(6)
sleep(1)
return "pong"
@frappe.whitelist()
def get_task_status(task_id):
from frappe.celery_app import get_celery
@ -93,7 +78,6 @@ def get_task_status(task_id):
def set_task_status(task_id, status, response=None):
frappe.db.set_value("Async Task", task_id, "status", status)
if not response:
response = {}
response.update({

View file

@ -10,8 +10,9 @@ task_logger = get_task_logger(__name__)
from datetime import timedelta
import frappe
import json
import os
import threading
import time
SITES_PATH = os.environ.get('SITES_PATH', '.')
@ -26,35 +27,36 @@ _app = None
def get_celery():
global _app
if not _app:
conf = frappe.get_site_config(sites_path=SITES_PATH)
_app = Celery('frappe',
broker=conf.celery_broker or DEFAULT_CELERY_BROKER,
backend=conf.async_redis_server or DEFAULT_CELERY_BACKEND)
setup_celery(_app, conf)
_app = get_celery_app()
return _app
def setup_celery(app, conf):
def get_celery_app():
conf = frappe.get_site_config(sites_path=SITES_PATH)
app = Celery('frappe',
broker=conf.celery_broker or DEFAULT_CELERY_BROKER,
backend=conf.async_redis_server or DEFAULT_CELERY_BACKEND)
app.autodiscover_tasks(frappe.get_all_apps(with_frappe=True, with_internal_apps=False,
sites_path=SITES_PATH))
app.conf.CELERY_SEND_EVENTS = True
app.conf.CELERY_TASK_SERIALIZER = 'json'
app.conf.CELERY_ACCEPT_CONTENT = ['json']
app.conf.CELERY_TIMEZONE = 'UTC'
app.conf.CELERY_RESULT_SERIALIZER = 'json'
app.CELERY_TASK_RESULT_EXPIRES = timedelta(0, 3600)
app.conf.CELERY_TASK_RESULT_EXPIRES = timedelta(0, 3600)
if conf.celery_queue_per_site:
app.conf.CELERY_ROUTES = (SiteRouter(), AsyncTaskRouter())
app.conf.CELERYBEAT_SCHEDULE = get_beat_schedule(conf)
if conf.celery_error_emails:
app.conf.CELERY_SEND_TASK_ERROR_EMAILS = True
for k, v in conf.celery_error_emails.iteritems():
setattr(app.conf, k, v)
return app
class SiteRouter(object):
def route_for_task(self, task, args=None, kwargs=None):
if hasattr(frappe.local, 'site'):
@ -62,17 +64,17 @@ class SiteRouter(object):
return get_queue(frappe.local.site, LONGJOBS_PREFIX)
else:
return get_queue(frappe.local.site)
return None
class AsyncTaskRouter(object):
def route_for_task(self, task, args=None, kwargs=None):
if task == "frappe.tasks.run_async_task" and hasattr(frappe.local, 'site'):
return get_queue(frappe.local.site, ASYNC_TASKS_PREFIX)
def get_queue(site, prefix=None):
return {'queue': "{}{}".format(prefix or "", site)}
def get_beat_schedule(conf):
schedule = {
'scheduler': {
@ -80,17 +82,116 @@ def get_beat_schedule(conf):
'schedule': timedelta(seconds=conf.scheduler_interval or DEFAULT_SCHEDULER_INTERVAL)
},
}
if conf.celery_queue_per_site:
schedule['sync_queues'] = {
'task': 'frappe.tasks.sync_queues',
'schedule': timedelta(seconds=conf.scheduler_interval or DEFAULT_SCHEDULER_INTERVAL)
}
return schedule
def celery_task(*args, **kwargs):
return get_celery().task(*args, **kwargs)
func = get_celery().task(*args, **kwargs)
return func
def make_async_task(args):
task = frappe.new_doc("Async Task")
task.update(args)
task.status = "Queued"
task.set_docstatus_user_and_timestamp()
task.db_insert()
def run_test():
result = test.delay(site=frappe.local.site)
result = result.get(propagate=False)
print result
@celery_task()
def test(site=None):
time.sleep(2)
raise Exception
print "task"
class MonitorThread(object):
def __init__(self, celery_app, interval=1):
self.celery_app = celery_app
self.interval = interval
self.state = self.celery_app.events.State()
self.thread = threading.Thread(target=self.run, args=())
self.thread.daemon = True
self.thread.start()
def catchall(self, event):
if event['type'] != 'worker-heartbeat':
self.state.event(event)
if not 'uuid' in event:
return
task = self.state.tasks.get(event['uuid'])
info = task.info()
if 'enqueue_events_for_site' in event['name']:
# don't log enqueue events
return
try:
kwargs = eval(info.get('kwargs'))
if 'site' in kwargs:
frappe.connect(kwargs['site'])
if event['type']=='task-received':
make_async_task({'name': event['uuid'], 'task_name': event['name']})
if event['type']=='task-succeeded':
task = frappe.get_doc("Async Task", event['uuid'])
task.status = 'Succeeded'
task.result = info.get('result')
task.runtime = info.get('runtime')
task.set_docstatus_user_and_timestamp()
task.db_update()
if event['type']=='task-failed':
task = frappe.get_doc("Async Task", event['uuid'])
task.status = 'Failed'
task.traceback = event.get('traceback') or event.get('exception')
task.traceback = frappe.as_json(info) + "\n\n" + task.traceback
task.runtime = info.get('runtime')
task.set_docstatus_user_and_timestamp()
task.db_update()
frappe.db.commit()
except Exception:
print frappe.get_traceback()
finally:
frappe.destroy()
def run(self):
while True:
try:
with self.celery_app.connection() as connection:
recv = self.celery_app.events.Receiver(connection, handlers={
'*': self.catchall
})
recv.capture(limit=None, timeout=None, wakeup=True)
except (KeyboardInterrupt, SystemExit):
raise
except Exception:
# unable to capture
pass
time.sleep(self.interval)
if __name__ == '__main__':
get_celery().start()
app = get_celery()
MonitorThread(app)
app.start()

View file

@ -2,35 +2,13 @@
"allow_copy": 0,
"allow_import": 0,
"allow_rename": 0,
"autoname": "field:celery_task_id",
"autoname": "",
"creation": "2015-07-03 11:28:03.496346",
"custom": 0,
"docstatus": 0,
"doctype": "DocType",
"document_type": "Document",
"fields": [
{
"allow_on_submit": 0,
"bold": 0,
"collapsible": 0,
"fieldname": "celery_task_id",
"fieldtype": "Data",
"hidden": 0,
"ignore_user_permissions": 0,
"in_filter": 0,
"in_list_view": 1,
"label": "Celery Task ID",
"no_copy": 0,
"permlevel": 0,
"precision": "",
"print_hide": 0,
"read_only": 0,
"report_hide": 0,
"reqd": 0,
"search_index": 0,
"set_only_once": 0,
"unique": 0
},
{
"allow_on_submit": 0,
"bold": 0,
@ -43,7 +21,116 @@
"in_list_view": 0,
"label": "Status",
"no_copy": 0,
"options": "\nQueued\nRunning\nFinished\nFailed\n",
"options": "\nQueued\nRunning\nSucceeded\nFailed\n",
"permlevel": 0,
"precision": "",
"print_hide": 0,
"read_only": 1,
"report_hide": 0,
"reqd": 0,
"search_index": 0,
"set_only_once": 0,
"unique": 0
},
{
"allow_on_submit": 0,
"bold": 0,
"collapsible": 0,
"fieldname": "task_name",
"fieldtype": "Data",
"hidden": 0,
"ignore_user_permissions": 0,
"in_filter": 0,
"in_list_view": 0,
"label": "Task Name",
"no_copy": 0,
"permlevel": 0,
"precision": "",
"print_hide": 0,
"read_only": 1,
"report_hide": 0,
"reqd": 0,
"search_index": 0,
"set_only_once": 0,
"unique": 0
},
{
"allow_on_submit": 0,
"bold": 0,
"collapsible": 0,
"fieldname": "runtime",
"fieldtype": "Data",
"hidden": 0,
"ignore_user_permissions": 0,
"in_filter": 0,
"in_list_view": 1,
"label": "Runtime",
"no_copy": 0,
"permlevel": 0,
"precision": "",
"print_hide": 0,
"read_only": 1,
"report_hide": 0,
"reqd": 0,
"search_index": 0,
"set_only_once": 0,
"unique": 0
},
{
"allow_on_submit": 0,
"bold": 0,
"collapsible": 0,
"fieldname": "result",
"fieldtype": "Code",
"hidden": 0,
"ignore_user_permissions": 0,
"in_filter": 0,
"in_list_view": 0,
"label": "Result",
"no_copy": 0,
"permlevel": 0,
"precision": "",
"print_hide": 0,
"read_only": 1,
"report_hide": 0,
"reqd": 0,
"search_index": 0,
"set_only_once": 0,
"unique": 0
},
{
"allow_on_submit": 0,
"bold": 0,
"collapsible": 0,
"fieldname": "traceback",
"fieldtype": "Code",
"hidden": 0,
"ignore_user_permissions": 0,
"in_filter": 0,
"in_list_view": 0,
"label": "Traceback",
"no_copy": 0,
"permlevel": 0,
"precision": "",
"print_hide": 0,
"read_only": 1,
"report_hide": 0,
"reqd": 0,
"search_index": 0,
"set_only_once": 0,
"unique": 0
},
{
"allow_on_submit": 0,
"bold": 0,
"collapsible": 0,
"fieldname": "section_break_6",
"fieldtype": "Section Break",
"hidden": 0,
"ignore_user_permissions": 0,
"in_filter": 0,
"in_list_view": 0,
"no_copy": 0,
"permlevel": 0,
"precision": "",
"print_hide": 0,
@ -54,50 +141,6 @@
"set_only_once": 0,
"unique": 0
},
{
"allow_on_submit": 0,
"bold": 0,
"collapsible": 0,
"fieldname": "stdout",
"fieldtype": "Long Text",
"hidden": 0,
"ignore_user_permissions": 0,
"in_filter": 0,
"in_list_view": 0,
"label": "stdout",
"no_copy": 0,
"permlevel": 0,
"precision": "",
"print_hide": 0,
"read_only": 1,
"report_hide": 0,
"reqd": 0,
"search_index": 0,
"set_only_once": 0,
"unique": 0
},
{
"allow_on_submit": 0,
"bold": 0,
"collapsible": 0,
"fieldname": "stderr",
"fieldtype": "Long Text",
"hidden": 0,
"ignore_user_permissions": 0,
"in_filter": 0,
"in_list_view": 0,
"label": "stderr",
"no_copy": 0,
"permlevel": 0,
"precision": "",
"print_hide": 0,
"read_only": 1,
"report_hide": 0,
"reqd": 0,
"search_index": 0,
"set_only_once": 0,
"unique": 0
},
{
"allow_on_submit": 0,
"bold": 0,
@ -114,7 +157,7 @@
"permlevel": 0,
"precision": "",
"print_hide": 0,
"read_only": 0,
"read_only": 1,
"report_hide": 0,
"reqd": 0,
"search_index": 0,
@ -137,7 +180,7 @@
"permlevel": 0,
"precision": "",
"print_hide": 0,
"read_only": 0,
"read_only": 1,
"report_hide": 0,
"reqd": 0,
"search_index": 0,
@ -152,7 +195,7 @@
"is_submittable": 0,
"issingle": 0,
"istable": 0,
"modified": "2015-07-28 16:18:11.344349",
"modified": "2015-09-07 08:08:22.193911",
"modified_by": "Administrator",
"module": "Core",
"name": "Async Task",
@ -183,5 +226,6 @@
"read_only": 0,
"read_only_onload": 0,
"sort_field": "modified",
"sort_order": "DESC"
"sort_order": "DESC",
"title_field": "task_name"
}

View file

@ -0,0 +1,10 @@
frappe.listview_settings['Async Task'] = {
add_fields: ["status"],
get_indicator: function(doc) {
if(doc.status==="Succeeded") {
return [__("Succeeded"), "green", "status,=,Succeeded"];
} else if(doc.status==="Failed") {
return [__("Failed"), "red", "status,=,Failed"];
}
}
};

View file

@ -15,8 +15,8 @@ from frappe.utils.dateutils import parse_date
from frappe.utils import cint, cstr, flt
from frappe.core.page.data_import_tool.data_import_tool import get_data_keys
#@frappe.async.handler
@frappe.whitelist()
@frappe.async.handler
#frappe.whitelist()
def upload(rows = None, submit_after_import=None, ignore_encoding_errors=False, overwrite=None,
ignore_links=False, pre_process=None):
"""upload data"""

View file

@ -167,7 +167,7 @@ class Document(BaseDocument):
self.check_permission("create")
self._set_defaults()
self._set_docstatus_user_and_timestamp()
self.set_docstatus_user_and_timestamp()
self.check_if_latest()
self.run_method("before_insert")
self.set_new_name()
@ -218,7 +218,7 @@ class Document(BaseDocument):
self.check_permission("write", "save")
self._set_docstatus_user_and_timestamp()
self.set_docstatus_user_and_timestamp()
self.check_if_latest()
self.set_parent_in_children()
self.validate_higher_perm_levels()
@ -298,7 +298,7 @@ class Document(BaseDocument):
if self.doctype in frappe.db.value_cache:
del frappe.db.value_cache[self.doctype]
def _set_docstatus_user_and_timestamp(self):
def set_docstatus_user_and_timestamp(self):
self._original_modified = self.modified
self.modified = now()
self.modified_by = frappe.session.user

View file

@ -127,5 +127,7 @@ frappe.socket = {
frappe.provide("frappe.realtime");
frappe.realtime.on = function(event, callback) {
frappe.socket.socket.on(event, callback);
if(frappe.socket.socket) {
frappe.socket.socket.on(event, callback);
}
}

View file

@ -137,7 +137,7 @@ def pull_from_email_account(site, email_account):
frappe.destroy()
@celery_task(bind=True)
def run_async_task(self, site, user, cmd, form_dict):
def run_async_task(self, site=None, user=None, cmd=None, form_dict=None):
ret = {}
frappe.init(site)
frappe.connect()

View file

@ -19,7 +19,7 @@
{%- endfor -%}
</head>
<body>
<div class="centered splash">
<div class="centered splash" style="max-width: 360px;">
<img src="{{ splash_image or "/assets/frappe/images/splash.png" }}">
</div>
<div class="offcanvas-container">

View file

@ -11,7 +11,7 @@ from frappe.tasks import run_async_task
class TestAsync(unittest.TestCase):
def test_response(self):
result = run_async_task.delay(frappe.local.site, 'Administrator', 'async_ping',
frappe._dict())
result = run_async_task.delay(site=frappe.local.site, user='Administrator', cmd='async_ping',
form_dict=frappe._dict())
result = result.get()
self.assertEquals(result.get("message"), "pong")