Fading Coder

One Final Commit for the Last Sprint

Home > Tech > Content

Executing Asynchronous Tasks with CompletableFuture in Java

Tech May 15 1

Understanding the Future Interface

The Future interface (implemented by FutureTask) provides methods for managing asynchronous task execution, including retrieving execution results, canceling tasks, checking cancellation status, and determining completion status.

Future enables main threads to offload time-consuming operations to separate threads. While a subthread handles a computationally intensive task, the main thread remains free to perform other operations and later retrieves the results when ready. This resembles a teacher delegating a task to a student during class—while the student fetches water, the teacher continues teaching.

FutureTask: The Primary Future Implementation

Capabilities of Future

Future, introduced in Java 5, enables asynchronous parallel computation. When a main thread needs to execute an expensive calculation, you can submit it to an asynchronous thread via Future. The main thread continues processing other tasks or completes its work, then retrieves the calculation result through the Future interface.

Related Architecture

The design goal is asynchronous multi-threaded task execution with results: multi-threading, return values, and asynchronous processing (similar to a student buying water for the teacher—new thread, task with result).

Implementation involves Runnable + Callable + Future interfaces and FutureTask implementation class.

Runnable vs Callable

Thread can only accept Runnable, which cannot meet requirements for return values or asynchronous tasks (retrieving results, cancellation, completion status checks).

RunnableFuture combines Runnable and Future capabilities, enabling multi-threading and asynchronous tasks. FutureTask, a RunnableFuture implementation, accepts Callable via constructor, supporting multi-threading, return values, and asynchronous tasks.

Basic FutureTask Usage

public class FutureTaskDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        FutureTask<String> task = new FutureTask<>(new DataLoader());
        Thread worker = new Thread(task);
        worker.start();
        System.out.println(task.get());
    }
}

class DataLoader implements Callable<String> {
    @Override
    public String call() throws Exception {
        System.out.println("Processing data...");
        return "Data loaded successfully";
    }
}

Practical Usage and Analysis

Advantages

Combining Future with thread pools for asynchronous multi-threaded tasks significantly improves program efficiency.

public class AsyncTaskDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(3);
        long startTime = System.currentTimeMillis();
        
        FutureTask<String> task1 = new FutureTask<>(() -> {
            try {
                TimeUnit.MILLISECONDS.sleep(500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Task 1 completed";
        });
        executor.submit(task1);
        
        FutureTask<String> task2 = new FutureTask<>(() -> {
            try {
                TimeUnit.MILLISECONDS.sleep(300);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Task 2 completed";
        });
        executor.submit(task2);
        
        System.out.println(task1.get());
        System.out.println(task2.get());

        try {
            TimeUnit.MILLISECONDS.sleep(300);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        long endTime = System.currentTimeMillis();
        System.out.println("Total time: " + (endTime - startTime) + " ms");
        executor.shutdown();
    }
}

Disadvantages

Inelegant Result Retrieval

Blocking get(): Calling get() blocks until the result is available, regardless of computation completion, potentially causing application stalls.

public class BlockingDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
        FutureTask<String> task = new FutureTask<>(() -> {
            System.out.println(Thread.currentThread().getName() + " processing...");
            try {
                TimeUnit.SECONDS.sleep(5);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Done";
        });
        Thread t1 = new Thread(task, "Worker");
        t1.start();
        System.out.println(task.get());
        System.out.println(Thread.currentThread().getName() + " continuing...");
    }
}

You can use futureTask.get(3, TimeUnit.SECONDS) for a 3-second timeout, but this generates excessive logs and is not recommended for production.

Polling with isDone(): Polling wastes CPU resources and doesn't provide timely results.

public class PollingDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
        FutureTask<String> task = new FutureTask<>(() -> {
            System.out.println(Thread.currentThread().getName() + " processing...");
            try {
                TimeUnit.SECONDS.sleep(5);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Done";
        });
        Thread t1 = new Thread(task, "Worker");
        t1.start();
        
        System.out.println(Thread.currentThread().getName() + " working on other tasks...");
        
        while (true) {
            if (task.isDone()) {
                System.out.println(task.get());
                break;
            } else {
                TimeUnit.MILLISECONDS.sleep(500);
                System.out.println("Still processing, please wait...");
            }
        }
    }
}

