2644 lines
87 KiB
Python
2644 lines
87 KiB
Python
import itertools
|
|
|
|
import frappe
|
|
from frappe.core.doctype.doctype.test_doctype import new_doctype
|
|
from frappe.permissions import add_permission, update_permission_property
|
|
from frappe.query_builder import Field
|
|
from frappe.query_builder.functions import Abs, Count, Ifnull, Max, Now, Timestamp
|
|
from frappe.tests import IntegrationTestCase
|
|
from frappe.tests.classes.context_managers import enable_safe_exec
|
|
from frappe.tests.test_db_query import (
|
|
create_nested_doctype,
|
|
create_nested_doctype_records,
|
|
setup_patched_blog_post,
|
|
setup_test_user,
|
|
)
|
|
from frappe.tests.test_helpers import setup_for_tests
|
|
from frappe.tests.test_query_builder import db_type_is, run_only_if
|
|
from frappe.utils.nestedset import get_ancestors_of, get_descendants_of
|
|
|
|
# NOTE(review): presumably read by the Frappe test runner to load "User" test
# records before this module runs; the consumer is not visible here, confirm.
EXTRA_TEST_RECORD_DEPENDENCIES = ["User"]
|
|
|
|
|
|
def create_tree_docs():
    """Create the "Test Tree DocType" tree doctype and insert a small fixture hierarchy.

    The hierarchy is "Root Node" with two group children ("Parent 1" and
    "Parent 2"); "Child 1" and "Child 2" hang under "Parent 1", and "Child 3"
    under "Parent 2". Records are inserted in that order, so every parent is
    inserted before its children.
    """
    # One row per record: (some_fieldname, parent_test_tree_doctype, is_group)
    node_specs = (
        ("Root Node", None, 1),
        ("Parent 1", "Root Node", 1),
        ("Parent 2", "Root Node", 1),
        ("Child 1", "Parent 1", 0),
        ("Child 2", "Parent 1", 0),
        ("Child 3", "Parent 2", 0),
    )

    # The doctype names its documents after the `some_fieldname` value.
    new_doctype("Test Tree DocType", is_tree=True, autoname="field:some_fieldname").insert()

    for node_name, parent_name, group_flag in node_specs:
        node = frappe.new_doc("Test Tree DocType")
        node.update(
            {
                "some_fieldname": node_name,
                "parent_test_tree_doctype": parent_name,
                "is_group": group_flag,
            }
        )
        node.insert()
|
|
|
|
|
|
class TestQuery(IntegrationTestCase):
|
|
def setUp(self):
    """Invoke the shared ``setup_for_tests`` helper as this test case's fixture setup."""
    setup_for_tests()
|
|
|
|
def ensure_system_manager(self, user_doc, should_have: bool):
    """Put ``user_doc`` in the requested System Manager state and schedule a restore.

    Args:
        user_doc: Object exposing ``roles`` (items with a ``role`` attribute)
            plus ``add_roles`` / ``remove_roles`` methods.
        should_have: Truthy to make sure the role is present, falsy to make
            sure it is absent.

    A cleanup is always registered on the test case; it re-adds the role if the
    user had it on entry and removes it otherwise.
    """
    role = "System Manager"
    had_role_on_entry = any(row.role == role for row in user_doc.roles)

    # Pick the operation that brings the user back to the state seen on entry.
    restore = user_doc.add_roles if had_role_on_entry else user_doc.remove_roles
    self.addCleanup(restore, role)

    # Only touch the roles when the current state differs from the requested one.
    if had_role_on_entry:
        if not should_have:
            user_doc.remove_roles(role)
    elif should_have:
        user_doc.add_roles(role)
|
|
|
|
def test_multiple_tables_in_filters(self):
    """Filters naming both DocField and DocType must produce the expected LEFT JOIN query."""
    expected_sql = "SELECT `tabDocType`.* FROM `tabDocType` LEFT JOIN `tabDocField` ON `tabDocField`.`parent`=`tabDocType`.`name` AND `tabDocField`.`parenttype`='DocType' AND `tabDocField`.`parentfield`='fields' WHERE `tabDocField`.`name` LIKE 'f%' AND `tabDocType`.`parent`='something'"
    if frappe.db.db_type == "postgres":
        # On Postgres the expected statement uses ILIKE in place of LIKE.
        expected_sql = expected_sql.replace("LIKE", "ILIKE")

    table_qualified_filters = [
        ["DocField", "name", "like", "f%"],
        ["DocType", "parent", "=", "something"],
    ]
    generated_sql = frappe.qb.get_query("DocType", ["*"], table_qualified_filters).get_sql()

    self.assertQueryEqual(generated_sql, expected_sql)
|
|
|
|
def test_string_fields(self):
    """String field specifications must match hand-built queries using ``Field`` objects."""

    def administrator_query(*columns):
        # Reference query: the given columns of the Administrator row in User.
        return frappe.qb.from_("User").select(*columns).where(Field("name") == "Administrator")

    # Comma-separated string of plain field names.
    generated = frappe.qb.get_query(
        "User", fields="name, email", filters={"name": "Administrator"}
    ).get_sql()
    self.assertEqual(generated, administrator_query(Field("name"), Field("email")).get_sql())

    # Single list item holding comma-separated, backtick-quoted names.
    generated = frappe.qb.get_query(
        "User", fields=["`name`, `email`"], filters={"name": "Administrator"}
    ).get_sql()
    self.assertEqual(generated, administrator_query(Field("name"), Field("email")).get_sql())

    # Table-qualified names: compare the rows returned, not the SQL text.
    fetched = frappe.qb.get_query(
        "User", fields=["`tabUser`.`name`", "`tabUser`.`email`"], filters={"name": "Administrator"}
    ).run()
    self.assertEqual(fetched, administrator_query(Field("name"), Field("email")).run())

    # Table-qualified name with an alias, returned as dicts.
    fetched = frappe.qb.get_query(
        "User",
        fields=["`tabUser`.`name` as owner", "`tabUser`.`email`"],
        filters={"name": "Administrator"},
    ).run(as_dict=1)
    self.assertEqual(
        fetched,
        administrator_query(Field("name").as_("owner"), Field("email")).run(as_dict=1),
    )

    # A query-builder function object passed as a field.
    generated = frappe.qb.get_query("User", fields=[Count("*")]).get_sql()
    self.assertEqual(generated, frappe.qb.from_("User").select(Count("*")).get_sql())
|
|
|
|
def test_qb_fields(self):
    """Query-builder field objects are accepted both as a list and as a single value."""
    user_table = frappe.qb.DocType("User")

    # List of field objects.
    generated = frappe.qb.get_query(
        user_table, fields=[user_table.name, user_table.email], filters={}
    ).get_sql()
    reference = frappe.qb.from_(user_table).select(user_table.name, user_table.email).get_sql()
    self.assertEqual(generated, reference)

    # A single field object, not wrapped in a list.
    generated = frappe.qb.get_query(user_table, fields=user_table.email, filters={}).get_sql()
    reference = frappe.qb.from_(user_table).select(user_table.email).get_sql()
    self.assertEqual(generated, reference)
|
|
|
|
def test_field_validation_select(self):
    """Test validation for fields in SELECT clause.

    Each candidate runs in its own ``subTest`` so that one failing field does
    not hide failures for the fields that follow it.
    """

    valid_fields = [
        "name",
        "`name`",
        "tabUser.name",
        "`tabUser`.`name`",
        "name as alias",
        "`name` as alias",
        "tabUser.name as alias",
        "`tabUser`.`name` as alias",
        "*",
        "`tabHas Role`.`name`",
        "field as `alias with space`",
        "frappé",  # unicode field names should be valid
    ]

    invalid_fields = [
        "name; DROP TABLE users",
        "`name` ; SELECT * FROM secrets",
        "name--comment",
        "name /* comment */",
        "name AS alias; --",
        "invalid-field-name",
        "table.invalid-field",
        "`table`.`invalid-field`",
        "field with space",
        "`field with space`",
        "field as alias with space",
        "COUNT(*)",
        "COUNT(name)",
        "SUM(amount) as total",
        "COUNT(name) as alias; SELECT 1",
        "COUNT(name;)",
        "`name",
        "name`",
        "`tabUser.name`",
        "tabUser.`name",
        "tabUser`.`name`",
        "tab`User.name",
    ]

    for field in valid_fields:
        with self.subTest(valid_field=field):
            try:
                frappe.qb.get_query("User", fields=field).get_sql()
                # Test as list item too
                frappe.qb.get_query("User", fields=[field]).get_sql()
            except Exception as e:
                # Any exception here means a valid field was rejected.
                self.fail(f"Valid SELECT field '{field}' failed validation: {e}")

    for field in invalid_fields:
        with self.subTest(invalid_field=field):
            with self.assertRaises(
                (frappe.PermissionError, frappe.ValidationError),
                msg=f"Invalid SELECT field '[{field}]' passed validation",
            ):
                frappe.qb.get_query("User", fields=[field]).get_sql()
|
|
|
|
def test_field_validation_filters(self):
    """Test validation for fields used in filters (WHERE clause).

    Each candidate runs in its own ``subTest`` so that one failing field does
    not hide failures for the fields that follow it.
    """
    valid_fields = ["name", "creation", "language.name", "`tabUser`.`name`"]
    # Filters should not allow aliases or functions directly as field names
    invalid_fields = [
        "tabUser.name",
        "name as alias",
        "`name` as alias",
        "tabUser.name as alias",
        "`tabUser`.`name` as alias",
        "COUNT(*)",
        "COUNT(name)",
        "name; DROP TABLE users",
        "`name` ; SELECT * FROM secrets",
        "name--comment",
        "name /* comment */",
        "invalid-field-name",
        "table.invalid-field",
        "`table`.`invalid-field`",
        "field with space",
        "`field with space`",
        "`name`",
        "`name",
        "name`",
        "tabUser.`name`",
        "`tabUser.name`",
    ]

    for field in valid_fields:
        with self.subTest(valid_field=field):
            try:
                # Test in dict filter
                frappe.qb.get_query("User", filters={field: "value"}).get_sql()
                # Test in list filter
                frappe.qb.get_query("User", filters=[[field, "=", "value"]]).get_sql()
                # Test in list filter with doctype
                frappe.qb.get_query("User", filters=[["User", field, "=", "value"]]).get_sql()
            except Exception as e:
                # Any exception here means a valid field was rejected.
                self.fail(f"Valid filter field '{field}' failed validation: {e}")

    for field in invalid_fields:
        with self.subTest(invalid_field=field):
            with self.assertRaises(
                frappe.ValidationError, msg=f"Invalid filter field '{{{field}: val}}' passed validation"
            ):
                frappe.qb.get_query("User", filters={field: "value"}).get_sql()
|
|
|
|
def test_field_validation_group_by(self):
    """Test validation for fields in GROUP BY clause.

    Each candidate runs in its own ``subTest`` so that one failing string does
    not hide failures for the strings that follow it.
    """
    valid_fields = [
        "name",
        "1",  # Allow numeric indices
        "name, email",
        "1, 2",
        "`tabUser`.`name`",
    ]
    # GROUP BY should not allow aliases or functions
    invalid_fields = [
        "name as alias",
        "COUNT(*)",
        "COUNT(name)",
        "name; DROP TABLE users",
        "`name` ; SELECT * FROM secrets",
        "name--comment",
        "name /* comment */",
        "invalid-field-name",
        "table.invalid-field",
        "tabUser.name",
        "`name`",
        "`name`, `tabUser`.`email`",
        "`table`.`invalid-field`",
        "field with space",
        "`field with space`",
        "name, email; SELECT 1",
    ]

    for group_by_str in valid_fields:
        with self.subTest(valid_group_by=group_by_str):
            try:
                frappe.qb.get_query("User", group_by=group_by_str).get_sql()
            except Exception as e:
                # Any exception here means a valid GROUP BY string was rejected.
                self.fail(f"Valid GROUP BY string '{group_by_str}' failed validation: {e}")

    for group_by_str in invalid_fields:
        with self.subTest(invalid_group_by=group_by_str):
            with self.assertRaises(
                (frappe.PermissionError, frappe.ValidationError),
                msg=f"Invalid GROUP BY string '{group_by_str}' passed validation",
            ):
                frappe.qb.get_query("User", group_by=group_by_str).get_sql()
|
|
|
|
def test_field_validation_order_by(self):
    """Test validation for fields in ORDER BY clause.

    Each candidate runs in its own ``subTest`` so that one failing string does
    not hide failures for the strings that follow it.
    """
    valid_fields = [
        "name",
        "1",  # Allow numeric indices
        "name asc",
        "1 asc",
        "2 DESC",
        "name, email",
        "1 asc, 2 desc",
        "`tabUser`.`name`",
        "`tabUser`.`name` desc",
    ]
    # ORDER BY should not allow aliases or functions, or invalid directions
    invalid_fields = [
        "name as alias",
        "COUNT(*)",
        "COUNT(name)",
        "name; DROP TABLE users",
        "`name` ; SELECT * FROM secrets",
        "name--comment",
        "name /* comment */",
        "`name`",
        "tabUser.name",
        "`name` DESC",
        "tabUser.name Asc",
        "`name` asc, `tabUser`.`email` DESC",
        "invalid-field-name",
        "table.invalid-field",
        "`table`.`invalid-field`",
        "field with space",
        "`field with space`",
        "name sideways",
        "name ASC;",
        "name, email; SELECT 1",
        "name INVALID_DIRECTION",
    ]

    for order_by_str in valid_fields:
        with self.subTest(valid_order_by=order_by_str):
            try:
                frappe.qb.get_query("User", order_by=order_by_str).get_sql()
            except Exception as e:
                # Any exception here means a valid ORDER BY string was rejected.
                self.fail(f"Valid ORDER BY string '{order_by_str}' failed validation: {e}")

    for order_by_str in invalid_fields:
        with self.subTest(invalid_order_by=order_by_str):
            with self.assertRaises(
                (frappe.PermissionError, ValueError, frappe.ValidationError),
                msg=f"Invalid ORDER BY string '{order_by_str}' passed validation",
            ):
                frappe.qb.get_query("User", order_by=order_by_str).get_sql()
