Implementing Message Queue Patterns with Spring AMQP
Spring AMQP is a framework that simplifies interaction with message brokers such as RabbitMQ by providing abstractions and templates. It covers the five core messaging patterns from the RabbitMQ tutorials and removes most of the boilerplate of the native client API.
Messaging patterns are categorized by purpose and exchange type:
- Basic Queue (Hello World)
- Work Queue
- Publish/Subscribe using Fanout Exchange
- Routing using Direct Exchange
- Topics using Topic Exchange
AMQP (Advanced Message Queuing Protocol) is an open standard for application-to-application message passing, independent of language and platform. Spring AMQP builds on this protocol: the spring-amqp module provides the base abstraction, and spring-rabbit is the default implementation, targeting RabbitMQ.
Key features of Spring AMQP include:
- Listener containers for asynchronous message consumption.
- RabbitTemplate for sending and receiving messages.
- RabbitAdmin for automatic declaration of queues, exchanges, and bindings.
Sending Messages with a Basic Queue (Hello World)
This pattern involves a simple producer-consumer model.
Configuration and Sending:
- Add the Spring AMQP dependency to your project's pom.xml:
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
- Configure the RabbitMQ connection in application.yml:
spring:
  rabbitmq:
    host: 192.168.1.100
    port: 5672
    username: appuser
    password: securepass
    virtual-host: /
- Use RabbitTemplate to send a message in a test class:
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
public class MessageSenderTest {
    @Autowired
    private RabbitTemplate messagingTemplate;
    @Test
    public void sendToSimpleQueue() {
        String targetQueue = "basic.queue";
        String messageContent = "Hello, Spring AMQP!";
        messagingTemplate.convertAndSend(targetQueue, messageContent);
    }
}
Receiving Messages with a Basic Queue
Create a listener component to consume messages from the queue.
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class MessageReceiver {
    @RabbitListener(queues = "basic.queue")
    public void handleSimpleMessage(String receivedMessage) {
        System.out.println("Received message: " + receivedMessage);
    }
}
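The listener above assumes that basic.queue already exists on the broker; nothing in this section declares it. The following is a minimal sketch of one way to declare it from the application, assuming Spring Boot's auto-configured RabbitAdmin is active so that Queue beans are declared when the connection opens. The class name BasicQueueConfig and the choice of a durable queue are assumptions, not part of the original example.

```java
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// Hypothetical configuration class; the name is illustrative only.
@Configuration
public class BasicQueueConfig {

    // Assumption: a durable queue is wanted. Pass false for a transient one.
    @Bean
    public Queue basicQueue() {
        return new Queue("basic.queue", true);
    }
}
```

The same approach would apply to task.queue and object.queue used in later sections.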
Implementing a Work Queue
A Work Queue dispatches tasks among multiple consumers.
Producer (Sends multiple messages):
@Test
public void sendWorkload() throws InterruptedException {
    String workQueue = "task.queue";
    for (int i = 1; i <= 50; i++) {
        String task = "Task item #" + i;
        messagingTemplate.convertAndSend(workQueue, task);
        Thread.sleep(20); // Simulate work generation delay
    }
}
Consumer (Two workers with different speeds):
@RabbitListener(queues = "task.queue")
public void processWorkFast(String task) throws InterruptedException {
    System.out.println("Fast Worker processing: " + task);
    Thread.sleep(20); // Fast processing
}
@RabbitListener(queues = "task.queue")
public void processWorkSlow(String task) throws InterruptedException {
    System.err.println("Slow Worker processing: " + task);
    Thread.sleep(100); // Slow processing
}
By default, messages are distributed evenly (round-robin) regardless of consumer speed, so a slow worker accumulates a backlog while a fast one sits idle. To implement fair dispatch, configure the prefetch count in application.yml. This limits the number of unacknowledged messages a consumer can hold, ensuring faster workers get more tasks.
spring:
  rabbitmq:
    listener:
      simple:
        prefetch: 1 # Process one message at a time before fetching the next
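To see why a prefetch of 1 shifts work toward the faster consumer, the two strategies can be compared in a small deterministic model. This is only a sketch in plain Java with no broker involved. The class and method names are hypothetical, and it assumes all 50 tasks are already waiting in the queue, ignoring the 20 ms send interval used by the producer above.

```java
import java.util.Arrays;

// Toy model of two dispatch strategies; not part of Spring AMQP.
class DispatchSimulation {

    // Round-robin: task i goes to worker i % n, whatever the worker's speed.
    static int[] roundRobin(int tasks, int workers) {
        int[] handled = new int[workers];
        for (int t = 0; t < tasks; t++) {
            handled[t % workers]++;
        }
        return handled;
    }

    // Prefetch 1: each task goes to the worker that becomes free first.
    // costMs[w] is the processing time of worker w in milliseconds.
    static int[] fairDispatch(int tasks, long[] costMs) {
        int[] handled = new int[costMs.length];
        long[] freeAt = new long[costMs.length]; // time at which each worker is idle again
        for (int t = 0; t < tasks; t++) {
            int next = 0;
            for (int w = 1; w < costMs.length; w++) {
                if (freeAt[w] < freeAt[next]) {
                    next = w; // ties go to the lower-indexed worker
                }
            }
            handled[next]++;
            freeAt[next] += costMs[next];
        }
        return handled;
    }

    public static void main(String[] args) {
        long[] costMs = {20, 100}; // fast worker, slow worker
        System.out.println("round-robin: " + Arrays.toString(roundRobin(50, 2)));
        System.out.println("prefetch 1:  " + Arrays.toString(fairDispatch(50, costMs)));
    }
}
```

In this model round-robin gives each worker 25 tasks, while the prefetch-1 strategy hands most of the 50 tasks to the fast worker. A real broker will not reproduce these counts exactly, since network latency and acknowledgement timing also play a role.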
Using a Fanout Exchange for Broadcast
A Fanout exchange routes messages to all bound queues, enabling publish/subscribe.
Declare Exchange, Queues, and Bindings via Configuration:
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class BroadcastConfig {
    @Bean
    public FanoutExchange broadcastExchange() {
        return new FanoutExchange("app.broadcast");
    }
    @Bean
    public Queue subscriberQueueA() {
        return new Queue("subscriber.a.queue");
    }
    @Bean
    public Queue subscriberQueueB() {
        return new Queue("subscriber.b.queue");
    }
    @Bean
    public Binding bindQueueA(Queue subscriberQueueA, FanoutExchange broadcastExchange) {
        return BindingBuilder.bind(subscriberQueueA).to(broadcastExchange);
    }
    @Bean
    public Binding bindQueueB(Queue subscriberQueueB, FanoutExchange broadcastExchange) {
        return BindingBuilder.bind(subscriberQueueB).to(broadcastExchange);
    }
}
Send a message to the exchange (routing key is ignored for Fanout):
@Test
public void broadcastMessage() {
    String exchangeName = "app.broadcast";
    String announcement = "Important announcement for all!";
    // Empty routing key
    messagingTemplate.convertAndSend(exchangeName, "", announcement);
}
Listeners for each queue:
@RabbitListener(queues = "subscriber.a.queue")
public void listenBroadcastA(String msg) {
    System.out.println("Subscriber A received: " + msg);
}
@RabbitListener(queues = "subscriber.b.queue")
public void listenBroadcastB(String msg) {
    System.out.println("Subscriber B received: " + msg);
}
Using a Direct Exchange for Routing
A Direct exchange routes messages to queues based on an exact match of the routing key.
Declare and bind queues using annotations on listener methods:
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "direct.queue1"),
    exchange = @Exchange(name = "app.direct", type = ExchangeTypes.DIRECT),
    key = {"alert", "warning"} // Matches these routing keys
))
public void handleDirectMessages1(String msg) {
    System.out.println("Queue1 [alert/warning]: " + msg);
}
@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "direct.queue2"),
    exchange = @Exchange(name = "app.direct", type = ExchangeTypes.DIRECT),
    key = {"info", "warning"} // Matches these routing keys
))
public void handleDirectMessages2(String msg) {
    System.out.println("Queue2 [info/warning]: " + msg);
}
Send messages with specific routing keys:
@Test
public void sendDirectMessage() {
    String exchangeName = "app.direct";
    // Sent to Queue1 only
    messagingTemplate.convertAndSend(exchangeName, "alert", "Severe alert!");
    // Sent to both Queue1 and Queue2
    messagingTemplate.convertAndSend(exchangeName, "warning", "General warning!");
    // Sent to Queue2 only
    messagingTemplate.convertAndSend(exchangeName, "info", "Informational message.");
}
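The exact-match rule behind these three sends can be modelled as a lookup from binding key to bound queues. The sketch below is a toy in-memory model in plain Java, not Spring AMQP code. The class DirectExchangeModel and its methods are hypothetical names introduced for illustration.

```java
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Toy in-memory model of a direct exchange; not part of Spring AMQP.
class DirectExchangeModel {

    // binding key -> names of the queues bound with that key
    private final Map<String, Set<String>> bindings = new LinkedHashMap<>();

    // Binds one queue under one or more binding keys.
    void bind(String queue, String... keys) {
        for (String key : keys) {
            bindings.computeIfAbsent(key, k -> new LinkedHashSet<>()).add(queue);
        }
    }

    // Returns the queues whose binding key equals the routing key exactly.
    Set<String> route(String routingKey) {
        return bindings.getOrDefault(routingKey, Set.of());
    }

    public static void main(String[] args) {
        DirectExchangeModel exchange = new DirectExchangeModel();
        exchange.bind("direct.queue1", "alert", "warning");
        exchange.bind("direct.queue2", "info", "warning");
        for (String key : new String[] {"alert", "warning", "info", "debug"}) {
            System.out.println(key + " -> " + exchange.route(key));
        }
    }
}
```

A routing key with no matching binding, such as debug here, yields an empty set. On a real broker such a message is discarded unless the publisher sets the mandatory flag or an alternate exchange is configured.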
Using a Topic Exchange for Pattern-Based Routing
A Topic exchange routes messages by matching the routing key, a sequence of dot-separated words, against binding patterns that may contain wildcards (* for exactly one word, # for zero or more words).
Declare listeners with pattern-based bindings:
@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "topic.news.queue"),
    exchange = @Exchange(name = "app.topic", type = ExchangeTypes.TOPIC),
    key = "news.*" // Matches 'news.sports', 'news.tech', etc.
))
public void listenNewsTopic(String msg) {
    System.out.println("News Topic: " + msg);
}
@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "topic.orders.queue"),
    exchange = @Exchange(name = "app.topic", type = ExchangeTypes.TOPIC),
    key = "order.#" // Matches 'order.new', 'order.new.eu', etc.
))
public void listenOrdersTopic(String msg) {
    System.out.println("Orders Topic: " + msg);
}
Send messages with structured routing keys:
@Test
public void sendTopicMessage() {
    String exchangeName = "app.topic";
    // Received by 'topic.news.queue'
    messagingTemplate.convertAndSend(exchangeName, "news.sports", "Sports update!");
    // Received by 'topic.orders.queue'
    messagingTemplate.convertAndSend(exchangeName, "order.new.eu", "New EU order.");
    // Matches neither 'news.*' nor 'order.#', so it reaches no queue
    // unless one is bound with a pattern such as '#'
    messagingTemplate.convertAndSend(exchangeName, "news", "General news.");
}
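The wildcard rules can be checked without a broker. The following sketch implements the word-by-word matching described above in plain Java. It is an illustrative model, not the broker's actual algorithm, and the class name TopicMatcher is hypothetical.

```java
// Toy matcher for topic binding patterns; not part of Spring AMQP.
class TopicMatcher {

    // Returns true if the routing key satisfies the binding pattern.
    static boolean matches(String pattern, String routingKey) {
        String[] p = pattern.isEmpty() ? new String[0] : pattern.split("\\.");
        String[] k = routingKey.isEmpty() ? new String[0] : routingKey.split("\\.");
        return matches(p, 0, k, 0);
    }

    // Compares pattern words from index i against key words from index j.
    private static boolean matches(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            return j == k.length; // pattern exhausted: the key must be exhausted too
        }
        if (p[i].equals("#")) {
            // '#' absorbs zero or more words, so try every possible continuation
            for (int next = j; next <= k.length; next++) {
                if (matches(p, i + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.length) {
            return false; // '*' and literal words each need one key word
        }
        if (p[i].equals("*") || p[i].equals(k[j])) {
            return matches(p, i + 1, k, j + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        String[] patterns = {"news.*", "order.#", "#"};
        String[] keys = {"news.sports", "order.new.eu", "news", "order"};
        for (String pattern : patterns) {
            for (String key : keys) {
                System.out.println(pattern + " vs " + key + " -> " + matches(pattern, key));
            }
        }
    }
}
```

Note the asymmetry between the two wildcards: news.* does not match the bare key news because * requires exactly one word, whereas order.# does match the bare key order because # may stand for zero words.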
Configuring JSON Message Converter
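By default, Spring AMQP's SimpleMessageConverter passes String and byte[] payloads through unchanged and falls back to Java serialization for other Serializable objects. To send objects as JSON instead, configure a MessageConverter bean.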
- Add Jackson dependency to pom.xml:
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
</dependency>
- Declare the JSON message converter in a configuration class or main application class:
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class AmqpConfig {
    @Bean
    public MessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }
}
Sending an Object Message:
// Requires imports of java.util.HashMap and java.util.Map in the test class
@Test
public void sendObjectMessage() {
    Map<String, Object> orderEvent = new HashMap<>();
    orderEvent.put("orderId", 1001);
    orderEvent.put("status", "SHIPPED");
    messagingTemplate.convertAndSend("object.queue", orderEvent);
}
Receiving an Object Message:
Ensure the same MessageConverter is configured on the consumer side.
@RabbitListener(queues = "object.queue")
public void handleObjectMessage(Map<String, Object> incomingEvent) {
    System.out.println("Received object: " + incomingEvent);
    Integer orderId = (Integer) incomingEvent.get("orderId");
    String status = (String) incomingEvent.get("status");
    // Process the object...
}