Fading Coder

One Final Commit for the Last Sprint

Home > Tech > Content

Implementing RabbitMQ Messaging Patterns in Spring Boot

Tech May 12 3

RabbitMQ Architecture and Exchange Types

RabbitMQ acts as a message broker that facilitates asynchronous communication between distinct serviecs in a distributed system. The architecture follows the Producer-Consumer model, where a Producer generates data and sends it to a Queue. A Consumer subscribes to the queue to process the data.

A critical component in RabbitMQ is the Exchange. Instead of sending messages directly to a queue, producers publish messages to an exchange. The exchange is responsible for routing these messages to specific queues based on predefined rules. RabbitMQ supports several exchange types, including Direct, Topic, Headers, and Fanout.

  • Direct Exchange: Routes messages to queues where the binding key exactly matches the routing key.
  • Topic Exchange: Performs wildcard matching between the routing key and the binding pattern (e.g., stock.*).
  • Headers Exchange: Routes messages based on header key-value pairs rather than routing keys.
  • Fanout Exchange: Broadcasts messages to all bound queues, ignoring routing keys entirely.

Project Configuration

To integrate RabbitMQ with Spring Boot, add the spring-boot-starter-amqp dependency to your build file.

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

Configure the connection details in application.properties:

spring.application.name=rabbitmq-spring-demo
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin

Implementing the Direct Exchange Pattern

In this pattern, we define a queue and bind it to the default exchange (or a named direct exchange) using a specific routing key.

Queue Configuration:

@Configuration
public class DirectRabbitConfig {

    @Bean
    public Queue notificationQueue() {
        return new Queue("notification.queue");
    }
}

Message Producer:

@Component
public class NotificationProducer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void broadcastNotification(int sequence) {
        String payload = "Alert message #" + sequence + " sent at " + new Date();
        System.out.println("Producer sent: " + payload);
        this.rabbitTemplate.convertAndSend("notification.queue", payload);
    }
}

Message Consumers:

To demonstrate load balancing, we define two consumers listening to the same queue.

@Component
@RabbitListener(queues = "notification.queue")
public class NotificationReceiverA {

    @RabbitHandler
    public void handle(String message) {
        System.out.println("Receiver A processed: " + message);
    }
}

@Component
@RabbitListener(queues = "notification.queue")
public class NotificationReceiverB {

    @RabbitHandler
    public void handle(String message) {
        System.out.println("Receiver B processed: " + message);
    }
}

Unit Test:

@Autowired
private NotificationProducer producer;

@Test
public void testDirectMessaging() {
    for (int i = 0; i < 10; i++) {
        producer.broadcastNotification(i);
    }
}

Sending Java Objects

RabbitMQ can transmit Java Objects provided they implement Serializable. Below is an example using a custom Order entity.

import java.io.Serializable;

public class Order implements Serializable {
    private String orderId;
    private double amount;

    public Order(String orderId, double amount) {
        this.orderId = orderId;
        this.amount = amount;
    }

    // Getters, setters, and toString omitted for brevity
    @Override
    public String toString() {
        return "Order{id='" + orderId + "', amount=" + amount + "}";
    }
}

Producer for Objects:

@Component
public class OrderProducer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void submitOrder(Order order) {
        System.out.println("Sending object: " + order);
        this.rabbitTemplate.convertAndSend("order.queue", order);
    }
}

Consumer for Objects:

@Component
@RabbitListener(queues = "order.queue")
public class OrderReceiver {

    @RabbitHandler
    public void process(Order order) {
        System.out.println("Consumed object: " + order);
    }
}

Implementing the Topic Exchange Pattern

The Topic exchange allows flexible routing based on patterns. We will configure an exchange and bind two queues with different patterns.

@Configuration
public class TopicRabbitConfig {

    public static final String QUEUE_ALERTS = "queue.alerts";
    public static final String QUEUE_ALL_LOGS = "queue.all.logs";

    @Bean
    public Queue alertsQueue() {
        return new Queue(QUEUE_ALERTS);
    }

    @Bean
    public Queue allLogsQueue() {
        return new Queue(QUEUE_ALL_LOGS);
    }

    @Bean
    public TopicExchange appExchange() {
        return new TopicExchange("app.topic.exchange");
    }

