diff --git a/frappe/database/query.py b/frappe/database/query.py index 03273c7a97..32c65bce0e 100644 --- a/frappe/database/query.py +++ b/frappe/database/query.py @@ -1,9 +1,8 @@ import operator from typing import Any, Dict, List, Tuple, Union -from frappe.query_builder import Order - import frappe +from frappe.query_builder import Criterion, Order class Query: @@ -107,20 +106,91 @@ class Query: def make_function(self, key: Any, value: Union[int, str]): return self.operator_map[value[0]](key, value[1]) - def dict_query(self, table: str, filters: Dict[str, Union[str, int]] = None, - orderby:str = None, order:Order = None): - """Generate condition object using filters + @staticmethod + def change_orderby(order: str): + """Convert orderby to standart Order object + + Args: + order (str): Field, order + + Returns: + tuple: field, order + """ + order = order.split() + if order[1].lower() == "asc": + orderby, order = order[0], Order.asc + return orderby, order + orderby, order = order[0], Order.desc + return orderby, order + + @staticmethod + def get_condition(table: str, **kwargs) -> frappe.qb: + """Get initial table object Args: table (str): DocType - filters (Dict[str, Union[str, int]], optional): Conditions. Defaults to None. - orderby (str, optional): field to order by. Defaults to None. - order (Order, optional): order. Defaults to None. + + Returns: + frappe.qb: DocType with initial condition + """ + if kwargs.get("update"): + return frappe.qb.update(table) + if kwargs.get("into"): + return frappe.qb.into(table) + return frappe.qb.from_(table) + + def criterion_query(self, table: str, criterion: Criterion, **kwargs) -> frappe.qb: + """Generate filters from Criterion objects + + Args: + table (str): DocType + criterion (Criterion): Filters + + Returns: + frappe.qb: condition object + """ + condition = self.get_condition(table, **kwargs) + return condition.where(criterion) + + + def add_conditions(self, conditions: frappe.qb, **kwargs): + """Adding additional conditions + + Args: + conditions (frappe.qb): built conditions + + Returns: + conditions (frappe.qb): frappe.qb object + """ + if kwargs.get("orderby"): + orderby = kwargs.get("orderby") + order = kwargs.get("order") if kwargs.get("order") else Order.desc + if isinstance(orderby, str) and len(orderby.split()) > 1: + orderby, order = self.change_orderby(orderby) + conditions = conditions.orderby(orderby, order=order) + + if kwargs.get("limit"): + conditions = conditions.limit(kwargs.get("limit")) + + if kwargs.get("distinct"): + conditions = conditions.distinct() + + if kwargs.get("for_update"): + conditions = conditions.for_update() + + return conditions + + def dict_query(self, table: str, filters: Dict[str, Union[str, int]] = None, **kwargs): + """Build conditions using the given filters + + Args: + table (str): DocType + filters (Dict[str, Union[str, int]], optional): Filters. Defaults to None. Returns: condition: conditions object """ - conditions = frappe.qb.from_(table) + conditions = self.get_condition(table, **kwargs) if not filters: return conditions @@ -140,13 +210,10 @@ class Query: conditions = conditions.where(_operator(frappe.qb.Field(key), value[1])) else: conditions = conditions.where(_operator(frappe.qb.Field(key), value)) - if orderby: - order = order if order else Order.desc - return conditions.orderby(orderby, order=order) + conditions = self.add_conditions(conditions, **kwargs) return conditions - def build_conditions(self, table: str, filters: Union[Dict[str, Union[str, int]], str, int] = None, - orderby: str = None, order: Order = None) -> frappe.qb: + def build_conditions(self, table: str, filters: Union[Dict[str, Union[str, int]], str, int] = None, **kwargs) -> frappe.qb: """Build conditions for sql query Args: @@ -156,7 +223,10 @@ class Query: Returns: frappe.qb: frappe.qb conditions object """ + if isinstance(filters, Criterion): + return self.criterion_query(table, filters, **kwargs) + if isinstance(filters, int) or isinstance(filters, str): filters = {"name": str(filters)} - return self.dict_query(filters=filters, table=table, orderby=orderby, order=order) \ No newline at end of file + return self.dict_query(filters=filters, table=table, **kwargs) \ No newline at end of file