fix(query): validate permissions before join
Signed-off-by: Akhil Narang <me@akhilnarang.dev>
This commit is contained in:
parent
853028c10a
commit
0d92b8b195
1 changed files with 54 additions and 40 deletions
|
|
@ -358,7 +358,7 @@ class Engine:
|
|||
self.query._child_queries = []
|
||||
for field in self.fields:
|
||||
if isinstance(field, DynamicTableField):
|
||||
self.query = field.apply_select(self.query)
|
||||
self.query = field.apply_select(self.query, engine=self)
|
||||
elif isinstance(field, ChildQuery):
|
||||
self.query._child_queries.append(field)
|
||||
else:
|
||||
|
|
@ -781,7 +781,7 @@ class Engine:
|
|||
)
|
||||
self._check_field_permission(target_doctype, target_fieldname, parent_doctype_for_perm)
|
||||
|
||||
self.query = dynamic_field.apply_join(self.query)
|
||||
self.query = dynamic_field.apply_join(self.query, engine=self)
|
||||
# Return the pypika Field object associated with the dynamic field
|
||||
return dynamic_field.field
|
||||
else:
|
||||
|
|
@ -842,7 +842,7 @@ class Engine:
|
|||
self._check_field_permission(target_doctype, target_fieldname, parent_doctype_for_perm)
|
||||
|
||||
# Delegate join logic
|
||||
self.query = child_field_handler.apply_join(self.query)
|
||||
self.query = child_field_handler.apply_join(self.query, engine=self)
|
||||
# Return the pypika Field object from the handler
|
||||
return child_field_handler.field
|
||||
else:
|
||||
|
|
@ -882,7 +882,7 @@ class Engine:
|
|||
self._check_field_permission(
|
||||
df.options, target_fieldname, parent_doctype_for_perm
|
||||
)
|
||||
self.query = child_field_handler.apply_join(self.query)
|
||||
self.query = child_field_handler.apply_join(self.query, engine=self)
|
||||
return child_field_handler.field
|
||||
|
||||
self._check_field_permission(target_doctype, target_fieldname, parent_doctype_for_perm)
|
||||
|
|
@ -1175,7 +1175,7 @@ class Engine:
|
|||
self._check_field_permission(dynamic_field.doctype, dynamic_field.fieldname)
|
||||
|
||||
# Apply join for the dynamic field
|
||||
self.query = dynamic_field.apply_join(self.query)
|
||||
self.query = dynamic_field.apply_join(self.query, engine=self)
|
||||
return dynamic_field.field
|
||||
else:
|
||||
# Validate as simple field name (alphanumeric + underscore only)
|
||||
|
|
@ -1334,8 +1334,12 @@ class Engine:
|
|||
|
||||
return allowed_fields
|
||||
|
||||
def get_user_permission_conditions(self) -> list[Criterion]:
|
||||
def get_user_permission_conditions(
|
||||
self, doctype: str | None = None, table: Table | None = None
|
||||
) -> list[Criterion]:
|
||||
"""Build conditions for user permissions."""
|
||||
doctype = doctype or self.permission_doctype
|
||||
table = table or self.permission_table
|
||||
conditions = []
|
||||
|
||||
if self.ignore_user_permissions:
|
||||
|
|
@ -1346,7 +1350,7 @@ class Engine:
|
|||
if not user_permissions:
|
||||
return conditions
|
||||
|
||||
doctype_link_fields = self.get_doctype_link_fields()
|
||||
doctype_link_fields = self.get_doctype_link_fields(doctype)
|
||||
for df in doctype_link_fields:
|
||||
if df.get("ignore_user_permissions"):
|
||||
continue
|
||||
|
|
@ -1364,25 +1368,26 @@ class Engine:
|
|||
elif df.get("fieldname") == "name" and self.reference_doctype:
|
||||
if permission.get("applicable_for") == self.reference_doctype:
|
||||
docs.append(permission.get("doc"))
|
||||
elif permission.get("applicable_for") == self.permission_doctype:
|
||||
elif permission.get("applicable_for") == doctype:
|
||||
docs.append(permission.get("doc"))
|
||||
|
||||
if docs:
|
||||
field_name = df.get("fieldname")
|
||||
strict_user_permissions = frappe.get_system_settings("apply_strict_user_permissions")
|
||||
if strict_user_permissions:
|
||||
conditions.append(self.permission_table[field_name].isin(docs))
|
||||
conditions.append(table[field_name].isin(docs))
|
||||
else:
|
||||
empty_value_condition = functions.IfNull(self.permission_table[field_name], "") == ""
|
||||
value_condition = self.permission_table[field_name].isin(docs)
|
||||
empty_value_condition = functions.IfNull(table[field_name], "") == ""
|
||||
value_condition = table[field_name].isin(docs)
|
||||
conditions.append(empty_value_condition | value_condition)
|
||||
|
||||
return conditions
|
||||
|
||||
def get_doctype_link_fields(self):
|
||||
meta = frappe.get_meta(self.permission_doctype)
|
||||
def get_doctype_link_fields(self, doctype: str | None = None):
|
||||
doctype = doctype or self.permission_doctype
|
||||
meta = frappe.get_meta(doctype)
|
||||
# append current doctype with fieldname as 'name' as first link field
|
||||
doctype_link_fields = [{"options": self.permission_doctype, "fieldname": "name"}]
|
||||
doctype_link_fields = [{"options": doctype, "fieldname": "name"}]
|
||||
# append other link fields
|
||||
doctype_link_fields.extend(meta.get_link_fields())
|
||||
return doctype_link_fields
|
||||
|
|
@ -1419,29 +1424,32 @@ class Engine:
|
|||
self.table.parent == self.permission_table.name
|
||||
)
|
||||
|
||||
role_permissions = frappe.permissions.get_role_permissions(self.permission_doctype, user=self.user)
|
||||
if condition := self.get_permission_conditions(self.permission_doctype, self.permission_table):
|
||||
self.query = self.query.where(condition)
|
||||
|
||||
def get_permission_conditions(self, doctype: str, table: Table) -> Criterion | None:
|
||||
role_permissions = frappe.permissions.get_role_permissions(doctype, user=self.user)
|
||||
has_role_permission = role_permissions.get("read") or role_permissions.get("select")
|
||||
|
||||
if not has_role_permission:
|
||||
# no role permissions, apply only share permissions
|
||||
shared_docs = frappe.share.get_shared(self.permission_doctype, self.user)
|
||||
shared_docs = frappe.share.get_shared(doctype, self.user)
|
||||
if not shared_docs:
|
||||
# this should NEVER happen, but being defensive
|
||||
self._raise_permission_error()
|
||||
|
||||
self.query = self.query.where(self.permission_table.name.isin(shared_docs))
|
||||
return
|
||||
return table.name.isin(shared_docs)
|
||||
|
||||
# build conditions from: if_owner constraint OR user permissions
|
||||
conditions = []
|
||||
|
||||
if self.requires_owner_constraint(role_permissions):
|
||||
# skip user perm check if owner constraint is required
|
||||
conditions.append(self.permission_table.owner == self.user)
|
||||
elif user_perm_conditions := self.get_user_permission_conditions():
|
||||
conditions.append(table.owner == self.user)
|
||||
elif user_perm_conditions := self.get_user_permission_conditions(doctype, table):
|
||||
conditions.extend(user_perm_conditions)
|
||||
|
||||
conditions.extend(self.get_permission_query_conditions())
|
||||
conditions.extend(self.get_permission_query_conditions(doctype))
|
||||
|
||||
if not conditions:
|
||||
# no conditions to apply, all documents are accessible
|
||||
|
|
@ -1450,31 +1458,28 @@ class Engine:
|
|||
where_condition = Criterion.all(conditions)
|
||||
|
||||
# since some conditions apply, we need to consider shared docs as well
|
||||
shared_docs = frappe.share.get_shared(self.permission_doctype, self.user)
|
||||
shared_docs = frappe.share.get_shared(doctype, self.user)
|
||||
if shared_docs:
|
||||
# shared docs trump all other restrictions
|
||||
where_condition |= self.permission_table.name.isin(shared_docs)
|
||||
where_condition |= table.name.isin(shared_docs)
|
||||
|
||||
self.query = self.query.where(where_condition)
|
||||
return where_condition
|
||||
|
||||
def get_permission_query_conditions(self) -> list["RawCriterion"]:
|
||||
def get_permission_query_conditions(self, doctype: str | None = None) -> list["RawCriterion"]:
|
||||
"""Add permission query conditions from hooks and server scripts"""
|
||||
from frappe.core.doctype.server_script.server_script_utils import get_server_script_map
|
||||
|
||||
doctype = doctype or self.permission_doctype
|
||||
conditions = []
|
||||
hooks = frappe.get_hooks("permission_query_conditions", {})
|
||||
condition_methods = hooks.get(self.permission_doctype, []) + hooks.get("*", [])
|
||||
condition_methods = hooks.get(doctype, []) + hooks.get("*", [])
|
||||
|
||||
for method in condition_methods:
|
||||
if c := frappe.call(frappe.get_attr(method), self.user, doctype=self.permission_doctype):
|
||||
if c := frappe.call(frappe.get_attr(method), self.user, doctype=doctype):
|
||||
conditions.append(RawCriterion(f"({c})"))
|
||||
|
||||
# Get conditions from server scripts
|
||||
if (
|
||||
permission_script_name := get_server_script_map()
|
||||
.get("permission_query", {})
|
||||
.get(self.permission_doctype)
|
||||
):
|
||||
if permission_script_name := get_server_script_map().get("permission_query", {}).get(doctype):
|
||||
script = frappe.get_doc("Server Script", permission_script_name)
|
||||
if condition := script.get_permission_query_conditions(self.user):
|
||||
conditions.append(RawCriterion(f"({condition})"))
|
||||
|
|
@ -1730,7 +1735,10 @@ class DynamicTableField:
|
|||
|
||||
return None
|
||||
|
||||
def apply_select(self, query: QueryBuilder) -> QueryBuilder:
|
||||
def apply_select(self, query: QueryBuilder, engine: "Engine" = None) -> QueryBuilder:
|
||||
raise NotImplementedError
|
||||
|
||||
def apply_join(self, query: QueryBuilder, engine: "Engine" = None) -> QueryBuilder:
|
||||
raise NotImplementedError
|
||||
|
||||
|
||||
|
|
@ -1751,12 +1759,12 @@ class ChildTableField(DynamicTableField):
|
|||
self.table = frappe.qb.DocType(self.doctype)
|
||||
self.field = self.table[self.fieldname]
|
||||
|
||||
def apply_select(self, query: QueryBuilder) -> QueryBuilder:
|
||||
def apply_select(self, query: QueryBuilder, engine: "Engine" = None) -> QueryBuilder:
|
||||
table = frappe.qb.DocType(self.doctype)
|
||||
query = self.apply_join(query)
|
||||
query = self.apply_join(query, engine=engine)
|
||||
return query.select(getattr(table, self.fieldname).as_(self.alias or None))
|
||||
|
||||
def apply_join(self, query: QueryBuilder) -> QueryBuilder:
|
||||
def apply_join(self, query: QueryBuilder, engine: "Engine" = None) -> QueryBuilder:
|
||||
main_table = frappe.qb.DocType(self.parent_doctype)
|
||||
if not query.is_joined(self.table):
|
||||
join_conditions = (self.table.parent == main_table.name) & (
|
||||
|
|
@ -1782,16 +1790,22 @@ class LinkTableField(DynamicTableField):
|
|||
self.table = frappe.qb.DocType(self.doctype)
|
||||
self.field = self.table[self.fieldname]
|
||||
|
||||
def apply_select(self, query: QueryBuilder) -> QueryBuilder:
|
||||
def apply_select(self, query: QueryBuilder, engine: "Engine" = None) -> QueryBuilder:
|
||||
table = frappe.qb.DocType(self.doctype)
|
||||
query = self.apply_join(query)
|
||||
query = self.apply_join(query, engine=engine)
|
||||
return query.select(getattr(table, self.fieldname).as_(self.alias or None))
|
||||
|
||||
def apply_join(self, query: QueryBuilder) -> QueryBuilder:
|
||||
def apply_join(self, query: QueryBuilder, engine: "Engine" = None) -> QueryBuilder:
|
||||
table = frappe.qb.DocType(self.doctype)
|
||||
main_table = frappe.qb.DocType(self.parent_doctype)
|
||||
if not query.is_joined(table):
|
||||
query = query.left_join(table).on(table.name == getattr(main_table, self.link_fieldname))
|
||||
clause = table.name == getattr(main_table, self.link_fieldname)
|
||||
|
||||
if engine and engine.apply_permissions:
|
||||
if condition := engine.get_permission_conditions(self.doctype, table):
|
||||
clause &= condition
|
||||
|
||||
query = query.left_join(table).on(clause)
|
||||
return query
|
||||
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue