Core Java J.U.C. Concurrency Utilities Guide
1. Asynchronous Task Composition with CompletableFuture
Application Context: This utility is essential when orchestrating multiple independent asynchronous operations, such as fetching data from various microservices, and aggregating their results into a single response payload.
Code Demonstration:
import java.util.concurrent.CompletableFuture;

public class AsyncOrchestrationDemo {
    public static void main(String[] args) throws Exception {
        // Two independent async operations running on the common pool
        CompletableFuture<String> opOne = CompletableFuture.supplyAsync(() -> "Result A");
        CompletableFuture<String> opTwo = CompletableFuture.supplyAsync(() -> "Result B");
        // Combine results after both finish
        CompletableFuture<String> combined = opOne.thenCombine(opTwo, (a, b) -> a + ":" + b);
        System.out.println(combined.join());
    }
}
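When more than two service calls must be aggregated, thenCombine becomes unwieldy; CompletableFuture.allOf handles an arbitrary number of futures. A minimal sketch (the class name and the hard-coded results are illustrative, not part of any real service):

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

public class AllOfAggregationDemo {
    // Join the results of any number of futures into one response string
    static String aggregate(List<CompletableFuture<String>> calls) {
        // allOf completes when every future completes; it carries no result itself
        CompletableFuture.allOf(calls.toArray(new CompletableFuture[0])).join();
        // After allOf completes, join() on each future returns immediately
        return calls.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.joining(":"));
    }

    public static void main(String[] args) {
        List<CompletableFuture<String>> calls = List.of(
                CompletableFuture.supplyAsync(() -> "A"),
                CompletableFuture.supplyAsync(() -> "B"),
                CompletableFuture.supplyAsync(() -> "C"));
        System.out.println(aggregate(calls)); // A:B:C
    }
}
```

Note that allOf returns CompletableFuture<Void>, which is why each future is joined individually afterwards to collect the actual values.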
2. Thread-Safe Hash Map
Application Context:
ConcurrentHashMap provides high-performance thread-safe storage without locking the entire map. It is ideal for caching mechanisms or maintaining global state counters where read/write performance must be maximized under contention.
Implementation:
import java.util.concurrent.ConcurrentHashMap;

public class ConcurrentMapStats {
    public static void main(String[] args) throws Exception {
        ConcurrentHashMap<String, Integer> stats = new ConcurrentHashMap<>();
        Runnable updateTask = () -> {
            String key = "counter";
            // merge is atomic: neither thread's increment can be lost
            stats.merge(key, 1, Integer::sum);
        };
        Thread t1 = new Thread(updateTask);
        Thread t2 = new Thread(updateTask);
        t1.start(); t2.start();
        t1.join(); t2.join();
        System.out.println(stats.get("counter")); // always 2
    }
}
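For the caching use case mentioned above, computeIfAbsent is the idiomatic tool: it computes and stores a value at most once per key, even when several threads request the same key concurrently. A minimal sketch (the class name and the squaring function are illustrative placeholders for a real expensive computation):

```java
import java.util.concurrent.ConcurrentHashMap;

public class MemoizingCache {
    private static final ConcurrentHashMap<Integer, Long> CACHE = new ConcurrentHashMap<>();

    // computeIfAbsent runs the mapping function only if the key is missing,
    // and other threads asking for the same key block until it is stored
    static long squareOf(int n) {
        return CACHE.computeIfAbsent(n, k -> (long) k * k);
    }

    public static void main(String[] args) {
        System.out.println(squareOf(12)); // 144, computed
        System.out.println(squareOf(12)); // 144, served from the cache
    }
}
```

One caveat worth knowing: the mapping function must not itself modify the same map, as nested updates inside computeIfAbsent are not permitted.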
3. Lock-Free Atomic Counters
Application Context:
For scenarios requiring simple increment or decrement operations across threads without the overhead of synchronized blocks, AtomicInteger utilizes CAS (Compare-And-Swap) instructions to ensure thread safety and high throughput.
Example:
import java.util.concurrent.atomic.AtomicInteger;

public class AtomicCounterDemo {
    public static void main(String[] args) throws Exception {
        AtomicInteger sequence = new AtomicInteger(0);
        Runnable counterWorker = () -> {
            for (int i = 0; i < 500; i++) {
                sequence.incrementAndGet(); // atomic CAS-based increment
            }
        };
        Thread t1 = new Thread(counterWorker);
        Thread t2 = new Thread(counterWorker);
        t1.start(); t2.start();
        t1.join(); t2.join();
        System.out.println(sequence.get()); // always 1000
    }
}
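To make the CAS mechanism mentioned above concrete, the retry loop that incrementAndGet performs internally can be written out by hand with compareAndSet. This is a sketch for illustration only; in real code the built-in method is preferable:

```java
import java.util.concurrent.atomic.AtomicInteger;

public class CasLoopDemo {
    // A hand-rolled equivalent of incrementAndGet: read the current value,
    // then retry with compareAndSet until no other thread raced in between
    static int casIncrement(AtomicInteger counter) {
        int current;
        do {
            current = counter.get();
            // compareAndSet succeeds only if the value is still `current`
        } while (!counter.compareAndSet(current, current + 1));
        return current + 1;
    }

    public static void main(String[] args) {
        AtomicInteger counter = new AtomicInteger(41);
        System.out.println(casIncrement(counter)); // 42
    }
}
```

The loop is what "lock-free" means in practice: a losing thread is never blocked, it simply retries with the fresher value.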
4. Waiting for Batch Completion
Application Context:
CountDownLatch acts as a barrier allowing one or more threads to wait until a set of operations performed by other threads completes. This is useful for initializing resources or waiting for parallel subtasks to finish before aggregation.
Sample Logic:
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class LatchCoordinator {
    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(3);
        int pendingTasks = 3;
        CountDownLatch completionGate = new CountDownLatch(pendingTasks);
        for (int i = 0; i < pendingTasks; i++) {
            executor.submit(() -> {
                try {
                    System.out.println("Processing task...");
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    // Count down in finally so a failing task cannot stall await()
                    completionGate.countDown();
                }
            });
        }
        completionGate.await();
        System.out.println("All tasks finished.");
        executor.shutdown();
    }
}
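In production code it is often safer not to wait indefinitely: the timed overload of await returns false if the latch does not reach zero within the deadline, letting the caller fail fast instead of hanging. A minimal sketch (class and method names are illustrative):

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class TimedLatchDemo {
    // Returns true only if the latch reached zero before the timeout elapsed
    static boolean waitBriefly(CountDownLatch gate) throws InterruptedException {
        return gate.await(100, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch gate = new CountDownLatch(1);
        // Nobody ever calls countDown(), so the timed wait elapses
        System.out.println("Finished in time: " + waitBriefly(gate)); // false
    }
}
```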
5. Controlling Resource Access
Application Context:
To limit the number of concurrent threads accessing a shared resource (e.g., database connections), use Semaphore. It maintains a pool of permits; acquiring a permit reduces the count, releasing it increases the count, blocking threads if no permits are available.
Simulation:
import java.util.concurrent.Semaphore;

public class ResourcePoolManager {
    public static void main(String[] args) throws Exception {
        Semaphore accessLimit = new Semaphore(3); // at most 3 concurrent holders
        for (int i = 0; i < 5; i++) {
            new Thread(() -> {
                try {
                    accessLimit.acquire(); // blocks until a permit is free
                    try {
                        System.out.println("Access granted: " + Thread.currentThread().getName());
                        Thread.sleep(1500);
                    } finally {
                        // Release only after a successful acquire; releasing a
                        // permit that was never held would inflate the pool
                        accessLimit.release();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }).start();
        }
    }
}
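When blocking is undesirable, tryAcquire attempts to take a permit and returns false immediately if none is available, letting the caller fall back (for example, return a "busy" response) instead of queueing up. A minimal sketch (the class and method names are illustrative):

```java
import java.util.concurrent.Semaphore;

public class TryAcquireDemo {
    // Attempt the work only if a permit is free; never blocks
    static boolean tryUse(Semaphore limit) {
        if (limit.tryAcquire()) {
            try {
                return true; // permit held; the guarded work would run here
            } finally {
                limit.release();
            }
        }
        return false; // no permit available; caller can fall back
    }

    public static void main(String[] args) {
        Semaphore limit = new Semaphore(1);
        System.out.println(tryUse(limit)); // true: a permit was free
        System.out.println(tryUse(new Semaphore(0))); // false: pool exhausted
    }
}
```

A timed variant, tryAcquire(timeout, unit), sits between the two extremes: it waits up to a deadline before giving up.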
6. Synchronizing Multiple Threads at a Barrier
Application Context:
CyclicBarrier allows a group of threads to pause execution until all participants reach a specific point in code. Once the threshold is met, all threads proceed together. Unlike a one-time latch, this can be reused.
Workflow:
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class GroupSyncPoint {
    public static void main(String[] args) throws Exception {
        // The barrier action runs once, when the third thread arrives
        CyclicBarrier barrier = new CyclicBarrier(3, () -> {
            System.out.println("Group sync completed.");
        });
        for (int i = 0; i < 3; i++) {
            new Thread(() -> {
                try {
                    Thread.sleep(500);
                    System.out.println("Thread ready");
                    barrier.await(); // blocks until all 3 parties arrive
                    System.out.println("Proceeding");
                } catch (InterruptedException | BrokenBarrierException e) {
                    Thread.currentThread().interrupt();
                }
            }).start();
        }
    }
}
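The "cyclic" part deserves its own demonstration: after each trip the barrier resets automatically, so the same object can coordinate repeated rounds of work. A minimal sketch (class name and round count are illustrative) where two threads synchronize three times on one barrier:

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

public class BarrierReuseDemo {
    // Runs two workers through three synchronized rounds and reports
    // how many times the barrier action fired
    static int runRounds() throws Exception {
        AtomicInteger rounds = new AtomicInteger();
        // The barrier action runs once per trip; the barrier then resets
        CyclicBarrier barrier = new CyclicBarrier(2, rounds::incrementAndGet);
        Runnable worker = () -> {
            try {
                for (int round = 0; round < 3; round++) {
                    barrier.await(); // same barrier object, three trips
                }
            } catch (InterruptedException | BrokenBarrierException e) {
                Thread.currentThread().interrupt();
            }
        };
        Thread t1 = new Thread(worker);
        Thread t2 = new Thread(worker);
        t1.start(); t2.start();
        t1.join(); t2.join();
        return rounds.get();
    }

    public static void main(String[] args) throws Exception {
        System.out.println("Rounds completed: " + runRounds()); // 3
    }
}
```

A CountDownLatch in the same position would be exhausted after the first round; reuse is exactly what distinguishes the two.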
7. Producer-Consumer Communication
Application Context:
A BlockingQueue facilitates communication between producer and consumer threads. It handles synchronization automatically: producers block when full, consumers block when empty. This decouples processing stages effectively.
Implementation:
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class QueuePipeline {
    public static void main(String[] args) throws Exception {
        BlockingQueue<Integer> buffer = new ArrayBlockingQueue<>(2); // capacity 2 forces the producer to block
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= 3; i++) {
                    buffer.put(i); // blocks while the buffer is full
                    System.out.println("Produced " + i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        Thread consumer = new Thread(() -> {
            try {
                // Consume a known number of items so the program can terminate;
                // an unbounded while(true) loop would never let the JVM exit
                for (int i = 0; i < 3; i++) {
                    Integer val = buffer.take(); // blocks while the buffer is empty
                    System.out.println("Consumed " + val);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        consumer.start();
    }
}
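When the consumer cannot know the item count in advance, a common shutdown idiom is the "poison pill": the producer enqueues a sentinel value that tells the consumer to stop. A minimal sketch (the class name and the sentinel value -1 are illustrative; the sentinel must be a value the producer can never emit as real data):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class PoisonPillPipeline {
    // Sentinel marking the end of the stream
    static final int POISON_PILL = -1;

    // Consumes items until the sentinel arrives; returns how many real
    // items were consumed
    static int drain(BlockingQueue<Integer> buffer) throws InterruptedException {
        int consumed = 0;
        while (true) {
            int val = buffer.take();
            if (val == POISON_PILL) {
                return consumed; // producer signalled end of stream
            }
            consumed++;
        }
    }

    public static void main(String[] args) throws Exception {
        BlockingQueue<Integer> buffer = new ArrayBlockingQueue<>(4);
        for (int i = 1; i <= 3; i++) {
            buffer.put(i);
        }
        buffer.put(POISON_PILL); // last item: tells the consumer to stop
        System.out.println("Consumed " + drain(buffer) + " items"); // Consumed 3 items
    }
}
```

With multiple consumers, one pill per consumer must be enqueued, since each pill terminates exactly one reader.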