# (source metadata: 1116 lines, 32 KiB, Python)
# Copyright (c) 2015, Frappe Technologies Pvt. Ltd. and Contributors
|
|
# License: MIT. See LICENSE
|
|
|
|
import datetime
|
|
import json
|
|
import os
|
|
from datetime import timedelta
|
|
from typing import Any
|
|
|
|
import frappe
|
|
import frappe.desk.reportview
|
|
from frappe import _
|
|
from frappe.core.utils import ljust_list
|
|
from frappe.desk.reportview import clean_params, parse_json
|
|
from frappe.model.utils import render_include
|
|
from frappe.modules import get_module_path, scrub
|
|
from frappe.monitor import add_data_to_monitor
|
|
from frappe.permissions import get_role_permissions, get_roles, has_permission
|
|
from frappe.utils import cint, cstr, flt, format_datetime, format_duration, formatdate, get_html_format, sbool
|
|
from frappe.utils.caching import request_cache
|
|
from frappe.utils.xlsxutils import XLSXMetadata, XLSXStyleBuilder, handle_html, make_xlsx
|
|
|
|
|
|
def get_report_doc(report_name):
	"""Load and permission-check the Report document for *report_name*.

	A "Custom Report" is resolved to its underlying reference report; the
	custom report's saved columns/filters and prepared-report flag are copied
	onto the resolved doc. Throws PermissionError when the user may not access
	the report or its ref doctype, and throws when the report is disabled.
	"""
	doc = frappe.get_doc("Report", report_name)
	doc.custom_columns = []
	doc.custom_filters = []

	if doc.report_type == "Custom Report":
		custom_report_doc = doc
		doc = get_reference_report(doc)
		doc.custom_report = report_name

		saved_layout = custom_report_doc.json and json.loads(custom_report_doc.json)
		if saved_layout:
			doc.custom_columns = saved_layout.get("columns")
			doc.custom_filters = saved_layout.get("filters")

		doc.is_custom_report = True
		# Follow whatever the custom report has set for prepared report field
		doc.prepared_report = custom_report_doc.prepared_report

	if not doc.is_permitted():
		frappe.throw(
			_("You don't have access to Report: {0}").format(_(doc.name)),
			frappe.PermissionError,
		)

	if not frappe.has_permission(doc.ref_doctype, "report"):
		frappe.throw(
			_("You don't have permission to get a report on: {0}").format(_(doc.ref_doctype)),
			frappe.PermissionError,
		)

	if doc.disabled:
		frappe.throw(_("Report {0} is disabled").format(_(report_name)))

	return doc
def get_report_result(report, filters):
	"""Execute *report* with *filters* and return its raw result, or None.

	Query and Script reports execute directly; a Custom Report is resolved to
	its reference report and executed recursively.
	"""
	report_type = report.report_type

	if report_type == "Query Report":
		return report.execute_query_report(filters)

	if report_type == "Script Report":
		return report.execute_script_report(filters)

	if report_type == "Custom Report":
		return get_report_result(get_report_doc(report.report_name), filters)

	return None
@frappe.read_only()
def generate_report_result(
	report,
	filters=None,
	user=None,
	custom_columns=None,
	is_tree=False,
	parent_field=None,
):
	"""Execute *report* and assemble the full response payload.

	Args:
		report: Report document (permission-checked by the caller).
		filters: dict or JSON string of filter values; falsy becomes [].
		user: user for row-level permission filtering; defaults to session user.
		custom_columns: unsaved ad-hoc columns to splice into the column list.
		is_tree: whether the report is a tree (affects total-row computation).
		parent_field: tree parent fieldname; child rows are skipped in totals.

	Returns:
		dict with result rows, columns, message, chart, report_summary,
		skip_total_row, status and cached execution_time.
	"""
	user = user or frappe.session.user
	filters = filters or []

	if filters and isinstance(filters, str):
		filters = json.loads(filters)

	res = get_report_result(report, filters) or []

	# a report may return up to 6 positional values; pad missing ones with None
	columns, result, message, chart, report_summary, skip_total_row = ljust_list(res, 6)
	columns = [get_column_as_dict(col) for col in (columns or [])]
	report_column_names = [col["fieldname"] for col in columns]
	# convert to list of dicts

	result = normalize_result(result, columns)

	if report.get("custom_columns"):
		# saved columns (with custom columns / with different column order)
		columns = report.custom_columns

	# unsaved custom_columns
	if custom_columns:
		for custom_column in custom_columns:
			columns.insert(custom_column["insert_after_index"] + 1, custom_column)

	# all columns which are not in original report
	report_custom_columns = [column for column in columns if column["fieldname"] not in report_column_names]

	if report_custom_columns:
		result = add_custom_column_data(report_custom_columns, result)

	if result:
		# drop rows the user may not see (user permissions / shared docs)
		result = get_filtered_data(report.ref_doctype, columns, result, user)

	has_total_row = cint(report.add_total_row) and result and not skip_total_row

	if has_total_row:
		result = add_total_row(result, columns, is_tree=is_tree, parent_field=parent_field)

	if isinstance(filters, dict) and filters.get("translate_data"):
		result = translate_report_data(result, has_total_row)

	return {
		"result": result,
		"columns": columns,
		"message": message,
		"chart": chart,
		"report_summary": report_summary,
		"skip_total_row": skip_total_row or 0,
		"status": None,
		"execution_time": frappe.cache.hget("report_execution_time", report.name) or 0,
	}
def normalize_result(result, columns):
|
|
# Convert to list of dicts from list of lists/tuples
|
|
data = []
|
|
column_names = [column["fieldname"] for column in columns]
|
|
if result and isinstance(result[0], list | tuple):
|
|
for row in result:
|
|
row_obj = {}
|
|
for idx, column_name in enumerate(column_names):
|
|
row_obj[column_name] = row[idx]
|
|
data.append(row_obj)
|
|
else:
|
|
data = result
|
|
|
|
return data
|
|
|
|
|
|
@frappe.whitelist()
def get_script(report_name: str):
	"""Return the client-side assets of a report.

	Loads the report's .js script and .html print format from disk (standard
	modules only — custom modules exist only in the DB), falling back to the
	script stored on the Report doc, and finally to an empty stub.
	"""
	report = get_report_doc(report_name)
	module = report.module or frappe.db.get_value("DocType", report.ref_doctype, "module")

	is_custom_module = frappe.get_cached_value("Module Def", module, "custom")

	# custom modules are virtual modules those exists in DB but not in disk.
	module_path = "" if is_custom_module else get_module_path(module)
	report_folder = module_path and os.path.join(module_path, "report", scrub(report.name))
	script_path = report_folder and os.path.join(report_folder, scrub(report.name) + ".js")
	print_path = report_folder and os.path.join(report_folder, scrub(report.name) + ".html")

	script = None
	if os.path.exists(script_path):
		with open(script_path) as f:
			script = f.read()
			# sourceURL makes the script debuggable under its own name in devtools
			script += f"\n\n//# sourceURL={scrub(report.name)}.js"

	html_format = get_html_format(print_path)

	if not script and report.javascript:
		script = report.javascript
		script += f"\n\n//# sourceURL={scrub(report.name)}__custom"

	if not script:
		# f-string for consistency with the sourceURL lines above
		script = f"frappe.query_reports['{report_name}']={{}}"

	return {
		"script": render_include(script),
		"html_format": html_format,
		"execution_time": frappe.cache.hget("report_execution_time", report_name) or 0,
		"filters": report.filters,
		"custom_report_name": report.name if report.get("is_custom_report") else None,
	}
def get_reference_report(report):
	"""Follow the reference_report chain until a non-custom report is reached."""
	while report.report_type == "Custom Report":
		report = frappe.get_doc("Report", report.reference_report)
	return report
@frappe.whitelist()
@frappe.read_only()
def run(
	report_name: str,
	filters: str | dict | None = None,
	user: str | None = None,
	ignore_prepared_report: bool = False,
	custom_columns: str | list | None = None,
	is_tree: bool = False,
	parent_field: str | None = None,
	are_default_filters: bool = True,
	js_filters: str | list | None = None,
) -> dict:
	"""Whitelisted entry point: execute *report_name* and return its payload.

	Validates filter-value permissions and report permission first. When the
	report is marked as a prepared report (and not explicitly bypassed and no
	unsaved custom columns are requested), the result is served from a stored
	Prepared Report instead of executing live.
	"""
	if not user:
		user = frappe.session.user
	validate_filters_permissions(report_name, filters, user, js_filters)
	report = get_report_doc(report_name)
	if not frappe.has_permission(report.ref_doctype, "report"):
		frappe.msgprint(
			_("Must have report permission to access this report."),
			raise_exception=True,
		)

	result = None

	# saved custom filters override the caller's filters when defaults are requested
	if sbool(are_default_filters) and report.get("custom_filters"):
		filters = report.custom_filters

	is_prepared_report = report.prepared_report and not sbool(ignore_prepared_report) and not custom_columns

	try:
		if is_prepared_report:
			if filters:
				if isinstance(filters, str):
					filters = json.loads(filters)

				# a specific Prepared Report document may be requested via filters
				dn = filters.pop("prepared_report_name", None)
			else:
				dn = ""
			result = get_prepared_report_result(report, filters, dn, user)
		else:
			result = generate_report_result(report, filters, user, custom_columns, is_tree, parent_field)
			add_data_to_monitor(report=report.reference_report or report.name)
	except Exception:
		frappe.log_error("Report execution failed for: {}".format(report_name))
		raise

	result["add_total_row"] = report.add_total_row and not result.get("skip_total_row", False)

	if sbool(are_default_filters) and report.get("custom_filters"):
		result["custom_filters"] = report.custom_filters

	return result
def add_custom_column_data(custom_columns, result):
	"""Fill values for user-added custom columns into each result row.

	Values are fetched from the columns' source doctypes (keyed by document
	name) and matched to rows through each column's `link_field`. Mutates and
	returns *result*.
	"""
	doctype_names_from_custom_field = []
	for column in custom_columns:
		if len(column["fieldname"].split("-")) > 1:
			# length greater than 1, means that the column is a custom field with conflicting fieldname
			doctype_name = frappe.unscrub(column["fieldname"].split("-")[1])
			doctype_names_from_custom_field.append(doctype_name)
			column["fieldname"] = column["fieldname"].split("-")[0]

	custom_column_data = get_data_for_custom_report(custom_columns, result)

	for column in custom_columns:
		key = (column.get("doctype"), column.get("fieldname"))
		if key in custom_column_data:
			for row in result:
				link_field = column.get("link_field")

				# backwards compatible `link_field`
				# old custom reports which use `str` should not break.
				if isinstance(link_field, str):
					link_field = frappe._dict({"fieldname": link_field, "names": []})

				row_reference = row.get(link_field.get("fieldname"))
				# possible if the row is empty
				if not row_reference:
					continue
				if key[0] in doctype_names_from_custom_field:
					# restore the unique id as the output fieldname so a
					# conflicting ref-doctype field is not clobbered
					column["fieldname"] = column.get("id")
				row[column.get("fieldname")] = custom_column_data.get(key).get(row_reference)

	return result
def get_prepared_report_result(report, filters, dn="", user=None):
	"""Load a stored Prepared Report result.

	When *dn* is not given, the latest completed Prepared Report matching the
	filters is used. Returns the stored data merged with
	{"prepared_report": True, "doc": <Prepared Report doc or None>}.
	"""
	from frappe.core.doctype.prepared_report.prepared_report import get_completed_prepared_report

	def get_report_data(doc, data):
		# backwards compatibility - prepared report used to have a columns field,
		# we now directly fetch it from the result file
		if doc.get("columns") or isinstance(data, list):
			columns = (doc.get("columns") and json.loads(doc.columns)) or data[0]
			data = {"result": data}
		else:
			columns = data.get("columns")

		# translate column labels for display
		for column in columns:
			if isinstance(column, dict) and column.get("label"):
				column["label"] = _(column["label"])

		return data | {"columns": columns}

	report_data = {}
	if not dn:
		dn = get_completed_prepared_report(
			filters, user, report.get("custom_report") or report.get("report_name")
		)

	doc = frappe.get_doc("Prepared Report", dn) if dn else None
	if doc:
		try:
			if data := json.loads(doc.get_prepared_data().decode("utf-8")):
				report_data = get_report_data(doc, data)
		except Exception as e:
			# stored payload unreadable — surface the error but don't crash the report view
			doc.log_error("Prepared report render failed")
			frappe.msgprint(_("Prepared report render failed") + f": {e!s}")
			doc = None

	return report_data | {"prepared_report": True, "doc": doc}
@frappe.whitelist()
def export_query():
	"""Export a query report to CSV/Excel, reading parameters from the request form_dict.

	Requires export permission on the report's ref doctype. When
	`export_in_background` is set, the export is enqueued and emailed to the
	user instead of being returned in the response.
	"""
	from frappe.desk.utils import pop_csv_params

	form_params = frappe._dict(frappe.local.form_dict)
	csv_params = pop_csv_params(form_params)
	clean_params(form_params)
	parse_json(form_params)
	report_name = form_params.report_name
	frappe.permissions.can_export(
		frappe.get_cached_value("Report", report_name, "ref_doctype"),
		raise_exception=True,
	)

	export_in_background = int(form_params.export_in_background or 0)
	if export_in_background:
		user = frappe.session.user
		user_email = frappe.get_cached_value("User", user, "email")
		frappe.enqueue(
			"frappe.desk.query_report.run_export_query_job",
			user_email=user_email,
			form_params=form_params,
			csv_params=csv_params,
			queue="long",
			now=frappe.flags.in_test,
		)
		frappe.msgprint(
			_(
				"Your report is being generated in the background. You will receive an email on {0} with a download link once it is ready."
			).format(user_email)
		)
		return

	return _export_query(form_params, csv_params)
def run_export_query_job(user_email: str, form_params, csv_params):
	"""Background worker: build the report export and email a download link."""
	from frappe.desk.utils import send_report_email

	exported = _export_query(form_params, csv_params, populate_response=False)
	name, extension, payload = exported
	send_report_email(user_email, name, extension, payload, attached_to_name=form_params.report_name)
