Fading Coder

One Final Commit for the Last Sprint

Home > Tech > Content

Connecting Multiple RabbitMQ Brokers in Spring Boot

Tech 1

Spring Boot’s AMQP auto-configuration is great for a single RabbitMQ broker, but many systems need to talk to more than one. To support multiple brokers, define separate ConnectionFactory, RabbitTemplate, RabbitAdmin, and listener container factory beans for each broker and mark one as primary.

Configuration

Example application.yml with two independent RabbitMQ endpoints:

server:
  port: 8080

mq:
  v1:
    host: host
    port: 5672
    username: username
    password: password
    virtual-host: virtual-host
    acknowledge-mode: manual
    mandatory: true
    publisher-confirms: true
    publisher-returns: true
    prefetch: 5
  v2:
    host: host
    port: 5672
    username: username
    password: password
    virtual-host: virtual-host
    acknowledge-mode: manual
    mandatory: true
    publisher-confirms: true
    publisher-returns: true
    prefetch: 5

Multi-broker infrastructure

Define per-broker beans and mark one set as the default with @Primary.

package com.example.config.amqp;

import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory.ConfirmType;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;

@Configuration
public class MultiRabbitConfiguration {

    // ---- properties holders ----

    @Bean(name = "v1Props")
    @ConfigurationProperties(prefix = "mq.v1")
    public BrokerProps v1Props() {
        return new BrokerProps();
    }

    @Bean(name = "v2Props")
    @ConfigurationProperties(prefix = "mq.v2")
    public BrokerProps v2Props() {
        return new BrokerProps();
    }

    // ---- v1 (primary) ----

    @Bean(name = "v1ConnectionFactory")
    @Primary
    public CachingConnectionFactory v1ConnectionFactory(@Qualifier("v1Props") BrokerProps p) {
        CachingConnectionFactory cf = new CachingConnectionFactory();
        cf.setHost(p.getHost());
        cf.setPort(p.getPort());
        cf.setUsername(p.getUsername());
        cf.setPassword(p.getPassword());
        cf.setVirtualHost(p.getVirtualHost());
        if (p.isPublisherConfirms()) {
            cf.setPublisherConfirmType(ConfirmType.CORRELATED);
        }
        cf.setPublisherReturns(p.isPublisherReturns());
        return cf;
    }

    @Bean(name = "v1Template")
    @Primary
    public RabbitTemplate v1Template(@Qualifier("v1ConnectionFactory") ConnectionFactory cf,
                                     @Qualifier("v1Props") BrokerProps p) {
        RabbitTemplate t = new RabbitTemplate(cf);
        t.setMandatory(p.isMandatory());
        t.setConfirmCallback((correlation, ack, cause) -> {
            // handle confirms as needed
        });
        t.setReturnsCallback(returned -> {
            // handle unroutable messages as needed
        });
        return t;
    }

    @Bean(name = "v1ListenerFactory")
    @Primary
    public SimpleRabbitListenerContainerFactory v1ListenerFactory(
            @Qualifier("v1ConnectionFactory") ConnectionFactory cf,
            @Qualifier("v1Props") BrokerProps p) {
        SimpleRabbitListenerContainerFactory f = new SimpleRabbitListenerContainerFactory();
        f.setConnectionFactory(cf);
        f.setAcknowledgeMode(AcknowledgeMode.valueOf(p.getAcknowledgeMode().toUpperCase()));
        f.setPrefetchCount(p.getPrefetch());
        return f;
    }

    @Bean(name = "v1Admin")
    @Primary
    public RabbitAdmin v1Admin(@Qualifier("v1ConnectionFactory") ConnectionFactory cf) {
        RabbitAdmin admin = new RabbitAdmin(cf);
        admin.setAutoStartup(true);
        return admin;
    }

    // ---- v2 ----

    @Bean(name = "v2ConnectionFactory")
    public CachingConnectionFactory v2ConnectionFactory(@Qualifier("v2Props") BrokerProps p) {
        CachingConnectionFactory cf = new CachingConnectionFactory();
        cf.setHost(p.getHost());
        cf.setPort(p.getPort());
        cf.setUsername(p.getUsername());
        cf.setPassword(p.getPassword());
        cf.setVirtualHost(p.getVirtualHost());
        if (p.isPublisherConfirms()) {
            cf.setPublisherConfirmType(ConfirmType.CORRELATED);
        }
        cf.setPublisherReturns(p.isPublisherReturns());
        return cf;
    }

    @Bean(name = "v2Template")
    public RabbitTemplate v2Template(@Qualifier("v2ConnectionFactory") ConnectionFactory cf,
                                     @Qualifier("v2Props") BrokerProps p) {
        RabbitTemplate t = new RabbitTemplate(cf);
        t.setMandatory(p.isMandatory());
        t.setConfirmCallback((correlation, ack, cause) -> {
            // handle confirms as needed
        });
        t.setReturnsCallback(returned -> {
            // handle unroutable messages as needed
        });
        return t;
    }

