From bdaed29ef253ba6ef149af7eeff24179c1a736e3 Mon Sep 17 00:00:00 2001 From: Ankush Menat Date: Thu, 29 Jun 2023 23:11:28 +0530 Subject: [PATCH] refactor: move handlers to separate file --- frappe/public/js/frappe/socketio_client.js | 7 +- realtime/handlers/frappe_handlers.js | 198 ++++++++++++++++++ realtime/index.js | 223 ++------------------- realtime/middlewares/authenticate.js | 11 +- realtime/utils.js | 9 - 5 files changed, 223 insertions(+), 225 deletions(-) create mode 100644 realtime/handlers/frappe_handlers.js diff --git a/frappe/public/js/frappe/socketio_client.js b/frappe/public/js/frappe/socketio_client.js index bfcf0b1a6c..7084e9e2df 100644 --- a/frappe/public/js/frappe/socketio_client.js +++ b/frappe/public/js/frappe/socketio_client.js @@ -6,7 +6,6 @@ class RealTimeClient { constructor() { this.open_tasks = {}; this.open_docs = new Set(); - this.emit_queue = []; } on(event, callback) { @@ -96,9 +95,9 @@ class RealTimeClient { } get_host(port = 3000) { - var host = window.location.origin; + let host = window.location.origin; if (window.dev_server) { - var parts = host.split(":"); + let parts = host.split(":"); port = frappe.boot.socketio_port || port.toString() || "3000"; if (parts.length > 2) { host = parts[0] + ":" + parts[1]; @@ -171,7 +170,7 @@ class RealTimeClient { } // success - var opts = this.open_tasks[data.task_id]; + let opts = this.open_tasks[data.task_id]; if (opts[method]) { opts[method](data); } diff --git a/realtime/handlers/frappe_handlers.js b/realtime/handlers/frappe_handlers.js new file mode 100644 index 0000000000..eb7344388c --- /dev/null +++ b/realtime/handlers/frappe_handlers.js @@ -0,0 +1,198 @@ +const request = require("superagent"); +const { get_url } = require("../utils"); +const log = console.log; // eslint-disable-line + +const WEBSITE_ROOM = "website"; +const SITE_ROOM = "all"; + +function frappe_handlers(realtime, socket) { + socket.join(user_room(socket.user)); + socket.join(WEBSITE_ROOM); + + if (socket.user_type == "System User") { + socket.join(SITE_ROOM); + } + + socket.on("doctype_subscribe", function (doctype) { + can_subscribe_doctype({ + socket, + doctype, + callback: () => { + socket.join(doctype_room(doctype)); + }, + }); + }); + + socket.on("doctype_unsubscribe", function (doctype) { + socket.leave(doctype_room(doctype)); + }); + + socket.on("task_subscribe", function (task_id) { + const room = task_room(task_id); + socket.join(room); + }); + + socket.on("task_unsubscribe", function (task_id) { + const room = task_room(task_id); + socket.leave(room); + }); + + socket.on("progress_subscribe", function (task_id) { + const room = task_room(task_id); + socket.join(room); + }); + + socket.on("doc_subscribe", function (doctype, docname) { + can_subscribe_doc({ + socket, + doctype, + docname, + callback: () => { + let room = doc_room(doctype, docname); + socket.join(room); + }, + }); + }); + + socket.on("doc_unsubscribe", function (doctype, docname) { + let room = doc_room(doctype, docname); + socket.leave(room); + }); + + socket.on("doc_open", function (doctype, docname) { + can_subscribe_doc({ + socket, + doctype, + docname, + callback: () => { + let room = open_doc_room(doctype, docname); + socket.join(room); + if (!socket.subscribed_documents) socket.subscribed_documents = []; + socket.subscribed_documents.push([doctype, docname]); + + // show who is currently viewing the form + notify_subscribed_doc_users({ + socket: socket, + doctype: doctype, + docname: docname, + }); + }, + }); + }); + + socket.on("doc_close", function (doctype, docname) { + // remove this user from the list of 'who is currently viewing the form' + let room = open_doc_room(doctype, docname); + socket.leave(room); + + if (socket.subscribed_documents) { + socket.subscribed_documents = socket.subscribed_documents.filter(([dt, dn]) => { + !(dt == doctype && dn == docname); + }); + } + + notify_subscribed_doc_users({ + socket: socket, + doctype: doctype, + docname: docname, + }); + }); + + socket.on("disconnect", () => { + notify_disconnected_documents(socket); + }); +} + +function notify_disconnected_documents(socket) { + if (socket.subscribed_documents) { + socket.subscribed_documents.forEach(([doctype, docname]) => { + notify_subscribed_doc_users({ socket, doctype, docname }); + }); + } +} + +function can_subscribe_doctype(args) { + if (!args) return; + if (!args.doctype) return; + request + .get(get_url(args.socket, "/api/method/frappe.realtime.can_subscribe_doctype")) + .type("form") + .query({ + sid: args.socket.sid, + doctype: args.doctype, + }) + .end(function (err, res) { + if (!res || res.status == 403 || err) { + if (err) { + log(err); + } + return false; + } else if (res.status == 200) { + args.callback && args.callback(err, res); + return true; + } + log("ERROR (can_subscribe_doctype): ", err, res); + }); +} + +function notify_subscribed_doc_users(args) { + if (!(args && args.doctype && args.docname)) { + return; + } + const socket = args.socket; + const room = open_doc_room(args.doctype, args.docname); + + const clients = Array.from(socket.nsp.adapter.rooms.get(room) || []); + + let users = []; + + socket.nsp.sockets.forEach((sock) => { + if (clients.includes(sock.id)) { + users.push(sock.user); + } + }); + + // dont send update to self. meaningless. + if (users.length == 1 && users[0] == args.socket.user) return; + + // notify + socket.nsp.to(room).emit("doc_viewers", { + doctype: args.doctype, + docname: args.docname, + users: Array.from(new Set(users)), + }); +} + +function can_subscribe_doc(args) { + if (!args) return; + if (!args.doctype || !args.docname) return; + request + .get(get_url(args.socket, "/api/method/frappe.realtime.can_subscribe_doc")) + .type("form") + .query({ + sid: args.socket.sid, + doctype: args.doctype, + docname: args.docname, + }) + .end(function (err, res) { + if (!res) { + log("No response for doc_subscribe"); + } else if (res.status == 403) { + return; + } else if (err) { + log(err); + } else if (res.status == 200) { + args.callback(err, res); + } else { + log("Something went wrong", err, res); + } + }); +} + +const doc_room = (doctype, docname) => "doc:" + doctype + "/" + docname; +const open_doc_room = (doctype, docname) => "open_doc:" + doctype + "/" + docname; +const user_room = (user) => "user:" + user; +const doctype_room = (doctype) => "doctype:" + doctype; +const task_room = (task_id) => "task_progress:" + task_id; + +module.exports = frappe_handlers; diff --git a/realtime/index.js b/realtime/index.js index 3c01db7246..cf6549d521 100644 --- a/realtime/index.js +++ b/realtime/index.js @@ -1,11 +1,7 @@ -const request = require("superagent"); const { Server } = require("socket.io"); const { get_conf, get_redis_subscriber } = require("../node_utils"); const conf = get_conf(); -const log = console.log; // eslint-disable-line - -const { get_url } = require("./utils"); let io = new Server(conf.socketio_port, { cors: { @@ -15,7 +11,7 @@ let io = new Server(conf.socketio_port, { }, }); -// Multitenancy implementation +// Multitenancy implementation. // allow arbitrary sitename as namespaces // namespaces get validated during authentication. const realtime = io.of(/^\/.*$/); @@ -23,218 +19,23 @@ const realtime = io.of(/^\/.*$/); // load and register middlewares const authenticate = require("./middlewares/authenticate"); realtime.use(authenticate); +// ======================= -// load and register handler -realtime.on("connection", function (socket) { - socket.join(get_user_room(socket.user)); - socket.join(get_website_room()); - - if (socket.user_type == "System User") { - socket.join(get_site_room()); - } - - socket.on("doctype_subscribe", function (doctype) { - can_subscribe_doctype({ - socket, - doctype, - callback: () => { - socket.join(get_doctype_room(doctype)); - }, - }); - }); - - socket.on("doctype_unsubscribe", function (doctype) { - socket.leave(get_doctype_room(doctype)); - }); - - socket.on("task_subscribe", function (task_id) { - var room = get_task_room(task_id); - socket.join(room); - }); - - socket.on("task_unsubscribe", function (task_id) { - var room = get_task_room(task_id); - socket.leave(room); - }); - - socket.on("progress_subscribe", function (task_id) { - var room = get_task_room(task_id); - socket.join(room); - }); - - socket.on("doc_subscribe", function (doctype, docname) { - can_subscribe_doc({ - socket, - doctype, - docname, - callback: () => { - let room = get_doc_room(doctype, docname); - socket.join(room); - }, - }); - }); - - socket.on("doc_unsubscribe", function (doctype, docname) { - let room = get_doc_room(doctype, docname); - socket.leave(room); - }); - - socket.on("doc_open", function (doctype, docname) { - can_subscribe_doc({ - socket, - doctype, - docname, - callback: () => { - let room = get_open_doc_room(doctype, docname); - socket.join(room); - socket.subscribed_documents.push([doctype, docname]); - - // show who is currently viewing the form - notify_subscribed_doc_users({ - socket: socket, - doctype: doctype, - docname: docname, - }); - }, - }); - }); - - socket.on("doc_close", function (doctype, docname) { - // remove this user from the list of 'who is currently viewing the form' - let room = get_open_doc_room(doctype, docname); - socket.leave(room); - socket.subscribed_documents = socket.subscribed_documents.filter(([dt, dn]) => { - !(dt == doctype && dn == docname); - }); - - notify_subscribed_doc_users({ - socket: socket, - doctype: doctype, - docname: docname, - }); - }); +// load and register handlers +const frappe_handlers = require("./handlers/frappe_handlers"); +function on_connection(socket) { + frappe_handlers(realtime, socket); + // ESBUild "open in editor" on error socket.on("open_in_editor", (data) => { subscriber.publish("open_in_editor", JSON.stringify(data)); }); -}); - -function get_doc_room(doctype, docname) { - return "doc:" + doctype + "/" + docname; } -function get_open_doc_room(doctype, docname) { - return "open_doc:" + doctype + "/" + docname; -} - -function get_user_room(user) { - return "user:" + user; -} - -function get_site_room() { - return "all"; -} - -function get_website_room() { - return "website"; -} - -function get_doctype_room(doctype) { - return "doctype:" + doctype; -} - -function get_task_room(task_id) { - return "task_progress:" + task_id; -} - -function can_subscribe_doc(args) { - if (!args) return; - if (!args.doctype || !args.docname) return; - request - .get(get_url(args.socket, "/api/method/frappe.realtime.can_subscribe_doc")) - .type("form") - .query({ - sid: args.socket.sid, - doctype: args.doctype, - docname: args.docname, - }) - .end(function (err, res) { - if (!res) { - log("No response for doc_subscribe"); - } else if (res.status == 403) { - return; - } else if (err) { - log(err); - } else if (res.status == 200) { - args.callback(err, res); - } else { - log("Something went wrong", err, res); - } - }); -} - -function can_subscribe_doctype(args) { - if (!args) return; - if (!args.doctype) return; - request - .get(get_url(args.socket, "/api/method/frappe.realtime.can_subscribe_doctype")) - .type("form") - .query({ - sid: args.socket.sid, - doctype: args.doctype, - }) - .end(function (err, res) { - if (!res || res.status == 403 || err) { - if (err) { - log(err); - } - return false; - } else if (res.status == 200) { - args.callback && args.callback(err, res); - return true; - } - log("ERROR (can_subscribe_doctype): ", err, res); - }); -} - -function notify_subscribed_doc_users(args) { - if (!(args && args.doctype && args.docname)) { - return; - } - const socket = args.socket; - const open_doc_room = get_open_doc_room(args.doctype, args.docname); - - const clients = Array.from(socket.nsp.adapter.rooms.get(open_doc_room) || []); - - let users = []; - - socket.nsp.sockets.forEach((sock) => { - if (clients.includes(sock.id)) { - users.push(sock.user); - } - }); - - // dont send update to self. meaningless. - if (users.length == 1 && users[0] == args.socket.user) return; - - // notify - socket.nsp.to(open_doc_room).emit("doc_viewers", { - doctype: args.doctype, - docname: args.docname, - users: Array.from(new Set(users)), - }); -} - -realtime.on("connection", function (socket) { - socket.on("disconnect", () => user_disconnected(socket)); -}); - -function user_disconnected(socket) { - socket.subscribed_documents.forEach(([doctype, docname]) => { - notify_subscribed_doc_users({ socket, doctype, docname }); - }); -} +realtime.on("connection", on_connection); +// ======================= +// Consume events sent from python via redis pub-sub channel. const subscriber = get_redis_subscriber(); subscriber.on("message", function (_channel, message) { @@ -244,8 +45,10 @@ subscriber.on("message", function (_channel, message) { if (message.room) { io.of(namespace).to(message.room).emit(message.event, message.message); } else { - io.emit(message.event, message.message); + // publish to ALL sites only used for things like build event. + realtime.emit(message.event, message.message); } }); subscriber.subscribe("events"); +// ======================= diff --git a/realtime/middlewares/authenticate.js b/realtime/middlewares/authenticate.js index 9bd2ed592e..1f8876a1da 100644 --- a/realtime/middlewares/authenticate.js +++ b/realtime/middlewares/authenticate.js @@ -1,6 +1,6 @@ const cookie = require("cookie"); const request = require("superagent"); -const { get_hostname, get_url } = require("../utils"); +const { get_url } = require("../utils"); const { get_conf } = require("../../node_utils"); const conf = get_conf(); @@ -40,7 +40,6 @@ function authenticate_with_frappe(socket, next) { socket.user = res.body.message.user; socket.user_type = res.body.message.user_type; socket.sid = cookies.sid; - socket.subscribed_documents = []; next(); }) .catch((e) => { @@ -67,4 +66,12 @@ function get_site_name(socket) { return socket.site_name; } +function get_hostname(url) { + if (!url) return undefined; + if (url.indexOf("://") > -1) { + url = url.split("/")[2]; + } + return url.match(/:/g) ? url.slice(0, url.indexOf(":")) : url; +} + module.exports = authenticate_with_frappe; diff --git a/realtime/utils.js b/realtime/utils.js index 74620f3bd1..0ff3be0a49 100644 --- a/realtime/utils.js +++ b/realtime/utils.js @@ -1,11 +1,3 @@ -function get_hostname(url) { - if (!url) return undefined; - if (url.indexOf("://") > -1) { - url = url.split("/")[2]; - } - return url.match(/:/g) ? url.slice(0, url.indexOf(":")) : url; -} - function get_url(socket, path) { if (!path) { path = ""; @@ -15,5 +7,4 @@ function get_url(socket, path) { module.exports = { get_url, - get_hostname, };