I'm learning about threading support in C++. I've got a basic understanding of thread pools and why creating and destroying threads on some systems can be expensive. As far as I'm aware, C++11 doesn't have any built-in support for worker/background threads. A std::thread is designed to perform one task, which you set in the constructor; the underlying thread then finishes once that task is complete.
At the moment I have no use for a fully fledged thread pool, but I would like to be able to reuse a thread for different tasks after I create it. I've created a simple class that accepts any function or method wrapped in a lambda and then executes it exactly once. The reusable thread object should not destruct until the work function is fully complete.
This is the implementation:
#include <atomic>
#include <functional>
#include <thread>

class Reusable_Thread
{
public:
    Reusable_Thread()
        : m_thread_pause(true), m_thread_quit(false),
          m_thread(&Reusable_Thread::thread_worker, this)
    { }

    ~Reusable_Thread()
    {
        m_thread_quit = true;
        m_thread.join();
    }

    bool get_readiness() const { return m_thread_pause; }

    bool set_work(const std::function<void()>& work_func)
    {
        if (get_readiness())
        {
            m_work_func = work_func;
            return true;
        }
        else
        {
            return false;
        }
    }

private:
    std::atomic<bool> m_thread_pause;
    std::atomic<bool> m_thread_quit;
    std::thread m_thread;
    std::function<void()> m_work_func;

    void thread_worker()
    {
        while (!m_thread_quit)
        {
            if (!m_thread_pause)
            {
                m_work_func();
                m_thread_pause = true;
            }
        }
    }
};
And this is some example usage:
int main()
{
    Reusable_Thread thread;
    auto f1 = [&] () { /* do some work */ };
    thread.set_work(f1);
    while (thread.get_readiness() == false)
    {
        // do some other stuff
    }
    // set some new work
    auto f2 = [&] () { /* do some new work */ };
    thread.set_work(f2);
    // maybe call f1 again, depends if f2 has finished yet
    if (!thread.set_work(f1))
    {
        // thread too busy to call f1, call it on the main thread
        f1();
    }
    // thread won't destruct until work has finished
    return 0;
}
Are there any problems with implementing a reusable thread this way?
1 Answer
A nice way to warm your cup of tea
Are there any problems with implementing a reusable thread this way?
Yes. Let's have a look at a slightly different main:
int main()
{
    Reusable_Thread thread;
    std::this_thread::sleep_for(std::chrono::seconds(10));
    return 0;
}
This should just wait for 10 seconds and then exit, right?
$ g++ reusablethread.cc -std=c++11 -pthread -o demo
$ /usr/bin/time ./demo
10.00user 0.00system 0:10.00elapsed 99%CPU (0avgtext+0avgdata 3148maxresident)k
0inputs+0outputs (0major+129minor)pagefaults 0swaps
Nope. It actually runs full throttle. This is due to your choice of
void thread_worker()
{
    while (!m_thread_quit)
    {
        if (!m_thread_pause)
        {
            /* ... */
        }
    }
}
This isn't the fault of std::atomic<>, but of your design. If there's nothing to do, we shouldn't immediately check again; instead, we should wait. More on that later.
howdoiturnthison
Your class is currently missing any way to toggle m_thread_pause, which makes it hard to reason about the rest of the behaviour. I guess you're just missing m_thread_pause = false in set_work, e.g.
bool set_work(const std::function<void()>& work_func)
{
    if (get_readiness())
    {
        m_work_func = work_func;
        m_thread_pause = false; // here, but wrong, see below
        return true;
    }
    else
    {
        return false;
    }
}
Furthermore, the name get_readiness is misleading. What is ready? The internal thread? Is the work slot free? is_paused is much better.
Race condition between m_work_func and m_thread_pause
Assume we have our Reusable_Thread rthread and it's currently paused. Two threads A and B both have access to rthread, and both call set_work. What happens?
void foo() { puts("foo"); }
void bar() { puts("bar"); }

void thread_Alice(Reusable_Thread& rthread)
{
    rthread.set_work(foo);
}

void thread_Bob(Reusable_Thread& rthread)
{
    rthread.set_work(bar);
}

int main()
{
    Reusable_Thread rthread;
    std::thread A(thread_Alice, std::ref(rthread));
    std::thread B(thread_Bob, std::ref(rthread));
    A.join();
    B.join();
    std::this_thread::sleep_for(std::chrono::seconds(1));
    return 0;
}
It's certainly possible that the output is foo. It's also possible that the output is bar. But it's also possible that it's neither, and your program crashes:
- We create the std::thread in rthread; let's call it R.
- We create the std::threads A and B. Neither runs yet.
- A gets scheduled. It calls set_work and:
  - checks m_thread_pause (get_readiness())
  - sets the worker to foo
  - yields due to scheduling
- B gets scheduled. It calls set_work and:
  - checks m_thread_pause (get_readiness(), still true)
  - yields
- A gets scheduled. It resumes execution and:
  - sets m_thread_pause to false
  - returns
  A's thread of execution has ended.
