From d59e499418b26501cf9775ebec95b1c12860c358 Mon Sep 17 00:00:00 2001 From: Ankush Menat Date: Thu, 29 Jun 2023 22:06:01 +0530 Subject: [PATCH] refactor!: Use SocketIO namespaces for multitenancy --- frappe/public/js/frappe/socketio_client.js | 6 +- frappe/realtime.py | 19 +++-- realtime/index.js | 99 ++++++++++------------ realtime/middlewares/authenticate.js | 29 +++++++ 4 files changed, 88 insertions(+), 65 deletions(-) diff --git a/frappe/public/js/frappe/socketio_client.js b/frappe/public/js/frappe/socketio_client.js index 9e70e6a835..bfcf0b1a6c 100644 --- a/frappe/public/js/frappe/socketio_client.js +++ b/frappe/public/js/frappe/socketio_client.js @@ -34,13 +34,13 @@ class RealTimeClient { // Enable secure option when using HTTPS if (window.location.protocol == "https:") { - this.socket = io.connect(this.get_host(port), { + this.socket = io(this.get_host(port), { secure: true, withCredentials: true, reconnectionAttempts: 3, }); } else if (window.location.protocol == "http:") { - this.socket = io.connect(this.get_host(port), { + this.socket = io(this.get_host(port), { withCredentials: true, reconnectionAttempts: 3, }); @@ -105,7 +105,7 @@ class RealTimeClient { } host = host + ":" + port; } - return host; + return host + `/${frappe.boot.sitename}`; } subscribe(task_id, opts) { diff --git a/frappe/realtime.py b/frappe/realtime.py index 47b3d8a95c..eb5d4a7541 100644 --- a/frappe/realtime.py +++ b/frappe/realtime.py @@ -104,7 +104,12 @@ def emit_via_redis(event, message, room): with suppress(redis.exceptions.ConnectionError): r = get_redis_connection_without_auth() - r.publish("events", frappe.as_json({"event": event, "message": message, "room": room})) + r.publish( + "events", + frappe.as_json( + {"event": event, "message": message, "room": room, "namespace": frappe.local.site} + ), + ) @frappe.whitelist(allow_guest=True) @@ -136,24 +141,24 @@ def get_user_info(): def get_doctype_room(doctype): - return f"{frappe.local.site}:doctype:{doctype}" + return f"doctype:{doctype}" def get_doc_room(doctype, docname): - return f"{frappe.local.site}:doc:{doctype}/{cstr(docname)}" + return f"doc:{doctype}/{cstr(docname)}" def get_user_room(user): - return f"{frappe.local.site}:user:{user}" + return f"user:{user}" def get_site_room(): - return f"{frappe.local.site}:all" + return "all" def get_task_progress_room(task_id): - return f"{frappe.local.site}:task_progress:{task_id}" + return f"task_progress:{task_id}" def get_website_room(): - return f"{frappe.local.site}:website" + return "website" diff --git a/realtime/index.js b/realtime/index.js index c625608a2f..3c01db7246 100644 --- a/realtime/index.js +++ b/realtime/index.js @@ -5,9 +5,9 @@ const { get_conf, get_redis_subscriber } = require("../node_utils"); const conf = get_conf(); const log = console.log; // eslint-disable-line -const { get_hostname, get_url } = require("./utils"); +const { get_url } = require("./utils"); -const io = new Server(conf.socketio_port, { +let io = new Server(conf.socketio_port, { cors: { // Should be fine since we are ensuring whether hostname and origin are same before adding setting listeners for s socket origin: true, @@ -15,17 +15,22 @@ const io = new Server(conf.socketio_port, { }, }); +// Multitenancy implementation +// allow arbitrary sitename as namespaces +// namespaces get validated during authentication. +const realtime = io.of(/^\/.*$/); + // load and register middlewares const authenticate = require("./middlewares/authenticate"); -io.use(authenticate); +realtime.use(authenticate); // load and register handler -io.on("connection", function (socket) { - socket.join(get_user_room(socket, socket.user)); - socket.join(get_website_room(socket)); +realtime.on("connection", function (socket) { + socket.join(get_user_room(socket.user)); + socket.join(get_website_room()); if (socket.user_type == "System User") { - socket.join(get_site_room(socket)); + socket.join(get_site_room()); } socket.on("doctype_subscribe", function (doctype) { @@ -33,27 +38,27 @@ io.on("connection", function (socket) { socket, doctype, callback: () => { - socket.join(get_doctype_room(socket, doctype)); + socket.join(get_doctype_room(doctype)); }, }); }); socket.on("doctype_unsubscribe", function (doctype) { - socket.leave(get_doctype_room(socket, doctype)); + socket.leave(get_doctype_room(doctype)); }); socket.on("task_subscribe", function (task_id) { - var room = get_task_room(socket, task_id); + var room = get_task_room(task_id); socket.join(room); }); socket.on("task_unsubscribe", function (task_id) { - var room = get_task_room(socket, task_id); + var room = get_task_room(task_id); socket.leave(room); }); socket.on("progress_subscribe", function (task_id) { - var room = get_task_room(socket, task_id); + var room = get_task_room(task_id); socket.join(room); }); @@ -63,14 +68,14 @@ io.on("connection", function (socket) { doctype, docname, callback: () => { - let room = get_doc_room(socket, doctype, docname); + let room = get_doc_room(doctype, docname); socket.join(room); }, }); }); socket.on("doc_unsubscribe", function (doctype, docname) { - let room = get_doc_room(socket, doctype, docname); + let room = get_doc_room(doctype, docname); socket.leave(room); }); @@ -80,7 +85,7 @@ io.on("connection", function (socket) { doctype, docname, callback: () => { - let room = get_open_doc_room(socket, doctype, docname); + let room = get_open_doc_room(doctype, docname); socket.join(room); socket.subscribed_documents.push([doctype, docname]); @@ -96,7 +101,7 @@ io.on("connection", function (socket) { socket.on("doc_close", function (doctype, docname) { // remove this user from the list of 'who is currently viewing the form' - let room = get_open_doc_room(socket, doctype, docname); + let room = get_open_doc_room(doctype, docname); socket.leave(room); socket.subscribed_documents = socket.subscribed_documents.filter(([dt, dn]) => { !(dt == doctype && dn == docname); @@ -114,51 +119,32 @@ io.on("connection", function (socket) { }); }); -function get_doc_room(socket, doctype, docname) { - return get_site_name(socket) + ":doc:" + doctype + "/" + docname; +function get_doc_room(doctype, docname) { + return "doc:" + doctype + "/" + docname; } -function get_open_doc_room(socket, doctype, docname) { - return get_site_name(socket) + ":open_doc:" + doctype + "/" + docname; +function get_open_doc_room(doctype, docname) { + return "open_doc:" + doctype + "/" + docname; } -function get_user_room(socket, user) { - return get_site_name(socket) + ":user:" + user || socket.user; +function get_user_room(user) { + return "user:" + user; } -function get_site_room(socket) { - return get_site_name(socket) + ":all"; +function get_site_room() { + return "all"; } -function get_website_room(socket) { - return get_site_name(socket) + ":website"; +function get_website_room() { + return "website"; } -function get_doctype_room(socket, doctype) { - return get_site_name(socket) + ":doctype:" + doctype; +function get_doctype_room(doctype) { + return "doctype:" + doctype; } -function get_task_room(socket, task_id) { - return get_site_name(socket) + ":task_progress:" + task_id; -} - -function get_site_name(socket) { - if (socket.site_name) { - return socket.site_name; - } else if (socket.request.headers["x-frappe-site-name"]) { - socket.site_name = get_hostname(socket.request.headers["x-frappe-site-name"]); - } else if ( - conf.default_site && - ["localhost", "127.0.0.1"].indexOf(get_hostname(socket.request.headers.host)) !== -1 - ) { - // from currentsite.txt since host is localhost - socket.site_name = conf.default_site; - } else if (socket.request.headers.origin) { - socket.site_name = get_hostname(socket.request.headers.origin); - } else { - socket.site_name = get_hostname(socket.request.headers.host); - } - return socket.site_name; +function get_task_room(task_id) { + return "task_progress:" + task_id; } function can_subscribe_doc(args) { @@ -215,12 +201,14 @@ function notify_subscribed_doc_users(args) { if (!(args && args.doctype && args.docname)) { return; } - const open_doc_room = get_open_doc_room(args.socket, args.doctype, args.docname); + const socket = args.socket; + const open_doc_room = get_open_doc_room(args.doctype, args.docname); - const clients = Array.from(io.sockets.adapter.rooms.get(open_doc_room) || []); + const clients = Array.from(socket.nsp.adapter.rooms.get(open_doc_room) || []); let users = []; - io.sockets.sockets.forEach((sock) => { + + socket.nsp.sockets.forEach((sock) => { if (clients.includes(sock.id)) { users.push(sock.user); } @@ -230,14 +218,14 @@ function notify_subscribed_doc_users(args) { if (users.length == 1 && users[0] == args.socket.user) return; // notify - io.to(open_doc_room).emit("doc_viewers", { + socket.nsp.to(open_doc_room).emit("doc_viewers", { doctype: args.doctype, docname: args.docname, users: Array.from(new Set(users)), }); } -io.sockets.on("connection", function (socket) { +realtime.on("connection", function (socket) { socket.on("disconnect", () => user_disconnected(socket)); }); @@ -252,8 +240,9 @@ const subscriber = get_redis_subscriber(); subscriber.on("message", function (_channel, message) { message = JSON.parse(message); + let namespace = "/" + message.namespace; if (message.room) { - io.to(message.room).emit(message.event, message.message); + io.of(namespace).to(message.room).emit(message.event, message.message); } else { io.emit(message.event, message.message); } diff --git a/realtime/middlewares/authenticate.js b/realtime/middlewares/authenticate.js index b18af4fecf..9bd2ed592e 100644 --- a/realtime/middlewares/authenticate.js +++ b/realtime/middlewares/authenticate.js @@ -2,7 +2,17 @@ const cookie = require("cookie"); const request = require("superagent"); const { get_hostname, get_url } = require("../utils"); +const { get_conf } = require("../../node_utils"); +const conf = get_conf(); + function authenticate_with_frappe(socket, next) { + let namespace = socket.nsp.name; + namespace = namespace.slice(1, namespace.length); // remove leading `/` + + if (namespace != get_site_name(socket)) { + next(new Error("Invalid namespace")); + } + if (get_hostname(socket.request.headers.host) != get_hostname(socket.request.headers.origin)) { next(new Error("Invalid origin")); return; @@ -38,4 +48,23 @@ function authenticate_with_frappe(socket, next) { }); } +function get_site_name(socket) { + if (socket.site_name) { + return socket.site_name; + } else if (socket.request.headers["x-frappe-site-name"]) { + socket.site_name = get_hostname(socket.request.headers["x-frappe-site-name"]); + } else if ( + conf.default_site && + ["localhost", "127.0.0.1"].indexOf(get_hostname(socket.request.headers.host)) !== -1 + ) { + // from currentsite.txt since host is localhost + socket.site_name = conf.default_site; + } else if (socket.request.headers.origin) { + socket.site_name = get_hostname(socket.request.headers.origin); + } else { + socket.site_name = get_hostname(socket.request.headers.host); + } + return socket.site_name; +} + module.exports = authenticate_with_frappe;