    @Bean(name = "v2ListenerFactory")
    public SimpleRabbitListenerContainerFactory v2ListenerFactory(
            @Qualifier("v2ConnectionFactory") ConnectionFactory cf,
            @Qualifier("v2Props") BrokerProps p) {
        SimpleRabbitListenerContainerFactory f = new SimpleRabbitListenerContainerFactory();
        f.setConnectionFactory(cf);
        f.setAcknowledgeMode(AcknowledgeMode.valueOf(p.getAcknowledgeMode().toUpperCase()));
        f.setPrefetchCount(p.getPrefetch());
        return f;
    }

    @Bean(name = "v2Admin")
    public RabbitAdmin v2Admin(@Qualifier("v2ConnectionFactory") ConnectionFactory cf) {
        RabbitAdmin admin = new RabbitAdmin(cf);
        admin.setAutoStartup(true);
        return admin;
    }
}

BrokerProps binds the YAML for each broker:

package com.example.config.amqp;

public class BrokerProps {
    private String host;
    private int port = 5672;
    private String username;
    private String password;
    private String virtualHost = "/";
    private boolean mandatory = true;
    private boolean publisherConfirms = true;
    private boolean publisherReturns = true;
    private String acknowledgeMode = "MANUAL"; // MANUAL, AUTO, NONE
    private int prefetch = 5;

    public String getHost() { return host; }
    public void setHost(String host) { this.host = host; }
    public int getPort() { return port; }
    public void setPort(int port) { this.port = port; }
    public String getUsername() { return username; }
    public void setUsername(String username) { this.username = username; }
    public String getPassword() { return password; }
    public void setPassword(String password) { this.password = password; }
    public String getVirtualHost() { return virtualHost; }
    public void setVirtualHost(String virtualHost) { this.virtualHost = virtualHost; }
    public boolean isMandatory() { return mandatory; }
    public void setMandatory(boolean mandatory) { this.mandatory = mandatory; }
    public boolean isPublisherConfirms() { return publisherConfirms; }
    public void setPublisherConfirms(boolean publisherConfirms) { this.publisherConfirms = publisherConfirms; }
    public boolean isPublisherReturns() { return publisherReturns; }
    public void setPublisherReturns(boolean publisherReturns) { this.publisherReturns = publisherReturns; }
    public String getAcknowledgeMode() { return acknowledgeMode; }
    public void setAcknowledgeMode(String acknowledgeMode) { this.acknowledgeMode = acknowledgeMode; }
    public int getPrefetch() { return prefetch; }
    public void setPrefetch(int prefetch) { this.prefetch = prefetch; }
}

Declaring exchange, queue, and binding

Create topology with a selected RabbitAdmin. Below uses the v2 admin:

package com.example.config.amqp;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.context.annotation.Configuration;

@Configuration
public class TopologyConfiguration {

    public static final String EXCHANGE = "ex.topic.demo";
    public static final String QUEUE = "q.topic.demo";
    public static final String ROUTING_KEY = "rk.topic.demo";

    @Resource(name = "v2Admin")
    private RabbitAdmin admin;

    @PostConstruct
    public void declare() {
        TopicExchange exchange = new TopicExchange(EXCHANGE, true, false);
        Queue queue = new Queue(QUEUE, true);
        Binding binding = BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY);
        admin.declareExchange(exchange);
        admin.declareQueue(queue);
        admin.declareBinding(binding);
    }
}

Producer

Send messages to the same exchaneg via different brokers using distinct RabbitTemplate beans.

package com.example.messaging;

import javax.annotation.Resource;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

import static com.example.config.amqp.TopologyConfiguration.*;

@Component
public class TopicPublisher {

    @Resource(name = "v1Template")
    private RabbitTemplate primaryTemplate;

    @Resource(name = "v2Template")
    private RabbitTemplate secondaryTemplate;

    public void publish() {
        primaryTemplate.convertAndSend(EXCHANGE, ROUTING_KEY,
                "Message published via v1Template");

        secondaryTemplate.convertAndSend(EXCHANGE, ROUTING_KEY,
                "Message published via v2Template");
    }
}

Consumer

Choose the appropriate listener factory to bind this consumer to a specific connection.

package com.example.messaging;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = "q.topic.demo", containerFactory = "v2ListenerFactory")
public class TopicListener {

    @RabbitHandler
    public void onMessage(String payload) {
        System.out.println(payload);
    }
}

Test

package com.example;

import com.example.messaging.TopicPublisher;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class MultiRabbitTest {

    @Autowired
    private TopicPublisher publisher;

    @Test
    public void publishToBothBrokers() {
        publisher.publish();
    }
}

Related Articles

Understanding Strong and Weak References in Java

Strong References Strong reference are the most prevalent type of object referencing in Java. When an object has a strong reference pointing to it, the garbage collector will not reclaim its memory. F...

Comprehensive Guide to SSTI Explained with Payload Bypass Techniques

Introduction Server-Side Template Injection (SSTI) is a vulnerability in web applications where user input is improper handled within the template engine and executed on the server. This exploit can r...

Implement Image Upload Functionality for Django Integrated TinyMCE Editor

Django’s Admin panel is highly user-friendly, and pairing it with TinyMCE, an effective rich text editor, simplifies content management significantly. Combining the two is particular useful for bloggi...

Leave a Comment

Anonymous

◎Feel free to join the discussion and share your thoughts.