Fading Coder


Common Fundamentals of Java Concurrent Programming

Creating Threads

Extending the Thread Class

One of the simplest ways to create a new thread in Java is to extend the Thread class: define a subclass of Thread, override the run() method, and call start() to launch the thread. On JVMs that use a 1:1 threading model, start() internally creates a kernel thread (via pthread_create() from the POSIX thread library on Linux) and maps the Java thread instance to it, though mapping strategies vary across JVM implementations.

The main advantage of this approach is its simplicity, which makes it ideal for trivial thread logic. However, since Java does not support multiple inheritance, extending Thread uses up your class's only superclass slot, limiting flexibility.

Example code for creating a thread by extending Thread:

package io.example;

import java.lang.management.ManagementFactory;

public class CustomThread extends Thread {
    @Override
    public void run() {
        long currentThreadId = Thread.currentThread().getId();
        String currentPid = ManagementFactory.getRuntimeMXBean().getName().split("@")[0];
        System.out.printf("Process ID: %s, Thread ID: %d%n", currentPid, currentThreadId);
    }
}

package io.example;

import java.lang.management.ManagementFactory;

public class ThreadDemo {
    public static void main(String[] args) {
        Thread worker = new CustomThread();
        worker.start();

        long mainThreadId = Thread.currentThread().getId();
        String mainPid = ManagementFactory.getRuntimeMXBean().getName().split("@")[0];
        System.out.printf("Process ID: %s, Thread ID: %d%n", mainPid, mainThreadId);
    }
}

Sample output:

Process ID: 66201, Thread ID: 13
Process ID: 66201, Thread ID: 1

Important Note: You must call start() to launch a new thread, rather than calling run() directly. Invoking run() directly treats the thread object as a regular Java object and executes the method in the current thread; no new kernel thread is registered with the operating system, so your code still runs single-threaded.
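
The difference can be demonstrated with a short sketch (the nameWhenRunDirectly helper below is illustrative, not part of any API):

```java
public class RunVsStartDemo {
    // Illustrative helper: returns the name of the thread that actually
    // executed the task when run() is called directly.
    static String nameWhenRunDirectly() {
        final String[] observed = new String[1];
        Thread t = new Thread(() -> observed[0] = Thread.currentThread().getName());
        t.run(); // executes in the calling thread; no new thread is created
        return observed[0];
    }

    public static void main(String[] args) throws InterruptedException {
        Thread t = new Thread(() ->
                System.out.println("start(): ran on " + Thread.currentThread().getName()));
        t.start(); // spawns a new thread (e.g. Thread-0)
        t.join();

        // Called from main, so the task runs on "main"
        System.out.println("run():   ran on " + nameWhenRunDirectly());
    }
}
```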

Implementing the Runnable Interface

Implementing the Runnable interface is a more flexible approach to thread creation. This allows your class to implement other interfaces and extend other base classes while still supporting threaded execution.

Example code for creating a thread via Runnable:

package io.example;

import java.lang.management.ManagementFactory;

public class CustomRunnable implements Runnable {
    @Override
    public void run() {
        long currentThreadId = Thread.currentThread().getId();
        String currentPid = ManagementFactory.getRuntimeMXBean().getName().split("@")[0];
        System.out.printf("Process ID: %s, Thread ID: %d%n", currentPid, currentThreadId);
    }
}

package io.example;

import java.lang.management.ManagementFactory;

public class RunnableDemo {
    public static void main(String[] args) {
        Runnable task = new CustomRunnable();
        new Thread(task).start();

        long mainThreadId = Thread.currentThread().getId();
        String mainPid = ManagementFactory.getRuntimeMXBean().getName().split("@")[0];
        System.out.printf("Process ID: %s, Thread ID: %d%n", mainPid, mainThreadId);
    }
}

Sample output:

Process ID: 7388, Thread ID: 14
Process ID: 7388, Thread ID: 1

You can also simplify this approach with anonymous inner classes or lambda expressions:

package io.example;

import java.lang.management.ManagementFactory;

public class SimplifiedRunnableDemo {
    public static void main(String[] args) {
        // Anonymous inner class simplification
        Runnable anonymousTask = new Runnable() {
            @Override
            public void run() {
                long tid = Thread.currentThread().getId();
                String pid = ManagementFactory.getRuntimeMXBean().getName().split("@")[0];
                System.out.printf("Process ID: %s, Thread ID: %d%n", pid, tid);
            }
        };
        new Thread(anonymousTask).start();

        // Lambda expression simplification
        new Thread(() -> {
            long tid = Thread.currentThread().getId();
            String pid = ManagementFactory.getRuntimeMXBean().getName().split("@")[0];
            System.out.printf("Process ID: %s, Thread ID: %d%n", pid, tid);
        }).start();

        long mainTid = Thread.currentThread().getId();
        String mainPid = ManagementFactory.getRuntimeMXBean().getName().split("@")[0];
        System.out.printf("Process ID: %s, Thread ID: %d%n", mainPid, mainTid);
    }
}

Implementing the Callable Interface

Callable allows you to create asynchronous tasks that return a result after execution. Unlike Runnable, Callable's call() method can return a value and throw checked exceptions.

Example of creating a thread with Callable and retrieving the result:

package io.example;

import java.lang.management.ManagementFactory;
import java.util.concurrent.Callable;

public class ResultTask implements Callable<String> {
    @Override
    public String call() throws Exception {
        long tid = Thread.currentThread().getId();
        String pid = ManagementFactory.getRuntimeMXBean().getName().split("@")[0];
        return String.format("Process ID: %s, Thread ID: %d", pid, tid);
    }
}

package io.example;

import java.lang.management.ManagementFactory;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

public class CallableDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Callable<String> task = new ResultTask();
        FutureTask<String> resultContainer = new FutureTask<>(task);
        new Thread(resultContainer).start();
        // get() blocks the main thread until the worker thread completes execution
        System.out.println(resultContainer.get());

        long mainTid = Thread.currentThread().getId();
        String mainPid = ManagementFactory.getRuntimeMXBean().getName().split("@")[0];
        System.out.printf("Process ID: %s, Thread ID: %d%n", mainPid, mainTid);
    }
}

Sample output:

Process ID: 25580, Thread ID: 14
Process ID: 25580, Thread ID: 1

This approach can also be simplified with lambdas or anonymous classes:

package io.example;

import java.lang.management.ManagementFactory;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

public class SimplifiedCallableDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        FutureTask<String> task = new FutureTask<>(() -> {
            long tid = Thread.currentThread().getId();
            String pid = ManagementFactory.getRuntimeMXBean().getName().split("@")[0];
            return String.format("Process ID: %s, Thread ID: %d", pid, tid);
        });
        new Thread(task).start();
        System.out.println(task.get());

        long mainTid = Thread.currentThread().getId();
        String mainPid = ManagementFactory.getRuntimeMXBean().getName().split("@")[0];
        System.out.printf("Process ID: %s, Thread ID: %d%n", mainPid, mainTid);
    }
}

Common Thread Methods

The Thread class provides a wide range of utility methods for thread management:

  • public static Thread currentThread(): Returns the Thread instance currently executing code.
  • public void run(): The entry point for the thread's task logic.
  • public void start(): Launches a new thread, which then invokes run().
  • public String getName(): Returns the thread's name; default names follow the pattern Thread-<index>.
  • public void setName(String name): Sets a custom name for the thread.
  • public static void sleep(long time): Pauses the currently executing thread for the given number of milliseconds.
  • public final void join(): Makes the calling thread wait for the target thread to finish before continuing, similar to pthread_join in the POSIX thread library.

Basic example of using join():

public class JoinDemo {
    public static void main(String[] args) throws InterruptedException {
        Thread worker = new Thread(() -> {
            System.out.println(Thread.currentThread().getName());
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            System.out.println(Thread.currentThread().getName());
        });

        worker.start();
        // Main thread blocks here until worker completes
        worker.join();

        System.out.println(Thread.currentThread().getName());
    }
}

Note: Even if you don't call join(), the worker thread will still complete its task. The JVM exits only when all non-daemon threads have terminated; both the main thread and user-created threads are non-daemon by default, so the main thread exiting will not stop running worker threads. However, if you mark a thread as a daemon with setDaemon(true) (before calling start()), the JVM will exit once all non-daemon threads have completed, even if the daemon thread is still running.

public class DaemonDemo {
    public static void main(String[] args) {
        Thread worker = new Thread(() -> {
            System.out.println(Thread.currentThread().getName());
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            System.out.println(Thread.currentThread().getName());
        });

        worker.setDaemon(true);
        worker.start();

        System.out.println(Thread.currentThread().getName());
    }
}

Sample output:

Thread-0
main

Thread Synchronization

Locks

Both synchronized and ReentrantLock are based on pessimistic locking, which assumes that concurrent conflicts will occur whenever critical-section code is accessed. In high-concurrency scenarios, intense lock contention causes thread blocking and degrades performance. Pessimistic locking is especially wasteful in read-heavy workloads, since every read operation must acquire an exclusive lock.

In contrast, StampedLock uses an optimistic locking approach that is much better suited to read-heavy scenarios. Optimistic locking assumes no concurrent conflicts during data access, so most operations avoid lock contention and blocking entirely; a thread only verifies whether the resource was modified by another thread when committing changes. In write-heavy scenarios, however, optimistic locking suffers frequent validation failures and retries, which can also hurt performance.

StampedLock can therefore deliver better concurrency in specific scenarios, but it requires careful usage to avoid overly complex code and potential deadlocks.

synchronized

synchronized is a keyword built into Java for implementing thread synchronization. It can be used to create synchronized blocks or synchronized methods, ensuring safe access to shared resources in multi-threaded environments, preventing issues like race conditions and data inconsistency.

Before Java 6, synchronized was implemented only as a traditional heavyweight lock, which relies on the object's internal monitor and carries significant performance overhead. Java 6 introduced two optimizations, biased locking and lightweight locking, which eliminate unnecessary overhead in scenarios with little or no contention. The JVM selects these strategies automatically, preferring biased and lightweight locking before inflating to a heavyweight lock, so developers don't need to manage them manually. (Note that biased locking was disabled by default in JDK 15 via JEP 374 and removed in later releases.)

Biased locking optimizes for when the same thread repeatedly acquires the same lock: the JVM stores the ID of the biased thread in the object header and stack frame lock record, so subsequent acquisitions require no synchronization overhead, eliminating locking overhead in uncontended scenarios.

Lightweight locking uses an optimistic, CAS-based approach to acquire the lock. CAS is exposed through the compareAndSwap* methods of the Unsafe class, which the JVM compiles down to native atomic instructions. Note that a plain CAS cannot detect the ABA problem; stamp- or version-based variants are needed for that.

Heavyweight locking relies on an OS-backed monitor mechanism. Java's monitor is implemented by ObjectMonitor, a C++ structure internal to the JVM. Each object can be associated with an ObjectMonitor instance, which ensures that only one thread owns the object's monitor at a time (enforced via an OS-level mutex). Entering a synchronized block or method essentially acquires the monitor of the associated lock object.

Synchronized Blocks

Using synchronized around a code block creates a synchronized block, which guarantees only one thread can enter the block at a time. You specify a lock object as the synchronization guard, and place the critical section code inside the block.

While any valid object can act as a lock, granularity matters: an overly coarse lock hurts concurrency, while an overly fine one can leave race conditions unaddressed. By convention, instance methods lock on this, and static methods lock on the class object ClassName.class.

Basic example of synchronized blocks:

package io.example;

public class SyncBlockDemo {
    private static int sharedCounter = 0;
    private static final Object syncLock = new Object();

    public static void main(String[] args) throws InterruptedException {
        Runnable incrementTask = () -> {
            for (int i = 0; i < 10000; i++) {
                synchronized (syncLock) {
                    sharedCounter++;
                }
            }
        };

        Thread t1 = new Thread(incrementTask);
        Thread t2 = new Thread(incrementTask);

        t1.start();
        t2.start();

        t1.join();
        t2.join();

        System.out.println("Final counter value: " + sharedCounter);
    }
}

Output:

Final counter value: 20000

Inside synchronized blocks, you can also use wait() to put the current thread into a waiting state, until another thread calls notify() or notifyAll() on the same lock object to wake it up. All three methods rely on the monitor mechanism, so they can only be used when holding the lock on the target object.

Basic example of wait() and notify():

package io.example;

public class WaitNotifyDemo {
    public static void main(String[] args) {
        final Object conditionLock = new Object();

        Thread waitingThread = new Thread(() -> {
            synchronized (conditionLock) {
                System.out.println("Waiter: Waiting for notification...");
                try {
                    conditionLock.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("Waiter: Received notification!");
            }
        });

        Thread notifyingThread = new Thread(() -> {
            synchronized (conditionLock) {
                System.out.println("Notifier: Running background work...");
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("Notifier: Work completed, sending notification...");
                conditionLock.notify();
            }
        });

        waitingThread.start();
        notifyingThread.start();
    }
}

Output:

Waiter: Waiting for notification...
Notifier: Running background work...
Notifier: Work completed, sending notification...
Waiter: Received notification!

Synchronized Methods

Adding the synchronized keyword to a method definition turns the entire method into a synchronized block, with an implicit lock object. For instance methods, the implicit lock is this; for static methods, the implicit lock is the class object.

Synchronized methods are very simple to use, but they have a larger lock scope which can hurt performance if most of the method doesn't access shared state.

Basic example of synchronized methods:

package io.example;

public class SyncMethodDemo {
    private static int sharedCounter = 0;

    public static synchronized void incrementCounter() {
        for (int i = 0; i < 10000; i++) {
            sharedCounter++;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Thread t1 = new Thread(SyncMethodDemo::incrementCounter);
        Thread t2 = new Thread(SyncMethodDemo::incrementCounter);

        t1.start();
        t2.start();

        t1.join();
        t2.join();

        System.out.println("Final counter value: " + sharedCounter);
    }
}

Output:

Final counter value: 20000

ReentrantLock

ReentrantLock is an explicit implementation of a reentrant lock, defaulting to non-fair ordering, and offers more flexibility than the implicit synchronized keyword. With ReentrantLock, you explicitly acquire and release locks, allowing you to precisely control the scope of synchronization.

ReentrantLock adds useful features like timed lock waiting, configurable fairness, and explicit condition variables. However, you must manually release the lock, so you should always release it in a finally block to avoid deadlocks if an exception is thrown.

Basic example of ReentrantLock:

package io.example;

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class ReentrantLockDemo {
    private static final Lock reentrantLock = new ReentrantLock();
    private static int sharedCounter = 0;

    public static void main(String[] args) throws InterruptedException {
        Runnable incrementTask = () -> {
            for (int i = 0; i < 10000; i++) {
                reentrantLock.lock();
                try {
                    sharedCounter++;
                } finally {
                    // Guarantee lock release even if an exception occurs
                    reentrantLock.unlock();
                }
            }
        };

        Thread t1 = new Thread(incrementTask);
        Thread t2 = new Thread(incrementTask);

        t1.start();
        t2.start();

        t1.join();
        t2.join();

        System.out.println("Final counter value: " + sharedCounter);
    }
}

Output:

Final counter value: 20000
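
The demo above only exercises lock() and unlock(). The extra features mentioned earlier, timed acquisition, configurable fairness, and explicit condition variables, can be sketched as follows (a minimal illustration; the one-second timeout and fair mode are arbitrary choices):

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class ReentrantLockFeaturesDemo {
    private static final ReentrantLock lock = new ReentrantLock(true); // fair ordering
    private static final Condition ready = lock.newCondition();
    private static boolean prepared = false; // always accessed under the lock

    public static void main(String[] args) throws InterruptedException {
        Thread producer = new Thread(() -> {
            lock.lock();
            try {
                prepared = true;
                ready.signal(); // wake one thread waiting on this condition
            } finally {
                lock.unlock();
            }
        });

        // Timed acquisition: give up if the lock is unavailable for 1 second
        if (lock.tryLock(1, TimeUnit.SECONDS)) {
            try {
                producer.start();
                while (!prepared) {
                    ready.await(); // releases the lock while waiting
                }
                System.out.println("Condition signalled, proceeding");
            } finally {
                lock.unlock();
            }
        }
        producer.join();
    }
}
```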

StampedLock

StampedLock, introduced in Java 8, supports optimistic reads, pessimistic reads, and write locks. Optimistic reads deliver higher concurrency, and a stamp can be converted between modes (for example, upgrading an optimistic read to a read or write lock). StampedLock is not reentrant and does not support Condition variables.

StampedLock has three modes of access control:

  1. Optimistic Read: A lock-free operation that assumes no write conflicts occur. The thread reads data directly without acquiring a lock, then validates a version stamp after reading. If the stamp is still valid, the read succeeds; otherwise the reader falls back to another locking strategy. Ideal for read-heavy workloads.
  2. Pessimistic Read: A conventional read lock that blocks writes but allows other concurrent readers. Used for mixed read-write workloads where consistency is required.
  3. Write Lock: Blocks all other reads and writes, used to protect modifications to shared state.

Basic example of StampedLock:

package io.example;

import java.util.concurrent.locks.StampedLock;

public class StampedLockDemo {
    private static final StampedLock stampedLock = new StampedLock();
    private static int sharedCounter = 0;

    public static void main(String[] args) throws InterruptedException {
        Runnable incrementTask = () -> {
            for (int i = 0; i < 10000; i++) {
                long stamp = stampedLock.writeLock();
                try {
                    sharedCounter++;
                } finally {
                    stampedLock.unlockWrite(stamp);
                }
            }
        };

        Thread t1 = new Thread(incrementTask);
        Thread t2 = new Thread(incrementTask);

        t1.start();
        t2.start();

        t1.join();
        t2.join();

        System.out.println("Final counter value: " + sharedCounter);
    }
}
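
The demo above only uses the write lock. The optimistic-read pattern described earlier can be sketched with tryOptimisticRead() and validate(), falling back to a pessimistic read lock when a writer intervenes:

```java
import java.util.concurrent.locks.StampedLock;

public class OptimisticReadDemo {
    private final StampedLock lock = new StampedLock();
    private int x;

    public void write(int value) {
        long stamp = lock.writeLock();
        try {
            x = value;
        } finally {
            lock.unlockWrite(stamp);
        }
    }

    public int read() {
        long stamp = lock.tryOptimisticRead(); // lock-free: just records a version
        int snapshot = x;
        if (!lock.validate(stamp)) {           // a write happened during the read
            stamp = lock.readLock();           // fall back to a pessimistic read lock
            try {
                snapshot = x;
            } finally {
                lock.unlockRead(stamp);
            }
        }
        return snapshot;
    }

    public static void main(String[] args) {
        OptimisticReadDemo demo = new OptimisticReadDemo();
        demo.write(42);
        System.out.println("Read value: " + demo.read());
    }
}
```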

Atomic Variables

Besides the optimistic read mode in StampedLock, the atomic variable classes under the java.util.concurrent.atomic package (like AtomicInteger, AtomicLong, AtomicIntegerArray, AtomicReference) are also implemented based on optimistic locking.

Plain atomic variables can suffer from the ABA problem. To avoid this, you can use AtomicStampedReference, which maintains an integer version stamp alongside the object reference; each update supplies the expected stamp and a new stamp, so an A -> B -> A change is detected by the version check.
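
A minimal sketch of how AtomicStampedReference detects a stale update (the values and stamps are arbitrary; small Integer values are used so the boxed-reference comparison inside compareAndSet behaves as expected):

```java
import java.util.concurrent.atomic.AtomicStampedReference;

public class AtomicStampedDemo {
    public static void main(String[] args) {
        AtomicStampedReference<Integer> ref = new AtomicStampedReference<>(1, 0);

        int[] stampHolder = new int[1];
        Integer current = ref.get(stampHolder); // reads value and stamp together
        int stamp = stampHolder[0];             // 0

        // Succeeds: both the reference and the stamp match
        boolean first = ref.compareAndSet(current, 2, stamp, stamp + 1);

        // Fails: the stamp has moved on, even though we pass the old one
        boolean second = ref.compareAndSet(2, 3, stamp, stamp + 1);

        System.out.println(first + " " + second); // prints: true false
    }
}
```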

Basic example of AtomicInteger:

package io.example;

import java.util.concurrent.atomic.AtomicInteger;

public class AtomicIntegerDemo {
    private static final AtomicInteger sharedCounter = new AtomicInteger(0);

    public static void main(String[] args) throws InterruptedException {
        Runnable incrementTask = () -> {
            for (int i = 0; i < 10000; i++) {
                sharedCounter.incrementAndGet();
            }
        };

        Thread t1 = new Thread(incrementTask);
        Thread t2 = new Thread(incrementTask);

        t1.start();
        t2.start();

        t1.join();
        t2.join();

        System.out.println("Final counter value: " + sharedCounter.get());
    }
}

Basic example of AtomicReference:

package io.example;

import java.util.concurrent.atomic.AtomicReference;

public class AtomicReferenceDemo {
    private static final AtomicReference<Integer> counterRef = new AtomicReference<>(0);

    public static void main(String[] args) throws InterruptedException {
        Runnable incrementTask = () -> {
            for (int i = 0; i < 10000; i++) {
                while (true) {
                    Integer current = counterRef.get();
                    Integer updated = current + 1;
                    if (counterRef.compareAndSet(current, updated)) {
                        break;
                    }
                }
            }
        };

        Thread t1 = new Thread(incrementTask);
        Thread t2 = new Thread(incrementTask);

        t1.start();
        t2.start();

        t1.join();
        t2.join();

        System.out.println("Final counter value: " + counterRef.get());
    }
}

Output:

Final counter value: 20000

ThreadLocal

Core Principles

ThreadLocal is used for scenarios where an instance needs to be accessed across multiple methods, but should not be shared between threads. It allows each thread to store its own independent copy of the variable, so operations by one thread do not interfere with others.

The example below demonstrates how each thread gets an independent copy of a ThreadLocal variable:

public class ThreadLocalDemo {
    private static final ThreadLocal<Integer> threadLocalCounter = ThreadLocal.withInitial(() -> 0);

    public static void main(String[] args) {
        for (int i = 0; i < 5; i++) {
            new Thread(new CounterTask()).start();
        }
    }

    static class CounterTask implements Runnable {
        @Override
        public void run() {
            Integer initialValue = threadLocalCounter.get();
            System.out.printf("%s initial value: %d%n", Thread.currentThread().getName(), initialValue);

            initialValue += 1;
            threadLocalCounter.set(initialValue);

            System.out.printf("%s modified value: %d%n", Thread.currentThread().getName(), threadLocalCounter.get());
        }
    }
}

Internally, every Thread instance has a member variable threadLocals of type ThreadLocal.ThreadLocalMap, meaning each thread has its own private ThreadLocalMap hash map. ThreadLocalMap uses a custom entry class that extends WeakReference<ThreadLocal<?>>, so the key (the ThreadLocal instance) is held as a weak reference, and the value is the object associated with the ThreadLocal. Read and write operations on ThreadLocal essentially just read and write entries in the current thread's ThreadLocalMap.

static class Entry extends WeakReference<ThreadLocal<?>> {
    /** The value associated with this ThreadLocal. */
    Object value;

    Entry(ThreadLocal<?> k, Object v) {
        super(k);
        value = v;
    }
}

ThreadLocalMap uses weak references and opportunistic cleanup to avoid potential memory leaks.

When the strong reference to the ThreadLocal instance is dropped, the only reference to the ThreadLocal is the weak reference in the ThreadLocalMap, so it will be garbage collected automatically, preventing leaks of the ThreadLocal object itself.

However, after the ThreadLocal is collected, its entries in ThreadLocalMap are left with a null key, while the value still has a strong reference chain from the Thread instance. If the thread is long-lived (like a core thread in a thread pool), the value would never be collected. To mitigate this, ThreadLocalMap opportunistically expunges entries with null keys during calls to get(), set(), and remove(), which usually prevents such values from being retained indefinitely.

Memory Leaks and Stale Data

ThreadLocal only causes memory leaks if you do not call remove() after use AND the thread is never destroyed (like core threads in a thread pool). If either condition is not met, no memory leak occurs.

When a thread calls set() to store a value, the entry in ThreadLocalMap has a weak reference key for the ThreadLocal and a strong reference to the value. If the thread lives forever and you never call remove(), and the ThreadLocal still has a strong reference elsewhere, the entry and value will never be collected, causing a memory leak.

If the strong reference to the ThreadLocal is dropped, the ThreadLocal itself will be collected by GC due to the weak reference key, but the value can still be retained if opportunistic cleanup is never triggered, leading to a memory leak.

To avoid issues, you should avoid using ThreadLocal in long-lived threads where possible, and always call remove() after you are done using the variable. Additionally, you need to watch out for stale data when using ThreadLocal in thread pools, where threads are reused across multiple tasks.
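
The stale-data hazard can be sketched with a single-threaded pool, which reuses one worker across tasks (the pool size and string value here are illustrative):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadLocalPoolDemo {
    private static final ThreadLocal<String> context = new ThreadLocal<>();

    public static void main(String[] args) {
        // One worker thread, reused by every submitted task
        ExecutorService pool = Executors.newFixedThreadPool(1);

        pool.submit(() -> {
            context.set("task-1 data");
            // Forgetting remove() here would leave "task-1 data" on the worker,
            // where the next task would see it as stale data
            context.remove();
        });

        pool.submit(() -> {
            // Thanks to the remove() above, this prints null instead of task 1's value
            System.out.println("Task 2 sees: " + context.get());
        });

        pool.shutdown();
    }
}
```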

CountDownLatch

CountDownLatch is a synchronization utility that allows one or more threads to wait for a set of other threads to complete their tasks. It uses a counter to track pending tasks: when the counter reaches zero, all waiting threads are released.

CountDownLatch provides the following core methods:

  • CountDownLatch(int count): Constructor that initializes the counter with the given number of pending tasks.
  • void await() throws InterruptedException: Blocks the calling thread until the counter reaches zero.
  • boolean await(long timeout, TimeUnit unit) throws InterruptedException: Blocks for up to the specified timeout; returns true if the counter reached zero, or false if the timeout elapsed first.
  • void countDown(): Decrements the counter, called by a worker thread when it completes its task.

Simple example of CountDownLatch:

package io.example;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CountDownLatchDemo {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(10);
        CountDownLatch latch = new CountDownLatch(10);

        Runnable task = () -> {
            System.out.println("Task completed");
            latch.countDown();
        };

        long startTime = System.currentTimeMillis();

        for (int i = 0; i < 10; i++) {
            executor.submit(task);
        }
        latch.await();

        long endTime = System.currentTimeMillis();
        System.out.println("Total execution time: " + (endTime - startTime) + "ms");
    }
}

Sample output:

Task completed
Task completed
Task completed
Task completed
Task completed
Task completed
Task completed
Task completed
Task completed
Task completed
Total execution time: 1ms

Future and CompletableFuture

Future is an interface that represents the result of an asynchronous task, allowing you to submit a task to an executor and retrieve the result at a later time. It's commonly used for long-running operations to avoid blocking the main thread, improving application responsiveness.

Core methods of the Future interface:

  • get(): Retrieves the result of the asynchronous task. If the task is not yet complete, this method blocks the calling thread until the task finishes.
  • isDone(): Checks if the task has completed, returns true if finished, false otherwise.
  • cancel(boolean mayInterruptIfRunning): Attempts to cancel the task. If the task is running, mayInterruptIfRunning controls whether it should be interrupted. If cancelled successfully, get() will throw a CancellationException.

Future is commonly used with ExecutorService to submit tasks and retrieve results. Java 8 introduced CompletableFuture, which implements Future and adds many powerful features, including support for functional-style composition of multiple asynchronous tasks.

Example of asynchronous task execution with Future:

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class FutureDemo {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(2);

        Future<Integer> future = executor.submit(() -> {
            Thread.sleep(2000); // Simulate long-running work
            return 42;
        });

        try {
            System.out.println("Waiting for task to complete...");
            Integer result = future.get();
            System.out.println("Task completed, result: " + result);
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }

        executor.shutdown();
    }
}
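
The demo above only uses get(). The cancel() and isDone() behavior described earlier can be sketched as follows (the sleep durations are arbitrary):

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class FutureCancelDemo {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor();

        Future<Integer> future = executor.submit(() -> {
            Thread.sleep(5000); // long-running work; sleep responds to interruption
            return 42;
        });

        Thread.sleep(100);
        boolean cancelled = future.cancel(true); // true: interrupt if already running
        System.out.println("cancelled = " + cancelled + ", isDone = " + future.isDone());

        try {
            future.get();
        } catch (CancellationException e) {
            System.out.println("get() threw CancellationException as expected");
        } catch (ExecutionException e) {
            e.printStackTrace();
        }

        executor.shutdown();
    }
}
```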

Example of composing multiple asynchronous tasks with CompletableFuture:

import java.util.concurrent.CompletableFuture;

public class CompletableFutureDemo {
    /**
     * Task dependency graph:
     * cf1 -> cf3
     * cf1 + cf2 -> cf4
     * cf2 -> cf5
     * cf3 + cf4 + cf5 -> cf6
     */
    public static void main(String[] args) {
        CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> "1");
        CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> "2");
        
        CompletableFuture<String> cf3 = cf1.thenApply(res1 -> res1 + "3");
        CompletableFuture<String> cf4 = cf1.thenCombine(cf2, (res1, res2) -> res1 + res2 + "4");
        CompletableFuture<String> cf5 = cf2.thenApply(res2 -> res2 + "5");
        
        CompletableFuture<String> cf6 = CompletableFuture.allOf(cf3, cf4, cf5).thenApply(v -> {
            String res3 = cf3.join();
            String res4 = cf4.join();
            String res5 = cf5.join();
            return res3 + res4 + res5 + "6";
        });

        String finalResult = cf6.join();
        System.out.println(finalResult);
    }
}

volatile

The volatile keyword has two key guarantees:

  • Guarantees visibility of shared variable modifications across threads: Per the Java Memory Model (JMM) Happens-Before rule, a write to a volatile variable always happens-before any subsequent read of that variable, so any changes made by one thread are immediately visible to all other threads reading the variable.
  • Prevents instruction reordering: The JVM emits memory barrier instructions around volatile accesses, which prevent the compiler and CPU from reordering reads and writes across the volatile access. In particular, writes before a volatile write cannot be moved after it, and reads after a volatile read cannot be moved before it.
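
A minimal sketch of the visibility guarantee, using the classic stop-flag pattern (without volatile on the flag, the worker's loop may never observe the main thread's write):

```java
public class VolatileFlagDemo {
    // volatile guarantees the worker sees the main thread's write promptly
    static volatile boolean running = true;

    public static void main(String[] args) throws InterruptedException {
        Thread worker = new Thread(() -> {
            while (running) {
                // busy-wait; each iteration performs a volatile read of running
            }
            System.out.println("Worker observed running = false, exiting");
        });

        worker.start();
        Thread.sleep(100);
        running = false; // volatile write: happens-before the worker's next read
        worker.join();
    }
}
```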

Thread Pools

ThreadPoolExecutor

The core interface for thread pools in Java is ExecutorService, and the most commonly used implementation is ThreadPoolExecutor. Its constructor signature is:

ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
                   long keepAliveTime, TimeUnit unit,
                   BlockingQueue<Runnable> workQueue,
                   ThreadFactory threadFactory,
                   RejectedExecutionHandler handler)

Parameter description:

  • corePoolSize: The number of core threads in the pool. Core threads are kept alive even when idle (unless allowCoreThreadTimeOut(true) is set) and are not destroyed until the pool is shut down.
  • maximumPoolSize: The maximum number of threads the pool can grow to. If all core threads are busy and the work queue is full, the pool creates additional non-core threads up to this limit. Once this limit is reached and the queue is full, further submissions are handed to the rejection policy.
  • keepAliveTime: The maximum idle time for non-core threads. When the number of threads in the pool exceeds corePoolSize, any extra non-core threads that are idle longer than this time will be destroyed to free up resources.
  • unit: The time unit for keepAliveTime.
  • workQueue: The blocking queue that stores tasks waiting for execution. Common implementations include LinkedBlockingQueue (unbounded by default, with a capacity of Integer.MAX_VALUE, though a bound can be specified), ArrayBlockingQueue (bounded), and PriorityBlockingQueue (heap-based priority ordering).
  • threadFactory: A factory used to create new thread instances. You can provide a custom implementation to control how threads are created.
  • handler: The rejection policy used to handle tasks that cannot be processed by the pool.

Rejection policies:

  • ThreadPoolExecutor.AbortPolicy: rejects the task and throws RejectedExecutionException; this is the default policy
  • ThreadPoolExecutor.DiscardPolicy: silently discards the task without throwing an exception; not recommended
  • ThreadPoolExecutor.DiscardOldestPolicy: discards the oldest pending task in the queue, then retries submitting the new task
  • ThreadPoolExecutor.CallerRunsPolicy: runs the task directly on the calling thread, bypassing the pool
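A short sketch of CallerRunsPolicy in action (pool sizes and class name are illustrative): a saturated single-thread pool pushes the overflow task back onto the submitting thread, so the third task runs on main.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class CallerRunsDemo {
    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1,
                0, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.CallerRunsPolicy());

        Runnable task = () -> {
            try {
                Thread.sleep(200); // keep the worker busy so the pool saturates
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            System.out.println("Running on " + Thread.currentThread().getName());
        };

        pool.execute(task); // occupies the single core thread
        pool.execute(task); // waits in the queue
        pool.execute(task); // pool saturated: CallerRunsPolicy runs it on the main thread
        pool.shutdown();
        pool.awaitTermination(2, TimeUnit.SECONDS);
    }
}
```

Because the caller itself is busy executing the overflow task, CallerRunsPolicy also acts as a natural back-pressure mechanism: submission slows down while the pool catches up.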

Key rules for task submission:

  • A new temporary non-core thread is only created when all core threads are busy, the work queue is full, and the total thread count is below maximumPoolSize.
  • Task rejection only occurs when all core and non-core threads are busy, the work queue is full, and a new task is submitted.

Common methods of ThreadPoolExecutor:

  • void execute(Runnable command): executes a Runnable task
  • <T> Future<T> submit(Callable<T> task): executes a Callable task and returns a Future for retrieving the result
  • void shutdown(): shuts down the pool after all queued and running tasks complete; new submissions are rejected
  • List<Runnable> shutdownNow(): shuts down the pool immediately, interrupts running tasks, and returns the tasks still waiting in the queue
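To illustrate the difference between the two shutdown methods, here is a minimal sketch (class name and timings are illustrative) where shutdownNow() interrupts the running task and returns the one still queued:

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ShutdownNowDemo {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        Runnable task = () -> {
            try {
                Thread.sleep(10_000);
            } catch (InterruptedException e) {
                // shutdownNow() interrupts the worker, cutting the sleep short
                System.out.println("Task interrupted");
            }
        };

        pool.execute(task); // picked up by the single worker thread
        pool.execute(task); // stays in the queue
        Thread.sleep(100);  // give the worker time to start the first task

        List<Runnable> pending = pool.shutdownNow();
        System.out.println("Unstarted tasks: " + pending.size());
    }
}
```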

Basic example of ThreadPoolExecutor:

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

public class WorkerTask implements Runnable {
    @Override
    public void run() {
        System.out.printf("[%s] %s%n", Thread.currentThread().getName(),
                          LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));

        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.Executors;

public class ThreadPoolDemo {
    public static void main(String[] args) {
        ExecutorService pool = new ThreadPoolExecutor(2, 3,
                                                      8, TimeUnit.SECONDS,
                                                      new ArrayBlockingQueue<>(2),
                                                      Executors.defaultThreadFactory(),
                                                      new ThreadPoolExecutor.AbortPolicy());

        Runnable task = new WorkerTask();

        pool.execute(task); // Uses core thread 1
        pool.execute(task); // Uses core thread 2
        pool.execute(task); // Waits in queue
        pool.execute(task); // Waits in queue
        pool.execute(task); // Queue full, create non-core thread 3
        pool.execute(task); // All threads busy, queue full, rejected

        pool.shutdown();
    }
}

Sample output:

Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task WorkerTask@7a0ac6e3 rejected from java.util.concurrent.ThreadPoolExecutor@71be98f5[Running, pool size = 3, active threads = 3, queued tasks = 2, completed tasks = 0]
	at java.base/java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2055)
	at java.base/java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:825)
	at java.base/java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1355)
	at ThreadPoolDemo.main(Main.java:20)
[pool-1-thread-2] 2023-08-30 15:57:44
[pool-1-thread-1] 2023-08-30 15:57:44
[pool-1-thread-3] 2023-08-30 15:57:44

Example of catching task exceptions with submit():

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.Executors;

public class ThreadPoolExceptionDemo {
    @SuppressWarnings("all")
    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(1);

        Future<Integer> future = executor.submit(() -> {
            throw new RuntimeException("Exception thrown from worker task");
        });

        try {
            Integer result = future.get();
            System.out.println("Task result: " + result);
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        } finally {
            executor.shutdown();
        }
    }
}

Output:

java.util.concurrent.ExecutionException: java.lang.RuntimeException: Exception thrown from worker task
	at java.util.concurrent.FutureTask.report(FutureTask.java:122)
	at java.util.concurrent.FutureTask.get(FutureTask.java:192)
	at ThreadPoolExceptionDemo.main(ThreadPoolExceptionDemo.java:16)
Caused by: java.lang.RuntimeException: Exception thrown from worker task
	at ThreadPoolExceptionDemo.lambda$main$0(ThreadPoolExceptionDemo.java:10)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)

Executors

Executors is a utility class for creating common thread pool configurations, providing static factory methods for different types of pools.

  • public static ExecutorService newFixedThreadPool(int nThreads): creates a thread pool with a fixed number of threads; if a thread exits due to an exception, a new thread is created to replace it
  • public static ExecutorService newSingleThreadExecutor(): creates a thread pool with exactly one thread; if the thread exits due to an exception, a new thread is created to replace it
  • public static ExecutorService newCachedThreadPool(): creates a cached thread pool that grows as needed; idle threads are kept alive for 60 seconds before being destroyed
  • public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize): creates a thread pool that supports delayed execution and periodic task scheduling
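As a brief sketch of the caching behavior (class name and sleep duration are illustrative), a cached pool reuses a thread that finished its previous task and is still within its 60-second idle window:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class CachedThreadPoolDemo {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newCachedThreadPool();
        Runnable task = () -> System.out.println(Thread.currentThread().getName());

        pool.execute(task);  // no idle thread yet, so a new one is created
        Thread.sleep(100);   // let the first task finish; its thread stays cached
        pool.execute(task);  // reuses the cached idle thread

        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.SECONDS);
    }
}
```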

Example of using newScheduledThreadPool:

package io.example;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

public class ScheduledTask implements Runnable {
    @Override
    public void run() {
        System.out.printf("[%s] %s%n", Thread.currentThread().getName(),
                          LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
    }
}
package io.example;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ScheduledThreadPoolDemo {
    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(2);
        Runnable task = new ScheduledTask();

        // Run the task once after a 1 second delay
        pool.schedule(task, 1, TimeUnit.SECONDS);

        // Run the task every 3 seconds, starting after an initial 2 second delay
        pool.scheduleAtFixedRate(task, 2, 3, TimeUnit.SECONDS);

        Thread.sleep(10 * 1000);
        pool.shutdown();
    }
}

Sample output:

[pool-1-thread-1] 2023-08-30 16:26:33
[pool-1-thread-2] 2023-08-30 16:26:34
[pool-1-thread-2] 2023-08-30 16:26:37
[pool-1-thread-2] 2023-08-30 16:26:40

Thread States

Java defines 6 thread states in the java.lang.Thread.State enum, which can be retrieved with the getState() method.

  • NEW: the thread has been created with new, but start() has not been called yet
  • RUNNABLE: start() has been called; the thread is either running or ready to run, waiting for scheduling
  • BLOCKED: the thread is blocked waiting to acquire a synchronized monitor lock
  • WAITING: the thread is waiting indefinitely for another thread to perform a specific action
  • TIMED_WAITING: the thread is waiting up to a specified amount of time for another thread's action
  • TERMINATED: the thread has completed execution and exited
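The transitions can be observed directly with getState(). In this sketch (class name and sleep duration are illustrative), the same thread is seen in NEW, WAITING, and TERMINATED at different points:

```java
public class ThreadStateDemo {
    public static void main(String[] args) throws InterruptedException {
        Object lock = new Object();

        Thread t = new Thread(() -> {
            synchronized (lock) {
                try {
                    lock.wait();   // enters WAITING until another thread calls notify()
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });

        System.out.println(t.getState()); // NEW: start() not yet called
        t.start();
        Thread.sleep(100);                // give t time to reach lock.wait()
        System.out.println(t.getState()); // WAITING

        synchronized (lock) {
            lock.notify();                // wake t up
        }
        t.join();
        System.out.println(t.getState()); // TERMINATED
    }
}
```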

Differences between sleep() and yield():

  • sleep() moves the current thread into the timed waiting state, and it transitions back to runnable only when the sleep time expires (or the thread is interrupted). yield() is merely a hint to the thread scheduler that the current thread is willing to give up its CPU time; the scheduler is free to ignore the hint, and the yielding thread remains runnable.
  • sleep() requires handling InterruptedException, while yield() does not throw any checked exceptions.

Differences between BLOCKED and WAITING:

  • BLOCKED is a passive state entered when a thread fails to acquire a synchronized monitor lock. WAITING is entered actively through explicit calls such as Object.wait(), Thread.join(), or LockSupport.park().
  • A BLOCKED thread wakes up automatically once it acquires the lock. A WAITING thread must be explicitly woken by another thread via methods like Object.notify() or LockSupport.unpark().
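The passive nature of BLOCKED can be demonstrated with a minimal sketch (class name and timing are illustrative): while the main thread holds the monitor, a second thread attempting to enter the same synchronized block reports BLOCKED.

```java
public class BlockedStateDemo {
    public static void main(String[] args) throws InterruptedException {
        Object lock = new Object();

        Thread t = new Thread(() -> {
            synchronized (lock) {
                // runs only after main releases the monitor
            }
        });

        synchronized (lock) {      // main holds the monitor first
            t.start();
            Thread.sleep(100);     // give t time to attempt the monitor
            System.out.println(t.getState()); // BLOCKED: contending for the lock
        }                          // main releases the monitor here

        t.join();                  // t acquires the lock, finishes, and exits
    }
}
```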
