Fading Coder

One Final Commit for the Last Sprint


Configuring Kafka Consumer Retry with Maximum Attempts and Backoff Interval

Tech · May 7

Scenario

Suppose a feature must keep working even when the database is unavailable: if the database goes down, no data may be lost, and once it is restored, the pending data must be written correctly, all without the client noticing any disruption. With this objective in mind, let's implement it.

1. Add Kafka Dependency in pom.xml

Note: Different versions may require adjustments to the retry mechanism.

<!-- Kafka dependency -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.9.13</version>
</dependency>

2. Configure Kafka in application.yml

spring:
  kafka:
    bootstrap-servers: kafka-address:port
    producer:
      bootstrap-servers: kafka-address:port
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      retries: 3
      # acks: 0 is fire-and-forget: the producer never learns of failures,
      # so the retries setting has no effect. Use acks: 1 (or all) when
      # delivery matters, as it does in this scenario.
      acks: 1
    consumer:
      group-id: kafka-pm-group
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
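With the producer configured, publishing a message is a one-liner through KafkaTemplate, which Spring Boot auto-configures from the settings above. A minimal sketch (the service and method names are illustrative, not from the original project):

```java
package com.example.producer;

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

// Hypothetical producer service; the KafkaTemplate is built from the
// spring.kafka.producer settings in application.yml.
@Service
public class MessageProducer {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public MessageProducer(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void send(String topic, String payload) {
        // send() is asynchronous; call .get() on the returned future
        // if the caller needs to block until the broker acknowledges.
        kafkaTemplate.send(topic, payload);
    }
}
```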

3. Configure Consumer Retry Mechanism

First, define the retry parameters in application.yml:

mq:
  config:
    kafkaConfig:
      # Topic whose failed records are persisted (placeholder value;
      # the configuration class below reads this property)
      topic: your-topic
      # Maximum retry attempts
      max-attempts: 3
      # Retry interval in milliseconds
      interval: 120000
scheduled:
  sharding:
    # Directory for persisting failed messages (placeholder value)
    agentStatePath: /path/to/failed-messages/

Then, create a Kafka configuration class:

package com.example.config;

import cn.hutool.core.util.IdUtil;
import com.example.common.util.FileUtils;
import com.example.consumer.kafka.KafkaConsumer;
import com.example.service.KafkaMessageService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.kafka.ConcurrentKafkaListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.CommonErrorHandler;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.util.backoff.BackOff;
import org.springframework.util.backoff.FixedBackOff;

@Slf4j
@Configuration
public class KafkaConfig {

    @Value("${mq.config.kafkaConfig.topic}")
    private String topic;

    @Value("${scheduled.sharding.agentStatePath}")
    private String agentStatePath;

    @Value("${mq.config.kafkaConfig.interval}")
    private Long interval;

    @Value("${mq.config.kafkaConfig.max-attempts}")
    private Long maxAttempts;

    @Bean
    public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
            ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
            ConsumerFactory<Object, Object> kafkaConsumerFactory) {
        ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        configurer.configure(factory, kafkaConsumerFactory);
        factory.setCommonErrorHandler(commonErrorHandler());
        return factory;
    }

    public CommonErrorHandler commonErrorHandler() {
        BackOff backOff = new FixedBackOff(interval, maxAttempts);
        return new DefaultErrorHandler((consumerRecord, exception) -> {
            // After all retries exhausted, persist the message to a local file
            if (consumerRecord.topic().equals(topic)) {
                String filePath = agentStatePath + topic + IdUtil.getSnowflakeNextId() + ".txt";
                FileUtils.uploadFile(filePath, consumerRecord.value().toString());
            }
        }, backOff);
    }

    @Bean
    public KafkaConsumer kafkaConsumer(KafkaMessageService kafkaMessageService) {
        log.info("Initializing Kafka consumer");
        return new KafkaConsumer(kafkaMessageService);
    }
}

With this configuration, the retry mechanism is in place. If the consumer throws while processing a message, the container redelivers it according to the configured backoff. Note that FixedBackOff(interval, maxAttempts) counts retries after the initial failure, so max-attempts: 3 means up to four deliveries in total. Once all retries are exhausted, the recoverer persists the message to a local file.
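One detail the retry depends on: the listener method must let exceptions escape, because DefaultErrorHandler only treats a record as failed when the listener throws. A plain-Java sketch of that contract (class and interface names here are illustrative; the real listener method would carry @KafkaListener):

```java
// Illustrative stand-in for the real service that writes to the database.
interface KafkaMessageService {
    void save(String message);
}

// Plain-Java sketch of the listener contract. In the real consumer this
// method would be annotated with
// @KafkaListener(topics = "${mq.config.kafkaConfig.topic}").
class MessageListener {

    private final KafkaMessageService service;

    MessageListener(KafkaMessageService service) {
        this.service = service;
    }

    public void onMessage(String message) {
        // No try/catch here: if the database is down, the exception must
        // reach the listener container so DefaultErrorHandler can schedule
        // the retries with the configured backoff.
        service.save(message);
    }
}
```

Swallowing the exception inside the listener would mark the record as consumed and silently drop it, defeating the whole setup.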

A scheduled task can then periodically read these local files and resend the messages to Kafka until the database is restored and the data is written successfully, achieving a closed-loop solution.
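The replay step can be sketched as a small class whose Kafka dependency is abstracted behind a BiConsumer, so the file-scanning logic can be exercised without a broker. In the real task the sender would be kafkaTemplate::send and the method would run under @Scheduled; all names here are illustrative:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Sketch of the scheduled replay job. In production, sender would be
// kafkaTemplate::send and replayOnce() would be invoked by @Scheduled.
public class RetryFileReplayer {

    private final Path dir;          // agentStatePath from application.yml
    private final String topic;
    private final BiConsumer<String, String> sender; // (topic, payload)

    public RetryFileReplayer(Path dir, String topic, BiConsumer<String, String> sender) {
        this.dir = dir;
        this.topic = topic;
        this.sender = sender;
    }

    /** Resends every persisted .txt file; deletes a file only after hand-off. */
    public int replayOnce() throws IOException {
        List<Path> pending;
        try (Stream<Path> files = Files.list(dir)) {
            pending = files.filter(p -> p.toString().endsWith(".txt"))
                           .collect(Collectors.toList());
        }
        int replayed = 0;
        for (Path file : pending) {
            // If the send throws, the file survives for the next run.
            sender.accept(topic, Files.readString(file));
            Files.delete(file);
            replayed++;
        }
        return replayed;
    }
}
```

Deleting a file only after a successful hand-off gives at-least-once delivery, so the downstream consumer must tolerate the occasional duplicate.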

Tags: Kafka
