Skip to main content
Code Review

Return to Answer

replaced http://stackoverflow.com/ with https://stackoverflow.com/
Source Link

Condition variables provide a way to get notified instead of actively checking conditions. They have to be used with a mutex, so they have a overhead compared to atomic operations. Still, they have their use have their use.

Condition variables provide a way to get notified instead of actively checking conditions. They have to be used with a mutex, so they have a overhead compared to atomic operations. Still, they have their use.

Condition variables provide a way to get notified instead of actively checking conditions. They have to be used with a mutex, so they have a overhead compared to atomic operations. Still, they have their use.

Source Link
Zeta
  • 19.6k
  • 2
  • 57
  • 90

A nice way to warm your cup of tea

Are there any problems with implementing a reusable thread this way?

Yes. Let's have a look at a slightly different main:

int main()
{
 Reusable_Thread thread;
 
 std::this_thread::sleep_for(std::chrono::seconds(10));
 return 0;
}

This should just wait for 10 seconds and then exit, right?

$ g++ reusablethread.cc -std=c++11 -pthread -o demo
$ /usr/bin/time ./demo
10.00user 0.00system 0:10.00elapsed 99%CPU (0avgtext+0avgdata 3148maxresident)k
0inputs+0outputs (0major+129minor)pagefaults 0swaps

Nope. It actually runs full throttle. This is due to your choice of

void thread_worker()
{
 while (!m_thread_quit)
 {
 if (!m_thread_pause)
 {
 /* ... */
 }
 }
}

This isn't the fault of std::atomic<>, but your design. If we don't have something to do, we shouldn't check immediately afterwards. Instead, we should wait. More on that later.

howdoiturnthison

Your class is currently missing features to toggle m_thread_pause, which makes it hard to reason about other behaviour. I guess you're just missing m_thread_pause = false in set_work, e.g.

bool set_work(const std::function<void()>& work_func)
{
 if (get_readiness())
 {
 m_work_func = work_func;
 m_thread_pause = false; // here, but wrong, see below
 return true;
 }
 else
 {
 return false;
 }
}

Furthermore, the name get_readiness is misleading. What is ready? The internal thread? Is the work slot free? is_paused is much better.

Race condition m_work_func and m_thread_pause

Assume we have our Reusable_Thread rthread and it's currently paused. We have access on our rthread from two threads A and B. Both call set_work. What happens?

void foo(){ puts("foo"); }
void bar(){ puts("bar"); }
void thread_Alice(Reusable_Thread & rthread) {
 rthread.set_work(foo);
}
void thread_Bob(Reusable_Thread & rthread) {
 rthread.set_work(bar);
}
int main(){
 Reusable_Thread rthread;
 std::thread A(thread_Alice, std::ref(rthread));
 std::thread B(thread_Alice, std::ref(rthread));
 A.join();
 B.join();
 std::this_thread::sleep_for(std::chrono::seconds(1));
 return 0;
}

It's certainly possible that the result is foo. It's also possible that the result is bar. But it's also possible that it's neither and your program crashes:

  1. We create the std::thread in rthread, we call it R
  2. We create the std::threads A and B. Neither run yet.
  3. A gets scheduled. It calls set_work and:
  • checks m_thread_pause (get_readiness())
  • sets the worker to foo
  • yields due to scheduling
  1. B gets scheduled. It calls set_work and
  • checks m_thread_pause (get_readiness(), still true)
  • yields
  1. A gets scheduled. It resumes the execution and
  • sets m_thread_pause to false
  • returns

A's thread of execution has ended. 5. R gets scheduled. It

  • checks m_thread_quit (is false)
  • checks m_thread_pause (is false due to A)
  • starts reading the std::function<> m_work_fun.
  • yields while reading
  1. B gets scheduled. It finishes its call to set_work:
  • sets the worker to bar, although R currently reads from it
  • yields
  1. R gets scheduled. It finishes reading m_work_fun and ends up with a corrupt std::function<...> and uses operator(). Your program crashes.
  2. Even if your program didn't crash, after R sets m_thread_pause to true, B gets scheduled and finishes set_work, thus setting m_thread_pause to false and the corrupt function gets called again.

