diff --git a/frappe/utils/telemetry/pulse/client.py b/frappe/utils/telemetry/pulse/client.py index 7228eb3559..6341e8262a 100644 --- a/frappe/utils/telemetry/pulse/client.py +++ b/frappe/utils/telemetry/pulse/client.py @@ -22,26 +22,23 @@ def is_enabled() -> bool: @frappe.whitelist() -def capture(event_name, site=None, app=None, user=None, properties=None, interval=None): +def capture(event_name, site=None, app=None, user=None, captured_at=None, properties=None, interval=None): if not is_enabled(): return try: - event_key = f"{event_name}:{site or None}:{app or None}:{user or None}" - if _is_ratelimited(event_key, interval): - return - - _queue_event( + eq = EventQueue() + eq.add( { "event_name": event_name, - "captured_at": utc_iso(), + "captured_at": captured_at or utc_iso(), "app": app, "user": anonymize_user(user), "site": site or frappe.local.site, "properties": properties, - } + }, + interval=interval, ) - _update_ratelimit(event_key, interval) except Exception as e: frappe.logger("pulse").error(f"pulse-client - capture failed: {e!s}") @@ -52,84 +49,23 @@ def bulk_capture(events): return for event in events: - try: - # not supporting rate-limiting for bulk events - # so queue all events as-is - _queue_event( - { - "event_name": event.get("event_name"), - "captured_at": event.get("captured_at") or utc_iso(), - "site": frappe.local.site, - "app": event.get("app"), - "user": anonymize_user(event.get("user")), - "properties": event.get("properties"), - } - ) - except Exception as e: - frappe.logger("pulse").error(f"pulse-client - bulk capture failed for event {event}: {e!s}") - - -def _is_ratelimited(event_key, interval): - if not interval: - return False - - interval_seconds = parse_interval(interval) - last_sent_key = f"pulse-client:last_sent:{event_key}" - last_sent = frappe.cache.get_value(last_sent_key) - - if last_sent and time.monotonic() - float(last_sent) < interval_seconds: - return True - - return False - - -def _update_ratelimit(event_key, interval): - if not interval: - return - last_sent_key = f"pulse-client:last_sent:{event_key}" - frappe.cache.set_value(last_sent_key, time.monotonic()) - - -def _queue_event(event): - frappe.cache.lpush("pulse-client:events", frappe.as_json(event)) - frappe.cache.ltrim("pulse-client:events", 0, 9999) - - -def queue_length(): - return frappe.cache.llen("pulse-client:events") + capture( + event.get("event_name"), + site=event.get("site"), + app=event.get("app"), + user=event.get("user"), + captured_at=event.get("captured_at"), + properties=event.get("properties"), + interval=event.get("interval"), + ) def send_queued_events(): - batch_size = 100 - max_batches = 10 - for _ in range(max_batches): - events = collect_events(batch_size) - if not events: - break - try: - if not post(events): - frappe.logger().error("Pulse sending events failed: non-2xx response") - except Exception as e: - frappe.logger().error(f"Pulse sending events failed: {e!s}") + if not is_enabled(): + return - -def collect_events(batch_size=100): - """Pop batch of events from the queue""" - events = [] - for _ in range(batch_size): - event_json = frappe.cache.rpop("pulse-client:events") - if not event_json: - break - data = decode_event(event_json) - if data: - events.append(data) - return events - - -def decode_event(event_json): - event_json = event_json.decode() - with suppress(JSONDecodeError): - return frappe.parse_json(event_json) + eq = EventQueue() + eq.batch_process(post, batch_size=100, max_batches=10) def post(events): @@ -138,7 +74,9 @@ def post(events): url = _get_ingest_url() data = frappe.as_json({"events": events}) resp = session.post(url, data=data, timeout=15) - return 200 <= resp.status_code < 300 + if not (200 <= resp.status_code < 300): + frappe.logger("pulse").error(f"pulse-client - post failed: {resp.status_code} {resp.text}") + return resp def _create_session(): @@ -164,6 +102,102 @@ def _get_ingest_url(): return f"{host}/{endpoint}" +class EventQueue: + def __init__(self): + self.queue = "pulse-client:events" + self.queue_size = 10000 + self.ratelimit_prefix = "pulse-client:last_sent:" + + @property + def length(self): + return frappe.cache.llen(self.queue) + + def add(self, event, interval=None): + if self._is_ratelimited(event, interval): + return + + self._queue_event(event) + self._update_ratelimit(event, interval) + + def _is_ratelimited(self, event, interval): + if not interval: + return False + + interval_seconds = parse_interval(interval) + event_key = self._get_event_key(event) + last_sent_key = f"{self.ratelimit_prefix}{event_key}" + last_sent = frappe.cache.get_value(last_sent_key) + + if last_sent and time.monotonic() - float(last_sent) < interval_seconds: + return True + + return False + + def _get_event_key(self, event): + return f"{event.get('event_name')}:{event.get('site')}:{event.get('app')}:{event.get('user')}" + + def _update_ratelimit(self, event, interval): + if not interval: + return + event_key = self._get_event_key(event) + last_sent_key = f"{self.ratelimit_prefix}{event_key}" + frappe.cache.set_value(last_sent_key, time.monotonic()) + + def _queue_event(self, event): + frappe.cache.lpush(self.queue, frappe.as_json(event)) + frappe.cache.ltrim(self.queue, 0, self.queue_size - 1) + + def batch_process(self, fn, batch_size=100, max_batches=10): + for _ in range(max_batches): + events = self.collect(batch_size) + if not events: + break + try: + fn(events) + except Exception as e: + frappe.logger("pulse").error(f"pulse-client - batch_process failed: {e!s}") + + def collect(self, batch_size=100): + events = [] + for _ in range(batch_size): + event_json = frappe.cache.rpop(self.queue) + if not event_json: + break + data = self._decode_event(event_json) + if data: + events.append(data) + return events + + def _decode_event(self, event_json): + event_json = event_json.decode() + with suppress(JSONDecodeError): + return frappe.parse_json(event_json) + + def get_events(self, limit=20): + events = [] + for _ in range(limit): + event_json = frappe.cache.lindex(self.queue, _) + if not event_json: + break + data = self._decode_event(event_json) + if data: + events.append(data) + return events + + def get_last_sent_events(self, limit=20): + events = [] + keys = frappe.cache.get_keys(f"{self.ratelimit_prefix}*")[:limit] + for key in keys: + last_sent = frappe.cache.get_value(key) + event_key = key.replace(self.ratelimit_prefix, "") + events.append( + { + "event_key": event_key, + "last_sent": last_sent, + } + ) + + @frappe.whitelist() def get_debug_info(fetch_events=None, fetch_rate_limited_events=None): frappe.only_for("System Manager") @@ -172,28 +206,15 @@ def get_debug_info(fetch_events=None, fetch_rate_limited_events=None): info.is_enabled = is_enabled() if info.is_enabled: - info.queued_event_count = queue_length() + eq = EventQueue() + info.queued_event_count = eq.length if fetch_events: - info.queued_events = [] limit = int(fetch_events) if str(fetch_events).isdigit() else 20 - for _ in range(min(limit, info.queued_event_count)): - event_json = frappe.cache.lindex("pulse-client:events", _) - data = decode_event(event_json) - if data: - info.queued_events.append(data) + info.queued_events = eq.get_events(limit) if fetch_rate_limited_events: - info.rate_limited_events = [] limit = int(fetch_rate_limited_events) if str(fetch_rate_limited_events).isdigit() else 20 - for key in frappe.cache.get_keys("pulse-client:last_sent:*")[:limit]: - last_sent = frappe.cache.get_value(key) - event_key = key.replace("pulse-client:last_sent:", "") - info.rate_limited_events.append( - { - "event_key": event_key, - "last_sent": last_sent, - } - ) + info.rate_limited_events = eq.get_last_sent_events(limit) return info