This commit is contained in:
Pratik Vyas 2015-06-20 21:10:10 +05:30
parent 2cf5ec66f6
commit 422668a67f
23 changed files with 7600 additions and 10 deletions

View file

@ -7,6 +7,7 @@ globals attached to frappe module
from __future__ import unicode_literals
from werkzeug.local import Local, release_local
from functools import wraps
import os, importlib, inspect, logging, json
# public
@ -14,6 +15,7 @@ from frappe.__version__ import __version__
from .exceptions import *
from .utils.jinja import get_jenv, get_template, render_template
local = Local()
class _dict(dict):

View file

@ -58,13 +58,13 @@ def handle():
if frappe.local.request.method=="GET":
if not doc.has_permission("read"):
frappe.throw(_("Not permitted"), frappe.PermissionError)
doc.run_method(method, **frappe.local.form_dict)
frappe.local.response.update({"data": doc.run_method(method, **frappe.local.form_dict)})
if frappe.local.request.method=="POST":
if not doc.has_permission("write"):
frappe.throw(_("Not permitted"), frappe.PermissionError)
doc.run_method(method, **frappe.local.form_dict)
frappe.local.response.update({"data": doc.run_method(method, **frappe.local.form_dict)})
frappe.db.commit()
else:

View file

@ -12,6 +12,8 @@ from werkzeug.local import LocalManager
from werkzeug.exceptions import HTTPException, NotFound
from werkzeug.contrib.profiler import ProfilerMiddleware
from werkzeug.wsgi import SharedDataMiddleware
from werkzeug.serving import run_with_reloader
import mimetypes
import frappe
@ -20,9 +22,10 @@ import frappe.auth
import frappe.api
import frappe.utils.response
import frappe.website.render
from frappe.utils import get_site_name
from frappe.utils import get_site_name, get_site_path
from frappe.middlewares import StaticDataMiddleware
local_manager = LocalManager([frappe.local])
_site = None
@ -30,6 +33,21 @@ _sites_path = os.environ.get("SITES_PATH", ".")
logger = frappe.get_logger()
class RequestContext(object):
def __init__(self, environ):
self.request = Request(environ)
def __enter__(self):
frappe.local.request = self.request
init_site(self.request)
make_form_dict(self.request)
frappe.local.http_request = frappe.auth.HTTPRequest()
def __exit__(self, type, value, traceback):
frappe.destroy()
@Request.application
def application(request):
frappe.local.request = request
@ -135,6 +153,8 @@ def make_form_dict(request):
frappe.local.form_dict.pop("_")
application = local_manager.make_middleware(application)
application.debug = True
def serve(port=8000, profile=False, site=None, sites_path='.'):
global application, _site, _sites_path
@ -155,5 +175,10 @@ def serve(port=8000, profile=False, site=None, sites_path='.'):
b'/files': os.path.abspath(sites_path).encode("utf-8")
})
application.debug = True
application.config = {
'SERVER_NAME': 'localhost:8000'
}
run_simple('0.0.0.0', int(port), application, use_reloader=True,
use_debugger=True, use_evalex=True, threaded=True)

175
frappe/async.py Normal file
View file

