diff --git a/frappe/app.py b/frappe/app.py index 3c46110c2e..24ce35b514 100644 --- a/frappe/app.py +++ b/frappe/app.py @@ -25,6 +25,7 @@ from frappe.utils.error import make_error_snapshot from frappe.core.doctype.comment.comment import update_comments_in_parent_after_request from frappe import _ import frappe.recorder +import frappe.monitor local_manager = LocalManager([frappe.local]) @@ -52,6 +53,7 @@ def application(request): init_request(request) frappe.recorder.record() + frappe.monitor.start() if frappe.local.form_dict.cmd: response = frappe.handler.handle() @@ -91,6 +93,7 @@ def application(request): if response and hasattr(frappe.local, 'cookie_manager'): frappe.local.cookie_manager.flush_cookies(response=response) + frappe.monitor.stop(response) frappe.recorder.dump() frappe.destroy() diff --git a/frappe/hooks.py b/frappe/hooks.py index a132eb69d5..9d5389b529 100644 --- a/frappe/hooks.py +++ b/frappe/hooks.py @@ -174,7 +174,8 @@ scheduler_events = { "frappe.email.doctype.email_account.email_account.pull", "frappe.email.doctype.email_account.email_account.notify_unreplied", "frappe.integrations.doctype.razorpay_settings.razorpay_settings.capture_payment", - 'frappe.utils.global_search.sync_global_search' + 'frappe.utils.global_search.sync_global_search', + "frappe.monitor.flush", ], "hourly": [ "frappe.model.utils.link_count.update_link_count", diff --git a/frappe/monitor.py b/frappe/monitor.py new file mode 100644 index 0000000000..7181bd92ad --- /dev/null +++ b/frappe/monitor.py @@ -0,0 +1,107 @@ +# -*- coding: utf-8 -*- +# Copyright (c) 2020, Frappe Technologies Pvt. Ltd. and Contributors +# MIT License. See license.txt + +from __future__ import unicode_literals + +from datetime import datetime +import json +import traceback +import frappe +import os +import uuid +import rq + + +MONITOR_REDIS_KEY = "monitor-transactions" +MONITOR_MAX_ENTRIES = 1000000 + + +def start(transaction_type="request", method=None, kwargs=None): + if frappe.conf.monitor: + frappe.local.monitor = Monitor(transaction_type, method, kwargs) + + +def stop(response=None): + if frappe.conf.monitor and hasattr(frappe.local, "monitor"): + frappe.local.monitor.dump(response) + + +def log_file(): + return os.path.join(frappe.utils.get_bench_path(), "logs", "monitor.json.log") + + +class Monitor: + def __init__(self, transaction_type, method, kwargs): + try: + self.data = frappe._dict( + { + "site": frappe.local.site, + "timestamp": datetime.utcnow(), + "transaction_type": transaction_type, + "uuid": str(uuid.uuid4()), + } + ) + + if transaction_type == "request": + self.collect_request_meta() + else: + self.collect_job_meta(method, kwargs) + except Exception: + traceback.print_exc() + + def collect_request_meta(self): + self.data.request = frappe._dict( + { + "ip": frappe.local.request_ip, + "method": frappe.request.method, + "path": frappe.request.path, + } + ) + + def collect_job_meta(self, method, kwargs): + self.data.job = frappe._dict({"method": method, "scheduled": False, "wait": 0}) + if "run_scheduled_job" in method: + self.data.job.method = kwargs["job_type"] + self.data.job.scheduled = True + + job = rq.get_current_job() + if job: + self.data.uuid = job.id + waitdiff = self.data.timestamp - job.enqueued_at + self.data.job.wait = int(waitdiff.total_seconds() * 1000000) + + def dump(self, response=None): + try: + timediff = datetime.utcnow() - self.data.timestamp + # Obtain duration in microseconds + self.data.duration = int(timediff.total_seconds() * 1000000) + + if self.data.transaction_type == "request": + self.data.request.status_code = response.status_code + self.data.request.response_length = int(response.headers["Content-Length"]) + + self.store() + except Exception: + traceback.print_exc() + + def store(self): + if frappe.cache().llen(MONITOR_REDIS_KEY) > MONITOR_MAX_ENTRIES: + frappe.cache().ltrim(MONITOR_REDIS_KEY, 1, -1) + serialized = json.dumps(self.data, sort_keys=True, default=str) + frappe.cache().rpush(MONITOR_REDIS_KEY, serialized) + + +def flush(): + try: + # Fetch all the logs without removing from cache + logs = frappe.cache().lrange(MONITOR_REDIS_KEY, 0, -1) + if logs: + logs = list(map(frappe.safe_decode, logs)) + with open(log_file(), "a", os.O_NONBLOCK) as f: + f.write("\n".join(logs)) + f.write("\n") + # Remove fetched entries from cache + frappe.cache().ltrim(MONITOR_REDIS_KEY, len(logs) - 1, -1) + except Exception: + traceback.print_exc() diff --git a/frappe/tests/test_monitor.py b/frappe/tests/test_monitor.py new file mode 100644 index 0000000000..b447e89b06 --- /dev/null +++ b/frappe/tests/test_monitor.py @@ -0,0 +1,70 @@ +# -*- coding: utf-8 -*- +# Copyright (c) 2020, Frappe Technologies Pvt. Ltd. and Contributors +# MIT License. See license.txt + +from __future__ import unicode_literals +import unittest +import frappe +import frappe.monitor +from frappe.utils import set_request +from frappe.utils.response import build_response +from frappe.monitor import MONITOR_REDIS_KEY + + +class TestMonitor(unittest.TestCase): + def setUp(self): + frappe.conf.monitor = 1 + frappe.cache().delete_value(MONITOR_REDIS_KEY) + + def test_enable_monitor(self): + set_request(method="GET", path="/api/method/frappe.ping") + response = build_response("json") + + frappe.monitor.start() + frappe.monitor.stop(response) + + logs = frappe.cache().lrange(MONITOR_REDIS_KEY, 0, -1) + self.assertEqual(len(logs), 1) + + log = frappe.parse_json(logs[0].decode()) + self.assertTrue(log.duration) + self.assertTrue(log.site) + self.assertTrue(log.timestamp) + self.assertTrue(log.uuid) + self.assertTrue(log.request) + self.assertEqual(log.transaction_type, "request") + self.assertEqual(log.request["method"], "GET") + + def test_job(self): + frappe.utils.background_jobs.execute_job( + frappe.local.site, "frappe.ping", None, None, {}, is_async=False + ) + + logs = frappe.cache().lrange(MONITOR_REDIS_KEY, 0, -1) + self.assertEqual(len(logs), 1) + log = frappe.parse_json(logs[0].decode()) + self.assertEqual(log.transaction_type, "job") + self.assertTrue(log.job) + self.assertEqual(log.job["method"], "frappe.ping") + self.assertEqual(log.job["scheduled"], False) + self.assertEqual(log.job["wait"], 0) + + def test_flush(self): + set_request(method="GET", path="/api/method/frappe.ping") + response = build_response("json") + frappe.monitor.start() + frappe.monitor.stop(response) + + open(frappe.monitor.log_file(), "w").close() + frappe.monitor.flush() + + with open(frappe.monitor.log_file()) as f: + logs = f.readlines() + + self.assertEqual(len(logs), 1) + log = frappe.parse_json(logs[0]) + self.assertEqual(log.transaction_type, "request") + + def tearDown(self): + frappe.conf.monitor = 0 + frappe.cache().delete_value(MONITOR_REDIS_KEY) diff --git a/frappe/utils/background_jobs.py b/frappe/utils/background_jobs.py index 11c710626b..03f063e058 100755 --- a/frappe/utils/background_jobs.py +++ b/frappe/utils/background_jobs.py @@ -9,6 +9,7 @@ import os, socket, time from frappe import _ from six import string_types from uuid import uuid4 +import frappe.monitor # imports - third-party imports @@ -94,6 +95,7 @@ def execute_job(site, method, event, job_name, kwargs, user=None, is_async=True, else: method_name = cstr(method.__name__) + frappe.monitor.start("job", method_name, kwargs) try: method(**kwargs) @@ -128,6 +130,7 @@ def execute_job(site, method, event, job_name, kwargs, user=None, is_async=True, frappe.db.commit() finally: + frappe.monitor.stop() if is_async: frappe.destroy() diff --git a/frappe/utils/redis_wrapper.py b/frappe/utils/redis_wrapper.py index 4af59bceb2..b0c0990e85 100644 --- a/frappe/utils/redis_wrapper.py +++ b/frappe/utils/redis_wrapper.py @@ -140,6 +140,12 @@ class RedisWrapper(redis.Redis): def llen(self, key): return super(RedisWrapper, self).llen(self.make_key(key)) + def lrange(self, key, start, stop): + return super(RedisWrapper, self).lrange(self.make_key(key), start, stop) + + def ltrim(self, key, start, stop): + return super(RedisWrapper, self).ltrim(self.make_key(key), start, stop) + def hset(self, name, key, value, shared=False): if key is None: return