def _export_query(form_params, csv_params, populate_response=True):
	"""Build the export content for a report.

	Runs the report, optionally restricts rows to the client-visible indices,
	applies display formatting, and renders CSV or XLSX bytes. When
	*populate_response* is True the file is written to the HTTP response;
	otherwise (report_name, file_extension, content) is returned.
	"""
	from frappe.desk.utils import get_csv_bytes, provide_binary_file

	report_name = form_params.report_name
	file_format_type = form_params.file_format_type
	custom_columns = frappe.parse_json(form_params.custom_columns or "[]")
	include_indentation = form_params.include_indentation
	include_filters = form_params.include_filters
	visible_idx = form_params.visible_idx or []  # excluding total row idx
	ignore_visible_idx = sbool(form_params.get("ignore_visible_idx"))
	include_hidden_columns = form_params.include_hidden_columns

	if isinstance(visible_idx, str):
		visible_idx = json.loads(visible_idx)

	data = run(
		report_name,
		form_params.filters,
		custom_columns=custom_columns,
		are_default_filters=False,
	)

	data = frappe._dict(data)

	data.report_name = report_name
	data.filters = form_params.filters
	data.applied_filters = form_params.applied_filters

	if not data.columns:
		frappe.respond_as_web_page(
			_("No data to export"),
			_("You can try changing the filters of your report."),
		)
		return

	has_total_row = cint(data.get("add_total_row"))
	# only filter when the client actually hid rows (total row not counted)
	needs_visible_filtering = (
		visible_idx
		and not ignore_visible_idx
		and len(visible_idx) < len(data.result) - (1 if has_total_row else 0)
	)

	if needs_visible_filtering:
		visible_idx = set(visible_idx)
		filtered_result = [row for idx, row in enumerate(data.result) if idx in visible_idx]

		# total must be recomputed over the remaining rows
		if has_total_row:
			filtered_result = add_total_row(filtered_result, data.columns)

		data["result"] = filtered_result

	format_fields(data)

	xlsx_data, column_widths, styles = build_xlsx_data(
		data,
		include_indentation=include_indentation,
		include_filters=include_filters,
		include_hidden_columns=include_hidden_columns,
		build_styles=file_format_type == "Excel",
	)

	if file_format_type == "CSV":
		file_extension = "csv"
		content = get_csv_bytes(
			[[handle_html(v) if isinstance(v, str) else v for v in r] for r in xlsx_data],
			csv_params,
		)
	elif file_format_type == "Excel":
		file_extension = "xlsx"
		content = make_xlsx(
			xlsx_data,
			report_name,
			column_widths=column_widths,
			styles=styles,
		).getvalue()
	else:
		frappe.throw(
			title=_("Unsupported file format: {0}").format(file_format_type),
			msg=_("Only CSV and Excel formats are supported for export"),
		)

	# encode applied filter values into the file name (length permitting)
	if include_filters:
		for value in (data.filters or {}).values():
			suffix = ""
			if isinstance(value, list):
				suffix = "_" + ",".join(value)
			elif isinstance(value, str) and value not in {"Yes", "No"}:
				suffix = f"_{value}"

			if valid_report_name(report_name, suffix):
				report_name += suffix

	if not populate_response:
		return report_name, file_extension, content

	provide_binary_file(_(report_name), file_extension, content)
def valid_report_name(report_name: str, suffix: str) -> bool:
	"""Return True if appending *suffix* keeps the export file name under 200 characters."""
	return len(report_name) + len(suffix) < 200
def format_fields(data: frappe._dict) -> None:
	"""Format result values in place for display/export.

	Duration, Date and Datetime values are rendered as display strings;
	Currency values are rounded to the column's precision (only when a
	precision is set). Other fieldtypes are left untouched.
	"""
	for i, col in enumerate(data.columns):
		fieldtype = col.get("fieldtype")

		# pick the per-fieldtype formatter once per column instead of
		# duplicating the row loop for every fieldtype
		if fieldtype == "Duration":
			formatter = format_duration
		elif fieldtype == "Currency" and col.get("precision"):
			precision = col.get("precision")
			formatter = lambda value, p=precision: round(value, p)  # noqa: E731
		elif fieldtype == "Date":
			formatter = formatdate
		elif fieldtype == "Datetime":
			formatter = format_datetime
		else:
			continue

		for row in data.result:
			# dict rows are keyed by fieldname, list rows by column position
			index = col.get("fieldname") if isinstance(row, dict) else i
			val = row.get(index) if isinstance(row, dict) else row[index]
			if val:
				row[index] = formatter(val)
def format_filter_value(value):
	"""Render a filter value as display text; lists become comma-separated."""
	if isinstance(value, list):
		return ", ".join(cstr(item) for item in value)
	return cstr(value)
def build_xlsx_data(
	data: frappe._dict,
	visible_idx: list[int] | None = None,
	include_indentation: bool = False,
	include_filters: bool = False,
	ignore_visible_idx: bool = False,
	include_hidden_columns: bool = False,
	*,
	build_styles: bool = False,
) -> tuple[list[list[Any]], list[int], dict | None]:
	"""
	Build Excel data structure from report data with proper formatting.

	Args:
	data: Report data containing columns, result, filters, applied_filters, report_name etc.
	visible_idx: Deprecated (v17). Row indices to include.
	include_indentation: Whether to include indentation for tree-like data
	include_filters: Whether to include filter rows at the top of the Excel sheet
	ignore_visible_idx: Deprecated (v17). Skips visible_idx filtering.
	include_hidden_columns: Whether to include columns marked as hidden
	build_styles: Whether to build style metadata for Excel formatting

	Returns:
	tuple: A tuple containing:
	- result: List of rows for the Excel sheet
	- column_widths: List of column widths for the Excel sheet
	- styles: Dictionary of styles for Excel formatting (if applicable)
	"""
	metadata = None

	# cell types xlsxwriter accepts natively; everything else is stringified
	EXCEL_TYPES = (
		str,
		bool,
		type(None),
		int,
		float,
		datetime.datetime,
		datetime.date,
		datetime.time,
		datetime.timedelta,
	)
	if visible_idx or ignore_visible_idx:
		from frappe.deprecation_dumpster import deprecation_warning

		deprecation_warning(
			"2026-04-19",
			"v17",
			"The 'visible_idx' and 'ignore_visible_idx' parameters of build_xlsx_data are deprecated. "
			"Filter data.result before calling build_xlsx_data instead.",
		)

		# NOTE: for backwards compatibility. remove in v17.
		if not visible_idx or len(visible_idx) == len(data.result):
			# It's not possible to have same length and different content.
			ignore_visible_idx = True
		else:
			# Note: converted for faster lookups
			visible_idx = set(visible_idx)

	result = []
	column_data = []
	column_widths = []

	# tracks the output sheet row index (filters + header + data rows)
	excel_row_idx = 0

	include_filters = cint(include_filters)
	include_indentation = cint(include_indentation)
	include_hidden_columns = cint(include_hidden_columns)
	has_total_row = sbool(data.get("add_total_row"))

	if build_styles:
		metadata = XLSXMetadata(
			report_name=data.report_name,
			filters=data.filters,
			has_total_row=has_total_row,
			has_indentation=include_indentation,
		)

	# adding applied filter rows
	if include_filters and data.applied_filters:
		filter_data = []
		for filter_name, filter_value in data.applied_filters.items():
			if not filter_value:
				continue

			applied_filter = [cstr(filter_name), format_filter_value(filter_value)]

			if build_styles:
				metadata.applied_filters_map[excel_row_idx] = applied_filter
				excel_row_idx += 1

			filter_data.append(applied_filter)

		# empty row after filters
		filter_data.append([])
		excel_row_idx += 1
		result += filter_data

	# adding header row
	column_idx = 0
	for column in data.columns:
		if column.get("hidden") and not include_hidden_columns:
			continue

		if build_styles:
			metadata.column_map[column_idx] = column
			column_idx += 1

		column_data.append(_(column.get("label")))
		column_width = cint(column.get("width", 0))
		# to convert into scale accepted by xlsxwriter
		column_width /= 10
		column_widths.append(column_width)

	result.append(column_data)
	excel_row_idx += 1

	# build table from result
	# when styles are built, indentation is handled by the style layer instead
	handle_indentation = include_indentation and not build_styles
	for row_idx, row in enumerate(data.result):
		# NOTE: for backwards compatibility. remove in v17.
		if not (ignore_visible_idx or row_idx in visible_idx):
			continue

		row_data = []
		row_is_dict = isinstance(row, dict)

		indent = 0
		if row_is_dict and handle_indentation:
			indent = row.get("indent") or 0
			if indent:
				indent = cint(indent)

		if build_styles:
			metadata.row_map[excel_row_idx] = row
			excel_row_idx += 1

		for col_idx, column in enumerate(data.columns):
			if column.get("hidden") and not include_hidden_columns:
				continue

			label = column.get("label")
			fieldname = column.get("fieldname")
			cell_value = row.get(fieldname, row.get(label, "")) if row_is_dict else row[col_idx]

			if not isinstance(cell_value, EXCEL_TYPES):
				cell_value = cstr(cell_value)

			# emulate tree indentation with leading spaces in the first column
			if handle_indentation and indent and col_idx == 0:
				cell_value = ("   " * indent) + cstr(cell_value)

			row_data.append(cell_value)

		result.append(row_data)

	return result, column_widths, get_xlsx_styles(metadata, data.report_name) if build_styles else None
def get_xlsx_styles(metadata: XLSXMetadata, report_name: str | None = None) -> dict | None:
	"""
	Returns styles for XLSX export.

	If report_name is provided, it tries to fetch styles defined in the report's
	module first; otherwise (or when the module defines none) the default
	XLSXStyleBuilder output is used.
	"""
	if report_name:
		module_styles = frappe.get_doc("Report", report_name).get_xlsx_styles_from_module(metadata)
		if module_styles:
			return module_styles

	return XLSXStyleBuilder(metadata).result
