1
\$\begingroup\$

I've been using a thread pool that I wrote using Windows threads for a while and I've decided to update it to use std::threads. I'm looking for some feedback on how this could be improved in terms of (in order of importance) reliability, usability, and performance.

Header

#pragma once
#include <queue>
#include <thread>
#include <vector>
#include <functional>
namespace std
{
 class mutex;
 class condition_variable;
}//namespace std
namespace Utilities
{
 class Semaphore final
 {
 private:
 std::condition_variable* _condition;
 std::mutex* _mutex;
 unsigned _count;
 public:
 Semaphore();
 ~Semaphore();
 public:
 void Wait();
 void Notify();
 };
 class Worker final
 {
 public:
 typedef std::function<void()> Task;
 private: 
 std::queue<Task> _tasks;
 std::thread* _thread;
 std::mutex* _mutex;
 Semaphore _semaphore;
 bool _alive;
 public:
 Worker();
 ~Worker();
 public:
 std::thread::id id () const;
 size_t task_count() const;
 void Join ( ) const;
 void PushTask (Task task);
 void ClearTasks( );
 };
 class ThreadPool final
 {
 private:
 std::vector<Worker> _workers;
 unsigned _index;
 public:
 ThreadPool(unsigned thread_count = 0);
 ~ThreadPool();
 public:
 unsigned thread_count();
 unsigned task_count ();
 void Join ( );
 void PushTask (Worker::Task task);
 void ClearTasks( );
 };
}//namespace Utilities

Source

#include "ThreadPool.h"
#include <mutex>
#include <algorithm>
#include <condition_variable>
using namespace Utilities;
Semaphore::Semaphore() :
 _condition(new std::condition_variable()),
 _mutex (new std::mutex ()),
 _count (0 )
{ }
Semaphore::~Semaphore()
{
 delete _condition;
 delete _mutex;
}
void Semaphore::Wait()
{
 std::unique_lock<std::mutex> lock(*_mutex);
 while (!_count)
 _condition->wait(lock);
 --_count;
}//Wait
void Semaphore::Notify()
{
 std::unique_lock<std::mutex> lock(*_mutex);
 _condition->notify_one();
 ++_count;
}//Notify
Worker::Worker() :
 _mutex(new std::mutex()),
 _alive(true )
{
 _thread = new std::thread(
 [&]
 {
 while (_alive)
 {
 _semaphore.Wait();
 while (!_tasks.empty())
 {
 _mutex->lock();
 Task task = _tasks.front();
 _mutex->unlock();
 task();
 _mutex->lock();
 if (!_tasks.empty())
 _tasks.pop();
 _mutex->unlock();
 }
 }
 });
}
Worker::~Worker()
{
 ClearTasks();
 _alive = false;
 _semaphore.Notify();
 _thread->join();
 delete _thread;
 delete _mutex;
}
std::thread::id Worker::id() const
{ return _thread->get_id(); }
//id
size_t Worker::task_count() const
{ return _tasks.size(); }
//task_count
void Worker::Join() const
{ while (_tasks.size()); }
//Join
void Worker::PushTask(Task task)
{
 _mutex->lock();
 _tasks.push(task);
 _mutex->unlock();
 _semaphore.Notify();
}//PushTask
void Worker::ClearTasks()
{
 _mutex->lock();
 _tasks = std::queue<Task>();
 _mutex->unlock();
}//ClearTasks
ThreadPool::ThreadPool(unsigned thread_count) :
 _index(0)
{
 if (!thread_count)
 thread_count = std::thread::hardware_concurrency();
 _workers = std::vector<Worker>(std::max(1u, thread_count));
}
ThreadPool::~ThreadPool()
{
 ClearTasks();
 Join ();
}
unsigned ThreadPool::thread_count()
{ return _workers.size(); }
//thread_count
unsigned ThreadPool::task_count()
{
 unsigned task_count = 0;
 for (auto& worker : _workers)
 task_count += worker.task_count();
 return task_count;
}//task_count
void ThreadPool::Join()
{ while (task_count()); }
//Join
void ThreadPool::PushTask(Worker::Task task)
{
 _workers[_index++].PushTask(task);
 if (_index >= _workers.size())
 _index = 0;
}//PushTask
void ThreadPool::ClearTasks()
{
 for (auto& worker : _workers)
 worker.ClearTasks();
}//ClearTasks

Usage Example

