From 50567abb1790a906cde151b3c75d4bfedd5302ef Mon Sep 17 00:00:00 2001 From: ale <ale@incal.net> Date: Fri, 4 Oct 2019 23:24:03 +0100 Subject: [PATCH] Add a thread pool implementation to be used by PAM workers --- Makefile.am | 1 + thread-pool.c | 395 ++++++++++++++++++++++++++++++++++++++++++++++++++ thread-pool.h | 58 ++++++++ 3 files changed, 454 insertions(+) create mode 100644 thread-pool.c create mode 100644 thread-pool.h diff --git a/Makefile.am b/Makefile.am index a6bac78..2efb2b9 100644 --- a/Makefile.am +++ b/Makefile.am @@ -3,6 +3,7 @@ bin_PROGRAMS = auth-sasl-server auth_sasl_server_SOURCES = \ base64.c base64.h \ + thread-pool.c thread-pool.h \ conn.c conn.h \ pam.c pam.h \ main.c diff --git a/thread-pool.c b/thread-pool.c new file mode 100644 index 0000000..48dcb33 --- /dev/null +++ b/thread-pool.c @@ -0,0 +1,395 @@ +/* + * Thread pool implementation. + * See <thr_pool.h> for interface declarations. + */ + +#define _GNU_SOURCE +#include <errno.h> +#include <signal.h> +#include <stdlib.h> +#include <time.h> +#include "thread-pool.h" + +/* + * FIFO queued job + */ +typedef struct job job_t; +struct job { + job_t *job_next; /* linked list of jobs */ + void *(*job_func)(void *); /* function to call */ + void *job_arg; /* its argument */ +}; + +/* + * List of active worker threads, linked through their stacks. + */ +typedef struct active active_t; +struct active { + active_t *active_next; /* linked list of threads */ + pthread_t active_tid; /* active thread id */ +}; + +/* + * The thread pool, opaque to the clients. + */ +struct thr_pool { + thr_pool_t *pool_forw; /* circular linked list */ + thr_pool_t *pool_back; /* of all thread pools */ + pthread_mutex_t pool_mutex; /* protects the pool data */ + pthread_cond_t pool_busycv; /* synchronization in pool_queue */ + pthread_cond_t pool_workcv; /* synchronization with workers */ + pthread_cond_t pool_waitcv; /* synchronization in pool_wait() */ + active_t *pool_active; /* list of threads performing work */ + job_t *pool_head; /* head of FIFO job queue */ + job_t *pool_tail; /* tail of FIFO job queue */ + pthread_attr_t pool_attr; /* attributes of the workers */ + int pool_flags; /* see below */ + unsigned int pool_linger; /* seconds before idle workers exit */ + int pool_minimum; /* minimum number of worker threads */ + int pool_maximum; /* maximum number of worker threads */ + int pool_nthreads; /* current number of worker threads */ + int pool_idle; /* number of idle workers */ +}; + +/* pool_flags */ +#define POOL_WAIT 0x01 /* waiting in thr_pool_wait() */ +#define POOL_DESTROY 0x02 /* pool is being destroyed */ + +/* the list of all created and not yet destroyed thread pools */ +static thr_pool_t *thr_pools = NULL; + +/* protects thr_pools */ +static pthread_mutex_t thr_pool_lock = PTHREAD_MUTEX_INITIALIZER; + +/* set of all signals */ +static sigset_t fillset; + +static void *worker_thread(void *); + +static int create_worker(thr_pool_t *pool) { + sigset_t oset; + int error; + pthread_t handle; + + (void)pthread_sigmask(SIG_SETMASK, &fillset, &oset); + error = pthread_create(&handle, &pool->pool_attr, worker_thread, pool); + (void)pthread_sigmask(SIG_SETMASK, &oset, NULL); + return (error); +} + +/* + * Worker thread is terminating. Possible reasons: + * - excess idle thread is terminating because there is no work. + * - thread was cancelled (pool is being destroyed). + * - the job function called pthread_exit(). + * In the last case, create another worker thread + * if necessary to keep the pool populated. + */ +static void worker_cleanup(void *arg) { + thr_pool_t *pool = (thr_pool_t *)arg; + + --pool->pool_nthreads; + if (pool->pool_flags & POOL_DESTROY) { + if (pool->pool_nthreads == 0) + (void)pthread_cond_broadcast(&pool->pool_busycv); + } else if (pool->pool_head != NULL && + pool->pool_nthreads < pool->pool_maximum && + create_worker(pool) == 0) { + pool->pool_nthreads++; + } + (void)pthread_mutex_unlock(&pool->pool_mutex); +} + +static void notify_waiters(thr_pool_t *pool) { + if (pool->pool_head == NULL && pool->pool_active == NULL) { + pool->pool_flags &= ~POOL_WAIT; + (void)pthread_cond_broadcast(&pool->pool_waitcv); + } +} + +/* + * Called by a worker thread on return from a job. + */ +static void job_cleanup(void *arg) { + thr_pool_t *pool = (thr_pool_t *)arg; + pthread_t my_tid = pthread_self(); + active_t *activep; + active_t **activepp; + + (void)pthread_mutex_lock(&pool->pool_mutex); + for (activepp = &pool->pool_active; (activep = *activepp) != NULL; + activepp = &activep->active_next) { + if (activep->active_tid == my_tid) { + *activepp = activep->active_next; + break; + } + } + if (pool->pool_flags & POOL_WAIT) + notify_waiters(pool); +} + +static void *worker_thread(void *arg) { + thr_pool_t *pool = (thr_pool_t *)arg; + int timedout; + job_t *job; + void *(*func)(void *); + active_t active; + struct timespec ts; + + /* + * This is the worker's main loop. It will only be left + * if a timeout occurs or if the pool is being destroyed. + */ + (void)pthread_mutex_lock(&pool->pool_mutex); + pthread_cleanup_push(worker_cleanup, pool); + active.active_tid = pthread_self(); + for (;;) { + /* + * We don't know what this thread was doing during + * its last job, so we reset its signal mask and + * cancellation state back to the initial values. + */ + (void)pthread_sigmask(SIG_SETMASK, &fillset, NULL); + (void)pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL); + (void)pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL); + + timedout = 0; + pool->pool_idle++; + if (pool->pool_flags & POOL_WAIT) + notify_waiters(pool); + while (pool->pool_head == NULL && !(pool->pool_flags & POOL_DESTROY)) { + if (pool->pool_nthreads <= pool->pool_minimum) { + (void)pthread_cond_wait(&pool->pool_workcv, &pool->pool_mutex); + } else { + (void)clock_gettime(CLOCK_REALTIME, &ts); + ts.tv_sec += pool->pool_linger; + if (pool->pool_linger == 0 || + pthread_cond_timedwait(&pool->pool_workcv, &pool->pool_mutex, + &ts) == ETIMEDOUT) { + timedout = 1; + break; + } + } + } + pool->pool_idle--; + if (pool->pool_flags & POOL_DESTROY) + break; + if ((job = pool->pool_head) != NULL) { + timedout = 0; + func = job->job_func; + arg = job->job_arg; + pool->pool_head = job->job_next; + if (job == pool->pool_tail) + pool->pool_tail = NULL; + active.active_next = pool->pool_active; + pool->pool_active = &active; + (void)pthread_mutex_unlock(&pool->pool_mutex); + pthread_cleanup_push(job_cleanup, pool); + free(job); + /* + * Call the specified job function. + */ + (void)func(arg); + /* + * If the job function calls pthread_exit(), the thread + * calls job_cleanup(pool) and worker_cleanup(pool); + * the integrity of the pool is thereby maintained. + */ + pthread_cleanup_pop(1); /* job_cleanup(pool) */ + } + if (timedout && pool->pool_nthreads > pool->pool_minimum) { + /* + * We timed out and there is no work to be done + * and the number of workers exceeds the minimum. + * Exit now to reduce the size of the pool. + */ + break; + } + } + pthread_cleanup_pop(1); /* worker_cleanup(pool) */ + return (NULL); +} + +static void clone_attributes(pthread_attr_t *new_attr, + pthread_attr_t *old_attr) { + struct sched_param param; + void *addr; + size_t size; + int value; + + (void)pthread_attr_init(new_attr); + + if (old_attr != NULL) { + (void)pthread_attr_getstack(old_attr, &addr, &size); + (void)pthread_attr_setstack(new_attr, NULL, size); + + (void)pthread_attr_getscope(old_attr, &value); + (void)pthread_attr_setscope(new_attr, value); + + (void)pthread_attr_getinheritsched(old_attr, &value); + (void)pthread_attr_setinheritsched(new_attr, value); + + (void)pthread_attr_getschedpolicy(old_attr, &value); + (void)pthread_attr_setschedpolicy(new_attr, value); + + (void)pthread_attr_getschedparam(old_attr, ¶m); + (void)pthread_attr_setschedparam(new_attr, ¶m); + + (void)pthread_attr_getguardsize(old_attr, &size); + (void)pthread_attr_setguardsize(new_attr, size); + } + + /* make all pool threads be detached threads */ + (void)pthread_attr_setdetachstate(new_attr, PTHREAD_CREATE_DETACHED); +} + +thr_pool_t *thr_pool_create(int min_threads, int max_threads, + unsigned linger, pthread_attr_t *attr) { + thr_pool_t *pool; + + (void)sigfillset(&fillset); + + if (min_threads > max_threads || max_threads < 1) { + errno = EINVAL; + return (NULL); + } + + if ((pool = malloc(sizeof(*pool))) == NULL) { + errno = ENOMEM; + return (NULL); + } + (void)pthread_mutex_init(&pool->pool_mutex, NULL); + (void)pthread_cond_init(&pool->pool_busycv, NULL); + (void)pthread_cond_init(&pool->pool_workcv, NULL); + (void)pthread_cond_init(&pool->pool_waitcv, NULL); + pool->pool_active = NULL; + pool->pool_head = NULL; + pool->pool_tail = NULL; + pool->pool_flags = 0; + pool->pool_linger = linger; + pool->pool_minimum = min_threads; + pool->pool_maximum = max_threads; + pool->pool_nthreads = 0; + pool->pool_idle = 0; + + /* + * We cannot just copy the attribute pointer. + * We need to initialize a new pthread_attr_t structure using + * the values from the caller-supplied attribute structure. + * If the attribute pointer is NULL, we need to initialize + * the new pthread_attr_t structure with default values. + */ + clone_attributes(&pool->pool_attr, attr); + + /* insert into the global list of all thread pools */ + (void)pthread_mutex_lock(&thr_pool_lock); + if (thr_pools == NULL) { + pool->pool_forw = pool; + pool->pool_back = pool; + thr_pools = pool; + } else { + thr_pools->pool_back->pool_forw = pool; + pool->pool_forw = thr_pools; + pool->pool_back = thr_pools->pool_back; + thr_pools->pool_back = pool; + } + (void)pthread_mutex_unlock(&thr_pool_lock); + + return (pool); +} + +int thr_pool_queue(thr_pool_t *pool, void *(*func)(void *), void *arg) { + job_t *job; + + if ((job = malloc(sizeof(*job))) == NULL) { + errno = ENOMEM; + return (-1); + } + job->job_next = NULL; + job->job_func = func; + job->job_arg = arg; + + (void)pthread_mutex_lock(&pool->pool_mutex); + + if (pool->pool_head == NULL) + pool->pool_head = job; + else + pool->pool_tail->job_next = job; + pool->pool_tail = job; + + if (pool->pool_idle > 0) + (void)pthread_cond_signal(&pool->pool_workcv); + else if (pool->pool_nthreads < pool->pool_maximum && create_worker(pool) == 0) + pool->pool_nthreads++; + + (void)pthread_mutex_unlock(&pool->pool_mutex); + return (0); +} + +static void mutex_unlock_cb(void *arg) { + pthread_mutex_t *mx = (pthread_mutex_t *)arg; + (void)pthread_mutex_unlock(mx); +} + +void thr_pool_wait(thr_pool_t *pool) { + (void)pthread_mutex_lock(&pool->pool_mutex); + pthread_cleanup_push(mutex_unlock_cb, &pool->pool_mutex); + while (pool->pool_head != NULL || pool->pool_active != NULL) { + pool->pool_flags |= POOL_WAIT; + (void)pthread_cond_wait(&pool->pool_waitcv, &pool->pool_mutex); + } + pthread_cleanup_pop(1); /* pthread_mutex_unlock(&pool->pool_mutex); */ +} + +void thr_pool_destroy(thr_pool_t *pool) { + active_t *activep; + job_t *job; + + (void)pthread_mutex_lock(&pool->pool_mutex); + pthread_cleanup_push(mutex_unlock_cb, &pool->pool_mutex); + + /* mark the pool as being destroyed; wakeup idle workers */ + pool->pool_flags |= POOL_DESTROY; + (void)pthread_cond_broadcast(&pool->pool_workcv); + + /* cancel all active workers */ + for (activep = pool->pool_active; activep != NULL; + activep = activep->active_next) + (void)pthread_cancel(activep->active_tid); + + /* wait for all active workers to finish */ + while (pool->pool_active != NULL) { + pool->pool_flags |= POOL_WAIT; + (void)pthread_cond_wait(&pool->pool_waitcv, &pool->pool_mutex); + } + + /* the last worker to terminate will wake us up */ + while (pool->pool_nthreads != 0) + (void)pthread_cond_wait(&pool->pool_busycv, &pool->pool_mutex); + + pthread_cleanup_pop(1); /* pthread_mutex_unlock(&pool->pool_mutex); */ + + /* + * Unlink the pool from the global list of all pools. + */ + (void)pthread_mutex_lock(&thr_pool_lock); + if (thr_pools == pool) + thr_pools = pool->pool_forw; + if (thr_pools == pool) + thr_pools = NULL; + else { + pool->pool_back->pool_forw = pool->pool_forw; + pool->pool_forw->pool_back = pool->pool_back; + } + (void)pthread_mutex_unlock(&thr_pool_lock); + + /* + * There should be no pending jobs, but just in case... + */ + for (job = pool->pool_head; job != NULL; job = pool->pool_head) { + pool->pool_head = job->job_next; + free(job); + } + (void)pthread_attr_destroy(&pool->pool_attr); + free(pool); +} diff --git a/thread-pool.h b/thread-pool.h new file mode 100644 index 0000000..3d53924 --- /dev/null +++ b/thread-pool.h @@ -0,0 +1,58 @@ +#ifndef __thread_pool_H +#define __thread_pool_H 1 + +/* + * Declarations for the clients of a thread pool. + */ + +#include <pthread.h> + +/* + * The thr_pool_t type is opaque to the client. + * It is created by thr_pool_create() and must be passed + * unmodified to the remainder of the interfaces. + */ +typedef struct thr_pool thr_pool_t; + +/* + * Create a thread pool. + * min_threads: the minimum number of threads kept in the pool, + * always available to perform work requests. + * max_threads: the maximum number of threads that can be + * in the pool, performing work requests. + * linger: the number of seconds excess idle worker threads + * (greater than min_threads) linger before exiting. + * attr: attributes of all worker threads (can be NULL); + * can be destroyed after calling thr_pool_create(). + * On error, thr_pool_create() returns NULL with errno set to the error code. + */ +thr_pool_t *thr_pool_create(int min_threads, int max_threads, + unsigned int linger, pthread_attr_t *attr); + +/* + * Enqueue a work request to the thread pool job queue. + * If there are idle worker threads, awaken one to perform the job. + * Else if the maximum number of workers has not been reached, + * create a new worker thread to perform the job. + * Else just return after adding the job to the queue; + * an existing worker thread will perform the job when + * it finishes the job it is currently performing. + * + * The job is performed as if a new detached thread were created for it: + * pthread_create(NULL, attr, void *(*func)(void *), void *arg); + * + * On error, thr_pool_queue() returns -1 with errno set to the error code. + */ +int thr_pool_queue(thr_pool_t *pool, void *(*func)(void *), void *arg); + +/* + * Wait for all queued jobs to complete. + */ +void thr_pool_wait(thr_pool_t *pool); + +/* + * Cancel all queued jobs and destroy the pool. + */ +void thr_pool_destroy(thr_pool_t *pool); + +#endif -- GitLab