Implementing a Simple RabbitMQ Producer-Consumer Workflow in .NET
A lightweight implementation of the producer-consumer pattern uses a message broker to decouple data generation from downstream processing. In this setup, one service publishes payloads to a RabbitMQ queue, while an independent worker subscribes to that queue and processes incoming messages. The queue acts as a buffer that holds messages until a consumer retrieves and acknowledges them; whether they also survive a broker restart depends on the durability settings discussed below.
Publisher Implementation
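The following endpoint shows how to open a broker connection, declare the target queue, and publish a serialized payload. The using declarations dispose the channel and the connection when the method returns, so nothing leaks. Opening a connection on every request is expensive, however, and RabbitMQ connections are designed to be long-lived, so production code typically shares one connection across requests. The calls shown (CreateModel, CreateBasicProperties, BasicPublish) belong to the synchronous API of RabbitMQ.Client 6.x and earlier; version 7 replaced them with asynchronous equivalents.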
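using System.Text;
using System.Text.Json;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.Logging;
using RabbitMQ.Client;

// Action method of an ASP.NET Core controller; _logger is assumed to be the controller's injected ILogger.
[HttpPost("dispatch")]
public IActionResult PublishTask([FromBody] TaskRequest request)
{
    // Connection settings for a local broker with the default credentials.
    var connectionConfig = new ConnectionFactory
    {
        HostName = "localhost",
        Port = 5672,
        UserName = "guest",
        Password = "guest",
        VirtualHost = "/"
    };

    // Both objects are disposed when the method returns.
    using var connection = connectionConfig.CreateConnection();
    using var channel = connection.CreateModel();

    const string targetQueue = "execution-pipeline-v1";

    // Creates the queue if it is missing; otherwise the broker checks that the settings match.
    channel.QueueDeclare(
        queue: targetQueue,
        durable: true,
        exclusive: false,
        autoDelete: false,
        arguments: null);

    // Serialize the request to JSON and encode it as UTF-8 bytes.
    var jsonPayload = JsonSerializer.Serialize(request);
    var messageBuffer = Encoding.UTF8.GetBytes(jsonPayload);

    // Mark the message as persistent so the broker writes it to disk.
    var deliverySettings = channel.CreateBasicProperties();
    deliverySettings.Persistent = true;

    // The default (empty) exchange routes directly to the queue named by the routing key.
    channel.BasicPublish(
        exchange: string.Empty,
        routingKey: targetQueue,
        basicProperties: deliverySettings,
        body: messageBuffer);

    _logger.LogInformation("Task {Identifier} successfully queued for processing", request.Identifier);
    return Ok(new { Status = "Dispatched" });
}

public class TaskRequest
{
    public string Identifier { get; set; } = string.Empty;
    public string Description { get; set; } = string.Empty;
}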
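The queue declaration parameters control persistence and scope. Setting durable to true makes the broker keep the queue definition across restarts, while exclusive: false and autoDelete: false let other connections use the queue and keep it in place after the last consumer disconnects. Setting Persistent = true on the message properties marks the message itself to be written to disk. Both settings are needed for a message to survive a broker restart, and even then a message can be lost if the broker crashes before the write completes; publisher confirms close that gap.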
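The endpoint above opens a connection per request and does not wait for the broker to confirm the publish. The following is a minimal sketch of one way to address both points, assuming RabbitMQ.Client 6.x. The ConfirmedTaskPublisher class and its Encode helper are hypothetical names introduced for illustration, and the five-second timeout is an arbitrary choice.

```csharp
using System;
using System.Text.Json;
using RabbitMQ.Client; // assumes the RabbitMQ.Client 6.x NuGet package

public class TaskRequest
{
    public string Identifier { get; set; } = string.Empty;
    public string Description { get; set; } = string.Empty;
}

// Hypothetical helper (not part of the original endpoint): owns one long-lived
// connection and waits for a broker confirm after each publish.
public sealed class ConfirmedTaskPublisher : IDisposable
{
    private const string TargetQueue = "execution-pipeline-v1";
    private readonly IConnection _connection;

    public ConfirmedTaskPublisher(IConnectionFactory factory)
    {
        _connection = factory.CreateConnection();

        // Declare the queue once, on a short-lived channel.
        using var channel = _connection.CreateModel();
        channel.QueueDeclare(queue: TargetQueue, durable: true,
            exclusive: false, autoDelete: false, arguments: null);
    }

    // Pure helper: serializes the request to UTF-8 encoded JSON.
    public static byte[] Encode(TaskRequest request) =>
        JsonSerializer.SerializeToUtf8Bytes(request);

    public void Publish(TaskRequest request)
    {
        // Channels should not be shared between publishing threads,
        // so this sketch opens one per call on the shared connection.
        using var channel = _connection.CreateModel();

        // Put the channel into confirm mode before publishing.
        channel.ConfirmSelect();

        var properties = channel.CreateBasicProperties();
        properties.Persistent = true;

        channel.BasicPublish(
            exchange: string.Empty,
            routingKey: TargetQueue,
            basicProperties: properties,
            body: Encode(request));

        // Blocks until the broker confirms the message; throws if the broker
        // rejects it or does not answer within the timeout.
        channel.WaitForConfirmsOrDie(TimeSpan.FromSeconds(5));
    }

    public void Dispose() => _connection.Dispose();
}
```

Registered as a singleton in the dependency injection container, such a class would let the controller action shrink to a single Publish call. Waiting for a confirm on every message trades throughput for safety; batching confirms is a common alternative when volume is high.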
Subscriber Implementation
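The consumer registers a long-lived listener on the same queue and attaches a callback to handle incoming messages. EventingBasicConsumer raises its Received event each time the broker delivers a message. In this example the listener is started by an HTTP call, and the connection and channel are deliberately left undisposed so that they outlive the request; calling the route more than once opens an additional connection and registers another consumer each time.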
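using System;
using System.Text;
using Microsoft.AspNetCore.Mvc;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

// Action method of an ASP.NET Core controller; call it once to start the listener.
[HttpGet("monitor")]
public IActionResult ActivateListener()
{
    InitializeWorkerNode();
    return Ok(new { Status = "Active" });
}

private void InitializeWorkerNode()
{
    // Connection settings must match those used by the publisher.
    var connectionConfig = new ConnectionFactory
    {
        HostName = "localhost",
        Port = 5672,
        UserName = "guest",
        Password = "guest",
        VirtualHost = "/"
    };

    // Intentionally not disposed: the connection and channel must stay open
    // for as long as the consumer should keep receiving messages.
    var connection = connectionConfig.CreateConnection();
    var channel = connection.CreateModel();

    const string targetQueue = "execution-pipeline-v1";

    // Declare the queue with the same settings as the publisher so that
    // either side can start first.
    channel.QueueDeclare(
        queue: targetQueue,
        durable: true,
        exclusive: false,
        autoDelete: false,
        arguments: null);

    var messageProcessor = new EventingBasicConsumer(channel);
    messageProcessor.Received += (sender, eventArgs) =>
    {
        // Decode the UTF-8 body back into the JSON text sent by the publisher.
        var content = Encoding.UTF8.GetString(eventArgs.Body.ToArray());
        Console.WriteLine($"Worker node captured payload: {content}");

        // Business logic execution occurs here

        // Only needed (and only valid) when autoAck is false:
        // channel.BasicAck(eventArgs.DeliveryTag, multiple: false);
    };

    // Register the consumer; the broker pushes messages to it from now on.
    channel.BasicConsume(
        queue: targetQueue,
        autoAck: true,
        consumer: messageProcessor);
}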
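The subscriber declares the same queue and registers with it through BasicConsume. This is a push-based subscription: the broker delivers messages to the client as they arrive, and the client does not poll. With autoAck set to true, RabbitMQ treats a message as acknowledged as soon as it is sent to the client and removes it from the queue, so a message is lost if the worker crashes while handling it. In production environments that require strict data integrity, manual acknowledgments should replace automatic mode so that messages are removed only after the business logic has completed successfully.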
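The manual acknowledgment mode mentioned above can be sketched as follows, again assuming RabbitMQ.Client 6.x. AckingWorker, Decode, and Attach are hypothetical names, and the policy shown (drop malformed messages, requeue on handler failure, prefetch of one) is an illustrative assumption, not the only reasonable choice.

```csharp
#nullable enable
using System;
using System.Text.Json;
using RabbitMQ.Client;        // assumes the RabbitMQ.Client 6.x NuGet package
using RabbitMQ.Client.Events;

public class TaskRequest
{
    public string Identifier { get; set; } = string.Empty;
    public string Description { get; set; } = string.Empty;
}

// Hypothetical helper illustrating manual acknowledgments.
public static class AckingWorker
{
    // Decodes a UTF-8 JSON body; returns null when the body is not valid JSON.
    public static TaskRequest? Decode(ReadOnlySpan<byte> body)
    {
        try
        {
            return JsonSerializer.Deserialize<TaskRequest>(body);
        }
        catch (JsonException)
        {
            return null;
        }
    }

    public static void Attach(IModel channel, string queue, Action<TaskRequest> handle)
    {
        // Deliver at most one unacknowledged message to this consumer at a time.
        channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

        var consumer = new EventingBasicConsumer(channel);
        consumer.Received += (sender, eventArgs) =>
        {
            var request = Decode(eventArgs.Body.Span);
            if (request is null)
            {
                // A malformed message will never succeed, so reject it without requeueing.
                channel.BasicNack(eventArgs.DeliveryTag, multiple: false, requeue: false);
                return;
            }

            try
            {
                handle(request);

                // Acknowledge only after the business logic has completed.
                channel.BasicAck(eventArgs.DeliveryTag, multiple: false);
            }
            catch (Exception)
            {
                // Return the message to the queue for another attempt. A message that
                // always fails would loop forever; a dead-letter exchange or a retry
                // limit is the usual remedy.
                channel.BasicNack(eventArgs.DeliveryTag, multiple: false, requeue: true);
            }
        };

        // autoAck: false switches the consumer to manual acknowledgment mode.
        channel.BasicConsume(queue: queue, autoAck: false, consumer: consumer);
    }
}
```

With this in place, a message that was delivered but never acknowledged, for example because the worker process died, is redelivered by the broker once the channel closes.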
Running both endpoints establishes an asynchronous communication path. Invoking the publisher route places a new message on the queue, and the active consumer receives the transmitted data shortly afterward, without any synchronous HTTP call between the services.
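Because the listener above is started by an HTTP route, it only runs after someone calls that route. A common alternative in ASP.NET Core is to host the consumer for the lifetime of the application. The following sketch assumes RabbitMQ.Client 6.x and the Microsoft.Extensions.Hosting package; QueueListenerService is a hypothetical name, and it keeps the automatic acknowledgment mode of the original example.

```csharp
using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using RabbitMQ.Client;        // assumes the RabbitMQ.Client 6.x NuGet package
using RabbitMQ.Client.Events;

// Hypothetical hosted service: starts consuming when the application starts
// and releases the connection when the application shuts down.
public sealed class QueueListenerService : BackgroundService
{
    private const string TargetQueue = "execution-pipeline-v1";
    private readonly IConnectionFactory _factory;
    private readonly ILogger<QueueListenerService> _logger;

    public QueueListenerService(IConnectionFactory factory, ILogger<QueueListenerService> logger)
    {
        _factory = factory;
        _logger = logger;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        // Disposed when this method exits, that is, at shutdown.
        using var connection = _factory.CreateConnection();
        using var channel = connection.CreateModel();

        channel.QueueDeclare(queue: TargetQueue, durable: true,
            exclusive: false, autoDelete: false, arguments: null);

        var consumer = new EventingBasicConsumer(channel);
        consumer.Received += (sender, eventArgs) =>
        {
            var content = Encoding.UTF8.GetString(eventArgs.Body.Span);
            _logger.LogInformation("Worker received payload: {Content}", content);
        };

        channel.BasicConsume(queue: TargetQueue, autoAck: true, consumer: consumer);

        try
        {
            // Keep the connection and channel alive until the host requests shutdown.
            await Task.Delay(Timeout.Infinite, stoppingToken);
        }
        catch (OperationCanceledException)
        {
            // Expected during shutdown; fall through so the using declarations dispose.
        }
    }
}
```

To use it, the application would register an IConnectionFactory (for example a ConnectionFactory pointing at localhost) as a singleton and add the service with AddHostedService. The monitor route then becomes unnecessary, and exactly one consumer exists per application instance.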