[fix] async and socketio
This commit is contained in:
parent
2e6b3ad356
commit
a987e32f94
5 changed files with 61 additions and 34 deletions
|
|
@ -97,7 +97,9 @@ def is_file_old(file_path):
|
|||
return ((time.time() - os.stat(file_path).st_mtime) > TASK_LOG_MAX_AGE)
|
||||
|
||||
|
||||
def publish_realtime(event=None, message=None, room=None, user=None, doctype=None, docname=None, after_commit=False):
|
||||
def publish_realtime(event=None, message=None, room=None,
|
||||
user=None, doctype=None, docname=None, task_id=None,
|
||||
after_commit=False):
|
||||
"""Publish real-time updates
|
||||
|
||||
:param event: Event name, like `task_progress` etc. that will be handled by the client (default is `task_progress` if within task or `global`)
|
||||
|
|
@ -117,10 +119,13 @@ def publish_realtime(event=None, message=None, room=None, user=None, doctype=Non
|
|||
event = "global"
|
||||
|
||||
if not room:
|
||||
if getattr(frappe.local, "task_id", None):
|
||||
room = get_task_progress_room()
|
||||
if not task_id and hasattr(frappe.local, "task_id"):
|
||||
task_id = frappe.local.task_id
|
||||
|
||||
if task_id:
|
||||
room = get_task_progress_room(task_id)
|
||||
if not "task_id" in message:
|
||||
message["task_id"] = frappe.local.task_id
|
||||
message["task_id"] = task_id
|
||||
|
||||
after_commit = False
|
||||
elif user:
|
||||
|
|
@ -153,7 +158,7 @@ def put_log(line_no, line, task_id=None):
|
|||
r = get_redis_server()
|
||||
if not task_id:
|
||||
task_id = frappe.local.task_id
|
||||
task_progress_room = get_task_progress_room()
|
||||
task_progress_room = get_task_progress_room(task_id)
|
||||
task_log_key = "task_log:" + task_id
|
||||
publish_realtime('task_progress', {
|
||||
"message": {
|
||||
|
|
@ -225,5 +230,5 @@ def get_user_room(user):
|
|||
def get_site_room():
|
||||
return ''.join([frappe.local.site, ':all'])
|
||||
|
||||
def get_task_progress_room():
|
||||
return "task_progress:" + frappe.local.task_id
|
||||
def get_task_progress_room(task_id):
|
||||
return "".join([frappe.local.site, ":task_progress:", task_id])
|
||||
|
|
|
|||
|
|
@ -91,7 +91,7 @@ frappe.ui.form.Dashboard = Class.extend({
|
|||
if(!progress_area.length) {
|
||||
progress_area = $('<div class="progress-area" style="margin-top: 10px">').appendTo(this.body);
|
||||
}
|
||||
var progress_chart = $('<div class="progress-chart" title="'+title+'"></div>')
|
||||
var progress_chart = $('<div class="progress-chart" title="'+(title || '')+'"></div>')
|
||||
.appendTo(progress_area);
|
||||
|
||||
var n_charts = progress_area.find(".progress-chart").length,
|
||||
|
|
|
|||
|
|
@ -86,11 +86,19 @@ frappe.socket = {
|
|||
return host;
|
||||
},
|
||||
subscribe: function(task_id, opts) {
|
||||
// TODO DEPRECATE
|
||||
|
||||
frappe.socket.socket.emit('task_subscribe', task_id);
|
||||
frappe.socket.socket.emit('progress_subscribe', task_id);
|
||||
|
||||
frappe.socket.open_tasks[task_id] = opts;
|
||||
},
|
||||
task_subscribe: function(task_id) {
|
||||
frappe.socket.socket.emit('task_subscribe', task_id);
|
||||
},
|
||||
task_unsubscribe: function(task_id) {
|
||||
frappe.socket.socket.emit('task_unsubscribe', task_id);
|
||||
},
|
||||
doc_subscribe: function(doctype, docname) {
|
||||
frappe.socket.socket.emit('doc_subscribe', doctype, docname);
|
||||
frappe.socket.open_docs.push({doctype: doctype, docname: docname});
|
||||
|
|
@ -118,7 +126,7 @@ frappe.socket = {
|
|||
frappe.socket.process_response(data, data.status.toLowerCase());
|
||||
});
|
||||
frappe.socket.socket.on('task_progress', function(data) {
|
||||
frappe.socket.process_response(data, "progress");
|
||||
frappe.socket.process_response(data, "progress");
|
||||
});
|
||||
},
|
||||
setup_reconnect: function() {
|
||||
|
|
@ -152,14 +160,14 @@ frappe.socket = {
|
|||
}
|
||||
|
||||
// success
|
||||
if(data) {
|
||||
var opts = frappe.socket.open_tasks[data.task_id];
|
||||
if(opts[method]) opts[method](data);
|
||||
var opts = frappe.socket.open_tasks[data.task_id];
|
||||
if(opts[method]) {
|
||||
opts[method](data);
|
||||
}
|
||||
|
||||
// "callback" is std frappe term
|
||||
if(method==="success") {
|
||||
if(opts.callback) opts.callback(data);
|
||||
}
|
||||
// "callback" is std frappe term
|
||||
if(method==="success") {
|
||||
if(opts.callback) opts.callback(data);
|
||||
}
|
||||
|
||||
// always
|
||||
|
|
@ -177,9 +185,11 @@ frappe.socket = {
|
|||
|
||||
frappe.provide("frappe.realtime");
|
||||
frappe.realtime.on = function(event, callback) {
|
||||
if(frappe.socket.socket) {
|
||||
frappe.socket.socket.on(event, callback);
|
||||
}
|
||||
frappe.socket.socket && frappe.socket.socket.on(event, callback);
|
||||
};
|
||||
|
||||
frappe.realtime.off = function(event, callback) {
|
||||
frappe.socket.socket && frappe.socket.socket.off(event, callback);
|
||||
}
|
||||
|
||||
frappe.realtime.publish = function(event, message) {
|
||||
|
|
|
|||
|
|
@ -243,20 +243,22 @@ def get_data():
|
|||
|
||||
setup_template = """# -*- coding: utf-8 -*-
|
||||
from setuptools import setup, find_packages
|
||||
import os
|
||||
from pip.req import parse_requirements
|
||||
|
||||
version = '0.0.1'
|
||||
requirements = parse_requirements("requirements.txt", session="")
|
||||
|
||||
setup(
|
||||
name='{app_name}',
|
||||
version=version,
|
||||
description='{app_description}',
|
||||
author='{app_publisher}',
|
||||
author_email='{app_email}',
|
||||
packages=find_packages(),
|
||||
zip_safe=False,
|
||||
include_package_data=True,
|
||||
install_requires=("frappe",),
|
||||
name='{app_name}',
|
||||
version=version,
|
||||
description='{app_description}',
|
||||
author='{app_publisher}',
|
||||
author_email='{app_email}',
|
||||
packages=find_packages(),
|
||||
zip_safe=False,
|
||||
include_package_data=True,
|
||||
install_requires=[str(ir.req) for ir in requirements],
|
||||
dependency_links=[str(ir._link) for ir in requirements if ir._link]
|
||||
)
|
||||
"""
|
||||
|
||||
|
|
|
|||
20
socketio.js
20
socketio.js
|
|
@ -53,12 +53,17 @@ io.on('connection', function(socket){
|
|||
});
|
||||
|
||||
socket.on('task_subscribe', function(task_id) {
|
||||
var room = 'task:' + task_id;
|
||||
var room = get_task_room(socket, task_id);
|
||||
socket.join(room);
|
||||
});
|
||||
|
||||
socket.on('task_unsubscribe', function(task_id) {
|
||||
var room = get_task_room(socket, task_id);
|
||||
socket.leave(room);
|
||||
});
|
||||
|
||||
socket.on('progress_subscribe', function(task_id) {
|
||||
var room = 'task_progress:' + task_id;
|
||||
var room = get_task_room(socket, task_id);
|
||||
socket.join(room);
|
||||
send_existing_lines(task_id, socket);
|
||||
});
|
||||
|
|
@ -134,14 +139,15 @@ subscriber.on("message", function(channel, message) {
|
|||
subscriber.subscribe("events");
|
||||
|
||||
function send_existing_lines(task_id, socket) {
|
||||
var room = get_task_room(socket, task_id);
|
||||
subscriber.hgetall('task_log:' + task_id, function(err, lines) {
|
||||
socket.emit('task_progress', {
|
||||
io.to(room).emit('task_progress', {
|
||||
"task_id": task_id,
|
||||
"message": {
|
||||
"lines": lines
|
||||
}
|
||||
})
|
||||
})
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
function get_doc_room(socket, doctype, docname) {
|
||||
|
|
@ -160,6 +166,10 @@ function get_site_room(socket) {
|
|||
return get_site_name(socket) + ':all';
|
||||
}
|
||||
|
||||
function get_task_room(socket, task_id) {
|
||||
return get_site_name(socket) + ':task_progress:' + task_id;
|
||||
}
|
||||
|
||||
function get_site_name(socket) {
|
||||
if (conf.default_site) {
|
||||
return conf.default_site;
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue