fix: atomically initialize token pool via Lua script in _ensure_tokens

Replace the `setnx` + pipeline pair with a Lua script evaluated in a
single round-trip. The prior approach had a race window: between the
`SET NX` succeeding and the `MULTI/EXEC` pipeline running, a concurrent
worker could BLPOP a token from the list just before `DEL` wiped it, losing
tokens permanently. A process crash in that window left the capacity flag
set but the token list empty, breaking the semaphore with no recovery path.
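
For context, workers take a token from the list to enter the limited section and
push it back on exit. A minimal sketch of that consumer side, assuming plain
redis-py and made-up `sem:` key names (the real acquire/release paths are
outside this diff):

    import redis

    r = redis.Redis()

    def acquire(timeout: int) -> bytes | None:
        # BLPOP blocks until a token is available or the timeout expires;
        # it returns a (key, value) pair, or None on timeout.
        popped = r.blpop("sem:tokens", timeout=timeout)
        return popped[1] if popped else None

    def release(token: bytes) -> None:
        # Pushing the token back frees the slot for the next worker.
        r.rpush("sem:tokens", token)

The pool's capacity is exactly the number of tokens in the list, so any
interleaving that disturbs the list mid-initialization corrupts the
semaphore's count.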

The Lua script makes the check-and-initialize atomic: Redis executes it as
a single unit with no interleaving, so the race window is closed.
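
A quick way to sanity-check that claim against a local Redis: hammer the
initialization from many threads and confirm the pool ends up with exactly
`limit` tokens. A standalone sketch (the script body mirrors `_INIT_SCRIPT`
from the diff below; the `sem:` keys and the limit of 8 are made up, and a
fresh database is assumed):

    import threading

    import redis

    r = redis.Redis()

    SCRIPT = """
    if redis.call('SET', KEYS[1], ARGV[1], 'NX') then
        redis.call('DEL', KEYS[2])
        for i = 1, tonumber(ARGV[1]) do
            redis.call('RPUSH', KEYS[2], tostring(i))
        end
    end
    """

    def init_pool() -> None:
        # EVAL ships the script and runs it as one uninterruptible unit.
        r.eval(SCRIPT, 2, "sem:capacity", "sem:tokens", "8")

    workers = [threading.Thread(target=init_pool) for _ in range(32)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()

    # Exactly one EVAL wins the SET NX, so the pool holds exactly 8 tokens.
    assert r.llen("sem:tokens") == 8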
commit 8589f26ce9
parent e8c7eb946b
Author: Saqib Ansari
Date:   2026-04-18 14:25:30 +05:30

@@ -76,26 +76,33 @@ def concurrent_limit(limit: int | None = None, wait_timeout: int = _DEFAULT_WAIT
 	return decorator
 
 
-def _ensure_tokens(key: str, limit: int) -> None:
-	"""Ensure the token pool is initialized with the correct number of tokens.
-
-	Uses ``SET NX`` on a separate capacity key as an atomic init-once flag so
-	the pool is never re-filled just because all tokens are legitimately in use
-	(empty list ≠ uninitialised).
-	"""
+# Lua script that atomically initializes the token pool.
+# Combines the SET NX check and the DEL + RPUSH population into a single
+# atomic operation, closing the race window between the init-flag check
+# and the list population that existed with the prior setnx + pipeline approach.
+# KEYS[1] = capacity key, KEYS[2] = token list key, ARGV[1] = limit
+_INIT_SCRIPT = """\
+if redis.call('SET', KEYS[1], ARGV[1], 'NX') then
+	redis.call('DEL', KEYS[2])
+	local n = tonumber(ARGV[1])
+	for i = 1, n do
+		redis.call('RPUSH', KEYS[2], tostring(i))
+	end
+end
+"""
+
+
+def _ensure_tokens(key: str, limit: int) -> None:
+	"""Ensure the token pool is initialized atomically.
+
+	A Lua script performs ``SET NX`` on the capacity key and populates the
+	token list in a single atomic operation, closing the race window between
+	the init-flag check and the list population.
+	"""
 	try:
-		cap_key = f"{key}:capacity"
-		if not frappe.cache.setnx(cap_key, str(limit)):
-			return  # already initialized
-
-		# initialize the token pool
-		prefixed = frappe.cache.make_key(key)
-		pipe = frappe.cache.pipeline(transaction=True)
-		pipe.delete(prefixed)
-		for i in range(limit):
-			pipe.rpush(prefixed, str(i))
-		pipe.execute()
+		prefixed_cap_key = frappe.cache.make_key(f"{key}:capacity")
+		prefixed_key = frappe.cache.make_key(key)
+		frappe.cache.eval(_INIT_SCRIPT, 2, prefixed_cap_key, prefixed_key, str(limit))
 	except Exception:
 		frappe.log_error("Concurrency limiter: Failed to initialize tokens")
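
One aside on the call shape: if `_ensure_tokens` runs on every decorated call,
plain EVAL resends the script body each time. redis-py's `register_script`
caches the script's SHA1 and issues EVALSHA after the first call. A sketch,
assuming `frappe.cache` behaves like a plain `redis.Redis` client and reusing
the made-up `sem:` keys:

    import redis

    r = redis.Redis()

    # register_script computes the SHA1 once; subsequent calls send EVALSHA
    # and fall back to EVAL only when the server-side script cache is cold.
    init_pool = r.register_script(_INIT_SCRIPT)  # script body from the diff above
    init_pool(keys=["sem:capacity", "sem:tokens"], args=["8"])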