Implementing Thread-Safe Programming Patterns in Modern C++
Threading Paradigms: Message-Based vs Shared-State Synchronization
The most robust threading paradigm is message-based synchronization, typically implemented using a thread-safe queue class (mt_queue). Threads communicate exclusively via these queues, eliminating the need for explicit locks.
- The only synchronization lock is encapsulated within the queue implementation, making it inaccessible to users and eliminating daedlock potential.
- Threads share no writable variables except through queues, naturally satisfying the "write-exclusive" principle of thread safety.
- mt_queue - message-based multi-threading (shared-nothing, reduces errors)
- Many third-party implementations exist, including atomic-based lock-free queues that can outperform mutex-based solutions.
Inefficient Shared-State Synchronization
- This approach sacrifices debugging time for negligible performance gains measured in nanoseconds.
- mutex - shared-state multi-threading (complex shared state, error-prone)
Key Implementation Considerations
1. RAII Lock Management with std::lock_guard
std::lock_guard implements RAII (Resource Acquisition Is Initialization) for mutex management. It guarantees mutex release when scope exits, regardless of exceptions.
#include <iostream>
#include <mutex>
#include <thread>
#include <vector>
std::string shared_data = "initial";
std::mutex data_mutex;
void writer_thread() {
try {
std::unique_lock guard(data_mutex);
shared_data = "modified";
throw std::runtime_error("Simulated error");
} catch(...) {
// Lock automatically released
}
}
void reader_thread() {
std::this_thread::sleep_for(std::chrono::seconds(2));
std::unique_lock guard(data_mutex);
shared_data = "updated";
}
int main() {
std::vector<std::thread> workers;
workers.emplace_back(writer_thread);
workers.emplace_back(reader_thread);
for(auto& worker : workers) {
worker.join();
}
std::cout << shared_data << std::endl;
return 0;
}
2. Correct Thread Synchronization Patterns
Incorrect Producer-Consumer Implementation:
std::string buffer;
std::mutex buffer_mutex;
void producer() {
std::unique_lock guard(buffer_mutex);
buffer = "data";
}
void consumer() {
// Memory visibility issues and CPU spin
while(buffer.empty());
std::cout << "Received: " << buffer << std::endl;
}
Problesm include memory ordering violations, cache coherence issues, and CPU spinning.
Improved Implementation with Yielding:
void better_consumer() {
std::unique_lock guard(buffer_mutex);
while(buffer.empty()) {
guard.unlock();
std::this_thread::yield(); // Yield to other threads
guard.lock();
}
std::cout << "Received: " << buffer << std::endl;
}
Optimal Implementation with Condition Variables:
#include <condition_variable>
std::condition_variable data_ready;
void optimal_consumer() {
std::unique_lock guard(buffer_mutex);
data_ready.wait(guard, []{ return !buffer.empty(); });
std::cout << "Received: " << buffer << std::endl;
}
void producer_with_notify() {
std::unique_lock guard(buffer_mutex);
buffer = "data";
data_ready.notify_one();
}
Timeout-Based Waiting:
void timed_consumer() {
std::unique_lock guard(buffer_mutex);
bool success = data_ready.wait_for(guard,
std::chrono::seconds(1),
[]{ return !buffer.empty(); });
if(success) {
std::cout << "Received: " << buffer << std::endl;
}
}
3. Measuring Execution Time with steady_clock
#include <chrono>
int main() {
auto start = std::chrono::steady_clock::now();
// Perform work
auto end = std::chrono::steady_clock::now();
auto duration = std::chrono::duration_cast<
std::chrono::milliseconds>(end - start);
std::cout << "Elapsed: " << duration.count() << "ms" << std::endl;
return 0;
}
4. Common Pattern: Condition Variable with Deque
template<typename T>
class ThreadSafeDeque {
private:
std::deque<T> container;
std::mutex container_mutex;
std::condition_variable data_available;
public:
void push_front(T item) {
std::unique_lock guard(container_mutex);
container.push_front(std::move(item));
data_available.notify_one();
}
T pop_back() {
std::unique_lock guard(container_mutex);
while(container.empty()) {
data_available.wait(guard);
}
T item = std::move(container.back());
container.pop_back();
return item;
}
};
Implementing Thread-Safe Queues
1. Unbounded Thread-Safe Queue
template<typename T>
class UnboundedThreadQueue {
private:
std::deque<T> storage;
mutable std::mutex storage_mutex;
std::condition_variable not_empty;
public:
void enqueue(T value) {
std::unique_lock guard(storage_mutex);
storage.push_front(std::move(value));
not_empty.notify_one();
}
T dequeue() {
std::unique_lock guard(storage_mutex);
while(storage.empty()) {
not_empty.wait(guard);
}
T value = std::move(storage.back());
storage.pop_back();
return value;
}
std::optional<T> try_dequeue() {
std::unique_lock guard(storage_mutex);
if(storage.empty()) {
return std::nullopt;
}
T value = std::move(storage.back());
storage.pop_back();
return value;
}
std::optional<T> dequeue_timeout(
std::chrono::steady_clock::duration timeout) {
std::unique_lock guard(storage_mutex);
if(!not_empty.wait_for(guard, timeout,
[this]{ return !storage.empty(); })) {
return std::nullopt;
}
T value = std::move(storage.back());
storage.pop_back();
return value;
}
};
2. Bounded Thread-Safe Queue
template<typename T>
class BoundedThreadQueue {
private:
std::deque<T> storage;
mutable std::mutex storage_mutex;
std::condition_variable not_empty;
std::condition_variable not_full;
size_t capacity;
public:
explicit BoundedThreadQueue(size_t max_size) : capacity(max_size) {}
void enqueue(T value) {
std::unique_lock guard(storage_mutex);
while(storage.size() >= capacity) {
not_full.wait(guard);
}
storage.push_front(std::move(value));
not_empty.notify_one();
}
T dequeue() {
std::unique_lock guard(storage_mutex);
while(storage.empty()) {
not_empty.wait(guard);
}
T value = std::move(storage.back());
storage.pop_back();
not_full.notify_one();
return value;
}
};