def add_total_row(
	result,
	columns,
	meta=None,
	is_tree=False,
	parent_field=None,
) -> list[dict | list[Any]]:
	"""Append a totals row (as a list) to *result* and return it.

	Numeric columns are summed, Percent columns averaged, Time columns
	accumulated as timedeltas, and a Link/Currency column carries the first
	row's currency value through. For tree reports, rows that have a parent
	(via *parent_field*) are excluded so totals are not double counted.
	"""
	total_row = [""] * len(columns)
	has_percent = []

	# all rows are dict or list/tuple, we can check the first row to decide the type
	is_row_dict = isinstance(result[0], dict) if result else False

	for col_idx, col in enumerate(columns):
		fieldtype, options, fieldname = None, None, None
		if isinstance(col, str):
			if meta:
				# get fieldtype from the meta
				field = meta.get_field(col)
				if field:
					fieldtype = meta.get_field(col).fieldtype
					fieldname = meta.get_field(col).fieldname
			else:
				# "Label:Fieldtype/Options:Width" style column definition
				col = col.split(":")
				if len(col) > 1:
					if col[1]:
						fieldtype = col[1]
						if "/" in fieldtype:
							fieldtype, options = fieldtype.split("/")
					else:
						fieldtype = "Data"
		else:
			fieldtype = col.get("fieldtype")
			fieldname = col.get("fieldname")
			options = col.get("options")

		for row in result:
			# Skip if column index is out of bounds for list/tuple rows
			if not is_row_dict and col_idx >= len(row):
				continue

			cell = row.get(fieldname) if is_row_dict else row[col_idx]
			if fieldtype is None:
				# infer a numeric fieldtype from the first non-matching cell
				if isinstance(cell, int):
					fieldtype = "Int"
				elif isinstance(cell, float):
					fieldtype = "Float"
			if fieldtype in ["Currency", "Int", "Float", "Percent", "Duration"] and flt(cell):
				# skip child rows of a tree so parents aren't double counted
				if not (is_tree and row.get(parent_field)):
					total_row[col_idx] = flt(total_row[col_idx]) + flt(cell)

			if fieldtype == "Percent" and col_idx not in has_percent:
				has_percent.append(col_idx)

			if fieldtype == "Time" and cell:
				if not total_row[col_idx]:
					total_row[col_idx] = timedelta(hours=0, minutes=0, seconds=0)
				total_row[col_idx] = total_row[col_idx] + cell

		if fieldtype == "Link" and options == "Currency":
			# carry the currency of the first row into the totals row
			total_row[col_idx] = result[0].get(fieldname) if is_row_dict else result[0][col_idx]

	# percent columns show the average, not the sum
	for col_idx in has_percent:
		total_row[col_idx] = flt(total_row[col_idx]) / len(result)

	first_col_fieldtype = None
	if isinstance(columns[0], str):
		first_col = columns[0].split(":")
		if len(first_col) > 1:
			first_col_fieldtype = first_col[1].split("/", 1)[0]
	else:
		first_col_fieldtype = columns[0].get("fieldtype")

	# label the row "Total" only when the first column isn't itself numeric/temporal
	if first_col_fieldtype not in ["Currency", "Int", "Float", "Percent", "Date", "Datetime", "Time"]:
		total_row[0] = _("Total")

	result.append(total_row)
	return result
@frappe.whitelist()
def get_data_for_custom_field(doctype: str, field: str, names: str | list[str] | None = None):
	"""Return {document name: field value} for *doctype*, optionally limited to *names*.

	Requires read permission on *doctype*.
	"""
	if not frappe.has_permission(doctype, "read"):
		frappe.throw(_("Not Permitted to read {0}").format(_(doctype)), frappe.PermissionError)

	filters = {}
	if names:
		if isinstance(names, str | bytearray):
			names = frappe.json.loads(names)
		filters["name"] = ["in", names]

	return frappe._dict(frappe.get_list(doctype, filters=filters, fields=["name", field], as_list=1))
def get_data_for_custom_report(columns, result):
	"""Fetch values for every linked custom column.

	Returns {(doctype, fieldname): {document name: value}} for each column that
	declares a `link_field` and has at least one referenced document in *result*.
	"""
	doc_field_value_map = {}

	for column in columns:
		link_field = column.get("link_field")
		if not link_field:
			continue

		# backwards compatible `link_field`
		# old custom reports which use `str` should not break
		if isinstance(link_field, str):
			link_field = frappe._dict({"fieldname": link_field, "names": []})

		fieldname = column.get("fieldname")
		doctype = column.get("doctype")
		row_key = link_field.get("fieldname")

		referenced_names = list({row.get(row_key) for row in result if row.get(row_key)})

		if referenced_names:
			doc_field_value_map[(doctype, fieldname)] = get_data_for_custom_field(
				doctype, fieldname, referenced_names
			)
	return doc_field_value_map
