Fading Coder

An Old Coder’s Final Dance


Concurrent Batch Processing of Kafka Messages with Spring Boot


Introduction

Kafka is a powerful message middleware that is frequently used in software development to decouple upstream and downstream processes or to absorb uneven traffic loads. While Kafka excels at write performance, the rate at which data is consumed depends largely on the efficiency of the consumer application. When processing speed fails to keep up with incoming messages, the result is queue congestion. Resolving this by clearing the entire topic is usually impractical, as other services may rely on the same topic. A more common solution is to parallelize consumption through consumer groups. However, this approach requires the topic to have more than one partition; otherwise, additional consumers in the group simply sit idle.

This guide demonstrates how to implement concurrent and batch consumption in Spring Boot using the Spring Kafka framework.


Step 1: Configuring Concurrent Message Consumption

To enable concurrent consumption, use the ConcurrentKafkaListenerContainerFactory and designate a concurrency level corresponding to the number of partitions in your topic. This ensures that multiple Kafka listener containers are instantiated, each capable of processing messages from distinct partitions.

Implementation Example

@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> containerFactory = new ConcurrentKafkaListenerContainerFactory<>();
    containerFactory.setConsumerFactory(consumerFactory());
    containerFactory.setConcurrency(4);  // Assume the topic has 4 partitions
    containerFactory.setBatchListener(true);
    containerFactory.getContainerProperties().setPollTimeout(3000);
    return containerFactory;
}

Alternatively, you can configure this using application properties:

spring.kafka.listener.concurrency=4

With this setup, the @KafkaListener annotation will automatically adapt for concurrent consumption.
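A listener wired to this factory might look like the sketch below. Since setBatchListener(true) is enabled, the method receives the whole poll result as a list rather than a single record. The topic name, group id, and the containerFactory reference are illustrative assumptions matching the configuration above, not fixed values.

```java
// Hypothetical batch listener; topic and group id are placeholders.
@KafkaListener(topics = "example-topic", groupId = "your-consumer-group-id",
        containerFactory = "kafkaContainerFactory")
public void consumeBatch(List<String> messages) {
    // With a batch listener, each invocation delivers up to
    // max.poll.records messages from a single poll.
    for (String message : messages) {
        System.out.println("Processing: " + message);
    }
}
```

With concurrency set to 4, Spring Kafka runs four such listener containers in parallel, each bound to its own subset of partitions.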


Step 2: Enabling Batch Processing

Batch processing allows consumers to fetch multiple records in a single polling operation, reducing the overhead associated with frequent polling.

Key Configuration:

  1. Call setBatchListener(true) on the ConcurrentKafkaListenerContainerFactory bean.
  2. Configure the maximum number of records per batch with the MAX_POLL_RECORDS_CONFIG property.

Consumer Configuration Example

@Bean
public Map<String, Object> consumerConfig() {
    Map<String, Object> configProps = new HashMap<>();
    configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "your.broker.address");
    // Commit offsets automatically every 100 ms
    configProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
    configProps.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
    // Broker considers the consumer dead if no heartbeat arrives within 15 s
    configProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
    configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringDeserializer.class);
    configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringDeserializer.class);
    configProps.put(ConsumerConfig.GROUP_ID_CONFIG, "your-consumer-group-id");
    // Start from the earliest offset when no committed offset exists
    configProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    configProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50); // Max number of records per poll
    return configProps;
}

@Bean
public ConsumerFactory<String, String> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerConfig());
}

Points to Note:

  • MAX_POLL_RECORDS_CONFIG determines the upper limit of records fetched in a single poll, but it doesn’t guarantee that the consumer will always receive this number of records.
  • The max.poll.interval.ms setting defines the allowable interval between subsequent poll operations. If the consumer doesn’t trigger poll() within this duration, it may be considered inactive, triggering a partition reassignment.
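If you prefer to keep these settings out of Java code, both can also be expressed as Spring Boot application properties. The values below are illustrative, and the bracketed-key syntax is the documented way to set raw Kafka properties in a .properties file:

```properties
# Upper limit of records fetched per poll
spring.kafka.consumer.max-poll-records=50
# Maximum allowed gap between poll() calls before rebalancing (5 minutes)
spring.kafka.consumer.properties[max.poll.interval.ms]=300000
```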

Step 3: Partition-Level Consumption

For topics with multiple partitions, dividing the workload across separate listeners can improve processing speed. If a topic has only one partition, such techniques are not effective, as there is no partition-level parallelism to exploit.

Partition-Based Listener Example

import java.util.List;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.TopicPartition;
import org.springframework.stereotype.Component;

@Component
public class PartitionListeners {

    private static final String TOPIC_NAME = "example-topic";

    @KafkaListener(id = "partitionListener0", topicPartitions = { @TopicPartition(topic = TOPIC_NAME, partitions = { "0" }) })
    public void handlePartition0(List<ConsumerRecord<?, ?>> records) {
        processRecords(records, "Partition 0");
    }

    @KafkaListener(id = "partitionListener1", topicPartitions = { @TopicPartition(topic = TOPIC_NAME, partitions = { "1" }) })
    public void handlePartition1(List<ConsumerRecord<?, ?>> records) {
        processRecords(records, "Partition 1");
    }

    private void processRecords(List<ConsumerRecord<?, ?>> records, String partitionInfo) {
        for (ConsumerRecord<?, ?> record : records) {
            String message = record.value() != null ? record.value().toString() : "null";
            String topic = record.topic();
            System.out.printf("[%s] Received message: %s from topic: %s%n", partitionInfo, message, topic);
        }
    }
}

Explanation:

  • Each @KafkaListener listens to a specific partition of a topic.
  • By setting up individual listeners for each partition, you can achieve concurrent consumption and distribute processing workload.

Partition and Concurrency Relations

As noted in Kafka’s official documentation:

  • If the number of TopicPartitions is evenly divisible by the configured concurrency level, each container gets an equal share of partitions.
  • If they don’t divide evenly, some containers may receive more partitions than others.
  • When the configured concurrency exceeds the number of partitions, the effective concurrency is reduced to match the partition count.
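These distribution rules can be sketched as a small standalone calculation. This is an approximation for illustration, not Spring Kafka's actual assignment code: each container gets partitions/containers partitions, the first (partitions % containers) containers get one extra, and concurrency is capped at the partition count.

```java
import java.util.Arrays;

public class PartitionDistribution {

    // Approximates how partitions spread across listener containers.
    static int[] distribute(int partitions, int concurrency) {
        int containers = Math.min(concurrency, partitions); // concurrency is capped
        int[] counts = new int[containers];
        for (int i = 0; i < containers; i++) {
            // Base share, plus one extra for the first (partitions % containers) containers
            counts[i] = partitions / containers + (i < partitions % containers ? 1 : 0);
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(distribute(8, 4))); // evenly divisible: [2, 2, 2, 2]
        System.out.println(Arrays.toString(distribute(5, 3))); // uneven split: [2, 2, 1]
        System.out.println(Arrays.toString(distribute(2, 4))); // concurrency capped: [1, 1]
    }
}
```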

Conclusion

By combining concurrent and batch consumption with partition-level listeners, you can significantly improve Kafka message processing throughput in Spring Boot applications. These strategies work well for topics with multiple partitions, but their impact is limited for single-partition topics, where partition-level parallelism isn't available. Proper configuration of concurrency, polling intervals, and record limits enables smooth handling of high message volumes without creating bottlenecks.

Tags: Kafka

