Java Concurrency Utilities: Multi-threading and High-concurrency Programming
Understanding JUC
JUC Overview
Threading is a core concept in Java, and JUC is the part of the standard library dedicated to it. JUC is shorthand for java.util.concurrent, a utility package for concurrent programming that also includes the subpackages java.util.concurrent.atomic and java.util.concurrent.locks. It was introduced in JDK 1.5.
Process vs Thread
A process is a running instance of a program operating on a specific set of data. It is the fundamental unit of resource allocation in an operating system. In modern thread-oriented architectures, a process acts as a container for threads. A program is a static description of instructions, data, and their organization; a process is that program in execution.
A thread is the smallest unit the operating system can schedule for execution. Threads exist within processes and do the actual work. A single thread is one sequential flow of control within a process, and a process can run multiple threads concurrently, each executing a different task.
Thread States
- NEW (Newly created)
- RUNNABLE (Ready to run or currently running)
- BLOCKED (Blocked state)
- WAITING (Waiting indefinitely)
- TIMED_WAITING (Timed waiting)
- TERMINATED (Completed)
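The states above are the values of the Thread.State enum, and Thread.getState() reports the current one. The following is a minimal sketch, not a definitive test of the scheduler: the class name ThreadStateDemo and the method observeStates are hypothetical names chosen for illustration. It observes NEW, BLOCKED, and TERMINATED for a single worker by making the worker contend for a monitor that the calling thread holds.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical demo class: records the states one worker thread passes through.
class ThreadStateDemo {

    static List<Thread.State> observeStates() {
        final Object lock = new Object();
        List<Thread.State> seen = new ArrayList<>();

        Thread worker = new Thread(() -> {
            synchronized (lock) {
                // Nothing to do; the worker only needs to contend for the monitor.
            }
        }, "worker");

        seen.add(worker.getState());          // not started yet: NEW

        synchronized (lock) {
            worker.start();
            // The caller owns the monitor, so the worker must end up BLOCKED.
            while (worker.getState() != Thread.State.BLOCKED) {
                Thread.yield();
            }
            seen.add(worker.getState());      // BLOCKED
        }

        try {
            worker.join();                    // wait for the worker to finish
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        seen.add(worker.getState());          // TERMINATED
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(observeStates());  // prints [NEW, BLOCKED, TERMINATED]
    }
}
```

RUNNABLE, WAITING, and TIMED_WAITING are left out because observing them reliably needs more coordination than fits in a short sketch.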
Differences between wait() and sleep()
- sleep() is a static method of the Thread class, while wait() is an instance method of the Object class and can be called on any object.
- sleep() does not release any lock the thread holds and can be called anywhere. wait() releases the monitor of the object it is called on, and the calling thread must already own that monitor (the call must be inside a synchronized block or method); otherwise an IllegalMonitorStateException is thrown.
- Both respond to Thread.interrupt() by throwing InterruptedException.
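The lock-related difference can be checked directly. The sketch below is illustrative only; WaitVsSleepDemo and its method names are hypothetical. The first method shows that wait() fails when the caller does not own the monitor, and the second shows that wait() releases the monitor, because the calling thread can enter the same synchronized block while the waiter is still inside it.

```java
// Hypothetical demo class contrasting the monitor rules of wait().
class WaitVsSleepDemo {

    // wait() fails fast when the caller does not own the object's monitor.
    static boolean waitWithoutMonitorFails() {
        Object lock = new Object();
        try {
            lock.wait(10);
            return false;
        } catch (IllegalMonitorStateException expected) {
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // wait() releases the monitor: the caller can enter synchronized (lock)
    // while the waiter thread is still parked inside its own synchronized block.
    static boolean waitReleasesMonitor() {
        final Object lock = new Object();
        final boolean[] ready = {false};       // guarded by lock

        Thread waiter = new Thread(() -> {
            synchronized (lock) {
                while (!ready[0]) {            // loop guards against spurious wakeups
                    try {
                        lock.wait();
                    } catch (InterruptedException e) {
                        return;
                    }
                }
            }
        }, "waiter");
        waiter.start();

        while (waiter.getState() != Thread.State.WAITING) {
            Thread.yield();
        }
        synchronized (lock) {                  // possible only because wait() released the monitor
            ready[0] = true;
            lock.notifyAll();
        }
        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !waiter.isAlive();
    }

    public static void main(String[] args) {
        System.out.println(waitWithoutMonitorFails()); // prints true
        System.out.println(waitReleasesMonitor());     // prints true
    }
}
```

Had the waiter called Thread.sleep() inside the synchronized block instead, the caller would have stayed blocked at synchronized (lock) until the sleep finished.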
Concurrency vs Parallelism
Sequential Execution
Sequential execution means all tasks follow a strict order. Each step must complete before the next begins - like loading cargo, transporting it, then unloading it in sequence.
Parallel Execution
Parallel execution processes multiple tasks at the same instant. It shortens waiting time by splitting one long queue into several shorter ones that are served simultaneously. The actual speedup depends on both the code (multi-process or multi-threaded) and the hardware (a multi-core CPU).
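As a rough illustration of splitting one long queue into shorter ones, the sketch below divides the sum 1..n into chunks and hands each chunk to a thread pool. ParallelSumDemo and parallelSum are hypothetical names, and the chunking scheme is just one simple choice; whether the chunks really run in parallel depends on the number of cores available.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class: sums 1..n by splitting the range into chunks.
class ParallelSumDemo {

    static long parallelSum(int n, int parts) {
        ExecutorService pool = Executors.newFixedThreadPool(parts);
        try {
            int chunk = (n + parts - 1) / parts;   // ceiling division
            List<Future<Long>> futures = new ArrayList<>();
            for (int p = 0; p < parts; p++) {
                final int from = p * chunk + 1;
                final int to = Math.min(n, (p + 1) * chunk);
                Callable<Long> task = () -> {
                    long sum = 0;
                    for (int i = from; i <= to; i++) {
                        sum += i;
                    }
                    return sum;
                };
                futures.add(pool.submit(task));
            }
            long total = 0;
            for (Future<Long> f : futures) {
                total += f.get();                  // blocks until that chunk is done
            }
            return total;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(parallelSum(100, 4));   // prints 5050
    }
}
```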
Concurrency
Concurrency means that multiple tasks make progress during the same period of time; they do not have to execute at the same instant. On a single-core CPU, only one thread runs at any given moment. The apparent simultaneity comes from rapid switching between threads, which take turns using the CPU and so create the illusion of parallelism.
To handle high concurrency, large tasks are broken into smaller ones. Because the OS scheduler interleaves threads in a nondeterministic order, various phenomena may occur:
- Small tasks might execute multiple times before proceeding
- Steps might execute out of order
- Multiple threads can execute these small tasks in parallel
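One visible consequence of nondeterministic interleaving is the lost update: two threads read the same value, both add one, and one of the writes is overwritten. The sketch below is an illustration under assumed names (LostUpdateDemo, run); it increments one counter without synchronization and another with it. The unsynchronized result varies from run to run, while the synchronized one is always exact.

```java
// Hypothetical demo class: compares an unsynchronized and a synchronized counter.
class LostUpdateDemo {
    private int unsafeCount = 0;   // incremented without any lock
    private int safeCount = 0;     // incremented under the object's monitor

    void unsafeIncrement() {
        unsafeCount++;             // read-modify-write, not atomic
    }

    synchronized void safeIncrement() {
        safeCount++;
    }

    // Returns {unsafeCount, safeCount} after all threads have finished.
    static int[] run(int threads, int perThread) {
        LostUpdateDemo demo = new LostUpdateDemo();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    demo.unsafeIncrement();
                    demo.safeIncrement();
                }
            });
            workers[t].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return new int[] {demo.unsafeCount, demo.safeCount};
    }

    public static void main(String[] args) {
        int[] result = run(4, 100_000);
        System.out.println("unsynchronized: " + result[0]); // at most 400000, often less
        System.out.println("synchronized:   " + result[1]);
    }
}
```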
Monitor Concept
A monitor is a synchronization construct that admits only one process (in Java, one thread) at a time. The operations defined within a monitor can be executing in at most one thread at any moment, and this mutual exclusion is enforced by the compiler and runtime rather than by the programmer. This is the concept behind locking.
JVM synchronization relies on entering and exiting monitors. Every Java object has an associated monitor that exists for the lifetime of the object. A thread must acquire the monitor before executing a synchronized method or block and releases it on exit, whether the exit is normal or caused by an exception. While the thread holds the monitor, no other thread can acquire it.
User Threads vs Daemon Threads
User threads are standard threads created by applications, while daemon threads operate in the background performing special tasks like garbage collection. When the main thread ends, if user threads continue running, the JVM remains active. However, if only daemon threads remain, the JVM terminates.
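A thread is marked as a daemon with Thread.setDaemon(true), which must be called before start(). The sketch below is illustrative; DaemonDemo and newDaemon are hypothetical names. Its main method starts an endless background loop and then returns, and the JVM still exits because the only remaining thread is a daemon.

```java
// Hypothetical demo class: a daemon thread does not keep the JVM alive.
class DaemonDemo {

    // Creates (but does not start) a daemon thread for the given task.
    static Thread newDaemon(Runnable task) {
        Thread t = new Thread(task, "background-worker");
        t.setDaemon(true);   // must happen before start()
        return t;
    }

    public static void main(String[] args) {
        Thread heartbeat = newDaemon(() -> {
            while (true) {
                try {
                    Thread.sleep(200);
                } catch (InterruptedException e) {
                    return;
                }
                System.out.println("heartbeat");
            }
        });
        heartbeat.start();
        System.out.println("main ends; daemon = " + heartbeat.isDaemon());
        // The JVM exits here even though the heartbeat loop never returns.
    }
}
```

Removing the setDaemon(true) call turns the heartbeat into a user thread, and the program then runs until it is killed.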
Lock Interface
Synchronized Keyword
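synchronized is a Java keyword that implements locking through object monitors. It can be applied to:
- Code blocks - synchronized (obj) { ... } locks the monitor of the given object
- Instance methods - the method locks the monitor of the current instance (this)
- Static methods - the method locks the monitor of the Class object, which is shared by all instances of the class
- Classes - synchronized (ClassName.class) { ... } locks that same Class object, so it also affects all instances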
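Which monitor each form acquires can be verified with Thread.holdsLock(Object), a standard method that reports whether the current thread owns a given monitor. The class SyncForms below is a hypothetical illustration, not part of the ticket example.

```java
// Hypothetical demo class: one method per form of synchronized.
class SyncForms {
    private final Object guard = new Object();

    // Synchronized block: locks the monitor of the object named in parentheses.
    boolean blockHoldsGuard() {
        synchronized (guard) {
            return Thread.holdsLock(guard);
        }
    }

    // Synchronized instance method: locks the monitor of this.
    synchronized boolean instanceMethodHoldsThis() {
        return Thread.holdsLock(this);
    }

    // An instance method does NOT lock the Class object.
    synchronized boolean instanceMethodHoldsClass() {
        return Thread.holdsLock(SyncForms.class);
    }

    // Synchronized static method: locks the monitor of SyncForms.class.
    static synchronized boolean staticMethodHoldsClass() {
        return Thread.holdsLock(SyncForms.class);
    }

    // Block on the class literal: same monitor as a synchronized static method.
    boolean classBlockHoldsClass() {
        synchronized (SyncForms.class) {
            return Thread.holdsLock(SyncForms.class);
        }
    }

    public static void main(String[] args) {
        SyncForms forms = new SyncForms();
        System.out.println(forms.blockHoldsGuard());          // prints true
        System.out.println(forms.instanceMethodHoldsThis());  // prints true
        System.out.println(forms.instanceMethodHoldsClass()); // prints false
        System.out.println(staticMethodHoldsClass());         // prints true
        System.out.println(forms.classBlockHoldsClass());     // prints true
    }
}
```

Because instance methods and static methods use different monitors, a synchronized instance method and a synchronized static method of the same class do not exclude each other.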
First Multi-threading Example - Ticket Sales
// Resource class: holds the shared state and the operations on it
class TicketCounter {
    private int ticketsAvailable = 30;

    // synchronized: only one seller thread at a time can check and decrement the count
    public synchronized void sellTicket() {
        if (ticketsAvailable > 0) {
            System.out.println(Thread.currentThread().getName() + " : Sold: " + (ticketsAvailable--) + " Remaining: " + ticketsAvailable);
        }
    }
}

public class TicketSalesExample {
    public static void main(String[] args) {
        TicketCounter counter = new TicketCounter();

        // Three sellers share one counter; each tries to sell up to 40 tickets
        new Thread(() -> {
            for (int i = 0; i < 40; i++) {
                counter.sellTicket();
            }
        }, "Seller-A").start();

        new Thread(() -> {
            for (int i = 0; i < 40; i++) {
                counter.sellTicket();
            }
        }, "Seller-B").start();

        new Thread(() -> {
            for (int i = 0; i < 40; i++) {
                counter.sellTicket();
            }
        }, "Seller-C").start();
    }
}
What is Lock?
Lock implementations provide more extensive locking operations than synchronized methods and statements. They allow more flexible structuring, may have different properties (such as fairness), and may support multiple associated Condition objects.
Lock Interface Methods
public interface Lock {
    void lock();
    void lockInterruptibly() throws InterruptedException;
    boolean tryLock();
    boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
    void unlock();
    Condition newCondition();
}
Using Lock Method
The lock() method acquires the lock. If another thread holds it, the calling thread waits until it becomes available. Unlike synchronized, a Lock must be released manually, and an exception does not release it automatically. Therefore, call lock() immediately before a try block and call unlock() in the finally section.
Lock mutex = ...;
mutex.lock();
try {
    // Execute operations
} catch (Exception ex) {
    // Handle exception
} finally {
    mutex.unlock(); // Always release the lock
}
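Besides the blocking lock(), the Lock interface offers tryLock() and tryLock(long, TimeUnit), which report whether the lock was obtained instead of waiting indefinitely. The sketch below is an illustration only; TryLockDemo, its balance field, and its method names are hypothetical. It keeps the same pattern of releasing in finally, but only on the path where the lock was actually acquired.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class: non-blocking and timed lock acquisition.
class TryLockDemo {
    private final Lock lock = new ReentrantLock();
    private int balance = 0;   // guarded by lock

    // Returns immediately: false means another thread holds the lock.
    boolean depositIfFree(int amount) {
        if (!lock.tryLock()) {
            return false;
        }
        try {
            balance += amount;
            return true;
        } finally {
            lock.unlock();
        }
    }

    // Waits up to timeoutMillis for the lock before giving up.
    boolean depositWithin(int amount, long timeoutMillis) {
        try {
            if (!lock.tryLock(timeoutMillis, TimeUnit.MILLISECONDS)) {
                return false;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        try {
            balance += amount;
            return true;
        } finally {
            lock.unlock();
        }
    }

    int balance() {
        lock.lock();
        try {
            return balance;
        } finally {
            lock.unlock();
        }
    }

    // Holds the lock on the calling thread while another thread tries to deposit.
    boolean depositFromOtherThreadWhileLocked(int amount) {
        final boolean[] result = new boolean[1];
        lock.lock();
        try {
            Thread other = new Thread(() -> result[0] = depositIfFree(amount));
            other.start();
            other.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            lock.unlock();
        }
        return result[0];
    }

    public static void main(String[] args) {
        TryLockDemo demo = new TryLockDemo();
        System.out.println(demo.depositIfFree(5));                     // prints true
        System.out.println(demo.depositFromOtherThreadWhileLocked(1)); // prints false
        System.out.println(demo.balance());                            // prints 5
    }
}
```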
Condition Implementation
While synchronized works with wait()/notify() for the wait/notify pattern, Lock's newCondition() returns a Condition object that provides the same functionality. notify() wakes an arbitrary thread waiting on the monitor, whereas one Lock can have several Condition objects, so a thread can signal exactly the group of waiters it intends to wake:
- await(): Makes the current thread wait and releases the lock; once signaled, the thread reacquires the lock before returning
- signal(): Wakes one thread waiting on this Condition
- signalAll(): Wakes all threads waiting on this Condition
All of them require holding the associated Lock before invocation.
ReentrantLock Implementation
ReentrantLock provides reentrant locking capabilities with additional functionality. Here's an example:
import java.util.concurrent.locks.ReentrantLock;

class TicketManager {
    private int availableTickets = 30;
    // true requests a fair lock: the longest-waiting thread is favored
    private final ReentrantLock lock = new ReentrantLock(true);

    public void sellTicket() {
        lock.lock();
        try {
            if (availableTickets > 0) {
                System.out.println(Thread.currentThread().getName() + " : Sold " + (availableTickets--) + " Remaining: " + availableTickets);
            }
        } finally {
            lock.unlock(); // Released even if the body throws
        }
    }
}

public class TicketManagerExample {
    public static void main(String[] args) {
        TicketManager manager = new TicketManager();

        new Thread(() -> {
            for (int i = 0; i < 40; i++) {
                manager.sellTicket();
            }
        }, "Thread-A").start();

        new Thread(() -> {
            for (int i = 0; i < 40; i++) {
                manager.sellTicket();
            }
        }, "Thread-B").start();

        new Thread(() -> {
            for (int i = 0; i < 40; i++) {
                manager.sellTicket();
            }
        }, "Thread-C").start();
    }
}
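"Reentrant" means that a thread already holding the lock can acquire it again without blocking, as long as every lock() is matched by an unlock(). The sketch below illustrates this with ReentrantLock.getHoldCount(); the class ReentrancyDemo and its methods are hypothetical names used only for this example.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class: a locked method calls another locked method.
class ReentrancyDemo {
    private final ReentrantLock lock = new ReentrantLock();

    // Acquires the lock, then calls inner(), which acquires it a second time.
    int outer() {
        lock.lock();
        try {
            return inner();
        } finally {
            lock.unlock();
        }
    }

    // Returns how many times the current thread holds the lock at this point.
    private int inner() {
        lock.lock();               // does not block: same thread already owns the lock
        try {
            return lock.getHoldCount();
        } finally {
            lock.unlock();
        }
    }

    // After outer() returns, every lock() has been matched by an unlock().
    int holdCountAfterOuter() {
        outer();
        return lock.getHoldCount();
    }

    public static void main(String[] args) {
        ReentrancyDemo demo = new ReentrancyDemo();
        System.out.println(demo.outer());                    // prints 2
        System.out.println(demo.holdCountAfterOuter());      // prints 0
        System.out.println(new ReentrantLock(true).isFair()); // prints true
    }
}
```

With a non-reentrant lock, the second lock() call in inner() would deadlock the thread against itself.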
Key Differences Between Lock and Synchronized
- Lock is an interface while synchronized is a built-in Java keyword
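- synchronized releases the lock automatically when an exception is thrown, so a failed operation cannot leave the lock held; Lock must be released manually in a finally block, otherwise a deadlock may follow
- Lock lets a waiting thread respond to interrupts (lockInterruptibly()), while a thread blocked on synchronized keeps waiting
- Lock can report whether acquisition succeeded (tryLock()); synchronized offers no such check
- Lock may perform better under heavy contention, although on modern JVMs the difference is often small and depends on the workload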
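The interrupt-related difference can be shown with lockInterruptibly(). In the sketch below, the names InterruptibleLockDemo and waiterRespondsToInterrupt are hypothetical. The calling thread holds the lock, a second thread queues for it, and an interrupt makes the queued thread give up with an InterruptedException instead of waiting forever. ReentrantLock.hasQueuedThread is used only to make sure the waiter is really queued before it is interrupted.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class: a thread waiting for a Lock reacts to interrupt().
class InterruptibleLockDemo {

    static boolean waiterRespondsToInterrupt() {
        final ReentrantLock lock = new ReentrantLock();
        final AtomicBoolean interrupted = new AtomicBoolean(false);

        lock.lock();                           // the caller keeps the lock throughout
        try {
            Thread waiter = new Thread(() -> {
                try {
                    lock.lockInterruptibly();  // queues behind the caller
                    try {
                        // Not reached in this demo: the lock is never handed over.
                    } finally {
                        lock.unlock();
                    }
                } catch (InterruptedException e) {
                    interrupted.set(true);     // gave up waiting
                }
            }, "waiter");
            waiter.start();

            while (!lock.hasQueuedThread(waiter)) {
                Thread.yield();                // wait until the waiter is queued
            }
            waiter.interrupt();
            try {
                waiter.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        } finally {
            lock.unlock();
        }
        return interrupted.get();
    }

    public static void main(String[] args) {
        System.out.println(waiterRespondsToInterrupt()); // prints true
    }
}
```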
Inter-thread Communication
Inter-thread communication uses two models: shared memory and message passing.
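For contrast with the shared-memory style, message passing can be sketched with a BlockingQueue from java.util.concurrent: the producer and consumer never touch a common variable directly and only exchange values through the queue. The class MessagePassingDemo, the method transfer, and the POISON end marker are assumptions made for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical demo class: producer and consumer communicate through a queue.
class MessagePassingDemo {
    private static final int POISON = -1;   // assumed end-of-stream marker

    // Sends 1..count through a bounded queue and returns what the consumer received.
    static List<Integer> transfer(int count) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(2);
        List<Integer> received = new ArrayList<>();   // touched only by the consumer until join()

        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= count; i++) {
                    queue.put(i);          // blocks while the queue is full
                }
                queue.put(POISON);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "producer");

        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    int message = queue.take();   // blocks while the queue is empty
                    if (message == POISON) {
                        break;
                    }
                    received.add(message);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "consumer");

        producer.start();
        consumer.start();
        try {
            producer.join();
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(transfer(5));   // prints [1, 2, 3, 4, 5]
    }
}
```

The queue does the waiting and waking internally, so no explicit wait()/notify() or Condition code is needed.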
Synchronized Approach
package com.example.communication;

class SharedData {
    private int value = 0;

    public synchronized void increment() throws InterruptedException {
        // while, not if: re-check the condition after every wakeup (guards against spurious wakeups)
        while (value != 0) {
            this.wait();
        }
        value++;
        System.out.println(Thread.currentThread().getName() + " :: " + value);
        this.notifyAll();
    }

    public synchronized void decrement() throws InterruptedException {
        while (value != 1) {
            this.wait();
        }
        value--;
        System.out.println(Thread.currentThread().getName() + " :: " + value);
        this.notifyAll();
    }
}

public class SynchronizedCommunication {
    public static void main(String[] args) {
        SharedData data = new SharedData();

        new Thread(() -> {
            for (int i = 1; i <= 10; i++) {
                try {
                    data.increment();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, "Producer-A").start();

        new Thread(() -> {
            for (int i = 1; i <= 10; i++) {
                try {
                    data.decrement();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, "Consumer-A").start();
    }
}
Lock-based Approach
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class DataContainer {
    private int counter = 0;
    private Lock mutex = new ReentrantLock();
    private Condition condition = mutex.newCondition();

    public void increase() throws InterruptedException {
        mutex.lock();
        try {
            // await() releases mutex while waiting and reacquires it before returning
            while (counter != 0) {
                condition.await();
            }
            counter++;
            System.out.println(Thread.currentThread().getName() + " :: " + counter);
            condition.signalAll();
        } finally {
            mutex.unlock();
        }
    }

    public void decrease() throws InterruptedException {
        mutex.lock();
        try {
            while (counter != 1) {
                condition.await();
            }
            counter--;
            System.out.println(Thread.currentThread().getName() + " :: " + counter);
            condition.signalAll();
        } finally {
            mutex.unlock();
        }
    }
}

public class LockBasedCommunication {
    public static void main(String[] args) {
        DataContainer container = new DataContainer();

        new Thread(() -> {
            for (int i = 1; i <= 10; i++) {
                try {
                    container.increase();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, "Incrementer").start();

        new Thread(() -> {
            for (int i = 1; i <= 10; i++) {
                try {
                    container.decrease();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, "Decrementer").start();
    }
}
Customized Inter-thread Communication
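Example: Thread A prints 5 lines, then Thread B prints 10 lines, then Thread C prints 15 lines, and this A, B, C sequence repeats for 10 rounds. Each thread waits on its own Condition, so the finishing thread wakes exactly the thread whose turn is next: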
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class CoordinatedResource {
    private int sequence = 1; // Whose turn it is: 1=A, 2=B, 3=C
    private Lock mutex = new ReentrantLock();
    // One Condition per thread, all bound to the same lock
    private Condition conditionA = mutex.newCondition();
    private Condition conditionB = mutex.newCondition();
    private Condition conditionC = mutex.newCondition();

    public void displayA(int round) throws InterruptedException {
        mutex.lock();
        try {
            while (sequence != 1) {
                conditionA.await();
            }
            for (int i = 1; i <= 5; i++) {
                System.out.println(Thread.currentThread().getName() + " :: " + i + " Round: " + round);
            }
            sequence = 2;
            conditionB.signal(); // Hand over to B only
        } finally {
            mutex.unlock();
        }
    }

    public void displayB(int round) throws InterruptedException {
        mutex.lock();
        try {
            while (sequence != 2) {
                conditionB.await();
            }
            for (int i = 1; i <= 10; i++) {
                System.out.println(Thread.currentThread().getName() + " :: " + i + " Round: " + round);
            }
            sequence = 3;
            conditionC.signal(); // Hand over to C only
        } finally {
            mutex.unlock();
        }
    }

    public void displayC(int round) throws InterruptedException {
        mutex.lock();
        try {
            while (sequence != 3) {
                conditionC.await();
            }
            for (int i = 1; i <= 15; i++) {
                System.out.println(Thread.currentThread().getName() + " :: " + i + " Round: " + round);
            }
            sequence = 1;
            conditionA.signal(); // Back to A for the next round
        } finally {
            mutex.unlock();
        }
    }
}

public class CustomizedCommunication {
    public static void main(String[] args) {
        CoordinatedResource resource = new CoordinatedResource();

        new Thread(() -> {
            for (int i = 1; i <= 10; i++) {
                try {
                    resource.displayA(i);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, "Thread-A").start();

        new Thread(() -> {
            for (int i = 1; i <= 10; i++) {
                try {
                    resource.displayB(i);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, "Thread-B").start();

        new Thread(() -> {
            for (int i = 1; i <= 10; i++) {
                try {
                    resource.displayC(i);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, "Thread-C").start();
    }
}