Implementing Fanout Exchange Pattern in RabbitMQ
1. Event organizer publishes an activity
2. Customer receives booking information
3. System sends email/SMS notification
Producer Implementation
public void PublishEvent()
{
ConnectionFactory connectionFactory = new ConnectionFactory()
{
HostName = "localhost",
Port = 5672,
Credentials = new NetworkCredential("guest", "guest"),
VirtualHost = "/"
};
using (IConnection connection = connectionFactory.CreateConnection())
{
using (IModel channel = connection.CreateModel())
{
string exchangeName = "event_notification_fanout";
channel.ExchangeDeclare(
exchange: exchangeName,
type: ExchangeType.Fanout,
durable: true
);
EventDetails eventData = new EventDetails();
string jsonPayload = JsonSerializer.Serialize(eventData);
byte[] messageBody = Encoding.UTF8.GetBytes(jsonPayload);
IBasicProperties messageProperties = channel.CreateBasicProperties();
messageProperties.Persistent = true;
channel.BasicPublish(
exchange: exchangeName,
routingKey: "",
basicProperties: messageProperties,
body: messageBody
);
Console.WriteLine($"Event published: {eventData.EventId}");
}
}
}
Consumer Implementation
public void SubscribeToNotifications()
{
var factory = new ConnectionFactory() { HostName = "localhost" };
var connection = factory.CreateConnection();
var channel = connection.CreateModel();
channel.ExchangeDeclare(
exchange: "event_notification_fanout",
type: ExchangeType.Fanout,
durable: true
);
string queueName = channel.QueueDeclare().QueueName;
channel.QueueBind(
queue: queueName,
exchange: "event_notification_fanout",
routingKey: ""
);
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
string message = Encoding.UTF8.GetString(ea.Body.ToArray());
Console.WriteLine($"Notification received: {message}");
// Process notification
channel.BasicAck(ea.DeliveryTag, false);
};
channel.BasicQos(0, 1, false);
channel.BasicConsume(
queue: queueName,
autoAck: false,
consumer: consumer
);
}
Email Service Implementation
public void EmailNotificationService()
{
var connection = new ConnectionFactory().CreateConnection();
var channel = connection.CreateModel();
channel.ExchangeDeclare(
exchange: "event_notification_fanout",
type: ExchangeType.Fanout,
durable: true
);
string notificationQueue = channel.QueueDeclare().QueueName;
channel.QueueBind(
queue: notificationQueue,
exchange: "event_notification_fanout",
routingKey: ""
);
var emailConsumer = new EventingBasicConsumer(channel);
emailConsumer.Received += (model, ea) =>
{
var eventData = JsonSerializer.Deserialize<EventDetails>(ea.Body.ToArray());
Console.WriteLine($"Sending email for event: {eventData.EventName}");
channel.BasicAck(ea.DeliveryTag, false);
};
channel.BasicConsume(
queue: notificationQueue,
autoAck: false,
consumer: emailConsumer
);
}