Processing large datasets efficiently often requires parallel execution to maximize throughput. A reusable multi-threaded utility allows developers to focus on business logic while the framework handles thread management and data partitioning.
Response Wrapper Class
The ApiResponse class provides a standardized structure for returning results across all operations:
package com.example.concurrent.model;
import java.io.Serializable;
public class ApiResponse<T> implements Serializable {
private static final long serialVersionUID = 1L;
public static final int STATUS_OK = 1;
public static final int STATUS_PENDING = 0;
public static final int STATUS_ERROR = -1;
private String message = "success";
private int status = STATUS_OK;
private T payload;
public ApiResponse() {}
public ApiResponse(T payload) {
this.payload = payload;
}
public ApiResponse(Throwable error) {
this.message = error.getMessage();
this.status = STATUS_ERROR;
}
public static <T> ApiResponse<T> ok() {
ApiResponse<T> response = new ApiResponse<>();
response.status = STATUS_OK;
return response;
}
public static <T> ApiResponse<T> ok(T data) {
ApiResponse<T> response = new ApiResponse<>();
response.status = STATUS_OK;
response.payload = data;
return response;
}
public static <T> ApiResponse<T> error(String errorMsg) {
ApiResponse<T> response = new ApiResponse<>();
response.status = STATUS_ERROR;
response.message = errorMsg;
return response;
}
public static <T> ApiResponse<T> of(int statusCode, String msg) {
ApiResponse<T> response = new ApiResponse<>();
response.status = statusCode;
response.message = msg;
return response;
}
public ApiResponse<T> withPayload(T data) {
this.payload = data;
return this;
}
public ApiResponse<T> withMessage(String msg) {
this.message = msg;
return this;
}
public T getPayload() { return payload; }
public String getMessage() { return message; }
public int getStatus() { return status; }
}
Task Processor Interface
The DataProcessor interface defines the contract for implementing custom business logic on each data item:
package com.example.concurrent.executor;
import java.util.Map;
public interface DataProcessor<R, D> {
/**
* Process a single data item
* @param item the data item to process
* @param context additional parameters for processing
* @return processing result
*/
R process(D item, Map<String, Object> context);
}
Callable Task Implementation
The ChunkTask class wraps a batch of data items and processes them within a single thread:
package com.example.concurrent.executor;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.example.concurrent.model.ApiResponse;
/**
 * A {@link Callable} that runs the supplied {@link DataProcessor} over one
 * chunk of the dataset on a single worker thread, collecting the per-item
 * results into a list wrapped in an {@link ApiResponse}.
 *
 * @param <D> type of the data items in the chunk
 */
public class ChunkTask<D> implements Callable<ApiResponse<List<ApiResponse<String>>>> {

    private static final Logger LOGGER = LoggerFactory.getLogger(ChunkTask.class);

    /** Identifier used only for log correlation. */
    private final String taskId;
    /** The slice of the dataset this task is responsible for. */
    private final List<D> chunkData;
    /** Shared parameters handed to the processor for every item. */
    private final Map<String, Object> context;
    /** Business logic applied to each item. */
    private final DataProcessor<ApiResponse<String>, D> processor;

    public ChunkTask(String taskId, List<D> chunkData,
                     Map<String, Object> context,
                     DataProcessor<ApiResponse<String>, D> processor) {
        this.taskId = taskId;
        this.chunkData = chunkData;
        this.context = context;
        this.processor = processor;
    }

    /**
     * Processes every item in the chunk sequentially.
     *
     * @return an OK response whose payload is the list of per-item results;
     *         the payload is left {@code null} when the chunk is null/empty
     */
    @Override
    public ApiResponse<List<ApiResponse<String>>> call() {
        // Nothing to do: return an OK response with no payload, matching callers
        // that treat a null payload as "no results".
        if (chunkData == null || chunkData.isEmpty()) {
            return ApiResponse.ok();
        }
        LOGGER.info("Task [{}]: processing {} items", taskId, chunkData.size());
        List<ApiResponse<String>> outcomes = new ArrayList<>();
        int completed = 0;
        for (D item : chunkData) {
            outcomes.add(processor.process(item, context));
            completed++;
            LOGGER.debug("Task [{}]: completed item {}", taskId, completed);
        }
        LOGGER.info("Task [{}]: finished processing {} items", taskId, chunkData.size());
        return ApiResponse.<List<ApiResponse<String>>>ok().withPayload(outcomes);
    }
}
Parallel Batch Executor
The ParallelBatchExecutor class manages thread pool creation, data partitioning, and result aggregation:
package com.example.concurrent.executor;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.example.concurrent.model.ApiResponse;
/**
 * Splits a dataset into contiguous chunks, processes each chunk on its own
 * worker thread via {@link ChunkTask}, and aggregates the per-item results.
 *
 * <p>A fresh thread pool is created per {@link #execute} call and is always
 * shut down before returning, even if result retrieval fails.
 *
 * @param <T> type of the items in the dataset
 */
public class ParallelBatchExecutor<T> {

    private static final Logger LOGGER = LoggerFactory.getLogger(ParallelBatchExecutor.class);
    private static final int DEFAULT_POOL_SIZE = 5;