#include "ThreadPool.h"
#include <iostream>
static void Method_0( ) { std::cout << "Method_0()" << std::endl; }
static void Method_1(float my_float) { std::cout << my_float << std::endl; }
static void Method_2(int& my_int ) { my_int += my_int; }
struct Foo
{
 void Bar() { std::cout << "Foo::Bar() : Instance" << std::endl; }
 static void Baz() { std::cout << "Foo::Baz() : Static" << std::endl; }
};
int main(int argc, char* argv[])
{
 using namespace Utilities;
 Foo foo;
 int my_int = 16;
 ThreadPool thread_pool;
 thread_pool.PushTask(Method_0);
 thread_pool.PushTask(std::bind(Method_1, 3.14f));
 thread_pool.PushTask(std::bind(Method_2, std::ref(my_int)));
 thread_pool.PushTask(std::bind(&Foo::Bar, foo));
 thread_pool.PushTask(&Foo::Baz);
 thread_pool.PushTask([]{ std::cout << "Lambda" << std::endl; });
 thread_pool.Join();
 std::cout << my_int << std::endl;
 return 0;
}
Jamal
35.2k13 gold badges134 silver badges238 bronze badges
asked Apr 26, 2015 at 15:55
\$\endgroup\$

1 Answer 1

2
\$\begingroup\$

Miscellaneous:

I imagine you did this:

namespace std
{
 class mutex;
 class condition_variable;
}//namespace std

To have a forward declaration of those types and avoid the #includes. I don't like this though. There is no guarantee that mutex or condition_variable are plain class types. They could be anything, structs, template classes, or even typedefs to anything else. So this is certainly not portable across compilers and might even break on a future update of your current compiler. Including the relevant files in the header is cleaner, safer and also simpler.

Also refer to this question.


The busy wait loop in Worker::Join() could take advantage of std::this_thread::yield(), which might perform better than just spinning.

void Worker::Join() const
{ 
 while (_tasks.size())
 {
 std::this_thread::yield();
 }
}

Those comments at the end of every other method are plain noise.

void ThreadPool::ClearTasks()
{
 for (auto& worker : _workers)
 worker.ClearTasks();
}//ClearTasks

Your methods are not that big, I can see where they begin and end very easily without that comment.


While not technically wrong in the context you've used, names starting with an underscore bare a few restrictions in C++. FYI, I suggest reading this SO question.


Copy and assignment:

As it stands, your classes can be copied using operator = or the implicit copy constructor. Doing so would wreak havoc in your program, since each copy would be a shallow copy and multiple attempts to delete the same pointers would be done by the destructors of different objects. You should at the very least delete the copy constructor and assignment operator of Semaphore and Worker, and perhaps also from Thread. E.g.:

class Semaphore final
{
 Semaphore(const Semaphore &) = delete;
 Semaphore & operator = (const Semaphore &) = delete;
 // copy and assignment are now disallowed.
};

This falls into what is known as Rule of Three, which means that if a construtor does some non-trivial work, such as memory allocation, then more than likely you'll need to define custom copy constructor and assignment as well (or disable them).


Memory Management:

In the Semaphore class, condition_variable and mutex are declared as pointers. I would declare them by value to avoid extra memory allocations and improve data locality. I don't see any obvious reason for them to be pointers. If you do keep the pointers and dynamic allocations, then consider using a std::unique_ptr rather than manual new and delete calls, to ensure exception safety and simplify the resource management logic. Same applies to thread and mutex in Worker. Those could be declared by value as well. Otherwise, would be better as smart pointers too.

answered Apr 26, 2015 at 18:03
\$\endgroup\$
6
  • \$\begingroup\$ Thanks for the feedback.I was not aware that mutex, and condition_variable are not guaranteed to be classes, en.cppreference.com/w/cpp/thread/mutex shows it to be a class, but I can easily move the includes into the header. I really don't like having thread, mutex, and condition_variable as raw pointers, but since they need to be contained in standard containers, I felt that raw pointers turned out to be the simplest solution in this case. I will certainly look into std::this_thread::yield for Join(). \$\endgroup\$ Commented Apr 26, 2015 at 18:06
  • \$\begingroup\$ @PatrickPurcell, Yes, that is a possible/reference interface, however, there is no standard way to forward declare anything inside the namespace std, besides including the relevant files. This question asks precisely this, the single answer provided cites the C++11 standard. \$\endgroup\$ Commented Apr 26, 2015 at 18:26
  • \$\begingroup\$ @PatrickPurcell, I also forgot to mention the Rule of Three. This is important. I'll update the answer now. \$\endgroup\$ Commented Apr 26, 2015 at 18:28
  • \$\begingroup\$ Thanks for the link to the SO question with the C++11 standard for forward declarations in the std namespace. I will fix that as well as add copy constructors and copy assignment operators. \$\endgroup\$ Commented Apr 26, 2015 at 18:34
  • \$\begingroup\$ In this case would you prefer non-copyable over having a custom copy constructor and assignment operator, if so, why? \$\endgroup\$ Commented Apr 26, 2015 at 18:43

Your Answer

Draft saved
Draft discarded

Sign up or log in

Sign up using Google
Sign up using Email and Password

Post as a guest

Required, but never shown

Post as a guest

Required, but never shown

By clicking "Post Your Answer", you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.