412 lines
11 KiB
Python
412 lines
11 KiB
Python
# Copyright (c) 2018, Frappe Technologies Pvt. Ltd. and Contributors
|
|
# License: MIT. See LICENSE
|
|
import cProfile
|
|
import functools
|
|
import inspect
|
|
import io
|
|
import json
|
|
import pstats
|
|
import re
|
|
import time
|
|
from collections import Counter
|
|
from collections.abc import Callable
|
|
from dataclasses import dataclass
|
|
|
|
import sqlparse
|
|
|
|
import frappe
|
|
from frappe import _
|
|
from frappe.database.utils import is_query_type
|
|
from frappe.utils import now_datetime
|
|
|
|
# Client-cache key for the on/off switch that record() checks and start()/stop() set.
RECORDER_INTERCEPT_FLAG = "recorder-intercept"
# Cache key under which the active RecorderConfig is stored.
RECORDER_CONFIG_FLAG = "recorder-config"
# Cache hash of per-recording summaries (no SQL calls, headers, form data or profile).
RECORDER_REQUEST_SPARSE_HASH = "recorder-requests-sparse"
# Cache hash of full recordings, keyed by recording uuid.
RECORDER_REQUEST_HASH = "recorder-requests"
# Matches everything up to and including the last "/apps/" in a file path.
TRACEBACK_PATH_PATTERN = re.compile(".*/apps/")
# Expiry (in seconds) applied to the stored config: 5 minutes.
RECORDER_AUTO_DISABLE = 5 * 60
|
|
|
|
|
|
@dataclass
class RecorderConfig:
    """Options describing what a recording session should capture.

    The config is persisted in the cache under ``RECORDER_CONFIG_FLAG`` and
    expires after ``RECORDER_AUTO_DISABLE`` seconds.
    """

    record_requests: bool = True  # Record web requests
    record_jobs: bool = True  # Record background jobs
    record_sql: bool = True  # Record SQL queries
    capture_stack: bool = True  # Record call stack of SQL queries
    profile: bool = False  # Run cProfile
    explain: bool = True  # Provide EXPLAIN output of SQL queries
    request_filter: str = "/"  # Substring that a request path must contain
    jobs_filter: str = ""  # Substring that a job method must contain

    def __post_init__(self):
        """Reject a config that records neither jobs nor requests."""
        nothing_to_record = not self.record_jobs and not self.record_requests
        if nothing_to_record:
            frappe.throw("You must record one of jobs or requests")

    def store(self):
        """Persist this config in the cache with the auto-disable expiry."""
        frappe.cache.set_value(
            RECORDER_CONFIG_FLAG,
            self,
            expires_in_sec=RECORDER_AUTO_DISABLE,
        )

    @classmethod
    def retrieve(cls):
        """Return the cached config, or a default one when nothing is cached."""
        stored = frappe.cache.get_value(RECORDER_CONFIG_FLAG)
        return stored if stored else cls()

    @staticmethod
    def delete():
        """Remove the cached config."""
        frappe.cache.delete_value(RECORDER_CONFIG_FLAG)
|
|
|
|
|
|
def record_sql(*args, **kwargs):
    """Run a query through the original ``frappe.db.sql`` and log it on the recorder.

    Installed in place of ``frappe.db.sql`` while SQL recording is active. The
    query result is returned unchanged; timing, query text and (optionally) the
    calling stack are registered on ``frappe.local._recorder``.
    """
    started_at = time.monotonic()
    result = frappe.db._sql(*args, **kwargs)
    elapsed_ms = (time.monotonic() - started_at) * 1000

    query = getattr(frappe.db, "last_query", None)
    if not query or isinstance(result, str):
        # run=0, doesn't actually run the query so last_query won't be present
        return result

    recorder = frappe.local._recorder
    # list() (not a comprehension) keeps this function as the direct consumer
    # of the generator, which the frame-skipping in the helper relies on.
    stack = list(get_current_stack_frames()) if recorder.config.capture_stack else []

    recorder.register(
        {
            "query": str(query),
            "stack": stack,
            "explain_result": [],
            "time": started_at,
            "duration": float(f"{elapsed_ms:.3f}"),
        }
    )
    return result
