fix(query): wrap CombinedRawCriterion in parentheses for correct precedence

Without proper grouping, OR conditions from shared docs could bypass WHERE filters:
  WHERE filter=X AND perm_cond OR shared_cond  -- shared_cond ignores filter!

With proper grouping:
  WHERE filter=X AND (perm_cond OR shared_cond)  -- correct behavior
This commit is contained in:
Sagar Vora 2025-12-17 22:28:52 +05:30
parent b76706ac28
commit d1ffd99fe8
2 changed files with 151 additions and 22 deletions

View file

@ -1917,7 +1917,8 @@ class CombinedRawCriterion(RawCriterion):
def get_sql(self, **kwargs: Any) -> str:
left_sql = self.left.get_sql(**kwargs) if hasattr(self.left, "get_sql") else str(self.left)
right_sql = self.right.get_sql(**kwargs) if hasattr(self.right, "get_sql") else str(self.right)
return f"({left_sql}) {self.operator} ({right_sql})"
# Wrap entire expression in parentheses to ensure correct operator precedence
return f"(({left_sql}) {self.operator} ({right_sql}))"
class SQLFunctionParser:

View file

@ -67,6 +67,17 @@ class TestQuery(IntegrationTestCase):
def setUp(self):
setup_for_tests()
def ensure_system_manager(self, user_doc, should_have: bool):
"""Ensure user has/doesn't have System Manager role, with cleanup to restore original state."""
had_role = "System Manager" in [r.role for r in user_doc.roles]
cleanup_func = user_doc.add_roles if had_role else user_doc.remove_roles
self.addCleanup(lambda: cleanup_func("System Manager"))
if should_have and not had_role:
user_doc.add_roles("System Manager")
elif not should_have and had_role:
user_doc.remove_roles("System Manager")
def test_multiple_tables_in_filters(self):
query = "SELECT `tabDocType`.* FROM `tabDocType` LEFT JOIN `tabDocField` ON `tabDocField`.`parent`=`tabDocType`.`name` AND `tabDocField`.`parenttype`='DocType' AND `tabDocField`.`parentfield`='fields' WHERE `tabDocField`.`name` LIKE 'f%' AND `tabDocType`.`parent`='something'"
query = query.replace("LIKE", "ILIKE" if frappe.db.db_type == "postgres" else "LIKE")
@ -1940,6 +1951,9 @@ class TestQuery(IntegrationTestCase):
event_type="Private",
).insert()
self.addCleanup(event.delete)
self.addCleanup(lambda: frappe.set_user("Administrator"))
# Verify user can't access without share
frappe.set_user(test_user)
result = frappe.qb.get_query("Event", filters={"name": event.name}, ignore_permissions=False).run()
@ -1954,10 +1968,6 @@ class TestQuery(IntegrationTestCase):
result = frappe.qb.get_query("Event", filters={"name": event.name}, ignore_permissions=False).run()
self.assertEqual(len(result), 1, "User should see event via share")
# Cleanup
frappe.set_user("Administrator")
event.delete()
def test_if_owner_constraint_with_shared_docs(self):
"""Test that shared docs trump if_owner constraint."""
import frappe.share
@ -1978,6 +1988,11 @@ class TestQuery(IntegrationTestCase):
# Enable if_owner constraint for Test Blog Post
update("Test Blog Post", "Blogger", 0, "if_owner", 1)
self.addCleanup(lambda: test_user_doc.remove_roles("Blogger"))
self.addCleanup(blog_post.delete)
self.addCleanup(lambda: update("Test Blog Post", "Blogger", 0, "if_owner", 0))
self.addCleanup(lambda: frappe.set_user("Administrator"))
# User shouldn't see it (not owner, if_owner enabled)
frappe.set_user(test_user)
result = frappe.qb.get_query(
@ -1996,12 +2011,6 @@ class TestQuery(IntegrationTestCase):
).run()
self.assertEqual(len(result), 1, "User should see post via share despite if_owner")
# Cleanup
frappe.set_user("Administrator")
update("Test Blog Post", "Blogger", 0, "if_owner", 0)
blog_post.delete()
test_user_doc.remove_roles("Blogger")
def test_user_permission_with_shared_docs(self):
"""Test that shared docs grant access even when user permission doesn't match."""
import frappe.share
@ -2031,6 +2040,12 @@ class TestQuery(IntegrationTestCase):
# Add user permission for only post1
add_user_permission("Test Blog Post", blog_post1.name, test_user, True)
self.addCleanup(lambda: test_user_doc.remove_roles("Blogger"))
self.addCleanup(blog_post2.delete)
self.addCleanup(blog_post1.delete)
self.addCleanup(lambda: clear_user_permissions_for_doctype("Test Blog Post", test_user))
self.addCleanup(lambda: frappe.set_user("Administrator"))
# User should see post1 via user permission
frappe.set_user(test_user)
result = frappe.qb.get_query(
@ -2055,13 +2070,6 @@ class TestQuery(IntegrationTestCase):
).run()
self.assertEqual(len(result), 1, "User should see post2 via share")
# Cleanup
frappe.set_user("Administrator")
clear_user_permissions_for_doctype("Test Blog Post", test_user)
blog_post1.delete()
blog_post2.delete()
test_user_doc.remove_roles("Blogger")
def test_role_permission_without_restrictions(self):
"""Test that all documents are accessible when role permissions exist without if_owner/user_perms."""
from frappe.core.page.permission_manager.permission_manager import update
@ -2092,6 +2100,11 @@ class TestQuery(IntegrationTestCase):
blog_category="_Test Blog Category",
).insert(ignore_permissions=True, ignore_mandatory=True)
self.addCleanup(lambda: test_user_doc.remove_roles("Blogger"))
self.addCleanup(blog_post2.delete)
self.addCleanup(blog_post1.delete)
self.addCleanup(lambda: frappe.set_user("Administrator"))
# User should see both posts (no restrictions)
frappe.set_user(test_user)
result = frappe.qb.get_query(
@ -2101,11 +2114,126 @@ class TestQuery(IntegrationTestCase):
).run()
self.assertEqual(len(result), 2, "User should see all posts without restrictions")
# Cleanup
def test_child_table_permission_uses_parent_doctype(self):
"""Test that child table queries use parent doctype for permission checks."""
# DocField is a child table of DocType
# When querying with parent_doctype, permissions should be checked against DocType
test_user = "test2@example.com"
test_user_doc = frappe.get_doc("User", test_user)
self.ensure_system_manager(test_user_doc, should_have=False)
self.addCleanup(lambda: frappe.set_user("Administrator"))
frappe.set_user(test_user)
# Query child table with parent_doctype - should use DocType's permissions
with self.assertRaises(frappe.PermissionError):
frappe.qb.get_query(
"DocField", fields=["name"], parent_doctype="DocType", ignore_permissions=False
).run()
# Give user read access to DocType
frappe.set_user("Administrator")
blog_post1.delete()
blog_post2.delete()
test_user_doc.remove_roles("Blogger")
test_user_doc.add_roles("System Manager")
frappe.set_user(test_user)
# Now query should succeed
result = frappe.qb.get_query(
"DocField", fields=["name"], parent_doctype="DocType", ignore_permissions=False, limit=1
).run()
# Query should succeed and return results (tuple or list)
self.assertTrue(len(result) >= 0, "Query should succeed with proper permissions")
def test_child_table_filters_orphaned_rows(self):
"""Test that child table queries filter out orphaned rows (rows without valid parent)."""
test_user = "test2@example.com"
test_user_doc = frappe.get_doc("User", test_user)
self.ensure_system_manager(test_user_doc, should_have=True)
# Create a child table row with non-existent parent
frappe.db.sql(
"""
INSERT INTO `tabDefaultValue` (name, parent, parenttype, parentfield, defkey, defvalue)
VALUES ('_test_orphan_row', '_non_existent_parent', 'User', 'defaults', 'test_key', 'test_value')
"""
)
self.addCleanup(
lambda: frappe.db.sql("DELETE FROM `tabDefaultValue` WHERE name = '_test_orphan_row'")
)
self.addCleanup(lambda: frappe.set_user("Administrator"))
frappe.set_user(test_user)
# Query with parent_doctype - orphaned row should be filtered out by inner join
result = frappe.qb.get_query(
"DefaultValue",
fields=["name"],
filters={"name": "_test_orphan_row"},
parent_doctype="User",
ignore_permissions=False,
).run()
self.assertEqual(len(result), 0, "Orphaned child row should be filtered out")
def test_combined_raw_criterion_precedence(self):
"""Test that CombinedRawCriterion properly groups OR conditions.
When permission conditions (like permission_query_conditions) are combined with
shared docs via OR, the entire expression must be wrapped in parentheses to
ensure correct operator precedence with other WHERE filters.
Without proper grouping:
WHERE filter=X AND perm_cond OR shared_cond -- shared_cond ignores filter!
With proper grouping:
WHERE filter=X AND (perm_cond OR shared_cond) -- correct behavior
"""
from frappe.database.query import CombinedRawCriterion, RawCriterion
# Test that CombinedRawCriterion wraps the entire expression
left = RawCriterion("a = 1")
right = RawCriterion("b = 2")
combined = left | right
self.assertIsInstance(combined, CombinedRawCriterion)
sql = combined.get_sql()
# Should have outer parentheses: ((a = 1) OR (b = 2))
self.assertTrue(sql.startswith("(("), f"Should start with '((' but got: {sql}")
self.assertTrue(sql.endswith("))"), f"Should end with '))' but got: {sql}")
# Test nested combinations
third = RawCriterion("c = 3")
nested = combined & third
nested_sql = nested.get_sql()
# The AND combination should also be properly grouped
self.assertIn("OR", nested_sql)
self.assertIn("AND", nested_sql)
def test_permission_query_conditions_with_filter(self):
"""Test that filters work correctly when permission_query_conditions and shares exist.
This is a regression test for the CombinedRawCriterion fix - ensures that
explicit filters are not bypassed by shared doc conditions.
"""
test_user = "test2@example.com"
test_user_doc = frappe.get_doc("User", test_user)
self.ensure_system_manager(test_user_doc, should_have=True)
self.addCleanup(lambda: frappe.set_user("Administrator"))
frappe.set_user(test_user)
# User doctype has permission_query_conditions hook
# test2@example.com is shared their own User doc
# Query with a filter that should NOT match any row
result = frappe.qb.get_query(
"User",
fields=["name"],
filters={"name": "_non_existent_user_12345"},
ignore_permissions=False,
).run()
# Even though user has shared access to their own User doc,
# the filter should still apply and return no results
self.assertEqual(len(result), 0, "Filter should not be bypassed by shared doc OR condition")
# This function is used as a permission query condition hook