Scenario: I'm getting requests through a (thread-safe) queue. Each request then needs to be handled in a separate thread. There is a chance that the function (which is actually calling a Java-program via popen
, and polling its output) takes a very long time. From the main thread I need a mechanism to indicate that situation (basically, measuring thread-running-time).
In my example I'm trying to 'enrich' std::future
with some time information. The sample runs seamlessly - but I'm uncertain if this is the correct way.
Here is a very simple demo that mimics what I'm trying to achieve (where ThreadFunc
stands in for my real processing code):
#include <iostream>
#include <thread>
#include <future>
#include <chrono>
#include <vector>
#include <random>
typedef std::future<int> FutureResultInt;
int ThreadFunc( )
{
std::random_device rd;
std::mt19937 mt(rd());
const int iRand = std::uniform_int_distribution<int>(2000, 6000)(mt);
std::cout << "ThreadFunc waiting for [" << iRand << "] ms ... " << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(iRand));
std::cout << "ThreadFunc [" << iRand << "] done" << std::endl;
return iRand;
}
class CFutureTest
{
public:
CFutureTest() = delete;
CFutureTest(FutureResultInt&& fr)
: m_start(std::chrono::system_clock::now())
, m_result()
{
m_result = std::move(fr);
};
int GetAge() const
{
return std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now() - m_start).count();
}
// private:
FutureResultInt m_result;
std::chrono::time_point<std::chrono::system_clock> m_start;
};
int main()
{
std::vector< CFutureTest > futures;
for (int i = 0; i < 5; i++)
futures.push_back(std::move(std::async(std::launch::async, ThreadFunc)));
while (futures.size() > 0)
{
for (std::vector< CFutureTest >::iterator it = futures.begin(); it != futures.end(); ++it)
{
CFutureTest& future = *it;
const std::future_status stat = future.m_result.wait_for(std::chrono::milliseconds(1));
switch (stat)
{
case std::future_status::timeout:
if (future.GetAge() > 4000)
{
std::cout << "Thread has exceeded the time limit" << std::endl;
}
continue;
case std::future_status::deferred:
std::cout << "std::future_status::deferred" << std::endl;
continue;
}
const int iResult = future.m_result.get();
std::cout << "future returned [" << iResult << "] (removing!)" << std::endl;
futures.erase(it);
if (futures.size() < 1)
break;
it = futures.begin();
}
}
return 0;
}
-
\$\begingroup\$ Cross-posted on Stack Overflow \$\endgroup\$Mast– Mast ♦2018年09月21日 07:34:53 +00:00Commented Sep 21, 2018 at 7:34
1 Answer 1
Ugly typedef
I'm not a big fan of this:
typedef std::future<int> FutureResultInt;
It's not significantly shorter or easier to read, it doesn't isolate the user from an underlying type, and it only serves to slow down my reading every time I hit it. That's somewhat subjective, of course, but I don't believe it adds value. I'd be happier with it if it told us what it's for, rather than merely what it is. I'm thinking of something like:
using TimerTaskResult = std::future<int>;
Now the name conveys something that's not just duplicating the standard name.
Member initialization
My compiler warns me that the initializer list is in a misleading order:
203856.cpp:47:54: warning: ‘CFutureTest::m_start’ will be initialized after [-Wreorder]
203856.cpp:46:19: warning: ‘FutureResultInt CFutureTest::m_result’ [-Wreorder]
203856.cpp:33:3: warning: when initialized here [-Wreorder]
CFutureTest(FutureResultInt&& fr)
^~~~~~~~~~~
I'd also recommend value-initializing m_result
rather than default-constructing followed by move-assigning:
CFutureTest(FutureResultInt&& fr)
: m_result{std::move(fr)}
, m_start{std::chrono::system_clock::now()}
{
}
Missing case
203856.cpp: In function ‘int main()’:
203856.cpp:63:14: warning: enumeration value ‘ready’ not handled in switch [-Wswitch]
switch (stat)
^
I like to have this warning enabled. We could provide a minimal case std::future_status::ready: break;
, or we could unify the control flow by changing the other branches from continue
to break
and inlining the following code into the ready
case like this:
switch (stat) {
case std::future_status::timeout:
if (future.GetAge() > 4000) {
std::cout << "Thread has exceeded the time limit" << std::endl;
}
break;
case std::future_status::deferred:
std::cout << "std::future_status::deferred" << std::endl;
break;
case std::future_status::ready:
{
const int iResult = future.m_result.get();
std::cout << "future returned [" << iResult << "] (removing!)" << std::endl;
futures.erase(it);
if (futures.empty()) {
return 0;
} else {
it = futures.begin();
}
}
}
Instead of returning to futures.begin()
when we harvest a result, it's arguably better to keep going from the next element if there is one:
it = futures.erase(it);
if (it == futures.end()) {
if (futures.empty()) {
return 0;
} else {
it = futures.begin();
}
}
Interleaved output
I know it's not part of your production code, but it's quite irritating to have the different threads' output interrupting each other mid-line. We can provide a class to hold a lock while several items are written to a stream:
class LogStream
{
static std::mutex mutex;
std::lock_guard<std::mutex> guard{mutex};
std::ostream& stream{std::clog};
public:
LogStream() {}
template<typename T>
std::ostream& operator<<(T&& t) { return stream << std::forward<T>(t); }
};
std::mutex LogStream::mutex;
And use it like this:
LogStream() << "ThreadFunc waiting for [" << iRand << "] ms ... " << std::endl;
Be clear about units
I don't like this function name:
int GetAge() const
Perhaps change it to getAgeMillis()
, or how about making it a template method? Like this:
template<typename TimeUnit = std::chrono::seconds>
int GetAge() const
{
auto now = std::chrono::system_clock::now();
return std::chrono::duration_cast<TimeUnit>(now - m_start).count();
}
That also helps us deal with the very long lines that are a perennial problem with duration_cast
.
Avoid polling
This is the big one. It's very power-inefficient to poll every 1ms to see whether you have any results; that's a bad thing on battery-powered systems, but it's just as important in a server farm.
What we need to do instead is have each finishing thread notify a condition variable. When we're woken, we then examine all the futures, picking up any where is_ready()
.
Unfortunately, that means we can't use std::async()
, as its future isn't made ready until the called function has returned. We'll need to implement our own version that can accept a std::condition_variable&
to be notified after it sets the future's value.
You might be able to pick up some advice from Stack Overflow - here's a couple of starting points that might have something relevant:
Explore related questions
See similar questions with these tags.