Merge pull request #34146 from akhilnarang/use-qb

feat(db_query)!: use query builder
This commit is contained in:
Akhil Narang 2025-11-19 21:35:53 +05:30 committed by GitHub
commit 91df62f0f4
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
43 changed files with 1602 additions and 341 deletions

View file

@ -1355,9 +1355,9 @@ def get_list(doctype, *args, **kwargs):
# filter as a list of lists
frappe.get_list("ToDo", fields="*", filters = [["modified", ">", "2014-01-01"]])
"""
import frappe.model.db_query
import frappe.model.qb_query
return frappe.model.db_query.DatabaseQuery(doctype).execute(*args, **kwargs)
return frappe.model.qb_query.DatabaseQuery(doctype).execute(*args, **kwargs)
def get_all(doctype, *args, **kwargs):

View file

@ -310,7 +310,7 @@ def get_address_display_list(doctype: str, name: str) -> list[dict]:
["Dynamic Link", "parenttype", "=", "Address"],
],
fields=["*"],
order_by="is_primary_address DESC, `tabAddress`.creation ASC",
order_by="is_primary_address DESC, creation ASC",
)
for a in address_list:
a["display"] = get_address_display(a)

View file

@ -486,7 +486,7 @@ def get_contact_display_list(doctype: str, name: str) -> list[dict]:
["Dynamic Link", "parenttype", "=", "Contact"],
],
fields=["*"],
order_by="is_primary_contact DESC, `tabContact`.creation ASC",
order_by="is_primary_contact DESC, creation ASC",
)
for contact in contact_list:

View file

@ -50,7 +50,7 @@ class TestActivityLog(IntegrationTestCase):
"user": "Administrator",
"operation": operation,
},
order_by="`creation` DESC",
order_by="creation DESC",
)
name = names[0]

View file

@ -236,7 +236,7 @@ def get_import_status(data_import_name: str):
import_status = {"status": data_import.status}
logs = frappe.get_all(
"Data Import Log",
fields=["count(*) as count", "success"],
fields=[{"COUNT": "*", "as": "count"}, "success"],
filters={"data_import": data_import_name},
group_by="success",
)

View file

@ -165,9 +165,9 @@ class Exporter:
filters = self.export_filters
if self.meta.is_nested_set():
order_by = f"`tab{self.doctype}`.`lft` ASC"
order_by = "lft ASC"
else:
order_by = f"`tab{self.doctype}`.`creation` DESC"
order_by = "creation DESC"
parent_fields = [format_column_name(df) for df in self.fields if df.parent == self.doctype]
parent_data = frappe.db.get_list(

View file

@ -153,24 +153,25 @@ class TestDocType(IntegrationTestCase):
def test_all_depends_on_fields_conditions(self):
import re
docfields = frappe.get_all(
"DocField",
or_filters={
"ifnull(depends_on, '')": ("!=", ""),
"ifnull(collapsible_depends_on, '')": ("!=", ""),
"ifnull(mandatory_depends_on, '')": ("!=", ""),
"ifnull(read_only_depends_on, '')": ("!=", ""),
},
fields=[
"parent",
"depends_on",
"collapsible_depends_on",
"mandatory_depends_on",
"read_only_depends_on",
"fieldname",
"fieldtype",
],
DocField = frappe.qb.DocType("DocField")
docfields_query = (
frappe.qb.from_(DocField)
.select(
DocField.parent,
DocField.depends_on,
DocField.collapsible_depends_on,
DocField.mandatory_depends_on,
DocField.read_only_depends_on,
DocField.fieldname,
)
.where(
(DocField.depends_on != "")
| (DocField.collapsible_depends_on != "")
| (DocField.mandatory_depends_on != "")
| (DocField.read_only_depends_on != "")
)
)
docfields = docfields_query.run(as_dict=True)
pattern = r'[\w\.:_]+\s*={1}\s*[\w\.@\'"]+'
for field in docfields:

View file

@ -191,7 +191,7 @@ class TestRQJob(IntegrationTestCase):
jobs = [frappe.enqueue(method=self.BG_JOB, queue="short", fail=True) for _ in range(limit * 2)]
self.check_status(jobs[-1], "failed")
self.assertLessEqual(RQJob.get_count(filters=[["RQ Job", "status", "=", "failed"]]), limit * 1.1)
self.assertLessEqual(RQJob.get_count(filters=[["RQ Job", "status", "=", "failed"]]), limit * 1.2)
def test_func(fail=False, sleep=0):

View file

@ -157,10 +157,11 @@ class TestServerScript(IntegrationTestCase):
self.assertEqual(frappe.get_doc("Server Script", "test_return_value").execute_method(), "hello")
def test_permission_query(self):
sql = frappe.db.get_list("ToDo", run=False)
if frappe.conf.db_type != "postgres":
self.assertTrue("where (1 = 1)" in frappe.db.get_list("ToDo", run=False))
self.assertTrue("where (1 = 1)" in sql.lower())
else:
self.assertTrue("where (1 = '1')" in frappe.db.get_list("ToDo", run=False))
self.assertTrue("where (1 = '1')" in sql.lower())
self.assertTrue(isinstance(frappe.db.get_list("ToDo"), list))
def test_attribute_error(self):

View file

@ -876,14 +876,20 @@ def get_all_roles():
"""return all roles"""
active_domains = frappe.get_active_domains()
roles = frappe.get_all(
"Role",
filters={
"name": ("not in", frappe.permissions.AUTOMATIC_ROLES),
"disabled": 0,
},
or_filters={"ifnull(restrict_to_domain, '')": "", "restrict_to_domain": ("in", active_domains)},
order_by="name",
Role = frappe.qb.DocType("Role")
domain_condition = (Role.restrict_to_domain.isnull()) | (Role.restrict_to_domain == "")
if active_domains:
domain_condition = domain_condition | Role.restrict_to_domain.isin(active_domains)
roles = (
frappe.qb.from_(Role)
.select(Role.name)
.where(
(Role.name.notin(frappe.permissions.AUTOMATIC_ROLES)) & (Role.disabled == 0) & domain_condition
)
.orderby(Role.name)
.run(as_dict=True)
)
return sorted([role.get("name") for role in roles])

View file

@ -234,7 +234,7 @@ def get_user_linked_doctypes(doctype, txt, searchfield, start, page_len, filters
"DocType",
fields=["`tabDocType`.`name`"],
filters=filters,
order_by="`tabDocType`.`idx` desc",
order_by="idx desc",
limit_start=start,
limit_page_length=page_len,
as_list=1,

View file

@ -20,7 +20,7 @@ def get_things_todo(as_list=False):
"""Return a count of incomplete ToDos."""
data = frappe.get_list(
"ToDo",
fields=["name", "description"] if as_list else "count(*)",
fields=["name", "description"] if as_list else [{"COUNT": "*"}],
filters=[["ToDo", "status", "=", "Open"]],
or_filters=[
["ToDo", "allocated_to", "=", frappe.session.user],

View file

@ -32,14 +32,20 @@ def get_roles_and_doctypes():
active_domains = frappe.get_active_domains()
doctypes = frappe.get_all(
"DocType",
filters={
"istable": 0,
"name": ("not in", ",".join(not_allowed_in_permission_manager)),
},
or_filters={"ifnull(restrict_to_domain, '')": "", "restrict_to_domain": ("in", active_domains)},
fields=["name"],
DocType = frappe.qb.DocType("DocType")
doctype_domain_condition = (DocType.restrict_to_domain.isnull()) | (DocType.restrict_to_domain == "")
if active_domains:
doctype_domain_condition = doctype_domain_condition | DocType.restrict_to_domain.isin(active_domains)
doctypes = (
frappe.qb.from_(DocType)
.select(DocType.name)
.where(
(DocType.istable == 0)
& (DocType.name.notin(not_allowed_in_permission_manager))
& doctype_domain_condition
)
.run(as_dict=True)
)
restricted_roles = ["Administrator"]
@ -48,14 +54,16 @@ def get_roles_and_doctypes():
restricted_roles.extend(row.role for row in custom_user_type_roles)
restricted_roles.extend(AUTOMATIC_ROLES)
roles = frappe.get_all(
"Role",
filters={
"name": ("not in", restricted_roles),
"disabled": 0,
},
or_filters={"ifnull(restrict_to_domain, '')": "", "restrict_to_domain": ("in", active_domains)},
fields=["name"],
Role = frappe.qb.DocType("Role")
role_domain_condition = (Role.restrict_to_domain.isnull()) | (Role.restrict_to_domain == "")
if active_domains:
role_domain_condition = role_domain_condition | Role.restrict_to_domain.isin(active_domains)
roles = (
frappe.qb.from_(Role)
.select(Role.name)
.where((Role.name.notin(restricted_roles)) & (Role.disabled == 0) & role_domain_condition)
.run(as_dict=True)
)
doctypes_list = [{"label": _(d.get("name")), "value": d.get("name")} for d in doctypes]

View file

@ -41,6 +41,7 @@ class PropertySetter(Document):
if self.is_new():
delete_property_setter(self.doc_type, self.property, self.field_name, self.row_name)
frappe.clear_cache(doctype=self.doc_type)
def on_trash(self):

View file

@ -11,9 +11,9 @@ import warnings
from collections.abc import Iterable, Sequence
from contextlib import contextmanager, suppress
from time import time
from typing import TYPE_CHECKING, Any
from typing import TYPE_CHECKING, Any, Literal
from pypika.queries import QueryBuilder
from pypika.queries import QueryBuilder, Table
import frappe
import frappe.defaults
@ -27,6 +27,7 @@ from frappe.database.utils import (
Query,
QueryValues,
convert_to_value,
get_doctype_sort_info,
get_query_type,
is_query_type,
)
@ -649,7 +650,6 @@ class Database:
try:
if order_by:
order_by = "creation" if order_by == DefaultOrderBy else order_by
query = frappe.qb.get_query(
table=doctype,
filters=filters,
@ -1323,12 +1323,12 @@ class Database:
from frappe.utils import now_datetime
Table = frappe.qb.DocType(doctype)
dt = frappe.qb.DocType(doctype)
return (
frappe.qb.from_(Table)
.select(Count(Table.name))
.where(Table.creation >= now_datetime() - relativedelta(minutes=minutes))
frappe.qb.from_(dt)
.select(Count(dt.name))
.where(dt.creation >= now_datetime() - relativedelta(minutes=minutes))
.run()[0][0]
)

File diff suppressed because it is too large Load diff

View file

@ -3,6 +3,7 @@
import re
import string
from collections.abc import KeysView, ValuesView
from functools import cached_property, wraps
import frappe
@ -31,6 +32,10 @@ QUERY_TYPE_PATTERN = re.compile(r"\s*([A-Za-z]*)")
def convert_to_value(o: FilterValue):
if isinstance(o, bool):
return int(o)
elif isinstance(o, dict):
return frappe.as_json(o)
elif isinstance(o, (KeysView, ValuesView)):
return tuple(convert_to_value(item) for item in o)
return o
@ -53,6 +58,28 @@ def get_doctype_name(table_name: str) -> str:
return table_name.replace('"', "")
def get_doctype_sort_info(doctype: str) -> tuple[str, str]:
"""
Get sort_field and sort_order for a DocType from meta.
Args:
doctype: The DocType name
Returns:
Tuple of (sort_field, sort_order) with defaults ("creation", "DESC") if not found
"""
from frappe.database.query import CORE_DOCTYPES
if doctype in CORE_DOCTYPES:
return "creation", "DESC"
try:
meta = frappe.get_meta(doctype)
return meta.sort_field or "creation", meta.sort_order or "DESC"
except frappe.DoesNotExistError:
return "creation", "DESC"
class LazyString:
def _setup(self) -> str:
raise NotImplementedError

View file

@ -201,7 +201,7 @@ def get_chart_config(chart, filters, timespan, timegrain, from_date, to_date):
data = frappe.get_list(
doctype,
fields=[datefield, f"SUM({value_field})", "COUNT(*)"],
fields=[datefield, {"SUM": value_field}, {"COUNT": "*"}],
filters=filters,
group_by=datefield,
order_by=datefield,
@ -244,7 +244,7 @@ def get_heatmap_chart_config(chart, filters, heatmap_year):
doctype,
fields=[
timestamp_field,
f"{aggregate_function}({value_field})",
{aggregate_function: value_field},
],
filters=filters,
group_by=f"date({datefield})",
@ -270,7 +270,7 @@ def get_group_by_chart_config(chart, filters) -> dict | None:
doctype,
fields=[
f"{group_by_field} as name",
f"{aggregate_function}({value_field}) as count",
{aggregate_function: value_field, "as": "count"},
],
filters=filters,
parent_doctype=chart.parent_document_type,

View file

@ -152,13 +152,19 @@ def get_desktop_icons(user=None, bootinfo=None):
active_domains = frappe.get_active_domains()
blocked_doctypes = frappe.get_all(
"DocType",
filters={"ifnull(restrict_to_domain, '')": ("not in", ",".join(active_domains))},
fields=["name"],
)
blocked_doctypes = [d.get("name") for d in blocked_doctypes]
DocType = frappe.qb.DocType("DocType")
if active_domains:
blocked_condition = (
(DocType.restrict_to_domain.isnull())
| (DocType.restrict_to_domain == "")
| (DocType.restrict_to_domain.notin(active_domains))
)
else:
blocked_condition = (DocType.restrict_to_domain.isnull()) | (DocType.restrict_to_domain == "")
blocked_doctypes = [
d.get("name")
for d in frappe.qb.from_(DocType).select(DocType.name).where(blocked_condition).run(as_dict=True)
]
standard_icons = frappe.get_all("Desktop Icon", fields=fields, filters={"standard": 1})

View file

@ -167,7 +167,7 @@ def format_email_header(header_map, language, docname):
@frappe.whitelist()
@http_cache(max_age=60, stale_while_revalidate=60 * 60)
def get_notification_logs(limit=20):
def get_notification_logs(limit: int = 20):
notification_logs = frappe.db.get_list(
"Notification Log", fields=["*"], limit=limit, order_by="creation desc"
)

View file

@ -46,7 +46,7 @@ def deferred_insert(routes):
def frequently_visited_links():
return frappe.get_all(
"Route History",
fields=["route", "count(name) as count"],
fields=["route", {"COUNT": "name", "as": "count"}],
filters={"user": frappe.session.user},
group_by="route",
order_by="count desc",

View file

@ -457,7 +457,7 @@ def get_linked_docs(doctype: str, name: str, linkinfo: dict | None = None) -> di
if add_fields := link_context.get("add_fields"):
fields += add_fields
fields = [f"`tab{linked_doctype}`.`{sf.strip()}`" for sf in fields if sf and "`tab" not in sf]
fields = [sf.strip() for sf in fields if sf]
if filters_ctx := link_context.get("filters"):
ret = frappe.get_list(doctype=linked_doctype, fields=fields, filters=filters_ctx, order_by=None)

View file

@ -101,7 +101,7 @@ def get_next(doctype, value, prev, filters=None, sort_order="desc", sort_field="
doctype,
fields=["name"],
filters=filters,
order_by=f"`tab{doctype}`.{sort_field}" + " " + sort_order,
order_by=f"{sort_field} {sort_order}",
limit_start=0,
limit_page_length=1,
as_list=True,

View file

@ -68,8 +68,8 @@ def get_group_by_count(doctype: str, current_filters: str, field: str) -> list[d
data = frappe.get_list(
doctype,
filters=current_filters,
group_by=f"`tab{doctype}`.{field}",
fields=["count(*) as count", f"`{field}` as name"],
group_by=field,
fields=[{"COUNT": "*", "as": "count"}, f"{field} as name"],
order_by="count desc",
limit=1000,
)

View file

@ -65,7 +65,7 @@ def get_notifications_for_doctypes(config, notification_count):
try:
if isinstance(condition, dict):
result = frappe.get_list(
d, fields=["count(*) as count"], filters=condition, ignore_ifnull=True
d, fields=[{"COUNT": "*", "as": "count"}], filters=condition, ignore_ifnull=True
)[0].count
else:
result = frappe.get_attr(condition)()

View file

@ -126,6 +126,10 @@ def validate_fields(data):
wildcard = update_wildcard_field_param(data)
for field in list(data.fields or []):
# TODO: extract_fieldnames needs to handle dict fields for qb_query aggregations
if isinstance(field, dict):
continue
fieldname = extract_fieldnames(field)[0]
if not fieldname:
raise_invalid_field(fieldname)
@ -686,7 +690,7 @@ def get_stats(stats, doctype, filters=None):
try:
tag_count = frappe.get_list(
doctype,
fields=[column, "count(*)"],
fields=[column, {"COUNT": "*"}],
filters=[*filters, [column, "!=", ""]],
group_by=column,
as_list=True,
@ -697,7 +701,7 @@ def get_stats(stats, doctype, filters=None):
results[column] = scrub_user_tags(tag_count)
no_tag_count = frappe.get_list(
doctype,
fields=[column, "count(*)"],
fields=[column, {"COUNT": "1"}],
filters=[*filters, [column, "in", ("", ",")]],
as_list=True,
group_by=column,
@ -736,7 +740,7 @@ def get_filter_dashboard_data(stats, doctype, filters=None):
if tag["type"] not in ["Date", "Datetime"]:
tagcount = frappe.get_list(
doctype,
fields=[tag["name"], "count(*)"],
fields=[tag["name"], {"COUNT": "*"}],
filters=[*filters, "ifnull(`{}`,'')!=''".format(tag["name"])],
group_by=tag["name"],
as_list=True,
@ -758,7 +762,7 @@ def get_filter_dashboard_data(stats, doctype, filters=None):
"No Data",
frappe.get_list(
doctype,
fields=[tag["name"], "count(*)"],
fields=[tag["name"], {"COUNT": "*"}],
filters=[*filters, "({0} = '' or {0} is null)".format(tag["name"])],
as_list=True,
)[0][1],

View file

@ -163,25 +163,29 @@ def search_widget(
fields = get_std_fields_list(meta, searchfield or "name")
if filter_fields:
fields = list(set(fields + json.loads(filter_fields)))
formatted_fields = [f"`tab{meta.name}`.`{f.strip()}`" for f in fields]
formatted_fields = [f.strip() for f in fields]
# Insert title field query after name
if meta.show_title_field_in_link and meta.title_field:
formatted_fields.insert(1, f"`tab{meta.name}`.{meta.title_field} as `label`")
formatted_fields.insert(1, f"{meta.title_field} as label")
order_by_based_on_meta = get_order_by(doctype, meta)
# `idx` is number of times a document is referred, check link_count.py
order_by = f"`tab{doctype}`.idx desc, {order_by_based_on_meta}"
order_by = f"idx desc, {order_by_based_on_meta}"
if not meta.translated_doctype:
_txt = frappe.db.escape((txt or "").replace("%", "").replace("@", ""))
# locate returns 0 if string is not found, convert 0 to null and then sort null to end in order by
_relevance = f"(1 / nullif(locate({_txt}, `tab{doctype}`.`name`), 0))"
formatted_fields.append(f"""{_relevance} as `_relevance`""")
# Since we are sorting by alias postgres needs to know number of column we are sorting
_relevance_expr = {"DIV": [1, {"NULLIF": [{"LOCATE": [_txt, "name"]}, 0]}]}
# For MariaDB, wrap in IFNULL for sorting to push nulls to end
if frappe.db.db_type == "mariadb":
order_by = f"ifnull(_relevance, -9999) desc, {order_by}"
_relevance = {"IFNULL": [_relevance_expr, -9999], "as": "_relevance"}
formatted_fields.append(_relevance)
order_by = f"_relevance desc, {order_by}"
elif frappe.db.db_type == "postgres":
_relevance = {**_relevance_expr, "as": "_relevance"}
formatted_fields.append(_relevance)
# Since we are sorting by alias postgres needs to know number of column we are sorting
order_by = f"{len(formatted_fields)} desc nulls last, {order_by}"

View file

@ -3,6 +3,7 @@
import frappe
from frappe import _
from frappe.query_builder import Field, functions
@frappe.whitelist()
@ -42,24 +43,25 @@ def get_children(doctype, parent="", include_disabled=False, **filters):
def _get_children(doctype, parent="", ignore_permissions=False, include_disabled=False):
parent_field = "parent_" + frappe.scrub(doctype)
filters = [[f"ifnull(`{parent_field}`,'')", "=", parent], ["docstatus", "<", 2]]
if frappe.db.has_column(doctype, "disabled") and not include_disabled:
filters.append(["disabled", "=", False])
meta = frappe.get_meta(doctype)
return frappe.get_list(
doctype,
fields=[
"name as value",
"{} as title".format(meta.get("title_field") or "name"),
"is_group as expandable",
],
filters=filters,
order_by="name",
ignore_permissions=ignore_permissions,
qb = (
frappe.qb.from_(doctype)
.select(
Field("name").as_("value"),
Field(meta.get("title_field") or "name").as_("title"),
Field("is_group").as_("expandable"),
)
.where(functions.IfNull(Field(parent_field), "").eq(parent))
.where(Field("docstatus") < 2)
)
if frappe.db.has_column(doctype, "disabled") and not include_disabled:
qb = qb.where(Field("disabled").eq(False))
# Order by name and execute
return qb.orderby("name").run(as_dict=True)
@frappe.whitelist()
def add_node():

View file

@ -950,7 +950,7 @@ def get_max_email_uid(email_account):
"sent_or_received": "Received",
"email_account": email_account,
},
fields=["max(uid) as uid"],
fields=[{"MAX": "uid", "as": "uid"}],
):
return cint(result[0].get("uid", 0)) + 1
return 1

View file

@ -109,11 +109,11 @@ class FrappeClient:
def get_list(
self,
doctype,
doctype: str,
fields='["name"]',
filters=None,
limit_start=0,
limit_page_length=None,
limit_start: int = 0,
limit_page_length: int | None = None,
order_by=None,
group_by=None,
):

View file

@ -846,7 +846,7 @@ from {tables}
nodes = frappe.get_all(
ref_doctype,
filters={"lft": [">", lft], "rgt": ["<", rgt]},
order_by="`lft` ASC",
order_by="lft ASC",
pluck="name",
)
if f.operator.lower() == "descendants of (inclusive)":
@ -856,7 +856,7 @@ from {tables}
nodes = frappe.get_all(
ref_doctype,
filters={"lft": ["<", lft], "rgt": [">", rgt]},
order_by="`lft` DESC",
order_by="lft DESC",
pluck="name",
)
@ -1377,7 +1377,7 @@ def get_order_by(doctype, meta):
# will covert to
# `tabItem`.`idx` desc, `tabItem`.`creation` desc
order_by = ", ".join(
f"`tab{doctype}`.`{f_split[0].strip()}` {f_split[1].strip()}"
f"{f_split[0].strip()} {f_split[1].strip()}"
for f in meta.sort_field.split(",")
if (f_split := f.split(maxsplit=2))
)
@ -1385,7 +1385,7 @@ def get_order_by(doctype, meta):
else:
sort_field = meta.sort_field or "creation"
sort_order = (meta.sort_field and meta.sort_order) or "desc"
order_by = f"`tab{doctype}`.`{sort_field}` {sort_order}"
order_by = f"{sort_field} {sort_order}"
return order_by

View file

@ -420,11 +420,6 @@ class Meta(Document):
self.extend("fields", custom_fields)
def apply_property_setters(self):
"""
Property Setters are set via Customize Form. They override standard properties
of the doctype or its child properties like fields, links etc. This method
applies the customized properties over the standard meta object
"""
if not frappe.db.table_exists("Property Setter"):
return

346
frappe/model/qb_query.py Normal file
View file

@ -0,0 +1,346 @@
# Copyright (c) 2015, Frappe Technologies Pvt. Ltd. and Contributors
# License: MIT. See LICENSE
"""Query implementation using frappe's query builder"""
import copy
import json
from typing import Any
import frappe
from frappe.database.utils import DefaultOrderBy, FilterValue
from frappe.deprecation_dumpster import deprecation_warning
from frappe.model.utils import is_virtual_doctype
from frappe.model.utils.user_settings import get_user_settings, update_user_settings
from frappe.query_builder.utils import Column
class DatabaseQuery:
"""
Copy of db_query.py DatabaseQuery, using query builder instead.
"""
def __init__(self, doctype: str) -> None:
self.doctype = doctype
def execute(
self,
fields: list[str] | tuple[str, ...] | str | None = None,
filters: dict[str, FilterValue] | FilterValue | list[list | FilterValue] | None = None,
or_filters: dict[str, FilterValue] | FilterValue | list[list | FilterValue] | None = None,
group_by: str | None = None,
order_by: str = DefaultOrderBy,
limit: int | None = None,
offset: int | None = None,
limit_start: int = 0,
limit_page_length: int | None = None,
as_list: bool = False,
with_childnames: bool = False,
debug: bool = False,
ignore_permissions: bool = False,
user: str | None = None,
with_comment_count: bool = False,
join: str = "left join",
distinct: bool = False,
start: int | None = None,
page_length: int | None = None,
ignore_ifnull: bool = False,
save_user_settings: bool = False,
save_user_settings_fields: bool = False,
update: dict[str, Any] | None = None,
user_settings: str | dict[str, Any] | None = None,
reference_doctype: str | None = None,
run: bool = True,
strict: bool = True,
pluck: str | None = None,
ignore_ddl: bool = False,
*,
parent_doctype: str | None = None,
) -> list:
"""Execute a database query using the Query Builder engine.
Args:
fields: Fields to select. Can be a list, tuple, or comma-separated string.
filters: Main filter conditions. Supports dicts, lists, and operator tuples.
or_filters: Additional filter conditions to be combined with OR.
group_by: Fields to group results by.
order_by: Fields to order results by.
limit: Maximum number of records to return.
offset: Number of records to skip for pagination.
limit_start: Legacy pagination start (deprecated, use offset).
limit_page_length: Legacy pagination length (deprecated, use limit).
as_list: Return results as list of lists instead of list of dicts.
with_childnames: Include child document names (not implemented).
debug: Enable debug mode for query inspection.
ignore_permissions: Skip permission checks for the query.
user: Execute query as specific user.
with_comment_count: Add comment count to results (_comment_count field).
join: Type of join for related tables (QB engine auto-determines optimal joins).
distinct: Return only distinct results.
start: Legacy alias for limit_start (deprecated).
page_length: Legacy alias for limit_page_length (deprecated).
ignore_ifnull: Skip IFNULL wrapping (QB engine handles NULL optimization automatically).
save_user_settings: Save current query settings for user.
save_user_settings_fields: Save field selection in user settings.
update: Dictionary to merge into each result when as_list=False.
user_settings: Custom user settings as JSON string or dict.
reference_doctype: Reference doctype for contextual user permissions.
run: Execute query immediately (True) or return query object (False).
strict: Enable strict mode for query validation (legacy compatibility).
pluck: Extract single field values as a simple list.
ignore_ddl: Ignore DDL operations during query execution (legacy compatibility).
parent_doctype: Parent doctype for child table queries.
Returns:
Query results as list of dicts (default) or list of lists (as_list=True).
If pluck is specified, returns list of field values.
If run=False, returns query object instead of results.
Raises:
ValidationError: For invalid parameters or query structure.
PermissionError: When user lacks required permissions.
"""
# filters and fields swappable
# its hard to remember what comes first
if isinstance(fields, dict) or (fields and isinstance(fields, list) and isinstance(fields[0], list)):
# if fields is given as dict/list of list, its probably filters
filters, fields = fields, filters
elif fields and isinstance(filters, list) and len(filters) > 1 and isinstance(filters[0], str):
# if `filters` is a list of strings, its probably fields
filters, fields = fields, filters
# Handle virtual doctypes before any other processing
if is_virtual_doctype(self.doctype):
return self._handle_virtual_doctype(
filters,
or_filters,
start,
offset,
limit_start,
page_length,
limit,
limit_page_length,
order_by,
as_list,
with_comment_count,
save_user_settings,
save_user_settings_fields,
pluck,
parent_doctype,
)
# Handle deprecated parameters
if limit_start:
deprecation_warning(
"2024-01-01", "v17", "The 'limit_start' parameter is deprecated. Use 'offset' instead."
)
if offset is None:
offset = limit_start
if limit_page_length:
deprecation_warning(
"2024-01-01", "v17", "The 'limit_page_length' parameter is deprecated. Use 'limit' instead."
)
if limit is None:
limit = limit_page_length
if start:
deprecation_warning(
"2024-01-01", "v17", "The 'start' parameter is deprecated. Use 'offset' instead."
)
if offset is None:
offset = start
if page_length:
deprecation_warning(
"2024-01-01", "v17", "The 'page_length' parameter is deprecated. Use 'limit' instead."
)
if limit is None:
limit = page_length
# Set fields to the requested field or `name` if none specified
if not fields:
fields = [pluck or "name"]
# Check if table exists before running query
from frappe.model.meta import get_table_columns
try:
get_table_columns(self.doctype)
except frappe.db.TableMissingError:
if ignore_ddl:
return []
else:
raise
# Build query using QB engine with converted syntax
kwargs = {
"table": self.doctype,
"fields": fields,
"filters": filters,
"or_filters": or_filters,
"group_by": group_by,
"order_by": order_by,
"limit": frappe.cint(limit),
"offset": frappe.cint(offset),
"distinct": distinct,
"ignore_permissions": ignore_permissions,
"user": user,
"parent_doctype": parent_doctype,
"reference_doctype": reference_doctype,
"db_query_compat": True,
}
query = frappe.qb.get_query(**kwargs)
if not run:
# Return the SQL query string instead of executing
return str(query.get_sql())
# Run the query
if pluck:
result = query.run(debug=debug, as_dict=True, pluck=pluck)
else:
result = query.run(debug=debug, as_dict=not as_list, update=update)
# Add comment count if requested and not as_list
if with_comment_count and not as_list and self.doctype:
self._add_comment_count(result)
# Save user settings if requested
if save_user_settings:
user_settings_fields = copy.deepcopy(fields) if save_user_settings_fields else None
if user_settings and isinstance(user_settings, str):
user_settings = json.loads(user_settings)
self._save_user_settings(user_settings, user_settings_fields, save_user_settings_fields)
return result
def _add_comment_count(self, result: list[Any]) -> None:
"""Add comment count to each result row by parsing _comments field.
This method adds a _comment_count field to each row based on the _comments field content.
It parses the JSON structure to count the number of comments.
Args:
result: List of result dictionaries to modify
"""
if not result:
return
for row in result:
if isinstance(row, dict) and "_comments" in row:
try:
comments_data = json.loads(row["_comments"] or "[]")
row["_comment_count"] = len(comments_data) if isinstance(comments_data, list) else 0
except (json.JSONDecodeError, TypeError):
row["_comment_count"] = 0
elif isinstance(row, dict):
row["_comment_count"] = 0
def _save_user_settings(
self,
user_settings: dict[str, Any] | None,
user_settings_fields: list[str] | None,
save_user_settings_fields: bool,
) -> None:
"""Save user settings for the current query.
This method stores user preferences for field selections and other query parameters
to provide a personalized experience for repeated queries.
Args:
user_settings: Custom user settings to save
user_settings_fields: Field list to save if save_user_settings_fields is True
save_user_settings_fields: Whether to save the field selection
"""
if not self.doctype:
return
try:
current_settings = get_user_settings(self.doctype) or {}
# Update with custom user settings if provided
if user_settings:
current_settings.update(user_settings)
# Save field selection if requested
if save_user_settings_fields and user_settings_fields:
current_settings["fields"] = user_settings_fields
# Only save if there are actual settings to save
if current_settings:
update_user_settings(self.doctype, current_settings)
except Exception:
# Don't let user settings errors break the query
pass
def _handle_virtual_doctype(
self,
filters: dict[str, FilterValue] | FilterValue | list[list | FilterValue] | None,
or_filters: dict[str, FilterValue] | FilterValue | list[list | FilterValue] | None,
start: int | None,
offset: int | None,
limit_start: int,
page_length: int | None,
limit: int | None,
limit_page_length: int | None,
order_by: str,
as_list: bool,
with_comment_count: bool,
save_user_settings: bool,
save_user_settings_fields: bool,
pluck: str | None,
parent_doctype: str | None,
) -> list:
"""Handle virtual doctype queries by delegating to controller.get_list().
Virtual doctypes don't have database tables and use controller methods
to generate data dynamically. Converts filters to Filters objects and
calls the doctype controller's get_list method.
Returns:
List of results from controller.get_list()
"""
from frappe.model.base_document import get_controller
from frappe.types.filter import Filters
controller = get_controller(self.doctype)
if not hasattr(controller, "get_list"):
return []
filters = filters or Filters()
if isinstance(filters, str):
filters = json.loads(filters)
if not isinstance(filters, Filters):
filters = Filters(filters, doctype=self.doctype)
or_filters = or_filters or Filters()
if isinstance(or_filters, str):
or_filters = json.loads(or_filters)
if not isinstance(or_filters, Filters):
or_filters = Filters(or_filters, doctype=self.doctype)
_page_length = page_length or limit or limit_page_length or 20
kwargs = {
"filters": filters,
"or_filters": or_filters,
"start": start or offset or limit_start or 0,
"page_length": _page_length,
"limit_page_length": _page_length,
"order_by": order_by,
"as_list": as_list,
"with_comment_count": with_comment_count,
"save_user_settings": save_user_settings,
"save_user_settings_fields": save_user_settings_fields,
"pluck": pluck,
"parent_doctype": parent_doctype,
"doctype": self.doctype,
}
# Use frappe.call to filter kwargs and call controller
return frappe.call(controller.get_list, args=kwargs, **kwargs)

