Fading Coder

An Old Coder’s Final Dance

Home > Tech > Content

Building a Minimal Thread Pool in C with POSIX Threads

Tech 2

Thread pools are a common concurrency primitive for servers and systems programming. Instead of creating a thread for each task, a thread pool maintains a fixed set of worker threads that pull work items from a shared queue. This approach cuts thread creation/destruction overhead and keeps latency low for short-lived tasks.

This article walks through a compact, fixed-size thread pool implemented on top of pthreads. It provides a small C API to create a pool, enqueue work, and shut it down either immediately or gracefully.

Design Overview

  • A queue holds pending tasks (function pointer + opaque argument).
  • N worker threads wait on a condition variable when the queue is empty.
  • When a task is enqueued, one waiting worker is signaled.
  • Shutdown supports two modes:
    • Immediate: workers stop after the next wake-up, even if items remain
    • Graceful: workers drain all queued tasks, then exit

The queue is implemented as a ring buffer tracked by head/tail indices and a current count.

Data Structures

  • Work item:
    • function: void (*)(void *)
    • argument: void *
  • Thread pool:
    • Synchronization: pthread_mutex_t + pthread_cond_t
    • Workers array: pthread_t*
    • Task ring buffer: work item array
    • Queue state: capacity, head, tail, length
    • Thread state: number created, number currently alive
    • Shutdown flag: 0/1/2 (none/immediate/graceful)

Public API

  • threadpool_create(thread_count, queue_size, flags): allocate and start a fixed number of workers and a task queue
  • threadpool_add(pool, fn, arg, flags): enqueue a new task
  • threadpool_destroy(pool, flags): stop workers and free resources; when flags includes threadpool_graceful, pending work drains before exit

Error codes report invalid parameters, lock failures, full queues, pool shutdown, or thread join errors.

Worker Behavior

Each worker loops:

  1. Lock the pool mutex.
  2. Wait while the queue is empty and the pool is not shutting down.
  3. If immediate shutdown is requested, or graceful shutdown with no remaining work, exit the loop.
  4. Otherwise pull one task from the queue, unlock, and execute it.

On exit, the worker decrements the running thread count under the lock and returns.

Build and Run

  • Compile with pthreads enabled (e.g., gcc -O2 -pthread)
  • Include threadpool.h in consumers
  • Link threadpool.c into your project

Example Test Program

The following test enqueues as many tasks as fit in the queue, waits for half of them to finish, then triggers an immediate shutdown.

/* test_threadpool.c */
#include <stdio.h>
#include <unistd.h>
#include <pthread.h>
#include <assert.h>
#include "threadpool.h"

#define NUM_THREADS 32
#define QUEUE_CAP   256

/* Number of tasks main() successfully handed to threadpool_add(). */
static int submitted = 0;
/* Number of tasks whose work_item() body has finished; bumped by workers. */
static int completed = 0;
/* Mutex held around every increment of the two counters above. */
static pthread_mutex_t tally = PTHREAD_MUTEX_INITIALIZER;

/*
 * Task body used by the test: sleep for 10 ms to simulate work, then
 * record one more finished task in `completed` while holding `tally`.
 * The argument is ignored.
 */
static void work_item(void *arg) {
    static const unsigned simulated_work_us = 10000u;

    (void)arg;
    usleep(simulated_work_us);

    pthread_mutex_lock(&tally);
    completed += 1;
    pthread_mutex_unlock(&tally);
}

/*
 * Test driver: start a pool, submit tasks until threadpool_add() stops
 * returning 0, wait until at least half of the submitted tasks have
 * completed, then request an immediate shutdown.
 *
 * Returns 0 on success and 1 if the pool could not be created or
 * destroyed.
 */
int main(void) {
    threadpool_t *pool = threadpool_create(NUM_THREADS, QUEUE_CAP, 0);
    if (pool == NULL) {
        fprintf(stderr, "threadpool_create failed\n");
        return 1;
    }

    fprintf(stderr, "Started pool with %d workers, queue capacity %d\n",
            NUM_THREADS, QUEUE_CAP);

    /* Keep submitting until the pool refuses a task. */
    while (threadpool_add(pool, work_item, NULL, 0) == 0) {
        pthread_mutex_lock(&tally);
        submitted++;
        pthread_mutex_unlock(&tally);
    }

    fprintf(stderr, "Enqueued %d tasks\n", submitted);

    /*
     * Poll the completion counter. Workers update `completed` under
     * `tally`, so the read must take the same mutex to avoid a data race.
     */
    for (;;) {
        pthread_mutex_lock(&tally);
        int done = completed;
        pthread_mutex_unlock(&tally);

        if (done >= submitted / 2) {
            break;
        }
        usleep(10000);
    }

    /*
     * The call must not live inside assert(): with NDEBUG defined the
     * whole expression would vanish and the pool would never shut down.
     */
    int rc = threadpool_destroy(pool, 0);
    if (rc != 0) {
        fprintf(stderr, "threadpool_destroy failed: %d\n", rc);
        return 1;
    }

    /* A successful destroy has joined every worker, so this read is safe. */
    fprintf(stderr, "Completed %d tasks before shutdown\n", completed);
    return 0;
}