Conclusion

Future's result retrieval is inefficient—only blocking or polling approaches are available. This limitation led to CompletableFuture's creation.

Addressing Complex Scenarios

Requirements:

For simple scenarios, Future works adequately. However, high-concurrency scenarios suffer from blocking and polling performance issues.Polling for task completion wastes CPU and produces inelegant code. Callbacks are needed—when a task completes, notify the listener.Creating asynchronous tasks requires Future + thread pool combinations.Handling dependent tasks in sequence—combining multiple async results where each depends on the previous.Merging independent async computations that all feed into a final result (like cooking: buy ingredients → prepare → cook).Selecting the fastest result when multiple tasks compete.

Conclusion:

Future's API is insufficient for these elegant, declarative requirements. CompletableFuture addresses all these needs—anything Future can do, CompletableFuture can do better.

CompletableFuture: Enhancing Future

Why CompletableFuture Exists

The get() method blocks until computation completes—blocking contradicts the efficiency goals of asynchronous programming.The isDone() method causes CPU spinning.True async handling requires callbacks—automatically invoked when Future completes, eliminating waiting.

JDK 8 introduced CompletableFuture with an observer-pattern-like mechanism where completed tasks notify listeners automatically.

CompletableFuture and CompletionStage

CompletionStage Interface:

Represents a specific stage in the async computation process—one stage's completion may trigger another (e.g., code execution → completion, notification → exception).Each stage's computation can be a Function, Consumer, or Runnable, e.g., stage.thenApply(x -> square(x)).thenAccept(x -> System.out.print(x)).thenRun(() -> print())Stage execution may be triggered by single or multiple stage completions.

CompletableFuture Class:

Provides powerful Future extensions, simplifying async programming complexity with functional programming capabilities.Supports callbacks for processing results and methods for transforming and combining CompletableFutures.May represent either a completed Future or an intermediate stage (CompletionStage) that triggers functions upon completion.

Four Static Methods for Creating Async Tasks

Executor Parameter Note:

If unspecified, uses default ForkJoinPool.commonPool() for async code execution.If specified, uses the custom or designated thread pool.

runAsync

Accepts Runnable, no return value.

public static CompletableFuture<Void> runAsync(Runnable runnable);
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor);

supplyAsync

Returns a result.

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier);
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor);

Examples

public class AsyncFactoryDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(3);
        
        // No return value
        CompletableFuture<Void> noResult = CompletableFuture.runAsync(() -> {
            System.out.println(Thread.currentThread().getName());
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, executor);
        System.out.println(noResult.get());
        
        // With return value
        CompletableFuture<String> withResult = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName());
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Operation complete";
        }, executor);
        System.out.println(withResult.get());
        
        executor.shutdown();
    }
}

Task Completion Callbacks and Exception Handling

CompletableFuture reduces blocking and polling by accepting callback objects automatically invoked when async tasks complete or encounter errors.

public class CallbackDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(3);
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName() + " executing");
            int result = ThreadLocalRandom.current().nextInt(10);
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            if (result > 5) {
                int i = 10 / 0;
            }
            System.out.println("Result after 1 second: " + result);
            return result;
        }, executor).whenComplete((value, error) -> {
            if (error == null) {
                System.out.println("Computation finished, value: " + value);
            }
        }).exceptionally(error -> {
            error.printStackTrace();
            System.out.println("Exception occurred: " + error.getCause() + " " + error.getMessage());
            return null;
        });
        System.out.println(Thread.currentThread().getName() + " working on other tasks");
        executor.shutdown();
    }
}