@ -0,0 +1,175 @@
# -*- coding: utf-8 -*-
# Copyright (c) 2015, Frappe Technologies Pvt. Ltd. and contributors
# For license information, please see license.txt
from __future__ import unicode_literals
import frappe
import os
import time
from functools import wraps
from frappe.utils import get_site_path
import json
from frappe import conf
END_LINE = '<!-- frappe: end-file -->'
TASK_LOG_MAX_AGE = 86400 # 1 day in seconds
redis_server = None
def handler(f):
cmd = f.__module__ + '.' + f.__name__
def _run(args, set_in_response=True):
from frappe.tasks import run_async_task
args = frappe._dict(args)
task = run_async_task.delay(frappe.local.site,
(frappe.session and frappe.session.user) or 'Administrator', cmd, args)
if set_in_response:
frappe.local.response['task_id'] = task.id
return task.id
@wraps(f)
def _f(*args, **kwargs):
from frappe.tasks import run_async_task
task = run_async_task.delay(frappe.local.site,
(frappe.session and frappe.session.user) or 'Administrator', cmd,
frappe.local.form_dict)
frappe.local.response['task_id'] = task.id
return {
"status": "queued",
"task_id": task.id
}
_f.async = True
_f._f = f
_f.run = _run
frappe.whitelisted.append(f)
frappe.whitelisted.append(_f)
return _f
def run_async_task(method, args, reference_doctype=None, reference_name=None, set_in_response=True):
if frappe.local.request and frappe.local.request.method == "GET":
frappe.throw("Cannot run task in a GET request")
task_id = method.run(args, set_in_response=set_in_response)
task = frappe.new_doc("Async Task")
task.celery_task_id = task_id
task.status = "Queued"
task.reference_doctype = reference_doctype
task.reference_name = reference_name
task.save()
return task_id
@frappe.whitelist()
def get_pending_tasks_for_doc(doctype, docname):
return frappe.db.sql_list("select name from `tabAsync Task` where status in ('Queued', 'Running') and reference_doctype='%s' and reference_name='%s'" % (doctype, docname))
@handler
def ping():
from time import sleep
sleep(6)
return "pong"
@frappe.whitelist()
def get_task_status(task_id):
from frappe.celery_app import get_celery
c = get_celery()
a = c.AsyncResult(task_id)
frappe.local.response['response'] = a.result
return {
"state": a.state,
"progress": 0
}
def set_task_status(task_id, status, response=None):
frappe.db.set_value("Async Task", task_id, "status", status)
if not response:
response = {}
response.update({
"status": status,
"task_id": task_id
})
emit_via_redis("task_status_change", response, room="task:" + task_id)
def remove_old_task_logs():
logs_path = get_site_path('task-logs')
def full_path(_file):
return os.path.join(logs_path, _file)
files_to_remove = [full_path(_file) for _file in os.listdir(logs_path)]
files_to_remove = [_file for _file in files_to_remove if is_file_old(_file) and os.path.isfile(_file)]
for _file in files_to_remove:
os.remove(_file)
def is_file_old(file_path):
return ((time.time() - os.stat(file_path).st_mtime) > TASK_LOG_MAX_AGE)
def emit_via_redis(event, message, room=None):
r = get_redis_server()
r.publish('events', json.dumps({'event': event, 'message': message, 'room': room}))
def put_log(task_id, line_no, line):
r = get_redis_server()
print "task_log:" + task_id
r.hset("task_log:" + task_id, line_no, line)
def get_redis_server():
"""Returns memcache connection."""
global redis_server
if not redis_server:
from redis import Redis
redis_server = Redis.from_url(conf.get("cache_redis_server") or "redis://localhost:12311")
return redis_server
class FileAndRedisStream(file):
def __init__(self, *args, **kwargs):
ret = super(FileAndRedisStream, self).__init__(*args, **kwargs)
self.count = 0
return ret
def write(self, data):
ret = super(FileAndRedisStream, self).write(data)
if frappe.local.task_id:
emit_via_redis('task_progress', {
"message": {
"lines": {self.count: data}
},
"task_id": frappe.local.task_id
}, room="task_progress:" + frappe.local.task_id)
put_log(frappe.local.task_id, self.count, data)
self.count += 1
return ret
def get_std_streams(task_id):
stdout = FileAndRedisStream(get_task_log_file_path(task_id, 'stdout'), 'w')
# stderr = FileAndRedisStream(get_task_log_file_path(task_id, 'stderr'), 'w')
return stdout, stdout
def get_task_log_file_path(task_id, stream_type):
logs_dir = frappe.utils.get_site_path('task-logs')
return os.path.join(logs_dir, task_id + '.' + stream_type)
@frappe.whitelist(allow_guest=True)
def can_subscribe_doc(doctype, docname, sid):
from frappe.sessions import Session
from frappe.exceptions import PermissionError
session = Session(None).get_session_data()
if not frappe.has_permission(user=session.user, doctype=doctype, doc=docname, ptype='read'):
raise PermissionError()
return True

View file

@ -127,7 +127,8 @@ def pack(target, sources, no_compress, verbose):
tmpin, tmpout = StringIO(data.encode('utf-8')), StringIO()
jsm.minify(tmpin, tmpout)
minified = tmpout.getvalue()
outtxt += unicode(minified or '', 'utf-8').strip('\n') + ';'
if minified:
outtxt += unicode(minified or '', 'utf-8').strip('\n') + ';'
if verbose:
print "{0}: {1}k".format(f, int(len(minified) / 1024))

View file

@ -17,7 +17,7 @@ SITES_PATH = os.environ.get('SITES_PATH', '.')
# defaults
DEFAULT_CELERY_BROKER = "redis://localhost"
DEFAULT_CELERY_BACKEND = None
DEFAULT_CELERY_BACKEND = "redis://localhost"
DEFAULT_SCHEDULER_INTERVAL = 300
LONGJOBS_PREFIX = "longjobs@"
@ -41,6 +41,7 @@ def setup_celery(app, conf):
app.conf.CELERY_TASK_SERIALIZER = 'json'
app.conf.CELERY_ACCEPT_CONTENT = ['json']
app.conf.CELERY_TIMEZONE = 'UTC'
app.conf.CELERY_RESULT_SERIALIZER = 'json'
if conf.celery_queue_per_site:
app.conf.CELERY_ROUTES = (SiteRouter(),)

View file

@ -0,0 +1,142 @@
{
"allow_copy": 0,
"allow_import": 0,
"allow_rename": 0,
"autoname": "field:celery_task_id",
"creation": "2015-07-03 11:28:03.496346",
"custom": 0,
"docstatus": 0,
"doctype": "DocType",
"document_type": "Transaction",
"fields": [
{
"allow_on_submit": 0,
"fieldname": "celery_task_id",
"fieldtype": "Data",
"hidden": 0,
"ignore_user_permissions": 0,
"in_filter": 0,
"in_list_view": 1,
"label": "Celery Task ID",
"no_copy": 0,
"permlevel": 0,
"precision": "",
"print_hide": 0,
"read_only": 0,
"report_hide": 0,
"reqd": 0,
"search_index": 0,
"set_only_once": 0,
"unique": 0
},
{
"allow_on_submit": 0,
"fieldname": "status",
"fieldtype": "Select",
"hidden": 0,
"ignore_user_permissions": 0,
"in_filter": 0,
"in_list_view": 0,
"label": "Status",
"no_copy": 0,
"options": "\nQueued\nRunning\nFinished\nFailed\n",
"permlevel": 0,
"precision": "",
"print_hide": 0,
"read_only": 0,
"report_hide": 0,
"reqd": 0,
"search_index": 0,
"set_only_once": 0,
"unique": 0
},
{
"allow_on_submit": 0,
"fieldname": "stdout",
"fieldtype": "Long Text",
"hidden": 0,
"ignore_user_permissions": 0,
"in_filter": 0,
"in_list_view": 0,
"label": "stdout",
"no_copy": 0,
"permlevel": 0,
"precision": "",
"print_hide": 0,
"read_only": 1,
"report_hide": 0,
"reqd": 0,
"search_index": 0,
"set_only_once": 0,
"unique": 0
},
{
"allow_on_submit": 0,
"fieldname": "stderr",
"fieldtype": "Long Text",
"hidden": 0,
"ignore_user_permissions": 0,
"in_filter": 0,
"in_list_view": 0,
"label": "stderr",
"no_copy": 0,
"permlevel": 0,
"precision": "",
"print_hide": 0,
"read_only": 1,
"report_hide": 0,
"reqd": 0,
"search_index": 0,
"set_only_once": 0,
"unique": 0
},
{
"fieldname": "reference_doctype",
"fieldtype": "Link",
"label": "Reference DocType",
"options": "DocType",
"permlevel": 0,
"precision": ""
},
{
"fieldname": "reference_name",
"fieldtype": "Dynamic Link",
"label": "Reference Doc",
"options": "reference_doctype",
"permlevel": 0,
"precision": ""
}
],
"hide_heading": 0,
"hide_toolbar": 0,
"in_create": 0,
"in_dialog": 0,
"is_submittable": 0,
"issingle": 0,
"istable": 0,
"modified": "2015-07-04 14:33:26.791024",
"modified_by": "Administrator",
"module": "Core",
"name": "Async Task",
"name_case": "",
"owner": "Administrator",
"permissions": [
{
"create": 1,
"delete": 1,
"email": 1,
"export": 1,
"permlevel": 0,
"print": 1,
"read": 1,
"report": 1,
"role": "System Manager",
"share": 1,
"write": 1
}
],
"read_only": 0,
"read_only_onload": 0,
"sort_field": "modified",
"sort_order": "DESC"
}

