I wanted to implement a thread pool to test my knowledge of various modern C++ features. The end result could be better, but I'd like a review, as I'm not sure about delicate things, especially regarding synchronization and variadic argument passing, and of course recommendations for improving the code as well as my reasoning below! The code compiles and runs on MSVS 2017 15.9.15.

First off, this is the design I've set up, mainly by reading the Wikipedia article and briefly skimming through a couple of similar questions here. I say briefly because I wanted to implement this on my own.
- there is an array of threads
- there is a FIFO queue of tasks (a `Task` should be a wrapper for a Callable object)
- when a client creates a task, he `enqueue()`s it into the task queue
- all threads are started up front in the pool and they never quit running; if there's no task in the queue they `wait`
- each incoming task `notifies()` a single thread to handle it
- if a `Task` returns a result, the client can send a query for its value later
- the thread pool has an On/Off switch flag
- thread synchronization: a `condition_variable` (`m_cond`) along with a `mutex` variable (`m_mu`) perform thread synchronization
- `m_mu` is used to synchronize access to shared data (the containers, mostly)
- `m_cond` notifies threads upon task arrival and puts them to sleep when there are no tasks
The key implementation question I asked myself is: how can one create a container of callable objects? More generally, a container of generic objects, in order to accommodate functions of any signature. `std::any` came to mind, but I thought it would possibly be too much, making the entire class a template, so I questioned whether I could circumvent this somehow. And I thought a bare wrapper, `std::function<void()>`, could possibly work as the type of the task queue, as almost all C++ callables can be converted to a `std::function<T>`. But this would only be an indirection; inside this `std::function` I would need to pass the arguments up front (however many the client supplies).
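For instance, here is a minimal illustration of that idea (my own toy example, separate from the pool code): the arguments get baked into a lambda capture, so the queue only ever sees `void()`:

#include <functional>
#include <iostream>
#include <string>

void greet(const std::string& name) { std::cout << "hello " << name << '\n'; }

int main()
{
    // The argument is captured up front; the stored type erases the signature.
    std::function<void()> task = [name = std::string{ "world" }] { greet(name); };
    task(); // prints "hello world"
}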
Then I thought that I also want to provide return values to the client. The only way I know of to do this generically in C++ is a `future`. And then I realized I need another level of indirection: a `std::packaged_task`, which will wrap the `std::function`, which will wrap the call to our target (member) function. And I must also make sure the `packaged_task` lives long enough for it to return the value, i.e. dynamic allocation through a `unique_ptr` or `shared_ptr` would be suitable. I decided on a `shared_ptr`, as a `unique_ptr` was unfortunately not enough: `std::function` requires its target to be copy-constructible, and a lambda that captures a move-only `unique_ptr` is itself move-only.
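To illustrate what I mean (a minimal standalone sketch, not part of the pool code): the `unique_ptr` version won't compile, while the `shared_ptr` version works:

#include <functional>
#include <future>
#include <memory>

int main()
{
    auto up = std::make_unique<std::packaged_task<int()>>([] { return 42; });
    // std::function<void()> f = [p = std::move(up)] { (*p)(); };
    // ^ error: std::function needs a copyable callable, but this lambda is move-only

    auto sp = std::make_shared<std::packaged_task<int()>>([] { return 42; });
    std::future<int> fut = sp->get_future();
    std::function<void()> g = [sp] { (*sp)(); }; // fine: shared_ptr is copyable
    g();
    return fut.get() == 42 ? 0 : 1;
}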
And now to the code. This is the thread pool code:
#include <iostream>
#include <functional>
#include <vector>
#include <thread>
#include <queue>
#include <condition_variable>
#include <future>
#include <memory>
#include <sstream>
#include <string>
class ThreadPool final
{
    using Task = std::function<void()>;
public:
    explicit ThreadPool(std::size_t nthreads = std::thread::hardware_concurrency(),
        bool enabled = true) noexcept
        : m_nthreads{ nthreads },
        m_enabled{ enabled }
    {
        m_pool.reserve(nthreads);
        run();
    }

    ~ThreadPool() noexcept
    {
        stop();
    }

    ThreadPool(ThreadPool const&) = delete;
    ThreadPool& operator=(const ThreadPool& rhs) = delete;

    void start() noexcept
    {
        m_enabled.store(true);
        run();
    }

    void stop() noexcept
    {
        m_enabled.store(false);
        //std::unique_lock<std::mutex> lg{ m_mu }; // comment out and the program never ends
        m_cond.notify_all();
        for (auto& t : m_pool)
        {
            if (t.joinable())
            {
                t.join();
            }
        }
    }

    template<typename Callback, typename... TArgs>
    decltype(auto) enqueue(Callback&& f, TArgs&&... args)
    //auto enqueue(Callback&& f, TArgs&&... args) -> std::future<std::invoke_result_t<Callback, TArgs...>>
    {
        using ReturnType = std::invoke_result_t<Callback, TArgs...>;
        using FuncType = ReturnType(TArgs...);
        using Wrapped = std::packaged_task<FuncType>;
        std::shared_ptr<Wrapped> smartFunctionPointer = std::make_shared<Wrapped>(f);
        {
            std::unique_lock<std::mutex> lg{ m_mu };
            m_tasks.emplace(
                [smartFunctionPointer, &args...]() -> void
                {
                    (*smartFunctionPointer)(std::forward<TArgs>(args)...);
                    return;
                }
            );
            m_cond.notify_one();
        }
        return smartFunctionPointer->get_future();
    }

private:
    std::atomic<bool> m_enabled = false;
    size_t m_nthreads;
    std::vector<std::thread> m_pool;
    std::queue<Task> m_tasks;
    std::condition_variable m_cond;
    std::mutex m_mu;

    void run()
    {
        for (int ti = 0; ti < m_nthreads; ++ti)
        {
            m_pool.emplace_back(
                std::thread{
                    [this] () mutable
                    {
                        //using namespace std::chrono_literals;
                        Task task;
                        while (m_enabled)
                        {
                            {
                                std::unique_lock<std::mutex> lg{ m_mu };
                                while (m_tasks.empty() && m_enabled)
                                    m_cond.wait(lg);
                            }
                            if (!m_tasks.empty())
                            {// there is a task available
                                std::unique_lock<std::mutex> lg{ m_mu };
                                task = std::move(m_tasks.front());
                                m_tasks.pop();
                                task();
                            }
                        }// while threadPool is enabled
                        //return;
                    }// thread function
                }// launch thread
            );// place thread in the pool
        }// for all threads
    }
};
/////////////////////////////////////////////////////////////////////////
This is the test code sample:
// Create some work to test the Thread Pool
void spitId()
{
    std::cout << "thread #" << std::this_thread::get_id() << '\n';
}

void sayAndNoReturn()
{
    auto tid = std::this_thread::get_id();
    std::cout << "thread #" << tid << " says " << " and returns... ";
    std::cout << typeid(tid).name() << '\n'; // std::thread::id
}

char sayWhat(int arg)
{
    auto tid = std::this_thread::get_id();
    std::stringstream sid;
    sid << tid;
    int id = std::stoi(sid.str());
    std::cout << "\nthread #" << id << " says " << arg << " and returns... ";
    if (id > 7000)
        return 'X';
    return 'a';
}

struct Member
{
    int i_ = 4;
    void sayCheese(int i)
    {
        std::cout << "CHEESEE!" << '\n';
        std::cout << i + i_ << '\n';
    }
};

int vv() { puts("nothing"); return 0; }
int vs(const std::string& str) { puts(str.c_str()); return 0; }

int main()
{
    ThreadPool threadPool{ std::thread::hardware_concurrency() };
    threadPool.enqueue(spitId);
    threadPool.enqueue(&spitId);
    threadPool.enqueue(sayAndNoReturn);
    auto f1 = threadPool.enqueue([]() -> int
    {
        std::cout << "lambda 1\n";
        return 1;
    });
    auto sayWhatRet = threadPool.enqueue(sayWhat, 100);
    std::cout << sayWhatRet.get() << '\n';
    Member member{ 1 };
    threadPool.enqueue(std::bind(&Member::sayCheese, member, 100));
    std::cout << f1.get() << '\n';
    auto f2 = threadPool.enqueue([]()
    {
        std::cout << "lambda 2\n";
        return 2;
    });
    auto f3 = threadPool.enqueue([]()
    {
        return sayWhat(100);
    });
    //threadPool.enqueue(std::function<void(int)>{Member{}.sayCheese(100)});
    std::cout << "f2 type = " << typeid(f2).name() << '\n';
    std::cout << f2.get() << '\n';
    std::cout << f3.get() << '\n';
    return EXIT_SUCCESS;
}
Also note that I use a custom-made class on Windows which gets initialized during program start and at the end reports whether there are any memory leaks. Thankfully, no memory leaks were observed.
The parts of the code I am most interested in feedback on (because those are the parts I'm most unsure of) are:

- Thread synchronization. Is it done properly? Do I cover all possible scenarios? You will notice a commented-out `std::unique_lock` in the `stop` function. If I leave it commented out, the thread pool never ends. I debugged it, and the threads never return. I believe this is a deadlock: the main thread grabs the mutex there while the other threads are waiting; when they get notified in the next statement by the main thread, they don't seem to wake up. I'm not sure about this.
- The way I capture the arguments and the `smartFunctionPointer` in the lambda. I tried out different combinations, by reference etc., and I picked the one which works best (capturing by value). This is however not proper design, as I don't fully understand, for example, why capturing by reference there causes an exception.
2 Answers
Task task;
while (m_enabled)
{
    {
        std::unique_lock<std::mutex> lg{ m_mu };
        while (m_tasks.empty() && m_enabled)
            m_cond.wait(lg);
    }
    if (!m_tasks.empty())
    {// there is a task available
        std::unique_lock<std::mutex> lg{ m_mu };
        task = std::move(m_tasks.front());
        m_tasks.pop();
        task();
    }
}// while threadPool is enabled
This all seems a bit dodgy.
- We acquire a lock for waiting, and checking if the task queue is empty (OK).
- Then, when a task appears in the queue, we dispose of the lock (?).
- We then check for emptiness without a lock (!!), and acquire a new lock. The queue could well become empty between checking for emptiness and acquiring the new lock (and, more importantly, before the subsequent unchecked call to `front()`).
- Then we take the task off the queue, and execute it while still holding the lock (!). So we are effectively executing tasks in a single-threaded fashion.
I'd expect it to look something like:
while (true)
{
    std::unique_lock<std::mutex> lg{ m_mu };
    m_cond.wait(lg, [&] () { return !m_enabled || !m_tasks.empty(); });
    if (!m_enabled)
        break;
    //assert(!m_tasks.empty());
    auto task = std::move(m_tasks.front());
    m_tasks.pop();
    lg.unlock();
    task();
}
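Note that the predicate overload of `wait` takes care of spurious wakeups, and `task()` now runs after `lg.unlock()`, so workers can actually execute tasks in parallel.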
std::atomic<bool> m_enabled = false;
This directly contradicts the default argument in the constructor.
: m_nthreads{ nthreads },
m_enabled{ enabled }
Make sure to specify the members in the same order as they are declared in the body of the class (since that is the order they will actually be initialized in). Your compiler should warn you about this.
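A minimal illustration of the pitfall (a hypothetical `Example` class, not taken from the posted code):

#include <atomic>
#include <cstddef>

struct Example
{
    std::atomic<bool> m_enabled; // declared first, so initialized first
    std::size_t m_nthreads;      // initialized second, whatever the list says

    // GCC/Clang warn here (-Wreorder): the list order doesn't match.
    Example(std::size_t n) : m_nthreads{ n }, m_enabled{ true } {}
};

int main() { Example e{ 4 }; }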
The current logic for when to `run()` or `stop()` seems to be faulty. It looks like we call `run()` in the constructor, whether or not the thread pool is `enabled`. `run()` may then be called a second time in `start()`, which will add another `nthreads` threads to the thread pool.

Similarly, after `stop()` is called, we still have threads in the pool (and in the vector), even if they have been joined.
Besides the lifetime issues pointed out by L.F., taking the arguments to the packaged task and forwarding them is quite complicated. The user can already manage arguments themselves with `std::bind` or lambda capture, so there's probably no need for another method. We can just take a single callable object.

Perhaps `std::promise` might be a better choice than `std::packaged_task`. We don't need the capabilities of launching a new thread with a `packaged_task`, and we are effectively handling the packaging of the task ourselves.

There's not really any need for `m_enabled` to be atomic. The only place where we would need a lock for it is in the `stop` function.
Suggested modifications:
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <future>
#include <queue>
#include <thread>
#include <vector>

class ThreadPool final
{
public:
    explicit ThreadPool(std::size_t nthreads = std::thread::hardware_concurrency()):
        m_enabled(true),
        m_pool(nthreads)
    {
        run();
    }

    ~ThreadPool()
    {
        stop();
    }

    ThreadPool(ThreadPool const&) = delete;
    ThreadPool& operator=(const ThreadPool&) = delete;

    template<class TaskT>
    auto enqueue(TaskT task) -> std::future<decltype(task())>
    {
        using ReturnT = decltype(task());
        auto promise = std::make_shared<std::promise<ReturnT>>();
        auto result = promise->get_future();

        auto t = [p = std::move(promise), t = std::move(task)] () mutable { execute(*p, t); };

        {
            std::lock_guard<std::mutex> lock(m_mu);
            m_tasks.push(std::move(t));
        }

        m_cv.notify_one();

        return result;
    }

private:
    std::mutex m_mu;
    std::condition_variable m_cv;

    bool m_enabled;
    std::vector<std::thread> m_pool;
    std::queue<std::function<void()>> m_tasks;

    template<class ResultT, class TaskT>
    static void execute(std::promise<ResultT>& p, TaskT& task)
    {
        p.set_value(task()); // syntax doesn't work with void ResultT :(
    }

    template<class TaskT>
    static void execute(std::promise<void>& p, TaskT& task)
    {
        task();
        p.set_value();
    }

    void stop()
    {
        {
            std::lock_guard<std::mutex> lock(m_mu);
            m_enabled = false;
        }

        m_cv.notify_all();

        for (auto& t : m_pool)
            t.join();
    }

    void run()
    {
        auto f = [this] ()
        {
            while (true)
            {
                std::unique_lock<std::mutex> lock{ m_mu };
                m_cv.wait(lock, [&] () { return !m_enabled || !m_tasks.empty(); });

                if (!m_enabled)
                    break;

                assert(!m_tasks.empty());

                auto task = std::move(m_tasks.front());
                m_tasks.pop();

                lock.unlock();
                task();
            }
        };

        for (auto& t : m_pool)
            t = std::thread(f);
    }
};
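With this interface the caller binds any arguments themselves via lambda capture; a usage sketch (assuming the `ThreadPool` above plus `<cstdio>`):

#include <cstdio>

int main()
{
    ThreadPool pool;
    int x = 100;
    auto f = pool.enqueue([x] { return x * 2; });       // arguments live in the capture
    pool.enqueue([] { std::puts("fire and forget"); }); // future can be discarded
    std::printf("%d\n", f.get());                       // prints 200
}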
Welcome to Code Review! Here are some suggestions:

- Sort the `#include` directives in alphabetical order. `<sstream>` and `<string>` shouldn't be in the implementation.
- The constructor should not be `noexcept`, since it allocates resources. Same for `start`.
- The destructor is implicitly `noexcept`, so you can omit the specifier.
- In `enqueue`, the lambda captures `&args...`. This potentially creates dangling references. Also, the `() -> void` and the `return;` in the lambda can be omitted.
- In the declaration of `m_nthreads`, the type specifier should be `std::size_t`, not `size_t`. You also didn't `#include <cstddef>`.
- In `run`, the type of `ti` should be `std::size_t`, not `int`. The lambda does not have to be `mutable`, because the only thing you capture is the `this` pointer, which you don't want to modify, do you?
- The textual representation of a thread identifier is not guaranteed to be suitable for extracting as an `int`.
- Don't mix C I/O with streams. Especially don't do `puts(str.c_str())` (which should have been `std::puts` anyway). You can turn off synchronization between C I/O and streams to (potentially) improve performance.
- You can omit the final `return` statement in `main`.
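For example, the dangling capture could be avoided by binding the arguments into the task up front. A sketch of how the `enqueue` body might look under that change (names from the original code; the rest of the class unchanged):

template<typename Callback, typename... TArgs>
decltype(auto) enqueue(Callback&& f, TArgs&&... args)
{
    using ReturnType = std::invoke_result_t<Callback, TArgs...>;
    // Bind the arguments now, so nothing in the stored lambda refers to
    // the caller's stack frame once enqueue() has returned.
    auto smartFunctionPointer = std::make_shared<std::packaged_task<ReturnType()>>(
        std::bind(std::forward<Callback>(f), std::forward<TArgs>(args)...));
    {
        std::unique_lock<std::mutex> lg{ m_mu };
        m_tasks.emplace([smartFunctionPointer] { (*smartFunctionPointer)(); });
    }
    m_cond.notify_one();
    return smartFunctionPointer->get_future();
}

(`std::bind` decay-copies its arguments, so a caller who really wants reference semantics would have to pass `std::ref`.)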
- "The lambda does not have to be `mutable`." Indeed, I thought that popping from the queue would require mutability... I was wrong. Can you tell me why capturing `&args...` by reference creates dangling references? `args...` is intended to be used only to call the function. They aren't used elsewhere, so why would they dangle? Am I missing something? Thanks for the feedback! – KeyC0de, Sep 24, 2019 at 19:23
- @Nikos You are welcome. `args` are the parameters of the enclosing function, but the lambda is placed in the queue and will be executed later. At that time, the parameters referred to by `args` may have ceased to exist. – L. F., Sep 25, 2019 at 9:08
- You are right. I ended up `move`-ing them into the lambda instead. – KeyC0de, Sep 25, 2019 at 19:39