I want to write small server that has one thread that pushes some jobs into queue and several threads that pull the jobs and process it somehow.
I wrote (of course no, I found an example) of blocking queue. Then I wrote cFakeWorker
that is server prototype. In general it works ok, but I'm not sure that the way I implemented termination is very good. And, so far, I will appreciate for ideas how should be done.
Currently I have m_bForceStop
member of cBlockingQueue
class. pull()
function checks its state with condition_variable
and throws exception in case of termination.
cFakeWorker
catches this exception while waits for the new jos.
The moment I call cBlockingQueue::stop_waiting()
is cFakeWorker::Terminate()
. This implementation works, but I have bad feeling that it might be done better. So I will appreciate for the review.
UPDATE! Uploading all sources @Phrancis, thanks for the comment. I will appreciate for comments.
main.cpp
void main()
{
cFakeWorker fw;
std::thread thFW([&fw](){fw.StartFlow();});
while (true)
{
std::string s;
std::cin >> s;
if (s == "exit")
{
std::cout << __func__ << " Terminating .... " << std::endl;
fw.Terminate();
std::cout << __func__ << " Termination done" << std::endl;
thFW.join();
return;
}
}
}
cFakeWorker.h
#ifndef CFAKEWORKER_H
#define CFAKEWORKER_H
#include <chrono>
#include <string>
#include <vector>
#include <thread>
#include <mutex>
#include <condition_variable>
#include "cBlockingQueue.h"
using namespace std::chrono_literals;
struct taskData
{
std::string str;
std::chrono::milliseconds sleep_period;
};
class cFakeWorker
{
public:
cFakeWorker();
virtual ~cFakeWorker();
void StartFlow();
void Terminate();
private:
cBlockingQueue<taskData> m_queue;
int m_NCores;
int m_NPullThreads;
std::vector<std::thread> m_vecPullThreads;
std::thread m_PushThread;
std::mutex m_mtxIO;
std::mutex m_mtxTerminate;
std::condition_variable m_cvTerminate;
bool m_bForceTermination;
void PushJob();
void PullJob();
};
#endif /* CFAKEWORKER_H */
cFakeWorker.cpp
#include "cFakeWorker.h"
#include <iostream>
#include <sstream>
#include <cstdlib> // std::rand()
#include <ctime>
#include <time.h>
#include <iomanip>
#include <sched.h>
cFakeWorker::cFakeWorker()
{
m_NCores = std::thread::hardware_concurrency();
m_NPullThreads = (m_NCores > 1) ? m_NCores - 1 : 1;
}
void cFakeWorker::StartFlow()
{
m_bForceTermination = false;
m_PushThread = std::thread([this](){this->PushJob();});
for (int i = 0; i < m_NPullThreads; i++)
{
{
std::lock_guard<std::mutex> lckIO(m_mtxIO);
std::cout << "Starting pull thread #" << i << std::endl;
m_vecPullThreads.push_back(std::thread([this](){this->PullJob();}));
}
}
}
void cFakeWorker::PushJob()
{
std::condition_variable cv;
std::srand(std::time(nullptr));
while (true)
{
{
std::lock_guard<std::mutex> lck(m_mtxTerminate);
if(m_bForceTermination)
{
return;
}
}
std::stringstream ss;
std::chrono::high_resolution_clock::time_point tp = std::chrono::high_resolution_clock::now();
std::time_t t = std::chrono::system_clock::to_time_t(tp);
ss << std::put_time(std::localtime(&t), "%Y %a %b %d %H:%M:%S");
int nSleepDuration = ( std::rand() + 1) % 999;
std::this_thread::sleep_for(100ms);
taskData dat;
dat.str = ss.str();
dat.sleep_period = std::chrono::milliseconds(nSleepDuration);
m_queue.push(dat);
}
}
void cFakeWorker::PullJob()
{
while (m_queue.is_waiting())
{
try
{
taskData dat;
try
{
dat = m_queue.pull();
}
catch (const cTerminationException & ex)
{
std::cout << __func__ << ": " << "cBlockingQueue::pull() was terminated" << std::endl;
return;
}
{
std::lock_guard<std::mutex> lckIO(m_mtxIO);
std::cout << " Start doing "
<< "------ >>> "
<< "CPU: " << sched_getcpu()
<< " [" << std::this_thread::get_id() << "] --"
<< " msg : " << dat.str
<< " sleep: " << dat.sleep_period.count()
<< std::endl;
}
// fake load
for(int i = 1; i < 50000; i++)
{
char * p = new char [i];
for (int j = 0; j < i; ++j)
{
p[j] = 2;
}
delete [] p;
}
std::cout << " Stop doing "
<< "------ >>> "
<< "CPU: " << sched_getcpu()
<< std::endl;
}
catch (const std::exception & ex)
{
std::cout << __func__ << ": " << "something has happened. Keep pulling" << std::endl;
}
}
}
void cFakeWorker::Terminate()
{
{
std::lock_guard<std::mutex> lck(m_mtxTerminate);
m_bForceTermination = true;
}
m_PushThread.join();
m_queue.stop_waiting();
for (auto& th : m_vecPullThreads)
{
th.join();
}
}
cFakeWorker::~cFakeWorker()
{
try
{
Terminate();
}
catch (const std::exception & ex)
{}
}
cBlockingQueue.h (example code I found)
#ifndef CBLOCKINGQUEUE_H #define CBLOCKINGQUEUE_H #include <mutex> #include <deque> #include <condition_variable> #include <thread> #include <iostream> #include <chrono> using namespace std::chrono_literals; class cTerminationException : public std::exception { }; template <typename T> class cBlockingQueue { public: cBlockingQueue() { m_bForceStop = false; } void push(const T & param) { { std::unique_lock<std::mutex> lock(m_mut); m_deque.push_front(param); } m_cv.notify_all(); } T pull() { std::unique_lock<std::mutex> lock(m_mut); m_cv.wait(lock, [=](){return !this->m_deque.empty() || m_bForceStop;}); if (m_bForceStop) { std::cout << __func__ << " Throwing cTerminationException" << std::endl; throw cTerminationException(); } T ret(std::move(m_deque.back())); m_deque.pop_back(); return ret; } void stop_waiting() { m_bForceStop = true; m_cv.notify_all(); } bool is_waiting() { return !m_bForceStop; } private: std::mutex m_mut; std::deque<T> m_deque; std::condition_variable m_cv; bool m_bForceStop; }; #endif /* CBLOCKINGQUEUE_H */
1 Answer 1
main.cpp
void main()
This is not valid C++. main()
must return int
.
while (true)
{
std::string s;
std::cin >> s;
// ...
This works... sorta... but it's not exactly the most efficient way to go about it. The problem is that on each loop, the string s
is destroyed and then recreated, and then filled with data. If you move the string outside of the loop, then its memory is retained on each loop iteration.
But the more important issue is that you're not checking to see if std::cin
is in a fail state. If the input hits EOF
before getting "exit", or if there's any kind of input failure, your program will just spin forever. What you want to do is check that the input succeeded.
So what you have now is something like this:
std::string s;
while (std::cin >> s)
{
// ...
This could also be written:
for (std::string s; std::cin >> s; )
{
// ...
This is more efficient, and - more importantly - if there's any kind of input error, your program will exit.
However, now that you're exiting both on "exit" and on input error, you will need to move your termination logic outside of the loop:
for (std::string s; std::cin >> s; )
{
if (s == "exit")
break;
}
// Do termination here.
Next is:
std::cout << __func__ << " Terminating .... " << std::endl;
Don't use std::endl
. All you need is:
std::cout << __func__ << " Terminating .... \n";
cFakeWorker.h
using namespace std::chrono_literals;
Using directives normally shouldn't be in headers. And you don't even use it in the header.
class cFakeWorker
Prefixing identifiers with type information is bad practice in C++. Prefixing private members with m_
is okay-ish, but prefixing classes with c
or C
is totally unnecessary. That also goes for all the N
, vec
, and so on, but even more so, because it makes it more difficult if you ever want to change types.
virtual ~cFakeWorker();
Unless you intend for a type to be inheritable, there's no need to make the destructor virtual. cFakeWorker
doesn't seem like a type built for extending - it has no other virtual functions, for example.
int m_NCores;
int m_NPullThreads;
Are these two variables really necessary? The only time you use m_NCores
is to determine m_NPullThreads
... and the only time you use m_NPullThreads
is when you create the pull threads (after that, m_NPullThreads
is equal to m_vecPullThreads.size()
). The only thing the constructor does is set these variables, so without them, you don't need the constructor either.
std::mutex m_mtxTerminate;
std::condition_variable m_cvTerminate;
bool m_bForceTermination;
There seems an awful lot of complexity here that really isn't necessary. For starters, it doesn't seem like you actually use m_cvTerminate
. And it looks like the only purpose of m_mtxTerminate
is to guard m_bForceTermination
, and you don't use it consistently.
It seems to me that you can accomplish everything these variables do with a single atomic bool
. As a bonus, that might even make it lock-free.
std::atomic<bool> m_bForceTermination = false;
cFakeWorker.cpp
#include <cstdlib> // std::rand()
Don't use rand()
. It was always a terrible random number generator, and nobody uses it properly. (You are not using it properly either.)
C++ has a much better random library, and once you've had it explained to you, it's really easy to use. All you use rand()
for is to generate a number between 1 and 998, so all you need is this:
#include <random>
auto generate_sleep_duration()
{
static auto eng = std::default_random_engine{std::random_device{}()};
static auto dist = std::uniform_int_distribution<>{1, 998};
return dist(eng);
}
Even better: since you want it to be std::chrono::milliseconds
, you can return std::chrono::milliseconds
.
#include <ctime>
#include <time.h>
Is there any need for the second include? In any case, including C headers (like <time.h>
rather than <ctime>
) in C++ is deprecated.
The default constructor seems unnecessary.
With StartFlow()
, you don't take into account what should happen if StartFlow()
is called twice in a row, or StartFlow(); Terminate(); StartFlow();
. That could be intentional; you may intend for doing those things to be undefined behaviour. I'm just warning you.
cFakeWorker::PushJob()
std::condition_variable cv;
std::srand(std::time(nullptr));
These first two lines should probably be removed. You never use the condition variable, and this is the wrong place to call srand()
. If you were going to use rand()
, srand()
should be called only once globally. With this design, it will be called every time you start a new push thread.
while (true)
{
{
std::lock_guard<std::mutex> lck(m_mtxTerminate);
if(m_bForceTermination)
{
return;
}
}
If you change m_bForceTermination
to be an atomic<bool>
, all of the above becomes:
while (!m_bForceTermination)
{
Next:
std::chrono::high_resolution_clock::time_point tp = std::chrono::high_resolution_clock::now();
std::time_t t = std::chrono::system_clock::to_time_t(tp);
First, auto
is your friend:
auto tp = std::chrono::high_resolution_clock::now();
But the real problem here is that you're mixing clocks. This happens to work on your system, probably because system_clock
and high_resolution_clock
are the same. But it is not guaranteed to work generally.
Using a high resolution clock is specious in this situation anyway. All you do with it is print the time at a 1 second resolution. You don't need any more resolution than that.
cFakeWorker::PullJob()
The nested try
blocks bother me - this function could be flattened:
while (m_queue.is_waiting())
{
try
{
auto dat = m_queue.pull();
// rest of stuff in inner try block
}
catch (const cTerminationException & ex)
{
std::cout << __func__ << ": " << "cBlockingQueue::pull() was terminated\n";
return;
}
catch (const std::exception & ex)
{
std::cout << __func__ << ": " << "something has happened. Keep pulling\n";
}
}
I realize the fake load is just, well, "fake", but it could still be done a lot better:
// fake load
for(int i = 1; i < 50000; i++)
auto v = std::vector<signed char>(i, 2);
But why not just sleep, right?
cFakeWorker::Terminate()
Terminate()
by itself isn't too bad, but the problem is that Terminate()
is not built to be called multiple times, but you call it manually and also in the destructor. I'm actually surprised that your test program doesn't terminate (due to throwing an unhandled exception), because you join threads in the destructor that have already been shut down. Maybe I'm missing something.
To fix the issue, you need to only do cleanup if required. How you figure out how to do that is up to you. (You also need to properly cleanup. After joining all the pull threads, you don't clear the vector... but in StartFlow()
, you begin pushing new threads without checking to see if the vector is empty. You probably want to call m_vecPullThreads.clear()
after joining the threads.)
cBlockingQueue.h
I assume since this is someone else's code, it's not part of the review, right?
-
\$\begingroup\$ Sorry for the late answer. Thanks a lot! I agree with you, all these things must be improved. Thanks one more \$\endgroup\$Yura– Yura2018年06月15日 06:37:13 +00:00Commented Jun 15, 2018 at 6:37
I found an example
so did you write it yourself or not? \$\endgroup\$push()
method \$\endgroup\$