@frappe.whitelist()
def save_report(reference_report: str, report_name: str, columns: str, filters: str):
	"""Create or update a Custom Report saving the given column/filter layout.

	Returns the name of the saved Report document.
	"""
	report_doc = get_report_doc(reference_report)

	docname = frappe.db.exists(
		"Report",
		{
			"report_name": report_name,
			"is_standard": "No",
			"report_type": "Custom Report",
		},
	)

	if docname:
		# merge the new layout into the existing custom report's JSON
		report = frappe.get_doc("Report", docname)
		existing_jd = json.loads(report.json)
		existing_jd["columns"] = json.loads(columns)
		existing_jd["filters"] = json.loads(filters)
		report.update({"json": json.dumps(existing_jd, separators=(",", ":"))})
		report.save()
		frappe.msgprint(_("Report updated successfully"))
		return docname

	new_report = frappe.get_doc(
		{
			"doctype": "Report",
			"report_name": report_name,
			"json": f'{{"columns":{columns},"filters":{filters}}}',
			"ref_doctype": report_doc.ref_doctype,
			"is_standard": "No",
			"report_type": "Custom Report",
			"reference_report": reference_report,
		}
	).insert(ignore_permissions=True)
	frappe.msgprint(_("{0} saved successfully").format(_(new_report.name)))
	return new_report.name
def get_filtered_data(ref_doctype, columns, data, user):
	"""Apply row-level permission filtering and field masking to report rows.

	When user-permission match filters apply to any linked doctype, only rows
	shared with the user or passing has_match() are kept; otherwise all rows
	are returned (as a shallow copy).
	"""
	result = []
	linked_doctypes = get_linked_doctypes(columns, data)
	match_filters_per_doctype = get_user_match_filters(linked_doctypes, user=user)
	shared = frappe.share.get_shared(ref_doctype, user)
	columns_dict = get_columns_dict(columns)

	ref_doctype_meta = frappe.get_meta(ref_doctype)

	role_permissions = get_role_permissions(ref_doctype_meta, user)
	if_owner = role_permissions.get("if_owner", {}).get("report")

	if ref_doctype_meta.get_masked_fields():
		from frappe.model.db_query import mask_field_value

		# Apply masking to the fields
		for field in ref_doctype_meta.get_masked_fields():
			for row in data:
				val = row.get(field.fieldname)
				row[field.fieldname] = mask_field_value(field, val)

	if match_filters_per_doctype:
		for row in data:
			# Why linked_doctypes.get(ref_doctype)? because if column is empty, linked_doctypes[ref_doctype] is removed
			if (
				linked_doctypes.get(ref_doctype)
				and shared
				and row.get(linked_doctypes[ref_doctype]) in shared
			):
				result.append(row)

			elif has_match(
				row,
				linked_doctypes,
				match_filters_per_doctype,
				ref_doctype,
				if_owner,
				columns_dict,
				user,
			):
				result.append(row)
	else:
		result = list(data)

	return result
def has_match(
	row,
	linked_doctypes,
	doctype_match_filters,
	ref_doctype,
	if_owner,
	columns_dict,
	user,
):
	"""Return True if after evaluating permissions for each linked doctype:
	- There is an owner match for the ref_doctype
	- `and` There is a user permission match for all linked doctypes

	Return True if the row is empty.

	Note:
	Each doctype could have multiple conflicting user permission doctypes.
	Hence even if one of the sets allows a match, it is true.
	This behavior is equivalent to the trickling of user permissions of linked doctypes to the ref doctype.
	"""
	resultant_match = True

	if not row:
		# allow empty rows :)
		return resultant_match

	for doctype, filter_list in doctype_match_filters.items():
		matched_for_doctype = False

		# owner-based report permission: the row's owner column matching the
		# user counts as a match for the ref doctype itself
		if doctype == ref_doctype and if_owner:
			idx = linked_doctypes.get("User")
			if idx is not None and row[idx] == user and columns_dict[idx] == columns_dict.get("owner"):
				# owner match is true
				matched_for_doctype = True

		if not matched_for_doctype:
			for match_filters in filter_list:
				match = True
				for dt, idx in linked_doctypes.items():
					# case handled above
					if dt == "User" and columns_dict[idx] == columns_dict.get("owner"):
						continue

					# idx is a fieldname for dict rows, a column index for list rows
					cell_value = None
					if isinstance(row, dict):
						cell_value = row.get(idx)
					elif isinstance(row, list | tuple):
						cell_value = row[idx]

					# a value outside the allowed set only disqualifies the row
					# if it refers to a real document
					if (
						dt in match_filters
						and cell_value not in match_filters.get(dt)
						and frappe.db.exists(dt, cell_value)
					):
						match = False
						break

				if match:
					# even a value match only counts if the user has an
					# unrestricted read permission on the ref doctype
					match = has_unrestricted_read_access(doctype=ref_doctype, user=frappe.session.user)

				# each doctype could have multiple conflicting user permission doctypes, hence using OR
				# so that even if one of the sets allows a match, it is true
				matched_for_doctype = matched_for_doctype or match

				if matched_for_doctype:
					break

		# each doctype's user permissions should match the row! hence using AND
		resultant_match = resultant_match and matched_for_doctype

		if not resultant_match:
			break

	return resultant_match
