Skip to main content
Code Review

Return to Answer

Commonmark migration
Source Link

###A simple Barrier

A simple Barrier

###A simple Barrier

A simple Barrier

deleted 3 characters in body
Source Link
Loki Astari
  • 97.7k
  • 5
  • 126
  • 341

On startup a lot of the code is there to dealsdeal with making sure that threads don't start work until the SimpleWorkQueue object is correctly initialized.

On startup a lot of the code is there to deals with making sure that threads don't start work until the SimpleWorkQueue object is correctly initialized.

On startup a lot of the code is there to deal with making sure that threads don't start work until the SimpleWorkQueue object is correctly initialized.

Source Link
Loki Astari
  • 97.7k
  • 5
  • 126
  • 341

On startup a lot of the code is there to deals with making sure that threads don't start work until the SimpleWorkQueue object is correctly initialized.

This work is not related to the work queue and should be factored out into its own class.

The concept of a barrier was mentioned in the question comments. So here it is. A barrier is a thread concept that blocks all threads that check-in until the specified number of threads have all arrived then they are all released simultaneously. This is often used for synchronization.

###A simple Barrier

 // This is a one time use barrier.
 // Often these are written to be re-used. But that becomes harder.
 // On reset you have to guarantee that all the old threads were flushed
 // before you allow new threads to check-in.
 class Barrier
 {
 std::mutex& lock;
 int count;
 std::condition_variable threadBarrier;
 public:
 Barrier(std::mutex& m, int count) : lock(m), count(count) {}
 void checkin()
 {
 std::unique_lock<std::mutex> locker(lock);
 --count;
 if (count > 0)
 {
 threadBarrier.wait(locker, [&count, this](){
 return count <= 0;
 });
 }
 else
 {
 threadBarrier.notify_all();
 }
 }
};
 

Now the startup code is much simplified:

SimpleWorkQueue::SimpleWorkQueue(int initialThreadCount)
 : stopping(false)
 , finished(false)
 , threadBarrier(lock, initialThreadCount + 1)
{
 workers.reserve(initialThreadCount);
 for(int loop=0;loop < initialThreadCount;++loop)
 {
 /*
 * Create all the threads we want in this loop.
 * For each thread we also add a job so that we
 * make sure it starts up correctly.
 *
 * Note: Once a thread is in this piece of work it will
 * not leave until all threads have checked in.
 */
 work.push_back([&threadBarrier]()
 {
 threadBarrier.checkin();
 });
 // Create a thread to deal with the job we just started.
 workers.emplace_back(&SimpleWorkQueue::workerAction, this);
 }
 // Release all the threads.
 // Once they have entered the main loop
 // And checked into the barrier.
 threadBarrier.checkin();
}

It looks like you are locking the same mutex twice in tellAllThreadsToStop(). You can not use a normal mutex and lock it twice for this you need a recursive lock (this will allow the same thread to lock a mutex more than once).

void SimpleWorkQueue::tellAllThreadsToStop()
{
 std::unique_lock<std::mutex> locker(lock); // <- Lock
 stopping = true;
 for(std::thread& thread: workers)
 {
 work.push_back([this](){
 std::unique_lock<std::mutex> locker(this->lock); // <- Lock
 this->finished = true;
 });
 }
}

Luckily this is not the case here. The second lock. Is inside a lambda function. Thus not in the same context as the original lock. We are pushing a piece of work into the queue. When this piece of work is executed by a child thread it needs to alter the state of the current object (multiple threads altering state must be done inside a mutal exclusion zone and thus a lock is required).

 work.push_back([this](){
 std::unique_lock<std::mutex> locker(this->lock);
 this->finished = true;
 });

Another thing pointed out in the comments is that all these jobs set finished to true. This seems like a waste as this will be done by each thread (ie initialThreadCount times).

An alternative is to set finished to true in one job and then add initialThreadCount-1 jobs that did nothing. This would have worked just as well and not have been wasteful in attaining the lock for each thread.

Initially this may seem true but this argument means you are thinking serially about the code. Once the job is picked up by a thread it may be unscheduled at the hardware/OS level and a bunch of other threads are then executed (so potentially all the do nothing jobs may finish executing before the job to set finished to true even starts executing (even if it is pulled from the job queue first).

So because we can not guarantee that any particular job will finish before any other job. They must all set finished to true (or be forced to wait at some barrier).

You are accessing mutable state without locking it:

void SimpleWorkQueue::workerAction()
{
 while(!finished) // <- finished can be mutated by a child.
 {
 }
}

Upps. That is definitely a bug. Because other threads can modify finished it must be accessed after a memory barrier to force synchronization across threads. There are a couple of alternatives.

  • We could lock the mutext lock
  • We could make finished an atomic

But I think an easier way is to alter the shut down code.

The reason finished is set inside a work item is that if we set finished inside tellAllThreadsToStop() then threads will start to exit the main loop in workerAction() as soon as finished is set to true; even if there is still plenty of work to do.

But we see from the discussion above that we don't really want to modify finished in a child thread as that adds a whole set of problems. So an alternative is to alter the termination condition.

void SimpleWorkQueue::tellAllThreadsToStop()
{
 std::unique_lock<std::mutex> locker(lock);
 stopping = true;
 for(std::thread& thread: workers)
 {
 // TerminateThread is a private class
 // so nobody else can throw it.
 work.push_back([](){ throw TerminateThread(); });
 }
}
// Now our main thread loop.
// looks like this.
// Note: We can remove 'finished' from everywhere.
void SimpleWorkQueue::workerAction()
{
 while(true)
 {
 std::function<void()> item = getNextWorkItem();
 try
 {
 item();
 }
 catch(TerminateThread const& e)
 {
 // Break out of the while loop.
 // We threw because we wanted to terminate
 // and only our code can throw objects of this type.
 break;
 }
 catch(...)
 {
 // All other exceptions are user code generated.
 // Must be caught but can be ignored.
 // Though logging them is probably a good idea.
 }
 }
}
lang-cpp

AltStyle によって変換されたページ (->オリジナル) /