diff --git a/frappe/database/query.py b/frappe/database/query.py index 47a8b4d887..97e86df1b9 100644 --- a/frappe/database/query.py +++ b/frappe/database/query.py @@ -546,14 +546,23 @@ class Engine: """Filter the list of fields based on permlevel.""" allowed_fields = [] parent_permission_type = self.get_permission_type(self.doctype) + permitted_fields_cache = {} - permitted_fields_set = set( - get_permitted_fields( - doctype=self.doctype, - parenttype=self.parent_doctype, - permission_type=parent_permission_type, - ignore_virtual=True, - ) + def get_cached_permitted_fields(doctype, parenttype, permission_type): + cache_key = (doctype, parenttype, permission_type) + if cache_key not in permitted_fields_cache: + permitted_fields_cache[cache_key] = set( + get_permitted_fields( + doctype=doctype, + parenttype=parenttype, + permission_type=permission_type, + ignore_virtual=True, + ) + ) + return permitted_fields_cache[cache_key] + + permitted_fields_set = get_cached_permitted_fields( + self.doctype, self.parent_doctype, parent_permission_type ) for field in self.fields: @@ -563,13 +572,8 @@ class Engine: continue # Cache permitted fields for child doctypes if accessed multiple times - permitted_child_fields_set = set( - get_permitted_fields( - doctype=field.doctype, - parenttype=field.parent_doctype, - permission_type=self.get_permission_type(field.doctype), - ignore_virtual=True, - ) + permitted_child_fields_set = get_cached_permitted_fields( + field.doctype, field.parent_doctype, self.get_permission_type(field.doctype) ) # Check permission for the specific field in the child table if field.fieldname in permitted_child_fields_set: @@ -577,20 +581,27 @@ class Engine: elif isinstance(field, LinkTableField): # Check permission for the link field *in the parent doctype* if field.link_fieldname in permitted_fields_set: - allowed_fields.append(field) + # Also check if user has permission to read/select the target doctype + target_doctype = field.doctype + has_target_perm = frappe.has_permission( + target_doctype, "select", user=self.user + ) or frappe.has_permission(target_doctype, "read", user=self.user) + + if has_target_perm: + # Finally, check if the specific field *in the target doctype* is permitted + permitted_target_fields_set = get_cached_permitted_fields( + target_doctype, None, self.get_permission_type(target_doctype) + ) + if field.fieldname in permitted_target_fields_set: + allowed_fields.append(field) elif isinstance(field, ChildQuery): if parent_permission_type == "select": # Skip child queries if parent permission is only 'select' continue # Cache permitted fields for the child doctype of the query - permitted_child_fields_set = set( - get_permitted_fields( - doctype=field.doctype, - parenttype=field.parent_doctype, - permission_type=self.get_permission_type(field.doctype), - ignore_virtual=True, - ) + permitted_child_fields_set = get_cached_permitted_fields( + field.doctype, field.parent_doctype, self.get_permission_type(field.doctype) ) # Filter the fields *within* the ChildQuery object based on permissions field.fields = [f for f in field.fields if f in permitted_child_fields_set] diff --git a/frappe/tests/test_query.py b/frappe/tests/test_query.py index 988af95aaa..cabb7e8bcb 100644 --- a/frappe/tests/test_query.py +++ b/frappe/tests/test_query.py @@ -877,6 +877,116 @@ class TestQuery(IntegrationTestCase): frappe.clear_cache() frappe.hooks.permission_query_conditions = original_hooks + def test_link_field_target_permission(self): + """Test that accessing link_field.target_field respects target field's permlevel.""" + target_dt_name = "TargetDocForLinkPerm" + source_dt_name = "SourceDocForLinkPerm" + test_role = "LinkPermTestRole" + test_user = "test2@example.com" + + # Cleanup previous runs + frappe.set_user("Administrator") + frappe.delete_doc("DocType", target_dt_name, ignore_missing=True, force=True) + frappe.delete_doc("DocType", source_dt_name, ignore_missing=True, force=True) + frappe.delete_doc("Role", test_role, ignore_missing=True, force=True) + test_user_doc = frappe.get_doc("User", test_user) + test_user_doc.remove_roles(test_role) + + # Create Doctypes + target_dt = new_doctype( + target_dt_name, + fields=[ + {"fieldname": "target_field", "fieldtype": "Data", "permlevel": 1, "label": "Target Field"}, + {"fieldname": "other_target_field", "fieldtype": "Data", "label": "Other Target Field"}, + ], + ).insert(ignore_if_duplicate=True) + + source_dt = new_doctype( + source_dt_name, + fields=[ + { + "fieldname": "link_field", + "fieldtype": "Link", + "options": target_dt_name, + "label": "Link Field", + } + ], + ).insert(ignore_if_duplicate=True) + + # Create Records + target_doc = frappe.get_doc( + doctype=target_dt_name, target_field="Secret Data", other_target_field="Public Data" + ).insert(ignore_permissions=True) + source_doc = frappe.get_doc(doctype=source_dt_name, link_field=target_doc.name).insert( + ignore_permissions=True + ) + + # Setup Role and Permissions + frappe.get_doc({"doctype": "Role", "role_name": test_role}).insert(ignore_if_duplicate=True) + add_permission(source_dt_name, test_role, 0, ptype="read") + add_permission(target_dt_name, test_role, 0, ptype="read") + # Ensure no permlevel 1 read for test_role + update_permission_property(target_dt_name, test_role, 1, "read", 0, validate=False) + # Ensure System Manager can read permlevel 1 + add_permission(target_dt_name, "System Manager", 1, ptype="read") + test_user_doc.add_roles(test_role) + + # Test as the restricted user + frappe.set_user(test_user) + result_restricted = frappe.qb.get_query( + source_dt_name, + filters={"name": source_doc.name}, + fields=[ + "name", + "link_field.target_field as linked_secret", + "link_field.other_target_field as linked_public", + ], + ignore_permissions=False, + ).run(as_dict=True) + + self.assertEqual(len(result_restricted), 1) + self.assertIn( + "linked_public", + result_restricted[0], + "Permlevel 0 target field should be accessible via link.", + ) + self.assertNotIn( + "linked_secret", + result_restricted[0], + "Permlevel 1 target field should NOT be accessible via link for restricted user.", + ) + + # Test as Administrator (who has System Manager role) + frappe.set_user("Administrator") + result_admin = frappe.qb.get_query( + source_dt_name, + filters={"name": source_doc.name}, + fields=[ + "name", + "link_field.target_field as linked_secret", + "link_field.other_target_field as linked_public", + ], + ignore_permissions=False, # Still check permissions, but Admin has them + ).run(as_dict=True) + + self.assertEqual(len(result_admin), 1) + self.assertIn( + "linked_public", result_admin[0], "Permlevel 0 target field should be accessible for Admin." + ) + self.assertIn( + "linked_secret", result_admin[0], "Permlevel 1 target field should be accessible for Admin." + ) + self.assertEqual(result_admin[0].linked_secret, "Secret Data") + + # Cleanup + frappe.set_user("Administrator") + source_doc.delete(ignore_permissions=True) + target_doc.delete(ignore_permissions=True) + source_dt.delete() + target_dt.delete() + test_user_doc.remove_roles(test_role) + frappe.delete_doc("Role", test_role, force=True) + # This function is used as a permission query condition hook def test_permission_hook_condition(user):