|
|
|
|
def test_aliasing(self):
    """``<field> as <alias>`` strings must match explicit ``.as_()`` aliases."""
    user_table = frappe.qb.DocType("User")

    # Aliases supplied as separate list items, doctype given by name.
    generated = frappe.qb.get_query("User", fields=["name as owner", "email as id"], filters={}).get_sql()
    reference = (
        frappe.qb.from_(user_table)
        .select(user_table.name.as_("owner"), user_table.email.as_("id"))
        .get_sql()
    )
    self.assertEqual(generated, reference)

    # Aliases supplied in one comma-separated string, doctype given as a table object.
    generated = frappe.qb.get_query(user_table, fields="name as owner, email as id", filters={}).get_sql()
    reference = (
        frappe.qb.from_(user_table)
        .select(user_table.name.as_("owner"), user_table.email.as_("id"))
        .get_sql()
    )
    self.assertEqual(generated, reference)
|
|
|
|
def test_filters(self):
    """Check the SQL generated for the various accepted shapes of ``filters`` on DocType."""

    def check(expected_sql, **query_kwargs):
        # Build the DocType query from the given keyword arguments and compare its SQL.
        generated_sql = frappe.qb.get_query("DocType", **query_kwargs).get_sql()
        self.assertQueryEqual(generated_sql, expected_sql)

    # Dotted key through the `module` field: joins Module Def.
    check(
        "SELECT `tabDocType`.`name` FROM `tabDocType` LEFT JOIN `tabModule Def` `tabModule Def_module` ON `tabModule Def_module`.`name`=`tabDocType`.`module` WHERE `tabModule Def_module`.`app_name`='frappe'",
        fields=["name"],
        filters={"module.app_name": "frappe"},
    )

    # Same join with a `like` operator; the expected SQL uses ILIKE on Postgres.
    like_sql = "SELECT `tabDocType`.`name` FROM `tabDocType` LEFT JOIN `tabModule Def` `tabModule Def_module` ON `tabModule Def_module`.`name`=`tabDocType`.`module` WHERE `tabModule Def_module`.`app_name` LIKE 'frap%'"
    if frappe.db.db_type == "postgres":
        like_sql = like_sql.replace("LIKE", "ILIKE")
    check(
        like_sql,
        fields=["name"],
        filters={"module.app_name": ("like", "frap%")},
    )

    # Dotted key through the `permissions` child table: joins DocPerm.
    check(
        "SELECT `tabDocType`.`name` FROM `tabDocType` LEFT JOIN `tabDocPerm` ON `tabDocPerm`.`parent`=`tabDocType`.`name` AND `tabDocPerm`.`parenttype`='DocType' AND `tabDocPerm`.`parentfield`='permissions' WHERE `tabDocPerm`.`role`='System Manager'",
        fields=["name"],
        filters={"permissions.role": "System Manager"},
    )

    # Empty-string filter is compared against `name`.
    check(
        "SELECT `module` FROM `tabDocType` WHERE `name`=''",
        fields=["module"],
        filters="",
    )

    # Flat list of strings becomes `name IN (...)`.
    check(
        "SELECT `name` FROM `tabDocType` WHERE `name` IN ('ToDo','Note')",
        filters=["ToDo", "Note"],
    )

    # Empty list with IN operator should return 0 results (1=0 condition)
    check(
        "SELECT `name` FROM `tabDocType` WHERE 1=0",
        filters={"name": ("in", [])},
    )

    # Empty list with NOT IN operator should return all results (1=1 condition)
    check(
        "SELECT `name` FROM `tabDocType` WHERE 1=1",
        filters={"name": ("not in", [])},
    )

    # Flat list of integers becomes `name IN (...)`.
    check(
        "SELECT `name` FROM `tabDocType` WHERE `name` IN (1,2,3)",
        filters=[1, 2, 3],
    )

    # Empty list: no WHERE clause.
    check(
        "SELECT `name` FROM `tabDocType`",
        filters=[],
    )
|
|
|
|
def test_or_filters(self):
    """Test OR filter conditions.

    Covers dict and list forms of ``or_filters``, several operators, and the
    combination of ``filters`` (AND) with ``or_filters`` (OR). Expected
    statements containing LIKE are rewritten to ILIKE when running on Postgres.
    """

    def check(expected_sql, **query_kwargs):
        # Build the DocType query from the given keyword arguments and compare its SQL.
        generated_sql = frappe.qb.get_query("DocType", **query_kwargs).get_sql()
        self.assertQueryEqual(generated_sql, expected_sql)

    like_keyword = "ILIKE" if frappe.db.db_type == "postgres" else "LIKE"

    # Test 1: Basic dict or_filters
    check(
        "SELECT `name` FROM `tabDocType` WHERE `name`='User' OR `module`='Core'",
        fields=["name"],
        or_filters={"name": "User", "module": "Core"},
    )

    # Test 2: List format or_filters
    check(
        "SELECT `name` FROM `tabDocType` WHERE `name`='User' OR `module`='Core'",
        fields=["name"],
        or_filters=[["name", "=", "User"], ["module", "=", "Core"]],
    )

    # Test 3: OR filters with operators
    query = "SELECT `name` FROM `tabDocType` WHERE `name` LIKE 'User%' OR `module` IN ('Core','Custom')"
    query = query.replace("LIKE", like_keyword)
    check(
        query,
        fields=["name"],
        or_filters={"name": ("like", "User%"), "module": ("in", ["Core", "Custom"])},
    )

    # Test 4: Combining filters (AND) with or_filters (OR)
    check(
        "SELECT `name` FROM `tabDocType` WHERE `issingle`=0 AND (`name`='User' OR `module`='Core')",
        fields=["name"],
        filters={"issingle": 0},
        or_filters={"name": "User", "module": "Core"},
    )

    # Test 5: Multiple AND filters with OR filters
    check(
        "SELECT `name` FROM `tabDocType` WHERE `issingle`=0 AND `custom`=0 AND (`name`='User' OR `module`='Core')",
        fields=["name"],
        filters={"issingle": 0, "custom": 0},
        or_filters={"name": "User", "module": "Core"},
    )

    # Test 6: OR filters with simple list (name IN)
    check(
        "SELECT `name` FROM `tabDocType` WHERE `name` IN ('User','Role','Note')",
        or_filters=["User", "Role", "Note"],
    )

    # Test 7: OR filters with greater than and less than
    check(
        "SELECT `name` FROM `tabDocType` WHERE `idx`>5 OR `issingle`=1",
        fields=["name"],
        or_filters={"idx": (">", 5), "issingle": ("=", 1)},
    )

    # Test 8: OR filters with list including doctype
    check(
        "SELECT `name` FROM `tabDocType` WHERE `name`='User' OR `name`='Role'",
        fields=["name"],
        or_filters=[["DocType", "name", "=", "User"], ["DocType", "name", "=", "Role"]],
    )

    # Test 9: OR filters with != operator
    check(
        "SELECT `name` FROM `tabDocType` WHERE `name`<>'User' OR `module`<>'Core'",
        fields=["name"],
        or_filters={"name": ("!=", "User"), "module": ("!=", "Core")},
    )

    # Test 10: Empty or_filters should return query without OR conditions
    check(
        "SELECT `name` FROM `tabDocType` WHERE `custom`=0",
        fields=["name"],
        filters={"custom": 0},
        or_filters={},
    )

    # Test 11: OR filters with not in operator
    check(
        "SELECT `name` FROM `tabDocType` WHERE `name` NOT IN ('User','Role') OR `module`='Core'",
        fields=["name"],
        or_filters={"name": ("not in", ["User", "Role"]), "module": ("=", "Core")},
    )

    # Test 12: OR filters with mixed field types
    query = (
        "SELECT `name`,`module` FROM `tabDocType` WHERE `name` LIKE 'User%' OR `issingle`=1 OR `custom`=0"
    )
    query = query.replace("LIKE", like_keyword)
    check(
        query,
        fields=["name", "module"],
        or_filters=[
            ["name", "like", "User%"],
            ["issingle", "=", 1],
            ["custom", "=", 0],
        ],
    )
|
|
|
|
def test_nested_filters(self):
    """Nested ``and`` / ``or`` filter lists must match hand-built query-builder criteria."""
    user_table = frappe.qb.DocType("User")

    def reference_sql(criterion):
        # Hand-built equivalent: select `name` from User under the given criterion.
        return frappe.qb.from_(user_table).select(user_table.name).where(criterion).get_sql()

    def generated_sql(nested_filters):
        return frappe.qb.get_query("User", filters=nested_filters).get_sql()

    # Simple AND
    and_filters = [
        ["email", "=", "admin@example.com"],
        "and",
        ["first_name", "=", "Admin"],
    ]
    and_expected = reference_sql(
        (user_table.email == "admin@example.com") & (user_table.first_name == "Admin")
    )
    self.assertEqual(generated_sql(and_filters), and_expected)

    # Simple OR
    or_filters = [
        ["email", "=", "admin@example.com"],
        "or",
        ["email", "=", "guest@example.com"],
    ]
    or_expected = reference_sql(
        (user_table.email == "admin@example.com") | (user_table.email == "guest@example.com")
    )
    self.assertEqual(generated_sql(or_filters), or_expected)

    # Mixed AND/OR
    mixed_filters = [
        ["first_name", "=", "Admin"],
        "and",
        [["email", "=", "admin@example.com"], "or", ["email", "=", "guest@example.com"]],
    ]
    mixed_expected = reference_sql(
        (user_table.first_name == "Admin")
        & ((user_table.email == "admin@example.com") | (user_table.email == "guest@example.com"))
    )
    self.assertEqual(generated_sql(mixed_filters), mixed_expected)

    # Nested AND/OR
    grouped_filters = [
        [["first_name", "=", "Admin"], "and", ["enabled", "=", 1]],
        "or",
        [["first_name", "=", "Guest"], "and", ["enabled", "=", 0]],
    ]
    grouped_expected = reference_sql(
        ((user_table.first_name == "Admin") & (user_table.enabled == 1))
        | ((user_table.first_name == "Guest") & (user_table.enabled == 0))
    )
    self.assertEqual(generated_sql(grouped_filters), grouped_expected)

    # Single Grouped Condition (wrapped in extra list)
    single_group_filters = [[["first_name", "=", "Admin"], "and", ["enabled", "=", 1]]]
    single_group_expected = reference_sql(
        (user_table.first_name == "Admin") & (user_table.enabled == 1)
    )
    self.assertEqual(generated_sql(single_group_filters), single_group_expected)

    # Test with different operators and values
    complex_filters = [
        ["creation", ">", "2023-01-01"],
        "and",
        [
            ["email", "like", "%@example.com"],
            "or",
            [["first_name", "in", ["Admin", "Guest"]], "and", ["enabled", "!=", 1]],
        ],
    ]
    # The reference criterion uses ILIKE on Postgres and LIKE otherwise.
    if frappe.db.db_type == "postgres":
        email_matches = user_table.email.ilike("%@example.com")
    else:
        email_matches = user_table.email.like("%@example.com")
    complex_expected = reference_sql(
        (user_table.creation > "2023-01-01")
        & (
            email_matches
            | ((user_table.first_name.isin(["Admin", "Guest"])) & (user_table.enabled != 1))
        )
    )
    self.assertEqual(generated_sql(complex_filters), complex_expected)
|
|
|
|
def test_invalid_nested_filters(self):
    """Malformed nested filters must raise ``frappe.ValidationError`` with the expected message."""
    # Each entry: (filters to try, text expected inside the error message)
    malformed_cases = (
        # Invalid operator
        ([["email", "=", "a"], "xor", ["email", "=", "b"]], "Expected 'and' or 'or' operator"),
        # Missing condition after operator
        ([["email", "=", "a"], "and"], "Filter condition missing after operator"),
        # Starting with operator
        (["and", ["email", "=", "a"]], "Invalid start for filter condition"),
        # Invalid condition type (string instead of list/tuple)
        ([["email", "=", "a"], "and", "enabled = 1"], "Invalid filter condition"),
        # Malformed simple filter inside nested
        ([["email", "=", "a", "extra"], "and", ["enabled", "=", 1]], "Invalid simple filter format"),
        # Nested list doesn't start with a condition list/tuple
        (["email", "and", ["enabled", "=", 1]], "Invalid start for filter condition"),
    )

    for bad_filters, expected_fragment in malformed_cases:
        with self.assertRaises(frappe.ValidationError) as raised:
            frappe.qb.get_query("User", filters=bad_filters).get_sql()
        self.assertIn(expected_fragment, str(raised.exception))
|
|
|
|
def test_implicit_join_query(self):
    """Fields that reference a child table or a link target must add the expected LEFT JOIN."""
    self.maxDiff = None

    # Backtick-qualified child-table column with an alias.
    generated_sql = frappe.qb.get_query(
        "Note",
        fields=["name", "`tabNote Seen By`.`user` as seen_by"],
        filters={"name": "Test Note Title"},
    ).get_sql()
    self.assertQueryEqual(
        generated_sql,
        "SELECT `tabNote`.`name`,`tabNote Seen By`.`user` `seen_by` FROM `tabNote` LEFT JOIN `tabNote Seen By` ON `tabNote Seen By`.`parent`=`tabNote`.`name` AND `tabNote Seen By`.`parenttype`='Note' WHERE `tabNote`.`name`='Test Note Title'",
    )

    # output doesn't contain parentfield condition because it can't be inferred
    generated_sql = frappe.qb.get_query(
        "Note",
        fields=["name", "`tabNote Seen By`.`user` as seen_by", "`tabNote Seen By`.`idx` as idx"],
        filters={"name": "Test Note Title"},
    ).get_sql()
    self.assertQueryEqual(
        generated_sql,
        "SELECT `tabNote`.`name`,`tabNote Seen By`.`user` `seen_by`,`tabNote Seen By`.`idx` `idx` FROM `tabNote` LEFT JOIN `tabNote Seen By` ON `tabNote Seen By`.`parent`=`tabNote`.`name` AND `tabNote Seen By`.`parenttype`='Note' WHERE `tabNote`.`name`='Test Note Title'",
    )

    # output contains parentfield condition because it can be inferred by "seen_by.user"
    generated_sql = frappe.qb.get_query(
        "Note",
        fields=["name", "seen_by.user as seen_by", "`tabNote Seen By`.`idx` as idx"],
        filters={"name": "Test Note Title"},
    ).get_sql()
    self.assertQueryEqual(
        generated_sql,
        "SELECT `tabNote`.`name`,`tabNote Seen By`.`user` `seen_by`,`tabNote Seen By`.`idx` `idx` FROM `tabNote` LEFT JOIN `tabNote Seen By` ON `tabNote Seen By`.`parent`=`tabNote`.`name` AND `tabNote Seen By`.`parenttype`='Note' AND `tabNote Seen By`.`parentfield`='seen_by' WHERE `tabNote`.`name`='Test Note Title'",
    )

    # Dotted field through the `module` field: joins Module Def under an alias.
    generated_sql = frappe.qb.get_query(
        "DocType",
        fields=["name", "module.app_name as app_name"],
    ).get_sql()
    self.assertQueryEqual(
        generated_sql,
        "SELECT `tabDocType`.`name`,`tabModule Def_module`.`app_name` `app_name` FROM `tabDocType` LEFT JOIN `tabModule Def` `tabModule Def_module` ON `tabModule Def_module`.`name`=`tabDocType`.`module`",
    )
|
|
|
|
# `fields` now undergoes strict validation, so this test is no longer valid
|
|
# @run_only_if(db_type_is.MARIADB)
|
|
# def test_comment_stripping(self):
|
|
# self.assertNotIn(
|
|
# "email", frappe.qb.get_query("User", fields=["name", "#email"], filters={}).get_sql()
|
|
# )
|
|
|
|
def test_nestedset(self):
    """Check the nested-set filter operators against a freshly built tree doctype.

    ``descendants of`` / ``ancestors of`` results are compared with
    ``get_descendants_of`` / ``get_ancestors_of``; the ``not ...`` variants are
    compared with ``frappe.db.get_all`` using the same filter.

    The temporary doctype and its table are dropped in a ``finally`` block so
    that a failing assertion does not leave them behind.
    """

    def drop_tree_doctype():
        # Remove the DocType row and its table so the fixture starts from a clean slate.
        frappe.db.sql("delete from `tabDocType` where `name` = 'Test Tree DocType'")
        frappe.db.sql_ddl("drop table if exists `tabTest Tree DocType`")

    drop_tree_doctype()
    try:
        create_tree_docs()

        descendants_result = frappe.qb.get_query(
            "Test Tree DocType",
            fields=["name"],
            filters={"name": ("descendants of", "Parent 1")},
            order_by="creation desc",
        ).run(as_list=1)

        # Flatten the single-column descendants rows into a list of names
        descendants_result = list(itertools.chain.from_iterable(descendants_result))
        self.assertListEqual(descendants_result, get_descendants_of("Test Tree DocType", "Parent 1"))

        ancestors_result = frappe.qb.get_query(
            "Test Tree DocType",
            fields=["name"],
            filters={"name": ("ancestors of", "Child 2")},
            order_by="creation desc",
        ).run(as_list=1)

        # Flatten the single-column ancestors rows into a list of names
        ancestors_result = list(itertools.chain.from_iterable(ancestors_result))
        self.assertListEqual(ancestors_result, get_ancestors_of("Test Tree DocType", "Child 2"))

        not_descendants_result = frappe.qb.get_query(
            "Test Tree DocType",
            fields=["name"],
            filters={"name": ("not descendants of", "Parent 1")},
            order_by="creation desc",
        ).run(as_dict=1)

        self.assertListEqual(
            not_descendants_result,
            frappe.db.get_all(
                "Test Tree DocType",
                fields=["name"],
                filters={"name": ("not descendants of", "Parent 1")},
            ),
        )

        not_ancestors_result = frappe.qb.get_query(
            "Test Tree DocType",
            fields=["name"],
            filters={"name": ("not ancestors of", "Child 2")},
            order_by="creation desc",
        ).run(as_dict=1)

        self.assertListEqual(
            not_ancestors_result,
            frappe.db.get_all(
                "Test Tree DocType",
                fields=["name"],
                filters={"name": ("not ancestors of", "Child 2")},
            ),
        )
    finally:
        drop_tree_doctype()
|
|
|
|
def test_child_field_syntax(self):
    """The ``{"child_table": [fields]}`` field syntax must return child rows as a nested list.

    Two Notes are created with one and two ``seen_by`` rows. They are deleted
    in a ``finally`` block so a failing assertion does not leave them behind.
    """
    created_notes = []
    try:
        note1 = frappe.get_doc(doctype="Note", title="Note 1", seen_by=[{"user": "Administrator"}]).insert()
        created_notes.append(note1)
        note2 = frappe.get_doc(
            doctype="Note", title="Note 2", seen_by=[{"user": "Administrator"}, {"user": "Guest"}]
        ).insert()
        created_notes.append(note2)

        # All child columns requested via "*".
        result = frappe.qb.get_query(
            "Note",
            filters={"name": ["in", [note1.name, note2.name]]},
            fields=["name", {"seen_by": ["*"]}],
            order_by="title asc",
        ).run(as_dict=1)

        self.assertTrue(isinstance(result[0].seen_by, list))
        self.assertTrue(isinstance(result[1].seen_by, list))
        self.assertEqual(len(result[0].seen_by), 1)
        self.assertEqual(len(result[1].seen_by), 2)
        self.assertEqual(result[0].seen_by[0].user, "Administrator")

        # Only the `user` child column requested.
        result = frappe.qb.get_query(
            "Note",
            filters={"name": ["in", [note1.name, note2.name]]},
            fields=["name", {"seen_by": ["user"]}],
            order_by="title asc",
        ).run(as_dict=1)

        self.assertEqual(len(result[0].seen_by[0].keys()), 1)
        self.assertEqual(result[1].seen_by[1].user, "Guest")
    finally:
        # Delete whichever notes were actually inserted, in creation order.
        for note in created_notes:
            note.delete()
|
|
|
|
def test_build_match_conditions(self):
    """User permissions on Test Blog Post must appear as a condition in the generated query.

    The session user, the user permissions and the temporary Blogger role are
    restored in a ``finally`` block so a failing assertion does not leak that
    state.
    """
    from frappe.permissions import add_user_permission, clear_user_permissions_for_doctype

    clear_user_permissions_for_doctype("Test Blog Post", "test2@example.com")

    test2user = frappe.get_doc("User", "test2@example.com")
    try:
        test2user.add_roles("Blogger")
        frappe.set_user("test2@example.com")

        # Before any user permission is applied, there should be no conditions
        query = frappe.qb.get_query("Test Blog Post", ignore_permissions=False)
        # NOTE(review): this checks for `tabBlog Post` while the doctype queried is
        # "Test Blog Post", so the assertion looks like it can never fail - confirm.
        self.assertNotIn("(`tabBlog Post`.`name` in (", str(query))

        # Add user permissions
        add_user_permission("Test Blog Post", "_Test Blog Post", "test2@example.com", True)
        add_user_permission("Test Blog Post", "_Test Blog Post 1", "test2@example.com", True)

        # After applying user permission, condition should be in query
        query = str(frappe.qb.get_query("Test Blog Post", ignore_permissions=False))

        # Check for user permission condition in the query string
        # (no assertion is made for database types other than these two)
        if frappe.db.db_type == "mariadb":
            self.assertIn("IFNULL(`name`,'')='' OR `name` IN ('_Test Blog Post 1','_Test Blog Post')", query)
        elif frappe.db.db_type == "postgres":
            self.assertIn(
                "IFNULL(\"name\",'')='' OR \"name\" IN ('_Test Blog Post 1','_Test Blog Post')", query
            )  # works in pg due to `coalesce` sub during sql execution
    finally:
        frappe.set_user("Administrator")
        clear_user_permissions_for_doctype("Test Blog Post", "test2@example.com")
        test2user.remove_roles("Blogger")
|
|
|
|
def test_ignore_permissions_for_query(self):
    """``ignore_permissions`` decides whether test2@example.com may query DocType.

    With ``ignore_permissions=False`` building the query must raise
    ``frappe.PermissionError``; with ``True`` the query must return rows. The
    session user is reset to Administrator in a ``finally`` block so a failing
    assertion does not leave the test user active.
    """
    frappe.set_user("test2@example.com")
    try:
        with self.assertRaises(frappe.PermissionError):
            frappe.qb.get_query("DocType", filters={"istable": 1}, ignore_permissions=False)

        result = frappe.qb.get_query("DocType", filters={"istable": 1}, ignore_permissions=True).run()
        self.assertTrue(len(result) > 0)
    finally:
        frappe.set_user("Administrator")
|
|
|
|
def test_permlevel_fields(self):
    """Test permission level check when querying fields.

    Runs inside the ``setup_patched_blog_post`` and ``setup_test_user``
    context managers. The post created for the test is deleted in a
    ``finally`` block so a failing assertion does not leave it behind.
    """
    with setup_patched_blog_post(), setup_test_user(set_user=True):
        # Create a test blog post
        test_post = frappe.get_doc(
            {
                "doctype": "Test Blog Post",
                "title": "Test Permission Post",
                "content": "Test Content",
                "blog_category": "_Test Blog Category",
                "published": 1,
            }
        ).insert(ignore_permissions=True, ignore_mandatory=True)

        try:
            # Without proper permission, published field should be filtered out
            data = frappe.qb.get_query(
                "Test Blog Post",
                filters={"name": test_post.name},
                fields=["name", "published", "title"],
                ignore_permissions=False,
            ).run(as_dict=1)

            field_list = [field for d in data for field in d.keys()]
            self.assertIn("title", field_list)
            self.assertIn("name", field_list)
            self.assertNotIn("published", field_list)

            # With Administrator, all fields should be accessible
            frappe.set_user("Administrator")
            data = frappe.qb.get_query(
                "Test Blog Post",
                filters={"name": test_post.name},
                fields=["name", "published", "title"],
                ignore_permissions=False,
            ).run(as_dict=1)

            field_list = [field for d in data for field in d.keys()]
            self.assertIn("published", field_list)
        finally:
            # Switch to Administrator before deleting, as the success path does.
            frappe.set_user("Administrator")
            test_post.delete()
|
|
|
|
def test_child_table_access_with_select_permission(self):
    """Test that child table fields are inaccessible if user only has select perm on parent.

    A temporary role is granted ``select`` on "Note" (with ``read`` explicitly
    set to 0) and ``read`` on "Note Seen By". Querying the note together with
    its ``seen_by`` rows as that user must return ``name`` but not ``seen_by``.
    """

    test_role = "Select Note Test Role"
    test_user_email = "test2@example.com"  # Use existing test user
    test_note_title = "Child Select Test Note"

    # Cleanup
    frappe.set_user("Administrator")
    test_user = frappe.get_doc("User", test_user_email)
    test_user.remove_roles(test_role)
    frappe.delete_doc("Role", test_role, ignore_missing=True, force=True)
    frappe.delete_doc("Note", {"title": test_note_title}, ignore_missing=True, force=True)

    # Setup Role with 'select' on Note and 'read' on Note Seen By
    frappe.get_doc({"doctype": "Role", "role_name": test_role}).insert(ignore_if_duplicate=True)
    # Grant select on Note, read on Note Seen By
    add_permission("Note", test_role, 0, ptype="select")
    add_permission("Note Seen By", test_role, 0, ptype="read")
    # Ensure no read permission on Note for this role by explicitly setting it to 0
    update_permission_property("Note", test_role, 0, "read", 0, validate=False)
    test_user.add_roles(test_role)

    note = frappe.get_doc(
        doctype="Note", title=test_note_title, public=1, seen_by=[{"user": "Administrator"}]
    ).insert(ignore_permissions=True)

    def cleanup():
        frappe.set_user("Administrator")
        note.delete(ignore_permissions=True)
        test_user.remove_roles(test_role)
        frappe.delete_doc("Role", test_role, force=True)

    # Registered instead of running inline at the end, so the session user,
    # note and role are restored even when an assertion below fails.
    self.addCleanup(cleanup)

    frappe.set_user(test_user_email)
    query = frappe.qb.get_query(
        "Note",
        filters={"name": note.name},
        fields=["name", {"seen_by": ["user"]}],
        ignore_permissions=False,
    )
    result = query.run(as_dict=True)

    self.assertEqual(len(result), 1, "Should find the note record")
    self.assertIn("name", result[0], "Parent field 'name' should be accessible")
    self.assertNotIn(
        "seen_by",
        result[0],
        "Child table field 'seen_by' should NOT be accessible with only 'select' on parent",
    )
