feat(qb): implement build_match_conditions, build_filter_conditions (#35857)
Signed-off-by: Akhil Narang <me@akhilnarang.dev>
This commit is contained in:
parent
9b84264fa5
commit
3d66341ee2
2 changed files with 113 additions and 30 deletions
|
|
@ -390,14 +390,23 @@ class Engine:
|
|||
if not filters:
|
||||
return
|
||||
|
||||
# 1. Handle special case: list of names -> name IN (...)
|
||||
# 1. Check for single simple filter [field, op, value] or [doctype, field, op, value]
|
||||
if len(filters) in (3, 4) and isinstance(filters[1], str):
|
||||
if (
|
||||
filters[1].lower() in OPERATOR_MAP
|
||||
or filters[1].lower() in get_additional_filters_from_hooks()
|
||||
):
|
||||
self.apply_list_filters(filters, collect=collect)
|
||||
return
|
||||
|
||||
# 2. Handle special case: list of names -> name IN (...)
|
||||
if all(isinstance(d, FilterValue) for d in filters):
|
||||
self.apply_dict_filters(
|
||||
{"name": ("in", tuple(convert_to_value(f) for f in filters))}, collect=collect
|
||||
)
|
||||
return
|
||||
|
||||
# 2. Check for nested logic format [cond, op, cond, ...] or [[cond, op, cond]]
|
||||
# 3. Check for nested logic format [cond, op, cond, ...] or [[cond, op, cond]]
|
||||
is_nested_structure = False
|
||||
potential_nested_list = filters
|
||||
is_single_group = False
|
||||
|
|
@ -406,8 +415,12 @@ class Engine:
|
|||
if len(filters) == 1 and isinstance(filters[0], list | tuple):
|
||||
inner_list = filters[0]
|
||||
# Ensure inner list also looks like a nested structure
|
||||
# Check if the operator is a string, validation happens inside _parse_nested_filters
|
||||
if len(inner_list) >= 3 and isinstance(inner_list[1], str):
|
||||
# Check if the operator is a string, and specifically a logical operator
|
||||
if (
|
||||
len(inner_list) >= 3
|
||||
and isinstance(inner_list[1], str)
|
||||
and inner_list[1].lower() in ("and", "or")
|
||||
):
|
||||
is_nested_structure = True
|
||||
potential_nested_list = inner_list # Use the inner list for validation and parsing
|
||||
is_single_group = True # Flag that the original filters was wrapped
|
||||
|
|
@ -416,10 +429,12 @@ class Engine:
|
|||
# Check if it looks like it *might* be nested (even if malformed).
|
||||
# This allows lists starting with operators or containing invalid operators
|
||||
# to be passed to _parse_nested_filters for detailed validation.
|
||||
# Condition: Contains a string at an odd index OR starts with a string.
|
||||
elif any(isinstance(item, str) for i, item in enumerate(filters) if i % 2 != 0) or (
|
||||
len(filters) > 0 and isinstance(filters[0], str)
|
||||
):
|
||||
# Condition: Starts with a list/tuple and contains a string at an odd index OR starts with a string.
|
||||
elif (
|
||||
len(filters) >= 2
|
||||
and isinstance(filters[0], list | tuple)
|
||||
and any(isinstance(item, str) for i, item in enumerate(filters) if i % 2 != 0)
|
||||
) or (len(filters) > 0 and isinstance(filters[0], str)):
|
||||
is_nested_structure = True
|
||||
# potential_nested_list remains filters
|
||||
|
||||
|
|
@ -434,7 +449,10 @@ class Engine:
|
|||
# _parse_nested_filters MUST validate the structure, including the first element and operators.
|
||||
combined_criterion = self._parse_nested_filters(potential_nested_list)
|
||||
if combined_criterion:
|
||||
self.query = self.query.where(combined_criterion)
|
||||
if collect is not None:
|
||||
collect.append(combined_criterion)
|
||||
else:
|
||||
self.query = self.query.where(combined_criterion)
|
||||
except Exception as e:
|
||||
# Log the original filters list for better debugging context
|
||||
frappe.throw(_("Error parsing nested filters: {0}. {1}").format(filters, e), exc=e)
|
||||
|
|
@ -736,10 +754,9 @@ class Engine:
|
|||
|
||||
# Check if it's a nested condition list [cond1, op, cond2, ...]
|
||||
is_nested = False
|
||||
# Broaden check here as well: length >= 3 and second element is string
|
||||
if len(condition) >= 3 and isinstance(condition[1], str):
|
||||
if isinstance(condition[0], list | tuple): # First element must also be a condition
|
||||
is_nested = True
|
||||
# Broaden check here as well: length >= 2 and second element is string
|
||||
if len(condition) >= 2 and isinstance(condition[1], str) and isinstance(condition[0], list | tuple):
|
||||
is_nested = True
|
||||
|
||||
if is_nested:
|
||||
# It's a nested sub-expression like [["assignee", "=", "A"], "or", ["assignee", "=", "B"]]
|
||||
|
|
@ -844,7 +861,7 @@ class Engine:
|
|||
parent_doctype_for_perm = self.parent_doctype if doctype else None
|
||||
|
||||
# If a specific doctype is provided and it's different from the main query doctype,
|
||||
# assume it's a child table and add the join using ChildTableField logic.
|
||||
# if it's a child table, add the join using ChildTableField logic
|
||||
if doctype and doctype != self.doctype:
|
||||
# Check if doctype is a valid child table of self.doctype
|
||||
parent_meta = frappe.get_meta(self.doctype)
|
||||
|
|
@ -855,12 +872,10 @@ class Engine:
|
|||
parent_fieldname = df.fieldname
|
||||
break
|
||||
|
||||
# If it's not a child table, check permissions
|
||||
if not parent_fieldname:
|
||||
frappe.throw(
|
||||
_("{0} is not a child table of {1}").format(doctype, self.doctype),
|
||||
frappe.ValidationError,
|
||||
title=_("Invalid Filter"),
|
||||
)
|
||||
self._check_field_permission(target_doctype, target_fieldname, parent_doctype_for_perm)
|
||||
return frappe.qb.DocType(target_doctype)[target_fieldname]
|
||||
|
||||
# Create a ChildTableField instance to handle join and field access
|
||||
# Pass the identified parent_fieldname
|
||||
|
|
@ -1576,6 +1591,70 @@ class Engine:
|
|||
# because either of those is required to perform a query
|
||||
return True
|
||||
|
||||
def build_match_conditions(self, as_condition: bool = True) -> str | list:
|
||||
"""Build permission-based conditions for the doctype."""
|
||||
if as_condition:
|
||||
condition = self.get_permission_conditions(self.doctype, self.table)
|
||||
if condition:
|
||||
quote_char = "`" if self.is_mariadb else '"'
|
||||
return condition.get_sql(with_namespace=True, quote_char=quote_char)
|
||||
return ""
|
||||
|
||||
if not self.ignore_user_permissions:
|
||||
match_filters = []
|
||||
user_permissions = frappe.permissions.get_user_permissions(self.user)
|
||||
if not user_permissions:
|
||||
return match_filters
|
||||
|
||||
for df in self.get_doctype_link_fields(self.doctype):
|
||||
if df.get("ignore_user_permissions"):
|
||||
continue
|
||||
|
||||
options = df.get("options")
|
||||
|
||||
if user_permission_values := user_permissions.get(options, {}):
|
||||
docs = []
|
||||
|
||||
for permission in user_permission_values:
|
||||
applicable_for = permission.get("applicable_for")
|
||||
doc = permission.get("doc")
|
||||
if not applicable_for:
|
||||
docs.append(doc)
|
||||
|
||||
elif df.get("fieldname") == "name" and self.reference_doctype:
|
||||
if applicable_for == self.reference_doctype:
|
||||
docs.append(doc)
|
||||
|
||||
elif applicable_for == self.doctype:
|
||||
docs.append(doc)
|
||||
|
||||
if docs:
|
||||
match_filters.append({options: docs})
|
||||
|
||||
return match_filters
|
||||
|
||||
return []
|
||||
|
||||
def build_filter_conditions(
|
||||
self, filters, conditions: list, ignore_permissions: bool | None = None
|
||||
) -> None:
|
||||
if not filters:
|
||||
return
|
||||
|
||||
original_apply_permissions = self.apply_permissions
|
||||
if ignore_permissions is not None:
|
||||
self.apply_permissions = not ignore_permissions
|
||||
|
||||
try:
|
||||
criteria_list = []
|
||||
self.apply_filters(filters, collect=criteria_list)
|
||||
|
||||
quote_char = "`" if self.is_mariadb else '"'
|
||||
for c in criteria_list:
|
||||
conditions.append(c.get_sql(with_namespace=True, quote_char=quote_char))
|
||||
finally:
|
||||
self.apply_permissions = original_apply_permissions
|
||||
|
||||
def _is_field_nullable(self, doctype: str, fieldname: str) -> bool:
|
||||
"""Check if a field can contain NULL values."""
|
||||
# primary key is never nullable, modified is usually indexed by default and always present
|
||||
|
|
|
|||
|
|
@ -794,9 +794,11 @@ def scrub_user_tags(tagcount):
|
|||
|
||||
# used in building query in queries.py
|
||||
def get_match_cond(doctype, as_condition=True):
|
||||
from frappe.model.db_query import DatabaseQuery
|
||||
from frappe.database.query import Engine
|
||||
|
||||
cond = DatabaseQuery(doctype).build_match_conditions(as_condition=as_condition)
|
||||
engine = Engine()
|
||||
engine.get_query(doctype, db_query_compat=True)
|
||||
cond = engine.build_match_conditions(as_condition=as_condition)
|
||||
if not as_condition:
|
||||
return cond
|
||||
|
||||
|
|
@ -804,9 +806,11 @@ def get_match_cond(doctype, as_condition=True):
|
|||
|
||||
|
||||
def build_match_conditions(doctype, user=None, as_condition=True):
|
||||
from frappe.model.db_query import DatabaseQuery
|
||||
from frappe.database.query import Engine
|
||||
|
||||
match_conditions = DatabaseQuery(doctype, user=user).build_match_conditions(as_condition=as_condition)
|
||||
engine = Engine()
|
||||
engine.get_query(doctype, user=user, db_query_compat=True)
|
||||
match_conditions = engine.build_match_conditions(as_condition=as_condition)
|
||||
if as_condition:
|
||||
return match_conditions.replace("%", "%%")
|
||||
return match_conditions
|
||||
|
|
@ -842,18 +846,18 @@ def get_filters_cond(doctype, filters, conditions, ignore_permissions=None, with
|
|||
else:
|
||||
flt.append([doctype, f[0], "=", f[1]])
|
||||
|
||||
from frappe.model.db_query import DatabaseQuery
|
||||
from frappe.database.query import Engine
|
||||
|
||||
query = DatabaseQuery(doctype)
|
||||
query.filters = flt
|
||||
query.conditions = conditions
|
||||
engine = Engine()
|
||||
engine.get_query(doctype, ignore_permissions=ignore_permissions, db_query_compat=True)
|
||||
|
||||
if with_match_conditions:
|
||||
query.build_match_conditions()
|
||||
if match_cond := engine.build_match_conditions():
|
||||
conditions.append(match_cond)
|
||||
|
||||
query.build_filter_conditions(flt, conditions, ignore_permissions)
|
||||
engine.build_filter_conditions(flt, conditions)
|
||||
|
||||
cond = " and " + " and ".join(query.conditions)
|
||||
cond = " and " + " and ".join(conditions) if conditions else ""
|
||||
else:
|
||||
cond = ""
|
||||
return cond
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue