refactor: pulse client

This commit is contained in:
Saqib Ansari 2025-08-30 17:19:39 +05:30
parent 7f0765fc9e
commit 6ebe7fc26e
5 changed files with 234 additions and 93 deletions

View file

@ -9,7 +9,7 @@ from werkzeug.wrappers import Request, Response
import frappe
from frappe import _
from frappe.pulse.app_heartbeat_event import log_app_heartbeat
from frappe.pulse.app_heartbeat_event import capture_app_heartbeat
from frappe.utils.response import build_response
@ -67,7 +67,7 @@ def handle(request: Request):
data = build_response("json")
with suppress(Exception):
log_app_heartbeat(arguments)
capture_app_heartbeat(arguments)
return data

View file

@ -209,15 +209,12 @@ scheduler_events = {
"frappe.automation.doctype.reminder.reminder.send_reminders",
"frappe.model.utils.link_count.update_link_count",
"frappe.search.sqlite_search.build_index_if_not_exists",
"frappe.pulse.client.send_queued_events",
],
# 10 minutes
"0/10 * * * *": [
"frappe.email.doctype.email_account.email_account.pull",
],
# 6 hours
"0 */6 * * *": [
"frappe.pulse.app_heartbeat_event.send",
],
# Hourly but offset by 30 minutes
"30 * * * *": [],
# Daily but offset by 45 minutes

View file

@ -1,27 +1,54 @@
import frappe
from frappe.modules import get_doctype_module
from frappe.pulse.utils import get_app_version, get_frappe_version
from frappe.utils.caching import site_cache
from .client import is_enabled, post_events
from .client import capture, is_enabled
KEY = "pulse:active_apps"
EXPIRY = 60 * 60 * 12 # 12 hours
def log_app_heartbeat(req_params):
if not is_enabled() or frappe.session.user in ("Guest", "Administrator"):
def capture_app_heartbeat(req_params):
if not should_capture():
return
status_code = frappe.response.http_status_code or 0
if status_code and not (200 <= status_code < 300):
return
method = req_params.get("method") or frappe.form_dict.get("method")
doctype = req_params.get("doctype") or frappe.form_dict.get("doctype")
method, doctype = get_method_and_doctype(req_params)
if not method and not doctype:
return
app_name = get_app_name(method, doctype)
if app_name and app_name != "frappe":
capture(
event_name="app_heartbeat",
site=frappe.local.site,
app=app_name,
properties={
"app_version": get_app_version(app_name),
"frappe_version": get_frappe_version(),
},
interval="6h",
)
def should_capture():
if not is_enabled() or frappe.session.user in frappe.STANDARD_USERS:
return False
status_code = frappe.response.http_status_code or 0
if status_code and not (200 <= status_code < 300):
return False
return True
def get_method_and_doctype(req_params):
method = req_params.get("method") or frappe.form_dict.get("method")
doctype = req_params.get("doctype") or frappe.form_dict.get("doctype")
return method, doctype
def get_app_name(method, doctype):
app_name = None
if method and "." in method and not method.startswith("frappe."):
app_name = method.split(".", 1)[0]
@ -30,50 +57,7 @@ def log_app_heartbeat(req_params):
module = get_doctype_module(doctype)
app_name = app_module_map().get(module)
if app_name and app_name != "frappe":
_mark_active(app_name)
def send():
if not is_enabled():
return
active_apps = frappe.cache.get_value(KEY) or set()
if not active_apps:
return
events = []
for app in active_apps:
events.append(
{
"name": "app_heartbeat",
"app": app,
"app_version": _get_app_version(app),
}
)
try:
if post_events(events):
frappe.cache.delete_value(KEY)
except Exception:
frappe.log_error(title="Failed to send app heartbeat events")
def _mark_active(app):
active_apps = frappe.cache.get_value(KEY) or set()
if app not in active_apps:
active_apps.add(app)
frappe.cache.set_value(KEY, active_apps)
ttl = frappe.cache.ttl(KEY)
if ttl in (-1, None):
frappe.cache.expire(KEY, EXPIRY)
def _get_app_version(app_name: str) -> str:
try:
return frappe.get_attr(app_name + ".__version__")
except Exception:
return "0.0.1"
return app_name
@site_cache()

View file

@ -1,6 +1,10 @@
from datetime import datetime, timezone
import time
from contextlib import suppress
from orjson import JSONDecodeError
import frappe
from frappe.pulse.utils import anonymize_user, ensure_http, parse_interval, utc_iso
from frappe.utils import get_request_session
from frappe.utils.caching import site_cache
from frappe.utils.frappecloud import on_frappecloud
@ -17,52 +21,111 @@ def is_enabled() -> bool:
)
def post_events(events):
events = _sanitize_events(events)
def capture(event_name, site=None, app=None, user=None, properties=None, interval=None):
if not is_enabled():
return
try:
event_key = f"{event_name}:{site}:{app}:{user}"
if _is_ratelimited(event_key, interval):
return
_queue_event(
{
"event_name": event_name,
"captured_at": utc_iso(),
"app": app,
"user": anonymize_user(user),
"site": site or frappe.local.site,
"properties": properties,
}
)
_update_ratelimit(event_key, interval)
except Exception as e:
frappe.logger().error(f"Pulse event capture failed: {e!s}")
def _is_ratelimited(event_key, interval):
if not interval:
return False
interval_seconds = parse_interval(interval)
last_sent_key = f"pulse:last_sent:{event_key}"
last_sent = frappe.cache.get_value(last_sent_key)
if last_sent and time.monotonic() - float(last_sent) < interval_seconds:
return True
return False
def _update_ratelimit(event_key, interval):
if not interval:
return
last_sent_key = f"pulse:last_sent:{event_key}"
frappe.cache.set_value(last_sent_key, time.monotonic(), expires_in_sec=86400) # 24h TTL
def _queue_event(event):
frappe.cache.lpush("pulse:events", frappe.as_json(event))
frappe.cache.ltrim("pulse:events", 0, 4999)
def send_queued_events():
batch_size = 100
max_batches = 10
for _ in range(max_batches):
events = get_next_batch(batch_size)
if not events:
break
try:
if not post(events):
frappe.logger().error("Pulse sending events failed: non-2xx response")
except Exception as e:
frappe.logger().error(f"Pulse sending events failed: {e!s}")
def get_next_batch(batch_size=100):
"""Get batch of events from the queue"""
events = []
for _ in range(batch_size):
event_json = frappe.cache.rpop("pulse:events")
if not event_json:
break
event_json = event_json.decode()
with suppress(JSONDecodeError):
data = frappe.parse_json(event_json)
events.append(data)
return events
def post(events):
# TODO: implement retry logic
session = _create_session()
resp = session.post(_get_ingest_url(), data=events, timeout=5.0)
url = _get_ingest_url()
data = frappe.as_json({"events": events})
resp = session.post(url, data=data, timeout=15)
return 200 <= resp.status_code < 300
def _create_session():
api_key = frappe.conf.get("pulse_api_key")
session = get_request_session()
if api_key:
session.headers.update({"Authorization": f"Bearer {api_key}"})
session.headers.update(
{
"Content-Type": "application/json",
"X-Pulse-API-Key": api_key,
}
)
return session
def _get_ingest_url():
host = frappe.conf.get("pulse_host") or "https://pulse.m.frappe.cloud"
if not host.startswith("http"):
host = "https://" + host
host = ensure_http(host)
host = host.rstrip("/")
endpoint = frappe.conf.get("pulse_ingest_endpoint") or "/api/method/pulse.api.ingest"
endpoint = frappe.conf.get("pulse_ingest_endpoint") or "/api/method/pulse.api.bulk_ingest"
endpoint = endpoint.lstrip("/")
return f"{host}/{endpoint}"
def _sanitize_events(events):
_events = []
if not isinstance(events, list):
_events = [events]
for event in events:
if not isinstance(event, dict) or "name" not in event:
continue
event["site"] = event["site"] or frappe.local.site
event["timestamp"] = event["timestamp"] or _utc_iso()
event["frappe_version"] = event["frappe_version"] or _get_frappe_version()
_events.append(event)
return _events
def _get_frappe_version() -> str:
return getattr(frappe, "__version__", "unknown")
def _utc_iso() -> str:
return datetime.now(timezone.utc).isoformat(timespec="seconds")

97
frappe/pulse/utils.py Normal file
View file

@ -0,0 +1,97 @@
import hashlib
from datetime import datetime, timezone
import frappe
def anonymize_user(user):
"""
Create consistent anonymous ID from user email.
Same email always produces same anonymous ID.
"""
if not user or user in frappe.STANDARD_USERS:
return user
# Use site-specific salt for additional security
site_salt = frappe.local.site or "default"
# Create deterministic hash
hash_input = f"{user}:{site_salt}".encode()
user_hash = hashlib.sha256(hash_input).hexdigest()
# Return first 12 characters for readability
return f"anon_{user_hash[:12]}"
def parse_interval(interval):
"""
Parse interval string or integer into seconds.
Args:
interval: Can be:
- Integer: seconds (e.g., 3600)
- String: number + unit (e.g., "1h", "30m", "7d")
Returns:
int: Total seconds
Examples:
parse_interval(3600) -> 3600
parse_interval("1h") -> 3600
parse_interval("30m") -> 1800
parse_interval("7d") -> 604800
"""
if interval is None:
return None
# If already an integer, return as-is (assuming seconds)
if isinstance(interval, int):
return interval
# Parse string format
interval = str(interval).strip().lower()
# Extract number and unit
if interval[-1].isdigit():
# No unit specified, assume seconds
return int(interval)
unit = interval[-1]
try:
number = int(interval[:-1])
except ValueError:
raise ValueError(f"Invalid interval format: {interval}")
# Convert to seconds
multipliers = {
"s": 1, # seconds
"m": 60, # minutes
"h": 3600, # hours
"d": 86400, # days
"w": 604800, # weeks
"y": 31536000, # years
}
if unit not in multipliers:
raise ValueError(f"Invalid time unit '{unit}'. Use: s, m, h, d, w, y")
return number * multipliers[unit]
def get_frappe_version() -> str:
return getattr(frappe, "__version__", "unknown")
def utc_iso() -> str:
return datetime.now(timezone.utc).isoformat()
def get_app_version(app_name: str) -> str:
try:
return frappe.get_attr(app_name + ".__version__")
except Exception:
return "0.0.1"
def ensure_http(url: str) -> str:
return url if url.startswith(("http://", "https://")) else "https://" + url