View file

@ -0,0 +1,10 @@
# -*- coding: utf-8 -*-
# Copyright (c) 2015, Frappe Technologies Pvt. Ltd. and contributors
# For license information, please see license.txt
from __future__ import unicode_literals
import frappe
from frappe.model.document import Document
class AsyncTask(Document):
pass

View file

@ -0,0 +1,12 @@
# -*- coding: utf-8 -*-
# Copyright (c) 2015, Frappe Technologies Pvt. Ltd. and Contributors
# See license.txt
from __future__ import unicode_literals
import frappe
import unittest
# test_records = frappe.get_test_records('Async Task')
class TestAsyncTask(unittest.TestCase):
pass

View file

@ -70,7 +70,7 @@ def handle():
return build_response("json")
def execute_cmd(cmd):
def execute_cmd(cmd, async=False):
"""execute a request as python module"""
for hook in frappe.get_hooks("override_whitelisted_methods", {}).get(cmd, []):
# override using the first hook
@ -78,6 +78,8 @@ def execute_cmd(cmd):
break
method = get_attr(cmd)
if async:
method = method._f
# check if whitelisted
if frappe.session['user'] == 'Guest':
@ -103,3 +105,15 @@ def get_attr(cmd):
method = globals()[cmd]
frappe.log("method:" + cmd)
return method
@frappe.whitelist()
def get_async_task_status(task_id):
from frappe.celery_app import get_celery
c = get_celery()
a = c.AsyncResult(task_id)
frappe.local.response['response'] = a.result
return {
"state": a.state,
"progress": 0
}

View file

@ -147,6 +147,7 @@ scheduler_events = {
"frappe.desk.doctype.event.event.send_event_digest",
"frappe.sessions.clear_expired_sessions",
"frappe.email.doctype.email_alert.email_alert.trigger_daily_alerts",
"frappe.async.remove_old_task_logs",
]
}

View file

@ -214,7 +214,8 @@ def make_site_dirs():
site_private_path = os.path.join(frappe.local.site_path, 'private')
for dir_path in (
os.path.join(site_private_path, 'backups'),
os.path.join(site_public_path, 'files')):
os.path.join(site_public_path, 'files'),
os.path.join(site_public_path, 'task-logs')):
if not os.path.exists(dir_path):
os.makedirs(dir_path)
locks_dir = frappe.get_site_path('locks')

View file

