Fading Coder

One Final Commit for the Last Sprint

Home > Notes > Content

Implementing the Producer-Consumer Pattern with Blocking Queues and Thread Synchronization

Notes May 18 2

Thread-based concurrency requires careful coordination when multiple threads share access to common resources. The producer-consumer pattern addresses this challenge by decoupling data generation from data processing through an intermediate buffer.

Core Implementation with BlockingQueue

Java's java.util.concurrent.BlockingQueue interface provides thread-safe operations that automatically handle synchronization. The following implementation demonstrates a bounded buffer that blocks producers when full and consumers when empty:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class TaskBuffer {
    private final BlockingQueue<Integer> buffer;

    public TaskBuffer(int maxSize) {
        this.buffer = new LinkedBlockingQueue<>(maxSize);
    }

    public void submit(Integer task) throws InterruptedException {
        buffer.put(task);
        System.out.println("Submitted task: " + task + " | Buffer size: " + buffer.size());
    }

    public Integer retrieve() throws InterruptedException {
        Integer task = buffer.take();
        System.out.println("Retrieved task: " + task + " | Buffer size: " + buffer.size());
        return task;
    }
}

The put() method blocks the calling thread if the queue has reached capacity, while take() blocks if no elements are available. This internal blocking mechanism eliminates the need for explicit wait-notify patterns.

Producer and Consumer Thread Implementation

public class ProducerConsumerDemo {
    public static void main(String[] args) {
        TaskBuffer taskBuffer = new TaskBuffer(3);

        Thread producer = new Thread(() -> {
            int taskId = 0;
            while (true) {
                try {
                    taskBuffer.submit(++taskId);
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }, "Producer-Thread");

        Thread consumer = new Thread(() -> {
            while (true) {
                try {
                    Integer task = taskBuffer.retrieve();
                    Thread.sleep(800);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }, "Consumer-Thread");

        producer.start();
        consumer.start();
    }
}

Thread Lifecycle and Execution States

When start() is invoked on a thread object, it enters the runnable state, waiting for the JVM scheduler to allocate CPU time. A thread transitions through several states during its lifecycle:

  • Runnable: Ready for execution but waiting for CPU allocation
  • Running: Currently executing instructions
  • Blocked/Waiting: Suspended due to I/O operations, lock contention, or explicit wait calls
  • Terminated: Execution completed or interrupted

Threads can voluntarily yield control through Thread.sleep() or involuntarily lose CPU time through scheduler preemption. When a thread performs blocking I/O or waits for a monitor lock, it cannot proceed until the blocking condition resolves.

Data Structure Comparison for Buffering

Selecting the appropriate data structure for a buffer depends on access patterns and performance requirements:

StructureOrderingUse Case
QueueFIFO (First-In-First-Out)Task scheduling, message passing, event handling
StackLIFO (Last-In-First-Out)Undo operations, function call management, depth-first search
Priority Queue/HeapBy priority valueJob scheduling, shortest-path algorithms, resource allocation

Queues align naturally with producer-consumer scenarios where processing order must match arrival order. Circular buffers (ring buffers) offer a memory-efficient alternative for fixed-size requirements, avoiding allocation overhead by reusing array positions.

GUI Event Handling with Producer-Consumer Pattern

In graphical applications, user interactions generate events that must be processed without freezing the interface. The following Swing application demonstrates event-driven production with background consumption:

import javax.swing.*;
import java.awt.*;
import java.awt.event.ActionEvent;
import java.awt.event.ActionListener;

public class EventDrivenUI extends JFrame {
    private final TaskBuffer eventBuffer;

    public EventDrivenUI() {
        setTitle("Event Queue Demo");
        eventBuffer = new TaskBuffer(10);
        setLayout(new FlowLayout());

        String[] buttonLabels = {"Action A", "Action B", "Action C"};
        for (String label : buttonLabels) {
            JButton btn = new JButton(label);
            btn.addActionListener(new EventListener());
            add(btn);
        }

        startBackgroundProcessor();
        setSize(400, 150);
        setDefaultCloseOperation(EXIT_ON_CLOSE);
        setLocationRelativeTo(null);
        setVisible(true);
    }

    private void startBackgroundProcessor() {
        Thread processor = new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    Integer eventId = eventBuffer.retrieve();
                    processEvent(eventId);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }, "Event-Processor");
        processor.setDaemon(true);
        processor.start();
    }

    private void processEvent(int eventId) {
        System.out.println("Processing event ID: " + eventId + " on " + Thread.currentThread().getName());
    }

    private class EventListener implements ActionListener {
        private int eventCounter = 0;

        @Override
        public void actionPerformed(ActionEvent e) {
            try {
                eventBuffer.submit(++eventCounter);
            } catch (InterruptedException ex) {
                ex.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        SwingUtilities.invokeLater(EventDrivenUI::new);
    }
}

Synchronous vs Asynchronous Execution Models

Synchronous execution enforces sequential task completion—each operation must finish before the next begins. This model simplifies control flow but risks blocking the calling thread during lengthy operations.

Asynchronous execution allows tasks to proceed independently. The caller initiates an operation and continues without waiting for completion, typically receiving results through callbacks or futures. This approach maintains responsiveness, particularly critical for user interface threads.

In desktop and mobile platforms, the main thread serves as the UI thread, handling rendering and input events. Blocking this thread with long-running operations causes interface freezes or Application Not Responding (ANR) errors on Android. Background threads handle intensive workloads while communicating results back to the main thread for UI updates.

Android Threading Considerations

Android components enforce strict threading rules:

  • Activity: Lifecycle callbacks execute on the main thread; delegate database, network, or file operations to background threads
  • Service: Default execution occurs on the main thread; use IntentService, JobIntentService, or manual threading for background work
  • BroadcastReceiver: onReceive() runs on the main thread with a 10-second limit; offload processing to avoid ANR

Handler and Looper mechanisms facilitate inter-thread communication, enabling background threads to post Runnable objects for execution on the main thread.

Related Articles

Designing Alertmanager Templates for Prometheus Notifications

How to craft Alertmanager templates to format alert messages, improving clarity and presentation. Alertmanager uses Go’s text/template engine with additional helper functions. Alerting rules referenc...

Deploying a Maven Web Application to Tomcat 9 Using the Tomcat Manager

Tomcat 9 does not provide a dedicated Maven plugin. The Tomcat Manager interface, however, is backward-compatible, so the Tomcat 7 Maven Plugin can be used to deploy to Tomcat 9. This guide shows two...

Skipping Errors in MySQL Asynchronous Replication

When a replica halts because the SQL thread encounters an error, you can resume replication by skipping the problematic event(s). Two common approaches are available. Methods to Skip Errors 1) Skip a...

Leave a Comment

Anonymous

◎Feel free to join the discussion and share your thoughts.