fix: use dict syntax instead of string
This commit is contained in:
parent
41d7563aff
commit
c4bb732eaa
2 changed files with 34 additions and 30 deletions
|
|
@ -92,7 +92,7 @@ class Engine:
|
|||
for field in self.fields:
|
||||
if isinstance(field, DynamicTableField):
|
||||
self.query = field.apply_select(self.query)
|
||||
elif isinstance(field, ChildTableFields):
|
||||
elif isinstance(field, ChildQuery):
|
||||
self.query._child_queries.append(field)
|
||||
else:
|
||||
self.query = self.query.select(field)
|
||||
|
|
@ -304,15 +304,16 @@ class Engine:
|
|||
for field in fields:
|
||||
if isinstance(field, Criterion):
|
||||
_fields.append(field)
|
||||
elif isinstance(field, dict):
|
||||
for child_field, fields in field.items():
|
||||
_fields.append(ChildQuery(child_field, fields, self.doctype))
|
||||
elif isinstance(field, str):
|
||||
if "," in field and "[" not in field:
|
||||
if "," in field:
|
||||
field = field.casefold() if "`" not in field else field
|
||||
field_list = COMMA_PATTERN.split(field)
|
||||
for field in field_list:
|
||||
if _field := field.strip():
|
||||
_fields.append(parse_field(_field))
|
||||
elif "[" in field and "]" in field:
|
||||
_fields.append(parse_field(field))
|
||||
else:
|
||||
_fields.append(parse_field(field))
|
||||
|
||||
|
|
@ -401,17 +402,6 @@ class DynamicTableField:
|
|||
elif linked_field.fieldtype in frappe.model.table_fields:
|
||||
return ChildTableField(linked_doctype, fieldname, doctype, alias=alias)
|
||||
|
||||
if "[" in field and "]" in field:
|
||||
child_fieldname, child_fields = field.split("[", 1)
|
||||
child_field = frappe.get_meta(doctype).get_field(child_fieldname)
|
||||
child_doctype = child_field.options
|
||||
child_fields = child_fields.rsplit("]", 1)[0]
|
||||
if not child_fields:
|
||||
child_fields = ["*"]
|
||||
else:
|
||||
child_fields = [f.strip() for f in child_fields.split(",")]
|
||||
return ChildTableFields(child_doctype, child_fieldname, child_fields, doctype)
|
||||
|
||||
def apply_select(self, query: QueryBuilder) -> QueryBuilder:
|
||||
raise NotImplementedError
|
||||
|
||||
|
|
@ -473,18 +463,20 @@ class LinkTableField(DynamicTableField):
|
|||
return query
|
||||
|
||||
|
||||
class ChildTableFields:
|
||||
class ChildQuery:
|
||||
def __init__(
|
||||
self,
|
||||
doctype: str,
|
||||
fieldname: str,
|
||||
fields: list,
|
||||
parent_doctype: str,
|
||||
) -> None:
|
||||
self.doctype = doctype
|
||||
field = frappe.get_meta(parent_doctype).get_field(fieldname)
|
||||
if field.fieldtype not in frappe.model.table_fields:
|
||||
return
|
||||
self.fieldname = fieldname
|
||||
self.fields = fields + ["parent", "parentfield"]
|
||||
self.fields = fields
|
||||
self.parent_doctype = parent_doctype
|
||||
self.doctype = field.options
|
||||
|
||||
def get_query(self, parent_names=None) -> QueryBuilder:
|
||||
filters = {
|
||||
|
|
@ -492,7 +484,12 @@ class ChildTableFields:
|
|||
"parentfield": self.fieldname,
|
||||
"parent": ["in", parent_names],
|
||||
}
|
||||
return frappe.qb.get_query(self.doctype, fields=self.fields, filters=filters, order_by="idx asc")
|
||||
return frappe.qb.get_query(
|
||||
self.doctype,
|
||||
fields=self.fields + ["parent", "parentfield"],
|
||||
filters=filters,
|
||||
order_by="idx asc",
|
||||
)
|
||||
|
||||
|
||||
def literal_eval_(literal):
|
||||
|
|
|
|||
|
|
@ -82,20 +82,27 @@ def patch_query_execute():
|
|||
|
||||
def execute_query(query, *args, **kwargs):
|
||||
child_queries = query._child_queries if isinstance(query._child_queries, list) else []
|
||||
|
||||
query, params = prepare_query(query)
|
||||
result = frappe.db.sql(query, params, *args, **kwargs) # nosemgrep
|
||||
|
||||
if result and isinstance(result[0], dict) and result[0].name:
|
||||
parent_names = [d.name for d in result]
|
||||
for child_query in child_queries or []:
|
||||
data = child_query.get_query(parent_names).run(as_dict=1)
|
||||
for row in result:
|
||||
row[child_query.fieldname] = [
|
||||
d for d in data if str(d.parent) == str(row.name) and d.parentfield == child_query.fieldname
|
||||
]
|
||||
execute_child_queries(child_queries, result)
|
||||
return result
|
||||
|
||||
def execute_child_queries(queries, result):
|
||||
if not result or not isinstance(result[0], dict) or not result[0].name:
|
||||
return
|
||||
parent_names = [d.name for d in result]
|
||||
for child_query in queries:
|
||||
data = child_query.get_query(parent_names).run(as_dict=1)
|
||||
for row in result:
|
||||
row[child_query.fieldname] = []
|
||||
for d in data:
|
||||
if str(d.parent) == str(row.name) and d.parentfield == child_query.fieldname:
|
||||
if "parent" not in child_query.fields:
|
||||
del d["parent"]
|
||||
if "parentfield" not in child_query.fields:
|
||||
del d["parentfield"]
|
||||
row[child_query.fieldname].append(d)
|
||||
|
||||
def prepare_query(query):
|
||||
import inspect
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue