Fading Coder



Java Development: RocketMQ Consumer Message Idempotent Deduplication

Tech · May 15, 2020

A production-ready generic utility class for RocketMQ consumer message idempotent deduplication, with no additional configuration overhead beyond basic setup.

Core Capabilities

  1. Supports using Redis or MySQL as the idempotent record storage layer
  2. Flexible deduplication key options: use business primary keys or default message unique ID (default)
  3. Concurrent duplicate request blocking: prevents simultaneous processing of identical messages before the initial processing completes or fails
  4. Near-exactly-once message delivery semantics; in extreme failure scenarios, falls back to at-least-once delivery to avoid message loss.

Deduplication Principle

Refer to the accompanying flow diagram for a full breakdown of the deduplication workflow.
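The diagram itself is not reproduced here. Under the assumption that the storage layer offers an atomic set-if-absent operation (Redis `SETNX`, or a MySQL `INSERT` against a unique key), the decision flow can be sketched as follows; `DedupFlowSketch` and its method names are illustrative stand-ins, not the utility's actual API, and a `ConcurrentHashMap` plays the role of the external store:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Illustrative sketch of the deduplication flow. A ConcurrentHashMap stands in
// for the real storage layer (Redis SETNX / MySQL unique-key INSERT), both of
// which provide the same atomic "claim" semantics.
class DedupFlowSketch {
    enum Status { CONSUMING, CONSUMED }

    private final ConcurrentMap<String, Status> store = new ConcurrentHashMap<>();

    /** Decides what the consumer should do with a message for this key. */
    String tryConsume(String dedupKey) {
        Status prev = store.putIfAbsent(dedupKey, Status.CONSUMING);
        if (prev == null) {
            return "PROCESS";        // first arrival: run the business logic
        }
        if (prev == Status.CONSUMING) {
            return "RETRY_LATER";    // concurrent duplicate: fail, let RocketMQ retry
        }
        return "ACK_DUPLICATE";      // already consumed: acknowledge without processing
    }

    /** Marks the key as consumed on success, or releases the claim on failure. */
    void finish(String dedupKey, boolean success) {
        if (success) {
            store.put(dedupKey, Status.CONSUMED);
        } else {
            store.remove(dedupKey);  // a later retry may claim and process it again
        }
    }
}
```

The three return values map directly onto the log lines shown in the test scenarios below: normal processing, "already being processed, will retry later", and "duplicate message detected, skipping processing".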

Quick Start

1. Implement the Idempotent Consumer Listener

Extend the base IdempotentConcurrentMessageListener class, and implement the required callbacks for deduplication key generation and business message processing:

import org.apache.rocketmq.common.message.MessageExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DemoIdempotentConsumer extends IdempotentConcurrentMessageListener {

    private static final Logger log = LoggerFactory.getLogger(DemoIdempotentConsumer.class);

    public DemoIdempotentConsumer(IdempotentConsumeConfig idempotentConfig) {
        super(idempotentConfig);
    }

    /**
     * Define the deduplication key for each message
     */
    @Override
    protected String getDeduplicationKey(MessageExt rocketMessage) {
        // Use message body as deduplication key for demo-topic for demonstration; avoid this in production
        if ("demo-topic".equals(rocketMessage.getTopic())) {
            return new String(rocketMessage.getBody());
        }
        // Fall back to default message ID for other topics
        return super.getDeduplicationKey(rocketMessage);
    }

    /**
     * Business logic for message consumption
     */
    @Override
    protected boolean processMessage(MessageExt rocketMessage) {
        switch (rocketMessage.getTopic()) {
            case "demo-topic":
                log.info("Simulating long-running consumption task...{} {}", new String(rocketMessage.getBody()), rocketMessage);
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                break;
        }
        return true;
    }
}

2. Initialize and Start the RocketMQ Consumer

DefaultMQPushConsumer mqPushConsumer = new DefaultMQPushConsumer("demo-consumer-group");
mqPushConsumer.subscribe("demo-topic", "*");

// Idempotent consumption configuration block
String consumerGroup = mqPushConsumer.getConsumerGroup();
// Retrieve Spring's StringRedisTemplate instance (omitted for brevity)
StringRedisTemplate redisStringTemplate = getRedisTemplate();
IdempotentConsumeConfig idempotentConfig = IdempotentConsumeConfig.enableIdempotentConsume(consumerGroup, redisStringTemplate);

DemoIdempotentConsumer messageListener = new DemoIdempotentConsumer(idempotentConfig);
mqPushConsumer.registerMessageListener(messageListener);

mqPushConsumer.start();

Notes:

  1. You must provide valid RocketMQ consumer configuration and retrieve the Redis/JDBC template instance yourself
  2. This utility supports both Redis and MySQL for idempotent storage; see the full demo code for additional usage details.

Test Scenarios

Scenario 1: Ordinary Duplicate Messages

  1. Send a test message with payload test-demo-msg-001 to demo-topic
  2. Send an identical duplicate message after 3+ seconds (longer than the simulated consumption time)

Sample Log Output:

[INFO] 2020-05-15 11:06:17,697 []  >>> Simulating long-running consumption task...test-demo-msg-001 MessageExt [queueId=1, storeSize=169, queueOffset=0, sysFlag=0, bornTimestamp=1589511454575, bornHost=/10.13.32.179:52637, storeTimestamp=1589511454576, storeHost=/10.13.32.179:10911, msgId=0A0D20B300002A9F000000003EEA31B0, commitLogOffset=1055535536, bodyCRC=1038040938, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=demo-topic, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1, CONSUME_START_TIME=1589511977328, UNIQ_KEY=0A0D20B3632A5B2133B14A730F6F014A, WAIT=true}, body=16]]
[INFO] 2020-05-15 11:06:20,748 []  >>> Successfully processed [1] message(s)
[WARN] 2020-05-15 11:06:26,504 []  >>> Duplicate message detected, skipping processing: dedupKey = DedupElement={"application":"demo-consumer-group","msgUniqKey":"test-demo-msg-001","tag":"","topic":"demo-topic"}, msgId : 0A0D20B3632A5B2133B14A7332DB014B , will acknowledge automatically. RedisBackedDeduplicator
[INFO] 2020-05-15 11:06:26,504 []  >>> Successfully processed [1] message(s)

Explanation: The duplicate message is skipped entirely without entering the business logic layer.

Scenario 2: Concurrent Duplicate Messages

  1. Send a test message with payload test-demo-msg-002 to demo-topic
  2. Send an identical duplicate message immediately (before the 3-second simulated consumption completes)

Annotated Sample Log Output:

// First message arrives at 11:07:33, starts long-running consumption
[INFO] 2020-05-15 11:07:33,756 []  >>> Simulating long-running consumption task...test-demo-msg-002 MessageExt [queueId=1, storeSize=168, queueOffset=2, sysFlag=0, bornTimestamp=1589511530879, bornHost=/10.13.32.179:52651, storeTimestamp=1589511530881, storeHost=/10.13.32.179:10911, msgId=0A0D20B300002A9F000000003EEA3302, commitLogOffset=1055535874, bodyCRC=146853239, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=demo-topic, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=3, CONSUME_START_TIME=1589512053623, UNIQ_KEY=0A0D20B3632A5B2133B14A74397F014C, WAIT=true}, body=15]]

// Duplicate message arrives at 11:07:35, detects active consumption
[WARN] 2020-05-15 11:07:35,884 []  >>> Identical message is already being processed, will retry later: dedupKey : DedupElement={"application":"demo-consumer-group","msgUniqKey":"test-demo-msg-002","tag":"","topic":"demo-topic"}, msgId: 0A0D20B3632A5B2133B14A7441FB014D, RedisBackedDeduplicator
// Marked as consumption failure to trigger RocketMQ delayed retry
[WARN] 2020-05-15 11:07:35,884 []  >>> Processing failed for [1] message(s), ackIndex = [-1]

// First message completes successfully at 11:07:36
[INFO] 2020-05-15 11:07:36,801 []  >>> Successfully processed [1] message(s)

// Delayed retry of duplicate message runs at 11:07:46, detects already completed consumption
[WARN] 2020-05-15 11:07:46,024 []  >>> Duplicate message detected, skipping processing: dedupKey = DedupElement={"application":"demo-consumer-group","msgUniqKey":"test-demo-msg-002","tag":"","topic":"demo-topic"}, msgId : 0A0D20B3632A5B2133B14A7441FB014D , will acknowledge automatically. RedisBackedDeduplicator
[INFO] 2020-05-15 11:07:46,024 []  >>> Successfully processed [1] message(s)

Explanation: The concurrent duplicate message is blocked during the initial processing, then retried later where it is recognized as already processed.

MySQL-backed Deduplication

To use MySQL instead of Redis for idempotent record storage, replace the StringRedisTemplate with a JdbcTemplate instance:

// Retrieve Spring JdbcTemplate instance (omitted for brevity)
JdbcTemplate jdbcTemplate = getJdbcTemplate();
IdempotentConsumeConfig idempotentConfig = IdempotentConsumeConfig.enableIdempotentConsume(consumerGroup, jdbcTemplate);

You must pre-create the idempotent deduplication table with the following schema:

DROP TABLE IF EXISTS `t_rocketmq_dedup`;
CREATE TABLE `t_rocketmq_dedup` (
  `application_name` varchar(255) NOT NULL COMMENT 'Consumer group name for isolation',
  `topic` varchar(255) NOT NULL COMMENT 'Source message topic',
  `tag` varchar(16) NOT NULL COMMENT 'Message tag (empty string if no tag)',
  `msg_uniq_key` varchar(255) NOT NULL COMMENT 'Deduplication key (business primary key or message ID)',
  `status` varchar(16) NOT NULL COMMENT 'Message consumption status',
  `expire_time` bigint(20) NOT NULL COMMENT 'Expiration timestamp for the deduplication record',
  UNIQUE KEY `uniq_key` (`application_name`,`topic`,`tag`,`msg_uniq_key`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8 ROW_FORMAT=COMPACT;

Explanation: The composite unique index scopes each deduplication record to a (consumer group, topic, tag, key) tuple, so duplicates are detected within a consumer group while the same message can still be consumed independently by other consumer groups.
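With this schema, "claiming" a message is just an INSERT that either succeeds or hits the unique index. The sketch below illustrates that semantics only; `MysqlClaimSketch` is a hypothetical name, and a `HashSet` stands in for the unique index rather than a real JDBC call:

```java
import java.util.HashSet;
import java.util.Set;

// Illustrative sketch: the UNIQUE KEY over (application_name, topic, tag,
// msg_uniq_key) makes a duplicate INSERT fail, which the consumer interprets
// as "already claimed". A HashSet stands in for the unique index here.
class MysqlClaimSketch {
    private final Set<String> uniqueIndex = new HashSet<>();

    /** Mirrors an INSERT into t_rocketmq_dedup; false means duplicate key. */
    boolean tryInsertClaim(String app, String topic, String tag, String uniqKey) {
        String compositeKey = String.join("|", app, topic, tag, uniqKey);
        return uniqueIndex.add(compositeKey);
    }
}
```

Note that the claim succeeds again for a different consumer group even with an identical topic, tag, and key, which is exactly the isolation the composite index provides.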

Failed Consumption Retry Behavior

This utility uses RocketMQ's native delayed retry mechanism for failed consumption, with a default maximum of 16 retry attempts. To adjust the retry delay times or maximum retry count, modify the consumer's configuration parameters directly.
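If the defaults do not fit your workload, the retry ceiling can be set on the push consumer itself; the value below is illustrative, not a recommendation:

```java
// Illustrative config fragment: cap delayed retries at 5 instead of the
// default 16. Once retries are exhausted, RocketMQ routes the message to
// the consumer group's dead-letter (%DLQ%) topic.
mqPushConsumer.setMaxReconsumeTimes(5);
```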
