Java Development: RocketMQ Consumer Message Idempotent Deduplication
A production-ready generic utility class for RocketMQ consumer message idempotent deduplication, with no additional configuration overhead beyond basic setup.
Core Capabilities
- Supports using Redis or MySQL as the idempotent record storage layer
- Flexible deduplication key options: use business primary keys or default message unique ID (default)
- Concurrent duplicate request blocking: prevents simultaneous processing of identical messages before the initial processing completes or fails
- Near-exactly-once message delivery semantics; in extreme failure scenarios, falls back to at-least-once delivery to avoid message loss.
Deduplication Principle
Refer to the accompanying flow diagram for a full breakdown of the deduplication workflow.
Quick Start
1. Implement the Idempotent Consumer Listener
Extend the base IdempotentConcurrentMessageListener class, and implement the required callbacks for deduplication key generation and business message processing:
public class DemoIdempotentConsumer extends IdempotentConcurrentMessageListener {
public DemoIdempotentConsumer(IdempotentConsumeConfig idempotentConfig) {
super(idempotentConfig);
}
/**
* Define the deduplication key for each message
*/
@Override
protected String getDeduplicationKey(MessageExt rocketMessage) {
// Use message body as deduplication key for demo-topic for demonstration; avoid this in production
if ("demo-topic".equals(rocketMessage.getTopic())) {
return new String(rocketMessage.getBody());
}
// Fall back to default message ID for other topics
return super.getDeduplicationKey(rocketMessage);
}
/**
* Business logic for message consumption
*/
@Override
protected boolean processMessage(MessageExt rocketMessage) {
switch (rocketMessage.getTopic()) {
case "demo-topic":
log.info("Simulating long-running consumption task...{} {}", new String(rocketMessage.getBody()), rocketMessage);
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
break;
}
return true;
}
}
2. Initialize and Start the RocketMQ Consumer
DefaultMQPushConsumer mqPushConsumer = new DefaultMQPushConsumer("demo-consumer-group");
mqPushConsumer.subscribe("demo-topic", "*");
// Idempotent consumption configuration block
String consumerGroup = mqPushConsumer.getConsumerGroup();
// Retrieve Spring's StringRedisTemplate instance (omitted for brevity)
StringRedisTemplate redisStringTemplate = getRedisTemplate();
IdempotentConsumeConfig idempotentConfig = IdempotentConsumeConfig.enableIdempotentConsume(consumerGroup, redisStringTemplate);
DemoIdempotentConsumer messageListener = new DemoIdempotentConsumer(idempotentConfig);
mqPushConsumer.registerMessageListener(messageListener);
mqPushConsumer.start();
Notes:
- You must provide valid RocketMQ consumer configuration and retrieve the Redis/JDBC template instance yourself
- This utility supports both Redis and MySQL for idempotent storage; see the full demo code for additional usage details.
Test Scenarios
Scenario 1: Ordinary Duplicate Messages
- Send a test messsage with payload
test-demo-msg-001todemo-topic - Send an identical duplicate message after 3+ seconds (longer than the simulated consumption time)
Sample Log Output:
[INFO] 2020-05-15 11:06:17,697 [] >>> Simulating long-running consumption task...test-demo-msg-001 MessageExt [queueId=1, storeSize=169, queueOffset=0, sysFlag=0, bornTimestamp=1589511454575, bornHost=/10.13.32.179:52637, storeTimestamp=1589511454576, storeHost=/10.13.32.179:10911, msgId=0A0D20B300002A9F000000003EEA31B0, commitLogOffset=1055535536, bodyCRC=1038040938, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=demo-topic, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1, CONSUME_START_TIME=1589511977328, UNIQ_KEY=0A0D20B3632A5B2133B14A730F6F014A, WAIT=true}, body=16]]
[INFO] 2020-05-15 11:06:20,748 [] >>> Successfully processed [1] message(s)
[WARN] 2020-05-15 11:06:26,504 [] >>> Duplicate message detected, skipping processing: dedupKey = DedupElement={"application":"demo-consumer-group","msgUniqKey":"test-demo-msg-001","tag":"","topic":"demo-topic"}, msgId : 0A0D20B3632A5B2133B14A7332DB014B , will acknowledge automatically. RedisBackedDeduplicator
[INFO] 2020-05-15 11:06:26,504 [] >>> Successfully processed [1] message(s)
Explanation: The duplicate message is skipped entirely without entering the business logic layer.
Scenario 2: Concurrent Duplicate Messages
- Send a test message with payload
test-demo-msg-002todemo-topic - Send an identical duplicate message immediately (before the 3-second simulated consumption completes)
Annotated Sample Log Output:
// First message arrives at 11:07:33, starts long-running consumption
[INFO] 2020-05-15 11:07:33,756 [] >>> Simulating long-running consumption task...test-demo-msg-002 MessageExt [queueId=1, storeSize=168, queueOffset=2, sysFlag=0, bornTimestamp=1589511530879, bornHost=/10.13.32.179:52651, storeTimestamp=1589511530881, storeHost=/10.13.32.179:10911, msgId=0A0D20B300002A9F000000003EEA3302, commitLogOffset=1055535874, bodyCRC=146853239, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=demo-topic, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=3, CONSUME_START_TIME=1589512053623, UNIQ_KEY=0A0D20B3632A5B2133B14A74397F014C, WAIT=true}, body=15]]
// Duplicate message arrives at 11:07:35, detects active consumption
[WARN] 2020-05-15 11:07:35,884 [] >>> Identical message is already being processed, will retry later: dedupKey : DedupElement={"application":"demo-consumer-group","msgUniqKey":"test-demo-msg-002","tag":"","topic":"demo-topic"}, msgId: 0A0D20B3632A5B2133B14A7441FB014D, RedisBackedDeduplicator
// Marked as consumption failure to trigger RocketMQ delayed retry
[WARN] 2020-05-15 11:07:35,884 [] >>> Processing failed for [1] message(s), ackIndex = [-1]
// First message completes successfully at 11:07:36
[INFO] 2020-05-15 11:07:36,801 [] >>> Successfully processed [1] message(s)
// Delayed retry of duplicate message runs at 11:07:46, detects already completed consumption
[WARN] 2020-05-15 11:07:46,024 [] >>> Duplicate message detected, skipping processing: dedupKey = DedupElement={"application":"demo-consumer-group","msgUniqKey":"test-demo-msg-002","tag":"","topic":"demo-topic"}, msgId : 0A0D20B3632A5B2133B14A7441FB014D , will acknowledge automatically. RedisBackedDeduplicator
[INFO] 2020-05-15 11:07:46,024 [] >>> Successfully processed [1] message(s)
Explanation: The concurrent duplicate message is blocked during the initial processing, then retried later where it is recognized as already processed.
MySQL-backed Deduplication
To use MySQL instead of Redis for idempotent record storage, replace the StringRedisTemplate with a JdbcTemplate insatnce:
// Retrieve Spring JdbcTemplate instance (omitted for brevity)
JdbcTemplate jdbcTemplate = getJdbcTemplate();
IdempotentConsumeConfig idempotentConfig = IdempotentConsumeConfig.enableIdempotentConsume(consumerGroup, jdbcTemplate);
You must pre-create the idempotent deduplication table with the following schema:
DROP TABLE IF EXISTS `t_rocketmq_dedup`;
CREATE TABLE `t_rocketmq_dedup` (
`application_name` varchar(255) NOT NULL COMMENT 'Consumer group name for isolation',
`topic` varchar(255) NOT NULL COMMENT 'Source message topic',
`tag` varchar(16) NOT NULL COMMENT 'Message tag (empty string if no tag)',
`msg_uniq_key` varchar(255) NOT NULL COMMENT 'Deduplication key (business primary key or message ID)',
`status` varchar(16) NOT NULL COMMENT 'Message consumption status',
`expire_time` bigint(20) NOT NULL COMMENT 'Expiration timestamp for the deduplication record',
UNIQUE KEY `uniq_key` (`application_name`,`topic`,`tag`,`msg_uniq_key`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8 ROW_FORMAT=COMPACT;
Explanation: The unique composite index ensures that duplicate messages across consumer groups, topics, and tags are properly isolated.
Failed Consumption Retry Behavior
This utility uses RocketMQ's native delayed retry mechanism for failed consumption, with a default maximum of 16 retry attempts. To adjust the retry delay times or maximum retry count, modify the consumer's configuration parameters directly.