fix: allow filtering by all permlevel 0 fields with select permission
Users with only 'select' permission can now filter, order by, and group by all permlevel 0 fields, not just search fields. - Added _get_filterable_fields() returning all permlevel 0 fields for select permission users - Split permission checking into check_select_field_permission() and check_filter_field_permission() - select field validation uses check_select_field_permission - Filter/order/group by validation uses check_filter_field_permission
This commit is contained in:
parent
ffe362316d
commit
369f15ac09
2 changed files with 117 additions and 16 deletions
|
|
@ -812,7 +812,7 @@ class Engine:
|
|||
if parsed := self._parse_backtick_field_notation(field):
|
||||
table_name, field_name = parsed
|
||||
|
||||
self._check_field_permission(table_name, field_name)
|
||||
self.check_filter_field_permission(table_name, field_name)
|
||||
|
||||
# Return query builder field reference
|
||||
return frappe.qb.DocType(table_name)[field_name]
|
||||
|
|
@ -835,7 +835,7 @@ class Engine:
|
|||
parent_doctype_for_perm = (
|
||||
dynamic_field.parent_doctype if isinstance(dynamic_field, ChildTableField) else None
|
||||
)
|
||||
self._check_field_permission(target_doctype, target_fieldname, parent_doctype_for_perm)
|
||||
self.check_filter_field_permission(target_doctype, target_fieldname, parent_doctype_for_perm)
|
||||
|
||||
self.query = dynamic_field.apply_join(self.query, engine=self)
|
||||
# Return the pypika Field object associated with the dynamic field
|
||||
|
|
@ -879,7 +879,9 @@ class Engine:
|
|||
|
||||
# If it's not a child table, check permissions
|
||||
if not parent_fieldname:
|
||||
self._check_field_permission(target_doctype, target_fieldname, parent_doctype_for_perm)
|
||||
self.check_filter_field_permission(
|
||||
target_doctype, target_fieldname, parent_doctype_for_perm
|
||||
)
|
||||
return frappe.qb.DocType(target_doctype)[target_fieldname]
|
||||
|
||||
# Create a ChildTableField instance to handle join and field access
|
||||
|
|
@ -893,7 +895,7 @@ class Engine:
|
|||
|
||||
# For permission check, the parent is the main doctype
|
||||
parent_doctype_for_perm = self.doctype
|
||||
self._check_field_permission(target_doctype, target_fieldname, parent_doctype_for_perm)
|
||||
self.check_filter_field_permission(target_doctype, target_fieldname, parent_doctype_for_perm)
|
||||
|
||||
# Delegate join logic
|
||||
self.query = child_field_handler.apply_join(self.query, engine=self)
|
||||
|
|
@ -933,18 +935,32 @@ class Engine:
|
|||
parent_fieldname=df.fieldname,
|
||||
)
|
||||
parent_doctype_for_perm = self.doctype
|
||||
self._check_field_permission(
|
||||
self.check_filter_field_permission(
|
||||
df.options, target_fieldname, parent_doctype_for_perm
|
||||
)
|
||||
self.query = child_field_handler.apply_join(self.query, engine=self)
|
||||
return child_field_handler.field
|
||||
|
||||
self._check_field_permission(target_doctype, target_fieldname, parent_doctype_for_perm)
|
||||
self.check_filter_field_permission(target_doctype, target_fieldname, parent_doctype_for_perm)
|
||||
# Convert string field name to pypika Field object for the specified/current doctype
|
||||
return frappe.qb.DocType(target_doctype)[target_fieldname]
|
||||
|
||||
def _check_field_permission(self, doctype: str, fieldname: str, parent_doctype: str | None = None):
|
||||
"""Check if the user has permission to access the given field"""
|
||||
def check_select_field_permission(self, doctype: str, fieldname: str, parent_doctype: str | None = None):
|
||||
"""Check if the user has permission to select the given field."""
|
||||
self._check_field_permission(doctype, fieldname, parent_doctype, for_filtering=False)
|
||||
|
||||
def check_filter_field_permission(self, doctype: str, fieldname: str, parent_doctype: str | None = None):
|
||||
"""Check if the user has permission to filter/order/group by the given field.
|
||||
|
||||
It allows all permlevel 0 fields for users with select permission,
|
||||
and all permitted fields for users with read permission.
|
||||
"""
|
||||
self._check_field_permission(doctype, fieldname, parent_doctype, for_filtering=True)
|
||||
|
||||
def _check_field_permission(
|
||||
self, doctype: str, fieldname: str, parent_doctype: str | None = None, for_filtering: bool = False
|
||||
):
|
||||
"""Check if the user has permission to access the given field."""
|
||||
if not self.apply_permissions:
|
||||
return
|
||||
|
||||
|
|
@ -966,7 +982,10 @@ class Engine:
|
|||
frappe.PermissionError,
|
||||
)
|
||||
|
||||
permitted_fields = self._get_cached_permitted_fields(doctype, parent_doctype, permission_type)
|
||||
permission_source = (
|
||||
self._get_filterable_fields if for_filtering else self._get_cached_permitted_fields
|
||||
)
|
||||
permitted_fields = permission_source(doctype, parent_doctype, permission_type)
|
||||
|
||||
if fieldname not in permitted_fields:
|
||||
frappe.throw(
|
||||
|
|
@ -992,6 +1011,42 @@ class Engine:
|
|||
)
|
||||
return self.permitted_fields_cache[cache_key]
|
||||
|
||||
def _get_filterable_fields(
|
||||
self, doctype: str, parenttype: str | None = None, permission_type: str | None = None
|
||||
) -> set:
|
||||
"""Get fields that can be used in filters/order by/group by.
|
||||
|
||||
For users with only select permission on parent doctypes, this returns
|
||||
all permlevel 0 fields (not just search fields which are used for selected fields).
|
||||
For users with read permission, returns standard permitted fields.
|
||||
"""
|
||||
if permission_type is None:
|
||||
permission_type = self.get_permission_type(doctype, parenttype)
|
||||
|
||||
if permission_type == "select":
|
||||
meta = frappe.get_meta(doctype)
|
||||
|
||||
# Only allow filtering by all permlevel 0 fields for parent doctypes.
|
||||
if meta.istable:
|
||||
return set()
|
||||
|
||||
# for select permission on parent doctype, allow all permlevel 0 fields in filters
|
||||
cache_key = (doctype, None, "_filterable_select")
|
||||
if cache_key not in self.permitted_fields_cache:
|
||||
if doctype in CORE_DOCTYPES:
|
||||
# core doctypes have no restrictions - return all valid columns
|
||||
self.permitted_fields_cache[cache_key] = set(meta.get_valid_columns())
|
||||
else:
|
||||
permlevel_0_fields = set(meta.default_fields) | OPTIONAL_FIELDS
|
||||
for df in meta.get_fieldnames_with_value(with_field_meta=True, with_virtual_fields=False):
|
||||
if df.permlevel == 0:
|
||||
permlevel_0_fields.add(df.fieldname)
|
||||
self.permitted_fields_cache[cache_key] = permlevel_0_fields
|
||||
return self.permitted_fields_cache[cache_key]
|
||||
else:
|
||||
# for read permission, use standard permitted fields
|
||||
return self._get_cached_permitted_fields(doctype, parenttype, permission_type)
|
||||
|
||||
def parse_string_field(self, field: str):
|
||||
"""
|
||||
Parses a field string into a pypika Field object.
|
||||
|
|
@ -1209,7 +1264,7 @@ class Engine:
|
|||
if "`" in field_name:
|
||||
if parsed := self._parse_backtick_field_notation(field_name):
|
||||
table_name, field_name = parsed
|
||||
self._check_field_permission(table_name, field_name)
|
||||
self.check_filter_field_permission(table_name, field_name)
|
||||
return frappe.qb.DocType(table_name)[field_name]
|
||||
|
||||
# If parsing failed, fall through to error handling below
|
||||
|
|
@ -1223,14 +1278,14 @@ class Engine:
|
|||
if dynamic_field:
|
||||
# Check permissions for dynamic field
|
||||
if isinstance(dynamic_field, ChildTableField):
|
||||
self._check_field_permission(
|
||||
self.check_filter_field_permission(
|
||||
dynamic_field.doctype, dynamic_field.fieldname, dynamic_field.parent_doctype
|
||||
)
|
||||
elif isinstance(dynamic_field, LinkTableField):
|
||||
# Check permission for the link field in parent doctype
|
||||
self._check_field_permission(self.doctype, dynamic_field.link_fieldname)
|
||||
self.check_filter_field_permission(self.doctype, dynamic_field.link_fieldname)
|
||||
# Check permission for the target field in linked doctype
|
||||
self._check_field_permission(dynamic_field.doctype, dynamic_field.fieldname)
|
||||
self.check_filter_field_permission(dynamic_field.doctype, dynamic_field.fieldname)
|
||||
|
||||
# Apply join for the dynamic field
|
||||
self.query = dynamic_field.apply_join(self.query, engine=self)
|
||||
|
|
@ -1246,7 +1301,7 @@ class Engine:
|
|||
)
|
||||
|
||||
# Check permissions for simple field
|
||||
self._check_field_permission(self.doctype, field_name)
|
||||
self.check_filter_field_permission(self.doctype, field_name)
|
||||
|
||||
# Create Field object for simple field
|
||||
return self.table[field_name]
|
||||
|
|
@ -2307,7 +2362,7 @@ class SQLFunctionParser:
|
|||
elif "`" in arg:
|
||||
if parsed := self.engine._parse_backtick_field_notation(arg):
|
||||
table_name, field_name = parsed
|
||||
self.engine._check_field_permission(table_name, field_name)
|
||||
self.engine.check_select_field_permission(table_name, field_name)
|
||||
return Table(f"tab{table_name}")[field_name]
|
||||
else:
|
||||
frappe.throw(
|
||||
|
|
@ -2356,4 +2411,4 @@ class SQLFunctionParser:
|
|||
|
||||
def _check_function_field_permission(self, field_name: str):
|
||||
if self.engine.apply_permissions and self.engine.doctype:
|
||||
self.engine._check_field_permission(self.engine.doctype, field_name)
|
||||
self.engine.check_select_field_permission(self.engine.doctype, field_name)
|
||||
|
|
|
|||
|
|
@ -991,6 +991,52 @@ class TestQuery(IntegrationTestCase):
|
|||
test_user.remove_roles(test_role)
|
||||
frappe.delete_doc("Role", test_role, force=True)
|
||||
|
||||
def test_filter_with_select_permission_allows_permlevel_0_fields(self):
|
||||
"""Test that users with only select permission can filter by all permlevel 0 fields."""
|
||||
|
||||
test_role = "SelectFilterTestRole"
|
||||
test_user_email = "test2@example.com"
|
||||
test_note_title = "Select Filter Test Note"
|
||||
|
||||
# Cleanup previous runs
|
||||
frappe.set_user("Administrator")
|
||||
test_user = frappe.get_doc("User", test_user_email)
|
||||
test_user.remove_roles(test_role)
|
||||
frappe.delete_doc("Role", test_role, ignore_missing=True, force=True)
|
||||
frappe.delete_doc("Note", {"title": test_note_title}, ignore_missing=True, force=True)
|
||||
|
||||
# Setup Role with only 'select' on Note (no read)
|
||||
frappe.get_doc({"doctype": "Role", "role_name": test_role}).insert(ignore_if_duplicate=True)
|
||||
add_permission("Note", test_role, 0, ptype="select")
|
||||
update_permission_property("Note", test_role, 0, "read", 0, validate=False)
|
||||
test_user.add_roles(test_role)
|
||||
|
||||
# Create a test note with specific content
|
||||
note = frappe.get_doc(
|
||||
doctype="Note", title=test_note_title, content="Specific Content", public=1
|
||||
).insert(ignore_permissions=True)
|
||||
|
||||
# Register cleanups in reverse order (LIFO) - Administrator restore must happen first
|
||||
def cleanup():
|
||||
frappe.set_user("Administrator")
|
||||
frappe.delete_doc("Note", note.name, ignore_missing=True, force=True)
|
||||
test_user.remove_roles(test_role)
|
||||
frappe.delete_doc("Role", test_role, ignore_missing=True, force=True)
|
||||
|
||||
self.addCleanup(cleanup)
|
||||
|
||||
frappe.set_user(test_user_email)
|
||||
|
||||
# 'content' is a permlevel 0 field but NOT a search field
|
||||
result = frappe.qb.get_query(
|
||||
"Note",
|
||||
filters={"content": "Specific Content"},
|
||||
fields=["name"], # Only select 'name' which is allowed
|
||||
ignore_permissions=False,
|
||||
).run(as_dict=True)
|
||||
self.assertEqual(len(result), 1, "Should find the note when filtering by permlevel 0 field")
|
||||
self.assertEqual(result[0]["name"], note.name)
|
||||
|
||||
def test_nested_permission(self):
|
||||
"""Test permission on nested doctypes"""
|
||||
frappe.set_user("Administrator")
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue