I have incorporated all the cool points made by ChrisWue ChrisWue in the initial iteration of this post initial iteration of this post.
I have incorporated all the cool points made by ChrisWue in the initial iteration of this post.
I have incorporated all the cool points made by ChrisWue in the initial iteration of this post.
Concurrent for loop in C++ - follow-up
I have incorporated all the cool points made by ChrisWue in the initial iteration of this post.
Now, I am not reinventing the wheel for my concurrent queue, but use internally std::deque
. Also, I modified the API of the concurrent forp
construct: now it takes any range specified by means of iterators as the input list.
See what I have:
concurrent.h:
#ifndef FORP_H
#define FORP_H
#include <deque>
#include <functional>
#include <iterator>
#include <thread>
#include <vector>
namespace net {
namespace coderodde {
namespace concurrent {
////////////////////////////////////////////////////////////////////
// This is an adhoc concurrent queue used by forp. //
////////////////////////////////////////////////////////////////////
template<class Iter>
class queue
{
typedef typename std::iterator_traits<Iter>::value_type T;
private:
std::deque<T> m_queue;
std::size_t m_index;
std::mutex m_mutex;
public:
queue(Iter begin, Iter end) : m_index{0}
{
for (Iter i = begin; i != end; ++i)
{
m_queue.push_back(*i);
}
}
size_t size()
{
return m_queue.size();
}
std::tuple<T, size_t, bool> dequeue()
{
std::tuple<T, size_t, bool> ret;
{
std::lock_guard<std::mutex> lock(m_mutex);
if (m_queue.empty())
{
ret = std::make_tuple(T(), 0, false);
}
else
{
ret = std::make_tuple(m_queue.front(),
m_index++,
true);
m_queue.pop_front();
}
}
return ret;
}
};
template<class InIter, class Out>
void thread_do(net::coderodde::concurrent::queue<InIter>& input_queue,
Out (*process)(typename std::iterator_traits<InIter>::value_type in),
std::vector<Out>& output_vector)
{
typedef typename std::iterator_traits<InIter>::value_type In;
while (true)
{
std::tuple<In, size_t, bool> data = input_queue.dequeue();
if (std::get<2>(data) == false)
{
return;
}
const In input_element = std::get<0>(data);
const size_t input_element_index = std::get<1>(data);
Out output_element = process(input_element);
output_vector[input_element_index] = output_element;
}
}
////////////////////////////////////////////////////////////////////
// This function template implements a concurrent, thread-pool-//
// based iteration construct. //
////////////////////////////////////////////////////////////////////
// Side effects: the input range remains intact; output_vector is
// cleared and populated with the output data.
////////////////////////////////////////////////////////////////////
template<class InIter, class Out>
void forp(InIter begin, InIter end,
Out (*process)(typename std::iterator_traits<InIter>::value_type),
std::vector<Out>& output_vector)
{
unsigned thread_count = std::thread::hardware_concurrency();
std::vector<std::thread> thread_vector;
thread_vector.reserve(thread_count);
net::coderodde::concurrent::queue<InIter> input_queue(begin,
end);
output_vector.clear();
output_vector.resize(input_queue.size());
for (unsigned i = 0; i < thread_count; ++i)
{
thread_vector.push_back(
std::thread(&thread_do<InIter, Out>,
std::ref(input_queue),
std::ref(process),
std::ref(output_vector)));
}
for (std::thread& thread : thread_vector)
{
thread.join();
}
}
} /* namespace concurrent */
} /* namespace coderodde */
} /* namespace net */
#endif /* FORP_H */
main.cpp:
#include "concurrent.h"
#include <chrono>
#include <cstdint>
#include <iostream>
#include <list>
#include <vector>
class CurrentTime {
std::chrono::high_resolution_clock m_clock;
public:
uint64_t milliseconds()
{
return std::chrono
::duration_cast<std::chrono
::milliseconds>
(m_clock.now().time_since_epoch()).count();
}
};
using net::coderodde::concurrent::forp;
using std::cout;
using std::list;
using std::stringstream;
using std::vector;
static uint64_t fibonacci(uint64_t n)
{
if (n <= 0)
{
return 0;
}
if (n == 1)
{
return 1;
}
return fibonacci(n - 1) + fibonacci(n - 2);
}
template<class T>
std::ostream& operator<<(std::ostream& out, std::vector<T>& vector)
{
out << "[";
if (!vector.empty())
{
out << vector[0];
}
for (size_t i = 1; i < vector.size(); ++i)
{
out << ", " << vector[i];
}
return out << "]";
}
int main(int argc, char** argv) {
list<uint64_t> fibonacci_task_input_list =
{ 40, 41, 39, 33, 43, 30, 34, 40 };
CurrentTime ct;
vector<uint64_t> result_vector1;
vector<uint64_t> result_vector2;
uint64_t start_time = ct.milliseconds();
for (const int i : fibonacci_task_input_list)
{
result_vector1.push_back(fibonacci(i));
}
uint64_t end_time = ct.milliseconds();
cout << "Serial processing in "
<< (end_time - start_time)
<< " milliseconds.\n";
start_time = ct.milliseconds();
net::coderodde::concurrent::forp(fibonacci_task_input_list.begin(),
fibonacci_task_input_list.end(),
fibonacci,
result_vector2);
end_time = ct.milliseconds();
cout << "Parallel processing in "
<< (end_time - start_time)
<< " milliseconds.\n";
cout << "Serial result: " << result_vector1 << "\n";
cout << "Concurrent result: " << result_vector2 << "\n";
return 0;
}
Any critique is much appreciated.