From 591d9a353510f5e3f9a48cacf66377e958f459fa Mon Sep 17 00:00:00 2001 From: Sagar Vora <16315650+sagarvora@users.noreply.github.com> Date: Wed, 17 Dec 2025 19:03:15 +0530 Subject: [PATCH] fix: update logic for child tables --- frappe/database/query.py | 55 +++++++---- frappe/tests/test_query.py | 181 +++++++++++++++++++++++++++++++++++++ 2 files changed, 218 insertions(+), 18 deletions(-) diff --git a/frappe/database/query.py b/frappe/database/query.py index a2d54fd643..3ca6b55dcf 100644 --- a/frappe/database/query.py +++ b/frappe/database/query.py @@ -274,6 +274,10 @@ class Engine: if self.apply_permissions: self.check_read_permission() + self.permission_doctype = parent_doctype or self.doctype + self.permission_table = ( + qb.DocType(self.permission_doctype) if self.permission_doctype != self.doctype else self.table + ) is_select = False if update: @@ -1324,7 +1328,7 @@ class Engine: return allowed_fields def get_user_permission_conditions(self) -> list[Criterion]: - """Build conditions for user permissions and return tuple of (conditions, fetch_shared_docs)""" + """Build conditions for user permissions.""" conditions = [] if self.ignore_user_permissions: @@ -1353,25 +1357,25 @@ class Engine: elif df.get("fieldname") == "name" and self.reference_doctype: if permission.get("applicable_for") == self.reference_doctype: docs.append(permission.get("doc")) - elif permission.get("applicable_for") == self.doctype: + elif permission.get("applicable_for") == self.permission_doctype: docs.append(permission.get("doc")) if docs: field_name = df.get("fieldname") strict_user_permissions = frappe.get_system_settings("apply_strict_user_permissions") if strict_user_permissions: - conditions.append(self.table[field_name].isin(docs)) + conditions.append(self.permission_table[field_name].isin(docs)) else: - empty_value_condition = functions.IfNull(self.table[field_name], "") == "" - value_condition = self.table[field_name].isin(docs) + empty_value_condition = functions.IfNull(self.permission_table[field_name], "") == "" + value_condition = self.permission_table[field_name].isin(docs) conditions.append(empty_value_condition | value_condition) return conditions def get_doctype_link_fields(self): - meta = frappe.get_meta(self.doctype) + meta = frappe.get_meta(self.permission_doctype) # append current doctype with fieldname as 'name' as first link field - doctype_link_fields = [{"options": self.doctype, "fieldname": "name"}] + doctype_link_fields = [{"options": self.permission_doctype, "fieldname": "name"}] # append other link fields doctype_link_fields.extend(meta.get_link_fields()) return doctype_link_fields @@ -1384,30 +1388,41 @@ class Engine: - apply only share permissions If role permissions with read/select exist: - - apply (if owner constraints OR user permissions), AND + - apply (if_owner constraints OR user permissions), AND - apply permission query conditions - If if owner / user permission / permission query constraints are applied, + If if_owner / user permission / permission query constraints are applied, final condition = (existing conditions) OR (share condtion) (rationale: shared documents trump all other restrictions) Else, all documents are accessible based on role permissions. + + For child tables (when parent_doctype is specified): + - permissions are checked against the parent doctype + - a join to the parent table is added + - conditions reference the parent table's fields """ if not self.apply_permissions: return - role_permissions = frappe.permissions.get_role_permissions(self.doctype, user=self.user) + # For child tables, join to parent table so permission conditions can reference it + if self.permission_doctype != self.doctype: + self.query = self.query.inner_join(self.permission_table).on( + self.table.parent == self.permission_table.name + ) + + role_permissions = frappe.permissions.get_role_permissions(self.permission_doctype, user=self.user) has_role_permission = role_permissions.get("read") or role_permissions.get("select") if not has_role_permission: # no role permissions, apply only share permissions - shared_docs = frappe.share.get_shared(self.doctype, self.user) + shared_docs = frappe.share.get_shared(self.permission_doctype, self.user) if not shared_docs: # this should NEVER happen, but being defensive self._raise_permission_error() - self.query = self.query.where(self.table.name.isin(shared_docs)) + self.query = self.query.where(self.permission_table.name.isin(shared_docs)) return # build conditions from: if_owner constraint OR user permissions @@ -1415,7 +1430,7 @@ class Engine: if self.requires_owner_constraint(role_permissions): # skip user perm check if owner constraint is required - conditions.append(self.table.owner == self.user) + conditions.append(self.permission_table.owner == self.user) elif user_perm_conditions := self.get_user_permission_conditions(): conditions.extend(user_perm_conditions) @@ -1428,10 +1443,10 @@ class Engine: where_condition = Criterion.all(conditions) # since some conditions apply, we need to consider shared docs as well - shared_docs = frappe.share.get_shared(self.doctype, self.user) + shared_docs = frappe.share.get_shared(self.permission_doctype, self.user) if shared_docs: # shared docs trump all other restrictions - where_condition |= self.table.name.isin(shared_docs) + where_condition |= self.permission_table.name.isin(shared_docs) self.query = self.query.where(where_condition) @@ -1441,14 +1456,18 @@ class Engine: conditions = [] hooks = frappe.get_hooks("permission_query_conditions", {}) - condition_methods = hooks.get(self.doctype, []) + hooks.get("*", []) + condition_methods = hooks.get(self.permission_doctype, []) + hooks.get("*", []) for method in condition_methods: - if c := frappe.call(frappe.get_attr(method), self.user, doctype=self.doctype): + if c := frappe.call(frappe.get_attr(method), self.user, doctype=self.permission_doctype): conditions.append(RawCriterion(f"({c})")) # Get conditions from server scripts - if permission_script_name := get_server_script_map().get("permission_query", {}).get(self.doctype): + if ( + permission_script_name := get_server_script_map() + .get("permission_query", {}) + .get(self.permission_doctype) + ): script = frappe.get_doc("Server Script", permission_script_name) if condition := script.get_permission_query_conditions(self.user): conditions.append(RawCriterion(f"({condition})")) diff --git a/frappe/tests/test_query.py b/frappe/tests/test_query.py index e4d9150014..c6a3c7a9b3 100644 --- a/frappe/tests/test_query.py +++ b/frappe/tests/test_query.py @@ -1926,6 +1926,187 @@ class TestQuery(IntegrationTestCase): self.assertIn("00:00:00", sql) self.assertIn("23:59:59", sql) + def test_share_only_access(self): + """Test that shared docs grant access when user has no role permissions.""" + import frappe.share + + test_user = "test2@example.com" + + # Create a private event (only owner can see by default) + event = frappe.get_doc( + doctype="Event", + subject="Share Only Test Event", + starts_on="2025-01-01 10:00:00", + event_type="Private", + ).insert() + + # Verify user can't access without share + frappe.set_user(test_user) + result = frappe.qb.get_query("Event", filters={"name": event.name}, ignore_permissions=False).run() + self.assertEqual(len(result), 0, "User should not see event without share") + + # Share the document + frappe.set_user("Administrator") + frappe.share.add("Event", event.name, test_user) + + # Now user should be able to access via share + frappe.set_user(test_user) + result = frappe.qb.get_query("Event", filters={"name": event.name}, ignore_permissions=False).run() + self.assertEqual(len(result), 1, "User should see event via share") + + # Cleanup + frappe.set_user("Administrator") + event.delete() + + def test_if_owner_constraint_with_shared_docs(self): + """Test that shared docs trump if_owner constraint.""" + import frappe.share + from frappe.core.page.permission_manager.permission_manager import update + + test_user = "test2@example.com" + test_user_doc = frappe.get_doc("User", test_user) + test_user_doc.add_roles("Blogger") + + # Create blog post owned by Administrator + blog_post = frappe.get_doc( + doctype="Test Blog Post", + title="If Owner Test Post", + content="Test Content", + blog_category="_Test Blog Category", + ).insert(ignore_permissions=True, ignore_mandatory=True) + + # Enable if_owner constraint for Test Blog Post + update("Test Blog Post", "Blogger", 0, "if_owner", 1) + + # User shouldn't see it (not owner, if_owner enabled) + frappe.set_user(test_user) + result = frappe.qb.get_query( + "Test Blog Post", filters={"name": blog_post.name}, ignore_permissions=False + ).run() + self.assertEqual(len(result), 0, "User should not see post owned by others with if_owner") + + # Share the document + frappe.set_user("Administrator") + frappe.share.add("Test Blog Post", blog_post.name, test_user) + + # Now user should see it via share (shared docs trump if_owner) + frappe.set_user(test_user) + result = frappe.qb.get_query( + "Test Blog Post", filters={"name": blog_post.name}, ignore_permissions=False + ).run() + self.assertEqual(len(result), 1, "User should see post via share despite if_owner") + + # Cleanup + frappe.set_user("Administrator") + update("Test Blog Post", "Blogger", 0, "if_owner", 0) + blog_post.delete() + test_user_doc.remove_roles("Blogger") + + def test_user_permission_with_shared_docs(self): + """Test that shared docs grant access even when user permission doesn't match.""" + import frappe.share + from frappe.permissions import add_user_permission, clear_user_permissions_for_doctype + + test_user = "test2@example.com" + test_user_doc = frappe.get_doc("User", test_user) + test_user_doc.add_roles("Blogger") + + # Create two blog posts + blog_post1 = frappe.get_doc( + doctype="Test Blog Post", + title="User Perm Test Post 1", + content="Test Content", + blog_category="_Test Blog Category", + ).insert(ignore_permissions=True, ignore_mandatory=True) + + blog_post2 = frappe.get_doc( + doctype="Test Blog Post", + title="User Perm Test Post 2", + content="Test Content", + blog_category="_Test Blog Category", + ).insert(ignore_permissions=True, ignore_mandatory=True) + + clear_user_permissions_for_doctype("Test Blog Post", test_user) + + # Add user permission for only post1 + add_user_permission("Test Blog Post", blog_post1.name, test_user, True) + + # User should see post1 via user permission + frappe.set_user(test_user) + result = frappe.qb.get_query( + "Test Blog Post", filters={"name": blog_post1.name}, ignore_permissions=False + ).run() + self.assertEqual(len(result), 1, "User should see post1 via user permission") + + # User should NOT see post2 (no user permission) + result = frappe.qb.get_query( + "Test Blog Post", filters={"name": blog_post2.name}, ignore_permissions=False + ).run() + self.assertEqual(len(result), 0, "User should not see post2 without user permission") + + # Share post2 with user + frappe.set_user("Administrator") + frappe.share.add("Test Blog Post", blog_post2.name, test_user) + + # Now user should see post2 via share (shared docs trump user permissions) + frappe.set_user(test_user) + result = frappe.qb.get_query( + "Test Blog Post", filters={"name": blog_post2.name}, ignore_permissions=False + ).run() + self.assertEqual(len(result), 1, "User should see post2 via share") + + # Cleanup + frappe.set_user("Administrator") + clear_user_permissions_for_doctype("Test Blog Post", test_user) + blog_post1.delete() + blog_post2.delete() + test_user_doc.remove_roles("Blogger") + + def test_role_permission_without_restrictions(self): + """Test that all documents are accessible when role permissions exist without if_owner/user_perms.""" + from frappe.core.page.permission_manager.permission_manager import update + from frappe.permissions import clear_user_permissions_for_doctype + + test_user = "test2@example.com" + test_user_doc = frappe.get_doc("User", test_user) + test_user_doc.add_roles("Blogger") + + # Clear any user permissions + clear_user_permissions_for_doctype("Test Blog Post", test_user) + + # Ensure if_owner is disabled + update("Test Blog Post", "Blogger", 0, "if_owner", 0) + + # Create blog posts owned by Administrator + blog_post1 = frappe.get_doc( + doctype="Test Blog Post", + title="No Restriction Test 1", + content="Test Content", + blog_category="_Test Blog Category", + ).insert(ignore_permissions=True, ignore_mandatory=True) + + blog_post2 = frappe.get_doc( + doctype="Test Blog Post", + title="No Restriction Test 2", + content="Test Content", + blog_category="_Test Blog Category", + ).insert(ignore_permissions=True, ignore_mandatory=True) + + # User should see both posts (no restrictions) + frappe.set_user(test_user) + result = frappe.qb.get_query( + "Test Blog Post", + filters={"name": ["in", [blog_post1.name, blog_post2.name]]}, + ignore_permissions=False, + ).run() + self.assertEqual(len(result), 2, "User should see all posts without restrictions") + + # Cleanup + frappe.set_user("Administrator") + blog_post1.delete() + blog_post2.delete() + test_user_doc.remove_roles("Blogger") + # This function is used as a permission query condition hook def test_permission_hook_condition(user):