@ -15,7 +15,8 @@
"public/js/lib/moment/moment.min.js",
"public/js/lib/highlight.pack.js",
"public/js/frappe/class.js",
"website/js/website.js"
"website/js/website.js",
"public/js/lib/socket.io.min.js"
],
"js/editor.min.js": [
"public/js/lib/jquery/jquery.hotkeys.js",
@ -49,6 +50,7 @@
"public/js/lib/nprogress.js",
"public/js/lib/moment/moment-with-locales.min.js",
"public/js/lib/moment/moment-timezone-with-data.min.js",
"public/js/lib/socket.io.min.js",
"public/js/frappe/provide.js",
"public/js/frappe/class.js",
@ -61,6 +63,7 @@
"public/js/frappe/ui/messages.js",
"public/js/frappe/request.js",
"public/js/frappe/socket.js",
"public/js/frappe/router.js",
"public/js/frappe/defaults.js",
"public/js/lib/microtemplate.js",

View file

@ -28,10 +28,21 @@ frappe.call = function(opts) {
args.cmd = opts.method;
}
var callback = function(data, xhr) {
if(data.task_id) {
// async call, subscribe
frappe.socket.subscribe(data.task_id, opts);
}
else {
// ajax
return opts.callback(data, xhr);
}
}
return frappe.request.call({
type: opts.type || "POST",
args: args,
success: opts.callback,
success: callback,
error: opts.error,
always: opts.always,
btn: opts.btn,

View file

@ -0,0 +1,60 @@
frappe.socket = {
open_tasks: {},
init: function() {
frappe.socket.socket = io.connect('http://' + document.domain + ':' + 3000);
frappe.socket.socket.on('msgprint', function(message) {
frappe.msgprint(message)
});
frappe.socket.setup_listeners();
frappe.socket.setup_reconnect();
},
subscribe: function(task_id, opts) {
frappe.socket.socket.emit('task_subscribe', task_id);
frappe.socket.socket.emit('progress_subscribe', task_id);
frappe.socket.open_tasks[task_id] = opts;
},
setup_listeners: function() {
frappe.socket.socket.on('task_status_change', function(data) {
if(data.status==="Running") {
frappe.socket.process_response(data, "running");
} else {
// failed or finished
frappe.socket.process_response(data, "callback");
// delete frappe.socket.open_tasks[data.task_id];
}
});
frappe.socket.socket.on('task_progress', function(data) {
frappe.socket.process_response(data, "progress");
});
},
setup_reconnect: function() {
// subscribe again to open_tasks
frappe.socket.socket.on("connect", function() {
$.each(frappe.socket.open_tasks, function(task_id, opts) {
frappe.socket.subscribe(task_id, opts);
});
});
},
process_response: function(data, method) {
if(!data) {
return;
}
if(data) {
var opts = frappe.socket.open_tasks[data.task_id];
if(opts[method]) opts[method](data.message);
}
if(opts.always) {
opts.always(data.message);
}
if(data.status_code && status_code > 400 && opts.error) {
opts.error(data.message);
return;
}
}
}
$(frappe.socket.init);

7000
frappe/public/js/lib/socket.io.min.js vendored Normal file

File diff suppressed because it is too large Load diff

View file

@ -163,7 +163,7 @@ class Session:
"full_name": self.full_name,
"user_type": self.user_type,
"device": self.device,
"session_country": get_geo_ip_country(frappe.local.request_ip)
"session_country": get_geo_ip_country(frappe.local.request_ip) if frappe.local.request_ip else None
})
# insert session

View file

@ -7,6 +7,10 @@ from frappe.utils.scheduler import enqueue_events
from frappe.celery_app import get_celery, celery_task, task_logger, LONGJOBS_PREFIX
from frappe.utils import get_sites
from frappe.utils.file_lock import create_lock, delete_lock
from frappe.handler import execute_cmd
from frappe.async import set_task_status, END_LINE, get_std_streams
import frappe.utils.response
import sys
@celery_task()
def sync_queues():
@ -122,3 +126,46 @@ def pull_from_email_account(site, email_account):
frappe.db.commit()
finally:
frappe.destroy()
@celery_task(bind=True)
def run_async_task(self, site, user, cmd, form_dict):
ret = {}
frappe.init(site)
frappe.connect()
sys.stdout, sys.stderr = get_std_streams(self.request.id)
frappe.local.stdout, frappe.local.stderr = sys.stdout, sys.stderr
frappe.local.task_id = self.request.id
frappe.cache()
try:
set_task_status(self.request.id, "Running")
frappe.db.commit()
frappe.set_user(user)
# sleep(60)
frappe.local.form_dict = frappe._dict(form_dict)
execute_cmd(cmd, async=True)
ret = frappe.local.response
except Exception, e:
frappe.db.rollback()
set_task_status(self.request.id, "Failed")
if not frappe.flags.in_test:
frappe.db.commit()
ret = frappe.local.response
http_status_code = getattr(e, "http_status_code", 500)
ret['status_code'] = http_status_code
ret['exc'] = frappe.get_traceback()
task_logger.error('Exception in running {}: {}'.format(cmd, ret['exc']))
else:
set_task_status(self.request.id, "Finished", response=ret)
if not frappe.flags.in_test:
frappe.db.commit()
finally:
sys.stdout.write('\n' + END_LINE)
sys.stderr.write('\n' + END_LINE)
if not frappe.flags.in_test:
frappe.destroy()
sys.stdout.close()
sys.stderr.close()
sys.stdout, sys.stderr = 1, 0
return ret

View file

@ -0,0 +1,16 @@
# -*- coding: utf-8 -*-
# Copyright (c) 2015, Frappe Technologies Pvt. Ltd. and Contributors
# MIT License. See license.txt
from __future__ import unicode_literals
import unittest
import frappe
from frappe.tasks import run_async_task
class TestAsync(unittest.TestCase):
def test_response(self):
result = run_async_task(frappe.local.site, 'Administrator', 'async_ping', frappe._dict())
self.assertEquals(result.message, "pong")

View file

@ -400,3 +400,4 @@ def get_request_session(max_retries=3):
session.mount("http://", requests.adapters.HTTPAdapter(max_retries=Retry(total=5, status_forcelist=[500])))
session.mount("https://", requests.adapters.HTTPAdapter(max_retries=Retry(total=5, status_forcelist=[500])))
return session

View file

@ -28,3 +28,4 @@ html2text
email_reply_parser
click
num2words
gevent-socketio

67
socketio.js Normal file
View file

@ -0,0 +1,67 @@
var app = require('express')();
var http = require('http').Server(app);
var io = require('socket.io')(http);
var cookie = require('cookie')
var redis = require("redis")
var subscriber = redis.createClient(12311);
var r = redis.createClient(12311);
var request = require('superagent')
app.get('/', function(req, res){
res.sendfile('index.html');
});
io.on('connection', function(socket){
socket.on('task_subscribe', function(task_id) {
var room = 'task:' + task_id;
socket.join(room);
})
socket.on('progress_subscribe', function(task_id) {
var room = 'task_progress:' + task_id;
socket.join(room);
send_existing_lines(task_id, socket);
})
socket.on('doc_subscribe', function(doctype, docname) {
var sid = cookie.parse(socket.request.headers.cookie).sid
if(!sid) {
return;
}
request.post('http://localhost:8000/api/method/frappe.async.can_subscribe_doc')
.type('form')
.send({
sid: sid,
doctype: doctype,
docname: docname
})
.end(function(err, res) {
if(res.status == 200) {
socket.join('doc:'+ doctype + '/' + docname);
}
})
})
});
function send_existing_lines(task_id, socket) {
r.hgetall('task_log:' + task_id, function(err, lines) {
socket.emit('task_progress', {
"task_id": task_id,
"message": {
"lines": lines
}
})
})
}
subscriber.on("message", function(channel, message) {
message = JSON.parse(message);
io.to(message.room).emit(message.event, message.message);
});
subscriber.subscribe("events");
http.listen(3000, function(){
console.log('listening on *:3000');
});