Merge pull request #24683 from ankush/record_and_profile

feat(recorder): profiling and granular recording
This commit is contained in:
Ankush Menat 2024-02-05 12:07:49 +05:30 committed by GitHub
commit c668d4f5a8
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 262 additions and 49 deletions

View file

@ -20,7 +20,9 @@
"section_break_sgro",
"form_dict",
"section_break_9jhm",
"sql_queries"
"sql_queries",
"section_break_optn",
"profile"
],
"fields": [
{
@ -107,6 +109,16 @@
"fieldtype": "Data",
"hidden": 1,
"label": "Event Type"
},
{
"fieldname": "section_break_optn",
"fieldtype": "Section Break"
},
{
"fieldname": "profile",
"fieldtype": "Code",
"label": "cProfile Output",
"read_only": 1
}
],
"hide_toolbar": 1,
@ -114,7 +126,7 @@
"index_web_pages_for_search": 1,
"is_virtual": 1,
"links": [],
"modified": "2024-01-03 16:45:47.110048",
"modified": "2024-02-01 22:13:26.505174",
"modified_by": "Administrator",
"module": "Core",
"name": "Recorder",

View file

@ -24,6 +24,7 @@ class Recorder(Document):
method: DF.Literal["GET", "POST", "PUT", "DELETE", "PATCH", "HEAD", "OPTIONS"]
number_of_queries: DF.Int
path: DF.Data | None
profile: DF.Code | None
request_headers: DF.Code | None
sql_queries: DF.Table[RecorderQuery]
time: DF.Datetime | None

View file

@ -10,12 +10,7 @@ frappe.listview_settings["Recorder"] = {
}
listview.page.add_button(__("Clear"), () => {
frappe.call({
method: "frappe.recorder.delete",
callback: function () {
listview.refresh();
},
});
frappe.xcall("frappe.recorder.delete").then(listview.refresh);
});
listview.page.add_menu_item(__("Import"), () => {
@ -88,18 +83,125 @@ frappe.listview_settings["Recorder"] = {
},
setup_recorder_controls(listview) {
let me = this;
listview.page.set_primary_action(listview.enabled ? __("Stop") : __("Start"), () => {
frappe.call({
method: listview.enabled ? "frappe.recorder.stop" : "frappe.recorder.start",
callback: function () {
listview.refresh();
},
});
listview.enabled = !listview.enabled;
this.refresh_controls(listview);
if (listview.enabled) {
me.stop_recorder(listview);
} else {
me.start_recorder(listview);
}
});
},
stop_recorder(listview) {
let me = this;
frappe.xcall("frappe.recorder.stop", {}).then(() => {
listview.refresh();
listview.enabled = false;
me.refresh_controls(listview);
});
},
start_recorder(listview) {
let me = this;
frappe.prompt(
[
{
fieldtype: "Section Break",
fieldname: "req_job_section",
},
{
fieldtype: "Column Break",
fieldname: "web_request_columns",
label: "Web Requests",
},
{
fieldname: "record_requests",
fieldtype: "Check",
label: "Record Web Requests",
default: 1,
},
{
fieldname: "request_filter",
fieldtype: "Data",
label: "Request path filter",
default: "/",
depends_on: "record_requests",
description: `This will be used for filtering paths which will be recorded.
You can use this to avoid slowing down other traffic.
e.g. <code>/api/method/erpnext</code>. Leave it empty to record every request.`,
},
{
fieldtype: "Column Break",
fieldname: "background_col",
label: "Background Jobs",
},
{
fieldname: "record_jobs",
fieldtype: "Check",
label: "Record Background Jobs",
default: 1,
},
{
fieldname: "jobs_filter",
fieldtype: "Data",
label: "Background Jobs filter",
default: "",
depends_on: "record_jobs",
description: `This will be used for filtering jobs which will be recorded.
You can use this to avoid slowing down other jobs. e.g. <code>email_queue.pull</code>.
Leave it empty to record every job.`,
},
{
fieldtype: "Section Break",
fieldname: "sql_section",
label: "SQL",
},
{
fieldname: "record_sql",
fieldtype: "Check",
label: "Record SQL queries",
default: 1,
},
{
fieldname: "explain",
fieldtype: "Check",
label: "Generate EXPLAIN for SQL queries",
default: 1,
},
{
fieldname: "capture_stack",
fieldtype: "Check",
label: "Capture callstack of SQL queries",
default: 1,
},
{
fieldtype: "Section Break",
fieldname: "python_section",
label: "Python",
},
{
fieldname: "profile",
fieldtype: "Check",
label: "Run cProfile",
default: 0,
description:
"Warning: cProfile adds a lot of overhead. For best results, disable stack capturing when using cProfile.",
},
],
(values) => {
frappe.xcall("frappe.recorder.start", values).then(() => {
listview.refresh();
listview.enabled = true;
me.refresh_controls(listview);
});
},
__("Configure Recorder"),
__("Start Recordig")
);
},
update_indicators(listview) {
if (listview.enabled) {
listview.page.set_indicator(__("Active"), "green");

View file

@ -1,32 +1,67 @@
# Copyright (c) 2018, Frappe Technologies Pvt. Ltd. and Contributors
# License: MIT. See LICENSE
import datetime
import cProfile
import functools
import inspect
import io
import json
import pstats
import re
import time
from collections import Counter
from collections.abc import Callable
from dataclasses import dataclass
import sqlparse
import frappe
from frappe import _
from frappe.database.database import is_query_type
from frappe.utils import now_datetime
RECORDER_INTERCEPT_FLAG = "recorder-intercept"
RECORDER_CONFIG_FLAG = "recorder-config"
RECORDER_REQUEST_SPARSE_HASH = "recorder-requests-sparse"
RECORDER_REQUEST_HASH = "recorder-requests"
TRACEBACK_PATH_PATTERN = re.compile(".*/apps/")
RECORDER_AUTO_DISABLE = 5 * 60
def sql(*args, **kwargs):
@dataclass
class RecorderConfig:
record_requests: bool = True # Record web request
record_jobs: bool = True # record background jobs
record_sql: bool = True # Record SQL queries
capture_stack: bool = True # Recod call stack of SQL queries
profile: bool = False # Run cProfile
explain: bool = True # Provide explain output of SQL queries
request_filter: str = "/" # Filter request paths
jobs_filter: str = "" # Filter background jobs
def __post_init__(self):
if not (self.record_jobs or self.record_requests):
frappe.throw("You must record one of jobs or requests")
def store(self):
frappe.cache.set_value(RECORDER_CONFIG_FLAG, self, expires_in_sec=RECORDER_AUTO_DISABLE)
@classmethod
def retrieve(cls):
return frappe.cache.get_value(RECORDER_CONFIG_FLAG) or cls()
@staticmethod
def delete():
frappe.cache.delete_value(RECORDER_CONFIG_FLAG)
def record_sql(*args, **kwargs):
start_time = time.monotonic()
result = frappe.db._sql(*args, **kwargs)
end_time = time.monotonic()
stack = list(get_current_stack_frames())
stack = []
if frappe.local._recorder.config.capture_stack:
stack = list(get_current_stack_frames())
data = {
"query": str(frappe.db.last_query),
@ -69,6 +104,7 @@ def post_process():
frappe.db.rollback()
frappe.db.begin(read_only=True) # Explicitly start read only transaction
config = RecorderConfig.retrieve()
result = list(frappe.cache.hgetall(RECORDER_REQUEST_HASH).values())
for request in result:
@ -79,7 +115,7 @@ def post_process():
call["query"] = formatted_query
# Collect EXPLAIN for executed query
if is_query_type(formatted_query, ("select", "update", "delete")):
if config.explain and is_query_type(formatted_query, ("select", "update", "delete")):
# Only SELECT/UPDATE/DELETE queries can be "EXPLAIN"ed
try:
call["explain_result"] = frappe.db.sql(f"EXPLAIN {formatted_query}", as_dict=True)
@ -88,6 +124,8 @@ def post_process():
mark_duplicates(request)
frappe.cache.hset(RECORDER_REQUEST_HASH, request["uuid"], request)
config.delete()
def mark_duplicates(request):
exact_duplicates = Counter([call["query"] for call in request["calls"]])
@ -134,7 +172,7 @@ def normalize_query(query: str) -> str:
def record(force=False):
if __debug__:
if frappe.cache.get_value(RECORDER_INTERCEPT_FLAG) or force:
frappe.local._recorder = Recorder()
frappe.local._recorder = Recorder(force=force)
def dump():
@ -144,38 +182,78 @@ def dump():
class Recorder:
def __init__(self):
self.uuid = frappe.generate_hash(length=10)
self.time = datetime.datetime.now()
def __init__(self, force=False):
self.config = RecorderConfig.retrieve()
self.calls = []
if frappe.request:
self._patched_sql = False
self.profiler = None
self._recording = True
self.force = force
self.cmd = None
self.method = None
self.headers = None
self.form_dict = None
if (
self.config.record_requests
and frappe.request
and self.config.request_filter in frappe.request.path
):
self.path = frappe.request.path
self.cmd = frappe.local.form_dict.cmd or ""
self.method = frappe.request.method
self.headers = dict(frappe.local.request.headers)
self.form_dict = frappe.local.form_dict
self.event_type = "HTTP Request"
elif frappe.job:
elif self.config.record_jobs and frappe.job and self.config.jobs_filter in frappe.job.method:
self.event_type = "Background Job"
self.path = frappe.job.method
self.cmd = None
self.method = None
self.headers = None
self.form_dict = None
elif not self.force:
self._recording = False
return
else:
self.event_type = None
self.path = None
self.cmd = None
self.method = None
self.headers = None
self.form_dict = None
self.event_type = "Function Call"
_patch()
self.uuid = frappe.generate_hash(length=10)
self.time = now_datetime()
if self.config.record_sql:
self._patch_sql()
self._patched_sql = True
if self.config.profile:
self.profiler = cProfile.Profile()
self.profiler.enable()
def register(self, data):
self.calls.append(data)
def cleanup(self):
if self.profiler:
self.profiler.disable()
if self._patched_sql:
self._unpatch_sql()
def process_profiler(self):
if self.config.profile or self.profiler:
self.profiler.disable()
profiler_output = io.StringIO()
pstats.Stats(self.profiler, stream=profiler_output).strip_dirs().sort_stats(
"cumulative"
).print_stats()
profile = profiler_output.getvalue()
profiler_output.close()
return profile
def dump(self):
if not self._recording:
return
profiler_output = self.process_profiler()
request_data = {
"uuid": self.uuid,
"path": self.path,
@ -183,38 +261,37 @@ class Recorder:
"time": self.time,
"queries": len(self.calls),
"time_queries": float("{:0.3f}".format(sum(call["duration"] for call in self.calls))),
"duration": float(f"{(datetime.datetime.now() - self.time).total_seconds() * 1000:0.3f}"),
"duration": float(f"{(now_datetime() - self.time).total_seconds() * 1000:0.3f}"),
"method": self.method,
"event_type": self.event_type,
}
frappe.cache.hset(RECORDER_REQUEST_SPARSE_HASH, self.uuid, request_data)
frappe.publish_realtime(
event="recorder-dump-event",
message=json.dumps(request_data, default=str),
user="Administrator",
)
request_data["calls"] = self.calls
request_data["headers"] = self.headers
request_data["form_dict"] = self.form_dict
request_data["profile"] = profiler_output
frappe.cache.hset(RECORDER_REQUEST_HASH, self.uuid, request_data)
if self.config.record_sql:
self._unpatch_sql()
def _patch():
frappe.db._sql = frappe.db.sql
frappe.db.sql = sql
@staticmethod
def _patch_sql():
frappe.db._sql = frappe.db.sql
frappe.db.sql = record_sql
def _unpatch():
frappe.db.sql = frappe.db._sql
@staticmethod
def _unpatch_sql():
frappe.db.sql = frappe.db._sql
def do_not_record(function):
@functools.wraps(function)
def wrapper(*args, **kwargs):
if hasattr(frappe.local, "_recorder"):
frappe.local._recorder.cleanup()
del frappe.local._recorder
frappe.db.sql = frappe.db._sql
return function(*args, **kwargs)
return wrapper
@ -240,8 +317,29 @@ def status(*args, **kwargs):
@frappe.whitelist()
@do_not_record
@administrator_only
def start(*args, **kwargs):
frappe.cache.set_value(RECORDER_INTERCEPT_FLAG, 1, expires_in_sec=60 * 60)
def start(
record_jobs: bool = True,
record_requests: bool = True,
record_sql: bool = True,
profile: bool = False,
capture_stack: bool = True,
explain: bool = True,
request_filter: str = "/",
jobs_filter: str = "",
*args,
**kwargs,
):
RecorderConfig(
record_requests=int(record_requests),
record_jobs=int(record_jobs),
record_sql=int(record_sql),
profile=int(profile),
capture_stack=int(capture_stack),
explain=int(explain),
request_filter=request_filter,
jobs_filter=jobs_filter,
).store()
frappe.cache.set_value(RECORDER_INTERCEPT_FLAG, 1, expires_in_sec=RECORDER_AUTO_DISABLE)
@frappe.whitelist()
@ -287,7 +385,7 @@ def record_queries(func: Callable):
frappe.local._recorder.path = f"Function call: {func.__module__}.{func.__qualname__}"
ret = func(*args, **kwargs)
dump()
_unpatch()
Recorder._unpatch_sql()
post_process()
print("Recorded queries, open recorder to view them.")
return ret