refactor: create event queue class

This commit is contained in:
Saqib Ansari 2026-01-04 12:39:08 +05:30
parent 3ae959c651
commit f3cc4301f8

View file

@ -22,26 +22,23 @@ def is_enabled() -> bool:
@frappe.whitelist()
def capture(event_name, site=None, app=None, user=None, properties=None, interval=None):
def capture(event_name, site=None, app=None, user=None, captured_at=None, properties=None, interval=None):
if not is_enabled():
return
try:
event_key = f"{event_name}:{site or None}:{app or None}:{user or None}"
if _is_ratelimited(event_key, interval):
return
_queue_event(
eq = EventQueue()
eq.add(
{
"event_name": event_name,
"captured_at": utc_iso(),
"captured_at": captured_at or utc_iso(),
"app": app,
"user": anonymize_user(user),
"site": site or frappe.local.site,
"properties": properties,
}
},
interval=interval,
)
_update_ratelimit(event_key, interval)
except Exception as e:
frappe.logger("pulse").error(f"pulse-client - capture failed: {e!s}")
@ -52,84 +49,23 @@ def bulk_capture(events):
return
for event in events:
try:
# not supporting rate-limiting for bulk events
# so queue all events as-is
_queue_event(
{
"event_name": event.get("event_name"),
"captured_at": event.get("captured_at") or utc_iso(),
"site": frappe.local.site,
"app": event.get("app"),
"user": anonymize_user(event.get("user")),
"properties": event.get("properties"),
}
)
except Exception as e:
frappe.logger("pulse").error(f"pulse-client - bulk capture failed for event {event}: {e!s}")
def _is_ratelimited(event_key, interval):
if not interval:
return False
interval_seconds = parse_interval(interval)
last_sent_key = f"pulse-client:last_sent:{event_key}"
last_sent = frappe.cache.get_value(last_sent_key)
if last_sent and time.monotonic() - float(last_sent) < interval_seconds:
return True
return False
def _update_ratelimit(event_key, interval):
if not interval:
return
last_sent_key = f"pulse-client:last_sent:{event_key}"
frappe.cache.set_value(last_sent_key, time.monotonic())
def _queue_event(event):
frappe.cache.lpush("pulse-client:events", frappe.as_json(event))
frappe.cache.ltrim("pulse-client:events", 0, 9999)
def queue_length():
return frappe.cache.llen("pulse-client:events")
capture(
event.get("event_name"),
site=event.get("site"),
app=event.get("app"),
user=event.get("user"),
captured_at=event.get("captured_at"),
properties=event.get("properties"),
interval=event.get("interval"),
)
def send_queued_events():
batch_size = 100
max_batches = 10
for _ in range(max_batches):
events = collect_events(batch_size)
if not events:
break
try:
if not post(events):
frappe.logger().error("Pulse sending events failed: non-2xx response")
except Exception as e:
frappe.logger().error(f"Pulse sending events failed: {e!s}")
if not is_enabled():
return
def collect_events(batch_size=100):
"""Pop batch of events from the queue"""
events = []
for _ in range(batch_size):
event_json = frappe.cache.rpop("pulse-client:events")
if not event_json:
break
data = decode_event(event_json)
if data:
events.append(data)
return events
def decode_event(event_json):
event_json = event_json.decode()
with suppress(JSONDecodeError):
return frappe.parse_json(event_json)
eq = EventQueue()
eq.batch_process(post, batch_size=100, max_batches=10)
def post(events):
@ -138,7 +74,9 @@ def post(events):
url = _get_ingest_url()
data = frappe.as_json({"events": events})
resp = session.post(url, data=data, timeout=15)
return 200 <= resp.status_code < 300
if not (200 <= resp.status_code < 300):
frappe.logger("pulse").error(f"pulse-client - post failed: {resp.status_code} {resp.text}")
return resp
def _create_session():
@ -164,6 +102,102 @@ def _get_ingest_url():
return f"{host}/{endpoint}"
class EventQueue:
def __init__(self):
self.queue = "pulse-client:events"
self.queue_size = 10000
self.ratelimit_prefix = "pulse-client:last_sent:"
@property
def length(self):
return frappe.cache.llen(self.queue)
def add(self, event, interval=None):
if self._is_ratelimited(event, interval):
return
self._queue_event(event)
self._update_ratelimit(event, interval)
def _is_ratelimited(self, event, interval):
if not interval:
return False
interval_seconds = parse_interval(interval)
event_key = self._get_event_key(event)
last_sent_key = f"{self.ratelimit_prefix}{event_key}"
last_sent = frappe.cache.get_value(last_sent_key)
if last_sent and time.monotonic() - float(last_sent) < interval_seconds:
return True
return False
def _get_event_key(self, event):
return f"{event.get('event_name')}:{event.get('site')}:{event.get('app')}:{event.get('user')}"
def _update_ratelimit(self, event, interval):
if not interval:
return
event_key = self._get_event_key(event)
last_sent_key = f"{self.ratelimit_prefix}{event_key}"
frappe.cache.set_value(last_sent_key, time.monotonic())
def _queue_event(self, event):
frappe.cache.lpush(self.queue, frappe.as_json(event))
frappe.cache.ltrim(self.queue, 0, self.queue_size - 1)
def batch_process(self, fn, batch_size=100, max_batches=10):
for _ in range(max_batches):
events = self.collect(batch_size)
if not events:
break
try:
fn(events)
except Exception as e:
frappe.logger("pulse").error(f"pulse-client - batch_process failed: {e!s}")
def collect(self, batch_size=100):
events = []
for _ in range(batch_size):
event_json = frappe.cache.rpop(self.queue)
if not event_json:
break
data = self._decode_event(event_json)
if data:
events.append(data)
return events
def _decode_event(self, event_json):
event_json = event_json.decode()
with suppress(JSONDecodeError):
return frappe.parse_json(event_json)
def get_events(self, limit=20):
events = []
for _ in range(limit):
event_json = frappe.cache.lindex(self.queue, _)
if not event_json:
break
data = self._decode_event(event_json)
if data:
events.append(data)
return events
def get_last_sent_events(self, limit=20):
events = []
keys = frappe.cache.get_keys(f"{self.ratelimit_prefix}*")[:limit]
for key in keys:
last_sent = frappe.cache.get_value(key)
event_key = key.replace(self.ratelimit_prefix, "")
events.append(
{
"event_key": event_key,
"last_sent": last_sent,
}
)
@frappe.whitelist()
def get_debug_info(fetch_events=None, fetch_rate_limited_events=None):
frappe.only_for("System Manager")
@ -172,28 +206,15 @@ def get_debug_info(fetch_events=None, fetch_rate_limited_events=None):
info.is_enabled = is_enabled()
if info.is_enabled:
info.queued_event_count = queue_length()
eq = EventQueue()
info.queued_event_count = eq.length
if fetch_events:
info.queued_events = []
limit = int(fetch_events) if str(fetch_events).isdigit() else 20
for _ in range(min(limit, info.queued_event_count)):
event_json = frappe.cache.lindex("pulse-client:events", _)
data = decode_event(event_json)
if data:
info.queued_events.append(data)
info.queued_events = eq.get_events(limit)
if fetch_rate_limited_events:
info.rate_limited_events = []
limit = int(fetch_rate_limited_events) if str(fetch_rate_limited_events).isdigit() else 20
for key in frappe.cache.get_keys("pulse-client:last_sent:*")[:limit]:
last_sent = frappe.cache.get_value(key)
event_key = key.replace("pulse-client:last_sent:", "")
info.rate_limited_events.append(
{
"event_key": event_key,
"last_sent": last_sent,
}
)
info.rate_limited_events = eq.get_last_sent_events(limit)
return info