@request_cache
def has_unrestricted_read_access(doctype, user):
	"""Return True if *user* holds a permlevel-0 read permission on *doctype*
	that is not restricted to owned documents (standard or custom DocPerm)."""
	permission_filters = {
		"parent": doctype,
		"role": ["in", get_roles(user)],
		"permlevel": 0,
		"read": 1,
		"if_owner": 0,
	}

	standard_perm_exists = frappe.db.exists("DocPerm", permission_filters)
	custom_perm_exists = frappe.db.exists("Custom DocPerm", permission_filters)

	return bool(custom_perm_exists or standard_perm_exists)
def get_linked_doctypes(columns, data):
	"""Return {linked doctype: column key} for every non-empty Link column.

	The column key is the column index for list/tuple rows and the fieldname
	for dict rows. Link columns whose cells are all empty are dropped.
	"""
	linked_doctypes = {}

	columns_dict = get_columns_dict(columns)

	for idx in range(len(columns)):
		df = columns_dict[idx]
		if df.get("fieldtype") == "Link":
			if data and isinstance(data[0], list | tuple):
				linked_doctypes[df["options"]] = idx
			else:
				# dict
				linked_doctypes[df["options"]] = df["fieldname"]

	# remove doctype if column is empty
	columns_with_value = []
	for row in data:
		if row:
			# once every column key has been seen with a value, stop inspecting
			if len(row) != len(columns_with_value):
				if isinstance(row, list | tuple):
					row = enumerate(row)
				elif isinstance(row, dict):
					row = row.items()

				for col, val in row:
					if val and col not in columns_with_value:
						columns_with_value.append(col)

	items = list(linked_doctypes.items())

	for doctype, key in items:
		if key not in columns_with_value:
			del linked_doctypes[doctype]

	return linked_doctypes
def get_columns_dict(columns):
	"""Return a dict with column docfield values as dict.

	The keys for the dict are both idx and fieldname,
	so either index or fieldname can be used to search for a column's docfield properties.
	"""
	columns_dict = frappe._dict()
	for idx, col in enumerate(columns):
		parsed = get_column_as_dict(col)
		columns_dict[idx] = parsed
		columns_dict[parsed["fieldname"]] = parsed

	return columns_dict
def get_column_as_dict(col):
	"""Normalize a column definition into a frappe._dict.

	Accepts either a dict (used as-is, with a scrubbed fieldname derived from
	the label when missing) or a string of the form
	"Label:Fieldtype/Options:Width".
	"""
	col_dict = frappe._dict()

	if isinstance(col, str):
		# string: "Label:Fieldtype/Options:Width"
		parts = col.split(":")
		if len(parts) > 1:
			fieldtype_spec = parts[1]
			if "/" in fieldtype_spec:
				col_dict["fieldtype"], col_dict["options"] = fieldtype_spec.split("/")
			else:
				col_dict["fieldtype"] = fieldtype_spec
			if len(parts) == 3:
				col_dict["width"] = parts[2]

		col_dict["label"] = parts[0]
		col_dict["fieldname"] = frappe.scrub(parts[0])
	else:
		# dict
		col_dict.update(col)
		if "fieldname" not in col_dict:
			col_dict["fieldname"] = frappe.scrub(col_dict["label"])

	return col_dict
def get_user_match_filters(doctypes, user):
	"""Return {doctype: match conditions} for the doctypes where *user* has
	user-permission restrictions; unrestricted doctypes are omitted."""
	return {
		dt: conditions
		for dt in doctypes
		if (conditions := frappe.desk.reportview.build_match_conditions(dt, user, False))
	}
def validate_filters_permissions(report_name, filters=None, user=None, js_filters=None):
	"""Throw if any Link-type filter value refers to a document *user* may
	neither read nor select.

	Both the report's saved filters and client-side *js_filters* definitions
	are checked against the supplied filter values.
	"""
	if not filters:
		return

	if js_filters is None:
		js_filters = []

	if isinstance(js_filters, str):
		js_filters = json.loads(js_filters)

	if isinstance(filters, str):
		filters = json.loads(filters)

	report = frappe.get_doc("Report", report_name)

	for field in report.filters + js_filters:
		# saved filters are child documents; js filters are plain dicts
		if hasattr(field, "as_dict"):
			field = field.as_dict()
		if field.get("fieldname") in filters and field.get("fieldtype") == "Link":
			linked_doctype = field.get("options")
			# either read or select permission on the linked document suffices
			if not has_permission(
				doctype=linked_doctype, ptype="read", doc=filters[field.get("fieldname")], user=user
			) and not has_permission(
				doctype=linked_doctype, ptype="select", doc=filters[field.get("fieldname")], user=user
			):
				frappe.throw(
					_("You do not have permission to access {0}: {1}.").format(
						linked_doctype, filters[field.get("fieldname")]
					)
				)
def translate_report_data(data, total_row: bool):
	"""Translate every string cell in *data* in place; the trailing total row
	(when present) is left untranslated. Returns *data*."""
	rows = data[:-1] if total_row else data
	for row in rows:
		for field, value in row.items():
			if isinstance(value, str):
				row[field] = _(value)
	return data