From 2b822cc63f33fdfc848ec77847b95ad3f481871c Mon Sep 17 00:00:00 2001 From: Sagar Vora <16315650+sagarvora@users.noreply.github.com> Date: Wed, 17 Dec 2025 15:59:37 +0530 Subject: [PATCH] refactor(query): simplify permission logic in add_permission_conditions MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Extract _raise_permission_error helper for reuse - Simplify get_user_permission_conditions to return list[Criterion] - Rewrite add_permission_conditions to match documented logic: - No role perms → apply only share permissions - Role perms → (owner OR user perms) AND query conditions - Shared docs OR other conditions when applicable - Move apply_permissions check inside add_permission_conditions --- frappe/database/query.py | 98 +++++++++++++++++++++++++--------------- 1 file changed, 61 insertions(+), 37 deletions(-) diff --git a/frappe/database/query.py b/frappe/database/query.py index 6f15be1960..a2d54fd643 100644 --- a/frappe/database/query.py +++ b/frappe/database/query.py @@ -324,8 +324,7 @@ class Engine: stacklevel=2, ) - if self.apply_permissions: - self.add_permission_conditions() + self.add_permission_conditions() # Store metadata for masked field processing during execution self.query._doctype = self.doctype @@ -1249,10 +1248,13 @@ class Engine: ) if not has_permission("select") and not has_permission("read"): - frappe.throw( - _("Insufficient Permission for {0}").format(frappe.bold(self.doctype)), - frappe.PermissionError, - ) + self._raise_permission_error() + + def _raise_permission_error(self): + frappe.throw( + _("Insufficient Permission for {0}").format(frappe.bold(self.doctype)), + frappe.PermissionError, + ) def apply_field_permissions(self): """Filter the list of fields based on permlevel.""" @@ -1321,20 +1323,17 @@ class Engine: return allowed_fields - def get_user_permission_conditions(self, role_permissions) -> tuple[list, bool]: + def get_user_permission_conditions(self) -> list[Criterion]: """Build conditions for user permissions and return tuple of (conditions, fetch_shared_docs)""" conditions = [] - fetch_shared_docs = False if self.ignore_user_permissions: - return conditions, fetch_shared_docs + return conditions user_permissions = frappe.permissions.get_user_permissions(self.user) if not user_permissions: - return conditions, fetch_shared_docs - - fetch_shared_docs = True + return conditions doctype_link_fields = self.get_doctype_link_fields() for df in doctype_link_fields: @@ -1367,7 +1366,7 @@ class Engine: value_condition = self.table[field_name].isin(docs) conditions.append(empty_value_condition | value_condition) - return conditions, fetch_shared_docs + return conditions def get_doctype_link_fields(self): meta = frappe.get_meta(self.doctype) @@ -1378,40 +1377,65 @@ class Engine: return doctype_link_fields def add_permission_conditions(self): - conditions = [] - role_permissions = frappe.permissions.get_role_permissions(self.doctype, user=self.user) + """ + Logic for adding permission conditions is as follows: - # False only when role permissions without if owner exist and no user perms are applied - fetch_shared_docs = True - has_role_permissions = False + If no role permissions with read/select exist: + - apply only share permissions + + If role permissions with read/select exist: + - apply (if owner constraints OR user permissions), AND + - apply permission query conditions + + If if owner / user permission / permission query constraints are applied, + final condition = (existing conditions) OR (share condtion) + (rationale: shared documents trump all other restrictions) + + Else, all documents are accessible based on role permissions. + """ + + if not self.apply_permissions: + return + + role_permissions = frappe.permissions.get_role_permissions(self.doctype, user=self.user) + has_role_permission = role_permissions.get("read") or role_permissions.get("select") + + if not has_role_permission: + # no role permissions, apply only share permissions + shared_docs = frappe.share.get_shared(self.doctype, self.user) + if not shared_docs: + # this should NEVER happen, but being defensive + self._raise_permission_error() + + self.query = self.query.where(self.table.name.isin(shared_docs)) + return + + # build conditions from: if_owner constraint OR user permissions + conditions = [] if self.requires_owner_constraint(role_permissions): + # skip user perm check if owner constraint is required conditions.append(self.table.owner == self.user) - # skip user perm check if owner constraint is required - elif role_permissions.get("read") or role_permissions.get("select"): - has_role_permissions = True - user_perm_conditions, fetch_shared_docs = self.get_user_permission_conditions(role_permissions) + elif user_perm_conditions := self.get_user_permission_conditions(): conditions.extend(user_perm_conditions) - permission_query_conditions = self.get_permission_query_conditions() - if permission_query_conditions: - conditions.extend(permission_query_conditions) + conditions.extend(self.get_permission_query_conditions()) - shared_docs = [] - if fetch_shared_docs: - shared_docs = frappe.share.get_shared(self.doctype, self.user) + if not conditions: + # no conditions to apply, all documents are accessible + return + where_condition = Criterion.all(conditions) + + # since some conditions apply, we need to consider shared docs as well + shared_docs = frappe.share.get_shared(self.doctype, self.user) if shared_docs: - # Depending on the presence of role permissions, decide whether to use OR or AND - if has_role_permissions: - self.query = self.query.where(Criterion.all(conditions) | self.table.name.isin(shared_docs)) - else: - self.query = self.query.where(Criterion.all(conditions) & self.table.name.isin(shared_docs)) - else: - # AND all permission conditions - self.query = self.query.where(Criterion.all(conditions)) + # shared docs trump all other restrictions + where_condition |= self.table.name.isin(shared_docs) - def get_permission_query_conditions(self): + self.query = self.query.where(where_condition) + + def get_permission_query_conditions(self) -> list["RawCriterion"]: """Add permission query conditions from hooks and server scripts""" from frappe.core.doctype.server_script.server_script_utils import get_server_script_map