After creating an async task, the main thread continues its work while the separate thread handles computation. Upon completion, the main thread receives notification.

CompletableFuture Advantages

Upon async task completion, automatically invokes a callback object's method.After main thread sets up callbacks, no need to monitor async task execution—async tasks can execute sequentially.Upon async task error, automatically invokes a callback object's method.

Case Study: E-commerce Price Comparison

Functional Programming Trends

Lambda expressions + Stream API + Chain calls + Java 8 functional programming

Functional Interfaces

Runnable: No parameters, no return valueFunction: Accepts parameter, returns resultConsumer: Accepts parameter, no return valueBiConsumer: Accepts two parameters, no return valueSupplier: No parameters, returns result

Chain Calls

public class ProductSearchDemo {
    public static void main(String[] args) {
        Product product = new Product();
        product.setId(1).setName("Book").setCategory("Technology");
    }
}

@AllArgsConstructor
@NoArgsConstructor
@Data
@Accessors(chain = true)
class Product {
    private Integer id;
    private String name;
    private String category;
}

Business Requirements

Remember: Functionality first → Performance optimization (completion → perfection)

Price comparison requirements:

For the same product, search prices across multiple e-commerce platforms simultaneously.For the same product on one platform, compare prices from different sellers.

Output: Price list for the same product across different locations, returned as a List.

Book A on JD: $88.05Book A on Dangdang: $86.11Book A on Taobao: $90.43

Solutions:

Sequential: Check Taobao, then JD, then Tmall sequentially.Parallel: Fire all requests simultaneously using multi-threaded async tasks.

Java 8 Functional Programming Price Comparison

public class PriceComparisonDemo {
    static List<Store> stores = Arrays.asList(
        new Store("JD"), 
        new Store("Dangdang"), 
        new Store("Taobao"), 
        new Store("Dangdang")
    );
    
    // Sequential approach
    public static List<String> getPricesSequential(List<Store> stores, String productName) {
        return stores.stream()
            .map(store -> String.format("Book '%s' at %s: $%.2f",
                    productName,
                    store.getStoreName(),
                    store.fetchPrice(productName)))
            .collect(Collectors.toList());
    }
    
    // Parallel approach
    public static List<String> getPricesParallel(List<Store> stores, String productName) {
        return stores.stream()
            .map(store -> CompletableFuture.supplyAsync(() ->
                String.format("Book '%s' at %s: $%.2f",
                        productName,
                        store.getStoreName(),
                        store.fetchPrice(productName))))
            .collect(Collectors.toList())
            .stream()
            .map(CompletableFuture::join)
            .collect(Collectors.toList());
    }
    
    public static void main(String[] args) {
        // Sequential timing
        long start1 = System.currentTimeMillis();
        List<String> results1 = getPricesSequential(stores, "Database");
        results1.forEach(System.out::println);
        long end1 = System.currentTimeMillis();
        System.out.println("Sequential time: " + (end1 - start1) + " ms");
        
        // Parallel timing
        long start2 = System.currentTimeMillis();
        List<String> results2 = getPricesParallel(stores, "Database");
        results2.forEach(System.out::println);
        long end2 = System.currentTimeMillis();
        System.out.println("Parallel time: " + (end2 - start2) + " ms");
    }
}

@AllArgsConstructor
@NoArgsConstructor
@Data
class Store {
    private String storeName;
    
    public double fetchPrice(String productName) {
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return ThreadLocalRandom.current().nextDouble() * 2 + productName.charAt(0);
    }
}

This async query approach significantly reduces time consumption and can be discussed in interviews or integrated into projects.

CompletableFuture API Reference

Result Retrieval and Computation Triggering

get, join, getNow

public T get()public T get(long timeout, TimeUnit unit)public T join(): Retrieves result without throwing checked exceptions.public T getNow(T valueIfAbsent): Returns actual value if completed, otherwise returns the provided value without blocking.

