I just created a more or less simple thread pool, and now I am curious whether there is something wrong with the implementation or whether I missed something that could come back to bite me.
I have experimented with it a bit, and so far everything seems to work as intended. But since threading brings the elite of all bugs to light, I would feel more secure if someone could look it over.
#include <thread>
#include <mutex>
#include <condition_variable>
#include <atomic>
#include <stdint.h>
#include <queue>
#include <vector>
#include <memory>
#include <algorithm> // std::for_each, used in the destructor
namespace natives
{
typedef int16_t int16;
typedef int32_t int32;
typedef int64_t int64;
typedef std::atomic_bool flag;
}
class ThreadPool
{
public:
struct BaseTask
{
virtual void runTask() = 0;
};
template <class T> struct Task : public BaseTask
{
Task(T task)
: m_Task(task)
{}
virtual void runTask()
{
m_Task();
}
T m_Task;
};
template <class T, class P1> struct ParameteredTask : public BaseTask
{
ParameteredTask(T task, const P1& p1)
: m_Task(task), m_P1(p1)
{}
virtual void runTask()
{
m_Task(m_P1);
}
T m_Task;
P1 m_P1;
};
typedef std::queue<BaseTask*> TaskQueue;
typedef std::vector <std::shared_ptr<std::thread> > WorkerGroup;
typedef std::mutex QueueLock;
typedef std::unique_lock<std::mutex> QueueGuard;
typedef std::condition_variable WorkerSignal;
static void thMain(TaskQueue* queue, QueueLock* qlock, WorkerSignal* signal, natives::flag* online)
{
while (*online)
{
BaseTask* task = nullptr;
std::shared_ptr<ThreadPool::QueueGuard> qguard(std::make_shared<ThreadPool::QueueGuard>(*qlock));
if (!queue->empty())
{
task = queue->front();
queue->pop();
qguard.reset();
}
else if (*online)
{
signal->wait(*qguard);
}
if (nullptr != task)
{
task->runTask();
delete task;
}
}
}
ThreadPool(natives::int32 size)
: m_Online(true)
{
for (natives::int32 counter = 0; size > counter; ++counter)
{
m_Workers.push_back(std::make_shared<std::thread>(thMain, &m_Queue, &m_QLock, &m_Signal, &m_Online));
}
}
void addThread()
{
m_Workers.push_back(std::make_shared<std::thread>(thMain, &m_Queue, &m_QLock, &m_Signal, &m_Online));
}
~ThreadPool()
{
m_Online = false;
m_Signal.notify_all();
std::for_each(m_Workers.begin(), m_Workers.end(), [](std::shared_ptr<std::thread> thread)->void {thread->join(); });
}
void enqueue(BaseTask* task)
{
QueueGuard guard(m_QLock);
m_Queue.push(task);
m_Signal.notify_all();
}
template <class T> void enqueue(T task)
{
QueueGuard guard(m_QLock);
m_Queue.push(new Task<T>(task));
m_Signal.notify_all();
}
template <class T, class P1> void enqueue(T task, const P1& p1)
{
QueueGuard guard(m_QLock);
m_Queue.push(new ParameteredTask<T, P1>(task, p1));
m_Signal.notify_all();
}
natives::int32 getQueueSize()
{
QueueGuard guard(m_QLock);
natives::int32 size = m_Queue.size();
return size;
}
WorkerGroup m_Workers;
TaskQueue m_Queue;
QueueLock m_QLock;
WorkerSignal m_Signal;
natives::flag m_Online;
};
Here is a little usage example:
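#include <iostream> // std::cout
#include <chrono>   // std::chrono::seconds
#include <cstdio>   // getchar
std::mutex m;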
void report(natives::int32 i)
{
{
std::unique_lock<std::mutex> guard(m);
std::cout << "hi " << i << "\n";
}
std::this_thread::sleep_for(std::chrono::seconds(5));
}
int main()
{
ThreadPool pool(4);
pool.enqueue(report, 1);
pool.enqueue(report, 2);
pool.enqueue(report, 3);
pool.enqueue(report, 4);
pool.enqueue(report, 5);
std::this_thread::sleep_for(std::chrono::seconds(2));
pool.addThread();
getchar();
return 0;
}
1 Answer
Higher Level Functionality
You should also note that C++ now supports much higher level constructs than threads.
If you use std::async(job), it will effectively hand job to a background thread for you (some implementations back this with a system thread pool).
Things to read about: std::async, std::future, std::promise.
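As a rough illustration of the std::async route, here is a minimal sketch. The names sumAll and runJobAsync are hypothetical and not part of the reviewed code; the std::launch::async policy is passed explicitly because, without it, an implementation is allowed to defer the call until get() is invoked.

```cpp
#include <functional>
#include <future>
#include <numeric>
#include <vector>

// Hypothetical job: sums a vector. Any callable would do.
int sumAll(const std::vector<int>& values)
{
    return std::accumulate(values.begin(), values.end(), 0);
}

// Starts sumAll on another thread and blocks until its result is ready.
int runJobAsync(const std::vector<int>& values)
{
    // std::cref avoids copying the vector into the asynchronous call;
    // the caller must keep `values` alive until get() returns.
    std::future<int> result =
        std::async(std::launch::async, sumAll, std::cref(values));
    return result.get();
}
```

Calling runJobAsync with the values 1, 2, 3, 4 should return 10; the future also carries any exception thrown by the job back to the caller of get().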
Review
This is a C header:
#include <stdint.h>
The C++ version of this library is:
// http://en.cppreference.com/w/cpp/header/cstdint
#include <cstdint>
The main difference is that the C++ version puts the types correctly in the std namespace rather than the global namespace.
These types already seem to have the same names.
namespace natives
{
typedef int16_t int16;
typedef int32_t int32;
typedef int64_t int64;
typedef std::atomic_bool flag;
}
I don't see any advantage in defining non-standard names for them. Also, is natives::int32 really easier to write than std::int32_t? And int16 and int64 are never used in the code below.
Re-inventing std::function.
struct BaseTask
{
virtual void runTask() = 0;
};
template <class T> struct Task : public BaseTask
{
Task(T task)
: m_Task(task)
{}
virtual void runTask()
{
m_Task();
}
T m_Task;
};
OK. You mention in a previous post that you are using an old compiler. So fine: if you have to use an old compiler, this will work. But moving forward this should really be done using std::function.
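To make the std::function suggestion concrete, here is a small sketch of a queue of type-erased callables. FunctionQueue, Incrementer, drain and demoFunctionQueue are hypothetical names chosen for illustration; the point is only that one std::function<void()> type can hold functors and lambdas alike, which is the job BaseTask and Task<T> do by hand.

```cpp
#include <functional>
#include <queue>
#include <utility>

// A queue of type-erased callables that take no arguments.
typedef std::queue<std::function<void()> > FunctionQueue;

// Hypothetical functor that bumps a counter it refers to.
struct Incrementer
{
    explicit Incrementer(int& target) : m_Target(target) {}
    void operator()() const { ++m_Target; }
    int& m_Target;
};

// Runs and removes every queued task on the calling thread.
// Returns the number of tasks executed.
int drain(FunctionQueue& tasks)
{
    int executed = 0;
    while (!tasks.empty()) {
        std::function<void()> task = std::move(tasks.front());
        tasks.pop();
        task();
        ++executed;
    }
    return executed;
}

int demoFunctionQueue()
{
    int counter = 0;
    FunctionQueue tasks;
    tasks.push(Incrementer(counter));             // a functor
    tasks.push([&counter]() { counter += 10; });  // a lambda
    drain(tasks);
    return counter;
}
```

No virtual base class or manual new/delete is needed here, because std::function owns its copy of the callable.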
This class is not needed though.
template <class T, class P1> struct ParameteredTask : public BaseTask
{
ParameteredTask(T task, const P1& p1)
: m_Task(task), m_P1(p1)
{}
virtual void runTask()
{
m_Task(m_P1);
}
T m_Task;
P1 m_P1;
};
Even old compilers supported std::bind to bind a single (or two) values to a functor.
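A short sketch of how std::bind could stand in for ParameteredTask. The functions addTo and demoBind are hypothetical. Note that std::bind stores copies of its arguments, so std::ref is used where the callee is meant to modify the caller's object.

```cpp
#include <functional>

// Hypothetical one-parameter job with an extra output argument.
void addTo(int& total, int amount)
{
    total += amount;
}

int demoBind()
{
    int total = 0;
    // Binding both arguments yields a callable that takes none, which is
    // exactly the shape a queue of std::function<void()> expects.
    std::function<void()> task = std::bind(addTo, std::ref(total), 5);
    task();
    task();
    return total;
}
```

With a bound task like this, the single-template enqueue(T task) overload would suffice and the two-argument overload could go.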
Ownership
typedef std::queue<BaseTask*> TaskQueue;
This is a bit sketchy as you don't define ownership semantics of the task objects. Are you keeping a pointer to a task owned by somebody else or are you taking ownership of the object?
If I look at your code, your second enqueue() implies you are taking ownership.
template <class T> void enqueue(T task)
{
QueueGuard guard(m_QLock);
m_Queue.push(new Task<T>(task));
m_Signal.notify_all();
}
In this function you create a new object and push it onto the queue, which means you are doing manual memory management. But then we look at the first enqueue(). This has no ownership associated with the pointer being passed.
void enqueue(BaseTask* task)
{
QueueGuard guard(m_QLock);
m_Queue.push(task);
m_Signal.notify_all();
}
So if I write the following code:
WorkIWantDone work; // derived from BaseTask
ThreadPool pool(4);
pool.enqueue(&work);
Now we have a conundrum. I am passing an object (but not expecting ownership to be transferred), while the other enqueue() is dynamically creating objects. So I have both types of pointer on the queue. Since thMain calls delete on every task it runs, the stack object above would be deleted too. And when the destructor is called, how do we tell which objects should be deleted?
So if we look at the destructor:
~ThreadPool()
{
m_Online = false;
m_Signal.notify_all();
std::for_each(m_Workers.begin(), m_Workers.end(), [](std::shared_ptr<std::thread> thread)->void {thread->join(); });
}
No objects are destroyed from the m_Queue. Well, there is a leak, unless the threads guarantee they will delete all items in the m_Queue before they exit. Check the thread function: no, they don't guarantee that. They will execute the current job and then exit, so if we have a lot of jobs on the queue we will have some leakage.
This interface is better defined as:
typedef std::queue<std::unique_ptr<BaseTask>> TaskQueue; // The queue now owns every task it holds.
// Note: std::auto_ptr is not a safe fallback on an old compiler; it must not be stored in standard containers.
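The ownership point can be sketched as follows. CountingTask and demoOwnership are hypothetical, and this BaseTask differs from the reviewed one in a single respect: it has a virtual destructor, which destroying a derived task through a base pointer requires.

```cpp
#include <memory>
#include <queue>
#include <utility>

struct BaseTask
{
    virtual ~BaseTask() {}   // needed to destroy derived tasks via BaseTask
    virtual void runTask() = 0;
};

// Hypothetical task that tracks how many instances are currently alive.
struct CountingTask : BaseTask
{
    explicit CountingTask(int& alive) : m_Alive(alive) { ++m_Alive; }
    ~CountingTask() { --m_Alive; }
    void runTask() override {}
    int& m_Alive;
};

typedef std::queue<std::unique_ptr<BaseTask>> TaskQueue;

// Returns {tasks alive while queued, tasks alive after the queue is gone}.
std::pair<int, int> demoOwnership()
{
    int alive = 0;
    int whileQueued = 0;
    {
        TaskQueue queue;
        queue.push(std::unique_ptr<BaseTask>(new CountingTask(alive)));
        queue.push(std::unique_ptr<BaseTask>(new CountingTask(alive)));
        whileQueued = alive;

        // Taking a task out moves ownership to the local unique_ptr.
        std::unique_ptr<BaseTask> task = std::move(queue.front());
        queue.pop();
        task->runTask();
    }   // the executed task and the one still queued are both destroyed here
    return std::make_pair(whileQueued, alive);
}
```

With this queue type, an enqueue(BaseTask*) overload taking a non-owned pointer no longer compiles by accident, which makes the ownership rule visible in the interface.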
Shared Ownership of the threads.
typedef std::vector <std::shared_ptr<std::thread> > WorkerGroup;
I don't think that is a good idea. The threads are not shared with anything else, so std::unique_ptr would have been better, but even that is not good. You don't need to dynamically allocate the thread; you can simply have a vector of threads.
typedef std::vector<std::thread> WorkerGroup;
ThreadPool(natives::int32 size)
: m_Online(true)
{
for (natives::int32 counter = 0; size > counter; ++counter)
{
m_Workers.emplace_back(thMain, &m_Queue, &m_QLock, &m_Signal, &m_Online);
}
}
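A minimal sketch of a plain vector of threads, including the joining side. The function runWorkers is hypothetical. One detail changes compared with the shared_ptr version: std::thread is move-only, so the join loop has to take each element by reference rather than by value.

```cpp
#include <atomic>
#include <cstddef>
#include <thread>
#include <vector>

// Starts `size` threads stored directly in a vector, joins them all,
// and returns how many of them actually ran.
int runWorkers(int size)
{
    std::atomic<int> started(0);
    std::vector<std::thread> workers;
    workers.reserve(static_cast<std::size_t>(size));

    for (int counter = 0; counter < size; ++counter) {
        // emplace_back constructs the std::thread in place.
        workers.emplace_back([&started]() { ++started; });
    }

    // Iterate by reference: copying a std::thread does not compile.
    for (std::thread& worker : workers) {
        worker.join();
    }
    return started.load();
}
```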
Renaming standard types?
typedef std::mutex QueueLock;
typedef std::unique_lock<std::mutex> QueueGuard;
typedef std::condition_variable WorkerSignal;
Sure but I see little point in this. It means every C++ programmer has to go and look up the type definition. If you just use the normal type they should immediately recognize it and understand what it does.
Static Member thread.
There is no need to make the thread function a static member. It can very simply call a member method. Thus saving you the trouble of passing all the variables it needs.
static void thMain(TaskQueue* queue, QueueLock* qlock, WorkerSignal* signal, natives::flag* online)
What is this obsession with passing pointers? Stop it. In modern C++ code you should practically never see a raw pointer. The only time you should see a raw pointer is when creating a container or smart pointer (or something very similar). In this case you could have used references.
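For the reference variant, here is a sketch under the assumption that the thread function stays a free function. appendValue and demoRef are hypothetical names. std::thread copies its arguments by default, so each reference parameter has to be wrapped in std::ref at the call site.

```cpp
#include <functional>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical worker: takes its shared state by reference, not by pointer.
void appendValue(std::vector<int>& out, std::mutex& lock, int value)
{
    std::lock_guard<std::mutex> guard(lock);
    out.push_back(value);
}

std::vector<int> demoRef()
{
    std::vector<int> results;
    std::mutex lock;

    // Without std::ref these calls would not compile, because the thread
    // would try to bind its internal copies to non-const references.
    std::thread first(appendValue, std::ref(results), std::ref(lock), 1);
    std::thread second(appendValue, std::ref(results), std::ref(lock), 2);
    first.join();
    second.join();
    return results;
}
```

The order of the two values in the result depends on scheduling; only the contents are fixed.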
No need to dynamically create the lock guard!
std::shared_ptr<ThreadPool::QueueGuard> qguard(std::make_shared<ThreadPool::QueueGuard>(*qlock));
// easier to write.
ThreadPool::QueueGuard qguard(*qlock);
We can simplify your following code too:
// Assuming member functions now, so the pool's members are used directly.
void threadRunner()
{
while (m_Online) {
BaseTask* task = getTask();
if (task) {
task->runTask();
delete task; // the pool owns queued tasks, as in the original thMain
}
}
}
BaseTask* getTask()
{
QueueGuard qguard(m_QLock);
// Sleep until there is work or the pool is shutting down.
m_Signal.wait(qguard, [this]() { return !(m_Online && m_Queue.empty()); });
if (!m_Online || m_Queue.empty()) {
return nullptr;
}
BaseTask* task = m_Queue.front();
m_Queue.pop();
return task;
}
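Pulling the suggestions together, a compact pool might look roughly like the sketch below. SketchPool and demoPool are hypothetical names, and the sketch makes three assumptions that go beyond the code above: tasks are stored as std::function<void()>, the online flag is a plain bool changed only while holding the mutex (so a worker cannot miss the shutdown notification between checking the predicate and going to sleep), and workers drain the remaining queue before exiting instead of abandoning it.

```cpp
#include <atomic>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

class SketchPool
{
public:
    explicit SketchPool(int size) : m_Online(true)
    {
        for (int counter = 0; counter < size; ++counter) {
            // A member function as thread entry: no parameters to pass around.
            m_Workers.emplace_back(&SketchPool::threadRunner, this);
        }
    }

    ~SketchPool()
    {
        {
            std::lock_guard<std::mutex> guard(m_QLock);
            m_Online = false;   // changed under the lock, see lead-in
        }
        m_Signal.notify_all();
        for (std::thread& worker : m_Workers) {
            worker.join();
        }
    }

    void enqueue(std::function<void()> task)
    {
        {
            std::lock_guard<std::mutex> guard(m_QLock);
            m_Queue.push(std::move(task));
        }
        m_Signal.notify_one();  // one new task needs only one worker
    }

private:
    void threadRunner()
    {
        for (;;) {
            std::function<void()> task;
            {
                std::unique_lock<std::mutex> guard(m_QLock);
                m_Signal.wait(guard, [this]() {
                    return !m_Online || !m_Queue.empty();
                });
                if (m_Queue.empty()) {
                    return;     // only reachable once offline and drained
                }
                task = std::move(m_Queue.front());
                m_Queue.pop();
            }
            task();             // run without holding the lock
        }
    }

    std::vector<std::thread> m_Workers;
    std::queue<std::function<void()> > m_Queue;
    std::mutex m_QLock;
    std::condition_variable m_Signal;
    bool m_Online;              // guarded by m_QLock
};

// Enqueues `jobs` trivial tasks and returns how many ran before shutdown.
int demoPool(int jobs)
{
    std::atomic<int> done(0);
    {
        SketchPool pool(4);
        for (int i = 0; i < jobs; ++i) {
            pool.enqueue([&done]() { ++done; });
        }
    }   // the destructor waits until the queue has been drained
    return done.load();
}
```

Whether to drain or drop pending work on shutdown is a design choice; draining is used here because it makes the destructor's behavior easy to state and leaves nothing behind in the queue.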
Stop using Yoda conditionals
if (nullptr != task)
They provide absolutely no benefit. In fact they are considered worse than useless because they give you a false sense of security.
The better way is to get the compiler to tell you when there is an issue.
Get your compiler to treat warnings as errors (because warnings are actually logical errors in your thinking). Then your compiler will never let you use the assignment operator in a test.
Doesn't "if (0 == value) ..." do more harm than good?
In C and C++, what methods can prevent accidental use of the assignment(=) where equivalence(==) is needed?
- As far as I know, passing a reference to a thread results in a copy being handed to the thread, and I simply did not see a reason to pass a smart object to the thread. If I had to pass something other than a raw pointer to the thread, a weak pointer would serve this purpose. The point about the leaking queue is a good one, and I will take a look at the C++ header. I wasn't aware of bind; that's also a good one. Since I am using some third-party libraries, I won't treat warnings as errors: I am using VS warning level 4, which produces some warnings from libraries that aren't mine. – Mango, Apr 25, 2017 at 18:28
- Most of the points were good and valid, and I will (at least) consider them. But on the point about Yoda conditionals: I actually like them more than writing them the other way around, and I think this point at least is a matter of personal taste. – Mango, Apr 25, 2017 at 18:30
- And by the way, I simply didn't know std::function. Since I switched to VS2015, std::function should be supported. – Mango, Apr 25, 2017 at 18:48
- @Mango, passing an object by reference can be achieved by using std::ref(object), which will give you a std::reference_wrapper<>, which will then be implicitly converted to a reference to object's type. You can even implement it by hand in C++98. – Incomputable, Apr 25, 2017 at 18:48
- @Mango: Note: if you call a method rather than a static function, then you don't even need to pass those objects as parameters. But by reference (or reference_wrapper) is better than by pointer, as it shows you are not passing ownership. – Loki Astari, Apr 25, 2017 at 21:51