Implementing a Producer-Consumer Model with POSIX Semaphores in Linux
POSIX Semaphores
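POSIX semaphores serve the same purpose as System V semaphores: they are used for synchronization, ensuring conflict-free access to shared resources. Unlike System V semaphores, however, POSIX semaphores can also be used to synchronize the threads of a single process, which is how they are used in this article.

Creating (initializing) a POSIX semaphore for multithreaded applications:

    int sem_init(sem_t *sem, int pshared, unsigned int value);

Here `pshared` is 0 when the semaphore is shared between the threads of one process, and `value` is the initial count of the semaphore.

Destroying a POSIX semaphore in a multithreaded environment:

    int sem_destroy(sem_t *sem);

Performing a P operation on a semaphore (acquiring resources): the count is decremented, and the caller blocks while the count is 0.

    int sem_wait(sem_t *sem);

Performing a V operation on a semaphore (releasing resources): the count is incremented, waking a blocked waiter if there is one.

    int sem_post(sem_t *sem);

Producer-Consumer Model Implementation Using Semaphores and Ring Buffer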
Makefile
# Build the ring-buffer producer/consumer demo.
# The headers are listed as prerequisites so that editing any of them triggers
# a rebuild; only the first prerequisite ($<, i.e. Main.cpp) is handed to the
# compiler, because header files must not be passed to g++ as inputs.
ring_producer_consumer: Main.cpp CircularBuffer.hpp ThreadWrapper.hpp WorkItem.hpp
	g++ -o $@ $< -std=c++11 -lpthread

# "clean" is not a file; always run its recipe when requested.
.PHONY: clean
clean:
	rm -f ring_producer_consumer
ThreadWrapper.hpp
#ifndef THREAD_WRAPPER_HPP
#define THREAD_WRAPPER_HPP

#include <pthread.h>

#include <functional>
#include <iostream>
#include <string>

namespace ThreadingUtils
{
    // Signature of the callable executed by a thread: it receives the shared
    // data object (by reference) and the name of the thread running it.
    template <typename DataType>
    using ThreadFunction = std::function<void(DataType &, const std::string &)>;

    // Thin wrapper around a pthread that runs a ThreadFunction on a piece of
    // shared data.
    //
    // The wrapper stores a *reference* to the data, so the data object must
    // outlive the wrapper. Start() hands `this` to the new thread, so the
    // wrapper must not be copied, moved or destroyed while its thread is
    // running (e.g. do not grow a std::vector of wrappers after starting them).
    template <typename DataType>
    class ThreadWrapper
    {
    public:
        // Invokes the stored function with the shared data and the thread name.
        // Called on the new thread by ThreadRoutine().
        void Execute()
        {
            _threadFunction(_dataRef, _threadName);
        }

    public:
        // func: callable to run on the thread.
        // data: shared object passed by reference to func.
        // name: human-readable thread name passed to func.
        ThreadWrapper(ThreadFunction<DataType> func, DataType &data,
                      const std::string &name = "unnamed-thread")
            // Initializers are listed in member declaration order.
            : _threadId(),
              _threadName(name),
              _dataRef(data),
              _threadFunction(func),
              _isRunning(false) // no thread exists until Start() succeeds
        {
        }

        // pthread entry point; args is the ThreadWrapper passed by Start().
        static void *ThreadRoutine(void *args)
        {
            ThreadWrapper<DataType> *currentThread =
                static_cast<ThreadWrapper<DataType> *>(args);
            currentThread->Execute();
            return nullptr;
        }

        // Creates the thread. Returns true on success, false if the thread is
        // already marked as running or pthread_create() fails.
        bool Start()
        {
            if (_isRunning)
            {
                // A second pthread_create() would overwrite _threadId and make
                // the first thread impossible to join.
                return false;
            }
            const int result = pthread_create(&_threadId, nullptr, ThreadRoutine, this);
            if (result != 0)
            {
                return false;
            }
            _isRunning = true;
            return true;
        }

        // Detaches the thread if it was started. Afterwards the thread can no
        // longer be joined, so the running flag is cleared.
        void Detach()
        {
            if (_isRunning)
            {
                pthread_detach(_threadId);
                _isRunning = false;
            }
        }

        // Blocks until the thread finishes, if it was started. The flag is
        // cleared afterwards so that a repeated Join() is a harmless no-op
        // (joining the same thread twice is undefined behavior).
        void Join()
        {
            if (_isRunning)
            {
                pthread_join(_threadId, nullptr);
                _isRunning = false;
            }
        }

        // Returns a copy of the thread name.
        std::string GetName() const
        {
            return _threadName;
        }

        // Only clears the running flag; it does NOT interrupt the thread.
        // After Stop(), Join() and Detach() become no-ops.
        void Stop()
        {
            _isRunning = false;
        }

        ~ThreadWrapper() {}

    private:
        pthread_t _threadId;
        std::string _threadName;
        DataType &_dataRef;
        ThreadFunction<DataType> _threadFunction;
        bool _isRunning; // true between a successful Start() and Join()/Detach()/Stop()
    };
}

#endif
CircularBuffer.hpp
#pragma once

#include <pthread.h>
#include <semaphore.h>

#include <cerrno>
#include <stdexcept>
#include <utility>
#include <vector>

// Circular buffer template class.
//
// Bounded queue for the producer/consumer model: two counting semaphores track
// free and filled slots, and two mutexes serialize producers among themselves
// and consumers among themselves. A producer and a consumer never touch the
// same slot at the same time because the semaphores keep their indices apart.
template <typename DataType>
class CircularBuffer
{
private:
    // Acquire resources (P operation). sem_wait() returns -1 with errno set to
    // EINTR when interrupted by a signal handler; in that case nothing was
    // acquired, so retry instead of proceeding without owning a slot.
    void WaitSemaphore(sem_t &sem)
    {
        while (sem_wait(&sem) == -1 && errno == EINTR)
        {
        }
    }

    // Release resources (V operation).
    void SignalSemaphore(sem_t &sem)
    {
        sem_post(&sem);
    }

    // Lock mutex
    void AcquireLock(pthread_mutex_t &mutex)
    {
        pthread_mutex_lock(&mutex);
    }

    // Release mutex
    void ReleaseLock(pthread_mutex_t &mutex)
    {
        pthread_mutex_unlock(&mutex);
    }

    // Rejects non-positive capacities: zero would make the index wrap-around a
    // modulo by zero, and a negative value would convert to a huge vector size.
    static int ValidatedCapacity(int capacity)
    {
        if (capacity <= 0)
        {
            throw std::invalid_argument("CircularBuffer capacity must be positive");
        }
        return capacity;
    }

public:
    // Creates a buffer with `capacity` slots, all initially free.
    // Throws std::invalid_argument if capacity is not positive.
    explicit CircularBuffer(int capacity)
        : _capacity(ValidatedCapacity(capacity)),
          _buffer(static_cast<typename std::vector<DataType>::size_type>(_capacity)),
          _producerIndex(0),
          _consumerIndex(0)
    {
        // pshared == 0: semaphores are shared between threads of this process.
        sem_init(&_availableSlots, 0, static_cast<unsigned int>(_capacity));
        sem_init(&_filledSlots, 0, 0);
        pthread_mutex_init(&_producerMutex, nullptr);
        pthread_mutex_init(&_consumerMutex, nullptr);
    }

    // Semaphores and mutexes must not be copied, so neither may the buffer.
    CircularBuffer(const CircularBuffer &) = delete;
    CircularBuffer &operator=(const CircularBuffer &) = delete;

    // Producer enqueue function. Blocks while the buffer is full.
    void Push(const DataType &item)
    {
        // Wait for available space
        WaitSemaphore(_availableSlots);

        // Only one producer at a time may use/advance the producer index
        AcquireLock(_producerMutex);
        _buffer[_producerIndex] = item;
        // Circular buffer wrap-around
        _producerIndex = (_producerIndex + 1) % _capacity;
        ReleaseLock(_producerMutex);

        // Signal that data is available
        SignalSemaphore(_filledSlots);
    }

    // Consumer dequeue function. Blocks while the buffer is empty and stores
    // the oldest item in *out (out must not be null).
    void Pop(DataType *out)
    {
        // Wait for available data
        WaitSemaphore(_filledSlots);

        // Only one consumer at a time may use/advance the consumer index
        AcquireLock(_consumerMutex);
        // Move the item out so the slot does not keep the old value's
        // resources alive until it is overwritten by a later Push().
        *out = std::move(_buffer[_consumerIndex]);
        _consumerIndex = (_consumerIndex + 1) % _capacity;
        ReleaseLock(_consumerMutex);

        // Signal that space is available
        SignalSemaphore(_availableSlots);
    }

    ~CircularBuffer()
    {
        sem_destroy(&_availableSlots);
        sem_destroy(&_filledSlots);
        pthread_mutex_destroy(&_producerMutex);
        pthread_mutex_destroy(&_consumerMutex);
    }

private:
    // Buffer capacity (declared first: _buffer is sized from it)
    int _capacity;

    // Vector simulating circular buffer
    std::vector<DataType> _buffer;

    // Producer and consumer position pointers
    int _producerIndex;
    int _consumerIndex;

    // Semaphores
    sem_t _availableSlots; // number of free slots
    sem_t _filledSlots;    // number of slots holding data

    // Mutexes
    pthread_mutex_t _producerMutex;
    pthread_mutex_t _consumerMutex;
};
WorkItem.hpp
#pragma once

#include <functional>
#include <iostream>

// A unit of work handed from producers to consumers: any callable that takes
// no arguments and returns nothing.
typedef std::function<void()> WorkTask;

// Sample task: prints a message to standard output.
// Declared inline because it is defined in a header; without it, including
// this header from more than one translation unit causes multiple-definition
// link errors.
inline void ProcessDownload()
{
    std::cout << "Processing download..." << std::endl;
}
Main.cpp
#include "CircularBuffer.hpp"
#include "ThreadWrapper.hpp"
#include "WorkItem.hpp"

#include <unistd.h>

#include <iostream>
#include <string>
#include <vector>

using namespace ThreadingUtils;

// Type alias for circular buffer
typedef CircularBuffer<WorkTask> TaskBuffer;

// Consumer thread function: repeatedly takes a task out of the buffer
// (blocking while it is empty) and runs it. Never returns.
void ConsumerThread(TaskBuffer& buffer, const std::string& threadName)
{
    while (true)
    {
        // Retrieve task
        WorkTask task;
        buffer.Pop(&task);
        std::cout << "Consumer " << threadName << ": ";
        // Execute task
        task();
    }
}

// Producer thread function: submits one download task per second
// (blocking while the buffer is full). Never returns.
void ProducerThread(TaskBuffer& buffer, const std::string& threadName)
{
    while (true)
    {
        // Submit task
        buffer.Push(ProcessDownload);
        std::cout << "Producer " << threadName << ": "
                  << "Download task submitted" << std::endl;
        sleep(1);
    }
}

// Initialize thread collection: appends `count` not-yet-started wrappers named
// "worker-1" .. "worker-<count>" that will run `function` on `buffer`.
void CreateThreads(std::vector<ThreadWrapper<TaskBuffer> >* threads, int count,
                   TaskBuffer& buffer, ThreadFunction<TaskBuffer> function)
{
    if (count <= 0)
    {
        return;
    }
    // One allocation up front instead of repeated growth.
    threads->reserve(threads->size() + static_cast<std::size_t>(count));
    for (int i = 0; i < count; i++)
    {
        // Create a batch of threads
        std::string name = "worker-" + std::to_string(i + 1);
        threads->emplace_back(function, buffer, name);
    }
}

// Create consumer threads
void SetupConsumers(std::vector<ThreadWrapper<TaskBuffer> >* threads, int count,
                    TaskBuffer& buffer)
{
    CreateThreads(threads, count, buffer, ConsumerThread);
}

// Create producer threads
void SetupProducers(std::vector<ThreadWrapper<TaskBuffer> >* threads, int count,
                    TaskBuffer& buffer)
{
    CreateThreads(threads, count, buffer, ProducerThread);
}

// Wait for all threads to complete
void WaitForAllThreads(std::vector<ThreadWrapper<TaskBuffer> >& threads)
{
    for (auto& thread : threads)
    {
        thread.Join();
    }
}

// Start all threads. Each started thread receives a pointer to its wrapper,
// so the vector must not be resized after this call.
void LaunchAll(std::vector<ThreadWrapper<TaskBuffer> >& threads)
{
    for (auto& thread : threads)
    {
        if (!thread.Start())
        {
            std::cerr << "Failed to start thread " << thread.GetName() << std::endl;
        }
    }
}

int main()
{
    // Create circular buffer with capacity of 10. It lives on the stack (no
    // leaked `new`) and outlives the threads because main() joins them below.
    TaskBuffer taskBuffer(10);

    // Create thread collection
    std::vector<ThreadWrapper<TaskBuffer> > threads;

    // Create 1 consumer thread
    SetupConsumers(&threads, 1, taskBuffer);

    // Create 1 producer thread
    SetupProducers(&threads, 1, taskBuffer);

    // Start all threads (only after every wrapper has been created)
    LaunchAll(threads);

    // Wait for all threads to complete
    WaitForAllThreads(threads);

    return 0;
}
Results
The above implementation demonstrates a single producer, single consumer model. The main function can be modified to support multiple producers and multiple consumers.