From 53e7e3494811e89bedf232d2423483dbe43c1696 Mon Sep 17 00:00:00 2001 From: Saqib Ansari Date: Wed, 22 Apr 2026 16:20:30 +0530 Subject: [PATCH] refactor: make token initialization simple --- frappe/hooks.py | 1 + frappe/utils/redis_semaphore.py | 39 +++++++++++---------------------- frappe/utils/redis_wrapper.py | 3 --- 3 files changed, 14 insertions(+), 29 deletions(-) diff --git a/frappe/hooks.py b/frappe/hooks.py index 719d94d3c4..564ef34957 100644 --- a/frappe/hooks.py +++ b/frappe/hooks.py @@ -525,6 +525,7 @@ persistent_cache_keys = [ "monitor-transactions", "rate-limit-counter-*", "rl:*", + "concurrency:*", ] user_invitation = { diff --git a/frappe/utils/redis_semaphore.py b/frappe/utils/redis_semaphore.py index 28653be7f7..7228e6ba36 100644 --- a/frappe/utils/redis_semaphore.py +++ b/frappe/utils/redis_semaphore.py @@ -10,9 +10,8 @@ class RedisSemaphore: """A distributed counting semaphore backed by a Redis LIST. Allows up to *limit* concurrent holders across all processes sharing the - same Redis instance. The token pool is lazily initialized via an atomic - Lua script and self-heals after crashes thanks to a TTL on the capacity - key. + same Redis instance. The token pool is lazily initialized and self-heals + after crashes thanks to a TTL on the capacity key. Usage as a context manager:: @@ -35,19 +34,6 @@ class RedisSemaphore: # after a worker crash that leaked a token. CAPACITY_TTL = 3600 # 1 hour - # Lua script that atomically initializes the token pool. - # KEYS[1] = capacity key, KEYS[2] = token list key - # ARGV[1] = limit, ARGV[2] = TTL - _INIT_SCRIPT = """\ -if redis.call('SET', KEYS[1], ARGV[1], 'NX', 'EX', ARGV[2]) then - redis.call('DEL', KEYS[2]) - local n = tonumber(ARGV[1]) - for i = 1, n do - redis.call('RPUSH', KEYS[2], tostring(i)) - end -end -""" - def __init__(self, key: str, limit: int, wait_timeout: float = 0, shared: bool = False): """ :param key: A unique Redis key name for this semaphore (will be @@ -109,18 +95,19 @@ end # -- internals --------------------------------------------------------- def _ensure_tokens(self) -> None: - """Lazily initialize the token pool via an atomic Lua script.""" + """Lazily initialize the token pool.""" try: - prefixed_cap_key = frappe.cache.make_key(f"{self.key}:capacity", shared=self.shared) - prefixed_key = frappe.cache.make_key(self.key, shared=self.shared) - frappe.cache.eval( - self._INIT_SCRIPT, - 2, - prefixed_cap_key, - prefixed_key, - str(self.limit), - str(self.CAPACITY_TTL), + if frappe.cache.exists(f"{self.key}:capacity", shared=self.shared): + return + frappe.cache.set_value( + f"{self.key}:capacity", + self.limit, + expires_in_sec=self.CAPACITY_TTL, + shared=self.shared, ) + frappe.cache.delete_value(self.key, shared=self.shared) + for i in range(1, self.limit + 1): + frappe.cache.lpush(self.key, str(i), shared=self.shared) except Exception: frappe.log_error(f"RedisSemaphore({self.key}): Failed to initialize tokens") diff --git a/frappe/utils/redis_wrapper.py b/frappe/utils/redis_wrapper.py index 44a11ebf70..1f02ebbbfd 100644 --- a/frappe/utils/redis_wrapper.py +++ b/frappe/utils/redis_wrapper.py @@ -177,9 +177,6 @@ class RedisWrapper(redis.Redis): def blpop(self, key, timeout=0, user=None, shared=False): return super().blpop(self.make_key(key, user=user, shared=shared), timeout=timeout) - def setnx(self, name, value): - return super().setnx(self.make_key(name), value) - def llen(self, key): return super().llen(self.make_key(key))