From 6ebe7fc26e1928d384b35636dc3f326b2ed089d8 Mon Sep 17 00:00:00 2001 From: Saqib Ansari Date: Sat, 30 Aug 2025 17:19:39 +0530 Subject: [PATCH] refactor: pulse client --- frappe/api/__init__.py | 4 +- frappe/hooks.py | 5 +- frappe/pulse/app_heartbeat_event.py | 92 ++++++++------------ frappe/pulse/client.py | 129 +++++++++++++++++++++------- frappe/pulse/utils.py | 97 +++++++++++++++++++++ 5 files changed, 234 insertions(+), 93 deletions(-) create mode 100644 frappe/pulse/utils.py diff --git a/frappe/api/__init__.py b/frappe/api/__init__.py index 7d7a665ce7..db7f96da50 100644 --- a/frappe/api/__init__.py +++ b/frappe/api/__init__.py @@ -9,7 +9,7 @@ from werkzeug.wrappers import Request, Response import frappe from frappe import _ -from frappe.pulse.app_heartbeat_event import log_app_heartbeat +from frappe.pulse.app_heartbeat_event import capture_app_heartbeat from frappe.utils.response import build_response @@ -67,7 +67,7 @@ def handle(request: Request): data = build_response("json") with suppress(Exception): - log_app_heartbeat(arguments) + capture_app_heartbeat(arguments) return data diff --git a/frappe/hooks.py b/frappe/hooks.py index 0ed8bc2042..67e7735a51 100644 --- a/frappe/hooks.py +++ b/frappe/hooks.py @@ -209,15 +209,12 @@ scheduler_events = { "frappe.automation.doctype.reminder.reminder.send_reminders", "frappe.model.utils.link_count.update_link_count", "frappe.search.sqlite_search.build_index_if_not_exists", + "frappe.pulse.client.send_queued_events", ], # 10 minutes "0/10 * * * *": [ "frappe.email.doctype.email_account.email_account.pull", ], - # 6 hours - "0 */6 * * *": [ - "frappe.pulse.app_heartbeat_event.send", - ], # Hourly but offset by 30 minutes "30 * * * *": [], # Daily but offset by 45 minutes diff --git a/frappe/pulse/app_heartbeat_event.py b/frappe/pulse/app_heartbeat_event.py index 4a5e496d8e..76a7b63327 100644 --- a/frappe/pulse/app_heartbeat_event.py +++ b/frappe/pulse/app_heartbeat_event.py @@ -1,27 +1,54 @@ import frappe from frappe.modules import get_doctype_module +from frappe.pulse.utils import get_app_version, get_frappe_version from frappe.utils.caching import site_cache -from .client import is_enabled, post_events +from .client import capture, is_enabled KEY = "pulse:active_apps" EXPIRY = 60 * 60 * 12 # 12 hours -def log_app_heartbeat(req_params): - if not is_enabled() or frappe.session.user in ("Guest", "Administrator"): +def capture_app_heartbeat(req_params): + if not should_capture(): return - status_code = frappe.response.http_status_code or 0 - if status_code and not (200 <= status_code < 300): - return - - method = req_params.get("method") or frappe.form_dict.get("method") - doctype = req_params.get("doctype") or frappe.form_dict.get("doctype") - + method, doctype = get_method_and_doctype(req_params) if not method and not doctype: return + app_name = get_app_name(method, doctype) + if app_name and app_name != "frappe": + capture( + event_name="app_heartbeat", + site=frappe.local.site, + app=app_name, + properties={ + "app_version": get_app_version(app_name), + "frappe_version": get_frappe_version(), + }, + interval="6h", + ) + + +def should_capture(): + if not is_enabled() or frappe.session.user in frappe.STANDARD_USERS: + return False + + status_code = frappe.response.http_status_code or 0 + if status_code and not (200 <= status_code < 300): + return False + + return True + + +def get_method_and_doctype(req_params): + method = req_params.get("method") or frappe.form_dict.get("method") + doctype = req_params.get("doctype") or frappe.form_dict.get("doctype") + return method, doctype + + +def get_app_name(method, doctype): app_name = None if method and "." in method and not method.startswith("frappe."): app_name = method.split(".", 1)[0] @@ -30,50 +57,7 @@ def log_app_heartbeat(req_params): module = get_doctype_module(doctype) app_name = app_module_map().get(module) - if app_name and app_name != "frappe": - _mark_active(app_name) - - -def send(): - if not is_enabled(): - return - - active_apps = frappe.cache.get_value(KEY) or set() - if not active_apps: - return - - events = [] - for app in active_apps: - events.append( - { - "name": "app_heartbeat", - "app": app, - "app_version": _get_app_version(app), - } - ) - - try: - if post_events(events): - frappe.cache.delete_value(KEY) - except Exception: - frappe.log_error(title="Failed to send app heartbeat events") - - -def _mark_active(app): - active_apps = frappe.cache.get_value(KEY) or set() - if app not in active_apps: - active_apps.add(app) - frappe.cache.set_value(KEY, active_apps) - ttl = frappe.cache.ttl(KEY) - if ttl in (-1, None): - frappe.cache.expire(KEY, EXPIRY) - - -def _get_app_version(app_name: str) -> str: - try: - return frappe.get_attr(app_name + ".__version__") - except Exception: - return "0.0.1" + return app_name @site_cache() diff --git a/frappe/pulse/client.py b/frappe/pulse/client.py index 343a95e100..e2edcdc97d 100644 --- a/frappe/pulse/client.py +++ b/frappe/pulse/client.py @@ -1,6 +1,10 @@ -from datetime import datetime, timezone +import time +from contextlib import suppress + +from orjson import JSONDecodeError import frappe +from frappe.pulse.utils import anonymize_user, ensure_http, parse_interval, utc_iso from frappe.utils import get_request_session from frappe.utils.caching import site_cache from frappe.utils.frappecloud import on_frappecloud @@ -17,52 +21,111 @@ def is_enabled() -> bool: ) -def post_events(events): - events = _sanitize_events(events) +def capture(event_name, site=None, app=None, user=None, properties=None, interval=None): + if not is_enabled(): + return + + try: + event_key = f"{event_name}:{site}:{app}:{user}" + if _is_ratelimited(event_key, interval): + return + + _queue_event( + { + "event_name": event_name, + "captured_at": utc_iso(), + "app": app, + "user": anonymize_user(user), + "site": site or frappe.local.site, + "properties": properties, + } + ) + _update_ratelimit(event_key, interval) + except Exception as e: + frappe.logger().error(f"Pulse event capture failed: {e!s}") + + +def _is_ratelimited(event_key, interval): + if not interval: + return False + + interval_seconds = parse_interval(interval) + last_sent_key = f"pulse:last_sent:{event_key}" + last_sent = frappe.cache.get_value(last_sent_key) + + if last_sent and time.monotonic() - float(last_sent) < interval_seconds: + return True + + return False + + +def _update_ratelimit(event_key, interval): + if not interval: + return + last_sent_key = f"pulse:last_sent:{event_key}" + frappe.cache.set_value(last_sent_key, time.monotonic(), expires_in_sec=86400) # 24h TTL + + +def _queue_event(event): + frappe.cache.lpush("pulse:events", frappe.as_json(event)) + frappe.cache.ltrim("pulse:events", 0, 4999) + + +def send_queued_events(): + batch_size = 100 + max_batches = 10 + for _ in range(max_batches): + events = get_next_batch(batch_size) + if not events: + break + try: + if not post(events): + frappe.logger().error("Pulse sending events failed: non-2xx response") + except Exception as e: + frappe.logger().error(f"Pulse sending events failed: {e!s}") + + +def get_next_batch(batch_size=100): + """Get batch of events from the queue""" + events = [] + for _ in range(batch_size): + event_json = frappe.cache.rpop("pulse:events") + if not event_json: + break + event_json = event_json.decode() + with suppress(JSONDecodeError): + data = frappe.parse_json(event_json) + events.append(data) + return events + + +def post(events): + # TODO: implement retry logic session = _create_session() - resp = session.post(_get_ingest_url(), data=events, timeout=5.0) + url = _get_ingest_url() + data = frappe.as_json({"events": events}) + resp = session.post(url, data=data, timeout=15) return 200 <= resp.status_code < 300 def _create_session(): api_key = frappe.conf.get("pulse_api_key") session = get_request_session() - if api_key: - session.headers.update({"Authorization": f"Bearer {api_key}"}) + session.headers.update( + { + "Content-Type": "application/json", + "X-Pulse-API-Key": api_key, + } + ) return session def _get_ingest_url(): host = frappe.conf.get("pulse_host") or "https://pulse.m.frappe.cloud" - if not host.startswith("http"): - host = "https://" + host + host = ensure_http(host) host = host.rstrip("/") - endpoint = frappe.conf.get("pulse_ingest_endpoint") or "/api/method/pulse.api.ingest" + endpoint = frappe.conf.get("pulse_ingest_endpoint") or "/api/method/pulse.api.bulk_ingest" endpoint = endpoint.lstrip("/") return f"{host}/{endpoint}" - - -def _sanitize_events(events): - _events = [] - if not isinstance(events, list): - _events = [events] - - for event in events: - if not isinstance(event, dict) or "name" not in event: - continue - event["site"] = event["site"] or frappe.local.site - event["timestamp"] = event["timestamp"] or _utc_iso() - event["frappe_version"] = event["frappe_version"] or _get_frappe_version() - _events.append(event) - - return _events - - -def _get_frappe_version() -> str: - return getattr(frappe, "__version__", "unknown") - - -def _utc_iso() -> str: - return datetime.now(timezone.utc).isoformat(timespec="seconds") diff --git a/frappe/pulse/utils.py b/frappe/pulse/utils.py new file mode 100644 index 0000000000..fd68a93a01 --- /dev/null +++ b/frappe/pulse/utils.py @@ -0,0 +1,97 @@ +import hashlib +from datetime import datetime, timezone + +import frappe + + +def anonymize_user(user): + """ + Create consistent anonymous ID from user email. + Same email always produces same anonymous ID. + """ + if not user or user in frappe.STANDARD_USERS: + return user + + # Use site-specific salt for additional security + site_salt = frappe.local.site or "default" + + # Create deterministic hash + hash_input = f"{user}:{site_salt}".encode() + user_hash = hashlib.sha256(hash_input).hexdigest() + + # Return first 12 characters for readability + return f"anon_{user_hash[:12]}" + + +def parse_interval(interval): + """ + Parse interval string or integer into seconds. + + Args: + interval: Can be: + - Integer: seconds (e.g., 3600) + - String: number + unit (e.g., "1h", "30m", "7d") + + Returns: + int: Total seconds + + Examples: + parse_interval(3600) -> 3600 + parse_interval("1h") -> 3600 + parse_interval("30m") -> 1800 + parse_interval("7d") -> 604800 + """ + if interval is None: + return None + + # If already an integer, return as-is (assuming seconds) + if isinstance(interval, int): + return interval + + # Parse string format + interval = str(interval).strip().lower() + + # Extract number and unit + if interval[-1].isdigit(): + # No unit specified, assume seconds + return int(interval) + + unit = interval[-1] + try: + number = int(interval[:-1]) + except ValueError: + raise ValueError(f"Invalid interval format: {interval}") + + # Convert to seconds + multipliers = { + "s": 1, # seconds + "m": 60, # minutes + "h": 3600, # hours + "d": 86400, # days + "w": 604800, # weeks + "y": 31536000, # years + } + + if unit not in multipliers: + raise ValueError(f"Invalid time unit '{unit}'. Use: s, m, h, d, w, y") + + return number * multipliers[unit] + + +def get_frappe_version() -> str: + return getattr(frappe, "__version__", "unknown") + + +def utc_iso() -> str: + return datetime.now(timezone.utc).isoformat() + + +def get_app_version(app_name: str) -> str: + try: + return frappe.get_attr(app_name + ".__version__") + except Exception: + return "0.0.1" + + +def ensure_http(url: str) -> str: + return url if url.startswith(("http://", "https://")) else "https://" + url