|
|
|
|
def test_filter_with_select_permission_allows_permlevel_0_fields(self):
    """A select-only role must still be able to filter "Note" by a permlevel 0 field.

    The test user receives a temporary role with ``select`` on "Note" and
    ``read`` set to 0, then filters by ``content`` while selecting only
    ``name``.
    """

    role = "SelectFilterTestRole"
    email = "test2@example.com"
    title = "Select Filter Test Note"

    # Start clean in case an earlier run left data behind.
    frappe.set_user("Administrator")
    user_doc = frappe.get_doc("User", email)
    user_doc.remove_roles(role)
    frappe.delete_doc("Role", role, ignore_missing=True, force=True)
    frappe.delete_doc("Note", {"title": title}, ignore_missing=True, force=True)

    # The role carries 'select' on Note only; 'read' is switched off explicitly.
    role_doc = frappe.get_doc({"doctype": "Role", "role_name": role})
    role_doc.insert(ignore_if_duplicate=True)
    add_permission("Note", role, 0, ptype="select")
    update_permission_property("Note", role, 0, "read", 0, validate=False)
    user_doc.add_roles(role)

    # Fixture note whose content is used as the filter value below.
    note_doc = frappe.get_doc(doctype="Note", title=title, content="Specific Content", public=1)
    note = note_doc.insert(ignore_permissions=True)

    def restore_state():
        # Switch back to Administrator first so the deletions are permitted.
        frappe.set_user("Administrator")
        frappe.delete_doc("Note", note.name, ignore_missing=True, force=True)
        user_doc.remove_roles(role)
        frappe.delete_doc("Role", role, ignore_missing=True, force=True)

    self.addCleanup(restore_state)

    frappe.set_user(email)

    # 'content' is a permlevel 0 field but NOT a search field
    select_only_query = frappe.qb.get_query(
        "Note",
        filters={"content": "Specific Content"},
        fields=["name"],  # 'name' is the only selected column
        ignore_permissions=False,
    )
    rows = select_only_query.run(as_dict=True)
    self.assertEqual(len(rows), 1, "Should find the note when filtering by permlevel 0 field")
    self.assertEqual(rows[0]["name"], note.name)
|
|
|
|
def test_core_doctype_filterable_fields_with_select_permission(self):
    """Filtering the core "User" doctype must work for a select-only role.

    The test user gets a temporary role with ``select`` on "User" and ``read``
    set to 0, then filters by ``user_type`` and ``enabled``.
    Regression test for #37923.
    """
    role = "CoreSelectTestRole"
    email = "test2@example.com"

    # Remove leftovers from earlier runs before building the role.
    frappe.set_user("Administrator")
    user_doc = frappe.get_doc("User", email)
    user_doc.remove_roles(role)
    frappe.delete_doc("Role", role, ignore_missing=True, force=True)

    role_doc = frappe.get_doc({"doctype": "Role", "role_name": role})
    role_doc.insert(ignore_if_duplicate=True)
    add_permission("User", role, 0, ptype="select")
    update_permission_property("User", role, 0, "read", 0, validate=False)
    user_doc.add_roles(role)

    def restore_state():
        frappe.set_user("Administrator")
        user_doc.remove_roles(role)
        frappe.delete_doc("Role", role, ignore_missing=True, force=True)

    self.addCleanup(restore_state)

    frappe.set_user(email)

    # filter by user_type and enabled — the exact filters used by search_link for assignment
    user_query = frappe.qb.get_query(
        "User",
        filters={"user_type": "System User", "enabled": 1},
        fields=["name"],
        ignore_permissions=False,
    )
    rows = user_query.run(as_dict=True)
    self.assertTrue(len(rows) > 0, "Should be able to filter User by user_type and enabled")
|
|
|
|
def test_nested_permission(self):
    """Test permission on nested doctypes.

    ``test2@example.com`` gets a user permission for "Level 1 A" only. The
    query result must then contain "Level 2 A" and must not contain
    "Level 1 B" or "Level 2 B".
    """
    frappe.set_user("Administrator")
    create_nested_doctype()
    create_nested_doctype_records()

    from frappe.permissions import add_user_permission, clear_user_permissions_for_doctype

    clear_user_permissions_for_doctype("Nested DocType")

    # Add user permission for only one root folder
    add_user_permission("Nested DocType", "Level 1 A", "test2@example.com")

    from frappe.core.page.permission_manager.permission_manager import update

    # To avoid if_owner filter
    update("Nested DocType", "All", 0, "if_owner", 0)

    try:
        test2user = frappe.get_doc("User", "test2@example.com")
        test2user.add_roles("Blogger")
        with self.set_user("test2@example.com"):
            data = frappe.qb.get_query("Nested DocType", ignore_permissions=False).run(as_dict=1)

        # Children of the permitted node should be accessible
        self.assertTrue(any(d.name == "Level 2 A" for d in data))

        # Other nodes should not be accessible
        self.assertFalse(any(d.name == "Level 1 B" for d in data))
        self.assertFalse(any(d.name == "Level 2 B" for d in data))
    finally:
        # Runs even when an assertion fails, so the permission rule is not
        # left with if_owner switched off.
        update("Nested DocType", "All", 0, "if_owner", 1)  # Reset to default
|
|
|
|
def test_is_set_is_not_set(self):
    """Exercise the ``["is", "set"]`` / ``["is", "not set"]`` filters on ``DocType.autoname``."""

    def doctypes_with_autoname(state):
        # One query per state; rows are returned as dicts.
        return frappe.qb.get_query("DocType", filters={"autoname": ["is", state]}).run(as_dict=1)

    unset_rows = doctypes_with_autoname("not set")
    self.assertIn({"name": "Integration Request"}, unset_rows)
    self.assertIn({"name": "User"}, unset_rows)
    self.assertNotIn({"name": "Blogger"}, unset_rows)

    set_rows = doctypes_with_autoname("set")
    self.assertIn({"name": "DocField"}, set_rows)
    self.assertIn({"name": "Prepared Report"}, set_rows)
    self.assertNotIn({"name": "Property Setter"}, set_rows)

    # Store NULL in the column and confirm the row is still excluded from "is set".
    frappe.db.set_value("DocType", "Property Setter", "autoname", None, update_modified=False)

    set_rows = doctypes_with_autoname("set")
    self.assertFalse(any(row.name == "Property Setter" for row in set_rows))
|
|
|
|
def test_permission_query_condition(self):
    """Test permission query condition being applied from hooks and server script.

    First checks that the generated SQL for "Dashboard Settings" contains a
    condition on ``name``. Then it clears ``frappe.hooks.permission_query_conditions``,
    installs a "Permission Query" server script, and checks that the script's
    condition appears in the SQL.
    """
    from frappe.desk.doctype.dashboard_settings.dashboard_settings import create_dashboard_settings

    # Create a Dashboard Settings for test user
    self.doctype = "Dashboard Settings"
    self.user = "test@example.com"

    original_hooks = frappe.get_hooks("permission_query_conditions") or {}

    # Create test data
    create_dashboard_settings(self.user)

    # Hook condition will restrict to only name=Administrator, so our test user's record should not be found
    query = frappe.qb.get_query("Dashboard Settings", user=self.user, ignore_permissions=False)
    self.assertIn("`tabDashboard Settings`.name = ", str(query))

    # Create a server script for permission query
    script = frappe.new_doc(
        doctype="Server Script",
        name="Dashboard Settings Permission Query",
        script_type="Permission Query",
        enabled=1,
        reference_doctype="Dashboard Settings",
        script=f"""conditions = '`tabDashboard Settings`.`user` = "{self.user}"'""",
    ).insert()

    try:
        # Test with server script
        # Script condition should allow the record to be found
        frappe.clear_cache()
        frappe.hooks.permission_query_conditions = {}  # Clear hooks to test server script alone

        with enable_safe_exec():
            query = frappe.qb.get_query("Dashboard Settings", user=self.user, ignore_permissions=False)
            self.assertIn(f'`tabDashboard Settings`.`user` = "{self.user}"', str(query))
    finally:
        # Cleanup runs even when the assertion fails; otherwise the script
        # would stay installed and the hooks would stay cleared.
        script.delete()
        frappe.clear_cache()
        frappe.hooks.permission_query_conditions = original_hooks
|
|
|
|
def test_link_field_target_permission(self):
    """Test that accessing link_field.target_field respects target field's permlevel.

    Builds a source doctype linking to a target doctype that has one
    permlevel 1 field and one permlevel 0 field. A user whose role has
    permlevel 1 ``read`` set to 0 must only get the permlevel 0 field through
    the link; Administrator must get both.
    """
    target_dt_name = "TargetDocForLinkPerm"
    source_dt_name = "SourceDocForLinkPerm"
    test_role = "LinkPermTestRole"
    test_user = "test2@example.com"

    # Cleanup previous runs
    frappe.set_user("Administrator")
    frappe.delete_doc("DocType", target_dt_name, ignore_missing=True, force=True)
    frappe.delete_doc("DocType", source_dt_name, ignore_missing=True, force=True)
    frappe.delete_doc("Role", test_role, ignore_missing=True, force=True)
    test_user_doc = frappe.get_doc("User", test_user)
    test_user_doc.remove_roles(test_role)

    # Create Doctypes
    target_dt = new_doctype(
        target_dt_name,
        fields=[
            {"fieldname": "target_field", "fieldtype": "Data", "permlevel": 1, "label": "Target Field"},
            {"fieldname": "other_target_field", "fieldtype": "Data", "label": "Other Target Field"},
        ],
    ).insert(ignore_if_duplicate=True)

    source_dt = new_doctype(
        source_dt_name,
        fields=[
            {
                "fieldname": "link_field",
                "fieldtype": "Link",
                "options": target_dt_name,
                "label": "Link Field",
            }
        ],
    ).insert(ignore_if_duplicate=True)

    # Create Records
    target_doc = frappe.get_doc(
        doctype=target_dt_name, target_field="Secret Data", other_target_field="Public Data"
    ).insert(ignore_permissions=True)
    source_doc = frappe.get_doc(doctype=source_dt_name, link_field=target_doc.name).insert(
        ignore_permissions=True
    )

    # Setup Role and Permissions
    frappe.get_doc({"doctype": "Role", "role_name": test_role}).insert(ignore_if_duplicate=True)
    add_permission(source_dt_name, test_role, 0, ptype="read")
    add_permission(target_dt_name, test_role, 0, ptype="read")
    # Ensure no permlevel 1 read for test_role
    update_permission_property(target_dt_name, test_role, 1, "read", 0, validate=False)
    # Ensure System Manager can read permlevel 1
    add_permission(target_dt_name, "System Manager", 1, ptype="read")
    test_user_doc.add_roles(test_role)

    def cleanup():
        frappe.set_user("Administrator")
        source_doc.delete(ignore_permissions=True)
        target_doc.delete(ignore_permissions=True)
        source_dt.delete()
        target_dt.delete()
        test_user_doc.remove_roles(test_role)
        frappe.delete_doc("Role", test_role, force=True)

    # Registered instead of running inline at the end, so the fixtures and
    # session user are restored even when an assertion below fails.
    self.addCleanup(cleanup)

    # Test as the restricted user
    frappe.set_user(test_user)
    result_restricted = frappe.qb.get_query(
        source_dt_name,
        filters={"name": source_doc.name},
        fields=[
            "name",
            "link_field.target_field as linked_secret",
            "link_field.other_target_field as linked_public",
        ],
        ignore_permissions=False,
    ).run(as_dict=True)

    self.assertEqual(len(result_restricted), 1)
    self.assertIn(
        "linked_public",
        result_restricted[0],
        "Permlevel 0 target field should be accessible via link.",
    )
    self.assertNotIn(
        "linked_secret",
        result_restricted[0],
        "Permlevel 1 target field should NOT be accessible via link for restricted user.",
    )

    # Test as Administrator (who has System Manager role)
    frappe.set_user("Administrator")
    result_admin = frappe.qb.get_query(
        source_dt_name,
        filters={"name": source_doc.name},
        fields=[
            "name",
            "link_field.target_field as linked_secret",
            "link_field.other_target_field as linked_public",
        ],
        ignore_permissions=False,  # Still check permissions, but Admin has them
    ).run(as_dict=True)

    self.assertEqual(len(result_admin), 1)
    self.assertIn(
        "linked_public", result_admin[0], "Permlevel 0 target field should be accessible for Admin."
    )
    self.assertIn(
        "linked_secret", result_admin[0], "Permlevel 1 target field should be accessible for Admin."
    )
    self.assertEqual(result_admin[0].linked_secret, "Secret Data")
|
|
|
|
def test_filter_direct_field_permission(self):
    """Test that filtering is only allowed on permitted direct fields.

    Filtering "Test Blog Post" by ``title`` must not raise
    ``frappe.PermissionError`` for the test user, while filtering by
    ``published`` must raise it with a message naming the field.
    """
    with setup_patched_blog_post(), setup_test_user(set_user=True) as user:
        # Create a test blog post
        test_post = frappe.get_doc(
            {
                "doctype": "Test Blog Post",
                "title": "Test Filter Permission Post",
                "content": "Test Content",
                "blog_category": "_Test Blog Category",
                "published": 1,  # permlevel 1
            }
        ).insert(ignore_permissions=True, ignore_mandatory=True, ignore_if_duplicate=True)

        try:
            # User has read permlevel 0, but not 1 (published field)
            # Try filtering on permitted field (title - permlevel 0)
            try:
                frappe.qb.get_query(
                    "Test Blog Post",
                    filters={"title": test_post.title},
                    ignore_permissions=False,
                    user=user.name,
                ).run()
            except frappe.PermissionError as e:
                self.fail(f"Filtering on permitted field 'title' failed: {e}")

            # Try filtering on non-permitted field (published - permlevel 1)
            with self.assertRaises(frappe.PermissionError) as cm:
                frappe.qb.get_query(
                    "Test Blog Post",
                    filters={"published": 1},
                    ignore_permissions=False,
                    user=user.name,
                ).run()
            self.assertIn("You do not have permission to access field", str(cm.exception))
            self.assertIn("Blog Post.published", str(cm.exception))
        finally:
            # Cleanup runs even when a check above fails, so the post is not
            # left behind and the session user is reset.
            frappe.set_user("Administrator")
            test_post.delete()
|
|
|
|
def test_filter_linked_field_permission(self):
    """Test that filtering is only allowed on permitted linked fields.

    Builds a source doctype linking to a target doctype with one permlevel 1
    field and one permlevel 0 field. Filtering through the link on the
    permlevel 0 field must not raise ``frappe.PermissionError``; filtering on
    the permlevel 1 field must raise it with a message naming the field.
    """
    with setup_test_user(set_user=True) as user:
        target_dt_name = "TargetDocForFilterPerm"
        source_dt_name = "SourceDocForFilterPerm"
        test_role = "FilterPermTestRole"

        # Cleanup previous runs
        frappe.set_user("Administrator")
        frappe.delete_doc("DocType", target_dt_name, ignore_missing=True, force=True)
        frappe.delete_doc("DocType", source_dt_name, ignore_missing=True, force=True)
        frappe.delete_doc("Role", test_role, ignore_missing=True, force=True)
        test_user_doc = frappe.get_doc("User", user.name)
        test_user_doc.remove_roles(test_role)

        # Create Doctypes
        target_dt = new_doctype(
            target_dt_name,
            fields=[
                {
                    "fieldname": "target_field",
                    "fieldtype": "Data",
                    "permlevel": 1,
                    "label": "Target Field",
                },
                {"fieldname": "other_target_field", "fieldtype": "Data", "label": "Other Target Field"},
            ],
        ).insert(ignore_if_duplicate=True)

        source_dt = new_doctype(
            source_dt_name,
            fields=[
                {
                    "fieldname": "link_field",
                    "fieldtype": "Link",
                    "options": target_dt_name,
                    "label": "Link Field",
                }
            ],
        ).insert(ignore_if_duplicate=True)

        # Create Records
        target_doc = frappe.get_doc(
            doctype=target_dt_name, target_field="Secret Data", other_target_field="Public Data"
        ).insert(ignore_permissions=True)
        source_doc = frappe.get_doc(doctype=source_dt_name, link_field=target_doc.name).insert(
            ignore_permissions=True
        )

        # Setup Role and Permissions
        frappe.get_doc({"doctype": "Role", "role_name": test_role}).insert(ignore_if_duplicate=True)
        add_permission(source_dt_name, test_role, 0, ptype="read")
        add_permission(target_dt_name, test_role, 0, ptype="read")
        update_permission_property(
            target_dt_name, test_role, 1, "read", 0, validate=False
        )  # No permlevel 1 read
        test_user_doc.add_roles(test_role)

        try:
            # Test as the restricted user
            frappe.set_user(user.name)

            # Try filtering on permitted linked field (other_target_field - permlevel 0)
            try:
                frappe.qb.get_query(
                    source_dt_name,
                    filters={"link_field.other_target_field": "Public Data"},
                    ignore_permissions=False,
                    user=user.name,
                ).run()
            except frappe.PermissionError as e:
                self.fail(f"Filtering on permitted linked field 'link_field.other_target_field' failed: {e}")

            # Try filtering on non-permitted linked field (target_field - permlevel 1)
            with self.assertRaises(frappe.PermissionError) as cm_link:
                frappe.qb.get_query(
                    source_dt_name,
                    filters={"link_field.target_field": "Secret Data"},
                    ignore_permissions=False,
                    user=user.name,
                ).run()
            self.assertIn("You do not have permission to access field", str(cm_link.exception))
            self.assertIn(f"{target_dt_name}.target_field", str(cm_link.exception))
        finally:
            # Cleanup stays inside the ``with`` so it runs before the test-user
            # context exits, and it now also runs when a check above fails.
            frappe.set_user("Administrator")
            source_doc.delete(ignore_permissions=True)
            target_doc.delete(ignore_permissions=True)
            source_dt.delete()
            target_dt.delete()
            test_user_doc.remove_roles(test_role)
            frappe.delete_doc("Role", test_role, force=True)
|
|
|
|
def test_dynamic_fields_in_group_by(self):
    """GROUP BY must accept dotted references through a link field and a child table."""
    # Case 1: dotted reference through the ``module`` field of DocType.
    try:
        link_query = frappe.qb.get_query(
            "DocType",
            fields=["module.app_name", "name"],
            group_by="module.app_name, name",
        )
        link_rows = link_query.run(as_dict=True)
        self.assertTrue(len(link_rows) > 0)
        link_sql = link_query.get_sql()
        for fragment in ("LEFT JOIN", "tabModule Def"):
            self.assertIn(fragment, link_sql)
    except Exception as exc:
        self.fail(f"Dynamic link field in GROUP BY failed: {exc}")

    # Case 2: dotted reference through the ``seen_by`` child table of Note.
    seen_by_rows = [{"user": "Administrator"}, {"user": "Guest"}]
    note = frappe.get_doc(doctype="Note", title="Group By Test Note", seen_by=seen_by_rows).insert()

    try:
        child_query = frappe.qb.get_query(
            "Note",
            fields=["seen_by.user", "name"],
            filters={"name": note.name},
            group_by="seen_by.user, name",
        )
        child_rows = child_query.run(as_dict=True)
        self.assertTrue(len(child_rows) >= 1)
        child_sql = child_query.get_sql()
        for fragment in ("LEFT JOIN", "tabNote Seen By"):
            self.assertIn(fragment, child_sql)
    except Exception as exc:
        self.fail(f"Dynamic child field in GROUP BY failed: {exc}")
    finally:
        # The fixture note is removed whether or not the checks passed.
        note.delete()
|
|
|
|
def test_dynamic_fields_in_order_by(self):
    """ORDER BY must accept dotted references through a link field and a child table."""
    # Case 1: order DocType rows by a field reached through ``module``.
    try:
        link_query = frappe.qb.get_query(
            "DocType",
            fields=["name", "module.app_name"],
            order_by="module.app_name DESC",
            limit=5,
        )
        link_rows = link_query.run(as_dict=True)
        self.assertTrue(len(link_rows) > 0)
        link_sql = link_query.get_sql()
        for fragment in ("LEFT JOIN", "tabModule Def", "ORDER BY"):
            self.assertIn(fragment, link_sql)
    except Exception as exc:
        self.fail(f"Dynamic link field in ORDER BY failed: {exc}")

    # Case 2: order Note rows by a field of the ``seen_by`` child table.
    first_note = frappe.get_doc(
        doctype="Note", title="Order Test Note 1", seen_by=[{"user": "Administrator"}]
    ).insert()
    second_note = frappe.get_doc(
        doctype="Note", title="Order Test Note 2", seen_by=[{"user": "Guest"}]
    ).insert()

    try:
        note_names = [first_note.name, second_note.name]
        child_query = frappe.qb.get_query(
            "Note",
            fields=["name", "seen_by.user"],
            filters={"name": ["in", note_names]},
            order_by="seen_by.user ASC",
        )
        child_rows = child_query.run(as_dict=True)
        self.assertTrue(len(child_rows) >= 2)
        child_sql = child_query.get_sql()
        for fragment in ("LEFT JOIN", "tabNote Seen By"):
            self.assertIn(fragment, child_sql)
    except Exception as exc:
        self.fail(f"Dynamic child field in ORDER BY failed: {exc}")
    finally:
        # Both fixture notes are removed whether or not the checks passed.
        first_note.delete()
        second_note.delete()
|
|
|
|
def test_multiple_dynamic_fields_group_order(self):
    """A query mixing plain and dotted fields in GROUP BY and ORDER BY must run and return rows."""
    query_args = {
        "fields": ["module", "module.app_name", "name"],
        "group_by": "module, module.app_name, name",
        "order_by": "module.app_name",
    }
    try:
        rows = frappe.qb.get_query("DocType", **query_args).run(as_dict=True)
        self.assertTrue(len(rows) > 0)
    except Exception as exc:
        self.fail(f"Multiple dynamic fields in GROUP BY/ORDER BY failed: {exc}")
|
|
|
|
def test_group_by_order_by_permission_checks(self):
    """Test permission checks for dynamic fields in GROUP BY and ORDER BY.

    Builds a source doctype linking to a target doctype with one permlevel 1
    field and one permlevel 0 field. For a user whose role has permlevel 1
    ``read`` set to 0, grouping or ordering through the link by the permlevel 0
    field must not raise ``frappe.PermissionError``, while using the permlevel 1
    field must raise it.
    """
    target_dt_name = "TargetDocForGroupOrderPerm"
    source_dt_name = "SourceDocForGroupOrderPerm"
    test_role = "GroupOrderPermTestRole"
    test_user = "test2@example.com"

    frappe.set_user("Administrator")
    frappe.delete_doc("DocType", target_dt_name, ignore_missing=True, force=True)
    frappe.delete_doc("DocType", source_dt_name, ignore_missing=True, force=True)
    frappe.delete_doc("Role", test_role, ignore_missing=True, force=True)
    test_user_doc = frappe.get_doc("User", test_user)
    test_user_doc.remove_roles(test_role)

    target_dt = new_doctype(
        target_dt_name,
        fields=[
            {
                "fieldname": "restricted_field",
                "fieldtype": "Data",
                "permlevel": 1,
                "label": "Restricted Field",
            },
            {"fieldname": "public_field", "fieldtype": "Data", "label": "Public Field"},
        ],
    ).insert(ignore_if_duplicate=True)

    source_dt = new_doctype(
        source_dt_name,
        fields=[
            {
                "fieldname": "link_field",
                "fieldtype": "Link",
                "options": target_dt_name,
                "label": "Link Field",
            },
        ],
    ).insert(ignore_if_duplicate=True)

    frappe.get_doc({"doctype": "Role", "role_name": test_role}).insert(ignore_if_duplicate=True)
    add_permission(source_dt_name, test_role, 0, ptype="read")
    add_permission(target_dt_name, test_role, 0, ptype="read")
    update_permission_property(target_dt_name, test_role, 1, "read", 0, validate=False)
    test_user_doc.add_roles(test_role)

    def cleanup():
        frappe.set_user("Administrator")
        source_dt.delete()
        target_dt.delete()
        test_user_doc.remove_roles(test_role)
        frappe.delete_doc("Role", test_role, force=True)

    # Registered instead of running inline at the end, so the doctypes, role
    # and session user are restored even when a check below fails.
    self.addCleanup(cleanup)

    frappe.set_user(test_user)

    try:
        frappe.qb.get_query(
            source_dt_name,
            fields=["link_field.public_field", "name"],
            group_by="link_field.public_field",
            ignore_permissions=False,
            user=test_user,
        ).get_sql()
    except frappe.PermissionError as e:
        self.fail(f"GROUP BY with permitted field failed: {e}")

    with self.assertRaises(frappe.PermissionError) as cm:
        frappe.qb.get_query(
            source_dt_name,
            fields=["link_field.restricted_field", "name"],
            group_by="link_field.restricted_field",
            ignore_permissions=False,
            user=test_user,
        ).get_sql()
    self.assertIn("You do not have permission to access field", str(cm.exception))
    self.assertIn("restricted_field", str(cm.exception))

    try:
        frappe.qb.get_query(
            source_dt_name,
            fields=["name", "link_field.public_field"],
            order_by="link_field.public_field",
            ignore_permissions=False,
            user=test_user,
        ).get_sql()
    except frappe.PermissionError as e:
        self.fail(f"ORDER BY with permitted field failed: {e}")

    with self.assertRaises(frappe.PermissionError) as cm:
        frappe.qb.get_query(
            source_dt_name,
            fields=["name"],
            order_by="link_field.restricted_field",
            ignore_permissions=False,
            user=test_user,
        ).get_sql()
    self.assertIn("You do not have permission to access field", str(cm.exception))
    self.assertIn("restricted_field", str(cm.exception))
