fix: add support for permission query conditions

This commit is contained in:
Faris Ansari 2025-03-03 20:10:41 +05:30
parent b3a05896ea
commit a94c143314
2 changed files with 101 additions and 2 deletions

View file

@ -2,10 +2,11 @@ import re
from ast import literal_eval
from functools import lru_cache
from types import BuiltinFunctionType
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Any, TypeAlias
import sqlparse
from pypika.queries import QueryBuilder, Table
from pypika.terms import Term
import frappe
from frappe import _
@ -492,6 +493,10 @@ class Engine:
conditions.extend(user_perm_conditions)
fetch_shared_docs = fetch_shared_docs or fetch_shared
permission_query_conditions = self.get_permission_query_conditions()
if permission_query_conditions:
conditions.extend(permission_query_conditions)
shared_docs = []
if fetch_shared_docs:
shared_docs = frappe.share.get_shared(self.doctype, self.user)
@ -500,13 +505,33 @@ class Engine:
shared_condition = self.table.name.isin(shared_docs)
if conditions:
# (permission conditions) OR (shared condition)
self.query = self.query.where(Criterion.any(conditions) | shared_condition)
self.query = self.query.where(Criterion.all(conditions) | shared_condition)
else:
self.query = self.query.where(shared_condition)
elif conditions:
# AND all permission conditions
self.query = self.query.where(Criterion.all(conditions))
def get_permission_query_conditions(self):
"""Add permission query conditions from hooks and server scripts"""
from frappe.core.doctype.server_script.server_script_utils import get_server_script_map
conditions = []
hooks = frappe.get_hooks("permission_query_conditions", {})
condition_methods = hooks.get(self.doctype, []) + hooks.get("*", [])
for method in condition_methods:
if c := frappe.call(frappe.get_attr(method), self.user, doctype=self.doctype):
conditions.append(RawCriterion(c))
# Get conditions from server scripts
if permission_script_name := get_server_script_map().get("permission_query", {}).get(self.doctype):
script = frappe.get_doc("Server Script", permission_script_name)
if condition := script.get_permission_query_conditions(self.user):
conditions.append(RawCriterion(condition))
return conditions
def get_permission_type(self, doctype) -> str:
"""Get permission type (select/read) based on user permissions"""
if frappe.only_has_select_perm(doctype, user=self.user):
@ -754,3 +779,18 @@ def _sanitize_field(field: str, is_mariadb):
if is_mariadb:
return MARIADB_SPECIFIC_COMMENT.sub("", stripped_field)
return stripped_field
class RawCriterion(Term):
"""A class to represent raw SQL string as a criterion.
Allows using raw SQL strings in pypika queries:
frappe.qb.from_("DocType").where(RawCriterion("name like 'a%'"))
"""
def __init__(self, sql_string: str):
self.sql_string = sql_string
super().__init__()
def get_sql(self, **kwargs: Any) -> str:
return self.sql_string

View file

@ -596,3 +596,62 @@ class TestQuery(IntegrationTestCase):
result = frappe.qb.get_query("DocType", filters={"autoname": ["is", "set"]}).run(as_dict=1)
self.assertFalse(any(d.name == "Property Setter" for d in result))
def test_permission_query_condition(self):
"""Test permission query condition being applied from hooks and server script"""
from frappe.desk.doctype.dashboard_settings.dashboard_settings import create_dashboard_settings
# Create a Dashboard Settings for test user
self.doctype = "Dashboard Settings"
self.user = "test@example.com"
# First check without custom permission query condition
original_hooks = frappe.get_hooks("permission_query_conditions") or {}
# Clear any hooks temporarily
frappe.clear_cache()
frappe.hooks.permission_query_conditions = {}
# Create test data
create_dashboard_settings(self.user)
# Register the hook for Dashboard Settings
frappe.clear_cache()
frappe.hooks.permission_query_conditions = {
"Dashboard Settings": ["frappe.tests.test_query.test_permission_hook_condition"]
}
# Hook condition will restrict to only name=Administrator, so our test user's record should not be found
query = frappe.qb.get_query("Dashboard Settings", user=self.user, ignore_permissions=False)
data = query.run(as_dict=1)
self.assertEqual(len(data), 0)
# Create a server script for permission query
script = frappe.new_doc(
doctype="Server Script",
name="Dashboard Settings Permission Query",
script_type="Permission Query",
enabled=1,
reference_doctype="Dashboard Settings",
script=f"""conditions = '`tabDashboard Settings`.`name` = "{self.user}"'""",
).insert()
# Test with server script
# Script condition should allow the record to be found
frappe.clear_cache()
frappe.hooks.permission_query_conditions = {} # Clear hooks to test server script alone
data = frappe.qb.get_query("Dashboard Settings", user=self.user, ignore_permissions=False).run(
as_dict=1
)
self.assertEqual(len(data), 1)
# Cleanup
script.delete()
frappe.clear_cache()
frappe.hooks.permission_query_conditions = original_hooks
# This function is used as a permission query condition hook
def test_permission_hook_condition(user):
return "`tabDashboard Settings`.`name` = 'Administrator'"