refactor: make token initialization simple

This commit is contained in:
Saqib Ansari 2026-04-22 16:20:30 +05:30
parent 850cc58664
commit 53e7e34948
3 changed files with 14 additions and 29 deletions

View file

@ -525,6 +525,7 @@ persistent_cache_keys = [
"monitor-transactions",
"rate-limit-counter-*",
"rl:*",
"concurrency:*",
]
user_invitation = {

View file

@ -10,9 +10,8 @@ class RedisSemaphore:
"""A distributed counting semaphore backed by a Redis LIST.
Allows up to *limit* concurrent holders across all processes sharing the
same Redis instance. The token pool is lazily initialized via an atomic
Lua script and self-heals after crashes thanks to a TTL on the capacity
key.
same Redis instance. The token pool is lazily initialized and self-heals
after crashes thanks to a TTL on the capacity key.
Usage as a context manager::
@ -35,19 +34,6 @@ class RedisSemaphore:
# after a worker crash that leaked a token.
CAPACITY_TTL = 3600 # 1 hour
# Lua script that atomically initializes the token pool.
# KEYS[1] = capacity key, KEYS[2] = token list key
# ARGV[1] = limit, ARGV[2] = TTL
_INIT_SCRIPT = """\
if redis.call('SET', KEYS[1], ARGV[1], 'NX', 'EX', ARGV[2]) then
redis.call('DEL', KEYS[2])
local n = tonumber(ARGV[1])
for i = 1, n do
redis.call('RPUSH', KEYS[2], tostring(i))
end
end
"""
def __init__(self, key: str, limit: int, wait_timeout: float = 0, shared: bool = False):
"""
:param key: A unique Redis key name for this semaphore (will be
@ -109,18 +95,19 @@ end
# -- internals ---------------------------------------------------------
def _ensure_tokens(self) -> None:
"""Lazily initialize the token pool via an atomic Lua script."""
"""Lazily initialize the token pool."""
try:
prefixed_cap_key = frappe.cache.make_key(f"{self.key}:capacity", shared=self.shared)
prefixed_key = frappe.cache.make_key(self.key, shared=self.shared)
frappe.cache.eval(
self._INIT_SCRIPT,
2,
prefixed_cap_key,
prefixed_key,
str(self.limit),
str(self.CAPACITY_TTL),
if frappe.cache.exists(f"{self.key}:capacity", shared=self.shared):
return
frappe.cache.set_value(
f"{self.key}:capacity",
self.limit,
expires_in_sec=self.CAPACITY_TTL,
shared=self.shared,
)
frappe.cache.delete_value(self.key, shared=self.shared)
for i in range(1, self.limit + 1):
frappe.cache.lpush(self.key, str(i), shared=self.shared)
except Exception:
frappe.log_error(f"RedisSemaphore({self.key}): Failed to initialize tokens")

View file

@ -177,9 +177,6 @@ class RedisWrapper(redis.Redis):
def blpop(self, key, timeout=0, user=None, shared=False):
return super().blpop(self.make_key(key, user=user, shared=shared), timeout=timeout)
def setnx(self, name, value):
return super().setnx(self.make_key(name), value)
def llen(self, key):
return super().llen(self.make_key(key))