From 0064eb80b47c56cf6f0c427fd05dd1d4cf0ed10f Mon Sep 17 00:00:00 2001 From: Saqib Ansari Date: Mon, 20 Apr 2026 13:15:41 +0530 Subject: [PATCH] fix: support shared `RedisSemaphores` for concurrency limits --- frappe/concurrency_limiter.py | 2 +- frappe/tests/test_concurrency_limiter.py | 6 +++--- frappe/utils/redis_semaphore.py | 15 +++++++++------ frappe/utils/redis_wrapper.py | 12 ++++++------ 4 files changed, 19 insertions(+), 16 deletions(-) diff --git a/frappe/concurrency_limiter.py b/frappe/concurrency_limiter.py index 431a08980d..2ad61552e0 100644 --- a/frappe/concurrency_limiter.py +++ b/frappe/concurrency_limiter.py @@ -94,7 +94,7 @@ def concurrent_limit(limit: int | None = None, wait_timeout: int = _DEFAULT_WAIT _limit = cint(limit) if limit is not None else _default_limit() key = f"concurrency:{fn.__module__}.{fn.__qualname__}" - sem = RedisSemaphore(key, _limit, wait_timeout) + sem = RedisSemaphore(key, _limit, wait_timeout, shared=True) token = sem.acquire() if not token: retry_after = max(1, int(wait_timeout)) diff --git a/frappe/tests/test_concurrency_limiter.py b/frappe/tests/test_concurrency_limiter.py index 2c17d86d93..b203e8822e 100644 --- a/frappe/tests/test_concurrency_limiter.py +++ b/frappe/tests/test_concurrency_limiter.py @@ -18,7 +18,7 @@ def _key(fn): def _cleanup(fn): key = _key(fn) - frappe.cache.delete_value([key, f"{key}:capacity"]) + frappe.cache.delete_value([key, f"{key}:capacity"], shared=True) class TestConcurrentLimit(IntegrationTestCase): @@ -131,11 +131,11 @@ class TestConcurrentLimit(IntegrationTestCase): # Simulate all tokens being leaked (workers crashed mid-request) # by draining the pool without returning tokens. - while frappe.cache.lpop(key): + while frappe.cache.lpop(key, shared=True): pass # Simulate capacity key TTL expiry. - frappe.cache.delete_value(f"{key}:capacity") + frappe.cache.delete_value(f"{key}:capacity", shared=True) # Self-heal: next request must re-initialize the pool and succeed. fn() # must not raise ServiceUnavailableError diff --git a/frappe/utils/redis_semaphore.py b/frappe/utils/redis_semaphore.py index ac036983a2..28653be7f7 100644 --- a/frappe/utils/redis_semaphore.py +++ b/frappe/utils/redis_semaphore.py @@ -48,17 +48,20 @@ if redis.call('SET', KEYS[1], ARGV[1], 'NX', 'EX', ARGV[2]) then end """ - def __init__(self, key: str, limit: int, wait_timeout: float = 0): + def __init__(self, key: str, limit: int, wait_timeout: float = 0, shared: bool = False): """ :param key: A unique Redis key name for this semaphore (will be prefixed by the cache layer). :param limit: Maximum number of concurrent holders. :param wait_timeout: Seconds to block waiting for a free slot. 0 means non-blocking (immediate return if unavailable). + :param shared: If True, the semaphore key is bench-wide (not + prefixed with the site's db_name). Defaults to site-scoped. """ self.key = key self.limit = limit self.wait_timeout = wait_timeout + self.shared = shared self._token: str | None = None def acquire(self) -> str | None: @@ -72,10 +75,10 @@ end self._ensure_tokens() if self.wait_timeout <= 0: - result = frappe.cache.lpop(self.key) + result = frappe.cache.lpop(self.key, shared=self.shared) return self._decode(result) if result is not None else None - if result := frappe.cache.blpop(self.key, timeout=int(self.wait_timeout)): + if result := frappe.cache.blpop(self.key, timeout=int(self.wait_timeout), shared=self.shared): return self._decode(result[1]) return None @@ -88,7 +91,7 @@ end if token == "fallback": return try: - frappe.cache.lpush(self.key, token) + frappe.cache.lpush(self.key, token, shared=self.shared) except Exception: frappe.log_error(f"RedisSemaphore({self.key}): Failed to release token {token}") @@ -108,8 +111,8 @@ end def _ensure_tokens(self) -> None: """Lazily initialize the token pool via an atomic Lua script.""" try: - prefixed_cap_key = frappe.cache.make_key(f"{self.key}:capacity") - prefixed_key = frappe.cache.make_key(self.key) + prefixed_cap_key = frappe.cache.make_key(f"{self.key}:capacity", shared=self.shared) + prefixed_key = frappe.cache.make_key(self.key, shared=self.shared) frappe.cache.eval( self._INIT_SCRIPT, 2, diff --git a/frappe/utils/redis_wrapper.py b/frappe/utils/redis_wrapper.py index 4a6236e7db..016fc3a74a 100644 --- a/frappe/utils/redis_wrapper.py +++ b/frappe/utils/redis_wrapper.py @@ -162,20 +162,20 @@ class RedisWrapper(redis.Redis): except redis.exceptions.ConnectionError: pass - def lpush(self, key, value): - return super().lpush(self.make_key(key), value) + def lpush(self, key, value, user=None, shared=False): + return super().lpush(self.make_key(key, user=user, shared=shared), value) def rpush(self, key, value): return super().rpush(self.make_key(key), value) - def lpop(self, key): - return super().lpop(self.make_key(key)) + def lpop(self, key, user=None, shared=False): + return super().lpop(self.make_key(key, user=user, shared=shared)) def rpop(self, key): return super().rpop(self.make_key(key)) - def blpop(self, key, timeout=0): - return super().blpop(self.make_key(key), timeout=timeout) + def blpop(self, key, timeout=0, user=None, shared=False): + return super().blpop(self.make_key(key, user=user, shared=shared), timeout=timeout) def setnx(self, name, value): return super().setnx(self.make_key(name), value)