View file

@ -1,6 +1,6 @@
from typing import Any
from pypika.functions import DistinctOptionFunction
from pypika.functions import DistinctOptionFunction, Function
from pypika.terms import Term
from pypika.utils import builder, format_alias_sql, format_quotes
@ -98,3 +98,18 @@ class ConstantColumn(Term):
quote_char=quote_char,
**kwargs,
)
class MonthName(Function):
def __init__(self, field, alias=None):
super().__init__("MONTHNAME", field, alias=alias)
class Quarter(Function):
def __init__(self, field, alias=None):
super().__init__("QUARTER", field, alias=alias)
class Month(Function):
def __init__(self, field, alias=None):
super().__init__("MONTH", field, alias=alias)

View file

@ -5,7 +5,15 @@ from pypika.functions import *
from pypika.terms import Arithmetic, ArithmeticExpression, CustomFunction, Function
import frappe
from frappe.query_builder.custom import GROUP_CONCAT, MATCH, STRING_AGG, TO_TSVECTOR
from frappe.query_builder.custom import (
GROUP_CONCAT,
MATCH,
STRING_AGG,
TO_TSVECTOR,
Month,
MonthName,
Quarter,
)
from frappe.query_builder.utils import ImportMapper, db_type_is
from .utils import PseudoColumn