    /** Maximum number of worker threads; always positive. */
    private final int poolSize;

    private ParallelBatchExecutor(int poolSize) {
        // Fall back to the default for non-positive requests instead of failing.
        this.poolSize = poolSize > 0 ? poolSize : DEFAULT_POOL_SIZE;
    }

    /**
     * @param threadCount desired worker count; non-positive values use the default
     * @return an executor with the given parallelism
     */
    public static <T> ParallelBatchExecutor<T> create(int threadCount) {
        return new ParallelBatchExecutor<>(threadCount);
    }

    /** @return an executor with the default parallelism ({@value #DEFAULT_POOL_SIZE}) */
    public static <T> ParallelBatchExecutor<T> create() {
        return new ParallelBatchExecutor<>(DEFAULT_POOL_SIZE);
    }

    /**
     * Partitions {@code dataset} into up to {@code poolSize} chunks, runs the
     * processor over each chunk in parallel, and returns all per-item results.
     *
     * @param dataset   items to process; null or empty yields an empty result list
     * @param context   shared parameters passed to every processor invocation
     * @param processor business logic applied to each item (must be thread-safe)
     * @return an OK response whose payload aggregates every chunk's results
     */
    public ApiResponse<List<ApiResponse<String>>> execute(
            List<T> dataset,
            Map<String, Object> context,
            DataProcessor<ApiResponse<String>, T> processor) {
        List<ApiResponse<String>> allResults = new ArrayList<>();
        // Guard against null/empty input: nothing to partition, no pool needed.
        if (dataset == null || dataset.isEmpty()) {
            return ApiResponse.ok(allResults);
        }
        long startTime = System.currentTimeMillis();
        int totalItems = dataset.size();
        // Never create more tasks (or threads) than there are items, so no
        // worker ever receives an empty chunk.
        int taskCount = Math.min(poolSize, totalItems);
        int chunkSize = totalItems / taskCount;
        ExecutorService executor = Executors.newFixedThreadPool(taskCount);
        // Fully parameterized (no raw types), so no unchecked assignment below.
        CompletionService<ApiResponse<List<ApiResponse<String>>>> completionService =
                new ExecutorCompletionService<>(executor);
        try {
            for (int i = 0; i < taskCount; i++) {
                int fromIndex = i * chunkSize;
                // The last chunk absorbs the division remainder.
                int toIndex = (i == taskCount - 1) ? totalItems : (i + 1) * chunkSize;
                List<T> chunk = dataset.subList(fromIndex, toIndex);
                completionService.submit(
                        new ChunkTask<>("worker-" + i, chunk, context, processor));
            }
            for (int i = 0; i < taskCount; i++) {
                try {
                    ApiResponse<List<ApiResponse<String>>> chunkResult =
                            completionService.take().get();
                    if (chunkResult.getPayload() != null) {
                        allResults.addAll(chunkResult.getPayload());
                    }
                } catch (InterruptedException e) {
                    // Restore the interrupt flag and stop waiting for more results.
                    Thread.currentThread().interrupt();
                    LOGGER.error("Interrupted while retrieving task result", e);
                    break;
                } catch (ExecutionException e) {
                    // One failed chunk should not discard the others' results.
                    LOGGER.error("Error retrieving task result", e);
                }
            }
        } finally {
            // Always release the pool, even if submission or retrieval threw.
            executor.shutdownNow();
        }
        long duration = System.currentTimeMillis() - startTime;
        LOGGER.info("Batch processing completed in {} ms", duration);
        return ApiResponse.ok(allResults);
    }
}
Usage Example
The following example demonstrates processing a list of integers with custom business logic:
package com.example.concurrent.demo;
import java.util.*;
import com.example.concurrent.executor.*;
import com.example.concurrent.model.ApiResponse;
/**
 * Demo processor: adds a configurable increment (read from the shared context
 * under the key {@code "increment"}) to each integer and returns the sum as a
 * string wrapped in an {@link ApiResponse}.
 */
public class NumberProcessorDemo implements DataProcessor<ApiResponse<String>, Integer> {

    /** Adds the context's increment (default 0) to {@code number}. */
    @Override
    public ApiResponse<String> process(Integer number, Map<String, Object> context) {
        Object raw = context.getOrDefault("increment", 0);
        int shifted = number + (Integer) raw;
        return ApiResponse.ok(String.valueOf(shifted));
    }

    /** Runs 10,000 integers through a 5-thread batch executor and prints the count. */
    public static void main(String[] args) {
        List<Integer> input = new ArrayList<>();
        int value = 1;
        while (value <= 10000) {
            input.add(value);
            value++;
        }
        Map<String, Object> settings = new HashMap<>();
        settings.put("increment", 10);
        DataProcessor<ApiResponse<String>, Integer> logic = new NumberProcessorDemo();
        ApiResponse<List<ApiResponse<String>>> outcome =
                ParallelBatchExecutor.<Integer>create(5).execute(input, settings, logic);
        System.out.println("Processed " + outcome.getPayload().size() + " items");
    }
}
The utility partitions the input list into chunks based on the configured thread count, distributes each chunk to a separate thread, and aggregates all results before returning. This approach significantly reduces processing time for large datasets while maintaining clean separation between infrastructure and business logic.