Header: threadpool.h

#ifndef THREADPOOL_H_
#define THREADPOOL_H_

#ifdef __cplusplus
extern "C" {
#endif

/* Largest thread_count that threadpool_create() accepts. */
#define MAX_THREADS 64
/* Largest queue_size that threadpool_create() accepts. */
#define MAX_QUEUE   65536

/* Opaque pool handle; the structure is defined in threadpool.c. */
typedef struct threadpool threadpool_t;

/* Negative status codes returned by threadpool_add() and threadpool_destroy(). */
typedef enum {
    threadpool_invalid        = -1,  /* NULL pool or NULL task function */
    threadpool_lock_failure   = -2,  /* locking or unlocking the pool mutex failed */
    threadpool_queue_full     = -3,  /* task queue is already at capacity */
    threadpool_shutdown       = -4,  /* pool is already shutting down */
    threadpool_thread_failure = -5   /* joining a worker thread failed */
} threadpool_error_t;

/* Flag bits accepted by threadpool_destroy(). */
typedef enum {
    /* Drain the queued tasks before workers exit; without it shutdown is immediate. */
    threadpool_graceful = 1
} threadpool_destroy_flags_t;

/**
 * Create a pool with a fixed number of workers and a bounded task queue.
 *
 * @param thread_count number of workers, 1..MAX_THREADS
 * @param queue_size   queue capacity, 1..MAX_QUEUE
 * @param flags        currently ignored
 * @return the new pool, or NULL on invalid arguments, allocation failure,
 *         mutex/condition-variable initialisation failure, or thread
 *         creation failure
 */
threadpool_t *threadpool_create(int thread_count, int queue_size, int flags);

/**
 * Append one task to the pool's queue and wake a waiting worker.
 *
 * @param pool     pool to submit to
 * @param function task entry point
 * @param argument opaque pointer passed to function
 * @param flags    currently ignored
 * @return 0 on success; threadpool_invalid if pool or function is NULL;
 *         threadpool_lock_failure on mutex errors; threadpool_shutdown if
 *         the pool is shutting down; threadpool_queue_full if the queue
 *         has no free slot
 */
int threadpool_add(threadpool_t *pool,
                   void (*function)(void *),
                   void *argument,
                   int flags);

/**
 * Stop the workers, join them and release the pool.
 *
 * @param pool  pool to destroy
 * @param flags 0 for immediate shutdown, or threadpool_graceful to let the
 *              workers drain the queue first
 * @return 0 on success (the pool has been freed); threadpool_invalid if
 *         pool is NULL; threadpool_lock_failure on mutex errors;
 *         threadpool_shutdown if shutdown was already requested;
 *         threadpool_thread_failure if a join failed, in which case the
 *         pool is not freed
 */
int threadpool_destroy(threadpool_t *pool, int flags);

#ifdef __cplusplus
}
#endif

#endif /* THREADPOOL_H_ */

Implementation: threadpool.c

#include <pthread.h>
#include <stdlib.h>
#include "threadpool.h"

/* Values stored in threadpool.shutting. */
typedef enum {
    shutdown_none      = 0,  /* pool is running normally */
    shutdown_immediate = 1,  /* workers exit even if tasks are still queued */
    shutdown_graceful  = 2   /* workers exit once the queue is empty */
} shutdown_mode_t;

/* One queued unit of work: a callback plus the pointer handed to it. */
typedef struct {
    void (*fn)(void *);  /* task entry point; workers skip the call if NULL */
    void *arg;           /* opaque argument passed to fn */
} tp_task_t;

/*
 * Pool state. The queue fields and `shutting` are read and written with
 * `mtx` held by threadpool_add(), threadpool_destroy() and the workers.
 */
struct threadpool {
    pthread_mutex_t mtx;     /* protects the queue and shutdown state */
    pthread_cond_t  cv;      /* signalled on enqueue, broadcast on shutdown */

    pthread_t   *workers;    /* array of worker thread handles */
    int          nthreads;   /* number of workers successfully created */
    int          alive;      /* workers that have not yet recorded their exit */

