From 8589f26ce9af415ee03a679517b2ddf38ef5e312 Mon Sep 17 00:00:00 2001
From: Saqib Ansari
Date: Sat, 18 Apr 2026 14:25:30 +0530
Subject: [PATCH] fix: atomically initialize token pool via Lua script in `_ensure_tokens`
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Replace the `setnx` + pipeline pair with a Lua script evaluated in a
single round-trip.

The prior approach had a race window: between the `SET NX` succeeding
and the `MULTI/EXEC` pipeline running, a concurrent worker could BLPOP
from the list just before `DEL` wiped it — losing tokens permanently. A
process crash in that window left the capacity flag set but the token
list empty, breaking the semaphore with no recovery path.

The Lua script makes the check-and-initialize atomic: Redis executes it
as a single unit with no interleaving, so the race window is closed.
---
 frappe/concurrency_limiter.py | 41 ++++++++++++++++++++---------------
 1 file changed, 24 insertions(+), 17 deletions(-)

diff --git a/frappe/concurrency_limiter.py b/frappe/concurrency_limiter.py
index ae08877de3..5915296e8c 100644
--- a/frappe/concurrency_limiter.py
+++ b/frappe/concurrency_limiter.py
@@ -76,26 +76,33 @@ def concurrent_limit(limit: int | None = None, wait_timeout: int = _DEFAULT_WAIT
 	return decorator
 
 
-def _ensure_tokens(key: str, limit: int) -> None:
-	"""Ensure the token pool is initialized with the correct number of tokens.
+# Lua script that atomically initializes the token pool.
+# Combines the SET NX check and the DEL + RPUSH population into a single
+# atomic operation, closing the race window between the init-flag check
+# and the list population that existed with the prior setnx + pipeline approach.
+# KEYS[1] = capacity key, KEYS[2] = token list key, ARGV[1] = limit
+_INIT_SCRIPT = """\
+if redis.call('SET', KEYS[1], ARGV[1], 'NX') then
+	redis.call('DEL', KEYS[2])
+	local n = tonumber(ARGV[1])
+	for i = 1, n do
+		redis.call('RPUSH', KEYS[2], tostring(i))
+	end
+end
+"""
 
-	Uses ``SET NX`` on a separate capacity key as an atomic init-once flag so
-	the pool is never re-filled just because all tokens are legitimately in use
-	(empty list ≠ uninitialised).
+
+def _ensure_tokens(key: str, limit: int) -> None:
+	"""Ensure the token pool is initialized atomically.
+
+	A Lua script performs ``SET NX`` on the capacity key and populates the
+	token list in a single atomic operation, closing the race window between
+	the init-flag check and the list population.
 	"""
 	try:
-		cap_key = f"{key}:capacity"
-
-		if not frappe.cache.setnx(cap_key, str(limit)):
-			return  # already initialized
-
-		# initialize the token pool
-		prefixed = frappe.cache.make_key(key)
-		pipe = frappe.cache.pipeline(transaction=True)
-		pipe.delete(prefixed)
-		for i in range(limit):
-			pipe.rpush(prefixed, str(i))
-		pipe.execute()
+		prefixed_cap_key = frappe.cache.make_key(f"{key}:capacity")
+		prefixed_key = frappe.cache.make_key(key)
+		frappe.cache.eval(_INIT_SCRIPT, 2, prefixed_cap_key, prefixed_key, str(limit))
 	except Exception:
 		frappe.log_error("Concurrency limiter: Failed to initialize tokens")
 