* fix(system console): allow read-only queries in system console * fix(security): make transactions strictly read only for console and scripts * test: remove test to reflect ban on cte's for security * fix: Only enable read only mode for SQL query --------- Co-authored-by: Ankush Menat <ankush@frappe.io>
301 lines
8.5 KiB
Python
301 lines
8.5 KiB
Python
# Copyright (c) 2019, Frappe Technologies Pvt. Ltd. and Contributors
|
|
# License: MIT. See LICENSE
|
|
|
|
import frappe
|
|
import frappe.utils
|
|
from frappe.desk.query_report import build_xlsx_data, export_query, run
|
|
from frappe.tests import IntegrationTestCase
|
|
from frappe.utils.xlsxutils import make_xlsx
|
|
|
|
|
|
class TestQueryReport(IntegrationTestCase):
|
|
@classmethod
|
|
def setUpClass(cls) -> None:
|
|
cls.enterClassContext(cls.enable_safe_exec())
|
|
return super().setUpClass()
|
|
|
|
def tearDown(self):
|
|
frappe.db.rollback()
|
|
|
|
def test_xlsx_data_with_multiple_datatypes(self):
|
|
"""Test exporting report using rows with multiple datatypes (list, dict)"""
|
|
|
|
# Create mock data
|
|
data = create_mock_data()
|
|
|
|
# Define the visible rows
|
|
visible_idx = [0, 2, 3]
|
|
|
|
# Build the result
|
|
xlsx_data, column_widths, _ = build_xlsx_data(data, visible_idx, include_indentation=0)
|
|
|
|
self.assertEqual(type(xlsx_data), list)
|
|
self.assertEqual(len(xlsx_data), 4) # columns + data
|
|
# column widths are divided by 10 to match the scale that is supported by openpyxl
|
|
self.assertListEqual(column_widths, [0, 10, 15])
|
|
|
|
for row in xlsx_data:
|
|
self.assertIsInstance(row, list)
|
|
|
|
# ensure all types are preserved
|
|
for row in xlsx_data[1:]:
|
|
for cell in row:
|
|
self.assertIsInstance(cell, (int, float))
|
|
|
|
def test_xlsx_data_with_filters(self):
|
|
"""Test building xlsx data along with filters"""
|
|
|
|
# Create mock data
|
|
data = create_mock_data()
|
|
data.filters = {"Label 1": "Filter Value", "Label 2": None, "Label 3": list(range(5))}
|
|
|
|
# Define the visible rows
|
|
visible_idx = [0, 2, 3]
|
|
|
|
# Build the result
|
|
xlsx_data, _column_widths, header_index = build_xlsx_data(
|
|
data, visible_idx, include_indentation=False, include_filters=True
|
|
)
|
|
|
|
self.assertEqual(header_index, 3) # 2 filter rows + 1 empty row
|
|
|
|
# Check if unset filters are skipped | Rows - 2 filters + 1 empty + 1 column + 3 data
|
|
self.assertEqual(len(xlsx_data), 7)
|
|
|
|
# Check filter formatting
|
|
self.assertListEqual(xlsx_data[:2], [["Label 1", "Filter Value"], ["Label 3", "0, 1, 2, 3, 4"]])
|
|
|
|
def test_xlsx_export_with_composite_cell_value(self):
|
|
"""Test excel export using rows with composite cell value"""
|
|
|
|
data = frappe._dict()
|
|
data.columns = [
|
|
{"label": "Column A", "fieldname": "column_a", "fieldtype": "Float"},
|
|
{"label": "Column B", "fieldname": "column_b", "width": 150, "fieldtype": "Data"},
|
|
]
|
|
data.result = [
|
|
[1.0, "Dummy 1"],
|
|
{"column_a": 22.1, "column_b": ["Dummy 1", "Dummy 2"]}, # composite value in column_b
|
|
]
|
|
|
|
# Define the visible rows
|
|
visible_idx = [0, 1]
|
|
|
|
# Build the result
|
|
xlsx_data, column_widths, header_index = build_xlsx_data(data, visible_idx, include_indentation=0)
|
|
# Export to excel
|
|
make_xlsx(xlsx_data, "Query Report", column_widths=column_widths, header_index=header_index)
|
|
|
|
for row in xlsx_data:
|
|
# column_b should be 'str' even with composite cell value
|
|
self.assertEqual(type(row[1]), str)
|
|
|
|
def test_csv(self):
|
|
from csv import QUOTE_ALL, QUOTE_MINIMAL, QUOTE_NONE, QUOTE_NONNUMERIC, DictReader
|
|
from io import StringIO
|
|
|
|
REPORT_NAME = "Test CSV Report"
|
|
REF_DOCTYPE = "DocType"
|
|
REPORT_COLUMNS = ["name", "module", "issingle"]
|
|
|
|
if not frappe.db.exists("Report", REPORT_NAME):
|
|
report = frappe.new_doc("Report")
|
|
report.report_name = REPORT_NAME
|
|
report.ref_doctype = "User"
|
|
report.report_type = "Query Report"
|
|
report.query = frappe.qb.from_(REF_DOCTYPE).select(*REPORT_COLUMNS).limit(10).get_sql()
|
|
report.is_standard = "No"
|
|
report.save()
|
|
|
|
for delimiter in (",", ";", "\t", "|"):
|
|
for quoting in (QUOTE_ALL, QUOTE_MINIMAL, QUOTE_NONE, QUOTE_NONNUMERIC):
|
|
frappe.local.form_dict = frappe._dict(
|
|
{
|
|
"report_name": REPORT_NAME,
|
|
"file_format_type": "CSV",
|
|
"csv_quoting": quoting,
|
|
"csv_delimiter": delimiter,
|
|
"include_indentation": 0,
|
|
"visible_idx": [0, 1, 2],
|
|
}
|
|
)
|
|
frappe.db.commit()
|
|
export_query()
|
|
|
|
self.assertTrue(frappe.response["filename"].endswith(".csv"))
|
|
self.assertEqual(frappe.response["type"], "binary")
|
|
with StringIO(frappe.response["filecontent"].decode("utf-8")) as result:
|
|
reader = DictReader(result, delimiter=delimiter, quoting=quoting)
|
|
row = reader.__next__()
|
|
for column in REPORT_COLUMNS:
|
|
self.assertIn(column, row)
|
|
|
|
frappe.delete_doc("Report", REPORT_NAME, delete_permanently=True)
|
|
frappe.db.commit()
|
|
|
|
def test_report_for_duplicate_column_names(self):
|
|
"""Test report with duplicate column names"""
|
|
|
|
try:
|
|
fields = [
|
|
{"label": "First Name", "fieldname": "first_name", "fieldtype": "Data"},
|
|
{"label": "Last Name", "fieldname": "last_name", "fieldtype": "Data"},
|
|
]
|
|
frappe.get_doc(
|
|
{
|
|
"doctype": "DocType",
|
|
"name": "Doc A",
|
|
"module": "Core",
|
|
"custom": 1,
|
|
"autoname": "field:first_name",
|
|
"fields": fields,
|
|
"permissions": [{"role": "System Manager"}],
|
|
}
|
|
).insert(ignore_if_duplicate=True)
|
|
|
|
frappe.get_doc(
|
|
{
|
|
"doctype": "DocType",
|
|
"name": "Doc B",
|
|
"module": "Core",
|
|
"custom": 1,
|
|
"autoname": "field:last_name",
|
|
"fields": fields,
|
|
"permissions": [{"role": "System Manager"}],
|
|
}
|
|
).insert(ignore_if_duplicate=True)
|
|
|
|
for i in range(1, 3):
|
|
frappe.get_doc({"doctype": "Doc A", "first_name": f"John{i}", "last_name": "Doe"}).insert()
|
|
frappe.get_doc({"doctype": "Doc B", "last_name": f"Doe{i}", "first_name": "John"}).insert()
|
|
|
|
if not frappe.db.exists("Report", "Doc A Report"):
|
|
report = frappe.get_doc(
|
|
{
|
|
"doctype": "Report",
|
|
"ref_doctype": "Doc A",
|
|
"report_name": "Doc A Report",
|
|
"report_type": "Script Report",
|
|
"is_standard": "No",
|
|
}
|
|
).insert(ignore_permissions=True)
|
|
else:
|
|
report = frappe.get_doc("Report", "Doc A Report")
|
|
|
|
report.report_script = """
|
|
result = [["Ritvik","Sardana", "Doe1"],["Shariq","Ansari", "Doe2"]]
|
|
columns = [{
|
|
"label": "First Name",
|
|
"fieldname": "first_name",
|
|
"fieldtype": "Data",
|
|
"width": 180,
|
|
},
|
|
{
|
|
"label": "Last Name",
|
|
"fieldname": "last_name",
|
|
"fieldtype": "Data",
|
|
"width": 180,
|
|
},
|
|
{
|
|
"label": "Linked Field",
|
|
"fieldname": "linked_field",
|
|
"fieldtype": "Link",
|
|
"options": "Doc B",
|
|
"width": 180,
|
|
},
|
|
]
|
|
|
|
data = columns, result
|
|
"""
|
|
report.save()
|
|
|
|
custom_columns = [
|
|
{
|
|
"fieldname": "first_name-Doc_B",
|
|
"fieldtype": "Data",
|
|
"label": "First Name",
|
|
"insert_after_index": 1,
|
|
"link_field": {"fieldname": "linked_field", "names": {}},
|
|
"doctype": "Doc B",
|
|
"width": 100,
|
|
"id": "first_name-Doc_B",
|
|
"name": "First Name",
|
|
"editable": False,
|
|
"compareValue": None,
|
|
},
|
|
]
|
|
|
|
response = run(
|
|
"Doc A Report",
|
|
filters={"user": "Administrator", "doctype": "Doc A"},
|
|
custom_columns=custom_columns,
|
|
)
|
|
|
|
self.assertListEqual(
|
|
["first_name", "last_name", "first_name-Doc_B", "linked_field"],
|
|
[d["fieldname"] for d in response["columns"]],
|
|
)
|
|
|
|
self.assertDictEqual(
|
|
{
|
|
"first_name": "Ritvik",
|
|
"last_name": "Sardana",
|
|
"linked_field": "Doe1",
|
|
"first_name-Doc_B": "John",
|
|
},
|
|
response["result"][0],
|
|
)
|
|
|
|
except Exception as e:
|
|
raise e
|
|
frappe.db.rollback()
|
|
|
|
def test_export_report_via_email(self):
|
|
REPORT_NAME = "Test CSV Report"
|
|
REF_DOCTYPE = "DocType"
|
|
REPORT_COLUMNS = ["name", "module", "issingle"]
|
|
|
|
if not frappe.db.exists("Report", REPORT_NAME):
|
|
report = frappe.new_doc("Report")
|
|
report.report_name = REPORT_NAME
|
|
report.ref_doctype = "User"
|
|
report.report_type = "Query Report"
|
|
report.query = frappe.qb.from_(REF_DOCTYPE).select(*REPORT_COLUMNS).limit(10).get_sql()
|
|
report.is_standard = "No"
|
|
report.save()
|
|
|
|
frappe.local.form_dict = frappe._dict(
|
|
{
|
|
"report_name": REPORT_NAME,
|
|
"file_format_type": "CSV",
|
|
"include_indentation": 0,
|
|
"visible_idx": [0, 1, 2],
|
|
"export_in_background": 1,
|
|
}
|
|
)
|
|
frappe.db.delete("Email Queue")
|
|
frappe.db.commit()
|
|
export_query()
|
|
|
|
email_queue = frappe.get_all("Email Queue")
|
|
|
|
self.assertTrue(email_queue, "Email was not enqueued")
|
|
|
|
frappe.delete_doc("Report", REPORT_NAME, delete_permanently=True)
|
|
frappe.db.commit()
|
|
|
|
|
|
def create_mock_data():
|
|
data = frappe._dict()
|
|
data.columns = [
|
|
{"label": "Column A", "fieldname": "column_a", "fieldtype": "Float"},
|
|
{"label": "Column B", "fieldname": "column_b", "width": 100, "fieldtype": "Float"},
|
|
{"label": "Column C", "fieldname": "column_c", "width": 150, "fieldtype": "Duration"},
|
|
]
|
|
data.result = [
|
|
[1.0, 3.0, 600],
|
|
{"column_a": 22.1, "column_b": 21.8, "column_c": 86412},
|
|
{"column_b": 5.1, "column_c": 53234, "column_a": 11.1},
|
|
[3.0, 1.5, 333],
|
|
]
|
|
return data
|