Implementing RabbitMQ Messaging Patterns in Spring Boot
RabbitMQ Architecture and Exchange Types
RabbitMQ acts as a message broker that facilitates asynchronous communication between distinct serviecs in a distributed system. The architecture follows the Producer-Consumer model, where a Producer generates data and sends it to a Queue. A Consumer subscribes to the queue to process the data.
A critical component in RabbitMQ is the Exchange. Instead of sending messages directly to a queue, producers publish messages to an exchange. The exchange is responsible for routing these messages to specific queues based on predefined rules. RabbitMQ supports several exchange types, including Direct, Topic, Headers, and Fanout.
- Direct Exchange: Routes messages to queues where the binding key exactly matches the routing key.
- Topic Exchange: Performs wildcard matching between the routing key and the binding pattern (e.g.,
stock.*). - Headers Exchange: Routes messages based on header key-value pairs rather than routing keys.
- Fanout Exchange: Broadcasts messages to all bound queues, ignoring routing keys entirely.
Project Configuration
To integrate RabbitMQ with Spring Boot, add the spring-boot-starter-amqp dependency to your build file.
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
Configure the connection details in application.properties:
spring.application.name=rabbitmq-spring-demo
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin
Implementing the Direct Exchange Pattern
In this pattern, we define a queue and bind it to the default exchange (or a named direct exchange) using a specific routing key.
Queue Configuration:
@Configuration
public class DirectRabbitConfig {
@Bean
public Queue notificationQueue() {
return new Queue("notification.queue");
}
}
Message Producer:
@Component
public class NotificationProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
public void broadcastNotification(int sequence) {
String payload = "Alert message #" + sequence + " sent at " + new Date();
System.out.println("Producer sent: " + payload);
this.rabbitTemplate.convertAndSend("notification.queue", payload);
}
}
Message Consumers:
To demonstrate load balancing, we define two consumers listening to the same queue.
@Component
@RabbitListener(queues = "notification.queue")
public class NotificationReceiverA {
@RabbitHandler
public void handle(String message) {
System.out.println("Receiver A processed: " + message);
}
}
@Component
@RabbitListener(queues = "notification.queue")
public class NotificationReceiverB {
@RabbitHandler
public void handle(String message) {
System.out.println("Receiver B processed: " + message);
}
}
Unit Test:
@Autowired
private NotificationProducer producer;
@Test
public void testDirectMessaging() {
for (int i = 0; i < 10; i++) {
producer.broadcastNotification(i);
}
}
Sending Java Objects
RabbitMQ can transmit Java Objects provided they implement Serializable. Below is an example using a custom Order entity.
import java.io.Serializable;
public class Order implements Serializable {
private String orderId;
private double amount;
public Order(String orderId, double amount) {
this.orderId = orderId;
this.amount = amount;
}
// Getters, setters, and toString omitted for brevity
@Override
public String toString() {
return "Order{id='" + orderId + "', amount=" + amount + "}";
}
}
Producer for Objects:
@Component
public class OrderProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
public void submitOrder(Order order) {
System.out.println("Sending object: " + order);
this.rabbitTemplate.convertAndSend("order.queue", order);
}
}
Consumer for Objects:
@Component
@RabbitListener(queues = "order.queue")
public class OrderReceiver {
@RabbitHandler
public void process(Order order) {
System.out.println("Consumed object: " + order);
}
}
Implementing the Topic Exchange Pattern
The Topic exchange allows flexible routing based on patterns. We will configure an exchange and bind two queues with different patterns.
@Configuration
public class TopicRabbitConfig {
public static final String QUEUE_ALERTS = "queue.alerts";
public static final String QUEUE_ALL_LOGS = "queue.all.logs";
@Bean
public Queue alertsQueue() {
return new Queue(QUEUE_ALERTS);
}
@Bean
public Queue allLogsQueue() {
return new Queue(QUEUE_ALL_LOGS);
}
@Bean
public TopicExchange appExchange() {
return new TopicExchange("app.topic.exchange");
}
@Bean
public Binding bindAlertsToExchange(Queue alertsQueue, TopicExchange appExchange) {
return BindingBuilder.bind(alertsQueue).to(appExchange).with("system.error.*");
}
@Bean
public Binding bindAllLogsToExchange(Queue allLogsQueue, TopicExchange appExchange) {
return BindingBuilder.bind(allLogsQueue).to(appExchange).with("system.#");
}
}
Topic Producer:
@Component
public class LogProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendErrorLog() {
String msg = "Critical failure in module";
System.out.println("Sending error log: " + msg);
rabbitTemplate.convertAndSend("app.topic.exchange", "system.error.critical", msg);
}
public void sendInfoLog() {
String msg = "Standard operation info";
System.out.println("Sending info log: " + msg);
rabbitTemplate.convertAndSend("app.topic.exchange", "system.info", msg);
}
}
Topic Consumers:
@Component
@RabbitListener(queues = TopicRabbitConfig.QUEUE_ALERTS)
public class AlertReceiver {
@RabbitHandler
public void handle(String msg) {
System.out.println("Alerts Queue Received: " + msg);
}
}
@Component
@RabbitListener(queues = TopicRabbitConfig.QUEUE_ALL_LOGS)
public class LogReceiver {
@RabbitHandler
public void handle(String msg) {
System.out.println("All Logs Queue Received: " + msg);
}
}
When sending to system.error.critical, both queues receive the message (matches # and error.*). When sending to system.info, only the queue bound to system.# receives it.
Implementing the Fanout Exchange Pattern
The Fanout exchange broadcasts a message to every queue bound to it, simulating a broadcast scenario.
@Configuration
public class FanoutRabbitConfig {
@Bean
public Queue queueA() { return new Queue("fanout.a"); }
@Bean
public Queue queueB() { return new Queue("fanout.b"); }
@Bean
public Queue queueC() { return new Queue("fanout.c"); }
@Bean
public FanoutExchange broadcastExchange() {
return new FanoutExchange("broadcast.exchange");
}
@Bean
public Binding bindA(Queue queueA, FanoutExchange broadcastExchange) {
return BindingBuilder.bind(queueA).to(broadcastExchange);
}
@Bean
public Binding bindB(Queue queueB, FanoutExchange broadcastExchange) {
return BindingBuilder.bind(queueB).to(broadcastExchange);
}
@Bean
public Binding bindC(Queue queueC, FanoutExchange broadcastExchange) {
return BindingBuilder.bind(queueC).to(broadcastExchange);
}
}
Fanout Producer:
@Component
public class BroadcastProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
public void broadcast() {
String message = "Global announcement";
System.out.println("Broadcasting: " + message);
this.rabbitTemplate.convertAndSend("broadcast.exchange", "", message);
}
}
Fanout Consumers:
@Component
@RabbitListener(queues = "fanout.a")
public class ConsumerA {
@RabbitHandler
public void process(String msg) { System.out.println("Consumer A: " + msg); }
}
@Component
@RabbitListener(queues = "fanout.b")
public class ConsumerB {
@RabbitHandler
public void process(String msg) { System.out.println("Consumer B: " + msg); }
}
@Component
@RabbitListener(queues = "fanout.c")
public class ConsumerC {
@RabbitHandler
public void process(String msg) { System.out.println("Consumer C: " + msg); }
}
Executing the broadcast method will result in all three consumers (A, B, and C) receiving the identical message simultaneously.