refactor!: Use SocketIO namespaces for multitenancy
This commit is contained in:
parent
dad1d57b90
commit
d59e499418
4 changed files with 88 additions and 65 deletions
|
|
@ -34,13 +34,13 @@ class RealTimeClient {
|
|||
|
||||
// Enable secure option when using HTTPS
|
||||
if (window.location.protocol == "https:") {
|
||||
this.socket = io.connect(this.get_host(port), {
|
||||
this.socket = io(this.get_host(port), {
|
||||
secure: true,
|
||||
withCredentials: true,
|
||||
reconnectionAttempts: 3,
|
||||
});
|
||||
} else if (window.location.protocol == "http:") {
|
||||
this.socket = io.connect(this.get_host(port), {
|
||||
this.socket = io(this.get_host(port), {
|
||||
withCredentials: true,
|
||||
reconnectionAttempts: 3,
|
||||
});
|
||||
|
|
@ -105,7 +105,7 @@ class RealTimeClient {
|
|||
}
|
||||
host = host + ":" + port;
|
||||
}
|
||||
return host;
|
||||
return host + `/${frappe.boot.sitename}`;
|
||||
}
|
||||
|
||||
subscribe(task_id, opts) {
|
||||
|
|
|
|||
|
|
@ -104,7 +104,12 @@ def emit_via_redis(event, message, room):
|
|||
|
||||
with suppress(redis.exceptions.ConnectionError):
|
||||
r = get_redis_connection_without_auth()
|
||||
r.publish("events", frappe.as_json({"event": event, "message": message, "room": room}))
|
||||
r.publish(
|
||||
"events",
|
||||
frappe.as_json(
|
||||
{"event": event, "message": message, "room": room, "namespace": frappe.local.site}
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
@frappe.whitelist(allow_guest=True)
|
||||
|
|
@ -136,24 +141,24 @@ def get_user_info():
|
|||
|
||||
|
||||
def get_doctype_room(doctype):
|
||||
return f"{frappe.local.site}:doctype:{doctype}"
|
||||
return f"doctype:{doctype}"
|
||||
|
||||
|
||||
def get_doc_room(doctype, docname):
|
||||
return f"{frappe.local.site}:doc:{doctype}/{cstr(docname)}"
|
||||
return f"doc:{doctype}/{cstr(docname)}"
|
||||
|
||||
|
||||
def get_user_room(user):
|
||||
return f"{frappe.local.site}:user:{user}"
|
||||
return f"user:{user}"
|
||||
|
||||
|
||||
def get_site_room():
|
||||
return f"{frappe.local.site}:all"
|
||||
return "all"
|
||||
|
||||
|
||||
def get_task_progress_room(task_id):
|
||||
return f"{frappe.local.site}:task_progress:{task_id}"
|
||||
return f"task_progress:{task_id}"
|
||||
|
||||
|
||||
def get_website_room():
|
||||
return f"{frappe.local.site}:website"
|
||||
return "website"
|
||||
|
|
|
|||
|
|
@ -5,9 +5,9 @@ const { get_conf, get_redis_subscriber } = require("../node_utils");
|
|||
const conf = get_conf();
|
||||
const log = console.log; // eslint-disable-line
|
||||
|
||||
const { get_hostname, get_url } = require("./utils");
|
||||
const { get_url } = require("./utils");
|
||||
|
||||
const io = new Server(conf.socketio_port, {
|
||||
let io = new Server(conf.socketio_port, {
|
||||
cors: {
|
||||
// Should be fine since we are ensuring whether hostname and origin are same before adding setting listeners for s socket
|
||||
origin: true,
|
||||
|
|
@ -15,17 +15,22 @@ const io = new Server(conf.socketio_port, {
|
|||
},
|
||||
});
|
||||
|
||||
// Multitenancy implementation
|
||||
// allow arbitrary sitename as namespaces
|
||||
// namespaces get validated during authentication.
|
||||
const realtime = io.of(/^\/.*$/);
|
||||
|
||||
// load and register middlewares
|
||||
const authenticate = require("./middlewares/authenticate");
|
||||
io.use(authenticate);
|
||||
realtime.use(authenticate);
|
||||
|
||||
// load and register handler
|
||||
io.on("connection", function (socket) {
|
||||
socket.join(get_user_room(socket, socket.user));
|
||||
socket.join(get_website_room(socket));
|
||||
realtime.on("connection", function (socket) {
|
||||
socket.join(get_user_room(socket.user));
|
||||
socket.join(get_website_room());
|
||||
|
||||
if (socket.user_type == "System User") {
|
||||
socket.join(get_site_room(socket));
|
||||
socket.join(get_site_room());
|
||||
}
|
||||
|
||||
socket.on("doctype_subscribe", function (doctype) {
|
||||
|
|
@ -33,27 +38,27 @@ io.on("connection", function (socket) {
|
|||
socket,
|
||||
doctype,
|
||||
callback: () => {
|
||||
socket.join(get_doctype_room(socket, doctype));
|
||||
socket.join(get_doctype_room(doctype));
|
||||
},
|
||||
});
|
||||
});
|
||||
|
||||
socket.on("doctype_unsubscribe", function (doctype) {
|
||||
socket.leave(get_doctype_room(socket, doctype));
|
||||
socket.leave(get_doctype_room(doctype));
|
||||
});
|
||||
|
||||
socket.on("task_subscribe", function (task_id) {
|
||||
var room = get_task_room(socket, task_id);
|
||||
var room = get_task_room(task_id);
|
||||
socket.join(room);
|
||||
});
|
||||
|
||||
socket.on("task_unsubscribe", function (task_id) {
|
||||
var room = get_task_room(socket, task_id);
|
||||
var room = get_task_room(task_id);
|
||||
socket.leave(room);
|
||||
});
|
||||
|
||||
socket.on("progress_subscribe", function (task_id) {
|
||||
var room = get_task_room(socket, task_id);
|
||||
var room = get_task_room(task_id);
|
||||
socket.join(room);
|
||||
});
|
||||
|
||||
|
|
@ -63,14 +68,14 @@ io.on("connection", function (socket) {
|
|||
doctype,
|
||||
docname,
|
||||
callback: () => {
|
||||
let room = get_doc_room(socket, doctype, docname);
|
||||
let room = get_doc_room(doctype, docname);
|
||||
socket.join(room);
|
||||
},
|
||||
});
|
||||
});
|
||||
|
||||
socket.on("doc_unsubscribe", function (doctype, docname) {
|
||||
let room = get_doc_room(socket, doctype, docname);
|
||||
let room = get_doc_room(doctype, docname);
|
||||
socket.leave(room);
|
||||
});
|
||||
|
||||
|
|
@ -80,7 +85,7 @@ io.on("connection", function (socket) {
|
|||
doctype,
|
||||
docname,
|
||||
callback: () => {
|
||||
let room = get_open_doc_room(socket, doctype, docname);
|
||||
let room = get_open_doc_room(doctype, docname);
|
||||
socket.join(room);
|
||||
socket.subscribed_documents.push([doctype, docname]);
|
||||
|
||||
|
|
@ -96,7 +101,7 @@ io.on("connection", function (socket) {
|
|||
|
||||
socket.on("doc_close", function (doctype, docname) {
|
||||
// remove this user from the list of 'who is currently viewing the form'
|
||||
let room = get_open_doc_room(socket, doctype, docname);
|
||||
let room = get_open_doc_room(doctype, docname);
|
||||
socket.leave(room);
|
||||
socket.subscribed_documents = socket.subscribed_documents.filter(([dt, dn]) => {
|
||||
!(dt == doctype && dn == docname);
|
||||
|
|
@ -114,51 +119,32 @@ io.on("connection", function (socket) {
|
|||
});
|
||||
});
|
||||
|
||||
function get_doc_room(socket, doctype, docname) {
|
||||
return get_site_name(socket) + ":doc:" + doctype + "/" + docname;
|
||||
function get_doc_room(doctype, docname) {
|
||||
return "doc:" + doctype + "/" + docname;
|
||||
}
|
||||
|
||||
function get_open_doc_room(socket, doctype, docname) {
|
||||
return get_site_name(socket) + ":open_doc:" + doctype + "/" + docname;
|
||||
function get_open_doc_room(doctype, docname) {
|
||||
return "open_doc:" + doctype + "/" + docname;
|
||||
}
|
||||
|
||||
function get_user_room(socket, user) {
|
||||
return get_site_name(socket) + ":user:" + user || socket.user;
|
||||
function get_user_room(user) {
|
||||
return "user:" + user;
|
||||
}
|
||||
|
||||
function get_site_room(socket) {
|
||||
return get_site_name(socket) + ":all";
|
||||
function get_site_room() {
|
||||
return "all";
|
||||
}
|
||||
|
||||
function get_website_room(socket) {
|
||||
return get_site_name(socket) + ":website";
|
||||
function get_website_room() {
|
||||
return "website";
|
||||
}
|
||||
|
||||
function get_doctype_room(socket, doctype) {
|
||||
return get_site_name(socket) + ":doctype:" + doctype;
|
||||
function get_doctype_room(doctype) {
|
||||
return "doctype:" + doctype;
|
||||
}
|
||||
|
||||
function get_task_room(socket, task_id) {
|
||||
return get_site_name(socket) + ":task_progress:" + task_id;
|
||||
}
|
||||
|
||||
function get_site_name(socket) {
|
||||
if (socket.site_name) {
|
||||
return socket.site_name;
|
||||
} else if (socket.request.headers["x-frappe-site-name"]) {
|
||||
socket.site_name = get_hostname(socket.request.headers["x-frappe-site-name"]);
|
||||
} else if (
|
||||
conf.default_site &&
|
||||
["localhost", "127.0.0.1"].indexOf(get_hostname(socket.request.headers.host)) !== -1
|
||||
) {
|
||||
// from currentsite.txt since host is localhost
|
||||
socket.site_name = conf.default_site;
|
||||
} else if (socket.request.headers.origin) {
|
||||
socket.site_name = get_hostname(socket.request.headers.origin);
|
||||
} else {
|
||||
socket.site_name = get_hostname(socket.request.headers.host);
|
||||
}
|
||||
return socket.site_name;
|
||||
function get_task_room(task_id) {
|
||||
return "task_progress:" + task_id;
|
||||
}
|
||||
|
||||
function can_subscribe_doc(args) {
|
||||
|
|
@ -215,12 +201,14 @@ function notify_subscribed_doc_users(args) {
|
|||
if (!(args && args.doctype && args.docname)) {
|
||||
return;
|
||||
}
|
||||
const open_doc_room = get_open_doc_room(args.socket, args.doctype, args.docname);
|
||||
const socket = args.socket;
|
||||
const open_doc_room = get_open_doc_room(args.doctype, args.docname);
|
||||
|
||||
const clients = Array.from(io.sockets.adapter.rooms.get(open_doc_room) || []);
|
||||
const clients = Array.from(socket.nsp.adapter.rooms.get(open_doc_room) || []);
|
||||
|
||||
let users = [];
|
||||
io.sockets.sockets.forEach((sock) => {
|
||||
|
||||
socket.nsp.sockets.forEach((sock) => {
|
||||
if (clients.includes(sock.id)) {
|
||||
users.push(sock.user);
|
||||
}
|
||||
|
|
@ -230,14 +218,14 @@ function notify_subscribed_doc_users(args) {
|
|||
if (users.length == 1 && users[0] == args.socket.user) return;
|
||||
|
||||
// notify
|
||||
io.to(open_doc_room).emit("doc_viewers", {
|
||||
socket.nsp.to(open_doc_room).emit("doc_viewers", {
|
||||
doctype: args.doctype,
|
||||
docname: args.docname,
|
||||
users: Array.from(new Set(users)),
|
||||
});
|
||||
}
|
||||
|
||||
io.sockets.on("connection", function (socket) {
|
||||
realtime.on("connection", function (socket) {
|
||||
socket.on("disconnect", () => user_disconnected(socket));
|
||||
});
|
||||
|
||||
|
|
@ -252,8 +240,9 @@ const subscriber = get_redis_subscriber();
|
|||
subscriber.on("message", function (_channel, message) {
|
||||
message = JSON.parse(message);
|
||||
|
||||
let namespace = "/" + message.namespace;
|
||||
if (message.room) {
|
||||
io.to(message.room).emit(message.event, message.message);
|
||||
io.of(namespace).to(message.room).emit(message.event, message.message);
|
||||
} else {
|
||||
io.emit(message.event, message.message);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -2,7 +2,17 @@ const cookie = require("cookie");
|
|||
const request = require("superagent");
|
||||
const { get_hostname, get_url } = require("../utils");
|
||||
|
||||
const { get_conf } = require("../../node_utils");
|
||||
const conf = get_conf();
|
||||
|
||||
function authenticate_with_frappe(socket, next) {
|
||||
let namespace = socket.nsp.name;
|
||||
namespace = namespace.slice(1, namespace.length); // remove leading `/`
|
||||
|
||||
if (namespace != get_site_name(socket)) {
|
||||
next(new Error("Invalid namespace"));
|
||||
}
|
||||
|
||||
if (get_hostname(socket.request.headers.host) != get_hostname(socket.request.headers.origin)) {
|
||||
next(new Error("Invalid origin"));
|
||||
return;
|
||||
|
|
@ -38,4 +48,23 @@ function authenticate_with_frappe(socket, next) {
|
|||
});
|
||||
}
|
||||
|
||||
function get_site_name(socket) {
|
||||
if (socket.site_name) {
|
||||
return socket.site_name;
|
||||
} else if (socket.request.headers["x-frappe-site-name"]) {
|
||||
socket.site_name = get_hostname(socket.request.headers["x-frappe-site-name"]);
|
||||
} else if (
|
||||
conf.default_site &&
|
||||
["localhost", "127.0.0.1"].indexOf(get_hostname(socket.request.headers.host)) !== -1
|
||||
) {
|
||||
// from currentsite.txt since host is localhost
|
||||
socket.site_name = conf.default_site;
|
||||
} else if (socket.request.headers.origin) {
|
||||
socket.site_name = get_hostname(socket.request.headers.origin);
|
||||
} else {
|
||||
socket.site_name = get_hostname(socket.request.headers.host);
|
||||
}
|
||||
return socket.site_name;
|
||||
}
|
||||
|
||||
module.exports = authenticate_with_frappe;
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue