Skip to main content
Code Review

Return to Question

replaced http://codereview.stackexchange.com/ with https://codereview.stackexchange.com/
Source Link

I have incorporated all the cool points made by ChrisWue ChrisWue in the initial iteration of this post initial iteration of this post.

I have incorporated all the cool points made by ChrisWue in the initial iteration of this post.

I have incorporated all the cool points made by ChrisWue in the initial iteration of this post.

Source Link
coderodde
  • 31.7k
  • 15
  • 77
  • 202

Concurrent for loop in C++ - follow-up

I have incorporated all the cool points made by ChrisWue in the initial iteration of this post.

Now, I am not reinventing the wheel for my concurrent queue, but use internally std::deque. Also, I modified the API of the concurrent forp construct: now it takes any range specified by means of iterators as the input list.

See what I have:

concurrent.h:

#ifndef FORP_H
#define FORP_H
#include <deque>
#include <functional>
#include <iterator>
#include <thread>
#include <vector>
namespace net {
 namespace coderodde {
 namespace concurrent {
 ////////////////////////////////////////////////////////////////////
 // This is an adhoc concurrent queue used by forp. //
 ////////////////////////////////////////////////////////////////////
 template<class Iter>
 class queue
 {
 typedef typename std::iterator_traits<Iter>::value_type T;
 private:
 std::deque<T> m_queue;
 std::size_t m_index;
 std::mutex m_mutex;
 public:
 queue(Iter begin, Iter end) : m_index{0}
 {
 for (Iter i = begin; i != end; ++i) 
 {
 m_queue.push_back(*i);
 }
 }
 size_t size() 
 {
 return m_queue.size();
 }
 std::tuple<T, size_t, bool> dequeue() 
 {
 std::tuple<T, size_t, bool> ret;
 {
 std::lock_guard<std::mutex> lock(m_mutex);
 if (m_queue.empty())
 {
 ret = std::make_tuple(T(), 0, false);
 }
 else 
 {
 ret = std::make_tuple(m_queue.front(), 
 m_index++, 
 true);
 m_queue.pop_front();
 }
 }
 return ret;
 }
 };
 template<class InIter, class Out>
 void thread_do(net::coderodde::concurrent::queue<InIter>& input_queue,
 Out (*process)(typename std::iterator_traits<InIter>::value_type in),
 std::vector<Out>& output_vector)
 {
 typedef typename std::iterator_traits<InIter>::value_type In;
 while (true)
 {
 std::tuple<In, size_t, bool> data = input_queue.dequeue();
 if (std::get<2>(data) == false)
 {
 return;
 }
 const In input_element = std::get<0>(data);
 const size_t input_element_index = std::get<1>(data);
 Out output_element = process(input_element);
 output_vector[input_element_index] = output_element;
 }
 }
 ////////////////////////////////////////////////////////////////////
 // This function template implements a concurrent, thread-pool-//
 // based iteration construct. //
 ////////////////////////////////////////////////////////////////////
 // Side effects: the input range remains intact; output_vector is
 // cleared and populated with the output data.
 ////////////////////////////////////////////////////////////////////
 template<class InIter, class Out>
 void forp(InIter begin, InIter end, 
 Out (*process)(typename std::iterator_traits<InIter>::value_type), 
 std::vector<Out>& output_vector)
 {
 unsigned thread_count = std::thread::hardware_concurrency();
 std::vector<std::thread> thread_vector;
 thread_vector.reserve(thread_count);
 net::coderodde::concurrent::queue<InIter> input_queue(begin, 
 end);
 output_vector.clear();
 output_vector.resize(input_queue.size());
 for (unsigned i = 0; i < thread_count; ++i) 
 {
 thread_vector.push_back(
 std::thread(&thread_do<InIter, Out>, 
 std::ref(input_queue), 
 std::ref(process), 
 std::ref(output_vector)));
 }
 for (std::thread& thread : thread_vector)
 {
 thread.join();
 }
 }
 } /* namespace concurrent */
 } /* namespace coderodde */
} /* namespace net */
#endif /* FORP_H */

main.cpp:

#include "concurrent.h"
#include <chrono>
#include <cstdint>
#include <iostream>
#include <list>
#include <vector>
class CurrentTime {
 std::chrono::high_resolution_clock m_clock;
public:
 uint64_t milliseconds() 
 {
 return std::chrono
 ::duration_cast<std::chrono
 ::milliseconds>
 (m_clock.now().time_since_epoch()).count();
 }
};
using net::coderodde::concurrent::forp;
using std::cout;
using std::list;
using std::stringstream;
using std::vector;
static uint64_t fibonacci(uint64_t n)
{
 if (n <= 0) 
 {
 return 0;
 }
 if (n == 1) 
 {
 return 1;
 }
 return fibonacci(n - 1) + fibonacci(n - 2);
}
template<class T>
std::ostream& operator<<(std::ostream& out, std::vector<T>& vector)
{
 out << "[";
 if (!vector.empty())
 {
 out << vector[0];
 }
 for (size_t i = 1; i < vector.size(); ++i) 
 {
 out << ", " << vector[i];
 }
 return out << "]";
}
int main(int argc, char** argv) {
 list<uint64_t> fibonacci_task_input_list = 
 { 40, 41, 39, 33, 43, 30, 34, 40 };
 CurrentTime ct;
 vector<uint64_t> result_vector1;
 vector<uint64_t> result_vector2;
 uint64_t start_time = ct.milliseconds();
 for (const int i : fibonacci_task_input_list)
 {
 result_vector1.push_back(fibonacci(i));
 }
 uint64_t end_time = ct.milliseconds();
 cout << "Serial processing in " 
 << (end_time - start_time)
 << " milliseconds.\n";
 start_time = ct.milliseconds();
 net::coderodde::concurrent::forp(fibonacci_task_input_list.begin(),
 fibonacci_task_input_list.end(),
 fibonacci,
 result_vector2);
 end_time = ct.milliseconds();
 cout << "Parallel processing in "
 << (end_time - start_time)
 << " milliseconds.\n";
 cout << "Serial result: " << result_vector1 << "\n";
 cout << "Concurrent result: " << result_vector2 << "\n";
 return 0;
}

Any critique is much appreciated.

lang-cpp

AltStyle によって変換されたページ (->オリジナル) /