Replace a `for`/`append` loop with `list.extend`, and create the list with its values up front instead of creating an empty list and then extending it with another list.
1353 lines
40 KiB
Python
1353 lines
40 KiB
Python
# Copyright (c) 2015, Frappe Technologies Pvt. Ltd. and Contributors
|
|
# License: MIT. See LICENSE
|
|
"""build query for doclistview and return results"""
|
|
|
|
import copy
|
|
import json
|
|
import re
|
|
from collections import Counter
|
|
from datetime import datetime
|
|
|
|
import frappe
|
|
import frappe.defaults
|
|
import frappe.permissions
|
|
import frappe.share
|
|
from frappe import _
|
|
from frappe.core.doctype.server_script.server_script_utils import get_server_script_map
|
|
from frappe.database.utils import DefaultOrderBy, FallBackDateTimeStr, NestedSetHierarchy
|
|
from frappe.model import get_permitted_fields, optional_fields
|
|
from frappe.model.meta import get_table_columns
|
|
from frappe.model.utils import is_virtual_doctype
|
|
from frappe.model.utils.user_settings import get_user_settings, update_user_settings
|
|
from frappe.query_builder.utils import Column
|
|
from frappe.utils import (
|
|
add_to_date,
|
|
cint,
|
|
cstr,
|
|
flt,
|
|
get_filter,
|
|
get_time,
|
|
get_timespan_date_range,
|
|
make_filter_tuple,
|
|
)
|
|
from frappe.utils.data import sbool
|
|
|
|
# Matches a SQL `locate(<expr>, name)` call whose second argument is the `name`
# column, optionally wrapped in backticks or double quotes (case-insensitive).
LOCATE_PATTERN = re.compile(r"locate\([^,]+,\s*[`\"]?name[`\"]?\s*\)", flags=re.IGNORECASE)
# Same shape as LOCATE_PATTERN, but captures the first argument (group 1) and the
# `name` column reference (group 2).
LOCATE_CAST_PATTERN = re.compile(
    r"locate\(([^,]+),\s*([`\"]?name[`\"]?)\s*\)", flags=re.IGNORECASE
)
# Matches `strpos(`, `ifnull(` or `coalesce(` whose first argument is the `name`
# column; the function name is captured in group 1.
FUNC_IFNULL_PATTERN = re.compile(
    r"(strpos|ifnull|coalesce)\(\s*[`\"]?name[`\"]?\s*,", flags=re.IGNORECASE
)
# Matches a table-qualified name column such as `tabDocType`.name (group 1), as long
# as it is not immediately followed by another word character.
CAST_VARCHAR_PATTERN = re.compile(
    r"([`\"]?tab[\w`\" -]+\.[`\"]?name[`\"]?)(?!\w)", flags=re.IGNORECASE
)
# Matches the " order by " keyword and the " asc" / " desc" direction suffixes;
# prepare_select_args() substitutes these away to isolate the ordered column.
ORDER_BY_PATTERN = re.compile(r"\ order\ by\ |\ asc|\ ASC|\ desc|\ DESC", flags=re.IGNORECASE)
# Matches any field containing one of `,`, `(`, `)`, `;` or `@`; sanitize_fields()
# uses it as the trigger for its sub-query / function checks.
SUB_QUERY_PATTERN = re.compile("^.*[,();@].*")
# Matches a string that starts with a lower-case SQL statement keyword followed by
# whitespace. Note: no IGNORECASE flag, so upper-case keywords do not match.
IS_QUERY_PATTERN = re.compile(r"^(select|delete|update|drop|create)\s")
# Matches an optional leading token followed by a lower-case, space-delimited
# " from ", " group by ", " order by ", " where " or " join " keyword.
# NOTE(review): the class `A-z` (lower-case z) also covers `[`, `\`, `]`, `^`, `_`
# and the backtick. Presumably `A-Z` was intended, but narrowing it would stop
# matching tokens that contain `_` -- confirm before changing.
IS_QUERY_PREDICATE_PATTERN = re.compile(
    r"\s*[0-9a-zA-z]*\s*( from | group by | order by | where | join )"
)
# Matches a leading alphanumeric token that is followed by a single quote.
FIELD_QUOTE_PATTERN = re.compile(r"[0-9a-zA-Z]+\s*'")
# Matches a leading alphanumeric token that is followed by a comma.
FIELD_COMMA_PATTERN = re.compile(r"[0-9a-zA-Z]+\s*,")
# Matches any field containing the start of a SQL block comment (`/*`).
STRICT_FIELD_PATTERN = re.compile(r".*/\*.*")
# Matches a lower-case `union` preceded by whitespace and followed, somewhere
# later, by more whitespace.
STRICT_UNION_PATTERN = re.compile(r".*\s(union).*\s")
# Matches any string containing a character outside the allowed set of lower-case
# letters, digits and the punctuation listed in the character class.
ORDER_GROUP_PATTERN = re.compile(r".*[^a-z0-9-_ ,`'\"\.\(\)].*")
# Captures everything between the first `(` and the last `)` of a field, i.e. the
# raw parameter list of a function call.
FN_PARAMS_PATTERN = re.compile(r".*?\((.*)\).*")
# Characters that mark a field as something other than a plain column name.
SPECIAL_FIELD_CHARS = frozenset(("(", "`", ".", "'", '"', "*"))
|
|
|
|
|
|
class DatabaseQuery:
|
|
def __init__(self, doctype, user=None):
    """Set up an empty query for `doctype`.

    :param doctype: DocType the query is built for.
    :param user: User the query runs as; falls back to `frappe.session.user`
        when not given.
    """
    self.doctype = doctype
    self.user = user if user else frappe.session.user
    self.reference_doctype = None

    # tables taking part in the query
    self.tables, self.link_tables = [], []
    self.linked_table_aliases = {}
    self.linked_table_counter = Counter()

    # select list and where-clause fragments
    self.fields = None
    self.conditions, self.or_conditions = [], []
    self.ignore_ifnull = False

    # permission related state
    self.flags = frappe._dict()
    self.permission_map = {}
    self.shared = []
    self._fetch_shared_documents = False
|
|
|
|
@property
|
|
def doctype_meta(self):
|
|
if not hasattr(self, "_doctype_meta"):
|
|
self._doctype_meta = frappe.get_meta(self.doctype)
|
|
return self._doctype_meta
|
|
|
|
@property
|
|
def query_tables(self):
|
|
return self.tables + [d.table_alias for d in self.link_tables]
|
|
|
|
def execute(
    self,
    fields=None,
    filters=None,
    or_filters=None,
    docstatus=None,
    group_by=None,
    order_by=DefaultOrderBy,
    limit_start=False,
    limit_page_length=None,
    as_list=False,
    with_childnames=False,
    debug=False,
    ignore_permissions=False,
    user=None,
    with_comment_count=False,
    join="left join",
    distinct=False,
    start=None,
    page_length=None,
    limit=None,
    ignore_ifnull=False,
    save_user_settings=False,
    save_user_settings_fields=False,
    update=None,
    add_total_row=None,
    user_settings=None,
    reference_doctype=None,
    run=True,
    strict=True,
    pluck=None,
    ignore_ddl=False,
    *,
    parent_doctype=None,
) -> list:
    """Store the query options on the instance, run the query and return the rows.

    Read permission on the doctype is checked first unless `ignore_permissions`
    is set. Virtual doctypes are delegated to their controller's `get_list`.
    When `pluck` is given, only that key of every row is returned.
    """
    if not ignore_permissions:
        self.check_read_permission(self.doctype, parent_doctype=parent_doctype)

    # `fields` and `filters` are swappable, since it is hard to remember which
    # one comes first.
    fields_look_like_filters = isinstance(fields, dict) or (
        fields and isinstance(fields, list) and isinstance(fields[0], list)
    )
    if fields_look_like_filters:
        # a dict / list of lists passed as `fields` is probably filters
        filters, fields = fields, filters
    elif fields and isinstance(filters, list) and len(filters) > 1 and isinstance(filters[0], str):
        # a list of strings passed as `filters` is probably fields
        filters, fields = fields, filters

    # default to the `pluck` column (or `name`) when no fields were requested
    self.fields = fields if fields else [f"`tab{self.doctype}`.`{pluck or 'name'}`"]

    # `start`, `page_length` and `limit` are aliases; `limit` wins over `page_length`
    if start:
        limit_start = start
    for page_size in (page_length, limit):
        if page_size:
            limit_page_length = page_size

    # filtering / grouping / ordering
    self.filters = filters or []
    self.or_filters = or_filters or []
    self.docstatus = docstatus or []
    self.group_by = group_by
    self.order_by = order_by

    # paging
    self.limit_start = cint(limit_start)
    self.limit_page_length = cint(limit_page_length) if limit_page_length else None

    # output and execution options
    self.with_childnames = with_childnames
    self.debug = debug
    self.join = join
    self.distinct = distinct
    self.as_list = as_list
    self.ignore_ifnull = ignore_ifnull
    self.flags.ignore_permissions = ignore_permissions
    self.user = user or frappe.session.user
    self.update = update
    self.user_settings_fields = copy.deepcopy(self.fields)
    self.run = run
    self.strict = strict
    self.ignore_ddl = ignore_ddl
    self.parent_doctype = parent_doctype

    # for contextual user permission check
    # to determine which user permission is applicable on link field of specific doctype
    self.reference_doctype = reference_doctype or self.doctype

    if user_settings:
        self.user_settings = json.loads(user_settings)

    if is_virtual_doctype(self.doctype):
        from frappe.model.base_document import get_controller

        controller = get_controller(self.doctype)
        self.parse_args()
        # instance attributes take precedence over the explicit keys on conflict
        kwargs = {
            "as_list": as_list,
            "with_comment_count": with_comment_count,
            "save_user_settings": save_user_settings,
            "save_user_settings_fields": save_user_settings_fields,
            "pluck": pluck,
            "parent_doctype": parent_doctype,
        } | self.__dict__
        return controller.get_list(kwargs)

    self.columns = self.get_table_columns()

    # no table & ignore_ddl, return
    if not self.columns:
        return []

    rows = self.build_and_run()

    if sbool(with_comment_count) and not as_list and self.doctype:
        self.add_comment_count(rows)

    if save_user_settings:
        self.save_user_settings_fields = save_user_settings_fields
        self.update_user_settings()

    if not pluck:
        return rows

    return [row[pluck] for row in rows]
|
|
|
|
def build_and_run(self):
    """Assemble the select statement from the prepared arguments and hand it to
    `frappe.db.sql`, returning whatever that call returns."""
    parts = self.prepare_args()
    parts.limit = self.add_limit()

    if parts.conditions:
        parts.conditions = "where " + parts.conditions

    if self.distinct:
        parts.fields = "distinct " + parts.fields
        parts.order_by = ""  # TODO: recheck for alternative

    # Postgres requires any field that appears in the select clause to also
    # appear in the order by and group by clause
    needs_select_fixup = frappe.db.db_type == "postgres" and parts.order_by and parts.group_by
    if needs_select_fixup:
        parts = self.prepare_select_args(parts)

    query = (
        """select %(fields)s
        from %(tables)s
        %(conditions)s
        %(group_by)s
        %(order_by)s
        %(limit)s"""
        % parts
    )

    sql_options = {
        "as_dict": not self.as_list,
        "debug": self.debug,
        "update": self.update,
        "ignore_ddl": self.ignore_ddl,
        "run": self.run,
    }
    return frappe.db.sql(query, **sql_options)
|
|
|
|
def prepare_args(self):
    """Run every preparation step and return a dict with the SQL fragments
    (`tables`, `conditions`, `fields`, `order_by`, `group_by`) of the query."""
    for step in (
        self.parse_args,
        self.sanitize_fields,
        self.extract_tables,
        self.set_optional_columns,
        self.build_conditions,
        self.apply_fieldlevel_read_permissions,
    ):
        step()

    args = frappe._dict()

    if self.with_childnames:
        parent_table = f"`tab{self.doctype}`"
        self.fields.extend(
            f"{t}.name as '{t[4:-1]}:name'" for t in self.tables if t != parent_table
        )

    # query dict
    primary_table = self.tables[0]
    join_sql = primary_table

    # left join parent, child tables
    for child in self.tables[1:]:
        parent_name = cast_name(f"{primary_table}.name")
        join_sql += f" {self.join} {child} on ({child}.parenttype = {frappe.db.escape(self.doctype)} and {child}.parent = {parent_name})"

    # left join link tables
    for link in self.link_tables:
        join_sql += f" {self.join} {link.table_name} {link.table_alias} on ({link.table_alias}.`name` = {primary_table}.`{link.fieldname}`)"

    args.tables = join_sql

    if self.grouped_or_conditions:
        self.conditions.append(f"({' or '.join(self.grouped_or_conditions)})")

    where_clause = " and ".join(self.conditions)
    if self.or_conditions:
        where_clause += (" or " if where_clause else "") + " or ".join(self.or_conditions)
    args.conditions = where_clause

    self.set_field_tables()
    self.cast_name_fields()

    def _as_select_expression(field):
        # Wrapping fields with grave quotes to allow support for sql keywords
        # TODO: Add support for wrapping fields with sql functions and distinct keyword
        if field is None:
            return "NULL"

        lowered = field.strip().lower()
        already_an_expression = (
            lowered[0] in {"`", "*", '"', "'"} or "(" in lowered or "distinct" in lowered
        )
        if already_an_expression:
            return field

        if "as" in lowered.split(" "):
            # expects exactly three tokens: `<column> as <alias>`
            column, _keyword, alias = field.split()
            return f"`{column}` as {alias}"

        return f"`{field}`"

    args.fields = ", ".join([_as_select_expression(field) for field in self.fields])

    self.set_order_by(args)

    self.validate_order_by_and_group_by(args.order_by)
    args.order_by = (" order by " + args.order_by) if args.order_by else ""

    self.validate_order_by_and_group_by(self.group_by)
    args.group_by = (" group by " + self.group_by) if self.group_by else ""

    return args
|
|
|
|
def prepare_select_args(self, args):
    """Make sure the column used for ordering is part of the select list.

    When the ordered column is missing from `args.fields`, it is added as a
    `MAX(...)` aggregate under an alias and `args.order_by` is rewritten to
    order by that alias. `args` is modified in place and returned.
    """
    order_field = ORDER_BY_PATTERN.sub("", args.order_by)

    if order_field in args.fields:
        return args

    alias = order_field.replace("`", "")
    # drop the table qualifier, if any, inside the aggregate
    bare_column = alias.split(".")[1] if "." in alias else alias

    args.fields += f", MAX({bare_column}) as `{alias}`"
    args.order_by = args.order_by.replace(order_field, f"`{alias}`")

    return args
|
|
|
|
def parse_args(self):
|
|
"""Convert fields and filters from strings to list, dicts"""
|
|
if isinstance(self.fields, str):
|
|
if self.fields == "*":
|
|
self.fields = ["*"]
|
|
else:
|
|
try:
|
|
self.fields = json.loads(self.fields)
|
|
except ValueError:
|
|
self.fields = [f.strip() for f in self.fields.split(",")]
|
|
|
|
# remove empty strings / nulls in fields
|
|
self.fields = [f for f in self.fields if f]
|
|
|
|
# convert child_table.fieldname to `tabChild DocType`.`fieldname`
|
|
for field in self.fields:
|
|
if "." in field:
|
|
original_field = field
|
|
alias = None
|
|
if " as " in field:
|
|
field, alias = field.split(" as ", 1)
|
|
linked_fieldname, fieldname = field.split(".", 1)
|
|
linked_field = frappe.get_meta(self.doctype).get_field(linked_fieldname)
|
|
# this is not a link field
|
|
if not linked_field:
|
|
continue
|
|
linked_doctype = linked_field.options
|
|
if linked_field.fieldtype == "Link":
|
|
linked_table = self.append_link_table(linked_doctype, linked_fieldname)
|
|
field = f"{linked_table.table_alias}.`{fieldname}`"
|
|
else:
|
|
field = f"`tab{linked_doctype}`.`{fieldname}`"
|
|
if alias:
|
|
field = f"{field} as {alias}"
|
|
self.fields[self.fields.index(original_field)] = field
|
|
|
|
for filter_name in ["filters", "or_filters"]:
|
|
filters = getattr(self, filter_name)
|
|
if isinstance(filters, str):
|
|
filters = json.loads(filters)
|
|
|
|
if isinstance(filters, dict):
|
|
fdict = filters
|
|
filters = [make_filter_tuple(self.doctype, key, value) for key, value in fdict.items()]
|
|
setattr(self, filter_name, filters)
|
|
|
|
def sanitize_fields(self):
    """
    regex : ^.*[,();@].*
    purpose : The regex will look for malicious patterns like `,`, '(', ')', '@', ';' in each
                    field which may leads to sql injection.
    example :
            field = "`DocType`.`issingle`, version()"
    As field contains `,` and mysql function `version()`, with the help of regex
    the system will filter out this field.

    Raises `frappe.DataError` (through `frappe.throw`) for restricted sub-queries
    and functions. In strict mode, a field containing a block comment or a
    `union` is rejected with "Illegal SQL Query".
    """
    blacklisted_keywords = ["select", "create", "insert", "delete", "drop", "update", "case", "show"]
    blacklisted_functions = [
        "concat",
        "concat_ws",
        "if",
        "ifnull",
        "nullif",
        "coalesce",
        "connection_id",
        "current_user",
        "database",
        "last_insert_id",
        "session_user",
        "system_user",
        "user",
        "version",
        "global",
    ]

    def _raise_exception():
        frappe.throw(_("Use of sub-query or function is restricted"), frappe.DataError)

    def _is_query(field):
        if IS_QUERY_PATTERN.match(field):
            _raise_exception()

        elif IS_QUERY_PREDICATE_PATTERN.match(field):
            _raise_exception()

    for field in self.fields:
        lowered = field.lower()
        lower_field = lowered.strip()

        if SUB_QUERY_PATTERN.match(field):
            if lower_field[0] == "(":
                # first word inside the parenthesis, e.g. `select` in `( select ...`
                subquery_token = lower_field[1:].lstrip().split(" ", 1)[0]
                if subquery_token in blacklisted_keywords:
                    _raise_exception()

            function = lower_field.split("(", 1)[0].rstrip()
            if function in blacklisted_functions:
                frappe.throw(
                    _("Use of function {0} in field is restricted").format(function), exc=frappe.DataError
                )

            if "@" in lower_field:
                # prevent access to global variables
                _raise_exception()

        if FIELD_QUOTE_PATTERN.match(field):
            _raise_exception()

        if FIELD_COMMA_PATTERN.match(field):
            _raise_exception()

        # The query patterns only contain lower-case keywords and are anchored at
        # the start, so test the lower-cased, left-stripped field. Testing the raw
        # field would let `SELECT ...` or `x FROM y` through.
        _is_query(lowered.lstrip())

        if self.strict:
            if STRICT_FIELD_PATTERN.match(field):
                frappe.throw(_("Illegal SQL Query"))

            if STRICT_UNION_PATTERN.match(lower_field):
                frappe.throw(_("Illegal SQL Query"))
|
|
|
|
def extract_tables(self):
|
|
"""extract tables from fields"""
|
|
self.tables = [f"`tab{self.doctype}`"]
|
|
sql_functions = [
|
|
"dayofyear(",
|
|
"extract(",
|
|
"locate(",
|
|
"strpos(",
|
|
"count(",
|
|
"sum(",
|
|
"avg(",
|
|
]
|
|
# add tables from fields
|
|
if self.fields:
|
|
for field in self.fields:
|
|
if not ("tab" in field and "." in field) or any(x for x in sql_functions if x in field):
|
|
continue
|
|
|
|
table_name = field.split(".", 1)[0]
|
|
|
|
# Check if table_name is a linked_table alias
|
|
for linked_table in self.link_tables:
|
|
if linked_table.table_alias == table_name:
|
|
table_name = linked_table.table_name
|
|
break
|
|
|
|
if table_name.lower().startswith("group_concat("):
|
|
table_name = table_name[13:]
|
|
if not table_name[0] == "`":
|
|
table_name = f"`{table_name}`"
|
|
if (
|
|
table_name not in self.query_tables and table_name not in self.linked_table_aliases.values()
|
|
):
|
|
self.append_table(table_name)
|
|
|
|
def append_table(self, table_name):
|
|
self.tables.append(table_name)
|
|
doctype = table_name[4:-1]
|
|
self.check_read_permission(doctype)
|
|
|
|
def append_link_table(self, doctype, fieldname):
|
|
for linked_table in self.link_tables:
|
|
if linked_table.doctype == doctype and linked_table.fieldname == fieldname:
|
|
return linked_table
|
|
|
|
self.check_read_permission(doctype)
|
|
self.linked_table_counter.update((doctype,))
|
|
linked_table = frappe._dict(
|
|
doctype=doctype,
|
|
fieldname=fieldname,
|
|
table_name=f"`tab{doctype}`",
|
|
table_alias=f"`tab{doctype}_{self.linked_table_counter[doctype]}`",
|
|
)
|
|
self.linked_table_aliases[linked_table.table_alias.replace("`", "")] = linked_table.table_name
|
|
self.link_tables.append(linked_table)
|
|
return linked_table
|
|
|
|
def check_read_permission(self, doctype: str, parent_doctype: str | None = None):
|
|
if self.flags.ignore_permissions:
|
|
return
|
|
|
|
if doctype not in self.permission_map:
|
|
self._set_permission_map(doctype, parent_doctype)
|
|
|
|
return self.permission_map[doctype]
|
|
|
|
def _set_permission_map(self, doctype: str, parent_doctype: str | None = None):
    """Check the user's access to `doctype` and record the permission type used
    ("select" or "read") in `self.permission_map`.

    Raises `frappe.PermissionError` when the permission check fails.
    """
    if frappe.only_has_select_perm(doctype):
        ptype = "select"
    else:
        ptype = "read"

    allowed = frappe.has_permission(
        doctype,
        ptype=ptype,
        parent_doctype=parent_doctype or self.doctype,
    )
    if allowed:
        self.permission_map[doctype] = ptype
        return

    frappe.flags.error_message = _("Insufficient Permission for {0}").format(frappe.bold(doctype))
    raise frappe.PermissionError(doctype)
|
|
|
|
def set_field_tables(self):
|
|
"""If there are more than one table, the fieldname must not be ambiguous.
|
|
If the fieldname is not explicitly mentioned, set the default table"""
|
|
|
|
def _in_standard_sql_methods(field):
|
|
methods = ("count(", "avg(", "sum(", "extract(", "dayofyear(")
|
|
return field.lower().startswith(methods)
|
|
|
|
if len(self.tables) > 1 or len(self.link_tables) > 0:
|
|
for idx, field in enumerate(self.fields):
|
|
if field is not None and "." not in field and not _in_standard_sql_methods(field):
|
|
self.fields[idx] = f"{self.tables[0]}.{field}"
|
|
|
|
def cast_name_fields(self):
|
|
for i, field in enumerate(self.fields):
|
|
if field is not None:
|
|
self.fields[i] = cast_name(field)
|
|
|
|
def get_table_columns(self):
    """Return the columns of the doctype's table.

    A missing table is reported as None when `ignore_ddl` is set; otherwise the
    `TableMissingError` is re-raised.
    """
    try:
        return get_table_columns(self.doctype)
    except frappe.db.TableMissingError:
        if not self.ignore_ddl:
            raise
        return None
|
|
|
|
def set_optional_columns(self):
    """Removes optional columns like `_user_tags`, `_comments` etc. if not in table

    A field is dropped when it contains the name of an optional column that is
    missing from `self.columns`. A filter is dropped when one of its elements
    (or the filter itself, for string filters) is such a column name. Lists are
    updated in place, and only when something actually has to be removed.
    """
    # optional columns that the table of this doctype does not have
    missing = [column for column in optional_fields if column not in self.columns]
    if not missing:
        return

    # remove from fields
    kept_fields = [fld for fld in self.fields if not any(column in fld for column in missing)]
    if len(kept_fields) != len(self.fields):
        self.fields[:] = kept_fields

    def _references_missing_column(filter_or_key):
        # a plain string filter is compared as a whole, anything else element-wise
        parts = [filter_or_key] if isinstance(filter_or_key, str) else filter_or_key
        return any(part in missing for part in parts)

    # remove from filters
    if isinstance(self.filters, dict):
        # collect first: a dict must not change size while it is being iterated
        for key in [key for key in self.filters if _references_missing_column(key)]:
            del self.filters[key]
        return

    kept_filters = [flt for flt in self.filters if not _references_missing_column(flt)]
    if len(kept_filters) != len(self.filters):
        self.filters[:] = kept_filters
|
|
|
|
def build_conditions(self):
|
|
self.conditions = []
|
|
self.grouped_or_conditions = []
|
|
self.build_filter_conditions(self.filters, self.conditions)
|
|
self.build_filter_conditions(self.or_filters, self.grouped_or_conditions)
|
|
|
|
# match conditions
|
|
if not self.flags.ignore_permissions:
|
|
match_conditions = self.build_match_conditions()
|
|
if match_conditions:
|
|
self.conditions.append(f"({match_conditions})")
|
|
|
|
def build_filter_conditions(self, filters, conditions: list, ignore_permissions=None):
|
|
"""build conditions from user filters"""
|
|
if ignore_permissions is not None:
|
|
self.flags.ignore_permissions = ignore_permissions
|
|
|
|
if isinstance(filters, dict):
|
|
filters = [filters]
|
|
|
|
for f in filters:
|
|
if isinstance(f, str):
|
|
conditions.append(f)
|
|
else:
|
|
conditions.append(self.prepare_filter_condition(f))
|
|
|
|
def remove_field(self, idx: int):
|
|
if self.as_list:
|
|
self.fields[idx] = None
|
|
else:
|
|
self.fields.pop(idx)
|
|
|
|
def apply_fieldlevel_read_permissions(self):
    """Apply fieldlevel read permissions to the query

    Note: Does not apply to `frappe.model.core_doctype_list`

    Remove fields that user is not allowed to read. If `fields=["*"]` is passed, only permitted fields will
    be returned.

    Example:
            - User has read permission only on `title` for DocType `Note`
            - Query: fields=["*"]
            - Result: fields=["title", ...] // will also include Frappe's meta field like `name`, `owner`, etc.

    Raises `frappe.PermissionError` when a field refers to a table that is not
    part of the query.
    """
    if self.flags.ignore_permissions:
        return

    asterisk_fields = []
    # Positions of fields the user may not read. They are only collected during
    # the scan and removed afterwards: popping from `self.fields` while it is
    # being enumerated would make the loop skip the entry after every removal,
    # leaving that field unchecked.
    denied_fields = []
    permitted_fields = get_permitted_fields(
        doctype=self.doctype,
        parenttype=self.parent_doctype,
        permission_type=self.permission_map.get(self.doctype),
    )

    for i, field in enumerate(self.fields):
        if "distinct" in field.lower():
            # field: 'count(distinct `tabPhoto`.name) as total_count'
            # column: 'tabPhoto.name'
            if _fn := FN_PARAMS_PATTERN.findall(field):
                column = _fn[0].replace("distinct ", "").replace("DISTINCT ", "").replace("`", "")
            # field: 'distinct name'
            # column: 'name'
            else:
                column = field.split(" ", 2)[1].replace("`", "")
        else:
            # field: 'count(`tabPhoto`.name) as total_count'
            # column: 'tabPhoto.name'
            column = field.split("(")[-1].split(")", 1)[0]
            column = strip_alias(column).replace("`", "")

        if column == "*" and not in_function("*", field):
            asterisk_fields.append(i)
            continue

        # handle pseudo columns
        elif not column or column.isnumeric():
            continue

        # labels / pseudo columns or frappe internals
        elif column[0] in {"'", '"'} or column in optional_fields:
            continue

        # handle child / joined table fields
        elif "." in field:
            table, column = column.split(".", 1)
            ch_doctype = table

            if ch_doctype in self.linked_table_aliases:
                ch_doctype = self.linked_table_aliases[ch_doctype]

            ch_doctype = ch_doctype.replace("`", "").replace("tab", "", 1)

            if wrap_grave_quotes(table) in self.query_tables:
                permitted_child_table_fields = get_permitted_fields(
                    doctype=ch_doctype, parenttype=self.doctype
                )
                if column in permitted_child_table_fields or column in optional_fields:
                    continue
                else:
                    denied_fields.append(i)
            else:
                raise frappe.PermissionError(ch_doctype)

        elif column in permitted_fields:
            continue

        # field inside function calls / * handles things like count(*)
        elif "(" in field:
            if "*" in field:
                continue
            elif _params := FN_PARAMS_PATTERN.findall(field):
                params = (x.strip() for x in _params[0].split(","))
                for param in params:
                    if not (
                        not param or param in permitted_fields or param.isnumeric() or "'" in param or '"' in param
                    ):
                        denied_fields.append(i)
                        break
                continue
            denied_fields.append(i)

        # remove if access not allowed
        else:
            denied_fields.append(i)

    # Apply removals and `*` expansions from the highest position down, so that
    # changing the length of the list never invalidates a position that is still
    # to be processed.
    asterisk_positions = set(asterisk_fields)
    for i in sorted(asterisk_positions.union(denied_fields), reverse=True):
        if i in asterisk_positions:
            # handle * fields
            self.fields[i : i + 1] = permitted_fields
        else:
            self.remove_field(i)
