RabbitMQ Core Concepts and Five Messaging Patterns
Synchronous vs. Asynchronous Communication
Synchronous communication involves direct, immediate interaction, similar to a video call. Asynchronous communication involves delayed interaction, allowing a sender to communicate with multiple receivers without waiting, akin to texting.
Pros and Cons of Synchronous Calls
- Pros: Strong timeliness; the caller gets an immediate response.
- Cons: High code coupling due to direct inter-service calls, making modifications cumbersome. Sequential service calls can lead to resource waste and performance degradation during wait times. Failure of one service may cause cascading failures and system avalanches.
Pros and Cons of Asynchronous Calls
- Pros:
  - Low Coupling: Uses an event-driven model via a broker, eliminating direct service dependencies.
  - Increased Throughput: Services process tasks independently without waiting for upstream completions.
  - Fault Isolation: Failures in one service do not directly impact others.
  - Traffic Shaping: A broker can buffer incoming requests and release them based on downstream processing capacity, preventing overload.
- Cons:
  - High Broker Dependency: System reliability hinges on the broker's stability, security, and performance.
  - Obfuscated Call Chains: The loose coupling can make business process flows and debugging more complex.
The choice between synchronous and asynchronous models depends on the specific use case, with synchronous being more common.
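The buffering idea behind an asynchronous hand-off can be sketched with nothing but the JDK. In this minimal sketch a bounded in-memory `BlockingQueue` stands in for the broker; the class name `BufferedHandOff`, the `relay` method, and the `STOP` marker are hypothetical names chosen for illustration, and a real broker adds persistence, networking, and acknowledgements that are not modeled here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical illustration: an in-memory queue stands in for a message broker.
final class BufferedHandOff {
    // Hypothetical end-of-stream marker; real payloads must not equal this value.
    private static final String STOP = "__stop__";

    // The sender hands requests to the buffer; a separate consumer thread drains it.
    static List<String> relay(List<String> requests, int capacity) {
        BlockingQueue<String> buffer = new ArrayBlockingQueue<>(capacity);
        List<String> processed = new ArrayList<>();

        Thread consumer = new Thread(() -> {
            try {
                // Take messages at the consumer's own pace until the marker arrives.
                for (String msg = buffer.take(); !msg.equals(STOP); msg = buffer.take()) {
                    processed.add(msg);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        try {
            for (String request : requests) {
                buffer.put(request); // blocks only while the buffer is full
            }
            buffer.put(STOP);
            consumer.join(); // wait here only so the result can be returned
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while relaying", e);
        }
        return processed;
    }
}
```

The sender never calls the consumer directly; it only knows about the buffer, which is the decoupling that a broker provides at system scale.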
What is MQ?
MQ (Message Queue) is a broker that stores and forwards messages in an event-driven architecture.
Common MQ solutions include RabbitMQ, ActiveMQ, RocketMQ, and Kafka. This guide uses RabbitMQ for its robustness and feature set.
RabbitMQ Installation
RabbitMQ, built on Erlang, is installed here using Docker for simplicity.
- Pull the RabbitMQ image.
docker pull rabbitmq:3-management
- Run the RabbitMQ container.
docker run \
  -e RABBITMQ_DEFAULT_USER=<your_username> \
  -e RABBITMQ_DEFAULT_PASS=<your_password> \
  --name mq \
  --hostname mq1 \
  -p 15672:15672 \
  -p 5672:5672 \
  -d \
  rabbitmq:3-management
- Access the management console at http://<your_ip>:15672 using the credentials set above.
Key Management Console Entities:
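- Channel: A lightweight virtual connection multiplexed over a single TCP connection; publishing and consuming happen on channels.
- Exchange: Routes messages to queues.
- Queue: Buffers messages.
- Virtual Host: A logical grouping that isolates a set of exchanges, queues, and bindings, similar to a namespace.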
The messaging flow: Publisher -> Exchange -> Queue -> Consumer.
Core Messaging Models
RabbitMQ models fall into two categories:
- Basic models: Simple Queue, Work Queue.
- Publish/Subscribe models (via exchanges): Fanout, Direct, Topic.
Getting Started with SpringAMQP
SpringAMQP provides a higher-level API over the AMQP protocol. Add the dependency to your pom.xml.
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
Configure connection properties in application.yml.
spring:
  rabbitmq:
    host: 192.168.121.10
    port: 5672
    virtual-host: /
    username: <your_username>
    password: <your_password>
Sending a Message
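Use RabbitTemplate to send a message. The queue basic.queue must already exist (for example, created in the management console); a message published to a queue name that does not exist is dropped.
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
public class MessageSenderTest {
    @Autowired
    private RabbitTemplate messageSender;

    @Test
    void sendBasicMessage() {
        String targetQueue = "basic.queue";
        String payload = "Hello, SpringAMQP!";
        // Publishes via the default exchange, using the queue name as the routing key
        messageSender.convertAndSend(targetQueue, payload);
    }
}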
Receiving a Message
Use @RabbitListener to listen on a queue.
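import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class MessageReceiver {
    // Invoked once for every message delivered from basic.queue
    @RabbitListener(queues = "basic.queue")
    public void handleBasicMessage(String receivedPayload) {
        System.out.println("Received: " + receivedPayload);
    }
}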
Work Queue Pattern
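Multiple consumers can subscribe to the same queue, and each message is delivered to only one of them. By default, RabbitMQ dispatches messages round-robin, and a consumer may be handed messages in advance regardless of how quickly it processes them. To distribute work according to each consumer's capacity, limit the prefetch count.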
spring:
  rabbitmq:
    listener:
      simple:
        prefetch: 1 # Process one message at a time before fetching the next
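To see why limiting prefetch helps, the following sketch compares the two dispatch strategies in a deliberately simplified model. It assumes each consumer has a fixed cost per message in abstract time units; the class `DispatchSimulation` and its method names are hypothetical, and the model ignores network latency and acknowledgement timing.

```java
import java.util.Arrays;

// Hypothetical, simplified model of two dispatch strategies (not broker code).
final class DispatchSimulation {
    // Unlimited prefetch: messages are dealt out alternately, ignoring consumer speed.
    static int roundRobinMakespan(int messages, int[] costPerMessage) {
        int[] busyUntil = new int[costPerMessage.length];
        for (int m = 0; m < messages; m++) {
            int consumer = m % costPerMessage.length;
            busyUntil[consumer] += costPerMessage[consumer];
        }
        return Arrays.stream(busyUntil).max().orElse(0);
    }

    // prefetch = 1: the next message goes to whichever consumer becomes free first.
    static int prefetchOneMakespan(int messages, int[] costPerMessage) {
        int[] busyUntil = new int[costPerMessage.length];
        for (int m = 0; m < messages; m++) {
            int next = 0;
            for (int c = 1; c < busyUntil.length; c++) {
                if (busyUntil[c] < busyUntil[next]) {
                    next = c; // ties keep the lower-indexed consumer
                }
            }
            busyUntil[next] += costPerMessage[next];
        }
        return Arrays.stream(busyUntil).max().orElse(0);
    }
}
```

With ten messages and per-message costs of 1 and 5 time units, `roundRobinMakespan(10, new int[]{1, 5})` returns 25 because the slow consumer is stuck with half the work, while `prefetchOneMakespan(10, new int[]{1, 5})` returns 10 because the fast consumer picks up most of the messages.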
Publish/Subscribe Patterns
Fanout Exchange
A Fanout Exchange broadcasts messages to all bound queues.
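The broadcast rule can be modeled in a few lines of plain Java. This is an in-memory sketch only, with the hypothetical class `FanoutSketch` standing in for the exchange; it is not the Spring AMQP or RabbitMQ client API.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of a fanout exchange.
final class FanoutSketch {
    private final Map<String, List<String>> queues = new LinkedHashMap<>();

    // Binding a queue needs no key: every bound queue receives every message.
    void bind(String queueName) {
        queues.putIfAbsent(queueName, new ArrayList<>());
    }

    // The routing key is accepted but ignored, mirroring fanout behavior.
    void publish(String routingKey, String payload) {
        for (List<String> queue : queues.values()) {
            queue.add(payload);
        }
    }

    List<String> messagesIn(String queueName) {
        return queues.getOrDefault(queueName, List.of());
    }
}
```

After binding `broadcast.queue.a` and `broadcast.queue.b` and publishing once, each queue in this model holds its own copy of the message.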
Define Queues, Exchange, and Bindings
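import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class BroadcastConfig {
    @Bean
    public FanoutExchange broadcastExchange() {
        return new FanoutExchange("broadcast.exchange");
    }

    @Bean
    public Queue broadcastQueueA() {
        return new Queue("broadcast.queue.a");
    }

    @Bean
    public Queue broadcastQueueB() {
        return new Queue("broadcast.queue.b");
    }

    // Parameters are resolved by bean name, so they must match the bean methods above
    @Bean
    public Binding bindQueueA(Queue broadcastQueueA, FanoutExchange broadcastExchange) {
        return BindingBuilder.bind(broadcastQueueA).to(broadcastExchange);
    }

    @Bean
    public Binding bindQueueB(Queue broadcastQueueB, FanoutExchange broadcastExchange) {
        return BindingBuilder.bind(broadcastQueueB).to(broadcastExchange);
    }
}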
Listeners for Each Queue
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class BroadcastListener {
    @RabbitListener(queues = "broadcast.queue.a")
    public void handleBroadcastA(String msg) {
        System.out.println("Listener A: " + msg);
    }

    @RabbitListener(queues = "broadcast.queue.b")
    public void handleBroadcastB(String msg) {
        System.out.println("Listener B: " + msg);
    }
}
Publisher Sends to Exchange
@Test
void sendBroadcastMessage() {
    String exchangeName = "broadcast.exchange";
    String payload = "Broadcast Message!!";
    // Routing key is ignored for Fanout exchanges
    messageSender.convertAndSend(exchangeName, "", payload);
}
Direct Exchange
A Direct Exchange routes messages to queues based on an exact match between the message's routing key and the queue's binding key.
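The exact-match rule can be sketched as a lookup table from binding key to queues. The class `DirectRoutingSketch` below is a hypothetical in-memory model, not the RabbitMQ client API; the queue names and keys are the ones used in this section.

```java
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical in-memory model of direct-exchange routing.
final class DirectRoutingSketch {
    private final Map<String, Set<String>> queuesByBindingKey = new LinkedHashMap<>();

    // A queue may be bound with several keys, and several queues may share a key.
    void bind(String queueName, String... bindingKeys) {
        for (String key : bindingKeys) {
            queuesByBindingKey.computeIfAbsent(key, k -> new LinkedHashSet<>()).add(queueName);
        }
    }

    // Returns the queues whose binding key equals the routing key exactly.
    List<String> route(String routingKey) {
        return List.copyOf(queuesByBindingKey.getOrDefault(routingKey, Set.of()));
    }
}
```

With `direct.queue.x` bound to "red" and "blue" and `direct.queue.y` bound to "red" and "yellow", routing "blue" selects only `direct.queue.x`, routing "red" selects both queues, and an unbound key such as "green" selects none.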
Define Listeners with Binding Keys
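import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class RoutingListener {
    // The annotation declares the queue, the exchange, and the binding if they do not exist
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue("direct.queue.x"),
            exchange = @Exchange(name = "direct.exchange", type = ExchangeTypes.DIRECT),
            key = {"red", "blue"}
    ))
    public void handleDirectX(String msg) {
        System.out.println("Queue X (red/blue): " + msg);
    }

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue("direct.queue.y"),
            exchange = @Exchange(name = "direct.exchange", type = ExchangeTypes.DIRECT),
            key = {"red", "yellow"}
    ))
    public void handleDirectY(String msg) {
        System.out.println("Queue Y (red/yellow): " + msg);
    }
}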
Publisher Specifies Routing Key
@Test
void sendDirectMessage() {
    String exchangeName = "direct.exchange";
    String payload = "Direct Message!!";
    String routingKey = "blue";
    // Message goes to queues bound with key "blue"
    messageSender.convertAndSend(exchangeName, routingKey, payload);
}
Topic Exchange
A Topic Exchange routes messages using a pattern match between a routing key (a dot-separated list of words) and a binding key (which can include * for a single word and # for zero or more words).
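The wildcard semantics are easiest to check with a small matcher. The sketch below is a plain-Java illustration of the rule as described above, with the hypothetical class name `TopicMatcher`; it is not how the broker implements matching internally.

```java
// Hypothetical illustration of topic binding-key matching.
final class TopicMatcher {
    static boolean matches(String bindingKey, String routingKey) {
        String[] pattern = bindingKey.split("\\.");
        String[] words = routingKey.isEmpty() ? new String[0] : routingKey.split("\\.");
        return matches(pattern, 0, words, 0);
    }

    private static boolean matches(String[] pattern, int p, String[] words, int w) {
        if (p == pattern.length) {
            return w == words.length; // pattern exhausted: all words must be consumed
        }
        if (pattern[p].equals("#")) {
            // '#' absorbs zero words (move past it) or one more word (stay on it)
            return matches(pattern, p + 1, words, w)
                    || (w < words.length && matches(pattern, p, words, w + 1));
        }
        if (w == words.length) {
            return false; // a literal word or '*' needs a word to match
        }
        // '*' matches exactly one word; anything else must match literally
        return (pattern[p].equals("*") || pattern[p].equals(words[w]))
                && matches(pattern, p + 1, words, w + 1);
    }
}
```

Under this sketch, `matches("china.#", "china.news")` and `matches("#.news", "china.news")` are both true, `matches("china.#", "china")` is true because `#` may match zero words, and `matches("china.*", "china.news.today")` is false because `*` covers exactly one word.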
Define Listeners with Pattern Binding Keys
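import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class PatternListener {
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue("topic.queue.1"),
            exchange = @Exchange(name = "topic.exchange", type = ExchangeTypes.TOPIC),
            key = "china.#" // Matches 'china.news', 'china.weather', etc.
    ))
    public void handleTopicOne(String msg) {
        System.out.println("Queue 1 (china.#): " + msg);
    }

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue("topic.queue.2"),
            exchange = @Exchange(name = "topic.exchange", type = ExchangeTypes.TOPIC),
            key = "#.news" // Matches 'china.news', 'world.news', etc.
    ))
    public void handleTopicTwo(String msg) {
        System.out.println("Queue 2 (#.news): " + msg);
    }
}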
Publisher Sends with a Routing Key
@Test
void sendTopicMessage() {
    String exchangeName = "topic.exchange";
    String payload = "Topic Message!!";
    String routingKey = "china.news";
    // Message goes to queues whose binding pattern matches 'china.news'
    messageSender.convertAndSend(exchangeName, routingKey, payload);
}