A new version of this code based on the provided advice can be found here. Latest improvement is found here.
I made a simple ThreadPool implementation in C++ using only atomics (neither std::mutex nor std::condition_variable was used).
The idea is to have a threads_ready counter that increases to threads.size() as the threads finish their work, and then drops back to 0 once all threads are ready to execute again. This was the simplest approach I could think of that actually works in the given scenario.
    #include <iostream>
    #include <functional>
    #include <vector>
    #include <thread>
    #include <mutex>
    #include <atomic>
    #include <iomanip>
    #include <cassert>
    #include <numeric>

    using std::vector;
    using std::function;
    using std::thread;
    using std::mutex;
    using std::unique_lock;
    using std::atomic;

    class ThreadPool{
    public:
        ThreadPool(int number_of_threads, function<void(vector<double>&, int, int)> function)
        : target_buffer(nullptr)
        , worker_function(function)
        , threads()
        , threads_ready(0)
        , state(IDLE_VALUE)
        {
            for(int i = 0; i < number_of_threads; ++i)
                threads.push_back(thread(&ThreadPool::worker, this, i));
        }

        ~ThreadPool(){
            while(0 < threads_ready.load()); /* Wait until all pending operations finish */
            state.store(END_VALUE);
            while(0 < threads.size()){
                if(true == threads.back().joinable())
                    threads.back().join();
                threads.pop_back();
            }
        }

        void start_and_block(vector<double>& buffer){
            /* initialize, start.. */
            target_buffer = &buffer;
            state.store(START_VALUE);
            /* wait until the work is done */
            while(threads.size() > threads_ready.load());
            state.store(IDLE_VALUE);
            /* wait until the threads are available again */
            while(0 < threads_ready.load());
        }

    private:
        static const int IDLE_VALUE = 0;
        static const int START_VALUE = 1;
        static const int END_VALUE = 2;

        vector<double>* target_buffer;
        function<void(vector<double>&, int, int)> worker_function; /* buffer, start, length */
        vector<thread> threads;
        atomic<int> threads_ready;
        atomic<int> state; /* 1 = start; 0 = don't start */

        void worker(int thread_index){
            int tmp_num;
            while(END_VALUE != state){ /* Until the pool is stopped */
                while(START_VALUE == state){ /* Wait until start signal is provided */
                    worker_function(
                        (*target_buffer),
                        (thread_index * (target_buffer->size()/threads.size())),
                        (target_buffer->size()/threads.size())
                    );/* do the work */
                    tmp_num = threads_ready.load(); /* signal that work is done! */
                    while(!threads_ready.compare_exchange_weak(tmp_num, (tmp_num + 1u)))
                        tmp_num = threads_ready.load(); /* increase "done counter" */
                    while(START_VALUE == state.load()); /* Wait until the other threads are finished */
                    tmp_num = threads_ready.load(); /* Signal that thread is finished */
                    while(!threads_ready.compare_exchange_weak(tmp_num, (tmp_num - 1u)))
                        tmp_num = threads_ready.load(); /* decrease "done counter" */
                } /*while(START_VALUE == state)*/
            } /*while(END_VALUE != state)*/
        }
    };

    static int result = 0;
    static mutex cout_mutex;

    void worker(vector<double> buffer, int start, int length){
        double sum = 0;
        for(int i = 0; i < length; ++i){
            sum += buffer[i];
        }
        std::lock_guard<mutex> my_lock(cout_mutex);
        std::cout << "Partial sum: " << std::setw(4) << sum << " \t\t |" << "\r";
        result += sum;
    }

    int main(int argc, char** agrs){
        ThreadPool pool(5,&worker);
        result = 0;
        for(int i = 0; i< 10000; ++i){
            vector<double> test_buffer(500, rand()%10);
            result = 0;
            pool.start_and_block(test_buffer);
            std::cout << "result["<< i << "]: " << std::setw(4) << result << " \t\t |" << "\r";
            assert(std::accumulate(test_buffer.begin(),test_buffer.end(), 0) == result);
        }
        std::cout << "All assertions passed! |"<< std::endl;
        return 0;
    }
According to the assertions, the behavior is as expected, but are there any dangers or wrong behavior here that I missed? Is there any way to improve this? How would this hold up in real-world usage?
I wouldn't call this a thread pool - that would imply that calling code could come along and find a running thread to do its work. Here, we just have a gang of threads that start, do work, and finish. – Toby Speight, Jun 14, 2021 at 11:57
Incorporating advice from an answer into the question violates the question-and-answer nature of this site. You could post improved code as a new question, as an answer, or as a link to an external site - as described in "I improved my code based on the reviews. What next?". I have rolled back the edit, so the answers make sense again. – Toby Speight, Jun 14, 2021 at 14:38
Thank you! I'll keep this in mind next time. – Dávid Tóth, Jun 14, 2021 at 17:12
2 Answers
start_and_block() is mis-named - call it start_and_busy_wait(). Busy-waiting is inefficient. Instead of avoiding std::condition_variable, embrace it; use it in the slow path instead of the loops while(threads.size() > threads_ready.load()); and while(0 < threads_ready.load());
Also, it's redundant to compare against literal bools - if(true == threads.back().joinable()) should be simply if(threads.back().joinable()).
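As a rough illustration of that suggestion, here is a minimal sketch of sleeping on a condition variable until a "done" counter reaches the expected value, instead of spinning on an atomic. DoneCounter and run_round are hypothetical names invented for this example; they are not part of the code under review, and a real pool would keep its threads alive between rounds rather than spawning them per call.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical helper (not in the original code): counts finished workers
// and lets the caller sleep until the expected number has reported in.
class DoneCounter {
public:
    // Called by each worker once its share of the work is finished.
    void notify_done() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            ++done_;
        }
        cv_.notify_all();
    }

    // Blocks without spinning until `expected` workers have reported in,
    // then resets the counter so the object can be reused for another round.
    void wait_for(int expected) {
        assert(expected >= 0);
        std::unique_lock<std::mutex> lock(mutex_);
        cv_.wait(lock, [&] { return done_ >= expected; });
        done_ = 0;
    }

private:
    std::mutex mutex_;
    std::condition_variable cv_;
    int done_ = 0;
};

// Starts `count` threads, waits for all of them via the condition variable,
// and returns the sum of the values they produced.
inline int run_round(int count) {
    DoneCounter counter;
    std::vector<int> slots(count, 0);
    std::vector<std::thread> threads;
    for (int i = 0; i < count; ++i) {
        threads.emplace_back([&counter, &slots, i] {
            slots[i] = i + 1;       // stand-in for the real work
            counter.notify_done();  // publish completion
        });
    }
    counter.wait_for(count);        // sleeps instead of busy-waiting
    int sum = 0;
    for (int value : slots) sum += value;
    for (auto& t : threads) t.join();
    return sum;
}
```

The predicate passed to wait() is the same condition the original while loop tested, so the calling thread only wakes up to re-check it when a worker signals.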
That is not true, the threads also need to be finished and ready for another potential start_and_block call. If the slow path doesn't wait until threads_ready is back to 0, there are all kinds of synchronisation problems. – Dávid Tóth, Jun 14, 2021 at 12:02
Are condition variables the only way to eliminate busy-waiting? – Dávid Tóth, Jun 14, 2021 at 12:04
Well, the reason was a lack of understanding, but I think a condition variable could be used in the slow path instead of the while loops, where the predicate would be the same as the condition in the while loop. – Dávid Tóth, Jun 14, 2021 at 12:56
Yes, that's right. – Toby Speight, Jun 14, 2021 at 12:57
I've made an improved version: codereview.stackexchange.com/questions/263059/… – Dávid Tóth, Jun 15, 2021 at 6:56
Why is the state an int rather than an enumerated type? Your definitions of the constants IDLE_VALUE etc. look like they ought to be an enumerated type. Is this used with values other than those special three?
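A minimal sketch of what that could look like, assuming the three constants are the only values the state ever takes. PoolState, StateHolder and demo_transitions are hypothetical names made up for this example; std::atomic can be instantiated with an enum class because it is trivially copyable.

```cpp
#include <atomic>
#include <cassert>

// Scoped enumeration standing in for IDLE_VALUE, START_VALUE and END_VALUE.
enum class PoolState { Idle, Start, End };

// Hypothetical holder showing an atomic member of enumeration type.
class StateHolder {
public:
    void set(PoolState next) { state_.store(next); }
    PoolState get() const { return state_.load(); }
    // Mirrors the worker's outer loop condition: keep going until End.
    bool should_run() const { return state_.load() != PoolState::End; }

private:
    std::atomic<PoolState> state_{PoolState::Idle};
};

// Walks through the same idle -> start -> end sequence the pool uses.
inline void demo_transitions() {
    StateHolder holder;
    assert(holder.get() == PoolState::Idle);
    holder.set(PoolState::Start);
    assert(holder.should_run());
    holder.set(PoolState::End);
    assert(!holder.should_run());
}
```

With a scoped enumeration, accidentally storing an arbitrary integer into the state no longer compiles, and the comment explaining what 0 and 1 mean becomes unnecessary.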
    while(0 < threads_ready.load());
Busy waiting is inefficient. At worst, you may end up with the thread doing the work being pre-empted to run the loop that's waiting for it to finish! Better to use a semaphore to wait for all threads to complete.
    : target_buffer(nullptr)
    , worker_function(function)
    , threads()
    , threads_ready(0)
    , state(IDLE_VALUE)
Only one of those is initialized from a constructor parameter. The rest can use inline immediate initializers (default member initializers) on those members, for clearer code and maintenance benefits.
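To make that concrete, here is a reduced sketch with the same members but no threads. PoolMembers and its accessor functions are hypothetical and exist only so the initial values can be inspected. Note that simply omitting a member from the initializer list does not zero it: a raw pointer such as target_buffer would be left uninitialized, which is why the value is written at the declaration.

```cpp
#include <atomic>
#include <cassert>
#include <functional>
#include <utility>
#include <vector>

// Reduced, hypothetical stand-in for the ThreadPool members (no threads are
// started here), used only to show default member initializers.
class PoolMembers {
public:
    using Work = std::function<void(std::vector<double>&, int, int)>;

    // Only the member that depends on a constructor argument is listed.
    explicit PoolMembers(Work work) : worker_function(std::move(work)) {}

    bool has_buffer() const { return target_buffer != nullptr; }
    int ready() const { return threads_ready.load(); }
    int current_state() const { return state.load(); }

private:
    static constexpr int IDLE_VALUE = 0;

    // Initial values live next to the declarations they belong to.
    std::vector<double>* target_buffer = nullptr;
    Work worker_function;
    std::atomic<int> threads_ready{0};
    std::atomic<int> state{IDLE_VALUE};
};
```

If another constructor is added later, it picks up the same initial values automatically instead of needing its own copy of the list.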
    if(true == threads.back().joinable())
Testing against true is just silly. Where does it stop? After all, operator== is also returning a bool so should you write true == (true == threads.back().joinable())? Bools are bool. Your test is if(threads.back().joinable()).
Your "buffer" is not const so I thought it was going to be for output. It's not. That's the work input being split up. Buffer is a misleading name for this. You should use iterators rather than a collection plus indexes to indicate a group of elements to work on.
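One possible shape for the iterator-based interface, shown sequentially for brevity; in the pool, each range would be handed to a different thread. The names Iter, partial_sum and chunk_sums are invented for this sketch, and giving the remainder to the last range is an assumption about the desired behavior (the index arithmetic in the question would skip any leftover elements when the size is not divisible by the thread count).

```cpp
#include <cassert>
#include <cstddef>
#include <iterator>
#include <numeric>
#include <vector>

using Iter = std::vector<double>::const_iterator;

// Sums the half-open range [first, last) assigned to one worker.
inline double partial_sum(Iter first, Iter last) {
    return std::accumulate(first, last, 0.0);
}

// Splits `data` into `parts` consecutive ranges and sums each one.
// The last range also takes any remainder elements.
inline std::vector<double> chunk_sums(const std::vector<double>& data, int parts) {
    assert(parts > 0);
    const std::size_t count = static_cast<std::size_t>(parts);
    const std::size_t chunk = data.size() / count;
    std::vector<double> sums;
    Iter first = data.begin();
    for (std::size_t i = 0; i < count; ++i) {
        Iter last = (i + 1 == count)
            ? data.end()
            : std::next(first, static_cast<std::ptrdiff_t>(chunk));
        sums.push_back(partial_sum(first, last));
        first = last;
    }
    return sums;
}
```

Because the worker takes const iterators, it is clear from the signature that the data is input only, and the same function works on any sub-range without passing the whole container around.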
"The rest can use inline immediate initializers on those members for clearer code and maintenance benefits." - Does that mean simply omitting them from the constructor, so that the members are zero-initialized? – Dávid Tóth, Jun 15, 2021 at 16:10
The busy waiting is eliminated in the question linked to this one, and indeed you are right, thank you! – Dávid Tóth, Jun 15, 2021 at 16:11
You are also right about the enumeration, I'll keep this in mind for the next rework, although I only have a vague outline of how I might use iterators rather than a collection. – Dávid Tóth, Jun 15, 2021 at 16:13
@DavidTóth each worker is given a pair (begin and end) of iterators to define the input elements assigned to it. Are you familiar with the standard library algorithms? – JDługosz, Jun 15, 2021 at 20:10
I'd much rather we discuss this with the next version I am about to post; it will definitely be more specific. – Dávid Tóth, Jun 16, 2021 at 6:18