|
|
|
|
|
|
def get_current_stack_frames():
    """Yield app-level frames of the current call stack, outermost first.

    Only frames whose filename contains ``/apps/`` or the server-script file
    prefix are reported. Each yielded item is a dict with:

    - ``filename``: file path with everything up to ``/apps/`` stripped
    - ``lineno``: line number currently executing in that frame
    - ``function``: name of the function the frame belongs to

    The two innermost frames (this generator and the code consuming it) are
    skipped. Any error while inspecting the stack ends the iteration silently,
    because stack capture is a best-effort diagnostic.
    """
    from frappe.utils.safe_exec import SERVER_SCRIPT_FILE_PREFIX

    try:
        # context=0: only filename/lineno/function are used, so there is no
        # need for inspect to load source lines for every frame.
        frames = inspect.getouterframes(inspect.currentframe(), context=0)

        # frames[0] is this generator, frames[1] is whoever is iterating it.
        for frame_info in reversed(frames[2:]):
            filename = frame_info.filename
            if "/apps/" in filename or SERVER_SCRIPT_FILE_PREFIX in filename:
                yield {
                    "filename": TRACEBACK_PATH_PATTERN.sub("", filename),
                    "lineno": frame_info.lineno,
                    "function": frame_info.function,
                }
    except Exception:
        # Deliberately swallowed: failing to capture a stack must never break
        # the SQL call that is being recorded.
        pass
|
|
|
|
|
|
def post_process():
    """Enrich every stored recording once recording has stopped.

    Work that does not have to happen while profiling is deferred to this
    step to keep recording overhead low. For each recorded call it:

    - reformats the query with SQLParse
    - collects `EXPLAIN` output (when enabled in the config)
    - marks exact and normalized duplicates

    The stored recorder config is deleted at the end.
    """
    frappe.db.rollback()
    frappe.db.begin(read_only=True)  # Explicitly start read only transaction

    config = RecorderConfig.retrieve()
    # Only SELECT/UPDATE/DELETE queries can be "EXPLAIN"ed
    explainable_types = ("select", "update", "delete")

    for request in list(frappe.cache.hgetall(RECORDER_REQUEST_HASH).values()):
        for call in request["calls"]:
            formatted_query = sqlparse.format(
                call["query"].strip(),
                keyword_case="upper",
                reindent=True,
                strip_comments=True,
            )
            call["query"] = formatted_query

            if not (config.explain and is_query_type(formatted_query, explainable_types)):
                continue

            try:
                call["explain_result"] = frappe.db.sql(f"EXPLAIN {formatted_query}", as_dict=True)
            except Exception:
                # EXPLAIN failures are ignored; explain_result keeps its prior value.
                pass

        mark_duplicates(request)
        frappe.cache.hset(RECORDER_REQUEST_HASH, request["uuid"], request)

    config.delete()
|
|
|
|
|
|
def mark_duplicates(request):
    """Annotate each call of a recording with duplicate counts.

    Adds ``normalized_query``, ``index``, ``exact_copies`` and
    ``normalized_copies`` to every entry of ``request["calls"]`` in place.
    """
    calls = request["calls"]
    exact_counts = Counter(call["query"] for call in calls)

    for call in calls:
        call["normalized_query"] = normalize_query(call["query"])

    normalized_counts = Counter(call["normalized_query"] for call in calls)

    for position, call in enumerate(calls):
        call.update(
            index=position,
            exact_copies=exact_counts[call["query"]],
            normalized_copies=normalized_counts[call["normalized_query"]],
        )
|
|
|
|
|
|
def normalize_query(query: str) -> str:
    """Return *query* with literal values replaced by ``?`` placeholders.

    Queries that differ only in their literal values collapse to the same
    normalized text, which makes "similar" duplicates visible.

    Example:
            These two are distinct queries:
                    `select * from user where name = 'x'`
                    `select * from user where name = 'z'`

            But their "normalized" form would be same:
                    `select * from user where name = ?`

    If normalization fails for any reason, the failure is printed and the
    original query is returned unchanged.
    """
    try:
        statement = sqlparse.parse(query)[0]
        for token in statement.flatten():
            if "Token.Literal" not in str(token.ttype):
                continue
            token.value = "?"

        # Transform IN parts like this: IN (?, ?, ?) -> IN (?)
        return re.sub(
            r"( IN )\(\?[\s\n\?\,]*\)",
            r"\1(?)",
            str(statement),
            flags=re.IGNORECASE,
        )
    except Exception as e:
        print("Failed to normalize query ", e)

    return query
|
|
|
|
|
|
def record(force=False):
    """Attach a ``Recorder`` to ``frappe.local`` when recording is on or forced.

    When the intercept flag has never been set (``None``), it is written as
    ``False`` once so that later lookups can be served by the client cache.
    """
    intercept = frappe.client_cache.get_value(RECORDER_INTERCEPT_FLAG)

    if intercept or force:
        frappe.local._recorder = Recorder(force=force)
        return

    if intercept is None:
        # Explicitly set it once so next requests can use client-side cache
        frappe.client_cache.set_value(RECORDER_INTERCEPT_FLAG, False)
|
|
|
|
|
|
def dump():
    """Flush the recorder attached to ``frappe.local``, if there is one."""
    if not hasattr(frappe.local, "_recorder"):
        return
    frappe.local._recorder.dump()
|
|
|
|
|
|
class Recorder:
    """Capture SQL calls and an optional cProfile run for one unit of work.

    A unit of work is an HTTP request, a background job, or (when ``force`` is
    set) an arbitrary function call. Results are written to the cache by
    ``dump()``.
    """

    def __init__(self, force=False):
        """Decide whether to record and, if so, start SQL capture / profiling.

        Args:
                force: record even when neither the request nor the job filter
                        matches; the recording is then labelled "Function Call".
        """
        self.config = RecorderConfig.retrieve()
        self.calls = []
        self._patched_sql = False
        self.profiler = None
        self._recording = True
        self.force = force
        # Initialised up front so dump() never hits a missing attribute, e.g.
        # for a forced recording whose caller did not assign a path.
        self.path = None
        self.cmd = None
        self.method = None
        self.headers = None
        self.form_dict = None

        if (
            self.config.record_requests
            and frappe.request
            and self.config.request_filter in frappe.request.path
        ):
            self.path = frappe.request.path
            self.cmd = frappe.local.form_dict.cmd or ""
            self.method = frappe.request.method
            self.headers = dict(frappe.local.request.headers)
            self.form_dict = frappe.local.form_dict
            self.event_type = "HTTP Request"
        elif self.config.record_jobs and frappe.job and self.config.jobs_filter in frappe.job.method:
            self.event_type = "Background Job"
            self.path = frappe.job.method
        elif not self.force:
            # Nothing matched and recording was not forced: stay inert.
            self._recording = False
            return
        else:
            self.event_type = "Function Call"

        self.uuid = frappe.generate_hash(length=10)
        self.time = now_datetime()

        if self.config.record_sql:
            self._patch_sql()
            self._patched_sql = True

        if self.config.profile:
            self.profiler = cProfile.Profile()
            self.profiler.enable()

    def register(self, data):
        """Append one recorded SQL call to this recording."""
        self.calls.append(data)

    def cleanup(self):
        """Stop profiling and restore ``frappe.db.sql`` without storing anything."""
        if self.profiler:
            self.profiler.disable()
        if self._patched_sql:
            self._unpatch_sql()

    def process_profiler(self):
        """Stop the profiler and return its report, or ``None`` if not profiling.

        The report lists up to 200 entries sorted by cumulative time.
        """
        # Only the profiler object matters: it exists exactly when profiling
        # was started, so the config flag does not need to be consulted.
        if self.profiler is None:
            return None

        self.profiler.disable()
        with io.StringIO() as stream:
            stats = pstats.Stats(self.profiler, stream=stream)
            stats.strip_dirs().sort_stats("cumulative").print_stats(200)
            return stream.getvalue()

    def dump(self):
        """Write the summary and the full recording to the cache.

        Does nothing when this recorder never started recording. SQL patching
        is undone afterwards, even if writing to the cache fails.
        """
        if not self._recording:
            return

        try:
            profiler_output = self.process_profiler()

            request_data = {
                "uuid": self.uuid,
                "path": self.path,
                "cmd": self.cmd,
                "time": self.time,
                "queries": len(self.calls),
                "time_queries": float(f"{sum(call['duration'] for call in self.calls):0.3f}"),
                "duration": float(f"{(now_datetime() - self.time).total_seconds() * 1000:0.3f}"),
                "method": self.method,
                "event_type": self.event_type,
            }
            # The sparse hash gets the summary only.
            frappe.cache.hset(RECORDER_REQUEST_SPARSE_HASH, self.uuid, request_data)

            request_data["calls"] = self.calls
            request_data["headers"] = self.headers
            request_data["form_dict"] = self.form_dict
            request_data["profile"] = profiler_output
            frappe.cache.hset(RECORDER_REQUEST_HASH, self.uuid, request_data)
        finally:
            # Same condition as cleanup(): undo only what this recorder patched.
            if self._patched_sql:
                self._unpatch_sql()

    @staticmethod
    def _patch_sql():
        """Route ``frappe.db.sql`` through ``record_sql``.

        Patching twice would store ``record_sql`` itself as the "original" and
        make every query recurse forever, so an already-patched db is left as is.
        """
        if frappe.db.sql is record_sql:
            return
        frappe.db._sql = frappe.db.sql
        frappe.db.sql = record_sql

    @staticmethod
    def _unpatch_sql():
        """Restore the original ``frappe.db.sql``; no-op if it was never patched."""
        original_sql = getattr(frappe.db, "_sql", None)
        if original_sql is not None:
            frappe.db.sql = original_sql
