Here's a rather hastily made thread pool to suit my requirements and my future projects. It works fine and executes tasks as intended, but I want to improve it further. I am not sure if this is the proper way to design a thread pool, as I am a self-taught enthusiast and rather weak as a language lawyer.
Tasks are stored in a queue. They are executed in order, each one only when an idle thread from the pool is available. New tasks can be pushed into the queue. To initialize the pool, one has to specify the total number of threads and a bool variable, which is used to terminate the pool and its threads gracefully.
#include "stdafx.h"
#include <iostream>
#include <functional>
#include <memory>
#include <thread>
#include <vector>
#include <queue>
#include <chrono>
#include <mutex>
#include <condition_variable>
using namespace std;
typedef function<void(void)> task_t;

class thread_t
{
public:
    thread_t(int id, bool& running, condition_variable& cv)
        : id_(id)
        , running_(running)
        , idle_notify_cv_(cv)
    {
        idle_ = true;
        thread_ = new thread([=]() { run(); });
    }
    ~thread_t()
    {
        notify();
        cout << id_ << " stopping \n";
        thread_->join();
    }
    void push(task_t task)
    {
        task_ = task;
        idle_ = false;
        cv_.notify_one();
    }
    void notify()
    {
        cv_.notify_all();
    }
    bool is_idle() const
    {
        return idle_;
    }
    int get_id() const
    {
        return id_;
    }
private:
    void run()
    {
        cout << id_ << " starting \n";
        while (running_)
        {
            unique_lock<mutex> lock(mu_);
            cv_.wait(lock, [=]() { return idle_ == false || !running_; });
            if (!running_) return;
            task_();
            cout << id_ << " :work done \n";
            idle_ = true;
            idle_notify_cv_.notify_all();
        }
    }
private:
    condition_variable& idle_notify_cv_;
    mutex mu_;
    condition_variable cv_;
    task_t task_;
    thread* thread_;
    bool idle_;
    int id_;
    bool& running_;
};

class pool
{
public:
    pool(int n, bool& running)
        : nthreads_(n)
        , running_(running)
    {
        if (n > std::thread::hardware_concurrency()) nthreads_ = n = std::thread::hardware_concurrency()-1;
        for (int i = 0; i < n; i++)
        {
            threads_.push_back(make_unique<thread_t >(i, running_, idle_notify_cv_));
        }
        pool_thread_ = new thread([=]() { run(); });
    }
    void push(task_t task)
    {
        unique_lock<mutex> lock(write_queue_mu_);
        tasks_.push(task);
        idle_notify_cv_.notify_one();
    }
    int get_idle()
    {
        for (int i = 0; i < nthreads_; i++)
        {
            if (threads_[i]->is_idle())
            {
                return i;
            }
        }
        return -1;
    }
    void run()
    {
        cout << " pool thread started \n " ;
        while (running_)
        {
            int idle;
            if (!tasks_.empty() && (idle = get_idle()) != -1)
            {
                unique_lock<mutex> lock(write_queue_mu_);
                idle_notify_cv_.wait(lock, [=]() { return idle != -1 || !running_; });
                if (!running_) return;
                auto task = tasks_.front();
                tasks_.pop();
                lock.unlock();
                cout << " thread# " << threads_[idle]->get_id() << " assigned a task \n";
                threads_[idle]->push(task);
            }
        }
    }
    ~pool()
    {
        pool_thread_->join();
        cout << " thread pool destroyed \n ";
    }
private:
    mutex write_queue_mu_;
    queue<task_t> tasks_;
    vector<unique_ptr<thread_t>> threads_;
    int nthreads_;
    bool& running_;
    condition_variable idle_notify_cv_;
    thread* pool_thread_;
};

int main()
{
    bool running = true;
    pool pool1(2, running);
    task_t task1 = []()
    {
        this_thread::sleep_for(chrono::seconds(2s));
        cout << " Task 1 executed \n";
    };
    task_t task2 = []()
    {
        this_thread::sleep_for(chrono::seconds(1s));
        cout << " Task 2 executed \n";
    };
    task_t task3 = []()
    {
        this_thread::sleep_for(chrono::seconds(2s));
        cout << " Task 3 executed \n";
    };
    task_t task4 = []()
    {
        this_thread::sleep_for(chrono::seconds(1s));
        cout << " Task 4 executed \n";
    };
    pool1.push(task1);
    pool1.push(task2);
    pool1.push(task3);
    pool1.push(task4);
    this_thread::sleep_for(chrono::seconds(5s));
    running = false;
    return 0;
}
- Welcome to Code Review. Please do not add, remove, or edit code in a question after you've received an answer. The site policy is explained in What to do when someone answers. Since I've removed that part from my UB section, it's fine, as nothing was invalidated, but please keep that in mind for future questions. (Zeta, Mar 17, 2018 at 2:18)
- @Zeta: OK, I got it, I'm new to Code Review. (ark1974, Mar 17, 2018 at 2:19)
- Please do not update the code in your question to incorporate feedback from answers; doing so goes against the Question + Answer style of Code Review. This is not a forum where you should keep the most updated version in your question. Please see what you may and may not do after receiving answers. (Mast ♦, Mar 17, 2018 at 8:05)
- Calls new but not delete! (Loki Astari, Mar 19, 2018 at 3:45)
- Has pointers but does not follow the rule of three. (Loki Astari, Mar 19, 2018 at 3:46)
3 Answers
Undefined behaviour
Your code contains undefined behaviour due to data races. The read access to thread_t::idle_ in thread_t::is_idle isn't synchronized with the possible write access in thread_t::run. That's a data race. The same holds for thread_t::push: I can push a new task while the old one is still being processed.
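One way to remove both races, sketched under assumptions: the class name `worker` and its members are hypothetical (this is not the question's `thread_t`), the idle flag becomes a `std::atomic<bool>` so `is_idle()` can be polled without a lock, and the task hand-over in `push()` happens under the same mutex the worker waits on. `push()` also refuses a new task while one is pending or running instead of overwriting it.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>

// Hypothetical single-task worker. task_ and stopping_ are only touched while
// holding mu_; idle_ is atomic so is_idle() can be polled without the lock.
class worker
{
public:
    worker() : thread_([this] { run(); }) {}

    ~worker()
    {
        {
            std::lock_guard<std::mutex> lock(mu_);
            stopping_ = true;
        }
        cv_.notify_one();
        thread_.join();                 // a pending task still runs before the thread exits
    }

    worker(const worker&) = delete;
    worker& operator=(const worker&) = delete;

    // Hands over a task only if the worker is idle; never overwrites a running task.
    bool push(std::function<void()> task)
    {
        {
            std::lock_guard<std::mutex> lock(mu_);
            if (!idle_)
                return false;
            task_ = std::move(task);
            idle_ = false;
        }
        cv_.notify_one();
        return true;
    }

    bool is_idle() const { return idle_; }

private:
    void run()
    {
        std::unique_lock<std::mutex> lock(mu_);
        for (;;)
        {
            cv_.wait(lock, [this] { return !idle_ || stopping_; });
            if (idle_)
                return;                 // woken only because the worker is stopping
            std::function<void()> task = std::move(task_);
            lock.unlock();              // run the task without holding the mutex
            task();
            lock.lock();
            idle_ = true;
        }
    }

    std::mutex mu_;
    std::condition_variable cv_;
    std::function<void()> task_;
    std::atomic<bool> idle_{true};
    bool stopping_ = false;             // guarded by mu_
    std::thread thread_;                // declared last so it starts after the members above exist
};

// Usage: hand one task over and wait until the worker reports idle again.
int demo_worker()
{
    std::atomic<int> counter{0};
    {
        worker w;
        bool accepted = w.push([&counter] { ++counter; });
        assert(accepted);
        while (!w.is_idle())
            std::this_thread::yield();
    }                                   // destructor joins the thread
    return counter;
}
```

Declaring `thread_` last matters: members are initialized in declaration order, so the thread only starts once the mutex, condition variable and flags exist.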
Don't use using namespace std
Do not use using namespace std if your names are likely to collide with the ones from the standard library, and never use it in a header. You had to use thread_t since std::thread already exists. However, names that end with _t are reserved by POSIX. That's usually ignored, though.
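A minimal sketch of the alternative, assuming a hypothetical project namespace `mypool`: put your own types in a namespace and spell out `std::` where you mean the standard library. Then there is no need for a `_t` suffix at all.

```cpp
#include <cassert>
#include <functional>
#include <thread>
#include <utility>

// Hypothetical project namespace "mypool". Inside it our class can simply be
// called "thread": without `using namespace std` there is no clash with std::thread.
namespace mypool
{
    using task = std::function<void()>;   // no _t suffix needed either

    class thread
    {
    public:
        explicit thread(task t) : worker_(std::move(t)) {}
        ~thread() { worker_.join(); }

        thread(const thread&) = delete;
        thread& operator=(const thread&) = delete;

    private:
        std::thread worker_;              // the std:: prefix names the standard class explicitly
    };
}

int demo_namespace()
{
    int value = 0;
    {
        mypool::thread t([&value] { value = 42; });
    }                                     // join() in the destructor makes the write visible
    assert(value == 42);
    return value;
}
```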
Overall design
Your overall design seems sound, but some functions seem strange. Why is thread_t::notify() part of the public interface? Does pool::get_idle return the number of idle threads or the first id? Should a user be able to call pool::run() or thread_t::run()?
Move those functions into the private section. Make your classes easy to use and hard to misuse.
By the way, pool::run might contain a bug. After idle != -1 (in the if), the lambda takes idle by copy ([=]). idle's value will never change at that point, so the check in the lambda is superfluous.
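The `[=]` issue can be seen in isolation. The sketch below (the names are only illustrative) shows that a lambda capturing by copy keeps the value it saw when it was created, so a predicate written that way can never observe a later change. A useful predicate has to re-read the shared state, under the lock, each time it is evaluated.

```cpp
#include <cassert>

// Illustrative only: `idle` stands in for the local variable in pool::run.
bool copy_vs_reference()
{
    int idle = 3;                                // pretend get_idle() returned 3
    auto by_copy = [=] { return idle != -1; };   // copies idle now
    auto by_ref  = [&] { return idle != -1; };   // reads idle on every call
    idle = -1;                                   // the state changes afterwards
    assert(by_copy());                           // still sees 3, so always true
    assert(!by_ref());                           // sees the new value -1
    return by_copy() && !by_ref();
}
```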
Delete forbidden functions
thread_t's copy constructor should be explicitly deleted, as should its copy assignment operator. Depending on your use case, you may even want to prevent moves:
thread_t(thread_t&&) = delete;
thread_t(const thread_t&) = delete;
thread_t& operator=(thread_t&&) = delete;
thread_t& operator=(const thread_t&) = delete;
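The comments on the question about `new` without `delete` and the rule of three point the same way. A sketch, assuming a hypothetical `runner` class in place of `thread_t`: hold the `std::thread` by value instead of through `thread*`. There is then nothing to `delete`, and the copy operations are deleted implicitly because `std::thread` is move-only; the `static_assert`s check that at compile time. (In the original `thread_t`, the `std::mutex` and `std::condition_variable` members already make the implicit copy operations deleted, so the explicit `= delete` lines mainly document intent.)

```cpp
#include <atomic>
#include <cassert>
#include <thread>
#include <type_traits>

// Hypothetical stand-in for thread_t: the std::thread is held by value,
// so there is no `new` to pair with a `delete`.
class runner
{
public:
    explicit runner(std::atomic<int>& counter)
        : thread_([c = &counter] { ++*c; })
    {}

    ~runner()
    {
        if (thread_.joinable())
            thread_.join();    // joined exactly once, nothing to free afterwards
    }

private:
    std::thread thread_;
};

// Copying is already impossible because std::thread is not copyable.
static_assert(!std::is_copy_constructible<runner>::value, "runner is not copyable");
static_assert(!std::is_copy_assignable<runner>::value, "runner is not copy-assignable");

int demo_runner()
{
    std::atomic<int> counter{0};
    {
        runner a(counter);
        runner b(counter);
    }                          // both destructors join here
    assert(counter == 2);
    return counter;
}
```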
Add documentation
This depends on whether you want to reuse your code in a (future) project, but what is clear to you now might not be as clear in a few days, weeks or months. While we're at it, use a code formatter. Some of your single-statement ifs have braces, others don't, e.g.
// nice
if (threads_[i]->is_idle())
{
    return i;
}
vs
// not so nice :(
if (n > std::thread::hardware_concurrency()) nthreads_ = n = std::thread::hardware_concurrency()-1;
Other than that you stuck to a single indentation/braces style, which is great.
Reduce complexity
You have both idle_notify_cv_ as a reference and cv_ as a member. I guess you had two std::condition_variables at first and then tried to get rid of one. If that's the case, I suggest actually removing one of them next time. The compiler will tell you where the variable has been used, and you can decide case by case whether that check is still needed or can be removed.
Split functionality
This is likely a toy program for you to fiddle with std::thread, but for a library, split your functionality into several files, e.g.
thread.h
thread.cpp
thread_pool.h
thread_pool.cpp
If you separate your implementation from your headers, recompilation times should drop considerably as your application grows. It won't be noticeable in your current program, though.
- Thanks for the quick input, I will definitely work on that. (ark1974, Mar 17, 2018 at 2:10)
- Are you sure about the UB? There is an idle = get_idle() assignment. (vnp, Mar 17, 2018 at 2:11)
- @vnp Missed that. Didn't expect a side effect in that if. However, then idle will never be -1, since it's taken by copy. Hm.... (Zeta, Mar 17, 2018 at 2:13)
- @Zeta: All your inputs are solid, they couldn't be better. I am super excited. (ark1974, Mar 17, 2018 at 2:28)
- thread_t::notify() signals the thread pool that an executor has finished, so that tasks waiting in the queue can be executed too. (ark1974, Mar 17, 2018 at 2:50)
This is a test analysis of the above thread pool, pushing 100 tasks into the pool.
The result shows that the multithreaded version is much faster (10x in this example) than plain function calls, although in some simple cases the gain may vary. As a next step, I want to reserve 1-2 threads in the pool for critical, time-bound tasks.
I can't edit my post, but std::this_thread::sleep_for(std::chrono::milliseconds(1)) may be inserted in the pool::run() while loop to prevent blocking during task push.
int main()
{
    // thread_pool, push_task() and empty() come from a revised version of the pool
    // that is not shown in the question.
    bool running = true;
    thread_pool pool(10, running);
    task_t task = []()
    {
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
    };

    // steady_clock is monotonic, which makes it the better choice for measuring intervals
    auto start = std::chrono::steady_clock::now();
    for (int i = 0; i < 100; i++)
        pool.push_task(task);
    while (!pool.empty())
    {
        std::this_thread::sleep_for(std::chrono::nanoseconds(30));
    }
    auto stop = std::chrono::steady_clock::now();
    // duration<double> keeps the fractional seconds; integer milliseconds / 1000 truncated them
    std::cout << "Multithreaded took "
              << std::chrono::duration<double>(stop - start).count() << " sec\n";

    start = std::chrono::steady_clock::now();
    for (int i = 0; i < 100; i++)
        task();
    stop = std::chrono::steady_clock::now();
    std::cout << "Plain function took "
              << std::chrono::duration<double>(stop - start).count() << " sec\n";

    running = false;
    return 0;
}
Here is the result (excluding pool setup time): [screenshot of the console output]
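A side note on the measurement: the loop polls `pool.empty()` every 30 ns and, if `empty()` only reports that the queue has been drained, the timer can stop while the last tasks are still running. A sketch of waiting for completion instead, assuming a hypothetical `completion_counter` helper that each task signals when it finishes (C++20's `std::latch` offers the same `count_down()`/`wait()` pattern in the standard library):

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical helper: counts outstanding tasks and lets the caller block
// until all of them have finished, instead of polling in a loop.
class completion_counter
{
public:
    explicit completion_counter(int pending) : pending_(pending) {}

    void done()                       // called by each task when it finishes
    {
        std::lock_guard<std::mutex> lock(mu_);
        if (--pending_ == 0)
            cv_.notify_all();
    }

    void wait()                       // returns once every task has called done()
    {
        std::unique_lock<std::mutex> lock(mu_);
        cv_.wait(lock, [this] { return pending_ == 0; });
    }

private:
    std::mutex mu_;
    std::condition_variable cv_;
    int pending_;                     // guarded by mu_
};

int demo_completion()
{
    const int n = 8;
    completion_counter counter(n);
    std::vector<int> results(n, 0);
    std::vector<std::thread> threads;
    for (int i = 0; i < n; ++i)
        threads.emplace_back([&, i] { results[i] = i * i; counter.done(); });
    counter.wait();                   // every result is written once this returns
    int sum = 0;
    for (int r : results)
        sum += r;
    for (auto& t : threads)
        t.join();
    assert(sum == 140);               // 0 + 1 + 4 + ... + 49
    return sum;
}
```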
- As the sleep does no work, it allows the core to be assigned to a new thread. Therefore all 100 sleeps can run in parallel (as no work is being done by any of them). Therefore an optimal implementation should finish in slightly over 100 ms. A run time of 1 second is extremely slow in my opinion for this test. Just saw that you only use 10 threads. So you get the expected result for doing no work (as it can easily run 10 no-work threads in parallel). I don't think this test proves much. (Loki Astari, Mar 19, 2018 at 4:01)
One very important issue hasn't been mentioned so far. You should definitely redesign the active wait loop that waits for a thread in the run method:
void run()
{
    while (running_)
    {
        int idle;
        if (!tasks_.empty() && (idle = get_idle()) != -1)
        {
            // ...
        }
    }
}
This active loop consumes CPU time whenever the condition is false: both when the queue is empty and when a task is pending but no worker thread is available. So not only are all the workers busy doing their work, the dispatcher is kept busy spinning as well. A better approach is to use a signalling mechanism (such as a std::condition_variable) that pauses your run method and resumes it when a task is pushed or completes.
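A sketch of such a signalling design, under stated assumptions: it replaces the dispatcher thread and the per-thread condition variables with one shared queue guarded by one mutex and one `std::condition_variable` (a common alternative layout, not the question's design). Idle workers block inside `wait()` without using CPU, `push()` wakes one of them, and the destructor asks the workers to drain the queue and exit. The class name `blocking_pool` is hypothetical.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// Sketch of a pool without a dispatcher thread: idle workers sleep on one
// condition_variable and are woken by push(), so nothing spins while waiting.
class blocking_pool
{
public:
    explicit blocking_pool(unsigned n)
    {
        for (unsigned i = 0; i < n; ++i)
            workers_.emplace_back([this] { work(); });
    }

    ~blocking_pool()
    {
        {
            std::lock_guard<std::mutex> lock(mu_);
            stopping_ = true;
        }
        cv_.notify_all();
        for (auto& w : workers_)
            w.join();               // workers drain the queue before exiting
    }

    blocking_pool(const blocking_pool&) = delete;
    blocking_pool& operator=(const blocking_pool&) = delete;

    void push(std::function<void()> task)
    {
        {
            std::lock_guard<std::mutex> lock(mu_);
            tasks_.push(std::move(task));
        }
        cv_.notify_one();           // wake one sleeping worker, if any
    }

private:
    void work()
    {
        for (;;)
        {
            std::function<void()> task;
            {
                std::unique_lock<std::mutex> lock(mu_);
                // Blocks without using CPU until there is work or the pool shuts down.
                cv_.wait(lock, [this] { return !tasks_.empty() || stopping_; });
                if (tasks_.empty())
                    return;         // stopping_ is set and nothing is left to do
                task = std::move(tasks_.front());
                tasks_.pop();
            }
            task();                 // run outside the lock so other workers can proceed
        }
    }

    std::mutex mu_;
    std::condition_variable cv_;
    std::queue<std::function<void()>> tasks_;
    bool stopping_ = false;         // guarded by mu_
    std::vector<std::thread> workers_;
};

// Usage: 100 tiny tasks on 4 workers; the destructor waits for all of them.
int demo_pool()
{
    std::atomic<int> done{0};
    {
        blocking_pool pool(4);
        for (int i = 0; i < 100; ++i)
            pool.push([&done] { ++done; });
    }
    assert(done == 100);
    return done;
}
```

Because the workers pull tasks themselves, this layout needs neither `get_idle()` nor a per-thread idle flag.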
And one more detail: passing a reference to the running flag to the pool constructor is basically not a good idea. Note that if you pass false to the constructor and later decide to activate your pool by changing the referenced flag, it will silently fail: nothing starts the run method again. You should consider implementing the enable/disable logic in public methods rather than relying on a simple boolean flag.
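A sketch of that idea, assuming a hypothetical `service` class: the running state is private, `start()` and `stop()` are the only way to change it, and a stopped service can be started again. The flag is guarded by a mutex here; a plain `bool&` written by `main` and read by other threads without synchronization, as in the question, is itself a data race.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical service that owns its running state instead of borrowing a bool&.
class service
{
public:
    ~service() { stop(); }

    // Assumption: start() and stop() are called from a single controlling thread.
    void start()
    {
        if (thread_.joinable())
            return;                              // already running
        {
            std::lock_guard<std::mutex> lock(mu_);
            running_ = true;
        }
        thread_ = std::thread([this] { loop(); });
    }

    void stop()
    {
        if (!thread_.joinable())
            return;                              // not running
        {
            std::lock_guard<std::mutex> lock(mu_);
            running_ = false;
        }
        cv_.notify_all();                        // wake the loop immediately
        thread_.join();
    }

    int ticks() const { return ticks_; }

private:
    void loop()
    {
        std::unique_lock<std::mutex> lock(mu_);
        while (running_)
        {
            ++ticks_;
            // Sleep until the next tick or until stop() wakes us, without spinning.
            cv_.wait_for(lock, std::chrono::milliseconds(1), [this] { return !running_; });
        }
    }

    std::mutex mu_;
    std::condition_variable cv_;
    bool running_ = false;                       // guarded by mu_
    std::atomic<int> ticks_{0};
    std::thread thread_;
};

int demo_service()
{
    service s;
    s.start();
    while (s.ticks() == 0)
        std::this_thread::yield();               // wait for the first tick
    s.stop();
    const int first = s.ticks();
    s.start();                                   // restarting works
    while (s.ticks() == first)
        std::this_thread::yield();
    s.stop();
    assert(s.ticks() > first);
    return s.ticks();
}
```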
- Nice catch. I missed that. (ark1974, Mar 19, 2018 at 18:15)