|
|
|
|
def prepare_filter_condition(self, f):
    """Returns a filter condition in the format:
    ifnull(`tabDocType`.`fieldname`, fallback) operator "value"

    `f` is normalised through `get_filter` first. The `ifnull` wrapper is left
    out when `ignore_ifnull` is set, when the column cannot be null, when a
    truthy value is compared with `=` / `like`, or when the column expression
    already contains `ifnull(`.
    """

    # TODO: refactor

    from frappe.boot import get_additional_filters_from_hooks

    additional_filters_config = get_additional_filters_from_hooks()
    f = get_filter(self.doctype, f, additional_filters_config)

    tname = "`tab" + f.doctype + "`"
    if tname not in self.tables:
        self.append_table(tname)

    column_name = cast_name(f.fieldname if "ifnull(" in f.fieldname else f"{tname}.`{f.fieldname}`")

    if f.operator.lower() in additional_filters_config:
        f.update(get_additional_filter_field(additional_filters_config, f, f.value))

    meta = frappe.get_meta(f.doctype)
    can_be_null = True

    # prepare in condition
    if f.operator.lower() in NestedSetHierarchy:
        values = f.value or ""

        # TODO: handle list and tuple
        # if not isinstance(values, (list, tuple)):
        # 	values = values.split(",")
        field = meta.get_field(f.fieldname)
        ref_doctype = field.options if field else f.doctype
        lft, rgt = "", ""
        if f.value:
            lft, rgt = frappe.db.get_value(ref_doctype, f.value, ["lft", "rgt"])

        # Get descendants elements of a DocType with a tree structure
        if f.operator.lower() in (
            "descendants of",
            "not descendants of",
            "descendants of (inclusive)",
        ):
            nodes = frappe.get_all(
                ref_doctype,
                filters={"lft": [">", lft], "rgt": ["<", rgt]},
                order_by="`lft` ASC",
                pluck="name",
            )
            if f.operator.lower() == "descendants of (inclusive)":
                nodes += [f.value]
        else:
            # Get ancestor elements of a DocType with a tree structure
            nodes = frappe.get_all(
                ref_doctype,
                filters={"lft": ["<", lft], "rgt": [">", rgt]},
                order_by="`lft` DESC",
                pluck="name",
            )

        fallback = "''"
        value = [frappe.db.escape((cstr(v)).strip(), percent=False) for v in nodes]
        if len(value):
            value = f"({', '.join(value)})"
        else:
            value = "('')"

        # changing operator to IN as the above code fetches all the parent / child values and convert into tuple
        # which can be directly used with IN operator to query.
        f.operator = (
            "not in" if f.operator.lower() in ("not ancestors of", "not descendants of") else "in"
        )

    elif f.operator.lower() in ("in", "not in"):
        # if values contain '' or falsy values then only coalesce column
        # for `in` query this is only required if values contain '' or values are empty.
        # for `not in` queries we can't be sure as column values might contain null.
        if f.operator.lower() == "in":
            can_be_null = not f.value or any(v is None or v == "" for v in f.value)

        values = f.value or ""
        if isinstance(values, str):
            values = values.split(",")

        fallback = "''"
        value = [frappe.db.escape((cstr(v) or "").strip(), percent=False) for v in values]
        if len(value):
            value = f"({', '.join(value)})"
        else:
            value = "('')"

    else:
        escape = True
        df = meta.get("fields", {"fieldname": f.fieldname})
        df = df[0] if df else None

        if df and df.fieldtype in ("Check", "Float", "Int", "Currency", "Percent"):
            can_be_null = False

        # relative date operators are rewritten into a `Between` filter on a date range
        if f.operator.lower() in ("previous", "next", "timespan"):
            date_range = get_date_range(f.operator.lower(), f.value)
            f.operator = "Between"
            f.value = date_range
            fallback = f"'{FallBackDateTimeStr}'"

        if f.operator in (">", "<", ">=", "<=") and (f.fieldname in ("creation", "modified")):
            value = cstr(f.value)
            fallback = f"'{FallBackDateTimeStr}'"

        # Compare with `==`: the previous `in ("between")` was a substring test
        # against the string "between" (the parentheses do not make a tuple).
        elif f.operator.lower() == "between" and (
            f.fieldname in ("creation", "modified")
            or (df and (df.fieldtype == "Date" or df.fieldtype == "Datetime"))
        ):
            escape = False
            value = get_between_date_filter(f.value, df)
            fallback = f"'{FallBackDateTimeStr}'"

        elif f.operator.lower() == "is":
            if f.value == "set":
                f.operator = "!="
                # Value can technically be null, but comparing with null will always be falsy
                # Not using coalesce here is faster because indexes can be used.
                # null != '' -> null ~ falsy
                # '' != '' -> false
                can_be_null = False
            elif f.value == "not set":
                f.operator = "="
                fallback = "''"
                can_be_null = True

            value = ""

            if can_be_null and "ifnull" not in column_name.lower():
                column_name = f"ifnull({column_name}, {fallback})"

        elif df and df.fieldtype == "Date":
            value = frappe.db.format_date(f.value)
            fallback = "'0001-01-01'"

        elif (df and df.fieldtype == "Datetime") or isinstance(f.value, datetime):
            value = frappe.db.format_datetime(f.value)
            fallback = f"'{FallBackDateTimeStr}'"

        elif df and df.fieldtype == "Time":
            value = get_time(f.value).strftime("%H:%M:%S.%f")
            fallback = "'00:00:00'"

        elif f.operator.lower() in ("like", "not like") or (
            isinstance(f.value, str)
            and (not df or df.fieldtype not in ["Float", "Int", "Currency", "Percent", "Check"])
        ):
            value = "" if f.value is None else f.value
            fallback = "''"

            if f.operator.lower() in ("like", "not like") and isinstance(value, str):
                # because "like" uses backslash (\) for escaping
                value = value.replace("\\", "\\\\").replace("%", "%%")

        elif (
            f.operator == "=" and df and df.fieldtype in ["Link", "Data"]
        ):  # TODO: Refactor if possible
            value = f.value or "''"
            fallback = "''"

        elif f.fieldname == "name":
            value = f.value or "''"
            fallback = "''"

        else:
            value = flt(f.value)
            fallback = 0

        if isinstance(f.value, Column):
            can_be_null = False  # added to avoid the ifnull/coalesce addition
            quote = '"' if frappe.conf.db_type == "postgres" else "`"
            value = f"{tname}.{quote}{f.value.name}{quote}"

        # escape value
        elif escape and isinstance(value, str):
            value = f"{frappe.db.escape(value, percent=False)}"

    if (
        self.ignore_ifnull
        or not can_be_null
        or (f.value and f.operator.lower() in ("=", "like"))
        or "ifnull(" in column_name.lower()
    ):
        if f.operator.lower() == "like" and frappe.conf.get("db_type") == "postgres":
            f.operator = "ilike"
        condition = f"{column_name} {f.operator} {value}"
    else:
        condition = f"ifnull({column_name}, {fallback}) {f.operator} {value}"

    return condition
