From 0d92b8b195e8f18c1932e58d010a5889023e5762 Mon Sep 17 00:00:00 2001 From: Akhil Narang Date: Fri, 19 Dec 2025 19:04:10 +0530 Subject: [PATCH] fix(query): validate permissions before join Signed-off-by: Akhil Narang --- frappe/database/query.py | 94 +++++++++++++++++++++++----------------- 1 file changed, 54 insertions(+), 40 deletions(-) diff --git a/frappe/database/query.py b/frappe/database/query.py index 3a445c5760..1656560e9a 100644 --- a/frappe/database/query.py +++ b/frappe/database/query.py @@ -358,7 +358,7 @@ class Engine: self.query._child_queries = [] for field in self.fields: if isinstance(field, DynamicTableField): - self.query = field.apply_select(self.query) + self.query = field.apply_select(self.query, engine=self) elif isinstance(field, ChildQuery): self.query._child_queries.append(field) else: @@ -781,7 +781,7 @@ class Engine: ) self._check_field_permission(target_doctype, target_fieldname, parent_doctype_for_perm) - self.query = dynamic_field.apply_join(self.query) + self.query = dynamic_field.apply_join(self.query, engine=self) # Return the pypika Field object associated with the dynamic field return dynamic_field.field else: @@ -842,7 +842,7 @@ class Engine: self._check_field_permission(target_doctype, target_fieldname, parent_doctype_for_perm) # Delegate join logic - self.query = child_field_handler.apply_join(self.query) + self.query = child_field_handler.apply_join(self.query, engine=self) # Return the pypika Field object from the handler return child_field_handler.field else: @@ -882,7 +882,7 @@ class Engine: self._check_field_permission( df.options, target_fieldname, parent_doctype_for_perm ) - self.query = child_field_handler.apply_join(self.query) + self.query = child_field_handler.apply_join(self.query, engine=self) return child_field_handler.field self._check_field_permission(target_doctype, target_fieldname, parent_doctype_for_perm) @@ -1175,7 +1175,7 @@ class Engine: self._check_field_permission(dynamic_field.doctype, dynamic_field.fieldname) # Apply join for the dynamic field - self.query = dynamic_field.apply_join(self.query) + self.query = dynamic_field.apply_join(self.query, engine=self) return dynamic_field.field else: # Validate as simple field name (alphanumeric + underscore only) @@ -1334,8 +1334,12 @@ class Engine: return allowed_fields - def get_user_permission_conditions(self) -> list[Criterion]: + def get_user_permission_conditions( + self, doctype: str | None = None, table: Table | None = None + ) -> list[Criterion]: """Build conditions for user permissions.""" + doctype = doctype or self.permission_doctype + table = table or self.permission_table conditions = [] if self.ignore_user_permissions: @@ -1346,7 +1350,7 @@ class Engine: if not user_permissions: return conditions - doctype_link_fields = self.get_doctype_link_fields() + doctype_link_fields = self.get_doctype_link_fields(doctype) for df in doctype_link_fields: if df.get("ignore_user_permissions"): continue @@ -1364,25 +1368,26 @@ class Engine: elif df.get("fieldname") == "name" and self.reference_doctype: if permission.get("applicable_for") == self.reference_doctype: docs.append(permission.get("doc")) - elif permission.get("applicable_for") == self.permission_doctype: + elif permission.get("applicable_for") == doctype: docs.append(permission.get("doc")) if docs: field_name = df.get("fieldname") strict_user_permissions = frappe.get_system_settings("apply_strict_user_permissions") if strict_user_permissions: - conditions.append(self.permission_table[field_name].isin(docs)) + conditions.append(table[field_name].isin(docs)) else: - empty_value_condition = functions.IfNull(self.permission_table[field_name], "") == "" - value_condition = self.permission_table[field_name].isin(docs) + empty_value_condition = functions.IfNull(table[field_name], "") == "" + value_condition = table[field_name].isin(docs) conditions.append(empty_value_condition | value_condition) return conditions - def get_doctype_link_fields(self): - meta = frappe.get_meta(self.permission_doctype) + def get_doctype_link_fields(self, doctype: str | None = None): + doctype = doctype or self.permission_doctype + meta = frappe.get_meta(doctype) # append current doctype with fieldname as 'name' as first link field - doctype_link_fields = [{"options": self.permission_doctype, "fieldname": "name"}] + doctype_link_fields = [{"options": doctype, "fieldname": "name"}] # append other link fields doctype_link_fields.extend(meta.get_link_fields()) return doctype_link_fields @@ -1419,29 +1424,32 @@ class Engine: self.table.parent == self.permission_table.name ) - role_permissions = frappe.permissions.get_role_permissions(self.permission_doctype, user=self.user) + if condition := self.get_permission_conditions(self.permission_doctype, self.permission_table): + self.query = self.query.where(condition) + + def get_permission_conditions(self, doctype: str, table: Table) -> Criterion | None: + role_permissions = frappe.permissions.get_role_permissions(doctype, user=self.user) has_role_permission = role_permissions.get("read") or role_permissions.get("select") if not has_role_permission: # no role permissions, apply only share permissions - shared_docs = frappe.share.get_shared(self.permission_doctype, self.user) + shared_docs = frappe.share.get_shared(doctype, self.user) if not shared_docs: # this should NEVER happen, but being defensive self._raise_permission_error() - self.query = self.query.where(self.permission_table.name.isin(shared_docs)) - return + return table.name.isin(shared_docs) # build conditions from: if_owner constraint OR user permissions conditions = [] if self.requires_owner_constraint(role_permissions): # skip user perm check if owner constraint is required - conditions.append(self.permission_table.owner == self.user) - elif user_perm_conditions := self.get_user_permission_conditions(): + conditions.append(table.owner == self.user) + elif user_perm_conditions := self.get_user_permission_conditions(doctype, table): conditions.extend(user_perm_conditions) - conditions.extend(self.get_permission_query_conditions()) + conditions.extend(self.get_permission_query_conditions(doctype)) if not conditions: # no conditions to apply, all documents are accessible @@ -1450,31 +1458,28 @@ class Engine: where_condition = Criterion.all(conditions) # since some conditions apply, we need to consider shared docs as well - shared_docs = frappe.share.get_shared(self.permission_doctype, self.user) + shared_docs = frappe.share.get_shared(doctype, self.user) if shared_docs: # shared docs trump all other restrictions - where_condition |= self.permission_table.name.isin(shared_docs) + where_condition |= table.name.isin(shared_docs) - self.query = self.query.where(where_condition) + return where_condition - def get_permission_query_conditions(self) -> list["RawCriterion"]: + def get_permission_query_conditions(self, doctype: str | None = None) -> list["RawCriterion"]: """Add permission query conditions from hooks and server scripts""" from frappe.core.doctype.server_script.server_script_utils import get_server_script_map + doctype = doctype or self.permission_doctype conditions = [] hooks = frappe.get_hooks("permission_query_conditions", {}) - condition_methods = hooks.get(self.permission_doctype, []) + hooks.get("*", []) + condition_methods = hooks.get(doctype, []) + hooks.get("*", []) for method in condition_methods: - if c := frappe.call(frappe.get_attr(method), self.user, doctype=self.permission_doctype): + if c := frappe.call(frappe.get_attr(method), self.user, doctype=doctype): conditions.append(RawCriterion(f"({c})")) # Get conditions from server scripts - if ( - permission_script_name := get_server_script_map() - .get("permission_query", {}) - .get(self.permission_doctype) - ): + if permission_script_name := get_server_script_map().get("permission_query", {}).get(doctype): script = frappe.get_doc("Server Script", permission_script_name) if condition := script.get_permission_query_conditions(self.user): conditions.append(RawCriterion(f"({condition})")) @@ -1730,7 +1735,10 @@ class DynamicTableField: return None - def apply_select(self, query: QueryBuilder) -> QueryBuilder: + def apply_select(self, query: QueryBuilder, engine: "Engine" = None) -> QueryBuilder: + raise NotImplementedError + + def apply_join(self, query: QueryBuilder, engine: "Engine" = None) -> QueryBuilder: raise NotImplementedError @@ -1751,12 +1759,12 @@ class ChildTableField(DynamicTableField): self.table = frappe.qb.DocType(self.doctype) self.field = self.table[self.fieldname] - def apply_select(self, query: QueryBuilder) -> QueryBuilder: + def apply_select(self, query: QueryBuilder, engine: "Engine" = None) -> QueryBuilder: table = frappe.qb.DocType(self.doctype) - query = self.apply_join(query) + query = self.apply_join(query, engine=engine) return query.select(getattr(table, self.fieldname).as_(self.alias or None)) - def apply_join(self, query: QueryBuilder) -> QueryBuilder: + def apply_join(self, query: QueryBuilder, engine: "Engine" = None) -> QueryBuilder: main_table = frappe.qb.DocType(self.parent_doctype) if not query.is_joined(self.table): join_conditions = (self.table.parent == main_table.name) & ( @@ -1782,16 +1790,22 @@ class LinkTableField(DynamicTableField): self.table = frappe.qb.DocType(self.doctype) self.field = self.table[self.fieldname] - def apply_select(self, query: QueryBuilder) -> QueryBuilder: + def apply_select(self, query: QueryBuilder, engine: "Engine" = None) -> QueryBuilder: table = frappe.qb.DocType(self.doctype) - query = self.apply_join(query) + query = self.apply_join(query, engine=engine) return query.select(getattr(table, self.fieldname).as_(self.alias or None)) - def apply_join(self, query: QueryBuilder) -> QueryBuilder: + def apply_join(self, query: QueryBuilder, engine: "Engine" = None) -> QueryBuilder: table = frappe.qb.DocType(self.doctype) main_table = frappe.qb.DocType(self.parent_doctype) if not query.is_joined(table): - query = query.left_join(table).on(table.name == getattr(main_table, self.link_fieldname)) + clause = table.name == getattr(main_table, self.link_fieldname) + + if engine and engine.apply_permissions: + if condition := engine.get_permission_conditions(self.doctype, table): + clause &= condition + + query = query.left_join(table).on(clause) return query