2271 lines
77 KiB
Python
2271 lines
77 KiB
Python
import datetime
|
|
import re
|
|
from functools import lru_cache
|
|
from typing import TYPE_CHECKING, Any
|
|
|
|
import sqlparse
|
|
from pypika.enums import Arithmetic
|
|
from pypika.queries import Column, QueryBuilder, Table
|
|
from pypika.terms import AggregateFunction, ArithmeticExpression, Term, ValueWrapper
|
|
|
|
import frappe
|
|
from frappe import _
|
|
from frappe.database.operator_map import NESTED_SET_OPERATORS, OPERATOR_MAP
|
|
from frappe.database.utils import (
|
|
DefaultOrderBy,
|
|
FilterValue,
|
|
convert_to_value,
|
|
get_doctype_name,
|
|
get_doctype_sort_info,
|
|
)
|
|
from frappe.model import get_permitted_fields
|
|
from frappe.model.base_document import DOCTYPES_FOR_DOCTYPE
|
|
from frappe.query_builder import Criterion, Field, Order, functions
|
|
from frappe.query_builder.custom import Month, MonthName, Quarter
|
|
from frappe.query_builder.utils import PseudoColumnMapper
|
|
from frappe.utils.data import MARIADB_SPECIFIC_COMMENT
|
|
|
|
CORE_DOCTYPES = DOCTYPES_FOR_DOCTYPE | frozenset(
|
|
("Custom Field", "Property Setter", "Module Def", "__Auth", "__global_search", "Singles")
|
|
)
|
|
|
|
|
|
def _apply_date_field_filter_conversion(value, operator: str, doctype: str, field):
|
|
"""Apply datetime to date conversion for Date fieldtype filters.
|
|
|
|
This matches db_query behavior where datetime values are truncated to dates
|
|
when filtering on Date fields, for all operators (not just 'between').
|
|
|
|
Args:
|
|
value: The filter value (can be datetime, tuple of datetimes, or other)
|
|
operator: The operator being used (between, >, <, etc.)
|
|
doctype: The doctype to get field metadata from
|
|
field: The field name or pypika Field object
|
|
|
|
Returns:
|
|
The converted value with datetimes converted to dates if field is Date type
|
|
"""
|
|
try:
|
|
# Extract field name
|
|
if "." in str(field):
|
|
field = field.split(".")[-1]
|
|
|
|
# Skip querying meta for core doctypes to avoid recursion
|
|
if doctype in CORE_DOCTYPES:
|
|
meta = None
|
|
else:
|
|
meta = frappe.get_meta(doctype)
|
|
|
|
if meta is None:
|
|
return value
|
|
|
|
df = meta.get_field(field)
|
|
if df is None or df.fieldtype != "Date":
|
|
return value
|
|
|
|
# Convert datetime to date if the fieldtype is date
|
|
if operator.lower() == "between" and isinstance(value, list | tuple) and len(value) == 2:
|
|
from_val, to_val = value
|
|
if isinstance(from_val, datetime.datetime):
|
|
from_val = from_val.date()
|
|
if isinstance(to_val, datetime.datetime):
|
|
to_val = to_val.date()
|
|
return (from_val, to_val)
|
|
elif isinstance(value, datetime.datetime):
|
|
return value.date()
|
|
|
|
except (AttributeError, TypeError, KeyError):
|
|
pass
|
|
|
|
return value
|
|
|
|
|
|
OPTIONAL_COLUMNS = frozenset(["_user_tags", "_comments", "_assign", "_liked_by", "_seen"])
|
|
|
|
if TYPE_CHECKING:
|
|
from frappe.query_builder import DocType
|
|
|
|
TAB_PATTERN = re.compile("^tab")
|
|
WORDS_PATTERN = re.compile(r"\w+")
|
|
COMMA_PATTERN = re.compile(r",\s*(?![^()]*\))")
|
|
|
|
# less restrictive version of frappe.core.doctype.doctype.doctype.START_WITH_LETTERS_PATTERN
|
|
# to allow table names like __Auth
|
|
TABLE_NAME_PATTERN = re.compile(r"^[\w -]*$", flags=re.ASCII)
|
|
|
|
|
|
def _is_function_call(field_str: str) -> bool:
|
|
"""Check if a string is a SQL function call using sqlparse."""
|
|
parsed = sqlparse.parse(field_str.strip())
|
|
if not parsed:
|
|
return False
|
|
|
|
return any(isinstance(token, sqlparse.sql.Function) for token in parsed[0].tokens)
|
|
|
|
|
|
# Pattern to validate field names in SELECT:
|
|
# Allows: name, `name`, name as alias, `name` as alias, table.name, table.name as alias
|
|
# Also allows backtick-qualified identifiers with spaces/hyphens:
|
|
# - `tabTable`.`field`
|
|
# - `tabTable Name`.`field` (spaces in table name)
|
|
# - `tabTable-Field`.`field` (hyphens in table name)
|
|
# - Any of above with aliases: ... as alias
|
|
ALLOWED_FIELD_PATTERN = re.compile(
|
|
r"^(?:(`[\w\s-]+`|\w+)\.)?(`[\w\s-]+`|\w+)(?:\s+as\s+(?:`[\w\s-]+`|\w+))?$",
|
|
flags=re.ASCII | re.IGNORECASE,
|
|
)
|
|
|
|
# Regex to parse field names:
|
|
# Group 1: Optional quote for table name
|
|
# Group 2: Optional table name (e.g., `tabDocType` or tabDocType or `tabNote Seen By`)
|
|
# Group 3: Optional quote for field name
|
|
# Group 4: Field name (e.g., `field` or field)
|
|
FIELD_PARSE_REGEX = re.compile(r"^(?:([`\"]?)(tab[\w\s-]+)\1\.)?([`\"]?)(\w+)\3$")
|
|
|
|
# Direct mapping from uppercase function names to pypika function classes
|
|
FUNCTION_MAPPING = {
|
|
"COUNT": functions.Count,
|
|
"SUM": functions.Sum,
|
|
"AVG": functions.Avg,
|
|
"MAX": functions.Max,
|
|
"MIN": functions.Min,
|
|
"ABS": functions.Abs,
|
|
"EXTRACT": functions.Extract,
|
|
"LOCATE": functions.Locate,
|
|
"TIMESTAMP": functions.Timestamp,
|
|
"IFNULL": functions.IfNull,
|
|
"CONCAT": functions.Concat,
|
|
"NOW": functions.Now,
|
|
"NULLIF": functions.NullIf,
|
|
"MONTHNAME": MonthName,
|
|
"QUARTER": Quarter,
|
|
"MONTH": Month,
|
|
}
|
|
|
|
# Mapping from operator names to pypika Arithmetic enum values
|
|
# Operators use dict format: {"ADD": [left, right], "as": "alias"}
|
|
# Supported: ADD (+), SUB (-), MUL (*), DIV (/)
|
|
# Can be nested with functions: {"DIV": [1, {"NULLIF": ["value", 0]}]}
|
|
OPERATOR_MAPPING = {
|
|
"ADD": Arithmetic.add,
|
|
"SUB": Arithmetic.sub,
|
|
"MUL": Arithmetic.mul,
|
|
"DIV": Arithmetic.div,
|
|
}
|
|
|
|
|
|
class Engine:
|
|
def get_query(
|
|
self,
|
|
table: str | Table,
|
|
fields: str | list | tuple | None = None,
|
|
filters: dict[str, FilterValue] | FilterValue | list[list | FilterValue] | None = None,
|
|
order_by: str | None = None,
|
|
group_by: str | None = None,
|
|
limit: int | None = None,
|
|
offset: int | None = None,
|
|
distinct: bool = False,
|
|
for_update: bool = False,
|
|
update: bool = False,
|
|
into: bool = False,
|
|
delete: bool = False,
|
|
*,
|
|
validate_filters: bool = False,
|
|
skip_locked: bool = False,
|
|
wait: bool = True,
|
|
ignore_permissions: bool = True,
|
|
user: str | None = None,
|
|
parent_doctype: str | None = None,
|
|
reference_doctype: str | None = None,
|
|
or_filters: dict[str, FilterValue] | FilterValue | list[list | FilterValue] | None = None,
|
|
db_query_compat: bool = False,
|
|
) -> QueryBuilder:
|
|
"""Build a query with optional compatibility mode for legacy db_query behavior.
|
|
|
|
Args:
|
|
db_query_compat: When True, uses legacy db_query behavior for sorting and filtering.
|
|
This is kept optional to not break existing code that relies on the original query builder behaviour.
|
|
"""
|
|
|
|
qb = frappe.local.qb
|
|
db_type = frappe.local.db.db_type
|
|
|
|
self.is_mariadb = db_type == "mariadb"
|
|
self.is_postgres = db_type == "postgres"
|
|
self.is_sqlite = db_type == "sqlite"
|
|
self.validate_filters = validate_filters
|
|
self.user = user or frappe.session.user
|
|
self.parent_doctype = parent_doctype
|
|
self.reference_doctype = reference_doctype
|
|
self.apply_permissions = not ignore_permissions
|
|
self.function_aliases = set()
|
|
self.field_aliases = set()
|
|
self.db_query_compat = db_query_compat
|
|
|
|
if isinstance(table, Table):
|
|
self.table = table
|
|
self.doctype = get_doctype_name(table.get_sql())
|
|
else:
|
|
self.doctype = table
|
|
self.validate_doctype()
|
|
self.table = qb.DocType(table)
|
|
|
|
if self.apply_permissions:
|
|
self.check_read_permission()
|
|
|
|
if update:
|
|
self.query = qb.update(self.table, immutable=False)
|
|
elif into:
|
|
self.query = qb.into(self.table, immutable=False)
|
|
elif delete:
|
|
self.query = qb.from_(self.table, immutable=False).delete()
|
|
else:
|
|
self.query = qb.from_(self.table, immutable=False)
|
|
self.apply_fields(fields)
|
|
|
|
self.apply_filters(filters)
|
|
self.apply_or_filters(or_filters)
|
|
|
|
if limit:
|
|
if not isinstance(limit, int) or limit < 0:
|
|
frappe.throw(_("Limit must be a non-negative integer"), TypeError)
|
|
self.query = self.query.limit(limit)
|
|
|
|
if offset:
|
|
if not isinstance(offset, int) or offset < 0:
|
|
frappe.throw(_("Offset must be a non-negative integer"), TypeError)
|
|
self.query = self.query.offset(offset)
|
|
|
|
if distinct:
|
|
self.query = self.query.distinct()
|
|
|
|
if for_update:
|
|
self.query = self.query.for_update(skip_locked=skip_locked, nowait=not wait)
|
|
|
|
if group_by:
|
|
self.apply_group_by(group_by)
|
|
|
|
if order_by:
|
|
self.apply_order_by(order_by)
|
|
|
|
if self.apply_permissions:
|
|
self.add_permission_conditions()
|
|
|
|
self.query.immutable = True
|
|
return self.query
|
|
|
|
def validate_doctype(self):
|
|
if not TABLE_NAME_PATTERN.match(self.doctype):
|
|
frappe.throw(_("Invalid DocType: {0}").format(self.doctype))
|
|
|
|
def apply_fields(self, fields):
|
|
self.fields = self.parse_fields(fields)
|
|
|
|
# Track field aliases for use in group_by/order_by
|
|
for field in self.fields:
|
|
if isinstance(field, Field) and field.alias:
|
|
self.field_aliases.add(field.alias)
|
|
|
|
if self.apply_permissions:
|
|
self.fields = self.apply_field_permissions()
|
|
|
|
if not self.fields:
|
|
self.fields = [self.table.name]
|
|
|
|
self.query._child_queries = []
|
|
for field in self.fields:
|
|
if isinstance(field, DynamicTableField):
|
|
self.query = field.apply_select(self.query)
|
|
elif isinstance(field, ChildQuery):
|
|
self.query._child_queries.append(field)
|
|
else:
|
|
self.query = self.query.select(field)
|
|
|
|
def apply_filters(
|
|
self,
|
|
filters: dict[str, FilterValue] | FilterValue | list[list | FilterValue] | None = None,
|
|
collect: list | None = None,
|
|
):
|
|
if filters is None:
|
|
return
|
|
|
|
if isinstance(filters, FilterValue):
|
|
filters = {"name": convert_to_value(filters)}
|
|
|
|
if isinstance(filters, Criterion):
|
|
self.query = self.query.where(filters)
|
|
return
|
|
|
|
if isinstance(filters, dict):
|
|
self.apply_dict_filters(filters, collect=collect)
|
|
return
|
|
|
|
if isinstance(filters, list | tuple):
|
|
if not filters:
|
|
return
|
|
|
|
# 1. Handle special case: list of names -> name IN (...)
|
|
if all(isinstance(d, FilterValue) for d in filters):
|
|
self.apply_dict_filters(
|
|
{"name": ("in", tuple(convert_to_value(f) for f in filters))}, collect=collect
|
|
)
|
|
return
|
|
|
|
# 2. Check for nested logic format [cond, op, cond, ...] or [[cond, op, cond]]
|
|
is_nested_structure = False
|
|
potential_nested_list = filters
|
|
is_single_group = False
|
|
|
|
# Check for single grouped condition [[cond_a, op, cond_b]]
|
|
if len(filters) == 1 and isinstance(filters[0], list | tuple):
|
|
inner_list = filters[0]
|
|
# Ensure inner list also looks like a nested structure
|
|
# Check if the operator is a string, validation happens inside _parse_nested_filters
|
|
if len(inner_list) >= 3 and isinstance(inner_list[1], str):
|
|
is_nested_structure = True
|
|
potential_nested_list = inner_list # Use the inner list for validation and parsing
|
|
is_single_group = True # Flag that the original filters was wrapped
|
|
|
|
# Check for standard nested structure [cond, op, cond, ...]
|
|
# Check if it looks like it *might* be nested (even if malformed).
|
|
# This allows lists starting with operators or containing invalid operators
|
|
# to be passed to _parse_nested_filters for detailed validation.
|
|
# Condition: Contains a string at an odd index OR starts with a string.
|
|
elif any(isinstance(item, str) for i, item in enumerate(filters) if i % 2 != 0) or (
|
|
len(filters) > 0 and isinstance(filters[0], str)
|
|
):
|
|
is_nested_structure = True
|
|
# potential_nested_list remains filters
|
|
|
|
if is_nested_structure:
|
|
# If validation passes, proceed with parsing the identified nested list
|
|
try:
|
|
# If it's a single group like [[cond]], parse the inner list as one condition.
|
|
# Otherwise, parse the list as a sequence [cond1, op, cond2, ...].
|
|
if is_single_group:
|
|
combined_criterion = self._condition_to_criterion(potential_nested_list)
|
|
else:
|
|
# _parse_nested_filters MUST validate the structure, including the first element and operators.
|
|
combined_criterion = self._parse_nested_filters(potential_nested_list)
|
|
if combined_criterion:
|
|
self.query = self.query.where(combined_criterion)
|
|
except Exception as e:
|
|
# Log the original filters list for better debugging context
|
|
frappe.log_error(f"Filter parsing error: {filters}", "Query Engine Error")
|
|
frappe.throw(_("Error parsing nested filters: {0}").format(e), exc=e)
|
|
|
|
else: # Not a nested structure, assume it's a list of simple filters (implicitly ANDed)
|
|
for filter_item in filters:
|
|
if isinstance(filter_item, list | tuple):
|
|
self.apply_list_filters(
|
|
filter_item, collect=collect
|
|
) # Handles simple [field, op, value] lists
|
|
elif isinstance(filter_item, dict | Criterion):
|
|
self.apply_filters(filter_item, collect=collect) # Recursive call for dict/criterion
|
|
else:
|
|
# Disallow single values (strings, numbers, etc.) directly in the list
|
|
# unless it's the name IN (...) case handled above.
|
|
raise ValueError(
|
|
f"Invalid item type in filter list: {type(filter_item).__name__}. Expected list, tuple, dict, or Criterion."
|
|
)
|
|
return
|
|
|
|
# If filters type is none of the above
|
|
raise ValueError(f"Unsupported filters type: {type(filters).__name__}")
|
|
|
|
def apply_or_filters(
|
|
self,
|
|
or_filters: dict[str, FilterValue] | FilterValue | list[list | FilterValue] | None = None,
|
|
):
|
|
"""Apply OR filters - all conditions are combined with OR operator.
|
|
|
|
Example:
|
|
or_filters={"name": "User", "module": "Core"}
|
|
→ Collects: [Criterion(name='User'), Criterion(module='Core')]
|
|
→ Combines: Criterion(name='User') | Criterion(module='Core')
|
|
→ Result: WHERE name='User' OR module='Core'
|
|
"""
|
|
if or_filters is None:
|
|
return
|
|
|
|
# Collect criteria instead of applying immediately
|
|
criteria = []
|
|
self.apply_filters(or_filters, collect=criteria)
|
|
|
|
# Combine all criteria with OR operator (|)
|
|
if criteria:
|
|
from functools import reduce
|
|
|
|
# Reduce combines: [Criterion(name='User'), Criterion(module='Core')] → Criterion(name='User') | Criterion(module='Core')
|
|
combined = reduce(lambda a, b: a | b, criteria)
|
|
self.query = self.query.where(combined)
|
|
|
|
def apply_list_filters(self, filter: list, collect: list | None = None):
|
|
if len(filter) == 2:
|
|
field, value = filter
|
|
self._apply_filter(field, value, collect=collect)
|
|
elif len(filter) == 3:
|
|
field, operator, value = filter
|
|
self._apply_filter(field, value, operator, collect=collect)
|
|
elif len(filter) == 4:
|
|
doctype, field, operator, value = filter
|
|
self._apply_filter(field, value, operator, doctype, collect=collect)
|
|
else:
|
|
raise ValueError(f"Unknown filter format: {filter}")
|
|
|
|
def apply_dict_filters(self, filters: dict[str, FilterValue | list], collect: list | None = None):
|
|
for field, value in filters.items():
|
|
operator = "="
|
|
if isinstance(value, list | tuple):
|
|
operator, value = value
|
|
|
|
self._apply_filter(field, value, operator, collect=collect)
|
|
|
|
def _apply_filter(
|
|
self,
|
|
field: str | Field,
|
|
value: FilterValue | list | set | None,
|
|
operator: str = "=",
|
|
doctype: str | None = None,
|
|
collect: list | None = None,
|
|
):
|
|
"""Applies a simple filter condition to the query."""
|
|
criterion = self._build_criterion_for_simple_filter(field, value, operator, doctype)
|
|
if criterion:
|
|
if collect is not None:
|
|
collect.append(criterion)
|
|
else:
|
|
self.query = self.query.where(criterion)
|
|
|
|
def _build_criterion_for_simple_filter(
|
|
self,
|
|
field: str | Field,
|
|
value: FilterValue | Column | list | set | None,
|
|
operator: str = "=",
|
|
doctype: str | None = None,
|
|
) -> "Criterion | None":
|
|
"""Builds a pypika Criterion object for a simple filter condition."""
|
|
import operator as builtin_operator
|
|
|
|
_field = self._validate_and_prepare_filter_field(field, doctype)
|
|
|
|
if isinstance(value, Column):
|
|
_value = self._validate_and_prepare_filter_field(value.name, doctype)
|
|
else:
|
|
# Regular value processing for literal comparisons like: table.field = 'value'
|
|
_value = convert_to_value(value)
|
|
_operator = operator
|
|
|
|
# For Date fields with datetime values, convert to date to match db_query behavior
|
|
if isinstance(_value, datetime.datetime) or (
|
|
isinstance(_value, list | tuple) and any(isinstance(v, datetime.datetime) for v in _value)
|
|
):
|
|
_value = _apply_date_field_filter_conversion(_value, _operator, doctype or self.doctype, field)
|
|
|
|
if not _value and isinstance(_value, list | tuple | set):
|
|
_value = ("",)
|
|
|
|
# db_query compatibility: handle None values for 'in' and 'not in' operators
|
|
# In db_query, None values are converted to empty tuples for these operators
|
|
if self.db_query_compat and _value is None and _operator.casefold() in ("in", "not in"):
|
|
_value = ("",)
|
|
|
|
if _operator in NESTED_SET_OPERATORS:
|
|
hierarchy = _operator
|
|
docname = _value
|
|
|
|
# Use the original field name string for get_field if _field was converted
|
|
# If _field is from a dynamic field, its name might be just the target fieldname.
|
|
# We need the original string ('link.target') or the fieldname from the main doctype.
|
|
original_field_name = field if isinstance(field, str) else _field.name
|
|
# Check if the original field name exists in the *main* doctype meta
|
|
main_meta = frappe.get_meta(self.doctype)
|
|
if main_meta.has_field(original_field_name):
|
|
_df = main_meta.get_field(original_field_name)
|
|
ref_doctype = _df.options if _df else self.doctype
|
|
else:
|
|
# If not in main doctype, assume it's a standard field like 'name' or refers to the main doctype itself
|
|
# This part might need refinement if nested set operators are used with dynamic fields.
|
|
ref_doctype = self.doctype
|
|
|
|
nodes = get_nested_set_hierarchy_result(ref_doctype, docname, hierarchy)
|
|
operator_fn = (
|
|
OPERATOR_MAP["not in"]
|
|
if hierarchy in ("not ancestors of", "not descendants of")
|
|
else OPERATOR_MAP["in"]
|
|
)
|
|
return operator_fn(_field, nodes or ("",))
|
|
|
|
operator_fn = OPERATOR_MAP[_operator.casefold()]
|
|
if _value is None and isinstance(_field, Field):
|
|
if operator_fn == builtin_operator.ne:
|
|
filter_field_name = (
|
|
field
|
|
if isinstance(field, str)
|
|
else (_field.name if hasattr(_field, "name") else str(_field))
|
|
)
|
|
if "." in filter_field_name:
|
|
filter_field_name = filter_field_name.split(".")[-1]
|
|
|
|
target_doctype = doctype or self.doctype
|
|
fallback_sql = self._get_ifnull_fallback(target_doctype, filter_field_name)
|
|
|
|
if fallback_sql == "''":
|
|
fallback_value = ""
|
|
elif fallback_sql.startswith("'") and fallback_sql.endswith("'"):
|
|
fallback_value = fallback_sql[1:-1]
|
|
else:
|
|
try:
|
|
fallback_value = int(fallback_sql)
|
|
except (ValueError, TypeError):
|
|
fallback_value = fallback_sql
|
|
|
|
return operator_fn(_field, ValueWrapper(fallback_value))
|
|
else:
|
|
return _field.isnull()
|
|
else:
|
|
filter_field_name = (
|
|
field if isinstance(field, str) else (_field.name if hasattr(_field, "name") else str(_field))
|
|
)
|
|
|
|
if "." in filter_field_name:
|
|
filter_field_name = filter_field_name.split(".")[-1]
|
|
|
|
target_doctype = doctype or self.doctype
|
|
|
|
# Skip applying ifnull if field already has null-handling function
|
|
if isinstance(_field, functions.IfNull | functions.Coalesce):
|
|
return operator_fn(_field, _value)
|
|
|
|
if self._should_apply_ifnull(target_doctype, filter_field_name, _operator, _value):
|
|
fallback_sql = self._get_ifnull_fallback(target_doctype, filter_field_name)
|
|
if fallback_sql == "''":
|
|
fallback_value = ""
|
|
elif fallback_sql.startswith("'") and fallback_sql.endswith("'"):
|
|
fallback_value = fallback_sql[1:-1]
|
|
else:
|
|
try:
|
|
fallback_value = int(fallback_sql)
|
|
except (ValueError, TypeError):
|
|
fallback_value = fallback_sql
|
|
_field = functions.IfNull(_field, ValueWrapper(fallback_value))
|
|
|
|
return operator_fn(_field, _value)
|
|
|
|
def _parse_nested_filters(self, nested_list: list | tuple) -> "Criterion | None":
|
|
"""Parses a nested filter list like [cond1, 'and', cond2, 'or', cond3, ...] into a pypika Criterion."""
|
|
if not isinstance(nested_list, list | tuple):
|
|
frappe.throw(_("Nested filters must be provided as a list or tuple."))
|
|
|
|
if not nested_list:
|
|
return None
|
|
|
|
# First item must be a condition (list/tuple)
|
|
if not isinstance(nested_list[0], list | tuple):
|
|
frappe.throw(
|
|
_("Invalid start for filter condition: {0}. Expected a list or tuple.").format(nested_list[0])
|
|
)
|
|
|
|
current_criterion = self._condition_to_criterion(nested_list[0])
|
|
|
|
idx = 1
|
|
while idx < len(nested_list):
|
|
# Expect an operator ('and' or 'or')
|
|
operator_str = nested_list[idx]
|
|
if not isinstance(operator_str, str) or operator_str.lower() not in ("and", "or"):
|
|
frappe.throw(
|
|
_("Expected 'and' or 'or' operator, found: {0}").format(operator_str),
|
|
frappe.ValidationError,
|
|
)
|
|
|
|
idx += 1
|
|
if idx >= len(nested_list):
|
|
frappe.throw(_("Filter condition missing after operator: {0}").format(operator_str))
|
|
|
|
# Expect a condition (list/tuple)
|
|
next_condition = nested_list[idx]
|
|
if not isinstance(next_condition, list | tuple):
|
|
frappe.throw(
|
|
_("Invalid filter condition: {0}. Expected a list or tuple.").format(next_condition)
|
|
)
|
|
|
|
next_criterion = self._condition_to_criterion(next_condition)
|
|
|
|
if operator_str.lower() == "and":
|
|
current_criterion = current_criterion & next_criterion
|
|
elif operator_str.lower() == "or":
|
|
current_criterion = current_criterion | next_criterion
|
|
|
|
idx += 1
|
|
|
|
return current_criterion
|
|
|
|
def _condition_to_criterion(self, condition: list | tuple) -> "Criterion":
|
|
"""Converts a single condition (simple filter list or nested list) into a pypika Criterion."""
|
|
if not isinstance(condition, list | tuple):
|
|
frappe.throw(_("Invalid condition type in nested filters: {0}").format(type(condition)))
|
|
|
|
# Check if it's a nested condition list [cond1, op, cond2, ...]
|
|
is_nested = False
|
|
# Broaden check here as well: length >= 3 and second element is string
|
|
if len(condition) >= 3 and isinstance(condition[1], str):
|
|
if isinstance(condition[0], list | tuple): # First element must also be a condition
|
|
is_nested = True
|
|
|
|
if is_nested:
|
|
# It's a nested sub-expression like [["assignee", "=", "A"], "or", ["assignee", "=", "B"]]
|
|
# _parse_nested_filters will handle operator validation ('and'/'or')
|
|
return self._parse_nested_filters(condition)
|
|
else:
|
|
# Assume it's a simple filter [field, op, value] etc.
|
|
field, value, operator, doctype = None, None, None, None
|
|
|
|
# Determine structure based on length and types
|
|
if len(condition) == 3 and isinstance(condition[1], str) and condition[1].lower() in OPERATOR_MAP:
|
|
# [field, operator, value]
|
|
field, operator, value = condition
|
|
elif (
|
|
len(condition) == 4 and isinstance(condition[2], str) and condition[2].lower() in OPERATOR_MAP
|
|
):
|
|
# [doctype, field, operator, value]
|
|
doctype, field, operator, value = condition
|
|
elif len(condition) == 2:
|
|
# [field, value] -> implies '=' operator
|
|
field, value = condition
|
|
operator = "="
|
|
else:
|
|
frappe.throw(_("Invalid simple filter format: {0}").format(condition))
|
|
|
|
# Use the helper method to build the criterion for the simple filter
|
|
return self._build_criterion_for_simple_filter(field, value, operator, doctype)
|
|
|
|
def _validate_and_prepare_filter_field(self, field: str | Field, doctype: str | None = None) -> Field:
|
|
"""Validate field name for filters and return a pypika Field object. Handles dynamic fields."""
|
|
|
|
if isinstance(field, Term):
|
|
# return if field is already a pypika Term
|
|
return field
|
|
|
|
# Parse backtick table.field notation: `tabDocType`.`fieldname`
|
|
if "`" in field:
|
|
if parsed := self._parse_backtick_field_notation(field):
|
|
table_name, field_name = parsed
|
|
|
|
# Return query builder field reference
|
|
return frappe.qb.DocType(table_name)[field_name]
|
|
|
|
# If parsing failed, fall through to error handling below
|
|
frappe.throw(
|
|
_("Filter fields have invalid backtick notation: {0}").format(field),
|
|
frappe.ValidationError,
|
|
title=_("Invalid Filter"),
|
|
)
|
|
|
|
# Handle dot notation (link_field.target_field or child_table_field.target_field)
|
|
if "." in field:
|
|
# Disallow tabDoc.field notation in filters.
|
|
dynamic_field = DynamicTableField.parse(field, self.doctype, allow_tab_notation=False)
|
|
if dynamic_field:
|
|
# Parsed successfully as link/child field access
|
|
target_doctype = dynamic_field.doctype
|
|
target_fieldname = dynamic_field.fieldname
|
|
parent_doctype_for_perm = (
|
|
dynamic_field.parent_doctype if isinstance(dynamic_field, ChildTableField) else None
|
|
)
|
|
self._check_field_permission(target_doctype, target_fieldname, parent_doctype_for_perm)
|
|
|
|
self.query = dynamic_field.apply_join(self.query)
|
|
# Return the pypika Field object associated with the dynamic field
|
|
return dynamic_field.field
|
|
else:
|
|
# Contains '.' but is not a valid link/child field access pattern
|
|
# This rejects tabDoc.field and other invalid formats like a.b.c
|
|
frappe.throw(
|
|
_(
|
|
"Invalid filter field format: {0}. Use 'fieldname' or 'link_fieldname.target_fieldname'."
|
|
).format(field),
|
|
frappe.ValidationError,
|
|
title=_("Invalid Filter"),
|
|
)
|
|
else:
|
|
# No '.' and no '`'. Check if it's a simple field name (alphanumeric + underscore).
|
|
if not re.fullmatch(r"\w+", field):
|
|
frappe.throw(
|
|
_(
|
|
"Invalid characters in fieldname: {0}. Only letters, numbers, and underscores are allowed."
|
|
).format(field),
|
|
frappe.ValidationError,
|
|
title=_("Invalid Filter"),
|
|
)
|
|
# It's a simple, valid fieldname like 'name' or 'creation'
|
|
target_doctype = doctype or self.doctype
|
|
target_fieldname = field
|
|
parent_doctype_for_perm = self.parent_doctype if doctype else None
|
|
|
|
# If a specific doctype is provided and it's different from the main query doctype,
|
|
# assume it's a child table and add the join using ChildTableField logic.
|
|
if doctype and doctype != self.doctype:
|
|
# Check if doctype is a valid child table of self.doctype
|
|
parent_meta = frappe.get_meta(self.doctype)
|
|
# Find the parent fieldname for this child doctype
|
|
parent_fieldname = None
|
|
for df in parent_meta.get_table_fields():
|
|
if df.options == doctype:
|
|
parent_fieldname = df.fieldname
|
|
break
|
|
|
|
if not parent_fieldname:
|
|
frappe.throw(
|
|
_("{0} is not a child table of {1}").format(doctype, self.doctype),
|
|
frappe.ValidationError,
|
|
title=_("Invalid Filter"),
|
|
)
|
|
|
|
# Create a ChildTableField instance to handle join and field access
|
|
# Pass the identified parent_fieldname
|
|
child_field_handler = ChildTableField(
|
|
doctype=doctype,
|
|
fieldname=target_fieldname,
|
|
parent_doctype=self.doctype,
|
|
parent_fieldname=parent_fieldname,
|
|
)
|
|
|
|
# For permission check, the parent is the main doctype
|
|
parent_doctype_for_perm = self.doctype
|
|
self._check_field_permission(target_doctype, target_fieldname, parent_doctype_for_perm)
|
|
|
|
# Delegate join logic
|
|
self.query = child_field_handler.apply_join(self.query)
|
|
# Return the pypika Field object from the handler
|
|
return child_field_handler.field
|
|
else:
|
|
# Field belongs to the main doctype or doctype wasn't specified differently
|
|
# If doctype wasn't specified, and the field isn't a standard field and doesn't exist in main doctype, check child tables
|
|
from frappe.model import child_table_fields, default_fields, optional_fields
|
|
|
|
if self.doctype in CORE_DOCTYPES:
|
|
meta = None
|
|
else:
|
|
try:
|
|
meta = frappe.get_meta(self.doctype)
|
|
except frappe.DoesNotExistError:
|
|
meta = None
|
|
|
|
if (
|
|
meta
|
|
and not doctype
|
|
and target_fieldname not in default_fields + optional_fields + child_table_fields
|
|
and not meta.has_field(target_fieldname)
|
|
):
|
|
for df in meta.get_table_fields(include_computed=True):
|
|
try:
|
|
child_meta = frappe.get_meta(df.options)
|
|
except frappe.DoesNotExistError:
|
|
continue
|
|
|
|
if child_meta.has_field(target_fieldname):
|
|
# Found in child table, create handler for it
|
|
child_field_handler = ChildTableField(
|
|
doctype=df.options,
|
|
fieldname=target_fieldname,
|
|
parent_doctype=self.doctype,
|
|
parent_fieldname=df.fieldname,
|
|
)
|
|
parent_doctype_for_perm = self.doctype
|
|
self._check_field_permission(
|
|
df.options, target_fieldname, parent_doctype_for_perm
|
|
)
|
|
self.query = child_field_handler.apply_join(self.query)
|
|
return child_field_handler.field
|
|
|
|
self._check_field_permission(target_doctype, target_fieldname, parent_doctype_for_perm)
|
|
# Convert string field name to pypika Field object for the specified/current doctype
|
|
return frappe.qb.DocType(target_doctype)[target_fieldname]
|
|
|
|
def _check_field_permission(self, doctype: str, fieldname: str, parent_doctype: str | None = None):
|
|
"""Check if the user has permission to access the given field"""
|
|
if not self.apply_permissions:
|
|
return
|
|
|
|
# Skip field permission check if doctype has no permissions defined
|
|
meta = frappe.get_meta(doctype)
|
|
if not meta.get_permissions(parenttype=parent_doctype):
|
|
return
|
|
|
|
permission_type = self.get_permission_type(doctype)
|
|
permitted_fields = get_permitted_fields(
|
|
doctype=doctype,
|
|
parenttype=parent_doctype,
|
|
permission_type=permission_type,
|
|
ignore_virtual=True,
|
|
user=self.user,
|
|
)
|
|
|
|
if fieldname in OPTIONAL_COLUMNS:
|
|
return
|
|
|
|
if fieldname not in permitted_fields:
|
|
frappe.throw(
|
|
_("You do not have permission to access field: {0}").format(
|
|
frappe.bold(f"{doctype}.{fieldname}")
|
|
),
|
|
frappe.PermissionError,
|
|
title=_("Permission Error"),
|
|
)
|
|
|
|
def parse_string_field(self, field: str):
|
|
"""
|
|
Parses a field string into a pypika Field object.
|
|
|
|
Handles:
|
|
- *
|
|
- simple_field
|
|
- `quoted_field`
|
|
- tabDocType.simple_field
|
|
- `tabDocType`.`quoted_field`
|
|
- `tabTable Name`.`quoted_field`
|
|
- Aliases for all above formats (e.g., field as alias)
|
|
"""
|
|
if field == "*":
|
|
return self.table.star
|
|
|
|
alias = None
|
|
field_part = field
|
|
if " as " in field.lower(): # Case-insensitive check for ' as '
|
|
# Find the last occurrence of ' as ' to handle potential aliases named 'as'
|
|
parts = re.split(r"\s+as\s+", field, flags=re.IGNORECASE)
|
|
if len(parts) > 1:
|
|
field_part = parts[0].strip()
|
|
alias = parts[1].strip().strip('`"') # Remove potential quotes from alias
|
|
|
|
match = FIELD_PARSE_REGEX.match(field_part)
|
|
|
|
if not match:
|
|
frappe.throw(_("Could not parse field: {0}").format(field))
|
|
|
|
# Groups: 1: table_quote, 2: table_name_with_tab, 3: field_quote, 4: field_name
|
|
groups = match.groups()
|
|
table_name = groups[1] # This will be None if no table part (e.g., just 'field')
|
|
field_name = groups[3] # This will be the field name (e.g., 'field')
|
|
|
|
if table_name:
|
|
# Table name specified (e.g., `tabX`.`y` or tabX.y or `tabX Y`.`y`)
|
|
# Ensure the extracted table name is valid before creating DocType object
|
|
if not TABLE_NAME_PATTERN.match(table_name.lstrip("tab")):
|
|
frappe.throw(_("Invalid characters in table name: {0}").format(table_name))
|
|
|
|
doctype_name = table_name[3:] if table_name.startswith("tab") else table_name
|
|
table_obj = frappe.qb.DocType(doctype_name)
|
|
pypika_field = table_obj[field_name]
|
|
else:
|
|
# Simple field name (e.g., `y` or y) - use the main table
|
|
pypika_field = self.table[field_name]
|
|
|
|
if alias:
|
|
return pypika_field.as_(alias)
|
|
else:
|
|
return pypika_field
|
|
|
|
def parse_fields(
|
|
self, fields: str | list | tuple | Field | AggregateFunction | None
|
|
) -> "list[Field | AggregateFunction | Criterion | DynamicTableField | ChildQuery]":
|
|
if not fields:
|
|
return []
|
|
|
|
# return if fields is already a pypika Term
|
|
if isinstance(fields, Term):
|
|
return [fields]
|
|
|
|
initial_field_list = []
|
|
if isinstance(fields, str):
|
|
# Split comma-separated fields passed as a single string
|
|
initial_field_list.extend(f.strip() for f in COMMA_PATTERN.split(fields) if f.strip())
|
|
elif isinstance(fields, list | tuple):
|
|
for item in fields:
|
|
if item is None:
|
|
continue
|
|
if isinstance(item, str) and "," in item:
|
|
# Split comma-separated strings within the list
|
|
initial_field_list.extend(f.strip() for f in COMMA_PATTERN.split(item) if f.strip())
|
|
else:
|
|
# Add non-comma-separated items directly
|
|
initial_field_list.append(item)
|
|
|
|
else:
|
|
frappe.throw(_("Fields must be a string, list, tuple, pypika Field, or pypika Function"))
|
|
|
|
_fields = []
|
|
# Iterate through the list where each item could be a single field, criterion, or a comma-separated string
|
|
for item in initial_field_list:
|
|
if isinstance(item, str):
|
|
# Sanitize and split potentially comma-separated strings within the list
|
|
if sanitized_item := _sanitize_field(item.strip(), self.is_mariadb).strip():
|
|
parsed = self._parse_single_field_item(sanitized_item)
|
|
if isinstance(parsed, list): # Result from parsing a child query dict
|
|
_fields.extend(parsed)
|
|
elif parsed:
|
|
_fields.append(parsed)
|
|
else:
|
|
# Handle non-string items (like dict for child query, or pre-parsed Field/Function)
|
|
parsed = self._parse_single_field_item(item)
|
|
if isinstance(parsed, list):
|
|
_fields.extend(parsed)
|
|
elif parsed:
|
|
_fields.append(parsed)
|
|
|
|
return _fields
|
|
|
|
def _parse_single_field_item(
|
|
self, field: str | Criterion | dict | Field | Term
|
|
) -> "list | Criterion | Field | DynamicTableField | ChildQuery | None":
|
|
"""Parses a single item from the fields list/tuple. Assumes comma-separated strings have already been split."""
|
|
if isinstance(field, Term):
|
|
# Accept any pypika Term (Field, Criterion, ArithmeticExpression, AggregateFunction, etc.)
|
|
return field
|
|
elif isinstance(field, dict):
|
|
# Check if it's a SQL function or operator dictionary
|
|
function_parser = SQLFunctionParser(engine=self)
|
|
if function_parser.is_function_dict(field):
|
|
return function_parser.parse_function(field)
|
|
elif function_parser.is_operator_dict(field):
|
|
return function_parser.parse_operator(field)
|
|
else:
|
|
# Handle child queries defined as dicts {fieldname: [child_fields]}
|
|
_parsed_fields = []
|
|
for child_field, child_fields_list in field.items():
|
|
# Skip uppercase keys as they might be unsupported SQL functions or operators
|
|
if child_field.isupper():
|
|
frappe.throw(
|
|
_("Unsupported function or operator: {0}").format(child_field),
|
|
frappe.ValidationError,
|
|
)
|
|
|
|
# Ensure child_fields_list is a list or tuple
|
|
if not isinstance(child_fields_list, list | tuple):
|
|
frappe.throw(
|
|
_("Child query fields for '{0}' must be a list or tuple.").format(child_field)
|
|
)
|
|
_parsed_fields.append(ChildQuery(child_field, list(child_fields_list), self.doctype))
|
|
# Return list as a dict entry might represent multiple child queries (though unlikely)
|
|
return _parsed_fields
|
|
|
|
# At this point, field must be a string (already validated and sanitized)
|
|
if not isinstance(field, str):
|
|
frappe.throw(_("Invalid field type: {0}").format(type(field)))
|
|
|
|
# Try parsing as dynamic field (link/child table access)
|
|
if parsed := DynamicTableField.parse(field, self.doctype):
|
|
return parsed
|
|
# Otherwise, parse as a standard field (simple, quoted, table-qualified, with/without alias)
|
|
else:
|
|
# Note: Comma handling is done in parse_fields before this method is called
|
|
return self.parse_string_field(field)
|
|
|
|
def apply_group_by(self, group_by: str | None = None):
|
|
parsed_group_by_fields = self._validate_group_by(group_by)
|
|
self.query = self.query.groupby(*parsed_group_by_fields)
|
|
|
|
def apply_order_by(self, order_by: str | None):
|
|
if not order_by or order_by == DefaultOrderBy:
|
|
self._apply_default_order_by()
|
|
return
|
|
|
|
parsed_order_fields = self._validate_order_by(order_by)
|
|
for order_field, order_direction in parsed_order_fields:
|
|
self.query = self.query.orderby(order_field, order=order_direction)
|
|
|
|
def _apply_default_order_by(self):
|
|
"""Apply default ordering based on configured DocType metadata"""
|
|
from pypika.enums import Order
|
|
|
|
sort_field, sort_order = get_doctype_sort_info(self.doctype)
|
|
|
|
# Handle multiple sort fields
|
|
if "," in sort_field:
|
|
for sort_spec in sort_field.split(","):
|
|
if parts := sort_spec.strip().split(maxsplit=1):
|
|
field_name = parts[0]
|
|
spec_order = parts[1].lower() if len(parts) > 1 else sort_order.lower()
|
|
field = self.table[field_name]
|
|
if self.db_query_compat:
|
|
order_direction = Order.desc if spec_order == "desc" else Order.asc
|
|
else:
|
|
order_direction = Order.asc if spec_order == "asc" else Order.desc
|
|
self.query = self.query.orderby(field, order=order_direction)
|
|
else:
|
|
field = self.table[sort_field]
|
|
if self.db_query_compat:
|
|
order_direction = Order.desc if sort_order.lower() == "desc" else Order.asc
|
|
else:
|
|
order_direction = Order.asc if sort_order.lower() == "asc" else Order.desc
|
|
self.query = self.query.orderby(field, order=order_direction)
|
|
|
|
def _parse_backtick_field_notation(self, field_name: str) -> tuple[str, str] | None:
|
|
"""
|
|
Parse backtick field notation like `tabDocType`.`fieldname` or `tabDocType`.fieldname and return (table_name, field_name).
|
|
Uses sqlparse for robust SQL parsing with Identifier support.
|
|
Returns None if the notation is invalid.
|
|
"""
|
|
import sqlparse
|
|
from sqlparse.sql import Identifier
|
|
|
|
# Parse the field name as SQL
|
|
parsed = sqlparse.parse(field_name.strip())
|
|
if not parsed or not parsed[0].tokens:
|
|
return None
|
|
|
|
tokens = parsed[0].tokens
|
|
|
|
# Filter out whitespace tokens
|
|
non_ws_tokens = [t for t in tokens if not t.is_whitespace]
|
|
|
|
if len(non_ws_tokens) != 1:
|
|
return None
|
|
|
|
# Check if it's an Identifier (which handles table.field notation)
|
|
first_token = non_ws_tokens[0]
|
|
if not isinstance(first_token, Identifier):
|
|
return None
|
|
|
|
# Get the sub-tokens within the identifier
|
|
# Should have: `tabTable` (Name), `.` (Punctuation), `fieldname` (Name)
|
|
identifier_tokens = [t for t in first_token.tokens if not t.is_whitespace]
|
|
|
|
if len(identifier_tokens) != 3:
|
|
return None
|
|
|
|
table_token = identifier_tokens[0]
|
|
dot_token = identifier_tokens[1]
|
|
field_token = identifier_tokens[2]
|
|
|
|
# Verify the dot
|
|
if str(dot_token).strip() != ".":
|
|
return None
|
|
|
|
# Extract and validate table name (should be backtick-quoted)
|
|
table_str = str(table_token).strip()
|
|
if not (table_str.startswith("`") and table_str.endswith("`")):
|
|
return None
|
|
|
|
# Extract field name (can be backtick-quoted or unquoted)
|
|
field_str = str(field_token).strip()
|
|
# Remove backticks if present
|
|
if field_str.startswith("`") and field_str.endswith("`"):
|
|
field_str = field_str[1:-1]
|
|
|
|
# Remove backticks from table name
|
|
table_name = table_str[1:-1]
|
|
field_name = field_str
|
|
|
|
# Validate table name starts with "tab"
|
|
if not table_name.startswith("tab"):
|
|
return None
|
|
|
|
# Extract doctype name by stripping "tab" prefix
|
|
doctype_name = table_name[3:]
|
|
|
|
# Validate doctype name is not empty and table actually exists
|
|
if not doctype_name or not frappe.db.table_exists(doctype_name):
|
|
return None
|
|
|
|
return (doctype_name, field_name)
|
|
|
|
def _validate_and_parse_field_for_clause(self, field_name: str, clause_name: str) -> Field:
|
|
"""
|
|
Common helper to validate and parse field names for GROUP BY and ORDER BY clauses.
|
|
|
|
Args:
|
|
field_name: The field name to validate and parse
|
|
clause_name: Name of the SQL clause (for error messages) - 'Group By' or 'Order By'
|
|
|
|
Returns:
|
|
Parsed Field object ready for use in pypika query
|
|
"""
|
|
if field_name.isdigit():
|
|
# For numeric field references, return as-is (will be handled by caller)
|
|
return field_name
|
|
|
|
# Allow function aliases and field aliases - return as Field (no table prefix)
|
|
if field_name in self.function_aliases or field_name in self.field_aliases:
|
|
return Field(field_name)
|
|
|
|
# Parse backtick table.field notation: `tabDocType`.`fieldname`
|
|
if "`" in field_name:
|
|
if parsed := self._parse_backtick_field_notation(field_name):
|
|
table_name, field_name = parsed
|
|
return frappe.qb.DocType(table_name)[field_name]
|
|
|
|
# If parsing failed, fall through to error handling below
|
|
frappe.throw(
|
|
_("{0} has invalid backtick notation: {1}").format(clause_name, field_name),
|
|
frappe.ValidationError,
|
|
)
|
|
|
|
# Try parsing as dynamic field (link_field.field or child_table.field)
|
|
dynamic_field = DynamicTableField.parse(field_name, self.doctype, allow_tab_notation=False)
|
|
if dynamic_field:
|
|
# Check permissions for dynamic field
|
|
if self.apply_permissions:
|
|
if isinstance(dynamic_field, ChildTableField):
|
|
self._check_field_permission(
|
|
dynamic_field.doctype, dynamic_field.fieldname, dynamic_field.parent_doctype
|
|
)
|
|
elif isinstance(dynamic_field, LinkTableField):
|
|
# Check permission for the link field in parent doctype
|
|
self._check_field_permission(self.doctype, dynamic_field.link_fieldname)
|
|
# Check permission for the target field in linked doctype
|
|
self._check_field_permission(dynamic_field.doctype, dynamic_field.fieldname)
|
|
|
|
# Apply join for the dynamic field
|
|
self.query = dynamic_field.apply_join(self.query)
|
|
return dynamic_field.field
|
|
else:
|
|
# Validate as simple field name (alphanumeric + underscore only)
|
|
if not re.fullmatch(r"\w+", field_name):
|
|
frappe.throw(
|
|
_(
|
|
"Invalid field format in {0}: {1}. Use 'field', 'link_field.field', or 'child_table.field'."
|
|
).format(clause_name, field_name),
|
|
frappe.ValidationError,
|
|
)
|
|
|
|
# Check permissions for simple field
|
|
if self.apply_permissions:
|
|
self._check_field_permission(self.doctype, field_name)
|
|
|
|
# Create Field object for simple field
|
|
return self.table[field_name]
|
|
|
|
def _validate_group_by(self, group_by: str) -> list[Field]:
|
|
"""Validate the group_by string argument, apply joins for dynamic fields, and return parsed Field objects."""
|
|
if not isinstance(group_by, str):
|
|
frappe.throw(_("Group By must be a string"), TypeError)
|
|
|
|
parsed_fields = []
|
|
parts = COMMA_PATTERN.split(group_by)
|
|
for part in parts:
|
|
field_name = part.strip()
|
|
if not field_name:
|
|
continue
|
|
|
|
# Skip permission check for optional system columns in group_by
|
|
if field_name in OPTIONAL_COLUMNS:
|
|
parsed_fields.append(self.table[field_name])
|
|
else:
|
|
parsed_field = self._validate_and_parse_field_for_clause(field_name, "Group By")
|
|
parsed_fields.append(parsed_field)
|
|
|
|
return parsed_fields
|
|
|
|
def _validate_order_by(self, order_by: str) -> list[tuple[Field | str, Order]]:
|
|
"""Validate the order_by string argument, apply joins for dynamic fields, and return parsed Field objects with directions."""
|
|
if not isinstance(order_by, str):
|
|
frappe.throw(_("Order By must be a string"), TypeError)
|
|
|
|
valid_directions = {"asc", "desc"}
|
|
parsed_order_fields = []
|
|
|
|
for declaration in order_by.split(","):
|
|
if _order_by := declaration.strip():
|
|
# Extract direction from end of declaration (handles backtick identifiers with spaces)
|
|
# Check if the last word is a valid direction
|
|
parts = _order_by.split()
|
|
direction = None
|
|
field_name = _order_by
|
|
|
|
if len(parts) > 1 and parts[-1].lower() in valid_directions:
|
|
# Last part is a direction, so field_name is everything before it
|
|
direction = parts[-1].lower()
|
|
field_name = " ".join(parts[:-1])
|
|
|
|
if self.db_query_compat:
|
|
order_direction = Order.desc if direction == "desc" else Order.asc
|
|
else:
|
|
order_direction = Order.asc if direction == "asc" else Order.desc
|
|
|
|
parsed_field = self._validate_and_parse_field_for_clause(field_name, "Order By")
|
|
parsed_order_fields.append((parsed_field, order_direction))
|
|
|
|
if direction and direction not in valid_directions:
|
|
frappe.throw(
|
|
_("Invalid direction in Order By: {0}. Must be 'ASC' or 'DESC'.").format(direction),
|
|
ValueError,
|
|
)
|
|
|
|
return parsed_order_fields
|
|
|
|
def check_read_permission(self):
|
|
"""Check if user has read permission on the doctype"""
|
|
|
|
def has_permission(ptype):
|
|
return frappe.has_permission(
|
|
self.doctype,
|
|
ptype,
|
|
user=self.user,
|
|
parent_doctype=self.parent_doctype,
|
|
)
|
|
|
|
if not has_permission("select") and not has_permission("read"):
|
|
# Check for shared documents
|
|
if not frappe.share.get_shared(self.doctype, self.user):
|
|
frappe.throw(
|
|
_("Insufficient Permission for {0}").format(frappe.bold(self.doctype)),
|
|
frappe.PermissionError,
|
|
)
|
|
|
|
def apply_field_permissions(self):
|
|
"""Filter the list of fields based on permlevel."""
|
|
allowed_fields = []
|
|
parent_permission_type = self.get_permission_type(self.doctype)
|
|
permitted_fields_cache = {}
|
|
|
|
def get_cached_permitted_fields(doctype, parenttype, permission_type):
|
|
cache_key = (doctype, parenttype, permission_type)
|
|
if cache_key not in permitted_fields_cache:
|
|
permitted_fields_cache[cache_key] = set(
|
|
get_permitted_fields(
|
|
doctype=doctype,
|
|
parenttype=parenttype,
|
|
permission_type=permission_type,
|
|
ignore_virtual=True,
|
|
)
|
|
)
|
|
return permitted_fields_cache[cache_key]
|
|
|
|
permitted_fields_set = get_cached_permitted_fields(
|
|
self.doctype, self.parent_doctype, parent_permission_type
|
|
)
|
|
|
|
for field in self.fields:
|
|
if isinstance(field, ChildTableField):
|
|
if parent_permission_type == "select":
|
|
# Skip child table fields if parent permission is only 'select'
|
|
continue
|
|
|
|
# Cache permitted fields for child doctypes if accessed multiple times
|
|
permitted_child_fields_set = get_cached_permitted_fields(
|
|
field.doctype, field.parent_doctype, self.get_permission_type(field.doctype)
|
|
)
|
|
# Check permission for the specific field in the child table
|
|
if field.fieldname in permitted_child_fields_set:
|
|
allowed_fields.append(field)
|
|
elif isinstance(field, LinkTableField):
|
|
# Check permission for the link field *in the parent doctype*
|
|
if field.link_fieldname in permitted_fields_set:
|
|
# Also check if user has permission to read/select the target doctype
|
|
target_doctype = field.doctype
|
|
has_target_perm = frappe.has_permission(
|
|
target_doctype, "select", user=self.user
|
|
) or frappe.has_permission(target_doctype, "read", user=self.user)
|
|
|
|
if has_target_perm:
|
|
# Finally, check if the specific field *in the target doctype* is permitted
|
|
permitted_target_fields_set = get_cached_permitted_fields(
|
|
target_doctype, None, self.get_permission_type(target_doctype)
|
|
)
|
|
if field.fieldname in permitted_target_fields_set:
|
|
allowed_fields.append(field)
|
|
elif isinstance(field, ChildQuery):
|
|
if parent_permission_type == "select":
|
|
# Skip child queries if parent permission is only 'select'
|
|
continue
|
|
|
|
# Cache permitted fields for the child doctype of the query
|
|
permitted_child_fields_set = get_cached_permitted_fields(
|
|
field.doctype, field.parent_doctype, self.get_permission_type(field.doctype)
|
|
)
|
|
# Filter the fields *within* the ChildQuery object based on permissions
|
|
field.fields = [f for f in field.fields if f in permitted_child_fields_set]
|
|
# Only add the child query if it still has fields after filtering
|
|
if field.fields:
|
|
allowed_fields.append(field)
|
|
elif isinstance(field, Field):
|
|
if field.name == "*":
|
|
# Expand '*' to include all permitted fields
|
|
# Avoid reparsing '*' recursively by passing the actual list
|
|
allowed_fields.extend(self.parse_fields(list(permitted_fields_set)))
|
|
# Check if the field name (without alias) is permitted
|
|
elif field.name in permitted_fields_set:
|
|
allowed_fields.append(field)
|
|
# Handle cases where the field might be aliased but the base name is permitted
|
|
elif hasattr(field, "alias") and field.alias and field.name in permitted_fields_set:
|
|
allowed_fields.append(field)
|
|
|
|
elif isinstance(field, Term):
|
|
# Allow any Term subclass, like LiteralValue (raw SQL expressions), AggregateFunction, PseudoColumnMapper (functions or complex terms)
|
|
allowed_fields.append(field)
|
|
|
|
return allowed_fields
|
|
|
|
def get_user_permission_conditions(self, role_permissions):
|
|
"""Build conditions for user permissions and return tuple of (conditions, fetch_shared_docs)"""
|
|
conditions = []
|
|
fetch_shared_docs = False
|
|
|
|
# add user permission only if role has read perm
|
|
if not (role_permissions.get("read") or role_permissions.get("select")):
|
|
return conditions, fetch_shared_docs
|
|
|
|
user_permissions = frappe.permissions.get_user_permissions(self.user)
|
|
|
|
if not user_permissions:
|
|
return conditions, fetch_shared_docs
|
|
|
|
fetch_shared_docs = True
|
|
|
|
doctype_link_fields = self.get_doctype_link_fields()
|
|
for df in doctype_link_fields:
|
|
if df.get("ignore_user_permissions"):
|
|
continue
|
|
|
|
user_permission_values = user_permissions.get(df.get("options"), {})
|
|
if user_permission_values:
|
|
docs = []
|
|
for permission in user_permission_values:
|
|
if not permission.get("applicable_for"):
|
|
docs.append(permission.get("doc"))
|
|
# append docs based on user permission applicable on reference doctype
|
|
# this is useful when getting list of docs from a link field
|
|
# in this case parent doctype of the link
|
|
# will be the reference doctype
|
|
elif df.get("fieldname") == "name" and self.reference_doctype:
|
|
if permission.get("applicable_for") == self.reference_doctype:
|
|
docs.append(permission.get("doc"))
|
|
elif permission.get("applicable_for") == self.doctype:
|
|
docs.append(permission.get("doc"))
|
|
|
|
if docs:
|
|
field_name = df.get("fieldname")
|
|
strict_user_permissions = frappe.get_system_settings("apply_strict_user_permissions")
|
|
if strict_user_permissions:
|
|
conditions.append(self.table[field_name].isin(docs))
|
|
else:
|
|
empty_value_condition = functions.IfNull(self.table[field_name], "") == ""
|
|
value_condition = self.table[field_name].isin(docs)
|
|
conditions.append(empty_value_condition | value_condition)
|
|
|
|
return conditions, fetch_shared_docs
|
|
|
|
def get_doctype_link_fields(self):
|
|
meta = frappe.get_meta(self.doctype)
|
|
# append current doctype with fieldname as 'name' as first link field
|
|
doctype_link_fields = [{"options": self.doctype, "fieldname": "name"}]
|
|
# append other link fields
|
|
doctype_link_fields.extend(meta.get_link_fields())
|
|
return doctype_link_fields
|
|
|
|
def add_permission_conditions(self):
|
|
conditions = []
|
|
role_permissions = frappe.permissions.get_role_permissions(self.doctype, user=self.user)
|
|
fetch_shared_docs = False
|
|
|
|
if self.requires_owner_constraint(role_permissions):
|
|
fetch_shared_docs = True
|
|
conditions.append(self.table.owner == self.user)
|
|
# skip user perm check if owner constraint is required
|
|
elif role_permissions.get("read") or role_permissions.get("select"):
|
|
user_perm_conditions, fetch_shared = self.get_user_permission_conditions(role_permissions)
|
|
conditions.extend(user_perm_conditions)
|
|
fetch_shared_docs = fetch_shared_docs or fetch_shared
|
|
else:
|
|
# No role permissions - check if user has any user permissions
|
|
# If not, must check for shared documents (like db_query does)
|
|
user_permissions = frappe.permissions.get_user_permissions(self.user)
|
|
doctype_user_permissions = user_permissions.get(self.doctype, [])
|
|
has_user_perm = any(
|
|
not perm.get("applicable_for") or perm.get("applicable_for") == self.reference_doctype
|
|
for perm in doctype_user_permissions
|
|
)
|
|
if not has_user_perm:
|
|
fetch_shared_docs = True
|
|
|
|
permission_query_conditions = self.get_permission_query_conditions()
|
|
if permission_query_conditions:
|
|
conditions.extend(permission_query_conditions)
|
|
|
|
shared_docs = []
|
|
if fetch_shared_docs:
|
|
shared_docs = frappe.share.get_shared(self.doctype, self.user)
|
|
|
|
if shared_docs:
|
|
shared_condition = self.table.name.isin(shared_docs)
|
|
if conditions:
|
|
# (permission conditions) OR (shared condition)
|
|
self.query = self.query.where(Criterion.all(conditions) | shared_condition)
|
|
else:
|
|
self.query = self.query.where(shared_condition)
|
|
elif conditions:
|
|
# AND all permission conditions
|
|
self.query = self.query.where(Criterion.all(conditions))
|
|
|
|
def get_permission_query_conditions(self):
|
|
"""Add permission query conditions from hooks and server scripts"""
|
|
from frappe.core.doctype.server_script.server_script_utils import get_server_script_map
|
|
|
|
conditions = []
|
|
hooks = frappe.get_hooks("permission_query_conditions", {})
|
|
condition_methods = hooks.get(self.doctype, []) + hooks.get("*", [])
|
|
|
|
for method in condition_methods:
|
|
if c := frappe.call(frappe.get_attr(method), self.user, doctype=self.doctype):
|
|
conditions.append(RawCriterion(f"({c})"))
|
|
|
|
# Get conditions from server scripts
|
|
if permission_script_name := get_server_script_map().get("permission_query", {}).get(self.doctype):
|
|
script = frappe.get_doc("Server Script", permission_script_name)
|
|
if condition := script.get_permission_query_conditions(self.user):
|
|
conditions.append(RawCriterion(f"({condition})"))
|
|
|
|
return conditions
|
|
|
|
def get_permission_type(self, doctype) -> str:
|
|
"""Get permission type (select/read) based on user permissions"""
|
|
if frappe.only_has_select_perm(doctype, user=self.user):
|
|
return "select"
|
|
return "read"
|
|
|
|
def requires_owner_constraint(self, role_permissions):
|
|
"""Return True if "select" or "read" isn't available without being creator."""
|
|
if not role_permissions.get("has_if_owner_enabled"):
|
|
return
|
|
|
|
if_owner_perms = role_permissions.get("if_owner")
|
|
if not if_owner_perms:
|
|
return
|
|
|
|
# has select or read without if owner, no need for constraint
|
|
for perm_type in ("select", "read"):
|
|
if role_permissions.get(perm_type) and perm_type not in if_owner_perms:
|
|
return
|
|
|
|
# not checking if either select or read if present in if_owner_perms
|
|
# because either of those is required to perform a query
|
|
return True
|
|
|
|
def _is_field_nullable(self, doctype: str, fieldname: str) -> bool:
|
|
"""Check if a field can contain NULL values."""
|
|
# primary key is never nullable, modified is usually indexed by default and always present
|
|
if fieldname in ("name", "modified", "creation"):
|
|
return False
|
|
|
|
try:
|
|
# Use cached meta to avoid recursion when loading meta
|
|
if (meta := frappe.client_cache.get_value(f"doctype_meta::{doctype}")) is None:
|
|
return True
|
|
|
|
if (df := meta.get_field(fieldname)) is None:
|
|
return True
|
|
|
|
except Exception:
|
|
return True
|
|
|
|
if df.fieldtype in ("Check", "Float", "Int", "Currency", "Percent"):
|
|
return False
|
|
|
|
if getattr(df, "not_nullable", False):
|
|
return False
|
|
|
|
return True
|
|
|
|
def _get_ifnull_fallback(self, doctype: str, fieldname: str) -> str:
|
|
"""Get type-appropriate fallback value for NULL comparisons."""
|
|
try:
|
|
meta = frappe.get_meta(doctype)
|
|
df = meta.get_field(fieldname)
|
|
except Exception:
|
|
return "''"
|
|
|
|
if df is None:
|
|
return "''"
|
|
|
|
fieldtype = df.fieldtype
|
|
|
|
if fieldtype in ("Link", "Data", "Dynamic Link"):
|
|
return "''"
|
|
|
|
if fieldtype in ("Date", "Datetime"):
|
|
return "'0001-01-01'"
|
|
|
|
if fieldtype == "Time":
|
|
return "'00:00:00'"
|
|
|
|
if fieldtype in ("Float", "Int", "Currency", "Percent"):
|
|
return "0"
|
|
|
|
try:
|
|
db_type_info = frappe.db.type_map.get(fieldtype, ("varchar",))
|
|
if db_type_info:
|
|
db_type = db_type_info[0] if isinstance(db_type_info, (tuple, list)) else db_type_info
|
|
if db_type in ("varchar", "text", "longtext", "smalltext", "json"):
|
|
return "''"
|
|
except Exception:
|
|
pass
|
|
|
|
return "''"
|
|
|
|
def _should_apply_ifnull(self, doctype: str, fieldname: str, operator: str, value: Any) -> bool:
|
|
"""Determine if IFNULL wrapping is needed for a filter condition."""
|
|
# Skip this if we don't need db_query compatibility
|
|
if not self.db_query_compat:
|
|
return False
|
|
|
|
if not self._is_field_nullable(doctype, fieldname):
|
|
return False
|
|
|
|
if value is None:
|
|
return False
|
|
|
|
if operator.lower() in ("like", "is"):
|
|
return False
|
|
|
|
# For "=" operator, only skip IFNULL if value is truthy (non-empty string, non-zero, etc)
|
|
# When value is empty string "", we need to check for NULL values too
|
|
if operator.lower() == "=" and value:
|
|
return False
|
|
|
|
try:
|
|
meta = frappe.get_meta(doctype)
|
|
df = meta.get_field(fieldname)
|
|
except Exception:
|
|
df = None
|
|
|
|
is_datetime_field = df and df.fieldtype in ("Date", "Datetime") if df else False
|
|
is_creation_or_modified = fieldname in ("creation", "modified")
|
|
|
|
if operator.lower() in (">", ">="):
|
|
# Null values can never be greater than any non-null value
|
|
if is_datetime_field or is_creation_or_modified:
|
|
return False
|
|
|
|
if operator.lower() == "between":
|
|
# Between operator never needs to check for null
|
|
# Explanation: Consider SQL -> `COLUMN between X and Y`
|
|
# Actual computation:
|
|
# for row in rows:
|
|
# if Y > row.COLUMN > X:
|
|
# yield row
|
|
|
|
# Since Y and X can't be null, null value in column will never match filter, so
|
|
# coalesce is extra cost that prevents index usage
|
|
if is_datetime_field or is_creation_or_modified:
|
|
return False
|
|
|
|
if operator.lower() == "in":
|
|
if isinstance(value, (list, tuple)):
|
|
# if values contain '' or falsy values then only coalesce column
|
|
# for `in` query this is only required if values contain '' or values are empty.
|
|
has_null_or_empty = any(v is None or v == "" for v in value)
|
|
return has_null_or_empty
|
|
return False
|
|
|
|
# for `not in` queries we can't be sure as column values might contain null.
|
|
if operator.lower() == "not in":
|
|
return True
|
|
|
|
if operator.lower() == "<":
|
|
if is_datetime_field or is_creation_or_modified:
|
|
return True
|
|
|
|
return True
|
|
|
|
|
|
class Permission:
|
|
@classmethod
|
|
def check_permissions(cls, query, **kwargs):
|
|
if not isinstance(query, str):
|
|
query = query.get_sql()
|
|
|
|
doctype = cls.get_tables_from_query(query)
|
|
if isinstance(doctype, str):
|
|
doctype = [doctype]
|
|
|
|
for dt in doctype:
|
|
dt = TAB_PATTERN.sub("", dt)
|
|
if not frappe.has_permission(
|
|
dt,
|
|
"select",
|
|
user=kwargs.get("user"),
|
|
parent_doctype=kwargs.get("parent_doctype"),
|
|
) and not frappe.has_permission(
|
|
dt,
|
|
"read",
|
|
user=kwargs.get("user"),
|
|
parent_doctype=kwargs.get("parent_doctype"),
|
|
):
|
|
frappe.throw(
|
|
_("Insufficient Permission for {0}").format(frappe.bold(dt)), frappe.PermissionError
|
|
)
|
|
|
|
@staticmethod
|
|
def get_tables_from_query(query: str):
|
|
return [table for table in WORDS_PATTERN.findall(query) if table.startswith("tab")]
|
|
|
|
|
|
class DynamicTableField:
|
|
def __init__(
|
|
self,
|
|
doctype: str,
|
|
fieldname: str,
|
|
parent_doctype: str,
|
|
alias: str | None = None,
|
|
) -> None:
|
|
self.doctype = doctype
|
|
self.fieldname = fieldname
|
|
self.alias = alias
|
|
self.parent_doctype = parent_doctype
|
|
|
|
def __str__(self) -> str:
|
|
table_name = f"`tab{self.doctype}`"
|
|
fieldname = f"`{self.fieldname}`"
|
|
if frappe.db.db_type == "postgres":
|
|
table_name = table_name.replace("`", '"')
|
|
fieldname = fieldname.replace("`", '"')
|
|
alias = f"AS {self.alias}" if self.alias else ""
|
|
return f"{table_name}.{fieldname} {alias}".strip()
|
|
|
|
@staticmethod
|
|
def parse(field: str, doctype: str, allow_tab_notation: bool = True):
|
|
if "." in field:
|
|
alias = None
|
|
# Handle 'as' alias, case-insensitive, taking the last occurrence
|
|
if " as " in field.lower():
|
|
parts = re.split(r"\s+as\s+", field, flags=re.IGNORECASE)
|
|
if len(parts) > 1:
|
|
field_part = parts[0].strip()
|
|
alias = parts[-1].strip().strip('`"') # Get last part as alias
|
|
field = field_part # Use the part before alias for further parsing
|
|
|
|
child_match = None
|
|
if allow_tab_notation:
|
|
# Regex to match `tabDoc`.`field`, "tabDoc"."field", tabDoc.field
|
|
# Group 1: Doctype name (without 'tab')
|
|
# Group 2: Optional quote for fieldname
|
|
# Group 3: Fieldname
|
|
# Ensures quotes are consistent or absent on fieldname using backreference \2
|
|
# Uses re.match to ensure the pattern matches the *entire* field string
|
|
# Allow spaces in doctype name (Group 1) and field name (Group 3)
|
|
child_match = re.match(r'[`"]?tab([\w\s]+)[`"]?\.([`"]?)([\w\s]+)\2$', field)
|
|
|
|
if child_match:
|
|
child_doctype_name = child_match.group(1)
|
|
child_field = child_match.group(3)
|
|
|
|
if child_doctype_name == doctype:
|
|
# Referencing a field in the main doctype using `tabDoctype.field` notation.
|
|
# This should be handled by the standard field parser, not as a DynamicTableField.
|
|
return None
|
|
# Found a child table reference like tabChildDoc.child_field
|
|
# Note: parent_fieldname is None here as it's directly specified via tab notation
|
|
return ChildTableField(child_doctype_name, child_field, doctype, alias=alias)
|
|
else:
|
|
# Try parsing as LinkTableField (link_field.target_field) or ChildTableField (child_field.target_field)
|
|
# This handles patterns not starting with 'tab' prefix
|
|
if "." not in field: # Should not happen due to outer check, but safety
|
|
return None
|
|
|
|
parts = field.split(".", 1)
|
|
if len(parts) != 2: # Ensure it splits into exactly two parts
|
|
return None
|
|
potential_parent_fieldname, target_fieldname = parts
|
|
|
|
# Basic validation for the parts to avoid unnecessary metadata lookups on invalid input
|
|
# We expect simple identifiers here. Quoted/complex names are handled elsewhere or by child_match.
|
|
if (
|
|
not potential_parent_fieldname.replace("_", "").isalnum()
|
|
or not target_fieldname.replace("_", "").isalnum()
|
|
):
|
|
return None
|
|
|
|
try:
|
|
meta = frappe.get_meta(doctype) # Get meta of the *parent* doctype
|
|
# Check if the first part is a valid fieldname in the parent doctype
|
|
if not meta.has_field(potential_parent_fieldname):
|
|
return None # Not a field in the parent, so not link/child access pattern
|
|
|
|
linked_field = meta.get_field(potential_parent_fieldname)
|
|
except Exception:
|
|
# Handle cases where doctype doesn't exist, etc.
|
|
print(f"Error getting metadata for {doctype} while parsing field {field}")
|
|
return None
|
|
|
|
if linked_field:
|
|
linked_doctype = linked_field.options
|
|
if linked_field.fieldtype == "Link":
|
|
# It's a Link field access: parent_doctype.link_fieldname.target_fieldname
|
|
return LinkTableField(
|
|
linked_doctype, target_fieldname, doctype, potential_parent_fieldname, alias=alias
|
|
)
|
|
elif linked_field.fieldtype in frappe.model.table_fields:
|
|
# It's a Child Table field access: parent_doctype.child_table_fieldname.target_fieldname
|
|
return ChildTableField(
|
|
linked_doctype, target_fieldname, doctype, potential_parent_fieldname, alias=alias
|
|
)
|
|
|
|
return None
|
|
|
|
def apply_select(self, query: QueryBuilder) -> QueryBuilder:
|
|
raise NotImplementedError
|
|
|
|
|
|
class ChildTableField(DynamicTableField):
|
|
def __init__(
|
|
self,
|
|
doctype: str,
|
|
fieldname: str,
|
|
parent_doctype: str,
|
|
parent_fieldname: str | None = None,
|
|
alias: str | None = None,
|
|
) -> None:
|
|
self.doctype = doctype
|
|
self.fieldname = fieldname
|
|
self.alias = alias
|
|
self.parent_doctype = parent_doctype
|
|
self.parent_fieldname = parent_fieldname
|
|
self.table = frappe.qb.DocType(self.doctype)
|
|
self.field = self.table[self.fieldname]
|
|
|
|
def apply_select(self, query: QueryBuilder) -> QueryBuilder:
|
|
table = frappe.qb.DocType(self.doctype)
|
|
query = self.apply_join(query)
|
|
return query.select(getattr(table, self.fieldname).as_(self.alias or None))
|
|
|
|
def apply_join(self, query: QueryBuilder) -> QueryBuilder:
|
|
main_table = frappe.qb.DocType(self.parent_doctype)
|
|
if not query.is_joined(self.table):
|
|
join_conditions = (self.table.parent == main_table.name) & (
|
|
self.table.parenttype == self.parent_doctype
|
|
)
|
|
if self.parent_fieldname:
|
|
join_conditions &= self.table.parentfield == self.parent_fieldname
|
|
query = query.left_join(self.table).on(join_conditions)
|
|
return query
|
|
|
|
|
|
class LinkTableField(DynamicTableField):
|
|
def __init__(
|
|
self,
|
|
doctype: str,
|
|
fieldname: str,
|
|
parent_doctype: str,
|
|
link_fieldname: str,
|
|
alias: str | None = None,
|
|
) -> None:
|
|
super().__init__(doctype, fieldname, parent_doctype, alias=alias)
|
|
self.link_fieldname = link_fieldname
|
|
self.table = frappe.qb.DocType(self.doctype)
|
|
self.field = self.table[self.fieldname]
|
|
|
|
def apply_select(self, query: QueryBuilder) -> QueryBuilder:
|
|
table = frappe.qb.DocType(self.doctype)
|
|
query = self.apply_join(query)
|
|
return query.select(getattr(table, self.fieldname).as_(self.alias or None))
|
|
|
|
def apply_join(self, query: QueryBuilder) -> QueryBuilder:
|
|
table = frappe.qb.DocType(self.doctype)
|
|
main_table = frappe.qb.DocType(self.parent_doctype)
|
|
if not query.is_joined(table):
|
|
query = query.left_join(table).on(table.name == getattr(main_table, self.link_fieldname))
|
|
return query
|
|
|
|
|
|
class ChildQuery:
|
|
def __init__(
|
|
self,
|
|
fieldname: str,
|
|
fields: list,
|
|
parent_doctype: str,
|
|
) -> None:
|
|
field = frappe.get_meta(parent_doctype).get_field(fieldname)
|
|
if field.fieldtype not in frappe.model.table_fields:
|
|
return
|
|
self.fieldname = fieldname
|
|
self.fields = fields
|
|
self.parent_doctype = parent_doctype
|
|
self.doctype = field.options
|
|
|
|
def get_query(self, parent_names=None) -> QueryBuilder:
|
|
filters = {
|
|
"parenttype": self.parent_doctype,
|
|
"parentfield": self.fieldname,
|
|
"parent": ["in", parent_names],
|
|
}
|
|
return frappe.qb.get_query(
|
|
self.doctype,
|
|
fields=[*self.fields, "parent", "parentfield"],
|
|
filters=filters,
|
|
order_by="idx asc",
|
|
)
|
|
|
|
|
|
def get_nested_set_hierarchy_result(doctype: str, name: str, hierarchy: str) -> list[str]:
|
|
"""Get matching nodes based on operator."""
|
|
table = frappe.qb.DocType(doctype)
|
|
try:
|
|
lft, rgt = frappe.qb.from_(table).select("lft", "rgt").where(table.name == name).run()[0]
|
|
except IndexError:
|
|
lft, rgt = None, None
|
|
|
|
if hierarchy in ("descendants of", "not descendants of", "descendants of (inclusive)"):
|
|
result = (
|
|
frappe.qb.from_(table)
|
|
.select(table.name)
|
|
.where(table.lft > lft)
|
|
.where(table.rgt < rgt)
|
|
.orderby(table.lft, order=Order.asc)
|
|
.run(pluck=True)
|
|
)
|
|
if hierarchy == "descendants of (inclusive)":
|
|
result += [name]
|
|
else:
|
|
# Get ancestor elements of a DocType with a tree structure
|
|
result = (
|
|
frappe.qb.from_(table)
|
|
.select(table.name)
|
|
.where(table.lft < lft)
|
|
.where(table.rgt > rgt)
|
|
.orderby(table.lft, order=Order.desc)
|
|
.run(pluck=True)
|
|
)
|
|
return result
|
|
|
|
|
|
@lru_cache(maxsize=1024)
|
|
def _validate_select_field(field: str):
|
|
"""Validate a field string intended for use in a SELECT clause."""
|
|
if field == "*":
|
|
return
|
|
|
|
if field.isdigit():
|
|
return
|
|
|
|
# Reject SQL functions
|
|
if _is_function_call(field):
|
|
frappe.throw(
|
|
_(
|
|
"SQL functions are not allowed in SELECT fields: {0}. Use the query builder API with functions instead."
|
|
).format(field),
|
|
frappe.ValidationError,
|
|
)
|
|
|
|
if ALLOWED_FIELD_PATTERN.match(field):
|
|
return
|
|
|
|
frappe.throw(
|
|
_(
|
|
"Invalid field format for SELECT: {0}. Field names must be simple, backticked, table-qualified, aliased, or '*'."
|
|
).format(field),
|
|
frappe.PermissionError,
|
|
)
|
|
|
|
|
|
@lru_cache(maxsize=1024)
|
|
def _sanitize_field(field: str, is_mariadb):
|
|
"""Validate and sanitize a field string for SELECT clause by stripping comments."""
|
|
_validate_select_field(field)
|
|
|
|
stripped_field = sqlparse.format(field, strip_comments=True, keyword_case="lower")
|
|
|
|
if is_mariadb:
|
|
stripped_field = MARIADB_SPECIFIC_COMMENT.sub("", stripped_field)
|
|
|
|
return stripped_field.strip()
|
|
|
|
|
|
class RawCriterion(Term):
|
|
"""A class to represent raw SQL string as a criterion.
|
|
|
|
Allows using raw SQL strings in pypika queries:
|
|
frappe.qb.from_("DocType").where(RawCriterion("name like 'a%'"))
|
|
"""
|
|
|
|
def __init__(self, sql_string: str):
|
|
self.sql_string = sql_string
|
|
super().__init__()
|
|
|
|
def get_sql(self, **kwargs: Any) -> str:
|
|
return self.sql_string
|
|
|
|
def __and__(self, other):
|
|
return CombinedRawCriterion(self, other, "AND")
|
|
|
|
def __or__(self, other):
|
|
return CombinedRawCriterion(self, other, "OR")
|
|
|
|
def __invert__(self):
|
|
return RawCriterion(f"NOT ({self.sql_string})")
|
|
|
|
|
|
class CombinedRawCriterion(RawCriterion):
|
|
def __init__(self, left, right, operator):
|
|
self.left = left
|
|
self.right = right
|
|
self.operator = operator
|
|
super(RawCriterion, self).__init__()
|
|
|
|
def get_sql(self, **kwargs: Any) -> str:
|
|
left_sql = self.left.get_sql(**kwargs) if hasattr(self.left, "get_sql") else str(self.left)
|
|
right_sql = self.right.get_sql(**kwargs) if hasattr(self.right, "get_sql") else str(self.right)
|
|
return f"({left_sql}) {self.operator} ({right_sql})"
|
|
|
|
|
|
class SQLFunctionParser:
|
|
"""Parser for SQL function dictionaries in query builder fields."""
|
|
|
|
def __init__(self, engine):
|
|
self.engine = engine
|
|
|
|
def is_function_dict(self, field_dict: dict) -> bool:
|
|
"""Check if a dictionary represents a SQL function definition."""
|
|
function_keys = [k for k in field_dict.keys() if k.lower() != "as"]
|
|
return len(function_keys) == 1 and function_keys[0] in FUNCTION_MAPPING
|
|
|
|
def is_operator_dict(self, field_dict: dict) -> bool:
|
|
"""Check if a dictionary represents an arithmetic operator expression.
|
|
|
|
Example: {"ADD": [1, 2], "as": "sum"} or {"DIV": ["total", "count"]}
|
|
"""
|
|
operator_keys = [k for k in field_dict.keys() if k.lower() != "as"]
|
|
return len(operator_keys) == 1 and operator_keys[0] in OPERATOR_MAPPING
|
|
|
|
def _extract_dict_components(self, d: dict, valid_keys: dict, error_msg: str) -> tuple:
|
|
"""Extract name, alias, and args from function/operator dict."""
|
|
name = None
|
|
alias = None
|
|
args = None
|
|
|
|
for key, value in d.items():
|
|
if key.lower() == "as":
|
|
alias = value
|
|
else:
|
|
name = key
|
|
args = value
|
|
|
|
if not name:
|
|
frappe.throw(_("Invalid {0} dictionary format").format(error_msg), frappe.ValidationError)
|
|
|
|
if name not in valid_keys:
|
|
frappe.throw(_("Unsupported {0}: {1}").format(error_msg, name), frappe.ValidationError)
|
|
|
|
if alias:
|
|
self._validate_alias(alias)
|
|
|
|
return name, alias, args
|
|
|
|
def parse_function(self, function_dict: dict) -> Field:
|
|
"""Parse a SQL function dictionary into a pypika function call."""
|
|
function_name, alias, function_args = self._extract_dict_components(
|
|
function_dict, FUNCTION_MAPPING, "function or invalid field name"
|
|
)
|
|
|
|
func_class = FUNCTION_MAPPING[function_name]
|
|
|
|
if isinstance(function_args, str):
|
|
parsed_arg = self._parse_and_validate_argument(function_args)
|
|
function_call = func_class(parsed_arg)
|
|
elif isinstance(function_args, list):
|
|
parsed_args = []
|
|
for arg in function_args:
|
|
parsed_arg = self._parse_and_validate_argument(arg)
|
|
parsed_args.append(parsed_arg)
|
|
function_call = func_class(*parsed_args)
|
|
elif isinstance(function_args, (int | float)):
|
|
function_call = func_class(function_args)
|
|
elif function_args is None:
|
|
try:
|
|
function_call = func_class()
|
|
except TypeError:
|
|
frappe.throw(
|
|
_("Function {0} requires arguments but none were provided").format(function_name),
|
|
frappe.ValidationError,
|
|
)
|
|
else:
|
|
frappe.throw(
|
|
_(
|
|
"Invalid function argument type: {0}. Only strings, numbers, lists, and None are allowed."
|
|
).format(type(function_args).__name__),
|
|
frappe.ValidationError,
|
|
)
|
|
|
|
if alias:
|
|
self.engine.function_aliases.add(alias)
|
|
return function_call.as_(alias)
|
|
else:
|
|
return function_call
|
|
|
|
def parse_operator(self, operator_dict: dict) -> ArithmeticExpression:
|
|
"""Parse an arithmetic operator dictionary into a pypika ArithmeticExpression.
|
|
|
|
Operators require exactly 2 arguments (left and right operands).
|
|
Arguments can be: numbers, field names, nested functions, or nested operators.
|
|
Example: {"DIV": [1, {"NULLIF": [{"LOCATE": ["'test'", "name"]}, 0]}]}
|
|
"""
|
|
operator_name, alias, operator_args = self._extract_dict_components(
|
|
operator_dict, OPERATOR_MAPPING, "operator"
|
|
)
|
|
|
|
operator = OPERATOR_MAPPING[operator_name]
|
|
|
|
# Operators require exactly 2 arguments (left and right operands)
|
|
if not isinstance(operator_args, list) or len(operator_args) != 2:
|
|
frappe.throw(
|
|
_("Operator {0} requires exactly 2 arguments (left and right operands)").format(
|
|
operator_name
|
|
),
|
|
frappe.ValidationError,
|
|
)
|
|
|
|
# Parse and validate both operands (supports nested functions/operators)
|
|
left = self._parse_and_validate_argument(operator_args[0])
|
|
right = self._parse_and_validate_argument(operator_args[1])
|
|
|
|
# Wrap raw values (numbers, strings) in ValueWrapper so pypika can process them
|
|
if not isinstance(left, Term):
|
|
left = ValueWrapper(left)
|
|
if not isinstance(right, Term):
|
|
right = ValueWrapper(right)
|
|
|
|
expression = ArithmeticExpression(operator=operator, left=left, right=right)
|
|
|
|
if alias:
|
|
self.engine.function_aliases.add(alias)
|
|
return expression.as_(alias)
|
|
else:
|
|
return expression
|
|
|
|
def _parse_and_validate_argument(self, arg):
|
|
"""Parse and validate a single function/operator argument against SQL injection.
|
|
|
|
Supports:
|
|
- Numbers: 1, 2.5, etc.
|
|
- Strings: field names or quoted literals
|
|
- Nested dicts: functions {"COUNT": "name"} or operators {"ADD": [1, 2]}
|
|
"""
|
|
if isinstance(arg, (int | float)):
|
|
return arg
|
|
elif isinstance(arg, str):
|
|
return self._validate_string_argument(arg)
|
|
elif isinstance(arg, dict):
|
|
# Recursively handle nested functions and operators
|
|
if self.is_function_dict(arg):
|
|
return self.parse_function(arg)
|
|
elif self.is_operator_dict(arg):
|
|
return self.parse_operator(arg)
|
|
else:
|
|
frappe.throw(
|
|
_("Invalid nested expression: dictionary must represent a function or operator"),
|
|
frappe.ValidationError,
|
|
)
|
|
elif arg is None:
|
|
# None is allowed for some functions
|
|
return arg
|
|
else:
|
|
frappe.throw(
|
|
_("Invalid argument type: {0}. Only strings, numbers, dicts, and None are allowed.").format(
|
|
type(arg).__name__
|
|
),
|
|
frappe.ValidationError,
|
|
)
|
|
|
|
def _validate_string_argument(self, arg: str):
|
|
"""Validate string arguments to prevent SQL injection."""
|
|
arg = arg.strip()
|
|
|
|
if not arg:
|
|
frappe.throw(_("Empty string arguments are not allowed"), frappe.ValidationError)
|
|
|
|
# Special case: allow '*' for COUNT(*) and similar aggregate functions
|
|
if arg == "*":
|
|
# Return as-is for SQL star expansion (COUNT(*), etc.)
|
|
# pypika will handle this correctly when used with aggregate functions
|
|
return Column("*")
|
|
|
|
# Check for string literals (quoted strings)
|
|
if self._is_string_literal(arg):
|
|
return self._validate_string_literal(arg)
|
|
|
|
# Check for backtick notation: `tabDocType`.`fieldname`
|
|
# Parse and return as Field object to preserve field reference in operators
|
|
elif "`" in arg:
|
|
if parsed := self.engine._parse_backtick_field_notation(arg):
|
|
table_name, field_name = parsed
|
|
return Table(f"tab{table_name}")[field_name]
|
|
else:
|
|
frappe.throw(
|
|
_(
|
|
"Invalid argument format: {0}. Only quoted string literals or simple field names are allowed."
|
|
).format(arg),
|
|
frappe.ValidationError,
|
|
)
|
|
|
|
elif self._is_valid_field_name(arg):
|
|
# Validate field name and check permissions
|
|
self._validate_function_field_arg(arg)
|
|
return self.engine.table[arg]
|
|
|
|
# Check if it's a numeric string like "1" (for COUNT(1), etc.)
|
|
elif arg.isdigit():
|
|
return int(arg)
|
|
|
|
else:
|
|
frappe.throw(
|
|
_(
|
|
"Invalid argument format: {0}. Only quoted string literals or simple field names are allowed."
|
|
).format(arg),
|
|
frappe.ValidationError,
|
|
)
|
|
|
|
def _is_string_literal(self, arg: str) -> bool:
|
|
"""Check if argument is a properly quoted string literal."""
|
|
return (arg.startswith("'") and arg.endswith("'") and len(arg) >= 2) or (
|
|
arg.startswith('"') and arg.endswith('"') and len(arg) >= 2
|
|
)
|
|
|
|
def _validate_string_literal(self, literal: str):
|
|
"""Validate a string literal for SQL injection attacks."""
|
|
if literal.startswith("'") and literal.endswith("'"):
|
|
quote_char = "'"
|
|
content = literal[1:-1]
|
|
elif literal.startswith('"') and literal.endswith('"'):
|
|
quote_char = '"'
|
|
content = literal[1:-1]
|
|
else:
|
|
frappe.throw(_("Invalid string literal format: {0}").format(literal), frappe.ValidationError)
|
|
|
|
if quote_char in content:
|
|
escaped_content = content.replace(quote_char + quote_char, "")
|
|
if quote_char in escaped_content:
|
|
frappe.throw(
|
|
_("Unescaped quotes in string literal: {0}").format(literal),
|
|
frappe.ValidationError,
|
|
)
|
|
|
|
# Reject dangerous SQL keywords and patterns
|
|
dangerous_patterns = [
|
|
# SQL injection keywords
|
|
r"\b(?:union|select|insert|update|delete|drop|create|alter|exec|execute)\b",
|
|
# Comment patterns
|
|
r"--",
|
|
r"/\*",
|
|
r"\*/",
|
|
# Semicolon (statement terminator)
|
|
r";",
|
|
# Backslash escape sequences that could be dangerous
|
|
r"\\x[0-9a-fA-F]{2}", # Hex escape sequences
|
|
r"\\[0-7]{1,3}", # Octal escape sequences
|
|
]
|
|
|
|
content_lower = content.lower()
|
|
for pattern in dangerous_patterns:
|
|
if re.search(pattern, content_lower, re.IGNORECASE):
|
|
frappe.throw(
|
|
_("Potentially dangerous content in string literal: {0}").format(literal),
|
|
frappe.ValidationError,
|
|
)
|
|
|
|
# Return just the content without quotes - pypika will handle proper escaping
|
|
return content
|
|
|
|
def _is_valid_field_name(self, name: str) -> bool:
|
|
"""Check if a string is a valid field name."""
|
|
# Field names should only contain alphanumeric characters and underscores
|
|
return re.fullmatch(r"[a-zA-Z_][a-zA-Z0-9_]*", name) is not None
|
|
|
|
def _validate_alias(self, alias: str):
|
|
"""Validate alias name for SQL injection."""
|
|
if not isinstance(alias, str):
|
|
frappe.throw(_("Alias must be a string"), frappe.ValidationError)
|
|
|
|
alias = alias.strip()
|
|
if not alias:
|
|
frappe.throw(_("Empty alias is not allowed"), frappe.ValidationError)
|
|
|
|
# Alias should be a simple identifier
|
|
if not re.fullmatch(r"[a-zA-Z_][a-zA-Z0-9_]*", alias):
|
|
frappe.throw(
|
|
_("Invalid alias format: {0}. Alias must be a simple identifier.").format(alias),
|
|
frappe.ValidationError,
|
|
)
|
|
|
|
# Check for SQL keywords that shouldn't be used as aliases
|
|
sql_keywords = {
|
|
"select",
|
|
"from",
|
|
"where",
|
|
"join",
|
|
"inner",
|
|
"left",
|
|
"right",
|
|
"outer",
|
|
"union",
|
|
"group",
|
|
"order",
|
|
"by",
|
|
"having",
|
|
"limit",
|
|
"offset",
|
|
"insert",
|
|
"update",
|
|
"delete",
|
|
"create",
|
|
"drop",
|
|
"alter",
|
|
"table",
|
|
"index",
|
|
"view",
|
|
"database",
|
|
"schema",
|
|
"grant",
|
|
"revoke",
|
|
"commit",
|
|
"rollback",
|
|
"transaction",
|
|
"begin",
|
|
"end",
|
|
"if",
|
|
"else",
|
|
"case",
|
|
"when",
|
|
"then",
|
|
"null",
|
|
"not",
|
|
"and",
|
|
"or",
|
|
"in",
|
|
"exists",
|
|
"between",
|
|
"like",
|
|
"is",
|
|
"as",
|
|
"on",
|
|
"using",
|
|
"distinct",
|
|
"all",
|
|
"any",
|
|
"some",
|
|
"true",
|
|
"false",
|
|
}
|
|
|
|
if alias.lower() in sql_keywords:
|
|
frappe.throw(
|
|
_("Alias cannot be a SQL keyword: {0}").format(alias),
|
|
frappe.ValidationError,
|
|
)
|
|
|
|
def _validate_function_field_arg(self, field_name: str):
|
|
"""Validate a field name used as a function argument."""
|
|
if not isinstance(field_name, str):
|
|
return # Non-string arguments are allowed (literals)
|
|
|
|
# Basic validation - should be a simple field name
|
|
if not self._is_valid_field_name(field_name):
|
|
frappe.throw(
|
|
_("Invalid field name in function: {0}. Only simple field names are allowed.").format(
|
|
field_name
|
|
),
|
|
frappe.ValidationError,
|
|
)
|
|
|
|
# Check field permission if permissions are being applied
|
|
if self.engine.apply_permissions and self.engine.doctype:
|
|
self.engine._check_field_permission(self.engine.doctype, field_name)
|