Fading Coder

One Final Commit for the Last Sprint

Home > Tech > Content

Building a Java Utility for Parallel Batch Data Processing

Tech May 15 1
Processing large datasets efficiently often requires parallel execution to maximize throughput. A reusable multi-threaded utility allows developers to focus on business logic while the framework handles thread management and data partitioning.

Response Wrapper Class

The ApiResponse class provides a standardized structure for returning results across all operations:
package com.example.concurrent.model;

import java.io.Serializable;

/**
 * Generic, serializable envelope for operation results.
 *
 * <p>Carries a status code, a human-readable message and an optional
 * typed payload. Instances are built either through the constructors or
 * the static factories ({@link #ok()}, {@link #error(String)},
 * {@link #of(int, String)}) and can be refined fluently via
 * {@link #withPayload(Object)} and {@link #withMessage(String)}.
 *
 * @param <T> type of the wrapped payload
 */
public class ApiResponse<T> implements Serializable {
    
    private static final long serialVersionUID = 1L;
    
    /** Operation finished successfully. */
    public static final int STATUS_OK = 1;
    /** Operation is still in progress. */
    public static final int STATUS_PENDING = 0;
    /** Operation failed. */
    public static final int STATUS_ERROR = -1;
    
    // Defaults describe a successful, payload-less response.
    private String message = "success";
    private int status = STATUS_OK;
    private T payload;
    
    /** Creates a successful response with no payload. */
    public ApiResponse() {}
    
    /** Creates a successful response wrapping {@code payload}. */
    public ApiResponse(T payload) {
        this.payload = payload;
    }
    
    /** Creates an error response whose message is taken from {@code error}. */
    public ApiResponse(Throwable error) {
        this.status = STATUS_ERROR;
        this.message = error.getMessage();
    }
    
    /** @return a successful response with the default message and no payload */
    public static <T> ApiResponse<T> ok() {
        return of(STATUS_OK, "success");
    }
    
    /** @return a successful response carrying {@code data} */
    public static <T> ApiResponse<T> ok(T data) {
        return new ApiResponse<>(data);
    }
    
    /** @return an error response carrying {@code errorMsg} */
    public static <T> ApiResponse<T> error(String errorMsg) {
        return of(STATUS_ERROR, errorMsg);
    }
    
    /** @return a response with an arbitrary status code and message */
    public static <T> ApiResponse<T> of(int statusCode, String msg) {
        ApiResponse<T> built = new ApiResponse<>();
        built.status = statusCode;
        built.message = msg;
        return built;
    }
    
    /** Sets the payload and returns {@code this} for chaining. */
    public ApiResponse<T> withPayload(T data) {
        this.payload = data;
        return this;
    }
    
    /** Sets the message and returns {@code this} for chaining. */
    public ApiResponse<T> withMessage(String msg) {
        this.message = msg;
        return this;
    }
    
    public T getPayload() { return payload; }
    public String getMessage() { return message; }
    public int getStatus() { return status; }
}

Task Processor Interface

The DataProcessor interface defines the contract for implementing custom business logic on each data item:
package com.example.concurrent.executor;

import java.util.Map;

/**
 * Contract for the per-item business logic applied by the batch framework.
 *
 * <p>Declared {@code @FunctionalInterface} so implementations can be
 * supplied as lambdas or method references as well as classes.
 *
 * @param <R> result type produced for each item
 * @param <D> type of the data items being processed
 */
@FunctionalInterface
public interface DataProcessor<R, D> {
    
    /**
     * Process a single data item
     * @param item the data item to process
     * @param context additional parameters for processing
     * @return processing result
     */
    R process(D item, Map<String, Object> context);
}

Callable Task Implementation

The ChunkTask class wraps a batch of data items and processes them within a single thread:
package com.example.concurrent.executor;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.example.concurrent.model.ApiResponse;

/**
 * Callable that applies a {@link DataProcessor} to one chunk of the dataset.
 *
 * <p>Each instance runs on a single worker thread and returns an
 * {@link ApiResponse} whose payload lists the per-item results in input
 * order. An empty or missing chunk yields a successful response with no
 * payload.
 *
 * @param <D> type of the items in the chunk
 */
public class ChunkTask<D> implements Callable<ApiResponse<List<ApiResponse<String>>>> {
    
