(See the next iteration.)
I have this easy-to-use facility that maps input elements to output elements concurrently by means of a thread pool:
concurrent.h:
#ifndef FORP_H
#define FORP_H

#include <cstddef>
#include <functional>
#include <initializer_list>
#include <iostream>
#include <mutex>
#include <thread>
#include <tuple>
#include <vector>

namespace net {
namespace coderodde {
namespace concurrent {

////////////////////////////////////////////////////////////////////
// This is an ad hoc concurrent queue used by forp.               //
////////////////////////////////////////////////////////////////////
template<class T>
class queue
{
private:
    struct queue_node
    {
        T           m_element;
        size_t      m_element_index;
        queue_node* m_next;

        queue_node(const T& element, const size_t index) :
            m_element{element},
            m_element_index{index},
            m_next{nullptr}
        {
        }
    };

    std::mutex  m_mutex;
    queue_node* m_head;
    queue_node* m_tail;

public:
    queue(std::initializer_list<T> list)
    {
        m_head = nullptr;
        m_tail = nullptr;
        size_t index = 0;

        for (const auto& element : list)
        {
            queue_node* new_node = new queue_node(element, index++);

            if (m_head == nullptr)
            {
                m_head = new_node;
                m_tail = new_node;
            }
            else
            {
                m_tail->m_next = new_node;
                m_tail = new_node;
            }
        }
    }

    std::tuple<T, size_t, bool> dequeue()
    {
        std::tuple<T, size_t, bool> ret;
        m_mutex.lock();

        if (m_head == nullptr)
        {
            // The queue is empty.
            ret = std::make_tuple(T(), 0, false);
        }
        else
        {
            ret = std::make_tuple(m_head->m_element,
                                  m_head->m_element_index,
                                  true);
            m_head = m_head->m_next;
        }

        m_mutex.unlock();
        return ret;
    }
};

template<class In, class Out>
void thread_do(net::coderodde::concurrent::queue<In>& input_queue,
               Out (*process)(In in),
               std::vector<Out>& output_vector)
{
    while (true)
    {
        std::tuple<In, size_t, bool> data = input_queue.dequeue();

        if (std::get<2>(data) == false)
        {
            return;
        }

        const In input_element = std::get<0>(data);
        const size_t input_element_index = std::get<1>(data);
        Out output_element = process(input_element);
        output_vector[input_element_index] = output_element;
    }
}

////////////////////////////////////////////////////////////////////
// This function template implements a concurrent,                //
// thread-pool-based iteration construct.                         //
////////////////////////////////////////////////////////////////////
template<class In, class Out>
void forp(std::initializer_list<In>& input_list,
          Out (*process)(In in),
          std::vector<Out>& output_vector)
{
    unsigned thread_count = std::thread::hardware_concurrency();
    std::vector<std::thread> thread_vector;
    thread_vector.reserve(thread_count);
    net::coderodde::concurrent::queue<In> input_queue(input_list);

    output_vector.clear();
    output_vector.reserve(input_list.size());

    for (size_t i = 0; i < input_list.size(); ++i)
    {
        output_vector.push_back(Out());
    }

    for (unsigned i = 0; i < thread_count; ++i)
    {
        thread_vector.push_back(
            std::thread(&thread_do<In, Out>,
                        std::ref(input_queue),
                        std::ref(process),
                        std::ref(output_vector)));
    }

    for (std::thread& thread : thread_vector)
    {
        thread.join();
    }
}

} /* namespace concurrent */
} /* namespace coderodde */
} /* namespace net */

#endif /* FORP_H */
main.cpp:
#include "concurrent.h"
#include <chrono>
#include <cstdint>
#include <initializer_list>
#include <iostream>
#include <sstream>
#include <vector>
class CurrentTime {
std::chrono::high_resolution_clock m_clock;
public:
uint64_t milliseconds()
{
return std::chrono
::duration_cast<std::chrono
::milliseconds>
(m_clock.now().time_since_epoch()).count();
}
};
using net::coderodde::concurrent::forp;
using std::initializer_list;
using std::vector;
using std::cout;
using std::stringstream;
static uint64_t fibonacci(uint64_t n)
{
if (n <= 0)
{
return 0;
}
if (n == 1)
{
return 1;
}
return fibonacci(n - 1) + fibonacci(n - 2);
}
template<class T>
std::string to_string(std::vector<T>& vec)
{
stringstream ss;
ss << "[";
if (vec.size() > 0)
{
ss << vec[0];
}
for (size_t i = 1; i < vec.size(); ++i)
{
ss << ", " << vec[i];
}
ss << "]";
return ss.str();
}
int main(int argc, char** argv) {
std::initializer_list<uint64_t> fibonacci_task_input_list =
{ 40, 41, 39, 33, 43, 30, 34, 40, 42, 20, 42, 40, 41 };
CurrentTime ct;
vector<uint64_t> result_vector1;
vector<uint64_t> result_vector2;
uint64_t start_time = ct.milliseconds();
for (const int i : fibonacci_task_input_list)
{
result_vector1.push_back(fibonacci(i));
}
uint64_t end_time = ct.milliseconds();
cout << "Serial processing in "
<< (end_time - start_time)
<< " milliseconds.\n";
start_time = ct.milliseconds();
net::coderodde::concurrent::forp(fibonacci_task_input_list,
fibonacci,
result_vector2);
end_time = ct.milliseconds();
cout << "Parallel processing in "
<< (end_time - start_time)
<< " milliseconds.\n";
cout << "Serial result: " << to_string(result_vector1) << "\n";
cout << "Concurrent result: " << to_string(result_vector2) << "\n";
return 0;
}
queue
If you look at the dequeue() method of the queue, it returns, besides the element and its index, a boolean value indicating whether an element was actually removed, i.e., whether the queue was still nonempty at the time of the call. I did this out of fear of the following scenario:
Suppose the queue contains only one element. Suppose also that a thread $T_1$ asks whether the queue is nonempty. Next, another thread $T_2$ asks the same question. Then $T_1$ pops the last element. Now $T_2$ still thinks that the queue is nonempty when, in fact, it is empty.
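For illustration, here is a minimal sketch of the check-then-act pattern I wanted to avoid. The empty() and pop() methods are hypothetical; the queue above deliberately does not expose them:

template<class Queue, class T>
bool racy_try_pop(Queue& work_queue, T& out)
{
    // Even if empty() and pop() are each locked internally,
    // the pair "check, then pop" is not atomic as a whole.
    if (!work_queue.empty())     // T2 observes "nonempty" here...
    {
        out = work_queue.pop();  // ...but T1 may pop the last element
        return true;             // in between, so this pop() runs on
    }                            // an already-empty queue.
    return false;
}

Returning the success flag together with the element makes the check and the removal a single atomic operation.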
Performance figures
On a dual-core CPU I get the following figures:
Serial processing in 20024 milliseconds.
Parallel processing in 10642 milliseconds.
Serial result:     [102334155, 165580141, 63245986, 3524578, 433494437, 832040, 5702887, 102334155, 267914296, 6765, 267914296, 102334155, 165580141]
Concurrent result: [102334155, 165580141, 63245986, 3524578, 433494437, 832040, 5702887, 102334155, 267914296, 6765, 267914296, 102334155, 165580141]
Since I am not proficient in C++, please tell me anything that comes to mind.
1 Answer
Just a few items which caught my eye:
- I wouldn't have bothered implementing my own queue. Just use a std::deque or a plain std::vector with an index pointing to the current head element. That saves a bunch of code which you don't have to test and maintain (a sketch of this follows the next code block).
- You shouldn't use the mutex directly; use a std::lock_guard instead to make sure the mutex gets released automatically when the scope is left.
- You should reduce the scope of the mutex to a minimum to avoid unnecessary lock contention (in this case probably more of an academic point, but still a good habit to get into).
So the dequeue method could look like this:

std::tuple<T, size_t, bool> dequeue()
{
    queue_node* item = nullptr;
    {
        std::lock_guard<std::mutex> lock(m_mutex);

        if (m_head != nullptr)
        {
            item = m_head;
            m_head = m_head->m_next;
        }
    }

    return std::make_tuple(item ? item->m_element : T(),
                           item ? item->m_element_index : 0,
                           item != nullptr);
}
This:

output_vector.clear();
output_vector.reserve(input_list.size());

for (size_t i = 0; i < input_list.size(); ++i)
{
    output_vector.push_back(Out());
}

can be replaced with

output_vector.clear();
output_vector.resize(input_list.size());
since resize will automatically insert elements for you if the current size is smaller than the requested size.
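For instance (a small standalone check, my example rather than code from the post):

#include <cassert>
#include <vector>

int main()
{
    std::vector<int> v;
    v.resize(3);            // grows the vector, value-initializing the
    assert(v.size() == 3);  // new elements...
    assert(v[0] == 0 && v[1] == 0 && v[2] == 0);  // ...to int{} == 0.
}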
Update: Actually I just noticed that your queue implementation is leaking memory: nodes get new-ed but never deleted. Which comes back to my first point :)
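If the hand-rolled list is kept anyway, one way to plug the leak (a sketch only, using the queue_node layout from the question) is to delete each node once its payload has been extracted, and to free any leftovers in a destructor:

// Sketch: delete the node after copying its payload into the tuple.
std::tuple<T, size_t, bool> dequeue()
{
    queue_node* item = nullptr;
    {
        std::lock_guard<std::mutex> lock(m_mutex);

        if (m_head != nullptr)
        {
            item = m_head;
            m_head = m_head->m_next;
        }
    }

    if (item == nullptr)
    {
        return std::make_tuple(T(), 0, false);
    }

    auto ret = std::make_tuple(item->m_element,
                               item->m_element_index,
                               true);
    delete item;
    return ret;
}

// ...and release whatever was never dequeued.
~queue()
{
    while (m_head != nullptr)
    {
        queue_node* next = m_head->m_next;
        delete m_head;
        m_head = next;
    }
}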
Also, you're copying the In and Out elements around a few times when you probably could just move them, but I don't do enough day-to-day modern C++ to provide a correct answer on the spot right now. I'll leave that to someone else.
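As a rough illustration of the direction (a sketch only, assuming Out is movable; not a vetted recommendation), the assignment in thread_do could become:

#include <utility>  // for std::move

// Inside thread_do's loop: move the processed result into its slot
// instead of copying it. For uint64_t this changes nothing, but for
// expensive-to-copy Out types it saves one copy.
Out output_element = process(input_element);
output_vector[input_element_index] = std::move(output_element);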
Comments:
- "Is there anything else, such as use of move semantics/rvalue references?" (coderodde, Jul 5, 2016 at 8:36)
- "@coderodde: You could probably move a few things around instead of (implicitly) copying them, I just can't provide a good enough answer I'm comfortable with on that point right now." (ChrisWue, Jul 5, 2016 at 9:16)