Implementing Message Queue Patterns with Spring AMQP

Spring AMQP is a framework that simplifies interaction with message brokers like RabbitMQ by providing abstractions and templates. It covers the five core messaging patterns from the RabbitMQ tutorials without requiring you to work against the native client API directly.

Messaging patterns are categorized by purpose and exchange type:

  • Basic Queue (Hello World)
  • Work Queue
  • Publish/Subscribe using Fanout Exchange
  • Routing using Direct Exchange
  • Topics using Topic Exchange

AMQP (Advanced Message Queuing Protocol) is an open standard for application-to-application message passing, independent of language and platform. Spring AMQP builds on this protocol: the spring-amqp module provides the abstraction, and spring-rabbit is the RabbitMQ implementation of it.

Key features of Spring AMQP include:

  • Listener containers for asynchronous message consumption.
  • RabbitTemplate for sending and receiving messages.
  • RabbitAdmin for automatic declaration of queues, exchanges, and bindings.

Sending Messages with a Basic Queue (Hello World)

This pattern involves a simple producer-consumer model.

Configuration and Sending:

  1. Add the Spring AMQP dependency to your project's pom.xml:
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    
  2. Configure the RabbitMQ connection in application.yml:
    spring:
      rabbitmq:
        host: 192.168.1.100
        port: 5672
        username: appuser
        password: securepass
        virtual-host: /
    
  3. Use RabbitTemplate to send a message in a test class:
    import org.junit.jupiter.api.Test;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    
    @SpringBootTest
    public class MessageSenderTest {
        @Autowired
        private RabbitTemplate messagingTemplate;
    
        @Test
        public void sendToSimpleQueue() {
            String targetQueue = "basic.queue";
            String messageContent = "Hello, Spring AMQP!";
            messagingTemplate.convertAndSend(targetQueue, messageContent);
        }
    }
    

Receiving Messages with a Basic Queue

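Create a listener component to consume messages from the queue. The queue must already exist on the broker, because @RabbitListener(queues = ...) subscribes to a queue but does not declare it.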

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class MessageReceiver {

    @RabbitListener(queues = "basic.queue")
    public void handleSimpleMessage(String receivedMessage) {
        System.out.println("Received message: " + receivedMessage);
    }
}

Implementing a Work Queue

A Work Queue dispatches tasks among multiple consumers.

Producer (Sends multiple messages):

@Test
public void sendWorkload() throws InterruptedException {
    String workQueue = "task.queue";
    for (int i = 1; i <= 50; i++) {
        String task = "Task item #" + i;
        messagingTemplate.convertAndSend(workQueue, task);
        Thread.sleep(20); // Simulate work generation delay
    }
}

Consumer (Two workers with different speeds):

@RabbitListener(queues = "task.queue")
public void processWorkFast(String task) throws InterruptedException {
    System.out.println("Fast Worker processing: " + task);
    Thread.sleep(20); // Fast processing
}

@RabbitListener(queues = "task.queue")
public void processWorkSlow(String task) throws InterruptedException {
    System.err.println("Slow Worker processing: " + task);
    Thread.sleep(100); // Slow processing
}

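By default, the broker hands messages to consumers round-robin, and each consumer may buffer many unacknowledged messages, so a slow consumer ends up holding as many tasks as a fast one. To implement fair dispatch, set the prefetch count in application.yml. This limits the number of unacknowledged messages a consumer can hold at once; with a value of 1, a consumer receives its next message only after acknowledging the current one, so faster workers get more tasks.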

spring:
  rabbitmq:
    listener:
      simple:
        prefetch: 1 # Process one message at a time before fetching the next
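
The effect of the prefetch setting can be illustrated without a broker. The following is a minimal sketch in plain Java, not Spring AMQP or RabbitMQ code: the class DispatchSimulation and its methods are hypothetical, all tasks are assumed to be available at time zero, and time is simulated rather than measured.

```java
final class DispatchSimulation {

    /** Round-robin: task i goes to worker (i mod n), regardless of how busy that worker is. */
    static int[] roundRobin(int tasks, long[] millisPerTask) {
        int[] counts = new int[millisPerTask.length];
        for (int i = 0; i < tasks; i++) {
            counts[i % millisPerTask.length]++;
        }
        return counts;
    }

    /** Prefetch of 1 (simplified): each task goes to the worker that becomes free earliest. */
    static int[] fairDispatch(int tasks, long[] millisPerTask) {
        int[] counts = new int[millisPerTask.length];
        long[] freeAt = new long[millisPerTask.length]; // virtual time at which each worker is idle again
        for (int i = 0; i < tasks; i++) {
            int next = 0;
            for (int w = 1; w < freeAt.length; w++) {
                if (freeAt[w] < freeAt[next]) {
                    next = w; // ties stay with the lower index
                }
            }
            counts[next]++;
            freeAt[next] += millisPerTask[next];
        }
        return counts;
    }

    /** Virtual time at which the last worker finishes its share of the tasks. */
    static long finishTime(int[] counts, long[] millisPerTask) {
        long latest = 0;
        for (int w = 0; w < counts.length; w++) {
            latest = Math.max(latest, counts[w] * millisPerTask[w]);
        }
        return latest;
    }
}
```

With 50 tasks and workers that take 20 ms and 100 ms per task, roundRobin assigns 25 tasks to each worker, so the slow worker finishes at 2500 ms. fairDispatch assigns 41 and 9 tasks, and the whole batch finishes at 900 ms in this simplified model.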

Using a Fanout Exchange for Broadcast

A Fanout exchange routes messages to all bound queues, enabling publish/subscribe.

Declare Exchange, Queues, and Bindings via Configuration:

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class BroadcastConfig {

    @Bean
    public FanoutExchange broadcastExchange() {
        return new FanoutExchange("app.broadcast");
    }

    @Bean
    public Queue subscriberQueueA() {
        return new Queue("subscriber.a.queue");
    }

    @Bean
    public Queue subscriberQueueB() {
        return new Queue("subscriber.b.queue");
    }

    @Bean
    public Binding bindQueueA(Queue subscriberQueueA, FanoutExchange broadcastExchange) {
        return BindingBuilder.bind(subscriberQueueA).to(broadcastExchange);
    }

    @Bean
    public Binding bindQueueB(Queue subscriberQueueB, FanoutExchange broadcastExchange) {
        return BindingBuilder.bind(subscriberQueueB).to(broadcastExchange);
    }
}

Send a message to the exchange (routing key is ignored for Fanout):

@Test
public void broadcastMessage() {
    String exchangeName = "app.broadcast";
    String announcement = "Important announcement for all!";
    // Empty routing key
    messagingTemplate.convertAndSend(exchangeName, "", announcement);
}

Listeners for each queue:

@RabbitListener(queues = "subscriber.a.queue")
public void listenBroadcastA(String msg) {
    System.out.println("Subscriber A received: " + msg);
}

@RabbitListener(queues = "subscriber.b.queue")
public void listenBroadcastB(String msg) {
    System.out.println("Subscriber B received: " + msg);
}
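
To make the broadcast semantics concrete, here is a small in-memory model of a fanout exchange. It is only a sketch under stated assumptions: FanoutSketch and its methods are hypothetical names, and no broker or Spring AMQP class is involved. The point is that every bound queue gets its own copy of each message and that the routing key plays no role.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.Map;

final class FanoutSketch {

    // queue name -> messages waiting in that queue
    private final Map<String, Deque<String>> boundQueues = new LinkedHashMap<>();

    /** Binds a queue to the exchange; binding the same name twice has no effect. */
    void bind(String queueName) {
        boundQueues.putIfAbsent(queueName, new ArrayDeque<>());
    }

    /** Copies the message into every bound queue; the routing key is deliberately ignored. */
    void publish(String routingKey, String message) {
        for (Deque<String> queue : boundQueues.values()) {
            queue.addLast(message);
        }
    }

    /** Takes the oldest message from one queue, or returns null if it is empty or unknown. */
    String poll(String queueName) {
        Deque<String> queue = boundQueues.get(queueName);
        return queue == null ? null : queue.pollFirst();
    }
}
```

Because each queue holds an independent copy, one subscriber consuming a message does not remove it from the other subscriber's queue.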

Using a Direct Exchange for Routing

A Direct exchange routes messages to queues based on an exact match of the routing key.

Declare and bind queues using annotations on listener methods:

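import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;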

@RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "direct.queue1"),
        exchange = @Exchange(name = "app.direct", type = ExchangeTypes.DIRECT),
        key = {"alert", "warning"} // Matches these routing keys
))
public void handleDirectMessages1(String msg) {
    System.out.println("Queue1 [alert/warning]: " + msg);
}

@RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "direct.queue2"),
        exchange = @Exchange(name = "app.direct", type = ExchangeTypes.DIRECT),
        key = {"info", "warning"} // Matches these routing keys
))
public void handleDirectMessages2(String msg) {
    System.out.println("Queue2 [info/warning]: " + msg);
}

Send messages with specific routing keys:

@Test
public void sendDirectMessage() {
    String exchangeName = "app.direct";
    // Sent to Queue1 only
    messagingTemplate.convertAndSend(exchangeName, "alert", "Severe alert!");
    // Sent to both Queue1 and Queue2
    messagingTemplate.convertAndSend(exchangeName, "warning", "General warning!");
    // Sent to Queue2 only
    messagingTemplate.convertAndSend(exchangeName, "info", "Informational message.");
}
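
The routing decisions noted in the comments above can be reproduced with a simple lookup table. The sketch below models exact-match routing in plain Java; DirectRoutingSketch is a hypothetical illustration and not how RabbitMQ is implemented internally.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

final class DirectRoutingSketch {

    // binding key -> names of the queues bound with that key
    private final Map<String, Set<String>> bindings = new HashMap<>();

    /** Binds one queue under one or more binding keys. */
    void bind(String queueName, String... bindingKeys) {
        for (String key : bindingKeys) {
            bindings.computeIfAbsent(key, k -> new TreeSet<>()).add(queueName);
        }
    }

    /** Returns the queues whose binding key equals the routing key exactly. */
    Set<String> route(String routingKey) {
        return bindings.getOrDefault(routingKey, Collections.emptySet());
    }
}
```

After binding direct.queue1 with "alert" and "warning" and direct.queue2 with "info" and "warning", route("warning") returns both queues, while a key that nobody bound, such as "debug", returns an empty set.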

Using a Topic Exchange for Pattern-Based Routing

A Topic exchange routes messages by matching the routing key, a list of dot-separated words, against wildcard patterns in the binding key (* for exactly one word, # for zero or more words).

Declare listeners with pattern-based bindings:

@RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "topic.news.queue"),
        exchange = @Exchange(name = "app.topic", type = ExchangeTypes.TOPIC),
        key = "news.*" // Matches 'news.sports', 'news.tech', etc.
))
public void listenNewsTopic(String msg) {
    System.out.println("News Topic: " + msg);
}

@RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "topic.orders.queue"),
        exchange = @Exchange(name = "app.topic", type = ExchangeTypes.TOPIC),
        key = "order.#" // Matches 'order.new', 'order.new.eu', etc.
))
public void listenOrdersTopic(String msg) {
    System.out.println("Orders Topic: " + msg);
}

Send messages with structured routing keys:

@Test
public void sendTopicMessage() {
    String exchangeName = "app.topic";
    // Received by 'topic.news.queue'
    messagingTemplate.convertAndSend(exchangeName, "news.sports", "Sports update!");
    // Received by 'topic.orders.queue'
    messagingTemplate.convertAndSend(exchangeName, "order.new.eu", "New EU order.");
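    // Matches neither binding above ('news.*' needs exactly one word after 'news'),
    // so by default the broker discards it; a queue bound with '#' would receive it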
    messagingTemplate.convertAndSend(exchangeName, "news", "General news.");
}
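
The wildcard rules are easy to misread, so it can help to check a pattern against a routing key in isolation. The following matcher is a sketch of the documented rules (* matches exactly one word, # matches zero or more words), written in plain Java. TopicMatcher is a hypothetical helper, not part of Spring AMQP or the RabbitMQ client, and it ignores edge cases such as empty words between dots.

```java
final class TopicMatcher {

    /** Returns true if the dot-separated routing key satisfies the binding pattern. */
    static boolean matches(String pattern, String routingKey) {
        String[] patternWords = pattern.isEmpty() ? new String[0] : pattern.split("\\.");
        String[] keyWords = routingKey.isEmpty() ? new String[0] : routingKey.split("\\.");
        return match(patternWords, 0, keyWords, 0);
    }

    private static boolean match(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            return j == k.length; // pattern used up: the key must be used up too
        }
        if (p[i].equals("#")) {
            // '#' either matches zero words (move on in the pattern)
            // or absorbs one more key word (stay on '#')
            return match(p, i + 1, k, j) || (j < k.length && match(p, i, k, j + 1));
        }
        if (j == k.length) {
            return false; // pattern still expects a word, but the key has none left
        }
        // '*' accepts any single word; anything else must match literally
        return (p[i].equals("*") || p[i].equals(k[j])) && match(p, i + 1, k, j + 1);
    }
}
```

For the bindings used in this section, matches("news.*", "news.sports") and matches("order.#", "order.new.eu") are true, while matches("news.*", "news") is false because * requires exactly one word after news.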

Configuring JSON Message Converter

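By default, Spring AMQP's SimpleMessageConverter sends Strings and byte arrays as-is and falls back to Java serialization for other Serializable objects. To send object messages as JSON instead, configure a MessageConverter bean.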
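
To see what Java serialization puts on the wire, the sketch below serializes a map with the JDK's own ObjectOutputStream. It assumes the default converter relies on this standard mechanism for Serializable payloads; JavaSerializationSketch and its methods are hypothetical names, and no Spring class is involved.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;

final class JavaSerializationSketch {

    /** Serializes a value with standard JDK serialization. */
    static byte[] serialize(Object value) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    /** Reads the value back; this only works where the same classes are available. */
    static Object deserialize(byte[] payload) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(payload))) {
            return in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Renders the first bytes of a payload as lowercase hex, for inspection. */
    static String hexPrefix(byte[] payload, int count) {
        StringBuilder hex = new StringBuilder();
        for (int i = 0; i < Math.min(count, payload.length); i++) {
            hex.append(String.format("%02x", payload[i] & 0xff));
        }
        return hex.toString();
    }
}
```

The payload starts with the JDK stream header (ac ed 00 05) and continues as a binary format that only Java consumers with matching classes can read, which is the main reason to switch to JSON when other platforms or tools need to inspect the messages.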

  1. Add the Jackson dependency to pom.xml:
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
    </dependency>
    
  2. Declare the JSON message converter in a configuration class or main application class:
    import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
    import org.springframework.amqp.support.converter.MessageConverter;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration
    public class AmqpConfig {
        @Bean
        public MessageConverter jsonMessageConverter() {
            return new Jackson2JsonMessageConverter();
        }
    }
    

Sending an Object Message:

import java.util.HashMap;
import java.util.Map;

@Test
public void sendObjectMessage() {
    Map<String, Object> orderEvent = new HashMap<>();
    orderEvent.put("orderId", 1001);
    orderEvent.put("status", "SHIPPED");
    messagingTemplate.convertAndSend("object.queue", orderEvent);
}

Receiving an Object Message: Ensure the same MessageConverter is configured on the consumer side.

@RabbitListener(queues = "object.queue")
public void handleObjectMessage(Map<String, Object> incomingEvent) {
    System.out.println("Received object: " + incomingEvent);
    Integer orderId = (Integer) incomingEvent.get("orderId");
    String status = (String) incomingEvent.get("status");
    // Process the object...
}