Description
I am trying to implement a robust, multi-threaded producer-consumer model with a single producer in the main thread and several consumers in other threads.
The generator thread loops over data, recognizes through some condition which sort of data it is dealing with, and proceeds to delegate the sample to the respective worker thread (via pushBack(data)). When reaching the end of the source data, it sends a signal to the worker threads and waits for them to finish. Afterwards, it cleans up and exits.
For the consumer threads, I have written a class that starts a thread upon instantiation. Each class instance has its own FIFO queue into which the producer thread pushes samples. Upon receiving the data, it can immediately start processing as it is not dependent on other threads. When the queue is empty, it waits indefinitely until either a sample is received OR a stop signal is received (e.g. method stopThread is called). In case the stop signal is received when the queue is not empty yet, it proceeds to finish the remaining elements in the queue and then exits.
The queue implementation that I am using is not complicated to use (https://github.com/cameron314/readerwriterqueue), but I'll give a quick overview of what each used method does:
- try_enqueue: Enqueues a copy of element if there is room in the queue. Returns true if the element was enqueued, false otherwise.
- try_dequeue: Attempts to dequeue an element; if the queue is empty, returns false instead.
- peek: Returns a pointer to the front element in the queue (the one that would be removed next by a call to try_dequeue or pop). If the queue appears empty at the time the method is called, nullptr is returned instead.
Code
#include <atomic>
#include <chrono>
#include <iostream>
#include <thread>
#include "readerwriterqueue.h"

class Consumer
{
public:
    Consumer(int id) : m_BufferQueue(64) {
        m_id = id;
        m_thread = std::thread([this] {work();});
    }

    void pushBack(int* num){
        while (!m_BufferQueue.try_enqueue(num)){
            std::this_thread::sleep_for(std::chrono::milliseconds(1));
        }
    }

    void stopThread(){
        m_running = false;
        if (m_thread.joinable())
            m_thread.join();
    }

    void work() {
        m_running = true;
        while(m_running || m_BufferQueue.peek())
        {
            int* received;
            bool succeeded = m_BufferQueue.try_dequeue(received);
            if(succeeded)
            {
                std::cout << "Thread " << m_id << " received value " << *received << std::endl;
                delete received;
            }
            else
                std::this_thread::sleep_for(std::chrono::milliseconds(1));
        }
    }

private:
    int m_id;
    moodycamel::ReaderWriterQueue<int*> m_BufferQueue;
    std::thread m_thread;
    std::atomic_bool m_running;
};

int main() {
    Consumer* c1 = new Consumer(1);
    Consumer* c2 = new Consumer(2);

    // data generator
    for(int i = 0; i < 10; i++)
    {
        int* val = new int(i);
        if (i % 2 == 0)
            c1->pushBack(val);
        else
            c2->pushBack(val);
    }

    c1->stopThread();
    c2->stopThread();
    delete c1;
    delete c2;
    std::cout << "EXIT" << std::endl;
    return 0;
}
Points of Concern
- Does the code encapsulate the individual threads in such a way that they are truly independent from each other? I am worried that I might have overlooked something here.
- When stopping the thread, is it good practice to check if it is joinable, before joining it? To my understanding, joinable just checks if the thread has been started, but at that time that should always be the case. So it might be unnecessary.
- Any other oversights that I might have missed?
Link to file "readerwriterqueue.h"
- Welcome to Code Review! I have rolled back Rev 4 → 3. Please see What to do when someone answers. – Sᴀᴍ Onᴇᴌᴀ ♦, Dec 13, 2021 at 19:17
- @SᴀᴍOnᴇᴌᴀ Sorry, I was not aware of that. I have written a self-answer instead. – DocDriven, Dec 13, 2021 at 19:36
1 Answer
Addressing your concerns
Does the code encapsulate the individual threads in such a way that they are truly independent from each other? I am worried that I might have overlooked something here.
There is nothing in the implementation of Consumer that makes multiple instances dependent on each other, assuming nothing in ReaderWriterQueue causes them to be dependent somehow. If ReaderWriterQueue makes system calls or allocates memory as part of enqueueing or dequeueing items, then two threads might block each other if the operating system or memory allocator needs to take a global lock.
When stopping the thread, is it good practice to check if it is joinable, before joining it? To my understanding, joinable just checks if the thread has been started, but at that time that should always be the case. So it might be unnecessary.
It is indeed unnecessary, and I would even consider checking for joinability a code smell.
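As a small illustration of why the check is redundant in this code: joinable() only reports false for a std::thread that is default-constructed, moved-from, detached, or already joined. A thread that was started in the constructor and has not been joined yet is always joinable. The helper name joinableStates below is made up for this sketch.

```cpp
#include <thread>
#include <vector>

// Hypothetical helper: records what joinable() reports at different points
// in the lifetime of a std::thread object.
std::vector<bool> joinableStates() {
    std::vector<bool> states;

    std::thread idle;                     // default-constructed: no thread attached
    states.push_back(idle.joinable());    // false

    std::thread worker([] {});            // started: a thread is attached
    states.push_back(worker.joinable());  // true, even if the lambda has already returned

    worker.join();
    states.push_back(worker.joinable());  // false: already joined

    return states;
}
```

Note that joinable() says nothing about whether the thread function is still executing; it only says whether the object still owns a thread that has to be joined or detached.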
Any other oversights that I might have missed?
See below.
Simplify the constructor
It is weird to see m_BufferQueue being initialized in the member initializer list, but m_id and m_thread in the body of the constructor. Everything can be initialized in the member initializer list, and this should be preferred as it ensures initialization happens in the correct order for stack unwinding to work correctly if any of the initializers throw during construction.
But I would even use default member initializers. This even works for m_thread:
class Consumer {
public:
    Consumer(int id) : m_id(id) {}
    ...
private:
    int m_id;
    moodycamel::ReaderWriterQueue<int*> m_BufferQueue{64};
    std::thread m_thread{&Consumer::work, this};
    ...
};
Note that I also rewrote the initialization of m_thread to avoid having to pass a lambda.
Make work() private
The function work() should only be called internally. Make it private to avoid someone accidentally calling work() on a Consumer that is already running.
Join the thread in the destructor
If you start the thread in the constructor, you should join it in the destructor. Otherwise, the program will be terminated if a Consumer object is destroyed before stopThread() has been called, because destroying a std::thread that is still joinable calls std::terminate(). You might even consider removing stopThread() altogether, and just leave it up to the destructor:
class Consumer {
public:
    ...
    ~Consumer() {
        m_running = false;
        m_thread.join();
    }
    ...
};
With C++20, you might want to consider using std::jthread, which automatically takes care of that for you.
Alternatively, if you want to be able to start and stop the thread multiple times during the lifetime of one Consumer object, have both startThread() and stopThread() member functions that manage the thread.
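A minimal sketch of that alternative, using only the standard library. The class name RestartableWorker and the counter m_iterations are placeholders for illustration, and the sleeping loop body merely stands in for draining the queue.

```cpp
#include <atomic>
#include <chrono>
#include <thread>

class RestartableWorker {
public:
    ~RestartableWorker() { stopThread(); }

    // Starts the worker unless one is already attached.
    void startThread() {
        if (m_thread.joinable())
            return;
        m_running = true;  // set before the thread exists, so a later stop request is never overwritten
        m_thread = std::thread(&RestartableWorker::work, this);
    }

    // Asks the worker to stop and waits for it; safe to call repeatedly.
    void stopThread() {
        m_running = false;
        if (m_thread.joinable())
            m_thread.join();
    }

    int iterations() const { return m_iterations; }

private:
    void work() {
        // Placeholder loop: runs at least once, then until a stop is requested.
        do {
            ++m_iterations;
            std::this_thread::sleep_for(std::chrono::milliseconds(1));
        } while (m_running);
    }

    std::atomic<bool> m_running{false};
    std::atomic<int> m_iterations{0};
    std::thread m_thread;
};
```

In this design the joinable() check is meaningful, because the thread may legitimately not have been started, or may already have been stopped, when stopThread() or the destructor runs.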
Wait instead of poll
If there is nothing in the queue, the worker thread will poll for new items every millisecond. A millisecond is an arbitrary amount of time. You probably chose it because you think it's very small on human timescales, but it has several problems. One is that you potentially introduce a delay of up to 1 millisecond for every item in the queue, which hurts especially if there is some feedback loop where the next item will only be produced once the consumer has done something with a previous item. Also, 1 millisecond might be too slow for a real-time task. There is also no guarantee that std::this_thread::sleep_for() will wait for exactly the amount you specify; the operating system might cause it to sleep for much longer, either because it schedules other work, or because the granularity of the system's timers is coarser than 1 millisecond. Finally, waking up every millisecond might prevent a CPU from going into a low-power mode, thereby causing your code to consume more energy than necessary.
If your ReaderWriterQueue has a dequeue() function that waits, you should use this. The only problem then is that setting m_running to false has no effect unless there is something in the queue. The usual trick to solve this issue is to set m_running to false first, and then enqueue a dummy item into the queue. This ensures dequeue() will wake up and return something (potentially the dummy item).
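The sketch below illustrates waking a blocked consumer with a dummy item. So that it does not depend on any particular queue API, it uses a small hypothetical BlockingQueue built from std::mutex and std::condition_variable as a stand-in for a queue with a waiting dequeue(). The names Message, WaitingConsumer and sum() are also made up. One deliberate deviation from the description above: the dummy item is explicitly marked as such, so the worker does not need a separate m_running flag and still processes every real item that was queued before the stop request.

```cpp
#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>

// Stand-in for a queue whose dequeue() blocks until an item is available.
template <typename T>
class BlockingQueue {
public:
    void enqueue(T value) {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_items.push(std::move(value));
        }
        m_cv.notify_one();
    }

    T dequeue() {
        std::unique_lock<std::mutex> lock(m_mutex);
        m_cv.wait(lock, [this] { return !m_items.empty(); });
        T value = std::move(m_items.front());
        m_items.pop();
        return value;
    }

private:
    std::mutex m_mutex;
    std::condition_variable m_cv;
    std::queue<T> m_items;
};

// Queue item: either a real value or the dummy "stop" marker.
struct Message {
    bool stop;
    int value;
};

class WaitingConsumer {
public:
    ~WaitingConsumer() { stopThread(); }

    void pushBack(int value) { m_queue.enqueue(Message{false, value}); }

    // Enqueues the dummy item behind all pending values, then waits for the worker.
    void stopThread() {
        if (!m_thread.joinable())
            return;
        m_queue.enqueue(Message{true, 0});
        m_thread.join();
    }

    // Only meaningful after stopThread() has returned.
    int sum() const { return m_sum; }

private:
    void work() {
        for (;;) {
            Message msg = m_queue.dequeue();  // blocks; no polling and no sleep
            if (msg.stop)
                break;
            m_sum += msg.value;               // placeholder for real processing
        }
    }

    BlockingQueue<Message> m_queue;
    int m_sum = 0;
    std::thread m_thread{&WaitingConsumer::work, this};  // declared last: starts after the other members exist
};
```

Because the queue is FIFO and there is a single producer, the dummy item is always the last thing the worker sees, so pushing the values 1 to 10 and then calling stopThread() leaves sum() at 55.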
Unnecessary use of new
Manual calls to new and delete should be avoided. Instead, let containers manage memory for items directly, or use std::unique_ptr. Also, in main() there is no reason at all to dynamically allocate memory; you can just allocate the consumers as regular variables:
int main() {
    {
        Consumer c1(1);
        Consumer c2(2);
        for (...) {
            ...
        }
    }
    std::cout << "EXIT\n";
}
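If the items really do have to live on the heap, the std::unique_ptr variant could look roughly like the sketch below. A plain std::queue stands in for the real queue, and drainAndSum is a hypothetical helper; whether a given concurrent queue accepts move-only types has to be checked against its own documentation.

```cpp
#include <memory>
#include <queue>
#include <utility>

// Hypothetical helper: takes ownership of every queued item and adds them up.
// Each int is freed automatically when its unique_ptr goes out of scope,
// so no manual delete is needed.
int drainAndSum(std::queue<std::unique_ptr<int>>& items) {
    int sum = 0;
    while (!items.empty()) {
        std::unique_ptr<int> item = std::move(items.front());
        items.pop();
        sum += *item;
    }
    return sum;
}
```

On the producer side, items.push(std::make_unique<int>(i)) (C++14 or later) replaces the manual new.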
Turn Consumer into a generic class
The code you have now is not very generic. You would have to modify Consumer::work() to do what you want with the items you receive from the queue. Consider making Consumer a template that works with any item type, and which takes a function that processes each item received. It might look like:
template<typename T>
class Consumer {
public:
    Consumer(int id, std::function<void(int, T&)> func): m_id(id), m_func(func) {}
    ~Consumer() {...}

    void pushBack(const T& t) {
        m_BufferQueue.enqueue(t);
    }

private:
    void work() {
        while(...) {
            T t = m_BufferQueue.dequeue();
            m_func(m_id, t);
        }
    }

    int m_id;
    std::function<void(int, T&)> m_func;
    moodycamel::ReaderWriterQueue<T> m_BufferQueue{64};
    ...
};
Then it can be used like so:
int main() {
    auto func = [](int id, int& val){
        std::cout << "Thread " << id << " received value " << val << "\n";
    };

    Consumer<int> c1(1, func);
    Consumer<int> c2(2, func);

    for(int i = 0; i < 10; i++) {
        auto& consumer = (i % 2 == 0) ? c1 : c2;
        consumer.pushBack(i);
    }
}
Thread safety issues
When writing multi-threaded code, always think about possible race conditions. While the queue might be thread-safe, you have to think about the whole lifetime of a Consumer. You start the thread in the constructor, and then the thread itself will set m_running to true. But consider that the constructor might return before m_running has been set, and then something might call stopThread() before the worker thread has set it. In that case, stopThread() will set m_running to false, but the first thing the worker thread does when it gets scheduled is to set m_running to true. It then happily waits for items which will never arrive, and stopThread() will wait forever because the thread is still running. The fix is to set m_running to true in the constructor instead of in work(), and for that to work the member variable m_running must be declared before m_thread.
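A minimal sketch of that fix, with the queue left out so that only the ordering issue remains. SafeStartConsumer and finished() are placeholder names. Members are initialized in declaration order, so m_running is already true by the time m_thread starts running work().

```cpp
#include <atomic>
#include <thread>

class SafeStartConsumer {
public:
    ~SafeStartConsumer() { stopThread(); }

    void stopThread() {
        m_running = false;
        if (m_thread.joinable())  // needed here only because the destructor calls stopThread() again
            m_thread.join();
    }

    bool finished() const { return m_finished; }

private:
    void work() {
        // No "m_running = true" here: the worker never overwrites a stop request.
        while (m_running)
            std::this_thread::yield();  // placeholder for the real work loop
        m_finished = true;
    }

    std::atomic<bool> m_running{true};    // declared first: initialized before the thread starts
    std::atomic<bool> m_finished{false};
    std::thread m_thread{&SafeStartConsumer::work, this};  // declared last
};
```

With this ordering, calling stopThread() immediately after construction returns promptly, whether or not the worker thread has been scheduled yet.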
Furthermore, as Edward mentioned in the comments, whatever a consumer does when it receives a new item must of course also be thread-safe itself. Also consider whether you want to allow whatever runs in the worker thread to be able to call stopThread() or any other member function of Consumer. If so, make sure these functions can be called from multiple threads. If not, it's always a good idea to add this restriction to the documentation of the member functions.
- There actually is still an unresolved data race for the Consumer threads -- both use std::cout without a semaphore or lock. With C++20, they could use std::osyncstream to address that flaw. – Edward, Dec 15, 2021 at 13:16
- Generally one wants to send a signal to all the threads that they should stop before joining them. – jdt, Dec 16, 2021 at 15:14