Implementing RabbitMQ Topic Exchange with Wildcard Routing
Topic exchanges enable flexible message routing using wildcard patterns. The binding keys support two special characters: `*` matches exactly one word, while `#` matches zero or more words.
Publishing Messages with Topic Exchange
/// <summary>
/// Declares two durable queues and a durable topic exchange, binds the queues
/// with wildcard patterns, and publishes three sample messages whose routing
/// keys exercise those patterns.
/// </summary>
public void PublishTopicMessages()
{
    const string exchangeName = "RegionalExchange";
    const string regionQueue = "APACRegion";
    const string headlinesQueue = "HeadlinesQueue";

    // Routing key / message text pairs, published in this order:
    //   APAC.Headlines -> matches both "APAC.*" and "*.Headlines"
    //   APAC.Weather   -> matches "APAC.*" only
    //   EU.Headlines   -> matches "*.Headlines" only
    var outgoing = new[]
    {
        new[] { "APAC.Headlines", "Breaking news from Asia Pacific" },
        new[] { "APAC.Weather", "Weather update for Asia Pacific" },
        new[] { "EU.Headlines", "Breaking news from Europe" }
    };

    var helper = new MQHelper();
    using (var conn = helper.GetConnection())
    using (var model = conn.CreateModel())
    {
        // Topology: queues first, then the topic exchange they are bound to.
        model.QueueDeclare(regionQueue, durable: true, exclusive: false, autoDelete: false, arguments: null);
        model.QueueDeclare(headlinesQueue, durable: true, exclusive: false, autoDelete: false, arguments: null);
        model.ExchangeDeclare(exchangeName, type: ExchangeType.Topic, durable: true, autoDelete: false, arguments: null);

        // Wildcard bindings: "*" stands for exactly one dot-separated word.
        model.QueueBind(regionQueue, exchangeName, "APAC.*", arguments: null);
        model.QueueBind(headlinesQueue, exchangeName, "*.Headlines", arguments: null);

        foreach (var entry in outgoing)
        {
            string key = entry[0];
            var body = Encoding.UTF8.GetBytes(entry[1]);

            model.BasicPublish(exchangeName, routingKey: key, basicProperties: null, body: body);
            Console.WriteLine($"[{key}] dispatched");
        }
    }
}
In this example, the routing key APAC.Headlines matches both queues, so it gets delivered to both subscribers.
Consuming Messages from Topic Exchange
/// <summary>
/// Subscribes to "HeadlinesQueue" (bound to "RegionalExchange" with the
/// pattern "*.Headlines") and writes every received message to the console.
/// The method blocks until the user presses Enter so that the connection
/// stays open while messages are being delivered.
/// </summary>
public void ConsumeTopicMessages()
{
    var config = new ConnectionFactory
    {
        HostName = "localhost",
        UserName = "guest",
        Password = "guest"
    };

    using (var connection = config.CreateConnection())
    using (var channel = connection.CreateModel())
    {
        // Declare the queue, the exchange and the binding here as well, so the
        // consumer does not depend on the publisher having run first.
        channel.QueueDeclare("HeadlinesQueue", durable: true, exclusive: false, autoDelete: false, arguments: null);
        channel.ExchangeDeclare("RegionalExchange", type: ExchangeType.Topic, durable: true, autoDelete: false, arguments: null);
        channel.QueueBind("HeadlinesQueue", "RegionalExchange", "*.Headlines", arguments: null);

        // Set up the consumer; the handler decodes the body as UTF-8 text.
        var messageHandler = new EventingBasicConsumer(channel);
        messageHandler.Received += (sender, eventArgs) =>
        {
            byte[] messageBody = eventArgs.Body.ToArray();
            string content = Encoding.UTF8.GetString(messageBody);
            Console.WriteLine($"Received: [{content}]");
        };

        // Start consuming. With autoAck the broker treats a message as
        // acknowledged as soon as it is delivered.
        channel.BasicConsume("HeadlinesQueue", autoAck: true, consumer: messageHandler);

        // BasicConsume only registers the consumer and returns. Without this
        // wait, the using blocks would dispose the channel and connection at
        // once and the Received handler would never get a chance to run.
        Console.WriteLine("Waiting for messages. Press [Enter] to exit.");
        Console.ReadLine();
    }
}