diff --git a/frappe/concurrency_limiter.py b/frappe/concurrency_limiter.py index ae08877de3..5915296e8c 100644 --- a/frappe/concurrency_limiter.py +++ b/frappe/concurrency_limiter.py @@ -76,26 +76,33 @@ def concurrent_limit(limit: int | None = None, wait_timeout: int = _DEFAULT_WAIT return decorator -def _ensure_tokens(key: str, limit: int) -> None: - """Ensure the token pool is initialized with the correct number of tokens. +# Lua script that atomically initializes the token pool. +# Combines the SET NX check and the DEL + RPUSH population into a single +# atomic operation, closing the race window between the init-flag check +# and the list population that existed with the prior setnx + pipeline approach. +# KEYS[1] = capacity key, KEYS[2] = token list key, ARGV[1] = limit +_INIT_SCRIPT = """\ +if redis.call('SET', KEYS[1], ARGV[1], 'NX') then + redis.call('DEL', KEYS[2]) + local n = tonumber(ARGV[1]) + for i = 1, n do + redis.call('RPUSH', KEYS[2], tostring(i)) + end +end +""" - Uses ``SET NX`` on a separate capacity key as an atomic init-once flag so - the pool is never re-filled just because all tokens are legitimately in use - (empty list ≠ uninitialised). + +def _ensure_tokens(key: str, limit: int) -> None: + """Ensure the token pool is initialized atomically. + + A Lua script performs ``SET NX`` on the capacity key and populates the + token list in a single atomic operation, closing the race window between + the init-flag check and the list population. """ try: - cap_key = f"{key}:capacity" - - if not frappe.cache.setnx(cap_key, str(limit)): - return # already initialized - - # initialize the token pool - prefixed = frappe.cache.make_key(key) - pipe = frappe.cache.pipeline(transaction=True) - pipe.delete(prefixed) - for i in range(limit): - pipe.rpush(prefixed, str(i)) - pipe.execute() + prefixed_cap_key = frappe.cache.make_key(f"{key}:capacity") + prefixed_key = frappe.cache.make_key(key) + frappe.cache.eval(_INIT_SCRIPT, 2, prefixed_cap_key, prefixed_key, str(limit)) except Exception: frappe.log_error("Concurrency limiter: Failed to initialize tokens")