319 lines
8.7 KiB
Python
319 lines
8.7 KiB
Python
# Copyright (c) 2022, Frappe Technologies Pvt. Ltd. and Contributors
|
|
# License: MIT. See LICENSE
|
|
|
|
import datetime
|
|
import decimal
|
|
import json
|
|
import mimetypes
|
|
import os
|
|
import sys
|
|
from typing import TYPE_CHECKING, Literal
|
|
from urllib.parse import quote
|
|
|
|
import werkzeug.utils
|
|
from werkzeug.exceptions import Forbidden, NotFound
|
|
from werkzeug.local import LocalProxy
|
|
from werkzeug.wrappers import Response
|
|
from werkzeug.wsgi import wrap_file
|
|
|
|
import frappe
|
|
import frappe.model.document
|
|
import frappe.sessions
|
|
import frappe.utils
|
|
from frappe import _
|
|
from frappe.core.doctype.access_log.access_log import make_access_log
|
|
from frappe.utils import format_timedelta
|
|
|
|
if TYPE_CHECKING:
|
|
from frappe.core.doctype.file.file import File
|
|
|
|
|
|
def report_error(status_code):
    """Build the JSON error response for the exception currently being handled.

    The traceback is only exposed when all of the following hold: the
    "allow_error_traceback" system setting is truthy (it is read only when
    ``frappe.db`` is set), ``frappe.local.flags.disable_traceback`` is not
    set, and the status is not 404 (unless ``frappe.conf.logging`` is set).

    The exception details are written to the response in the shape used by
    the active API version, then the response is built as JSON and given
    ``status_code``.
    """
    from frappe.api import ApiVersion, get_api_version

    if frappe.db:
        traceback_setting = frappe.get_system_settings("allow_error_traceback")
    else:
        traceback_setting = False

    allow_traceback = (
        traceback_setting
        and not frappe.local.flags.disable_traceback
        and (status_code != 404 or frappe.conf.logging)
    )

    traceback = frappe.utils.get_traceback()
    # Only the type and value are needed; slicing avoids rebinding `_`,
    # which is the translation function at module level.
    exc_type, exc_value = sys.exc_info()[:2]

    api_version = get_api_version()
    if api_version == ApiVersion.V1:
        if allow_traceback:
            frappe.errprint(traceback)
            # Last traceback line, i.e. "ExceptionType: message".
            frappe.response.exception = traceback.splitlines()[-1]
        frappe.response["exc_type"] = exc_type.__name__
    elif api_version == ApiVersion.V2:
        error_log = {"type": exc_type.__name__}
        if allow_traceback:
            error_log["exception"] = traceback
        _link_error_with_message_log(error_log, exc_value, frappe.message_log)
        frappe.local.response.errors = [error_log]

    error_response = build_response("json")
    error_response.status_code = status_code
    return error_response
|
|
|
|
|
|
def _link_error_with_message_log(error_log, exception, message_logs):
    """Merge the message that belongs to ``exception`` into ``error_log``.

    Searches ``message_logs`` for the first entry whose ``"__frappe_exc_id"``
    equals the ``__frappe_exc_id`` attribute of ``exception``. That entry is
    copied into ``error_log`` (minus the ``"raise_exception"`` and
    ``"__frappe_exc_id"`` keys) and removed from ``message_logs``.

    Both ``error_log`` and ``message_logs`` are modified in place. Nothing is
    returned.

    An exception that carries no ``__frappe_exc_id`` attribute has no message
    to link, so it is skipped rather than raising ``AttributeError`` while the
    error response is being built.
    """
    missing = object()
    exc_id = getattr(exception, "__frappe_exc_id", missing)
    if exc_id is missing:
        return

    for message in message_logs:
        if message.get("__frappe_exc_id") == exc_id:
            error_log.update(message)
            # Safe although we are iterating: we return right after removing.
            message_logs.remove(message)
            error_log.pop("raise_exception", None)
            error_log.pop("__frappe_exc_id", None)
            return
|
|
|
|
|
|
def build_response(response_type=None):
    """Build the response object for the current request.

    The builder is chosen by ``frappe.response["type"]`` when that is set,
    otherwise by ``response_type``. An empty "docs" entry is dropped from the
    response before building.
    """
    local_response = frappe.local.response
    if "docs" in local_response and not local_response.docs:
        del local_response["docs"]

    builders = {
        "csv": as_csv,
        "txt": as_txt,
        "download": as_raw,
        "json": as_json,
        "pdf": as_pdf,
        "page": as_page,
        "redirect": redirect,
        "binary": as_binary,
    }

    selected_type = frappe.response.get("type") or response_type
    builder = builders[selected_type]
    return builder()
|
|
|
|
|
|
def as_csv():
    """Return ``frappe.response["result"]`` as a CSV attachment.

    The download is named after ``frappe.response["doctype"]``.
    """
    csv_response = Response()
    csv_response.mimetype = "text/csv"
    csv_response.headers.add(
        "Content-Disposition",
        "attachment",
        filename=f"{frappe.response['doctype']}.csv",
    )
    csv_response.data = frappe.response["result"]
    return csv_response
|
|
|
|
|
|
def as_txt():
    """Return ``frappe.response["result"]`` as a plain-text attachment.

    The download is named after ``frappe.response["doctype"]``.
    """
    response = Response()
    # A media type needs both a type and a subtype; a bare "text" is not a
    # valid Content-Type value.
    response.mimetype = "text/plain"
    filename = f"{frappe.response['doctype']}.txt"
    response.headers.add("Content-Disposition", "attachment", filename=filename)
    response.data = frappe.response["result"]
    return response
|
|
|
|
|
|
def as_raw():
    """Return ``frappe.response["filecontent"]`` as a file response.

    The content type is ``frappe.response["content_type"]`` when set,
    otherwise it is guessed from the filename, falling back to
    "application/unknown". The disposition defaults to "attachment" and can
    be overridden through ``frappe.response["display_content_as"]``.
    """
    raw_response = Response()

    content_type = frappe.response.get("content_type")
    if not content_type:
        guessed_type = mimetypes.guess_type(frappe.response["filename"])[0]
        content_type = guessed_type or "application/unknown"
    raw_response.mimetype = content_type

    disposition = frappe.response.get("display_content_as", "attachment")
    raw_response.headers.add(
        "Content-Disposition", disposition, filename=frappe.response["filename"]
    )

    raw_response.data = frappe.response["filecontent"]
    return raw_response
|
|
|
|
|
|
def as_json():
    """Serialize ``frappe.local.response`` into a compact JSON response.

    Logs are added to the response first. A truthy "http_status_code" entry
    becomes the HTTP status and is removed from the serialized payload.
    """
    make_logs()

    json_response = Response()
    payload = frappe.local.response

    if payload.http_status_code:
        json_response.status_code = payload["http_status_code"]
        del payload["http_status_code"]

    json_response.mimetype = "application/json"
    json_response.data = json.dumps(payload, default=json_handler, separators=(",", ":"))
    return json_response
|
|
|
|
|
|
def as_pdf():
    """Return ``frappe.response["filecontent"]`` as an "application/pdf" response.

    The Content-Disposition header carries only the filename; no disposition
    type is passed.
    """
    pdf_response = Response()
    pdf_response.mimetype = "application/pdf"

    pdf_name = frappe.response["filename"]
    pdf_response.headers.add("Content-Disposition", None, filename=pdf_name)

    pdf_response.data = frappe.response["filecontent"]
    return pdf_response
|
|
|
|
|
|
def as_binary():
    """Return ``frappe.response["filecontent"]`` as an "application/octet-stream" response.

    The Content-Disposition header carries only the filename; no disposition
    type is passed.
    """
    binary_response = Response()
    binary_response.mimetype = "application/octet-stream"

    binary_name = frappe.response["filename"]
    binary_response.headers.add("Content-Disposition", None, filename=binary_name)

    binary_response.data = frappe.response["filecontent"]
    return binary_response
|
|
|
|
|
|
def make_logs():
    """Add collected messages and logs to the response for the active API version."""
    from frappe.api import ApiVersion, get_api_version

    api_version = get_api_version()
    if api_version == ApiVersion.V1:
        _make_logs_v1()
    elif api_version == ApiVersion.V2:
        _make_logs_v2()
|
|
|
|
|
|
def _make_logs_v1():
    """Write error, message and debug logs to the response as v1 keys.

    Sets "exc" (and "_exc_source" when a source can be guessed),
    "_server_messages", "_debug_messages" (only when logging is configured)
    and "_error_message", each only when there is something to report.
    List values are JSON-encoded strings.
    """
    from frappe.utils.error import guess_exception_source

    out = frappe.local.response

    if frappe.error_log:
        first_exc = frappe.local.error_log and frappe.local.error_log[0]["exc"]
        source = guess_exception_source(first_exc)
        if source:
            out["_exc_source"] = source
        exc_strings = [frappe.utils.cstr(entry["exc"]) for entry in frappe.local.error_log]
        out["exc"] = json.dumps(exc_strings)

    if frappe.local.message_log:
        # Each message is encoded on its own, then the list is encoded again.
        encoded_messages = [json.dumps(entry) for entry in frappe.local.message_log]
        out["_server_messages"] = json.dumps(encoded_messages)

    if frappe.debug_log and frappe.conf.get("logging"):
        out["_debug_messages"] = json.dumps(frappe.local.debug_log)

    if frappe.flags.error_message:
        out["_error_message"] = frappe.flags.error_message
|
|
|
|
|
|
def _make_logs_v2():
    """Write message and debug logs to the response as v2 keys.

    Sets "messages" to the message log as-is, and "debug" to a list of
    ``{"message": ...}`` entries (only when logging is configured).
    """
    out = frappe.local.response

    if frappe.local.message_log:
        out["messages"] = frappe.local.message_log

    if frappe.debug_log and frappe.conf.get("logging"):
        out["debug"] = [{"message": entry} for entry in frappe.local.debug_log]
|
|
|
|
|
|
def json_handler(obj):
    """Convert objects that ``json.dumps`` cannot serialize on its own.

    Intended as the ``default=`` callback of ``json.dumps``. The checks run
    in order and the first match wins:

    - date / datetime / time -> ``str(obj)``
    - timedelta -> ``format_timedelta(obj)``
    - Decimal -> ``float``
    - LocalProxy -> ``str(obj)``
    - BaseDocument -> ``obj.as_dict(no_nulls=True)``
    - any other iterable -> ``list``
    - regex match -> the string that was searched
    - classes, exceptions and other callables -> ``repr(obj)``

    Raises TypeError for anything else.
    """
    from collections.abc import Iterable
    from re import Match

    if isinstance(obj, (datetime.date, datetime.datetime, datetime.time)):
        return str(obj)

    if isinstance(obj, datetime.timedelta):
        return format_timedelta(obj)

    if isinstance(obj, decimal.Decimal):
        return float(obj)

    if isinstance(obj, LocalProxy):
        return str(obj)

    if isinstance(obj, frappe.model.document.BaseDocument):
        return obj.as_dict(no_nulls=True)

    if isinstance(obj, Iterable):
        return list(obj)

    if isinstance(obj, Match):
        return obj.string

    # Classes are callable too, so every class ends up as its repr.
    if isinstance(obj, (type, Exception)) or callable(obj):
        return repr(obj)

    raise TypeError(
        f"""Object of type {type(obj)} with value of {repr(obj)} is not JSON serializable"""
    )
|
|
|
|
|
|
def as_page():
    """Render the website route in ``frappe.response["route"]`` as the response.

    ``frappe.response["http_status_code"]`` is passed along when present.
    """
    from frappe.website.serve import get_response

    route = frappe.response["route"]
    status_code = frappe.response.get("http_status_code")
    return get_response(route, http_status_code=status_code)
|
|
|
|
|
|
def redirect():
    """Return a redirect response to ``frappe.response.location``."""
    target = frappe.response.location
    return werkzeug.utils.redirect(target)
|
|
|
|
|
|
def download_backup(path):
    """Send the backup file at ``path`` after a role check.

    The access is limited to the "System Manager" and "Administrator" roles
    and recorded in the access log. A ``frappe.PermissionError`` raised by
    either step is turned into ``Forbidden``.
    """
    allowed_roles = ("System Manager", "Administrator")

    try:
        frappe.only_for(allowed_roles)
        make_access_log(report_name="Backup")
    except frappe.PermissionError:
        denial = _("You need to be logged in and have System Manager Role to be able to access backups.")
        raise Forbidden(denial)

    return send_private_file(path)
|
|
|
|
|
|
def download_private_file(path: str) -> Response:
    """Check permissions for the private file at ``path`` and send it.

    Raises ``Forbidden`` when no File record with this URL is downloadable.
    The access is recorded in the access log before the file is sent.
    """
    records = frappe.get_all("File", filters={"file_url": path}, fields="*")

    # The same file can be attached to several documents; access through any
    # one of them is enough. Both generators are lazy, so records are loaded
    # and checked one at a time and the search stops at the first match.
    docs = (frappe.get_doc(doctype="File", **record) for record in records)
    file: "File | None" = next((doc for doc in docs if doc.is_downloadable()), None)

    if file is None:
        raise Forbidden(_("You don't have permission to access this file"))

    extension = os.path.splitext(path)[-1][1:]
    make_access_log(doctype="File", document=file.name, file_type=extension)

    # Everything after the first "/private" is the path within the private folder.
    private_relative_path = path.split("/private", 1)[1]
    return send_private_file(private_relative_path)
|
|
|
|
|
|
def send_private_file(path: str) -> Response:
    """Build the response that delivers a file from the private folder.

    ``path`` is taken relative to the configured "private_path" (default
    "private"). When the request carries an "X-Use-X-Accel-Redirect" header,
    only an "X-Accel-Redirect" header pointing below "/protected/" is set and
    the file is not read here. Otherwise the file is opened from the site
    path and streamed, raising ``NotFound`` if it cannot be opened.
    """
    private_root = frappe.local.conf.get("private_path", "private")
    path = os.path.join(private_root, path.strip("/"))
    filename = os.path.basename(path)

    request = frappe.local.request
    if request.headers.get("X-Use-X-Accel-Redirect"):
        path = f"/protected/{path}"
        response = Response()
        response.headers["X-Accel-Redirect"] = quote(frappe.utils.encode(path))
    else:
        filepath = frappe.utils.get_site_path(path)
        try:
            file_handle = open(filepath, "rb")
        except OSError:
            raise NotFound

        # The handle is passed to the response wrapper, so it is not closed here.
        response = Response(wrap_file(request.environ, file_handle), direct_passthrough=True)

    # No forced download in general; the browser decides how to open the file.
    # The exception is formats that can carry scripts, which are always
    # served as attachments.
    script_capable_extensions = (".svg", ".html", ".htm", ".xml")
    extension = os.path.splitext(path)[1]
    if extension.lower() in script_capable_extensions:
        response.headers.add("Content-Disposition", "attachment", filename=filename)

    response.mimetype = mimetypes.guess_type(filename)[0] or "application/octet-stream"
    return response
|
|
|
|
|
|
def handle_session_stopped():
    """Return a full-page 503 "Updating" message response."""
    from frappe.website.serve import get_response

    title = _("Updating")
    message = _("The system is being updated. Please refresh again after a few moments.")

    frappe.respond_as_web_page(
        title,
        message,
        http_status_code=503,
        indicator_color="orange",
        fullpage=True,
        primary_action=None,
    )
    return get_response("message", http_status_code=503)
|