View file

@ -230,8 +230,7 @@ class TestResourceAPI(FrappeAPITestCase):
def test_get_list_debug(self):
# test 5: fetch response with debug
with suppress_stdout():
response = self.get(self.resource(self.DOCTYPE), {"sid": self.sid, "debug": True})
response = self.get(self.resource(self.DOCTYPE), {"sid": self.sid, "debug": True})
self.assertEqual(response.status_code, 200)
self.assertIn("_debug_messages", response.json)
self.assertIsInstance(response.json["_debug_messages"], str)

View file

@ -93,12 +93,12 @@ class TestDB(IntegrationTestCase):
),
)
self.assertEqual(
frappe.db.sql("""SELECT name FROM `tabUser` WHERE name > 's' ORDER BY MODIFIED DESC""")[0][0],
frappe.db.sql("""SELECT name FROM `tabUser` WHERE name > 's' ORDER BY creation DESC""")[0][0],
frappe.db.get_value("User", {"name": [">", "s"]}),
)
self.assertEqual(
frappe.db.sql("""SELECT name FROM `tabUser` WHERE name >= 't' ORDER BY MODIFIED DESC""")[0][0],
frappe.db.sql("""SELECT name FROM `tabUser` WHERE name >= 't' ORDER BY creation DESC""")[0][0],
frappe.db.get_value("User", {"name": [">=", "t"]}),
)
self.assertEqual(
@ -403,8 +403,8 @@ class TestDB(IntegrationTestCase):
random_field,
)
self.assertEqual(
next(iter(frappe.get_all("ToDo", fields=[f"count(`{random_field}`)"], limit=1)[0])),
"count" if frappe.conf.db_type == "postgres" else f"count(`{random_field}`)",
next(iter(frappe.get_all("ToDo", fields=[{"COUNT": random_field}], limit=1)[0])),
"COUNT" if frappe.conf.db_type == "postgres" else f"COUNT(`{random_field}`)",
)
# Testing update

View file

@ -483,15 +483,13 @@ class TestDBQuery(IntegrationTestCase):
self.assertTrue("count" in data[0])
data = DatabaseQuery("DocType").execute(
fields=["name", "issingle", "locate('', name) as _relevance"],
limit_start=0,
limit_page_length=1,
fields=["name", "issingle", "locate('','name') as _relevance"], limit_start=0, limit_page_length=1
)
self.assertTrue("_relevance" in data[0])
# Test that fields with keywords in strings are allowed
data = DatabaseQuery("DocType").execute(
fields=["name", "locate('select', name)"],
fields=["name", "locate('select', 'name')"],
limit_start=0,
limit_page_length=1,
)
@ -818,7 +816,7 @@ class TestDBQuery(IntegrationTestCase):
frappe.db.get_list(
"Web Form",
filters=[["Web Form Field", "reqd", "=", 1]],
fields=["count(*) as count"],
fields=[{"COUNT": "*", "as": "count"}],
order_by="count desc",
limit=50,
)
@ -846,7 +844,7 @@ class TestDBQuery(IntegrationTestCase):
"DocType",
filters={"docstatus": 0, "document_type": ("!=", "")},
group_by="document_type",
fields=["document_type", "sum(is_submittable) as is_submittable"],
fields=["document_type", {"SUM": "is_submittable", "as": "is_submittable"}],
limit=1,
as_list=True,
)
@ -876,128 +874,41 @@ class TestDBQuery(IntegrationTestCase):
def test_permlevel_fields(self):
with setup_patched_blog_post(), setup_test_user(set_user=True):
data = frappe.get_list(
self.assertRaises(
frappe.PermissionError,
frappe.get_list,
"Test Blog Post",
filters={"published": 1},
fields=["name", "published"],
limit=1,
)
self.assertFalse("published" in data[0])
self.assertTrue("name" in data[0])
self.assertEqual(len(data[0]), 1)
data = frappe.get_list(
self.assertRaises(
frappe.PermissionError,
frappe.get_list,
"Test Blog Post",
filters={"published": 1},
fields=["name", "`published`"],
limit=1,
)
self.assertFalse("published" in data[0])
self.assertTrue("name" in data[0])
self.assertEqual(len(data[0]), 1)
data = frappe.get_list(
self.assertRaises(
frappe.PermissionError,
frappe.get_list,
"Test Blog Post",
filters={"published": 1},
fields=["name", "`tabTest Blog Post`.`published`"],
limit=1,
)
self.assertFalse("published" in data[0])
self.assertTrue("name" in data[0])
self.assertEqual(len(data[0]), 1)
data = frappe.get_list(
self.assertRaises(
frappe.PermissionError,
frappe.get_list,
"Test Blog Post",
filters={"published": 1},
fields=["name", "`tabTest Child`.`test_field`"],
limit=1,
)
self.assertFalse("test_field" in data[0])
self.assertTrue("name" in data[0])
self.assertEqual(len(data[0]), 1)
data = frappe.get_list(
"Test Blog Post",
filters={"published": 1},
fields=["name", "MAX(`published`)"],
limit=1,
)
self.assertTrue("name" in data[0])
self.assertEqual(len(data[0]), 1)
data = frappe.get_list(
"Test Blog Post",
filters={"published": 1},
fields=["name", "LAST(published)"],
limit=1,
)
self.assertTrue("name" in data[0])
self.assertEqual(len(data[0]), 1)
data = frappe.get_list(
"Test Blog Post",
filters={"published": 1},
fields=["name", "MAX(`modified`)"],
limit=1,
order_by=None,
group_by="name",
)
self.assertEqual(len(data[0]), 2)
data = frappe.get_list(
"Test Blog Post",
filters={"published": 1},
fields=["name", "now() abhi"],
limit=1,
)
self.assertIsInstance(data[0]["abhi"], datetime.datetime)
self.assertEqual(len(data[0]), 2)
data = frappe.get_list(
"Test Blog Post",
filters={"published": 1},
fields=["name", "'LABEL'"],
limit=1,
)
self.assertTrue("name" in data[0])
self.assertTrue("LABEL" in data[0].values())
self.assertEqual(len(data[0]), 2)
data = frappe.get_list(
"Test Blog Post",
filters={"published": 1},
fields=["name", "COUNT(*) as count"],
limit=1,
order_by=None,
group_by="name",
)
self.assertTrue("count" in data[0])
self.assertEqual(len(data[0]), 2)
data = frappe.get_list(
"Test Blog Post",
filters={"published": 1},
fields=["name", "COUNT(*) count"],
limit=1,
order_by=None,
group_by="name",
)
self.assertTrue("count" in data[0])
self.assertEqual(len(data[0]), 2)
data = frappe.get_list(
"Test Blog Post",
fields=[
"name",
"blogger.full_name as blogger_full_name",
"blog_category.title",
],
limit=1,
)
print(data[0])
self.assertTrue("name" in data[0])
self.assertTrue("blogger_full_name" in data[0])
self.assertTrue("title" in data[0])
def test_cast_name(self):
from frappe.core.doctype.doctype.test_doctype import new_doctype
@ -1128,29 +1039,40 @@ class TestDBQuery(IntegrationTestCase):
):
frappe.get_all("Virtual DocType", filters={"name": "test"}, fields=["name"], limit=1)
def test_function_alias_in_clauses(self):
result = frappe.get_list(
"ToDo",
fields=["status", {"COUNT": "1", "as": "count"}],
group_by="status",
order_by="count desc",
limit=1,
)
self.assertTrue(result)
self.assertIn("count", result[0])
def test_coalesce_with_in_ops(self):
self.assertNotIn("ifnull", frappe.get_all("User", {"first_name": ("in", ["a", "b"])}, run=0))
self.assertIn("ifnull", frappe.get_all("User", {"first_name": ("in", ["a", None])}, run=0))
self.assertIn("ifnull", frappe.get_all("User", {"first_name": ("in", ["a", ""])}, run=0))
self.assertIn("ifnull", frappe.get_all("User", {"first_name": ("in", [])}, run=0))
self.assertIn("ifnull", frappe.get_all("User", {"first_name": ("not in", ["a"])}, run=0))
self.assertIn("ifnull", frappe.get_all("User", {"first_name": ("not in", [])}, run=0))
self.assertIn("ifnull", frappe.get_all("User", {"first_name": ("not in", [""])}, run=0))
self.assertNotIn("IF", frappe.get_all("User", {"first_name": ("in", ["a", "b"])}, run=0))
self.assertIn("IFNULL", frappe.get_all("User", {"first_name": ("in", ["a", None])}, run=0))
self.assertIn("IFNULL", frappe.get_all("User", {"first_name": ("in", ["a", ""])}, run=0))
self.assertIn("IFNULL", frappe.get_all("User", {"first_name": ("in", [])}, run=0))
self.assertIn("IFNULL", frappe.get_all("User", {"first_name": ("not in", ["a"])}, run=0))
self.assertIn("IFNULL", frappe.get_all("User", {"first_name": ("not in", [])}, run=0))
self.assertIn("IFNULL", frappe.get_all("User", {"first_name": ("not in", [""])}, run=0))
# primary key is never nullable
self.assertNotIn("ifnull", frappe.get_all("User", {"name": ("in", ["a", None])}, run=0))
self.assertNotIn("ifnull", frappe.get_all("User", {"name": ("in", ["a", ""])}, run=0))
self.assertNotIn("ifnull", frappe.get_all("User", {"name": ("in", (""))}, run=0))
self.assertNotIn("ifnull", frappe.get_all("User", {"name": ("in", ())}, run=0))
self.assertNotIn("IFNULL", frappe.get_all("User", {"name": ("in", ["a", None])}, run=0))
self.assertNotIn("IFNULL", frappe.get_all("User", {"name": ("in", ["a", ""])}, run=0))
self.assertNotIn("IFNULL", frappe.get_all("User", {"name": ("in", (""))}, run=0))
self.assertNotIn("IFNULL", frappe.get_all("User", {"name": ("in", ())}, run=0))
def test_coalesce_with_datetime_ops(self):
self.assertNotIn("ifnull", frappe.get_all("User", {"last_active": (">", "2022-01-01")}, run=0))
self.assertNotIn("ifnull", frappe.get_all("User", {"creation": ("<", "2022-01-01")}, run=0))
self.assertNotIn("IFNULL", frappe.get_all("User", {"last_active": (">", "2022-01-01")}, run=0))
self.assertNotIn("IFNULL", frappe.get_all("User", {"creation": ("<", "2022-01-01")}, run=0))
self.assertNotIn(
"ifnull",
"IFNULL",
frappe.get_all("User", {"last_active": ("between", ("2022-01-01", "2023-01-01"))}, run=0),
)
self.assertIn("ifnull", frappe.get_all("User", {"last_active": ("<", "2022-01-01")}, run=0))
self.assertIn("IFNULL", frappe.get_all("User", {"last_active": ("<", "2022-01-01")}, run=0))
def test_ambiguous_linked_tables(self):
from frappe.desk.reportview import get
@ -1222,13 +1144,13 @@ class TestDBQuery(IntegrationTestCase):
self.assertEqual(len(data["values"]), 1)
def test_select_star_expansion(self):
count = frappe.get_list("Language", ["SUM(1)", "COUNT(*)"], as_list=1, order_by=None)[0]
count = frappe.get_list("Language", [{"SUM": 1}, {"COUNT": "*"}], as_list=1, order_by=None)[0]
self.assertEqual(count[0], frappe.db.count("Language"))
self.assertEqual(count[1], frappe.db.count("Language"))
def test_ifnull_none(self):
query = frappe.get_all("DocField", {"fieldname": None}, run=0)
self.assertIn("''", query)
self.assertIn("IS NULL", query)
self.assertNotIn("\\'", query)
self.assertNotIn("ifnull", query)
self.assertFalse(frappe.get_all("DocField", {"name": None}))
@ -1424,8 +1346,10 @@ class TestReportView(IntegrationTestCase):
response = execute_cmd("frappe.desk.reportview.get")
self.assertNotIn("published", response["keys"])
# If none of the fields are accessible then result should be empty
self.assertEqual(frappe.get_list("Test Blog Post", "published"), [])
data = frappe.get_list("Test Blog Post", "published")
self.assertTrue(len(data) > 0)
self.assertTrue(all("name" in row for row in data))
self.assertTrue(all("published" not in row for row in data))
def test_reportview_get_admin(self):
# Admin should be able to see access all fields

View file

@ -70,13 +70,13 @@ class TestFrappeClient(IntegrationTestCase):
getlist_users = server.get_list(
"User",
fields=["count(name) as user_count"],
fields=[{"COUNT": "name", "as": "user_count"}],
filters={"user_type": "System User"},
group_by="user_type",
)
getall_users = frappe.db.get_all(
"User",
fields=["count(name) as system_user_count"],
fields=[{"COUNT": "name", "as": "system_user_count"}],
filters={"user_type": "System User"},
group_by="user_type",
)

View file

@ -153,7 +153,9 @@ class TestQuery(IntegrationTestCase):
"`tabUser`.`name` as alias",
"*",
"`tabHas Role`.`name`",
"field as `alias with space`",
]
invalid_fields = [
"name; DROP TABLE users",
"`name` ; SELECT * FROM secrets",
@ -166,7 +168,6 @@ class TestQuery(IntegrationTestCase):
"field with space",
"`field with space`",
"field as alias with space",
"field as `alias with space`",
"COUNT(*)",
"COUNT(name)",
"SUM(amount) as total",
@ -197,11 +198,10 @@ class TestQuery(IntegrationTestCase):
def test_field_validation_filters(self):
"""Test validation for fields used in filters (WHERE clause)."""
valid_fields = ["name", "creation", "language.name"]
valid_fields = ["name", "creation", "language.name", "`tabUser`.`name`"]
# Filters should not allow aliases or functions directly as field names
invalid_fields = [
"tabUser.name",
"`tabUser`.`name`",
"name as alias",
"`name` as alias",
"tabUser.name as alias",
@ -248,6 +248,7 @@ class TestQuery(IntegrationTestCase):
"1", # Allow numeric indices
"name, email",
"1, 2",
"`tabUser`.`name`",
]
# GROUP BY should not allow aliases or functions
invalid_fields = [
@ -262,7 +263,6 @@ class TestQuery(IntegrationTestCase):
"table.invalid-field",
"tabUser.name",
"`name`",
"`tabUser`.`name`",
"`name`, `tabUser`.`email`",
"`table`.`invalid-field`",
"field with space",
@ -293,6 +293,8 @@ class TestQuery(IntegrationTestCase):
"2 DESC",
"name, email",
"1 asc, 2 desc",
"`tabUser`.`name`",
"`tabUser`.`name` desc",
]
# ORDER BY should not allow aliases or functions, or invalid directions
invalid_fields = [
@ -305,10 +307,8 @@ class TestQuery(IntegrationTestCase):
"name /* comment */",
"`name`",
"tabUser.name",
"`tabUser`.`name`",
"`name` DESC",
"tabUser.name Asc",
"`tabUser`.`name` desc",
"`name` asc, `tabUser`.`email` DESC",
"invalid-field-name",
"table.invalid-field",
@ -419,6 +419,158 @@ class TestQuery(IntegrationTestCase):
"SELECT `name` FROM `tabDocType`",
)
def test_or_filters(self):
"""Test OR filter conditions."""
# Test 1: Basic dict or_filters
self.assertEqual(
frappe.qb.get_query(
"DocType",
fields=["name"],
or_filters={"name": "User", "module": "Core"},
).get_sql(),
"SELECT `name` FROM `tabDocType` WHERE `name`='User' OR `module`='Core'".replace(
"`", '"' if frappe.db.db_type == "postgres" else "`"
),
)
# Test 2: List format or_filters
self.assertEqual(
frappe.qb.get_query(
"DocType",
fields=["name"],
or_filters=[["name", "=", "User"], ["module", "=", "Core"]],
).get_sql(),
"SELECT `name` FROM `tabDocType` WHERE `name`='User' OR `module`='Core'".replace(
"`", '"' if frappe.db.db_type == "postgres" else "`"
),
)
# Test 3: OR filters with operators
self.assertEqual(
frappe.qb.get_query(
"DocType",
fields=["name"],
or_filters={"name": ("like", "User%"), "module": ("in", ["Core", "Custom"])},
).get_sql(),
"SELECT `name` FROM `tabDocType` WHERE `name` LIKE 'User%' OR `module` IN ('Core','Custom')".replace(
"`", '"' if frappe.db.db_type == "postgres" else "`"
),
)
# Test 4: Combining filters (AND) with or_filters (OR)
self.assertEqual(
frappe.qb.get_query(
"DocType",
fields=["name"],
filters={"issingle": 0},
or_filters={"name": "User", "module": "Core"},
).get_sql(),
"SELECT `name` FROM `tabDocType` WHERE `issingle`=0 AND (`name`='User' OR `module`='Core')".replace(
"`", '"' if frappe.db.db_type == "postgres" else "`"
),
)
# Test 5: Multiple AND filters with OR filters
self.assertEqual(
frappe.qb.get_query(
"DocType",
fields=["name"],
filters={"issingle": 0, "custom": 0},
or_filters={"name": "User", "module": "Core"},
).get_sql(),
"SELECT `name` FROM `tabDocType` WHERE `issingle`=0 AND `custom`=0 AND (`name`='User' OR `module`='Core')".replace(
"`", '"' if frappe.db.db_type == "postgres" else "`"
),
)
# Test 6: OR filters with simple list (name IN)
self.assertEqual(
frappe.qb.get_query(
"DocType",
or_filters=["User", "Role", "Note"],
).get_sql(),
"SELECT `name` FROM `tabDocType` WHERE `name` IN ('User','Role','Note')".replace(
"`", '"' if frappe.db.db_type == "postgres" else "`"
),
)
# Test 7: OR filters with greater than and less than
self.assertEqual(
frappe.qb.get_query(
"DocType",
fields=["name"],
or_filters={"idx": (">", 5), "issingle": ("=", 1)},
).get_sql(),
"SELECT `name` FROM `tabDocType` WHERE `idx`>5 OR `issingle`=1".replace(
"`", '"' if frappe.db.db_type == "postgres" else "`"
),
)
# Test 8: OR filters with list including doctype
self.assertEqual(
frappe.qb.get_query(
"DocType",
fields=["name"],
or_filters=[["DocType", "name", "=", "User"], ["DocType", "name", "=", "Role"]],
).get_sql(),
"SELECT `name` FROM `tabDocType` WHERE `name`='User' OR `name`='Role'".replace(
"`", '"' if frappe.db.db_type == "postgres" else "`"
),
)
# Test 9: OR filters with != operator
self.assertEqual(
frappe.qb.get_query(
"DocType",
fields=["name"],
or_filters={"name": ("!=", "User"), "module": ("!=", "Core")},
).get_sql(),
"SELECT `name` FROM `tabDocType` WHERE `name`<>'User' OR `module`<>'Core'".replace(
"`", '"' if frappe.db.db_type == "postgres" else "`"
),
)
# Test 10: Empty or_filters should return query without OR conditions
self.assertEqual(
frappe.qb.get_query(
"DocType",
fields=["name"],
filters={"custom": 0},
or_filters={},
).get_sql(),
"SELECT `name` FROM `tabDocType` WHERE `custom`=0".replace(
"`", '"' if frappe.db.db_type == "postgres" else "`"
),
)
# Test 11: OR filters with not in operator
self.assertEqual(
frappe.qb.get_query(
"DocType",
fields=["name"],
or_filters={"name": ("not in", ["User", "Role"]), "module": ("=", "Core")},
).get_sql(),
"SELECT `name` FROM `tabDocType` WHERE `name` NOT IN ('User','Role') OR `module`='Core'".replace(
"`", '"' if frappe.db.db_type == "postgres" else "`"
),
)
# Test 12: OR filters with mixed field types
self.assertEqual(
frappe.qb.get_query(
"DocType",
fields=["name", "module"],
or_filters=[
["name", "like", "User%"],
["issingle", "=", 1],
["custom", "=", 0],
],
).get_sql(),
"SELECT `name`,`module` FROM `tabDocType` WHERE `name` LIKE 'User%' OR `issingle`=1 OR `custom`=0".replace(
"`", '"' if frappe.db.db_type == "postgres" else "`"
),
)
def test_nested_filters(self):
"""Test nested filter conditions with AND/OR logic."""
User = frappe.qb.DocType("User")
@ -724,7 +876,7 @@ class TestQuery(IntegrationTestCase):
# Check for user permission condition in the query string
if frappe.db.db_type == "mariadb":
self.assertIn("`name` IS NULL OR `name` IN ('_Test Blog Post 1','_Test Blog Post')", query)
self.assertIn("IFNULL(`name`,'')='' OR `name` IN ('_Test Blog Post 1','_Test Blog Post')", query)
elif frappe.db.db_type == "postgres":
self.assertIn("\"name\" IS NULL OR \"name\" IN ('_Test Blog Post 1','_Test Blog Post')", query)
@ -1477,18 +1629,25 @@ class TestQuery(IntegrationTestCase):
frappe.qb.get_query("User", order_by=field).get_sql()
def test_backtick_rejection_group_order(self):
"""Test that backticks are properly rejected in GROUP BY and ORDER BY."""
"""Test that malformed backticks are properly rejected in GROUP BY and ORDER BY."""
# Test single backtick (invalid notation - should be `tabTable`.`field`)
with self.assertRaises(frappe.ValidationError) as cm:
frappe.qb.get_query("User", group_by="`name`").get_sql()
self.assertIn("cannot contain backticks", str(cm.exception))
self.assertIn("invalid backtick notation", str(cm.exception))
# Test single backtick with direction (invalid notation)
with self.assertRaises(frappe.ValidationError) as cm:
frappe.qb.get_query("User", order_by="`name` ASC").get_sql()
self.assertIn("cannot contain backticks", str(cm.exception))
self.assertIn("invalid backtick notation", str(cm.exception))
# Test multiple single backticks (invalid notation)
with self.assertRaises(frappe.ValidationError) as cm:
frappe.qb.get_query("User", group_by="`name`, `email`").get_sql()
self.assertIn("cannot contain backticks", str(cm.exception))
self.assertIn("invalid backtick notation", str(cm.exception))
# Valid backtick notation should work
frappe.qb.get_query("User", group_by="`tabUser`.`name`").get_sql()
frappe.qb.get_query("User", order_by="`tabUser`.`name` ASC").get_sql()
def test_sql_functions_in_fields(self):
"""Test SQL function support in fields with various syntaxes."""
@ -1585,17 +1744,137 @@ class TestQuery(IntegrationTestCase):
# Test unsupported function validation
with self.assertRaises(frappe.ValidationError) as cm:
frappe.qb.get_query("User", fields=[{"UNSUPPORTED_FUNC": "name"}]).get_sql()
self.assertIn("Unsupported function or invalid field name: UNSUPPORTED_FUNC", str(cm.exception))
self.assertIn("Unsupported function or operator: UNSUPPORTED_FUNC", str(cm.exception))
# Test unsupported function that might be confused with child field
with self.assertRaises(frappe.ValidationError) as cm:
frappe.qb.get_query("User", fields=[{"UPPER": ["first_name"]}]).get_sql()
self.assertIn("Unsupported function or invalid field name: UPPER", str(cm.exception))
self.assertIn("Unsupported function or operator: UPPER", str(cm.exception))
# Test SQL injection attempt
with self.assertRaises(frappe.ValidationError) as cm:
frappe.qb.get_query("User", fields=[{"DROP": "TABLE users"}]).get_sql()
self.assertIn("Unsupported function or invalid field name: DROP", str(cm.exception))
self.assertIn("Unsupported function or operator: DROP", str(cm.exception))
def test_arithmetic_operators_in_fields(self):
"""Test arithmetic operator support in fields."""
# Test simple addition
query = frappe.qb.get_query("User", fields=[{"ADD": [1, 2], "as": "sum_result"}])
sql = query.get_sql()
self.assertIn("1+2 `sum_result`", sql)
# Test simple subtraction
query = frappe.qb.get_query("User", fields=[{"SUB": [10, 5], "as": "diff_result"}])
sql = query.get_sql()
self.assertIn("10-5 `diff_result`", sql)
# Test simple multiplication
query = frappe.qb.get_query("User", fields=[{"MUL": [3, 4], "as": "prod_result"}])
sql = query.get_sql()
self.assertIn("3*4 `prod_result`", sql)
# Test simple division
query = frappe.qb.get_query("User", fields=[{"DIV": [10, 2], "as": "div_result"}])
sql = query.get_sql()
self.assertIn("10/2 `div_result`", sql)
# Test operator with field names
query = frappe.qb.get_query("User", fields=[{"ADD": ["enabled", "login_after"], "as": "field_sum"}])
sql = query.get_sql()
self.assertIn("`enabled`+`login_after` `field_sum`", sql)
# Test nested operators
query = frappe.qb.get_query("User", fields=[{"ADD": [{"MUL": [2, 3]}, 4], "as": "nested_result"}])
sql = query.get_sql()
self.assertIn("2*3+4 `nested_result`", sql)
# Test operator with function - NULLIF
query = frappe.qb.get_query(
"User", fields=[{"DIV": [1, {"NULLIF": ["enabled", 0]}], "as": "safe_div"}]
)
sql = query.get_sql()
self.assertIn("1/NULLIF(`enabled`,0) `safe_div`", sql)
# Test complex nested expression: (1 / NULLIF(value, 0))
query = frappe.qb.get_query(
"User",
fields=[
"name",
{"DIV": [1, {"NULLIF": ["enabled", 0]}], "as": "inverse"},
],
)
sql = query.get_sql()
self.assertIn("`name`", sql)
self.assertIn("1/NULLIF(`enabled`,0) `inverse`", sql)
# Test operator with LOCATE function (search relevance pattern)
query = frappe.qb.get_query(
"User",
fields=[
"name",
{"DIV": [1, {"NULLIF": [{"LOCATE": ["'test'", "name"]}, 0]}], "as": "relevance"},
],
)
sql = query.get_sql()
self.assertIn("1/NULLIF(LOCATE('test',`name`),0) `relevance`", sql)
# Test multiple operators in fields
query = frappe.qb.get_query(
"User",
fields=[
"name",
{"ADD": ["enabled", 1], "as": "enabled_plus_one"},
{"MUL": ["enabled", 2], "as": "enabled_times_two"},
],
)
sql = query.get_sql()
self.assertIn("`name`", sql)
self.assertIn("`enabled`+1 `enabled_plus_one`", sql)
self.assertIn("`enabled`*2 `enabled_times_two`", sql)
# Test operator without alias
query = frappe.qb.get_query("User", fields=[{"ADD": [1, 1]}])
sql = query.get_sql()
self.assertIn("1+1", sql)
# Test validation: operator requires exactly 2 arguments
with self.assertRaises(frappe.ValidationError) as cm:
frappe.qb.get_query("User", fields=[{"ADD": [1, 2, 3]}]).get_sql()
self.assertIn("requires exactly 2 arguments", str(cm.exception))
# Test validation: operator with only 1 argument
with self.assertRaises(frappe.ValidationError) as cm:
frappe.qb.get_query("User", fields=[{"DIV": [10]}]).get_sql()
self.assertIn("requires exactly 2 arguments", str(cm.exception))
# Test validation: operator with non-list arguments
with self.assertRaises(frappe.ValidationError) as cm:
frappe.qb.get_query("User", fields=[{"MUL": "invalid"}]).get_sql()
self.assertIn("requires exactly 2 arguments", str(cm.exception))
# Test validation: unsupported operator
with self.assertRaises(frappe.ValidationError) as cm:
frappe.qb.get_query("User", fields=[{"XOR": [1, 2]}]).get_sql()
self.assertIn("Unsupported function or operator: XOR", str(cm.exception))
# Test deeply nested expression
query = frappe.qb.get_query(
"User",
fields=[
{
"DIV": [
{"ADD": [{"MUL": [2, 3]}, 4]},
{"SUB": [10, 5]},
],
"as": "complex_expr",
}
],
)
sql = query.get_sql()
# PyPika adds parentheses for clarity in complex expressions
self.assertIn("complex_expr", sql)
self.assertIn("/", sql)
def test_not_equal_condition_on_none(self):
self.assertQueryEqual(
@ -1607,7 +1886,7 @@ class TestQuery(IntegrationTestCase):
["DocType", "parent", "!=", None],
],
).get_sql(),
"SELECT `tabDocType`.* FROM `tabDocType` LEFT JOIN `tabDocField` ON `tabDocField`.`parent`=`tabDocType`.`name` AND `tabDocField`.`parenttype`='DocType' AND `tabDocField`.`parentfield`='fields' WHERE `tabDocField`.`name` IS NULL AND `tabDocType`.`parent` IS NOT NULL",
"SELECT `tabDocType`.* FROM `tabDocType` LEFT JOIN `tabDocField` ON `tabDocField`.`parent`=`tabDocType`.`name` AND `tabDocField`.`parenttype`='DocType' AND `tabDocField`.`parentfield`='fields' WHERE `tabDocField`.`name` IS NULL AND `tabDocType`.`parent`<>''",
)

View file

@ -849,7 +849,7 @@ def get_site_info():
kwargs = {
"fields": ["user", "creation", "full_name"],
"filters": {"operation": "Login", "status": "Success"},
"limit": "10",
"limit": 10,
}
site_info = {

View file

@ -127,7 +127,7 @@ def get_workflow_state_count(doctype, workflow_state_field, states):
if workflow_state_field in frappe.get_meta(doctype).get_valid_columns():
result = frappe.get_all(
doctype,
fields=[workflow_state_field, "count(*) as count"],
fields=[workflow_state_field, {"COUNT": "*", "as": "count"}],
filters={workflow_state_field: ["not in", states]},
group_by=workflow_state_field,
)

View file

@ -239,8 +239,10 @@ def get_list(
if not filters:
filters = []
distinct = False
if not fields:
fields = "distinct *"
fields = "*"
distinct = True
if or_filters is None:
or_filters = []
@ -267,4 +269,5 @@ def get_list(
limit_page_length=limit_page_length,
ignore_permissions=ignore_permissions,
order_by=order_by,
distinct=distinct,
)