Connecting Multiple RabbitMQ Brokers in Spring Boot
Spring Boot’s AMQP auto-configuration is great for a single RabbitMQ broker, but many systems need to talk to more than one. To support multiple brokers, define separate ConnectionFactory, RabbitTemplate, RabbitAdmin, and listener container factory beans for each broker and mark one as primary.
Configuration
Example application.yml with two independent RabbitMQ endpoints:
server:
port: 8080
mq:
v1:
host: host
port: 5672
username: username
password: password
virtual-host: virtual-host
acknowledge-mode: manual
mandatory: true
publisher-confirms: true
publisher-returns: true
prefetch: 5
v2:
host: host
port: 5672
username: username
password: password
virtual-host: virtual-host
acknowledge-mode: manual
mandatory: true
publisher-confirms: true
publisher-returns: true
prefetch: 5
Multi-broker infrastructure
Define per-broker beans and mark one set as the default with @Primary.
package com.example.config.amqp;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory.ConfirmType;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
@Configuration
public class MultiRabbitConfiguration {
// ---- properties holders ----
@Bean(name = "v1Props")
@ConfigurationProperties(prefix = "mq.v1")
public BrokerProps v1Props() {
return new BrokerProps();
}
@Bean(name = "v2Props")
@ConfigurationProperties(prefix = "mq.v2")
public BrokerProps v2Props() {
return new BrokerProps();
}
// ---- v1 (primary) ----
@Bean(name = "v1ConnectionFactory")
@Primary
public CachingConnectionFactory v1ConnectionFactory(@Qualifier("v1Props") BrokerProps p) {
CachingConnectionFactory cf = new CachingConnectionFactory();
cf.setHost(p.getHost());
cf.setPort(p.getPort());
cf.setUsername(p.getUsername());
cf.setPassword(p.getPassword());
cf.setVirtualHost(p.getVirtualHost());
if (p.isPublisherConfirms()) {
cf.setPublisherConfirmType(ConfirmType.CORRELATED);
}
cf.setPublisherReturns(p.isPublisherReturns());
return cf;
}
@Bean(name = "v1Template")
@Primary
public RabbitTemplate v1Template(@Qualifier("v1ConnectionFactory") ConnectionFactory cf,
@Qualifier("v1Props") BrokerProps p) {
RabbitTemplate t = new RabbitTemplate(cf);
t.setMandatory(p.isMandatory());
t.setConfirmCallback((correlation, ack, cause) -> {
// handle confirms as needed
});
t.setReturnsCallback(returned -> {
// handle unroutable messages as needed
});
return t;
}
@Bean(name = "v1ListenerFactory")
@Primary
public SimpleRabbitListenerContainerFactory v1ListenerFactory(
@Qualifier("v1ConnectionFactory") ConnectionFactory cf,
@Qualifier("v1Props") BrokerProps p) {
SimpleRabbitListenerContainerFactory f = new SimpleRabbitListenerContainerFactory();
f.setConnectionFactory(cf);
f.setAcknowledgeMode(AcknowledgeMode.valueOf(p.getAcknowledgeMode().toUpperCase()));
f.setPrefetchCount(p.getPrefetch());
return f;
}
@Bean(name = "v1Admin")
@Primary
public RabbitAdmin v1Admin(@Qualifier("v1ConnectionFactory") ConnectionFactory cf) {
RabbitAdmin admin = new RabbitAdmin(cf);
admin.setAutoStartup(true);
return admin;
}
// ---- v2 ----
@Bean(name = "v2ConnectionFactory")
public CachingConnectionFactory v2ConnectionFactory(@Qualifier("v2Props") BrokerProps p) {
CachingConnectionFactory cf = new CachingConnectionFactory();
cf.setHost(p.getHost());
cf.setPort(p.getPort());
cf.setUsername(p.getUsername());
cf.setPassword(p.getPassword());
cf.setVirtualHost(p.getVirtualHost());
if (p.isPublisherConfirms()) {
cf.setPublisherConfirmType(ConfirmType.CORRELATED);
}
cf.setPublisherReturns(p.isPublisherReturns());
return cf;
}
@Bean(name = "v2Template")
public RabbitTemplate v2Template(@Qualifier("v2ConnectionFactory") ConnectionFactory cf,
@Qualifier("v2Props") BrokerProps p) {
RabbitTemplate t = new RabbitTemplate(cf);
t.setMandatory(p.isMandatory());
t.setConfirmCallback((correlation, ack, cause) -> {
// handle confirms as needed
});
t.setReturnsCallback(returned -> {
// handle unroutable messages as needed
});
return t;
}
@Bean(name = "v2ListenerFactory")
public SimpleRabbitListenerContainerFactory v2ListenerFactory(
@Qualifier("v2ConnectionFactory") ConnectionFactory cf,
@Qualifier("v2Props") BrokerProps p) {
SimpleRabbitListenerContainerFactory f = new SimpleRabbitListenerContainerFactory();
f.setConnectionFactory(cf);
f.setAcknowledgeMode(AcknowledgeMode.valueOf(p.getAcknowledgeMode().toUpperCase()));
f.setPrefetchCount(p.getPrefetch());
return f;
}
@Bean(name = "v2Admin")
public RabbitAdmin v2Admin(@Qualifier("v2ConnectionFactory") ConnectionFactory cf) {
RabbitAdmin admin = new RabbitAdmin(cf);
admin.setAutoStartup(true);
return admin;
}
}
BrokerProps binds the YAML for each broker:
package com.example.config.amqp;
public class BrokerProps {
private String host;
private int port = 5672;
private String username;
private String password;
private String virtualHost = "/";
private boolean mandatory = true;
private boolean publisherConfirms = true;
private boolean publisherReturns = true;
private String acknowledgeMode = "MANUAL"; // MANUAL, AUTO, NONE
private int prefetch = 5;
public String getHost() { return host; }
public void setHost(String host) { this.host = host; }
public int getPort() { return port; }
public void setPort(int port) { this.port = port; }
public String getUsername() { return username; }
public void setUsername(String username) { this.username = username; }
public String getPassword() { return password; }
public void setPassword(String password) { this.password = password; }
public String getVirtualHost() { return virtualHost; }
public void setVirtualHost(String virtualHost) { this.virtualHost = virtualHost; }
public boolean isMandatory() { return mandatory; }
public void setMandatory(boolean mandatory) { this.mandatory = mandatory; }
public boolean isPublisherConfirms() { return publisherConfirms; }
public void setPublisherConfirms(boolean publisherConfirms) { this.publisherConfirms = publisherConfirms; }
public boolean isPublisherReturns() { return publisherReturns; }
public void setPublisherReturns(boolean publisherReturns) { this.publisherReturns = publisherReturns; }
public String getAcknowledgeMode() { return acknowledgeMode; }
public void setAcknowledgeMode(String acknowledgeMode) { this.acknowledgeMode = acknowledgeMode; }
public int getPrefetch() { return prefetch; }
public void setPrefetch(int prefetch) { this.prefetch = prefetch; }
}
Declaring exchange, queue, and binding
Create topology with a selected RabbitAdmin. Below uses the v2 admin:
package com.example.config.amqp;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.context.annotation.Configuration;
@Configuration
public class TopologyConfiguration {
public static final String EXCHANGE = "ex.topic.demo";
public static final String QUEUE = "q.topic.demo";
public static final String ROUTING_KEY = "rk.topic.demo";
@Resource(name = "v2Admin")
private RabbitAdmin admin;
@PostConstruct
public void declare() {
TopicExchange exchange = new TopicExchange(EXCHANGE, true, false);
Queue queue = new Queue(QUEUE, true);
Binding binding = BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY);
admin.declareExchange(exchange);
admin.declareQueue(queue);
admin.declareBinding(binding);
}
}
Producer
Send messages to the same exchaneg via different brokers using distinct RabbitTemplate beans.
package com.example.messaging;
import javax.annotation.Resource;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
import static com.example.config.amqp.TopologyConfiguration.*;
@Component
public class TopicPublisher {
@Resource(name = "v1Template")
private RabbitTemplate primaryTemplate;
@Resource(name = "v2Template")
private RabbitTemplate secondaryTemplate;
public void publish() {
primaryTemplate.convertAndSend(EXCHANGE, ROUTING_KEY,
"Message published via v1Template");
secondaryTemplate.convertAndSend(EXCHANGE, ROUTING_KEY,
"Message published via v2Template");
}
}
Consumer
Choose the appropriate listener factory to bind this consumer to a specific connection.
package com.example.messaging;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@RabbitListener(queues = "q.topic.demo", containerFactory = "v2ListenerFactory")
public class TopicListener {
@RabbitHandler
public void onMessage(String payload) {
System.out.println(payload);
}
}
Test
package com.example;
import com.example.messaging.TopicPublisher;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
@RunWith(SpringRunner.class)
@SpringBootTest
public class MultiRabbitTest {
@Autowired
private TopicPublisher publisher;
@Test
public void publishToBothBrokers() {
publisher.publish();
}
}