Asynchronous Boost.Beast WebSocket Server with Outbound Message Queuing
Asynchronous Architectural Considerations
Standard asynchronous implementations in libraries like Boost.Beast often bind request handling tightly to response generation within the same callback chain. For instance, a typical flow initiates a read operation, processes data upon completion, immediately triggers an asynchronous write, and waits for the write completion before issuing another read. While functional for simple echo services, this serial pattern obstructs true asynchronous capabilities required for server-side push scenarios. When external events require pushing data while a client connection is idle or processing another task, relying on the incoming request callback prevents concurrent outbound operations.
Decoupling Read and Write Operations
To achieve genuine asynchronous behavior, request consumption and message transmission must be decoupled. The core strategy is to keep a queue of pending outbound messages and to serialize all access to it (here through a strand rather than a lock). Whether a message originates as a response to a client request or from internal broadcast logic, it enters the same output path.
Key implementation points include:
- Strand Serialization: Use boost::asio::strand to serialize access to shared state (such as the session socket and the outbound queue) across multiple threads or asynchronous callbacks. This prevents race conditions without explicit mutex locks.
- Output Queue: Store outgoing message buffers rather than sending them directly upon creation. The async_write operation is non-blocking, and Beast permits only one outstanding write per WebSocket stream; calling it again while a previous one is pending can trip an assertion or corrupt the outgoing data.
- Write Loop: Instead of writing immediately after a request, enqueue the buffer. The write completion handler then checks the queue. If messages remain, it triggers the next async_write; otherwise, it resumes listening for new reads.
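The queue and write loop described above can be sketched without any networking. The following is a minimal illustration using only the standard library; OutboundQueue, start_write, and demo_write_order are hypothetical names introduced here, and start_write stands in for whatever would wrap async_write in a real session.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for a session's write path: pending messages are kept
// in FIFO order and at most one write is "in flight" at any time.
class OutboundQueue {
public:
    // start_write receives the message to transmit. The caller reports
    // completion later by calling on_write_complete().
    explicit OutboundQueue(std::function<void(const std::string&)> start_write)
        : start_write_(std::move(start_write)) {}

    // Entry point for request responses and server pushes alike.
    void enqueue(std::string message) {
        pending_.push_back(std::move(message));
        maybe_start_next();
    }

    // To be called from the write completion handler.
    void on_write_complete() {
        assert(writing_ && !pending_.empty());
        pending_.pop_front();  // the front message has now been sent
        writing_ = false;
        maybe_start_next();
    }

    bool idle() const { return !writing_ && pending_.empty(); }

private:
    void maybe_start_next() {
        if (writing_ || pending_.empty()) return;
        writing_ = true;
        start_write_(pending_.front());  // stays queued until completion
    }

    std::function<void(const std::string&)> start_write_;
    std::deque<std::string> pending_;
    bool writing_ = false;
};

// Usage sketch: three messages are enqueued back to back, but each write
// starts only after the previous one has completed.
inline std::vector<std::string> demo_write_order() {
    std::vector<std::string> started;
    OutboundQueue queue([&](const std::string& m) { started.push_back(m); });
    queue.enqueue("first");
    queue.enqueue("second");    // queued; "first" is still in flight
    queue.enqueue("third");
    queue.on_write_complete();  // starts "second"
    queue.on_write_complete();  // starts "third"
    queue.on_write_complete();  // queue drained
    return started;
}
```

Keeping the front element in the queue until its write completes mirrors the session code in this article, where the buffer must stay alive for the duration of the asynchronous operation.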
Code Implementation Example
The following structure demonstrates how to refactor a WebSocket session to support these patterns. Variable names and logical grouping have been updated for clarity and separation of concerns.
#ifndef WEBSOCKET_SESSION_HPP
#define WEBSOCKET_SESSION_HPP
#include <boost/asio.hpp>
#include <boost/asio/ssl.hpp>
#include <boost/asio/strand.hpp>
#include <boost/beast/core.hpp>
#include <boost/beast/websocket.hpp>
#include <boost/beast/websocket/ssl.hpp>
#include <deque>
#include <iostream>
#include <memory>
#include <string>
#include <vector>

namespace net = boost::asio;
using tcp_socket = net::ip::tcp::socket;
using ssl_stream = net::ssl::stream<tcp_socket&>;
using ws_stream = boost::beast::websocket::stream<ssl_stream>;

class WebSession : public std::enable_shared_from_this<WebSession> {
private:
    tcp_socket m_clientSocket;
    ws_stream m_webSocketStream;
    // Serializes every handler that touches the members below
    net::strand<tcp_socket::executor_type> m_ioStrand;
    // Output buffer queue (FIFO); the front element is the message being written
    std::deque<std::shared_ptr<boost::beast::flat_buffer>> m_outboundQueue;
    bool m_isWriting = false;

public:
    WebSession(tcp_socket socket, net::ssl::context& ctx)
        : m_clientSocket(std::move(socket)),
          m_webSocketStream(m_clientSocket, ctx),
          m_ioStrand(net::make_strand(m_clientSocket.get_executor())) {}
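
    ~WebSession() {
        // A destructor must not throw or block on the closing handshake,
        // so only the underlying socket is released here
        boost::system::error_code ignored;
        m_clientSocket.close(ignored);
    }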
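
    void start() {
        // Server side: complete the TLS handshake, then accept the WebSocket upgrade
        auto self = shared_from_this();
        m_webSocketStream.next_layer().async_handshake(
            net::ssl::stream_base::server,
            net::bind_executor(m_ioStrand, [self](boost::system::error_code ec) {
                if (ec) { self->stop_session(); return; }
                self->m_webSocketStream.async_accept(
                    net::bind_executor(self->m_ioStrand, [self](boost::system::error_code ec) {
                        if (ec) { self->stop_session(); return; }
                        self->start_read();
                    }));
            }));
    }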
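
private:
    bool m_isReading = false;  // Beast allows only one outstanding read per stream

public:
    void start_read() {
        // A push-triggered write can complete while a read is still pending;
        // this guard keeps that path from starting a second, overlapping read
        if (m_isReading) return;
        m_isReading = true;
        m_webSocketStream.async_read(
            m_buf,
            net::bind_executor(m_ioStrand,
                [this, self = shared_from_this()](boost::system::error_code ec,
                                                  std::size_t bytes_transferred) {
                    m_isReading = false;
                    handle_read(ec, bytes_transferred);
                }));
    }

    void handle_read(boost::system::error_code ec, std::size_t /*bytes*/) {
        // Any error ends the session; websocket::error::closed is the normal
        // result of the peer's closing handshake
        if (ec) {
            stop_session();
            return;
        }
        // Hand the payload to the business logic, then release it so the next
        // message does not get appended to this one
        process_incoming_message(m_buf.data());
        m_buf.consume(m_buf.size());
        // Do not restart the read here; process_write_completion() resumes
        // reading once the outbound queue has drained
        continue_processing();
    }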
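
    template <class ConstBufferSequence>
    void process_incoming_message(const ConstBufferSequence& /*payload*/) {
        // Business logic for request handling goes here
        // Prepare a response and route it through the outbound queue
        auto msg = prepare_response();
        enqueue_message(msg);
    }

    // Must run on m_ioStrand; callers on other threads should post to the strand first
    void enqueue_message(const std::string& content) {
        auto buffer = std::make_shared<boost::beast::flat_buffer>();
        // prepare() exposes writable space; commit() makes the copied bytes readable
        auto writable = buffer->prepare(content.size());
        buffer->commit(net::buffer_copy(writable, net::buffer(content)));
        m_outboundQueue.push_back(buffer);
        ensure_next_write();
    }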
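
    void ensure_next_write() {
        if (!m_isWriting && !m_outboundQueue.empty()) {
            do_send();
        }
    }

    void do_send() {
        m_isWriting = true;
        // Copy the shared_ptr into the handler so the buffer outlives the write
        auto bufferRef = m_outboundQueue.front();
        m_webSocketStream.binary(true);
        m_webSocketStream.async_write(
            bufferRef->data(),  // async_write takes a buffer sequence, not the flat_buffer itself
            net::bind_executor(m_ioStrand,
                [this, self = shared_from_this(), bufferRef](boost::system::error_code ec, std::size_t) {
                    m_isWriting = false;
                    process_write_completion(ec);
                }));
    }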

    void process_write_completion(boost::system::error_code ec) {
        if (ec) {
            stop_session();
            return;
        }
        // Remove processed buffer from queue
        if (!m_outboundQueue.empty()) {
            m_outboundQueue.erase(m_outboundQueue.begin());
        }
        // Check for remaining items in queue
        if (!m_outboundQueue.empty()) {
            ensure_next_write();
        } else {
            // No more messages to send, resume listening for requests
            start_read();
        }
    }

    void continue_processing() {
        // Trigger potential push messages logic here
        // Logic might check a global registry and call enqueue_message
        ensure_next_write();
    }

    void stop_session() {
        // Close connection gracefully
    }

    std::string prepare_response() {
        return "Ack: Data Received";
    }

private:
    boost::beast::flat_buffer m_buf;
};
#endif
Operational Logic Summary
- Incoming Data: The handle_read callback extracts data and passes it to business logic. It returns control to the event loop immediately rather than blocking for I/O operations related to responses.
- Message Buffering: Responses are pushed into m_outboundQueue. This acts as a barrier preventing async_write calls from overlapping or overwriting each other.
- Sending Mechanism: The do_send function dequeues one item at a time. Upon completion (process_write_completion), the system decides whether to send the next item from the queue or revert to start_read if the queue is empty.
- Thread Safety: The m_ioStrand ensures that even if multiple threads attempt to access the session state, execution remains serialized on the correct strand executor.