|
|
|
|
def test_child_table_group_by_order_by_permissions(self):
    """Test permission checks for child table fields in GROUP BY and ORDER BY.

    Builds a parent doctype with a child table whose doctype has one
    permlevel 1 field and one permlevel 0 field. For a user whose role has
    permlevel 1 ``read`` set to 0 on the child doctype, grouping by the
    permlevel 0 child field must not raise ``frappe.PermissionError``, while
    grouping or ordering by the permlevel 1 child field must raise it.
    """
    child_dt_name = "ChildDocForGroupOrderPerm"
    parent_dt_name = "ParentDocForGroupOrderPerm"
    test_role = "ChildGroupOrderPermTestRole"
    test_user_email = "test2@example.com"

    frappe.set_user("Administrator")
    frappe.delete_doc("DocType", child_dt_name, ignore_missing=True, force=True)
    frappe.delete_doc("DocType", parent_dt_name, ignore_missing=True, force=True)
    frappe.delete_doc("Role", test_role, ignore_missing=True, force=True)

    test_user_doc = frappe.get_doc("User", test_user_email)
    test_user_doc.remove_roles(test_role)

    child_dt = new_doctype(
        child_dt_name,
        fields=[
            {
                "fieldname": "restricted_child_field",
                "fieldtype": "Data",
                "permlevel": 1,
                "label": "Restricted Child Field",
            },
            {"fieldname": "public_child_field", "fieldtype": "Data", "label": "Public Child Field"},
        ],
        istable=1,
    ).insert(ignore_if_duplicate=True)

    parent_dt = new_doctype(
        parent_dt_name,
        fields=[
            {
                "fieldname": "child_table",
                "fieldtype": "Table",
                "options": child_dt_name,
                "label": "Child Table",
            },
        ],
    ).insert(ignore_if_duplicate=True)

    frappe.get_doc({"doctype": "Role", "role_name": test_role}).insert(ignore_if_duplicate=True)
    add_permission(parent_dt_name, test_role, 0, ptype="read")
    add_permission(child_dt_name, test_role, 0, ptype="read")
    update_permission_property(child_dt_name, test_role, 1, "read", 0, validate=False)
    test_user_doc.add_roles(test_role)

    def cleanup():
        frappe.set_user("Administrator")
        parent_dt.delete()
        child_dt.delete()
        test_user_doc.remove_roles(test_role)
        frappe.delete_doc("Role", test_role, force=True)

    # Registered instead of running inline at the end, so the doctypes, role
    # and session user are restored even when a check below fails.
    self.addCleanup(cleanup)

    frappe.set_user(test_user_email)

    try:
        frappe.qb.get_query(
            parent_dt_name,
            fields=["child_table.public_child_field", "name"],
            group_by="child_table.public_child_field",
            ignore_permissions=False,
            user=test_user_email,
        ).get_sql()
    except frappe.PermissionError as e:
        self.fail(f"GROUP BY with permitted child field failed: {e}")

    with self.assertRaises(frappe.PermissionError) as cm:
        frappe.qb.get_query(
            parent_dt_name,
            fields=["child_table.restricted_child_field", "name"],
            group_by="child_table.restricted_child_field",
            ignore_permissions=False,
            user=test_user_email,
        ).get_sql()
    self.assertIn("You do not have permission to access field", str(cm.exception))
    self.assertIn("restricted_child_field", str(cm.exception))

    with self.assertRaises(frappe.PermissionError) as cm:
        frappe.qb.get_query(
            parent_dt_name,
            fields=["name"],
            order_by="child_table.restricted_child_field",
            ignore_permissions=False,
            user=test_user_email,
        ).get_sql()
    self.assertIn("You do not have permission to access field", str(cm.exception))
    self.assertIn("restricted_child_field", str(cm.exception))
|
|
|
|
def test_group_by_order_by_validation_errors(self):
    """Malformed GROUP BY / ORDER BY strings must be rejected when the SQL is built."""
    bad_group_by = (
        "name; DROP TABLE users",
        "name--comment",
        "name /* comment */",
        "invalid-field-name",
        "field with space",
        "`field with space`",
        "name, email; SELECT 1",
    )
    bad_order_by = (
        "name sideways",
        "name INVALID_DIRECTION",
        "name ASC;",
        "name, email; SELECT 1",
    )

    # GROUP BY: every entry must raise frappe.ValidationError.
    for field in bad_group_by:
        failure_note = f"Invalid GROUP BY field '{field}' passed validation"
        with self.assertRaises(frappe.ValidationError, msg=failure_note):
            frappe.qb.get_query("User", group_by=field).get_sql()

    # ORDER BY: either frappe.ValidationError or ValueError is accepted.
    accepted_errors = (frappe.ValidationError, ValueError)
    for field in bad_order_by:
        failure_note = f"Invalid ORDER BY field '{field}' passed validation"
        with self.assertRaises(accepted_errors, msg=failure_note):
            frappe.qb.get_query("User", order_by=field).get_sql()
|
|
|
|
def test_backtick_rejection_group_order(self):
    """Bare backticked names must be rejected in GROUP BY / ORDER BY; `tabTable`.`field` must pass."""
    # Each entry is a single keyword argument for get_query using the invalid
    # notation: one backticked name, one with a direction, and a list of them.
    malformed_clauses = (
        {"group_by": "`name`"},
        {"order_by": "`name` ASC"},
        {"group_by": "`name`, `email`"},
    )
    for clause in malformed_clauses:
        with self.assertRaises(frappe.ValidationError) as raised:
            frappe.qb.get_query("User", **clause).get_sql()
        self.assertIn("invalid backtick notation", str(raised.exception))

    # Fully qualified backtick notation must build without raising.
    well_formed_clauses = (
        {"group_by": "`tabUser`.`name`"},
        {"order_by": "`tabUser`.`name` ASC"},
    )
    for clause in well_formed_clauses:
        frappe.qb.get_query("User", **clause).get_sql()
|
|
|
|
def test_sql_functions_in_fields(self):
    """Dict-style function fields must render as SQL calls; unknown functions must be rejected."""
    norm = self.normalize_sql

    # Function without an alias, next to a plain field.
    sql = frappe.qb.get_query(
        "User", fields=["user_type", {"COUNT": "name"}], group_by="user_type"
    ).get_sql()
    self.assertIn(norm("COUNT(`name`)"), sql)
    self.assertIn("GROUP BY", sql)

    # Aliased single-argument aggregates, all grouped by user_type.
    aliased_aggregates = (
        ("COUNT", "name", "total_users"),
        ("SUM", "enabled", "total_enabled"),
        ("MAX", "creation", "latest_user"),
        ("MIN", "creation", "earliest_user"),
        ("AVG", "enabled", "avg_enabled"),
    )
    for func, column, alias in aliased_aggregates:
        sql = frappe.qb.get_query(
            "User", fields=[{func: column, "as": alias}], group_by="user_type"
        ).get_sql()
        self.assertIn(norm(f"{func}(`{column}`) `{alias}`"), sql)

    # ABS without any grouping.
    sql = frappe.qb.get_query("User", fields=[{"ABS": "enabled", "as": "abs_enabled"}]).get_sql()
    self.assertIn(norm("ABS(`enabled`) `abs_enabled`"), sql)

    # IFNULL takes two arguments; both sides of the comparison are normalized.
    sql = frappe.qb.get_query(
        "User", fields=[{"IFNULL": ["first_name", "'Unknown'"], "as": "safe_name"}]
    ).get_sql()
    self.assertIn(norm("IFNULL(`first_name`,'Unknown') `safe_name`"), norm(sql))

    # TIMESTAMP on a single column.
    sql = frappe.qb.get_query("User", fields=[{"TIMESTAMP": "creation", "as": "ts"}]).get_sql()
    self.assertIn(norm("TIMESTAMP(`creation`) `ts`"), norm(sql))

    # Plain field mixed with two function fields.
    sql = frappe.qb.get_query(
        "User",
        fields=[
            "user_type",
            {"COUNT": "name", "as": "total_users"},
            {"MAX": "creation", "as": "latest_creation"},
        ],
        group_by="user_type",
    ).get_sql()
    for expected in (
        "`user_type`",
        "COUNT(`name`) `total_users`",
        "MAX(`creation`) `latest_creation`",
    ):
        self.assertIn(norm(expected), sql)

    # NOW is called with no arguments.
    sql = frappe.qb.get_query("User", fields=[{"NOW": None, "as": "current_time"}]).get_sql()
    self.assertIn(norm("NOW() `current_time`"), sql)

    # CONCAT with two columns.
    sql = frappe.qb.get_query(
        "User", fields=[{"CONCAT": ["first_name", "last_name"], "as": "full_name"}]
    ).get_sql()
    self.assertIn(norm("CONCAT(`first_name`,`last_name`) `full_name`"), norm(sql))

    # Rejected names: an unknown function, one that takes a list argument
    # like a child-field spec, and an SQL injection attempt.
    rejected_functions = (
        ("UNSUPPORTED_FUNC", "name"),
        ("UPPER", ["first_name"]),
        ("DROP", "TABLE users"),
    )
    for func, argument in rejected_functions:
        with self.assertRaises(frappe.ValidationError) as raised:
            frappe.qb.get_query("User", fields=[{func: argument}]).get_sql()
        self.assertIn(f"Unsupported function or operator: {func}", str(raised.exception))
|
|
|
|
def test_arithmetic_operators_in_fields(self):
    """Check that ADD/SUB/MUL/DIV dict fields render as SQL arithmetic.

    Covers literal and column operands, nested operators, operators that
    wrap functions (NULLIF, LOCATE), expressions without an alias, and the
    validation errors raised for malformed or unknown operators.
    """

    def build_sql(fields):
        # Every case in this test queries "User" and only varies the fields.
        return frappe.qb.get_query("User", fields=fields).get_sql()

    # Simple binary expressions: literals, column names, one nested operator.
    # Only the expected fragment is normalized; the generated SQL is used as-is.
    binary_cases = (
        ({"ADD": [1, 2], "as": "sum_result"}, "1+2 `sum_result`"),
        ({"SUB": [10, 5], "as": "diff_result"}, "10-5 `diff_result`"),
        ({"MUL": [3, 4], "as": "prod_result"}, "3*4 `prod_result`"),
        ({"DIV": [10, 2], "as": "div_result"}, "10/2 `div_result`"),
        ({"ADD": ["enabled", "login_after"], "as": "field_sum"}, "`enabled`+`login_after` `field_sum`"),
        ({"ADD": [{"MUL": [2, 3]}, 4], "as": "nested_result"}, "2*3+4 `nested_result`"),
    )
    for field, fragment in binary_cases:
        generated = build_sql([field])
        self.assertIn(self.normalize_sql(fragment), generated)

    # Operator whose right operand is a function call (NULLIF).
    generated = build_sql([{"DIV": [1, {"NULLIF": ["enabled", 0]}], "as": "safe_div"}])
    self.assertIn(
        self.normalize_sql("1/NULLIF(`enabled`,0) `safe_div`"),
        self.normalize_sql(generated),
    )

    # Same expression selected next to a plain column: (1 / NULLIF(value, 0)).
    generated = build_sql(
        [
            "name",
            {"DIV": [1, {"NULLIF": ["enabled", 0]}], "as": "inverse"},
        ]
    )
    self.assertIn(self.normalize_sql("`name`"), generated)
    self.assertIn(
        self.normalize_sql("1/NULLIF(`enabled`,0) `inverse`"),
        self.normalize_sql(generated),
    )

    # LOCATE inside NULLIF (search relevance pattern); the expected
    # function name and argument order differ per database backend.
    generated = build_sql(
        [
            "name",
            {"DIV": [1, {"NULLIF": [{"LOCATE": ["'test'", "name"]}, 0]}], "as": "relevance"},
        ]
    )
    relevance_by_backend = {
        "mariadb": "1/NULLIF(LOCATE('test',`name`),0) `relevance`",
        "postgres": "1/NULLIF(STRPOS(`name`,'test'),0) `relevance`",
        "sqlite": "1/NULLIF(INSTR(`name`,'test'),0) `relevance`",
    }
    fragment = relevance_by_backend.get(frappe.db.db_type)
    if fragment is not None:
        self.assertIn(self.normalize_sql(fragment), self.normalize_sql(generated))

    # Several operator fields in one select list.
    generated = build_sql(
        [
            "name",
            {"ADD": ["enabled", 1], "as": "enabled_plus_one"},
            {"MUL": ["enabled", 2], "as": "enabled_times_two"},
        ]
    )
    for fragment in (
        "`name`",
        "`enabled`+1 `enabled_plus_one`",
        "`enabled`*2 `enabled_times_two`",
    ):
        self.assertIn(self.normalize_sql(fragment), generated)

    # Operator without an alias.
    generated = build_sql([{"ADD": [1, 1]}])
    self.assertIn("1+1", generated)

    # Validation: wrong argument count, non-list arguments, unknown operator.
    invalid_cases = (
        ({"ADD": [1, 2, 3]}, "requires exactly 2 arguments"),
        ({"DIV": [10]}, "requires exactly 2 arguments"),
        ({"MUL": "invalid"}, "requires exactly 2 arguments"),
        ({"XOR": [1, 2]}, "Unsupported function or operator: XOR"),
    )
    for field, message in invalid_cases:
        with self.assertRaises(frappe.ValidationError) as caught:
            build_sql([field])
        self.assertIn(message, str(caught.exception))

    # Deeply nested expression. Only the alias and the division sign are
    # checked because PyPika may add parentheses to complex expressions.
    generated = build_sql(
        [
            {
                "DIV": [
                    {"ADD": [{"MUL": [2, 3]}, 4]},
                    {"SUB": [10, 5]},
                ],
                "as": "complex_expr",
            }
        ]
    )
    self.assertIn("complex_expr", generated)
    self.assertIn("/", generated)