    private static final Logger LOGGER = LoggerFactory.getLogger(ChunkTask.class);
    
    // Identifier used to tag this worker's log output.
    private final String taskId;
    // The slice of the dataset assigned to this task.
    private final List<D> chunkData;
    // Shared parameters forwarded to the processor for every item.
    private final Map<String, Object> context;
    // Business-logic callback applied to each item.
    private final DataProcessor<ApiResponse<String>, D> processor;
    
    public ChunkTask(String taskId, List<D> chunkData, 
                     Map<String, Object> context,
                     DataProcessor<ApiResponse<String>, D> processor) {
        this.taskId = taskId;
        this.chunkData = chunkData;
        this.context = context;
        this.processor = processor;
    }
    
    /**
     * Runs the processor over every item in this task's chunk.
     *
     * @return a successful response whose payload holds one result per item
     */
    @Override
    public ApiResponse<List<ApiResponse<String>>> call() {
        // Guard clause: nothing to do for a missing or empty chunk.
        if (chunkData == null || chunkData.isEmpty()) {
            return ApiResponse.ok();
        }
        
        LOGGER.info("Task [{}]: processing {} items", taskId, chunkData.size());
        
        List<ApiResponse<String>> itemResults = new ArrayList<>(chunkData.size());
        int processed = 0;
        for (D element : chunkData) {
            itemResults.add(processor.process(element, context));
            processed++;
            LOGGER.debug("Task [{}]: completed item {}", taskId, processed);
        }
        
        LOGGER.info("Task [{}]: finished processing {} items", taskId, chunkData.size());
        return ApiResponse.ok(itemResults);
    }
}

Parallel Batch Executor

The ParallelBatchExecutor manages thread pool creation, data partitioning, and result aggregation:
package com.example.concurrent.executor;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.example.concurrent.model.ApiResponse;

/**
 * Splits a dataset into chunks, processes them on a fixed thread pool and
 * aggregates the per-item results.
 *
 * <p>Fixes over the naive version: fully parameterized (no raw)
 * {@code CompletionService}/{@code Future} types, guaranteed pool shutdown
 * via try/finally, interrupt-status restoration on
 * {@link InterruptedException}, and a guard for a null/empty dataset.
 *
 * @param <T> type of the items to process
 */
public class ParallelBatchExecutor<T> {
    
    private static final Logger LOGGER = LoggerFactory.getLogger(ParallelBatchExecutor.class);
    private static final int DEFAULT_POOL_SIZE = 5;
    
    // Number of worker threads; always positive after construction.
    private final int poolSize;
    
    private ParallelBatchExecutor(int poolSize) {
        // Fall back to the default when the caller passes a non-positive size.
        this.poolSize = poolSize > 0 ? poolSize : DEFAULT_POOL_SIZE;
    }
    
    /** @return an executor using {@code threadCount} threads (default if non-positive) */
    public static <T> ParallelBatchExecutor<T> create(int threadCount) {
        return new ParallelBatchExecutor<>(threadCount);
    }
    
    /** @return an executor using the default pool size */
    public static <T> ParallelBatchExecutor<T> create() {
        return new ParallelBatchExecutor<>(DEFAULT_POOL_SIZE);
    }
    
    /**
     * Partitions {@code dataset} into {@code poolSize} chunks, processes each
     * chunk on its own thread and collects all per-item results.
     *
     * @param dataset   items to process; may be null or empty
     * @param context   shared parameters passed to the processor for each item
     * @param processor per-item business logic
     * @return a successful response whose payload aggregates every chunk's results
     */
    public ApiResponse<List<ApiResponse<String>>> execute(
            List<T> dataset, 
            Map<String, Object> context,
            DataProcessor<ApiResponse<String>, T> processor) {
        
        // Guard: no data means no threads to spin up and nothing to aggregate.
        if (dataset == null || dataset.isEmpty()) {
            return ApiResponse.ok(new ArrayList<>());
        }
        
        long startTime = System.currentTimeMillis();
        List<ApiResponse<String>> allResults = new ArrayList<>(dataset.size());
        ExecutorService executor = Executors.newFixedThreadPool(poolSize);
        
        try {
            // Fully typed completion service — avoids raw types and unchecked casts.
            CompletionService<ApiResponse<List<ApiResponse<String>>>> completionService =
                new ExecutorCompletionService<>(executor);
            
            int totalItems = dataset.size();
            // Integer division: when totalItems < poolSize this is 0 and the
            // last worker absorbs everything; subList handles empty ranges.
            int chunkSize = totalItems / poolSize;
            
            for (int i = 0; i < poolSize; i++) {
                int fromIndex = i * chunkSize;
                // The last worker takes the remainder when the division isn't exact.
                int toIndex = (i == poolSize - 1) ? totalItems : (i + 1) * chunkSize;
                
                List<T> chunk = dataset.subList(fromIndex, toIndex);
                completionService.submit(
                    new ChunkTask<>("worker-" + i, chunk, context, processor));
            }
            
            // Collect exactly one result per submitted task, in completion order.
            for (int i = 0; i < poolSize; i++) {
                try {
                    Future<ApiResponse<List<ApiResponse<String>>>> future =
                        completionService.take();
                    ApiResponse<List<ApiResponse<String>>> chunkResult = future.get();
                    if (chunkResult.getPayload() != null) {
                        allResults.addAll(chunkResult.getPayload());
                    }
                } catch (InterruptedException e) {
                    // Restore the interrupt flag so callers can observe cancellation,
                    // and stop waiting for further results.
                    Thread.currentThread().interrupt();
                    LOGGER.error("Error retrieving task result", e);
                    break;
                } catch (ExecutionException e) {
                    LOGGER.error("Error retrieving task result", e);
                }
            }
        } finally {
            // Always release the pool, even if submission or retrieval fails.
            executor.shutdown();
        }
        
        long duration = System.currentTimeMillis() - startTime;
        LOGGER.info("Batch processing completed in {} ms", duration);
        
        return ApiResponse.ok(allResults);
    }
}

Usage Example

The following example demonstrates processing a list of integers with custom business logic:
package com.example.concurrent.demo;

import java.util.*;
import com.example.concurrent.executor.*;
import com.example.concurrent.model.ApiResponse;

/**
 * Demo {@link DataProcessor}: adds a configurable increment to each integer
 * and returns the sum as a string payload.
 */
public class NumberProcessorDemo implements DataProcessor<ApiResponse<String>, Integer> {
    
    /**
     * Adds the "increment" context value (0 when absent) to {@code number}.
     *
     * @return a successful response whose payload is the sum as a string
     */
    @Override
    public ApiResponse<String> process(Integer number, Map<String, Object> context) {
        int offset = (Integer) context.getOrDefault("increment", 0);
        return ApiResponse.ok(String.valueOf(number + offset));
    }
    
    /** Processes the integers 1..10000 on five threads and prints the result count. */
    public static void main(String[] args) {
        List<Integer> inputs = new ArrayList<>();
        for (int value = 1; value <= 10000; value++) {
            inputs.add(value);
        }
        
        Map<String, Object> settings = new HashMap<>();
        settings.put("increment", 10);
        
        DataProcessor<ApiResponse<String>, Integer> logic = new NumberProcessorDemo();
        ParallelBatchExecutor<Integer> batch = ParallelBatchExecutor.create(5);
        
        ApiResponse<List<ApiResponse<String>>> outcome = batch.execute(inputs, settings, logic);
        
        System.out.println("Processed " + outcome.getPayload().size() + " items");
    }
}
The utility partitions the input list into chunks based on the configured thread count, distributes each chunk to a separate thread, and aggregates all results before returning. This approach significantly reduces processing time for large datasets while maintaining clean separation between infrastructure and business logic.

Related Articles

Understanding Strong and Weak References in Java

Strong References Strong references are the most prevalent type of object reference in Java. When an object has a strong reference pointing to it, the garbage collector will not reclaim its memory. F...

Implement Image Upload Functionality for Django Integrated TinyMCE Editor

Django’s Admin panel is highly user-friendly, and pairing it with TinyMCE, an effective rich text editor, simplifies content management significantly. Combining the two is particularly useful for bloggi...

SBUS Signal Analysis and Communication Implementation Using STM32 with Fus Remote Controller

Overview In a recent project, I utilized the SBUS protocol with the Fus remote controller to control a vehicle's basic operations, including movement, lights, and mode switching. This article is aimed...

Leave a Comment

Anonymous

◎Feel free to join the discussion and share your thoughts.