Task: calculate the average price for each stock from input data consisting of a stock id (int) and its price (float). It's important to detect jumps in prices, so the input order must be preserved: prices for the same stock id have to be processed in sequence (from top to bottom).
Stock ids range from 1 to 6; prices can be any value. Example data (txt format):
2 12.6
4 22.8
3 60.3
4 22.2
1 5.1
2 11.6
3 60.9
4 21.2
Solution: the idea is to use one thread per stock id, i.e. process the prices for each stock id in a single thread, assuming 6 cores are available.
I've tried to adopt the producer/consumer idiom: a std::queue stores the prices from the producer, and a separate thread is launched per stock id. Each thread retrieves data from its queue and calculates the mean. (The threads could do other calculations as well, such as sigma jump detection.)
Class ThrQueue provides a std::queue per stock id/thread with thread-safe access, and a std::unordered_map maps each stock id to its ThrQueue object.
To compile:
g++ -std=c++1y -pthread filename.cpp
The code below seemingly works, but I'm not sure whether it's best practice or whether there are hidden problems I couldn't detect. Also, the time delays here and there are just experimental; I'm not sure why they are needed. Any comments/suggestions are welcome! (Please let me know in the comments if anything needs to be clarified.)
#include<chrono>
#include<future>
#include<condition_variable>
#include<map>
#include<unordered_map>
#include<queue>
#include<fstream>
#include<iostream>
#include<mutex>
#include<thread>
using namespace std;
#define NTHR 6
bool go(true);
float results[NTHR] = {0};
template<typename T>
class ThrQueue {
public:
ThrQueue(T&& t) {
m_queue.push(std::forward<T>(t));
}
~ThrQueue() {
if(!m_queue.empty()) cout << " ERROR: queue not empty" << endl;
}
T pop() {
std::unique_lock<std::mutex> lock(m_mutex);
m_cv.wait(lock, [this]{return !m_queue.empty();});
auto it = m_queue.front() ;
m_queue.pop() ;
return it;
}
void push(T const& x) {
{
std::unique_lock<std::mutex> lock(m_mutex);
m_queue.push(x) ;
}
m_cv.notify_one();
}
bool empty() {
std::unique_lock<std::mutex> lock(m_mutex);
return m_queue.empty() ;
}
private:
std::queue<T> m_queue ;
std::mutex m_mutex ;
std::condition_variable m_cv;
};
template<typename F, typename... Ts>
inline auto reallyAsync(F&& f, Ts&&... params){
return std::async(
std::launch::async,
std::forward<F>(f),
std::forward<Ts>(params)...);
}
template<typename T>
void consumer(int n, T it) {
float avg = 0; int count = 0;
while(go) {
while(!it->second.empty()){
avg += it->second.pop() ;
count++;
}
std::this_thread::sleep_for(std::chrono::milliseconds(2)); // ??
}
results[n-1] = avg/count ;
std::this_thread::sleep_for(std::chrono::milliseconds(2)); // ??
}
int main() {
unsigned short n(0), count(0) ; float x(0);
std::future<void> futs[6];
std::unordered_map<int, ThrQueue<float>> queue_map ;
std::ifstream infile("test_sig_in.txt");
if(infile.is_open()){
while(infile >> n >> x) {
//cout << " " << n << " " << x << std::flush;
auto it = queue_map.find(n);
if(it != queue_map.end()) {
it->second.push(x) ;
}
else {
cout << " launch async for id: " << n << endl;
queue_map.emplace(std::piecewise_construct,
std::forward_as_tuple(n),
std::forward_as_tuple(std::move(x)));
futs[count++] = reallyAsync(&consumer<decltype(queue_map.find(n))>, n, queue_map.find(n));
}
}
}
infile.close();
std::this_thread::sleep_for(std::chrono::milliseconds(10)); // ??
go = 0;
for(int i=0; i<NTHR; ++i) {
while(futs[i].wait_for(10ms) != std::future_status::ready) {cout << " wait for " << i << endl;}
cout << " exit consumer: " << i << " avg: " << results[i] << endl;
}
}
I see a number of things that may help you improve your code.
Don't abuse using namespace std
Putting using namespace std at the top of every program is a bad habit that you'd do well to avoid: it pulls every name from std into the global namespace, where it can clash with your own names.
Avoid the use of global variables
I see that go and results are declared as global variables rather than as local variables. It's generally better to explicitly pass variables your class or function will need rather than using the vague implicit linkage of a global variable.
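As a minimal sketch of passing state explicitly (the names averageInto and averageOnThread are hypothetical, and the prices are assumed to be already grouped per stock), the worker below receives both its input and its output slot as parameters, so nothing is shared through globals:

```cpp
#include <functional>
#include <thread>
#include <vector>

// Hypothetical helper: averages one stock's prices and writes the result into
// an output slot supplied by the caller instead of a global array.
void averageInto(const std::vector<float>& prices, float& out) {
    float sum = 0;
    for (float p : prices) sum += p;
    out = prices.empty() ? 0.0f : sum / static_cast<float>(prices.size());
}

// Runs averageInto on a worker thread; all state is passed in explicitly.
float averageOnThread(const std::vector<float>& prices) {
    float result = 0;
    // std::thread copies its arguments, so std::cref/std::ref pass references instead.
    std::thread worker(averageInto, std::cref(prices), std::ref(result));
    worker.join();  // result may only be read after join()
    return result;
}
```

Passing std::ref(result) makes it explicit which variable the thread writes, and join() marks the point after which the caller may read it.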
Fix your formatting
There are inconsistent spaces at the beginning of lines, inconsistent whitespace within lines and an odd and occasional space in front of the ending semicolon for each statement. Being consistent helps others read and understand your code.
Prefer constexpr variables to #define
Instead of using a #define, use constexpr:
constexpr std::size_t maxThreads{6};
The difference is that a constant declared this way has a type and obeys scope rules, which gives a bit of additional type safety. Note that I've also given it a more descriptive name.
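For illustration, here is how such a constant might be used to size the container of futures; FutureArray is a hypothetical alias, not part of the original code:

```cpp
#include <array>
#include <cstddef>
#include <future>

// A typed, scoped compile-time constant instead of "#define NTHR 6".
constexpr std::size_t maxThreads{6};

// It still works anywhere a constant expression is required,
// e.g. in static_assert or as a template argument.
static_assert(maxThreads == 6, "one slot per stock id 1..6");

// Hypothetical use: a fixed-size container of futures, one per stock id.
using FutureArray = std::array<std::future<float>, maxThreads>;
```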
Fix the array bug
The array of std::future assumes that there are exactly NTHR futures, but if, as in your sample, there are fewer, then some of those futures won't have an associated shared state when the code waits on them. Instead, it should only read count futures.
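One way to see the problem: a default-constructed std::future has no shared state, so valid() returns false and calling wait_for() or get() on it is undefined behaviour. The hypothetical helper below counts the entries that were actually launched; in the original program only the first count entries would pass this check.

```cpp
#include <array>
#include <cstddef>
#include <future>

// Counts how many futures in the array actually have a shared state.
// Default-constructed entries report valid() == false and must not be waited on.
template <std::size_t N>
std::size_t countLaunched(const std::array<std::future<float>, N>& futs) {
    std::size_t launched = 0;
    for (const auto& f : futs)
        if (f.valid()) ++launched;
    return launched;
}
```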
Understand the future
The logic for the std::future is not right. What you really want is not to wait long enough for the threads to complete (which is what the code currently does), but to wait until each thread has finished calculating its value. The first thing that needs to change is the declaration:
std::future<float> futs[NTHR];
The second change is a little more major. You want each consumer to consume until there are no more values, but not to exit early just because it has processed all of the values that happen to be in the queue at the moment. Right now, that's being done with an ugly combination of a global variable and timed sleeps. Instead, it would make more sense to push a sentinel value onto each queue to mark its end. To effect this change, I'd suggest this right after the main input loop:
for (auto &item: queue_map) {
item.second.push(sentinel);
}
You can choose any unique non-data value. I chose this:
constexpr float sentinel{-999};
Now we can rewrite the consumer code like this:
template<typename T>
float consumer(T it) {
float avg{0};
int count{0};
for(float value = it->second.pop(); value != sentinel; value = it->second.pop()) {
avg += value;
++count;
}
return avg/count;
}
This simply reads the queue until it receives the sentinel value. Note, too, that we no longer need to pass or keep n, because the code doesn't care which queue it's working on; it simply calculates the average and returns the result.
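To show the sentinel protocol end to end, here is a self-contained sketch. FloatQueue is a stripped-down stand-in for ThrQueue<float> and averageViaFuture is a hypothetical driver; the consumer blocks on the condition variable instead of sleeping, and the future becomes ready only after the sentinel has been consumed:

```cpp
#include <condition_variable>
#include <functional>
#include <future>
#include <mutex>
#include <queue>
#include <vector>

constexpr float sentinel{-999};  // end-of-data marker, as above

// Minimal blocking queue, a stand-in for ThrQueue<float>.
class FloatQueue {
public:
    void push(float x) {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_queue.push(x);
        }
        m_cv.notify_one();
    }
    float pop() {
        std::unique_lock<std::mutex> lock(m_mutex);
        m_cv.wait(lock, [this] { return !m_queue.empty(); });  // sleeps until data arrives
        float value = m_queue.front();
        m_queue.pop();
        return value;
    }
private:
    std::queue<float> m_queue;
    std::mutex m_mutex;
    std::condition_variable m_cv;
};

// Consumes until the sentinel arrives; an empty stream yields 0 instead of 0/0.
float consumeAverage(FloatQueue& q) {
    float sum{0};
    int count{0};
    for (float value = q.pop(); value != sentinel; value = q.pop()) {
        sum += value;
        ++count;
    }
    return count ? sum / count : 0.0f;
}

// Starts a consumer task, then feeds it; get() waits for the consumer to return.
float averageViaFuture(const std::vector<float>& prices) {
    FloatQueue q;
    std::future<float> fut = std::async(std::launch::async, consumeAverage, std::ref(q));
    for (float p : prices) q.push(p);
    q.push(sentinel);
    return fut.get();  // q outlives the consumer because get() blocks until it finishes
}
```

The zero-count guard is only defensive: in the real program every queue is created with at least one price.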
Now the end of main looks like this:
for(int i=0; i < count; ++i) {
std::cout << " exit consumer: " << i << " avg: " << futs[i].get() << "\n";
}
The results are retrieved using get(), which implicitly waits for the future to complete. Note that, like the original, this prints the averages in the order in which the stock ids first appear and does not preserve the associated stock number. If I were writing this, I'd use a class for each stock that holds the stock number, the count of data items, and the average value.
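A sketch of such a per-stock class, under the simplifying assumption that each stock's prices have already been grouped into a std::vector (the queues and sentinel are left out to keep it short). StockResult, summarize and summarizeAll are hypothetical names; keying the futures by stock id keeps each number attached to its average:

```cpp
#include <future>
#include <map>
#include <vector>

// Hypothetical per-stock result: the id travels together with its statistics.
struct StockResult {
    int id;
    int count;
    float average;
};

StockResult summarize(int id, std::vector<float> prices) {
    float sum = 0;
    for (float p : prices) sum += p;
    int count = static_cast<int>(prices.size());
    return {id, count, count ? sum / count : 0.0f};
}

// Launches one task per stock and collects the results keyed by stock id.
std::map<int, StockResult> summarizeAll(const std::map<int, std::vector<float>>& pricesById) {
    std::map<int, std::future<StockResult>> futs;
    for (const auto& entry : pricesById)
        futs[entry.first] = std::async(std::launch::async, summarize, entry.first, entry.second);
    std::map<int, StockResult> results;
    for (auto& entry : futs)
        results[entry.first] = entry.second.get();  // waits for that stock's task
    return results;
}
```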
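Prefer std::lock_guard over std::unique_lock
This code doesn't do anything tricky with locks in most cases, so it should use std::lock_guard rather than std::unique_lock to express the usage more directly. The exception is pop(), which must keep std::unique_lock because std::condition_variable::wait requires one.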
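To illustrate where each lock type belongs, here is a hypothetical one-shot flag (not part of the original code): the setter only needs scoped locking, while the waiting side must use std::unique_lock because std::condition_variable::wait unlocks and relocks the mutex while it waits.

```cpp
#include <condition_variable>
#include <mutex>

// Hypothetical one-shot "done" flag illustrating which lock type each side needs.
class DoneFlag {
public:
    void set() {
        {
            // Plain scoped locking: std::lock_guard is enough and states the intent.
            std::lock_guard<std::mutex> lock(m_mutex);
            m_done = true;
        }
        m_cv.notify_all();
    }
    void wait() {
        // wait() must be able to release and reacquire the mutex,
        // so it needs a std::unique_lock, not a lock_guard.
        std::unique_lock<std::mutex> lock(m_mutex);
        m_cv.wait(lock, [this] { return m_done; });
    }
    bool done() {
        std::lock_guard<std::mutex> lock(m_mutex);
        return m_done;
    }
private:
    std::mutex m_mutex;
    std::condition_variable m_cv;
    bool m_done = false;
};
```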
Use const where practical
The empty() member function doesn't and shouldn't modify the underlying object, so I'd suggest marking it as const. Because locking modifies the mutex, this also requires declaring m_mutex as mutable.
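A minimal sketch of a const empty() on a hypothetical LockedQueue; note the mutable on the mutex, without which the lock_guard line would not compile inside a const member function:

```cpp
#include <mutex>
#include <queue>

// Hypothetical queue showing a const, thread-safe empty().
template <typename T>
class LockedQueue {
public:
    void push(const T& x) {
        std::lock_guard<std::mutex> lock(m_mutex);
        m_queue.push(x);
    }
    bool empty() const {
        std::lock_guard<std::mutex> lock(m_mutex);  // compiles only because m_mutex is mutable
        return m_queue.empty();
    }
private:
    std::queue<T> m_queue;
    mutable std::mutex m_mutex;  // locking is not a logical modification of the queue
};
```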
Don't overcomplicate
There appears to be no compelling reason to use threads or futures for this problem. The bottleneck is much more likely to be the sequential reading of the input file than the calculation of averages. I went through the whole review anyway, expecting that this is more of a learning exercise than a practical solution to this particular problem.
Alternative version
Cleaning up the items mentioned above and streamlining a few other things (including making the calculation a member function of the queue), here's an alternative version that is shorter, cleaner, and free of the bugs mentioned above.
#include <future>
#include <condition_variable>
#include <map>
#include <unordered_map>
#include <queue>
#include <fstream>
#include <iostream>
#include <mutex>
template<typename T>
class ThrQueue {
public:
~ThrQueue() {
if(!m_queue.empty())
std::cout << " ERROR: queue not empty" << std::endl;
}
T pop() {
std::unique_lock<std::mutex> lock(m_mutex);
m_cv.wait(lock, [this]{return !m_queue.empty();});
auto it = m_queue.front();
m_queue.pop();
return it;
}
void push(T const& x) {
{
std::lock_guard<std::mutex> lock(m_mutex);
m_queue.push(x);
}
m_cv.notify_one();
}
bool empty() const {
std::lock_guard<std::mutex> lock(m_mutex);
return m_queue.empty();
}
T average() {
T sum = 0;
int count = 0;
for(T value = pop(); value != sentinel; value = pop()) {
sum += value;
++count;
}
return sum/count;
}
static constexpr T sentinel = -999;
private:
std::queue<T> m_queue;
mutable std::mutex m_mutex; // mutable so that the const empty() can lock it
std::condition_variable m_cv;
};
template<typename T>
constexpr T ThrQueue<T>::sentinel;
int main() {
unsigned short n(0);
float x(0);
std::map<int, std::future<float>> futs;
std::unordered_map<int, ThrQueue<float>> queue_map;
std::ifstream infile("test_sig_in.txt");
while(infile >> n >> x) {
//std::cout << " " << n << " " << x << "\n";
queue_map[n].push(x);
// only add a new function if this is a new value
if (queue_map.size() > futs.size()) {
futs[n] = std::async(std::launch::async, &ThrQueue<float>::average, &queue_map[n]);
}
}
infile.close();
for (auto &item: queue_map) {
item.second.push(ThrQueue<float>::sentinel);
}
for(auto &res: futs) {
std::cout << " exit consumer: " << res.first << " avg: " << res.second.get() << "\n";
}
}
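The alternative version passes &queue_map[n] to std::async while the main thread keeps inserting into queue_map. This is sound because rehashing a std::unordered_map invalidates its iterators but not pointers or references to its elements. The hypothetical check below demonstrates that guarantee; the iterators from queue_map.find(n) that the original version hands to its consumers are not covered by it.

```cpp
#include <unordered_map>

// Checks that a pointer to an element survives insertions and a forced rehash.
// (Iterators would not be guaranteed to survive the rehash.)
bool pointerSurvivesRehash() {
    std::unordered_map<int, float> m;
    m[1] = 5.1f;
    float* p = &m[1];
    for (int i = 2; i < 1000; ++i) m[i] = 0.0f;  // insertions may trigger rehashes
    m.rehash(4096);                              // explicitly force another rehash
    return p == &m[1] && *p == 5.1f;
}
```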
I would recommend using a const variable instead of a #define macro.
This seems like an overly complicated solution to a simple problem; the overhead of threading outweighs any gain.
Just accumulate the sums and entry counts as you read the data, using a new class as a holder (which can be expanded if other intermediate results are needed). Calculate the average (and any other results) when you're done.
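A minimal sketch of that single-threaded approach, assuming the same "id price" text format as the sample data; StockStats and accumulateStats are hypothetical names. Reading the input in one pass on one thread keeps every stock's prices in their original order, which is what jump detection needs:

```cpp
#include <istream>
#include <map>
#include <sstream>
#include <string>

// Hypothetical per-stock holder; more running statistics (min/max, sum of
// squares for a sigma-based jump test, ...) could be added as members.
struct StockStats {
    double sum = 0;
    long count = 0;
    double average() const { return count ? sum / count : 0.0; }
};

// Reads "id price" pairs in input order and accumulates them per stock id.
std::map<int, StockStats> accumulateStats(std::istream& in) {
    std::map<int, StockStats> stats;
    int id = 0;
    double price = 0;
    while (in >> id >> price) {
        StockStats& s = stats[id];  // creates a zeroed entry the first time an id appears
        s.sum += price;
        ++s.count;
    }
    return stats;
}

// Convenience overload for in-memory text, e.g. quick checks without a file.
std::map<int, StockStats> accumulateStats(const std::string& text) {
    std::istringstream in(text);
    return accumulateStats(in);
}
```

With a file, std::ifstream in("test_sig_in.txt"); auto stats = accumulateStats(in); works the same way.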