Distributed Locking Mechanisms in Redis
Two fundamental Redis commands form the basis of distributed locking: SETNX and SETEX.
SETNX key value sets the value only if the key does not exist. It returns 1 upon successful insertion and 0 if the key is already present.
SETEX key seconds value assigns a value to a key and sets its expiration time in a single atomic command. A client therefore cannot set the value and then crash before the timeout is applied, which would leave a key that never expires and so deadlock every other client.
In concurrent scenarios such as deducting available stock, multiple threads that read and update the same value without coordination can race with each other and produce incorrect inventory counts.
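To make the race concrete, the following minimal sketch replays one unlucky interleaving by hand instead of relying on real thread timing. The class and method names (LostUpdateDemo, unsafeInterleaving, serializedByLock) are illustrative only, and the synchronized block merely stands in for a distributed lock.

```java
// Deterministic replay of a lost update: both workers read before either writes.
class LostUpdateDemo {
    static int stock = 10;

    /** Replays the interleaving read(A), read(B), write(A), write(B). */
    static int unsafeInterleaving() {
        stock = 10;
        int seenByA = stock;  // worker A reads 10
        int seenByB = stock;  // worker B reads 10 before A has written
        stock = seenByA - 1;  // A writes 9
        stock = seenByB - 1;  // B also writes 9, so one deduction is lost
        return stock;
    }

    /** Same work, but each read-modify-write finishes before the next starts. */
    static int serializedByLock() {
        stock = 10;
        for (int worker = 0; worker < 2; worker++) {
            synchronized (LostUpdateDemo.class) { // stands in for the distributed lock
                stock = stock - 1;
            }
        }
        return stock;
    }

    public static void main(String[] args) {
        System.out.println("unsafe: " + unsafeInterleaving()); // 9, although two units were sold
        System.out.println("locked: " + serializedByLock());   // 8
    }
}
```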
Basic Lock Implementation
A primitive Redis lock utilizes the SET command with NX (Not Exists) and PX (Expiration in milliseconds) parameters. This ensures the lock is only acquired if absent and automatically expires after a specified duration.
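The behaviour of SET with NX and PX can be sketched with a small in-memory model. This is a teaching aid under stated assumptions, not a Redis client: NxPxModel and setNxPx are hypothetical names, and the clock is passed in explicitly so the example stays deterministic.

```java
import java.util.HashMap;
import java.util.Map;

// In-memory model of `SET key value NX PX ttl`; not a real Redis client.
class NxPxModel {
    private static final class Entry {
        final String value;
        final long expiresAtMs;

        Entry(String value, long expiresAtMs) {
            this.value = value;
            this.expiresAtMs = expiresAtMs;
        }
    }

    private final Map<String, Entry> store = new HashMap<>();

    /** Returns "OK" if the key was absent or had expired at nowMs, otherwise null. */
    synchronized String setNxPx(String key, String value, long ttlMs, long nowMs) {
        Entry current = store.get(key);
        if (current != null && current.expiresAtMs > nowMs) {
            return null; // key is still live: NX refuses to overwrite it
        }
        store.put(key, new Entry(value, nowMs + ttlMs));
        return "OK";
    }

    public static void main(String[] args) {
        NxPxModel redis = new NxPxModel();
        System.out.println(redis.setNxPx("lock", "client-a", 1000, 0));    // OK
        System.out.println(redis.setNxPx("lock", "client-b", 1000, 500));  // null
        System.out.println(redis.setNxPx("lock", "client-b", 1000, 1500)); // OK
    }
}
```

The third call succeeds only because the first holder's TTL has elapsed, which is the property that keeps a crashed client from blocking everyone forever.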
Acquiring the Lock
The acquisition process loops until the lock is secured or a timeout threshold is reached.
public boolean acquireLock(String lockId) {
    long startTime = SystemTimeCache.currentTimestamp();
    try {
        while (true) {
            String response = jedis.set(RESOURCE_KEY, lockId, SET_PARAMS);
            if ("OK".equals(response)) {
                return true;
            }
            long elapsed = SystemTimeCache.currentTimestamp() - startTime;
            if (elapsed >= ACQUIRE_TIMEOUT_MS) {
                return false;
            }
            try {
                Thread.sleep(50);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
    } finally {
        jedis.close();
    }
}
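Calling System.currentTimeMillis() on every iteration of a tight retry loop can add overhead under heavy contention. A background daemon thread that caches the timestamp reduces this cost, at the price of readings that can be several milliseconds stale (the cache below refreshes roughly every 5 ms).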
public class SystemTimeCache {
    private static final AtomicLong CACHED_TIME = new AtomicLong(System.currentTimeMillis());

    static {
        Thread updater = new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                CACHED_TIME.set(System.currentTimeMillis());
                try {
                    Thread.sleep(5);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
        updater.setDaemon(true);
        updater.start();
    }

    public static long currentTimestamp() {
        return CACHED_TIME.get();
    }
}
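As an alternative to a cached wall clock, the retry loop can measure elapsed time against a deadline computed from System.nanoTime(), which is monotonic. The sketch below is one possible shape, not the article's implementation: RetryUntilDeadline and its parameters are hypothetical, and the BooleanSupplier stands in for the Redis SET call.

```java
import java.util.function.BooleanSupplier;

// Generic retry-until-deadline loop; `attempt` stands in for the SET NX PX call.
class RetryUntilDeadline {
    static boolean retry(BooleanSupplier attempt, long timeoutMs, long pauseMs) {
        long deadline = System.nanoTime() + timeoutMs * 1_000_000L;
        while (true) {
            if (attempt.getAsBoolean()) {
                return true;
            }
            if (System.nanoTime() - deadline >= 0) { // overflow-safe comparison
                return false;
            }
            try {
                Thread.sleep(pauseMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt for callers
                return false;
            }
        }
    }

    public static void main(String[] args) {
        int[] calls = {0};
        boolean ok = retry(() -> ++calls[0] == 3, 1_000, 10);
        System.out.println(ok + " after " + calls[0] + " attempts");
        System.out.println(retry(() -> false, 50, 10)); // false once the deadline passes
    }
}
```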
Releasing the Lock
Unlocking requires validating the lock owner to prevent accidentally removing another thread's lock. A Lua script ensures this check and deletion occur atomically.
public boolean releaseLock(String lockId) {
    String luaScript =
        "if redis.call('get', KEYS[1]) == ARGV[1] then " +
        "  return redis.call('del', KEYS[1]) " +
        "else " +
        "  return 0 " +
        "end";
    try {
        Object evalResult = jedis.eval(luaScript, Collections.singletonList(RESOURCE_KEY), Collections.singletonList(lockId));
        return "1".equals(evalResult.toString());
    } finally {
        jedis.close();
    }
}
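The effect of the owner check can be seen in a local analogue. In this sketch, which assumes nothing about Redis itself, ConcurrentHashMap.remove(key, value) plays the role of the Lua check-and-delete, and each client identifies itself with a random UUID token. OwnerCheckedRelease and its method names are hypothetical.

```java
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

// Local analogue of the Lua check-and-delete: remove(key, value) deletes only on a match.
class OwnerCheckedRelease {
    static final ConcurrentHashMap<String, String> LOCKS = new ConcurrentHashMap<>();

    static boolean acquire(String key, String ownerToken) {
        return LOCKS.putIfAbsent(key, ownerToken) == null; // comparable to SET ... NX
    }

    static boolean release(String key, String ownerToken) {
        return LOCKS.remove(key, ownerToken); // atomic compare-and-delete
    }

    public static void main(String[] args) {
        String tokenA = UUID.randomUUID().toString();
        String tokenB = UUID.randomUUID().toString();
        System.out.println(acquire("inventory", tokenA)); // true
        System.out.println(release("inventory", tokenB)); // false: B is not the owner
        System.out.println(release("inventory", tokenA)); // true
    }
}
```

Without the token comparison, a client whose lock had already expired could delete a lock that another client acquired in the meantime.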
Reentrant Locking with Redisson
The basic lock implementation lacks reentrancy. A robust distributed lock must allow the same thread to acquire it multiple times. Redisson provides a fully featured, reentrant distributed lock.
ExecutorService executor = Executors.newFixedThreadPool(WORKER_COUNT);
Config cfg = new Config();
cfg.useSingleServer().setAddress("redis://localhost:6379");
RedissonClient redisson = Redisson.create(cfg);
RLock distributedLock = redisson.getLock("inventoryLock");
for (int i = 0; i < TASK_COUNT; i++) {
    executor.submit(() -> {
        distributedLock.lock();
        try {
            stockCount.decrementAndGet();
            System.out.println("Current stock: " + stockCount.get());
        } finally {
            distributedLock.unlock();
        }
    });
}
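Reentrancy itself is easiest to observe on a local lock. The sketch below uses java.util.concurrent.locks.ReentrantLock purely as an in-process analogue of a reentrant distributed lock; it does not talk to Redis, and ReentrancyDemo is an illustrative name.

```java
import java.util.concurrent.locks.ReentrantLock;

// In-process analogue of reentrant locking: the same thread may lock twice.
class ReentrancyDemo {
    static final ReentrantLock LOCK = new ReentrantLock();

    static int outer() {
        LOCK.lock();
        try {
            return inner(); // re-enters the lock this thread already holds
        } finally {
            LOCK.unlock();
        }
    }

    static int inner() {
        LOCK.lock();
        try {
            return LOCK.getHoldCount(); // one hold per nested lock() call
        } finally {
            LOCK.unlock();
        }
    }

    public static void main(String[] args) {
        System.out.println("hold count inside inner(): " + outer()); // 2
        System.out.println("still locked afterwards: " + LOCK.isLocked()); // false
    }
}
```

With a non-reentrant lock such as the basic SET NX implementation, the nested acquisition in inner() would wait on a lock its own caller holds.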
Lock Acquisition Internals
When lock() is invoked, Redisson attempts to acquire the lock using the current thread ID. If unsuccessful, it subscribes to a Redis channel dedicated to that lock and waits for a release notification, avoiding constant polling.
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
    long currentThreadId = Thread.currentThread().getId();
    Long remainingTtl = tryAcquire(leaseTime, unit, currentThreadId);
    if (remainingTtl == null) {
        return;
    }
    RFuture<RedissonLockEntry> subscriptionFuture = subscribe(currentThreadId);
    commandExecutor.syncSubscription(subscriptionFuture);
    try {
        while (true) {
            remainingTtl = tryAcquire(leaseTime, unit, currentThreadId);
            if (remainingTtl == null) {
                break;
            }
            if (remainingTtl >= 0) {
                getEntry(currentThreadId).getLatch().tryAcquire(remainingTtl, TimeUnit.MILLISECONDS);
            } else {
                getEntry(currentThreadId).getLatch().acquire();
            }
        }
    } finally {
        unsubscribe(subscriptionFuture, currentThreadId);
    }
}
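The wait inside the loop above can be pictured as a semaphore with zero permits: the waiter parks for at most the lock's remaining TTL and is woken earlier if a release notification arrives. The following sketch assumes a plain java.util.concurrent.Semaphore and simulates the pub/sub message with a direct method call; ReleaseLatchDemo, onUnlockMessage, and awaitRelease are hypothetical names.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Waiting on a release notification instead of polling.
class ReleaseLatchDemo {
    // Zero permits: a waiter blocks until a "release message" adds one.
    static final Semaphore LATCH = new Semaphore(0);

    /** Simulates the subscription listener reacting to an unlock message. */
    static void onUnlockMessage() {
        LATCH.release();
    }

    /** Waits for a notification, but never longer than the lock's remaining TTL. */
    static boolean awaitRelease(long remainingTtlMs) {
        try {
            return LATCH.tryAcquire(remainingTtlMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(awaitRelease(50)); // false: no message arrived before the TTL ran out
        Thread publisher = new Thread(ReleaseLatchDemo::onUnlockMessage);
        publisher.start();
        System.out.println(awaitRelease(5_000)); // true: woken by the simulated message
        publisher.join();
    }
}
```

Either way the waiter then retries the acquisition, because a wake-up only signals that the lock may be free, not that this client now owns it.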
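If no explicit lease time is provided (leaseTime is -1), Redisson falls back to the configured lock watchdog timeout, 30 seconds by default, and starts a watchdog: a scheduled background task that periodically extends the lock's expiration for as long as the client still holds the lock. If the client crashes, the renewals stop and the lock expires on its own.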
private <T> RFuture<Long> attemptAcquireAsync(long leaseTime, TimeUnit unit, long currentThreadId) {
    if (leaseTime != -1) {
        return executeLockAsync(leaseTime, unit, currentThreadId, RedisCommands.EVAL_LONG);
    }
    RFuture<Long> ttlFuture = executeLockAsync(
        commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(),
        TimeUnit.MILLISECONDS, currentThreadId, RedisCommands.EVAL_LONG);
    ttlFuture.addListener(new FutureListener<Long>() {
        @Override
        public void operationComplete(Future<Long> future) throws Exception {
            if (!future.isSuccess()) {
                return;
            }
            Long ttlRemaining = future.getNow();
            if (ttlRemaining == null) {
                scheduleExpirationRenewal(currentThreadId);
            }
        }
    });
    return ttlFuture;
}
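A watchdog of this kind can be approximated with a ScheduledExecutorService. The sketch below is an assumption-heavy illustration rather than Redisson's code: WatchdogSketch is a hypothetical class, the expiry lives in a local AtomicLong instead of Redis, and the renewal period of one third of the timeout is a choice made for this example.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Periodically pushes a lock's expiry forward while the holder is alive.
class WatchdogSketch {
    final AtomicLong expiresAtMs = new AtomicLong();
    private final CountDownLatch twoRenewals = new CountDownLatch(2);
    private final ScheduledExecutorService timer =
        Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "lock-watchdog");
            t.setDaemon(true); // never keeps the JVM alive on its own
            return t;
        });

    void start(long watchdogTimeoutMs) {
        expiresAtMs.set(System.currentTimeMillis() + watchdogTimeoutMs);
        long period = watchdogTimeoutMs / 3; // assumed renewal interval for this sketch
        timer.scheduleAtFixedRate(() -> {
            // Stands in for a PEXPIRE sent to Redis.
            expiresAtMs.set(System.currentTimeMillis() + watchdogTimeoutMs);
            twoRenewals.countDown();
        }, period, period, TimeUnit.MILLISECONDS);
    }

    /** Called on unlock; after this, the lock is left to expire naturally. */
    void stop() {
        timer.shutdownNow();
    }

    boolean awaitTwoRenewals(long maxWaitMs) {
        try {
            return twoRenewals.await(maxWaitMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        WatchdogSketch watchdog = new WatchdogSketch();
        watchdog.start(90);
        System.out.println("renewed twice: " + watchdog.awaitTwoRenewals(5_000));
        watchdog.stop();
    }
}
```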
Underlying Lua Script for Locking
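Redisson executes a Lua script to guarantee atomicity during lock acquisition. It uses a Redis hash to track reentrancy counts. In the script, KEYS[1] is the lock key, ARGV[1] is the lease time in milliseconds, and ARGV[2] identifies the acquiring thread.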
if (redis.call('exists', KEYS[1]) == 0) then
    redis.call('hset', KEYS[1], ARGV[2], 1);
    redis.call('pexpire', KEYS[1], ARGV[1]);
    return nil;
end;
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
    redis.call('hincrby', KEYS[1], ARGV[2], 1);
    redis.call('pexpire', KEYS[1], ARGV[1]);
    return nil;
end;
return redis.call('pttl', KEYS[1]);
- If the lock key does not exist, it creates the hash, sets the entry count to 1, and applies the expiration.
- If the lock exists and belongs to the current thread, it increments the counter and refreshes the timeout.
- If the lock belongs to a different thread, it returns the remaining time to live (TTL).
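The three branches above can be mirrored in plain Java to make the return values easier to follow. This is an in-memory model under stated assumptions, not Redisson code: ReentrantLockScriptModel is a hypothetical class, the owner strings are arbitrary, and time is passed in explicitly instead of relying on Redis expiry.

```java
import java.util.HashMap;
import java.util.Map;

// In-memory model of the locking script's three branches.
class ReentrantLockScriptModel {
    private final Map<String, Integer> holds = new HashMap<>(); // hash field -> reentrancy count
    private long expiresAtMs = Long.MIN_VALUE;

    /** Returns null when acquired (the script's nil), else the remaining TTL in ms. */
    synchronized Long tryLock(String owner, long leaseMs, long nowMs) {
        if (holds.isEmpty() || expiresAtMs <= nowMs) { // key absent or already expired
            holds.clear();
            holds.put(owner, 1);
            expiresAtMs = nowMs + leaseMs;
            return null;
        }
        if (holds.containsKey(owner)) {                // same owner: re-enter
            holds.merge(owner, 1, Integer::sum);
            expiresAtMs = nowMs + leaseMs;
            return null;
        }
        return expiresAtMs - nowMs;                    // held by a different owner
    }

    synchronized int holdCount(String owner) {
        return holds.getOrDefault(owner, 0);
    }

    public static void main(String[] args) {
        ReentrantLockScriptModel lock = new ReentrantLockScriptModel();
        System.out.println(lock.tryLock("client-1:thread-7", 30_000, 0));     // null
        System.out.println(lock.tryLock("client-1:thread-7", 30_000, 1_000)); // null
        System.out.println(lock.holdCount("client-1:thread-7"));              // 2
        System.out.println(lock.tryLock("client-2:thread-3", 30_000, 2_000)); // 29000
    }
}
```

The last call returns 29000 because the second acquisition at t = 1000 refreshed the expiry to t = 31000, and the competing attempt arrives at t = 2000.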
Underlying Lua Script for Unlocking
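Releasing the lock involves decrementing the reentrancy counter. If the counter reaches zero, the key is deleted and a release message is published. In the script, KEYS[1] is the lock key, KEYS[2] is the notification channel, ARGV[1] is the unlock message, ARGV[2] is the lease time in milliseconds, and ARGV[3] identifies the releasing thread.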
if (redis.call('exists', KEYS[1]) == 0) then
    redis.call('publish', KEYS[2], ARGV[1]);
    return 1;
end;
if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then
    return nil;
end;
local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);
if (counter > 0) then
    redis.call('pexpire', KEYS[1], ARGV[2]);
    return 0;
else
    redis.call('del', KEYS[1]);
    redis.call('publish', KEYS[2], ARGV[1]);
    return 1;
end;
return nil;
- If the key no longer exists, it publishes an unlock event.
- If the unlocking thread does not hold the lock, it returns nil without changing anything.
- If the lock count remains above zero after the decrement, it refreshes the expiration and keeps the lock.
- If the lock count drops to zero or below, it deletes the key and publishes the unlock event.
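The unlock branches can be traced with the same kind of in-memory model. As before, this is a sketch under assumptions rather than Redisson's implementation: UnlockScriptModel is a hypothetical class, a list stands in for the pub/sub channel, and expiry is left out to keep the example short.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// In-memory model of the unlocking script's branches.
class UnlockScriptModel {
    final Map<String, Integer> holds = new HashMap<>();   // hash field -> reentrancy count
    final List<String> published = new ArrayList<>();     // stands in for the pub/sub channel

    /** Returns 1 if fully released, 0 if still held, null if the caller is not the owner. */
    synchronized Integer unlock(String owner) {
        if (holds.isEmpty()) {          // key already gone, for example after expiry
            published.add("unlock");
            return 1;
        }
        if (!holds.containsKey(owner)) {
            return null;                // not the owner: nothing changes
        }
        int counter = holds.merge(owner, -1, Integer::sum);
        if (counter > 0) {
            return 0;                   // the real script also refreshes the expiration here
        }
        holds.clear();                  // comparable to DEL on the lock key
        published.add("unlock");
        return 1;
    }

    public static void main(String[] args) {
        UnlockScriptModel lock = new UnlockScriptModel();
        lock.holds.put("owner-a", 2);               // lock held twice by owner-a
        System.out.println(lock.unlock("owner-b")); // null
        System.out.println(lock.unlock("owner-a")); // 0
        System.out.println(lock.unlock("owner-a")); // 1
        System.out.println(lock.published.size());  // 1
    }
}
```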