    @Bean
    public Binding bindAlertsToExchange(Queue alertsQueue, TopicExchange appExchange) {
        return BindingBuilder.bind(alertsQueue).to(appExchange).with("system.error.*");
    }

    @Bean
    public Binding bindAllLogsToExchange(Queue allLogsQueue, TopicExchange appExchange) {
        return BindingBuilder.bind(allLogsQueue).to(appExchange).with("system.#");
    }
}

Topic Producer:

@Component
public class LogProducer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendErrorLog() {
        String msg = "Critical failure in module";
        System.out.println("Sending error log: " + msg);
        rabbitTemplate.convertAndSend("app.topic.exchange", "system.error.critical", msg);
    }

    public void sendInfoLog() {
        String msg = "Standard operation info";
        System.out.println("Sending info log: " + msg);
        rabbitTemplate.convertAndSend("app.topic.exchange", "system.info", msg);
    }
}

Topic Consumers:

@Component
@RabbitListener(queues = TopicRabbitConfig.QUEUE_ALERTS)
public class AlertReceiver {
    @RabbitHandler
    public void handle(String msg) {
        System.out.println("Alerts Queue Received: " + msg);
    }
}

@Component
@RabbitListener(queues = TopicRabbitConfig.QUEUE_ALL_LOGS)
public class LogReceiver {
    @RabbitHandler
    public void handle(String msg) {
        System.out.println("All Logs Queue Received: " + msg);
    }
}

When sending to system.error.critical, both queues receive the message (matches # and error.*). When sending to system.info, only the queue bound to system.# receives it.

Implementing the Fanout Exchange Pattern

The Fanout exchange broadcasts a message to every queue bound to it, simulating a broadcast scenario.

@Configuration
public class FanoutRabbitConfig {

    @Bean
    public Queue queueA() { return new Queue("fanout.a"); }

    @Bean
    public Queue queueB() { return new Queue("fanout.b"); }

    @Bean
    public Queue queueC() { return new Queue("fanout.c"); }

    @Bean
    public FanoutExchange broadcastExchange() {
        return new FanoutExchange("broadcast.exchange");
    }

    @Bean
    public Binding bindA(Queue queueA, FanoutExchange broadcastExchange) {
        return BindingBuilder.bind(queueA).to(broadcastExchange);
    }

    @Bean
    public Binding bindB(Queue queueB, FanoutExchange broadcastExchange) {
        return BindingBuilder.bind(queueB).to(broadcastExchange);
    }

    @Bean
    public Binding bindC(Queue queueC, FanoutExchange broadcastExchange) {
        return BindingBuilder.bind(queueC).to(broadcastExchange);
    }
}

Fanout Producer:

@Component
public class BroadcastProducer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void broadcast() {
        String message = "Global announcement";
        System.out.println("Broadcasting: " + message);
        this.rabbitTemplate.convertAndSend("broadcast.exchange", "", message);
    }
}

Fanout Consumers:

@Component
@RabbitListener(queues = "fanout.a")
public class ConsumerA {
    @RabbitHandler
    public void process(String msg) { System.out.println("Consumer A: " + msg); }
}

@Component
@RabbitListener(queues = "fanout.b")
public class ConsumerB {
    @RabbitHandler
    public void process(String msg) { System.out.println("Consumer B: " + msg); }
}

@Component
@RabbitListener(queues = "fanout.c")
public class ConsumerC {
    @RabbitHandler
    public void process(String msg) { System.out.println("Consumer C: " + msg); }
}

Executing the broadcast method will result in all three consumers (A, B, and C) receiving the identical message simultaneously.

Related Articles

Understanding Strong and Weak References in Java

Strong References Strong reference are the most prevalent type of object referencing in Java. When an object has a strong reference pointing to it, the garbage collector will not reclaim its memory. F...

Comprehensive Guide to SSTI Explained with Payload Bypass Techniques

Introduction Server-Side Template Injection (SSTI) is a vulnerability in web applications where user input is improper handled within the template engine and executed on the server. This exploit can r...

Implement Image Upload Functionality for Django Integrated TinyMCE Editor

Django’s Admin panel is highly user-friendly, and pairing it with TinyMCE, an effective rich text editor, simplifies content management significantly. Combining the two is particular useful for bloggi...

Leave a Comment

Anonymous

◎Feel free to join the discussion and share your thoughts.