Fading Coder

One Final Commit for the Last Sprint

Home > Tools > Content

Exploring Blocking Queues: Concepts, Applications, and a Simple Implementation

Tools 1

Blocking Queue Basics

A blocking queue is a thread-safe extension of the standard FIFO queue, integrating automatic blocking mechanisms for both enqueue and dequeue operations:

  1. If the queue is empty, take() operations block the calling thread until an element becomes available.
  2. If the queue reaches its maximum capacity, put() operations block the calling thread untill space frees up.

Producer-Consumer Pattern with Message Queues

By categorizing data via topics and differentiating dequeues with topic specifications, message queues leverage blocking queue properties to implement the classic producer-consumer pattern, delivering two key benefits:

  • Decoupling Components: Prior to using a message queue, components interact directly, embedding tight coupling into code; any change to one often breaks the other. With a queue as an intermediary, producers only care about sending data to the queue, and consumers only care about retrieving data from it, eliminating direct dependencies.
  • Traffic Smoothing: Unpredictable traffic spikes can overwhelm consumers. A queue acts as a buffer, storing incoming requests temporarily so consumers can process them at a fixed, manageable rate.

Standard Library Usage Exampels

Single-Threaded Blocking Queue

The java.util.concurrent.BlockingQueue interface provides blocking methods like put() and take() that throw checked InterruptedException for safe handling. A common bounded implementation is ArrayBlockingQueue:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ArrayBlockingQueue;

public class SingleThreadBQDemo {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> taskQueue = new ArrayBlockingQueue<>(100);
        taskQueue.put("first task");
        taskQueue.put("second task");
        String processedItem = taskQueue.take();
        System.out.println("Processed: " + processedItem);
    }
}

Multi-Threaded Producer-Consumer Pattern

This example creates a bounded queue (capacity=1000), a producer thread that generates incrementing integers, and a consumer thread that processes one item per second:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ArrayBlockingQueue;

public class MultiThreadPCDemo {
    private static int msgCounter = 0;

    public static void main(String[] args) {
        BlockingQueue<Integer> dataQueue = new ArrayBlockingQueue<>(1000);

        Thread producer = new Thread(() -> {
            while (true) {
                try {
                    int currentMsg = msgCounter++;
                    dataQueue.put(currentMsg);
                    System.out.println("Produced message: " + currentMsg);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    System.err.println("Producer interrupted");
                    break;
                }
            }
        });

        Thread consumerWorker = new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    Integer consumedMsg = dataQueue.take();
                    System.out.println("Consumed message: " + consumedMsg);
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    System.err.println("Consumer interrupted");
                    break;
                }
            }
        });

        producer.start();
        consumerWorker.start();

        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            producer.interrupt();
            consumerWorker.interrupt();
            try {
                producer.join();
                consumerWorker.join();
            } catch (InterruptedException e) {
                System.err.println("Shutdown interrupted");
            }
            System.out.println("Application shutdown completed");
        }));
    }
}

Note: A shutdown hook is added here for graceful termination, which was not in the original example but improves robustness.

Implementing a Simple Bounded Blocking Queue

A minimal custom implementation uses a fixed-size circular buffer for storage, synchronized blocks for thread safety, and wait()/notifyAll() for blocking (notifyAll avoids false wakeups):

class CustomBoundedBlockingQueue {
    private final Object[] buffer;
    private int count;
    private int headIndex;
    private int tailIndex;

    public CustomBoundedBlockingQueue(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("Capacity must be positive");
        }
        buffer = new Object[capacity];
        count = 0;
        headIndex = 0;
        tailIndex = 0;
    }

    public void enqueue(Object item) throws InterruptedException {
        synchronized (this) {
            while (count == buffer.length) {
                this.wait();
            }
            buffer[tailIndex] = item;
            tailIndex = (tailIndex + 1) % buffer.length;
            count++;
            this.notifyAll();
        }
    }

    public Object dequeue() throws InterruptedException {
        synchronized (this) {
            while (count == 0) {
                this.wait();
            }
            Object item = buffer[headIndex];
            buffer[headIndex] = null;
            headIndex = (headIndex + 1) % buffer.length;
            count--;
            this.notifyAll();
            return item;
        }
    }

    public int size() {
        synchronized (this) {
            return count;
        }
    }
}

public class CustomBQDemo {
    public static void main(String[] args) {
        CustomBoundedBlockingQueue queue = new CustomBoundedBlockingQueue(1000);

        Thread producer = new Thread(() -> {
            int msgId = 0;
            while (true) {
                try {
                    queue.enqueue("Message-" + msgId);
                    System.out.println("Generated: Message-" + msgId);
                    msgId++;
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    System.err.println("Producer stopped");
                    break;
                }
            }
        });

        Thread consumer = new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    Object item = queue.dequeue();
                    System.out.println("Processed: " + item);
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    System.err.println("Consumer stopped");
                    break;
                }
            }
        });

        producer.start();
        consumer.start();
    }
}

Related Articles

Efficient Usage of HTTP Client in IntelliJ IDEA

IntelliJ IDEA incorporates a versatile HTTP client tool, enabling developres to interact with RESTful services and APIs effectively with in the editor. This functionality streamlines workflows, replac...

Installing CocoaPods on macOS Catalina (10.15) Using a User-Managed Ruby

System Ruby on macOS 10.15 frequently fails to build native gems required by CocoaPods (for example, ffi), leading to errors like: ERROR: Failed to build gem native extension checking for ffi.h... no...

Resolve PhpStorm "Interpreter is not specified or invalid" on WAMP (Windows)

Symptom PhpStorm displays: "Interpreter is not specified or invalid. Press ‘Fix’ to edit your project configuration." This occurs when the IDE cannot locate a valid PHP CLI executable or when the debu...

Leave a Comment

Anonymous

◎Feel free to join the discussion and share your thoughts.