As an exercise in using C++11 features I decided to make a thread pool class.
I would like to have a review on the code with focus on:
- Standards compliance / Portability issues / Best practices
- Thread Safety / Exception handling
- API
- Performance issues (yes, I know std::mutex is slow on Windows, but besides that :)
- Any ideas on how I can get rid of the specialization on the void return type?

I have inlined all methods in the class declaration to make it easier to read here. In the actual implementation they are not inlined. Sorry for the failed line breaks in some places; the CR code window is a bit narrower than I'd like.
#include <functional>
#include <future>
#include <deque>
#include <thread>

/// <summary> A typical thread worker queue that can execute arbitrary jobs.
/// </summary>
///
/// <remarks>
/// * Thread Safety   : Full.
/// * Exception Safety: Strong. </remarks>
class WorkQueue{
public:
    /// <summary> Constructors a new work queue object. </summary>
    /// <param name="numWorkers"> (Optional) number of workers, less than 0 to
    /// auto-detect (may fail on esoteric platforms). </param>
    explicit WorkQueue(int numWorkers = -1){
        if (numWorkers < 1){
            numWorkers = std::thread::hardware_concurrency() + 1;
        }
        while (numWorkers--){
            m_workers.emplace_back(std::thread(&WorkQueue::doWork, this));
        }
    }

    /// <summary> Will abort all pending jobs and run any in-progress jobs to
    /// completion upon destruction. </summary>
    ~WorkQueue(){
        abort();
    }

    /// <summary> Stops work queue and finishes jobs currently being executed.
    /// Queued jobs that have not begun execution will have their promises
    /// broken. </summary>
    void abort(){
        m_exit = true;
        m_finish_work = false;
        m_signal.notify_all();
        joinAll();
        {
            std::lock_guard<std::mutex> lg(m_mutex);
            m_work.clear();
        }
    }

    /// <summary> Stops new work from being submitted to this work queue. </summary>
    void stop(){
        m_exit = true;
        m_finish_work = true;
        m_signal.notify_all();
    }

    /// <summary> Wait for completion of all submitted work. No more work will
    /// be allowed to be submitted. </summary>
    void waitForCompletion(){
        stop();
        joinAll();
        assert(m_work.empty());
    }

    /// <summary> Executes the given function asynchronously. </summary>
    /// <exception cref="std::runtime_error"> Thrown if attempting to submit a job
    /// to a work queue that is terminating. </exception>
    /// <param name="function"> [in] The function to execute. </param>
    /// <returns> A std::future<RETVAL> for the result that will be generated by
    /// the function argument. Exceptions from the function will be
    /// thrown by get() on the future. </returns>
    template<typename RETVAL>
    std::future<RETVAL> submit(std::function<RETVAL()>&& function){
        if (m_exit){
            throw std::runtime_error("Caught work submission to work queue that is desisting.");
        }
        // Workaround for lack of lambda move capture
        typedef std::pair<std::promise<RETVAL>, std::function<RETVAL()>> pair_t;
        std::shared_ptr<pair_t> data = std::make_shared<pair_t>(std::promise<RETVAL>(), std::move(function));
        std::future<RETVAL> future = data->first.get_future();
        {
            std::lock_guard<std::mutex> lg(m_mutex);
            m_work.emplace_back([data](){
                try{
                    data->first.set_value(data->second());
                }
                catch (...){
                    data->first.set_exception(std::current_exception());
                }
            });
        }
        m_signal.notify_one();
        return std::move(future);
    }

    template<>
    std::future<void> submit(std::function<void()>&& function){
        if (m_exit){
            throw std::runtime_error("Caught work submission to work queue that is desisting.");
        }
        // Workaround for lack of lambda move capture
        typedef std::pair<std::promise<void>, std::function<void()>> pair_t;
        std::shared_ptr<pair_t> data = std::make_shared<pair_t>(std::promise<void>(), std::move(function));
        std::future<void> future = data->first.get_future();
        {
            std::lock_guard<std::mutex> lg(m_mutex);
            m_work.emplace_back([data](){
                try{
                    data->second();
                    data->first.set_value();
                }
                catch (...){
                    data->first.set_exception(std::current_exception());
                }
            });
        }
        m_signal.notify_one();
        return std::move(future);
    }

private:
    std::deque<std::function<void()>> m_work;
    std::mutex m_mutex;
    std::condition_variable m_signal;
    std::atomic<bool> m_exit{ false };
    std::atomic<bool> m_finish_work{ true };
    std::vector<std::thread> m_workers;

    void doWork(){
        std::unique_lock<std::mutex> ul(m_mutex);
        while (!m_exit || (m_finish_work && !m_work.empty())){
            if (!m_work.empty()){
                std::function<void()> work(std::move(m_work.front()));
                m_work.pop_front();
                ul.unlock();
                work();
                ul.lock();
            }
            else{
                m_signal.wait(ul);
            }
        }
    }

    void joinAll(){
        for (auto& thread : m_workers){
            thread.join();
        }
        m_workers.clear();
    }

    void operator=(const WorkQueue&) = delete;
    WorkQueue(const WorkQueue&) = delete;
};
Example use:
int main(){
    WorkQueue wq;
    wq.submit<void>([](){std::cout << "foo" << std::endl; });
    wq.submit<void>([](){std::cout << "bar" << std::endl; });
    std::future<int> f0 = wq.submit<int>([](){return 4; });
    std::future<int> f1 = wq.submit<int>([](){return 40; });
    std::cout << f1.get() << std::endl;
    wq.waitForCompletion();
    std::cout << f.get() << std::endl;
    return 0;
}
2 Answers
Please note that the code should be presented in the same way that you use it to avoid spurious errors. Compiling your code as is gave me an error on the specialization of submit.
Naming

- pair_t is a rather generic name that does not tell what is stored
- m_work could be improved a little, although I don't have any idea yet
- m_mutex should be renamed to m_work_mutex to indicate what it is for (or bundle it together with m_work to make it impossible to access m_work without locking m_mutex)
- m_signal could take at least some explanation about its uses: what does it signal? Maybe the name can reflect this as well.
- m_exit should be named something like m_accept_no_more_work (or accept_more_work and its logic should be inverted, because negation in a (boolean) variable name makes reasoning harder)
- m_workers could be named m_worker_threads
Derive the return type automatically

Why bother the user with giving the return type to submit when it can be extracted?

template <typename FunctionObject>
auto submit(FunctionObject &&function) -> std::future<decltype(function())>;
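For illustration, call sites could then drop the explicit template argument. This is a hypothetical example against the reworked signature (assuming it is implemented to behave like your existing submit), not code from the question:

#include <iostream>

int main(){
    WorkQueue wq;  // the class from the question, with the deduced-return-type submit
    auto f0 = wq.submit([](){ return 4; });                        // deduced as std::future<int>
    auto f1 = wq.submit([](){ std::cout << "foo" << std::endl; }); // deduced as std::future<void>
    std::cout << f0.get() << std::endl;
    f1.get();
    wq.waitForCompletion();
    return 0;
}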
Remove code duplication in submit

Your two submit functions have largely the same code, which indicates that your switch on the template parameter is too coarse. The only differing part is in the try block. The obvious solution is to extract the differing parts into their own function. For shortness I will use a template function specialization:
template <>
inline void WorkQueue::execute_and_set_data<void>(const DataPointer<void> &data) {
    data->second();
    data->first.set_value();
}

template <typename ReturnType>
void WorkQueue::execute_and_set_data(const DataPointer<ReturnType> &data) {
    data->first.set_value(data->second());
}
These functions use some template types:
template <typename ReturnType>
using PromiseFunctionPair =
    std::pair<std::promise<ReturnType>, std::function<ReturnType()>>;

template <typename ReturnType>
using DataPointer = std::shared_ptr<PromiseFunctionPair<ReturnType>>;
And they replace the call in the try/catch block. Because of the nontrivial dependency between ReturnType and these types, they cannot be used for automatic template parameter deduction:

try {
    execute_and_set_data<ReturnType>(data);
}
However, I would recommend replacing PromiseFunctionPair with a templated struct with better names for the members than first and second, which would then allow automatic template parameter deduction.
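A rough sketch of what that could look like; the names PromisedTask, promise and work are illustrative (not taken from the original code), and the helpers are shown as free functions for brevity rather than as WorkQueue members. Because the argument type pins down ReturnType, the helper can be called without an explicit template argument:

#include <functional>
#include <future>
#include <memory>

// A named struct instead of std::pair<std::promise<R>, std::function<R()>>.
template <typename ReturnType>
struct PromisedTask {
    std::promise<ReturnType> promise;
    std::function<ReturnType()> work;

    explicit PromisedTask(std::function<ReturnType()>&& f) : work(std::move(f)) {}
};

// General case: run the job and feed its result into the promise.
template <typename ReturnType>
void execute_and_set_data(const std::shared_ptr<PromisedTask<ReturnType>>& task) {
    task->promise.set_value(task->work());
}

// void case: a plain overload (preferred over the template by overload
// resolution) because set_value() takes no argument here.
inline void execute_and_set_data(const std::shared_ptr<PromisedTask<void>>& task) {
    task->work();
    task->promise.set_value();
}

// Inside submit()'s wrapper lambda, the call then deduces ReturnType by itself:
//     execute_and_set_data(data);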
Missing includes

- you are using std::vector without #include <vector>
- you are using assert without #include <cassert>

Typo

f.get() should probably be f0.get()?
Besides the mentioned flaws I really enjoyed reading your code :)
- Excellent review! Thanks! By the way, what compiler did you use? The submit method compiles as is on VS2013. As for the include, my default PCH must have included vector and it was in the test program. The f.get() was a typo, yes; I added that as an extra example after the fact. I thought I wouldn't mess up such a simple addition... figures lol. I will wait a bit longer with accepting an answer to see if there are more comments. – Emily L. (Aug 19, 2014 at 9:45)
- I used both gcc 4.9 and clang++ 3.6; the latter gave the error "explicit specialization of 'submit' in class scope". Have a look at this SO answer for the standard's position on this problem. As to the PCH comment: I have run into this problem several times. This problem can also occur when you use your headers only in source files where the required headers are included as well. It helps to have a dummy source file for the header that does not include anything else (and no PCH) to detect missing includes. – Nobody moving away from SE (Aug 19, 2014 at 10:55)
Let me start by saying that this is a nicely written class with good documentation. I also think that Nobody's answer provides some good advice, so mostly I'll build on that.
I only have one correctness comment: you have problematic behavior in joinAll. If waitForCompletion and/or abort are called from multiple threads (and I can certainly imagine one thread calling waitForCompletion and another thread later calling abort), you have a race in joinAll; one thread may be mid-iteration when the other calls m_workers.clear(). Try the following:
void joinAll(){
    std::vector<std::thread> workers;
    {
        std::lock_guard<std::mutex> lg(m_mutex);
        workers = std::move(m_workers);
    }
    for (auto& thread : workers) {
        thread.join();
    }
}
(This is only a partial solution; you'll have to add a condition variable or similar to make sure that future calls to joinAll don't return before this one joins all its threads.)
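One possible shape for that guard, as a sketch only: m_join_mutex, m_join_signal and m_joined are hypothetical additional members (a second mutex, a condition variable, and a plain bool guarded by that mutex), not part of the original class:

// Hypothetical members:
//   std::mutex m_join_mutex;
//   std::condition_variable m_join_signal;
//   bool m_joined = false;   // guarded by m_join_mutex
void joinAll(){
    std::vector<std::thread> workers;
    {
        std::lock_guard<std::mutex> lg(m_join_mutex);
        workers.swap(m_workers);            // at most one caller ends up with the threads
    }
    if (!workers.empty()){
        for (auto& thread : workers){
            thread.join();
        }
        {
            std::lock_guard<std::mutex> lg(m_join_mutex);
            m_joined = true;
        }
        m_join_signal.notify_all();
    }
    else{
        // Someone else drained m_workers; block until that caller has joined them all.
        std::unique_lock<std::mutex> ul(m_join_mutex);
        m_join_signal.wait(ul, [this]{ return m_joined; });
    }
}

Using a separate mutex here avoids touching m_mutex, which the workers themselves use.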
You document WorkQueue::WorkQueue, saying that numWorkers < 0 causes autodetection, but you autodetect whenever numWorkers < 1. That is, your documentation and implementation differ on behavior when numWorkers == 0.
Why, when autodetecting, do you create one more thread than the number of concurrent threads the hardware supports? Is this just to avoid special-casing the potential return of 0?
I suggest reserving space for m_workers with m_workers.reserve(numWorkers). It's quite minor in this case but a good habit.
When initializing m_workers, you use m_workers.emplace_back(std::thread(...)). I would either use m_workers.emplace_back(...) or m_workers.push_back(std::thread(...)) (and I would prefer the first). emplace_back takes constructor arguments for the element type, and push_back has an rvalue-reference overload.
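Putting this together with the reserve() suggestion above, a sketch of the constructor based on the one in the question:

explicit WorkQueue(int numWorkers = -1){
    if (numWorkers < 1){
        numWorkers = std::thread::hardware_concurrency() + 1;
    }
    m_workers.reserve(numWorkers);
    while (numWorkers--){
        // emplace_back forwards its arguments to std::thread's constructor,
        // so no temporary std::thread needs to be named here.
        m_workers.emplace_back(&WorkQueue::doWork, this);
    }
}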
In stop, I would avoid setting m_finish_work; if abort has already been called, I don't think you want to override that setting, and if it hasn't, m_finish_work is already true.
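In other words, a sketch of stop() with that change:

void stop(){
    m_exit = true;
    // m_finish_work is left alone: it is true by default, and false only if abort() set it.
    m_signal.notify_all();
}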
I certainly agree with Nobody's advice to (a) extract execute_and_set_data and (b) make submit a template parameterized by the type of function rather than coercing it to std::function<T()>. I also agree you should introduce a struct with named fields rather than using std::pair.
submit should return future, not std::move(future). Returning a local variable from a function already moves it, and explicitly calling std::move inhibits NRVO. Alternatively, return data->first.get_future(); you never refer to future outside of its definition and the return line (maybe this is an artifact from an earlier version of the code in which you moved the promise away?).
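Applied to the non-void overload from the question, a sketch of the second variant (the first is simply return future; with the std::move removed):

template<typename RETVAL>
std::future<RETVAL> submit(std::function<RETVAL()>&& function){
    if (m_exit){
        throw std::runtime_error("Caught work submission to work queue that is desisting.");
    }
    // Workaround for lack of lambda move capture
    typedef std::pair<std::promise<RETVAL>, std::function<RETVAL()>> pair_t;
    auto data = std::make_shared<pair_t>(std::promise<RETVAL>(), std::move(function));
    {
        std::lock_guard<std::mutex> lg(m_mutex);
        m_work.emplace_back([data](){
            try{
                data->first.set_value(data->second());
            }
            catch (...){
                data->first.set_exception(std::current_exception());
            }
        });
    }
    m_signal.notify_one();
    return data->first.get_future();   // no named local, no std::move(); still called exactly once
}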
I'd like to see a little more use of auto. auto can make code easier to read and modify by reducing the clutter of redundant code. Examples:

- std::shared_ptr<pair_t> data = std::make_shared<pair_t>(...); You're writing the type twice within a few characters. std::make_shared<T> is a well-known function that always returns std::shared_ptr<T>; when you specify the type of the variable you're storing it in, you're forcing me to spend brainpower making sure the variable's type makes sense (Is it the same? Is it maybe a base class pointer?).
- std::future<T> future = data->first.get_future(); This is preventing you from templating on the function type rather than on the return type, making your code a bit less generic.
- std::function<void()> work(std::move(m_work.front())); There's a lot going on here; I think it reads a lot better as auto work = std::move(m_work.front());
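For comparison, the same three declarations written with auto (the types are unchanged, just deduced):

auto data   = std::make_shared<pair_t>(std::promise<RETVAL>(), std::move(function));
auto future = data->first.get_future();    // still std::future<RETVAL>
auto work   = std::move(m_work.front());   // in doWork(); still std::function<void()>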
The only point on which I disagree with Nobody is some of the naming comments. m_work, m_mutex, and m_workers all seem like fine names. However, I would certainly add comments to m_work and m_workers indicating that they should only be read or written when holding m_mutex's lock, and to m_signal indicating that it should only be signaled or waited on when holding m_mutex's lock.
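For example, the existing member declarations could carry comments along these lines (the comments are the addition, the members are the original ones):

std::deque<std::function<void()>> m_work;   // read/write only while holding m_mutex
std::mutex m_mutex;                         // guards m_work and m_workers
std::condition_variable m_signal;           // signal/wait only while holding m_mutex
std::atomic<bool> m_exit{ false };
std::atomic<bool> m_finish_work{ true };
std::vector<std::thread> m_workers;         // read/write only while holding m_mutex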
- Thanks for the comments! Locking m_mutex in joinAll will create a dead-lock as the threads try to reacquire the mutex. I would need another mutex for m_threads for that. The documentation for the constructor is out of date, I caught that soon afterwards :3 It is a general rule of thumb to have numcpu+1 worker threads for high utilization of CPU resources. The +1 is so that there is work to be executed on the CPU while one other thread is blocking on IO. But not +n>1, to avoid frequent thread context switching. That, and it does nicely dodge the potential return of 0 ;) – Emily L. (Aug 19, 2014 at 10:33)
- Oops, you're right about the deadlock; I've edited to remove that advice and suggest an alternative. – ruds (Aug 19, 2014 at 12:04)
- An alternative to the joinAll I've suggested is simply to add a second std::mutex to the class to protect m_workers. – ruds (Aug 19, 2014 at 13:23)
- std::move leaves the vector in an unspecified, destructible state. More specifically, std::move is not required to make it so that empty() returns true on the moved container, which could cause problems for other threads entering the function. A better solution would be to use workers.swap(m_workers). Regardless, this solution to joinAll changes the semantics of the function to no longer block until all threads are joined (only the first entry will block). I believe the second mutex is the only choice to preserve semantics. – Emily L. (Aug 19, 2014 at 13:40)
- @EmilyL. You're right, that will break the assertion, but I don't think that otherwise your class's semantics require that the vector be empty. – ruds (Aug 19, 2014 at 13:43)