Implementing a Redis-Based Distributed Lock with Spring AOP and Watchdog Renewal
1. Defining the Lock Annotation
To create a declarative distributed locking mechanism, we start by defining a custom annotation. This annotation will allow us to mark methods that require synchronization across distributed instances.
import java.lang.annotation.*;
import java.util.concurrent.TimeUnit;

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@Documented
public @interface DistributedLock {

    long DEFAULT_LEASE_TIME = 30L;
    long DEFAULT_WAIT_TIME = 10L;

    /**
     * The specific resource key for the lock.
     */
    String resourceKey() default "global_lock";

    /**
     * The duration the lock should be held before automatic expiration (in seconds).
     */
    long leaseTime() default DEFAULT_LEASE_TIME;

    /**
     * The maximum time to wait for acquiring the lock (in seconds).
     */
    long waitTime() default DEFAULT_WAIT_TIME;
}
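The leaseTime sets the TTL (time to live) of the lock key in Redis, so a holder that crashes before releasing cannot block other instances indefinitely: the key simply expires. The waitTime bounds how long a caller keeps retrying acquisition before giving up, rather than failing on the first attempt. Both values are expressed in seconds.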
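The waitTime semantics can be sketched without Redis. The following is a minimal illustration, not the article's implementation: the class name AcquireWithDeadline and the attempt callback are hypothetical stand-ins for a single "set if absent" call against Redis. It retries until the callback succeeds or the wait time is spent, and always makes at least one attempt.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

public class AcquireWithDeadline {

    /**
     * Calls attempt until it returns true or waitMillis has elapsed.
     * At least one attempt is made, even when waitMillis is 0.
     */
    public static boolean tryAcquire(BooleanSupplier attempt, long waitMillis, long backoffMillis) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(waitMillis);
        while (true) {
            if (attempt.getAsBoolean()) {
                return true;
            }
            if (System.nanoTime() - deadline >= 0) {
                return false; // wait time exhausted
            }
            try {
                Thread.sleep(backoffMillis); // back off before the next attempt
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt for the caller
                return false;
            }
        }
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Hypothetical lock that becomes free on the third attempt.
        boolean acquired = tryAcquire(() -> ++calls[0] >= 3, 1_000, 10);
        System.out.println("acquired=" + acquired + " after " + calls[0] + " attempts");
        // prints: acquired=true after 3 attempts

        // A lock that never frees up: the caller gives up once the wait time is spent.
        System.out.println("acquired=" + tryAcquire(() -> false, 50, 10));
        // prints: acquired=false
    }
}
```

Measuring the deadline with System.nanoTime() rather than wall-clock time is a design choice of this sketch; it keeps the wait bounded even if the system clock is adjusted while the caller is retrying.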
2. Usage Scenario
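Below is an example of a service managing inventory. To prevent race conditions during stock deduction, we apply the @DistributedLock annotation to the method that modifies stock. Note that resourceKey is a fixed string, so this lock serializes every call to reduceStock across all instances, regardless of which itemId is being updated.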
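@Service
@RequiredArgsConstructor
public class InventoryManager {

    // Repository type is assumed here (findById is taken to return the entity directly);
    // adapt it to your persistence layer.
    private final InventoryRepository repository;

    @DistributedLock(resourceKey = "inventory_update", leaseTime = 15, waitTime = 5)
    public void reduceStock(Long itemId) {
        // Fetch item
        InventoryItem item = repository.findById(itemId);
        // Modify state
        item.setQuantity(item.getQuantity() - 1);
        // Persist changes
        repository.save(item);
    }
}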
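Because the key in this example is a constant, updates to unrelated items wait on each other. One possible refinement, which is not part of the annotation as defined above, is to let resourceKey carry positional placeholders that are filled from the method arguments. The sketch below assumes a hypothetical LockKeys helper and a {0}, {1}, ... placeholder convention; an aspect could call it with the values returned by joinPoint.getArgs().

```java
public class LockKeys {

    /** Replaces {0}, {1}, ... in the template with the string form of the matching argument. */
    public static String resolve(String template, Object... args) {
        String key = template;
        for (int i = 0; i < args.length; i++) {
            key = key.replace("{" + i + "}", String.valueOf(args[i]));
        }
        return key;
    }

    public static void main(String[] args) {
        // One lock per item instead of one lock for the whole inventory.
        System.out.println(resolve("inventory_update:{0}", 42L)); // prints: inventory_update:42
        // Templates without placeholders are returned unchanged.
        System.out.println(resolve("inventory_update", 42L));     // prints: inventory_update
    }
}
```

With such a convention, reduceStock(42) and reduceStock(43) would contend on different Redis keys and could run concurrently, while two calls for the same item would still be serialized.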
3. AOP Interceptor Implementation
We utilize Spring AOP to intercept methods annotated with @DistributedLock. The aspect will handle the logic of acquiring the lock, executing the business method, and ensuring the lock is released.
import java.util.Collections;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.stereotype.Component;

@Aspect
@Component
@Slf4j
@RequiredArgsConstructor
public class LockInterceptorAspect {

    private static final String LOCK_PREFIX = "dist_lock:";

    private final RedisTemplate<String, String> redisTemplate;
    private final WatchdogScheduler watchdogScheduler;

    @Pointcut("@annotation(com.example.lock.DistributedLock)")
    public void lockPointcut() {}

    @Around("lockPointcut()")
    public Object handleLock(ProceedingJoinPoint joinPoint) throws Throwable {
        MethodSignature signature = (MethodSignature) joinPoint.getSignature();
        DistributedLock lockAnnotation = signature.getMethod().getAnnotation(DistributedLock.class);
        String lockKey = LOCK_PREFIX + lockAnnotation.resourceKey();
        // Unique token identifying this acquisition; only its holder may release the key
        String requestId = UUID.randomUUID().toString();
        long leaseDuration = lockAnnotation.leaseTime();
        long acquireDeadline = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(lockAnnotation.waitTime());
        boolean lockAcquired = false;
        try {
            // Retry until the lock is acquired or the wait time runs out.
            // The first attempt always happens, even when waitTime is 0.
            while (true) {
                Boolean success = redisTemplate.opsForValue()
                        .setIfAbsent(lockKey, requestId, leaseDuration, TimeUnit.SECONDS);
                if (Boolean.TRUE.equals(success)) {
                    lockAcquired = true;
                    // Register watchdog for auto-renewal
                    watchdogScheduler.registerTask(
                            new RenewalEntry(lockKey, requestId, leaseDuration, Thread.currentThread()));
                    break;
                }
                if (System.currentTimeMillis() >= acquireDeadline) {
                    throw new IllegalStateException("Unable to acquire lock within specified timeout");
                }
                Thread.sleep(100); // Backoff before retry
            }
            return joinPoint.proceed();
        } catch (Exception ex) {
            log.error("Error during locked execution", ex);
            throw ex;
        } finally {
            if (lockAcquired) {
                releaseLock(lockKey, requestId);
            }
        }
    }

    private void releaseLock(String key, String identifier) {
        // Lua script ensures atomicity of check-and-delete:
        // the key is removed only if it still holds this caller's token
        String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
        redisTemplate.execute(new DefaultRedisScript<>(script, Long.class), Collections.singletonList(key), identifier);
    }
}
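To see why the release step compares the stored token before deleting, it can help to model the two Redis operations in memory. The sketch below is an illustration under stated assumptions, not a Redis client: TokenLockModel is a hypothetical class, time is passed in explicitly so the scenario is deterministic, and acquire and release only mimic the behavior of "set if absent with expiry" and the check-and-delete script.

```java
import java.util.HashMap;
import java.util.Map;

public class TokenLockModel {

    private static final class Lease {
        final String owner;
        final long expiresAtMillis;

        Lease(String owner, long expiresAtMillis) {
            this.owner = owner;
            this.expiresAtMillis = expiresAtMillis;
        }
    }

    private final Map<String, Lease> store = new HashMap<>();

    /** Mimics "set if absent with TTL": succeeds only if the key is missing or expired. */
    public synchronized boolean acquire(String key, String token, long ttlMillis, long nowMillis) {
        Lease current = store.get(key);
        if (current != null && current.expiresAtMillis > nowMillis) {
            return false;
        }
        store.put(key, new Lease(token, nowMillis + ttlMillis));
        return true;
    }

    /** Mimics check-and-delete: removes the key only if it still holds the caller's token. */
    public synchronized boolean release(String key, String token, long nowMillis) {
        Lease current = store.get(key);
        if (current == null || current.expiresAtMillis <= nowMillis || !current.owner.equals(token)) {
            return false;
        }
        store.remove(key);
        return true;
    }

    /** Returns the current owner token, or null if the key is free or expired. */
    public synchronized String ownerOf(String key, long nowMillis) {
        Lease current = store.get(key);
        return (current != null && current.expiresAtMillis > nowMillis) ? current.owner : null;
    }

    public static void main(String[] args) {
        TokenLockModel model = new TokenLockModel();
        String key = "dist_lock:inventory_update";

        model.acquire(key, "A", 15_000, 0);                            // A holds the lock until t = 15 s
        System.out.println(model.acquire(key, "B", 15_000, 10_000));   // prints: false (A still holds it)
        System.out.println(model.acquire(key, "B", 15_000, 16_000));   // prints: true  (A's lease expired)
        System.out.println(model.release(key, "A", 17_000));           // prints: false (A's token is stale)
        System.out.println(model.ownerOf(key, 17_000));                // prints: B
    }
}
```

Without the token comparison, A's late release at t = 17 s would delete the key that B now owns, and a third caller could enter the critical section while B is still working.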
4. Lock Renewal (Watchdog) Mechanism
To handle cases where the business logic runs longer than the lock's TTL, we implement a "watchdog" mechanism. A background thread periodically extends the TTL of each registered lock key for as long as the thread that acquired the lock is still running.
4.1 Renewal Entry Definition
import java.time.Instant;

public class RenewalEntry {

    // Upper bound on renewals, so a hung task cannot hold the lock forever
    private static final int MAX_RENEWALS = 30;

    private final String lockKey;
    private final String ownerToken;
    private final long ttlSeconds;
    private final Thread boundThread;
    private volatile Instant lastRenewalTime;
    // Only read and written by the watchdog thread
    private int renewalCount;

    public RenewalEntry(String lockKey, String ownerToken, long ttlSeconds, Thread boundThread) {
        this.lockKey = lockKey;
        this.ownerToken = ownerToken;
        this.ttlSeconds = ttlSeconds;
        this.boundThread = boundThread;
        this.lastRenewalTime = Instant.now();
    }

    public boolean shouldRenew() {
        // Renew when 2/3 of the TTL has elapsed since the last renewal
        Instant threshold = lastRenewalTime.plusMillis(ttlSeconds * 1000 * 2 / 3);
        return Instant.now().isAfter(threshold) && renewalCount < MAX_RENEWALS;
    }

    public void recordRenewal() {
        this.lastRenewalTime = Instant.now();
        this.renewalCount++;
    }

    public void abortIfHanging() {
        // Once the renewal budget is spent, interrupt the business thread
        if (renewalCount >= MAX_RENEWALS) {
            boundThread.interrupt();
        }
    }

    // Getters omitted for brevity
}
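The renewal threshold is easiest to check with concrete numbers. The following standalone sketch uses a hypothetical RenewalTiming class and takes both timestamps as parameters so the result does not depend on the system clock. It computes the threshold as an exact two thirds of the TTL with Duration arithmetic, and treats the exact two-thirds instant as due, which is an assumption of this example.

```java
import java.time.Duration;
import java.time.Instant;

public class RenewalTiming {

    /** True once at least two thirds of the TTL has elapsed since the last renewal. */
    public static boolean isDue(Instant lastRenewal, Instant now, long ttlSeconds) {
        Duration threshold = Duration.ofSeconds(ttlSeconds).multipliedBy(2).dividedBy(3);
        return !now.isBefore(lastRenewal.plus(threshold));
    }

    public static void main(String[] args) {
        Instant acquired = Instant.parse("2024-01-01T00:00:00Z");

        // With a 15 s lease the threshold sits at 10 s after the last renewal.
        System.out.println(isDue(acquired, acquired.plusSeconds(9), 15));  // prints: false
        System.out.println(isDue(acquired, acquired.plusSeconds(10), 15)); // prints: true
        System.out.println(isDue(acquired, acquired.plusSeconds(14), 15)); // prints: true
    }
}
```

Renewing at two thirds of the TTL leaves the remaining third as a safety margin for scheduling delay and the Redis round trip before the key would otherwise expire.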
4.2 Watchdog Scheduler
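import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class WatchdogScheduler {

    // Extends the TTL only if the key still holds this entry's owner token
    private static final String RENEW_SCRIPT =
            "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('expire', KEYS[1], ARGV[2]) else return 0 end";

    private final RedisTemplate<String, String> redisTemplate;
    private final ConcurrentLinkedQueue<RenewalEntry> renewalQueue = new ConcurrentLinkedQueue<>();
    private final ScheduledExecutorService scheduler;

    // Constructor injection ensures redisTemplate is set before the first scheduled run
    public WatchdogScheduler(RedisTemplate<String, String> redisTemplate) {
        this.redisTemplate = redisTemplate;
        this.scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r);
            t.setDaemon(true);
            t.setName("lock-watchdog");
            return t;
        });
        this.scheduler.scheduleAtFixedRate(this::processRenewals, 1, 2, TimeUnit.SECONDS);
    }

    public void registerTask(RenewalEntry entry) {
        renewalQueue.add(entry);
    }

    private void processRenewals() {
        List<RenewalEntry> toRemove = new ArrayList<>();
        renewalQueue.forEach(entry -> {
            try {
                // Drop the entry once the key is gone or belongs to a different owner.
                // Checking only for existence would renew a lock another instance has since acquired.
                String currentOwner = redisTemplate.opsForValue().get(entry.getLockKey());
                if (!entry.getOwnerToken().equals(currentOwner)) {
                    toRemove.add(entry);
                    return;
                }
                if (entry.shouldRenew()) {
                    log.debug("Renewing lock for key: {}", entry.getLockKey());
                    // Atomic check-and-expire closes the gap between the read above and the renewal
                    Long renewed = redisTemplate.execute(
                            new DefaultRedisScript<>(RENEW_SCRIPT, Long.class),
                            Collections.singletonList(entry.getLockKey()),
                            entry.getOwnerToken(), String.valueOf(entry.getTtlSeconds()));
                    if (renewed == null || renewed == 0L) {
                        toRemove.add(entry);
                        return;
                    }
                    entry.recordRenewal();
                }
                entry.abortIfHanging();
                // Remove if thread is no longer alive (task finished)
                if (!entry.getBoundThread().isAlive()) {
                    toRemove.add(entry);
                }
            } catch (Exception e) {
                log.error("Renewal failed for {}", entry.getLockKey(), e);
                toRemove.add(entry);
            }
        });
        renewalQueue.removeAll(toRemove);
    }
}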
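The WatchdogScheduler maintains a queue of active locks. Every 2 seconds it checks each entry, and once 2/3 of a lock's TTL has elapsed since the last renewal it resets the TTL in Redis, so long-running tasks do not lose their lock while they are still working. Because a due renewal is only noticed at the next poll, leases should be comfortably longer than three poll intervals (about 6 seconds with this schedule). Renewal is also capped: after 30 renewals the watchdog stops extending the lease and interrupts the bound thread. Entries are removed from the queue once the associated business thread completes or the key is deleted.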
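The interaction between the poll interval and the lease length can be checked with a little arithmetic. The sketch below uses a hypothetical WatchdogBudget class and a simplified model that ignores the Redis round trip and scheduler jitter: a renewal becomes due at 2/3 of the TTL and, in the worst case, is only noticed one full poll interval later.

```java
public class WatchdogBudget {

    /** Latest time (ms after the last renewal) at which a poll notices the entry is due. */
    public static long worstCaseRenewalMillis(long ttlMillis, long pollMillis) {
        return ttlMillis * 2 / 3 + pollMillis;
    }

    /** True if the worst-case renewal still happens before the key expires. */
    public static boolean renewsBeforeExpiry(long ttlMillis, long pollMillis) {
        return worstCaseRenewalMillis(ttlMillis, pollMillis) < ttlMillis;
    }

    public static void main(String[] args) {
        long poll = 2_000; // the watchdog in this article polls every 2 seconds

        System.out.println(worstCaseRenewalMillis(15_000, poll)); // prints: 12000
        System.out.println(renewsBeforeExpiry(15_000, poll));     // prints: true
        System.out.println(renewsBeforeExpiry(6_000, poll));      // prints: false (4000 + 2000 is not below 6000)
        System.out.println(renewsBeforeExpiry(5_000, poll));      // prints: false
    }
}
```

Under this model the 15-second lease from the usage example is renewed with about 3 seconds to spare, while a lease of 6 seconds or less could expire before the watchdog gets to it.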