- R gets scheduled. It:
  - checks m_thread_quit (is false)
  - checks m_thread_pause (is false due to A)
  - starts reading the std::function<> m_work_func
  - yields while reading
- B gets scheduled. It finishes its call to set_work:
  - sets the worker to bar, although R is currently reading from it
  - yields
- R gets scheduled. It finishes reading m_work_func, ends up with a corrupt std::function<...> and invokes its operator(). Your program crashes.
- Even if your program didn't crash: after R sets m_thread_pause to true, B gets scheduled and finishes set_work, thus setting m_thread_pause to false, and the corrupt function gets called again.
So depending on where the threads get interrupted, you end up with any of:
- bar
- foo
- bar bar
- foo foo
- foo bar
- bar foo
- *crash*
So, how does one get rid of this? Usually, you would use std::atomic::exchange or std::atomic::compare_exchange on m_thread_pause:
bool set_work(const std::function<void()>& work_func)
{
    if (m_thread_pause.exchange(false)) // old value was true
    {
        m_work_func = work_func;
        return true;
    }
    else
    {
        return false;
    }
}
But that doesn't work either, because R could start using m_work_func before it's actually set. The underlying problem here is that m_work_func and m_thread_pause are intertwined.
Third wheel to the rescue
We can decouple them by using a third atomic<bool> that indicates whether the function can be read safely:
bool set_work(const std::function<void()>& work_func)
{
    if (!m_function_ready && m_thread_pause.exchange(false))
    {
        m_work_func = work_func;
        m_function_ready = true;
        return true;
    }
    ...
}
Remember: only one thread will ever see m_thread_pause as true, thanks to the atomic exchange, so we're safe from the previous A/B race.
void thread_worker()
{
    while (!m_thread_quit)
    {
        if (!m_thread_pause && m_function_ready)
        {
            m_work_func();
            m_thread_pause = true;
            m_function_ready = false;
        }
    }
}
Here, both m_thread_pause and m_function_ready are checked, and together they make sure that our function is actually set.
The memory ordering std::memory_order_seq_cst could be slightly relaxed (not to be confused with memory_order_relaxed!), but it's not trivial to get this right.
However, remember the problem from the first section of this review? You're still actively polling. A small sleep can help here:
void thread_worker()
{
    while (!m_thread_quit)
    {
        if (!m_thread_pause && m_function_ready)
        {
            m_work_func();
            m_thread_pause = true;
            m_function_ready = false;
        }
        else
        {
            std::this_thread::sleep_for(std::chrono::milliseconds(1));
        }
    }
}
Note that this still uses more CPU power than it has to, which brings us to condition variables.
Wait until notified
Condition variables provide a way to get notified instead of actively checking conditions. They have to be used with a mutex, so they have some overhead compared to plain atomic operations. Still, they have their place.
In this case, you would add a std::mutex and a std::condition_variable to your class:
private:
    std::mutex m_mtx;
    std::condition_variable m_cv;
In set_work, you notify all threads that are currently waiting:
bool set_work(const std::function<void()>& work_func)
{
    if (!m_function_ready && m_thread_pause.exchange(false))
    {
        m_work_func = work_func;
        m_function_ready = true;
        m_cv.notify_all(); // <- notify all waiting threads
        return true;
    }
    ...
}
And in thread_worker, you wait whenever there is no work to run:
void thread_worker()
{
    std::unique_lock<std::mutex> lock(m_mtx); // <- lock the mutex
    while (!m_thread_quit)
    {
        if (!m_thread_pause && m_function_ready)
        {
            m_work_func();
            m_thread_pause = true;
            m_function_ready = false;
        }
        else
        {
            m_cv.wait(lock); // <- wait
        }
    }
}
Note that the std::unique_lock is created outside the while loop to keep the number of lock acquisitions/releases to a minimum.
However, we have to make sure that the thread wakes up when we want to stop it, so we have to change the destructor slightly:
~Reusable_Thread()
{
    m_thread_quit = true;
    m_cv.notify_all();
    m_thread.join();
}
TL;DR
Make sure that you don't introduce race conditions and avoid while(true){if(false){ ... }}
. Use proper names for methods and check the behavior on non-trivial examples.
-
Howdoiturnthison made me laugh. The line is there in the original source but I accidentally removed it whilst deleting superfluous comments before posting. Your answer is a great help; I hadn't considered running threads on threads, or that the scheduler might stop an assignment halfway through (with hindsight it makes sense, it's why we have atomics). Condition variables are new to me, but from what you've explained, doesn't it make m_function_ready redundant? It doesn't matter if m_thread_pause is true for a little while whilst we assign m_work_func, because the loop is locked. – Fibbs, Jul 8, 2016 at 17:58
-
After looking at the standard, it turns out it does matter, because std::condition_variable may spuriously wake up the thread. This would leave us back in the same position as the original code, where the thread may be unpaused but m_work_func may not be fully assigned. – Fibbs, Jul 8, 2016 at 18:55