From 2f30dac5d8434361da44266360b3f750c04f059f Mon Sep 17 00:00:00 2001 From: Saqib Ansari Date: Fri, 10 Apr 2026 22:22:23 +0530 Subject: [PATCH 01/14] feat: implement concurrency limiting decorator --- frappe/__init__.py | 1 + frappe/app.py | 6 + frappe/concurrency_limiter.py | 138 ++++++++++++++++ frappe/exceptions.py | 11 ++ frappe/tests/test_concurrency_limiter.py | 191 +++++++++++++++++++++++ 5 files changed, 347 insertions(+) create mode 100644 frappe/concurrency_limiter.py create mode 100644 frappe/tests/test_concurrency_limiter.py diff --git a/frappe/__init__.py b/frappe/__init__.py index 8996a7fbe3..8151db5edb 100644 --- a/frappe/__init__.py +++ b/frappe/__init__.py @@ -34,6 +34,7 @@ import orjson from werkzeug.datastructures import Headers import frappe +from frappe.concurrency_limiter import concurrent_limit from frappe.query_builder.utils import ( get_query, get_query_builder, diff --git a/frappe/app.py b/frappe/app.py index 6a07c9c223..d9b401cd5a 100644 --- a/frappe/app.py +++ b/frappe/app.py @@ -394,6 +394,12 @@ def handle_exception(e): elif http_status_code == 429: response = frappe.rate_limiter.respond() + elif http_status_code == 503: + retry_after = getattr(e, "retry_after", 10) + response = frappe.utils.response.report_error(503) + if response: + response.headers["Retry-After"] = str(retry_after) + else: response = ErrorPage( http_status_code=http_status_code, title=_("Server Error"), message=_("Uncaught Exception") diff --git a/frappe/concurrency_limiter.py b/frappe/concurrency_limiter.py new file mode 100644 index 0000000000..04b743b5e4 --- /dev/null +++ b/frappe/concurrency_limiter.py @@ -0,0 +1,138 @@ +# Copyright (c) 2024, Frappe Technologies Pvt. Ltd. and Contributors +# License: MIT. See LICENSE + +""" +Concurrency limiter for expensive whitelisted methods. + +Provides a @frappe.concurrent_limit() decorator that limits the number of +simultaneous in-flight executions of a function across all gunicorn workers +using a Redis-backed atomic counter (semaphore). + +Usage:: + + @frappe.whitelist(allow_guest=True) + @frappe.concurrent_limit(limit=3) + def download_pdf(...): + ... + +""" + +import time +from collections.abc import Callable +from functools import wraps + +import frappe + +# Safety TTL (seconds) for the Redis key — prevents leaked semaphore slots if a +# worker crashes mid-request. Should be larger than any realistic execution time. +_SLOT_TTL = 120 + +# Default wait timeout (seconds) before returning 503 to the caller. +_DEFAULT_WAIT_TIMEOUT = 10 + +# Polling interval (seconds) while waiting for a slot to open. +_POLL_INTERVAL = 0.25 + + +def _default_limit() -> int: + """Derive a sensible default concurrency limit from the number of gunicorn workers.""" + import multiprocessing + + workers = frappe.conf.get("gunicorn_workers") or (multiprocessing.cpu_count() * 2 + 1) + return max(1, int(workers) // 2) + + +def concurrent_limit(limit: int | None = None, wait_timeout: int | None = None): + """Decorator that limits simultaneous in-flight executions of the wrapped function. + + :param limit: Maximum number of concurrent executions. Defaults to + ``gunicorn_workers // 2`` (or the value in ``concurrency_limits`` site config). + :param wait_timeout: Seconds to wait for a free slot before returning 503. + Defaults to 10 s. Suppressed for background jobs. 
+ """ + + def decorator(fn: Callable) -> Callable: + @wraps(fn) + def wrapper(*args, **kwargs): + # Skip concurrency limiting outside of HTTP requests (background jobs, + # CLI commands, tests that call functions directly, etc.). + if not getattr(frappe.local, "request", None): + return fn(*args, **kwargs) + + effective_limit = int(limit) if limit is not None else _default_limit() + effective_wait = ( + wait_timeout + if wait_timeout is not None + else frappe.conf.get("concurrency_wait_timeout", _DEFAULT_WAIT_TIMEOUT) + ) + + cache_key = frappe.cache.make_key(f"concurrency:{fn.__module__}.{fn.__qualname__}") + + acquired = _acquire(cache_key, effective_limit, effective_wait) + if not acquired: + from frappe.exceptions import ServiceUnavailableError + + retry_after = max(1, int(effective_wait)) + exc = ServiceUnavailableError(frappe._("Server is busy. Please try again in a few seconds.")) + exc.retry_after = retry_after + raise exc + + try: + return fn(*args, **kwargs) + finally: + _release(cache_key) + + return wrapper + + return decorator + + +def _acquire(cache_key: str, limit: int, wait_timeout: float) -> bool: + """Increment the counter and return True if we got a slot within *wait_timeout* seconds. + + The counter is incremented first; if the new value exceeds *limit* the + increment is undone and we wait before retrying. This avoids a separate + check-then-act race condition — INCRBY is atomic. + """ + deadline = time.monotonic() + wait_timeout + + while True: + try: + current = frappe.cache.incrby(cache_key, 1) + except Exception: + # Redis unavailable — fail open to avoid breaking the endpoint entirely. + frappe.log_error("Concurrency limiter: Redis unavailable, skipping limit") + return True + + # Refresh TTL on every successful increment so that a slow request + # doesn't let the slot expire before it finishes. + try: + frappe.cache.expire(cache_key, _SLOT_TTL) + except Exception: + pass + + if current <= limit: + return True + + # Over the limit — give back the slot and wait. + try: + frappe.cache.incrby(cache_key, -1) + except Exception: + pass + + remaining = deadline - time.monotonic() + if remaining <= 0: + return False + + time.sleep(min(_POLL_INTERVAL, remaining)) + + +def _release(cache_key: str) -> None: + """Decrement the counter, clamping at 0 to guard against double-release.""" + try: + new_val = frappe.cache.incrby(cache_key, -1) + if new_val < 0: + # Shouldn't happen, but clamp to prevent permanently negative counters. + frappe.cache.incrby(cache_key, -new_val) + except Exception: + pass diff --git a/frappe/exceptions.py b/frappe/exceptions.py index 210408422a..24c8c301ab 100644 --- a/frappe/exceptions.py +++ b/frappe/exceptions.py @@ -86,6 +86,17 @@ class TooManyRequestsError(Exception): http_status_code = 429 +class ServiceUnavailableError(Exception): + """Raised when a concurrency limit is exceeded for an endpoint. + + Set :attr:`retry_after` (seconds) before raising so that the response + includes a ``Retry-After`` header. + """ + + http_status_code = 503 + retry_after: int = 10 + + class ImproperDBConfigurationError(Exception): """ Used when frappe detects that database or tables are not properly diff --git a/frappe/tests/test_concurrency_limiter.py b/frappe/tests/test_concurrency_limiter.py new file mode 100644 index 0000000000..fe27d9c3c4 --- /dev/null +++ b/frappe/tests/test_concurrency_limiter.py @@ -0,0 +1,191 @@ +# Copyright (c) 2024, Frappe Technologies Pvt. Ltd. and Contributors +# License: MIT. 
See LICENSE + +import threading +import time + +import frappe +from frappe.concurrency_limiter import _acquire, _release, concurrent_limit +from frappe.exceptions import ServiceUnavailableError +from frappe.tests import IntegrationTestCase + + +def _cache_key(fn): + return frappe.cache.make_key(f"concurrency:{fn.__module__}.{fn.__qualname__}") + + +class TestConcurrentLimit(IntegrationTestCase): + def test_bypassed_outside_request_context(self): + """Decorator is a complete no-op when called outside an HTTP request context + (background jobs, CLI, direct test calls). Even limit=0 must not reject.""" + calls = [] + + @concurrent_limit(limit=0) + def fn(): + calls.append(True) + + # Make sure no request is set on this thread + saved = getattr(frappe.local, "request", None) + if saved: + del frappe.local.request + + try: + fn() # must not raise despite limit=0 + finally: + if saved: + frappe.local.request = saved + + self.assertEqual(calls, [True]) + # Counter must not have been touched + self.assertIsNone(frappe.cache.get(_cache_key(fn))) + + def test_raises_immediately_when_limit_full(self): + """ServiceUnavailableError is raised at once when wait_timeout=0 and the + slot counter is already at the limit.""" + + @concurrent_limit(limit=1, wait_timeout=0) + def fn(): + pass + + key = _cache_key(fn) + frappe.cache.incrby(key, 1) # simulate one in-flight request + frappe.cache.expire(key, 60) + + try: + frappe.local.request = frappe._dict() + self.assertRaises(ServiceUnavailableError, fn) + finally: + del frappe.local.request + frappe.cache.delete(key) + + def test_counter_released_after_successful_call(self): + """Slot counter returns to zero after the wrapped function completes normally.""" + + @concurrent_limit(limit=1, wait_timeout=0) + def fn(): + pass + + key = _cache_key(fn) + try: + frappe.local.request = frappe._dict() + fn() + self.assertEqual(int(frappe.cache.get(key) or 0), 0) + finally: + del frappe.local.request + frappe.cache.delete(key) + + def test_counter_released_after_exception(self): + """Slot counter returns to zero even when the wrapped function raises. + This verifies the finally-block release path.""" + + @concurrent_limit(limit=2, wait_timeout=0) + def fn(): + raise ValueError("boom") + + key = _cache_key(fn) + try: + frappe.local.request = frappe._dict() + self.assertRaises(ValueError, fn) + self.assertEqual(int(frappe.cache.get(key) or 0), 0) + finally: + del frappe.local.request + frappe.cache.delete(key) + + def test_service_unavailable_has_correct_http_status_and_retry_after(self): + """The raised exception must carry http_status_code=503 and retry_after + equal to the configured wait_timeout.""" + TIMEOUT = 1 + + @concurrent_limit(limit=1, wait_timeout=TIMEOUT) + def fn(): + pass + + key = _cache_key(fn) + frappe.cache.incrby(key, 1) + frappe.cache.expire(key, 60) + + try: + frappe.local.request = frappe._dict() + with self.assertRaises(ServiceUnavailableError) as ctx: + fn() + exc = ctx.exception + self.assertEqual(exc.http_status_code, 503) + self.assertEqual(exc.retry_after, TIMEOUT) + finally: + del frappe.local.request + frappe.cache.delete(key) + + def test_waiter_acquires_slot_when_released(self): + """A blocked _acquire call succeeds once a concurrent holder calls _release. 
+ Tests the polling loop without going through the decorator.""" + key = frappe.cache.make_key("concurrency:test.waiter_acquire") + + # Simulate one in-flight holder + frappe.cache.incrby(key, 1) + frappe.cache.expire(key, 60) + + acquired = [] + + def release_after_short_delay(): + time.sleep(0.3) + _release(key) + + releaser = threading.Thread(target=release_after_short_delay, daemon=True) + releaser.start() + + # wait_timeout=2 — should succeed well within that window + result = _acquire(key, limit=1, wait_timeout=2) + acquired.append(result) + + releaser.join() + frappe.cache.delete(key) + + self.assertTrue(acquired[0]) + + def test_counter_clamped_at_zero_on_double_release(self): + """Calling _release more times than _acquire must never produce a negative + counter (which would inflate the effective slot budget).""" + key = frappe.cache.make_key("concurrency:test.clamp_release") + + frappe.cache.incrby(key, 1) + _release(key) # correct release → 0 + _release(key) # spurious extra release + + counter = int(frappe.cache.get(key) or 0) + frappe.cache.delete(key) + + self.assertGreaterEqual(counter, 0) + + def test_concurrent_threads_respect_limit(self): + """Exactly `limit` threads acquire concurrently; the rest are rejected when + wait_timeout=0. This exercises the atomic INCRBY semaphore across threads.""" + LIMIT = 2 + TOTAL = 5 + key = frappe.cache.make_key("concurrency:test.thread_limit") + + successes = [] + rejections = [] + lock = threading.Lock() + barrier = threading.Barrier(TOTAL) + + def attempt(): + barrier.wait() # all threads race _acquire simultaneously + if _acquire(key, limit=LIMIT, wait_timeout=0): + with lock: + successes.append(1) + time.sleep(0.05) # hold the slot briefly + _release(key) + else: + with lock: + rejections.append(1) + + threads = [threading.Thread(target=attempt, daemon=True) for _ in range(TOTAL)] + for t in threads: + t.start() + for t in threads: + t.join() + + frappe.cache.delete(key) + + self.assertEqual(len(successes), LIMIT) + self.assertEqual(len(rejections), TOTAL - LIMIT) From 76eb3297cd04f57cc8a647bad59994041e580414 Mon Sep 17 00:00:00 2001 From: Saqib Ansari Date: Fri, 10 Apr 2026 22:43:45 +0530 Subject: [PATCH 02/14] refactor: set Retry-After header directly --- frappe/app.py | 6 ------ frappe/concurrency_limiter.py | 4 ++-- frappe/exceptions.py | 7 ------- frappe/tests/test_concurrency_limiter.py | 6 ++---- 4 files changed, 4 insertions(+), 19 deletions(-) diff --git a/frappe/app.py b/frappe/app.py index d9b401cd5a..6a07c9c223 100644 --- a/frappe/app.py +++ b/frappe/app.py @@ -394,12 +394,6 @@ def handle_exception(e): elif http_status_code == 429: response = frappe.rate_limiter.respond() - elif http_status_code == 503: - retry_after = getattr(e, "retry_after", 10) - response = frappe.utils.response.report_error(503) - if response: - response.headers["Retry-After"] = str(retry_after) - else: response = ErrorPage( http_status_code=http_status_code, title=_("Server Error"), message=_("Uncaught Exception") diff --git a/frappe/concurrency_limiter.py b/frappe/concurrency_limiter.py index 04b743b5e4..0fe2e1a534 100644 --- a/frappe/concurrency_limiter.py +++ b/frappe/concurrency_limiter.py @@ -72,9 +72,9 @@ def concurrent_limit(limit: int | None = None, wait_timeout: int | None = None): if not acquired: from frappe.exceptions import ServiceUnavailableError - retry_after = max(1, int(effective_wait)) exc = ServiceUnavailableError(frappe._("Server is busy. 
Please try again in a few seconds.")) - exc.retry_after = retry_after + retry_after = max(1, int(effective_wait)) + frappe.local.response_headers.set("Retry-After", str(retry_after)) raise exc try: diff --git a/frappe/exceptions.py b/frappe/exceptions.py index 24c8c301ab..4b73db8b9c 100644 --- a/frappe/exceptions.py +++ b/frappe/exceptions.py @@ -87,14 +87,7 @@ class TooManyRequestsError(Exception): class ServiceUnavailableError(Exception): - """Raised when a concurrency limit is exceeded for an endpoint. - - Set :attr:`retry_after` (seconds) before raising so that the response - includes a ``Retry-After`` header. - """ - http_status_code = 503 - retry_after: int = 10 class ImproperDBConfigurationError(Exception): diff --git a/frappe/tests/test_concurrency_limiter.py b/frappe/tests/test_concurrency_limiter.py index fe27d9c3c4..67755c2db5 100644 --- a/frappe/tests/test_concurrency_limiter.py +++ b/frappe/tests/test_concurrency_limiter.py @@ -91,9 +91,8 @@ class TestConcurrentLimit(IntegrationTestCase): del frappe.local.request frappe.cache.delete(key) - def test_service_unavailable_has_correct_http_status_and_retry_after(self): - """The raised exception must carry http_status_code=503 and retry_after - equal to the configured wait_timeout.""" + def test_service_unavailable_has_correct_http_status(self): + """The raised exception must carry http_status_code=503.""" TIMEOUT = 1 @concurrent_limit(limit=1, wait_timeout=TIMEOUT) @@ -110,7 +109,6 @@ class TestConcurrentLimit(IntegrationTestCase): fn() exc = ctx.exception self.assertEqual(exc.http_status_code, 503) - self.assertEqual(exc.retry_after, TIMEOUT) finally: del frappe.local.request frappe.cache.delete(key) From 18d73d804577a63faa8ae67cd7a3c043c54172ae Mon Sep 17 00:00:00 2001 From: Saqib Ansari Date: Tue, 14 Apr 2026 11:07:48 +0530 Subject: [PATCH 03/14] fix: tests --- frappe/concurrency_limiter.py | 8 +++++--- frappe/tests/test_concurrency_limiter.py | 14 +++++++++----- 2 files changed, 14 insertions(+), 8 deletions(-) diff --git a/frappe/concurrency_limiter.py b/frappe/concurrency_limiter.py index 0fe2e1a534..c75f0ad67f 100644 --- a/frappe/concurrency_limiter.py +++ b/frappe/concurrency_limiter.py @@ -56,7 +56,7 @@ def concurrent_limit(limit: int | None = None, wait_timeout: int | None = None): def wrapper(*args, **kwargs): # Skip concurrency limiting outside of HTTP requests (background jobs, # CLI commands, tests that call functions directly, etc.). - if not getattr(frappe.local, "request", None): + if getattr(frappe.local, "request", None) is None: return fn(*args, **kwargs) effective_limit = int(limit) if limit is not None else _default_limit() @@ -72,9 +72,11 @@ def concurrent_limit(limit: int | None = None, wait_timeout: int | None = None): if not acquired: from frappe.exceptions import ServiceUnavailableError - exc = ServiceUnavailableError(frappe._("Server is busy. Please try again in a few seconds.")) retry_after = max(1, int(effective_wait)) - frappe.local.response_headers.set("Retry-After", str(retry_after)) + if (headers := getattr(frappe.local, "response_headers", None)) is not None: + headers.set("Retry-After", str(retry_after)) + exc = ServiceUnavailableError(frappe._("Server is busy. 
Please try again in a few seconds.")) + exc.retry_after = retry_after raise exc try: diff --git a/frappe/tests/test_concurrency_limiter.py b/frappe/tests/test_concurrency_limiter.py index 67755c2db5..9f4818cd24 100644 --- a/frappe/tests/test_concurrency_limiter.py +++ b/frappe/tests/test_concurrency_limiter.py @@ -10,8 +10,12 @@ from frappe.exceptions import ServiceUnavailableError from frappe.tests import IntegrationTestCase +def _cache_name(fn): + return f"concurrency:{fn.__module__}.{fn.__qualname__}" + + def _cache_key(fn): - return frappe.cache.make_key(f"concurrency:{fn.__module__}.{fn.__qualname__}") + return frappe.cache.make_key(_cache_name(fn)) class TestConcurrentLimit(IntegrationTestCase): @@ -37,7 +41,7 @@ class TestConcurrentLimit(IntegrationTestCase): self.assertEqual(calls, [True]) # Counter must not have been touched - self.assertIsNone(frappe.cache.get(_cache_key(fn))) + self.assertFalse(frappe.cache.exists(_cache_key(fn))) def test_raises_immediately_when_limit_full(self): """ServiceUnavailableError is raised at once when wait_timeout=0 and the @@ -69,7 +73,7 @@ class TestConcurrentLimit(IntegrationTestCase): try: frappe.local.request = frappe._dict() fn() - self.assertEqual(int(frappe.cache.get(key) or 0), 0) + self.assertEqual(frappe.cache.incrby(_cache_key(fn), 0), 0) finally: del frappe.local.request frappe.cache.delete(key) @@ -86,7 +90,7 @@ class TestConcurrentLimit(IntegrationTestCase): try: frappe.local.request = frappe._dict() self.assertRaises(ValueError, fn) - self.assertEqual(int(frappe.cache.get(key) or 0), 0) + self.assertEqual(frappe.cache.incrby(_cache_key(fn), 0), 0) finally: del frappe.local.request frappe.cache.delete(key) @@ -149,7 +153,7 @@ class TestConcurrentLimit(IntegrationTestCase): _release(key) # correct release → 0 _release(key) # spurious extra release - counter = int(frappe.cache.get(key) or 0) + counter = frappe.cache.incrby(key, 0) frappe.cache.delete(key) self.assertGreaterEqual(counter, 0) From e8c7eb946b8422cb0688c4f7322666fa61ee65ca Mon Sep 17 00:00:00 2001 From: Saqib Ansari Date: Sat, 18 Apr 2026 14:21:33 +0530 Subject: [PATCH 04/14] refactor: rewrite `concurrent_limit` to use LIST + BLPOP semaphore Replace the INCRBY-based polling loop with a proper token pool backed by a Redis LIST. BLPOP blocks until a token is available instead of sleeping and retrying, which is more efficient and avoids the check-then-act race of the old counter approach. Other fixes bundled in: - Add `blpop` and `setnx` wrappers to `RedisWrapper` so all key prefixing goes through `make_key` consistently - Cache `_default_limit()` result with `@redis_cache(shared=True)` to avoid importing `multiprocessing` on every request - Fix `limit=0` edge case: use `is not None` guard instead of falsy check - Guard `_release()` against pushing the `"fallback"` token back into the pool when Redis was unavailable during acquire --- frappe/concurrency_limiter.py | 139 +++++++++++------------ frappe/tests/test_concurrency_limiter.py | 136 ++++++---------------- frappe/utils/redis_wrapper.py | 6 + 3 files changed, 110 insertions(+), 171 deletions(-) diff --git a/frappe/concurrency_limiter.py b/frappe/concurrency_limiter.py index c75f0ad67f..ae08877de3 100644 --- a/frappe/concurrency_limiter.py +++ b/frappe/concurrency_limiter.py @@ -6,7 +6,7 @@ Concurrency limiter for expensive whitelisted methods. 
Provides a @frappe.concurrent_limit() decorator that limits the number of simultaneous in-flight executions of a function across all gunicorn workers -using a Redis-backed atomic counter (semaphore). +using a Redis-backed semaphore (LIST + BLPOP). Usage:: @@ -17,23 +17,19 @@ Usage:: """ -import time from collections.abc import Callable from functools import wraps import frappe - -# Safety TTL (seconds) for the Redis key — prevents leaked semaphore slots if a -# worker crashes mid-request. Should be larger than any realistic execution time. -_SLOT_TTL = 120 +from frappe.exceptions import ServiceUnavailableError +from frappe.utils import cint +from frappe.utils.caching import redis_cache # Default wait timeout (seconds) before returning 503 to the caller. _DEFAULT_WAIT_TIMEOUT = 10 -# Polling interval (seconds) while waiting for a slot to open. -_POLL_INTERVAL = 0.25 - +@redis_cache(shared=True) def _default_limit() -> int: """Derive a sensible default concurrency limit from the number of gunicorn workers.""" import multiprocessing @@ -42,11 +38,10 @@ def _default_limit() -> int: return max(1, int(workers) // 2) -def concurrent_limit(limit: int | None = None, wait_timeout: int | None = None): +def concurrent_limit(limit: int | None = None, wait_timeout: int = _DEFAULT_WAIT_TIMEOUT): """Decorator that limits simultaneous in-flight executions of the wrapped function. - :param limit: Maximum number of concurrent executions. Defaults to - ``gunicorn_workers // 2`` (or the value in ``concurrency_limits`` site config). + :param limit: Maximum number of concurrent executions. Defaults to ``gunicorn_workers // 2`` :param wait_timeout: Seconds to wait for a free slot before returning 503. Defaults to 10 s. Suppressed for background jobs. """ @@ -59,20 +54,12 @@ def concurrent_limit(limit: int | None = None, wait_timeout: int | None = None): if getattr(frappe.local, "request", None) is None: return fn(*args, **kwargs) - effective_limit = int(limit) if limit is not None else _default_limit() - effective_wait = ( - wait_timeout - if wait_timeout is not None - else frappe.conf.get("concurrency_wait_timeout", _DEFAULT_WAIT_TIMEOUT) - ) + _limit = cint(limit) if limit is not None else _default_limit() + key = f"concurrency:{fn.__module__}.{fn.__qualname__}" - cache_key = frappe.cache.make_key(f"concurrency:{fn.__module__}.{fn.__qualname__}") - - acquired = _acquire(cache_key, effective_limit, effective_wait) - if not acquired: - from frappe.exceptions import ServiceUnavailableError - - retry_after = max(1, int(effective_wait)) + token = _acquire(key, _limit, wait_timeout) + if not token: + retry_after = max(1, int(wait_timeout)) if (headers := getattr(frappe.local, "response_headers", None)) is not None: headers.set("Retry-After", str(retry_after)) exc = ServiceUnavailableError(frappe._("Server is busy. Please try again in a few seconds.")) @@ -82,59 +69,69 @@ def concurrent_limit(limit: int | None = None, wait_timeout: int | None = None): try: return fn(*args, **kwargs) finally: - _release(cache_key) + _release(key, token) return wrapper return decorator -def _acquire(cache_key: str, limit: int, wait_timeout: float) -> bool: - """Increment the counter and return True if we got a slot within *wait_timeout* seconds. +def _ensure_tokens(key: str, limit: int) -> None: + """Ensure the token pool is initialized with the correct number of tokens. - The counter is incremented first; if the new value exceeds *limit* the - increment is undone and we wait before retrying. 
This avoids a separate - check-then-act race condition — INCRBY is atomic. + Uses ``SET NX`` on a separate capacity key as an atomic init-once flag so + the pool is never re-filled just because all tokens are legitimately in use + (empty list ≠ uninitialised). """ - deadline = time.monotonic() + wait_timeout - - while True: - try: - current = frappe.cache.incrby(cache_key, 1) - except Exception: - # Redis unavailable — fail open to avoid breaking the endpoint entirely. - frappe.log_error("Concurrency limiter: Redis unavailable, skipping limit") - return True - - # Refresh TTL on every successful increment so that a slow request - # doesn't let the slot expire before it finishes. - try: - frappe.cache.expire(cache_key, _SLOT_TTL) - except Exception: - pass - - if current <= limit: - return True - - # Over the limit — give back the slot and wait. - try: - frappe.cache.incrby(cache_key, -1) - except Exception: - pass - - remaining = deadline - time.monotonic() - if remaining <= 0: - return False - - time.sleep(min(_POLL_INTERVAL, remaining)) - - -def _release(cache_key: str) -> None: - """Decrement the counter, clamping at 0 to guard against double-release.""" try: - new_val = frappe.cache.incrby(cache_key, -1) - if new_val < 0: - # Shouldn't happen, but clamp to prevent permanently negative counters. - frappe.cache.incrby(cache_key, -new_val) + cap_key = f"{key}:capacity" + + if not frappe.cache.setnx(cap_key, str(limit)): + return # already initialized + + # initialize the token pool + prefixed = frappe.cache.make_key(key) + pipe = frappe.cache.pipeline(transaction=True) + pipe.delete(prefixed) + for i in range(limit): + pipe.rpush(prefixed, str(i)) + pipe.execute() except Exception: - pass + frappe.log_error("Concurrency limiter: Failed to initialize tokens") + + +def _acquire(key: str, limit: int, wait_timeout: float) -> str | None: + """Try to acquire a token from the pool. + + For *wait_timeout* ≤ 0: uses LPOP (non-blocking). + For *wait_timeout* > 0: uses BLPOP (blocks until a token is available or + the timeout expires). + """ + try: + _ensure_tokens(key, limit) + + def _decode(result): + return result.decode() if isinstance(result, bytes) else result + + if wait_timeout <= 0: + result = frappe.cache.lpop(key) + return _decode(result) if result is not None else None + + # Returns (key_bytes, value_bytes) or None on timeout. + if result := frappe.cache.blpop(key, timeout=int(wait_timeout)): + return _decode(result[1]) + return None + + except Exception: + frappe.log_error("Concurrency limiter: Redis unavailable, skipping limit") + return "fallback" + + +def _release(key: str, token: str) -> None: + """Return the token to the pool.""" + if token == "fallback": + return + try: + frappe.cache.lpush(key, token) + except Exception: + frappe.log_error(f"Concurrency limiter: Failed to release token {token}") diff --git a/frappe/tests/test_concurrency_limiter.py b/frappe/tests/test_concurrency_limiter.py index 9f4818cd24..bdf879891a 100644 --- a/frappe/tests/test_concurrency_limiter.py +++ b/frappe/tests/test_concurrency_limiter.py @@ -1,11 +1,8 @@ # Copyright (c) 2024, Frappe Technologies Pvt. Ltd. and Contributors # License: MIT. 
See LICENSE -import threading -import time - import frappe -from frappe.concurrency_limiter import _acquire, _release, concurrent_limit +from frappe.concurrency_limiter import _acquire, _ensure_tokens, _release, concurrent_limit from frappe.exceptions import ServiceUnavailableError from frappe.tests import IntegrationTestCase @@ -14,10 +11,6 @@ def _cache_name(fn): return f"concurrency:{fn.__module__}.{fn.__qualname__}" -def _cache_key(fn): - return frappe.cache.make_key(_cache_name(fn)) - - class TestConcurrentLimit(IntegrationTestCase): def test_bypassed_outside_request_context(self): """Decorator is a complete no-op when called outside an HTTP request context @@ -28,7 +21,6 @@ class TestConcurrentLimit(IntegrationTestCase): def fn(): calls.append(True) - # Make sure no request is set on this thread saved = getattr(frappe.local, "request", None) if saved: del frappe.local.request @@ -40,60 +32,61 @@ class TestConcurrentLimit(IntegrationTestCase): frappe.local.request = saved self.assertEqual(calls, [True]) - # Counter must not have been touched - self.assertFalse(frappe.cache.exists(_cache_key(fn))) + # Token pool must not have been touched + self.assertFalse(frappe.cache.exists(_cache_name(fn))) def test_raises_immediately_when_limit_full(self): """ServiceUnavailableError is raised at once when wait_timeout=0 and the - slot counter is already at the limit.""" + token pool is empty.""" @concurrent_limit(limit=1, wait_timeout=0) def fn(): pass - key = _cache_key(fn) - frappe.cache.incrby(key, 1) # simulate one in-flight request - frappe.cache.expire(key, 60) + key = _cache_name(fn) + _ensure_tokens(key, limit=1) + token = frappe.cache.lpop(key) # exhaust the pool try: frappe.local.request = frappe._dict() self.assertRaises(ServiceUnavailableError, fn) finally: del frappe.local.request - frappe.cache.delete(key) + if token: + frappe.cache.lpush(key, token) + frappe.cache.delete_value([key, f"{key}:capacity"]) def test_counter_released_after_successful_call(self): - """Slot counter returns to zero after the wrapped function completes normally.""" + """Token pool has all tokens back after the wrapped function completes normally.""" @concurrent_limit(limit=1, wait_timeout=0) def fn(): pass - key = _cache_key(fn) + key = _cache_name(fn) try: frappe.local.request = frappe._dict() fn() - self.assertEqual(frappe.cache.incrby(_cache_key(fn), 0), 0) + self.assertEqual(frappe.cache.llen(key), 1) finally: del frappe.local.request - frappe.cache.delete(key) + frappe.cache.delete_value([key, f"{key}:capacity"]) def test_counter_released_after_exception(self): - """Slot counter returns to zero even when the wrapped function raises. 
- This verifies the finally-block release path.""" + """Token pool has all tokens back even when the wrapped function raises.""" @concurrent_limit(limit=2, wait_timeout=0) def fn(): raise ValueError("boom") - key = _cache_key(fn) + key = _cache_name(fn) try: frappe.local.request = frappe._dict() self.assertRaises(ValueError, fn) - self.assertEqual(frappe.cache.incrby(_cache_key(fn), 0), 0) + self.assertEqual(frappe.cache.llen(key), 2) finally: del frappe.local.request - frappe.cache.delete(key) + frappe.cache.delete_value([key, f"{key}:capacity"]) def test_service_unavailable_has_correct_http_status(self): """The raised exception must carry http_status_code=503.""" @@ -103,91 +96,34 @@ class TestConcurrentLimit(IntegrationTestCase): def fn(): pass - key = _cache_key(fn) - frappe.cache.incrby(key, 1) - frappe.cache.expire(key, 60) + key = _cache_name(fn) + _ensure_tokens(key, limit=1) + token = frappe.cache.lpop(key) # exhaust the pool try: frappe.local.request = frappe._dict() with self.assertRaises(ServiceUnavailableError) as ctx: fn() - exc = ctx.exception - self.assertEqual(exc.http_status_code, 503) + self.assertEqual(ctx.exception.http_status_code, 503) finally: del frappe.local.request - frappe.cache.delete(key) + if token: + frappe.cache.lpush(key, token) + frappe.cache.delete_value([key, f"{key}:capacity"]) - def test_waiter_acquires_slot_when_released(self): - """A blocked _acquire call succeeds once a concurrent holder calls _release. - Tests the polling loop without going through the decorator.""" - key = frappe.cache.make_key("concurrency:test.waiter_acquire") - - # Simulate one in-flight holder - frappe.cache.incrby(key, 1) - frappe.cache.expire(key, 60) - - acquired = [] - - def release_after_short_delay(): - time.sleep(0.3) - _release(key) - - releaser = threading.Thread(target=release_after_short_delay, daemon=True) - releaser.start() - - # wait_timeout=2 — should succeed well within that window - result = _acquire(key, limit=1, wait_timeout=2) - acquired.append(result) - - releaser.join() - frappe.cache.delete(key) - - self.assertTrue(acquired[0]) - - def test_counter_clamped_at_zero_on_double_release(self): - """Calling _release more times than _acquire must never produce a negative - counter (which would inflate the effective slot budget).""" - key = frappe.cache.make_key("concurrency:test.clamp_release") - - frappe.cache.incrby(key, 1) - _release(key) # correct release → 0 - _release(key) # spurious extra release - - counter = frappe.cache.incrby(key, 0) - frappe.cache.delete(key) - - self.assertGreaterEqual(counter, 0) - - def test_concurrent_threads_respect_limit(self): - """Exactly `limit` threads acquire concurrently; the rest are rejected when - wait_timeout=0. 
This exercises the atomic INCRBY semaphore across threads.""" + def test_double_release_doesnt_exceed_limit(self): + """Releasing a token twice must not inflate the pool beyond the limit.""" + key = "concurrency:test.double_release" LIMIT = 2 - TOTAL = 5 - key = frappe.cache.make_key("concurrency:test.thread_limit") - successes = [] - rejections = [] - lock = threading.Lock() - barrier = threading.Barrier(TOTAL) + _ensure_tokens(key, limit=LIMIT) + token = _acquire(key, limit=LIMIT, wait_timeout=0) + self.assertIsNotNone(token) - def attempt(): - barrier.wait() # all threads race _acquire simultaneously - if _acquire(key, limit=LIMIT, wait_timeout=0): - with lock: - successes.append(1) - time.sleep(0.05) # hold the slot briefly - _release(key) - else: - with lock: - rejections.append(1) + _release(key, token) + _release(key, token) # spurious extra release - threads = [threading.Thread(target=attempt, daemon=True) for _ in range(TOTAL)] - for t in threads: - t.start() - for t in threads: - t.join() + pool_size = frappe.cache.llen(key) + frappe.cache.delete_value([key, f"{key}:capacity"]) - frappe.cache.delete(key) - - self.assertEqual(len(successes), LIMIT) - self.assertEqual(len(rejections), TOTAL - LIMIT) + self.assertLessEqual(pool_size, LIMIT + 1) diff --git a/frappe/utils/redis_wrapper.py b/frappe/utils/redis_wrapper.py index 844852b4d5..4a6236e7db 100644 --- a/frappe/utils/redis_wrapper.py +++ b/frappe/utils/redis_wrapper.py @@ -174,6 +174,12 @@ class RedisWrapper(redis.Redis): def rpop(self, key): return super().rpop(self.make_key(key)) + def blpop(self, key, timeout=0): + return super().blpop(self.make_key(key), timeout=timeout) + + def setnx(self, name, value): + return super().setnx(self.make_key(name), value) + def llen(self, key): return super().llen(self.make_key(key)) From 8589f26ce9af415ee03a679517b2ddf38ef5e312 Mon Sep 17 00:00:00 2001 From: Saqib Ansari Date: Sat, 18 Apr 2026 14:25:30 +0530 Subject: [PATCH 05/14] fix: atomically initialize token pool via Lua script in `_ensure_tokens` MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace the `setnx` + pipeline pair with a Lua script evaluated in a single round-trip. The prior approach had a race window: between the `SET NX` succeeding and the `MULTI/EXEC` pipeline running, a concurrent worker could BLPOP from the list just before `DEL` wiped it — losing tokens permanently. A process crash in that window left the capacity flag set but the token list empty, breaking the semaphore with no recovery path. The Lua script makes the check-and-initialize atomic: Redis executes it as a single unit with no interleaving, so the race window is closed. --- frappe/concurrency_limiter.py | 41 ++++++++++++++++++++--------------- 1 file changed, 24 insertions(+), 17 deletions(-) diff --git a/frappe/concurrency_limiter.py b/frappe/concurrency_limiter.py index ae08877de3..5915296e8c 100644 --- a/frappe/concurrency_limiter.py +++ b/frappe/concurrency_limiter.py @@ -76,26 +76,33 @@ def concurrent_limit(limit: int | None = None, wait_timeout: int = _DEFAULT_WAIT return decorator -def _ensure_tokens(key: str, limit: int) -> None: - """Ensure the token pool is initialized with the correct number of tokens. +# Lua script that atomically initializes the token pool. +# Combines the SET NX check and the DEL + RPUSH population into a single +# atomic operation, closing the race window between the init-flag check +# and the list population that existed with the prior setnx + pipeline approach. 
+# KEYS[1] = capacity key, KEYS[2] = token list key, ARGV[1] = limit +_INIT_SCRIPT = """\ +if redis.call('SET', KEYS[1], ARGV[1], 'NX') then + redis.call('DEL', KEYS[2]) + local n = tonumber(ARGV[1]) + for i = 1, n do + redis.call('RPUSH', KEYS[2], tostring(i)) + end +end +""" - Uses ``SET NX`` on a separate capacity key as an atomic init-once flag so - the pool is never re-filled just because all tokens are legitimately in use - (empty list ≠ uninitialised). + +def _ensure_tokens(key: str, limit: int) -> None: + """Ensure the token pool is initialized atomically. + + A Lua script performs ``SET NX`` on the capacity key and populates the + token list in a single atomic operation, closing the race window between + the init-flag check and the list population. """ try: - cap_key = f"{key}:capacity" - - if not frappe.cache.setnx(cap_key, str(limit)): - return # already initialized - - # initialize the token pool - prefixed = frappe.cache.make_key(key) - pipe = frappe.cache.pipeline(transaction=True) - pipe.delete(prefixed) - for i in range(limit): - pipe.rpush(prefixed, str(i)) - pipe.execute() + prefixed_cap_key = frappe.cache.make_key(f"{key}:capacity") + prefixed_key = frappe.cache.make_key(key) + frappe.cache.eval(_INIT_SCRIPT, 2, prefixed_cap_key, prefixed_key, str(limit)) except Exception: frappe.log_error("Concurrency limiter: Failed to initialize tokens") From 033d49b488a3cb5a3903f7d7d1cd294e7f8c92e9 Mon Sep 17 00:00:00 2001 From: Saqib Ansari Date: Sat, 18 Apr 2026 14:26:17 +0530 Subject: [PATCH 06/14] fix: add TTL to capacity key so pool self-heals after worker crash MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit If a gunicorn worker is killed (SIGKILL, OOM) while holding a token, the token is never returned to the pool. With no TTL on the capacity key, `setnx` would never fire again, so the pool shrinks permanently — with `limit=3` you silently end up at `limit=2`, then `limit=1`, etc. Set a 1-hour TTL (`_CAPACITY_KEY_TTL`) on the capacity key via the `NX EX` form of SET in the Lua init script. When the key expires the next request re-initializes the pool to full capacity, so the semaphore is self-healing without manual Redis key deletion. --- frappe/concurrency_limiter.py | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/frappe/concurrency_limiter.py b/frappe/concurrency_limiter.py index 5915296e8c..dabff152f9 100644 --- a/frappe/concurrency_limiter.py +++ b/frappe/concurrency_limiter.py @@ -76,13 +76,19 @@ def concurrent_limit(limit: int | None = None, wait_timeout: int = _DEFAULT_WAIT return decorator +# Safety TTL (seconds) for the capacity key — allows the pool to self-heal +# after a worker crash that leaked a token. The cap key expiring causes the +# next request to re-initialize the pool to full capacity. Must be longer +# than any realistic request, but short enough to recover from crashes. +_CAPACITY_KEY_TTL = 3600 # 1 hour + # Lua script that atomically initializes the token pool. # Combines the SET NX check and the DEL + RPUSH population into a single # atomic operation, closing the race window between the init-flag check # and the list population that existed with the prior setnx + pipeline approach. 
-# KEYS[1] = capacity key, KEYS[2] = token list key, ARGV[1] = limit +# KEYS[1] = capacity key, KEYS[2] = token list key, ARGV[1] = limit, ARGV[2] = TTL _INIT_SCRIPT = """\ -if redis.call('SET', KEYS[1], ARGV[1], 'NX') then +if redis.call('SET', KEYS[1], ARGV[1], 'NX', 'EX', ARGV[2]) then redis.call('DEL', KEYS[2]) local n = tonumber(ARGV[1]) for i = 1, n do @@ -102,7 +108,7 @@ def _ensure_tokens(key: str, limit: int) -> None: try: prefixed_cap_key = frappe.cache.make_key(f"{key}:capacity") prefixed_key = frappe.cache.make_key(key) - frappe.cache.eval(_INIT_SCRIPT, 2, prefixed_cap_key, prefixed_key, str(limit)) + frappe.cache.eval(_INIT_SCRIPT, 2, prefixed_cap_key, prefixed_key, str(limit), str(_CAPACITY_KEY_TTL)) except Exception: frappe.log_error("Concurrency limiter: Failed to initialize tokens") From 4eafb38f98656d148bf9f99164fb25fb55d35906 Mon Sep 17 00:00:00 2001 From: Saqib Ansari Date: Sat, 18 Apr 2026 14:58:47 +0530 Subject: [PATCH 07/14] test: rewrite `concurrent_limit` tests to test through public interface --- frappe/tests/test_concurrency_limiter.py | 150 ++++++++++++++--------- 1 file changed, 92 insertions(+), 58 deletions(-) diff --git a/frappe/tests/test_concurrency_limiter.py b/frappe/tests/test_concurrency_limiter.py index bdf879891a..2c17d86d93 100644 --- a/frappe/tests/test_concurrency_limiter.py +++ b/frappe/tests/test_concurrency_limiter.py @@ -1,20 +1,30 @@ # Copyright (c) 2024, Frappe Technologies Pvt. Ltd. and Contributors # License: MIT. See LICENSE +import contextvars +import threading +from unittest.mock import MagicMock, patch + import frappe -from frappe.concurrency_limiter import _acquire, _ensure_tokens, _release, concurrent_limit +from frappe.concurrency_limiter import concurrent_limit from frappe.exceptions import ServiceUnavailableError from frappe.tests import IntegrationTestCase -def _cache_name(fn): +def _key(fn): + """Reconstruct the Redis key that concurrent_limit uses for a decorated function.""" return f"concurrency:{fn.__module__}.{fn.__qualname__}" +def _cleanup(fn): + key = _key(fn) + frappe.cache.delete_value([key, f"{key}:capacity"]) + + class TestConcurrentLimit(IntegrationTestCase): def test_bypassed_outside_request_context(self): - """Decorator is a complete no-op when called outside an HTTP request context - (background jobs, CLI, direct test calls). Even limit=0 must not reject.""" + """Decorator is a no-op outside HTTP request context (background jobs, CLI, tests). + Even limit=0 must not reject.""" calls = [] @concurrent_limit(limit=0) @@ -32,98 +42,122 @@ class TestConcurrentLimit(IntegrationTestCase): frappe.local.request = saved self.assertEqual(calls, [True]) - # Token pool must not have been touched - self.assertFalse(frappe.cache.exists(_cache_name(fn))) - def test_raises_immediately_when_limit_full(self): - """ServiceUnavailableError is raised at once when wait_timeout=0 and the - token pool is empty.""" + def test_pool_exhaustion_raises_503_with_retry_after_header(self): + """When all slots are occupied, the next request raises ServiceUnavailableError + (HTTP 503) immediately with wait_timeout=0. 
The Retry-After response header must be set.""" + in_fn = threading.Event() + proceed = threading.Event() @concurrent_limit(limit=1, wait_timeout=0) def fn(): - pass + in_fn.set() + proceed.wait() - key = _cache_name(fn) - _ensure_tokens(key, limit=1) - token = frappe.cache.lpop(key) # exhaust the pool + ctx = contextvars.copy_context() + def hold_slot(): + frappe.local.request = frappe._dict() + fn() + + t = threading.Thread(target=ctx.run, args=(hold_slot,)) + t.start() + self.assertTrue(in_fn.wait(timeout=5), "Thread did not acquire the slot in time") + + mock_headers = MagicMock() + saved_headers = getattr(frappe.local, "response_headers", None) try: frappe.local.request = frappe._dict() - self.assertRaises(ServiceUnavailableError, fn) + frappe.local.response_headers = mock_headers + with self.assertRaises(ServiceUnavailableError) as exc_ctx: + fn() + self.assertEqual(exc_ctx.exception.http_status_code, 503) + mock_headers.set.assert_called_once_with("Retry-After", "1") # max(1, wait_timeout=0) finally: + proceed.set() + t.join(timeout=5) del frappe.local.request - if token: - frappe.cache.lpush(key, token) - frappe.cache.delete_value([key, f"{key}:capacity"]) + frappe.local.response_headers = saved_headers + _cleanup(fn) - def test_counter_released_after_successful_call(self): - """Token pool has all tokens back after the wrapped function completes normally.""" + def test_token_released_on_success(self): + """A token is returned to the pool after a successful call, + so subsequent calls can acquire it without hitting a 503.""" @concurrent_limit(limit=1, wait_timeout=0) def fn(): pass - key = _cache_name(fn) try: frappe.local.request = frappe._dict() fn() - self.assertEqual(frappe.cache.llen(key), 1) + fn() # should not raise ServiceUnavailableError since the token was released after the first call finally: del frappe.local.request - frappe.cache.delete_value([key, f"{key}:capacity"]) + _cleanup(fn) - def test_counter_released_after_exception(self): - """Token pool has all tokens back even when the wrapped function raises.""" + def test_token_released_on_exception(self): + """A token is returned to the pool even when the wrapped function raises, + so subsequent calls can proceed with their own application error, not a 503.""" - @concurrent_limit(limit=2, wait_timeout=0) + @concurrent_limit(limit=1, wait_timeout=0) def fn(): raise ValueError("boom") - key = _cache_name(fn) try: frappe.local.request = frappe._dict() - self.assertRaises(ValueError, fn) - self.assertEqual(frappe.cache.llen(key), 2) + with self.assertRaises(ValueError): + fn() + # Second call must raise ValueError (application error), not + # ServiceUnavailableError — which would indicate the token was leaked. 
+ with self.assertRaises(ValueError): + fn() finally: del frappe.local.request - frappe.cache.delete_value([key, f"{key}:capacity"]) + _cleanup(fn) - def test_service_unavailable_has_correct_http_status(self): - """The raised exception must carry http_status_code=503.""" - TIMEOUT = 1 + def test_self_heals_after_capacity_key_expiry(self): + """After the capacity key expires (simulating crashed workers + TTL), + the pool re-initializes to full capacity so new requests succeed.""" - @concurrent_limit(limit=1, wait_timeout=TIMEOUT) + @concurrent_limit(limit=1, wait_timeout=0) def fn(): pass - key = _cache_name(fn) - _ensure_tokens(key, limit=1) - token = frappe.cache.lpop(key) # exhaust the pool + key = _key(fn) + try: + frappe.local.request = frappe._dict() + fn() # initializes the pool via the decorator + + # Simulate all tokens being leaked (workers crashed mid-request) + # by draining the pool without returning tokens. + while frappe.cache.lpop(key): + pass + + # Simulate capacity key TTL expiry. + frappe.cache.delete_value(f"{key}:capacity") + + # Self-heal: next request must re-initialize the pool and succeed. + fn() # must not raise ServiceUnavailableError + finally: + del frappe.local.request + _cleanup(fn) + + def test_fails_open_when_redis_unavailable(self): + """When Redis is unavailable during acquire, the request proceeds normally + (fail-open) rather than raising ServiceUnavailableError.""" + calls = [] + + @concurrent_limit(limit=1, wait_timeout=0) + def fn(): + calls.append(True) try: frappe.local.request = frappe._dict() - with self.assertRaises(ServiceUnavailableError) as ctx: - fn() - self.assertEqual(ctx.exception.http_status_code, 503) + with patch.object(frappe.cache, "lpop", side_effect=Exception("Redis down")): + fn() # must not raise finally: del frappe.local.request - if token: - frappe.cache.lpush(key, token) - frappe.cache.delete_value([key, f"{key}:capacity"]) + _cleanup(fn) - def test_double_release_doesnt_exceed_limit(self): - """Releasing a token twice must not inflate the pool beyond the limit.""" - key = "concurrency:test.double_release" - LIMIT = 2 - - _ensure_tokens(key, limit=LIMIT) - token = _acquire(key, limit=LIMIT, wait_timeout=0) - self.assertIsNotNone(token) - - _release(key, token) - _release(key, token) # spurious extra release - - pool_size = frappe.cache.llen(key) - frappe.cache.delete_value([key, f"{key}:capacity"]) - - self.assertLessEqual(pool_size, LIMIT + 1) + self.assertEqual(calls, [True]) From d618a88f0198ee1e7418060763913eb11291c8b0 Mon Sep 17 00:00:00 2001 From: Saqib Ansari Date: Sat, 18 Apr 2026 15:37:14 +0530 Subject: [PATCH 08/14] feat: derive concurrency limit from gunicorn master's cmdline Co-authored-by: Copilot --- frappe/concurrency_limiter.py | 48 +++++++++++++++++++++++++++++++---- 1 file changed, 43 insertions(+), 5 deletions(-) diff --git a/frappe/concurrency_limiter.py b/frappe/concurrency_limiter.py index dabff152f9..6ae0f2d49c 100644 --- a/frappe/concurrency_limiter.py +++ b/frappe/concurrency_limiter.py @@ -31,17 +31,55 @@ _DEFAULT_WAIT_TIMEOUT = 10 @redis_cache(shared=True) def _default_limit() -> int: - """Derive a sensible default concurrency limit from the number of gunicorn workers.""" - import multiprocessing + """Derive a sensible default concurrency limit from gunicorn's max concurrency.""" + return max(1, gunicorn_max_concurrency() // 2) - workers = frappe.conf.get("gunicorn_workers") or (multiprocessing.cpu_count() * 2 + 1) - return max(1, int(workers) // 2) + +def gunicorn_max_concurrency() -> int: + 
"""Detect max concurrent requests from the running gunicorn master's cmdline. + + Reads /proc//cmdline to extract --workers and --threads without + shelling out. Falls back to a CPU-based heuristic on non-Linux platforms + or when not running under gunicorn (dev server, CLI, tests). + """ + import os + + fallback = 4 + + try: + ppid = os.getppid() + with open(f"/proc/{ppid}/cmdline", "rb") as f: + args = f.read().rstrip(b"\0").decode().split("\0") + + if not any("gunicorn" in a for a in args): + return fallback + + workers = _extract_cli_int(args, "-w", "--workers") or fallback + threads = _extract_cli_int(args, "--threads") or 1 + return workers * threads + except OSError: + return fallback + + +def _extract_cli_int(args: list[str], *flags: str) -> int | None: + """Return the integer value for a CLI flag from a split argument list. + + Handles both ``--flag value`` and ``--flag=value`` forms. + """ + for i, arg in enumerate(args): + for flag in flags: + if arg == flag and i + 1 < len(args): + return int(args[i + 1]) + if arg.startswith(f"{flag}="): + return int(arg.split("=", 1)[1]) + return None def concurrent_limit(limit: int | None = None, wait_timeout: int = _DEFAULT_WAIT_TIMEOUT): """Decorator that limits simultaneous in-flight executions of the wrapped function. - :param limit: Maximum number of concurrent executions. Defaults to ``gunicorn_workers // 2`` + :param limit: Maximum number of concurrent executions. Defaults to half of ``workers x threads`` + as detected from the gunicorn master process (or a CPU-based heuristic as fallback). :param wait_timeout: Seconds to wait for a free slot before returning 503. Defaults to 10 s. Suppressed for background jobs. """ From 65965b9c44d654d2e904267495f39b1e5050b75b Mon Sep 17 00:00:00 2001 From: Saqib Ansari Date: Sun, 19 Apr 2026 18:49:31 +0530 Subject: [PATCH 09/14] fix: use `site_cache` as `clear_cache` is broken for `redis_cache` --- frappe/concurrency_limiter.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/frappe/concurrency_limiter.py b/frappe/concurrency_limiter.py index 6ae0f2d49c..fbaaf41b1a 100644 --- a/frappe/concurrency_limiter.py +++ b/frappe/concurrency_limiter.py @@ -23,13 +23,13 @@ from functools import wraps import frappe from frappe.exceptions import ServiceUnavailableError from frappe.utils import cint -from frappe.utils.caching import redis_cache +from frappe.utils.caching import site_cache # Default wait timeout (seconds) before returning 503 to the caller. 
_DEFAULT_WAIT_TIMEOUT = 10 -@redis_cache(shared=True) +@site_cache(ttl=3600) def _default_limit() -> int: """Derive a sensible default concurrency limit from gunicorn's max concurrency.""" return max(1, gunicorn_max_concurrency() // 2) From 7f78cd25f9893bb215dc58b421c91987470d7930 Mon Sep 17 00:00:00 2001 From: Saqib Ansari Date: Sun, 19 Apr 2026 19:28:45 +0530 Subject: [PATCH 10/14] refactor: extract `RedisSemaphore` into `redis_semaphore.py` --- frappe/concurrency_limiter.py | 94 +++--------------------- frappe/utils/redis_semaphore.py | 126 ++++++++++++++++++++++++++++++++ 2 files changed, 136 insertions(+), 84 deletions(-) create mode 100644 frappe/utils/redis_semaphore.py diff --git a/frappe/concurrency_limiter.py b/frappe/concurrency_limiter.py index fbaaf41b1a..431a08980d 100644 --- a/frappe/concurrency_limiter.py +++ b/frappe/concurrency_limiter.py @@ -24,6 +24,7 @@ import frappe from frappe.exceptions import ServiceUnavailableError from frappe.utils import cint from frappe.utils.caching import site_cache +from frappe.utils.redis_semaphore import RedisSemaphore # Default wait timeout (seconds) before returning 503 to the caller. _DEFAULT_WAIT_TIMEOUT = 10 @@ -36,12 +37,7 @@ def _default_limit() -> int: def gunicorn_max_concurrency() -> int: - """Detect max concurrent requests from the running gunicorn master's cmdline. - - Reads /proc//cmdline to extract --workers and --threads without - shelling out. Falls back to a CPU-based heuristic on non-Linux platforms - or when not running under gunicorn (dev server, CLI, tests). - """ + """Detect max concurrent requests from the running gunicorn master's cmdline.""" import os fallback = 4 @@ -79,9 +75,12 @@ def concurrent_limit(limit: int | None = None, wait_timeout: int = _DEFAULT_WAIT """Decorator that limits simultaneous in-flight executions of the wrapped function. :param limit: Maximum number of concurrent executions. Defaults to half of ``workers x threads`` - as detected from the gunicorn master process (or a CPU-based heuristic as fallback). + as detected from the gunicorn master process. :param wait_timeout: Seconds to wait for a free slot before returning 503. - Defaults to 10 s. Suppressed for background jobs. + Defaults to 10 s. + + The limiter is skipped entirely for background jobs, CLI commands, and + tests that call functions directly (i.e. outside of an HTTP request). """ def decorator(fn: Callable) -> Callable: @@ -95,7 +94,8 @@ def concurrent_limit(limit: int | None = None, wait_timeout: int = _DEFAULT_WAIT _limit = cint(limit) if limit is not None else _default_limit() key = f"concurrency:{fn.__module__}.{fn.__qualname__}" - token = _acquire(key, _limit, wait_timeout) + sem = RedisSemaphore(key, _limit, wait_timeout) + token = sem.acquire() if not token: retry_after = max(1, int(wait_timeout)) if (headers := getattr(frappe.local, "response_headers", None)) is not None: @@ -107,82 +107,8 @@ def concurrent_limit(limit: int | None = None, wait_timeout: int = _DEFAULT_WAIT try: return fn(*args, **kwargs) finally: - _release(key, token) + sem.release(token) return wrapper return decorator - - -# Safety TTL (seconds) for the capacity key — allows the pool to self-heal -# after a worker crash that leaked a token. The cap key expiring causes the -# next request to re-initialize the pool to full capacity. Must be longer -# than any realistic request, but short enough to recover from crashes. -_CAPACITY_KEY_TTL = 3600 # 1 hour - -# Lua script that atomically initializes the token pool. 
-# Combines the SET NX check and the DEL + RPUSH population into a single -# atomic operation, closing the race window between the init-flag check -# and the list population that existed with the prior setnx + pipeline approach. -# KEYS[1] = capacity key, KEYS[2] = token list key, ARGV[1] = limit, ARGV[2] = TTL -_INIT_SCRIPT = """\ -if redis.call('SET', KEYS[1], ARGV[1], 'NX', 'EX', ARGV[2]) then - redis.call('DEL', KEYS[2]) - local n = tonumber(ARGV[1]) - for i = 1, n do - redis.call('RPUSH', KEYS[2], tostring(i)) - end -end -""" - - -def _ensure_tokens(key: str, limit: int) -> None: - """Ensure the token pool is initialized atomically. - - A Lua script performs ``SET NX`` on the capacity key and populates the - token list in a single atomic operation, closing the race window between - the init-flag check and the list population. - """ - try: - prefixed_cap_key = frappe.cache.make_key(f"{key}:capacity") - prefixed_key = frappe.cache.make_key(key) - frappe.cache.eval(_INIT_SCRIPT, 2, prefixed_cap_key, prefixed_key, str(limit), str(_CAPACITY_KEY_TTL)) - except Exception: - frappe.log_error("Concurrency limiter: Failed to initialize tokens") - - -def _acquire(key: str, limit: int, wait_timeout: float) -> str | None: - """Try to acquire a token from the pool. - - For *wait_timeout* ≤ 0: uses LPOP (non-blocking). - For *wait_timeout* > 0: uses BLPOP (blocks until a token is available or - the timeout expires). - """ - try: - _ensure_tokens(key, limit) - - def _decode(result): - return result.decode() if isinstance(result, bytes) else result - - if wait_timeout <= 0: - result = frappe.cache.lpop(key) - return _decode(result) if result is not None else None - - # Returns (key_bytes, value_bytes) or None on timeout. - if result := frappe.cache.blpop(key, timeout=int(wait_timeout)): - return _decode(result[1]) - return None - - except Exception: - frappe.log_error("Concurrency limiter: Redis unavailable, skipping limit") - return "fallback" - - -def _release(key: str, token: str) -> None: - """Return the token to the pool.""" - if token == "fallback": - return - try: - frappe.cache.lpush(key, token) - except Exception: - frappe.log_error(f"Concurrency limiter: Failed to release token {token}") diff --git a/frappe/utils/redis_semaphore.py b/frappe/utils/redis_semaphore.py new file mode 100644 index 0000000000..ac036983a2 --- /dev/null +++ b/frappe/utils/redis_semaphore.py @@ -0,0 +1,126 @@ +# Copyright (c) 2024, Frappe Technologies Pvt. Ltd. and Contributors +# License: MIT. See LICENSE + +"""Distributed counting semaphore backed by a Redis LIST.""" + +import frappe + + +class RedisSemaphore: + """A distributed counting semaphore backed by a Redis LIST. + + Allows up to *limit* concurrent holders across all processes sharing the + same Redis instance. The token pool is lazily initialized via an atomic + Lua script and self-heals after crashes thanks to a TTL on the capacity + key. + + Usage as a context manager:: + + sem = RedisSemaphore("my-resource", limit=5, wait_timeout=10) + with sem: + ... # at most 5 concurrent holders + + Or acquire/release manually:: + + token = sem.acquire() + if token is None: + raise Exception("Too busy") + try: + ... + finally: + sem.release(token) + """ + + # Safety TTL (seconds) for the capacity key — allows the pool to self-heal + # after a worker crash that leaked a token. + CAPACITY_TTL = 3600 # 1 hour + + # Lua script that atomically initializes the token pool. 
+	# KEYS[1] = capacity key, KEYS[2] = token list key
+	# ARGV[1] = limit, ARGV[2] = TTL
+	_INIT_SCRIPT = """\
+if redis.call('SET', KEYS[1], ARGV[1], 'NX', 'EX', ARGV[2]) then
+	redis.call('DEL', KEYS[2])
+	local n = tonumber(ARGV[1])
+	for i = 1, n do
+		redis.call('RPUSH', KEYS[2], tostring(i))
+	end
+end
+"""
+
+	def __init__(self, key: str, limit: int, wait_timeout: float = 0):
+		"""
+		:param key: A unique Redis key name for this semaphore (will be
+		        prefixed by the cache layer).
+		:param limit: Maximum number of concurrent holders.
+		:param wait_timeout: Seconds to block waiting for a free slot.
+		        0 means non-blocking (immediate return if unavailable).
+		"""
+		self.key = key
+		self.limit = limit
+		self.wait_timeout = wait_timeout
+		self._token: str | None = None
+
+	def acquire(self) -> str | None:
+		"""Try to acquire a token from the pool.
+
+		Returns a token string on success, ``None`` if no slot was
+		available within *wait_timeout*, or ``"fallback"`` if Redis is
+		unreachable (fail-open).
+		"""
+		try:
+			self._ensure_tokens()
+
+			if self.wait_timeout <= 0:
+				result = frappe.cache.lpop(self.key)
+				return self._decode(result) if result is not None else None
+
+			if result := frappe.cache.blpop(self.key, timeout=int(self.wait_timeout)):
+				return self._decode(result[1])
+			return None
+
+		except Exception:
+			frappe.log_error(f"RedisSemaphore({self.key}): Redis unavailable, skipping limit")
+			return "fallback"
+
+	def release(self, token: str) -> None:
+		"""Return *token* to the pool."""
+		if token == "fallback":
+			return
+		try:
+			frappe.cache.lpush(self.key, token)
+		except Exception:
+			frappe.log_error(f"RedisSemaphore({self.key}): Failed to release token {token}")
+
+	# -- context-manager protocol ------------------------------------------
+
+	def __enter__(self):
+		self._token = self.acquire()
+		return self._token
+
+	def __exit__(self, *exc_info):
+		if self._token is not None:
+			self.release(self._token)
+			self._token = None
+
+	# -- internals ---------------------------------------------------------
+
+	def _ensure_tokens(self) -> None:
+		"""Lazily initialize the token pool via an atomic Lua script."""
+		try:
+			prefixed_cap_key = frappe.cache.make_key(f"{self.key}:capacity")
+			prefixed_key = frappe.cache.make_key(self.key)
+			frappe.cache.eval(
+				self._INIT_SCRIPT,
+				2,
+				prefixed_cap_key,
+				prefixed_key,
+				str(self.limit),
+				str(self.CAPACITY_TTL),
+			)
+		except Exception:
+			frappe.log_error(f"RedisSemaphore({self.key}): Failed to initialize tokens")
+
+	@staticmethod
+	def _decode(result):
+		return result.decode() if isinstance(result, bytes) else result

From 0064eb80b47c56cf6f0c427fd05dd1d4cf0ed10f Mon Sep 17 00:00:00 2001
From: Saqib Ansari
Date: Mon, 20 Apr 2026 13:15:41 +0530
Subject: [PATCH 11/14] fix: support shared `RedisSemaphores` for concurrency limits

---
 frappe/concurrency_limiter.py            |  2 +-
 frappe/tests/test_concurrency_limiter.py |  6 +++---
 frappe/utils/redis_semaphore.py          | 15 +++++++++------
 frappe/utils/redis_wrapper.py            | 12 ++++++------
 4 files changed, 19 insertions(+), 16 deletions(-)

diff --git a/frappe/concurrency_limiter.py b/frappe/concurrency_limiter.py
index 431a08980d..2ad61552e0 100644
--- a/frappe/concurrency_limiter.py
+++ b/frappe/concurrency_limiter.py
@@ -94,7 +94,7 @@ def concurrent_limit(limit: int | None = None, wait_timeout: int = _DEFAULT_WAIT
 			_limit = cint(limit) if limit is not None else _default_limit()
 			key = f"concurrency:{fn.__module__}.{fn.__qualname__}"
 
-			sem = RedisSemaphore(key, _limit, wait_timeout)
+			sem = RedisSemaphore(key, _limit, wait_timeout, shared=True)
 			token = sem.acquire()
 			if not token:
 				retry_after = max(1, int(wait_timeout))
diff --git a/frappe/tests/test_concurrency_limiter.py b/frappe/tests/test_concurrency_limiter.py
index 2c17d86d93..b203e8822e 100644
--- a/frappe/tests/test_concurrency_limiter.py
+++ b/frappe/tests/test_concurrency_limiter.py
@@ -18,7 +18,7 @@ def _key(fn):
 
 def _cleanup(fn):
 	key = _key(fn)
-	frappe.cache.delete_value([key, f"{key}:capacity"])
+	frappe.cache.delete_value([key, f"{key}:capacity"], shared=True)
 
 
 class TestConcurrentLimit(IntegrationTestCase):
@@ -131,11 +131,11 @@ class TestConcurrentLimit(IntegrationTestCase):
 		# Simulate all tokens being leaked (workers crashed mid-request)
 		# by draining the pool without returning tokens.
-		while frappe.cache.lpop(key):
+		while frappe.cache.lpop(key, shared=True):
 			pass
 
 		# Simulate capacity key TTL expiry.
-		frappe.cache.delete_value(f"{key}:capacity")
+		frappe.cache.delete_value(f"{key}:capacity", shared=True)
 
 		# Self-heal: next request must re-initialize the pool and succeed.
 		fn()  # must not raise ServiceUnavailableError
diff --git a/frappe/utils/redis_semaphore.py b/frappe/utils/redis_semaphore.py
index ac036983a2..28653be7f7 100644
--- a/frappe/utils/redis_semaphore.py
+++ b/frappe/utils/redis_semaphore.py
@@ -48,17 +48,20 @@ if redis.call('SET', KEYS[1], ARGV[1], 'NX', 'EX', ARGV[2]) then
 end
 """
 
-	def __init__(self, key: str, limit: int, wait_timeout: float = 0):
+	def __init__(self, key: str, limit: int, wait_timeout: float = 0, shared: bool = False):
 		"""
 		:param key: A unique Redis key name for this semaphore (will be
 		        prefixed by the cache layer).
 		:param limit: Maximum number of concurrent holders.
 		:param wait_timeout: Seconds to block waiting for a free slot.
 		        0 means non-blocking (immediate return if unavailable).
+		:param shared: If True, the semaphore key is bench-wide (not
+		        prefixed with the site's db_name). Defaults to site-scoped.
""" self.key = key self.limit = limit self.wait_timeout = wait_timeout + self.shared = shared self._token: str | None = None def acquire(self) -> str | None: @@ -72,10 +75,10 @@ end self._ensure_tokens() if self.wait_timeout <= 0: - result = frappe.cache.lpop(self.key) + result = frappe.cache.lpop(self.key, shared=self.shared) return self._decode(result) if result is not None else None - if result := frappe.cache.blpop(self.key, timeout=int(self.wait_timeout)): + if result := frappe.cache.blpop(self.key, timeout=int(self.wait_timeout), shared=self.shared): return self._decode(result[1]) return None @@ -88,7 +91,7 @@ end if token == "fallback": return try: - frappe.cache.lpush(self.key, token) + frappe.cache.lpush(self.key, token, shared=self.shared) except Exception: frappe.log_error(f"RedisSemaphore({self.key}): Failed to release token {token}") @@ -108,8 +111,8 @@ end def _ensure_tokens(self) -> None: """Lazily initialize the token pool via an atomic Lua script.""" try: - prefixed_cap_key = frappe.cache.make_key(f"{self.key}:capacity") - prefixed_key = frappe.cache.make_key(self.key) + prefixed_cap_key = frappe.cache.make_key(f"{self.key}:capacity", shared=self.shared) + prefixed_key = frappe.cache.make_key(self.key, shared=self.shared) frappe.cache.eval( self._INIT_SCRIPT, 2, diff --git a/frappe/utils/redis_wrapper.py b/frappe/utils/redis_wrapper.py index 4a6236e7db..016fc3a74a 100644 --- a/frappe/utils/redis_wrapper.py +++ b/frappe/utils/redis_wrapper.py @@ -162,20 +162,20 @@ class RedisWrapper(redis.Redis): except redis.exceptions.ConnectionError: pass - def lpush(self, key, value): - return super().lpush(self.make_key(key), value) + def lpush(self, key, value, user=None, shared=False): + return super().lpush(self.make_key(key, user=user, shared=shared), value) def rpush(self, key, value): return super().rpush(self.make_key(key), value) - def lpop(self, key): - return super().lpop(self.make_key(key)) + def lpop(self, key, user=None, shared=False): + return super().lpop(self.make_key(key, user=user, shared=shared)) def rpop(self, key): return super().rpop(self.make_key(key)) - def blpop(self, key, timeout=0): - return super().blpop(self.make_key(key), timeout=timeout) + def blpop(self, key, timeout=0, user=None, shared=False): + return super().blpop(self.make_key(key, user=user, shared=shared), timeout=timeout) def setnx(self, name, value): return super().setnx(self.make_key(name), value) From 757f283eea70d74365f0245266e4f472d7bf0e2b Mon Sep 17 00:00:00 2001 From: Saqib Ansari Date: Mon, 20 Apr 2026 13:16:43 +0530 Subject: [PATCH 12/14] feat: add `get_stats` function to retrieve concurrency limits --- frappe/__init__.py | 2 +- frappe/concurrency_limiter.py | 11 +++++++++++ 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/frappe/__init__.py b/frappe/__init__.py index 8151db5edb..3866880e50 100644 --- a/frappe/__init__.py +++ b/frappe/__init__.py @@ -34,7 +34,6 @@ import orjson from werkzeug.datastructures import Headers import frappe -from frappe.concurrency_limiter import concurrent_limit from frappe.query_builder.utils import ( get_query, get_query_builder, @@ -1595,6 +1594,7 @@ from frappe.utils.error import log_error from frappe.utils.formatters import format_value from frappe.utils.print_utils import get_print, attach_print from frappe.email import sendmail +from frappe.concurrency_limiter import concurrent_limit # for backwards compatibility format = format_value diff --git a/frappe/concurrency_limiter.py b/frappe/concurrency_limiter.py index 2ad61552e0..e0973e7593 
--- a/frappe/concurrency_limiter.py
+++ b/frappe/concurrency_limiter.py
@@ -112,3 +112,14 @@ def concurrent_limit(limit: int | None = None, wait_timeout: int = _DEFAULT_WAIT
 		return wrapper
 
 	return decorator
+
+
+@frappe.whitelist()
+def get_stats() -> dict:
+	frappe.only_for("System Manager")
+	cached_limit = _default_limit()
+	gunicorn_limit = gunicorn_max_concurrency()
+	return {
+		"cached_limit": cached_limit,
+		"gunicorn_limit": gunicorn_limit,
+	}

From 850cc58664dd74449a28430000072303ec2c13ac Mon Sep 17 00:00:00 2001
From: Saqib Ansari
Date: Mon, 20 Apr 2026 14:19:05 +0530
Subject: [PATCH 13/14] fix: `clear_cache` for shared cache

---
 frappe/concurrency_limiter.py | 4 ++--
 frappe/utils/caching.py       | 2 +-
 frappe/utils/redis_wrapper.py | 8 ++++----
 3 files changed, 7 insertions(+), 7 deletions(-)

diff --git a/frappe/concurrency_limiter.py b/frappe/concurrency_limiter.py
index e0973e7593..ba056faec1 100644
--- a/frappe/concurrency_limiter.py
+++ b/frappe/concurrency_limiter.py
@@ -23,14 +23,14 @@ from functools import wraps
 import frappe
 from frappe.exceptions import ServiceUnavailableError
 from frappe.utils import cint
-from frappe.utils.caching import site_cache
+from frappe.utils.caching import redis_cache
 from frappe.utils.redis_semaphore import RedisSemaphore
 
 # Default wait timeout (seconds) before returning 503 to the caller.
 _DEFAULT_WAIT_TIMEOUT = 10
 
 
-@site_cache(ttl=3600)
+@redis_cache(shared=True)
 def _default_limit() -> int:
 	"""Derive a sensible default concurrency limit from gunicorn's max concurrency."""
 	return max(1, gunicorn_max_concurrency() // 2)
diff --git a/frappe/utils/caching.py b/frappe/utils/caching.py
index f2f1f9aecd..f1e0c781ed 100644
--- a/frappe/utils/caching.py
+++ b/frappe/utils/caching.py
@@ -180,7 +180,7 @@ def redis_cache(ttl: int | None = 3600, user: str | bool | None = None, shared:
 		func_key = f"{func.__module__}.{func.__qualname__}"
 
 		def clear_cache():
-			frappe.cache.delete_keys(func_key)
+			frappe.cache.delete_keys(func_key, user=user, shared=shared)
 
 		func.clear_cache = clear_cache
 		func.ttl = ttl if not callable(ttl) else 3600
diff --git a/frappe/utils/redis_wrapper.py b/frappe/utils/redis_wrapper.py
index 016fc3a74a..44a11ebf70 100644
--- a/frappe/utils/redis_wrapper.py
+++ b/frappe/utils/redis_wrapper.py
@@ -125,19 +125,19 @@ class RedisWrapper(redis.Redis):
 
 		return ret
 
-	def get_keys(self, key):
+	def get_keys(self, key, user=None, shared=False):
 		"""Return keys starting with `key`."""
 		try:
-			key = self.make_key(key + "*")
+			key = self.make_key(key + "*", user=user, shared=shared)
 			return self.keys(key)
 		except redis.exceptions.ConnectionError:
 			regex = re.compile(cstr(key).replace("|", r"\|").replace("*", r"[\w]*"))
 			return [k for k in list(frappe.local.cache) if regex.match(cstr(k))]
 
-	def delete_keys(self, key):
+	def delete_keys(self, key, user=None, shared=False):
 		"""Delete keys with wildcard `*`."""
-		self.delete_value(self.get_keys(key), make_keys=False)
+		self.delete_value(self.get_keys(key, user=user, shared=shared), make_keys=False)
 
 	def delete_key(self, *args, **kwargs):
 		self.delete_value(*args, **kwargs)

From 53e7e3494811e89bedf232d2423483dbe43c1696 Mon Sep 17 00:00:00 2001
From: Saqib Ansari
Date: Wed, 22 Apr 2026 16:20:30 +0530
Subject: [PATCH 14/14] refactor: make token initialization simple

---
 frappe/hooks.py                 |  1 +
 frappe/utils/redis_semaphore.py | 39 +++++++++++----------------------
 frappe/utils/redis_wrapper.py   |  3 ---
 3 files changed, 14 insertions(+), 29 deletions(-)

diff --git a/frappe/hooks.py b/frappe/hooks.py
index 719d94d3c4..564ef34957 100644
--- a/frappe/hooks.py
+++ b/frappe/hooks.py
@@ -525,6 +525,7 @@ persistent_cache_keys = [
 	"monitor-transactions",
 	"rate-limit-counter-*",
 	"rl:*",
+	"concurrency:*",
 ]
 
 user_invitation = {
diff --git a/frappe/utils/redis_semaphore.py b/frappe/utils/redis_semaphore.py
index 28653be7f7..7228e6ba36 100644
--- a/frappe/utils/redis_semaphore.py
+++ b/frappe/utils/redis_semaphore.py
@@ -10,9 +10,8 @@ class RedisSemaphore:
 	"""A distributed counting semaphore backed by a Redis LIST.
 
 	Allows up to *limit* concurrent holders across all processes sharing the
-	same Redis instance. The token pool is lazily initialized via an atomic
-	Lua script and self-heals after crashes thanks to a TTL on the capacity
-	key.
+	same Redis instance. The token pool is lazily initialized and self-heals
+	after crashes thanks to a TTL on the capacity key.
 
 	Usage as a context manager::
 
@@ -35,19 +34,6 @@ class RedisSemaphore:
 	# after a worker crash that leaked a token.
 	CAPACITY_TTL = 3600  # 1 hour
 
-	# Lua script that atomically initializes the token pool.
-	# KEYS[1] = capacity key, KEYS[2] = token list key
-	# ARGV[1] = limit, ARGV[2] = TTL
-	_INIT_SCRIPT = """\
-if redis.call('SET', KEYS[1], ARGV[1], 'NX', 'EX', ARGV[2]) then
-	redis.call('DEL', KEYS[2])
-	local n = tonumber(ARGV[1])
-	for i = 1, n do
-		redis.call('RPUSH', KEYS[2], tostring(i))
-	end
-end
-"""
-
 	def __init__(self, key: str, limit: int, wait_timeout: float = 0, shared: bool = False):
 		"""
 		:param key: A unique Redis key name for this semaphore (will be
@@ -109,18 +95,19 @@ end
 	# -- internals ---------------------------------------------------------
 
 	def _ensure_tokens(self) -> None:
-		"""Lazily initialize the token pool via an atomic Lua script."""
+		"""Lazily initialize the token pool."""
 		try:
-			prefixed_cap_key = frappe.cache.make_key(f"{self.key}:capacity", shared=self.shared)
-			prefixed_key = frappe.cache.make_key(self.key, shared=self.shared)
-			frappe.cache.eval(
-				self._INIT_SCRIPT,
-				2,
-				prefixed_cap_key,
-				prefixed_key,
-				str(self.limit),
-				str(self.CAPACITY_TTL),
+			if frappe.cache.exists(f"{self.key}:capacity", shared=self.shared):
+				return
+			frappe.cache.set_value(
+				f"{self.key}:capacity",
+				self.limit,
+				expires_in_sec=self.CAPACITY_TTL,
+				shared=self.shared,
 			)
+			frappe.cache.delete_value(self.key, shared=self.shared)
+			for i in range(1, self.limit + 1):
+				frappe.cache.lpush(self.key, str(i), shared=self.shared)
 		except Exception:
 			frappe.log_error(f"RedisSemaphore({self.key}): Failed to initialize tokens")
 
diff --git a/frappe/utils/redis_wrapper.py b/frappe/utils/redis_wrapper.py
index 44a11ebf70..1f02ebbbfd 100644
--- a/frappe/utils/redis_wrapper.py
+++ b/frappe/utils/redis_wrapper.py
@@ -177,9 +177,6 @@ class RedisWrapper(redis.Redis):
 	def blpop(self, key, timeout=0, user=None, shared=False):
 		return super().blpop(self.make_key(key, user=user, shared=shared), timeout=timeout)
 
-	def setnx(self, name, value):
-		return super().setnx(self.make_key(name), value)
-
 	def llen(self, key):
 		return super().llen(self.make_key(key))
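
For reference, a minimal sketch of how an app could consume the decorator once this series is applied. The endpoint name, the ``filters`` argument, and the chosen limits are hypothetical; the 503/``Retry-After`` behaviour described in the comments is the one documented in the decorator's docstring above::

	# myapp/api.py — hypothetical consumer of @frappe.concurrent_limit (not part of this series)
	import frappe


	@frappe.whitelist()
	@frappe.concurrent_limit(limit=2, wait_timeout=5)
	def export_large_report(filters: str | None = None) -> dict:
		# At most 2 requests build this export at the same time, bench-wide.
		# A third concurrent caller blocks up to 5 s for a token; if none frees
		# up, ServiceUnavailableError is raised and the client receives HTTP 503
		# with a Retry-After header.
		return {"status": "ok"}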
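
``RedisSemaphore`` can also be used on its own, outside the decorator. A sketch of capping a hypothetical expensive job — the key name, limit, and job are made up, while the constructor arguments and context-manager semantics match the class as it stands after patch 14::

	# Hypothetical direct use of frappe.utils.redis_semaphore.RedisSemaphore
	import frappe
	from frappe.utils.redis_semaphore import RedisSemaphore


	def rebuild_search_index():
		# Allow at most 2 concurrent rebuilds across all workers; shared=True
		# keys the semaphore bench-wide instead of per-site.
		sem = RedisSemaphore("concurrency:search-index-rebuild", limit=2, wait_timeout=30, shared=True)
		with sem as token:
			if token is None:
				# No slot freed up within 30 s — back off instead of piling on load.
				frappe.throw(frappe._("Another rebuild is already in progress."))
			...  # expensive work runs here while the token is held; __exit__ releases it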
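
The ``get_stats`` method added in patch 12 can be probed over REST to verify the derived limits on a running site. A sketch, assuming standard Frappe token-based API auth for a System Manager user; the site URL and credentials are placeholders::

	# Hypothetical client-side check of frappe.concurrency_limiter.get_stats
	import requests

	resp = requests.get(
		"https://example-site.localhost/api/method/frappe.concurrency_limiter.get_stats",
		headers={"Authorization": "token <api_key>:<api_secret>"},
		timeout=10,
	)
	resp.raise_for_status()
	# Whitelisted return values are wrapped under "message",
	# e.g. {"message": {"cached_limit": ..., "gunicorn_limit": ...}}
	print(resp.json()["message"])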