Concurrent Batch Processing of Kafka Messages with Spring Boot
Introduction
Kafka is a powerful message middleware that is frequently used in software development to decouple upstream and downstream processes or manage uneven traffic loads effectively. While Kafka excels in write performance, the rate at which data is consumed largely depends on the efficiency of the consumer application. When processing speed fails to keep up with incoming messages, it can result in queue congestion. Resolving this issue by clearing the entire topic is often impractical, as other services may rely on the same topic. A more common solution involves parallelizing message consumption via consumer groups. However, this method requires more than one partition for the topic; otherwise, adding multiple consumers won’t yield significant improvements.
This guide demonstrates how to implement concurrent and batch consumption in Spring Boot using the Spring Kafka framework.
Step 1: Configuring Concurrent Message Consumption
To enable concurrent consumption, use the ConcurrentKafkaListenerContainerFactory and designate a concurrency level corresponding to the number of partitions in your topic. This ensures that multiple Kafka listener containers are instantiated, each capable of processing messages from distinct partitions.
Implementation Example
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> containerFactory = new ConcurrentKafkaListenerContainerFactory<>();
containerFactory.setConsumerFactory(consumerFactory());
containerFactory.setConcurrency(4); // Assume the topic has 4 partitions
containerFactory.setBatchListener(true);
containerFactory.getContainerProperties().setPollTimeout(3000);
return containerFactory;
}
Alternatively, you can configure this using application properties:
spring.kafka.listener.concurrency=4
With this setup, the @KafkaListener annotation will automatically adapt for concurrent consumption.
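With the factory above registered, a listener method can receive an entire poll's worth of records as a list. A minimal sketch, assuming the kafkaContainerFactory bean defined above; the topic name and group id here are illustrative placeholders:

```java
import java.util.List;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class BatchMessageListener {

    // "example-topic" and the group id are illustrative, not values from this guide's setup.
    @KafkaListener(topics = "example-topic",
                   groupId = "your-consumer-group-id",
                   containerFactory = "kafkaContainerFactory")
    public void onMessages(List<String> messages) {
        // Because the factory calls setBatchListener(true), each invocation
        // receives the records returned by a single poll.
        for (String message : messages) {
            System.out.println("Received: " + message);
        }
    }
}
```

Since the factory's concurrency is 4, Spring creates four such listener containers, each consuming from its own subset of partitions.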
Step 2: Enabling Batch Processing
Batch processing allows consumers to fetch multiple records in a single polling operation, reducing the overhead associated with frequent polling.
Key Configuration:
- Set factory.setBatchListener(true) in the ConcurrentKafkaListenerContainerFactory bean.
- Configure the maximum number of records per batch with the MAX_POLL_RECORDS_CONFIG property.
Consumer Configuration Example
@Bean
public Map<String, Object> consumerConfig() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "your.broker.address");
configProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
configProps.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
configProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringDeserializer.class);
configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringDeserializer.class);
configProps.put(ConsumerConfig.GROUP_ID_CONFIG, "your-consumer-group-id");
configProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
configProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50); // Max number of records per poll
return configProps;
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfig());
}
Points to Note:
- MAX_POLL_RECORDS_CONFIG determines the upper limit of records fetched in a single poll, but it doesn't guarantee that the consumer will always receive this number of records.
- The max.poll.interval.ms setting defines the maximum allowed interval between subsequent poll operations. If the consumer doesn't call poll() within this duration, it may be considered inactive, triggering a partition reassignment.
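If each batch takes a long time to process, max.poll.interval.ms can be raised alongside the record limit. A sketch of the additional entries for the consumerConfig() map above; the values are illustrative, not recommendations:

```java
// Illustrative values; tune them to your actual per-batch processing time.
configProps.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000); // allow up to 5 minutes between polls
configProps.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500);       // how long the broker may wait to fill a fetch
```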
Step 3: Partition-Level Consumption
For topics with multiple partitions, dividing the workload across separate listeners can improve processing speed. If a topic has only one partition, such techniques are not effective, as there is no partition-level parallelism to exploit.
Partition-Based Listener Example
@Component
public class PartitionListeners {
private static final String TOPIC_NAME = "example-topic";
@KafkaListener(id = "partitionListener0", topicPartitions = { @TopicPartition(topic = TOPIC_NAME, partitions = { "0" }) })
public void handlePartition0(List<ConsumerRecord<?, ?>> records) {
processRecords(records, "Partition 0");
}
@KafkaListener(id = "partitionListener1", topicPartitions = { @TopicPartition(topic = TOPIC_NAME, partitions = { "1" }) })
public void handlePartition1(List<ConsumerRecord<?, ?>> records) {
processRecords(records, "Partition 1");
}
private void processRecords(List<ConsumerRecord<?, ?>> records, String partitionInfo) {
for (ConsumerRecord<?, ?> record : records) {
String message = record.value() != null ? record.value().toString() : "null";
String topic = record.topic();
System.out.printf("[%s] Received message: %s from topic: %s%n", partitionInfo, message, topic);
}
}
}
Explanation:
- Each @KafkaListener listens to a specific partition of a topic.
- By setting up individual listeners for each partition, you can achieve concurrent consumption and distribute the processing workload.
Partition and Concurrency Relations
As noted in Kafka’s official documentation:
- If the number of TopicPartitions is evenly divisible by the configured concurrency level, each container gets an equal share of partitions.
- If they don't divide evenly, some containers may receive more partitions than others.
- When concurrency exceeds the number of partitions, the concurrency level drops to match the partition count.
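The distribution rules above can be sketched with a small stand-alone calculation. This is plain Java illustrating the arithmetic, not Spring Kafka's actual assignment code:

```java
import java.util.Arrays;

public class PartitionDistribution {

    // Returns how many partitions each container handles, following the rules above:
    // effective concurrency is capped at the partition count, and any remainder
    // partitions are spread one-per-container.
    static int[] distribute(int partitions, int concurrency) {
        int containers = Math.min(concurrency, partitions); // concurrency drops to the partition count
        int[] shares = new int[containers];
        for (int p = 0; p < partitions; p++) {
            shares[p % containers]++; // round-robin: some containers may get one more
        }
        return shares;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(distribute(4, 4))); // evenly divisible: [1, 1, 1, 1]
        System.out.println(Arrays.toString(distribute(5, 2))); // uneven shares:    [3, 2]
        System.out.println(Arrays.toString(distribute(2, 4))); // capped:           [1, 1]
    }
}
```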
Conclusion
By implementing concurrent and batch consumption combined with partition-level listeners, you can significantly enhance Kafka message processing efficiency in Spring Boot applications. While these strategies work well for topics with multiple partitions, their impact is limited for single-partition topics where parallelization isn’t feasible. Proper configuration of concurrency, polling intervals, and record limits enables seamless handling of high-message volumes without causing bottlenecks.