commit
d5fc0cef5d
6 changed files with 191 additions and 1 deletions
|
|
@ -25,6 +25,7 @@ from frappe.utils.error import make_error_snapshot
|
|||
from frappe.core.doctype.comment.comment import update_comments_in_parent_after_request
|
||||
from frappe import _
|
||||
import frappe.recorder
|
||||
import frappe.monitor
|
||||
|
||||
local_manager = LocalManager([frappe.local])
|
||||
|
||||
|
|
@ -52,6 +53,7 @@ def application(request):
|
|||
init_request(request)
|
||||
|
||||
frappe.recorder.record()
|
||||
frappe.monitor.start()
|
||||
|
||||
if frappe.local.form_dict.cmd:
|
||||
response = frappe.handler.handle()
|
||||
|
|
@ -91,6 +93,7 @@ def application(request):
|
|||
if response and hasattr(frappe.local, 'cookie_manager'):
|
||||
frappe.local.cookie_manager.flush_cookies(response=response)
|
||||
|
||||
frappe.monitor.stop(response)
|
||||
frappe.recorder.dump()
|
||||
|
||||
frappe.destroy()
|
||||
|
|
|
|||
|
|
@ -174,7 +174,8 @@ scheduler_events = {
|
|||
"frappe.email.doctype.email_account.email_account.pull",
|
||||
"frappe.email.doctype.email_account.email_account.notify_unreplied",
|
||||
"frappe.integrations.doctype.razorpay_settings.razorpay_settings.capture_payment",
|
||||
'frappe.utils.global_search.sync_global_search'
|
||||
'frappe.utils.global_search.sync_global_search',
|
||||
"frappe.monitor.flush",
|
||||
],
|
||||
"hourly": [
|
||||
"frappe.model.utils.link_count.update_link_count",
|
||||
|
|
|
|||
107
frappe/monitor.py
Normal file
107
frappe/monitor.py
Normal file
|
|
@ -0,0 +1,107 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
# Copyright (c) 2020, Frappe Technologies Pvt. Ltd. and Contributors
|
||||
# MIT License. See license.txt
|
||||
|
||||
from __future__ import unicode_literals
|
||||
|
||||
from datetime import datetime
|
||||
import json
|
||||
import traceback
|
||||
import frappe
|
||||
import os
|
||||
import uuid
|
||||
import rq
|
||||
|
||||
|
||||
MONITOR_REDIS_KEY = "monitor-transactions"
|
||||
MONITOR_MAX_ENTRIES = 1000000
|
||||
|
||||
|
||||
def start(transaction_type="request", method=None, kwargs=None):
|
||||
if frappe.conf.monitor:
|
||||
frappe.local.monitor = Monitor(transaction_type, method, kwargs)
|
||||
|
||||
|
||||
def stop(response=None):
|
||||
if frappe.conf.monitor and hasattr(frappe.local, "monitor"):
|
||||
frappe.local.monitor.dump(response)
|
||||
|
||||
|
||||
def log_file():
|
||||
return os.path.join(frappe.utils.get_bench_path(), "logs", "monitor.json.log")
|
||||
|
||||
|
||||
class Monitor:
|
||||
def __init__(self, transaction_type, method, kwargs):
|
||||
try:
|
||||
self.data = frappe._dict(
|
||||
{
|
||||
"site": frappe.local.site,
|
||||
"timestamp": datetime.utcnow(),
|
||||
"transaction_type": transaction_type,
|
||||
"uuid": str(uuid.uuid4()),
|
||||
}
|
||||
)
|
||||
|
||||
if transaction_type == "request":
|
||||
self.collect_request_meta()
|
||||
else:
|
||||
self.collect_job_meta(method, kwargs)
|
||||
except Exception:
|
||||
traceback.print_exc()
|
||||
|
||||
def collect_request_meta(self):
|
||||
self.data.request = frappe._dict(
|
||||
{
|
||||
"ip": frappe.local.request_ip,
|
||||
"method": frappe.request.method,
|
||||
"path": frappe.request.path,
|
||||
}
|
||||
)
|
||||
|
||||
def collect_job_meta(self, method, kwargs):
|
||||
self.data.job = frappe._dict({"method": method, "scheduled": False, "wait": 0})
|
||||
if "run_scheduled_job" in method:
|
||||
self.data.job.method = kwargs["job_type"]
|
||||
self.data.job.scheduled = True
|
||||
|
||||
job = rq.get_current_job()
|
||||
if job:
|
||||
self.data.uuid = job.id
|
||||
waitdiff = self.data.timestamp - job.enqueued_at
|
||||
self.data.job.wait = int(waitdiff.total_seconds() * 1000000)
|
||||
|
||||
def dump(self, response=None):
|
||||
try:
|
||||
timediff = datetime.utcnow() - self.data.timestamp
|
||||
# Obtain duration in microseconds
|
||||
self.data.duration = int(timediff.total_seconds() * 1000000)
|
||||
|
||||
if self.data.transaction_type == "request":
|
||||
self.data.request.status_code = response.status_code
|
||||
self.data.request.response_length = int(response.headers["Content-Length"])
|
||||
|
||||
self.store()
|
||||
except Exception:
|
||||
traceback.print_exc()
|
||||
|
||||
def store(self):
|
||||
if frappe.cache().llen(MONITOR_REDIS_KEY) > MONITOR_MAX_ENTRIES:
|
||||
frappe.cache().ltrim(MONITOR_REDIS_KEY, 1, -1)
|
||||
serialized = json.dumps(self.data, sort_keys=True, default=str)
|
||||
frappe.cache().rpush(MONITOR_REDIS_KEY, serialized)
|
||||
|
||||
|
||||
def flush():
|
||||
try:
|
||||
# Fetch all the logs without removing from cache
|
||||
logs = frappe.cache().lrange(MONITOR_REDIS_KEY, 0, -1)
|
||||
if logs:
|
||||
logs = list(map(frappe.safe_decode, logs))
|
||||
with open(log_file(), "a", os.O_NONBLOCK) as f:
|
||||
f.write("\n".join(logs))
|
||||
f.write("\n")
|
||||
# Remove fetched entries from cache
|
||||
frappe.cache().ltrim(MONITOR_REDIS_KEY, len(logs) - 1, -1)
|
||||
except Exception:
|
||||
traceback.print_exc()
|
||||
70
frappe/tests/test_monitor.py
Normal file
70
frappe/tests/test_monitor.py
Normal file
|
|
@ -0,0 +1,70 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
# Copyright (c) 2020, Frappe Technologies Pvt. Ltd. and Contributors
|
||||
# MIT License. See license.txt
|
||||
|
||||
from __future__ import unicode_literals
|
||||
import unittest
|
||||
import frappe
|
||||
import frappe.monitor
|
||||
from frappe.utils import set_request
|
||||
from frappe.utils.response import build_response
|
||||
from frappe.monitor import MONITOR_REDIS_KEY
|
||||
|
||||
|
||||
class TestMonitor(unittest.TestCase):
|
||||
def setUp(self):
|
||||
frappe.conf.monitor = 1
|
||||
frappe.cache().delete_value(MONITOR_REDIS_KEY)
|
||||
|
||||
def test_enable_monitor(self):
|
||||
set_request(method="GET", path="/api/method/frappe.ping")
|
||||
response = build_response("json")
|
||||
|
||||
frappe.monitor.start()
|
||||
frappe.monitor.stop(response)
|
||||
|
||||
logs = frappe.cache().lrange(MONITOR_REDIS_KEY, 0, -1)
|
||||
self.assertEqual(len(logs), 1)
|
||||
|
||||
log = frappe.parse_json(logs[0].decode())
|
||||
self.assertTrue(log.duration)
|
||||
self.assertTrue(log.site)
|
||||
self.assertTrue(log.timestamp)
|
||||
self.assertTrue(log.uuid)
|
||||
self.assertTrue(log.request)
|
||||
self.assertEqual(log.transaction_type, "request")
|
||||
self.assertEqual(log.request["method"], "GET")
|
||||
|
||||
def test_job(self):
|
||||
frappe.utils.background_jobs.execute_job(
|
||||
frappe.local.site, "frappe.ping", None, None, {}, is_async=False
|
||||
)
|
||||
|
||||
logs = frappe.cache().lrange(MONITOR_REDIS_KEY, 0, -1)
|
||||
self.assertEqual(len(logs), 1)
|
||||
log = frappe.parse_json(logs[0].decode())
|
||||
self.assertEqual(log.transaction_type, "job")
|
||||
self.assertTrue(log.job)
|
||||
self.assertEqual(log.job["method"], "frappe.ping")
|
||||
self.assertEqual(log.job["scheduled"], False)
|
||||
self.assertEqual(log.job["wait"], 0)
|
||||
|
||||
def test_flush(self):
|
||||
set_request(method="GET", path="/api/method/frappe.ping")
|
||||
response = build_response("json")
|
||||
frappe.monitor.start()
|
||||
frappe.monitor.stop(response)
|
||||
|
||||
open(frappe.monitor.log_file(), "w").close()
|
||||
frappe.monitor.flush()
|
||||
|
||||
with open(frappe.monitor.log_file()) as f:
|
||||
logs = f.readlines()
|
||||
|
||||
self.assertEqual(len(logs), 1)
|
||||
log = frappe.parse_json(logs[0])
|
||||
self.assertEqual(log.transaction_type, "request")
|
||||
|
||||
def tearDown(self):
|
||||
frappe.conf.monitor = 0
|
||||
frappe.cache().delete_value(MONITOR_REDIS_KEY)
|
||||
|
|
@ -9,6 +9,7 @@ import os, socket, time
|
|||
from frappe import _
|
||||
from six import string_types
|
||||
from uuid import uuid4
|
||||
import frappe.monitor
|
||||
|
||||
# imports - third-party imports
|
||||
|
||||
|
|
@ -94,6 +95,7 @@ def execute_job(site, method, event, job_name, kwargs, user=None, is_async=True,
|
|||
else:
|
||||
method_name = cstr(method.__name__)
|
||||
|
||||
frappe.monitor.start("job", method_name, kwargs)
|
||||
try:
|
||||
method(**kwargs)
|
||||
|
||||
|
|
@ -128,6 +130,7 @@ def execute_job(site, method, event, job_name, kwargs, user=None, is_async=True,
|
|||
frappe.db.commit()
|
||||
|
||||
finally:
|
||||
frappe.monitor.stop()
|
||||
if is_async:
|
||||
frappe.destroy()
|
||||
|
||||
|
|
|
|||
|
|
@ -140,6 +140,12 @@ class RedisWrapper(redis.Redis):
|
|||
def llen(self, key):
|
||||
return super(RedisWrapper, self).llen(self.make_key(key))
|
||||
|
||||
def lrange(self, key, start, stop):
|
||||
return super(RedisWrapper, self).lrange(self.make_key(key), start, stop)
|
||||
|
||||
def ltrim(self, key, start, stop):
|
||||
return super(RedisWrapper, self).ltrim(self.make_key(key), start, stop)
|
||||
|
||||
def hset(self, name, key, value, shared=False):
|
||||
if key is None:
|
||||
return
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue