Fading Coder

One Final Commit for the Last Sprint

Home > Tech > Content

Implementing a Producer-Consumer Model with POSIX Semaphores in Linux

Tech 1

POSIX Semaphores

POSIX semaphores serve the same purpose as SystemV semaphores for synchronization operations, ensuring conflict-free access to shared resources. However, POSIX semaphores are specifically designed for thread synchronization within a process. Creating a POSIX semaphore for multithreaded applications: Destroying a POSIX semaphore in a multithreaded environment: Performing a P operation on a semaphore (acquiring resources): Performing a V operation on a semaphore (releasing resources):

Producer-Consumer Model Implementation Using Semaphores and Ring Buffer

Makefile

ring_producer_consumer: Main.cpp
	g++ -o $@ $^ -std=c++11 -lpthread
.PHONY:clean
clean:
	rm -f ring_producer_consumer

ThreadWrapper.hpp

#ifndef __THREAD_WRAPPER_HPP__
#define __THREAD_WRAPPER_HPP__

#include 
#include 
#include 
#include 
#include 

namespace ThreadingUtils
{
    template
    using ThreadFunction = std::function;

    template
    class ThreadWrapper
    {
    public:
        void Execute()
        {
            _threadFunction(_dataRef, _threadName);
        }
    public:
        ThreadWrapper(ThreadFunction func, DataType& data, const std::string &name="unnamed-thread")
            : _threadFunction(func)
            , _dataRef(data)
            , _threadName(name)
            , _isRunning(true)
        {}

        static void *ThreadRoutine(void *args)
        {
            ThreadWrapper *currentThread = static_cast *>(args);
            currentThread->Execute();
            return nullptr;
        }

        bool Start()
        {
            int result = pthread_create(&_threadId, nullptr, ThreadRoutine, this);
            if(!result)
            {
                _isRunning = false;
                return true;
            }
            else
            {
                return false;
            }
        }

        void Detach()
        {
            if(!_isRunning)
            {
                pthread_detach(_threadId);
            }
        }

        void Join()
        {
            if(!_isRunning)
            {
                pthread_join(_threadId, nullptr);
            }
        }

        std::string GetName()
        {
            return _threadName;
        }

        void Stop()
        {
            _isRunning = true;
        }

        ~ThreadWrapper() {}
    private:
        pthread_t _threadId;
        std::string _threadName;
        DataType& _dataRef;
        ThreadFunction _threadFunction;
        bool _isRunning;
    };
}

#endif

CircularBuffer.hpp

#pragma once

#include 
#include 
#include 
#include 

// Circular buffer template class
ntemplate
class CircularBuffer
{
private:
    // Acquire resources
    void WaitSemaphore(sem_t& sem)
    {
        sem_wait(&sem);
    }

    // Release resources
    void SignalSemaphore(sem_t& sem)
    {
        sem_post(&sem);
    }

    // Lock mutex
    void AcquireLock(pthread_mutex_t& mutex)
    {
        pthread_mutex_lock(&mutex);
    }

    // Release mutex
    void ReleaseLock(pthread_mutex_t& mutex)
    {
        pthread_mutex_unlock(&mutex);
    }
public:
    CircularBuffer(int capacity)
        :_capacity(capacity)
        ,_buffer(capacity)
        ,_producerIndex(0)
        ,_consumerIndex(0)
    {
        sem_init(&_availableSlots, 0, _capacity);
        sem_init(&_filledSlots, 0, 0);
        pthread_mutex_init(&_producerMutex, nullptr);
        pthread_mutex_init(&_consumerMutex, nullptr);
    }

    // Producer enqueue function
    void Push(const DataType& item)
    {
        // Wait for available space
        WaitSemaphore(_availableSlots);
        // Lock producer mutex
        AcquireLock(_producerMutex);
        // Add item to buffer
        _buffer[_producerIndex++] = item;
        // Circular buffer wrap-around
        _producerIndex %= _capacity;
        // Unlock producer mutex
        ReleaseLock(_producerMutex);
        // Signal that data is available
        SignalSemaphore(_filledSlots);
    }

    // Consumer dequeue function
    void Pop(DataType* out)
    {
        // Wait for available data
        WaitSemaphore(_filledSlots);
        // Lock consumer mutex
        AcquireLock(_consumerMutex);
        // Remove item from buffer
        *out = _buffer[_consumerIndex++];
        _consumerIndex %= _capacity;
        // Unlock consumer mutex
        ReleaseLock(_consumerMutex);
        // Signal that space is available
        SignalSemaphore(_availableSlots);
    }

    ~CircularBuffer()
    {
        sem_destroy(&_availableSlots);
        sem_destroy(&_filledSlots);
        pthread_mutex_destroy(&_producerMutex);
        pthread_mutex_destroy(&_consumerMutex);
    }
private:
    // Vector simulating circular buffer
    std::vector _buffer;
    // Buffer capacity
    int _capacity;
    // Producer and consumer position pointers
    int _producerIndex;
    int _consumerIndex;
    // Semaphores
    sem_t _availableSlots;
    sem_t _filledSlots;
    // Mutexes
    pthread_mutex_t _producerMutex;
    pthread_mutex_t _consumerMutex;
};

WorkItem.hpp

#pragma once

#include 
#include 

typedef std::function WorkTask;

void ProcessDownload()
{
    std::cout << "Processing download..." << std::endl;
}

Main.cpp

#include "CircularBuffer.hpp"
#include "ThreadWrapper.hpp"
#include "WorkItem.hpp"
#include 
#include 
#include 

using namespace ThreadingUtils;
// Type alias for circular buffer
typedef CircularBuffer TaskBuffer;

// Consumer thread function
void ConsumerThread(TaskBuffer& buffer, const std::string& threadName)
{
    while (true)
    {
        // Retrieve task
        WorkTask task;
        buffer.Pop(&task);
        std::cout << "Consumer " << threadName << ": ";
        // Execute task
        task();
    }
}

// Producer thread function
void ProducerThread(TaskBuffer& buffer, const std::string& threadName)
{
    while (true)
    {
        // Submit task
        buffer.Push(ProcessDownload);
        std::cout << "Producer " << threadName << ": " << "Download task submitted" << std::endl;
        sleep(1);
    }
}

// Initialize thread collection
void CreateThreads(std::vector>* threads, int count, TaskBuffer& buffer, ThreadFunction function)
{
    for (int i = 0; i < count; i++)
    {
        // Create a batch of threads
        std::string name = "worker-" + std::to_string(i + 1);
        threads->emplace_back(function, buffer, name);
    }
}

// Create consumer threads
void SetupConsumers(std::vector>* threads, int count, TaskBuffer& buffer)
{
    CreateThreads(threads, count, buffer, ConsumerThread);
}

// Create producer threads
void SetupProducers(std::vector>* threads, int count, TaskBuffer& buffer)
{
    CreateThreads(threads, count, buffer, ProducerThread);
}

// Wait for all threads to complete
void WaitForAllThreads(std::vector>& threads)
{
    for (auto& thread : threads)
    {
        thread.Join();
    }
}

// Start all threads
void LaunchAll(std::vector>& threads)
{
    for (auto& thread : threads)
    {
        thread.Start();
    }
}

int main()
{
    // Create circular buffer with capacity of 10
    TaskBuffer* taskBuffer = new TaskBuffer(10);
    // Create thread collection
    std::vector> threads;
    // Create 1 consumer thread
    SetupConsumers(&threads, 1, *taskBuffer);
    // Create 1 producer thread
    SetupProducers(&threads, 1, *taskBuffer);
    // Start all threads
    LaunchAll(threads);

    // Wait for all threads to complete
    WaitForAllThreads(threads);

    return 0;
}

Results

The above implementation demonstrates a single producer, single consumer model. The main function can be modified to support multiple producers and multiple consumers.

Related Articles

Understanding Strong and Weak References in Java

Strong References Strong reference are the most prevalent type of object referencing in Java. When an object has a strong reference pointing to it, the garbage collector will not reclaim its memory. F...

Comprehensive Guide to SSTI Explained with Payload Bypass Techniques

Introduction Server-Side Template Injection (SSTI) is a vulnerability in web applications where user input is improper handled within the template engine and executed on the server. This exploit can r...

Implement Image Upload Functionality for Django Integrated TinyMCE Editor

Django’s Admin panel is highly user-friendly, and pairing it with TinyMCE, an effective rich text editor, simplifies content management significantly. Combining the two is particular useful for bloggi...

Leave a Comment

Anonymous

◎Feel free to join the discussion and share your thoughts.