Fading Coder

One Final Commit for the Last Sprint


Implementing a Redis-Based Distributed Lock with Spring AOP and Watchdog Renewal

Tech May 8 3

1. Defining the Lock Annotation

To create a declarative distributed locking mechanism, we start by defining a custom annotation. This annotation will allow us to mark methods that require synchronization across distributed instances.

import java.lang.annotation.*;
import java.util.concurrent.TimeUnit;

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@Documented
public @interface DistributedLock {
    
    long DEFAULT_LEASE_TIME = 30L;
    long DEFAULT_WAIT_TIME = 10L;

    /**
     * The specific resource key for the lock.
     */
    String resourceKey() default "global_lock";

    /**
     * The duration the lock should be held before automatic expiration (in seconds).
     */
    long leaseTime() default DEFAULT_LEASE_TIME;

    /**
     * The maximum time to wait for acquiring the lock (in seconds).
     */
    long waitTime() default DEFAULT_WAIT_TIME;
}

The leaseTime sets the TTL (time to live) of the lock key in Redis, so a lock held by a crashed instance expires on its own instead of blocking every other client indefinitely. The waitTime lets the client keep retrying acquisition for that many seconds rather than failing immediately.
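To make the role of leaseTime concrete, here is a minimal in-memory sketch of set-if-absent with a TTL. InMemoryLockStore, its injectable clock, and the holder helper are hypothetical stand-ins for Redis, not part of Spring Data Redis; they only model the expiry behavior that setIfAbsent(key, value, timeout, unit) relies on.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.LongSupplier;

// Hypothetical in-memory stand-in for Redis "SET key value NX" with a TTL; not a Redis client.
class InMemoryLockStore {
    private static final class Entry {
        final String value;
        final long expiresAtMillis;

        Entry(String value, long expiresAtMillis) {
            this.value = value;
            this.expiresAtMillis = expiresAtMillis;
        }
    }

    private final Map<String, Entry> data = new HashMap<>();
    private final LongSupplier clock; // injectable clock so expiry can be simulated

    InMemoryLockStore(LongSupplier clock) {
        this.clock = clock;
    }

    // Sets the key only if it is absent or its TTL has run out
    synchronized boolean setIfAbsent(String key, String value, long ttlMillis) {
        long now = clock.getAsLong();
        Entry current = data.get(key);
        if (current != null && current.expiresAtMillis > now) {
            return false; // held by someone and not yet expired
        }
        data.put(key, new Entry(value, now + ttlMillis));
        return true;
    }

    // Current holder, or null if the key is absent or expired
    synchronized String holder(String key) {
        Entry current = data.get(key);
        return current != null && current.expiresAtMillis > clock.getAsLong() ? current.value : null;
    }
}
```

In the test scenario, client-1 never releases the key, as if its JVM had crashed; once the 30-second lease (the annotation's DEFAULT_LEASE_TIME) has run out, client-2 can take the lock.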

2. Usage Scenario

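Below is an example of a service that manages inventory. To prevent race conditions during stock deduction, we apply the @DistributedLock annotation to the method that modifies the stock. Because resourceKey is a fixed string, every call to reduceStock competes for the same lock, whatever the itemId.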

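import org.springframework.stereotype.Service;

@Service
public class InventoryManager {

    // InventoryRepository and InventoryItem are application types that the article does not show
    private final InventoryRepository repository;

    public InventoryManager(InventoryRepository repository) {
        this.repository = repository;
    }

    @DistributedLock(resourceKey = "inventory_update", leaseTime = 15, waitTime = 5)
    public void reduceStock(Long itemId) {
        // Fetch item
        InventoryItem item = repository.findById(itemId);
        // Modify state
        item.setQuantity(item.getQuantity() - 1);
        // Persist changes
        repository.save(item);
    }
}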
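Stock deduction is a read-modify-write sequence, so two unsynchronized callers can both read the same quantity and one decrement is lost. The sketch below shows the same guard inside a single JVM, with java.util.concurrent.locks.ReentrantLock standing in for the distributed lock; StockCounter is a hypothetical class used only for illustration.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

// Single-JVM analogue of reduceStock: a ReentrantLock plays the role of the distributed lock.
class StockCounter {
    private final ReentrantLock lock = new ReentrantLock();
    private int quantity; // guarded by lock

    StockCounter(int initialQuantity) {
        this.quantity = initialQuantity;
    }

    void reduceStock() {
        lock.lock();
        try {
            int current = quantity;  // fetch
            quantity = current - 1;  // modify and "persist"
        } finally {
            lock.unlock();
        }
    }

    int quantity() {
        lock.lock();
        try {
            return quantity;
        } finally {
            lock.unlock();
        }
    }

    // Runs `calls` decrements on 8 worker threads and returns the final quantity
    static int runConcurrently(int initial, int calls) {
        StockCounter counter = new StockCounter(initial);
        ExecutorService pool = Executors.newFixedThreadPool(8);
        for (int i = 0; i < calls; i++) {
            pool.execute(counter::reduceStock);
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for workers", e);
        }
        return counter.quantity();
    }
}
```

A ReentrantLock only coordinates threads in one process. Two application instances would each hold their own lock object, which is why the article keeps the lock state in Redis instead.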

3. AOP Interceptor Implementation

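We use Spring AOP to intercept methods annotated with @DistributedLock. The aspect acquires the lock, executes the business method, and releases the lock in a finally block. Because Spring AOP works through proxies, the advice only runs when the method is called through the Spring bean; a call from another method of the same class bypasses the lock.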

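import java.util.Collections;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.stereotype.Component;

@Aspect
@Component
@Slf4j
@RequiredArgsConstructor
public class LockInterceptorAspect {

    private static final String LOCK_PREFIX = "dist_lock:";

    // Lua script ensures atomicity of check-and-delete: the key is removed only if it still holds our token
    private static final DefaultRedisScript<Long> RELEASE_SCRIPT = new DefaultRedisScript<>(
            "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end",
            Long.class);

    private final RedisTemplate<String, String> redisTemplate;
    private final WatchdogScheduler watchdogScheduler;

    // Use the fully qualified name of the package where DistributedLock is declared
    @Pointcut("@annotation(com.example.lock.DistributedLock)")
    public void lockPointcut() {}

    @Around("lockPointcut()")
    public Object handleLock(ProceedingJoinPoint joinPoint) throws Throwable {
        MethodSignature signature = (MethodSignature) joinPoint.getSignature();
        DistributedLock lockAnnotation = signature.getMethod().getAnnotation(DistributedLock.class);

        String lockKey = LOCK_PREFIX + lockAnnotation.resourceKey();
        String requestId = UUID.randomUUID().toString(); // Unique owner token for this invocation
        long leaseDuration = lockAnnotation.leaseTime();
        long acquireDeadline = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(lockAnnotation.waitTime());

        boolean lockAcquired = false;

        try {
            // Spin until the lock is acquired or the deadline passes; always try at least once
            while (true) {
                Boolean success = redisTemplate.opsForValue()
                        .setIfAbsent(lockKey, requestId, leaseDuration, TimeUnit.SECONDS);

                if (Boolean.TRUE.equals(success)) {
                    lockAcquired = true;
                    // Register watchdog for auto-renewal
                    watchdogScheduler.registerTask(new RenewalEntry(lockKey, requestId, leaseDuration, Thread.currentThread()));
                    break;
                }
                if (System.currentTimeMillis() >= acquireDeadline) {
                    break;
                }
                Thread.sleep(100); // Back off before retrying
            }

            if (!lockAcquired) {
                throw new IllegalStateException("Unable to acquire lock " + lockKey + " within " + lockAnnotation.waitTime() + "s");
            }

            return joinPoint.proceed();

        } catch (Exception ex) {
            log.error("Error during locked execution of {}", lockKey, ex);
            throw ex;
        } finally {
            if (lockAcquired) {
                releaseLock(lockKey, requestId);
            }
        }
    }

    private void releaseLock(String key, String identifier) {
        Long deleted = redisTemplate.execute(RELEASE_SCRIPT, Collections.singletonList(key), identifier);
        if (deleted == null || deleted == 0L) {
            // The key expired or now belongs to another owner, so there was nothing of ours to delete
            log.warn("Lock {} was no longer held by {} at release time", key, identifier);
        }
    }
}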
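The control flow of the aspect (spin on set-if-absent until a deadline, run the method, release by token) can be modeled without Spring or Redis. In this sketch TokenLock is hypothetical, a HashMap replaces Redis, and synchronized methods stand in for the atomicity Redis gives a Lua script; compareAndDelete follows the same contract as the release script.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.function.Supplier;

// Hypothetical in-memory model of the aspect's acquire / proceed / release flow (no Redis, no Spring).
class TokenLock {
    private final Map<String, String> store = new HashMap<>();

    synchronized boolean setIfAbsent(String key, String token) {
        return store.putIfAbsent(key, token) == null;
    }

    // Same contract as the release Lua script: 1 if our token was deleted, 0 otherwise
    synchronized long compareAndDelete(String key, String token) {
        return store.remove(key, token) ? 1L : 0L;
    }

    synchronized String get(String key) {
        return store.get(key);
    }

    // Spin with a 100 ms back-off until the deadline, run the body, always release by token
    <T> T withLock(String key, long waitMillis, Supplier<T> body) {
        String token = UUID.randomUUID().toString();
        long deadline = System.currentTimeMillis() + waitMillis;
        boolean acquired = setIfAbsent(key, token); // always try at least once
        while (!acquired && System.currentTimeMillis() < deadline) {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting for " + key, e);
            }
            acquired = setIfAbsent(key, token);
        }
        if (!acquired) {
            throw new IllegalStateException("Unable to acquire lock within specified timeout");
        }
        try {
            return body.get();
        } finally {
            compareAndDelete(key, token);
        }
    }
}
```

The second half of the test shows why the release checks the token: a caller holding a stale token must not delete a lock that another client now owns.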

4. Lock Renewal (Watchdog) Mechanism

To handle cases where the business logic runs longer than the lock's TTL, we implement a "watchdog" mechanism: a background thread that keeps extending the key's TTL while the key still holds the token written by the thread that acquired the lock.

4.1 Renewal Entry Definition

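import java.time.Instant;

public class RenewalEntry {
    // Renew once about 2/3 of the TTL has elapsed since the last renewal
    private static final double RENEWAL_FRACTION = 0.66;
    // After this many renewals the bound thread is treated as hung
    private static final int MAX_RENEWALS = 30;

    private final String lockKey;
    private final String ownerToken;
    private final long ttlSeconds;
    private final Thread boundThread;

    private volatile Instant lastRenewalTime;
    // Read and written only by the single watchdog thread after registration
    private int renewalCount;

    public RenewalEntry(String lockKey, String ownerToken, long ttlSeconds, Thread boundThread) {
        this.lockKey = lockKey;
        this.ownerToken = ownerToken;
        this.ttlSeconds = ttlSeconds;
        this.boundThread = boundThread;
        this.lastRenewalTime = Instant.now();
    }

    public boolean shouldRenew() {
        Instant threshold = lastRenewalTime.plusMillis((long) (ttlSeconds * 1000 * RENEWAL_FRACTION));
        return Instant.now().isAfter(threshold) && renewalCount < MAX_RENEWALS;
    }

    public void recordRenewal() {
        this.lastRenewalTime = Instant.now();
        this.renewalCount++;
    }

    // Interruption is only a request: the business code must check the flag or block interruptibly
    public void abortIfHanging() {
        if (renewalCount >= MAX_RENEWALS) {
            boundThread.interrupt();
        }
    }

    public String getLockKey() { return lockKey; }
    public String getOwnerToken() { return ownerToken; }
    public long getTtlSeconds() { return ttlSeconds; }
    public Thread getBoundThread() { return boundThread; }
}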
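The 2/3 rule interacts with the 2-second sweep period configured in WatchdogScheduler: because the sweep runs on its own schedule, a renewal can arrive up to one period after the threshold. The small calculation below is a rough worst-case estimate that ignores Redis latency and GC pauses; RenewalTiming is a hypothetical helper that reuses the expression from shouldRenew().

```java
// Worked numbers for RenewalEntry.shouldRenew() combined with a 2-second watchdog sweep.
// Ignores Redis round-trip time, GC pauses, and clock drift.
class RenewalTiming {
    static final long SWEEP_PERIOD_MILLIS = 2_000; // period passed to scheduleAtFixedRate

    // Same expression as shouldRenew(): time since the last renewal after which a renewal is due
    static long thresholdMillis(long ttlSeconds) {
        return (long) (ttlSeconds * 1000 * 0.66);
    }

    // Worst case: a sweep runs just before the threshold, so the renewal comes almost one period later
    static long worstCaseMarginMillis(long ttlSeconds) {
        return ttlSeconds * 1000 - thresholdMillis(ttlSeconds) - SWEEP_PERIOD_MILLIS;
    }
}
```

For the 15-second lease in the inventory example, the threshold is 9,900 ms and the worst-case margin is about 3.1 s. With a 5-second lease the margin is negative, so the key can expire before the watchdog gets to renew it.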

4.2 Watchdog Scheduler

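import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class WatchdogScheduler implements DisposableBean {

    // Extends the TTL only if the key still holds the entry's token, so a stale entry
    // can never prolong a lock that another client has acquired in the meantime
    private static final DefaultRedisScript<Long> RENEW_SCRIPT = new DefaultRedisScript<>(
            "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('expire', KEYS[1], ARGV[2]) else return 0 end",
            Long.class);

    private final RedisTemplate<String, String> redisTemplate;
    private final ConcurrentLinkedQueue<RenewalEntry> renewalQueue = new ConcurrentLinkedQueue<>();
    private final ScheduledExecutorService scheduler;

    // Constructor injection ensures redisTemplate is set before the first scheduled run
    public WatchdogScheduler(RedisTemplate<String, String> redisTemplate) {
        this.redisTemplate = redisTemplate;
        this.scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r);
            t.setDaemon(true);
            t.setName("lock-watchdog");
            return t;
        });
        this.scheduler.scheduleAtFixedRate(this::processRenewals, 1, 2, TimeUnit.SECONDS);
    }

    public void registerTask(RenewalEntry entry) {
        renewalQueue.add(entry);
    }

    private void processRenewals() {
        List<RenewalEntry> toRemove = new ArrayList<>();

        for (RenewalEntry entry : renewalQueue) {
            // Exceptions are caught per entry: one escaping a scheduleAtFixedRate task would suppress all later runs
            try {
                // Drop the entry once the key is gone or held under a different token
                String currentOwner = redisTemplate.opsForValue().get(entry.getLockKey());
                if (!entry.getOwnerToken().equals(currentOwner)) {
                    toRemove.add(entry);
                    continue;
                }

                if (entry.shouldRenew()) {
                    log.debug("Renewing lock for key: {}", entry.getLockKey());
                    // Assumes String serializers (e.g. StringRedisTemplate) so ARGV values arrive as plain strings
                    Long renewed = redisTemplate.execute(RENEW_SCRIPT, Collections.singletonList(entry.getLockKey()),
                            entry.getOwnerToken(), String.valueOf(entry.getTtlSeconds()));
                    if (renewed == null || renewed == 0L) {
                        toRemove.add(entry); // Ownership was lost between the check and the renewal
                        continue;
                    }
                    entry.recordRenewal();
                }

                entry.abortIfHanging();

                // Remove if the bound thread has terminated (pooled request threads normally stay alive)
                if (!entry.getBoundThread().isAlive()) {
                    toRemove.add(entry);
                }
            } catch (Exception e) {
                log.error("Renewal failed for {}", entry.getLockKey(), e);
                toRemove.add(entry);
            }
        }

        renewalQueue.removeAll(toRemove);
    }

    @Override
    public void destroy() {
        scheduler.shutdownNow();
    }
}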

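The WatchdogScheduler keeps a queue of active locks and sweeps it every 2 seconds on a single daemon thread. When at least 2/3 (0.66) of an entry's TTL has elapsed since its last renewal, it resets the key's TTL in Redis, so long-running tasks do not lose the lock mid-execution. After 30 renewals it stops renewing and interrupts the bound thread, which is then treated as hung; the interrupt only has an effect if the business code responds to it. Entries are removed once the lock has been released or the bound thread has terminated. Note that servlet containers reuse request threads, so the thread of a finished task usually stays alive; checking the key in Redis, rather than thread liveness, is what reliably retires an entry.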
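The essential property of renewal is that it extends the TTL only while the entry's token still owns the key. The stdlib-only sketch below models that with a ConcurrentHashMap in place of Redis. MiniWatchdog and its methods are hypothetical, and the design is deliberately swapped: each watched lock reschedules its own check with ScheduledExecutorService.schedule instead of sharing one fixed-rate sweep over a queue, so a lock's watch stops by itself once ownership is lost.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical stdlib-only model of owner-checked renewal; the map stands in for Redis.
class MiniWatchdog implements AutoCloseable {
    private static final class Lease {
        final String token;
        final long expiresAt;

        Lease(String token, long expiresAt) {
            this.token = token;
            this.expiresAt = expiresAt;
        }
    }

    private final Map<String, Lease> store = new ConcurrentHashMap<>();
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
        Thread t = new Thread(r, "mini-watchdog");
        t.setDaemon(true);
        return t;
    });

    // Like SET NX with a TTL: succeeds if the key is absent or expired (compute is atomic per key)
    boolean tryAcquire(String key, String token, long ttlMillis) {
        long now = System.currentTimeMillis();
        Lease fresh = new Lease(token, now + ttlMillis);
        return store.compute(key, (k, cur) -> cur == null || cur.expiresAt <= now ? fresh : cur) == fresh;
    }

    // Extends the lease only while `token` still owns an unexpired lease
    boolean renewIfOwner(String key, String token, long ttlMillis) {
        long now = System.currentTimeMillis();
        Lease result = store.computeIfPresent(key, (k, cur) ->
                cur.token.equals(token) && cur.expiresAt > now ? new Lease(token, now + ttlMillis) : cur);
        return result != null && result.token.equals(token) && result.expiresAt > now;
    }

    // Compare-and-delete: only the owner's token removes the key
    void release(String key, String token) {
        store.computeIfPresent(key, (k, cur) -> cur.token.equals(token) ? null : cur);
    }

    // Renews every periodMillis and stops rescheduling once ownership is lost
    void watch(String key, String token, long ttlMillis, long periodMillis) {
        scheduler.schedule(() -> {
            if (renewIfOwner(key, token, ttlMillis)) {
                watch(key, token, ttlMillis, periodMillis);
            }
        }, periodMillis, TimeUnit.MILLISECONDS);
    }

    @Override
    public void close() {
        scheduler.shutdownNow();
    }
}
```

In the test scenario, a 1-second lease survives a 2.5-second wait because it is renewed every 200 ms, and after release the old owner's renewal attempt returns false instead of extending the new owner's lease.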

Tags: Java
