Core Java Concurrency Utilities: From Fundamentals to Advanced Patterns
1. Process and Thread Fundamentals
1.1 Understanding Processes and Threads
A process represents an executing program within the operating system. It is the operating system's basic unit of resource allocation. In a threaded system, a process acts as a container for its threads and holds the program code, data, and system resources they share.
A thread constitutes the smallest unit of processing that the operating system can schedule. Threads exist within processes and represent the actual execution path. A single process can contain multiple concurrent threads, with each thread handling different tasks simultaneously.
Key distinctions:
- Process: A running instance of a program; the minimum unit of resource allocation
- Thread: The basic unit of CPU time allocation; the minimum unit of program execution
1.2 Thread States in Java
The Thread.State nested enum defines six distinct thread states: NEW, RUNNABLE, BLOCKED, WAITING, TIMED_WAITING, and TERMINATED.
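To make the states concrete, here is a minimal sketch that observes three of them on one thread. The class and method names (ThreadStateSketch, observeStates) are illustrative assumptions, not part of any API; only Thread.getState() and Thread.State come from the JDK.

```java
import java.util.ArrayList;
import java.util.List;

class ThreadStateSketch {
    // Records the state before start(), while sleeping, and after the thread ends.
    static List<Thread.State> observeStates() {
        List<Thread.State> seen = new ArrayList<>();
        Thread sleeper = new Thread(() -> {
            try {
                Thread.sleep(10_000); // a timed sleep puts the thread in TIMED_WAITING
            } catch (InterruptedException e) {
                // interrupted by the observer below; just finish
            }
        });
        seen.add(sleeper.getState()); // not started yet: NEW
        sleeper.start();
        Thread.State state;
        // Poll until the thread has reached its sleep() call.
        while ((state = sleeper.getState()) != Thread.State.TIMED_WAITING) {
            Thread.yield();
        }
        seen.add(state);
        sleeper.interrupt(); // cut the sleep short
        try {
            sleeper.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        seen.add(sleeper.getState()); // finished: TERMINATED
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(observeStates());
    }
}
```

BLOCKED and WAITING can be observed the same way by having the thread contend for a monitor or call wait() without a timeout.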
1.3 wait() and sleep() Differences
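- sleep() is a static method of Thread; wait() is an instance method of Object and can be called on any object
- sleep() does not release any lock the thread holds, and it does not require one; wait() releases the object's monitor, and it must be called while holding that monitor (inside a synchronized block or method), otherwise it throws IllegalMonitorStateException
- Both methods can be interrupted via the interrupt() method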
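The two lock-related points can be checked with a small sketch. WaitSleepSketch and its methods are hypothetical names chosen for illustration. The first method shows that wait() fails without the monitor; the second shows that wait() releases the monitor, because the caller can only enter the synchronized block while the waiter is parked inside wait().

```java
import java.util.concurrent.CountDownLatch;

class WaitSleepSketch {
    // Calling wait() without owning the monitor throws IllegalMonitorStateException.
    static boolean waitWithoutLockFails() {
        Object monitor = new Object();
        try {
            monitor.wait(10);
            return false;
        } catch (IllegalMonitorStateException e) {
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // The caller can lock the monitor only because the waiter released it in wait().
    static boolean waitReleasesLock() {
        Object monitor = new Object();
        boolean[] done = {false}; // guarded by monitor
        CountDownLatch inside = new CountDownLatch(1);
        Thread waiter = new Thread(() -> {
            synchronized (monitor) {
                inside.countDown(); // the waiter now owns the monitor
                while (!done[0]) {
                    try {
                        monitor.wait(); // gives up the monitor while waiting
                    } catch (InterruptedException e) {
                        return;
                    }
                }
            }
        });
        waiter.start();
        try {
            inside.await();
            synchronized (monitor) { // succeeds once the waiter is inside wait()
                done[0] = true;
                monitor.notifyAll();
            }
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println("wait() without lock fails: " + waitWithoutLockFails());
        System.out.println("wait() releases lock: " + waitReleasesLock());
    }
}
```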
1.4 Concurrency versus Parallelism
- Concurrency: Multiple threads make progress within the same time interval by interleaving; they look parallel at the macro level but run sequentially at the micro level
- Parallelism: Multiple threads executing simultaneously at the exact same moment
1.5 Monitors
A monitor ensures that only one thread at a time can execute inside its critical section. In Java, every object has an associated monitor; the compiler emits the enter and exit operations for synchronized code, and the JVM enforces the exclusion at run time.
1.6 User Threads vs Daemon Threads
User threads: Custom-created threads that keep the JVM alive when the main thread completes
Daemon threads: Background service threads (e.g., garbage collector)
When only daemon threads remain, the JVM terminates.
public class ThreadTypeDemo {
public static void main(String[] args) {
Thread worker = new Thread(() -> {
System.out.println(Thread.currentThread().getName() + " Daemon: " + Thread.currentThread().isDaemon());
while (true) {
// Infinite loop, but a daemon thread does not keep the JVM alive
}
}, "Worker");
worker.setDaemon(true);
worker.start();
System.out.println(Thread.currentThread().getName() + " finished");
}
}
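Output (line order may vary; the JVM exits once main finishes because the only remaining thread is a daemon):
main finished
Worker Daemon: true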
2. The Lock Interface
2.1 Synchronized Keyword Review
The synchronized keyword is Java's intrinsic lock mechanism. It can be applied to instance methods, static methods, and code blocks (not to variables or classes), so that only one thread at a time runs the guarded code. The example below uses synchronized in a ticket sales system:
class TicketInventory {
private int remaining = 1000;
public synchronized void sell() {
if (remaining > 0) {
System.out.println(Thread.currentThread().getName() + " sold ticket, remaining: " + --remaining);
}
}
}
public class TicketDemo {
public static void main(String[] args) {
TicketInventory inventory = new TicketInventory();
Runnable task = () -> {
for (int i = 0; i < 1000; i++) {
inventory.sell();
}
};
new Thread(task, "WindowA").start();
new Thread(task, "WindowB").start();
new Thread(task, "WindowC").start();
}
}
2.2 Lock Interface Overview
The Lock interface provides more flexible locking operations than synchronized blocks. It supports non-blocking lock attempts (tryLock()), interruptible lock acquisition (lockInterruptibly()), and timeout-based lock attempts (tryLock(long, TimeUnit)).
ReentrantLock Implementation:
private final ReentrantLock monitor = new ReentrantLock();
monitor.lock(); // acquire before try, so finally never unlocks a lock that was not acquired
try {
// business logic
} finally {
monitor.unlock();
}
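The timed tryLock() variant mentioned above can be sketched as follows. TryLockSketch and otherThreadAcquires are hypothetical names; the sketch starts a second thread that gives up after a timeout instead of blocking indefinitely.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

class TryLockSketch {
    // Reports whether a second thread could acquire the lock within the timeout.
    static boolean otherThreadAcquires(ReentrantLock lock, long timeoutMs) {
        boolean[] acquired = {false};
        Thread other = new Thread(() -> {
            try {
                if (lock.tryLock(timeoutMs, TimeUnit.MILLISECONDS)) {
                    try {
                        acquired[0] = true;
                    } finally {
                        lock.unlock();
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        other.start();
        try {
            other.join(); // join makes the write to acquired[0] visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return acquired[0];
    }

    public static void main(String[] args) {
        ReentrantLock lock = new ReentrantLock();
        lock.lock();
        try {
            System.out.println("while held: " + otherThreadAcquires(lock, 100));
        } finally {
            lock.unlock();
        }
        System.out.println("after release: " + otherThreadAcquires(lock, 100));
    }
}
```

The first call reports false because the main thread still holds the lock; the second reports true.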
2.3 Thread Creation Strategies
- Extend the Thread class
- Implement the Runnable interface
- Implement the Callable interface
- Utilize thread pools
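A compact sketch of the four strategies side by side is given here, under the assumption that each task only needs to produce a label. ThreadCreationSketch, Worker, and runAllFour are illustrative names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;

class ThreadCreationSketch {
    // Strategy 1: subclass Thread and override run().
    static class Worker extends Thread {
        String result;
        @Override
        public void run() {
            result = "extends Thread";
        }
    }

    static List<String> runAllFour() {
        List<String> results = new ArrayList<>();
        try {
            Worker worker = new Worker();
            worker.start();
            worker.join();
            results.add(worker.result);

            // Strategy 2: pass a Runnable to a Thread.
            String[] slot = new String[1];
            Thread runner = new Thread(() -> slot[0] = "Runnable");
            runner.start();
            runner.join();
            results.add(slot[0]);

            // Strategy 3: wrap a Callable in a FutureTask to get a result back.
            Callable<String> callable = () -> "Callable";
            FutureTask<String> task = new FutureTask<>(callable);
            new Thread(task).start();
            results.add(task.get());

            // Strategy 4: submit work to a thread pool.
            ExecutorService pool = Executors.newFixedThreadPool(1);
            try {
                results.add(pool.submit(() -> "thread pool").get());
            } finally {
                pool.shutdown();
            }
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
        return results;
    }

    public static void main(String[] args) {
        System.out.println(runAllFour());
    }
}
```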
2.4 Ticket Sales with Lock
class TicketInventory {
private int remaining = 1000;
private final ReentrantLock monitor = new ReentrantLock();
public void sell() {
monitor.lock();
try {
if (remaining > 0) {
System.out.println(Thread.currentThread().getName() + " sold ticket, remaining: " + --remaining);
}
} finally {
monitor.unlock();
}
}
}
public class LockTicketDemo {
public static void main(String[] args) {
TicketInventory inventory = new TicketInventory();
Runnable task = () -> {
for (int i = 0; i < 1000; i++) {
inventory.sell();
}
};
new Thread(task, "WindowA").start();
new Thread(task, "WindowB").start();
new Thread(task, "WindowC").start();
}
}
2.5 Synchronized vs Lock
| Feature | Synchronized | Lock |
|---|---|---|
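| Type | Built-in keyword | Interface (implemented by classes such as ReentrantLock) |
| Lock Release | Automatic | Manual (unlock() in a finally block) |
| Interrupt Response | No (a thread blocked on entry cannot be interrupted) | Yes (lockInterruptibly()) |
| Lock Status Check | No | Yes (tryLock() reports whether the lock was acquired) |
| Performance | Similar in most cases on modern JVMs | More options under contention; ReadWriteLock allows concurrent reads |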
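The "Interrupt Response" row can be illustrated with a minimal sketch, assuming a waiter thread that should give up when interrupted. InterruptibleLockSketch and waiterWasInterrupted are hypothetical names.

```java
import java.util.concurrent.locks.ReentrantLock;

class InterruptibleLockSketch {
    // Returns true if the waiting thread abandoned the lock attempt on interrupt.
    static boolean waiterWasInterrupted() {
        ReentrantLock lock = new ReentrantLock();
        boolean[] interrupted = {false};
        lock.lock(); // the caller holds the lock, so the waiter cannot acquire it
        try {
            Thread waiter = new Thread(() -> {
                try {
                    lock.lockInterruptibly(); // throws if interrupted before or while waiting
                    lock.unlock();            // not reached in this sketch
                } catch (InterruptedException e) {
                    interrupted[0] = true;
                }
            });
            waiter.start();
            waiter.interrupt();
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            lock.unlock();
        }
        return interrupted[0];
    }

    public static void main(String[] args) {
        System.out.println("waiter interrupted: " + waiterWasInterrupted());
    }
}
```

A thread blocked on entry to a synchronized block would stay blocked in the same situation until the monitor became free.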
3. Inter-Thread Communication
Two communication models exist: shared memory and message passing.
3.1 Synchronized Communication Pattern
The pattern has three steps: check the condition, do the work, and notify the waiting threads.
class SharedResource {
private int value = 0;
public synchronized void increment() throws InterruptedException {
while (value != 0) {
wait();
}
value++;
System.out.print(Thread.currentThread().getName() + ":" + value + "--->");
notifyAll();
}
public synchronized void decrement() throws InterruptedException {
while (value != 1) {
wait();
}
value--;
System.out.println(Thread.currentThread().getName() + ":" + value);
notifyAll();
}
}
public class CommunicationDemo {
public static void main(String[] args) {
SharedResource resource = new SharedResource();
Thread incThread = new Thread(() -> {
try {
for (int i = 0; i < 100; i++) resource.increment();
} catch (InterruptedException e) { e.printStackTrace(); }
}, "Incrementer");
Thread decThread = new Thread(() -> {
try {
for (int i = 0; i < 100; i++) resource.decrement();
} catch (InterruptedException e) { e.printStackTrace(); }
}, "Decrementer");
incThread.start();
decThread.start();
}
}
3.2 Spurious Wakeup Problem
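A spurious wakeup is a return from wait() without a matching notification; the same failure also appears when notifyAll() wakes a thread whose condition no longer holds. With four threads (two incrementers, two decrementers), guarding wait() with an if statement is incorrect: a woken thread continues past the check without re-testing the condition, so value can rise above 1 or fall below 0.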
Solution: Always use while loops instead of if statements for condition checking:
while (value != 0) {
wait();
}
3.3 Lock-Based Communication
Using Condition objects provides more granular control:
import java.util.concurrent.locks.*;
class SharedResource {
private int value = 0;
private final Lock lock = new ReentrantLock();
private final Condition condition = lock.newCondition();
public void increment() {
lock.lock();
try {
while (value != 0) condition.await();
value++;
System.out.print(Thread.currentThread().getName() + ":" + value + "--->");
condition.signalAll();
} catch (InterruptedException e) { e.printStackTrace(); }
finally { lock.unlock(); }
}
public void decrement() {
lock.lock();
try {
while (value != 1) condition.await();
value--;
System.out.println(Thread.currentThread().getName() + ":" + value);
condition.signalAll();
} catch (InterruptedException e) { e.printStackTrace(); }
finally { lock.unlock(); }
}
}
4. Customized Inter-Thread Communication
Scenario: Three threads print in a fixed order: Thread A prints 5 times, then Thread B prints 10 times, then Thread C prints 15 times, repeated for 10 rounds.
class Controller {
private int flag = 1;
private final Lock lock = new ReentrantLock();
private final Condition condA = lock.newCondition();
private final Condition condB = lock.newCondition();
private final Condition condC = lock.newCondition();
public void printA(int loop) {
lock.lock();
try {
while (flag != 1) condA.await();
for (int i = 1; i <= 5; i++) {
System.out.println(Thread.currentThread().getName() + " round " + loop + ", iteration " + i);
}
flag = 2;
condB.signal();
} catch (InterruptedException e) { e.printStackTrace(); }
finally { lock.unlock(); }
}
public void printB(int loop) {
lock.lock();
try {
while (flag != 2) condB.await();
for (int i = 1; i <= 10; i++) {
System.out.println(Thread.currentThread().getName() + " round " + loop + ", iteration " + i);
}
flag = 3;
condC.signal();
} catch (InterruptedException e) { e.printStackTrace(); }
finally { lock.unlock(); }
}
public void printC(int loop) {
lock.lock();
try {
while (flag != 3) condC.await();
for (int i = 1; i <= 15; i++) {
System.out.println(Thread.currentThread().getName() + " round " + loop + ", iteration " + i);
}
flag = 1;
condA.signal();
} catch (InterruptedException e) { e.printStackTrace(); }
finally { lock.unlock(); }
}
}
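This is the single-flag pattern: the flag field records whose turn it is, and each thread waits on its own Condition, so signal() wakes only the next thread in the sequence while the shared lock provides mutual exclusion.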
5. Thread-Safe Collections
5.1 Collection Thread Safety Issues
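ArrayList.add() is not synchronized. In the demo below, one thread iterates the list (println calls toString()) while another thread adds to it, which can throw ConcurrentModificationException; unsynchronized concurrent adds can also lose elements: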
public class UnsafeCollectionDemo {
public static void main(String[] args) {
List<String> list = new ArrayList<>();
for (int i = 0; i < 10; i++) {
final int id = i;
new Thread(() -> {
list.add(UUID.randomUUID().toString().substring(0, 8));
System.out.println(list);
}, String.valueOf(id)).start();
}
}
}
5.2 Solution: Vector
Vector's add() method is synchronized, but every method locks the whole vector, which limits throughput under contention.
5.3 Solution: Collections Wrapper
List<String> list = Collections.synchronizedList(new ArrayList<>());
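One caveat with the wrapper is that only individual method calls are synchronized; iteration must be guarded by the caller, as the Collections.synchronizedList documentation requires. A minimal sketch follows, with SynchronizedListSketch and fillAndCount as illustrative names.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class SynchronizedListSketch {
    static int fillAndCount(int threads, int perThread) {
        List<Integer> list = Collections.synchronizedList(new ArrayList<>());
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    list.add(i); // each add() is synchronized by the wrapper
                }
            });
            workers[t].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        int count = 0;
        synchronized (list) { // iteration is not synchronized by the wrapper; lock manually
            for (Integer ignored : list) {
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) {
        System.out.println(fillAndCount(4, 1000)); // 4 threads x 1000 adds
    }
}
```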
5.4 Solution: CopyOnWriteArrayList (Preferred)
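Implements copy-on-write semantics: reads proceed without locking against the current array, while each write copies the array, modifies the copy, and then publishes it. This suits read-mostly workloads, because every write pays for a full copy.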
List<String> list = new CopyOnWriteArrayList<>();
Implementation insight (following the JDK 8 source, with variable names adapted):
public boolean add(E element) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] current = getArray();
int len = current.length;
Object[] updated = Arrays.copyOf(current, len + 1);
updated[len] = element;
setArray(updated);
return true;
} finally {
lock.unlock();
}
}
5.5 HashSet Thread Safety
HashSet has the same concurrency issues. Solution:
Set<String> set = new CopyOnWriteArraySet<>();
5.6 HashMap Thread Safety
Use ConcurrentHashMap for thread-safe map operations.
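As a minimal sketch of why ConcurrentHashMap helps, the example below has several threads update one counter through merge(), which performs the read-modify-write atomically per key. HitCounterSketch and countHits are hypothetical names.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class HitCounterSketch {
    static int countHits(int threads, int perThread) {
        Map<String, Integer> counts = new ConcurrentHashMap<>();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    counts.merge("hits", 1, Integer::sum); // atomic increment for this key
                }
            });
            workers[t].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return counts.getOrDefault("hits", 0);
    }

    public static void main(String[] args) {
        System.out.println(countHits(4, 1000)); // no updates are lost
    }
}
```

A separate get() followed by put() would not be atomic even on a ConcurrentHashMap, which is why the sketch uses merge().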
6. Multi-Threaded Locking Mechanisms
6.1 Synchronized Lock Scenarios
- Instance methods: Lock on the current instance
- Static methods: Lock on the Class object
- Synchronized blocks: Lock on the specified object
class Device {
public synchronized void method1() throws Exception {
TimeUnit.SECONDS.sleep(4);
System.out.println("Method1 executed");
}
public synchronized void method2() {
System.out.println("Method2 executed");
}
public void method3() {
System.out.println("Method3 executed");
}
}
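With a single Device instance, method1() and method2() share the instance lock, so method2() waits until method1() finishes, while the unsynchronized method3() never waits. Calls on two different instances do not block each other, and static synchronized methods lock the Class object, which is independent of any instance lock.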
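Which object each form locks can be verified with Thread.holdsLock(). The sketch below uses a hypothetical class, LockTargetSketch, rather than the Device class above.

```java
class LockTargetSketch {
    // An instance method locks the receiver (this).
    synchronized boolean instanceMethodLocksThis() {
        return Thread.holdsLock(this);
    }

    // An instance method does not lock the Class object.
    synchronized boolean instanceMethodLocksClass() {
        return Thread.holdsLock(LockTargetSketch.class);
    }

    // An instance method does not lock other instances.
    synchronized boolean instanceMethodLocksOther(LockTargetSketch other) {
        return Thread.holdsLock(other);
    }

    // A static method locks the Class object.
    static synchronized boolean staticMethodLocksClass() {
        return Thread.holdsLock(LockTargetSketch.class);
    }

    // A synchronized block locks whatever object it names.
    static boolean blockLocksTarget(Object target) {
        synchronized (target) {
            return Thread.holdsLock(target);
        }
    }

    public static void main(String[] args) {
        LockTargetSketch first = new LockTargetSketch();
        LockTargetSketch second = new LockTargetSketch();
        System.out.println("instance locks this: " + first.instanceMethodLocksThis());
        System.out.println("instance locks class: " + first.instanceMethodLocksClass());
        System.out.println("instance locks other: " + first.instanceMethodLocksOther(second));
        System.out.println("static locks class: " + staticMethodLocksClass());
        System.out.println("block locks target: " + blockLocksTarget(new Object()));
    }
}
```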
6.2 Fair vs Unfair Locks
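- Fair lock: Grants the lock to the longest-waiting thread, which prevents starvation but lowers throughput
- Unfair lock (the default): Lets a newly arriving thread take the lock ahead of waiting threads, which raises throughput but can starve waiters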
ReentrantLock fairLock = new ReentrantLock(true);
ReentrantLock unfairLock = new ReentrantLock(false);
6.3 Reentrant Locks
Both synchronized and ReentrantLock are reentrant (recursive) locks. The same thread can acquire the lock multiple times without deadlocking.
public class ReentrantDemo {
public static void main(String[] args) {
ReentrantLock lock = new ReentrantLock();
new Thread(() -> {
lock.lock();
try {
System.out.println("First lock acquired");
lock.lock();
try {
System.out.println("Second lock acquired");
} finally { lock.unlock(); }
} finally { lock.unlock(); }
}).start();
}
}
6.4 Deadlock
Deadlock occurs when two or more threads mutually wait for resources held by each other.
Conditions:
- Mutual exclusion
- Hold and wait
- No preemption
- Circular wait
public class DeadlockSimulation {
private static final Object resourceA = new Object();
private static final Object resourceB = new Object();
public static void main(String[] args) {
new Thread(() -> {
synchronized (resourceA) {
System.out.println(Thread.currentThread().getName() + " acquired resourceA");
try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) {}
synchronized (resourceB) {
System.out.println(Thread.currentThread().getName() + " acquired resourceB");
}
}
}, "ThreadAlpha").start();
new Thread(() -> {
synchronized (resourceB) {
System.out.println(Thread.currentThread().getName() + " acquired resourceB");
try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) {}
synchronized (resourceA) {
System.out.println(Thread.currentThread().getName() + " acquired resourceA");
}
}
}, "ThreadBeta").start();
}
}
Detection: Use jps and jstack commands to diagnose deadlocks.
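Deadlocks can also be detected from inside the JVM through ThreadMXBean. The sketch below is one possible approach: it deliberately deadlocks two daemon threads on intrinsic locks and then polls findMonitorDeadlockedThreads(). DeadlockDetectSketch and its methods are illustrative names.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;
import java.util.concurrent.CountDownLatch;

class DeadlockDetectSketch {
    // Creates a two-thread deadlock and returns how many deadlocked threads were found.
    static int detectDeadlockedThreads() {
        Object a = new Object();
        Object b = new Object();
        CountDownLatch bothHoldFirst = new CountDownLatch(2);
        startDaemon(a, b, bothHoldFirst);
        startDaemon(b, a, bothHoldFirst);
        ThreadMXBean threads = ManagementFactory.getThreadMXBean();
        for (int attempt = 0; attempt < 100; attempt++) {
            long[] ids = threads.findMonitorDeadlockedThreads(); // null when none are found
            if (ids != null) {
                return ids.length;
            }
            try {
                Thread.sleep(50);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return 0;
            }
        }
        return 0;
    }

    // Daemon threads are used so the stuck threads do not keep the JVM alive.
    private static void startDaemon(Object first, Object second, CountDownLatch latch) {
        Thread thread = new Thread(() -> {
            synchronized (first) {
                latch.countDown();
                try {
                    latch.await(); // wait until the other thread holds its first lock too
                } catch (InterruptedException e) {
                    return;
                }
                synchronized (second) {
                    // never reached: the other thread holds this monitor
                }
            }
        });
        thread.setDaemon(true);
        thread.start();
    }

    public static void main(String[] args) {
        System.out.println("deadlocked threads: " + detectDeadlockedThreads());
    }
}
```

Acquiring locks in one fixed global order in every thread removes the circular-wait condition and avoids the deadlock in the first place.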
7. Callable Interface
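Callable describes a task that returns a value and may throw a checked exception, unlike Runnable. A FutureTask wraps the Callable so that a Thread can run it and the caller can fetch the result.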
public class CallableDemo {
public static void main(String[] args) throws Exception {
FutureTask<String> task = new FutureTask<>(() -> {
System.out.println("Callable executing in " + Thread.currentThread().getName());
return "Result from Callable";
});
new Thread(task).start();
System.out.println("Returned value: " + task.get());
}
}
Key differences:
- Runnable.run(): No return value, cannot throw checked exceptions
- Callable.call(): Returns a value, can throw checked exceptions
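How an exception thrown by call() reaches the caller can be sketched as follows: Future.get() wraps it in an ExecutionException whose cause is the original exception. CallableErrorSketch and describe are hypothetical names.

```java
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class CallableErrorSketch {
    // Runs the job on a worker thread and describes its outcome.
    static String describe(Callable<String> job) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            return "ok: " + pool.submit(job).get();
        } catch (ExecutionException e) {
            // The exception thrown inside call() is available as the cause.
            return "failed: " + e.getCause().getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(describe(() -> "42"));
        System.out.println(describe(() -> {
            throw new IOException("disk unavailable"); // checked exception, allowed in call()
        }));
    }
}
```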
8. JUC Utility Classes
8.1 CountDownLatch
Enables waiting for a specific number of operations to complete.
public class StudentLeaveDemo {
public static void main(String[] args) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(6);
for (int i = 1; i <= 6; i++) {
final int studentId = i;
new Thread(() -> {
System.out.println("Student " + studentId + " left classroom");
latch.countDown();
}, String.valueOf(studentId)).start();
}
latch.await();
System.out.println("Monitor locked the door");
}
}
8.2 CyclicBarrier
Allows threads to wait for each other until all reach a common barrier point.
public class DragonBallDemo {
private static final int REQUIRED = 7;
public static void main(String[] args) {
CyclicBarrier barrier = new CyclicBarrier(REQUIRED,
() -> System.out.println("All dragon balls collected! Summon the dragon!"));
for (int i = 1; i <= REQUIRED; i++) {
final int ball = i;
new Thread(() -> {
System.out.println("Dragon ball " + ball + " collected");
try { barrier.await(); } catch (Exception e) { e.printStackTrace(); }
}, String.valueOf(i)).start();
}
}
}
8.3 Semaphore
Controls access to resources with a limited number of permits.
public class ParkingDemo {
public static void main(String[] args) {
Semaphore spots = new Semaphore(3);
for (int i = 1; i <= 6; i++) {
final int vehicle = i;
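new Thread(() -> {
try {
spots.acquire();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return; // no permit was acquired, so there is nothing to release
}
try {
System.out.println("Vehicle " + vehicle + " parked");
TimeUnit.SECONDS.sleep(new Random().nextInt(5));
System.out.println("Vehicle " + vehicle + " left");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
spots.release(); // release only the permit this thread actually holds
}
}, String.valueOf(i)).start();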
}
}
}
9. Read-Write Locks
9.1 Pessimistic vs Optimistic Locking
- Pessimistic: Assumes conflict, acquires lock before operation
- Optimistic: Assumes no conflict, uses version checking
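The two styles can be contrasted on a simple counter. In this sketch the pessimistic path takes a ReentrantLock before touching the value, while the optimistic path reads the value, computes the update, and commits with compareAndSet(), retrying if another thread got there first. Here the value itself plays the role of the version, which is an assumption of this example; LockingStyleSketch is a hypothetical name.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

class LockingStyleSketch {
    private final ReentrantLock lock = new ReentrantLock();
    private int pessimisticCounter = 0;
    private final AtomicInteger optimisticCounter = new AtomicInteger();

    // Pessimistic: lock first, then update.
    void incrementPessimistic() {
        lock.lock();
        try {
            pessimisticCounter++;
        } finally {
            lock.unlock();
        }
    }

    // Optimistic: update without locking and retry when the check fails.
    void incrementOptimistic() {
        while (true) {
            int seen = optimisticCounter.get();
            if (optimisticCounter.compareAndSet(seen, seen + 1)) {
                return; // nobody changed the value since it was read
            }
            // another thread won the race; read again and retry
        }
    }

    static int[] run(int threads, int perThread) {
        LockingStyleSketch counters = new LockingStyleSketch();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    counters.incrementPessimistic();
                    counters.incrementOptimistic();
                }
            });
            workers[t].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return new int[] { counters.pessimisticCounter, counters.optimisticCounter.get() };
    }

    public static void main(String[] args) {
        int[] totals = run(4, 1000);
        System.out.println(totals[0] + " " + totals[1]);
    }
}
```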
9.2 Read-Write Lock Implementation
class CacheStore {
private final Map<String, Object> cache = new HashMap<>();
private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
public void write(String key, Object value) {
rwLock.writeLock().lock();
try {
System.out.println(Thread.currentThread().getName() + " writing " + key);
TimeUnit.MILLISECONDS.sleep(300);
cache.put(key, value);
System.out.println(Thread.currentThread().getName() + " wrote " + key);
} catch (InterruptedException e) { e.printStackTrace(); }
finally { rwLock.writeLock().unlock(); }
}
public Object read(String key) {
rwLock.readLock().lock();
try {
System.out.println(Thread.currentThread().getName() + " reading " + key);
TimeUnit.MILLISECONDS.sleep(300);
Object value = cache.get(key);
System.out.println(Thread.currentThread().getName() + " read " + key);
return value;
} catch (InterruptedException e) { Thread.currentThread().interrupt(); return null; } // a return is required on this path for the method to compile
finally { rwLock.readLock().unlock(); }
}
}
9.3 Lock Downgrade
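A write lock can be downgraded to a read lock: acquire the read lock while still holding the write lock, then release the write lock. Upgrading from a read lock to a write lock is not supported by ReentrantReadWriteLock. The sequence is: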
writeLock.lock();
// perform write operations
readLock.lock();
// perform read operations
writeLock.unlock();
readLock.unlock();
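With try/finally added, the downgrade sequence might look like the sketch below. DowngradeSketch, updateThenRead, and isIdle are illustrative names; the lock calls are those of ReentrantReadWriteLock.

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

class DowngradeSketch {
    private final ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
    private int value;

    // Writes a value, downgrades to the read lock, and reads the value back.
    int updateThenRead(int newValue) {
        rw.writeLock().lock();
        try {
            value = newValue;
            rw.readLock().lock(); // downgrade: take the read lock before giving up the write lock
        } finally {
            rw.writeLock().unlock(); // from here on only the read lock is held
        }
        try {
            return value; // no writer can change the value until the read lock is released
        } finally {
            rw.readLock().unlock();
        }
    }

    // True when neither lock is held by any thread.
    boolean isIdle() {
        return !rw.isWriteLocked() && rw.getReadLockCount() == 0;
    }

    public static void main(String[] args) {
        DowngradeSketch store = new DowngradeSketch();
        System.out.println(store.updateThenRead(7));
        System.out.println("idle afterwards: " + store.isIdle());
    }
}
```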
10. Blocking Queues
10.1 Overview
Blocking queues provide thread-safe producer-consumer patterns:
- Blocking on insertion when full
- Blocking on removal when empty
10.2 Core Methods
| Operation | Throws Exception | Special Value | Blocks | Timeout |
|---|---|---|---|---|
| Insert | add() | offer() | put() | offer(e, time, unit) |
| Remove | remove() | poll() | take() | poll(time, unit) |
| Examine | element() | peek() | - | - |
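The behaviors in the table can be seen on a queue of capacity one. This is a minimal sketch with hypothetical names (QueueMethodSketch, probeFullQueue); put() is left out because it would block forever on a full queue that nobody drains.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

class QueueMethodSketch {
    static List<String> probeFullQueue() {
        List<String> log = new ArrayList<>();
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        queue.add("a"); // the queue is now full
        try {
            queue.add("b");
            log.add("add: ok");
        } catch (IllegalStateException e) {
            log.add("add: IllegalStateException"); // "throws exception" column
        }
        log.add("offer: " + queue.offer("b")); // "special value" column
        try {
            log.add("timed offer: " + queue.offer("b", 50, TimeUnit.MILLISECONDS)); // "timeout" column
            log.add("take: " + queue.take()); // returns at once because an element is present
            log.add("timed poll: " + queue.poll(50, TimeUnit.MILLISECONDS)); // queue is empty now
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        log.add("poll: " + queue.poll());
        log.add("peek: " + queue.peek());
        return log;
    }

    public static void main(String[] args) {
        probeFullQueue().forEach(System.out::println);
    }
}
```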
10.3 Implementation Types
- ArrayBlockingQueue: Array-based, bounded, single lock
- LinkedBlockingQueue: Linked list, optionally unbounded, dual locks
- DelayQueue: Priority-based, element-specific delays
- PriorityBlockingQueue: Priority-ordered, unbounded
- SynchronousQueue: No storage, direct handoff
- LinkedTransferQueue: Transfer mechanism, unbounded
11. Thread Pools
11.1 Benefits
- Reduces thread creation/destruction overhead
- Provides faster response time
- Enables centralized thread management
11.2 Executor Types
// Fixed thread pool - N persistent threads
ExecutorService fixedPool = Executors.newFixedThreadPool(5);
// Single thread executor
ExecutorService singlePool = Executors.newSingleThreadExecutor();
// Cached pool - scales dynamically
ExecutorService cachedPool = Executors.newCachedThreadPool();
11.3 ThreadPoolExecutor Parameters
Seven core parameters:
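- corePoolSize: Number of threads kept alive even when idle
- maximumPoolSize: Maximum thread count
- keepAliveTime: How long threads beyond the core size may stay idle before terminating
- unit: TimeUnit for keepAliveTime
- workQueue: Queue holding tasks that wait for a thread
- threadFactory: Thread creation strategy
- handler: Rejection policy applied when both the queue and the pool are full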
11.4 Rejection Policies
- AbortPolicy: Throw RejectedExecutionException (default)
- CallerRunsPolicy: Execute in caller's thread
- DiscardOldestPolicy: Discard the oldest pending task, then retry the submission
- DiscardPolicy: Silently discard task
11.5 Custom Thread Pool (Recommended)
ExecutorService pool = new ThreadPoolExecutor(
2, // core pool size
5, // maximum pool size
2L, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(3),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy()
);
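To see how these parameters interact, the sketch below builds a pool with the same settings and submits tasks that all block until released, counting how many are accepted before AbortPolicy rejects one. RejectionSketch and submitUntilRejected are hypothetical names.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class RejectionSketch {
    // Returns how many blocking tasks the pool accepted before rejecting one.
    static int submitUntilRejected() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 5, 2L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(3),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy());
        CountDownLatch release = new CountDownLatch(1);
        int accepted = 0;
        try {
            while (true) {
                pool.execute(() -> {
                    try {
                        release.await(); // keep the thread busy so no capacity frees up
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
                accepted++;
            }
        } catch (RejectedExecutionException e) {
            // the queue is full and the pool is at maximumPoolSize
        } finally {
            release.countDown(); // let the blocked tasks finish
            pool.shutdown();
        }
        return accepted;
    }

    public static void main(String[] args) {
        System.out.println("accepted before rejection: " + submitUntilRejected());
    }
}
```

The pool accepts 8 tasks: 2 start on core threads, 3 wait in the queue, and 3 more start on extra threads up to the maximum of 5. The ninth is rejected.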
12. Fork/Join Framework
Splits large tasks into smaller subtasks, processes them in parallel, then merges results.
class SumTask extends RecursiveTask<Integer> {
private static final int THRESHOLD = 10;
private int start, end;
private int result;
public SumTask(int start, int end) {
this.start = start;
this.end = end;
}
@Override
protected Integer compute() {
if ((end - start) <= THRESHOLD) {
for (int i = start; i <= end; i++) result += i;
} else {
int mid = (start + end) / 2;
SumTask left = new SumTask(start, mid);
SumTask right = new SumTask(mid + 1, end);
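left.fork(); // schedule the left half asynchronously
int rightSum = right.compute(); // compute the right half in the current thread instead of forking it
result = left.join() + rightSum;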
}
return result;
}
}
public class ForkJoinDemo {
public static void main(String[] args) throws Exception {
ForkJoinPool executor = new ForkJoinPool();
ForkJoinTask<Integer> task = executor.submit(new SumTask(1, 100));
System.out.println("Result: " + task.get());
}
}
13. Asynchronous Programming
13.1 CompletableFuture
Provides powerful async operations with callback support:
public class AsyncDemo {
public static void main(String[] args) throws Exception {
// Async without return value
CompletableFuture.runAsync(() ->
System.out.println("Running async task"));
// Async with return value
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
System.out.println("Computing result");
return 42;
});
future.whenComplete((result, error) -> {
System.out.println("Result: " + result);
System.out.println("Error: " + error);
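}).join(); // wait for the callback; the default pool uses daemon threads, so main could otherwise exit first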
}
}
13.2 CompletableFuture vs Future
CompletableFuture advantages:
- Manual completion support
- Non-blocking callbacks via whenComplete(), thenApply()
- Chainable operations
- Multiple futures combination via allOf(), anyOf()
- Exception handling via exceptionally()
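The listed features can be sketched briefly, one method per feature. FutureCompositionSketch and its method names are illustrative assumptions; the CompletableFuture calls are from the JDK.

```java
import java.util.concurrent.CompletableFuture;

class FutureCompositionSketch {
    // Chaining and combining: (20 + 1) combined with 21.
    static int chained() {
        return CompletableFuture.supplyAsync(() -> 20)
                .thenApply(x -> x + 1)
                .thenCombine(CompletableFuture.supplyAsync(() -> 21), Integer::sum)
                .join();
    }

    // Exception handling: exceptionally() supplies a fallback value.
    static int recovered() {
        return CompletableFuture.<Integer>supplyAsync(() -> {
            throw new IllegalStateException("boom");
        }).exceptionally(error -> -1).join();
    }

    // Manual completion: any thread may complete the future directly.
    static String manual() {
        CompletableFuture<String> future = new CompletableFuture<>();
        future.complete("done by hand");
        return future.join();
    }

    // allOf(): wait for several futures, then read each result.
    static int all() {
        CompletableFuture<Integer> one = CompletableFuture.supplyAsync(() -> 1);
        CompletableFuture<Integer> two = CompletableFuture.supplyAsync(() -> 2);
        CompletableFuture.allOf(one, two).join();
        return one.join() + two.join();
    }

    public static void main(String[] args) {
        System.out.println(chained());
        System.out.println(recovered());
        System.out.println(manual());
        System.out.println(all());
    }
}
```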