Fading Coder

One Final Commit for the Last Sprint


Core Java Concurrency Utilities: From Fundamentals to Advanced Patterns

Tech May 8 4

1. Process and Thread Fundamentals

1.1 Understanding Processes and Threads

A process is a running instance of a program within the operating system. It is the fundamental unit of resource allocation: it owns the program code, data, and system resources such as memory and file handles. In modern thread-based systems the process acts as a container for threads, and the threads, not the process, are what the scheduler runs.

A thread constitutes the smallest unit of processing that the operating system can schedule. Threads exist within processes and represent the actual execution path. A single process can contain multiple concurrent threads, with each thread handling different tasks simultaneously.

Key distinctions:

  • Process: An executing application in the system; once running, it becomes a process; represents the minimum unit of resource allocation
  • Thread: The basic unit of CPU time allocation; the minimum unit of program execution

1.2 Thread States in Java

Thread.State is a nested enum that defines six thread states: NEW, RUNNABLE, BLOCKED, WAITING, TIMED_WAITING, and TERMINATED.
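
The sketch below is not from the original article; it shows one way to observe three of these states with Thread.getState(). The class name ThreadStateDemo and the gate object are illustrative assumptions.

```java
import java.util.ArrayList;
import java.util.List;

class ThreadStateDemo {
    // Records the state of one thread before start, while waiting, and after it ends.
    static List<Thread.State> observe() throws InterruptedException {
        List<Thread.State> seen = new ArrayList<>();
        Object gate = new Object();
        boolean[] released = {false}; // guarded by gate

        Thread t = new Thread(() -> {
            synchronized (gate) {
                while (!released[0]) {
                    try {
                        gate.wait(); // thread is WAITING while parked here
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            }
        });

        seen.add(t.getState()); // NEW: created but not started
        t.start();
        while (t.getState() != Thread.State.WAITING) {
            Thread.sleep(1); // poll until the thread has reached wait()
        }
        seen.add(t.getState()); // WAITING

        synchronized (gate) {
            released[0] = true;
            gate.notifyAll();
        }
        t.join();
        seen.add(t.getState()); // TERMINATED: run() has returned
        return seen;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(observe()); // [NEW, WAITING, TERMINATED]
    }
}
```

RUNNABLE, BLOCKED, and TIMED_WAITING are harder to capture deterministically because they depend on scheduling and timing, so the sketch leaves them out.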

1.3 wait() and sleep() Differences

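  1. sleep() is a static method of Thread; wait() is an instance method of Object, callable on any object
  2. sleep() never releases the locks a thread holds and needs no lock to be called; wait() releases the monitor of the object it is called on and must be called while holding that monitor (inside synchronized), otherwise it throws IllegalMonitorStateException
  3. Both methods can be interrupted via interrupt(), in which case they throw InterruptedException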
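
As a small illustration of the second point (a sketch, with WaitSleepDemo and its method names chosen here for the example), the first method shows that wait() fails without the monitor, and the second shows that wait() gives the monitor up so another thread can enter the same synchronized block.

```java
import java.util.concurrent.CountDownLatch;

class WaitSleepDemo {
    // wait() without owning the monitor fails immediately.
    static boolean waitOutsideSynchronizedThrows() throws InterruptedException {
        Object lock = new Object();
        try {
            lock.wait(10);
            return false;
        } catch (IllegalMonitorStateException expected) {
            return true;
        }
    }

    // wait() releases the monitor: the caller can lock the same object while the waiter is parked.
    static boolean waitReleasesMonitor() throws InterruptedException {
        Object lock = new Object();
        boolean[] done = {false}; // guarded by lock
        CountDownLatch inside = new CountDownLatch(1);

        Thread waiter = new Thread(() -> {
            synchronized (lock) {
                inside.countDown(); // monitor is held from here until wait()
                while (!done[0]) {
                    try {
                        lock.wait();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            }
        });
        waiter.start();
        inside.await();

        synchronized (lock) { // entered only after the waiter has called wait()
            done[0] = true;
            lock.notifyAll();
        }
        waiter.join(5000);
        return !waiter.isAlive();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(waitOutsideSynchronizedThrows()); // true
        System.out.println(waitReleasesMonitor());           // true
    }
}
```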

1.4 Concurrency versus Parallelism

  • Concurrency: Multiple threads executing within the same time interval—parallel at the macro level, sequential at the micro level
  • Parallelism: Multiple threads executing simultaneously at the exact same moment

1.5 Monitors

A monitor is a synchronization construct that allows only one thread at a time to execute inside its critical section. In Java every object has an associated monitor; the JVM acquires it on entry to a synchronized method or block and releases it on exit, including when the exit is caused by an exception.

1.6 User Threads vs Daemon Threads

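User threads: Ordinary threads, including main and any thread created without setDaemon(true); the JVM stays alive while at least one of them is running

Daemon threads: Background service threads (e.g., the garbage collector); a thread becomes a daemon by calling setDaemon(true) before start()

When only daemon threads remain, the JVM terminates.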

public class ThreadTypeDemo {
    public static void main(String[] args) {
        Thread worker = new Thread(() -> {
            System.out.println(Thread.currentThread().getName() + " Daemon: " + Thread.currentThread().isDaemon());
            while (true) {
                // Infinite loop, but a daemon thread does not keep the JVM alive
            }
        }, "Worker");
        worker.setDaemon(true);
        worker.start();
        System.out.println(Thread.currentThread().getName() + " finished");
    }
}

Possible output (the JVM exits once main returns, so the worker's line may appear first or not at all):

main finished
Worker Daemon: true

2. The Lock Interface

2.1 Synchronized Keyword Review

The synchronized keyword uses an object's intrinsic lock (monitor). It can be applied to instance methods, static methods, and code blocks; it cannot be applied to variables or classes. The following ticket sales example uses a synchronized method:

class TicketInventory {
    private int remaining = 1000;

    public synchronized void sell() {
        if (remaining > 0) {
            System.out.println(Thread.currentThread().getName() + " sold ticket, remaining: " + --remaining);
        }
    }
}

public class TicketDemo {
    public static void main(String[] args) {
        TicketInventory inventory = new TicketInventory();
        Runnable task = () -> {
            for (int i = 0; i < 1000; i++) {
                inventory.sell();
            }
        };
        new Thread(task, "WindowA").start();
        new Thread(task, "WindowB").start();
        new Thread(task, "WindowC").start();
    }
}

2.2 Lock Interface Overview

The Lock interface provides more flexible locking operations than synchronized blocks. It supports non-blocking lock attempts (tryLock()), interruptible lock acquisition (lockInterruptibly()), and timeout-based lock attempts (tryLock(long, TimeUnit)).

ReentrantLock Implementation:

private final ReentrantLock monitor = new ReentrantLock();

monitor.lock(); // acquire before try, so finally never unlocks a lock that was not taken
try {
    // business logic
} finally {
    monitor.unlock();
}
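
The timeout-based attempt can be sketched as follows. This is an illustrative example rather than code from the article; TryLockDemo and the two latches are assumptions used to hold the lock in a second thread while the caller tries to take it.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

class TryLockDemo {
    // Tries to take a lock that another thread holds; returns whether it succeeded.
    static boolean tryWhileHeld() throws InterruptedException {
        ReentrantLock lock = new ReentrantLock();
        CountDownLatch held = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);

        Thread holder = new Thread(() -> {
            lock.lock();
            try {
                held.countDown();
                release.await(); // keep the lock until told to let go
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                lock.unlock();
            }
        });
        holder.start();
        held.await();

        // Gives up after 100 ms instead of blocking indefinitely
        boolean acquired = lock.tryLock(100, TimeUnit.MILLISECONDS);
        if (acquired) {
            lock.unlock(); // not expected here, but keeps the lock balanced
        }

        release.countDown();
        holder.join();
        return acquired;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Acquired while held: " + tryWhileHeld()); // false
    }
}
```

With synchronized, the calling thread would have no way to stop waiting for the lock; here it can fall back to other work when tryLock returns false.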

2.3 Thread Creation Strategies

  1. Extend the Thread class
  2. Implement the Runnable interface
  3. Implement the Callable interface
  4. Utilize thread pools
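
The four strategies can be sketched side by side. The class names CreationDemo and Greeter, and the values returned by the tasks, are made up for this illustration.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;

class CreationDemo {
    // 1. Extend Thread and override run()
    static class Greeter extends Thread {
        @Override
        public void run() {
            System.out.println("1. Thread subclass");
        }
    }

    static int runAll() throws Exception {
        Thread t1 = new Greeter();
        t1.start();
        t1.join();

        // 2. Pass a Runnable to a Thread
        Thread t2 = new Thread(() -> System.out.println("2. Runnable"));
        t2.start();
        t2.join();

        // 3. Wrap a Callable in a FutureTask so a Thread can run it
        FutureTask<Integer> task = new FutureTask<>(() -> 21 * 2);
        new Thread(task).start();
        int fromCallable = task.get(); // blocks until the result is ready

        // 4. Submit work to a thread pool
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            Future<Integer> fromPool = pool.submit(() -> 1);
            return fromCallable + fromPool.get();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("Sum of results: " + runAll()); // 43
    }
}
```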

2.4 Ticket Sales with Lock

import java.util.concurrent.locks.ReentrantLock;

class TicketInventory {
    private int remaining = 1000;
    private final ReentrantLock monitor = new ReentrantLock();

    public void sell() {
        monitor.lock();
        try {
            if (remaining > 0) {
                System.out.println(Thread.currentThread().getName() + " sold ticket, remaining: " + --remaining);
            }
        } finally {
            monitor.unlock();
        }
    }
}

public class LockTicketDemo {
    public static void main(String[] args) {
        TicketInventory inventory = new TicketInventory();
        Runnable task = () -> {
            for (int i = 0; i < 1000; i++) {
                inventory.sell();
            }
        };
        new Thread(task, "WindowA").start();
        new Thread(task, "WindowB").start();
        new Thread(task, "WindowC").start();
    }
}

2.5 Synchronized vs Lock

Feature             synchronized              Lock
Type                Built-in keyword          Interface (java.util.concurrent.locks)
Lock release        Automatic                 Manual (unlock() in a finally block)
Interrupt response  No                        Yes (lockInterruptibly())
Lock status check   No                        Yes (tryLock())
Performance         Adequate for most uses    Can be better for read-heavy access (ReentrantReadWriteLock)

3. Inter-Thread Communication

Two communication models exist: shared memory and message passing.

3.1 Synchronized Communication Pattern

The pattern has three steps: check the condition, perform the operation, notify the waiting threads.

class SharedResource {
    private int value = 0;

    public synchronized void increment() throws InterruptedException {
        while (value != 0) {
            wait();
        }
        value++;
        System.out.print(Thread.currentThread().getName() + ":" + value + "--->");
        notifyAll();
    }

    public synchronized void decrement() throws InterruptedException {
        while (value != 1) {
            wait();
        }
        value--;
        System.out.println(Thread.currentThread().getName() + ":" + value);
        notifyAll();
    }
}

public class CommunicationDemo {
    public static void main(String[] args) {
        SharedResource resource = new SharedResource();
        
        Thread incThread = new Thread(() -> {
            try {
                for (int i = 0; i < 100; i++) resource.increment();
            } catch (InterruptedException e) { e.printStackTrace(); }
        }, "Incrementer");

        Thread decThread = new Thread(() -> {
            try {
                for (int i = 0; i < 100; i++) resource.decrement();
            } catch (InterruptedException e) { e.printStackTrace(); }
        }, "Decrementer");

        incThread.start();
        decThread.start();
    }
}

3.2 Spurious Wakeup Problem

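A spurious wakeup is a return from wait() while the awaited condition is still false, either because notifyAll() woke several waiters at once or because the JVM woke the thread without any notification. With four threads (two incrementers, two decrementers), guarding wait() with an if statement lets a woken thread continue without rechecking the condition, so value can rise above 1 or fall below 0.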

Solution: Always use while loops instead of if statements for condition checking:

while (value != 0) {
    wait();
}

3.3 Lock-Based Communication

Using Condition objects provides more granular control:

import java.util.concurrent.locks.*;

class SharedResource {
    private int value = 0;
    private final Lock lock = new ReentrantLock();
    private final Condition condition = lock.newCondition();

    public void increment() {
        lock.lock();
        try {
            while (value != 0) condition.await();
            value++;
            System.out.print(Thread.currentThread().getName() + ":" + value + "--->");
            condition.signalAll();
        } catch (InterruptedException e) { e.printStackTrace(); }
        finally { lock.unlock(); }
    }

    public void decrement() {
        lock.lock();
        try {
            while (value != 1) condition.await();
            value--;
            System.out.println(Thread.currentThread().getName() + ":" + value);
            condition.signalAll();
        } catch (InterruptedException e) { e.printStackTrace(); }
        finally { lock.unlock(); }
    }
}

4. Customized Inter-Thread Communication

Scenario: Three threads take turns in a fixed order. Thread A prints 5 times, then Thread B prints 10 times, then Thread C prints 15 times, and the cycle repeats for 10 rounds.

class Controller {
    private int flag = 1;
    private final Lock lock = new ReentrantLock();
    private final Condition condA = lock.newCondition();
    private final Condition condB = lock.newCondition();
    private final Condition condC = lock.newCondition();

    public void printA(int loop) {
        lock.lock();
        try {
            while (flag != 1) condA.await();
            for (int i = 1; i <= 5; i++) {
                System.out.println(Thread.currentThread().getName() + " round " + loop + ", iteration " + i);
            }
            flag = 2;
            condB.signal();
        } catch (InterruptedException e) { e.printStackTrace(); }
        finally { lock.unlock(); }
    }

    public void printB(int loop) {
        lock.lock();
        try {
            while (flag != 2) condB.await();
            for (int i = 1; i <= 10; i++) {
                System.out.println(Thread.currentThread().getName() + " round " + loop + ", iteration " + i);
            }
            flag = 3;
            condC.signal();
        } catch (InterruptedException e) { e.printStackTrace(); }
        finally { lock.unlock(); }
    }

    public void printC(int loop) {
        lock.lock();
        try {
            while (flag != 3) condC.await();
            for (int i = 1; i <= 15; i++) {
                System.out.println(Thread.currentThread().getName() + " round " + loop + ", iteration " + i);
            }
            flag = 1;
            condA.signal();
        } catch (InterruptedException e) { e.printStackTrace(); }
        finally { lock.unlock(); }
    }
}

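This is a single-flag pattern: the flag field records whose turn it is, and each thread waits on its own Condition. Because of that, signal() wakes exactly the next thread in the sequence instead of waking every waiter, while the shared lock keeps the print sections mutually exclusive.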

5. Thread-Safe Collections

5.1 Collection Thread Safety Issues

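ArrayList is not synchronized. In the demo below each thread adds an element and then prints the list; printing iterates over the list, and if another thread modifies it during that iteration the iterator throws ConcurrentModificationException. Concurrent add() calls can also silently lose elements: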

public class UnsafeCollectionDemo {
    public static void main(String[] args) {
        List<String> list = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            final int id = i;
            new Thread(() -> {
                list.add(UUID.randomUUID().toString().substring(0, 8));
                System.out.println(list);
            }, String.valueOf(id)).start();
        }
    }
}

5.2 Solution: Vector

Vector synchronizes every method, including add(), on the vector itself. Individual calls are thread-safe, but all access is serialized, so it scales poorly under contention.

5.3 Solution: Collections Wrapper

List<String> list = Collections.synchronizedList(new ArrayList<>());

5.4 Solution: CopyOnWriteArrayList (Preferred)

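Implements copy-on-write semantics: reads run concurrently without locking against an immutable snapshot of the array, while each write copies the whole array under a lock. This suits read-mostly lists; frequent writes to large lists are expensive.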

List<String> list = new CopyOnWriteArrayList<>();

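Implementation insight (simplified from the JDK 8 source; later JDK versions synchronize on an internal lock object instead of using a ReentrantLock):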

public boolean add(E element) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        Object[] current = getArray();
        int len = current.length;
        Object[] updated = Arrays.copyOf(current, len + 1);
        updated[len] = element;
        setArray(updated);
        return true;
    } finally {
        lock.unlock();
    }
}

5.5 HashSet Thread Safety

HashSet has the same concurrency issues. Solution:

Set<String> set = new CopyOnWriteArraySet<>();

5.6 HashMap Thread Safety

Use ConcurrentHashMap for thread-safe map operations.
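
A short sketch of why this matters: the check-then-update sequence has to be atomic, which ConcurrentHashMap offers through methods such as merge(). The class WordCountDemo and the key "hits" are illustrative assumptions.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class WordCountDemo {
    // Several threads increment the same key; merge() makes each increment atomic.
    static Map<String, Integer> count(int threads, int perThread) throws InterruptedException {
        Map<String, Integer> counts = new ConcurrentHashMap<>();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    counts.merge("hits", 1, Integer::sum); // atomic read-modify-write
                }
            });
            workers[t].start();
        }
        for (Thread w : workers) {
            w.join();
        }
        return counts;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(count(4, 1000)); // {hits=4000}
    }
}
```

Writing the same loop as get() followed by put() would lose updates even on a ConcurrentHashMap, because the two calls together are not atomic.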

6. Multi-Threaded Locking Mechanisms

6.1 Synchronized Lock Scenarios

  • Instance methods: Lock on the current instance
  • Static methods: Lock on the Class object
  • Synchronized blocks: Lock on the specified object

class Device {
    public synchronized void method1() throws Exception {
        TimeUnit.SECONDS.sleep(4);
        System.out.println("Method1 executed");
    }
    
    public synchronized void method2() {
        System.out.println("Method2 executed");
    }
    
    public void method3() {
        System.out.println("Method3 executed");
    }
}

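Lock behavior depends on which object each call locks. On a single Device instance, method1 and method2 share the instance lock, so method2 waits about 4 seconds while method1 sleeps; method3 is not synchronized and runs immediately. On two different instances, method1 and method2 do not block each other. A static synchronized method locks Device.class, which is independent of any instance lock.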

6.2 Fair vs Unfair Locks

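  • Fair lock: Grants the lock in roughly arrival (FIFO) order, which avoids starvation but lowers throughput
  • Unfair lock (the default): Lets a thread barge ahead of waiting threads, which raises throughput but can starve some threads

ReentrantLock fairLock = new ReentrantLock(true);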
ReentrantLock unfairLock = new ReentrantLock(false);

6.3 Reentrant Locks

Both synchronized and ReentrantLock are reentrant (recursive) locks. The same thread can acquire the lock multiple times without deadlocking.

public class ReentrantDemo {
    public static void main(String[] args) {
        ReentrantLock lock = new ReentrantLock();
        new Thread(() -> {
            lock.lock();
            try {
                System.out.println("First lock acquired");
                lock.lock();
                try {
                    System.out.println("Second lock acquired");
                } finally { lock.unlock(); }
            } finally { lock.unlock(); }
        }).start();
    }
}

6.4 Deadlock

Deadlock occurs when two or more threads mutually wait for resources held by each other.

Conditions:

  1. Mutual exclusion
  2. Hold and wait
  3. No preemption
  4. Circular wait

public class DeadlockSimulation {
    private static final Object resourceA = new Object();
    private static final Object resourceB = new Object();

    public static void main(String[] args) {
        new Thread(() -> {
            synchronized (resourceA) {
                System.out.println(Thread.currentThread().getName() + " acquired resourceA");
                try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) {}
                synchronized (resourceB) {
                    System.out.println(Thread.currentThread().getName() + " acquired resourceB");
                }
            }
        }, "ThreadAlpha").start();

        new Thread(() -> {
            synchronized (resourceB) {
                System.out.println(Thread.currentThread().getName() + " acquired resourceB");
                try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) {}
                synchronized (resourceA) {
                    System.out.println(Thread.currentThread().getName() + " acquired resourceA");
                }
            }
        }, "ThreadBeta").start();
    }
}

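Detection: Run jps -l to find the process ID of the hung JVM, then jstack <pid>. When a deadlock exists, the thread dump reports it in a "Found one Java-level deadlock" section that names the threads and the locks each one holds and waits for.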
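
One common way to avoid this deadlock is to break the circular-wait condition by having every thread take the locks in the same order. The sketch below is an assumption-labeled variant of the simulation above; LockOrderingDemo and useBoth are names invented for the example.

```java
class LockOrderingDemo {
    private static final Object resourceA = new Object();
    private static final Object resourceB = new Object();

    // Every caller locks resourceA first and resourceB second, so no cycle can form.
    static void useBoth(String who) {
        synchronized (resourceA) {
            synchronized (resourceB) {
                System.out.println(who + " holds both resources");
            }
        }
    }

    // Returns true if both threads finished, i.e. no deadlock occurred.
    static boolean runBoth() throws InterruptedException {
        Thread alpha = new Thread(() -> useBoth("ThreadAlpha"));
        Thread beta = new Thread(() -> useBoth("ThreadBeta"));
        alpha.start();
        beta.start();
        alpha.join(5000);
        beta.join(5000);
        return !alpha.isAlive() && !beta.isAlive();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Both finished: " + runBoth()); // true
    }
}
```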

7. Callable Interface

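Callable represents a task that returns a result, unlike Runnable. A Thread cannot run a Callable directly, so the example wraps it in a FutureTask, which implements Runnable and exposes the result through get():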

public class CallableDemo {
    public static void main(String[] args) throws Exception {
        FutureTask<String> task = new FutureTask<>(() -> {
            System.out.println("Callable executing in " + Thread.currentThread().getName());
            return "Result from Callable";
        });
        
        new Thread(task).start();
        System.out.println("Returned value: " + task.get());
    }
}

Key differences:

  • Runnable.run(): No return value, cannot throw checked exceptions
  • Callable.call(): Returns a value, can throw checked exceptions

8. JUC Utility Classes

8.1 CountDownLatch

Enables waiting for a specific number of operations to complete.

public class StudentLeaveDemo {
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(6);
        
        for (int i = 1; i <= 6; i++) {
            final int studentId = i;
            new Thread(() -> {
                System.out.println("Student " + studentId + " left classroom");
                latch.countDown();
            }, String.valueOf(studentId)).start();
        }
        
        latch.await();
        System.out.println("Monitor locked the door");
    }
}

8.2 CyclicBarrier

Allows threads to wait for each other until all reach a common barrier point.

public class DragonBallDemo {
    private static final int REQUIRED = 7;
    
    public static void main(String[] args) {
        CyclicBarrier barrier = new CyclicBarrier(REQUIRED, 
            () -> System.out.println("All dragon balls collected! Summon the dragon!"));
        
        for (int i = 1; i <= REQUIRED; i++) {
            final int ball = i;
            new Thread(() -> {
                System.out.println("Dragon ball " + ball + " collected");
                try { barrier.await(); } catch (Exception e) { e.printStackTrace(); }
            }, String.valueOf(i)).start();
        }
    }
}

8.3 Semaphore

Controls access to resources with a limited number of permits.

public class ParkingDemo {
    public static void main(String[] args) {
        Semaphore spots = new Semaphore(3);
        
        for (int i = 1; i <= 6; i++) {
            final int vehicle = i;
            new Thread(() -> {
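                try {
                    spots.acquire();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return; // no permit was acquired, so none must be released
                }
                try {
                    System.out.println("Vehicle " + vehicle + " parked");
                    TimeUnit.SECONDS.sleep(new Random().nextInt(5));
                    System.out.println("Vehicle " + vehicle + " left");
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    spots.release(); // always return the permit taken above
                }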
            }, String.valueOf(i)).start();
        }
    }
}

9. Read-Write Locks

9.1 Pessimistic vs Optimistic Locking

  • Pessimistic: Assumes conflict, acquires lock before operation
  • Optimistic: Assumes no conflict, uses version checking
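
A minimal sketch of the optimistic approach, using a compare-and-set retry loop on AtomicInteger. Here the current value itself plays the role of the version; OptimisticCounter and addOptimistically are names invented for this example.

```java
import java.util.concurrent.atomic.AtomicInteger;

class OptimisticCounter {
    private final AtomicInteger value = new AtomicInteger();

    // Read, compute, then commit only if no other thread changed the value in between.
    int addOptimistically(int delta) {
        while (true) {
            int seen = value.get();
            int next = seen + delta;
            if (value.compareAndSet(seen, next)) {
                return next; // commit succeeded
            }
            // Another thread updated the value first: retry with the fresh value
        }
    }

    // Four threads add 1000 each without any lock.
    static int demo() throws InterruptedException {
        OptimisticCounter counter = new OptimisticCounter();
        Thread[] workers = new Thread[4];
        for (int t = 0; t < workers.length; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < 1000; i++) {
                    counter.addOptimistically(1);
                }
            });
            workers[t].start();
        }
        for (Thread w : workers) {
            w.join();
        }
        return counter.value.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(demo()); // 4000
    }
}
```

When the value alone cannot distinguish "unchanged" from "changed and changed back", AtomicStampedReference pairs the value with an explicit stamp that serves as the version number.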

9.2 Read-Write Lock Implementation

class CacheStore {
    private final Map<String, Object> cache = new HashMap<>();
    private final ReadWriteLock rwLock = new ReentrantReadWriteLock();

    public void write(String key, Object value) {
        rwLock.writeLock().lock();
        try {
            System.out.println(Thread.currentThread().getName() + " writing " + key);
            TimeUnit.MILLISECONDS.sleep(300);
            cache.put(key, value);
            System.out.println(Thread.currentThread().getName() + " wrote " + key);
        } catch (InterruptedException e) { e.printStackTrace(); }
        finally { rwLock.writeLock().unlock(); }
    }

    public Object read(String key) {
        rwLock.readLock().lock();
        try {
            System.out.println(Thread.currentThread().getName() + " reading " + key);
            TimeUnit.MILLISECONDS.sleep(300);
            Object value = cache.get(key);
            System.out.println(Thread.currentThread().getName() + " read " + key);
            return value;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null; // interrupted before the value was read
        }
        finally { rwLock.readLock().unlock(); }
    }
}

9.3 Lock Downgrade

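A thread that holds the write lock can acquire the read lock and then release the write lock, ending up with only the read lock. The reverse (upgrading a read lock to a write lock) is not supported by ReentrantReadWriteLock: a thread that requests the write lock while holding the read lock blocks forever.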

writeLock.lock();
// perform write operations
readLock.lock();
// perform read operations
writeLock.unlock();
readLock.unlock();
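
The same sequence with try/finally protection might look like the sketch below. DowngradeDemo, updateThenRead, and fullyReleased are hypothetical names for this illustration.

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

class DowngradeDemo {
    private final ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
    private int data;

    // Writes a value, downgrades to the read lock, then reads the value back.
    int updateThenRead(int newValue) {
        rw.writeLock().lock();
        try {
            data = newValue;
            rw.readLock().lock(); // take the read lock while still holding the write lock
        } finally {
            rw.writeLock().unlock(); // downgrade: only the read lock remains
        }
        try {
            return data; // no writer can change data until the read lock is released
        } finally {
            rw.readLock().unlock();
        }
    }

    // True when neither lock is held by any thread.
    boolean fullyReleased() {
        return !rw.isWriteLocked() && rw.getReadLockCount() == 0;
    }

    public static void main(String[] args) {
        DowngradeDemo demo = new DowngradeDemo();
        System.out.println(demo.updateThenRead(7)); // 7
        System.out.println(demo.fullyReleased());   // true
    }
}
```

Taking the read lock before releasing the write lock is what guarantees that no other writer can slip in between the update and the read.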

10. Blocking Queues

10.1 Overview

Blocking queues provide thread-safe producer-consumer patterns:

  • Blocking on insertion when full
  • Blocking on removal when empty

10.2 Core Methods

Operation  Throws exception  Special value  Blocks  Times out
Insert     add(e)            offer(e)       put(e)  offer(e, time, unit)
Remove     remove()          poll()         take()  poll(time, unit)
Examine    element()         peek()         -       -
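
The difference between the method families shows up when the queue is full or empty. The sketch below uses a queue of capacity 1; QueueMethodsDemo and the log format are assumptions made for the example.

```java
import java.util.NoSuchElementException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

class QueueMethodsDemo {
    // Exercises each method family on a full and then an empty queue.
    static String demo() throws InterruptedException {
        BlockingQueue<String> q = new ArrayBlockingQueue<>(1);
        StringBuilder log = new StringBuilder();

        log.append(q.offer("a"));             // true: there was room
        log.append(',').append(q.offer("b")); // false: queue is full
        try {
            q.add("b");                       // same situation, but throws
        } catch (IllegalStateException e) {
            log.append(",add threw");
        }
        // Waits up to 50 ms for space, then gives up with false
        log.append(',').append(q.offer("b", 50, TimeUnit.MILLISECONDS));

        log.append(',').append(q.peek());     // a: examines without removing
        log.append(',').append(q.take());     // a: would block if the queue were empty
        log.append(',').append(q.poll());     // null: queue is empty
        try {
            q.remove();                       // same situation, but throws
        } catch (NoSuchElementException e) {
            log.append(",remove threw");
        }
        return log.toString();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(demo()); // true,false,add threw,false,a,a,null,remove threw
    }
}
```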

10.3 Implementation Types

  • ArrayBlockingQueue: Array-based, bounded, single lock
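  • LinkedBlockingQueue: Linked list, optionally bounded (capacity defaults to Integer.MAX_VALUE), separate locks for put and take
  • DelayQueue: Unbounded, ordered by delay; an element can be taken only after its delay has expired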
  • PriorityBlockingQueue: Priority-ordered, unbounded
  • SynchronousQueue: No storage, direct handoff
  • LinkedTransferQueue: Transfer mechanism, unbounded
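
A small producer-consumer sketch with a bounded ArrayBlockingQueue follows. The class name ProducerConsumerDemo and the -1 end marker are conventions chosen for this example, not part of the BlockingQueue API.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class ProducerConsumerDemo {
    private static final int END = -1; // hypothetical end-of-stream marker

    // Produces 1..items through a queue of capacity 2 and returns the consumer's sum.
    static int run(int items) throws InterruptedException {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(2);
        int[] sum = {0}; // written only by the consumer, read after join()

        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= items; i++) {
                    queue.put(i); // blocks while the queue is full
                }
                queue.put(END);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        Thread consumer = new Thread(() -> {
            try {
                // take() blocks while the queue is empty
                for (int v = queue.take(); v != END; v = queue.take()) {
                    sum[0] += v;
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
        producer.join();
        consumer.join();
        return sum[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run(10)); // 55
    }
}
```

Neither thread needs explicit wait/notify code: the queue's put() and take() handle the blocking on both sides.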

11. Thread Pools

11.1 Benefits

  • Reduces thread creation/destruction overhead
  • Provides faster response time
  • Enables centralized thread management

11.2 Executor Types

// Fixed thread pool - N persistent threads
ExecutorService fixedPool = Executors.newFixedThreadPool(5);

// Single thread executor
ExecutorService singlePool = Executors.newSingleThreadExecutor();

// Cached pool - scales dynamically
ExecutorService cachedPool = Executors.newCachedThreadPool();

11.3 ThreadPoolExecutor Parameters

Seven core parameters:

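  1. corePoolSize: Number of threads kept alive even when idle
  2. maximumPoolSize: Upper bound on the thread count
  3. keepAliveTime: How long threads beyond corePoolSize may stay idle before being terminated
  4. unit: TimeUnit for keepAliveTime
  5. workQueue: Queue holding tasks that wait for a free thread
  6. threadFactory: Factory that creates new threads
  7. handler: Rejection policy applied when the queue is full and the pool is at maximumPoolSize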

11.4 Rejection Policies

  • AbortPolicy: Throw RejectedExecutionException (default)
  • CallerRunsPolicy: Execute in caller's thread
  • DiscardOldestPolicy: Discard the oldest queued task, then retry the new one
  • DiscardPolicy: Silently discard task

11.5 Custom Thread Pool (Recommended)

The Executors factory methods use either an unbounded LinkedBlockingQueue (fixed and single-thread pools) or an unbounded maximum thread count (cached pool), so sustained overload can exhaust memory. Constructing ThreadPoolExecutor directly makes both limits explicit:

ExecutorService pool = new ThreadPoolExecutor(
    2,                  // core pool size
    5,                  // maximum pool size
    2L, TimeUnit.SECONDS,
    new ArrayBlockingQueue<>(3),
    Executors.defaultThreadFactory(),
    new ThreadPoolExecutor.AbortPolicy()
);
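
To see how these parameters interact, the sketch below submits nine tasks that all block until released. It assumes the same configuration as above (core 2, maximum 5, queue capacity 3, AbortPolicy); SaturationDemo and the summary string are invented for the example.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class SaturationDemo {
    static String demo() throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
            2, 5, 2L, TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(3),
            Executors.defaultThreadFactory(),
            new ThreadPoolExecutor.AbortPolicy());

        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await(); // occupy the thread until the demo is done
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };

        int accepted = 0;
        boolean rejected = false;
        try {
            for (int i = 0; i < 9; i++) {
                pool.execute(blocker);
                accepted++;
            }
        } catch (RejectedExecutionException e) {
            rejected = true; // the ninth task finds the queue and pool both full
        }

        // Tasks 1-2 start core threads, 3-5 are queued, 6-8 start extra threads
        String summary = "accepted=" + accepted
            + " threads=" + pool.getPoolSize()
            + " queued=" + pool.getQueue().size()
            + " rejected=" + rejected;

        release.countDown();
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return summary;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(demo()); // accepted=8 threads=5 queued=3 rejected=true
    }
}
```

Note the order: extra threads beyond corePoolSize are created only after the queue is full, which is why the capacity is maximumPoolSize plus the queue size (here 5 + 3 = 8).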

12. Fork/Join Framework

Splits large tasks into smaller subtasks, processes them in parallel, then merges results.

class SumTask extends RecursiveTask<Integer> {
    private static final int THRESHOLD = 10;
    private int start, end;
    private int result;

    public SumTask(int start, int end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected Integer compute() {
        if ((end - start) <= THRESHOLD) {
            for (int i = start; i <= end; i++) result += i;
        } else {
            int mid = (start + end) / 2;
            SumTask left = new SumTask(start, mid);
            SumTask right = new SumTask(mid + 1, end);
            left.fork();                       // run the left half asynchronously
            int rightResult = right.compute(); // compute the right half in this thread
            result = rightResult + left.join();
        }
        return result;
    }
}

public class ForkJoinDemo {
    public static void main(String[] args) throws Exception {
        ForkJoinPool executor = new ForkJoinPool();
        ForkJoinTask<Integer> task = executor.submit(new SumTask(1, 100));
        System.out.println("Result: " + task.get());
    }
}

13. Asynchronous Programming

13.1 CompletableFuture

Provides powerful async operations with callback support:

public class AsyncDemo {
    public static void main(String[] args) throws Exception {
        // Async without return value
        CompletableFuture.runAsync(() -> 
            System.out.println("Running async task"));

        // Async with return value
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("Computing result");
            return 42;
        });

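        CompletableFuture<Integer> done = future.whenComplete((result, error) -> {
            System.out.println("Result: " + result);
            System.out.println("Error: " + error);
        });

        // Common-pool threads are daemon threads, so wait here or the JVM may exit first
        done.join();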
    }
}

13.2 CompletableFuture vs Future

CompletableFuture advantages:

  • Manual completion support
  • Non-blocking callbacks via whenComplete(), thenApply()
  • Chainable operations
  • Multiple futures combination via allOf(), anyOf()
  • Exception handling via exceptionally()
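
Several of these features can be sketched together. The example is illustrative only: ChainDemo, its method names, and the numbers are assumptions, and thenCombine() is used here as one way to combine two futures.

```java
import java.util.concurrent.CompletableFuture;

class ChainDemo {
    // Combines two async results, then transforms the combined value.
    static int pipeline() {
        CompletableFuture<Integer> price = CompletableFuture.supplyAsync(() -> 40);
        CompletableFuture<Integer> tax = CompletableFuture.supplyAsync(() -> 2);
        return price.thenCombine(tax, Integer::sum) // 42 once both are done
                    .thenApply(total -> total * 10) // chained transformation
                    .join();
    }

    // Replaces a failure with a fallback value.
    static String recover() {
        return CompletableFuture.<String>supplyAsync(() -> {
                    throw new IllegalStateException("boom");
                })
                .exceptionally(ex -> "fallback")
                .join();
    }

    // Completes a future by hand instead of running a task.
    static int manual() {
        CompletableFuture<Integer> future = new CompletableFuture<>();
        future.complete(7);
        return future.join();
    }

    public static void main(String[] args) {
        System.out.println(pipeline()); // 420
        System.out.println(recover());  // fallback
        System.out.println(manual());   // 7
    }
}
```

A plain Future offers none of this: its result can only be retrieved with a blocking get().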