Fading Coder

One Final Commit for the Last Sprint


Asynchronous Boost.Beast WebSocket Server with Outbound Message Queuing

Tech · May 9

Asynchronous Architectural Considerations

Standard asynchronous implementations in libraries like Boost.Beast often bind request handling tightly to response generation within the same callback chain. For instance, a typical flow initiates a read operation, processes data upon completion, immediately triggers an asynchronous write, and waits for the write completion before issuing another read. While functional for simple echo services, this serial pattern obstructs true asynchronous capabilities required for server-side push scenarios. When external events require pushing data while a client connection is idle or processing another task, relying on the incoming request callback prevents concurrent outbound operations.
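The lockstep flow just described can be sketched with the network calls stubbed out. The names below (`EchoChain`, `wire`, `inbox`) are illustrative, not Beast API; the point is that each completion handler immediately starts the next operation, so a message can only ever leave as a direct reply to a read:

```cpp
#include <cassert>
#include <string>
#include <vector>

// The coupled read -> write -> read chain, with the socket replaced by a stub.
struct EchoChain {
    std::vector<std::string> wire;   // what "went out on the socket"
    std::vector<std::string> inbox;  // scripted incoming frames
    std::size_t next = 0;

    void do_read() {
        if (next >= inbox.size()) return;  // no more input: the chain idles
        on_read(inbox[next++]);
    }
    void on_read(const std::string& msg) {  // read completion handler
        do_write("echo:" + msg);            // immediately answer the request
    }
    void do_write(const std::string& msg) {
        wire.push_back(msg);
        on_write();                         // write completion handler
    }
    void on_write() {
        do_read();                          // only now may the next read start
    }
};
```

There is no seam in this chain where an unsolicited push could be injected, which is exactly the limitation the rest of the article works around.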

Decoupling Read and Write Operations

To achieve genuine asynchronous behavior, request consumption and message transmission must be decoupled. The core strategy involves maintaining a thread-safe queue for pending outbound messages. This ensures that whether a message originates as a response to a client request or from internal broadcast logic, it enters a unified output path.

Key implementation points include:

  • Strand Serialization: Utilize boost::asio::strand to serialize access to shared state (such as the session socket) across multiple threads or asynchronous callbacks. This prevents race conditions without explicit mutex locks.
  • Output Queue: Store outgoing message buffers rather than sending them directly upon creation. The async_write operation is non-blocking; calling it again while a previous one is pending without proper queuing can lead to errors or overwritten buffers.
  • Write Loop: Instead of writing immediately after a request, enqueue the buffer. A separate handler checks the queue upon the completion of a write operation. If messages remain, trigger the next async_write; otherwise, resume listening for new reads.
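The queue discipline from the bullets above can be demonstrated in isolation with plain C++, no Beast required. In this sketch (class and member names are illustrative), `complete_write()` stands in for the async_write completion handler firing, so messages enqueued while a write is in flight really do queue up:

```cpp
#include <deque>
#include <string>
#include <vector>

// Outbound queue discipline: at most one "write" in flight at a time.
class OutboundQueue {
public:
    std::vector<std::string> sent;  // finished writes, in order

    void enqueue(std::string msg) {
        m_queue.push_back(std::move(msg));
        ensure_next_write();        // start a write only if none is pending
    }

    // Stand-in for the async_write completion handler.
    void complete_write() {
        sent.push_back(m_queue.front());  // the front buffer was transmitted
        m_queue.pop_front();
        m_isWriting = false;
        ensure_next_write();        // chain the next write, if any remain
    }

    bool writing() const { return m_isWriting; }

private:
    std::deque<std::string> m_queue;
    bool m_isWriting = false;

    void ensure_next_write() {
        if (!m_isWriting && !m_queue.empty())
            m_isWriting = true;     // the "async_write" is now in flight
    }
};
```

The `m_isWriting` flag is the invariant that prevents overlapping writes; in the real session below the same flag gates calls to async_write.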

Code Implementation Example

The following structure demonstrates how to refactor a WebSocket session to support these patterns. Variable names and logical grouping have been updated for clarity and separation of concerns.

#ifndef WEBSOCKET_SESSION_HPP
#define WEBSOCKET_SESSION_HPP

#include <boost/beast/core.hpp>
#include <boost/beast/websocket.hpp>
#include <boost/beast/websocket/ssl.hpp>
#include <boost/asio.hpp>
#include <boost/asio/ssl.hpp>
#include <boost/asio/strand.hpp>
#include <cstring>
#include <iostream>
#include <memory>
#include <string>
#include <vector>

namespace net = boost::asio;
using tcp_socket = net::ip::tcp::socket;
using ssl_stream = boost::asio::ssl::stream<tcp_socket &>;
using ws_stream = boost::beast::websocket::stream<ssl_stream>;

class WebSession : public std::enable_shared_from_this<WebSession> {
private:
    tcp_socket m_clientSocket;
    ws_stream m_webSocketStream;
    net::strand<net::any_io_executor> m_ioStrand;
    
    // Output buffer queue
    std::vector<std::shared_ptr<boost::beast::flat_buffer>> m_outboundQueue;
    bool m_isWriting = false;

public:
    WebSession(tcp_socket socket, boost::asio::ssl::context& ctx)
        : m_clientSocket(std::move(socket)),
          m_webSocketStream(m_clientSocket, ctx),
          m_ioStrand(net::make_strand(m_clientSocket.get_executor())) {}

    ~WebSession() {
        // Best-effort cleanup; a graceful shutdown should go through
        // stop_session(). Closing the raw socket here never throws.
        boost::system::error_code ec;
        boost::beast::get_lowest_layer(m_webSocketStream).close(ec);
    }

    void start() {
        // TLS handshake first, then accept the WebSocket upgrade (server side)
        auto self = shared_from_this();
        m_webSocketStream.next_layer().async_handshake(
            boost::asio::ssl::stream_base::server,
            net::bind_executor(m_ioStrand, [self](boost::system::error_code ec) {
                if (ec) return;
                self->m_webSocketStream.async_accept(
                    net::bind_executor(self->m_ioStrand,
                        [self](boost::system::error_code ec) {
                            if (!ec) self->start_read();
                        }));
            }));
    }

    void start_read() {
        auto self = shared_from_this();
        m_webSocketStream.async_read(
            m_buf,
            net::bind_executor(m_ioStrand,
                [this, self](boost::system::error_code ec, std::size_t bytes_transferred) {
                    handle_read(ec, bytes_transferred);
                }));
    }

    void handle_read(boost::system::error_code ec, std::size_t /*bytes*/) {
        if (ec) {
            // Covers websocket::error::closed, EOF and genuine failures alike
            stop_session();
            return;
        }

        // Process incoming payload here asynchronously
        process_incoming_message(m_buf.data());
        m_buf.consume(m_buf.size()); // reclaim the read buffer for the next frame

        // Do not immediately restart reading or writing based on request
        // Allow time for queued pushes to flush
        continue_processing();
    }

    void process_incoming_message(auto&& buffer) {
        // Business logic for request handling
        // Prepare a response
        auto msg = prepare_response();
        enqueue_message(msg);
    }

    void enqueue_message(const std::string& content) {
        // Note: with a multi-threaded io_context, route this call through
        // m_ioStrand (e.g. via net::post) before touching the queue.
        auto buffer = std::make_shared<boost::beast::flat_buffer>();
        auto dest = buffer->prepare(content.size());
        std::memcpy(dest.data(), content.data(), content.size());
        buffer->commit(content.size());

        m_outboundQueue.push_back(buffer);
        ensure_next_write();
    }

    void ensure_next_write() {
        if (!m_isWriting && !m_outboundQueue.empty()) {
            do_send();
        }
    }

    void do_send() {
        m_isWriting = true;
        auto bufferRef = m_outboundQueue.front();
        m_webSocketStream.binary(true);
        m_webSocketStream.async_write(
            bufferRef->data(),
            net::bind_executor(m_ioStrand,
                [this, self = shared_from_this()](boost::system::error_code ec, std::size_t) {
                    m_isWriting = false;
                    process_write_completion(ec);
                }));
    }

    void process_write_completion(boost::system::error_code ec) {
        if (ec) {
            stop_session();
            return;
        }

        // Remove processed buffer from queue
        if (!m_outboundQueue.empty()) {
            m_outboundQueue.erase(m_outboundQueue.begin());
        }

        // Check for remaining items in queue
        if (!m_outboundQueue.empty()) {
            ensure_next_write();
        } else {
            // No more messages to send, resume listening for requests
            start_read();
        }
    }

    void continue_processing() {
        // Trigger potential push messages logic here
        // Logic might check a global registry and call enqueue_message
        if (m_outboundQueue.empty() && !m_isWriting) {
            start_read(); // nothing to flush, go straight back to reading
        } else {
            ensure_next_write();
        }
    }

    void stop_session() {
        // Close the connection gracefully
        auto self = shared_from_this();
        m_webSocketStream.async_close(
            boost::beast::websocket::close_code::normal,
            net::bind_executor(m_ioStrand,
                [self](boost::system::error_code) { /* session torn down */ }));
    }

    std::string prepare_response() {
        return "Ack: Data Received";
    }

private:
    boost::beast::flat_buffer m_buf;
};

#endif

Operational Logic Summary

  1. Incoming Data: The handle_read callback extracts data and passes it to business logic. It returns control to the event loop immediately rather than blocking for I/O operations related to responses.
  2. Message Buffering: Responses are pushed into m_outboundQueue. This acts as a barrier preventing async_write calls from overlapping or overwriting each other.
  3. Sending Mechanism: The do_send function dequeues one item at a time. Upon completion (process_write_completion), the system decides whether to send the next item from the queue or revert to start_read if the queue is empty.
  4. Thread Safety: The m_ioStrand ensures that even if multiple threads attempt to access the session state, execution remains serialized on the correct strand executor.
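The four steps above can be exercised end to end with a toy model (names like `ToySession` are illustrative; no real I/O happens). It shows the handoff the summary describes: a read queues a response, a push lands behind the in-flight write, and reading re-arms only once the queue drains:

```cpp
#include <deque>
#include <string>
#include <vector>

// Toy session modeling the read/write handoff, socket replaced by a log.
class ToySession {
public:
    std::vector<std::string> log;  // ordered record of state transitions

    void on_read(const std::string& payload) {       // step 1: read completes
        log.push_back("read:" + payload);
        m_queue.push_back("ack:" + payload);         // step 2: buffer response
        if (!m_isWriting) do_send();
    }

    void on_write_done() {                           // step 3: write completes
        m_queue.pop_front();
        if (!m_queue.empty()) {
            do_send();                               // more queued: keep writing
        } else {
            m_isWriting = false;
            log.push_back("read-armed");             // queue empty: resume reads
        }
    }

    void push(const std::string& msg) {              // server-initiated message
        m_queue.push_back(msg);
        if (!m_isWriting) do_send();
    }

private:
    std::deque<std::string> m_queue;
    bool m_isWriting = false;

    void do_send() {
        m_isWriting = true;
        log.push_back("write:" + m_queue.front());
    }
};
```

Driving it with one request and one push produces exactly the interleaving the real session aims for: the push is transmitted on the same connection without waiting for another client request.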
