Merge pull request #20908 from netchampfaris/qb-getquery-child-fields-syntax-sugar

This commit is contained in:
Faris Ansari 2023-05-11 12:02:30 +05:30 committed by GitHub
commit 2df7fdd79e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 89 additions and 1 deletions

View file

@ -88,9 +88,12 @@ class Engine:
if not self.fields:
self.fields = [getattr(self.table, "name")]
self.query._child_queries = []
for field in self.fields:
if isinstance(field, DynamicTableField):
self.query = field.apply_select(self.query)
elif isinstance(field, ChildQuery):
self.query._child_queries.append(field)
else:
self.query = self.query.select(field)
@ -301,6 +304,9 @@ class Engine:
for field in fields:
if isinstance(field, Criterion):
_fields.append(field)
elif isinstance(field, dict):
for child_field, fields in field.items():
_fields.append(ChildQuery(child_field, fields, self.doctype))
elif isinstance(field, str):
if "," in field:
field = field.casefold() if "`" not in field else field
@ -457,6 +463,35 @@ class LinkTableField(DynamicTableField):
return query
class ChildQuery:
def __init__(
self,
fieldname: str,
fields: list,
parent_doctype: str,
) -> None:
field = frappe.get_meta(parent_doctype).get_field(fieldname)
if field.fieldtype not in frappe.model.table_fields:
return
self.fieldname = fieldname
self.fields = fields
self.parent_doctype = parent_doctype
self.doctype = field.options
def get_query(self, parent_names=None) -> QueryBuilder:
filters = {
"parenttype": self.parent_doctype,
"parentfield": self.fieldname,
"parent": ["in", parent_names],
}
return frappe.qb.get_query(
self.doctype,
fields=self.fields + ["parent", "parentfield"],
filters=filters,
order_by="idx asc",
)
def literal_eval_(literal):
try:
return literal_eval(literal)

View file

@ -81,8 +81,27 @@ def patch_query_execute():
"""
def execute_query(query, *args, **kwargs):
child_queries = query._child_queries if isinstance(query._child_queries, list) else []
query, params = prepare_query(query)
return frappe.db.sql(query, params, *args, **kwargs) # nosemgrep
result = frappe.db.sql(query, params, *args, **kwargs) # nosemgrep
execute_child_queries(child_queries, result)
return result
def execute_child_queries(queries, result):
if not result or not isinstance(result[0], dict) or not result[0].name:
return
parent_names = [d.name for d in result]
for child_query in queries:
data = child_query.get_query(parent_names).run(as_dict=1)
for row in result:
row[child_query.fieldname] = []
for d in data:
if str(d.parent) == str(row.name) and d.parentfield == child_query.fieldname:
if "parent" not in child_query.fields:
del d["parent"]
if "parentfield" not in child_query.fields:
del d["parentfield"]
row[child_query.fieldname].append(d)
def prepare_query(query):
import inspect

View file

@ -419,3 +419,37 @@ class TestQuery(FrappeTestCase):
frappe.db.sql("delete from `tabDocType` where `name` = 'Test Tree DocType'")
frappe.db.sql_ddl("drop table if exists `tabTest Tree DocType`")
def test_child_field_syntax(self):
note1 = frappe.get_doc(
doctype="Note", title="Note 1", seen_by=[{"user": "Administrator"}]
).insert()
note2 = frappe.get_doc(
doctype="Note", title="Note 2", seen_by=[{"user": "Administrator"}, {"user": "Guest"}]
).insert()
result = frappe.qb.get_query(
"Note",
filters={"name": ["in", [note1.name, note2.name]]},
fields=["name", {"seen_by": ["*"]}],
order_by="title asc",
).run(as_dict=1)
self.assertTrue(isinstance(result[0].seen_by, list))
self.assertTrue(isinstance(result[1].seen_by, list))
self.assertEqual(len(result[0].seen_by), 1)
self.assertEqual(len(result[1].seen_by), 2)
self.assertEqual(result[0].seen_by[0].user, "Administrator")
result = frappe.qb.get_query(
"Note",
filters={"name": ["in", [note1.name, note2.name]]},
fields=["name", {"seen_by": ["user"]}],
order_by="title asc",
).run(as_dict=1)
self.assertEqual(len(result[0].seen_by[0].keys()), 1)
self.assertEqual(result[1].seen_by[1].user, "Guest")
note1.delete()
note2.delete()