Configuring Kafka Consumer Retry with Maximum Attempts and Backoff Interval
Scenario
When we have a functionality that must operate independently of the database—meaning that if the database goes down, data remains unaffected, and once the database is restored, data is written correctly—the client should experience no disruption. With this clear objective, let's implmeent it.
1. Add Kafka Dependency in pom.xml
Note: Different versions may require adjustments to the retry mechanism.
<!-- Kafka dependency -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.9.13</version>
</dependency>
2. Configure Kafka in application.yml
spring:
kafka:
bootstrap-servers: kafka-address:port
producer:
bootstrap-servers: kafka-address:port
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
retries: 3
acks: 0
consumer:
group-id: kafka-pm-group
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
3. Configure Consumer Retry Mechanism
First, define the retry parameters in application.yml:
mq:
kafkaConfig:
# Maximum retry attempts
max-attempts: 3
# Retry interval in milliseconds
interval: 120000
Then, create a Kafka configuration class:
package com.example.config;
import cn.hutool.core.util.IdUtil;
import com.example.common.util.FileUtils;
import com.example.consumer.kafka.KafkaConsumer;
import com.example.service.KafkaMessageService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.kafka.ConcurrentKafkaListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.CommonErrorHandler;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.util.backoff.BackOff;
import org.springframework.util.backoff.FixedBackOff;
@Slf4j
@Configuration
public class KafkaConfig {
@Value("${mq.config.kafkaConfig.topic}")
private String topic;
@Value("${scheduled.sharding.agentStatePath}")
private String agentStatePath;
@Value("${mq.config.kafkaConfig.interval}")
private Long interval;
@Value("${mq.config.kafkaConfig.max-attempts}")
private Long maxAttempts;
@Bean
public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
ConsumerFactory<Object, Object> kafkaConsumerFactory) {
ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
configurer.configure(factory, kafkaConsumerFactory);
factory.setCommonErrorHandler(commonErrorHandler());
return factory;
}
public CommonErrorHandler commonErrorHandler() {
BackOff backOff = new FixedBackOff(interval, maxAttempts);
return new DefaultErrorHandler((consumerRecord, exception) -> {
// After all retries exhausted, persist the message to a local file
if (consumerRecord.topic().equals(topic)) {
String filePath = agentStatePath + topic + IdUtil.getSnowflakeNextId() + ".txt";
FileUtils.uploadFile(filePath, consumerRecord.value().toString());
}
}, backOff);
}
@Bean
public KafkaConsumer kafkaConsumer(KafkaMessageService kafkaMessageService) {
log.info("Initializing Kafka consumer");
return new KafkaConsumer(kafkaMessageService);
}
}
With this configuration, the retry mechanism is ready. If a consumer fails to process a message, it will retry according to the configured backoff. Once all retries are exhausted, the message is persisted to a local file.
Then, a scheduled task can periodically read these local files and resend the messages to Kafka for consumption until the database is restored and data is written successfully, acheiving a closed-loop solution.