Fading Coder

One Final Commit for the Last Sprint


Implementing Message Queues with Redis List: Consumer Thread Models

Tools · May 10

Redis List is a string-based linked list data structure that supports bidirectional traversal. In production environments, many organizations leverage Redis List as a lightweight message queue solution. This article explores how to implement message queue functionality using List commands and examines various consumer thread models.

Core Mechanism

The producer uses LPUSH key element [element ...] to insert messages at the head of the list. If the key does not exist, Redis creates an empty list before pushing.

Example: inserting "Java", "Yongge", and "Go" into the queue:

> LPUSH queue Java Yongge Go
(integer) 3

The consumer uses RPOP key to retrieve messages in FIFO order. The "Java" message is consumed first:

> RPOP queue
"Java"
> RPOP queue
"Yongge"
> RPOP queue
"Go"

The following demonstrates the produce-consume workflow using Spring Data Redis:

Producer implementation:

redisTemplate.opsForList().leftPush("queue", "Java");
redisTemplate.opsForList().leftPush("queue", "Yongge");
redisTemplate.opsForList().leftPush("queue", "Go");

Consumer implementation:

Start an independent thread that continuously calls RPOP to retrieve messages from the queue. After successfully reading a message, process the business logic. If no message is available, sleep for a while before the next iteration.

The sleep interval helps reduce unnecessary CPU consumption from empty polling.
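The polling loop described above can be sketched as follows. This is a minimal sketch, assuming a configured Spring Data Redis StringRedisTemplate named redisTemplate and a business method processMessage(...) — both names are placeholders, not part of the original article's code.

```java
// A single consumer thread: poll with RPOP, sleep briefly when empty.
new Thread(() -> {
    while (!Thread.currentThread().isInterrupted()) {
        String message = redisTemplate.opsForList().rightPop("queue");
        if (message != null) {
            processMessage(message); // business logic
        } else {
            try {
                Thread.sleep(100); // avoid busy-spinning on an empty queue
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore flag and exit
            }
        }
    }
}, "queue-consumer").start();
```

The 100 ms sleep is a trade-off: shorter intervals reduce latency but increase empty-poll load on Redis.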

Is there a more efficient approach? Yes—using Redis blocking commands for List operations.

Redis provides BLPOP and BRPOP for blocking reads. When the queue is empty, the consumer automatically blocks until new messages are written to the queue.

BRPOP queue 0

The timeout parameter 0 means block indefinitely until a message arrives.

In this model, a consumer thread operates as a perpetual worker—after fetching a message, it executes the consumption logic. This model is straightforward and well-suited for sequential processing scenarios. However, if the server crashes or loses power during message consumption, one message may be lost.
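In Spring Data Redis, the blocking read maps to the timeout overload of rightPop, which issues BRPOP under the hood. A minimal sketch, again assuming a configured StringRedisTemplate:

```java
// Blocks until a message is available; a timeout of 0 blocks indefinitely.
// Note: the blocked connection is held for the duration of the wait.
String message = redisTemplate.opsForList()
        .rightPop("queue", 0, java.util.concurrent.TimeUnit.SECONDS);
processMessage(message); // hypothetical business method
```

Because the connection stays checked out while blocked, size the connection pool with one connection per blocking consumer in mind.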

Can we achieve higher consumption throughput? Based on practical experience, there are three primary models:

  • Polling thread + consumer thread pool (non-blocking)
  • Polling thread + consumer thread pool (blocking)
  • Polling thread + Disruptor (blocking)

Polling Thread + Consumer Thread Pool (Non-blocking)

To improve consumption speed, separate the polling and consumption operations into distinct threads. A dedicated thread pool handles message polling while another processes the actual business logic.

Pseudo-code structure:

public class MessageConsumer {
    private final ExecutorService consumerPool = Executors.newFixedThreadPool(8);
    private volatile boolean running = true;

    public void consume() {
        while (running) {
            String message = redisTemplate.opsForList().rightPop("queue");
            if (message != null) {
                // Hand off to the pool so polling is not blocked by slow consumers
                consumerPool.submit(() -> processMessage(message));
            } else {
                try {
                    Thread.sleep(100); // back off when the queue is empty
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }
}

This approach significantly improves throughput through parallel processing. However, a critical issue emerges: if consumption is slower than production, messages accumulate in the thread pool queue, creating two hidden risks:

  • Unbounded queue growth may lead to an OutOfMemoryError
  • Server crashes would result in substantial message loss

Polling Thread + Consumer Thread Pool (Blocking)

Consider wrapping messages as Runnable tasks and submitting them to the thread pool via execute(). Does this block the polling thread?

The underlying implementation uses a non-blocking offer() method for the work queue. When the queue is full, new tasks are not blocked—they trigger the rejection policy instead.

To achieve blocking behavior, two approaches are available:

  • Use a semaphore to limit concurrent tasks in the wait queue
  • Leverage a rejection handler that re-inserts the rejected task into the work queue with the blocking put() method
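The rejection-handler approach can be demonstrated with a self-contained sketch: a bounded work queue whose rejection handler re-inserts the task with the blocking put(), so the submitting (polling) thread waits instead of dropping work. All names here are illustrative.

```java
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

public class BlockingSubmitDemo {
    public static void main(String[] args) throws Exception {
        // Bounded queue: execute() normally calls the non-blocking offer(),
        // so a full queue triggers the rejection handler. Our handler blocks
        // the caller with put() until space frees up.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 2, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(4),
                (task, executor) -> {
                    try {
                        executor.getQueue().put(task); // blocks the submitter
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        throw new RejectedExecutionException("interrupted", e);
                    }
                });

        AtomicInteger processed = new AtomicInteger();
        for (int i = 0; i < 20; i++) {           // more tasks than queue capacity
            pool.execute(() -> {
                try { Thread.sleep(10); } catch (InterruptedException ignored) {}
                processed.incrementAndGet();
            });
        }
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        System.out.println(processed.get());
    }
}
```

With this handler no task is dropped: all 20 submissions are processed even though the queue only holds 4, because the submitter is throttled to the consumers' pace.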

Polling Thread + Disruptor

The Disruptor framework follows a producer-consumer pattern similar to thread pools. While thread pools use blocking queues to store pending tasks, Disruptor employs a RingBuffer as its underlying storage.

RingBuffer offers several advantages over traditional blocking queues:

Array-based structure

Using arrays instead of linked lists eliminates garbage collection overhead. Arrays also demonstrate better cache locality for processors.

Efficient index calculation

With array length set to 2^n, position lookup uses bit operations for speed. Indices increment monotonically—there's no overflow concern since long type indices would require approximately 300,000 years to exhaust at 1 million QPS.
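The bit-operation lookup is simple to verify: when the array length is a power of two, `sequence & (length - 1)` equals `sequence % length` but avoids the slower division.

```java
public class RingIndexDemo {
    public static void main(String[] args) {
        int bufferSize = 8;            // must be a power of two (2^n)
        int mask = bufferSize - 1;     // binary 0111: keeps the low bits
        // For a monotonically increasing sequence, the slot index is
        // sequence & mask — equivalent to sequence % bufferSize, but cheaper.
        for (long sequence : new long[]{0, 5, 8, 13}) {
            System.out.println(sequence + " -> " + (sequence & mask));
        }
    }
}
```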

Lock-free design

Each producer or consumer thread first acquires a position in the array, then directly reads or writes at that location without synchronization overhead.

Understanding RingBuffer's read/write mechanics isn't essential here—just recognize that RingBuffer represents Disruptor's core innovation.

Replacing the consumer thread pool with Disruptor provides two significant benefits:

  • Lock-free queue operations yield superior read/write performance
  • When the polling thread submits messages to Disruptor, it blocks if the RingBuffer is full—this natural backpressure mechanism prevents unbounded accumulation and eliminates OOM risks

Implementation overview:

  1. Define the Disruptor instance
  2. Have the polling thread send messages to the Disruptor RingBuffer
  3. Process messages through event handlers
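The three steps above can be sketched with the LMAX Disruptor DSL (version 3.x style). This is an illustrative sketch, not the article's original code; MessageEvent and the "Java" payload are made-up names, and in practice the publishEvent call would live inside the polling thread's loop.

```java
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.util.DaemonThreadFactory;

public class DisruptorConsumer {
    // Event wrapper that occupies the RingBuffer slots.
    static class MessageEvent {
        String payload;
    }

    public static void main(String[] args) {
        int bufferSize = 1024; // must be a power of two
        // 1. Define the Disruptor instance
        Disruptor<MessageEvent> disruptor = new Disruptor<>(
                MessageEvent::new, bufferSize, DaemonThreadFactory.INSTANCE);

        // 3. Process messages through event handlers (the consumption logic)
        disruptor.handleEventsWith((event, sequence, endOfBatch) ->
                System.out.println("consumed: " + event.payload));
        disruptor.start();

        // 2. The polling thread publishes each message it pops from Redis.
        // publishEvent() blocks when the ring is full — natural backpressure.
        RingBuffer<MessageEvent> ringBuffer = disruptor.getRingBuffer();
        ringBuffer.publishEvent((event, seq, msg) -> event.payload = msg, "Java");
    }
}
```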

Graceful Shutdown and Scheduled Compensation

Regardless of the consumer model employed, unexpected server failures or power outages result in message loss. Two mitigation strategies are recommended:

Graceful shutdown

Graceful shutdown aims to complete ongoing tasks before terminating the application. In Unix/Linux systems, the kill command sends signals to running processes:

  • SIGTERM (15): Requests termination—can be caught and handled for graceful shutdown
  • SIGKILL (9): Forces immediate termination—cannot be intercepted
  • SIGQUIT (3): Exits and generates a core dump

Use Java's Runtime.getRuntime().addShutdownHook() to register a cleanup handler that runs on normal JVM exit or when SIGTERM/SIGINT is received (SIGKILL cannot be intercepted):

Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    System.out.println("Shutdown hook triggered. Performing cleanup...");
    // Close resources, save state, etc.
}));

This hook can gracefully close the polling thread pool and consumer thread pool, minimizing message loss.
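A hook body that drains the pools might look like the following self-contained sketch: stop accepting new work, wait briefly for in-flight tasks, then force-cancel stragglers. The pool names are illustrative.

```java
import java.util.concurrent.*;

public class GracefulShutdownDemo {
    public static void main(String[] args) throws Exception {
        ExecutorService consumerPool = Executors.newFixedThreadPool(2);
        consumerPool.submit(() -> System.out.println("task done"));

        // What a shutdown hook body might do:
        consumerPool.shutdown();                 // stop accepting new tasks
        if (!consumerPool.awaitTermination(5, TimeUnit.SECONDS)) {
            consumerPool.shutdownNow();          // force-cancel stragglers
        }
        System.out.println("pool terminated: " + consumerPool.isTerminated());
    }
}
```

Shutting down the polling pool first, then the consumer pool, ensures no message is popped from Redis after consumers have stopped.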

Scheduled compensation

Since message loss is unavoidable with List-based queues, implement scheduled jobs to compensate. Periodically query the business status in the database. If the status doesn't meet expected conditions, trigger compensation logic.

@Scheduled(cron = "0 */5 * * * ?") // every 5 minutes
public void compensateUnprocessedMessages() {
    // Re-enqueue records whose status shows they were never consumed.
    // Consumption must be idempotent, since compensation can produce duplicates.
    List<BusinessRecord> unprocessed = businessService.findUnprocessedRecords();
    for (BusinessRecord record : unprocessed) {
        redisTemplate.opsForList().leftPush("queue", record.toJson());
    }
}
