From 4eafb38f98656d148bf9f99164fb25fb55d35906 Mon Sep 17 00:00:00 2001 From: Saqib Ansari Date: Sat, 18 Apr 2026 14:58:47 +0530 Subject: [PATCH] test: rewrite `concurrent_limit` tests to test through public interface --- frappe/tests/test_concurrency_limiter.py | 150 ++++++++++++++--------- 1 file changed, 92 insertions(+), 58 deletions(-) diff --git a/frappe/tests/test_concurrency_limiter.py b/frappe/tests/test_concurrency_limiter.py index bdf879891a..2c17d86d93 100644 --- a/frappe/tests/test_concurrency_limiter.py +++ b/frappe/tests/test_concurrency_limiter.py @@ -1,20 +1,30 @@ # Copyright (c) 2024, Frappe Technologies Pvt. Ltd. and Contributors # License: MIT. See LICENSE +import contextvars +import threading +from unittest.mock import MagicMock, patch + import frappe -from frappe.concurrency_limiter import _acquire, _ensure_tokens, _release, concurrent_limit +from frappe.concurrency_limiter import concurrent_limit from frappe.exceptions import ServiceUnavailableError from frappe.tests import IntegrationTestCase -def _cache_name(fn): +def _key(fn): + """Reconstruct the Redis key that concurrent_limit uses for a decorated function.""" return f"concurrency:{fn.__module__}.{fn.__qualname__}" +def _cleanup(fn): + key = _key(fn) + frappe.cache.delete_value([key, f"{key}:capacity"]) + + class TestConcurrentLimit(IntegrationTestCase): def test_bypassed_outside_request_context(self): - """Decorator is a complete no-op when called outside an HTTP request context - (background jobs, CLI, direct test calls). Even limit=0 must not reject.""" + """Decorator is a no-op outside HTTP request context (background jobs, CLI, tests). + Even limit=0 must not reject.""" calls = [] @concurrent_limit(limit=0) @@ -32,98 +42,122 @@ class TestConcurrentLimit(IntegrationTestCase): frappe.local.request = saved self.assertEqual(calls, [True]) - # Token pool must not have been touched - self.assertFalse(frappe.cache.exists(_cache_name(fn))) - def test_raises_immediately_when_limit_full(self): - """ServiceUnavailableError is raised at once when wait_timeout=0 and the - token pool is empty.""" + def test_pool_exhaustion_raises_503_with_retry_after_header(self): + """When all slots are occupied, the next request raises ServiceUnavailableError + (HTTP 503) immediately with wait_timeout=0. The Retry-After response header must be set.""" + in_fn = threading.Event() + proceed = threading.Event() @concurrent_limit(limit=1, wait_timeout=0) def fn(): - pass + in_fn.set() + proceed.wait() - key = _cache_name(fn) - _ensure_tokens(key, limit=1) - token = frappe.cache.lpop(key) # exhaust the pool + ctx = contextvars.copy_context() + def hold_slot(): + frappe.local.request = frappe._dict() + fn() + + t = threading.Thread(target=ctx.run, args=(hold_slot,)) + t.start() + self.assertTrue(in_fn.wait(timeout=5), "Thread did not acquire the slot in time") + + mock_headers = MagicMock() + saved_headers = getattr(frappe.local, "response_headers", None) try: frappe.local.request = frappe._dict() - self.assertRaises(ServiceUnavailableError, fn) + frappe.local.response_headers = mock_headers + with self.assertRaises(ServiceUnavailableError) as exc_ctx: + fn() + self.assertEqual(exc_ctx.exception.http_status_code, 503) + mock_headers.set.assert_called_once_with("Retry-After", "1") # max(1, wait_timeout=0) finally: + proceed.set() + t.join(timeout=5) del frappe.local.request - if token: - frappe.cache.lpush(key, token) - frappe.cache.delete_value([key, f"{key}:capacity"]) + frappe.local.response_headers = saved_headers + _cleanup(fn) - def test_counter_released_after_successful_call(self): - """Token pool has all tokens back after the wrapped function completes normally.""" + def test_token_released_on_success(self): + """A token is returned to the pool after a successful call, + so subsequent calls can acquire it without hitting a 503.""" @concurrent_limit(limit=1, wait_timeout=0) def fn(): pass - key = _cache_name(fn) try: frappe.local.request = frappe._dict() fn() - self.assertEqual(frappe.cache.llen(key), 1) + fn() # should not raise ServiceUnavailableError since the token was released after the first call finally: del frappe.local.request - frappe.cache.delete_value([key, f"{key}:capacity"]) + _cleanup(fn) - def test_counter_released_after_exception(self): - """Token pool has all tokens back even when the wrapped function raises.""" + def test_token_released_on_exception(self): + """A token is returned to the pool even when the wrapped function raises, + so subsequent calls can proceed with their own application error, not a 503.""" - @concurrent_limit(limit=2, wait_timeout=0) + @concurrent_limit(limit=1, wait_timeout=0) def fn(): raise ValueError("boom") - key = _cache_name(fn) try: frappe.local.request = frappe._dict() - self.assertRaises(ValueError, fn) - self.assertEqual(frappe.cache.llen(key), 2) + with self.assertRaises(ValueError): + fn() + # Second call must raise ValueError (application error), not + # ServiceUnavailableError — which would indicate the token was leaked. + with self.assertRaises(ValueError): + fn() finally: del frappe.local.request - frappe.cache.delete_value([key, f"{key}:capacity"]) + _cleanup(fn) - def test_service_unavailable_has_correct_http_status(self): - """The raised exception must carry http_status_code=503.""" - TIMEOUT = 1 + def test_self_heals_after_capacity_key_expiry(self): + """After the capacity key expires (simulating crashed workers + TTL), + the pool re-initializes to full capacity so new requests succeed.""" - @concurrent_limit(limit=1, wait_timeout=TIMEOUT) + @concurrent_limit(limit=1, wait_timeout=0) def fn(): pass - key = _cache_name(fn) - _ensure_tokens(key, limit=1) - token = frappe.cache.lpop(key) # exhaust the pool + key = _key(fn) + try: + frappe.local.request = frappe._dict() + fn() # initializes the pool via the decorator + + # Simulate all tokens being leaked (workers crashed mid-request) + # by draining the pool without returning tokens. + while frappe.cache.lpop(key): + pass + + # Simulate capacity key TTL expiry. + frappe.cache.delete_value(f"{key}:capacity") + + # Self-heal: next request must re-initialize the pool and succeed. + fn() # must not raise ServiceUnavailableError + finally: + del frappe.local.request + _cleanup(fn) + + def test_fails_open_when_redis_unavailable(self): + """When Redis is unavailable during acquire, the request proceeds normally + (fail-open) rather than raising ServiceUnavailableError.""" + calls = [] + + @concurrent_limit(limit=1, wait_timeout=0) + def fn(): + calls.append(True) try: frappe.local.request = frappe._dict() - with self.assertRaises(ServiceUnavailableError) as ctx: - fn() - self.assertEqual(ctx.exception.http_status_code, 503) + with patch.object(frappe.cache, "lpop", side_effect=Exception("Redis down")): + fn() # must not raise finally: del frappe.local.request - if token: - frappe.cache.lpush(key, token) - frappe.cache.delete_value([key, f"{key}:capacity"]) + _cleanup(fn) - def test_double_release_doesnt_exceed_limit(self): - """Releasing a token twice must not inflate the pool beyond the limit.""" - key = "concurrency:test.double_release" - LIMIT = 2 - - _ensure_tokens(key, limit=LIMIT) - token = _acquire(key, limit=LIMIT, wait_timeout=0) - self.assertIsNotNone(token) - - _release(key, token) - _release(key, token) # spurious extra release - - pool_size = frappe.cache.llen(key) - frappe.cache.delete_value([key, f"{key}:capacity"]) - - self.assertLessEqual(pool_size, LIMIT + 1) + self.assertEqual(calls, [True])