|
|
|
|
def test_not_equal_condition_on_none(self):
    """Filters against None: `=` renders as IS NULL and `!=` renders as `<>''`."""
    none_filters = [
        ["DocField", "name", "=", None],
        ["DocType", "parent", "!=", None],
    ]
    generated = frappe.qb.get_query("DocType", ["*"], none_filters).get_sql()

    # The DocField filter pulls in a LEFT JOIN on the child table.
    expected = (
        "SELECT `tabDocType`.* FROM `tabDocType` "
        "LEFT JOIN `tabDocField` ON `tabDocField`.`parent`=`tabDocType`.`name` "
        "AND `tabDocField`.`parenttype`='DocType' AND `tabDocField`.`parentfield`='fields' "
        "WHERE `tabDocField`.`name` IS NULL AND `tabDocType`.`parent`<>''"
    )
    self.assertQueryEqual(generated, expected)
|
|
|
|
def test_field_alias_in_group_by(self):
    """A `field as alias` select can be referenced by its alias in GROUP BY / ORDER BY."""
    generated = frappe.qb.get_query(
        "User",
        fields=["creation as created_date", {"COUNT": "*"}],
        group_by="created_date",
        order_by="created_date",
    ).get_sql()

    self.assertIn(
        self.normalize_sql("GROUP BY `created_date`"),
        self.normalize_sql(generated),
    )

    # Skipped on Postgres, which requires ORDER BY fields to be grouped or aggregated.
    on_postgres = frappe.db.db_type == "postgres"
    if not on_postgres:
        self.assertIn(
            self.normalize_sql("ORDER BY `created_date`"),
            self.normalize_sql(generated),
        )

    self.assertIn(
        self.normalize_sql("`creation` `created_date`"),
        self.normalize_sql(generated),
    )
|
|
|
|
def test_field_alias_permission_check(self):
    """Grouping by a select alias should build SQL without a PermissionError."""
    aliased_query = frappe.qb.get_query(
        "User",
        fields=["creation as created_date", {"COUNT": "*"}],
        group_by="created_date",
    )

    # Reaching the assertion at all means get_sql() raised no PermissionError.
    generated = aliased_query.get_sql()
    self.assertIn(
        self.normalize_sql("GROUP BY `created_date`"),
        self.normalize_sql(generated),
    )
|
|
|
|
def test_between_datetime_expansion(self):
    """A 'between' filter with date strings on a Datetime field spans the whole day."""
    # `creation` is a standard datetime field.
    single_day = ["2025-12-01", "2025-12-01"]
    generated = frappe.qb.get_query(
        "User",
        filters={"creation": ["between", single_day]},
    ).get_sql()

    # The bare dates must be expanded to start-of-day and end-of-day timestamps.
    for boundary in ("2025-12-01 00:00:00", "2025-12-01 23:59:59"):
        self.assertIn(boundary, generated)
|
|
|
|
def test_timespan_datetime_expansion(self):
    """A 'timespan' filter on a Datetime field is expanded to full-day datetime bounds."""
    timespan_filter = {"creation": ["timespan", "last 7 days"]}
    generated = frappe.qb.get_query("User", filters=timespan_filter).get_sql()

    # The range should run from the start of the first day (00:00:00)
    # to the end of the last day (23:59:59).
    for time_of_day in ("00:00:00", "23:59:59"):
        self.assertIn(time_of_day, generated)
|
|
|
|
def test_share_only_access(self):
    """A shared document becomes visible to a user who could not see it before the share."""
    import frappe.share

    recipient = "test2@example.com"

    # Private event: the test expects the recipient not to see it by default.
    private_event = frappe.get_doc(
        doctype="Event",
        subject="Share Only Test Event",
        starts_on="2025-01-01 10:00:00",
        event_type="Private",
    ).insert()

    self.addCleanup(private_event.delete)
    self.addCleanup(lambda: frappe.set_user("Administrator"))

    def visible_rows():
        # Permission-checked lookup of the event as the current session user.
        return frappe.qb.get_query(
            "Event", filters={"name": private_event.name}, ignore_permissions=False
        ).run()

    # Before sharing: nothing is returned for the recipient.
    frappe.set_user(recipient)
    rows = visible_rows()
    self.assertEqual(len(rows), 0, "User should not see event without share")

    # Share as Administrator.
    frappe.set_user("Administrator")
    frappe.share.add("Event", private_event.name, recipient)

    # After sharing: the recipient gets exactly the shared event.
    frappe.set_user(recipient)
    rows = visible_rows()
    self.assertEqual(len(rows), 1, "User should see event via share")
|
|
|
|
def test_if_owner_constraint_with_shared_docs(self):
    """Test that shared docs trump if_owner constraint.

    A post owned by Administrator is hidden from a Blogger while the role's
    ``if_owner`` flag is on, and becomes visible once it is shared with them.

    Each cleanup is registered right after the step it undoes, so a failure
    part-way through setup does not leak the role, the post or the
    ``if_owner`` flag into later tests. Cleanups run in reverse order of
    registration.
    """
    import frappe.share
    from frappe.core.page.permission_manager.permission_manager import update

    test_user = "test2@example.com"
    test_user_doc = frappe.get_doc("User", test_user)

    test_user_doc.add_roles("Blogger")
    self.addCleanup(lambda: test_user_doc.remove_roles("Blogger"))

    # Create blog post owned by Administrator
    blog_post = frappe.get_doc(
        doctype="Test Blog Post",
        title="If Owner Test Post",
        content="Test Content",
        blog_category="_Test Blog Category",
    ).insert(ignore_permissions=True, ignore_mandatory=True)
    self.addCleanup(blog_post.delete)

    # Enable if_owner constraint for Test Blog Post
    update("Test Blog Post", "Blogger", 0, "if_owner", 1)
    self.addCleanup(lambda: update("Test Blog Post", "Blogger", 0, "if_owner", 0))

    # Registered last so it runs first: the cleanups above then run as Administrator.
    self.addCleanup(lambda: frappe.set_user("Administrator"))

    # User shouldn't see it (not owner, if_owner enabled)
    frappe.set_user(test_user)
    result = frappe.qb.get_query(
        "Test Blog Post", filters={"name": blog_post.name}, ignore_permissions=False
    ).run()
    self.assertEqual(len(result), 0, "User should not see post owned by others with if_owner")

    # Share the document
    frappe.set_user("Administrator")
    frappe.share.add("Test Blog Post", blog_post.name, test_user)

    # Now user should see it via share (shared docs trump if_owner)
    frappe.set_user(test_user)
    result = frappe.qb.get_query(
        "Test Blog Post", filters={"name": blog_post.name}, ignore_permissions=False
    ).run()
    self.assertEqual(len(result), 1, "User should see post via share despite if_owner")
|
|
|
|
def test_user_permission_with_shared_docs(self):
    """Test that shared docs grant access even when user permission doesn't match.

    The user holds a user permission for post1 only, so post2 is hidden until
    it is shared with them.

    Each cleanup is registered right after the step it undoes, so a failure
    part-way through setup does not leak the role, the posts or the user
    permission into later tests. Cleanups run in reverse order of registration.
    """
    import frappe.share
    from frappe.permissions import add_user_permission, clear_user_permissions_for_doctype

    test_user = "test2@example.com"
    test_user_doc = frappe.get_doc("User", test_user)

    test_user_doc.add_roles("Blogger")
    self.addCleanup(lambda: test_user_doc.remove_roles("Blogger"))

    # Create two blog posts
    blog_post1 = frappe.get_doc(
        doctype="Test Blog Post",
        title="User Perm Test Post 1",
        content="Test Content",
        blog_category="_Test Blog Category",
    ).insert(ignore_permissions=True, ignore_mandatory=True)
    self.addCleanup(blog_post1.delete)

    blog_post2 = frappe.get_doc(
        doctype="Test Blog Post",
        title="User Perm Test Post 2",
        content="Test Content",
        blog_category="_Test Blog Category",
    ).insert(ignore_permissions=True, ignore_mandatory=True)
    self.addCleanup(blog_post2.delete)

    clear_user_permissions_for_doctype("Test Blog Post", test_user)
    # Registered before the permission is added, and after the post deletions,
    # so the user permissions are cleared before the posts are deleted.
    self.addCleanup(lambda: clear_user_permissions_for_doctype("Test Blog Post", test_user))

    # Add user permission for only post1
    add_user_permission("Test Blog Post", blog_post1.name, test_user, True)

    # Registered last so it runs first: the cleanups above then run as Administrator.
    self.addCleanup(lambda: frappe.set_user("Administrator"))

    # User should see post1 via user permission
    frappe.set_user(test_user)
    result = frappe.qb.get_query(
        "Test Blog Post", filters={"name": blog_post1.name}, ignore_permissions=False
    ).run()
    self.assertEqual(len(result), 1, "User should see post1 via user permission")

    # User should NOT see post2 (no user permission)
    result = frappe.qb.get_query(
        "Test Blog Post", filters={"name": blog_post2.name}, ignore_permissions=False
    ).run()
    self.assertEqual(len(result), 0, "User should not see post2 without user permission")

    # Share post2 with user
    frappe.set_user("Administrator")
    frappe.share.add("Test Blog Post", blog_post2.name, test_user)

    # Now user should see post2 via share (shared docs trump user permissions)
    frappe.set_user(test_user)
    result = frappe.qb.get_query(
        "Test Blog Post", filters={"name": blog_post2.name}, ignore_permissions=False
    ).run()
    self.assertEqual(len(result), 1, "User should see post2 via share")
|
|
|
|
def test_role_permission_without_restrictions(self):
    """Test that all documents are accessible when role permissions exist without if_owner/user_perms.

    Each cleanup is registered right after the step it undoes, so a failure
    part-way through setup does not leak the role or the posts into later
    tests. Cleanups run in reverse order of registration.
    """
    from frappe.core.page.permission_manager.permission_manager import update
    from frappe.permissions import clear_user_permissions_for_doctype

    test_user = "test2@example.com"
    test_user_doc = frappe.get_doc("User", test_user)

    test_user_doc.add_roles("Blogger")
    self.addCleanup(lambda: test_user_doc.remove_roles("Blogger"))

    # Clear any user permissions
    clear_user_permissions_for_doctype("Test Blog Post", test_user)

    # Ensure if_owner is disabled
    update("Test Blog Post", "Blogger", 0, "if_owner", 0)

    # Create blog posts owned by Administrator
    blog_post1 = frappe.get_doc(
        doctype="Test Blog Post",
        title="No Restriction Test 1",
        content="Test Content",
        blog_category="_Test Blog Category",
    ).insert(ignore_permissions=True, ignore_mandatory=True)
    self.addCleanup(blog_post1.delete)

    blog_post2 = frappe.get_doc(
        doctype="Test Blog Post",
        title="No Restriction Test 2",
        content="Test Content",
        blog_category="_Test Blog Category",
    ).insert(ignore_permissions=True, ignore_mandatory=True)
    self.addCleanup(blog_post2.delete)

    # Registered last so it runs first: the cleanups above then run as Administrator.
    self.addCleanup(lambda: frappe.set_user("Administrator"))

    # User should see both posts (no restrictions)
    frappe.set_user(test_user)
    result = frappe.qb.get_query(
        "Test Blog Post",
        filters={"name": ["in", [blog_post1.name, blog_post2.name]]},
        ignore_permissions=False,
    ).run()
    self.assertEqual(len(result), 2, "User should see all posts without restrictions")
|
|
|
|
def test_child_table_permission_uses_parent_doctype(self):
    """Test that child table queries use parent doctype for permission checks.

    DocField is a child table of DocType. When it is queried with
    ``parent_doctype="DocType"``, permissions should be checked against
    DocType: the query raises PermissionError without the System Manager
    role and succeeds once the role is added.
    """
    test_user = "test2@example.com"
    test_user_doc = frappe.get_doc("User", test_user)
    self.ensure_system_manager(test_user_doc, should_have=False)
    self.addCleanup(lambda: frappe.set_user("Administrator"))

    frappe.set_user(test_user)

    # Query child table with parent_doctype - should use DocType's permissions
    with self.assertRaises(frappe.PermissionError):
        frappe.qb.get_query(
            "DocField", fields=["name"], parent_doctype="DocType", ignore_permissions=False
        ).run()

    # Give user read access to DocType
    frappe.set_user("Administrator")
    test_user_doc.add_roles("System Manager")

    frappe.set_user(test_user)
    # Now query should succeed
    result = frappe.qb.get_query(
        "DocField", fields=["name"], parent_doctype="DocType", ignore_permissions=False, limit=1
    ).run()

    # The previous check, `len(result) >= 0`, could never fail. Assert the
    # result shape and the limit=1 cap instead.
    self.assertIsInstance(result, (list, tuple), "Query should succeed with proper permissions")
    self.assertLessEqual(len(result), 1, "limit=1 should cap the result at one row")
