Integrating Apache RocketMQ with Spring Boot for Message Publishing
Dependency Configuration
Add the RocketMQ Spring Boot starter to your project's build file.
<!-- RocketMQ Spring Boot starter -->
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.3</version>
</dependency>
Application Properties
Configure the NameServer address and producer settings in application.yml.
Producer Configuration:
# Producer application settings (application.yml).
server:
  port: 8090
rocketmq:
  # Address of the RocketMQ NameServer.
  name-server: 127.0.0.1:9876
  producer:
    # Producer group used by this application.
    group: order_producer_group
    # Send timeout; RocketMQ documents this value in milliseconds.
    send-message-timeout: 3000
Consumer Configuration:
# Consumer application settings (application.yml).
server:
  port: 8091
rocketmq:
  # Address of the RocketMQ NameServer.
  name-server: 127.0.0.1:9876
  consumer:
    # Consumer group used by this application.
    group: order_consumer_group
    message-model: CLUSTERING
spring:
  application:
    name: order-service-consumer
Sending Messages
The RocketMQTemplate provides methods for different message sending patterns.
Synchronous Send
The sender waits for a broker acknowledgment before proceeding.
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
/**
 * Publishes order events through {@link RocketMQTemplate} using a synchronous send.
 */
@Component
public class OrderEventPublisher {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    /**
     * Sends an "order created" message to the given topic via {@code syncSend}.
     *
     * @param topic   destination topic
     * @param orderId identifier appended to the message payload
     */
    public void publishSynchronously(String topic, String orderId) {
        final String payload = "Order created: " + orderId;
        final Message<String> outbound = MessageBuilder.withPayload(payload).build();
        rocketMQTemplate.syncSend(topic, outbound);
    }
}
Asynchronous Send
The sender does not wait for a broker response and uses a callback to handle the result.
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
/**
 * Publishes order events through {@link RocketMQTemplate} using an asynchronous send.
 */
@Component
public class OrderEventPublisher {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    /**
     * Sends an "order updated" message to the given topic via {@code asyncSend};
     * the outcome is reported through a console-printing callback.
     *
     * @param topic   destination topic
     * @param orderId identifier appended to the message payload
     */
    public void publishAsynchronously(String topic, String orderId) {
        final String payload = "Order updated: " + orderId;
        final Message<String> outbound = MessageBuilder.withPayload(payload).build();
        rocketMQTemplate.asyncSend(topic, outbound, new ConsoleSendCallback());
    }

    /** Callback that prints the send outcome to standard output / standard error. */
    private static final class ConsoleSendCallback implements SendCallback {

        @Override
        public void onSuccess(SendResult result) {
            System.out.println("Async send success: " + result.getMsgId());
        }

        @Override
        public void onException(Throwable cause) {
            System.err.println("Async send failed: " + cause.getMessage());
        }
    }
}
One-Way Send
The sender transmits the message without waiting for any response or confirmation.
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
/**
 * Publishes order events through {@link RocketMQTemplate} using a one-way send.
 */
@Component
public class OrderEventPublisher {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    /**
     * Sends an "order deleted" message to the given topic via {@code sendOneWay}.
     *
     * @param topic   destination topic
     * @param orderId identifier appended to the message payload
     */
    public void publishOneWay(String topic, String orderId) {
        final String payload = "Order deleted: " + orderId;
        final Message<String> outbound = MessageBuilder.withPayload(payload).build();
        rocketMQTemplate.sendOneWay(topic, outbound);
    }
}