fix: update logic for child tables

This commit is contained in:
Sagar Vora 2025-12-17 19:03:15 +05:30
parent 2b822cc63f
commit 591d9a3535
2 changed files with 218 additions and 18 deletions

View file

@ -274,6 +274,10 @@ class Engine:
if self.apply_permissions:
self.check_read_permission()
self.permission_doctype = parent_doctype or self.doctype
self.permission_table = (
qb.DocType(self.permission_doctype) if self.permission_doctype != self.doctype else self.table
)
is_select = False
if update:
@ -1324,7 +1328,7 @@ class Engine:
return allowed_fields
def get_user_permission_conditions(self) -> list[Criterion]:
"""Build conditions for user permissions and return tuple of (conditions, fetch_shared_docs)"""
"""Build conditions for user permissions."""
conditions = []
if self.ignore_user_permissions:
@ -1353,25 +1357,25 @@ class Engine:
elif df.get("fieldname") == "name" and self.reference_doctype:
if permission.get("applicable_for") == self.reference_doctype:
docs.append(permission.get("doc"))
elif permission.get("applicable_for") == self.doctype:
elif permission.get("applicable_for") == self.permission_doctype:
docs.append(permission.get("doc"))
if docs:
field_name = df.get("fieldname")
strict_user_permissions = frappe.get_system_settings("apply_strict_user_permissions")
if strict_user_permissions:
conditions.append(self.table[field_name].isin(docs))
conditions.append(self.permission_table[field_name].isin(docs))
else:
empty_value_condition = functions.IfNull(self.table[field_name], "") == ""
value_condition = self.table[field_name].isin(docs)
empty_value_condition = functions.IfNull(self.permission_table[field_name], "") == ""
value_condition = self.permission_table[field_name].isin(docs)
conditions.append(empty_value_condition | value_condition)
return conditions
def get_doctype_link_fields(self):
meta = frappe.get_meta(self.doctype)
meta = frappe.get_meta(self.permission_doctype)
# append current doctype with fieldname as 'name' as first link field
doctype_link_fields = [{"options": self.doctype, "fieldname": "name"}]
doctype_link_fields = [{"options": self.permission_doctype, "fieldname": "name"}]
# append other link fields
doctype_link_fields.extend(meta.get_link_fields())
return doctype_link_fields
@ -1384,30 +1388,41 @@ class Engine:
- apply only share permissions
If role permissions with read/select exist:
- apply (if owner constraints OR user permissions), AND
- apply (if_owner constraints OR user permissions), AND
- apply permission query conditions
If if owner / user permission / permission query constraints are applied,
If if_owner / user permission / permission query constraints are applied,
final condition = (existing conditions) OR (share condtion)
(rationale: shared documents trump all other restrictions)
Else, all documents are accessible based on role permissions.
For child tables (when parent_doctype is specified):
- permissions are checked against the parent doctype
- a join to the parent table is added
- conditions reference the parent table's fields
"""
if not self.apply_permissions:
return
role_permissions = frappe.permissions.get_role_permissions(self.doctype, user=self.user)
# For child tables, join to parent table so permission conditions can reference it
if self.permission_doctype != self.doctype:
self.query = self.query.inner_join(self.permission_table).on(
self.table.parent == self.permission_table.name
)
role_permissions = frappe.permissions.get_role_permissions(self.permission_doctype, user=self.user)
has_role_permission = role_permissions.get("read") or role_permissions.get("select")
if not has_role_permission:
# no role permissions, apply only share permissions
shared_docs = frappe.share.get_shared(self.doctype, self.user)
shared_docs = frappe.share.get_shared(self.permission_doctype, self.user)
if not shared_docs:
# this should NEVER happen, but being defensive
self._raise_permission_error()
self.query = self.query.where(self.table.name.isin(shared_docs))
self.query = self.query.where(self.permission_table.name.isin(shared_docs))
return
# build conditions from: if_owner constraint OR user permissions
@ -1415,7 +1430,7 @@ class Engine:
if self.requires_owner_constraint(role_permissions):
# skip user perm check if owner constraint is required
conditions.append(self.table.owner == self.user)
conditions.append(self.permission_table.owner == self.user)
elif user_perm_conditions := self.get_user_permission_conditions():
conditions.extend(user_perm_conditions)
@ -1428,10 +1443,10 @@ class Engine:
where_condition = Criterion.all(conditions)
# since some conditions apply, we need to consider shared docs as well
shared_docs = frappe.share.get_shared(self.doctype, self.user)
shared_docs = frappe.share.get_shared(self.permission_doctype, self.user)
if shared_docs:
# shared docs trump all other restrictions
where_condition |= self.table.name.isin(shared_docs)
where_condition |= self.permission_table.name.isin(shared_docs)
self.query = self.query.where(where_condition)
@ -1441,14 +1456,18 @@ class Engine:
conditions = []
hooks = frappe.get_hooks("permission_query_conditions", {})
condition_methods = hooks.get(self.doctype, []) + hooks.get("*", [])
condition_methods = hooks.get(self.permission_doctype, []) + hooks.get("*", [])
for method in condition_methods:
if c := frappe.call(frappe.get_attr(method), self.user, doctype=self.doctype):
if c := frappe.call(frappe.get_attr(method), self.user, doctype=self.permission_doctype):
conditions.append(RawCriterion(f"({c})"))
# Get conditions from server scripts
if permission_script_name := get_server_script_map().get("permission_query", {}).get(self.doctype):
if (
permission_script_name := get_server_script_map()
.get("permission_query", {})
.get(self.permission_doctype)
):
script = frappe.get_doc("Server Script", permission_script_name)
if condition := script.get_permission_query_conditions(self.user):
conditions.append(RawCriterion(f"({condition})"))

View file

@ -1926,6 +1926,187 @@ class TestQuery(IntegrationTestCase):
self.assertIn("00:00:00", sql)
self.assertIn("23:59:59", sql)
def test_share_only_access(self):
"""Test that shared docs grant access when user has no role permissions."""
import frappe.share
test_user = "test2@example.com"
# Create a private event (only owner can see by default)
event = frappe.get_doc(
doctype="Event",
subject="Share Only Test Event",
starts_on="2025-01-01 10:00:00",
event_type="Private",
).insert()
# Verify user can't access without share
frappe.set_user(test_user)
result = frappe.qb.get_query("Event", filters={"name": event.name}, ignore_permissions=False).run()
self.assertEqual(len(result), 0, "User should not see event without share")
# Share the document
frappe.set_user("Administrator")
frappe.share.add("Event", event.name, test_user)
# Now user should be able to access via share
frappe.set_user(test_user)
result = frappe.qb.get_query("Event", filters={"name": event.name}, ignore_permissions=False).run()
self.assertEqual(len(result), 1, "User should see event via share")
# Cleanup
frappe.set_user("Administrator")
event.delete()
def test_if_owner_constraint_with_shared_docs(self):
"""Test that shared docs trump if_owner constraint."""
import frappe.share
from frappe.core.page.permission_manager.permission_manager import update
test_user = "test2@example.com"
test_user_doc = frappe.get_doc("User", test_user)
test_user_doc.add_roles("Blogger")
# Create blog post owned by Administrator
blog_post = frappe.get_doc(
doctype="Test Blog Post",
title="If Owner Test Post",
content="Test Content",
blog_category="_Test Blog Category",
).insert(ignore_permissions=True, ignore_mandatory=True)
# Enable if_owner constraint for Test Blog Post
update("Test Blog Post", "Blogger", 0, "if_owner", 1)
# User shouldn't see it (not owner, if_owner enabled)
frappe.set_user(test_user)
result = frappe.qb.get_query(
"Test Blog Post", filters={"name": blog_post.name}, ignore_permissions=False
).run()
self.assertEqual(len(result), 0, "User should not see post owned by others with if_owner")
# Share the document
frappe.set_user("Administrator")
frappe.share.add("Test Blog Post", blog_post.name, test_user)
# Now user should see it via share (shared docs trump if_owner)
frappe.set_user(test_user)
result = frappe.qb.get_query(
"Test Blog Post", filters={"name": blog_post.name}, ignore_permissions=False
).run()
self.assertEqual(len(result), 1, "User should see post via share despite if_owner")
# Cleanup
frappe.set_user("Administrator")
update("Test Blog Post", "Blogger", 0, "if_owner", 0)
blog_post.delete()
test_user_doc.remove_roles("Blogger")
def test_user_permission_with_shared_docs(self):
"""Test that shared docs grant access even when user permission doesn't match."""
import frappe.share
from frappe.permissions import add_user_permission, clear_user_permissions_for_doctype
test_user = "test2@example.com"
test_user_doc = frappe.get_doc("User", test_user)
test_user_doc.add_roles("Blogger")
# Create two blog posts
blog_post1 = frappe.get_doc(
doctype="Test Blog Post",
title="User Perm Test Post 1",
content="Test Content",
blog_category="_Test Blog Category",
).insert(ignore_permissions=True, ignore_mandatory=True)
blog_post2 = frappe.get_doc(
doctype="Test Blog Post",
title="User Perm Test Post 2",
content="Test Content",
blog_category="_Test Blog Category",
).insert(ignore_permissions=True, ignore_mandatory=True)
clear_user_permissions_for_doctype("Test Blog Post", test_user)
# Add user permission for only post1
add_user_permission("Test Blog Post", blog_post1.name, test_user, True)
# User should see post1 via user permission
frappe.set_user(test_user)
result = frappe.qb.get_query(
"Test Blog Post", filters={"name": blog_post1.name}, ignore_permissions=False
).run()
self.assertEqual(len(result), 1, "User should see post1 via user permission")
# User should NOT see post2 (no user permission)
result = frappe.qb.get_query(
"Test Blog Post", filters={"name": blog_post2.name}, ignore_permissions=False
).run()
self.assertEqual(len(result), 0, "User should not see post2 without user permission")
# Share post2 with user
frappe.set_user("Administrator")
frappe.share.add("Test Blog Post", blog_post2.name, test_user)
# Now user should see post2 via share (shared docs trump user permissions)
frappe.set_user(test_user)
result = frappe.qb.get_query(
"Test Blog Post", filters={"name": blog_post2.name}, ignore_permissions=False
).run()
self.assertEqual(len(result), 1, "User should see post2 via share")
# Cleanup
frappe.set_user("Administrator")
clear_user_permissions_for_doctype("Test Blog Post", test_user)
blog_post1.delete()
blog_post2.delete()
test_user_doc.remove_roles("Blogger")
def test_role_permission_without_restrictions(self):
"""Test that all documents are accessible when role permissions exist without if_owner/user_perms."""
from frappe.core.page.permission_manager.permission_manager import update
from frappe.permissions import clear_user_permissions_for_doctype
test_user = "test2@example.com"
test_user_doc = frappe.get_doc("User", test_user)
test_user_doc.add_roles("Blogger")
# Clear any user permissions
clear_user_permissions_for_doctype("Test Blog Post", test_user)
# Ensure if_owner is disabled
update("Test Blog Post", "Blogger", 0, "if_owner", 0)
# Create blog posts owned by Administrator
blog_post1 = frappe.get_doc(
doctype="Test Blog Post",
title="No Restriction Test 1",
content="Test Content",
blog_category="_Test Blog Category",
).insert(ignore_permissions=True, ignore_mandatory=True)
blog_post2 = frappe.get_doc(
doctype="Test Blog Post",
title="No Restriction Test 2",
content="Test Content",
blog_category="_Test Blog Category",
).insert(ignore_permissions=True, ignore_mandatory=True)
# User should see both posts (no restrictions)
frappe.set_user(test_user)
result = frappe.qb.get_query(
"Test Blog Post",
filters={"name": ["in", [blog_post1.name, blog_post2.name]]},
ignore_permissions=False,
).run()
self.assertEqual(len(result), 2, "User should see all posts without restrictions")
# Cleanup
frappe.set_user("Administrator")
blog_post1.delete()
blog_post2.delete()
test_user_doc.remove_roles("Blogger")
# This function is used as a permission query condition hook
def test_permission_hook_condition(user):