I wrote a simple thread pool, which works pretty well. I would like to see your review of it.
One important feature that I needed in the pool is the ability to wait until all the tasks that I sent to the pool are complete (so that later I could send other tasks that are dependent on the result of the previous tasks).
This is the base class you inherit from to define a task:
    class Task
    {
    public:
        virtual void ExecuteTask() = 0;
    };
This is the .h file of the Thread pool:
    #include <time.h>
    #include <Windows.h>
    #include <list>
    using namespace std;

    class ThreadPool
    {
    public:
        ThreadPool(int threadCount = 0);
        bool ScheduleTask(Task* task);
        bool WaitForWorkToBeFinished(DWORD dwMilliseconds);
        ~ThreadPool();
    private:
        static DWORD WINAPI ThreadStart(LPVOID threadParameters);
        DWORD ThreadMain();
        list<Task*> mTasks;
        HANDLE* mThreads;
        int mThreadCount;
        CRITICAL_SECTION mCriticalSection;
        CONDITION_VARIABLE mConditionVariable;
        CONDITION_VARIABLE mConditionVariableTaskFinished;
        int mNumberOfTasksNotFinished;
        bool mConsum;
    };
This is the CPP file:
    #include "ThreadPool.h"

    ThreadPool::ThreadPool(int threadCount)
    {
        mThreadCount = threadCount;
        mThreads = new HANDLE[mThreadCount]; // These handles will be used to wait on the threads
        InitializeCriticalSection(&mCriticalSection);
        InitializeConditionVariable(&mConditionVariable);
        InitializeConditionVariable(&mConditionVariableTaskFinished);
        mConsum = true;
        mNumberOfTasksNotFinished = 0;
        for (int i = 0; i < mThreadCount; i++)
            mThreads[i] = CreateThread(NULL, 0, ThreadStart, (void*)this, 0, NULL);
    }

    bool ThreadPool::ScheduleTask(Task* task)
    {
        EnterCriticalSection(&mCriticalSection);
        mTasks.push_back(task); // This is a simple std::list
        mNumberOfTasksNotFinished++;
        LeaveCriticalSection(&mCriticalSection);
        WakeConditionVariable(&mConditionVariable); // Wake up a thread so it knows there is a job to do
        return true;
    }

    // Just a static function to help me use a member function as a thread entry point
    DWORD WINAPI ThreadPool::ThreadStart(LPVOID threadParameters)
    {
        ThreadPool* referenceToThis = (ThreadPool*)threadParameters;
        return referenceToThis->ThreadMain();
    }

    // The main thread function
    DWORD ThreadPool::ThreadMain()
    {
        do
        {
            Task* currentTask;
            EnterCriticalSection(&mCriticalSection);
            while (mTasks.size() == 0 && mConsum)
                SleepConditionVariableCS(&mConditionVariable, &mCriticalSection, INFINITE);
            if (!mConsum) // Destructor ordered an abort
            {
                LeaveCriticalSection(&mCriticalSection);
                return 0;
            }
            // If we got here, we successfully acquired the lock and the list<Task*> is not empty
            currentTask = mTasks.front();
            mTasks.pop_front();
            LeaveCriticalSection(&mCriticalSection);
            currentTask->ExecuteTask();
            //delete currentTask;
            EnterCriticalSection(&mCriticalSection);
            mNumberOfTasksNotFinished--;
            LeaveCriticalSection(&mCriticalSection);
            WakeConditionVariable(&mConditionVariableTaskFinished);
        } while (mConsum);
        return 0;
    }

    // This function is very important: it gives the user the ability to send 10 tasks to the thread pool,
    // then wait until all the tasks have completed, and send the next 10, which depend on the results of the previous ones.
    bool ThreadPool::WaitForWorkToBeFinished(DWORD dwMilliseconds)
    {
        EnterCriticalSection(&mCriticalSection);
        while (mNumberOfTasksNotFinished != 0)
            SleepConditionVariableCS(&mConditionVariableTaskFinished, &mCriticalSection, INFINITE);
        LeaveCriticalSection(&mCriticalSection);
        return true;
    }

    ThreadPool::~ThreadPool()
    {
        mConsum = false;
        WakeAllConditionVariable(&mConditionVariable);
        WaitForMultipleObjects(mThreadCount, mThreads, true, INFINITE);
        DeleteCriticalSection(&mCriticalSection);
    }
- Yes, it's fun writing your own thread pool. But this has been done to death before. Use one of the available ones that has been thoroughly tested and debugged. (Loki Astari, Jan 31, 2014 at 17:37)
- It's 2014. Can we please start using C++11 at least, with its built-in standard threads? All modern compilers now support it. Most modern compilers will support C++14 by now (yes, I know 2014 has only just started and they have not officially voted on it, but it's so close and the compilers are so ready). (Loki Astari, Jan 31, 2014 at 17:40)
- What happens if the task throws an exception? In most threading libraries, if an exception causes a thread to unwind past its start point, the application will exit. (Loki Astari, Jan 31, 2014 at 17:45)
- You are leaking all but one of the resources you created. (Loki Astari, Jan 31, 2014 at 17:48)
- Loki, can you please elaborate? I am implementing it to study threading and to understand how things work. It is important for me to know things thoroughly (I'm also preparing for interviews, and implementing a thread pool is a totally possible question). (OopsUser, Jan 31, 2014 at 21:23)
2 Answers
A few points:
Your EnterCriticalSection/LeaveCriticalSection code should be placed into a scoped RAII object. This saves you from explicitly calling leave, makes your code smaller, and avoids a deadlock if the synchronized code throws.
Do something similar with everything that requires cleanup (like mThreads = new HANDLE[mThreadCount];).
(Struck out: The interface of your class should receive the tasks by reference and store them by pointer within. This way, you make it clear to client code that the class doesn't take ownership of the tasks it receives.) See the third edit below.
Use reinterpret_cast instead of C-style casts (because C-style casts = bad, mmmkay?!)
using namespace std;
Don't do this. It greatly restricts what client code (whoever includes your class) can write, and the errors generated in client code are almost always obscure. You're better off repeating that in local scopes (if you must) or (better still) not using it (consider writing using std::string; and similar, in the scopes where you use the type, or simply declaring std::string x;).
Edit: If bool ThreadPool::ScheduleTask(Task* task) never returns false, why does it return bool? Shouldn't it be void?
Second edit: Consider using an unsigned int for the number of threads (mThreadCount).
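The RAII suggestion in the first point could be sketched roughly as follows. ScopedLock and ThrowWhileLocked are hypothetical names, and the guard is written against anything with lock()/unlock() (for example std::mutex) so that the sketch stays portable. For the code under review, the constructor would call EnterCriticalSection and the destructor LeaveCriticalSection on the pool's CRITICAL_SECTION instead.

```cpp
#include <mutex>
#include <stdexcept>

// Hypothetical RAII guard: locks in the constructor, unlocks in the destructor.
// Lockable is any type with lock()/unlock(), e.g. std::mutex. A Win32 variant
// would hold a CRITICAL_SECTION* and call Enter/LeaveCriticalSection instead.
template <typename Lockable>
class ScopedLock
{
public:
    explicit ScopedLock(Lockable& lockable) : mLockable(lockable) { mLockable.lock(); }
    ~ScopedLock() { mLockable.unlock(); }

    // Non-copyable, so every lock() is matched by exactly one unlock().
    ScopedLock(const ScopedLock&) = delete;
    ScopedLock& operator=(const ScopedLock&) = delete;

private:
    Lockable& mLockable;
};

// Hypothetical helper that simulates a failure inside the locked region
// (for instance push_back failing to allocate a list node).
template <typename Lockable>
void ThrowWhileLocked(Lockable& lockable)
{
    ScopedLock<Lockable> guard(lockable);
    throw std::runtime_error("simulated failure inside the locked region");
}   // guard's destructor runs during stack unwinding and releases the lock
```

If C++11 is available, std::lock_guard<std::mutex> already provides this behavior for standard mutexes, so a hand-written guard is mainly useful for wrapping the Win32 primitives.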
Third edit ( Hi Loki :) )
Regarding ownership of the tasks and object lifetime management (how to implement it safely and indicate it in the public interface of the class):
Consider this client code:
    ThreadPool runner; // note: "ThreadPool runner();" would declare a function, not an object
    // bad bad code: don't call new with raw pointers like this in practice
    // but easy to write to make my point below.
    BackgroundTask *t = new BackgroundTask(); // class BackgroundTask: public Task
                                              // defined elsewhere
    runner.ScheduleTask(t);
What you don't know looking at this code: should you delete t after the ThreadPool completes running? Will runner delete the task itself? If it does, t will point to deleted memory after runner finishes. If it doesn't, you will have to remember to check when runner has finished, and call delete explicitly, every single time you use the thread pool.
You could implement the ThreadPool to delete all tasks after execution, but then an enterprising programmer (maybe yourself) could write:
    ThreadPool runner;
    BackgroundTask t;
    runner.ScheduleTask(&t); // pass address of automatic storage object
and this will crash your program when your ThreadPool deletes the task.
Another client code alternative, this one much much worse:
    void crashy(ThreadPool& pool) {
        BackgroundTask t;
        pool.ScheduleTask(&t);
    } // t goes out of scope here and is destroyed but p still holds its address

    ThreadPool p;
    crashy(p); // some compilers tend to happily accept this (maybe with a warning)
    // p now has a pointer to a destroyed BackgroundTask instance
The interface doesn't indicate ownership and makes you interrupt your coding, to look at the implementation, just so you know how to write client code that doesn't leak or crash.
Solutions:
You could change your interface to take a Task& instead. As with passing by pointer, the drawback is that you need to make sure the tasks you pass to ScheduleTask stay alive until the ThreadPool stops running (in other words, you can still write the nice crashy function).
This is better, as at least you don't need to wonder whether the pool will delete the task (which passing by pointer may or may not suggest).
Better yet, change your interface to take a std::unique_ptr instead:
    class ThreadPool
    {
    public:
        bool ScheduleTask(std::unique_ptr<Task> task) // << std::unique_ptr by val.
        {
            EnterCriticalSection(&mCriticalSection);
            mTasks.push_back( std::move(task) ); // << notice std::move
            mNumberOfTasksNotFinished++;
            LeaveCriticalSection(&mCriticalSection); // also notice non-raii unlock;
                                                     // if push_back throws on
                                                     // allocating new list node,
                                                     // your whole app is deadlocked
            WakeConditionVariable(&mConditionVariable);
            return true;
        }
    private:
        list<std::unique_ptr<Task>> mTasks; // << notice std::unique_ptr
        ...
    };
Client code:
    ThreadPool runner;
    std::unique_ptr<Task> p(new BackgroundTask());
    runner.ScheduleTask(std::move(p)); // works, no lifetime management problems
                                       // and no ambiguity either
Since the function now takes a unique_ptr, the signature itself tells you that you need to pass a heap-allocated task and that you are handing over ownership of it.
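On the worker side, the unique_ptr version might look roughly like the sketch below. RunAllTasks is a hypothetical, single-threaded stand-in for the loop in ThreadMain (locking and condition variables are left out to keep it short), and the virtual destructor on Task is an assumption that is not in the original class; it is there so that destroying a std::unique_ptr<Task> destroys the derived object correctly.

```cpp
#include <cstddef>
#include <list>
#include <memory>
#include <utility>

class Task
{
public:
    virtual ~Task() {}               // assumption: not present in the original Task
    virtual void ExecuteTask() = 0;
};

// Hypothetical stand-in for the body of ThreadMain(): takes each task out of
// the list, runs it, and lets the unique_ptr delete it. Synchronization omitted.
std::size_t RunAllTasks(std::list<std::unique_ptr<Task>>& tasks)
{
    std::size_t executed = 0;
    while (!tasks.empty())
    {
        // Move ownership out of the list before removing the (now empty) node.
        std::unique_ptr<Task> current = std::move(tasks.front());
        tasks.pop_front();

        current->ExecuteTask();
        ++executed;
    }   // 'current' is destroyed at the end of each iteration, deleting the task
    return executed;
}
```

With this shape the commented-out delete currentTask; from the original loop is no longer needed, because the task is released automatically once it has run.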
- stackoverflow.com/q/1452721/14065 (Loki Astari, Jan 31, 2014 at 17:41)
- Talk about ownership of the task in ScheduleTask() and why you should not be using raw pointers, then you will get my vote. (Loki Astari, Jan 31, 2014 at 17:42)
- +1: A nice alternative to std::list<std::unique_ptr<Task>> is boost::ptr_list<Task>. (Loki Astari, Feb 1, 2014 at 11:39)
- Is it a good idea to make a copy of the task (like vector/list in std do) instead of using unique_ptr? (OopsUser, Feb 2, 2014 at 14:33)
- @OopsUser, it could work, but (keeping in mind that Task is the base of other classes) you would need to preserve polymorphic behavior, so you would probably need a virtual Task* Task::clone() or similar for creating the copy. It complicates the design and adds to the effort necessary to implement Task specializations (but maybe there are other reasons for that). All in all, unless there is a specific reason, I would keep the design with std::unique_ptr. (utnapistim, Feb 2, 2014 at 19:30)
Re. the comments to this question, I don't see (in the Win32 API documentation) any function required to delete a condition variable initialized using InitializeConditionVariable.
But all the thread handles which you created using CreateThread should be closed, by using a corresponding call to CloseHandle(mThreads[i]) in your ThreadPool::~ThreadPool() destructor.
One more thing: your Task class should probably have a defined destructor; see for example:
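Assuming the point is that tasks get destroyed through a base-class pointer (as happens with std::unique_ptr<Task>), the usual fix is a virtual destructor on Task. A minimal sketch follows; this BackgroundTask and its flag are hypothetical and only serve to make the derived destructor observable.

```cpp
#include <memory>

class Task
{
public:
    // Virtual, so deleting through a Task* also runs the derived destructor.
    virtual ~Task() {}
    virtual void ExecuteTask() = 0;
};

// Hypothetical derived task: sets a flag while alive and clears it on destruction.
class BackgroundTask : public Task
{
public:
    explicit BackgroundTask(bool& alive) : mAlive(alive) { mAlive = true; }
    ~BackgroundTask() override { mAlive = false; }
    void ExecuteTask() override {}

private:
    bool& mAlive;
};
```

Without the virtual destructor, deleting a BackgroundTask through a Task* is undefined behavior, and in practice the derived destructor may simply never run.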