refactor(query): simplify permission logic in add_permission_conditions

- Extract _raise_permission_error helper for reuse
- Simplify get_user_permission_conditions to return list[Criterion]
- Rewrite add_permission_conditions to match documented logic:
  - No role perms → apply only share permissions
  - Role perms → (owner OR user perms) AND query conditions
  - Shared docs OR other conditions when applicable
- Move apply_permissions check inside add_permission_conditions
This commit is contained in:
Sagar Vora 2025-12-17 15:59:37 +05:30
parent 7205e1e075
commit 2b822cc63f

View file

@ -324,8 +324,7 @@ class Engine:
stacklevel=2,
)
if self.apply_permissions:
self.add_permission_conditions()
self.add_permission_conditions()
# Store metadata for masked field processing during execution
self.query._doctype = self.doctype
@ -1249,10 +1248,13 @@ class Engine:
)
if not has_permission("select") and not has_permission("read"):
frappe.throw(
_("Insufficient Permission for {0}").format(frappe.bold(self.doctype)),
frappe.PermissionError,
)
self._raise_permission_error()
def _raise_permission_error(self):
frappe.throw(
_("Insufficient Permission for {0}").format(frappe.bold(self.doctype)),
frappe.PermissionError,
)
def apply_field_permissions(self):
"""Filter the list of fields based on permlevel."""
@ -1321,20 +1323,17 @@ class Engine:
return allowed_fields
def get_user_permission_conditions(self, role_permissions) -> tuple[list, bool]:
def get_user_permission_conditions(self) -> list[Criterion]:
"""Build conditions for user permissions and return tuple of (conditions, fetch_shared_docs)"""
conditions = []
fetch_shared_docs = False
if self.ignore_user_permissions:
return conditions, fetch_shared_docs
return conditions
user_permissions = frappe.permissions.get_user_permissions(self.user)
if not user_permissions:
return conditions, fetch_shared_docs
fetch_shared_docs = True
return conditions
doctype_link_fields = self.get_doctype_link_fields()
for df in doctype_link_fields:
@ -1367,7 +1366,7 @@ class Engine:
value_condition = self.table[field_name].isin(docs)
conditions.append(empty_value_condition | value_condition)
return conditions, fetch_shared_docs
return conditions
def get_doctype_link_fields(self):
meta = frappe.get_meta(self.doctype)
@ -1378,40 +1377,65 @@ class Engine:
return doctype_link_fields
def add_permission_conditions(self):
conditions = []
role_permissions = frappe.permissions.get_role_permissions(self.doctype, user=self.user)
"""
Logic for adding permission conditions is as follows:
# False only when role permissions without if owner exist and no user perms are applied
fetch_shared_docs = True
has_role_permissions = False
If no role permissions with read/select exist:
- apply only share permissions
If role permissions with read/select exist:
- apply (if owner constraints OR user permissions), AND
- apply permission query conditions
If if owner / user permission / permission query constraints are applied,
final condition = (existing conditions) OR (share condtion)
(rationale: shared documents trump all other restrictions)
Else, all documents are accessible based on role permissions.
"""
if not self.apply_permissions:
return
role_permissions = frappe.permissions.get_role_permissions(self.doctype, user=self.user)
has_role_permission = role_permissions.get("read") or role_permissions.get("select")
if not has_role_permission:
# no role permissions, apply only share permissions
shared_docs = frappe.share.get_shared(self.doctype, self.user)
if not shared_docs:
# this should NEVER happen, but being defensive
self._raise_permission_error()
self.query = self.query.where(self.table.name.isin(shared_docs))
return
# build conditions from: if_owner constraint OR user permissions
conditions = []
if self.requires_owner_constraint(role_permissions):
# skip user perm check if owner constraint is required
conditions.append(self.table.owner == self.user)
# skip user perm check if owner constraint is required
elif role_permissions.get("read") or role_permissions.get("select"):
has_role_permissions = True
user_perm_conditions, fetch_shared_docs = self.get_user_permission_conditions(role_permissions)
elif user_perm_conditions := self.get_user_permission_conditions():
conditions.extend(user_perm_conditions)
permission_query_conditions = self.get_permission_query_conditions()
if permission_query_conditions:
conditions.extend(permission_query_conditions)
conditions.extend(self.get_permission_query_conditions())
shared_docs = []
if fetch_shared_docs:
shared_docs = frappe.share.get_shared(self.doctype, self.user)
if not conditions:
# no conditions to apply, all documents are accessible
return
where_condition = Criterion.all(conditions)
# since some conditions apply, we need to consider shared docs as well
shared_docs = frappe.share.get_shared(self.doctype, self.user)
if shared_docs:
# Depending on the presence of role permissions, decide whether to use OR or AND
if has_role_permissions:
self.query = self.query.where(Criterion.all(conditions) | self.table.name.isin(shared_docs))
else:
self.query = self.query.where(Criterion.all(conditions) & self.table.name.isin(shared_docs))
else:
# AND all permission conditions
self.query = self.query.where(Criterion.all(conditions))
# shared docs trump all other restrictions
where_condition |= self.table.name.isin(shared_docs)
def get_permission_query_conditions(self):
self.query = self.query.where(where_condition)
def get_permission_query_conditions(self) -> list["RawCriterion"]:
"""Add permission query conditions from hooks and server scripts"""
from frappe.core.doctype.server_script.server_script_utils import get_server_script_map