seitime-frappe/frappe/utils/error.py
2023-06-24 15:22:01 +05:30

325 lines
8.6 KiB
Python

# Copyright (c) 2015, Maxwell Morais and contributors
# License: MIT. See LICENSE
import datetime
import functools
import inspect
import json
import linecache
import os
import sys
import traceback
import frappe
from frappe.utils import cstr, encode
EXCLUDE_EXCEPTIONS = (
frappe.AuthenticationError,
frappe.CSRFTokenError, # CSRF covers OAuth too
frappe.SecurityException,
frappe.InReadOnlyMode,
)
LDAP_BASE_EXCEPTION = "LDAPException"
def _is_ldap_exception(e):
"""Check if exception is from LDAP library.
This is a hack but ensures that LDAP is not imported unless it's required. This is tested in
unittests in case the exception changes in future.
"""
for t in type(e).__mro__:
if t.__name__ == LDAP_BASE_EXCEPTION:
return True
return False
def make_error_snapshot(exception):
if frappe.conf.disable_error_snapshot:
return
if isinstance(exception, EXCLUDE_EXCEPTIONS) or _is_ldap_exception(exception):
return
logger = frappe.logger(with_more_info=True)
try:
error_id = "{timestamp:s}-{ip:s}-{hash:s}".format(
timestamp=cstr(datetime.datetime.now()),
ip=frappe.local.request_ip or "127.0.0.1",
hash=frappe.generate_hash(length=3),
)
snapshot_folder = get_error_snapshot_path()
frappe.create_folder(snapshot_folder)
snapshot_file_path = os.path.join(snapshot_folder, f"{error_id}.json")
snapshot = get_snapshot(exception)
with open(encode(snapshot_file_path), "wb") as error_file:
error_file.write(encode(frappe.as_json(snapshot)))
logger.error(f"New Exception collected with id: {error_id}")
except Exception as e:
logger.error(f"Could not take error snapshot: {e}", exc_info=True)
def get_snapshot(exception, context=10):
import pydoc
"""
Return a dict describing a given traceback (based on cgitb.text)
"""
etype, evalue, etb = sys.exc_info()
if isinstance(etype, type):
etype = etype.__name__
# creates a snapshot dict with some basic information
s = {
"pyver": "Python {version:s}: {executable:s} (prefix: {prefix:s})".format(
version=sys.version.split(maxsplit=1)[0], executable=sys.executable, prefix=sys.prefix
),
"timestamp": cstr(datetime.datetime.now()),
"traceback": traceback.format_exc(),
"frames": [],
"etype": cstr(etype),
"evalue": cstr(repr(evalue)),
"exception": {},
"locals": {},
}
# start to process frames
records = inspect.getinnerframes(etb, 5)
for frame, file, lnum, func, lines, index in records:
file = file and os.path.abspath(file) or "?"
args, varargs, varkw, locals = inspect.getargvalues(frame)
call = ""
if func != "?":
call = inspect.formatargvalues(
args, varargs, varkw, locals, formatvalue=lambda value: f"={pydoc.text.repr(value)}"
)
# basic frame information
f = {"file": file, "func": func, "call": call, "lines": {}, "lnum": lnum}
def reader(lnum=[lnum]): # noqa
try:
# B023: function is evaluated immediately, binding not necessary
return linecache.getline(file, lnum[0]) # noqa: B023
finally:
lnum[0] += 1
vars = _scanvars(reader, frame, locals)
# if it is a view, replace with generated code
# if file.endswith('html'):
# lmin = lnum > context and (lnum - context) or 0
# lmax = lnum + context
# lines = code.split("\n")[lmin:lmax]
# index = min(context, lnum) - 1
if index is not None:
i = lnum - index
for line in lines:
f["lines"][i] = line.rstrip()
i += 1
# dump local variable (referenced in current line only)
f["dump"] = {}
for name, where, value in vars:
if name in f["dump"]:
continue
if value is not __UNDEF__:
if where == "global":
name = f"global {name:s}"
elif where != "local":
name = where + " " + name.split(".")[-1]
f["dump"][name] = pydoc.text.repr(value)
else:
f["dump"][name] = "undefined"
s["frames"].append(f)
# add exception type, value and attributes
if isinstance(evalue, BaseException):
for name in dir(evalue):
if name != "messages" and not name.startswith("__"):
value = pydoc.text.repr(getattr(evalue, name))
s["exception"][name] = encode(value)
# add all local values (of last frame) to the snapshot
for name, value in locals.items():
s["locals"][name] = value if isinstance(value, str) else pydoc.text.repr(value)
return s
def collect_error_snapshots():
"""Scheduled task to collect error snapshots from files and push into Error Snapshot table"""
if frappe.conf.disable_error_snapshot:
return
try:
path = get_error_snapshot_path()
if not os.path.exists(path):
return
for fname in os.listdir(path):
fullpath = os.path.join(path, fname)
try:
with open(fullpath) as filedata:
data = json.load(filedata)
except ValueError:
# empty file
os.remove(fullpath)
continue
for field in ["locals", "exception", "frames"]:
data[field] = frappe.as_json(data[field])
doc = frappe.new_doc("Error Snapshot")
doc.update(data)
doc.save()
frappe.db.commit()
os.remove(fullpath)
clear_old_snapshots()
except Exception as e:
make_error_snapshot(e)
# prevent creation of unlimited error snapshots
raise
def clear_old_snapshots():
"""Clear snapshots that are older than a month"""
from frappe.query_builder import DocType, Interval
from frappe.query_builder.functions import Now
ErrorSnapshot = DocType("Error Snapshot")
frappe.db.delete(ErrorSnapshot, filters=(ErrorSnapshot.creation < (Now() - Interval(months=1))))
path = get_error_snapshot_path()
today = datetime.datetime.now()
for file in os.listdir(path):
p = os.path.join(path, file)
ctime = datetime.datetime.fromtimestamp(os.path.getctime(p))
if (today - ctime).days > 31:
os.remove(os.path.join(path, p))
def get_error_snapshot_path():
return frappe.get_site_path("error-snapshots")
def get_default_args(func):
"""Get default arguments of a function from its signature."""
signature = inspect.signature(func)
return {
k: v.default for k, v in signature.parameters.items() if v.default is not inspect.Parameter.empty
}
def raise_error_on_no_output(error_message, error_type=None, keep_quiet=None):
"""Decorate any function to throw error incase of missing output.
TODO: Remove keep_quiet flag after testing and fixing sendmail flow.
:param error_message: error message to raise
:param error_type: type of error to raise
:param keep_quiet: control error raising with external factor.
:type error_message: str
:type error_type: Exception Class
:type keep_quiet: function
>>> @raise_error_on_no_output("Ingradients missing")
... def get_indradients(_raise_error=1): return
...
>>> get_ingradients()
`Exception Name`: Ingradients missing
"""
def decorator_raise_error_on_no_output(func):
@functools.wraps(func)
def wrapper_raise_error_on_no_output(*args, **kwargs):
response = func(*args, **kwargs)
if callable(keep_quiet) and keep_quiet():
return response
default_kwargs = get_default_args(func)
default_raise_error = default_kwargs.get("_raise_error")
raise_error = kwargs.get("_raise_error") if "_raise_error" in kwargs else default_raise_error
if (not response) and raise_error:
frappe.throw(error_message, error_type or Exception)
return response
return wrapper_raise_error_on_no_output
return decorator_raise_error_on_no_output
# Vendored from cgitb standard library reused under PSF License:
# https://github.com/python/cpython/blob/main/LICENSE
import keyword
import tokenize
__UNDEF__ = [] # a special sentinel object
def _scanvars(reader, frame, locals):
"""Scan one logical line of Python and look up values of variables used."""
vars, lasttoken, parent, prefix, value = [], None, None, "", __UNDEF__
for ttype, token, start, end, line in tokenize.generate_tokens(reader):
if ttype == tokenize.NEWLINE:
break
if ttype == tokenize.NAME and token not in keyword.kwlist:
if lasttoken == ".":
if parent is not __UNDEF__:
value = getattr(parent, token, __UNDEF__)
vars.append((prefix + token, prefix, value))
else:
where, value = _lookup(token, frame, locals)
vars.append((token, where, value))
elif token == ".":
prefix += lasttoken + "."
parent = value
else:
parent, prefix = None, ""
lasttoken = token
return vars
def _lookup(name, frame, locals):
"""Find the value for a given name in the given environment."""
if name in locals:
return "local", locals[name]
if name in frame.f_globals:
return "global", frame.f_globals[name]
if "__builtins__" in frame.f_globals:
builtins = frame.f_globals["__builtins__"]
if type(builtins) is type({}): # noqa
if name in builtins:
return "builtin", builtins[name]
else:
if hasattr(builtins, name):
return "builtin", getattr(builtins, name)
return None, __UNDEF__
# end: vendored code