    tp_task_t   *queue;      /* ring buffer of pending tasks */
    int          qcap;       /* ring buffer capacity */
    int          qhead;      /* index of the next task to pop */
    int          qtail;      /* index of the next free slot to push into */
    int          qlen;       /* number of tasks currently queued */

    int          shutting;   /* one of shutdown_mode_t */
};

/* Thread entry point for every worker; receives the pool as its argument. */
static void *worker_main(void *arg);
/* Releases the pool's memory and synchronisation objects. */
static int   pool_free(threadpool_t *pool);

/**
 * Create a pool with a fixed number of workers and a bounded task queue.
 *
 * @param thread_count number of workers, 1..MAX_THREADS
 * @param queue_size   queue capacity, 1..MAX_QUEUE
 * @param flags        currently ignored
 * @return the new pool, or NULL on invalid arguments, allocation failure,
 *         mutex/condition-variable initialisation failure, or thread
 *         creation failure
 *
 * Cleanup on the early error paths is staged so that only objects that
 * were actually initialised are destroyed: calling pthread_mutex_destroy()
 * or pthread_cond_destroy() on an uninitialised object is undefined.
 */
threadpool_t *threadpool_create(int thread_count, int queue_size, int flags)
{
    (void)flags;
    if (thread_count <= 0 || thread_count > MAX_THREADS ||
        queue_size   <= 0 || queue_size   > MAX_QUEUE) {
        return NULL;
    }

    threadpool_t *pool = calloc(1, sizeof(*pool));
    if (!pool) {
        return NULL;
    }

    pool->workers = calloc((size_t)thread_count, sizeof(*pool->workers));
    pool->queue   = calloc((size_t)queue_size, sizeof(*pool->queue));
    if (!pool->workers || !pool->queue) {
        goto fail_alloc;
    }

    if (pthread_mutex_init(&pool->mtx, NULL) != 0) {
        goto fail_alloc;
    }
    if (pthread_cond_init(&pool->cv, NULL) != 0) {
        goto fail_mutex;
    }

    pool->nthreads = 0;
    pool->alive    = 0;
    pool->qcap     = queue_size;
    pool->qhead    = 0;
    pool->qtail    = 0;
    pool->qlen     = 0;
    pool->shutting = shutdown_none;

    for (int i = 0; i < thread_count; ++i) {
        if (pthread_create(&pool->workers[i], NULL, worker_main, pool) != 0) {
            /*
             * Partial startup: threadpool_destroy() stops and joins the
             * workers created so far (it iterates over nthreads) and then
             * frees the pool. If it reports an error the pool is left
             * allocated, because workers may still be using it.
             */
            (void)threadpool_destroy(pool, 0);
            return NULL;
        }
        pool->nthreads++;
        pool->alive++;
    }
    return pool;

fail_mutex:
    pthread_mutex_destroy(&pool->mtx);
fail_alloc:
    /* No worker exists yet, so the memory can be released directly. */
    free(pool->queue);
    free(pool->workers);
    free(pool);
    return NULL;
}

/**
 * Append one task to the pool's ring buffer and wake one waiting worker.
 *
 * @param pool     pool to submit to
 * @param function task entry point
 * @param argument opaque pointer passed to function
 * @param flags    currently ignored
 * @return 0 on success; threadpool_invalid if pool or function is NULL;
 *         threadpool_lock_failure if the mutex cannot be locked or
 *         unlocked; threadpool_shutdown if shutdown has been requested;
 *         threadpool_queue_full if no slot is free
 */
int threadpool_add(threadpool_t *pool, void (*function)(void *), void *argument, int flags)
{
    (void)flags;

    if (pool == NULL || function == NULL) {
        return threadpool_invalid;
    }
    if (pthread_mutex_lock(&pool->mtx) != 0) {
        return threadpool_lock_failure;
    }

    int status;
    if (pool->shutting != shutdown_none) {
        /* No new work is accepted once shutdown has been requested. */
        status = threadpool_shutdown;
    } else if (pool->qlen == pool->qcap) {
        status = threadpool_queue_full;
    } else {
        tp_task_t *slot = &pool->queue[pool->qtail];

        slot->fn  = function;
        slot->arg = argument;
        pool->qtail = (pool->qtail + 1) % pool->qcap;  /* wrap around the ring */
        pool->qlen += 1;
        (void)pthread_cond_signal(&pool->cv);
        status = 0;
    }

    /* An unlock failure takes precedence over the enqueue result. */
    return pthread_mutex_unlock(&pool->mtx) == 0 ? status
                                                 : threadpool_lock_failure;
}

/**
 * Request shutdown, join every created worker and free the pool.
 *
 * @param pool  pool to destroy
 * @param flags 0 for immediate shutdown, or threadpool_graceful to let the
 *              workers drain the queue first
 * @return 0 on success (the pool has been freed); threadpool_invalid if
 *         pool is NULL; threadpool_lock_failure on mutex errors;
 *         threadpool_shutdown if shutdown was already requested;
 *         threadpool_thread_failure if any join failed, in which case the
 *         pool is left allocated
 */
int threadpool_destroy(threadpool_t *pool, int flags)
{
    if (pool == NULL) {
        return threadpool_invalid;
    }
    if (pthread_mutex_lock(&pool->mtx) != 0) {
        return threadpool_lock_failure;
    }

    if (pool->shutting != shutdown_none) {
        /* Shutdown was already requested. */
        pthread_mutex_unlock(&pool->mtx);
        return threadpool_shutdown;
    }

    if (flags & threadpool_graceful) {
        pool->shutting = shutdown_graceful;
    } else {
        pool->shutting = shutdown_immediate;
    }
    /* Wake every worker blocked on the condition variable. */
    pthread_cond_broadcast(&pool->cv);

    if (pthread_mutex_unlock(&pool->mtx) != 0) {
        return threadpool_lock_failure;
    }

    /* Join all workers even if one join fails, remembering the failure. */
    int join_status = 0;
    int idx = 0;
    while (idx < pool->nthreads) {
        if (pthread_join(pool->workers[idx], NULL) != 0) {
            join_status = threadpool_thread_failure;
        }
        idx++;
    }

    if (join_status != 0) {
        return join_status;
    }

    /* pool_free() refuses to run while alive > 0, so clear it first. */
    pool->alive = 0;
    (void)pool_free(pool);
    return 0;
}

/*
 * Release everything owned by the pool: the worker array, the task ring
 * buffer, the mutex, the condition variable and the pool itself.
 *
 * Returns 0 when the pool was freed or was NULL, and -1 (freeing nothing)
 * while the pool still counts live workers.
 */
static int pool_free(threadpool_t *pool)
{
    if (pool == NULL) {
        return 0;
    }
    if (pool->alive > 0) {
        return -1;
    }

    /* free(NULL) is a no-op, so the arrays need no NULL guard. */
    free(pool->workers);
    free(pool->queue);

    pthread_mutex_destroy(&pool->mtx);
    pthread_cond_destroy(&pool->cv);

    free(pool);
    return 0;
}

/*
 * Worker thread entry point. `arg` is the owning pool.
 *
 * Each iteration locks the pool, sleeps on the condition variable while
 * there is nothing queued and no shutdown request, then either exits
 * (immediate shutdown, or graceful shutdown with an empty queue) or pops
 * one task and runs it with the mutex released.
 *
 * On a shutdown exit the worker decrements `alive` under the lock. If the
 * mutex cannot be locked the worker returns without touching `alive`.
 * Always returns NULL.
 */
static void *worker_main(void *arg)
{
    threadpool_t *tp = (threadpool_t *)arg;

    while (pthread_mutex_lock(&tp->mtx) == 0) {
        /* Sleep until there is work or a shutdown request. */
        while (tp->shutting == shutdown_none && tp->qlen == 0) {
            pthread_cond_wait(&tp->cv, &tp->mtx);
        }

        int must_exit = tp->shutting == shutdown_immediate ||
                        (tp->shutting == shutdown_graceful && tp->qlen == 0);
        if (must_exit) {
            tp->alive -= 1;
            pthread_mutex_unlock(&tp->mtx);
            return NULL;
        }

        /* Copy the head task out so it can run after the lock is dropped. */
        tp_task_t job = tp->queue[tp->qhead];
        tp->qhead = (tp->qhead + 1) % tp->qcap;
        tp->qlen -= 1;
        pthread_mutex_unlock(&tp->mtx);

        if (job.fn != NULL) {
            job.fn(job.arg);
        }
    }

    return NULL;
}

Minimal Build Command

  • gcc -O2 -pthread threadpool.c test_threadpool.c -o test_threadpool
Tags: c

Related Articles

Understanding Strong and Weak References in Java

Strong References Strong references are the most prevalent type of object referencing in Java. When an object has a strong reference pointing to it, the garbage collector will not reclaim its memory. F...

Comprehensive Guide to SSTI Explained with Payload Bypass Techniques

Introduction Server-Side Template Injection (SSTI) is a vulnerability in web applications where user input is improperly handled within the template engine and executed on the server. This exploit can r...

Implement Image Upload Functionality for Django Integrated TinyMCE Editor

Django’s Admin panel is highly user-friendly, and pairing it with TinyMCE, an effective rich text editor, simplifies content management significantly. Combining the two is particularly useful for bloggi...

Leave a Comment

Anonymous

◎Feel free to join the discussion and share your thoughts.