Distributed Locking Mechanisms in Redis

Two fundamental Redis commands form the basis of distributed locking: SETNX and SETEX.

SETNX key value sets the value only if the key does not exist. It returns 1 upon successful insertion and 0 if the key is already present.

SETEX key seconds value sets a value and its expiration time in a single atomic step. SETEX on its own does not check whether the key already exists, but the atomicity matters for locking: if a client set the value with SETNX and then crashed before applying a timeout with a separate EXPIRE call, the key would never expire and every other client would be locked out permanently.

Without a lock, concurrent read-modify-write operations such as deducting available stock can interleave across threads or processes, so updates are lost and the inventory count ends up wrong.
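To make the race concrete, the sketch below replays one unlucky interleaving by hand: two workers both read the stock before either writes it back. The class StockRace and its methods are hypothetical names used only for illustration, and no Redis is involved.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration: replays a read-modify-write race step by step.
final class StockRace {

    /** Unsafe deduction, split into read and write so the interleaving can be forced. */
    static int lostUpdate(int initialStock) {
        int[] stock = {initialStock};
        int seenByA = stock[0];   // worker A reads the stock
        int seenByB = stock[0];   // worker B reads the same value before A writes
        stock[0] = seenByA - 1;   // A writes its result
        stock[0] = seenByB - 1;   // B overwrites A's update with the same number
        return stock[0];
    }

    /** The same two deductions, each performed as one atomic step. */
    static int serialized(int initialStock) {
        AtomicInteger stock = new AtomicInteger(initialStock);
        stock.decrementAndGet();  // worker A
        stock.decrementAndGet();  // worker B
        return stock.get();
    }

    public static void main(String[] args) {
        System.out.println("racy result:       " + lostUpdate(10));
        System.out.println("serialized result: " + serialized(10));
    }
}
```

Two units are sold in both cases, but the racy version records only one deduction. Across separate processes there is no shared AtomicInteger, which is the gap a distributed lock is meant to fill.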

Basic Lock Implementation

A basic Redis lock uses the SET command with the NX (set only if the key does not exist) and PX (expiration in milliseconds) options, which combines the behavior of SETNX and SETEX in one atomic command. The lock is acquired only if the key is absent, and it expires automatically after the specified duration.
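The semantics of SET with NX and PX can be sketched without a server. The class below is a hypothetical in-memory stand-in (NxPxModel is not part of Jedis or Redis) that takes the current time as a parameter so expiry can be shown deterministically. It assumes the same reply convention the acquisition code relies on: "OK" on success and null when the key already exists.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory stand-in for a Redis key space; not a Redis client.
final class NxPxModel {
    private final Map<String, String> values = new HashMap<>();
    private final Map<String, Long> expiresAt = new HashMap<>();

    /** Mimics SET key value NX PX ttlMillis: "OK" on success, null if the key is still live. */
    synchronized String setNxPx(String key, String value, long ttlMillis, long nowMillis) {
        Long deadline = expiresAt.get(key);
        if (deadline != null && nowMillis >= deadline) {
            values.remove(key);        // lazily drop the expired entry
            expiresAt.remove(key);
        }
        if (values.containsKey(key)) {
            return null;               // NX: another client holds the key
        }
        values.put(key, value);
        expiresAt.put(key, nowMillis + ttlMillis);
        return "OK";
    }

    public static void main(String[] args) {
        NxPxModel redis = new NxPxModel();
        // First caller wins the lock.
        System.out.println(redis.setNxPx("lock:stock", "client-a", 1_000, 0));
        // Second caller is rejected while the key is live.
        System.out.println(redis.setNxPx("lock:stock", "client-b", 1_000, 500));
        // After the TTL has elapsed the key is free again.
        System.out.println(redis.setNxPx("lock:stock", "client-b", 1_000, 1_000));
    }
}
```

The third call succeeds without client-a ever releasing the lock, which is the safety net the expiration provides when a lock holder crashes.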

Acquiring the Lock

The acquisition process loops until the lock is secured or a timeout threshold is reached.

public boolean acquireLock(String lockId) {
    long startTime = SystemTimeCache.currentTimestamp();
    try {
        while (true) {
            // SET_PARAMS is expected to carry NX and PX, so this succeeds only if the key is absent.
            String response = jedis.set(RESOURCE_KEY, lockId, SET_PARAMS);
            if ("OK".equals(response)) {
                return true;
            }
            // Give up once the acquisition timeout has elapsed.
            long elapsed = SystemTimeCache.currentTimestamp() - startTime;
            if (elapsed >= ACQUIRE_TIMEOUT_MS) {
                return false;
            }
            // Back off briefly before retrying.
            try {
                Thread.sleep(50);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop waiting.
                Thread.currentThread().interrupt();
                return false;
            }
        }
    } finally {
        jedis.close();
    }
}

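Frequent calls to System.currentTimeMillis() can add overhead on some platforms. A background daemon thread that caches the timestamp reduces that cost, at the price of a reading that may be a few milliseconds stale (up to roughly the 5 ms refresh interval used below).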

import java.util.concurrent.atomic.AtomicLong;

public class SystemTimeCache {
    private static final AtomicLong CACHED_TIME = new AtomicLong(System.currentTimeMillis());

    static {
        // Daemon thread that refreshes the cached timestamp every 5 ms.
        Thread updater = new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                CACHED_TIME.set(System.currentTimeMillis());
                try {
                    Thread.sleep(5);
                } catch (InterruptedException e) {
                    // Restore the flag so the loop condition ends the thread.
                    Thread.currentThread().interrupt();
                }
            }
        });
        updater.setDaemon(true);
        updater.start();
    }

    public static long currentTimestamp() {
        return CACHED_TIME.get();
    }
}

Releasing the Lock

Unlocking requires validating the lock owner to prevent accidentally removing another thread's lock. A Lua script ensures this check and deletion occur atomically.

public boolean releaseLock(String lockId) {
    String luaScript = 
        "if redis.call('get', KEYS[1]) == ARGV[1] then " +
        "   return redis.call('del', KEYS[1]) " +
        "else " +
        "   return 0 " +
        "end";
    try {
        Object evalResult = jedis.eval(luaScript, Collections.singletonList(RESOURCE_KEY), Collections.singletonList(lockId));
        return "1".equals(evalResult.toString());
    } finally {
        jedis.close();
    }
}
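Why the owner check matters can be shown with a small in-memory model. This is a sketch under stated assumptions: OwnerCheckedRelease is a hypothetical class, a ConcurrentMap stands in for Redis, and ConcurrentMap.remove(key, value) plays the role of the atomic compare-and-delete that the Lua script performs.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical in-memory model of lock ownership; a ConcurrentMap stands in for Redis.
final class OwnerCheckedRelease {
    private final ConcurrentMap<String, String> locks = new ConcurrentHashMap<>();

    /** Takes the lock only if nobody holds it. */
    boolean acquire(String key, String ownerId) {
        return locks.putIfAbsent(key, ownerId) == null;
    }

    /** Compare-and-delete: removes the entry only if it still maps to ownerId. */
    boolean release(String key, String ownerId) {
        return locks.remove(key, ownerId);
    }

    /** Simulates TTL expiry by dropping the key regardless of owner. */
    void expire(String key) {
        locks.remove(key);
    }

    String holder(String key) {
        return locks.get(key);
    }

    public static void main(String[] args) {
        OwnerCheckedRelease redis = new OwnerCheckedRelease();
        redis.acquire("lock:stock", "client-a");
        redis.expire("lock:stock");               // A's lease runs out while A is still working
        redis.acquire("lock:stock", "client-b");  // B legitimately takes over
        // A finishes late and tries to unlock; the owner check protects B's lock.
        System.out.println("A released: " + redis.release("lock:stock", "client-a"));
        System.out.println("holder:     " + redis.holder("lock:stock"));
    }
}
```

With an unconditional delete in place of the owner check, client-a's late unlock would remove client-b's lock and let a third client in. This is also why each lockId should be unique per acquisition, for example a random UUID.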

Reentrant Locking with Redisson

The basic lock is not reentrant: a thread that already holds it and tries to acquire it again simply waits until the acquisition times out. A robust distributed lock must allow the same thread to acquire it multiple times. Redisson provides a full-featured, reentrant distributed lock.

ExecutorService executor = Executors.newFixedThreadPool(WORKER_COUNT);
Config cfg = new Config();
cfg.useSingleServer().setAddress("redis://localhost:6379");
RedissonClient redisson = Redisson.create(cfg);
RLock distributedLock = redisson.getLock("inventoryLock");

for (int i = 0; i < TASK_COUNT; i++) {
    executor.submit(() -> {
        distributedLock.lock();
        try {
            stockCount.decrementAndGet();
            System.out.println("Current stock: " + stockCount.get());
        } finally {
            distributedLock.unlock();
        }
    });
}

Lock Acquisition Internals

When lock() is invoked, Redisson attempts to acquire the lock using the current thread ID. If unsuccessful, it subscribes to a Redis channel dedicated to that lock and waits for a release notification, avoiding constant polling.

public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
    long currentThreadId = Thread.currentThread().getId();
    Long remainingTtl = tryAcquire(leaseTime, unit, currentThreadId);

    if (remainingTtl == null) {
        return;
    }

    RFuture<RedissonLockEntry> subscriptionFuture = subscribe(currentThreadId);
    commandExecutor.syncSubscription(subscriptionFuture);

    try {
        while (true) {
            remainingTtl = tryAcquire(leaseTime, unit, currentThreadId);
            if (remainingTtl == null) {
                break;
            }
            if (remainingTtl >= 0) {
                getEntry(currentThreadId).getLatch().tryAcquire(remainingTtl, TimeUnit.MILLISECONDS);
            } else {
                getEntry(currentThreadId).getLatch().acquire();
            }
        }
    } finally {
        unsubscribe(subscriptionFuture, currentThreadId);
    }
}
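The wait step in the loop above can be modeled in isolation. In this sketch a java.util.concurrent.Semaphore plays the role of the per-lock latch, and releasing a permit stands in for the unlock notification arriving over the subscription. ReleaseLatchDemo and awaitRelease are hypothetical names, not Redisson API.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Hypothetical model of "wait for a release signal, but no longer than the remaining TTL".
final class ReleaseLatchDemo {

    /** Returns true if a release signal arrived, false if the TTL elapsed or the wait was interrupted. */
    static boolean awaitRelease(Semaphore latch, long ttlMillis) {
        try {
            return latch.tryAcquire(ttlMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Semaphore latch = new Semaphore(0);
        Thread holder = new Thread(() -> {
            try {
                Thread.sleep(100);              // the lock holder does some work
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            latch.release();                    // stands in for the published unlock message
        });
        holder.start();
        boolean notified = awaitRelease(latch, 5_000);
        holder.join();
        System.out.println(notified ? "woken by release" : "timed out");
    }
}
```

Either outcome sends the waiter back to the top of the loop for another acquisition attempt. Bounding the wait by the remaining TTL means a lost notification delays the waiter only until the lock would have expired anyway.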

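If no explicit lease time is provided, Redisson uses the watchdog timeout (lockWatchdogTimeout, 30 seconds by default) as the initial lease and schedules a watchdog task that keeps extending the expiration for as long as the client still holds the lock. If the client crashes, the renewals stop and the lock expires on its own.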

private <T> RFuture<Long> attemptAcquireAsync(long leaseTime, TimeUnit unit, long currentThreadId) {
    if (leaseTime != -1) {
        return executeLockAsync(leaseTime, unit, currentThreadId, RedisCommands.EVAL_LONG);
    }

    RFuture<Long> ttlFuture = executeLockAsync(
        commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(),
        TimeUnit.MILLISECONDS, currentThreadId, RedisCommands.EVAL_LONG);

    ttlFuture.addListener(new FutureListener<Long>() {
        @Override
        public void operationComplete(Future<Long> future) throws Exception {
            if (!future.isSuccess()) {
                return;
            }
            Long ttlRemaining = future.getNow();
            if (ttlRemaining == null) {
                scheduleExpirationRenewal(currentThreadId);
            }
        }
    });
    return ttlFuture;
}
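The watchdog idea can be sketched with a plain ScheduledExecutorService. Everything here is illustrative: LeaseWatchdog is a hypothetical class, the lease lives in memory instead of Redis, and renewing every third of the timeout is an assumption of this sketch, not a statement about Redisson's configuration. The clock is injected so the expiry logic can be exercised without sleeping.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

// Hypothetical watchdog sketch; the lease is held in memory, not in Redis.
final class LeaseWatchdog {
    private final long timeoutMillis;
    private final LongSupplier clock;
    private final AtomicLong expiresAt = new AtomicLong();
    private final ScheduledExecutorService timer =
        Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "lease-watchdog");
            t.setDaemon(true);
            return t;
        });
    private ScheduledFuture<?> task;

    LeaseWatchdog(long timeoutMillis, LongSupplier clock) {
        this.timeoutMillis = timeoutMillis;
        this.clock = clock;
        renew();                                   // initial lease
    }

    /** Pushes the expiry out to now + timeout, like re-applying PEXPIRE. */
    void renew() {
        expiresAt.set(clock.getAsLong() + timeoutMillis);
    }

    boolean isExpired() {
        return clock.getAsLong() >= expiresAt.get();
    }

    /** Renews every timeout/3 (assumed interval); timeoutMillis must be at least 3. */
    synchronized void start() {
        long period = timeoutMillis / 3;
        task = timer.scheduleAtFixedRate(this::renew, period, period, TimeUnit.MILLISECONDS);
    }

    /** Stops renewing; the lease then runs out on its own. */
    synchronized void stop() {
        if (task != null) {
            task.cancel(false);
        }
        timer.shutdownNow();
    }

    public static void main(String[] args) throws InterruptedException {
        LeaseWatchdog watchdog = new LeaseWatchdog(300, System::currentTimeMillis);
        watchdog.start();
        Thread.sleep(1_000);                       // well past the original 300 ms lease
        System.out.println("expired while renewing: " + watchdog.isExpired());
        watchdog.stop();
        Thread.sleep(400);                         // no more renewals, so the lease lapses
        System.out.println("expired after stop:     " + watchdog.isExpired());
    }
}
```

Renewing well before the deadline leaves room for a delayed or failed renewal, while stopping the task on unlock (or losing it in a crash) guarantees the lease eventually lapses.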

Underlying Lua Script for Locking

Redisson executes a Lua script to guarantee atomicity during lock acquisition. It utilizes a Hash structure to track reentrancy counts.

if (redis.call('exists', KEYS[1]) == 0) then 
    redis.call('hset', KEYS[1], ARGV[2], 1); 
    redis.call('pexpire', KEYS[1], ARGV[1]); 
    return nil; 
end; 
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then 
    redis.call('hincrby', KEYS[1], ARGV[2], 1); 
    redis.call('pexpire', KEYS[1], ARGV[1]); 
    return nil; 
end; 
return redis.call('pttl', KEYS[1]);

Here KEYS[1] is the lock key, ARGV[1] is the lease time in milliseconds, and ARGV[2] identifies the calling client thread. The script has three branches:
  • If the lock key does not exist, it creates the hash, sets the entry count to 1, and applies the expiration.
  • If the lock exists and belongs to the current thread, it increments the counter and refreshes the timeout.
  • If the lock belongs to a different thread, it returns the remaining time to live (TTL).
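The three branches of the locking script can be mirrored in plain Java to check the reasoning. This is a hypothetical in-memory model (ReentrantLockModel is not Redisson code): a map plays the role of the hash at the lock key, and the current time is passed in explicitly so the TTL arithmetic is visible.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory model of the locking script's three branches.
final class ReentrantLockModel {
    private final Map<String, Integer> holds = new HashMap<>(); // the hash stored at the lock key
    private long expiresAt;                                      // absolute expiry of the key

    /** Returns null when the lock is acquired (the script's nil), else the remaining TTL in ms. */
    synchronized Long tryAcquire(String owner, long leaseMillis, long nowMillis) {
        if (!holds.isEmpty() && nowMillis >= expiresAt) {
            holds.clear();                        // the key has expired
        }
        if (holds.isEmpty()) {                    // exists == 0
            holds.put(owner, 1);                  // hset owner 1
            expiresAt = nowMillis + leaseMillis;  // pexpire
            return null;
        }
        if (holds.containsKey(owner)) {           // hexists == 1
            holds.merge(owner, 1, Integer::sum);  // hincrby owner 1
            expiresAt = nowMillis + leaseMillis;  // pexpire
            return null;
        }
        return expiresAt - nowMillis;             // pttl
    }

    synchronized int holdCount(String owner) {
        return holds.getOrDefault(owner, 0);
    }

    public static void main(String[] args) {
        ReentrantLockModel lock = new ReentrantLockModel();
        System.out.println(lock.tryAcquire("client-1:thread-7", 30_000, 0));      // fresh acquire
        System.out.println(lock.tryAcquire("client-1:thread-7", 30_000, 1_000));  // reentrant acquire
        System.out.println(lock.holdCount("client-1:thread-7"));                  // current hold count
        System.out.println(lock.tryAcquire("client-2:thread-3", 30_000, 2_000));  // contended: remaining TTL
    }
}
```

The owner strings in main are made-up examples. The only property that matters is that an owner identifier is unique across clients and threads, so one thread's reentrant acquire can never be mistaken for another's.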

Underlying Lua Script for Unlocking

Releasing the lock involves decrementing the reentrancy counter. If the counter reaches zero, the key is deleted, and a release message is published.

if (redis.call('exists', KEYS[1]) == 0) then 
    redis.call('publish', KEYS[2], ARGV[1]); 
    return 1; 
end;
if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then 
    return nil;
end; 
local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); 
if (counter > 0) then 
    redis.call('pexpire', KEYS[1], ARGV[2]); 
    return 0; 
else 
    redis.call('del', KEYS[1]); 
    redis.call('publish', KEYS[2], ARGV[1]); 
    return 1;
end; 
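return nil;

Here KEYS[1] is the lock key, KEYS[2] is the notification channel, ARGV[1] is the unlock message, ARGV[2] is the lease time in milliseconds, and ARGV[3] identifies the calling client thread.
  • If the key no longer exists, it publishes an unlock event and returns 1.
  • If the unlocking thread does not hold the lock, it aborts and returns nil.
  • If the lock count remains above zero after the decrement, it refreshes the expiration, keeps the lock, and returns 0.
  • If the lock count drops to zero or below, it deletes the key, publishes the unlock event, and returns 1.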
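The return values of the unlock script can be traced with a small model. UnlockModel is a hypothetical helper: the map represents the hash stored at the lock key, and publishing is reduced to a comment because nothing is listening in this sketch.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory model of the unlock script's branches and return values.
final class UnlockModel {

    /** Returns null if the caller is not the owner, 0 if the lock is still held, 1 if it is released. */
    static Long release(Map<String, Integer> holds, String owner) {
        if (holds.isEmpty()) {
            return 1L;                 // key already gone: the script publishes and reports success
        }
        if (!holds.containsKey(owner)) {
            return null;               // caller does not hold the lock
        }
        int remaining = holds.merge(owner, -1, Integer::sum); // hincrby owner -1
        if (remaining > 0) {
            return 0L;                 // still held reentrantly; the script refreshes the TTL here
        }
        holds.clear();                 // del, followed by publish in the real script
        return 1L;
    }

    public static void main(String[] args) {
        Map<String, Integer> holds = new HashMap<>();
        holds.put("client-1:thread-7", 2);                          // lock held twice by one thread
        System.out.println(release(holds, "client-2:thread-3"));    // a stranger tries to unlock
        System.out.println(release(holds, "client-1:thread-7"));    // first unlock by the owner
        System.out.println(release(holds, "client-1:thread-7"));    // second unlock frees the lock
    }
}
```

Only the final unlock removes the key, so waiters are notified once per full release and not once per unlock() call.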