public T getNow(T valueIfAbsent) {
    Object r;
    return ((r = result) == null) ? valueIfAbsent : reportJoin(r);
}

join() vs get(): Both retrieve values, but join() doesn't require checked exception handling during compilation.

complete for Manual Triggering

public boolean complete(T value): Interrupts get() and immediately returns the provided parameter. Returns whether interruption succeeded.

private static void triggerDemo() throws InterruptedException, ExecutionException, TimeoutException {
    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
        try {
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "original";
    });
    
    try {
        TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    
    System.out.println(future.complete("overridden") + "\t" + future.get());
}

When async task takes 2 seconds but get() is called after 1 second, the complete() method interrupts the blocking, setting the result to "overridden".

Result Processing (Sequential Dependencies)

thenApply

thenApply: When results have dependencies requiring sequential execution (like buying ingredients then cooking), if the current step fails, subsequent steps don't execute.

public class SequentialDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
        ExecutorService executor = Executors.newFixedThreadPool(3);
        CompletableFuture<Integer> pipeline = CompletableFuture.supplyAsync(() -> {
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return 10;
        }, executor).thenApply(f -> {
            System.out.println("Step 2 executed");
            return f + 5;
        }).thenApply(f -> {
            System.out.println("Step 3 executed");
            return f + 8;
        }).whenComplete((value, error) -> {
            if (error == null) {
                System.out.println("Final result: " + value);
            }
        }).exceptionally(error -> {
            error.printStackTrace();
            return null;
        });
        System.out.println(Thread.currentThread().getName() + " doing other work");
        executor.shutdown();
    }
}

handle

handle: Similar to thenApply with sequential dependencies, but continues execution evenif exceptions occur.

public class HandleDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
        ExecutorService executor = Executors.newFixedThreadPool(3);
        CompletableFuture<Integer> pipeline = CompletableFuture.supplyAsync(() -> {
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return 10;
        }, executor).thenApply(f -> {
            System.out.println("Step 2 executed");
            return f + 5;
        }).handle((result, error) -> {
            System.out.println("Step 3 executed");
            int i = 10 / 0;
            return result + 8;
        }).whenComplete((value, error) -> {
            if (error == null) {
                System.out.println("Final result: " + value);
            }
        }).exceptionally(error -> {
            error.printStackTrace();
            return null;
        });
        System.out.println(Thread.currentThread().getName() + " doing other work");
    }
}

Result Consumption (No Return)

thenAccept

public class ConsumerDemo {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(3);
        CompletableFuture.supplyAsync(() -> 1, executor)
            .thenApply(f -> f + 2)
            .thenApply(f -> f + 3)
            .thenAccept(result -> System.out.println(result));
    }
}

thenRun, thenAccept, thenApply Comparison

thenRun(Runnable): Task B executes after Task A completes, but B doesn't need A's result.thenAccept(Consumer): Task B executes after Task A completes, B needs A's result but has no return value.thenApply(Functon): Task B executes after Task A completes, B needs A's result and returns a value.

public class ComparisonDemo {
    public static void main(String[] args) {
        System.out.println(CompletableFuture.supplyAsync(() -> "value").thenRun(() -> {}).join());
        System.out.println(CompletableFuture.supplyAsync(() -> "value").thenAccept(r -> System.out.println(r)).join());
        System.out.println(CompletableFuture.supplyAsync(() -> "value").thenApply(f -> f + " processed").join());
    }
}

Async vs Non-Async Variants

Without custom thread pool, uses default ForkJoinPool.When the first task uses a custom executor and thenRun() is called, the second task shares the same pool.When thenRunAsync() is called, the first task uses the custom pool and the second uses ForkJoinPool.If the first task completes too quickly, the system may optimize by using the main thread directly. This applies to all thenAccept/thenApply variants.

Speed Selection: applyToEither

Uses whichever completes first.