So depending on where the threads get interrupted, you end up with either:

  • bar
  • foo
  • bar bar
  • foo foo
  • foo bar
  • bar foo
  • *crash*

So, how does one get rid of this? Usually, you would use std::atomic::exchange or std::atomic::compare_exchange on m_thread_pause:

bool set_work(const std::function<void()>& work_func)
{
 if (m_thread_pause.exchange(false)) // old value was true
 {
 m_work_func = work_func;
 return true;
 }
 else
 {
 return false;
 }
}

But that doesn't work either, because R could start using m_work_func before it's actually set. The underlying problem here is that m_work_func and m_thread_pause are intertwined.

Third wheel to the rescue

We can decouple this by using a third atomic<bool>, that checks whether the function can be set safely:

bool set_work(const std::function<void()>& work_func)
{
 if (!m_function_ready && m_thread_pause.exchange(false))
 {
 m_work_func = work_func;
 m_function_ready = true;
 return true;
 }
 ...
}

Remember, only one thread will see m_thread_pause as true, due to the atomic exchange, so we're save from the previous A/B race.

void thread_worker()
{
 while (!m_thread_quit)
 {
 if (!m_thread_pause && m_function_ready)
 {
 m_work_func();
 m_thread_pause = true;
 m_function_ready = false;
 }
 }
}

Here, both the m_thread_pause and m_function_ready are checked, and the checks make sure that our function is actually set.

The memory ordering std::memory_order_seq_cst could be slightly relaxed (not to be confused with memory_order_relaxed!), but it's not trivial to get this right.

However, remember the problem in the first section of this review? You're still using active polling. A small sleep can help here:

void thread_worker()
{
 while (!m_thread_quit)
 {
 if (!m_thread_pause && m_function_ready)
 {
 m_work_func();
 m_thread_pause = true;
 m_function_ready = false;
 } else {
 std::this_thread::sleep_for(std::chrono::milliseconds(1));
 } 
 }
}

Note that still uses more CPU power than it has to. Which brings us to condition variables.

Wait until notified

Condition variables provide a way to get notified instead of actively checking conditions. They have to be used with a mutex, so they have a overhead compared to atomic operations. Still, they have their use.

In this case, you would add a std::mutex and a std::condition_variable to your class:

private:
 std::mutex m_mtx;
 std::condition_variable m_cv;

In set_work, you notify all thread that are currently waiting:

bool set_work(const std::function<void()>& work_func)
{
 if (!m_function_ready && m_thread_pause.exchange(false))
 {
 m_work_func = work_func;
 m_function_ready = true;
 m_cv.notify_all(); // <- notify all waiting threads
 return true;
 }
 ...
}

And worker_thread, you wait if m_thread_pause is false:

void thread_worker()
{
 std::unique_lock<std::mutex> lock(m_mtx); // <- lock the mutex
 while (!m_thread_quit)
 {
 if (!m_thread_pause && m_function_ready)
 {
 m_work_func();
 m_thread_pause = true;
 m_function_ready = false;
 } else {
 m_cv.wait(lock); // <- wait
 } 
 }
}

Note that the std::unique_lock is placed outside the while loop to keep the number of lock aquires/releases to a minimum.

However, we have to make sure that the thread wakes up when we want to stop it, so we have to change the destructor slightly:

 ~Reusable_Thread()
 {
 m_thread_quit = true;
 m_cv.notify_all();
 m_thread.join();
 }

TL;DR

Make sure that you don't introduce race conditions and avoid while(true){if(false){ ... }}. Use proper names for methods and check the behavior on non-trivial examples.

lang-cpp

AltStyle によって変換されたページ (->オリジナル) /