|
|
|
|
def test_child_table_filters_orphaned_rows(self):
    """Child rows whose parent does not exist are excluded from parent_doctype queries."""
    querying_user = "test2@example.com"
    self.ensure_system_manager(frappe.get_doc("User", querying_user), should_have=True)

    # Insert a DefaultValue row that points at a parent User that does not exist.
    insert_orphan = """
        INSERT INTO `tabDefaultValue` (name, parent, parenttype, parentfield, defkey, defvalue)
        VALUES ('_test_orphan_row', '_non_existent_parent', 'User', 'defaults', 'test_key', 'test_value')
    """
    frappe.db.sql(insert_orphan)
    self.addCleanup(
        lambda: frappe.db.sql("DELETE FROM `tabDefaultValue` WHERE name = '_test_orphan_row'")
    )
    self.addCleanup(lambda: frappe.set_user("Administrator"))

    frappe.set_user(querying_user)

    # With parent_doctype set, the orphaned row should be filtered out by the inner join.
    orphan_query = frappe.qb.get_query(
        "DefaultValue",
        fields=["name"],
        filters={"name": "_test_orphan_row"},
        parent_doctype="User",
        ignore_permissions=False,
    )
    rows = orphan_query.run()
    self.assertEqual(len(rows), 0, "Orphaned child row should be filtered out")
|
|
|
|
def test_child_table_of_single_doctype(self):
    """Query a child table whose parent is a Single doctype.

    Single doctypes have no physical table, so there is nothing to join to;
    the query is expected to run without that join.
    """
    querying_user = "test2@example.com"
    self.ensure_system_manager(frappe.get_doc("User", querying_user), should_have=True)
    self.addCleanup(lambda: frappe.set_user("Administrator"))

    frappe.set_user(querying_user)

    # "Logs To Clear" is the child table of the Single doctype "Log Settings".
    child_query = frappe.qb.get_query(
        "Logs To Clear",
        fields=["name", "ref_doctype", "days"],
        parent_doctype="Log Settings",
        ignore_permissions=False,
    )
    rows = child_query.run()

    # Only the result type is checked: the table may be empty if no logs are configured.
    self.assertIsInstance(rows, (list, tuple), "Query should return results without SQL error")
|
|
|
|
def test_child_table_of_single_doctype_without_permission(self):
    """Permission checks still apply to child tables of Single doctypes."""
    querying_user = "test2@example.com"
    self.ensure_system_manager(frappe.get_doc("User", querying_user), should_have=False)
    self.addCleanup(lambda: frappe.set_user("Administrator"))

    frappe.set_user(querying_user)

    # Without the System Manager role, reading Log Settings children must be refused.
    child_query_args = {
        "fields": ["name"],
        "parent_doctype": "Log Settings",
        "ignore_permissions": False,
    }
    with self.assertRaises(frappe.PermissionError):
        frappe.qb.get_query("Logs To Clear", **child_query_args).run()
|
|
|
|
def test_combined_raw_criterion_precedence(self):
    """CombinedRawCriterion must wrap an OR combination in parentheses.

    Permission conditions (such as permission_query_conditions) are OR-ed with
    the shared-docs condition. Unless the whole expression is parenthesized,
    operator precedence lets the shared condition escape the other filters:

        WHERE filter=X AND perm_cond OR shared_cond    -- shared_cond ignores filter!
        WHERE filter=X AND (perm_cond OR shared_cond)  -- correct behavior
    """
    from frappe.database.query import CombinedRawCriterion, RawCriterion

    either = RawCriterion("a = 1") | RawCriterion("b = 2")
    self.assertIsInstance(either, CombinedRawCriterion)

    # Expected shape: ((a = 1) OR (b = 2))
    either_sql = either.get_sql()
    self.assertTrue(either_sql.startswith("(("), f"Should start with '((' but got: {either_sql}")
    self.assertTrue(either_sql.endswith("))"), f"Should end with '))' but got: {either_sql}")

    # AND-ing a third criterion onto the OR group must keep both operators.
    nested_sql = (either & RawCriterion("c = 3")).get_sql()
    for keyword in ("OR", "AND"):
        self.assertIn(keyword, nested_sql)
|
|
|
|
def test_permission_query_conditions_with_filter(self):
    """Explicit filters still apply when permission conditions and shares are OR-ed.

    Regression test for the CombinedRawCriterion fix: a shared-doc condition
    must not bypass an explicit filter.
    """
    querying_user = "test2@example.com"
    self.ensure_system_manager(frappe.get_doc("User", querying_user), should_have=True)
    self.addCleanup(lambda: frappe.set_user("Administrator"))

    frappe.set_user(querying_user)

    # The User doctype has a permission_query_conditions hook, and
    # test2@example.com is shared their own User doc. The name filter
    # below should NOT match any row.
    no_match_filter = {"name": "_non_existent_user_12345"}
    rows = frappe.qb.get_query(
        "User",
        fields=["name"],
        filters=no_match_filter,
        ignore_permissions=False,
    ).run()

    # The shared access to the user's own doc must not leak past the filter.
    self.assertEqual(len(rows), 0, "Filter should not be bypassed by shared doc OR condition")
|
|
|
|
@run_only_if(db_type_is.POSTGRES)
def test_order_by_group_by_postgres(self):
    """PostgreSQL only: check how ORDER BY fields are rendered when combined with GROUP BY."""

    def build_sql(**query_args):
        return frappe.qb.get_query("User", **query_args).get_sql()

    exact_cases = (
        # ORDER BY field is already in GROUP BY: no aggregate needed.
        (
            {
                "fields": ["creation as created_date", {"COUNT": "*"}],
                "group_by": "created_date",
                "order_by": "created_date",
            },
            'SELECT "creation" "created_date",COUNT(*) FROM "tabUser" GROUP BY "created_date" ORDER BY "created_date" DESC',
        ),
        # ORDER BY field is not in GROUP BY: wrapped in MAX().
        (
            {
                "fields": ["creation as created_date", {"COUNT": "*"}],
                "group_by": "created_date",
                "order_by": "name",
            },
            'SELECT "creation" "created_date",COUNT(*) FROM "tabUser" GROUP BY "created_date" ORDER BY MAX("name") DESC',
        ),
        # Two aliased GROUP BY fields, ordering by one of the aliases.
        (
            {
                "fields": ["user_type as type", "enabled as status", {"COUNT": "*"}],
                "group_by": "type, status",
                "order_by": "status asc",
            },
            'SELECT "user_type" "type","enabled" "status",COUNT(*) FROM "tabUser" GROUP BY "type","status" ORDER BY "status" ASC',
        ),
        # No double aggregation: ordering by an aggregate alias is left alone.
        (
            {
                "fields": ["creation", {"COUNT": "*", "as": "total"}],
                "group_by": "creation",
                "order_by": "total desc",
            },
            'SELECT "creation",COUNT(*) "total" FROM "tabUser" GROUP BY "creation" ORDER BY "total" DESC',
        ),
    )
    for query_args, expected_sql in exact_cases:
        self.assertQueryEqual(build_sql(**query_args), expected_sql)

    # Several ORDER BY fields, none of them in GROUP BY.
    generated = build_sql(
        fields=["user_type", {"COUNT": "*"}],
        group_by="user_type",
        order_by="creation desc, modified asc",
    )
    for fragment in ('MAX("creation") DESC', 'MAX("modified") ASC'):
        self.assertIn(fragment, generated)

    # Aggregate selected without any GROUP BY (these queries are redundant
    # but exist in some parts of the codebase).
    generated = build_sql(fields=[{"COUNT": "*", "as": "result"}], order_by="creation desc")
    self.assertQueryEqual(generated, 'SELECT COUNT(*) "result" FROM "tabUser" ORDER BY MAX("creation") DESC')

    # Using the original column name instead of the alias (or mixing both)
    # in GROUP BY / ORDER BY must return the same rows.
    def run_aliased(group_by, order_by):
        return frappe.qb.get_query(
            "User", fields=["name as user_name"], group_by=group_by, order_by=order_by
        ).run()

    baseline = run_aliased("user_name", "user_name")
    variants = [
        run_aliased("name", "name"),
        run_aliased("name", "user_name"),
        run_aliased("user_name", "name"),
    ]
    for rows in variants:
        self.assertEqual(baseline, rows, "Query result mismatch detected.")
|
|
|
|
@run_only_if(db_type_is.POSTGRES)
def test_ifnull_fallback_postgres(self):
    """PostgreSQL only: check the ifnull fallback values for two Patch Log fields."""
    from frappe.database.query import Engine

    query_engine = Engine()
    expected_fallbacks = (
        ("skipped", "0"),
        ("patch", "''"),
    )
    for fieldname, fallback in expected_fallbacks:
        self.assertEqual(query_engine._get_ifnull_fallback("Patch Log", fieldname), fallback)
|
|
|
|
@run_only_if(db_type_is.MARIADB)
def test_drop_unique_constraint_for_deleted_fields_mariadb(self):
    """MariaDB only: removing a unique field from a DocType removes its unique index."""
    field_defs = [
        {
            "fieldname": "field_one",
            "fieldtype": "Data",
            "label": "Field One",
        },
        {
            "fieldname": "field_two",
            "fieldtype": "Data",
            "label": "Field Two",
            "unique": 1,
        },
    ]
    doctype = new_doctype("Trial Doctype", fields=field_defs)
    doctype.insert(ignore_if_duplicate=True)

    # The unique index exists while the field is part of the DocType.
    self.assertTrue(frappe.db.get_column_index("tabTrial Doctype", "field_two", unique=True))

    # Drop the unique field and save.
    unique_field = next(
        (candidate for candidate in doctype.fields if candidate.fieldname == "field_two"),
        None,
    )
    doctype.fields.remove(unique_field)
    doctype.save()

    # The index must be gone once the field has been removed.
    self.assertFalse(frappe.db.get_column_index("tabTrial Doctype", "field_two", unique=True))
|
|
|
|
@run_only_if(db_type_is.POSTGRES)
def test_drop_unique_constraint_and_indexes_for_deleted_fields_postgres(self):
    """PostgreSQL only: removing a unique field removes its unique index.

    Two cases are covered: a field that is unique at creation time, and a
    field made unique by a later alteration.
    """
    doctype = new_doctype(
        "Trial Doctype",
        fields=[
            {
                "fieldname": "field_one",
                "fieldtype": "Data",
                "label": "Field One",
            },
            {
                "fieldname": "field_two",
                "fieldtype": "Data",
                "label": "Field Two",
                "unique": 1,
            },
        ],
    )
    doctype.insert(ignore_if_duplicate=True)

    def find_index(index_name):
        # Look the index up in pg_indexes for this doctype's table.
        return frappe.db.sql(
            """
            SELECT 1
            FROM pg_indexes
            WHERE tablename = %s
            AND indexname = %s
            """,
            (
                f"tab{doctype.name}",
                index_name,
            ),
        )

    def drop_field(fieldname):
        # Remove the first field with this name from the doctype and save it.
        target = next(
            (candidate for candidate in doctype.fields if candidate.fieldname == fieldname),
            None,
        )
        doctype.fields.remove(target)
        doctype.save()

    # Case 1: unique index backed by a constraint, created with the field.
    self.assertTrue(find_index(f"tab{doctype.name}_field_two_key"))
    drop_field("field_two")
    self.assertFalse(find_index(f"tab{doctype.name}_field_two_key"))

    # Case 2: unique index not backed by a constraint, created by altering
    # the field after the doctype already existed.
    for existing in doctype.fields:
        if existing.fieldname == "field_one":
            existing.unique = 1

    doctype.save()

    self.assertTrue(find_index("unique_field_one"))
    drop_field("field_one")
    self.assertFalse(find_index("unique_field_one"))
|
|
|
|
def test_limit_offset_query(self):
    """Check how OFFSET is rendered, with and without an explicit limit.

    On non-Postgres backends an offset without a limit is emitted together
    with LIMIT MAX_LIMIT; on Postgres the OFFSET is emitted on its own.
    """
    from frappe.database.query import MAX_LIMIT

    offset_only_sql = frappe.qb.get_query("Doctype", offset=10).get_sql()

    if frappe.db.db_type == "postgres":
        self.assertNotIn("LIMIT", offset_only_sql)
        self.assertIn("OFFSET 10", offset_only_sql)
    else:
        self.assertIn(f"LIMIT {MAX_LIMIT} OFFSET 10", offset_only_sql)

        # An explicit limit is kept as given.
        limited_sql = frappe.qb.get_query("Doctype", limit=10, offset=10).get_sql()
        self.assertIn("LIMIT 10 OFFSET 10", limited_sql)
|
|
|
|
|
|
# Used as a permission query condition hook; it is not a test case itself.
def test_permission_hook_condition(user):
    """Return a fixed SQL condition on `tabDashboard Settings`; ``user`` is ignored."""
    condition = "`tabDashboard Settings`.`name` = 'Administrator'"
    return condition
|