import itertools import frappe from frappe.core.doctype.doctype.test_doctype import new_doctype from frappe.permissions import add_permission, update_permission_property from frappe.query_builder import Field from frappe.query_builder.functions import Abs, Count, Ifnull, Max, Now, Timestamp from frappe.tests import IntegrationTestCase from frappe.tests.classes.context_managers import enable_safe_exec from frappe.tests.test_db_query import ( create_nested_doctype, create_nested_doctype_records, setup_patched_blog_post, setup_test_user, ) from frappe.tests.test_helpers import setup_for_tests from frappe.tests.test_query_builder import db_type_is, run_only_if from frappe.utils.nestedset import get_ancestors_of, get_descendants_of EXTRA_TEST_RECORD_DEPENDENCIES = ["User"] def create_tree_docs(): records = [ { "some_fieldname": "Root Node", "parent_test_tree_doctype": None, "is_group": 1, }, { "some_fieldname": "Parent 1", "parent_test_tree_doctype": "Root Node", "is_group": 1, }, { "some_fieldname": "Parent 2", "parent_test_tree_doctype": "Root Node", "is_group": 1, }, { "some_fieldname": "Child 1", "parent_test_tree_doctype": "Parent 1", "is_group": 0, }, { "some_fieldname": "Child 2", "parent_test_tree_doctype": "Parent 1", "is_group": 0, }, { "some_fieldname": "Child 3", "parent_test_tree_doctype": "Parent 2", "is_group": 0, }, ] tree_doctype = new_doctype("Test Tree DocType", is_tree=True, autoname="field:some_fieldname") tree_doctype.insert() for record in records: d = frappe.new_doc("Test Tree DocType") d.update(record) d.insert() class TestQuery(IntegrationTestCase): def setUp(self): setup_for_tests() def ensure_system_manager(self, user_doc, should_have: bool): """Ensure user has/doesn't have System Manager role, with cleanup to restore original state.""" had_role = "System Manager" in [r.role for r in user_doc.roles] cleanup_func = user_doc.add_roles if had_role else user_doc.remove_roles self.addCleanup(lambda: cleanup_func("System Manager")) if should_have and not had_role: user_doc.add_roles("System Manager") elif not should_have and had_role: user_doc.remove_roles("System Manager") def test_multiple_tables_in_filters(self): query = "SELECT `tabDocType`.* FROM `tabDocType` LEFT JOIN `tabDocField` ON `tabDocField`.`parent`=`tabDocType`.`name` AND `tabDocField`.`parenttype`='DocType' AND `tabDocField`.`parentfield`='fields' WHERE `tabDocField`.`name` LIKE 'f%' AND `tabDocType`.`parent`='something'" query = query.replace("LIKE", "ILIKE" if frappe.db.db_type == "postgres" else "LIKE") self.assertQueryEqual( frappe.qb.get_query( "DocType", ["*"], [ ["DocField", "name", "like", "f%"], ["DocType", "parent", "=", "something"], ], ).get_sql(), query, ) def test_string_fields(self): self.assertEqual( frappe.qb.get_query("User", fields="name, email", filters={"name": "Administrator"}).get_sql(), frappe.qb.from_("User") .select(Field("name"), Field("email")) .where(Field("name") == "Administrator") .get_sql(), ) self.assertEqual( frappe.qb.get_query( "User", fields=["`name`, `email`"], filters={"name": "Administrator"} ).get_sql(), frappe.qb.from_("User") .select(Field("name"), Field("email")) .where(Field("name") == "Administrator") .get_sql(), ) self.assertEqual( frappe.qb.get_query( "User", fields=["`tabUser`.`name`", "`tabUser`.`email`"], filters={"name": "Administrator"} ).run(), frappe.qb.from_("User") .select(Field("name"), Field("email")) .where(Field("name") == "Administrator") .run(), ) self.assertEqual( frappe.qb.get_query( "User", fields=["`tabUser`.`name` as owner", "`tabUser`.`email`"], filters={"name": "Administrator"}, ).run(as_dict=1), frappe.qb.from_("User") .select(Field("name").as_("owner"), Field("email")) .where(Field("name") == "Administrator") .run(as_dict=1), ) self.assertEqual( frappe.qb.get_query("User", fields=[Count("*")]).get_sql(), frappe.qb.from_("User").select(Count("*")).get_sql(), ) def test_qb_fields(self): user_doctype = frappe.qb.DocType("User") self.assertEqual( frappe.qb.get_query( user_doctype, fields=[user_doctype.name, user_doctype.email], filters={} ).get_sql(), frappe.qb.from_(user_doctype).select(user_doctype.name, user_doctype.email).get_sql(), ) self.assertEqual( frappe.qb.get_query(user_doctype, fields=user_doctype.email, filters={}).get_sql(), frappe.qb.from_(user_doctype).select(user_doctype.email).get_sql(), ) def test_field_validation_select(self): """Test validation for fields in SELECT clause.""" valid_fields = [ "name", "`name`", "tabUser.name", "`tabUser`.`name`", "name as alias", "`name` as alias", "tabUser.name as alias", "`tabUser`.`name` as alias", "*", "`tabHas Role`.`name`", "field as `alias with space`", ] invalid_fields = [ "name; DROP TABLE users", "`name` ; SELECT * FROM secrets", "name--comment", "name /* comment */", "name AS alias; --", "invalid-field-name", "table.invalid-field", "`table`.`invalid-field`", "field with space", "`field with space`", "field as alias with space", "COUNT(*)", "COUNT(name)", "SUM(amount) as total", "COUNT(name) as alias; SELECT 1", "COUNT(name;)", "`name", "name`", "`tabUser.name`", "tabUser.`name", "tabUser`.`name`", "tab`User.name", ] for field in valid_fields: try: frappe.qb.get_query("User", fields=field).get_sql() # Test as list item too frappe.qb.get_query("User", fields=[field]).get_sql() except Exception as e: self.fail(f"Valid SELECT field '{field}' failed validation: {e}") for field in invalid_fields: with self.assertRaises( (frappe.PermissionError, frappe.ValidationError), msg=f"Invalid SELECT field '[{field}]' passed validation", ): frappe.qb.get_query("User", fields=[field]).get_sql() def test_field_validation_filters(self): """Test validation for fields used in filters (WHERE clause).""" valid_fields = ["name", "creation", "language.name", "`tabUser`.`name`"] # Filters should not allow aliases or functions directly as field names invalid_fields = [ "tabUser.name", "name as alias", "`name` as alias", "tabUser.name as alias", "`tabUser`.`name` as alias", "COUNT(*)", "COUNT(name)", "name; DROP TABLE users", "`name` ; SELECT * FROM secrets", "name--comment", "name /* comment */", "invalid-field-name", "table.invalid-field", "`table`.`invalid-field`", "field with space", "`field with space`", "`name`", "`name", "name`", "tabUser.`name`", "`tabUser.name`", ] for field in valid_fields: try: # Test in dict filter frappe.qb.get_query("User", filters={field: "value"}).get_sql() # Test in list filter frappe.qb.get_query("User", filters=[[field, "=", "value"]]).get_sql() # Test in list filter with doctype frappe.qb.get_query("User", filters=[["User", field, "=", "value"]]).get_sql() except Exception as e: self.fail(f"Valid filter field '{field}' failed validation: {e}") for field in invalid_fields: with self.assertRaises( frappe.ValidationError, msg=f"Invalid filter field '{{{field}: val}}' passed validation" ): frappe.qb.get_query("User", filters={field: "value"}).get_sql() def test_field_validation_group_by(self): """Test validation for fields in GROUP BY clause.""" valid_fields = [ "name", "1", # Allow numeric indices "name, email", "1, 2", "`tabUser`.`name`", ] # GROUP BY should not allow aliases or functions invalid_fields = [ "name as alias", "COUNT(*)", "COUNT(name)", "name; DROP TABLE users", "`name` ; SELECT * FROM secrets", "name--comment", "name /* comment */", "invalid-field-name", "table.invalid-field", "tabUser.name", "`name`", "`name`, `tabUser`.`email`", "`table`.`invalid-field`", "field with space", "`field with space`", "name, email; SELECT 1", ] for group_by_str in valid_fields: try: frappe.qb.get_query("User", group_by=group_by_str).get_sql() except Exception as e: self.fail(f"Valid GROUP BY string '{group_by_str}' failed validation: {e}") for group_by_str in invalid_fields: with self.assertRaises( (frappe.PermissionError, frappe.ValidationError), msg=f"Invalid GROUP BY string '{group_by_str}' passed validation", ): frappe.qb.get_query("User", group_by=group_by_str).get_sql() def test_field_validation_order_by(self): """Test validation for fields in ORDER BY clause.""" valid_fields = [ "name", "1", # Allow numeric indices "name asc", "1 asc", "2 DESC", "name, email", "1 asc, 2 desc", "`tabUser`.`name`", "`tabUser`.`name` desc", ] # ORDER BY should not allow aliases or functions, or invalid directions invalid_fields = [ "name as alias", "COUNT(*)", "COUNT(name)", "name; DROP TABLE users", "`name` ; SELECT * FROM secrets", "name--comment", "name /* comment */", "`name`", "tabUser.name", "`name` DESC", "tabUser.name Asc", "`name` asc, `tabUser`.`email` DESC", "invalid-field-name", "table.invalid-field", "`table`.`invalid-field`", "field with space", "`field with space`", "name sideways", "name ASC;", "name, email; SELECT 1", "name INVALID_DIRECTION", ] for order_by_str in valid_fields: try: frappe.qb.get_query("User", order_by=order_by_str).get_sql() except Exception as e: self.fail(f"Valid ORDER BY string '{order_by_str}' failed validation: {e}") for order_by_str in invalid_fields: with self.assertRaises( (frappe.PermissionError, ValueError, frappe.ValidationError), msg=f"Invalid ORDER BY string '{order_by_str}' passed validation", ): frappe.qb.get_query("User", order_by=order_by_str).get_sql() def test_aliasing(self): user_doctype = frappe.qb.DocType("User") self.assertEqual( frappe.qb.get_query("User", fields=["name as owner", "email as id"], filters={}).get_sql(), frappe.qb.from_(user_doctype) .select(user_doctype.name.as_("owner"), user_doctype.email.as_("id")) .get_sql(), ) self.assertEqual( frappe.qb.get_query(user_doctype, fields="name as owner, email as id", filters={}).get_sql(), frappe.qb.from_(user_doctype) .select(user_doctype.name.as_("owner"), user_doctype.email.as_("id")) .get_sql(), ) def test_filters(self): self.assertQueryEqual( frappe.qb.get_query( "DocType", fields=["name"], filters={"module.app_name": "frappe"}, ).get_sql(), "SELECT `tabDocType`.`name` FROM `tabDocType` LEFT JOIN `tabModule Def` ON `tabModule Def`.`name`=`tabDocType`.`module` WHERE `tabModule Def`.`app_name`='frappe'", ) query = "SELECT `tabDocType`.`name` FROM `tabDocType` LEFT JOIN `tabModule Def` ON `tabModule Def`.`name`=`tabDocType`.`module` WHERE `tabModule Def`.`app_name` LIKE 'frap%'" query = query.replace("LIKE", "ILIKE" if frappe.db.db_type == "postgres" else "LIKE") self.assertQueryEqual( frappe.qb.get_query( "DocType", fields=["name"], filters={"module.app_name": ("like", "frap%")}, ).get_sql(), query, ) self.assertQueryEqual( frappe.qb.get_query( "DocType", fields=["name"], filters={"permissions.role": "System Manager"}, ).get_sql(), "SELECT `tabDocType`.`name` FROM `tabDocType` LEFT JOIN `tabDocPerm` ON `tabDocPerm`.`parent`=`tabDocType`.`name` AND `tabDocPerm`.`parenttype`='DocType' AND `tabDocPerm`.`parentfield`='permissions' WHERE `tabDocPerm`.`role`='System Manager'", ) self.assertQueryEqual( frappe.qb.get_query( "DocType", fields=["module"], filters="", ).get_sql(), "SELECT `module` FROM `tabDocType` WHERE `name`=''", ) self.assertQueryEqual( frappe.qb.get_query( "DocType", filters=["ToDo", "Note"], ).get_sql(), "SELECT `name` FROM `tabDocType` WHERE `name` IN ('ToDo','Note')", ) self.assertQueryEqual( frappe.qb.get_query( "DocType", filters={"name": ("in", [])}, ).get_sql(), "SELECT `name` FROM `tabDocType` WHERE `name` IN ('')", ) self.assertQueryEqual( frappe.qb.get_query( "DocType", filters=[1, 2, 3], ).get_sql(), "SELECT `name` FROM `tabDocType` WHERE `name` IN (1,2,3)", ) self.assertQueryEqual( frappe.qb.get_query( "DocType", filters=[], ).get_sql(), "SELECT `name` FROM `tabDocType`", ) def test_or_filters(self): """Test OR filter conditions.""" # Test 1: Basic dict or_filters self.assertQueryEqual( frappe.qb.get_query( "DocType", fields=["name"], or_filters={"name": "User", "module": "Core"}, ).get_sql(), "SELECT `name` FROM `tabDocType` WHERE `name`='User' OR `module`='Core'", ) # Test 2: List format or_filters self.assertQueryEqual( frappe.qb.get_query( "DocType", fields=["name"], or_filters=[["name", "=", "User"], ["module", "=", "Core"]], ).get_sql(), "SELECT `name` FROM `tabDocType` WHERE `name`='User' OR `module`='Core'", ) # Test 3: OR filters with operators query = "SELECT `name` FROM `tabDocType` WHERE `name` LIKE 'User%' OR `module` IN ('Core','Custom')" query = query = query.replace("LIKE", "ILIKE" if frappe.db.db_type == "postgres" else "LIKE") self.assertQueryEqual( frappe.qb.get_query( "DocType", fields=["name"], or_filters={"name": ("like", "User%"), "module": ("in", ["Core", "Custom"])}, ).get_sql(), query, ) # Test 4: Combining filters (AND) with or_filters (OR) self.assertQueryEqual( frappe.qb.get_query( "DocType", fields=["name"], filters={"issingle": 0}, or_filters={"name": "User", "module": "Core"}, ).get_sql(), "SELECT `name` FROM `tabDocType` WHERE `issingle`=0 AND (`name`='User' OR `module`='Core')", ) # Test 5: Multiple AND filters with OR filters self.assertQueryEqual( frappe.qb.get_query( "DocType", fields=["name"], filters={"issingle": 0, "custom": 0}, or_filters={"name": "User", "module": "Core"}, ).get_sql(), "SELECT `name` FROM `tabDocType` WHERE `issingle`=0 AND `custom`=0 AND (`name`='User' OR `module`='Core')", ) # Test 6: OR filters with simple list (name IN) self.assertQueryEqual( frappe.qb.get_query( "DocType", or_filters=["User", "Role", "Note"], ).get_sql(), "SELECT `name` FROM `tabDocType` WHERE `name` IN ('User','Role','Note')", ) # Test 7: OR filters with greater than and less than self.assertQueryEqual( frappe.qb.get_query( "DocType", fields=["name"], or_filters={"idx": (">", 5), "issingle": ("=", 1)}, ).get_sql(), "SELECT `name` FROM `tabDocType` WHERE `idx`>5 OR `issingle`=1", ) # Test 8: OR filters with list including doctype self.assertQueryEqual( frappe.qb.get_query( "DocType", fields=["name"], or_filters=[["DocType", "name", "=", "User"], ["DocType", "name", "=", "Role"]], ).get_sql(), "SELECT `name` FROM `tabDocType` WHERE `name`='User' OR `name`='Role'", ) # Test 9: OR filters with != operator self.assertQueryEqual( frappe.qb.get_query( "DocType", fields=["name"], or_filters={"name": ("!=", "User"), "module": ("!=", "Core")}, ).get_sql(), "SELECT `name` FROM `tabDocType` WHERE `name`<>'User' OR `module`<>'Core'", ) # Test 10: Empty or_filters should return query without OR conditions self.assertQueryEqual( frappe.qb.get_query( "DocType", fields=["name"], filters={"custom": 0}, or_filters={}, ).get_sql(), "SELECT `name` FROM `tabDocType` WHERE `custom`=0", ) # Test 11: OR filters with not in operator self.assertQueryEqual( frappe.qb.get_query( "DocType", fields=["name"], or_filters={"name": ("not in", ["User", "Role"]), "module": ("=", "Core")}, ).get_sql(), "SELECT `name` FROM `tabDocType` WHERE `name` NOT IN ('User','Role') OR `module`='Core'", ) # Test 12: OR filters with mixed field types query = ( "SELECT `name`,`module` FROM `tabDocType` WHERE `name` LIKE 'User%' OR `issingle`=1 OR `custom`=0" ) query = query = query.replace("LIKE", "ILIKE" if frappe.db.db_type == "postgres" else "LIKE") self.assertQueryEqual( frappe.qb.get_query( "DocType", fields=["name", "module"], or_filters=[ ["name", "like", "User%"], ["issingle", "=", 1], ["custom", "=", 0], ], ).get_sql(), query, ) def test_nested_filters(self): """Test nested filter conditions with AND/OR logic.""" User = frappe.qb.DocType("User") # Simple AND filters_and = [ ["email", "=", "admin@example.com"], "and", ["first_name", "=", "Admin"], ] expected_sql_and = ( frappe.qb.from_(User) .select(User.name) .where((User.email == "admin@example.com") & (User.first_name == "Admin")) .get_sql() ) self.assertEqual(frappe.qb.get_query("User", filters=filters_and).get_sql(), expected_sql_and) # Simple OR filters_or = [ ["email", "=", "admin@example.com"], "or", ["email", "=", "guest@example.com"], ] expected_sql_or = ( frappe.qb.from_(User) .select(User.name) .where((User.email == "admin@example.com") | (User.email == "guest@example.com")) .get_sql() ) self.assertEqual(frappe.qb.get_query("User", filters=filters_or).get_sql(), expected_sql_or) # Mixed AND/OR filters_mixed = [ ["first_name", "=", "Admin"], "and", [["email", "=", "admin@example.com"], "or", ["email", "=", "guest@example.com"]], ] expected_sql_mixed = ( frappe.qb.from_(User) .select(User.name) .where( (User.first_name == "Admin") & ((User.email == "admin@example.com") | (User.email == "guest@example.com")) ) .get_sql() ) self.assertEqual(frappe.qb.get_query("User", filters=filters_mixed).get_sql(), expected_sql_mixed) # Nested AND/OR filters_nested = [ [["first_name", "=", "Admin"], "and", ["enabled", "=", 1]], "or", [["first_name", "=", "Guest"], "and", ["enabled", "=", 0]], ] expected_sql_nested = ( frappe.qb.from_(User) .select(User.name) .where( ((User.first_name == "Admin") & (User.enabled == 1)) | ((User.first_name == "Guest") & (User.enabled == 0)) ) .get_sql() ) self.assertEqual(frappe.qb.get_query("User", filters=filters_nested).get_sql(), expected_sql_nested) # Single Grouped Condition (wrapped in extra list) filters_single_group = [[["first_name", "=", "Admin"], "and", ["enabled", "=", 1]]] expected_sql_single_group = ( frappe.qb.from_(User) .select(User.name) .where((User.first_name == "Admin") & (User.enabled == 1)) .get_sql() ) self.assertEqual( frappe.qb.get_query("User", filters=filters_single_group).get_sql(), expected_sql_single_group ) # Test with different operators and values filters_complex = [ ["creation", ">", "2023-01-01"], "and", [ ["email", "like", "%@example.com"], "or", [["first_name", "in", ["Admin", "Guest"]], "and", ["enabled", "!=", 1]], ], ] expected_sql_complex = ( frappe.qb.from_(User) .select(User.name) .where( (User.creation > "2023-01-01") & ( ( User.email.ilike("%@example.com") if frappe.db.db_type == "postgres" else User.email.like("%@example.com") ) | ((User.first_name.isin(["Admin", "Guest"])) & (User.enabled != 1)) ) ) .get_sql() ) self.assertEqual(frappe.qb.get_query("User", filters=filters_complex).get_sql(), expected_sql_complex) def test_invalid_nested_filters(self): """Test invalid formats for nested filters.""" # Invalid operator with self.assertRaises(frappe.ValidationError) as cm: frappe.qb.get_query("User", filters=[["email", "=", "a"], "xor", ["email", "=", "b"]]).get_sql() self.assertIn("Expected 'and' or 'or' operator", str(cm.exception)) # Missing condition after operator with self.assertRaises(frappe.ValidationError) as cm: frappe.qb.get_query("User", filters=[["email", "=", "a"], "and"]).get_sql() self.assertIn("Filter condition missing after operator", str(cm.exception)) # Starting with operator with self.assertRaises(frappe.ValidationError) as cm: frappe.qb.get_query("User", filters=["and", ["email", "=", "a"]]).get_sql() self.assertIn("Invalid start for filter condition", str(cm.exception)) # Invalid condition type (string instead of list/tuple) with self.assertRaises(frappe.ValidationError) as cm: frappe.qb.get_query("User", filters=[["email", "=", "a"], "and", "enabled = 1"]).get_sql() self.assertIn("Invalid filter condition", str(cm.exception)) # Malformed simple filter inside nested with self.assertRaises(frappe.ValidationError) as cm: frappe.qb.get_query( "User", filters=[["email", "=", "a", "extra"], "and", ["enabled", "=", 1]] ).get_sql() self.assertIn("Invalid simple filter format", str(cm.exception)) # Nested list doesn't start with a condition list/tuple with self.assertRaises(frappe.ValidationError) as cm: frappe.qb.get_query("User", filters=["email", "and", ["enabled", "=", 1]]).get_sql() self.assertIn("Invalid start for filter condition", str(cm.exception)) def test_implicit_join_query(self): self.maxDiff = None self.assertQueryEqual( frappe.qb.get_query( "Note", filters={"name": "Test Note Title"}, fields=["name", "`tabNote Seen By`.`user` as seen_by"], ).get_sql(), "SELECT `tabNote`.`name`,`tabNote Seen By`.`user` `seen_by` FROM `tabNote` LEFT JOIN `tabNote Seen By` ON `tabNote Seen By`.`parent`=`tabNote`.`name` AND `tabNote Seen By`.`parenttype`='Note' WHERE `tabNote`.`name`='Test Note Title'", ) # output doesn't contain parentfield condition because it can't be inferred self.assertQueryEqual( frappe.qb.get_query( "Note", filters={"name": "Test Note Title"}, fields=["name", "`tabNote Seen By`.`user` as seen_by", "`tabNote Seen By`.`idx` as idx"], ).get_sql(), "SELECT `tabNote`.`name`,`tabNote Seen By`.`user` `seen_by`,`tabNote Seen By`.`idx` `idx` FROM `tabNote` LEFT JOIN `tabNote Seen By` ON `tabNote Seen By`.`parent`=`tabNote`.`name` AND `tabNote Seen By`.`parenttype`='Note' WHERE `tabNote`.`name`='Test Note Title'", ) # output contains parentfield condition because it can be inferred by "seen_by.user" self.assertQueryEqual( frappe.qb.get_query( "Note", filters={"name": "Test Note Title"}, fields=["name", "seen_by.user as seen_by", "`tabNote Seen By`.`idx` as idx"], ).get_sql(), "SELECT `tabNote`.`name`,`tabNote Seen By`.`user` `seen_by`,`tabNote Seen By`.`idx` `idx` FROM `tabNote` LEFT JOIN `tabNote Seen By` ON `tabNote Seen By`.`parent`=`tabNote`.`name` AND `tabNote Seen By`.`parenttype`='Note' AND `tabNote Seen By`.`parentfield`='seen_by' WHERE `tabNote`.`name`='Test Note Title'", ) self.assertQueryEqual( frappe.qb.get_query( "DocType", fields=["name", "module.app_name as app_name"], ).get_sql(), "SELECT `tabDocType`.`name`,`tabModule Def`.`app_name` `app_name` FROM `tabDocType` LEFT JOIN `tabModule Def` ON `tabModule Def`.`name`=`tabDocType`.`module`", ) # fields now has strict validation, so this test is not valid anymore # @run_only_if(db_type_is.MARIADB) # def test_comment_stripping(self): # self.assertNotIn( # "email", frappe.qb.get_query("User", fields=["name", "#email"], filters={}).get_sql() # ) def test_nestedset(self): frappe.db.sql("delete from `tabDocType` where `name` = 'Test Tree DocType'") frappe.db.sql_ddl("drop table if exists `tabTest Tree DocType`") create_tree_docs() descendants_result = frappe.qb.get_query( "Test Tree DocType", fields=["name"], filters={"name": ("descendants of", "Parent 1")}, order_by="creation desc", ).run(as_list=1) # Format decendants result descendants_result = list(itertools.chain.from_iterable(descendants_result)) self.assertListEqual(descendants_result, get_descendants_of("Test Tree DocType", "Parent 1")) ancestors_result = frappe.qb.get_query( "Test Tree DocType", fields=["name"], filters={"name": ("ancestors of", "Child 2")}, order_by="creation desc", ).run(as_list=1) # Format ancestors result ancestors_result = list(itertools.chain.from_iterable(ancestors_result)) self.assertListEqual(ancestors_result, get_ancestors_of("Test Tree DocType", "Child 2")) not_descendants_result = frappe.qb.get_query( "Test Tree DocType", fields=["name"], filters={"name": ("not descendants of", "Parent 1")}, order_by="creation desc", ).run(as_dict=1) self.assertListEqual( not_descendants_result, frappe.db.get_all( "Test Tree DocType", fields=["name"], filters={"name": ("not descendants of", "Parent 1")}, ), ) not_ancestors_result = frappe.qb.get_query( "Test Tree DocType", fields=["name"], filters={"name": ("not ancestors of", "Child 2")}, order_by="creation desc", ).run(as_dict=1) self.assertListEqual( not_ancestors_result, frappe.db.get_all( "Test Tree DocType", fields=["name"], filters={"name": ("not ancestors of", "Child 2")}, ), ) frappe.db.sql("delete from `tabDocType` where `name` = 'Test Tree DocType'") frappe.db.sql_ddl("drop table if exists `tabTest Tree DocType`") def test_child_field_syntax(self): note1 = frappe.get_doc(doctype="Note", title="Note 1", seen_by=[{"user": "Administrator"}]).insert() note2 = frappe.get_doc( doctype="Note", title="Note 2", seen_by=[{"user": "Administrator"}, {"user": "Guest"}] ).insert() result = frappe.qb.get_query( "Note", filters={"name": ["in", [note1.name, note2.name]]}, fields=["name", {"seen_by": ["*"]}], order_by="title asc", ).run(as_dict=1) self.assertTrue(isinstance(result[0].seen_by, list)) self.assertTrue(isinstance(result[1].seen_by, list)) self.assertEqual(len(result[0].seen_by), 1) self.assertEqual(len(result[1].seen_by), 2) self.assertEqual(result[0].seen_by[0].user, "Administrator") result = frappe.qb.get_query( "Note", filters={"name": ["in", [note1.name, note2.name]]}, fields=["name", {"seen_by": ["user"]}], order_by="title asc", ).run(as_dict=1) self.assertEqual(len(result[0].seen_by[0].keys()), 1) self.assertEqual(result[1].seen_by[1].user, "Guest") note1.delete() note2.delete() def test_build_match_conditions(self): from frappe.permissions import add_user_permission, clear_user_permissions_for_doctype clear_user_permissions_for_doctype("Test Blog Post", "test2@example.com") test2user = frappe.get_doc("User", "test2@example.com") test2user.add_roles("Blogger") frappe.set_user("test2@example.com") # Before any user permission is applied, there should be no conditions query = frappe.qb.get_query("Test Blog Post", ignore_permissions=False) self.assertNotIn("(`tabBlog Post`.`name` in (", str(query)) # Add user permissions add_user_permission("Test Blog Post", "_Test Blog Post", "test2@example.com", True) add_user_permission("Test Blog Post", "_Test Blog Post 1", "test2@example.com", True) # After applying user permission, condition should be in query query = str(frappe.qb.get_query("Test Blog Post", ignore_permissions=False)) # Check for user permission condition in the query string if frappe.db.db_type == "mariadb": self.assertIn("IFNULL(`name`,'')='' OR `name` IN ('_Test Blog Post 1','_Test Blog Post')", query) elif frappe.db.db_type == "postgres": self.assertIn( "IFNULL(\"name\",'')='' OR \"name\" IN ('_Test Blog Post 1','_Test Blog Post')", query ) # works in pg due to `coalesce` sub during sql execution frappe.set_user("Administrator") clear_user_permissions_for_doctype("Test Blog Post", "test2@example.com") test2user.remove_roles("Blogger") def test_ignore_permissions_for_query(self): frappe.set_user("test2@example.com") with self.assertRaises(frappe.PermissionError): frappe.qb.get_query("DocType", filters={"istable": 1}, ignore_permissions=False) result = frappe.qb.get_query("DocType", filters={"istable": 1}, ignore_permissions=True).run() self.assertTrue(len(result) > 0) frappe.set_user("Administrator") def test_permlevel_fields(self): """Test permission level check when querying fields""" with setup_patched_blog_post(), setup_test_user(set_user=True): # Create a test blog post test_post = frappe.get_doc( { "doctype": "Test Blog Post", "title": "Test Permission Post", "content": "Test Content", "blog_category": "_Test Blog Category", "published": 1, } ).insert(ignore_permissions=True, ignore_mandatory=True) # Without proper permission, published field should be filtered out data = frappe.qb.get_query( "Test Blog Post", filters={"name": test_post.name}, fields=["name", "published", "title"], ignore_permissions=False, ).run(as_dict=1) field_list = [field for d in data for field in d.keys()] self.assertIn("title", field_list) self.assertIn("name", field_list) self.assertNotIn("published", field_list) # With Administrator, all fields should be accessible frappe.set_user("Administrator") data = frappe.qb.get_query( "Test Blog Post", filters={"name": test_post.name}, fields=["name", "published", "title"], ignore_permissions=False, ).run(as_dict=1) field_list = [field for d in data for field in d.keys()] self.assertIn("published", field_list) test_post.delete() def test_child_table_access_with_select_permission(self): """Test that child table fields are inaccessible if user only has select perm on parent.""" test_role = "Select Note Test Role" test_user_email = "test2@example.com" # Use existing test user test_note_title = "Child Select Test Note" # Cleanup frappe.set_user("Administrator") test_user = frappe.get_doc("User", test_user_email) test_user.remove_roles(test_role) frappe.delete_doc("Role", test_role, ignore_missing=True, force=True) frappe.delete_doc("Note", {"title": test_note_title}, ignore_missing=True, force=True) # Setup Role with 'select' on Note and 'read' on Note Seen By frappe.get_doc({"doctype": "Role", "role_name": test_role}).insert(ignore_if_duplicate=True) # Grant select on Note, read on Note Seen By add_permission("Note", test_role, 0, ptype="select") add_permission("Note Seen By", test_role, 0, ptype="read") # Ensure no read permission on Note for this role by explicitly setting it to 0 update_permission_property("Note", test_role, 0, "read", 0, validate=False) test_user.add_roles(test_role) note = frappe.get_doc( doctype="Note", title=test_note_title, public=1, seen_by=[{"user": "Administrator"}] ).insert(ignore_permissions=True) frappe.set_user(test_user_email) query = frappe.qb.get_query( "Note", filters={"name": note.name}, fields=["name", {"seen_by": ["user"]}], ignore_permissions=False, ) result = query.run(as_dict=True) self.assertEqual(len(result), 1, "Should find the note record") self.assertIn("name", result[0], "Parent field 'name' should be accessible") self.assertNotIn( "seen_by", result[0], "Child table field 'seen_by' should NOT be accessible with only 'select' on parent", ) # Cleanup frappe.set_user("Administrator") note.delete(ignore_permissions=True) test_user.remove_roles(test_role) frappe.delete_doc("Role", test_role, force=True) def test_nested_permission(self): """Test permission on nested doctypes""" frappe.set_user("Administrator") create_nested_doctype() create_nested_doctype_records() from frappe.permissions import add_user_permission, clear_user_permissions_for_doctype clear_user_permissions_for_doctype("Nested DocType") # Add user permission for only one root folder add_user_permission("Nested DocType", "Level 1 A", "test2@example.com") from frappe.core.page.permission_manager.permission_manager import update # To avoid if_owner filter update("Nested DocType", "All", 0, "if_owner", 0) test2user = frappe.get_doc("User", "test2@example.com") test2user.add_roles("Blogger") with self.set_user("test2@example.com"): data = frappe.qb.get_query("Nested DocType", ignore_permissions=False).run(as_dict=1) # Children of the permitted node should be accessible self.assertTrue(any(d.name == "Level 2 A" for d in data)) # Other nodes should not be accessible self.assertFalse(any(d.name == "Level 1 B" for d in data)) self.assertFalse(any(d.name == "Level 2 B" for d in data)) update("Nested DocType", "All", 0, "if_owner", 1) # Reset to default def test_is_set_is_not_set(self): """Test is set and is not set filters""" result = frappe.qb.get_query("DocType", filters={"autoname": ["is", "not set"]}).run(as_dict=1) self.assertTrue({"name": "Integration Request"} in result) self.assertTrue({"name": "User"} in result) self.assertFalse({"name": "Blogger"} in result) result = frappe.qb.get_query("DocType", filters={"autoname": ["is", "set"]}).run(as_dict=1) self.assertTrue({"name": "DocField"} in result) self.assertTrue({"name": "Prepared Report"} in result) self.assertFalse({"name": "Property Setter"} in result) # Test with updating value to NULL frappe.db.set_value("DocType", "Property Setter", "autoname", None, update_modified=False) result = frappe.qb.get_query("DocType", filters={"autoname": ["is", "set"]}).run(as_dict=1) self.assertFalse(any(d.name == "Property Setter" for d in result)) def test_permission_query_condition(self): """Test permission query condition being applied from hooks and server script""" from frappe.desk.doctype.dashboard_settings.dashboard_settings import create_dashboard_settings # Create a Dashboard Settings for test user self.doctype = "Dashboard Settings" self.user = "test@example.com" original_hooks = frappe.get_hooks("permission_query_conditions") or {} # Create test data create_dashboard_settings(self.user) # Hook condition will restrict to only name=Administrator, so our test user's record should not be found query = frappe.qb.get_query("Dashboard Settings", user=self.user, ignore_permissions=False) self.assertIn("`tabDashboard Settings`.name = ", str(query)) # Create a server script for permission query script = frappe.new_doc( doctype="Server Script", name="Dashboard Settings Permission Query", script_type="Permission Query", enabled=1, reference_doctype="Dashboard Settings", script=f"""conditions = '`tabDashboard Settings`.`user` = "{self.user}"'""", ).insert() # Test with server script # Script condition should allow the record to be found frappe.clear_cache() frappe.hooks.permission_query_conditions = {} # Clear hooks to test server script alone with enable_safe_exec(): query = frappe.qb.get_query("Dashboard Settings", user=self.user, ignore_permissions=False) self.assertIn(f'`tabDashboard Settings`.`user` = "{self.user}"', str(query)) # Cleanup script.delete() frappe.clear_cache() frappe.hooks.permission_query_conditions = original_hooks def test_link_field_target_permission(self): """Test that accessing link_field.target_field respects target field's permlevel.""" target_dt_name = "TargetDocForLinkPerm" source_dt_name = "SourceDocForLinkPerm" test_role = "LinkPermTestRole" test_user = "test2@example.com" # Cleanup previous runs frappe.set_user("Administrator") frappe.delete_doc("DocType", target_dt_name, ignore_missing=True, force=True) frappe.delete_doc("DocType", source_dt_name, ignore_missing=True, force=True) frappe.delete_doc("Role", test_role, ignore_missing=True, force=True) test_user_doc = frappe.get_doc("User", test_user) test_user_doc.remove_roles(test_role) # Create Doctypes target_dt = new_doctype( target_dt_name, fields=[ {"fieldname": "target_field", "fieldtype": "Data", "permlevel": 1, "label": "Target Field"}, {"fieldname": "other_target_field", "fieldtype": "Data", "label": "Other Target Field"}, ], ).insert(ignore_if_duplicate=True) source_dt = new_doctype( source_dt_name, fields=[ { "fieldname": "link_field", "fieldtype": "Link", "options": target_dt_name, "label": "Link Field", } ], ).insert(ignore_if_duplicate=True) # Create Records target_doc = frappe.get_doc( doctype=target_dt_name, target_field="Secret Data", other_target_field="Public Data" ).insert(ignore_permissions=True) source_doc = frappe.get_doc(doctype=source_dt_name, link_field=target_doc.name).insert( ignore_permissions=True ) # Setup Role and Permissions frappe.get_doc({"doctype": "Role", "role_name": test_role}).insert(ignore_if_duplicate=True) add_permission(source_dt_name, test_role, 0, ptype="read") add_permission(target_dt_name, test_role, 0, ptype="read") # Ensure no permlevel 1 read for test_role update_permission_property(target_dt_name, test_role, 1, "read", 0, validate=False) # Ensure System Manager can read permlevel 1 add_permission(target_dt_name, "System Manager", 1, ptype="read") test_user_doc.add_roles(test_role) # Test as the restricted user frappe.set_user(test_user) result_restricted = frappe.qb.get_query( source_dt_name, filters={"name": source_doc.name}, fields=[ "name", "link_field.target_field as linked_secret", "link_field.other_target_field as linked_public", ], ignore_permissions=False, ).run(as_dict=True) self.assertEqual(len(result_restricted), 1) self.assertIn( "linked_public", result_restricted[0], "Permlevel 0 target field should be accessible via link.", ) self.assertNotIn( "linked_secret", result_restricted[0], "Permlevel 1 target field should NOT be accessible via link for restricted user.", ) # Test as Administrator (who has System Manager role) frappe.set_user("Administrator") result_admin = frappe.qb.get_query( source_dt_name, filters={"name": source_doc.name}, fields=[ "name", "link_field.target_field as linked_secret", "link_field.other_target_field as linked_public", ], ignore_permissions=False, # Still check permissions, but Admin has them ).run(as_dict=True) self.assertEqual(len(result_admin), 1) self.assertIn( "linked_public", result_admin[0], "Permlevel 0 target field should be accessible for Admin." ) self.assertIn( "linked_secret", result_admin[0], "Permlevel 1 target field should be accessible for Admin." ) self.assertEqual(result_admin[0].linked_secret, "Secret Data") # Cleanup frappe.set_user("Administrator") source_doc.delete(ignore_permissions=True) target_doc.delete(ignore_permissions=True) source_dt.delete() target_dt.delete() test_user_doc.remove_roles(test_role) frappe.delete_doc("Role", test_role, force=True) def test_filter_direct_field_permission(self): """Test that filtering is only allowed on permitted direct fields.""" with setup_patched_blog_post(), setup_test_user(set_user=True) as user: # Create a test blog post test_post = frappe.get_doc( { "doctype": "Test Blog Post", "title": "Test Filter Permission Post", "content": "Test Content", "blog_category": "_Test Blog Category", "published": 1, # permlevel 1 } ).insert(ignore_permissions=True, ignore_mandatory=True, ignore_if_duplicate=True) # User has read permlevel 0, but not 1 (published field) # Try filtering on permitted field (title - permlevel 0) try: frappe.qb.get_query( "Test Blog Post", filters={"title": test_post.title}, ignore_permissions=False, user=user.name, ).run() except frappe.PermissionError as e: self.fail(f"Filtering on permitted field 'title' failed: {e}") # Try filtering on non-permitted field (published - permlevel 1) with self.assertRaises(frappe.PermissionError) as cm: frappe.qb.get_query( "Test Blog Post", filters={"published": 1}, ignore_permissions=False, user=user.name, ).run() self.assertIn("You do not have permission to access field", str(cm.exception)) self.assertIn("Blog Post.published", str(cm.exception)) # Cleanup frappe.set_user("Administrator") test_post.delete() def test_filter_linked_field_permission(self): """Test that filtering is only allowed on permitted linked fields.""" with setup_test_user(set_user=True) as user: target_dt_name = "TargetDocForFilterPerm" source_dt_name = "SourceDocForFilterPerm" test_role = "FilterPermTestRole" # Cleanup previous runs frappe.set_user("Administrator") frappe.delete_doc("DocType", target_dt_name, ignore_missing=True, force=True) frappe.delete_doc("DocType", source_dt_name, ignore_missing=True, force=True) frappe.delete_doc("Role", test_role, ignore_missing=True, force=True) test_user_doc = frappe.get_doc("User", user.name) test_user_doc.remove_roles(test_role) # Create Doctypes target_dt = new_doctype( target_dt_name, fields=[ { "fieldname": "target_field", "fieldtype": "Data", "permlevel": 1, "label": "Target Field", }, {"fieldname": "other_target_field", "fieldtype": "Data", "label": "Other Target Field"}, ], ).insert(ignore_if_duplicate=True) source_dt = new_doctype( source_dt_name, fields=[ { "fieldname": "link_field", "fieldtype": "Link", "options": target_dt_name, "label": "Link Field", } ], ).insert(ignore_if_duplicate=True) # Create Records target_doc = frappe.get_doc( doctype=target_dt_name, target_field="Secret Data", other_target_field="Public Data" ).insert(ignore_permissions=True) source_doc = frappe.get_doc(doctype=source_dt_name, link_field=target_doc.name).insert( ignore_permissions=True ) # Setup Role and Permissions frappe.get_doc({"doctype": "Role", "role_name": test_role}).insert(ignore_if_duplicate=True) add_permission(source_dt_name, test_role, 0, ptype="read") add_permission(target_dt_name, test_role, 0, ptype="read") update_permission_property( target_dt_name, test_role, 1, "read", 0, validate=False ) # No permlevel 1 read test_user_doc.add_roles(test_role) # Test as the restricted user frappe.set_user(user.name) # Try filtering on permitted linked field (other_target_field - permlevel 0) try: frappe.qb.get_query( source_dt_name, filters={"link_field.other_target_field": "Public Data"}, ignore_permissions=False, user=user.name, ).run() except frappe.PermissionError as e: self.fail(f"Filtering on permitted linked field 'link_field.other_target_field' failed: {e}") # Try filtering on non-permitted linked field (target_field - permlevel 1) with self.assertRaises(frappe.PermissionError) as cm_link: frappe.qb.get_query( source_dt_name, filters={"link_field.target_field": "Secret Data"}, ignore_permissions=False, user=user.name, ).run() self.assertIn("You do not have permission to access field", str(cm_link.exception)) self.assertIn(f"{target_dt_name}.target_field", str(cm_link.exception)) # Cleanup frappe.set_user("Administrator") source_doc.delete(ignore_permissions=True) target_doc.delete(ignore_permissions=True) source_dt.delete() target_dt.delete() test_user_doc.remove_roles(test_role) frappe.delete_doc("Role", test_role, force=True) def test_dynamic_fields_in_group_by(self): """Test dynamic field support in GROUP BY clause.""" try: query = frappe.qb.get_query( "DocType", fields=["module.app_name", "name"], group_by="module.app_name, name", ) result = query.run(as_dict=True) self.assertTrue(len(result) > 0) sql = query.get_sql() self.assertIn("LEFT JOIN", sql) self.assertIn("tabModule Def", sql) except Exception as e: self.fail(f"Dynamic link field in GROUP BY failed: {e}") note = frappe.get_doc( doctype="Note", title="Group By Test Note", seen_by=[{"user": "Administrator"}, {"user": "Guest"}] ).insert() try: query = frappe.qb.get_query( "Note", fields=["seen_by.user", "name"], filters={"name": note.name}, group_by="seen_by.user, name", ) result = query.run(as_dict=True) self.assertTrue(len(result) >= 1) sql = query.get_sql() self.assertIn("LEFT JOIN", sql) self.assertIn("tabNote Seen By", sql) except Exception as e: self.fail(f"Dynamic child field in GROUP BY failed: {e}") finally: note.delete() def test_dynamic_fields_in_order_by(self): """Test dynamic field support in ORDER BY clause.""" try: query = frappe.qb.get_query( "DocType", fields=["name", "module.app_name"], order_by="module.app_name DESC", limit=5 ) result = query.run(as_dict=True) self.assertTrue(len(result) > 0) sql = query.get_sql() self.assertIn("LEFT JOIN", sql) self.assertIn("tabModule Def", sql) self.assertIn("ORDER BY", sql) except Exception as e: self.fail(f"Dynamic link field in ORDER BY failed: {e}") note1 = frappe.get_doc( doctype="Note", title="Order Test Note 1", seen_by=[{"user": "Administrator"}] ).insert() note2 = frappe.get_doc( doctype="Note", title="Order Test Note 2", seen_by=[{"user": "Guest"}] ).insert() try: query = frappe.qb.get_query( "Note", fields=["name", "seen_by.user"], filters={"name": ["in", [note1.name, note2.name]]}, order_by="seen_by.user ASC", ) result = query.run(as_dict=True) self.assertTrue(len(result) >= 2) sql = query.get_sql() self.assertIn("LEFT JOIN", sql) self.assertIn("tabNote Seen By", sql) except Exception as e: self.fail(f"Dynamic child field in ORDER BY failed: {e}") finally: note1.delete() note2.delete() def test_multiple_dynamic_fields_group_order(self): """Test multiple dynamic fields in GROUP BY and ORDER BY.""" try: query = frappe.qb.get_query( "DocType", fields=["module", "module.app_name", "name"], group_by="module, module.app_name, name", order_by="module.app_name", ) result = query.run(as_dict=True) self.assertTrue(len(result) > 0) except Exception as e: self.fail(f"Multiple dynamic fields in GROUP BY/ORDER BY failed: {e}") def test_group_by_order_by_permission_checks(self): """Test permission checks for dynamic fields in GROUP BY and ORDER BY.""" target_dt_name = "TargetDocForGroupOrderPerm" source_dt_name = "SourceDocForGroupOrderPerm" test_role = "GroupOrderPermTestRole" test_user = "test2@example.com" frappe.set_user("Administrator") frappe.delete_doc("DocType", target_dt_name, ignore_missing=True, force=True) frappe.delete_doc("DocType", source_dt_name, ignore_missing=True, force=True) frappe.delete_doc("Role", test_role, ignore_missing=True, force=True) test_user_doc = frappe.get_doc("User", test_user) test_user_doc.remove_roles(test_role) target_dt = new_doctype( target_dt_name, fields=[ { "fieldname": "restricted_field", "fieldtype": "Data", "permlevel": 1, "label": "Restricted Field", }, {"fieldname": "public_field", "fieldtype": "Data", "label": "Public Field"}, ], ).insert(ignore_if_duplicate=True) source_dt = new_doctype( source_dt_name, fields=[ { "fieldname": "link_field", "fieldtype": "Link", "options": target_dt_name, "label": "Link Field", }, ], ).insert(ignore_if_duplicate=True) frappe.get_doc({"doctype": "Role", "role_name": test_role}).insert(ignore_if_duplicate=True) add_permission(source_dt_name, test_role, 0, ptype="read") add_permission(target_dt_name, test_role, 0, ptype="read") update_permission_property(target_dt_name, test_role, 1, "read", 0, validate=False) test_user_doc.add_roles(test_role) frappe.set_user(test_user) try: frappe.qb.get_query( source_dt_name, fields=["link_field.public_field", "name"], group_by="link_field.public_field", ignore_permissions=False, user=test_user, ).get_sql() except frappe.PermissionError as e: self.fail(f"GROUP BY with permitted field failed: {e}") with self.assertRaises(frappe.PermissionError) as cm: frappe.qb.get_query( source_dt_name, fields=["link_field.restricted_field", "name"], group_by="link_field.restricted_field", ignore_permissions=False, user=test_user, ).get_sql() self.assertIn("You do not have permission to access field", str(cm.exception)) self.assertIn("restricted_field", str(cm.exception)) try: frappe.qb.get_query( source_dt_name, fields=["name", "link_field.public_field"], order_by="link_field.public_field", ignore_permissions=False, user=test_user, ).get_sql() except frappe.PermissionError as e: self.fail(f"ORDER BY with permitted field failed: {e}") with self.assertRaises(frappe.PermissionError) as cm: frappe.qb.get_query( source_dt_name, fields=["name"], order_by="link_field.restricted_field", ignore_permissions=False, user=test_user, ).get_sql() self.assertIn("You do not have permission to access field", str(cm.exception)) self.assertIn("restricted_field", str(cm.exception)) frappe.set_user("Administrator") source_dt.delete() target_dt.delete() test_user_doc.remove_roles(test_role) frappe.delete_doc("Role", test_role, force=True) def test_child_table_group_by_order_by_permissions(self): """Test permission checks for child table fields in GROUP BY and ORDER BY.""" child_dt_name = "ChildDocForGroupOrderPerm" parent_dt_name = "ParentDocForGroupOrderPerm" test_role = "ChildGroupOrderPermTestRole" test_user_email = "test2@example.com" frappe.set_user("Administrator") frappe.delete_doc("DocType", child_dt_name, ignore_missing=True, force=True) frappe.delete_doc("DocType", parent_dt_name, ignore_missing=True, force=True) frappe.delete_doc("Role", test_role, ignore_missing=True, force=True) test_user_doc = frappe.get_doc("User", test_user_email) test_user_doc.remove_roles(test_role) child_dt = new_doctype( child_dt_name, fields=[ { "fieldname": "restricted_child_field", "fieldtype": "Data", "permlevel": 1, "label": "Restricted Child Field", }, {"fieldname": "public_child_field", "fieldtype": "Data", "label": "Public Child Field"}, ], istable=1, ).insert(ignore_if_duplicate=True) parent_dt = new_doctype( parent_dt_name, fields=[ { "fieldname": "child_table", "fieldtype": "Table", "options": child_dt_name, "label": "Child Table", }, ], ).insert(ignore_if_duplicate=True) frappe.get_doc({"doctype": "Role", "role_name": test_role}).insert(ignore_if_duplicate=True) add_permission(parent_dt_name, test_role, 0, ptype="read") add_permission(child_dt_name, test_role, 0, ptype="read") update_permission_property(child_dt_name, test_role, 1, "read", 0, validate=False) test_user_doc.add_roles(test_role) frappe.set_user(test_user_email) try: frappe.qb.get_query( parent_dt_name, fields=["child_table.public_child_field", "name"], group_by="child_table.public_child_field", ignore_permissions=False, user=test_user_email, ).get_sql() except frappe.PermissionError as e: self.fail(f"GROUP BY with permitted child field failed: {e}") with self.assertRaises(frappe.PermissionError) as cm: frappe.qb.get_query( parent_dt_name, fields=["child_table.restricted_child_field", "name"], group_by="child_table.restricted_child_field", ignore_permissions=False, user=test_user_email, ).get_sql() self.assertIn("You do not have permission to access field", str(cm.exception)) self.assertIn("restricted_child_field", str(cm.exception)) with self.assertRaises(frappe.PermissionError) as cm: frappe.qb.get_query( parent_dt_name, fields=["name"], order_by="child_table.restricted_child_field", ignore_permissions=False, user=test_user_email, ).get_sql() self.assertIn("You do not have permission to access field", str(cm.exception)) self.assertIn("restricted_child_field", str(cm.exception)) frappe.set_user("Administrator") parent_dt.delete() child_dt.delete() test_user_doc.remove_roles(test_role) frappe.delete_doc("Role", test_role, force=True) def test_group_by_order_by_validation_errors(self): """Test validation errors for invalid GROUP BY and ORDER BY fields.""" invalid_group_by_fields = [ "name; DROP TABLE users", "name--comment", "name /* comment */", "invalid-field-name", "field with space", "`field with space`", "name, email; SELECT 1", ] for field in invalid_group_by_fields: with self.assertRaises( frappe.ValidationError, msg=f"Invalid GROUP BY field '{field}' passed validation" ): frappe.qb.get_query("User", group_by=field).get_sql() invalid_order_by_fields = [ "name sideways", "name INVALID_DIRECTION", "name ASC;", "name, email; SELECT 1", ] for field in invalid_order_by_fields: with self.assertRaises( (frappe.ValidationError, ValueError), msg=f"Invalid ORDER BY field '{field}' passed validation", ): frappe.qb.get_query("User", order_by=field).get_sql() def test_backtick_rejection_group_order(self): """Test that malformed backticks are properly rejected in GROUP BY and ORDER BY.""" # Test single backtick (invalid notation - should be `tabTable`.`field`) with self.assertRaises(frappe.ValidationError) as cm: frappe.qb.get_query("User", group_by="`name`").get_sql() self.assertIn("invalid backtick notation", str(cm.exception)) # Test single backtick with direction (invalid notation) with self.assertRaises(frappe.ValidationError) as cm: frappe.qb.get_query("User", order_by="`name` ASC").get_sql() self.assertIn("invalid backtick notation", str(cm.exception)) # Test multiple single backticks (invalid notation) with self.assertRaises(frappe.ValidationError) as cm: frappe.qb.get_query("User", group_by="`name`, `email`").get_sql() self.assertIn("invalid backtick notation", str(cm.exception)) # Valid backtick notation should work frappe.qb.get_query("User", group_by="`tabUser`.`name`").get_sql() frappe.qb.get_query("User", order_by="`tabUser`.`name` ASC").get_sql() def test_sql_functions_in_fields(self): """Test SQL function support in fields with various syntaxes.""" # Test simple function without alias query = frappe.qb.get_query("User", fields=["user_type", {"COUNT": "name"}], group_by="user_type") sql = query.get_sql() self.assertIn(self.normalize_sql("COUNT(`name`)"), sql) self.assertIn("GROUP BY", sql) # Test function with alias query = frappe.qb.get_query( "User", fields=[{"COUNT": "name", "as": "total_users"}], group_by="user_type" ) sql = query.get_sql() self.assertIn(self.normalize_sql("COUNT(`name`) `total_users`"), sql) # Test SUM function with alias query = frappe.qb.get_query( "User", fields=[{"SUM": "enabled", "as": "total_enabled"}], group_by="user_type" ) sql = query.get_sql() self.assertIn(self.normalize_sql("SUM(`enabled`) `total_enabled`"), sql) # Test MAX function query = frappe.qb.get_query( "User", fields=[{"MAX": "creation", "as": "latest_user"}], group_by="user_type" ) sql = query.get_sql() self.assertIn(self.normalize_sql("MAX(`creation`) `latest_user`"), sql) # Test MIN function query = frappe.qb.get_query( "User", fields=[{"MIN": "creation", "as": "earliest_user"}], group_by="user_type" ) sql = query.get_sql() self.assertIn(self.normalize_sql("MIN(`creation`) `earliest_user`"), sql) # Test AVG function query = frappe.qb.get_query( "User", fields=[{"AVG": "enabled", "as": "avg_enabled"}], group_by="user_type" ) sql = query.get_sql() self.assertIn(self.normalize_sql("AVG(`enabled`) `avg_enabled`"), sql) # Test ABS function query = frappe.qb.get_query("User", fields=[{"ABS": "enabled", "as": "abs_enabled"}]) sql = query.get_sql() self.assertIn(self.normalize_sql("ABS(`enabled`) `abs_enabled`"), sql) # Test IFNULL function with two parameters query = frappe.qb.get_query( "User", fields=[{"IFNULL": ["first_name", "'Unknown'"], "as": "safe_name"}] ) sql = query.get_sql() self.assertIn( self.normalize_sql("IFNULL(`first_name`,'Unknown') `safe_name`"), self.normalize_sql(sql) ) # Test TIMESTAMP function query = frappe.qb.get_query("User", fields=[{"TIMESTAMP": "creation", "as": "ts"}]) sql = query.get_sql() self.assertIn(self.normalize_sql("TIMESTAMP(`creation`) `ts`"), self.normalize_sql(sql)) # Test mixed regular fields and function fields query = frappe.qb.get_query( "User", fields=[ "user_type", {"COUNT": "name", "as": "total_users"}, {"MAX": "creation", "as": "latest_creation"}, ], group_by="user_type", ) sql = query.get_sql() self.assertIn(self.normalize_sql("`user_type`"), sql) self.assertIn(self.normalize_sql("COUNT(`name`) `total_users`"), sql) self.assertIn(self.normalize_sql("MAX(`creation`) `latest_creation`"), sql) # Test NOW function with no arguments query = frappe.qb.get_query("User", fields=[{"NOW": None, "as": "current_time"}]) sql = query.get_sql() self.assertIn(self.normalize_sql("NOW() `current_time`"), sql) # Test CONCAT function (which is supported) query = frappe.qb.get_query( "User", fields=[{"CONCAT": ["first_name", "last_name"], "as": "full_name"}] ) sql = query.get_sql() self.assertIn( self.normalize_sql("CONCAT(`first_name`,`last_name`) `full_name`"), self.normalize_sql(sql) ) # Test unsupported function validation with self.assertRaises(frappe.ValidationError) as cm: frappe.qb.get_query("User", fields=[{"UNSUPPORTED_FUNC": "name"}]).get_sql() self.assertIn("Unsupported function or operator: UNSUPPORTED_FUNC", str(cm.exception)) # Test unsupported function that might be confused with child field with self.assertRaises(frappe.ValidationError) as cm: frappe.qb.get_query("User", fields=[{"UPPER": ["first_name"]}]).get_sql() self.assertIn("Unsupported function or operator: UPPER", str(cm.exception)) # Test SQL injection attempt with self.assertRaises(frappe.ValidationError) as cm: frappe.qb.get_query("User", fields=[{"DROP": "TABLE users"}]).get_sql() self.assertIn("Unsupported function or operator: DROP", str(cm.exception)) def test_arithmetic_operators_in_fields(self): """Test arithmetic operator support in fields.""" # Test simple addition query = frappe.qb.get_query("User", fields=[{"ADD": [1, 2], "as": "sum_result"}]) sql = query.get_sql() self.assertIn(self.normalize_sql("1+2 `sum_result`"), sql) # Test simple subtraction query = frappe.qb.get_query("User", fields=[{"SUB": [10, 5], "as": "diff_result"}]) sql = query.get_sql() self.assertIn(self.normalize_sql("10-5 `diff_result`"), sql) # Test simple multiplication query = frappe.qb.get_query("User", fields=[{"MUL": [3, 4], "as": "prod_result"}]) sql = query.get_sql() self.assertIn(self.normalize_sql("3*4 `prod_result`"), sql) # Test simple division query = frappe.qb.get_query("User", fields=[{"DIV": [10, 2], "as": "div_result"}]) sql = query.get_sql() self.assertIn(self.normalize_sql("10/2 `div_result`"), sql) # Test operator with field names query = frappe.qb.get_query("User", fields=[{"ADD": ["enabled", "login_after"], "as": "field_sum"}]) sql = query.get_sql() self.assertIn(self.normalize_sql("`enabled`+`login_after` `field_sum`"), sql) # Test nested operators query = frappe.qb.get_query("User", fields=[{"ADD": [{"MUL": [2, 3]}, 4], "as": "nested_result"}]) sql = query.get_sql() self.assertIn(self.normalize_sql("2*3+4 `nested_result`"), sql) # Test operator with function - NULLIF query = frappe.qb.get_query( "User", fields=[{"DIV": [1, {"NULLIF": ["enabled", 0]}], "as": "safe_div"}] ) sql = query.get_sql() self.assertIn(self.normalize_sql("1/NULLIF(`enabled`,0) `safe_div`"), self.normalize_sql(sql)) # Test complex nested expression: (1 / NULLIF(value, 0)) query = frappe.qb.get_query( "User", fields=[ "name", {"DIV": [1, {"NULLIF": ["enabled", 0]}], "as": "inverse"}, ], ) sql = query.get_sql() self.assertIn(self.normalize_sql("`name`"), sql) self.assertIn(self.normalize_sql("1/NULLIF(`enabled`,0) `inverse`"), self.normalize_sql(sql)) # Test operator with LOCATE function (search relevance pattern) query = frappe.qb.get_query( "User", fields=[ "name", {"DIV": [1, {"NULLIF": [{"LOCATE": ["'test'", "name"]}, 0]}], "as": "relevance"}, ], ) sql = query.get_sql() self.assertIn( self.normalize_sql("1/NULLIF(LOCATE('test',`name`),0) `relevance`"), self.normalize_sql(sql), ) # Test multiple operators in fields query = frappe.qb.get_query( "User", fields=[ "name", {"ADD": ["enabled", 1], "as": "enabled_plus_one"}, {"MUL": ["enabled", 2], "as": "enabled_times_two"}, ], ) sql = query.get_sql() self.assertIn(self.normalize_sql("`name`"), sql) self.assertIn(self.normalize_sql("`enabled`+1 `enabled_plus_one`"), sql) self.assertIn(self.normalize_sql("`enabled`*2 `enabled_times_two`"), sql) # Test operator without alias query = frappe.qb.get_query("User", fields=[{"ADD": [1, 1]}]) sql = query.get_sql() self.assertIn("1+1", sql) # Test validation: operator requires exactly 2 arguments with self.assertRaises(frappe.ValidationError) as cm: frappe.qb.get_query("User", fields=[{"ADD": [1, 2, 3]}]).get_sql() self.assertIn("requires exactly 2 arguments", str(cm.exception)) # Test validation: operator with only 1 argument with self.assertRaises(frappe.ValidationError) as cm: frappe.qb.get_query("User", fields=[{"DIV": [10]}]).get_sql() self.assertIn("requires exactly 2 arguments", str(cm.exception)) # Test validation: operator with non-list arguments with self.assertRaises(frappe.ValidationError) as cm: frappe.qb.get_query("User", fields=[{"MUL": "invalid"}]).get_sql() self.assertIn("requires exactly 2 arguments", str(cm.exception)) # Test validation: unsupported operator with self.assertRaises(frappe.ValidationError) as cm: frappe.qb.get_query("User", fields=[{"XOR": [1, 2]}]).get_sql() self.assertIn("Unsupported function or operator: XOR", str(cm.exception)) # Test deeply nested expression query = frappe.qb.get_query( "User", fields=[ { "DIV": [ {"ADD": [{"MUL": [2, 3]}, 4]}, {"SUB": [10, 5]}, ], "as": "complex_expr", } ], ) sql = query.get_sql() # PyPika adds parentheses for clarity in complex expressions self.assertIn("complex_expr", sql) self.assertIn("/", sql) def test_not_equal_condition_on_none(self): self.assertQueryEqual( frappe.qb.get_query( "DocType", ["*"], [ ["DocField", "name", "=", None], ["DocType", "parent", "!=", None], ], ).get_sql(), "SELECT `tabDocType`.* FROM `tabDocType` LEFT JOIN `tabDocField` ON `tabDocField`.`parent`=`tabDocType`.`name` AND `tabDocField`.`parenttype`='DocType' AND `tabDocField`.`parentfield`='fields' WHERE `tabDocField`.`name` IS NULL AND `tabDocType`.`parent`<>''", ) def test_field_alias_in_group_by(self): query = frappe.qb.get_query( "User", fields=["creation as created_date", {"COUNT": "*"}], group_by="created_date", order_by="created_date", ) sql = query.get_sql() self.assertIn(self.normalize_sql("GROUP BY `created_date`"), self.normalize_sql(sql)) if ( frappe.db.db_type != "postgres" ): # since Postgres requires fields in Order by to be grouped or aggregated, order by is dropped self.assertIn(self.normalize_sql("ORDER BY `created_date`"), self.normalize_sql(sql)) self.assertIn(self.normalize_sql("`creation` `created_date`"), self.normalize_sql(sql)) def test_field_alias_permission_check(self): query = frappe.qb.get_query( "User", fields=["creation as created_date", {"COUNT": "*"}], group_by="created_date", ) sql = query.get_sql() # If we get here without PermissionError, the test passes self.assertIn(self.normalize_sql("GROUP BY `created_date`"), self.normalize_sql(sql)) def test_between_datetime_expansion(self): """Test that date strings are expanded to datetime ranges for Datetime fields with 'between' operator""" # Test with creation field (standard datetime field) query = frappe.qb.get_query( "User", filters={"creation": ["between", ["2025-12-01", "2025-12-01"]]}, ) sql = query.get_sql() # Date strings should be expanded to datetime ranges self.assertIn("2025-12-01 00:00:00", sql) self.assertIn("2025-12-01 23:59:59", sql) def test_timespan_datetime_expansion(self): """Test that timespan operator expands dates to datetime ranges for Datetime fields""" query = frappe.qb.get_query( "User", filters={"creation": ["timespan", "last 7 days"]}, ) sql = query.get_sql() # Timespan should expand dates to datetime ranges (start of first day, end of last day) # Should have times like 00:00:00 and 23:59:59 self.assertIn("00:00:00", sql) self.assertIn("23:59:59", sql) def test_share_only_access(self): """Test that shared docs grant access when user has no role permissions.""" import frappe.share test_user = "test2@example.com" # Create a private event (only owner can see by default) event = frappe.get_doc( doctype="Event", subject="Share Only Test Event", starts_on="2025-01-01 10:00:00", event_type="Private", ).insert() self.addCleanup(event.delete) self.addCleanup(lambda: frappe.set_user("Administrator")) # Verify user can't access without share frappe.set_user(test_user) result = frappe.qb.get_query("Event", filters={"name": event.name}, ignore_permissions=False).run() self.assertEqual(len(result), 0, "User should not see event without share") # Share the document frappe.set_user("Administrator") frappe.share.add("Event", event.name, test_user) # Now user should be able to access via share frappe.set_user(test_user) result = frappe.qb.get_query("Event", filters={"name": event.name}, ignore_permissions=False).run() self.assertEqual(len(result), 1, "User should see event via share") def test_if_owner_constraint_with_shared_docs(self): """Test that shared docs trump if_owner constraint.""" import frappe.share from frappe.core.page.permission_manager.permission_manager import update test_user = "test2@example.com" test_user_doc = frappe.get_doc("User", test_user) test_user_doc.add_roles("Blogger") # Create blog post owned by Administrator blog_post = frappe.get_doc( doctype="Test Blog Post", title="If Owner Test Post", content="Test Content", blog_category="_Test Blog Category", ).insert(ignore_permissions=True, ignore_mandatory=True) # Enable if_owner constraint for Test Blog Post update("Test Blog Post", "Blogger", 0, "if_owner", 1) self.addCleanup(lambda: test_user_doc.remove_roles("Blogger")) self.addCleanup(blog_post.delete) self.addCleanup(lambda: update("Test Blog Post", "Blogger", 0, "if_owner", 0)) self.addCleanup(lambda: frappe.set_user("Administrator")) # User shouldn't see it (not owner, if_owner enabled) frappe.set_user(test_user) result = frappe.qb.get_query( "Test Blog Post", filters={"name": blog_post.name}, ignore_permissions=False ).run() self.assertEqual(len(result), 0, "User should not see post owned by others with if_owner") # Share the document frappe.set_user("Administrator") frappe.share.add("Test Blog Post", blog_post.name, test_user) # Now user should see it via share (shared docs trump if_owner) frappe.set_user(test_user) result = frappe.qb.get_query( "Test Blog Post", filters={"name": blog_post.name}, ignore_permissions=False ).run() self.assertEqual(len(result), 1, "User should see post via share despite if_owner") def test_user_permission_with_shared_docs(self): """Test that shared docs grant access even when user permission doesn't match.""" import frappe.share from frappe.permissions import add_user_permission, clear_user_permissions_for_doctype test_user = "test2@example.com" test_user_doc = frappe.get_doc("User", test_user) test_user_doc.add_roles("Blogger") # Create two blog posts blog_post1 = frappe.get_doc( doctype="Test Blog Post", title="User Perm Test Post 1", content="Test Content", blog_category="_Test Blog Category", ).insert(ignore_permissions=True, ignore_mandatory=True) blog_post2 = frappe.get_doc( doctype="Test Blog Post", title="User Perm Test Post 2", content="Test Content", blog_category="_Test Blog Category", ).insert(ignore_permissions=True, ignore_mandatory=True) clear_user_permissions_for_doctype("Test Blog Post", test_user) # Add user permission for only post1 add_user_permission("Test Blog Post", blog_post1.name, test_user, True) self.addCleanup(lambda: test_user_doc.remove_roles("Blogger")) self.addCleanup(blog_post2.delete) self.addCleanup(blog_post1.delete) self.addCleanup(lambda: clear_user_permissions_for_doctype("Test Blog Post", test_user)) self.addCleanup(lambda: frappe.set_user("Administrator")) # User should see post1 via user permission frappe.set_user(test_user) result = frappe.qb.get_query( "Test Blog Post", filters={"name": blog_post1.name}, ignore_permissions=False ).run() self.assertEqual(len(result), 1, "User should see post1 via user permission") # User should NOT see post2 (no user permission) result = frappe.qb.get_query( "Test Blog Post", filters={"name": blog_post2.name}, ignore_permissions=False ).run() self.assertEqual(len(result), 0, "User should not see post2 without user permission") # Share post2 with user frappe.set_user("Administrator") frappe.share.add("Test Blog Post", blog_post2.name, test_user) # Now user should see post2 via share (shared docs trump user permissions) frappe.set_user(test_user) result = frappe.qb.get_query( "Test Blog Post", filters={"name": blog_post2.name}, ignore_permissions=False ).run() self.assertEqual(len(result), 1, "User should see post2 via share") def test_role_permission_without_restrictions(self): """Test that all documents are accessible when role permissions exist without if_owner/user_perms.""" from frappe.core.page.permission_manager.permission_manager import update from frappe.permissions import clear_user_permissions_for_doctype test_user = "test2@example.com" test_user_doc = frappe.get_doc("User", test_user) test_user_doc.add_roles("Blogger") # Clear any user permissions clear_user_permissions_for_doctype("Test Blog Post", test_user) # Ensure if_owner is disabled update("Test Blog Post", "Blogger", 0, "if_owner", 0) # Create blog posts owned by Administrator blog_post1 = frappe.get_doc( doctype="Test Blog Post", title="No Restriction Test 1", content="Test Content", blog_category="_Test Blog Category", ).insert(ignore_permissions=True, ignore_mandatory=True) blog_post2 = frappe.get_doc( doctype="Test Blog Post", title="No Restriction Test 2", content="Test Content", blog_category="_Test Blog Category", ).insert(ignore_permissions=True, ignore_mandatory=True) self.addCleanup(lambda: test_user_doc.remove_roles("Blogger")) self.addCleanup(blog_post2.delete) self.addCleanup(blog_post1.delete) self.addCleanup(lambda: frappe.set_user("Administrator")) # User should see both posts (no restrictions) frappe.set_user(test_user) result = frappe.qb.get_query( "Test Blog Post", filters={"name": ["in", [blog_post1.name, blog_post2.name]]}, ignore_permissions=False, ).run() self.assertEqual(len(result), 2, "User should see all posts without restrictions") def test_child_table_permission_uses_parent_doctype(self): """Test that child table queries use parent doctype for permission checks.""" # DocField is a child table of DocType # When querying with parent_doctype, permissions should be checked against DocType test_user = "test2@example.com" test_user_doc = frappe.get_doc("User", test_user) self.ensure_system_manager(test_user_doc, should_have=False) self.addCleanup(lambda: frappe.set_user("Administrator")) frappe.set_user(test_user) # Query child table with parent_doctype - should use DocType's permissions with self.assertRaises(frappe.PermissionError): frappe.qb.get_query( "DocField", fields=["name"], parent_doctype="DocType", ignore_permissions=False ).run() # Give user read access to DocType frappe.set_user("Administrator") test_user_doc.add_roles("System Manager") frappe.set_user(test_user) # Now query should succeed result = frappe.qb.get_query( "DocField", fields=["name"], parent_doctype="DocType", ignore_permissions=False, limit=1 ).run() # Query should succeed and return results (tuple or list) self.assertTrue(len(result) >= 0, "Query should succeed with proper permissions") def test_child_table_filters_orphaned_rows(self): """Test that child table queries filter out orphaned rows (rows without valid parent).""" test_user = "test2@example.com" test_user_doc = frappe.get_doc("User", test_user) self.ensure_system_manager(test_user_doc, should_have=True) # Create a child table row with non-existent parent frappe.db.sql( """ INSERT INTO `tabDefaultValue` (name, parent, parenttype, parentfield, defkey, defvalue) VALUES ('_test_orphan_row', '_non_existent_parent', 'User', 'defaults', 'test_key', 'test_value') """ ) self.addCleanup( lambda: frappe.db.sql("DELETE FROM `tabDefaultValue` WHERE name = '_test_orphan_row'") ) self.addCleanup(lambda: frappe.set_user("Administrator")) frappe.set_user(test_user) # Query with parent_doctype - orphaned row should be filtered out by inner join result = frappe.qb.get_query( "DefaultValue", fields=["name"], filters={"name": "_test_orphan_row"}, parent_doctype="User", ignore_permissions=False, ).run() self.assertEqual(len(result), 0, "Orphaned child row should be filtered out") def test_child_table_of_single_doctype(self): """Test querying child tables whose parent is a Single doctype. Single doctypes don't have physical tables, so we can't join to them. This tests that the query works correctly without the join. """ test_user = "test2@example.com" test_user_doc = frappe.get_doc("User", test_user) self.ensure_system_manager(test_user_doc, should_have=True) self.addCleanup(lambda: frappe.set_user("Administrator")) frappe.set_user(test_user) # Log Settings is a Single doctype with child table "Logs To Clear" # Query should work without trying to join the non-existent parent table result = frappe.qb.get_query( "Logs To Clear", fields=["name", "ref_doctype", "days"], parent_doctype="Log Settings", ignore_permissions=False, ).run() # Query should succeed (may return empty if no logs configured) self.assertIsInstance(result, (list, tuple), "Query should return results without SQL error") def test_child_table_of_single_doctype_without_permission(self): """Test that permission checks work for child tables of Single doctypes.""" test_user = "test2@example.com" test_user_doc = frappe.get_doc("User", test_user) self.ensure_system_manager(test_user_doc, should_have=False) self.addCleanup(lambda: frappe.set_user("Administrator")) frappe.set_user(test_user) # User without System Manager role should not be able to access Log Settings children with self.assertRaises(frappe.PermissionError): frappe.qb.get_query( "Logs To Clear", fields=["name"], parent_doctype="Log Settings", ignore_permissions=False, ).run() def test_combined_raw_criterion_precedence(self): """Test that CombinedRawCriterion properly groups OR conditions. When permission conditions (like permission_query_conditions) are combined with shared docs via OR, the entire expression must be wrapped in parentheses to ensure correct operator precedence with other WHERE filters. Without proper grouping: WHERE filter=X AND perm_cond OR shared_cond -- shared_cond ignores filter! With proper grouping: WHERE filter=X AND (perm_cond OR shared_cond) -- correct behavior """ from frappe.database.query import CombinedRawCriterion, RawCriterion # Test that CombinedRawCriterion wraps the entire expression left = RawCriterion("a = 1") right = RawCriterion("b = 2") combined = left | right self.assertIsInstance(combined, CombinedRawCriterion) sql = combined.get_sql() # Should have outer parentheses: ((a = 1) OR (b = 2)) self.assertTrue(sql.startswith("(("), f"Should start with '((' but got: {sql}") self.assertTrue(sql.endswith("))"), f"Should end with '))' but got: {sql}") # Test nested combinations third = RawCriterion("c = 3") nested = combined & third nested_sql = nested.get_sql() # The AND combination should also be properly grouped self.assertIn("OR", nested_sql) self.assertIn("AND", nested_sql) def test_permission_query_conditions_with_filter(self): """Test that filters work correctly when permission_query_conditions and shares exist. This is a regression test for the CombinedRawCriterion fix - ensures that explicit filters are not bypassed by shared doc conditions. """ test_user = "test2@example.com" test_user_doc = frappe.get_doc("User", test_user) self.ensure_system_manager(test_user_doc, should_have=True) self.addCleanup(lambda: frappe.set_user("Administrator")) frappe.set_user(test_user) # User doctype has permission_query_conditions hook # test2@example.com is shared their own User doc # Query with a filter that should NOT match any row result = frappe.qb.get_query( "User", fields=["name"], filters={"name": "_non_existent_user_12345"}, ignore_permissions=False, ).run() # Even though user has shared access to their own User doc, # the filter should still apply and return no results self.assertEqual(len(result), 0, "Filter should not be bypassed by shared doc OR condition") # This function is used as a permission query condition hook def test_permission_hook_condition(user): return "`tabDashboard Settings`.`name` = 'Administrator'"