6
\$\begingroup\$

I wrote a simple thread pool, which works pretty well. I would like to see your review of it.

One important feature that I needed in the pool is the ability to wait until all the tasks that I sent to the pool are complete (so that later I could send other tasks that are dependent on the result of the previous tasks).

This is the base class that you inherit from to define a task:

class Task
{
public:
    virtual void ExecuteTask() = 0;
};

This is the .h file of the Thread pool:

#include <time.h>
#include <Windows.h>
#include <list>
using namespace std;
class ThreadPool
{
public:
    ThreadPool(int threadCount=0);
    bool ScheduleTask(Task* task);
    bool WaitForWorkToBeFinished(DWORD dwMilliseconds);
    ~ThreadPool();
private:
    static DWORD WINAPI ThreadStart(LPVOID threadParameters);
    DWORD ThreadMain();
    list<Task*> mTasks;
    HANDLE* mThreads;
    int mThreadCount;
    CRITICAL_SECTION mCriticalSection;
    CONDITION_VARIABLE mConditionVariable;
    CONDITION_VARIABLE mConditionVariableTaskFinished;
    int mNumberOfTasksNotFinished;
    bool mConsum;
};

This is the CPP file:

#include "ThreadPool.h"
ThreadPool::ThreadPool(int threadCount)
{
    mThreadCount = threadCount;
    mThreads = new HANDLE[mThreadCount]; // These handles are waited on in the destructor
    InitializeCriticalSection(&mCriticalSection);
    InitializeConditionVariable(&mConditionVariable);
    InitializeConditionVariable(&mConditionVariableTaskFinished);
    mConsum = true;
    mNumberOfTasksNotFinished = 0;
    for(int i = 0; i < mThreadCount; i++)
        mThreads[i] = CreateThread(NULL, 0, ThreadStart, (void*)this, 0, NULL);
}
bool ThreadPool::ScheduleTask(Task* task)
{
    EnterCriticalSection(&mCriticalSection);
    mTasks.push_back(task); // This is a simple std::list
    mNumberOfTasksNotFinished++;
    LeaveCriticalSection(&mCriticalSection);
    WakeConditionVariable(&mConditionVariable); // Wake a worker thread so it knows there is a job to do
    return true;
}
DWORD WINAPI ThreadPool::ThreadStart(LPVOID threadParameters) // Static helper that lets a member function run as a thread
{
    ThreadPool* referenceToThis = (ThreadPool*)threadParameters;
    return referenceToThis->ThreadMain();
}
DWORD ThreadPool::ThreadMain() // The main thread function
{
    do
    {
        Task* currentTask;
        EnterCriticalSection(&mCriticalSection);
        while(mTasks.size() == 0 && mConsum)
            SleepConditionVariableCS(&mConditionVariable, &mCriticalSection, INFINITE);
        if(!mConsum) // Destructor ordered an abort
        {
            LeaveCriticalSection(&mCriticalSection);
            return 0;
        }
        // If we got here, we successfully acquired the lock and the list<Task*> is not empty
        currentTask = mTasks.front();
        mTasks.pop_front();
        LeaveCriticalSection(&mCriticalSection);
        currentTask->ExecuteTask();
        //delete currentTask;
        EnterCriticalSection(&mCriticalSection);
        mNumberOfTasksNotFinished--;
        LeaveCriticalSection(&mCriticalSection);
        WakeConditionVariable(&mConditionVariableTaskFinished);
    } while(mConsum);
    return 0;
}
// This function is very important: it lets the user send 10 tasks to the thread pool,
// wait until all of them have completed, and then send the next 10, which depend on the results of the previous ones.
bool ThreadPool::WaitForWorkToBeFinished(DWORD dwMilliseconds)
{
    EnterCriticalSection(&mCriticalSection);
    while(mNumberOfTasksNotFinished != 0)
        SleepConditionVariableCS(&mConditionVariableTaskFinished, &mCriticalSection, INFINITE);
    LeaveCriticalSection(&mCriticalSection);
    return true;
}
ThreadPool::~ThreadPool()
{
    mConsum = false;
    WakeAllConditionVariable(&mConditionVariable);
    WaitForMultipleObjects(mThreadCount, mThreads, true, INFINITE);
    DeleteCriticalSection(&mCriticalSection);
}
ChrisW
13k1 gold badge35 silver badges76 bronze badges
asked Jan 31, 2014 at 11:57
\$\endgroup\$
6
  • 1
    \$\begingroup\$ Yes, it's fun writing your own thread pool, but this has been done to death before. Use one of the available ones that has been thoroughly tested and debugged. \$\endgroup\$ Commented Jan 31, 2014 at 17:37
  • 1
    \$\begingroup\$ It's 2014. Can we please start using C++11, at least with its built-in standard threads? All modern compilers now support it. Most modern compilers will support C++14 by now (yes, I know 2014 has only just started and they have not officially voted on it, but it's so close and the compilers are so ready). \$\endgroup\$ Commented Jan 31, 2014 at 17:40
  • \$\begingroup\$ What happens if the task throws an exception? In most threading libraries if an exception causes a thread to unroll past its start point the application will exit. \$\endgroup\$ Commented Jan 31, 2014 at 17:45
  • \$\begingroup\$ You are leaking all but one of the resources you created. \$\endgroup\$ Commented Jan 31, 2014 at 17:48
  • \$\begingroup\$ Loki, can you please elaborate? I'm implementing it to study threading and to understand how things work. It is important for me to know things thoroughly (I'm also preparing for interviews, and implementing a thread pool is a totally possible question). \$\endgroup\$ Commented Jan 31, 2014 at 21:23

2 Answers 2

7
\$\begingroup\$

A few points:

Your EnterCriticalSection/LeaveCriticalSection code should be placed into a scoped RAII object. This saves you from explicitly calling leave, makes your code smaller, and avoids a deadlock if the synchronized code throws.
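As a rough illustration of the scoped-lock idea, here is a minimal portable sketch. It assumes std::mutex as a stand-in for CRITICAL_SECTION; the names ScopedLock and LockReleasedAfterThrow are hypothetical. A Win32 version would call EnterCriticalSection in the constructor and LeaveCriticalSection in the destructor instead.

```cpp
#include <mutex>
#include <stdexcept>

// Hypothetical RAII guard: locks in the constructor, unlocks in the destructor.
// std::mutex stands in for CRITICAL_SECTION so the sketch stays portable.
class ScopedLock
{
public:
    explicit ScopedLock(std::mutex& m) : mMutex(m) { mMutex.lock(); }
    ~ScopedLock() { mMutex.unlock(); }
    ScopedLock(const ScopedLock&) = delete;
    ScopedLock& operator=(const ScopedLock&) = delete;
private:
    std::mutex& mMutex;
};

// Returns true if the mutex was released even though the guarded code threw.
bool LockReleasedAfterThrow()
{
    std::mutex m;
    try
    {
        ScopedLock lock(m);
        throw std::runtime_error("task failed");
    }
    catch (const std::runtime_error&)
    {
        // The guard's destructor already ran during stack unwinding.
    }
    bool acquired = m.try_lock(); // succeeds only if the guard unlocked the mutex
    if (acquired)
        m.unlock();
    return acquired;
}
```

In standard C++11, std::lock_guard already provides this behavior for std::mutex, so a hand-written guard is mainly useful for wrapping OS primitives such as a critical section.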

Do something similar with everything that requires cleanup (like mThreads = new HANDLE[mThreadCount];).

[Retracted: The interface of your class should receive the tasks by reference and store them by pointer within. This way, you make it clear to client code that the class doesn't take ownership of the tasks it receives.] (See the third edit below.)

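Use a named C++ cast instead of C-style casts (because C-style casts = bad, mmmkay?!). For converting the LPVOID thread parameter back to ThreadPool*, static_cast is sufficient; reinterpret_cast also works.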
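A small sketch of the entry-point pattern with a named cast. Worker, Start and RunWorker are hypothetical, simplified stand-ins for the pool and its thread start routine; static_cast is used here because it is enough to convert a void* back to the original object type (reinterpret_cast would also compile).

```cpp
// Simplified stand-in for the pool: only the entry-point pattern is shown.
class Worker
{
public:
    explicit Worker(int id) : mId(id) {}

    // Static trampoline with a void* parameter, mirroring the shape of a
    // thread start routine. static_cast recovers the original object type.
    static int Start(void* parameters)
    {
        Worker* self = static_cast<Worker*>(parameters);
        return self->Main();
    }

private:
    int Main() { return mId; }
    int mId;
};

// Calls the trampoline the way a thread API would: with an untyped pointer.
int RunWorker(int id)
{
    Worker w(id);
    return Worker::Start(&w); // Worker* converts to void* implicitly
}
```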

using namespace std;

Don't do this. It greatly restricts what client code (whoever includes your class) can write, and the errors generated in client code are almost always obscure. You're better off repeating that in local scopes (if you must) or (better still) not using it (consider writing using std::string; and similar, in the scopes you use the type, or simply declaring std::string x;).
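For example, a header can qualify names in declarations and keep any using-declaration inside a function body. This is only a sketch; JoinNames is a hypothetical helper, not part of the reviewed code.

```cpp
#include <list>
#include <string>

// No using-directive at namespace scope: whoever includes this header keeps
// full control over the names in their own code.
inline std::string JoinNames(const std::list<std::string>& names)
{
    using std::string; // visible only inside this function
    string result;
    for (std::list<string>::const_iterator it = names.begin(); it != names.end(); ++it)
    {
        if (!result.empty())
            result += ", ";
        result += *it;
    }
    return result;
}
```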

Edit: If bool ThreadPool::ScheduleTask(Task* task) never returns false, why does it return bool? Shouldn't it be void?

Second Edit: Consider using an unsigned int for the number of threads (mThreadCount).

Third edit ( Hi Loki :) )

Regarding ownership of the tasks and object lifetime management (how to implement it safely and indicate it in the public interface of the class).

Consider this client code:

ThreadPool runner; // note: "ThreadPool runner();" would declare a function instead
// bad bad code: don't call new with raw pointers like this in practice
// but easy to write to make my point below.
BackgroundTask *t = new BackgroundTask(); // class BackgroundTask: public Task
                                          // defined elsewhere
runner.ScheduleTask(t);

What you don't know looking at this code: should you delete t after the ThreadPool completes running? Will runner delete the task itself? If it does, t will point to deleted memory after runner finishes. If it doesn't, you will have to remember to check when runner has finished, and call delete explicitly, every single time you use the thread pool.

You could implement the ThreadPool to delete all tasks after execution, but then, an enterprising programmer (maybe yourself) could write:

ThreadPool runner;
BackgroundTask t;
runner.ScheduleTask(&t); // pass address of automatic storage object

and this is undefined behavior (most likely a crash) when your ThreadPool deletes the task.

Another client code alternative, this one much much worse:

void crashy(ThreadPool& pool) {
    BackgroundTask t;
    pool.ScheduleTask(&t);
} // t goes out of scope here and is destroyed, but p still holds its address
ThreadPool p;
crashy(p); // compilers happily accept this (at best with a warning)
// p now has a pointer to a destroyed BackgroundTask instance

The interface doesn't indicate ownership and makes you interrupt your coding, to look at the implementation, just so you know how to write client code that doesn't leak or crash.

Solutions:

You could change your interface to take a Task& instead. As with passing a pointer, the drawback is that every task you pass to ScheduleTask must stay alive until the ThreadPool has finished running it (in other words, you can still write the nice crashy function).

This is better, as at least you don't need to wonder if it will delete the task (which passing by pointer may, or may not suggest).

Better yet, change your interface to take a std::unique_ptr instead:

class ThreadPool
{
public:
    bool ScheduleTask(std::unique_ptr<Task> task) // << std::unique_ptr by val.
    {
        EnterCriticalSection(&mCriticalSection);
        mTasks.push_back( std::move(task) ); // << notice std::move
        mNumberOfTasksNotFinished++;
        LeaveCriticalSection(&mCriticalSection); // also notice non-raii unlock;
                                                 // if push_back throws on
                                                 // allocating new list node,
                                                 // your whole app is deadlocked
        WakeConditionVariable(&mConditionVariable);
        return true;
    }
private:
    list<std::unique_ptr<Task>> mTasks; // << notice std::unique_ptr
    ...
};

Client code:

ThreadPool runner;
std::unique_ptr<Task> p(new BackgroundTask());
runner.ScheduleTask(std::move(p)); // works, no lifetime management problems
                                   // and no ambiguity either

Since the function now takes a unique_ptr, the signature itself tells you that you must pass a heap-allocated task and that you hand its ownership over to the pool.
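To show the ownership-passing interface end to end, here is a minimal portable sketch that swaps the Win32 primitives for C++11 std::thread, std::mutex and std::condition_variable. PortablePool, AddTask and SumWithPool are hypothetical names, and this is not the original implementation, only an illustration of the same design under those assumptions.

```cpp
#include <atomic>
#include <condition_variable>
#include <deque>
#include <memory>
#include <mutex>
#include <thread>
#include <vector>

class Task
{
public:
    virtual ~Task() {}
    virtual void ExecuteTask() = 0;
};

// Hypothetical portable counterpart of ThreadPool; not the original Win32 code.
class PortablePool
{
public:
    explicit PortablePool(unsigned threadCount) : mPending(0), mConsume(true)
    {
        for (unsigned i = 0; i < threadCount; ++i)
            mThreads.emplace_back(&PortablePool::ThreadMain, this);
    }
    ~PortablePool()
    {
        { std::lock_guard<std::mutex> lock(mMutex); mConsume = false; }
        mTaskAvailable.notify_all();
        for (std::thread& t : mThreads) t.join();
    }
    // Taking unique_ptr by value makes the ownership transfer explicit.
    void ScheduleTask(std::unique_ptr<Task> task)
    {
        {
            std::lock_guard<std::mutex> lock(mMutex); // released even if push_back throws
            mTasks.push_back(std::move(task));
            ++mPending;
        }
        mTaskAvailable.notify_one();
    }
    // Blocks until every scheduled task has finished executing.
    void WaitForWorkToBeFinished()
    {
        std::unique_lock<std::mutex> lock(mMutex);
        mAllDone.wait(lock, [this] { return mPending == 0; });
    }
private:
    void ThreadMain()
    {
        for (;;)
        {
            std::unique_ptr<Task> task;
            {
                std::unique_lock<std::mutex> lock(mMutex);
                mTaskAvailable.wait(lock, [this] { return !mTasks.empty() || !mConsume; });
                if (!mConsume) return; // destructor requested shutdown
                task = std::move(mTasks.front());
                mTasks.pop_front();
            }
            task->ExecuteTask(); // an exception escaping here would terminate the program
            task.reset();        // the pool owns the task, so it destroys it
            { std::lock_guard<std::mutex> lock(mMutex); --mPending; }
            mAllDone.notify_all();
        }
    }
    std::mutex mMutex;
    std::condition_variable mTaskAvailable;
    std::condition_variable mAllDone;
    std::deque<std::unique_ptr<Task>> mTasks;
    unsigned mPending;
    bool mConsume;
    std::vector<std::thread> mThreads;
};

// Example task: adds a value to a shared total.
class AddTask : public Task
{
public:
    AddTask(std::atomic<int>& total, int value) : mTotal(total), mValue(value) {}
    void ExecuteTask() override { mTotal += mValue; }
private:
    std::atomic<int>& mTotal;
    int mValue;
};

// Schedules tasks adding 1..count and returns the total once all have finished.
int SumWithPool(int count)
{
    std::atomic<int> total(0);
    PortablePool pool(4);
    for (int i = 1; i <= count; ++i)
        pool.ScheduleTask(std::unique_ptr<Task>(new AddTask(total, i)));
    pool.WaitForWorkToBeFinished();
    return total.load();
}
```

Because the pool holds each task in a unique_ptr, tasks still queued when the pool is destroyed are released automatically, and the caller never has to decide whether to delete anything.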

answered Jan 31, 2014 at 14:30
\$\endgroup\$
7
  • \$\begingroup\$ stackoverflow.com/q/1452721/14065 \$\endgroup\$ Commented Jan 31, 2014 at 17:41
  • \$\begingroup\$ Talk about ownership of the task in ScheduleTask() and why you should not be using RAW pointers then you will get my vote. \$\endgroup\$ Commented Jan 31, 2014 at 17:42
  • 1
    \$\begingroup\$ +1: A nice alternative to std::list<std::unique_ptr<Task>> is boost::ptr_list<Task> \$\endgroup\$ Commented Feb 1, 2014 at 11:39
  • \$\begingroup\$ Is it a good idea to make a copy of the task (as std::vector and std::list do with their elements) instead of using unique_ptr? \$\endgroup\$ Commented Feb 2, 2014 at 14:33
  • \$\begingroup\$ @OopsUser, it could work, but (keeping in mind that Task is the base of other classes) you would need to preserve polymorphic behavior, so you would probably need a virtual Task* Task::clone() or similar for creating the copy. It complicates the design and adds to the effort necessary to implement Task specializations (but maybe there are other reasons for that). All in all, unless there is a specific reason, I would keep the design with std::unique_ptr. \$\endgroup\$ Commented Feb 2, 2014 at 19:30
2
\$\begingroup\$

Re. the comments to this question, I don't see (in the Win32 API documentation) any function required to delete a condition variable initialized using InitializeConditionVariable.

However, all the thread handles that you created using CreateThread should be closed, with a corresponding call to CloseHandle(mThreads[i]) in your ThreadPool::~ThreadPool() destructor.


One more thing: your Task class should probably have a defined destructor; see for example:

Guideline #4: A base class destructor should be either public and virtual, or protected and nonvirtual.
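A minimal sketch of why this matters once tasks are destroyed through a base-class pointer. FlaggingTask and DerivedDestructorRuns are hypothetical names used only for this illustration.

```cpp
#include <memory>

class Task
{
public:
    virtual ~Task() {} // public and virtual: safe to delete through Task*
    virtual void ExecuteTask() = 0;
};

// Hypothetical task that records when its own destructor has run.
class FlaggingTask : public Task
{
public:
    explicit FlaggingTask(bool& destroyed) : mDestroyed(destroyed) {}
    ~FlaggingTask() override { mDestroyed = true; }
    void ExecuteTask() override {}
private:
    bool& mDestroyed;
};

// Returns true if destroying the task via a Task pointer ran ~FlaggingTask.
bool DerivedDestructorRuns()
{
    bool destroyed = false;
    {
        std::unique_ptr<Task> task(new FlaggingTask(destroyed));
        task->ExecuteTask();
    } // ~Task is virtual, so ~FlaggingTask runs here
    return destroyed;
}
```

Without the virtual destructor, deleting a derived task through a Task pointer is undefined behavior, which is exactly what a pool that owns its tasks would do.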

answered Feb 1, 2014 at 0:25
\$\endgroup\$
