From a94c14331443b2e3131c7a73b363571db186102f Mon Sep 17 00:00:00 2001 From: Faris Ansari Date: Mon, 3 Mar 2025 20:10:41 +0530 Subject: [PATCH] fix: add support for permission query conditions --- frappe/database/query.py | 44 ++++++++++++++++++++++++++-- frappe/tests/test_query.py | 59 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 101 insertions(+), 2 deletions(-) diff --git a/frappe/database/query.py b/frappe/database/query.py index 045c4dd174..bf24553d1a 100644 --- a/frappe/database/query.py +++ b/frappe/database/query.py @@ -2,10 +2,11 @@ import re from ast import literal_eval from functools import lru_cache from types import BuiltinFunctionType -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Any, TypeAlias import sqlparse from pypika.queries import QueryBuilder, Table +from pypika.terms import Term import frappe from frappe import _ @@ -492,6 +493,10 @@ class Engine: conditions.extend(user_perm_conditions) fetch_shared_docs = fetch_shared_docs or fetch_shared + permission_query_conditions = self.get_permission_query_conditions() + if permission_query_conditions: + conditions.extend(permission_query_conditions) + shared_docs = [] if fetch_shared_docs: shared_docs = frappe.share.get_shared(self.doctype, self.user) @@ -500,13 +505,33 @@ class Engine: shared_condition = self.table.name.isin(shared_docs) if conditions: # (permission conditions) OR (shared condition) - self.query = self.query.where(Criterion.any(conditions) | shared_condition) + self.query = self.query.where(Criterion.all(conditions) | shared_condition) else: self.query = self.query.where(shared_condition) elif conditions: # AND all permission conditions self.query = self.query.where(Criterion.all(conditions)) + def get_permission_query_conditions(self): + """Add permission query conditions from hooks and server scripts""" + from frappe.core.doctype.server_script.server_script_utils import get_server_script_map + + conditions = [] + hooks = frappe.get_hooks("permission_query_conditions", {}) + condition_methods = hooks.get(self.doctype, []) + hooks.get("*", []) + + for method in condition_methods: + if c := frappe.call(frappe.get_attr(method), self.user, doctype=self.doctype): + conditions.append(RawCriterion(c)) + + # Get conditions from server scripts + if permission_script_name := get_server_script_map().get("permission_query", {}).get(self.doctype): + script = frappe.get_doc("Server Script", permission_script_name) + if condition := script.get_permission_query_conditions(self.user): + conditions.append(RawCriterion(condition)) + + return conditions + def get_permission_type(self, doctype) -> str: """Get permission type (select/read) based on user permissions""" if frappe.only_has_select_perm(doctype, user=self.user): @@ -754,3 +779,18 @@ def _sanitize_field(field: str, is_mariadb): if is_mariadb: return MARIADB_SPECIFIC_COMMENT.sub("", stripped_field) return stripped_field + + +class RawCriterion(Term): + """A class to represent raw SQL string as a criterion. + + Allows using raw SQL strings in pypika queries: + frappe.qb.from_("DocType").where(RawCriterion("name like 'a%'")) + """ + + def __init__(self, sql_string: str): + self.sql_string = sql_string + super().__init__() + + def get_sql(self, **kwargs: Any) -> str: + return self.sql_string diff --git a/frappe/tests/test_query.py b/frappe/tests/test_query.py index 0b0a9e86a0..83d9d7a144 100644 --- a/frappe/tests/test_query.py +++ b/frappe/tests/test_query.py @@ -596,3 +596,62 @@ class TestQuery(IntegrationTestCase): result = frappe.qb.get_query("DocType", filters={"autoname": ["is", "set"]}).run(as_dict=1) self.assertFalse(any(d.name == "Property Setter" for d in result)) + + def test_permission_query_condition(self): + """Test permission query condition being applied from hooks and server script""" + from frappe.desk.doctype.dashboard_settings.dashboard_settings import create_dashboard_settings + + # Create a Dashboard Settings for test user + self.doctype = "Dashboard Settings" + self.user = "test@example.com" + + # First check without custom permission query condition + original_hooks = frappe.get_hooks("permission_query_conditions") or {} + + # Clear any hooks temporarily + frappe.clear_cache() + frappe.hooks.permission_query_conditions = {} + + # Create test data + create_dashboard_settings(self.user) + + # Register the hook for Dashboard Settings + frappe.clear_cache() + frappe.hooks.permission_query_conditions = { + "Dashboard Settings": ["frappe.tests.test_query.test_permission_hook_condition"] + } + + # Hook condition will restrict to only name=Administrator, so our test user's record should not be found + query = frappe.qb.get_query("Dashboard Settings", user=self.user, ignore_permissions=False) + data = query.run(as_dict=1) + self.assertEqual(len(data), 0) + + # Create a server script for permission query + script = frappe.new_doc( + doctype="Server Script", + name="Dashboard Settings Permission Query", + script_type="Permission Query", + enabled=1, + reference_doctype="Dashboard Settings", + script=f"""conditions = '`tabDashboard Settings`.`name` = "{self.user}"'""", + ).insert() + + # Test with server script + # Script condition should allow the record to be found + frappe.clear_cache() + frappe.hooks.permission_query_conditions = {} # Clear hooks to test server script alone + + data = frappe.qb.get_query("Dashboard Settings", user=self.user, ignore_permissions=False).run( + as_dict=1 + ) + self.assertEqual(len(data), 1) + + # Cleanup + script.delete() + frappe.clear_cache() + frappe.hooks.permission_query_conditions = original_hooks + + +# This function is used as a permission query condition hook +def test_permission_hook_condition(user): + return "`tabDashboard Settings`.`name` = 'Administrator'"