refactor(socketio)!: list_update

- Subscribe to list_update only for the list/report views that are
  opened
- Check if user has read permission for doctype to subscribe to list
  updates
This commit is contained in:
Gavin D'souza 2022-11-15 12:40:40 +05:30
parent e97994f211
commit 9931c3af04
5 changed files with 89 additions and 31 deletions

View file

@ -1315,6 +1315,7 @@ frappe.views.ListView = class ListView extends frappe.views.BaseList {
if (this.list_view_settings && this.list_view_settings.disable_auto_refresh) {
return;
}
frappe.socketio.list_subscribe(this.doctype);
frappe.realtime.on("list_update", (data) => {
if (this.avoid_realtime_update()) {
return;

View file

@ -133,6 +133,9 @@ frappe.socketio = {
task_unsubscribe: function (task_id) {
frappe.socketio.socket.emit("task_unsubscribe", task_id);
},
list_subscribe: function (doctype) {
frappe.socketio.socket.emit("list_update", doctype);
},
doc_subscribe: function (doctype, docname) {
if (frappe.flags.doc_subscribe) {
console.log("throttled");

View file

@ -56,6 +56,7 @@ frappe.views.ReportView = class ReportView extends frappe.views.ListView {
if (this.list_view_settings?.disable_auto_refresh) {
return;
}
frappe.socketio.list_subscribe(this.doctype);
frappe.realtime.on("list_update", (data) => this.on_update(data));
}

View file

@ -2,6 +2,7 @@
# License: MIT. See LICENSE
import os
from contextlib import suppress
import redis
@ -22,14 +23,14 @@ def publish_progress(percent, title=None, doctype=None, docname=None, descriptio
def publish_realtime(
event=None,
message=None,
room=None,
user=None,
doctype=None,
docname=None,
task_id=None,
after_commit=False,
event: str = None,
message: dict = None,
room: str = None,
user: str = None,
doctype: str = None,
docname: str = None,
task_id: str = None,
after_commit: bool = False,
):
"""Publish real-time updates
@ -44,29 +45,29 @@ def publish_realtime(
message = {}
if event is None:
if getattr(frappe.local, "task_id", None):
event = "task_progress"
else:
event = "global"
if event == "msgprint" and not user:
event = "task_progress" if frappe.local.task_id else "global"
elif event == "msgprint" and not user:
user = frappe.session.user
elif event == "list_update":
doctype = doctype or message.get("doctype")
room = get_list_room(doctype)
if not task_id and hasattr(frappe.local, "task_id"):
task_id = frappe.local.task_id
if not room:
if not task_id and hasattr(frappe.local, "task_id"):
task_id = frappe.local.task_id
if task_id:
room = get_task_progress_room(task_id)
if not "task_id" in message:
message["task_id"] = task_id
after_commit = False
if "task_id" not in message:
message["task_id"] = task_id
room = get_task_progress_room(task_id)
elif user:
# transmit to specific user: System, Website or Guest
room = get_user_room(user)
elif doctype and docname:
room = get_doc_room(doctype, docname)
else:
# This will be broadcasted to all Desk users
room = get_site_room()
if after_commit:
@ -83,13 +84,10 @@ def emit_via_redis(event, message, room):
:param event: Event name, like `task_progress` etc.
:param message: JSON message object. For async must contain `task_id`
:param room: name of the room"""
r = get_redis_server()
try:
with suppress(redis.exceptions.ConnectionError):
r = get_redis_server()
r.publish("events", frappe.as_json({"event": event, "message": message, "room": room}))
except redis.exceptions.ConnectionError:
# print(frappe.get_traceback())
pass
def get_redis_server():
@ -117,6 +115,19 @@ def can_subscribe_doc(doctype, docname):
return True
@frappe.whitelist(allow_guest=True)
def can_subscribe_list(doctype):
if os.environ.get("CI"):
return True
from frappe.exceptions import PermissionError
if not frappe.has_permission(user=frappe.session.user, doctype=doctype, ptype="read"):
raise PermissionError()
return True
@frappe.whitelist(allow_guest=True)
def get_user_info():
from frappe.sessions import Session
@ -129,17 +140,21 @@ def get_user_info():
}
def get_list_room(doctype):
return f"{frappe.local.site}:list:{doctype}"
def get_doc_room(doctype, docname):
return "".join([frappe.local.site, ":doc:", doctype, "/", cstr(docname)])
return f"{frappe.local.site}:doc:{doctype}/{cstr(docname)}"
def get_user_room(user):
return "".join([frappe.local.site, ":user:", user])
return f"{frappe.local.site}:user:{user}"
def get_site_room():
return "".join([frappe.local.site, ":all"])
return f"{frappe.local.site}:all"
def get_task_progress_room(task_id):
return "".join([frappe.local.site, ":task_progress:", task_id])
return f"{frappe.local.site}:task_progress:{task_id}"

View file

@ -57,10 +57,20 @@ io.on("connection", function (socket) {
const room = get_user_room(socket);
socket.join(room);
if (socket.user == "System User") {
if (socket.user_type == "System User") {
socket.join(get_site_room(socket));
}
socket.on("list_update", function (doctype) {
can_subscribe_list({
socket,
doctype,
callback: () => {
socket.join(get_list_room(socket, doctype));
},
});
});
socket.on("task_subscribe", function (task_id) {
var room = get_task_room(socket, task_id);
socket.join(room);
@ -220,6 +230,10 @@ function get_site_room(socket) {
return get_site_name(socket) + ":all";
}
function get_list_room(socket, doctype) {
return get_site_name(socket) + ":list:" + doctype;
}
function get_task_room(socket, task_id) {
return get_site_name(socket) + ":task_progress:" + task_id;
}
@ -284,6 +298,30 @@ function can_subscribe_doc(args) {
});
}
function can_subscribe_list(args) {
if (!args) return;
if (!args.doctype) return;
request
.get(get_url(args.socket, "/api/method/frappe.realtime.can_subscribe_list"))
.type("form")
.query({
sid: args.socket.sid,
doctype: args.doctype,
})
.end(function (err, res) {
if (!res || res.status == 403 || err) {
if (err) {
log(err);
}
return false;
} else if (res.status == 200) {
args?.callback(err, res);
return true;
}
log("ERROR (can_subscribe_list): ", err, res);
});
}
function send_users(args, action) {
if (!(args && args.doctype && args.docname)) {
return;