|
|
|
|
def build_match_conditions(self, as_condition=True) -> str | list:
    """Work out the permission restrictions that apply to the current user.

    Resets `self.match_filters` and `self.match_conditions`, fills them from
    role permissions, user permissions and shared documents, and returns the
    SQL condition string (`as_condition=True`) or the list of match filters.

    Throws `frappe.PermissionError` when the user can only see shared
    documents and nothing is shared with them.
    """
    self.match_filters = []
    self.match_conditions = []
    only_if_shared = False

    if not self.user:
        self.user = frappe.session.user

    if not self.tables:
        self.extract_tables()

    role_permissions = frappe.permissions.get_role_permissions(self.doctype_meta, user=self.user)
    if (
        not self.doctype_meta.istable
        and not (role_permissions.get("select") or role_permissions.get("read"))
        and not self.flags.ignore_permissions
        and not has_any_user_permission_for_doctype(self.doctype, self.user, self.reference_doctype)
    ):
        # no role access at all: only explicitly shared documents are visible
        only_if_shared = True
        self.shared = frappe.share.get_shared(self.doctype, self.user)
        if self.shared:
            self.conditions.append(self.get_share_condition())
        else:
            frappe.throw(_("No permission to read {0}").format(_(self.doctype)), frappe.PermissionError)

    else:
        # skip user perm check if owner constraint is required
        if requires_owner_constraint(role_permissions):
            self._fetch_shared_documents = True
            owner = frappe.db.escape(self.user, percent=False)
            self.match_conditions.append(f"`tab{self.doctype}`.`owner` = {owner}")

        # add user permission only if role has read perm
        elif role_permissions.get("read") or role_permissions.get("select"):
            self.add_user_permissions(frappe.permissions.get_user_permissions(self.user))

        # Only when full read access is not present fetch shared documents.
        # This is done to avoid extra query.
        # Only following cases can require explicit addition of shared documents.
        #    1. DocType has if_owner constraint and hence can't see shared documents
        #    2. DocType has user permissions and hence can't see shared documents
        if self._fetch_shared_documents:
            self.shared = frappe.share.get_shared(self.doctype, self.user)

    if not as_condition:
        return self.match_filters

    conditions = ""
    if self.match_conditions:
        # will turn out like ((blog_post in (..) and blogger in (...)) or (blog_category in (...)))
        conditions = "((" + ") or (".join(self.match_conditions) + "))"

    doctype_conditions = self.get_permission_query_conditions()
    if doctype_conditions:
        if conditions:
            conditions += " and " + doctype_conditions
        else:
            conditions += doctype_conditions

    # share is an OR condition, if there is a role permission
    if not only_if_shared and self.shared and conditions:
        conditions = f"(({conditions}) or ({self.get_share_condition()}))"

    return conditions
|
|
|
|
def get_share_condition(self):
    """Return an SQL `in (...)` condition matching the doctype's `name` column
    against the escaped entries of `self.shared`."""
    name_column = cast_name(f"`tab{self.doctype}`.name")
    shared_names = ", ".join(frappe.db.escape(s, percent=False) for s in self.shared)
    return name_column + f" in ({shared_names})"
|
|
|
|
def add_user_permissions(self, user_permissions):
    """Translate user permissions into match conditions and match filters.

    Every link field of the doctype, plus a pseudo link field ``name`` that
    points at the doctype itself, is checked unless it sets
    ``ignore_user_permissions``. For each such field the permitted documents
    are collected from ``user_permissions`` and turned into an SQL
    ``in (...)`` condition on that field. Unless the
    ``apply_strict_user_permissions`` system setting is enabled, rows whose
    field is empty are allowed as well.

    Args:
        user_permissions: mapping of doctype name to an iterable of
            permission dicts; each is read via ``.get("doc")`` and
            ``.get("applicable_for")``.

    Side effects:
        Appends to ``self.match_conditions`` and ``self.match_filters`` and
        sets ``self._fetch_shared_documents`` when anything was added.
    """
    # Build a fresh list instead of appending to the one returned by meta, so
    # the pseudo `name` field is never added to a list owned by the meta object.
    doctype_link_fields = [
        *self.doctype_meta.get_link_fields(),
        # current doctype with fieldname 'name', processed after the real link fields
        dict(options=self.doctype, fieldname="name"),
    ]

    match_filters = {}
    match_conditions = []
    for df in doctype_link_fields:
        if df.get("ignore_user_permissions"):
            continue

        user_permission_values = user_permissions.get(df.get("options"), {})
        if not user_permission_values:
            continue

        docs = []
        if frappe.get_system_settings("apply_strict_user_permissions"):
            condition = ""
        else:
            # non-strict mode: rows with an empty link value are also visible
            empty_value_condition = cast_name(
                f"ifnull(`tab{self.doctype}`.`{df.get('fieldname')}`, '')=''"
            )
            condition = empty_value_condition + " or "

        for permission in user_permission_values:
            if not permission.get("applicable_for"):
                docs.append(permission.get("doc"))

            # append docs based on user permission applicable on reference doctype
            # this is useful when getting list of docs from a link field
            # in this case parent doctype of the link
            # will be the reference doctype
            elif df.get("fieldname") == "name" and self.reference_doctype:
                if permission.get("applicable_for") == self.reference_doctype:
                    docs.append(permission.get("doc"))

            elif permission.get("applicable_for") == self.doctype:
                docs.append(permission.get("doc"))

        if docs:
            values = ", ".join(frappe.db.escape(doc, percent=False) for doc in docs)
            condition += cast_name(f"`tab{self.doctype}`.`{df.get('fieldname')}`") + f" in ({values})"
            match_conditions.append(f"({condition})")
            match_filters[df.get("options")] = docs

    if match_conditions:
        self._fetch_shared_documents = True
        self.match_conditions.append(" and ".join(match_conditions))

    if match_filters:
        self._fetch_shared_documents = True
        self.match_filters.append(match_filters)
|
|
|
|
def get_permission_query_conditions(self):
    """Collect extra permission conditions for ``self.doctype``.

    Conditions come from the ``permission_query_conditions`` hooks registered
    for the doctype and from the doctype's ``permission_query`` server script,
    if one is mapped. Falsy results are ignored.

    Returns:
        The collected conditions joined with ``" and "``, or ``""`` when
        there are none.
    """
    collected = []

    hooked_methods = frappe.get_hooks("permission_query_conditions", {}).get(self.doctype, [])
    for method_path in hooked_methods or ():
        hook_condition = frappe.call(frappe.get_attr(method_path), self.user)
        if hook_condition:
            collected.append(hook_condition)

    script_name = get_server_script_map().get("permission_query", {}).get(self.doctype)
    if script_name:
        server_script = frappe.get_doc("Server Script", script_name)
        script_condition = server_script.get_permission_query_conditions(self.user)
        if script_condition:
            collected.append(script_condition)

    # joining an empty list already yields ""
    return " and ".join(collected)
|
|
|
|
def set_order_by(self, args):
    """Set ``args.order_by`` from the explicit order or the doctype defaults.

    * An explicit ``self.order_by`` (anything truthy other than
      ``"KEEP_DEFAULT_ORDERING"``) is used verbatim.
    * Otherwise ``args.order_by`` starts as ``""`` and stays empty when the
      only selected field is a ``count(``/``min(``/``max(`` call without a
      group by clause.
    * A comma separated ``doctype_meta.sort_field`` (``idx desc, modified desc``)
      is expanded to fully qualified columns. An entry without a direction
      is emitted without one (the database's default order) instead of
      raising ``IndexError``.
    * A single or missing ``sort_field`` falls back to ``modified desc``, and
      is only applied when ``self.order_by`` is truthy.
    * For submittable doctypes ``docstatus asc`` is prepended when
      ``self.order_by`` is truthy.

    Args:
        args: object whose ``order_by`` attribute is assigned.
    """
    if self.order_by and self.order_by != "KEEP_DEFAULT_ORDERING":
        args.order_by = self.order_by
        return

    args.order_by = ""

    # don't add order by from meta if a mysql group function is used without group by clause
    group_function_without_group_by = (
        len(self.fields) == 1
        and self.fields[0].lower().startswith(("count(", "min(", "max("))
        and not self.group_by
    )
    if group_function_without_group_by:
        return

    meta_sort_field = self.doctype_meta.sort_field
    if meta_sort_field and "," in meta_sort_field:
        # multiple sort given in doctype definition
        # Example:
        # `idx desc, modified desc`
        # will convert to
        # `tabItem`.`idx` desc, `tabItem`.`modified` desc
        sort_parts = []
        for entry in meta_sort_field.split(","):
            tokens = entry.split(maxsplit=2)
            if not tokens:
                # blank entry, e.g. from a trailing comma
                continue
            column = f"`tab{self.doctype}`.`{tokens[0]}`"
            # the direction is optional; without it only the column is emitted
            sort_parts.append(f"{column} {tokens[1]}" if len(tokens) > 1 else column)
        args.order_by = ", ".join(sort_parts)
    elif self.order_by:
        sort_field = meta_sort_field or "modified"
        sort_order = (meta_sort_field and self.doctype_meta.sort_order) or "desc"
        args.order_by = f"`tab{self.doctype}`.`{sort_field}` {sort_order}"

    # draft docs always on top
    if getattr(self.doctype_meta, "is_submittable", None) and self.order_by:
        args.order_by = f"`tab{self.doctype}`.docstatus asc, {args.order_by}"
|
|
|
|
def validate_order_by_and_group_by(self, parameters: str):
    """Reject unsafe or unsupported order by / group by clauses.

    Throws via ``frappe.throw`` when the clause looks like a sub-query,
    matches ``ORDER_GROUP_PATTERN``, references a fully qualified table that
    is not in ``self.tables``, or calls a blacklisted SQL function.
    Empty input is accepted without any checks.
    """
    if not parameters:
        return

    forbidden_functions = {
        "sleep",
    }
    lowered = parameters.lower()

    if "select" in lowered and "from" in lowered:
        frappe.throw(_("Cannot use sub-query in order by"))

    if ORDER_GROUP_PATTERN.match(lowered):
        frappe.throw(_("Illegal SQL Query"))

    for raw_field in parameters.split(","):
        field = raw_field.strip()

        # fully qualified columns must belong to a table that is part of the query
        if "." in field and field.startswith("`tab"):
            table = field.partition(".")[0]
            if table not in self.tables:
                if table.startswith("`"):
                    # drop the first four characters ("`tab") and the last
                    # character (presumably the closing backtick)
                    table = table[4:-1]
                frappe.throw(_("Please select atleast 1 column from {0} to sort/group").format(table))

        # whatever precedes the first "(" is treated as the function name
        function_name = field.partition("(")[0].rstrip().lower()
        if function_name in forbidden_functions:
            frappe.throw(_("Cannot use {0} in order/group by").format(field))
|
|
|
|
def add_limit(self):
    """Return the SQL ``limit ... offset ...`` clause.

    Returns ``""`` when ``self.limit_page_length`` is falsy.
    """
    if not self.limit_page_length:
        return ""
    return f"limit {self.limit_page_length} offset {self.limit_start}"
|
|
|
|
def add_comment_count(self, result):
    """Set ``_comment_count`` on every row of ``result`` that has a name.

    The count is the length of the JSON list stored in the row's
    ``_comments`` value; it is 0 when the row has no ``_comments`` key or
    the value is empty. Rows with a falsy ``name`` are left untouched.
    """
    for row in result:
        if row.name:
            row._comment_count = 0
            if "_comments" in row:
                # an empty or missing value is parsed as an empty JSON list
                comments = json.loads(row._comments or "[]")
                row._comment_count = len(comments)
|
|
|
|
def update_user_settings(self):
    """Merge this query's settings into the stored user settings and save them.

    The stored settings for ``self.doctype`` are loaded as JSON, updated with
    ``self.user_settings`` when that attribute exists, and given a ``fields``
    entry when ``self.save_user_settings_fields`` is truthy.
    """
    # update user settings if new search
    stored_settings = json.loads(get_user_settings(self.doctype))

    if hasattr(self, "user_settings"):
        stored_settings.update(self.user_settings)

    if self.save_user_settings_fields:
        stored_settings["fields"] = self.user_settings_fields

    # `update_user_settings` here resolves to the module-level import, not this method
    update_user_settings(self.doctype, stored_settings)
|
|
|
|
|
|
def cast_name(column: str) -> str:
    """Wrap references to the ``name`` column in ``cast(... as varchar)``.

    Nothing is changed when the database type is ``mariadb`` or when the
    expression already contains ``cast(`` or ``::``. Otherwise the first
    matching rule is applied by regex substitution:

    1. ``locate(x, name)`` gets its second argument cast
    2. ``strpos(name, ...)``, ``ifnull(name, ...)`` and ``coalesce(name, ...)``
       get their first argument cast
    3. any other table-qualified ``name`` column is cast

    Example:
    input - "ifnull(`tabBlog Post`.`name`, '')=''"
    output - "ifnull(cast(`tabBlog Post`.`name` as varchar), '')=''"
    """
    if frappe.db.db_type == "mariadb":
        return column

    if "cast(" in column.lower() or "::" in column:
        # already cast explicitly, leave untouched
        return column

    if LOCATE_PATTERN.search(column):
        return LOCATE_CAST_PATTERN.sub(r"locate(\1, cast(\2 as varchar))", column)

    func_match = FUNC_IFNULL_PATTERN.search(column)
    if func_match:
        # reuse the function name exactly as it was written in the expression
        func = func_match.group(1)
        return re.sub(rf"{func}\(\s*([`\"]?name[`\"]?)\s*,", rf"{func}(cast(\1 as varchar),", column)

    return CAST_VARCHAR_PATTERN.sub(r"cast(\1 as varchar)", column)
|
|
|
|
|
|
def check_parent_permission(parent, child_doctype):
    """Raise ``frappe.PermissionError`` unless access through ``parent`` is allowed.

    Access is allowed (the function returns ``None``) only when ``parent`` is
    given, ``child_doctype`` (if given) appears as the ``options`` of a
    DocField or Custom Field of ``parent``, and
    ``frappe.permissions.has_permission(parent)`` is truthy.
    """
    if not parent:
        # parent not passed
        raise frappe.PermissionError

    if child_doctype:
        # User may pass fake parent and get the information from the child table
        is_child_of_parent = frappe.db.exists(
            "DocField", {"parent": parent, "options": child_doctype}
        ) or frappe.db.exists("Custom Field", {"dt": parent, "options": child_doctype})
        if not is_child_of_parent:
            raise frappe.PermissionError

    if not frappe.permissions.has_permission(parent):
        # the user doesn't have permission on parent doctype of child table
        raise frappe.PermissionError
|
|
|
|
|
|
def get_order_by(doctype, meta):
    """Return the default ``order by`` expression for ``doctype``.

    Args:
        doctype: doctype name, used to build the "`tab<doctype>`" table prefix.
        meta: object providing ``sort_field``, ``sort_order`` and
            ``is_submittable``.

    Returns:
        A comma separated ``sort_field`` (``idx desc, modified desc``) expanded
        to fully qualified columns, or a single column that falls back to
        ``modified desc``. An entry of a comma separated ``sort_field``
        without a direction is emitted without one (the database's default
        order) instead of raising ``IndexError``. For submittable doctypes
        ``docstatus asc`` is prepended.
    """
    if meta.sort_field and "," in meta.sort_field:
        # multiple sort given in doctype definition
        # Example:
        # `idx desc, modified desc`
        # will convert to
        # `tabItem`.`idx` desc, `tabItem`.`modified` desc
        sort_parts = []
        for entry in meta.sort_field.split(","):
            tokens = entry.split(maxsplit=2)
            if not tokens:
                # blank entry, e.g. from a trailing comma
                continue
            column = f"`tab{doctype}`.`{tokens[0]}`"
            # the direction is optional; without it only the column is emitted
            sort_parts.append(f"{column} {tokens[1]}" if len(tokens) > 1 else column)
        order_by = ", ".join(sort_parts)
    else:
        sort_field = meta.sort_field or "modified"
        # sort_order is only honoured when a sort_field is configured
        sort_order = (meta.sort_field and meta.sort_order) or "desc"
        order_by = f"`tab{doctype}`.`{sort_field}` {sort_order}"

    # draft docs always on top
    if meta.is_submittable:
        order_by = f"`tab{doctype}`.docstatus asc, {order_by}"

    return order_by
|
|
|
|
|
|
def has_any_user_permission_for_doctype(doctype, user, applicable_for):
    """Tell whether ``user`` has a user permission on ``doctype`` that applies.

    A permission applies when its ``applicable_for`` is empty or equals the
    given ``applicable_for``.
    """
    all_user_permissions = frappe.permissions.get_user_permissions(user=user)
    return any(
        not perm.applicable_for or perm.applicable_for == applicable_for
        for perm in all_user_permissions.get(doctype, [])
    )
|
|
|
|
|
|
def get_between_date_filter(value, df=None):
    """Return the ``'<from>' AND '<to>'`` SQL fragment for a between filter.

    Args:
        value: list or tuple holding the from date and, optionally, the to
            date. Missing entries default to ``frappe.utils.nowdate()``.
        df: optional field definition; only its ``fieldtype`` is read.

    When ``df`` is not given or is a ``Datetime`` field, the to date is moved
    one day forward. ``Datetime`` fields are formatted with
    ``frappe.db.format_datetime``, everything else with
    ``frappe.db.format_date``.
    """
    from_date, to_date = frappe.utils.nowdate(), frappe.utils.nowdate()

    if value and isinstance(value, (list, tuple)):
        # value is non-empty here, so the first element always exists
        from_date = value[0]
        if len(value) >= 2:
            to_date = value[1]

    is_datetime_field = bool(df) and df.fieldtype == "Datetime"

    if not df or is_datetime_field:
        to_date = add_to_date(to_date, days=1)

    formatter = frappe.db.format_datetime if is_datetime_field else frappe.db.format_date
    return f"'{formatter(from_date)}' AND '{formatter(to_date)}'"
|
|
|
|
|
|
def get_additional_filter_field(additional_filters_config, f, value):
    """Build the field definition for a configured additional filter operator.

    The config entry is looked up by the lower-cased ``f.operator``; its
    ``get_field`` attribute path is resolved and called, and the result is
    wrapped in a ``frappe._dict``. When that field has a truthy
    ``query_value``, its ``value`` is set to the ``query_value`` of each
    option whose ``value`` equals ``value`` (the last match wins).
    """
    filter_config = additional_filters_config[f.operator.lower()]
    get_field = frappe.get_attr(filter_config["get_field"])
    field = frappe._dict(get_field())

    if field.query_value:
        for raw_option in field.options:
            option = frappe._dict(raw_option)
            if option.value == value:
                field.value = option.query_value

    return field
|
|
|
|
|
|
def get_date_range(operator: str, value: str):
    """Resolve a relative date filter through ``get_timespan_date_range``.

    For the ``timespan`` operator ``value`` is passed on unchanged. For any
    other operator the timespan is built as "<period> <span>", e.g.
    ``("previous", "1 week")`` becomes ``"last week"``; an operator or value
    missing from the lookup tables raises ``KeyError``.
    """
    if operator == "timespan":
        return get_timespan_date_range(value)

    period_by_operator = {
        "previous": "last",
        "next": "next",
    }
    span_by_value = {
        "1 week": "week",
        "1 month": "month",
        "3 months": "quarter",
        "6 months": "6 months",
        "1 year": "year",
    }

    return get_timespan_date_range(f"{period_by_operator[operator]} {span_by_value[value]}")
|
|
|
|
|
|
def requires_owner_constraint(role_permissions):
    """Returns True if "select" or "read" isn't available without being creator.

    Returns ``None`` when ``has_if_owner_enabled`` or ``if_owner`` is falsy,
    or when "select" or "read" is granted outside of ``if_owner``.
    """
    if_owner_perms = role_permissions.get("has_if_owner_enabled") and role_permissions.get("if_owner")
    if not if_owner_perms:
        return None

    # has select or read without if owner, no need for constraint
    readable_without_owner = any(
        role_permissions.get(perm_type) and perm_type not in if_owner_perms
        for perm_type in ("select", "read")
    )
    if readable_without_owner:
        return None

    # not checking if either select or read if present in if_owner_perms
    # because either of those is required to perform a query
    return True
|
|
|
|
|
|
def wrap_grave_quotes(table: str) -> str:
    """Return ``table`` wrapped in backticks unless it already starts with one."""
    return table if table[0] == "`" else f"`{table}`"
|
|
|
|
|
|
def is_plain_field(field: str) -> bool:
    """Return True when ``field`` contains none of ``SPECIAL_FIELD_CHARS``."""
    return not any(char in SPECIAL_FIELD_CHARS for char in field)
|
|
|
|
|
|
def in_function(substr: str, field: str) -> bool:
    """Tell whether ``substr`` appears inside the first parenthesised part of ``field``.

    Only the first occurrence of ``substr``, of "(" and of ")" is considered;
    the result is False when any of them is missing.
    """
    substr_at = field.find(substr)
    if substr_at == -1:
        return False

    open_at = field.find("(")
    close_at = field.find(")")
    return open_at != -1 and close_at != -1 and open_at < substr_at < close_at
|
|
|
|
|
|
def strip_alias(field: str) -> str:
    """Return ``field`` without its trailing ``AS <alias>`` part.

    The keyword is matched case-insensitively (``as``, ``AS``, ``As``), and
    everything from the first " as " onwards is dropped. A field without an
    alias is returned unchanged.
    """
    # Note: Currently only supports aliases that use the " AS " syntax
    # re.ASCII keeps the case folding limited to plain "a"/"s"
    return re.split(" as ", field, maxsplit=1, flags=re.IGNORECASE | re.ASCII)[0]
|