
I just created a fairly simple thread pool, and I am curious whether there is something wrong with the implementation or whether I missed something that could come crashing down on me.

I experimented with it a bit, and so far everything seems to work as intended. But since threading brings the elite of all bugs to light, I would feel more secure if someone could look it over.

#include <thread>
#include <mutex>
#include <condition_variable>
#include <atomic>
#include <stdint.h>
#include <queue>
#include <vector>
#include <memory>
namespace natives
{
    typedef int16_t int16;
    typedef int32_t int32;
    typedef int64_t int64;
    typedef std::atomic_bool flag;
}
class ThreadPool
{
public:
    struct BaseTask
    {
        virtual void runTask() = 0;
    };
    template <class T> struct Task : public BaseTask
    {
        Task(T task)
            : m_Task(task)
        {}
        virtual void runTask()
        {
            m_Task();
        }
        T m_Task;
    };
    template <class T, class P1> struct ParameteredTask : public BaseTask
    {
        ParameteredTask(T task, const P1& p1)
            : m_Task(task), m_P1(p1)
        {}
        virtual void runTask()
        {
            m_Task(m_P1);
        }
        T m_Task;
        P1 m_P1;
    };
    typedef std::queue<BaseTask*> TaskQueue;
    typedef std::vector <std::shared_ptr<std::thread> > WorkerGroup;
    typedef std::mutex QueueLock;
    typedef std::unique_lock<std::mutex> QueueGuard;
    typedef std::condition_variable WorkerSignal;
    static void thMain(TaskQueue* queue, QueueLock* qlock, WorkerSignal* signal, natives::flag* online)
    {
        while (*online)
        {
            BaseTask* task = nullptr;
            std::shared_ptr<ThreadPool::QueueGuard> qguard(std::make_shared<ThreadPool::QueueGuard>(*qlock));
            if (!queue->empty())
            {
                task = queue->front();
                queue->pop();
                qguard.reset();
            }
            else if (*online)
            {
                signal->wait(*qguard);
            }
            if (nullptr != task)
            {
                task->runTask();
                delete task;
            }
        }
    }
    ThreadPool(natives::int32 size)
        : m_Online(true)
    {
        for (natives::int32 counter = 0; size > counter; ++counter)
        {
            m_Workers.push_back(std::make_shared<std::thread>(thMain, &m_Queue, &m_QLock, &m_Signal, &m_Online));
        }
    }
    void addThread()
    {
        m_Workers.push_back(std::make_shared<std::thread>(thMain, &m_Queue, &m_QLock, &m_Signal, &m_Online));
    }
    ~ThreadPool()
    {
        m_Online = false;
        m_Signal.notify_all();
        std::for_each(m_Workers.begin(), m_Workers.end(), [](std::shared_ptr<std::thread> thread)->void {thread->join(); });
    }
    void enqueue(BaseTask* task)
    {
        QueueGuard guard(m_QLock);
        m_Queue.push(task);
        m_Signal.notify_all();
    }
    template <class T> void enqueue(T task)
    {
        QueueGuard guard(m_QLock);
        m_Queue.push(new Task<T>(task));
        m_Signal.notify_all();
    }
    template <class T, class P1> void enqueue(T task, const P1& p1)
    {
        QueueGuard guard(m_QLock);
        m_Queue.push(new ParameteredTask<T, P1>(task, p1));
        m_Signal.notify_all();
    }
    natives::int32 getQueueSize()
    {
        QueueGuard guard(m_QLock);
        natives::int32 size = m_Queue.size();
        return size;
    }
    WorkerGroup m_Workers;
    TaskQueue m_Queue;
    QueueLock m_QLock;
    WorkerSignal m_Signal;
    natives::flag m_Online;
};

Here is a little usage example:

std::mutex m;
void report(natives::int32 i)
{
    {
        std::unique_lock<std::mutex> guard(m);
        std::cout << "hi " << i << "\n";
    }
    std::this_thread::sleep_for(std::chrono::seconds(5));
}
int main()
{
    ThreadPool pool(4);
    pool.enqueue(report, 1);
    pool.enqueue(report, 2);
    pool.enqueue(report, 3);
    pool.enqueue(report, 4);
    pool.enqueue(report, 5);
    std::this_thread::sleep_for(std::chrono::seconds(2));
    pool.addThread();
    getchar();
    return 0;
}
Incomputable
asked Apr 25, 2017 at 14:54

1 Answer

Higher Level Functionality

Note that C++ now supports much higher-level constructs than raw threads.

If you use std::async(job), the library runs job asynchronously for you. Depending on the launch policy and the implementation, that may happen on a new thread or on a system thread pool.

Things to read about: std::async, std::future, std::promise.
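As a rough illustration of that higher-level style, here is a minimal sketch that splits a sum into two halves with std::async and collects the results through std::future. The functions sumRange and parallelSum are hypothetical names invented for this example; they are not part of the question's code.

```cpp
#include <future>

// Hypothetical workload: sum the integers in [first, last].
long long sumRange(int first, int last)
{
    long long total = 0;
    for (int i = first; i <= last; ++i) {
        total += i;
    }
    return total;
}

// Runs the two halves of 1..n asynchronously and combines the results.
long long parallelSum(int n)
{
    // std::launch::async asks for asynchronous execution; how the
    // implementation provides the thread is up to the implementation.
    std::future<long long> lower = std::async(std::launch::async, sumRange, 1, n / 2);
    std::future<long long> upper = std::async(std::launch::async, sumRange, n / 2 + 1, n);

    // get() blocks until the corresponding result is ready.
    return lower.get() + upper.get();
}
```

No queue, mutex, or condition variable appears in user code here; the future carries both the result and the synchronization.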

Review

This is a C header

#include <stdint.h>

The C++ version of this library is:

// http://en.cppreference.com/w/cpp/header/cstdint
#include <cstdint>

The main difference is that the C++ version puts the types correctly in the std namespace rather than the global namespace.

These aliases are almost identical to the standard names they wrap.

namespace natives
{
    typedef int16_t int16;
    typedef int32_t int32;
    typedef int64_t int64;
    typedef std::atomic_bool flag;
}

I don't see any advantage in defining non-standard names for them. Is natives::int32 really easier to write than std::int32_t? Also, int16 and int64 are never used in the code below.

Re-inventing std::function.

    struct BaseTask
    {
        virtual void runTask() = 0;
    };
    template <class T> struct Task : public BaseTask
    {
        Task(T task)
            : m_Task(task)
        {}
        virtual void runTask()
        {
            m_Task();
        }
        T m_Task;
    };

OK. You mention in a previous post that you are using an old compiler. If you have to stick with it, this will work. But moving forward this should really be done using std::function.
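To show what std::function buys you here, the following sketch stores a plain function, a functor, and a lambda in one queue without any hand-written BaseTask hierarchy. All names in it (AddTen, countCall, drain, runDemo) are made up for the illustration.

```cpp
#include <functional>
#include <queue>
#include <utility>

// Hypothetical functor, used only for this illustration.
struct AddTen
{
    int* target;
    void operator()() const { *target += 10; }
};

// Hypothetical free function; it needs namespace-scope state to have an effect.
int freeCalls = 0;
void countCall() { ++freeCalls; }

// Runs every queued task in FIFO order.
void drain(std::queue<std::function<void()>>& tasks)
{
    while (!tasks.empty()) {
        std::function<void()> task = std::move(tasks.front());
        tasks.pop();
        task();
    }
}

int runDemo()
{
    freeCalls = 0;
    int value = 0;
    std::queue<std::function<void()>> tasks;
    tasks.push(countCall);                    // plain function
    tasks.push(AddTen{&value});               // functor
    tasks.push([&value] { value += 100; });   // lambda
    drain(tasks);
    return value + freeCalls;                 // 110 from the callables, 1 from countCall
}
```

Because std::function owns a copy of whatever callable it wraps, the queue also has clear value semantics and there is nothing to delete by hand.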

This class is not needed though.

    template <class T, class P1> struct ParameteredTask : public BaseTask
    {
        ParameteredTask(T task, const P1& p1)
            : m_Task(task), m_P1(p1)
        {}
        virtual void runTask()
        {
            m_Task(m_P1);
        }
        T m_Task;
        P1 m_P1;
    };

Even old compilers supported std::bind to bind a single value (or two) to a functor.
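A small sketch of how std::bind could replace ParameteredTask: the argument is bound up front, and what remains is a callable taking no parameters. The names record and runBound are hypothetical and exist only for this example.

```cpp
#include <functional>
#include <vector>

// Hypothetical one-argument job, similar in shape to report(int) in the question.
void record(std::vector<int>& log, int id)
{
    log.push_back(id);
}

std::vector<int> runBound()
{
    std::vector<int> log;
    std::vector<std::function<void()>> tasks;
    for (int id = 1; id <= 3; ++id) {
        // std::bind copies its arguments; std::ref keeps `log` a reference
        // so that every task appends to the same vector.
        tasks.push_back(std::bind(record, std::ref(log), id));
    }
    // Each bound task is now callable with no arguments.
    for (std::function<void()>& task : tasks) {
        task();
    }
    return log;
}
```

With this, a single enqueue overload that accepts a nullary callable is enough; callers bind whatever parameters they need before handing the task over.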

Ownership

 typedef std::queue<BaseTask*> TaskQueue;

This is a bit sketchy as you don't define ownership semantics of the task objects. Are you keeping a pointer to a task owned by somebody else or are you taking ownership of the object?

Looking at your code, your second enqueue() implies you are taking ownership.

template <class T> void enqueue(T task)
{
    QueueGuard guard(m_QLock);
    m_Queue.push(new Task<T>(task));
    m_Signal.notify_all();
}

In this function you create a new object and push it onto the queue, which means you are doing manual memory management. But then we look at the first enqueue(), which associates no ownership with the pointer being passed.

void enqueue(BaseTask* task)
{
    QueueGuard guard(m_QLock);
    m_Queue.push(task);
    m_Signal.notify_all();
}

So if I write the following code:

WorkIWantDone work; // derived from BaseTask
ThreadPool pool(4);
pool.enqueue(&work);

Now we have a conundrum. I am passing an object without expecting ownership to be transferred, but the other enqueue() creates objects dynamically, so both kinds of pointer end up on the queue. The worker thread calls delete on every task it runs, including &work, which was never allocated with new. And when the destructor is called, how do we tell which objects should be deleted?

So if we look at the destructor:

~ThreadPool()
{
    m_Online = false;
    m_Signal.notify_all();
    std::for_each(m_Workers.begin(), m_Workers.end(), [](std::shared_ptr<std::thread> thread)->void {thread->join(); });
}

No objects are removed from m_Queue and destroyed here, so there is a leak unless the threads guarantee they will delete all items in m_Queue before they exit. Check the thread function: they don't guarantee that. Each thread finishes its current job and then exits, so any jobs still on the queue at shutdown are leaked.

This interface is better defined as:

typedef std::queue<std::unique_ptr<BaseTask>> TaskQueue; // If you have an old compiler you can use `std::auto_ptr` 
 // better than nothing.
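The effect of that change can be sketched as follows: with std::unique_ptr elements, tasks that are still queued when the queue goes away are destroyed automatically. CountedTask and leakedAfterShutdown are hypothetical names for this example. The virtual destructor on BaseTask is my addition (the question's BaseTask has none); it is needed so that destroying a derived task through a base pointer is well defined.

```cpp
#include <memory>
#include <queue>
#include <utility>

struct BaseTask
{
    virtual ~BaseTask() {}          // added so deletion through the base is well defined
    virtual void runTask() = 0;
};

// Hypothetical task that tracks how many instances are currently alive.
struct CountedTask : BaseTask
{
    static int alive;
    CountedTask() { ++alive; }
    ~CountedTask() { --alive; }
    void runTask() override {}
};
int CountedTask::alive = 0;

typedef std::queue<std::unique_ptr<BaseTask>> TaskQueue;

// Returns the number of tasks still alive after an "early shutdown".
int leakedAfterShutdown()
{
    {
        TaskQueue queue;
        for (int i = 0; i < 5; ++i) {
            queue.push(std::unique_ptr<BaseTask>(new CountedTask));
        }
        // Run only two of the five tasks, as if the pool were stopped early.
        for (int i = 0; i < 2; ++i) {
            std::unique_ptr<BaseTask> task = std::move(queue.front());
            queue.pop();
            task->runTask();
        }   // `task` goes out of scope here and deletes the object
    }       // the queue is destroyed and frees the three remaining tasks
    return CountedTask::alive;
}
```

The signature also documents the contract: anyone calling an enqueue that takes std::unique_ptr<BaseTask> can see that ownership is handed to the pool.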

Shared Ownership of the threads.

 typedef std::vector <std::shared_ptr<std::thread> > WorkerGroup;

I don't think that is a good idea. The threads are not shared with anything else, so std::unique_ptr would have been better, but even that is not ideal: you don't need to allocate the threads dynamically. You can simply have a vector of threads.

    typedef std::vector<std::thread> WorkerGroup;
    ThreadPool(natives::int32 size)
        : m_Online(true)
    {
        for (natives::int32 counter = 0; size > counter; ++counter)
        {
            // emplace_back constructs the std::thread in place from these arguments.
            m_Workers.emplace_back(thMain, &m_Queue, &m_QLock, &m_Signal, &m_Online);
        }
    }

Renaming standard types?

 typedef std::mutex QueueLock;
 typedef std::unique_lock<std::mutex> QueueGuard;
 typedef std::condition_variable WorkerSignal;

Sure but I see little point in this. It means every C++ programmer has to go and look up the type definition. If you just use the normal type they should immediately recognize it and understand what it does.

Static Member thread.

There is no need to make the thread function a static member. It can very simply call a member method. Thus saving you the trouble of passing all the variables it needs.

 static void thMain(TaskQueue* queue, QueueLock* qlock, WorkerSignal* signal, natives::flag* online)

What is this obsession with passing pointers? Stop it. In modern C++ code you should practically never see a raw pointer. The only time you should see one is when creating a container or smart pointer (or something very similar). In this case you could have used references.
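Both alternatives can be sketched in a few lines: starting a thread on a member function (so it reaches the members through the object) and passing an argument by reference with std::ref. Worker, bump, and demo are hypothetical names for this example only.

```cpp
#include <atomic>
#include <functional>
#include <thread>

// Hypothetical class; the member function serves as the thread entry point.
class Worker
{
public:
    Worker() : m_Count(0) {}

    // Has access to every member, so nothing needs to be passed in
    // except genuine per-call parameters.
    void run(int iterations)
    {
        for (int i = 0; i < iterations; ++i) {
            ++m_Count;
        }
    }

    int count() const { return m_Count.load(); }

private:
    std::atomic<int> m_Count;
};

// Free function taking its argument by reference.
void bump(std::atomic<int>& counter) { ++counter; }

int demo()
{
    Worker worker;
    std::atomic<int> external(0);

    // Member function: pointer-to-member, then the object, then the arguments.
    std::thread a(&Worker::run, &worker, 5);
    // std::thread copies its arguments; std::ref passes a reference instead.
    std::thread b(bump, std::ref(external));

    a.join();
    b.join();
    return worker.count() + external.load();
}
```

The object address handed to the first thread plays the role of `this`; it is the one place where an address is still passed, and it carries no ownership.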

No need to dynamically create the lock guard!

 std::shared_ptr<ThreadPool::QueueGuard> qguard(std::make_shared<ThreadPool::QueueGuard>(*qlock));
 // easier to write.
 ThreadPool::QueueGuard qguard(*qlock);

We can simplify the code that follows too:

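    // Assuming member function now.
    // online, queue, qlock and signal stand for the corresponding members.
    void threadRunner()
    {
        while(online) {
            BaseTask* task = getTask();
            if (task) {
                task->runTask();
                delete task;    // as in the original thMain: the worker frees the task
            }
        }
    }
    BaseTask* getTask()
    {
        ThreadPool::QueueGuard qguard(qlock);
        // Sleep until there is work or the pool is shutting down.
        signal.wait(qguard, [this](){return !(online && queue.empty());});
        if (!online || queue.empty()) {
            return nullptr;
        }
        BaseTask* task = queue.front();
        queue.pop();
        return task;
    }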

Stop using Yoda conditionals

 if (nullptr != task)

They provide absolutely no benefit. In fact they are considered worse than useless because they give you a false sense of security.

The better way is to get the compiler to tell you when there is an issue.

Get your compiler to treat warnings as errors (because warnings are actually logical errors in your thinking). Then your compiler will never let you use the assignment operator in a test.

Doesn't "if (0 == value) ..." do more harm than good?
In C and C++, what methods can prevent accidental use of the assignment(=) where equivalence(==) is needed?

answered Apr 25, 2017 at 18:12
  • As far as I know, passing a ref to a thread results in handing a copy to the thread, and I simply did not see a reason to pass a smart object to the thread. If I had to pass something other than a raw pointer to the thread, a weak pointer would serve this purpose. The point about the leaking queue is a good one, and I will take a look at the C++ header. I wasn't aware of bind; that's also a good one. Since I am using some third-party libraries, I won't treat warnings as errors: I am using VS warning level 4, which leads to some warnings from libraries that aren't mine. Commented Apr 25, 2017 at 18:28
  • Most of the points were good and valid, and I will consider them (at least). But on the Yoda conditionals: I actually like them more than writing them the other way around, and I think this point at least is a matter of personal taste. Commented Apr 25, 2017 at 18:30
  • And by the way, I simply didn't know std::function. Since I switched to VS2015, std::function should be supported. Commented Apr 25, 2017 at 18:48
  • @Mango, passing an object by reference can be achieved by using std::ref(object), which gives you a std::reference_wrapper<> that is then implicitly converted to the object's type. You can even implement it by hand in C++98. Commented Apr 25, 2017 at 18:48
  • @Mango: Note: if you call a method rather than a static function, then you don't even need to pass those objects as parameters. But by ref (or reference_wrapper) is better than by pointer, as it shows you are not passing ownership. Commented Apr 25, 2017 at 21:51