|
|
|
|
|
|
def do_not_record(function):
    """Decorator: discard any active recorder before running *function*.

    The recorder attached to ``frappe.local`` (if any) is cleaned up and
    removed before the wrapped function is called.
    """

    @functools.wraps(function)
    def unrecorded(*args, **kwargs):
        local = frappe.local
        if hasattr(local, "_recorder"):
            local._recorder.cleanup()
            del local._recorder
        return function(*args, **kwargs)

    return unrecorded
|
|
|
|
|
|
def administrator_only(function):
    """Decorator: allow *function* to run only for the "Administrator" user.

    For any other session user, ``frappe.throw`` is called with a translated
    error message before the wrapped function is reached.
    """

    @functools.wraps(function)
    def guarded(*args, **kwargs):
        is_administrator = frappe.session.user == "Administrator"
        if not is_administrator:
            frappe.throw(_("Only Administrator is allowed to use Recorder"))
        return function(*args, **kwargs)

    return guarded
|
|
|
|
|
|
@frappe.whitelist()
@do_not_record
@administrator_only
def status(*args, **kwargs):
    """Return ``True`` while the recorder intercept flag is set."""
    # NOTE(review): this reads through frappe.cache while start()/stop() write
    # through frappe.client_cache — confirm both address the same stored key.
    intercept_flag = frappe.cache.get_value(RECORDER_INTERCEPT_FLAG)
    return bool(intercept_flag)
|
|
|
|
|
|
@frappe.whitelist()
@do_not_record
@administrator_only
def start(
    record_jobs: bool = True,
    record_requests: bool = True,
    record_sql: bool = True,
    profile: bool = False,
    capture_stack: bool = True,
    explain: bool = True,
    request_filter: str = "/",
    jobs_filter: str = "",
    *args,
    **kwargs,
):
    """Store a new recorder config and switch the intercept flag on.

    The on/off options are passed through ``int()`` before being stored; the
    two filters are stored as given.
    """
    # Kept in the order the options are converted, one int() call each.
    toggles = {
        "record_requests": record_requests,
        "record_jobs": record_jobs,
        "record_sql": record_sql,
        "profile": profile,
        "capture_stack": capture_stack,
        "explain": explain,
    }
    config = RecorderConfig(
        request_filter=request_filter,
        jobs_filter=jobs_filter,
        **{option: int(value) for option, value in toggles.items()},
    )
    config.store()
    frappe.client_cache.set_value(RECORDER_INTERCEPT_FLAG, True)
|
|
|
|
|
|
@frappe.whitelist()
@do_not_record
@administrator_only
def stop(*args, **kwargs):
    """Switch the intercept flag off and enqueue post-processing.

    Post-processing is run with ``now`` set to ``frappe.flags.in_test``.
    """
    frappe.client_cache.set_value(RECORDER_INTERCEPT_FLAG, False)
    run_now = frappe.flags.in_test
    frappe.enqueue(post_process, now=run_now)
|
|
|
|
|
|
@frappe.whitelist()
@do_not_record
@administrator_only
def get(uuid=None, *args, **kwargs):
    """Return one full recording, or the list of all recording summaries.

    With a truthy *uuid* the matching entry of the full-recording hash is
    returned; otherwise every entry of the sparse (summary) hash is returned.
    """
    if not uuid:
        summaries = frappe.cache.hgetall(RECORDER_REQUEST_SPARSE_HASH)
        return list(summaries.values())
    return frappe.cache.hget(RECORDER_REQUEST_HASH, uuid)
|
|
|
|
|
|
@frappe.whitelist()
@do_not_record
@administrator_only
def export_data(*args, **kwargs):
    """Return every full recording currently held in the cache as a list."""
    recordings = frappe.cache.hgetall(RECORDER_REQUEST_HASH)
    return list(recordings.values())
|
|
|
|
|
|
@frappe.whitelist()
@do_not_record
@administrator_only
def delete(*args, **kwargs):
    """Delete all stored recordings: first the summaries, then the full data."""
    for cache_key in (RECORDER_REQUEST_SPARSE_HASH, RECORDER_REQUEST_HASH):
        frappe.cache.delete_value(cache_key)
|
|
|
|
|
|
def record_queries(func: Callable):
    """Decorator to profile a specific function using recorder.

    A forced recording is started around every call of *func*. Once *func*
    returns, the recording is stored and post-processed. If *func* raises,
    nothing is stored, but SQL patching and profiling are still undone before
    the exception propagates.
    """

    @functools.wraps(func)
    def wrapped(*args, **kwargs):
        record(force=True)
        # Keep a direct reference: func may remove the recorder from frappe.local.
        recorder = frappe.local._recorder
        recorder.path = f"Function call: {func.__module__}.{func.__qualname__}"
        try:
            ret = func(*args, **kwargs)
        finally:
            # cleanup() only undoes what this recorder actually set up, so it is
            # safe when SQL recording is disabled and it also runs on failure,
            # leaving frappe.db.sql unpatched either way.
            recorder.cleanup()

        dump()
        post_process()
        print("Recorded queries, open recorder to view them.")
        return ret

    return wrapped
|
|
|
|
|
|
@frappe.whitelist()
@do_not_record
@administrator_only
def import_data(file: str) -> None:
    """Load recordings from an uploaded JSON file into the cache.

    The File document whose ``file_url`` equals *file* is read and parsed as
    JSON. Each entry is written to both the sparse and the full recording hash
    under its ``uuid``. The File document is then permanently deleted.
    """
    file_doc = frappe.get_doc("File", {"file_url": file})
    recordings = json.loads(file_doc.get_content())

    for recording in recordings:
        for cache_hash in (RECORDER_REQUEST_SPARSE_HASH, RECORDER_REQUEST_HASH):
            frappe.cache.hset(cache_hash, recording["uuid"], recording)

    file_doc.delete(delete_permanently=True)
|