diff --git a/frappe/database/query.py b/frappe/database/query.py index d10d0ccd90..73ffc6a25d 100644 --- a/frappe/database/query.py +++ b/frappe/database/query.py @@ -1917,7 +1917,8 @@ class CombinedRawCriterion(RawCriterion): def get_sql(self, **kwargs: Any) -> str: left_sql = self.left.get_sql(**kwargs) if hasattr(self.left, "get_sql") else str(self.left) right_sql = self.right.get_sql(**kwargs) if hasattr(self.right, "get_sql") else str(self.right) - return f"({left_sql}) {self.operator} ({right_sql})" + # Wrap entire expression in parentheses to ensure correct operator precedence + return f"(({left_sql}) {self.operator} ({right_sql}))" class SQLFunctionParser: diff --git a/frappe/tests/test_query.py b/frappe/tests/test_query.py index c6a3c7a9b3..16bf27e293 100644 --- a/frappe/tests/test_query.py +++ b/frappe/tests/test_query.py @@ -67,6 +67,17 @@ class TestQuery(IntegrationTestCase): def setUp(self): setup_for_tests() + def ensure_system_manager(self, user_doc, should_have: bool): + """Ensure user has/doesn't have System Manager role, with cleanup to restore original state.""" + had_role = "System Manager" in [r.role for r in user_doc.roles] + cleanup_func = user_doc.add_roles if had_role else user_doc.remove_roles + self.addCleanup(lambda: cleanup_func("System Manager")) + + if should_have and not had_role: + user_doc.add_roles("System Manager") + elif not should_have and had_role: + user_doc.remove_roles("System Manager") + def test_multiple_tables_in_filters(self): query = "SELECT `tabDocType`.* FROM `tabDocType` LEFT JOIN `tabDocField` ON `tabDocField`.`parent`=`tabDocType`.`name` AND `tabDocField`.`parenttype`='DocType' AND `tabDocField`.`parentfield`='fields' WHERE `tabDocField`.`name` LIKE 'f%' AND `tabDocType`.`parent`='something'" query = query.replace("LIKE", "ILIKE" if frappe.db.db_type == "postgres" else "LIKE") @@ -1940,6 +1951,9 @@ class TestQuery(IntegrationTestCase): event_type="Private", ).insert() + self.addCleanup(event.delete) + self.addCleanup(lambda: frappe.set_user("Administrator")) + # Verify user can't access without share frappe.set_user(test_user) result = frappe.qb.get_query("Event", filters={"name": event.name}, ignore_permissions=False).run() @@ -1954,10 +1968,6 @@ class TestQuery(IntegrationTestCase): result = frappe.qb.get_query("Event", filters={"name": event.name}, ignore_permissions=False).run() self.assertEqual(len(result), 1, "User should see event via share") - # Cleanup - frappe.set_user("Administrator") - event.delete() - def test_if_owner_constraint_with_shared_docs(self): """Test that shared docs trump if_owner constraint.""" import frappe.share @@ -1978,6 +1988,11 @@ class TestQuery(IntegrationTestCase): # Enable if_owner constraint for Test Blog Post update("Test Blog Post", "Blogger", 0, "if_owner", 1) + self.addCleanup(lambda: test_user_doc.remove_roles("Blogger")) + self.addCleanup(blog_post.delete) + self.addCleanup(lambda: update("Test Blog Post", "Blogger", 0, "if_owner", 0)) + self.addCleanup(lambda: frappe.set_user("Administrator")) + # User shouldn't see it (not owner, if_owner enabled) frappe.set_user(test_user) result = frappe.qb.get_query( @@ -1996,12 +2011,6 @@ class TestQuery(IntegrationTestCase): ).run() self.assertEqual(len(result), 1, "User should see post via share despite if_owner") - # Cleanup - frappe.set_user("Administrator") - update("Test Blog Post", "Blogger", 0, "if_owner", 0) - blog_post.delete() - test_user_doc.remove_roles("Blogger") - def test_user_permission_with_shared_docs(self): """Test that shared docs grant access even when user permission doesn't match.""" import frappe.share @@ -2031,6 +2040,12 @@ class TestQuery(IntegrationTestCase): # Add user permission for only post1 add_user_permission("Test Blog Post", blog_post1.name, test_user, True) + self.addCleanup(lambda: test_user_doc.remove_roles("Blogger")) + self.addCleanup(blog_post2.delete) + self.addCleanup(blog_post1.delete) + self.addCleanup(lambda: clear_user_permissions_for_doctype("Test Blog Post", test_user)) + self.addCleanup(lambda: frappe.set_user("Administrator")) + # User should see post1 via user permission frappe.set_user(test_user) result = frappe.qb.get_query( @@ -2055,13 +2070,6 @@ class TestQuery(IntegrationTestCase): ).run() self.assertEqual(len(result), 1, "User should see post2 via share") - # Cleanup - frappe.set_user("Administrator") - clear_user_permissions_for_doctype("Test Blog Post", test_user) - blog_post1.delete() - blog_post2.delete() - test_user_doc.remove_roles("Blogger") - def test_role_permission_without_restrictions(self): """Test that all documents are accessible when role permissions exist without if_owner/user_perms.""" from frappe.core.page.permission_manager.permission_manager import update @@ -2092,6 +2100,11 @@ class TestQuery(IntegrationTestCase): blog_category="_Test Blog Category", ).insert(ignore_permissions=True, ignore_mandatory=True) + self.addCleanup(lambda: test_user_doc.remove_roles("Blogger")) + self.addCleanup(blog_post2.delete) + self.addCleanup(blog_post1.delete) + self.addCleanup(lambda: frappe.set_user("Administrator")) + # User should see both posts (no restrictions) frappe.set_user(test_user) result = frappe.qb.get_query( @@ -2101,11 +2114,126 @@ class TestQuery(IntegrationTestCase): ).run() self.assertEqual(len(result), 2, "User should see all posts without restrictions") - # Cleanup + def test_child_table_permission_uses_parent_doctype(self): + """Test that child table queries use parent doctype for permission checks.""" + # DocField is a child table of DocType + # When querying with parent_doctype, permissions should be checked against DocType + + test_user = "test2@example.com" + test_user_doc = frappe.get_doc("User", test_user) + self.ensure_system_manager(test_user_doc, should_have=False) + self.addCleanup(lambda: frappe.set_user("Administrator")) + + frappe.set_user(test_user) + + # Query child table with parent_doctype - should use DocType's permissions + with self.assertRaises(frappe.PermissionError): + frappe.qb.get_query( + "DocField", fields=["name"], parent_doctype="DocType", ignore_permissions=False + ).run() + + # Give user read access to DocType frappe.set_user("Administrator") - blog_post1.delete() - blog_post2.delete() - test_user_doc.remove_roles("Blogger") + test_user_doc.add_roles("System Manager") + + frappe.set_user(test_user) + # Now query should succeed + result = frappe.qb.get_query( + "DocField", fields=["name"], parent_doctype="DocType", ignore_permissions=False, limit=1 + ).run() + # Query should succeed and return results (tuple or list) + self.assertTrue(len(result) >= 0, "Query should succeed with proper permissions") + + def test_child_table_filters_orphaned_rows(self): + """Test that child table queries filter out orphaned rows (rows without valid parent).""" + test_user = "test2@example.com" + test_user_doc = frappe.get_doc("User", test_user) + self.ensure_system_manager(test_user_doc, should_have=True) + + # Create a child table row with non-existent parent + frappe.db.sql( + """ + INSERT INTO `tabDefaultValue` (name, parent, parenttype, parentfield, defkey, defvalue) + VALUES ('_test_orphan_row', '_non_existent_parent', 'User', 'defaults', 'test_key', 'test_value') + """ + ) + self.addCleanup( + lambda: frappe.db.sql("DELETE FROM `tabDefaultValue` WHERE name = '_test_orphan_row'") + ) + self.addCleanup(lambda: frappe.set_user("Administrator")) + + frappe.set_user(test_user) + + # Query with parent_doctype - orphaned row should be filtered out by inner join + result = frappe.qb.get_query( + "DefaultValue", + fields=["name"], + filters={"name": "_test_orphan_row"}, + parent_doctype="User", + ignore_permissions=False, + ).run() + self.assertEqual(len(result), 0, "Orphaned child row should be filtered out") + + def test_combined_raw_criterion_precedence(self): + """Test that CombinedRawCriterion properly groups OR conditions. + + When permission conditions (like permission_query_conditions) are combined with + shared docs via OR, the entire expression must be wrapped in parentheses to + ensure correct operator precedence with other WHERE filters. + + Without proper grouping: + WHERE filter=X AND perm_cond OR shared_cond -- shared_cond ignores filter! + + With proper grouping: + WHERE filter=X AND (perm_cond OR shared_cond) -- correct behavior + """ + from frappe.database.query import CombinedRawCriterion, RawCriterion + + # Test that CombinedRawCriterion wraps the entire expression + left = RawCriterion("a = 1") + right = RawCriterion("b = 2") + combined = left | right + + self.assertIsInstance(combined, CombinedRawCriterion) + sql = combined.get_sql() + # Should have outer parentheses: ((a = 1) OR (b = 2)) + self.assertTrue(sql.startswith("(("), f"Should start with '((' but got: {sql}") + self.assertTrue(sql.endswith("))"), f"Should end with '))' but got: {sql}") + + # Test nested combinations + third = RawCriterion("c = 3") + nested = combined & third + nested_sql = nested.get_sql() + # The AND combination should also be properly grouped + self.assertIn("OR", nested_sql) + self.assertIn("AND", nested_sql) + + def test_permission_query_conditions_with_filter(self): + """Test that filters work correctly when permission_query_conditions and shares exist. + + This is a regression test for the CombinedRawCriterion fix - ensures that + explicit filters are not bypassed by shared doc conditions. + """ + test_user = "test2@example.com" + test_user_doc = frappe.get_doc("User", test_user) + self.ensure_system_manager(test_user_doc, should_have=True) + self.addCleanup(lambda: frappe.set_user("Administrator")) + + frappe.set_user(test_user) + + # User doctype has permission_query_conditions hook + # test2@example.com is shared their own User doc + # Query with a filter that should NOT match any row + result = frappe.qb.get_query( + "User", + fields=["name"], + filters={"name": "_non_existent_user_12345"}, + ignore_permissions=False, + ).run() + + # Even though user has shared access to their own User doc, + # the filter should still apply and return no results + self.assertEqual(len(result), 0, "Filter should not be bypassed by shared doc OR condition") # This function is used as a permission query condition hook