Fading Coder

One Final Commit for the Last Sprint


Implementing a Simple RabbitMQ Producer-Consumer Workflow in .NET

Tech · May 15

A lightweight implementation of the producer-consumer architectural pattern relies on a message broker to decouple data generation from downstream processing. In this configuration, one service component publishes operational payloads into a RabbitMQ queue, while an independent worker service subscribes to that queue and processes incoming records. The queue functions as a persistent buffer, retaining each message until an active consumer retrieves and handles it.
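To follow the examples along locally, a broker instance can be started with Docker. This is a sketch; the image tag and port mappings are assumptions and may differ in your environment:

```shell
# Start a local RabbitMQ broker with the management UI enabled.
# 5672 is the AMQP port the samples below connect to; 15672 serves the web console.
docker run -d --name rabbitmq \
  -p 5672:5672 -p 15672:15672 \
  rabbitmq:3-management
```

The default guest/guest credentials used in the code samples only work for connections from localhost.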

Publisher Implementation

The following endpoint demonstrates how to establish a broker connection, declare a routing destination, and transmit a serialized payload. Proper resource management is enforced through scoped disposal to prevent connection pool exhaustion.

// Requires: using RabbitMQ.Client; using System.Text; using System.Text.Json;
[HttpPost("dispatch")]
public IActionResult PublishTask(TaskRequest request)
{
    var connectionConfig = new ConnectionFactory
    {
        HostName = "localhost",
        Port = 5672,
        UserName = "guest",
        Password = "guest",
        VirtualHost = "/"
    };

    using var connection = connectionConfig.CreateConnection();
    using var channel = connection.CreateModel();

    const string targetQueue = "execution-pipeline-v1";

    channel.QueueDeclare(
        queue: targetQueue,
        durable: true,
        exclusive: false,
        autoDelete: false,
        arguments: null);

    var jsonPayload = JsonSerializer.Serialize(request);
    var messageBuffer = Encoding.UTF8.GetBytes(jsonPayload);

    var deliverySettings = channel.CreateBasicProperties();
    deliverySettings.Persistent = true;

    channel.BasicPublish(
        exchange: string.Empty,
        routingKey: targetQueue,
        basicProperties: deliverySettings,
        body: messageBuffer);

    _logger.LogInformation("Task {Identifier} successfully queued for processing", request.Identifier);
    return Ok(new { Status = "Dispatched" });
}

public class TaskRequest
{
    public string Identifier { get; set; } = string.Empty;
    public string Description { get; set; } = string.Empty;
}

Queue declaration parameters dictate persistence and scope behavior. Setting durable: true tells the broker to preserve the queue definition across restarts, while setting Persistent = true on the message properties marks each payload for disk storage, greatly reducing the risk of data loss if the broker crashes unexpectedly.
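With the API running, the publisher can be exercised with a plain HTTP call. The host, port, and the tasks route prefix here are hypothetical; substitute whatever your controller routing actually exposes:

```shell
# Assumes the API listens on localhost:5000 and the controller route is "tasks".
curl -X POST http://localhost:5000/tasks/dispatch \
  -H "Content-Type: application/json" \
  -d '{"identifier": "job-42", "description": "demo payload"}'
```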

Subscriber Implementation

The consumer establishes a long-lived listener on the matching queue and attaches a callback routine to handle incoming traffic. The EventingBasicConsumer automatically triggers the registered delegate whenever the broker forwards a new message.

// Requires: using RabbitMQ.Client; using RabbitMQ.Client.Events; using System.Text;
[HttpGet("monitor")]
public IActionResult ActivateListener()
{
    InitializeWorkerNode();
    return Ok(new { Status = "Active" });
}

private void InitializeWorkerNode()
{
    var connectionConfig = new ConnectionFactory
    {
        HostName = "localhost",
        Port = 5672,
        UserName = "guest",
        Password = "guest",
        VirtualHost = "/"
    };

    // Intentionally not disposed: the connection and channel must remain
    // open for the lifetime of the listener.
    var connection = connectionConfig.CreateConnection();
    var channel = connection.CreateModel();

    const string targetQueue = "execution-pipeline-v1";

    channel.QueueDeclare(
        queue: targetQueue,
        durable: true,
        exclusive: false,
        autoDelete: false,
        arguments: null);

    var messageProcessor = new EventingBasicConsumer(channel);

    messageProcessor.Received += (sender, eventArgs) =>
    {
        var content = Encoding.UTF8.GetString(eventArgs.Body.ToArray());
        Console.WriteLine($"Worker node captured payload: {content}");

        // Business logic execution occurs here

        // channel.BasicAck(eventArgs.DeliveryTag, multiple: false);
    };

    channel.BasicConsume(
        queue: targetQueue,
        autoAck: true,
        consumer: messageProcessor);
}

The subscriber binds to the identical queue identifier and registers a push-based subscription via BasicConsume; the broker delivers messages to the client as they arrive, rather than the client polling for them. With autoAck set to true, RabbitMQ removes each message as soon as it is delivered, so a consumer crash mid-processing loses that message. In production environments requiring strict data integrity, manual acknowledgments should replace automatic mode to guarantee messages are only cleared after successful business logic execution.
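A manual-acknowledgment variant of the listener can be sketched as follows. This is a sketch assuming RabbitMQ.Client 6.x and the same local broker settings used above; the try/catch requeue policy is one possible choice, not the only one:

```csharp
// Requires: using RabbitMQ.Client; using RabbitMQ.Client.Events; using System.Text;
var factory = new ConnectionFactory { HostName = "localhost" };
var connection = factory.CreateConnection();
var channel = connection.CreateModel();

// Limit unacknowledged deliveries to one at a time per consumer,
// so a slow worker does not hoard messages it has not yet processed.
channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

var consumer = new EventingBasicConsumer(channel);
consumer.Received += (sender, eventArgs) =>
{
    var content = Encoding.UTF8.GetString(eventArgs.Body.ToArray());
    try
    {
        // Business logic execution occurs here.

        // Confirm only after the work succeeded; the broker may now discard it.
        channel.BasicAck(eventArgs.DeliveryTag, multiple: false);
    }
    catch
    {
        // Return the message to the queue so another delivery attempt can occur.
        channel.BasicNack(eventArgs.DeliveryTag, multiple: false, requeue: true);
    }
};

channel.BasicConsume(queue: "execution-pipeline-v1", autoAck: false, consumer: consumer);
```

With autoAck: false, an unacknowledged message is automatically requeued if the consumer's channel closes, which is what makes this mode safe against worker crashes.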

Running both endpoints simultaneously establishes an asynchronous communication channel. Invoking the publisher route injects a new record into the broker pipeline, and the active consumer captures the transmitted data almost immediately, without requiring synchronous HTTP calls between the services.
