Purpose
For my recent project, I needed a thread pool. I saw a lot of implementations at GitHub, but most of them were quite complicated. With complication comes performance penalty. Most importantly, none of them provided dynamically managable threads. So I decided to write myself a thread pool in C. It was a fun project to make anyway.
Discussion
- My
ThreadPool
contains two queues, one thread queue(ThreadList *
) and one job queue(Job *
). As usual, new jobs end up in the rear of the job queue. The job queue is protected by aqueuemutex
, and all access and modifications are made to the queue only after holding the mutex. - The thread queue, or most notably the
numThreads
, the thread counter, is guarded by the another mutex and conditional, namelycondmutex
andconditional
, which serves the purpose of conditional idle wait of the threads. When a new thread is added to the queue,condmutex
is held,numThread
is incremented, and the lock is released. Then only the actualpthread_create
call is issued, and a newThreadList *
is added to the thread queue. - When a thread wants to go in idle state, either because job queue is
NULL
or asuspendPool
call is issued, it holds thecondmutex
first, then incrementswaitingThreads
, which is a counter for waiting threads. It then checks if all threads are waiting and it is not a suspension call, if it's true, it signalsendconditional
, and breaks any potential caller waiting in thewaitForComplete
call. It then goes intoconditional
wait, and when wakes up, decrements thewaitingThreads
and releasescondmutex
. - When a thread is intended to be removed from the pool,
removeThreadFromPool
is issued. This method just holds thequeuemutex
, incrementsremoveThreads
counter, and returns. In the execution loop inthreadExecutor
, each thread first holds thequeuemutex
, then checks ifremoveThreads
is positive. If it is, the threadbreak
s from the loop. When a thread comes outside of the execution loop, it checks if the pool is still running or not, indicated by the flagrun
inThreadPool
. If the pool is running but the thread is still outside of the loop, it sure was a removal call, in which case, the thread releases thequeuemutex
, and exits. suspendPool
andresumePool
works in a similar flag based manner, with the flagsuspend
and using the mutexesqueuemutex
andcondmutex
. As no thread is actively suspended while executing, only any thread wanting to get a job from the queue by holding thequeuemutex
is blocked, and put to suspend using step 3. Since all idle threads are idle holdingcondmutex
andconditional
,resumePool
resets the flag andbroadcast
s for theconditional
.- The
waitToComplete
call waits onendconditional
, which is signaled by any thread wheneverwaitingThreads==numThreads
andsuspend==0
.
Implementation
Header : mythreads.h
#ifndef MYTHREADS_H
#define MYTHREADS_H
/* The main pool structure
*
* To find member descriptions, see mythreads.c .
*/
typedef struct ThreadPool ThreadPool;
/* The status enum to indicate any failure.
*
* These values can be compared to all the functions
* that returns an integer, to findout the status of
* the execution of the function.
*/
typedef enum Status{
MEMORY_UNAVAILABLE,
QUEUE_LOCK_FAILED,
QUEUE_UNLOCK_FAILED,
SIGNALLING_FAILED,
BROADCASTING_FAILED,
COND_WAIT_FAILED,
POOL_NOT_INITIALIZED,
POOL_STOPPED,
INVALID_NUMBER,
COMPLETED
} Status;
/* Creates a new thread pool with argument number of threads.
*
* When this method returns, and if the return value is not
* NULL, it is assured that all threads are initialized and
* in waiting state. If any thread fails to initialize,
* typically if the pthread_create method fails, a warning
* message is print on the stdout. This method also can fail
* in case of insufficient memory, which is rare, and a NULL
* is returned in that case.
*/
ThreadPool * createPool(unsigned int);
/* Waits till all the threads in the pool are finished.
*
* When this method returns, it is assured that all threads
* in the pool have finished executing, and in waiting state.
*/
void waitToComplete(ThreadPool *);
/* Destroys the argument pool.
*
* This method tries to stop all threads in the pool
* immediately, and destroys any resource that the pool has
* used in its lifetime. However, this method will not
* return until all threads have finished processing their
* present work. That is, this method will not halt any
* actively executing thread. Rather, it'll wait for the
* present jobs to complete, and will keep the threads from
* running any new jobs. This method then joins all the
* threads, destroys all synchronization objects, and frees
* any remaining jobs, finally freeing the pool itself.
*/
void destroyPool(ThreadPool *);
/* Add a new job to the pool.
*
* This method adds a new job, that is a worker function,
* to the pool. The execution of the function is performed
* asynchronously, however. This method only assures the
* addition of the job to the job queue. The job queue is
* ordered in FIFO style, i.e., for this job to execute,
* all the jobs that has been added previously has to be
* executed first. This method doesn't guarantee the thread
* on which the job may execute. Rather, when its turn comes,
* the thread which first becomes idle, executes this job.
* When all threads are idle, any one of them wakes up and
* executes this function asynchronously.
*/
int addJobToPool(ThreadPool *, void (*func)(void *), void *);
/* Add some new threads to the pool.
*
* This function adds specified number of new threads to the
* argument threadpool. When this function returns, it is
* ensured that a new thread has been added to the pool.
* However, this new thread will only come to effect if there
* are remainder jobs, that is the job queue is not presently
* empty. This new thread will not steal any running jobs
* from the running threads. Occasionally, this method will
* return some error codes, typically due to the failure of
* pthread_create, or for insufficient memory. These error
* codes can be compared using the Status enum above.
*/
int addThreadsToPool(ThreadPool *, int);
/* Suspend all currently executing threads in the pool.
*
* This method pauses all currently executing threads in
* the pool. When the method call returns, it is guaranteed
* that all threads have been suspended at appropiate
* breakpoints. However, if a thread is presently executing,
* it is not forcefully suspended. Rather, the call waits
* till the thread completes the present job, and then
* halts the thread.
*/
void suspendPool(ThreadPool *);
/* Resume a suspended pool.
*
* This method resumes a pool, aynchronously, if and only
* if the pool was suspended before. When the method returns,
* it is guaranteed the all the threads of the pool will
* wake up from suspend very soon in future. This method
* fails if the pool was not previously suspended.
*/
void resumePool(ThreadPool *);
/* Remove an existing thread from the pool.
*
* This function will remove one thread from the threadpool,
* asynchronously. That is, this method will not stop any
* active threads, rather it'll merely indicate the wish.
* When any active thread will become idle, before becoming
* active again the thread will check if removal is wished.
* If it is wished, then thread will immediately exit. This
* method can run N times to remove N threads, however it
* has some serious consequences. If N is greater than the
* number of threads present in the pool, say M, then all
* M threads will be stopped. However, next (N-M) threads
* will also immediately exit when added to the pool. If
* all M threads are removed from the queue, then the job
* queue will halt, and when a new thread will be added to
* the pool, the queue will automatically resume from the
* position where it stopped.
*/
void removeThreadFromPool(ThreadPool *);
#endif
Library : mythreads.c
Structure definitions
/* A singly linked list of threads. This list
* gives tremendous flexibility managing the
* threads at runtime.
*/
typedef struct ThreadList {
pthread_t thread; // The thread object
struct ThreadList *next; // Link to next thread
} ThreadList;
/* A singly linked list of worker functions. This
* list is implemented as a queue to manage the
* execution in the pool.
*/
typedef struct Job {
void (*function)(void *); // The worker function
void *args; // Argument to the function
struct Job *next; // Link to next Job
} Job;
/* The core pool structure. This is the only
* user accessible structure in the API. It contains
* all the primitives necessary to provide
* synchronization between the threads, along with
* dynamic management and execution control.
*/
struct ThreadPool {
/* The FRONT of the thread queue in the pool.
* It typically points to the first thread
* created in the pool.
*/
ThreadList * threads;
/* The REAR of the thread queue in the pool.
* Points to the last, and most young thread
* added to the pool.
*/
ThreadList * rearThreads;
/* Number of threads in the pool. As this can
* grow dynamically, access and modification
* of it is bounded by a mutex.
*/
unsigned int numThreads;
/* The indicator which indicates the number
* of threads to remove. If this is equal to
* N, then N threads will be removed from the
* pool when they are idle. All threads
* typically check the value of this variable
* before executing a job, and if finds the
* value >0, immediately exits.
*/
unsigned int removeThreads;
/* Denotes the number of idle threads in the
* pool at any given instant of time. This value
* is used to check if all threads are idle,
* and thus triggering the end of job queue or
* the initialization of the pool, whichever
* applicable.
*/
volatile unsigned int waitingThreads;
/* Denotes whether the pool is presently
* initalized or not. This variable is used to
* busy wait after the creation of the pool
* to ensure all threads are in waiting state.
*/
volatile unsigned short isInitialized;
/* The main mutex for the job queue. All
* operations on the queue is done after locking
* this mutex to ensure consistency.
*/
pthread_mutex_t queuemutex;
/* This mutex indicates whether a thread is
* presently in idle state or not, and is used
* in conjunction with the conditional below.
*/
pthread_mutex_t condmutex;
/* Conditional to ensure conditional wait.
* When idle, each thread waits on this
* conditional, which is signaled by various
* methods to indicate the wake of the thread.
*/
pthread_cond_t conditional;
/* Ensures pool state. When the pool is running,
* this is set to 1. All the threads loop on
* this condition, and exits immediately when
* it is set to 0, which happens when the pool
* is destroyed.
*/
_Atomic unsigned short run;
/* Used to assign unique thread IDs to each
* running threads. It is an always incremental
* counter.
*/
unsigned int threadID;
/* The FRONT of the job queue, which typically
* points to the job to be executed next.
*/
Job *FRONT;
/* The REAR of the job queue, which points
* to the job last added in the pool.
*/
Job *REAR;
/* Mutex used to denote the end of the job
* queue, which triggers the function
* waitForComplete.
*/
pthread_mutex_t endmutex;
/* Conditional to signal the end of the job
* queue.
*/
pthread_cond_t endconditional;
/* Variable to impose and withdraw
* the suspend state.
*/
unsigned short suspend;
};
1. Core executor function
static void *threadExecutor(void *pl){
ThreadPool *pool = (ThreadPool *)pl; // Get the pool
pthread_mutex_lock(&pool->queuemutex); // Lock the mutex
unsigned int id = ++pool->threadID; // Get an id
pthread_mutex_unlock(&pool->queuemutex); // Release the mutex
#ifdef DEBUG
printf("\n[THREADPOOL:THREAD%u:INFO] Starting execution loop!", id);
#endif
//Start the core execution loop
while(pool->run){ // run==1, we should get going
#ifdef DEBUG
printf("\n[THREADPOOL:THREAD%u:INFO] Trying to lock the mutex!", id);
#endif
pthread_mutex_lock(&pool->queuemutex); //Lock the queue mutex
if(pool->removeThreads>0){ // A thread is needed to be removed
#ifdef DEBUG
printf("\n[THREADPOOL:THREAD%u:INFO] Removal signalled! Exiting the execution loop!", id);
#endif
pthread_mutex_lock(&pool->condmutex);
pool->waitingThreads++; // Register as forever waiting thread
pthread_mutex_unlock(&pool->condmutex);
break; // Exit the loop
}
Job *presentJob = pool->FRONT; // Get the first job
if(presentJob==NULL || pool->suspend){ // Queue is empty!
#ifdef DEBUG
if(presentJob==NULL)
printf("\n[THREADPOOL:THREAD%u:INFO] Queue is empty! Unlocking the mutex!", id);
else
printf("\n[THREADPOOL:THREAD%u:INFO] Suspending thread!", id);
#endif
pthread_mutex_unlock(&pool->queuemutex); // Unlock the mutex
pthread_mutex_lock(&pool->condmutex); // Hold the conditional mutex
pool->waitingThreads++; // Add yourself as a waiting thread
#ifdef DEBUG
printf("\n[THREADPOOL:THREAD%u:INFO] Waiting threads %u!", id, pool->waitingThreads);
#endif
if(!pool->suspend && pool->waitingThreads==pool->numThreads){ // All threads are idle
#ifdef DEBUG
printf("\n[THREADPOOL:THREAD%u:INFO] All threads are idle now!", id);
#endif
if(pool->isInitialized){ // Pool is initialized, time to trigger the end conditional
#ifdef DEBUG
printf("\n[THREADPOOL:THREAD%u:INFO] Signaling endconditional!" ,id);
fflush(stdout);
#endif
pthread_mutex_lock(&pool->endmutex); // Lock the mutex
pthread_cond_signal(&pool->endconditional); // Signal the end
pthread_mutex_unlock(&pool->endmutex); // Release the mutex
#ifdef DEBUG
printf("\n[THREADPOOL:THREAD%u:INFO] Signalled any monitor!", id);
#endif
}
else // We are initializing the pool
pool->isInitialized = 1; // Break the busy wait
}
#ifdef DEBUG
printf("\n[THREADPOOL:THREAD%u:INFO] Going to conditional wait!", id);
fflush(stdout);
#endif
pthread_cond_wait(&pool->conditional, &pool->condmutex); // Idle wait on conditional
/* Woke up! */
if(pool->waitingThreads>0) // Unregister youself as a waiting thread
pool->waitingThreads--;
pthread_mutex_unlock(&pool->condmutex); // Woke up! Release the mutex
#ifdef DEBUG
printf("\n[THREADPOOL:THREAD%u:INFO] Woke up from conditional wait!", id);
#endif
}
else{ // There is a job in the pool
pool->FRONT = pool->FRONT->next; // Shift FRONT to right
if(pool->FRONT==NULL) // No jobs next
pool->REAR = NULL; // Reset the REAR
#ifdef DEBUG
else
printQueue(pool->FRONT);
printf("\n[THREADPOOL:THREAD%u:INFO] Job recieved! Unlocking the mutex!", id);
#endif
pthread_mutex_unlock(&pool->queuemutex); // Unlock the mutex
#ifdef DEBUG
printf("\n[THREADPOOL:THREAD%u:INFO] Executing the job now!", id);
#endif
presentJob->function(presentJob->args); // Execute the job
#ifdef DEBUG
printf("\n[THREADPOOL:THREAD%u:INFO] Job completed! Releasing memory for the job!", id);
#endif
free(presentJob); // Release memory for the job
}
}
if(pool->run){ // We exited, but the pool is running! It must be force removal!
#ifdef DEBUG
printf("\n[THREADPOOL:THREAD%u:INFO] Releasing the lock!", id);
#endif
pool->removeThreads--; // Alright, I'm shutting now
pthread_mutex_unlock(&pool->queuemutex); // We broke the loop, release the mutex now
#ifdef DEBUG
printf("\n[THREADPOOL:THREAD%u:INFO] Stopping now..", id);
#endif
}
#ifdef DEBUG
else // The pool is stopped
printf("\n[THREADPOOL:THREAD%u:INFO] Pool has been stopped! Exiting now..", id);
#endif
pthread_exit((void *)COMPLETED); // Exit
}
2. Create pool
ThreadPool * createPool(unsigned int numThreads){
ThreadPool * pool = (ThreadPool *)malloc(sizeof(ThreadPool)); // Allocate memory for the pool
if(pool==NULL){ // Oops!
printf("[THREADPOOL:INIT:ERROR] Unable to allocate memory for the pool!");
return NULL;
}
#ifdef DEBUG
printf("\n[THREADPOOL:INIT:INFO] Allocated %lu bytes for new pool!", sizeof(ThreadPool));
#endif
// Initialize members with default values
pool->numThreads = 0;
pool->FRONT = NULL;
pool->REAR = NULL;
pool->waitingThreads = 0;
pool->isInitialized = 0;
pool->removeThreads = 0;
pool->suspend = 0;
#ifdef DEBUG
printf("\n[THREADPOOL:INIT:INFO] Initializing mutexes!");
#endif
pthread_mutex_init(&pool->queuemutex, NULL); // Initialize queue mutex
pthread_mutex_init(&pool->condmutex, NULL); // Initialize idle mutex
pthread_mutex_init(&pool->endmutex, NULL); // Initialize end mutex
#ifdef DEBUG
printf("\n[THREADPOOL:INIT:INFO] Initiliazing conditionals!");
#endif
pthread_cond_init(&pool->endconditional, NULL); // Initialize end conditional
pthread_cond_init(&pool->conditional, NULL); // Initialize idle conditional
pool->run = 1; // Start the pool
#ifdef DEBUG
printf("\n[THREADPOOL:INIT:INFO] Successfully initialized all members of the pool!");
printf("\n[THREADPOOL:INIT:INFO] Initializing %u threads..",numThreads);
#endif
addThreadsToPool(pool, numThreads); // Add threads to the pool
#ifdef DEBUG
printf("\n[THREADPOOL:INIT:INFO] Waiting for all threads to start..");
#endif
while(!pool->isInitialized); // Busy wait till the pool is initialized
#ifdef DEBUG
printf("\n[THREADPOOL:INIT:INFO] New threadpool initialized successfully!");
#endif
return pool;
}
3. Add threads
int addThreadsToPool(ThreadPool *pool, int threads){
if(pool==NULL){ // Sanity check
printf("\n[THREADPOOL:ADD:ERROR] Pool is not initialized!");
return POOL_NOT_INITIALIZED;
}
if(!pool->run){
printf("\n[THREADPOOL:ADD:ERROR] Pool already stopped!");
return POOL_STOPPED;
}
if(threads<1){
printf("\n[THREADPOOL:ADD:WARNING] Tried to add invalid number of threads %d!", threads);
return INVALID_NUMBER;
}
int rc = 0;
#ifdef DEBUG
printf("\n[THREADPOOL:ADD:INFO] Holding the condmutex..");
#endif
pthread_mutex_lock(&pool->condmutex);
pool->numThreads += threads; // Increment the thread count to prevent idle signal
pthread_mutex_unlock(&pool->condmutex);
#ifdef DEBUG
printf("\n[THREADPOOL:ADD:INFO] Speculative increment done!");
#endif
int i = 0;
for(i=0;i<threads;i++){
ThreadList *newThread = (ThreadList *)malloc(sizeof(ThreadList)); // Allocate a new thread
newThread->next = NULL;
rc = pthread_create(&newThread->thread, NULL, threadExecutor, (void *)pool); // Start the thread
if(rc){
printf("\n[THREADPOOL:ADD:ERROR] Unable to create thread %d(error code %d)!", (i+1), rc);
pthread_mutex_lock(&pool->condmutex);
pool->numThreads--;
pthread_mutex_unlock(&pool->condmutex);
}
else{
#ifdef DEBUG
printf("\n[THREADPOOL:ADD:INFO] Initialized thread %u!", (i+1));
#endif
if(pool->rearThreads==NULL) // This is the first thread
pool->threads = pool->rearThreads = newThread;
else // There are threads in the pool
pool->rearThreads->next = newThread;
pool->rearThreads = newThread; // This is definitely the last thread
}
}
return rc;
}
4. Remove thread
void removeThreadFromPool(ThreadPool *pool){
if(pool==NULL || !pool->isInitialized){
printf("\n[THREADPOOL:REM:ERROR] Pool is not initialized!");
return;
}
if(!pool->run){
printf("\n[THREADPOOL:REM:WARNING] Removing thread from a stopped pool!");
return;
}
#ifdef DEBUG
printf("\n[THREADPOOL:REM:INFO] Acquiring the lock!");
#endif
pthread_mutex_lock(&pool->queuemutex); // Lock the mutex
#ifdef DEBUG
printf("\n[THREADPOOL:REM:INFO] Incrementing the removal count");
#endif
pool->removeThreads++; // Indicate the willingness of removal
pthread_mutex_unlock(&pool->queuemutex); // Unlock the mutex
#ifdef DEBUG
printf("\n[THREADPOOL:REM:INFO] Waking up any sleeping threads!");
#endif
pthread_mutex_lock(&pool->condmutex); // Lock the wait mutex
pthread_cond_signal(&pool->conditional); // Signal any idle threads
pthread_mutex_unlock(&pool->condmutex); // Release the wait mutex
#ifdef DEBUG
printf("\n[THREADPOOL:REM:INFO] Signalling complete!");
#endif
}
5. Add job
int addJobToPool(ThreadPool *pool, void (*func)(void *args), void *args){
if(pool==NULL || !pool->isInitialized){ // Sanity check
printf("\n[THREADPOOL:EXEC:ERROR] Pool is not initialized!");
return POOL_NOT_INITIALIZED;
}
if(!pool->run){
printf("\n[THREADPOOL:EXEC:ERROR] Trying to add a job in a stopped pool!");
return POOL_STOPPED;
}
Job *newJob = (Job *)malloc(sizeof(Job)); // Allocate memory
if(newJob==NULL){ // Who uses 2KB RAM nowadays?
printf("\n[THREADPOOL:EXEC:ERROR] Unable to allocate memory for new job!");
return MEMORY_UNAVAILABLE;
}
#ifdef DEBUG
printf("\n[THREADPOOL:EXEC:INFO] Allocated %lu bytes for new job!", sizeof(Job));
#endif
newJob->function = func; // Initialize the function
newJob->args = args; // Initialize the argument
newJob->next = NULL; // Reset the link
#ifdef DEBUG
printf("\n[THREADPOOL:EXEC:INFO] Locking the queue for insertion of the job!");
#endif
pthread_mutex_lock(&pool->queuemutex); // Inserting the job, lock the queue
if(pool->FRONT==NULL) // This is the first job
pool->FRONT = pool->REAR = newJob;
else // There are other jobs
pool->REAR->next = newJob;
pool->REAR = newJob; // This is the last job
#ifdef DEBUG
printf("\n[THREADPOOL:EXEC:INFO] Inserted the job at the end of the queue!");
#endif
if(pool->waitingThreads>0){ // There are some threads sleeping, wake'em up
#ifdef DEBUG
printf("\n[THREADPOOL:EXEC:INFO] Signaling any idle thread!");
#endif
pthread_mutex_lock(&pool->condmutex); // Lock the mutex
pthread_cond_signal(&pool->conditional); // Signal the conditional
pthread_mutex_unlock(&pool->condmutex); // Release the mutex
#ifdef DEBUG
printf("\n[THREADPOOL:EXEC:INFO] Signaling successful!");
#endif
}
pthread_mutex_unlock(&pool->queuemutex); // Finally, release the queue
#ifdef DEBUG
printf("\n[THREADPOOL:EXEC:INFO] Unlocked the mutex!");
#endif
return 0;
}
6. Wait for completion
void waitToComplete(ThreadPool *pool){
if(pool==NULL || !pool->isInitialized){ // Sanity check
printf("\n[THREADPOOL:WAIT:ERROR] Pool is not initialized!");
return;
}
if(!pool->run){
printf("\n[THREADPOOL:WAIT:ERROR] Pool already stopped!");
return;
}
pthread_mutex_lock(&pool->condmutex);
if(pool->numThreads==pool->waitingThreads){
#ifdef DEBUG
printf("\n[THREADPOOL:WAIT:INFO] All threads are already idle!");
#endif
pthread_mutex_unlock(&pool->condmutex);
return;
}
pthread_mutex_unlock(&pool->condmutex);
#ifdef DEBUG
printf("\n[THREADPOOL:WAIT:INFO] Waiting for all threads to become idle..");
#endif
pthread_mutex_lock(&pool->endmutex); // Lock the mutex
pthread_cond_wait(&pool->endconditional, &pool->endmutex); // Wait for end signal
pthread_mutex_unlock(&pool->endmutex); // Unlock the mutex
#ifdef DEBUG
printf("\n[THREADPOOL:WAIT:INFO] All threads are idle now!");
#endif
}
7. Suspend pool
void suspendPool(ThreadPool *pool){
if(pool==NULL || !pool->isInitialized){ // Sanity check
printf("\n[THREADPOOL:SUSP:ERROR] Pool is not initialized!");
return;
}
if(!pool->run){ // Pool is stopped
printf("\n[THREADPOOL:SUSP:ERROR] Pool already stopped!");
return;
}
if(pool->suspend){ // Pool is already suspended
printf("\n[THREADPOOL:SUSP:ERROR] Pool already suspended!");
return;
}
#ifdef DEBUG
printf("\n[THREADPOOL:SUSP:INFO] Initiating suspend..");
#endif
pthread_mutex_lock(&pool->queuemutex); // Lock the queue
pool->suspend = 1; // Present the wish for suspension
pthread_mutex_unlock(&pool->queuemutex); // Release the queue
#ifdef DEBUG
printf("\n[THREADPOOL:SUSP:INFO] Waiting for all threads to be idle..");
fflush(stdout);
#endif
while(pool->waitingThreads<pool->numThreads); // Busy wait till all threads are idle
#ifdef DEBUG
printf("\n[THREADPOOL:SUSP:INFO] Successfully suspended all threads!");
#endif
}
8. Resume pool
void resumePool(ThreadPool *pool){
if(pool==NULL || !pool->isInitialized){ // Sanity check
printf("\n[THREADPOOL:RESM:ERROR] Pool is not initialized!");
return;
}
if(!pool->run){ // Pool stopped
printf("\n[THREADPOOL:RESM:ERROR] Pool is not running!");
return;
}
if(!pool->suspend){ // Pool is not suspended
printf("\n[THREADPOOL:RESM:WARNING] Pool is not suspended!");
return;
}
#ifdef DEBUG
printf("\n[THREADPOOL:RESM:INFO] Initiating resume..");
#endif
pthread_mutex_lock(&pool->condmutex); // Lock the conditional
pool->suspend = 0; // Reset the state
#ifdef DEBUG
printf("\n[THREADPOOL:RESM:INFO] Waking up all threads..");
#endif
pthread_cond_broadcast(&pool->conditional); // Wake up all threads
pthread_mutex_unlock(&pool->condmutex); // Release the mutex
#ifdef DEBUG
printf("\n[THREADPOOL:RESM:INFO] Resume complete!");
#endif
}
9. Destroy pool
void destroyPool(ThreadPool *pool){
if(pool==NULL || !pool->isInitialized){ // Sanity check
printf("\n[THREADPOOL:EXIT:ERROR] Pool is not initialized!");
return;
}
#ifdef DEBUG
printf("\n[THREADPOOL:EXIT:INFO] Trying to wakeup all waiting threads..");
#endif
pool->run = 0; // Stop the pool
pthread_mutex_lock(&pool->condmutex);
pthread_cond_broadcast(&pool->conditional); // Wake up all idle threads
pthread_mutex_unlock(&pool->condmutex);
int rc;
#ifdef DEBUG
printf("\n[THREADPOOL:EXIT:INFO] Waiting for all threads to exit..");
#endif
ThreadList *list = pool->threads, *backup = NULL; // For travsersal
Status stat;
void *c = &stat;
unsigned int i = 0;
while(list!=NULL){
#ifdef DEBUG
printf("\n[THREADPOOL:EXIT:INFO] Joining thread %u..", i);
#endif
rc = pthread_join(list->thread, &c); // Wait for ith thread to join
if(rc)
printf("\n[THREADPOOL:EXIT:WARNING] Unable to join THREAD%u!", i);
#ifdef DEBUG
else
printf("\n[THREADPOOL:EXIT:INFO] THREAD%u joined!", i);
#endif
backup = list;
list = list->next; // Continue
#ifdef DEBUG
printf("\n[THREADPOOL:EXIT:INFO] Releasing memory for THREAD%u..", i);
#endif
free(backup); // Free ith thread
i++;
}
#ifdef DEBUG
printf("\n[THREADPOOL:EXIT:INFO] Destroying remaining jobs..");
#endif
// Delete remaining jobs
while(pool->FRONT!=NULL){
Job *j = pool->FRONT;
pool->FRONT = pool->FRONT->next;
free(j);
}
#ifdef DEBUG
printf("\n[THREADPOOL:EXIT:INFO] Destroying conditionals..");
#endif
rc = pthread_cond_destroy(&pool->conditional); // Destroying idle conditional
rc = pthread_cond_destroy(&pool->endconditional); // Destroying end conditional
if(rc)
printf("\n[THREADPOOL:EXIT:WARNING] Unable to destroy one or more conditionals (error code %d)!", rc);
#ifdef DEBUG
printf("\n[THREADPOOL:EXIT:INFO] Destroying the mutexes..");
#endif
rc = pthread_mutex_destroy(&pool->queuemutex); // Destroying queue lock
rc = pthread_mutex_destroy(&pool->condmutex); // Destroying idle lock
rc = pthread_mutex_destroy(&pool->endmutex); // Destroying end lock
if(rc)
printf("\n[THREADPOOL:EXIT:WARNING] Unable to destroy one or mutexes (error code %d)!", rc);
#ifdef DEBUG
printf("\n[THREADPOOL:EXIT:INFO] Releasing memory for the pool..");
#endif
free(pool); // Release the pool
#ifdef DEBUG
printf("\n[THREADPOOL:EXIT:INFO] Pool destruction completed!");
#endif
}
An workable example can be found at GitHub!
1 Answer 1
Debug output
For each line of debug output you write 3 lines of code. You may also want to write debug output to stderr instead of stdout so you'll be able to isolate debug output from regular output. I suggest defining something like the following in a common header:
#ifdef ENABLE_DEBUG_OUTPUT
# define DEBUG(FMT, ...) fprintf(stderr, FMT, __VA_ARGS__)
#else
# define DEBUG(FMT, ...)
#endif
instead of writing
#ifdef DEBUG
printf("stuff and %s\n", "other stuff");
#endif
you can now write
DEBUG("stuff and %s\n", "other stuff");
That output will only happen if you define ENABLE_DEBUG_OUTPUT
.
Linebreaks
You always put the linebreak at the beginning of your format string (printf("\nstuff")
). This may lead to strange output (demonstrated with bash, but it still applies):
user@host:~$ printf "\nstuff"
stuffuser@host:~$
Always put linebreaks at the end of your format string:
printf("stuff\n");
Struct initialization
Instead of writing
// Initialize members with default values
pool->numThreads = 0;
pool->FRONT = NULL;
pool->REAR = NULL;
pool->waitingThreads = 0;
pool->isInitialized = 0;
pool->removeThreads = 0;
pool->suspend = 0;
I'd just write
memset(pool, 0, sizeof(*pool));
That is both shorter and sure to actually zero the whole structure even if you add fields in the future.
Suspend/Resume
Are there use cases for you being able to suspend/resume your threadpool? Is it useful for something? If you can't think of a use case then don't include it in your API. The more surface your API exposes the harder it is to maintain backwards compatibility in the future.
Lots of conds/mutexes
I think you can actually reduce the number of mutexes quite a bit. Just one mutex to lock access to the ThreadPool
and one condition var to wake up threads should be enough to implement the exact same API you're currently using. Lock the mutex whenever you interact with the ThreadPool
. Always notify your condition var when a job is added. broadcast on the same condition var when the pool is destroyed.
Pseudocode for function executed in thread:
lock(pool->mutex)
while (pool->running) {
var job = get_next_job(pool->queue);
if (job is null) {
// No jobs. Go to sleep.
wait(pool->cond);
continue;
}
else {
unlock(pool->mutex);
execute(job);
lock(pool->mutex);
}
}
unlock(pool->mutex)
-
\$\begingroup\$ The
memset()
isn't necessarily the same, on platforms where null pointers aren't stored as all-bits-zero. \$\endgroup\$Toby Speight– Toby Speight2018年10月25日 14:44:49 +00:00Commented Oct 25, 2018 at 14:44 -
\$\begingroup\$ According to pubs.opengroup.org/onlinepubs/9699919799/basedefs/stddef.h.html for POSIX systems NULL expands to (void*)0. The non-POSIX compatibility ship has already sailed with the usage of pthreads. \$\endgroup\$Richard– Richard2018年10月25日 14:58:17 +00:00Commented Oct 25, 2018 at 14:58
-
\$\begingroup\$ That's absolutely correct (and that's just standard C), but
0
cast to a pointer isn't necessarily represented in memory by a sequence of zero bytes. Assigning the value0
to a pointer (or comparing a pointer against0
) means that compiler does any necessary conversion to/from the platform's null representation. When you go behind the type system's back, none of that is done for you. \$\endgroup\$Toby Speight– Toby Speight2018年10月25日 15:01:52 +00:00Commented Oct 25, 2018 at 15:01 -
\$\begingroup\$ I stand corrected. Thank you for this insight. Do you agree though that for relatively recent architectures
memset
for struct init is a safe thing to do? \$\endgroup\$Richard– Richard2018年10月25日 15:04:43 +00:00Commented Oct 25, 2018 at 15:04 -
1\$\begingroup\$ I'd probably avoid it, and instead copy from a static pre-initialised struct. It should always be safe to
memset()
then overwrite the pointer members. I've never had to use systems (e.g. 8086) with segmented memory or other non-zero null-pointers (I went straight from 68k and PA-RISC to i386), but I always endeavour not to introduce unnecessary non-portability. BTW, great review in general - I've voted. :) \$\endgroup\$Toby Speight– Toby Speight2018年10月25日 15:11:27 +00:00Commented Oct 25, 2018 at 15:11
Explore related questions
See similar questions with these tags.