A while ago I asked about a thread pool implementation I had made. I took the advice on board and modified it a lot over time to fit my needs. It has been serving me extremely well in all of my projects, but I was wondering if there is anything else I could improve?
Here is the class:
class ThreadPool{
public:
    /* Constructor */
    ThreadPool(uint8_t numThreads) {
        assert(numThreads > 0);
        createThreads(numThreads);
    }
    //Destructor
    ~ThreadPool(){
        shutdown = true;
        notifier.notify_all();
        for(int i = 0; i < threads.size(); ++i){
            threads[i].join();
        }
    }
    //add any arg # function to queue
    template <typename Func, typename... Args >
    auto push(Func&& f, Args&&... args){
        //get return type of the function
        typedef decltype(f(args...)) retType;
        std::packaged_task<retType()> task(std::bind(f, args...));
        std::future<retType> future = task.get_future();
        {
            // lock jobQueue mutex, add job to the job queue
            std::unique_lock<std::mutex> lock(JobMutex);
            jobQueue.emplace(std::packaged_task<void()>(std::move(task)));
        }
        notifier.notify_one();
        return future;
    }
private:
    std::vector<std::thread> threads;
    std::queue<std::packaged_task<void()>> jobQueue;
    std::condition_variable notifier;
    std::mutex JobMutex;
    std::atomic<bool> shutdown = false;
    void createThreads(uint8_t numThreads) {
        auto threadFunc = [this]() {
            while (true) {
                std::packaged_task<void()> job;
                {
                    std::unique_lock<std::mutex> lock(JobMutex);
                    notifier.wait(lock, [this] {return !jobQueue.empty(); });
                    if(shutdown){
                        break;
                    }
                    //strange bug where it will continue even if the job queue is empty
                    if (jobQueue.size() < 1)
                        continue;
                    job = std::move(jobQueue.front());
                    jobQueue.pop();
                }
                job();
            }
        };
        threads.reserve(numThreads);
        for (int i = 0; i != numThreads; ++i) {
            threads.emplace_back(std::thread(threadFunc));
        }
    }
    ThreadPool(const ThreadPool& other) = delete;
    void operator=(const ThreadPool& other) = delete;
}; /* end ThreadPool Class */
2 Answers
You have a logical bug in the code. Imagine all jobs are finished and then shutdown is set. The workers will never leave notifier.wait(...), because its predicate only checks whether the queue is non-empty.
Also, if a job is still queued when shutdown is set, you quit instantly without running it. I do not think that this is a good idea; I think you should finish all jobs prior to quitting. Make a special hard-reset routine for the immediate exit, though it probably requires the use of exceptions.
Also, consider what will happen if an exception is thrown inside the function f. In the thread pool one of the threads will die and, most likely, kill the whole program. That is not good. The pool should somehow forward the error to the caller. To this end I have divided my own thread pool's functionality: I made a supplementary class for the user that wraps whatever additional functionality is needed.
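One detail worth checking against the posted code: the jobs there are wrapped in std::packaged_task, and a packaged_task stores an exception thrown by the wrapped callable in its shared state instead of letting it escape, so the caller sees it when calling get() on the future. A minimal sketch of that behaviour, using a bare std::thread rather than the pool (runThrowingJob is a hypothetical helper, not part of the original class):

```cpp
#include <future>
#include <stdexcept>
#include <string>
#include <thread>
#include <utility>

// Hypothetical helper: runs a throwing job on a separate thread and reports
// what the caller observes through the future.
inline std::string runThrowingJob() {
    std::packaged_task<int()> task([]() -> int {
        throw std::runtime_error("job failed");
    });
    std::future<int> future = task.get_future();

    // The packaged_task catches the exception and stores it in the shared
    // state, so the thread function itself returns normally.
    std::thread worker(std::move(task));
    worker.join();

    try {
        future.get(); // rethrows the stored exception in the caller
    } catch (const std::runtime_error& e) {
        return e.what();
    }
    return "no exception";
}
```

A job that is pushed without such a wrapper (for example a plain std::function<void()>) would not get this protection, so the concern still applies to any code path that bypasses packaged_task.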
Technical issues:
- std::bind(f, args...) should be std::bind(f, std::forward<Args>(args)...) to ensure perfect forwarding, as otherwise you might end up with unnecessary copying of data.
- numThreads should be defaulted to std::thread::hardware_concurrency, and I believe uint8_t is getting too small for some of the latest processors. Additionally, the ThreadPool should be able to tell its user how many threads it has.
- I am not too fond of std::future: it is somewhat incomplete, as the Executors TS is still incomplete. Normally I'd expect that the user would want to call push several times to complete a single operation, in which case you generate multiple std::future<void> objects. It would be better to somehow make that a single wait instead of several. But I think I might be nitpicking. Honestly, I'd wait for the Executors TS to near completion and, as long as it is unavailable, try to implement (or get from somewhere) something with the same interface.
- The declarations ThreadPool(const ThreadPool& other) = delete; void operator=(const ThreadPool& other) = delete; are redundant, as the class already has a std::mutex member that makes the whole class non-copyable and non-movable. Yeah, it isn't important.
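Two of the points in this list can be checked in a few lines. The sketch below assumes C++17; defaultThreadCount and HoldsMutex are hypothetical names introduced only for illustration. It falls back to one thread because std::thread::hardware_concurrency() is allowed to return 0 when the value cannot be determined, and it uses type traits to confirm that a std::mutex member already disables copying and moving.

```cpp
#include <cstddef>
#include <mutex>
#include <thread>
#include <type_traits>

// Hypothetical helper: a default thread count that never returns 0.
inline std::size_t defaultThreadCount() {
    const unsigned int n = std::thread::hardware_concurrency();
    return n == 0 ? 1 : n; // 0 means "unknown", so use a single thread
}

// Hypothetical stand-in for any class that owns a std::mutex.
struct HoldsMutex {
    std::mutex m;
};

// std::mutex is neither copyable nor movable, and the implicitly generated
// special members of the enclosing class inherit that restriction.
static_assert(!std::is_copy_constructible_v<HoldsMutex>, "not copyable");
static_assert(!std::is_copy_assignable_v<HoldsMutex>, "not copy-assignable");
static_assert(!std::is_move_constructible_v<HoldsMutex>, "not movable");
```

A constructor could then be declared along the lines of explicit ThreadPool(std::size_t numThreads = defaultThreadCount()), which also avoids the narrow uint8_t parameter.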
- In the first situation you mentioned, notifier.notify_all() in the destructor would break out of the notifier.wait(), right? For the second issue, where there is still a job that hasn't been executed, I can modify the if (shutdown) to also check whether there is still a job in the queue. That statement has to stay for the case where the job queue is empty. I'll make sure to properly forward the args now. As for hardware_concurrency, I thought of doing this, but I thought giving default values would not be a good idea, especially since hardware_concurrency can fail. I'll look into executors! – Paul, Apr 21, 2020 at 21:57
- Thanks for the info! I've got some stuff to add. – Paul, Apr 21, 2020 at 21:57
- @Paul notify_all() won't break the wait. It'll only tell the waiting threads to recheck the condition, which is still false. – ALX23z, Apr 22, 2020 at 6:25
Code Review:
Please put your code in a namespace.
ThreadPool(uint8_t numThreads) {
assert(numThreads > 0);
This is dangerous: the assert will almost never fire!
The input type here is uint8_t, which is unsigned and thus can never be negative, so the only value the assert will catch is numThreads equal to zero.
The problem is that C++ integer type conversion will "automatically" convert a negative signed number into an unsigned value (which will usually result in a large positive value).
ThreadPool pool(-10); // Will compile.
// Will not assert as -10 is a signed value and will
// be auto converted to unsigned at which point it
// will be positive and pass the assert test.
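To make the wrap-around concrete, and to show one possible way of catching it, here is a small sketch. checkedThreadCount is a hypothetical helper; taking a wide signed parameter is just one option among several (a deleted overload or a strong type would also work).

```cpp
#include <cstddef>
#include <cstdint>
#include <stdexcept>

// Conversion to an unsigned type is done modulo 2^N, so -10 becomes
// 256 - 10 = 246 when stored in a uint8_t.
static_assert(static_cast<std::uint8_t>(-10) == 246, "-10 wraps around to 246");

// Hypothetical validation helper: accept a signed value so that a negative
// argument is still visible, then reject anything below 1.
inline std::size_t checkedThreadCount(long long requested) {
    if (requested < 1) {
        throw std::invalid_argument("numThreads must be at least 1");
    }
    return static_cast<std::size_t>(requested);
}
```

Unlike assert, the exception is also active in release builds where NDEBUG is defined.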
Here is a good place to use the range-based for:
for(int i = 0; i < threads.size(); ++i){
    threads[i].join();
}
// Rather use this:
for(auto& thread: threads){
    thread.join();
}
The use of typedef has been replaced by the using declaration:
//get return type of the function
typedef decltype(f(args...)) retType;
// Rather use this:
using retType = decltype(f(args...));
I'm not in favor of using decltype() here: rather than naming the return type directly, you obtain it indirectly by inferring the result of a hypothetical call expression.
typedef decltype(f(args...)) retType;
std::packaged_task<retType()> task(std::bind(f, args...));
^^^^^^^^^. Inferred function call
There are actually templates to extract this type directly from the function and argument types (std::invoke_result_t, declared in <type_traits> since C++17):
using RetType = std::invoke_result_t<Func, Args...>;
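Putting this together with the forwarding advice from the other answer, the task-building part of push() might look roughly like the sketch below. It assumes C++17; makeTask and add are hypothetical names, and the queueing and locking from the original push() are left out.

```cpp
#include <functional>
#include <future>
#include <type_traits>
#include <utility>

// Hypothetical free function showing only the task-construction step.
template <typename Func, typename... Args>
auto makeTask(Func&& f, Args&&... args) {
    // Return type taken from the callable and argument types.
    using RetType = std::invoke_result_t<Func, Args...>;

    // Forward the callable and its arguments into the bind object so that
    // rvalues are moved rather than copied.
    std::packaged_task<RetType()> task(
        std::bind(std::forward<Func>(f), std::forward<Args>(args)...));
    return task;
}

// Hypothetical function used to exercise makeTask.
inline int add(int a, int b) { return a + b; }
```

Note that std::bind later passes the stored arguments to the callable as lvalues, so a callable that only accepts rvalue parameters would need a lambda instead of std::bind.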
Still on the same statement: it is fairly standard practice for user-defined types (retType in this case) to have an initial uppercase letter. This makes it easy to tell types apart from objects.
If you set the shutdown flag, this still never leaves the wait() unless there are also objects in the jobQueue.
notifier.wait(lock, [this] {return !jobQueue.empty(); });
if(shutdown){
break;
}
You need to test for shutdown inside the wait() predicate.
notifier.wait(lock, [this] {return !jobQueue.empty() || shutdown; });
if(shutdown){
break;
}
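For reference, here is a stripped-down sketch of a worker loop that wakes up on shutdown and still drains the queue before exiting, as the other answer suggests. MiniPool is a hypothetical class that stores plain std::function<void()> jobs to keep the example short. It also makes one change that is an assumption on my part rather than something raised in the review: the flag is a plain bool written under the mutex, so a worker cannot miss the notification between checking the predicate and blocking.

```cpp
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical, simplified pool used only to illustrate the wait/shutdown logic.
class MiniPool {
public:
    explicit MiniPool(std::size_t numThreads) {
        threads.reserve(numThreads);
        for (std::size_t i = 0; i != numThreads; ++i) {
            threads.emplace_back([this] { workerLoop(); });
        }
    }

    ~MiniPool() {
        {
            std::lock_guard<std::mutex> lock(jobMutex);
            shutdown = true; // written under the lock the workers wait on
        }
        notifier.notify_all();
        for (auto& thread : threads) {
            thread.join();
        }
    }

    void push(std::function<void()> job) {
        {
            std::lock_guard<std::mutex> lock(jobMutex);
            jobQueue.push(std::move(job));
        }
        notifier.notify_one();
    }

private:
    void workerLoop() {
        while (true) {
            std::function<void()> job;
            {
                std::unique_lock<std::mutex> lock(jobMutex);
                // Wake up for new work or for shutdown.
                notifier.wait(lock, [this] { return shutdown || !jobQueue.empty(); });
                if (jobQueue.empty()) {
                    return; // only reachable when shutdown is set and nothing is left
                }
                job = std::move(jobQueue.front());
                jobQueue.pop();
            }
            job(); // run outside the lock
        }
    }

    std::mutex jobMutex;
    std::condition_variable notifier;
    std::queue<std::function<void()>> jobQueue;
    bool shutdown = false;
    std::vector<std::thread> threads; // declared last: workers use the members above
};
```

Because the loop only returns when the queue is empty, every job pushed before the destructor runs is executed before the threads are joined.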
This is worrying. Though I can't spot the issue:
//strange bug where it will continue even if the job queue is empty
if (jobQueue.size() < 1)
continue;
The following seems to be wrong advice.
I thought function return values were already r-values.
Where is my thinking going wrong?
(retracted)
You don't need the std::move() here.
job = std::move(jobQueue.front());
(end of retracted text)
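The reason the std::move() is needed can be checked with a few static assertions: front() returns an lvalue reference to the stored element, and std::packaged_task can be moved but not copied. A minimal sketch, assuming C++17 (Job, JobQueue and takeFront are hypothetical names):

```cpp
#include <future>
#include <queue>
#include <type_traits>
#include <utility>

using Job = std::packaged_task<void()>;
using JobQueue = std::queue<Job>;

// front() on a non-const queue yields Job&, an lvalue reference.
static_assert(std::is_same_v<decltype(std::declval<JobQueue&>().front()), Job&>,
              "front() returns an lvalue reference");

// Job is move-only, so assigning from an lvalue would need the deleted copy.
static_assert(!std::is_copy_assignable_v<Job>, "packaged_task cannot be copied");
static_assert(std::is_move_assignable_v<Job>, "packaged_task can be moved");

// Hypothetical helper mirroring the two lines in the worker loop.
inline Job takeFront(JobQueue& queue) {
    Job job = std::move(queue.front()); // cast the lvalue to an rvalue to allow the move
    queue.pop();
    return job;
}
```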
When you are using emplace() you are building the object the container holds by using its constructor.
So here you are creating a temporary object and then calling the move constructor to move the thread into the container. But you don't need to construct the temporary thread object, as threadFunc will be forwarded to the thread constructor of the object in the container.
threads.emplace_back(std::thread(threadFunc));
// Remove the temporary std::thread
threads.emplace_back(threadFunc);
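The difference can be observed with a small instrumented type. In the sketch below, Tracker is a hypothetical class that counts its move constructions; it stands in for std::thread, which cannot be instrumented this way. The example assumes C++17 and reserves capacity first so that no reallocation adds extra moves.

```cpp
#include <vector>

// Hypothetical type that counts how often it is move-constructed.
struct Tracker {
    static inline int moves = 0;
    int value;

    explicit Tracker(int v) : value(v) {}
    Tracker(Tracker&& other) noexcept : value(other.value) { ++moves; }
    Tracker(const Tracker&) = delete;
};

// Builds a temporary first, which emplace_back then moves into the vector.
inline int movesForTemporary() {
    Tracker::moves = 0;
    std::vector<Tracker> items;
    items.reserve(1);
    items.emplace_back(Tracker(1));
    return Tracker::moves;
}

// Forwards the constructor argument, so the element is built in place.
inline int movesForForwardedArgs() {
    Tracker::moves = 0;
    std::vector<Tracker> items;
    items.reserve(1);
    items.emplace_back(1);
    return Tracker::moves;
}
```

For std::thread the extra move is cheap, so this is more a matter of idiom than of performance.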
- Thanks for the notes! I'll get on adding that stuff. One issue though: with job = std::move(jobQueue.front()); when I remove the std::move it tells me that the copy constructor is a deleted function. – Paul, Apr 21, 2020 at 22:22
- OK. Maybe I was wrong. I'll delete that. But I thought a function result was already an r-value. – Loki Astari, Apr 21, 2020 at 23:25
- @MartinYork function result here would be an L-value reference. – ALX23z, Apr 22, 2020 at 6:37
- @ALX23z: Well obviously they are :-) (otherwise it would compile). But why? I was under the impression that all function results are r-value references (otherwise how does std::move() work?). Ahh, it returns by r-value reference. So all return by value and explicit r-value references are r-values. – Loki Astari, Apr 22, 2020 at 9:57
- @ALX23z: Well obviously they are :-) (otherwise it would compile). But why? I was under the impression that all function results are r-values (as they are un-named objects); otherwise how does std::move() work? Ahh, it returns by r-value reference. So all return by value and explicit r-value references are r-values. I am going to have to go look up the whole l/pr/x/gl/r value topic. – Loki Astari, Apr 22, 2020 at 10:08