public class RaceDemo {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(3);
        CompletableFuture<String> taskA = CompletableFuture.supplyAsync(() -> {
            try {
                System.out.println("Runner A started");
                TimeUnit.SECONDS.sleep(3);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Runner A";
        }, executor);
        
        CompletableFuture<String> taskB = CompletableFuture.supplyAsync(() -> {
            try {
                System.out.println("Runner B started");
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Runner B";
        }, executor);
        
        CompletableFuture<String> winner = taskA.applyToEither(taskB, f -> {
            return f + " wins";
        });
        
        System.out.println(Thread.currentThread().getName() + " result: " + winner.join());
    }
}

Output:

Runner A started
Runner B started
main result: Runner B wins

Two-Task Combinations

Both Tasks Must Complete

runAfterBoth: Combines two futures, doesn't need their results—executes task 3 after both complete.thenAcceptBoth: Combines two futures, retrieves results, executes task 3, no return value.thenCombine: Combines two futures, retrieves results, returns a value.

thenCombine for Merging Results

public class CombineDemo {
    public static void main(String[] args) {
        CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName() + " started");
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return 100;
        });
        
        CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName() + " started");
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return 200;
        });
        
        CompletableFuture<Integer> combined = future1.thenCombine(future2, (x, y) -> {
            System.out.println("Merging results...");
            return x + y;
        });
        
        System.out.println(combined.join());
    }
}

Output: 300

Either Task Completes

Accepts previous result, current task returns value.Accepts previous result, current task has no return value.Doesn't accept previous result, current task has no return value.

Multiple Task Combinations

allOf: Wait for All Tasks

CompletableFuture<String> imageFuture = CompletableFuture.supplyAsync(() -> {
    System.out.println("Fetching product images");
    return "product.jpg";
});

CompletableFuture<String> attributeFuture = CompletableFuture.supplyAsync(() -> {
    System.out.println("Fetching product attributes");
    return "Black+256GB";
});

CompletableFuture<String> descriptionFuture = CompletableFuture.supplyAsync(() -> {
    System.out.println("Fetching product description");
    return "Premium device";
});

CompletableFuture<Void> allDone = CompletableFuture.allOf(imageFuture, attributeFuture, descriptionFuture);
allDone.get();
System.out.println("Main finished: " + imageFuture.get() + ":" + attributeFuture.get() + ":" + descriptionFuture.get());

For many similar tasks, use an array instead of creating each individually:

CompletableFuture[] taskArray = new CompletableFuture[tasks.size()];
for (int i = 0; i < tasks.size(); i++) {
    int index = i;
    taskArray[i] = CompletableFuture.runAsync(() -> processTask(tasks.get(index)), executor);
}
CompletableFuture.allOf(taskArray).join();

anyOf: Wait for First Completion

CompletableFuture<Object> firstComplete = CompletableFuture.anyOf(imageFuture, attributeFuture, descriptionFuture);
firstComplete.get();
System.out.println("Main finished: " + imageFuture.get() + " => " + attributeFuture.get() + " => " + descriptionFuture.get());
System.out.println("First to complete: " + firstComplete.get());

Related Articles

Understanding Strong and Weak References in Java

Strong References Strong reference are the most prevalent type of object referencing in Java. When an object has a strong reference pointing to it, the garbage collector will not reclaim its memory. F...

Comprehensive Guide to SSTI Explained with Payload Bypass Techniques

Introduction Server-Side Template Injection (SSTI) is a vulnerability in web applications where user input is improper handled within the template engine and executed on the server. This exploit can r...

Implement Image Upload Functionality for Django Integrated TinyMCE Editor

Django’s Admin panel is highly user-friendly, and pairing it with TinyMCE, an effective rich text editor, simplifies content management significantly. Combining the two is particular useful for bloggi...

Leave a Comment

Anonymous

◎Feel free to join the discussion and share your thoughts.