refactor: move handlers to separate file
This commit is contained in:
parent
d59e499418
commit
bdaed29ef2
5 changed files with 223 additions and 225 deletions
|
|
@ -6,7 +6,6 @@ class RealTimeClient {
|
|||
constructor() {
|
||||
this.open_tasks = {};
|
||||
this.open_docs = new Set();
|
||||
this.emit_queue = [];
|
||||
}
|
||||
|
||||
on(event, callback) {
|
||||
|
|
@ -96,9 +95,9 @@ class RealTimeClient {
|
|||
}
|
||||
|
||||
get_host(port = 3000) {
|
||||
var host = window.location.origin;
|
||||
let host = window.location.origin;
|
||||
if (window.dev_server) {
|
||||
var parts = host.split(":");
|
||||
let parts = host.split(":");
|
||||
port = frappe.boot.socketio_port || port.toString() || "3000";
|
||||
if (parts.length > 2) {
|
||||
host = parts[0] + ":" + parts[1];
|
||||
|
|
@ -171,7 +170,7 @@ class RealTimeClient {
|
|||
}
|
||||
|
||||
// success
|
||||
var opts = this.open_tasks[data.task_id];
|
||||
let opts = this.open_tasks[data.task_id];
|
||||
if (opts[method]) {
|
||||
opts[method](data);
|
||||
}
|
||||
|
|
|
|||
198
realtime/handlers/frappe_handlers.js
Normal file
198
realtime/handlers/frappe_handlers.js
Normal file
|
|
@ -0,0 +1,198 @@
|
|||
const request = require("superagent");
|
||||
const { get_url } = require("../utils");
|
||||
const log = console.log; // eslint-disable-line
|
||||
|
||||
const WEBSITE_ROOM = "website";
|
||||
const SITE_ROOM = "all";
|
||||
|
||||
function frappe_handlers(realtime, socket) {
|
||||
socket.join(user_room(socket.user));
|
||||
socket.join(WEBSITE_ROOM);
|
||||
|
||||
if (socket.user_type == "System User") {
|
||||
socket.join(SITE_ROOM);
|
||||
}
|
||||
|
||||
socket.on("doctype_subscribe", function (doctype) {
|
||||
can_subscribe_doctype({
|
||||
socket,
|
||||
doctype,
|
||||
callback: () => {
|
||||
socket.join(doctype_room(doctype));
|
||||
},
|
||||
});
|
||||
});
|
||||
|
||||
socket.on("doctype_unsubscribe", function (doctype) {
|
||||
socket.leave(doctype_room(doctype));
|
||||
});
|
||||
|
||||
socket.on("task_subscribe", function (task_id) {
|
||||
const room = task_room(task_id);
|
||||
socket.join(room);
|
||||
});
|
||||
|
||||
socket.on("task_unsubscribe", function (task_id) {
|
||||
const room = task_room(task_id);
|
||||
socket.leave(room);
|
||||
});
|
||||
|
||||
socket.on("progress_subscribe", function (task_id) {
|
||||
const room = task_room(task_id);
|
||||
socket.join(room);
|
||||
});
|
||||
|
||||
socket.on("doc_subscribe", function (doctype, docname) {
|
||||
can_subscribe_doc({
|
||||
socket,
|
||||
doctype,
|
||||
docname,
|
||||
callback: () => {
|
||||
let room = doc_room(doctype, docname);
|
||||
socket.join(room);
|
||||
},
|
||||
});
|
||||
});
|
||||
|
||||
socket.on("doc_unsubscribe", function (doctype, docname) {
|
||||
let room = doc_room(doctype, docname);
|
||||
socket.leave(room);
|
||||
});
|
||||
|
||||
socket.on("doc_open", function (doctype, docname) {
|
||||
can_subscribe_doc({
|
||||
socket,
|
||||
doctype,
|
||||
docname,
|
||||
callback: () => {
|
||||
let room = open_doc_room(doctype, docname);
|
||||
socket.join(room);
|
||||
if (!socket.subscribed_documents) socket.subscribed_documents = [];
|
||||
socket.subscribed_documents.push([doctype, docname]);
|
||||
|
||||
// show who is currently viewing the form
|
||||
notify_subscribed_doc_users({
|
||||
socket: socket,
|
||||
doctype: doctype,
|
||||
docname: docname,
|
||||
});
|
||||
},
|
||||
});
|
||||
});
|
||||
|
||||
socket.on("doc_close", function (doctype, docname) {
|
||||
// remove this user from the list of 'who is currently viewing the form'
|
||||
let room = open_doc_room(doctype, docname);
|
||||
socket.leave(room);
|
||||
|
||||
if (socket.subscribed_documents) {
|
||||
socket.subscribed_documents = socket.subscribed_documents.filter(([dt, dn]) => {
|
||||
!(dt == doctype && dn == docname);
|
||||
});
|
||||
}
|
||||
|
||||
notify_subscribed_doc_users({
|
||||
socket: socket,
|
||||
doctype: doctype,
|
||||
docname: docname,
|
||||
});
|
||||
});
|
||||
|
||||
socket.on("disconnect", () => {
|
||||
notify_disconnected_documents(socket);
|
||||
});
|
||||
}
|
||||
|
||||
function notify_disconnected_documents(socket) {
|
||||
if (socket.subscribed_documents) {
|
||||
socket.subscribed_documents.forEach(([doctype, docname]) => {
|
||||
notify_subscribed_doc_users({ socket, doctype, docname });
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
function can_subscribe_doctype(args) {
|
||||
if (!args) return;
|
||||
if (!args.doctype) return;
|
||||
request
|
||||
.get(get_url(args.socket, "/api/method/frappe.realtime.can_subscribe_doctype"))
|
||||
.type("form")
|
||||
.query({
|
||||
sid: args.socket.sid,
|
||||
doctype: args.doctype,
|
||||
})
|
||||
.end(function (err, res) {
|
||||
if (!res || res.status == 403 || err) {
|
||||
if (err) {
|
||||
log(err);
|
||||
}
|
||||
return false;
|
||||
} else if (res.status == 200) {
|
||||
args.callback && args.callback(err, res);
|
||||
return true;
|
||||
}
|
||||
log("ERROR (can_subscribe_doctype): ", err, res);
|
||||
});
|
||||
}
|
||||
|
||||
function notify_subscribed_doc_users(args) {
|
||||
if (!(args && args.doctype && args.docname)) {
|
||||
return;
|
||||
}
|
||||
const socket = args.socket;
|
||||
const room = open_doc_room(args.doctype, args.docname);
|
||||
|
||||
const clients = Array.from(socket.nsp.adapter.rooms.get(room) || []);
|
||||
|
||||
let users = [];
|
||||
|
||||
socket.nsp.sockets.forEach((sock) => {
|
||||
if (clients.includes(sock.id)) {
|
||||
users.push(sock.user);
|
||||
}
|
||||
});
|
||||
|
||||
// dont send update to self. meaningless.
|
||||
if (users.length == 1 && users[0] == args.socket.user) return;
|
||||
|
||||
// notify
|
||||
socket.nsp.to(room).emit("doc_viewers", {
|
||||
doctype: args.doctype,
|
||||
docname: args.docname,
|
||||
users: Array.from(new Set(users)),
|
||||
});
|
||||
}
|
||||
|
||||
function can_subscribe_doc(args) {
|
||||
if (!args) return;
|
||||
if (!args.doctype || !args.docname) return;
|
||||
request
|
||||
.get(get_url(args.socket, "/api/method/frappe.realtime.can_subscribe_doc"))
|
||||
.type("form")
|
||||
.query({
|
||||
sid: args.socket.sid,
|
||||
doctype: args.doctype,
|
||||
docname: args.docname,
|
||||
})
|
||||
.end(function (err, res) {
|
||||
if (!res) {
|
||||
log("No response for doc_subscribe");
|
||||
} else if (res.status == 403) {
|
||||
return;
|
||||
} else if (err) {
|
||||
log(err);
|
||||
} else if (res.status == 200) {
|
||||
args.callback(err, res);
|
||||
} else {
|
||||
log("Something went wrong", err, res);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
const doc_room = (doctype, docname) => "doc:" + doctype + "/" + docname;
|
||||
const open_doc_room = (doctype, docname) => "open_doc:" + doctype + "/" + docname;
|
||||
const user_room = (user) => "user:" + user;
|
||||
const doctype_room = (doctype) => "doctype:" + doctype;
|
||||
const task_room = (task_id) => "task_progress:" + task_id;
|
||||
|
||||
module.exports = frappe_handlers;
|
||||
|
|
@ -1,11 +1,7 @@
|
|||
const request = require("superagent");
|
||||
const { Server } = require("socket.io");
|
||||
|
||||
const { get_conf, get_redis_subscriber } = require("../node_utils");
|
||||
const conf = get_conf();
|
||||
const log = console.log; // eslint-disable-line
|
||||
|
||||
const { get_url } = require("./utils");
|
||||
|
||||
let io = new Server(conf.socketio_port, {
|
||||
cors: {
|
||||
|
|
@ -15,7 +11,7 @@ let io = new Server(conf.socketio_port, {
|
|||
},
|
||||
});
|
||||
|
||||
// Multitenancy implementation
|
||||
// Multitenancy implementation.
|
||||
// allow arbitrary sitename as namespaces
|
||||
// namespaces get validated during authentication.
|
||||
const realtime = io.of(/^\/.*$/);
|
||||
|
|
@ -23,218 +19,23 @@ const realtime = io.of(/^\/.*$/);
|
|||
// load and register middlewares
|
||||
const authenticate = require("./middlewares/authenticate");
|
||||
realtime.use(authenticate);
|
||||
// =======================
|
||||
|
||||
// load and register handler
|
||||
realtime.on("connection", function (socket) {
|
||||
socket.join(get_user_room(socket.user));
|
||||
socket.join(get_website_room());
|
||||
|
||||
if (socket.user_type == "System User") {
|
||||
socket.join(get_site_room());
|
||||
}
|
||||
|
||||
socket.on("doctype_subscribe", function (doctype) {
|
||||
can_subscribe_doctype({
|
||||
socket,
|
||||
doctype,
|
||||
callback: () => {
|
||||
socket.join(get_doctype_room(doctype));
|
||||
},
|
||||
});
|
||||
});
|
||||
|
||||
socket.on("doctype_unsubscribe", function (doctype) {
|
||||
socket.leave(get_doctype_room(doctype));
|
||||
});
|
||||
|
||||
socket.on("task_subscribe", function (task_id) {
|
||||
var room = get_task_room(task_id);
|
||||
socket.join(room);
|
||||
});
|
||||
|
||||
socket.on("task_unsubscribe", function (task_id) {
|
||||
var room = get_task_room(task_id);
|
||||
socket.leave(room);
|
||||
});
|
||||
|
||||
socket.on("progress_subscribe", function (task_id) {
|
||||
var room = get_task_room(task_id);
|
||||
socket.join(room);
|
||||
});
|
||||
|
||||
socket.on("doc_subscribe", function (doctype, docname) {
|
||||
can_subscribe_doc({
|
||||
socket,
|
||||
doctype,
|
||||
docname,
|
||||
callback: () => {
|
||||
let room = get_doc_room(doctype, docname);
|
||||
socket.join(room);
|
||||
},
|
||||
});
|
||||
});
|
||||
|
||||
socket.on("doc_unsubscribe", function (doctype, docname) {
|
||||
let room = get_doc_room(doctype, docname);
|
||||
socket.leave(room);
|
||||
});
|
||||
|
||||
socket.on("doc_open", function (doctype, docname) {
|
||||
can_subscribe_doc({
|
||||
socket,
|
||||
doctype,
|
||||
docname,
|
||||
callback: () => {
|
||||
let room = get_open_doc_room(doctype, docname);
|
||||
socket.join(room);
|
||||
socket.subscribed_documents.push([doctype, docname]);
|
||||
|
||||
// show who is currently viewing the form
|
||||
notify_subscribed_doc_users({
|
||||
socket: socket,
|
||||
doctype: doctype,
|
||||
docname: docname,
|
||||
});
|
||||
},
|
||||
});
|
||||
});
|
||||
|
||||
socket.on("doc_close", function (doctype, docname) {
|
||||
// remove this user from the list of 'who is currently viewing the form'
|
||||
let room = get_open_doc_room(doctype, docname);
|
||||
socket.leave(room);
|
||||
socket.subscribed_documents = socket.subscribed_documents.filter(([dt, dn]) => {
|
||||
!(dt == doctype && dn == docname);
|
||||
});
|
||||
|
||||
notify_subscribed_doc_users({
|
||||
socket: socket,
|
||||
doctype: doctype,
|
||||
docname: docname,
|
||||
});
|
||||
});
|
||||
// load and register handlers
|
||||
const frappe_handlers = require("./handlers/frappe_handlers");
|
||||
function on_connection(socket) {
|
||||
frappe_handlers(realtime, socket);
|
||||
|
||||
// ESBUild "open in editor" on error
|
||||
socket.on("open_in_editor", (data) => {
|
||||
subscriber.publish("open_in_editor", JSON.stringify(data));
|
||||
});
|
||||
});
|
||||
|
||||
function get_doc_room(doctype, docname) {
|
||||
return "doc:" + doctype + "/" + docname;
|
||||
}
|
||||
|
||||
function get_open_doc_room(doctype, docname) {
|
||||
return "open_doc:" + doctype + "/" + docname;
|
||||
}
|
||||
|
||||
function get_user_room(user) {
|
||||
return "user:" + user;
|
||||
}
|
||||
|
||||
function get_site_room() {
|
||||
return "all";
|
||||
}
|
||||
|
||||
function get_website_room() {
|
||||
return "website";
|
||||
}
|
||||
|
||||
function get_doctype_room(doctype) {
|
||||
return "doctype:" + doctype;
|
||||
}
|
||||
|
||||
function get_task_room(task_id) {
|
||||
return "task_progress:" + task_id;
|
||||
}
|
||||
|
||||
function can_subscribe_doc(args) {
|
||||
if (!args) return;
|
||||
if (!args.doctype || !args.docname) return;
|
||||
request
|
||||
.get(get_url(args.socket, "/api/method/frappe.realtime.can_subscribe_doc"))
|
||||
.type("form")
|
||||
.query({
|
||||
sid: args.socket.sid,
|
||||
doctype: args.doctype,
|
||||
docname: args.docname,
|
||||
})
|
||||
.end(function (err, res) {
|
||||
if (!res) {
|
||||
log("No response for doc_subscribe");
|
||||
} else if (res.status == 403) {
|
||||
return;
|
||||
} else if (err) {
|
||||
log(err);
|
||||
} else if (res.status == 200) {
|
||||
args.callback(err, res);
|
||||
} else {
|
||||
log("Something went wrong", err, res);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
function can_subscribe_doctype(args) {
|
||||
if (!args) return;
|
||||
if (!args.doctype) return;
|
||||
request
|
||||
.get(get_url(args.socket, "/api/method/frappe.realtime.can_subscribe_doctype"))
|
||||
.type("form")
|
||||
.query({
|
||||
sid: args.socket.sid,
|
||||
doctype: args.doctype,
|
||||
})
|
||||
.end(function (err, res) {
|
||||
if (!res || res.status == 403 || err) {
|
||||
if (err) {
|
||||
log(err);
|
||||
}
|
||||
return false;
|
||||
} else if (res.status == 200) {
|
||||
args.callback && args.callback(err, res);
|
||||
return true;
|
||||
}
|
||||
log("ERROR (can_subscribe_doctype): ", err, res);
|
||||
});
|
||||
}
|
||||
|
||||
function notify_subscribed_doc_users(args) {
|
||||
if (!(args && args.doctype && args.docname)) {
|
||||
return;
|
||||
}
|
||||
const socket = args.socket;
|
||||
const open_doc_room = get_open_doc_room(args.doctype, args.docname);
|
||||
|
||||
const clients = Array.from(socket.nsp.adapter.rooms.get(open_doc_room) || []);
|
||||
|
||||
let users = [];
|
||||
|
||||
socket.nsp.sockets.forEach((sock) => {
|
||||
if (clients.includes(sock.id)) {
|
||||
users.push(sock.user);
|
||||
}
|
||||
});
|
||||
|
||||
// dont send update to self. meaningless.
|
||||
if (users.length == 1 && users[0] == args.socket.user) return;
|
||||
|
||||
// notify
|
||||
socket.nsp.to(open_doc_room).emit("doc_viewers", {
|
||||
doctype: args.doctype,
|
||||
docname: args.docname,
|
||||
users: Array.from(new Set(users)),
|
||||
});
|
||||
}
|
||||
|
||||
realtime.on("connection", function (socket) {
|
||||
socket.on("disconnect", () => user_disconnected(socket));
|
||||
});
|
||||
|
||||
function user_disconnected(socket) {
|
||||
socket.subscribed_documents.forEach(([doctype, docname]) => {
|
||||
notify_subscribed_doc_users({ socket, doctype, docname });
|
||||
});
|
||||
}
|
||||
realtime.on("connection", on_connection);
|
||||
// =======================
|
||||
|
||||
// Consume events sent from python via redis pub-sub channel.
|
||||
const subscriber = get_redis_subscriber();
|
||||
|
||||
subscriber.on("message", function (_channel, message) {
|
||||
|
|
@ -244,8 +45,10 @@ subscriber.on("message", function (_channel, message) {
|
|||
if (message.room) {
|
||||
io.of(namespace).to(message.room).emit(message.event, message.message);
|
||||
} else {
|
||||
io.emit(message.event, message.message);
|
||||
// publish to ALL sites only used for things like build event.
|
||||
realtime.emit(message.event, message.message);
|
||||
}
|
||||
});
|
||||
|
||||
subscriber.subscribe("events");
|
||||
// =======================
|
||||
|
|
|
|||
|
|
@ -1,6 +1,6 @@
|
|||
const cookie = require("cookie");
|
||||
const request = require("superagent");
|
||||
const { get_hostname, get_url } = require("../utils");
|
||||
const { get_url } = require("../utils");
|
||||
|
||||
const { get_conf } = require("../../node_utils");
|
||||
const conf = get_conf();
|
||||
|
|
@ -40,7 +40,6 @@ function authenticate_with_frappe(socket, next) {
|
|||
socket.user = res.body.message.user;
|
||||
socket.user_type = res.body.message.user_type;
|
||||
socket.sid = cookies.sid;
|
||||
socket.subscribed_documents = [];
|
||||
next();
|
||||
})
|
||||
.catch((e) => {
|
||||
|
|
@ -67,4 +66,12 @@ function get_site_name(socket) {
|
|||
return socket.site_name;
|
||||
}
|
||||
|
||||
function get_hostname(url) {
|
||||
if (!url) return undefined;
|
||||
if (url.indexOf("://") > -1) {
|
||||
url = url.split("/")[2];
|
||||
}
|
||||
return url.match(/:/g) ? url.slice(0, url.indexOf(":")) : url;
|
||||
}
|
||||
|
||||
module.exports = authenticate_with_frappe;
|
||||
|
|
|
|||
|
|
@ -1,11 +1,3 @@
|
|||
function get_hostname(url) {
|
||||
if (!url) return undefined;
|
||||
if (url.indexOf("://") > -1) {
|
||||
url = url.split("/")[2];
|
||||
}
|
||||
return url.match(/:/g) ? url.slice(0, url.indexOf(":")) : url;
|
||||
}
|
||||
|
||||
function get_url(socket, path) {
|
||||
if (!path) {
|
||||
path = "";
|
||||
|
|
@ -15,5 +7,4 @@ function get_url(socket, path) {
|
|||
|
||||
module.exports = {
|
||||
get_url,
|
||||
get_hostname,
|
||||
};
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue