397 lines
9.9 KiB
Python
397 lines
9.9 KiB
Python
import time
|
|
from unittest.mock import patch
|
|
|
|
import frappe
|
|
from frappe.tests import IntegrationTestCase
|
|
from frappe.utils.telemetry.pulse.client import EventQueue, capture, is_enabled
|
|
from frappe.utils.telemetry.pulse.utils import anonymize_user, parse_interval
|
|
|
|
|
|
class TestPulseClient(IntegrationTestCase):
|
|
def setUp(self):
|
|
super().setUp()
|
|
# Clear any existing events from queue
|
|
eq = EventQueue()
|
|
while eq.length > 0:
|
|
eq.collect(batch_size=1000)
|
|
frappe.cache.delete_keys("pulse-client:")
|
|
|
|
def tearDown(self):
|
|
# Clean up after tests
|
|
eq = EventQueue()
|
|
while eq.length > 0:
|
|
eq.collect(batch_size=1000)
|
|
frappe.cache.delete_keys("pulse-client:")
|
|
super().tearDown()
|
|
|
|
|
|
class TestEventQueue(TestPulseClient):
|
|
def test_queue_operations(self):
|
|
"""Test queue add, collect, and FIFO behavior"""
|
|
eq = EventQueue()
|
|
|
|
# Add events
|
|
for i in range(10):
|
|
event = {
|
|
"event_name": f"test_event_{i}",
|
|
"captured_at": "2026-01-01T00:00:00",
|
|
"app": "frappe",
|
|
"user": "test@example.com",
|
|
"site": "test.localhost",
|
|
"properties": {},
|
|
}
|
|
eq.add(event)
|
|
|
|
self.assertEqual(eq.length, 10)
|
|
|
|
# Collect events (FIFO order)
|
|
events = eq.collect(batch_size=5)
|
|
self.assertEqual(len(events), 5)
|
|
self.assertEqual(eq.length, 5)
|
|
self.assertEqual(events[0]["event_name"], "test_event_0")
|
|
|
|
def test_queue_size_limit(self):
|
|
"""Test that queue respects size limit"""
|
|
eq = EventQueue()
|
|
queue_size = eq.queue_size
|
|
|
|
# Add more events than the queue size
|
|
for i in range(queue_size + 100):
|
|
event = {
|
|
"event_name": f"test_event_{i}",
|
|
"captured_at": "2026-01-01T00:00:00",
|
|
"app": "frappe",
|
|
"user": "test@example.com",
|
|
"site": "test.localhost",
|
|
"properties": {},
|
|
}
|
|
eq.add(event)
|
|
|
|
# Queue should not exceed max size
|
|
self.assertEqual(eq.length, queue_size)
|
|
|
|
def test_requeue_events(self):
|
|
"""Test requeueing events preserves order"""
|
|
eq = EventQueue()
|
|
|
|
# Add events
|
|
event_names = ["event_1", "event_2", "event_3"]
|
|
for name in event_names:
|
|
event = {
|
|
"event_name": name,
|
|
"captured_at": "2026-01-01T00:00:00",
|
|
"app": "frappe",
|
|
"user": "test@example.com",
|
|
"site": "test.localhost",
|
|
"properties": {},
|
|
}
|
|
eq.add(event)
|
|
|
|
# Collect and requeue
|
|
events = eq.collect(batch_size=3)
|
|
eq._requeue_events(events)
|
|
|
|
# Check order is preserved
|
|
requeued = eq.collect(batch_size=3)
|
|
for i, event in enumerate(requeued):
|
|
self.assertEqual(event["event_name"], event_names[i])
|
|
|
|
|
|
class TestRateLimiting(TestPulseClient):
|
|
def test_ratelimit_basic(self):
|
|
"""Test basic rate limiting functionality"""
|
|
eq = EventQueue()
|
|
|
|
event = {
|
|
"event_name": "test_event",
|
|
"captured_at": "2026-01-01T00:00:00",
|
|
"app": "frappe",
|
|
"user": "test@example.com",
|
|
"site": "test.localhost",
|
|
"properties": {},
|
|
}
|
|
|
|
# First event should be added
|
|
eq.add(event, interval="5s")
|
|
self.assertEqual(eq.length, 1)
|
|
|
|
# Second event should be rate-limited
|
|
eq.add(event, interval="5s")
|
|
self.assertEqual(eq.length, 1)
|
|
|
|
def test_ratelimit_different_events(self):
|
|
"""Test that rate limiting is per-event"""
|
|
eq = EventQueue()
|
|
|
|
event1 = {
|
|
"event_name": "event_1",
|
|
"captured_at": "2026-01-01T00:00:00",
|
|
"app": "frappe",
|
|
"user": "test@example.com",
|
|
"site": "test.localhost",
|
|
"properties": {},
|
|
}
|
|
|
|
event2 = {
|
|
"event_name": "event_2",
|
|
"captured_at": "2026-01-01T00:00:00",
|
|
"app": "frappe",
|
|
"user": "test@example.com",
|
|
"site": "test.localhost",
|
|
"properties": {},
|
|
}
|
|
|
|
# Both events should be added as they are different
|
|
eq.add(event1, interval="5s")
|
|
eq.add(event2, interval="5s")
|
|
self.assertEqual(eq.length, 2)
|
|
|
|
def test_ratelimit_expiry(self):
|
|
"""Test that rate limit expires after interval"""
|
|
eq = EventQueue()
|
|
|
|
event = {
|
|
"event_name": "test_event",
|
|
"captured_at": "2026-01-01T00:00:00",
|
|
"app": "frappe",
|
|
"user": "test@example.com",
|
|
"site": "test.localhost",
|
|
"properties": {},
|
|
}
|
|
|
|
# Add event with short interval
|
|
eq.add(event, interval="1s")
|
|
self.assertEqual(eq.length, 1)
|
|
|
|
# Wait for interval to expire
|
|
time.sleep(1.1)
|
|
|
|
# Event should be added again
|
|
eq.add(event, interval="1s")
|
|
self.assertEqual(eq.length, 2)
|
|
|
|
|
|
class TestBatchProcessing(TestPulseClient):
|
|
def test_batch_process_success(self):
|
|
"""Test successful batch processing"""
|
|
eq = EventQueue()
|
|
processed = []
|
|
|
|
def process_fn(events):
|
|
processed.extend(events)
|
|
|
|
# Add events
|
|
for i in range(15):
|
|
event = {
|
|
"event_name": f"test_event_{i}",
|
|
"captured_at": "2026-01-01T00:00:00",
|
|
"app": "frappe",
|
|
"user": "test@example.com",
|
|
"site": "test.localhost",
|
|
"properties": {},
|
|
}
|
|
eq.add(event)
|
|
|
|
# Process in batches
|
|
eq.batch_process(process_fn, batch_size=10, max_batches=2)
|
|
|
|
# All events should be processed
|
|
self.assertEqual(len(processed), 15)
|
|
self.assertEqual(eq.length, 0)
|
|
|
|
def test_batch_process_with_failure_and_retry(self):
|
|
"""Test batch processing with failure and retry"""
|
|
eq = EventQueue()
|
|
call_count = 0
|
|
|
|
def failing_fn(events):
|
|
nonlocal call_count
|
|
call_count += 1
|
|
if call_count < 3:
|
|
raise Exception("Temporary failure")
|
|
return True
|
|
|
|
# Add events
|
|
for i in range(5):
|
|
event = {
|
|
"event_name": f"test_event_{i}",
|
|
"captured_at": "2026-01-01T00:00:00",
|
|
"app": "frappe",
|
|
"user": "test@example.com",
|
|
"site": "test.localhost",
|
|
"properties": {},
|
|
}
|
|
eq.add(event)
|
|
|
|
# Process with retries
|
|
eq.batch_process(failing_fn, batch_size=10, max_retries=5, backoff_seconds=0.1)
|
|
|
|
# Should succeed after retries
|
|
self.assertGreaterEqual(call_count, 3)
|
|
self.assertEqual(eq.length, 0)
|
|
|
|
def test_batch_process_max_retries_exceeded(self):
|
|
"""Test batch processing when max retries is exceeded"""
|
|
eq = EventQueue()
|
|
|
|
def always_failing_fn(events):
|
|
raise Exception("Always fails")
|
|
|
|
# Add events
|
|
for i in range(5):
|
|
event = {
|
|
"event_name": f"test_event_{i}",
|
|
"captured_at": "2026-01-01T00:00:00",
|
|
"app": "frappe",
|
|
"user": "test@example.com",
|
|
"site": "test.localhost",
|
|
"properties": {},
|
|
}
|
|
eq.add(event)
|
|
|
|
# Process with limited retries
|
|
eq.batch_process(always_failing_fn, batch_size=10, max_retries=2, backoff_seconds=0.1)
|
|
|
|
# Events should be requeued
|
|
self.assertEqual(eq.length, 5)
|
|
|
|
|
|
class TestCapture(TestPulseClient):
|
|
@patch("frappe.utils.telemetry.pulse.client.is_enabled")
|
|
def test_capture_when_disabled(self, mock_enabled):
|
|
"""Test that capture does nothing when disabled"""
|
|
is_enabled.clear_cache()
|
|
mock_enabled.return_value = False
|
|
eq = EventQueue()
|
|
|
|
capture("test_event", site="test.localhost")
|
|
|
|
self.assertEqual(eq.length, 0)
|
|
|
|
@patch("frappe.utils.telemetry.pulse.client.is_enabled")
|
|
def test_capture_basic(self, mock_enabled):
|
|
"""Test basic event capture"""
|
|
is_enabled.clear_cache()
|
|
mock_enabled.return_value = True
|
|
eq = EventQueue()
|
|
|
|
capture(
|
|
"test_event",
|
|
site="test.localhost",
|
|
app="frappe",
|
|
user="test@example.com",
|
|
properties={"key": "value"},
|
|
)
|
|
|
|
self.assertEqual(eq.length, 1)
|
|
events = eq.collect(batch_size=1)
|
|
self.assertEqual(events[0]["event_name"], "test_event")
|
|
self.assertEqual(events[0]["properties"]["key"], "value")
|
|
|
|
@patch("frappe.utils.telemetry.pulse.client.is_enabled")
|
|
def test_capture_anonymizes_user(self, mock_enabled):
|
|
"""Test that user is anonymized"""
|
|
is_enabled.clear_cache()
|
|
mock_enabled.return_value = True
|
|
eq = EventQueue()
|
|
|
|
test_user = "test@example.com"
|
|
capture("test_event", site="test.localhost", user=test_user)
|
|
|
|
events = eq.collect(batch_size=1)
|
|
# User should be anonymized
|
|
self.assertNotEqual(events[0]["user"], test_user)
|
|
self.assertTrue(events[0]["user"].startswith("anon_"))
|
|
|
|
|
|
class TestUtils(TestPulseClient):
|
|
def test_parse_interval(self):
|
|
"""Test parsing various interval formats"""
|
|
# Seconds
|
|
self.assertEqual(parse_interval(60), 60)
|
|
self.assertEqual(parse_interval("60"), 60)
|
|
|
|
# Minutes, hours, days, weeks
|
|
self.assertEqual(parse_interval("1m"), 60)
|
|
self.assertEqual(parse_interval("1h"), 3600)
|
|
self.assertEqual(parse_interval("1d"), 86400)
|
|
self.assertEqual(parse_interval("1w"), 604800)
|
|
|
|
# Invalid formats
|
|
with self.assertRaises(ValueError):
|
|
parse_interval("1x")
|
|
|
|
def test_anonymize_user(self):
|
|
"""Test user anonymization"""
|
|
user = "test@example.com"
|
|
anon_user = anonymize_user(user)
|
|
|
|
# Should be anonymized and consistent
|
|
self.assertNotEqual(anon_user, user)
|
|
self.assertTrue(anon_user.startswith("anon_"))
|
|
self.assertEqual(anonymize_user(user), anon_user)
|
|
|
|
# Standard users not anonymized
|
|
for standard_user in frappe.STANDARD_USERS:
|
|
self.assertEqual(anonymize_user(standard_user), standard_user)
|
|
|
|
|
|
class TestEventQueueDecoding(TestPulseClient):
|
|
def test_decode_valid_event(self):
|
|
"""Test decoding valid event JSON"""
|
|
eq = EventQueue()
|
|
|
|
event = {
|
|
"event_name": "test_event",
|
|
"captured_at": "2026-01-01T00:00:00",
|
|
"app": "frappe",
|
|
"user": "test@example.com",
|
|
"site": "test.localhost",
|
|
"properties": {},
|
|
}
|
|
|
|
# Add and retrieve
|
|
eq.add(event)
|
|
event_json = frappe.cache.rpop(eq.queue)
|
|
|
|
decoded = eq._decode_event(event_json)
|
|
self.assertIsNotNone(decoded)
|
|
self.assertEqual(decoded["event_name"], "test_event")
|
|
|
|
def test_decode_invalid_json(self):
|
|
"""Test decoding invalid JSON"""
|
|
eq = EventQueue()
|
|
|
|
# Invalid JSON should return None
|
|
decoded = eq._decode_event(b"invalid json{")
|
|
self.assertIsNone(decoded)
|
|
|
|
|
|
class TestEventKey(TestPulseClient):
|
|
def test_event_key_generation_and_uniqueness(self):
|
|
"""Test event key generation and uniqueness for rate limiting"""
|
|
eq = EventQueue()
|
|
|
|
event1 = {
|
|
"event_name": "event_1",
|
|
"app": "frappe",
|
|
"user": "user1@example.com",
|
|
"site": "test.localhost",
|
|
}
|
|
|
|
event2 = {
|
|
"event_name": "event_2",
|
|
"app": "frappe",
|
|
"user": "user1@example.com",
|
|
"site": "test.localhost",
|
|
}
|
|
|
|
# Test key composition
|
|
key1 = eq._get_event_key(event1)
|
|
self.assertIn("event_1", key1)
|
|
self.assertIn("test.localhost", key1)
|
|
self.assertIn("frappe", key1)
|
|
|
|
# Test uniqueness
|
|
key2 = eq._get_event_key(event2)
|
|
self.assertNotEqual(key1, key2)
|