I'm working on a programming exercise (university, nothing related to industry) which basically asks to implement a Buffer to be used by two threads (a producer and a consumer). The producer enqueues data by calling next(T t), while the consumer gets the oldest value (FIFO) by calling consume(), or waits if the buffer is empty. The producer can send a stop signal to declare that enqueuing has ended. The text also requires a fail() method in case anything goes wrong, but I'd like to ignore it for this question.
This is my solution:
template <typename T>
class Buffer {
    std::mutex m;
    std::condition_variable cv;
    std::queue<T> values;
    bool stop, failed;
    std::exception_ptr _eptr;
public:
    Buffer() : stop(false), failed(false) {}
    void fail (const std::exception_ptr &eptr){
        {
            std::unique_lock ul{m};
            failed = true;
            _eptr = eptr;
        }
        cv.notify_all();
    }
    void terminate(){
        {
            std::unique_lock ul {m};
            if (stop || failed ) throw std::runtime_error("enqueing has stopped");
            stop = true;
        }
        cv.notify_one(); // notify stop signal
    }
    void next(T t) {
        {
            std::unique_lock ul{m};
            if ( stop || failed ) throw std::runtime_error ("enqueing has stopped");
            values.push(t);
        }
        cv.notify_one(); // notify the consumer (if waiting)
    }
    std::optional<T> consume(){
        std::unique_lock ul{m};
        cv.wait(ul, [this]() { return !values.empty() || stop || failed; });
        if (values.empty()) { // if got notified and the queue is empty, then stop or failed have been sent
            if (stop)
                return std::nullopt;
            else
                std::rethrow_exception(_eptr);
        }
        // extract the value to consume
        T val = values.front();
        values.pop();
        return std::optional<T>(val);
    }
};
This is how I think the Buffer might be used (I'm still ignoring the fail() method):
#define N 10000
Buffer<int> buf;
std::thread prod([&buf](){
    for(int i = 0 ; i < N; ++i) {
        std::cout << "enqueing: " << i << std::endl;
        buf.next(i);
    }
    buf.terminate();
});
std::thread cons([&buf](){
    for(int i = 0; i < N; ++i)
        std::cout << "consuming: " << buf.consume().value() << std::endl;
});
prod.join();
cons.join();
I have some questions:
- Do you agree this is nothing but a blocking queue, or am I missing something?
- Do I need to implement the destructor? If so, can you please show me an example of usage that requires it?
- What happens if the object goes out of scope and nobody called terminate()? Should I take care of this problem? Is it a Buffer's problem at all, or should the programmer using this class care about it? Can you please show me an example where this happens? (I was thinking about the threads being detached instead of joined; does that fit?)
1 Answer
Yup. Looks like a queue. Maybe all multithreading is really a queue? ... strokes beard philosophically
Naming:
- terminate() should probably be called finished() or done() or something similar (to distance it from std::terminate). stop() would also be wrong, since that implies we're telling the threads to stop processing regardless of whether there's more input. What we're really saying is simply that there's no more input to process.
- next() should be called push()... because that's what it does.
- consume() would be better named something like wait_and_pop().
void terminate(){
    {
        std::unique_lock ul {m};
        if (stop || failed ) throw std::runtime_error("enqueing has stopped");
        stop = true;
    }
    cv.notify_one(); // notify stop signal
}
I don't think we need to throw an error if the queue is already stopped (or failed). We can just do nothing and return. (Similar to calling close on a file that's already closed. It's not wrong, there's just nothing to do). Especially since we have no way to check if the queue is currently stopped!
(It might be worth adding a bool is_stopped() const member function. Note that making this function const would mean making the mutex a mutable variable, which is fine.)
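A minimal sketch of such a query, assuming a stripped-down class (StopFlag and finish() are hypothetical names used only for this illustration) that keeps just the mutex and the stop flag:

```cpp
#include <mutex>

// Hypothetical stand-in for Buffer: only the parts needed to show
// a const member function that still has to lock the mutex.
class StopFlag {
    mutable std::mutex m;   // mutable, so const member functions can lock it
    bool stop = false;
public:
    void finish() {
        std::lock_guard<std::mutex> lock{m};
        stop = true;
    }
    bool is_stopped() const {
        std::lock_guard<std::mutex> lock{m};
        return stop;
    }
};
```

Keep in mind that the answer can be stale by the time the caller looks at it, so this is mostly useful for the producer checking its own earlier call.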
We should call notify_all instead of notify_one. We presumably want all waiting threads to stop waiting promptly.
We could use std::lock_guard instead of std::unique_lock (we don't need any of the extra functionality of std::unique_lock).
void next(T t) {
    {
        std::unique_lock ul{m};
        if ( stop || failed ) throw std::runtime_error ("enqueing has stopped");
        values.push(t);
    }
    cv.notify_one(); // notify the consumer (if waiting)
}
We could use std::move when pushing the value onto the queue: values.push(std::move(t));
Again, we could use std::lock_guard instead of std::unique_lock.
std::optional<T> consume(){
    std::unique_lock ul{m};
    cv.wait(ul, [this]() { return !values.empty() || stop || failed; });
    if (values.empty()) { // if got notified and the queue is empty, then stop or failed have been sent
        if (stop)
            return std::nullopt;
        else
            std::rethrow_exception(_eptr);
    }
    // extract the value to consume
    T val = values.front();
    values.pop();
    return std::optional<T>(val);
}
Here we do need std::unique_lock. :)
Again we can do T val = std::move(values.front());
I'm not sure the logic here is quite correct. If the failed flag is set, we probably need to do something about it even if the queue isn't empty? (I don't know what your spec says though).
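As a rough sketch of how the suggestions so far could fit together (C++17): the member names done(), push() and wait_and_pop() follow the naming advice above, and "a failure is reported before any queued values" is only an assumed policy, since the spec may ask for something else.

```cpp
#include <condition_variable>
#include <exception>
#include <mutex>
#include <optional>
#include <queue>
#include <stdexcept>
#include <utility>

// Sketch only. Assumed policy: once fail() was called, consumers see the
// exception even if values are still queued. fail() must be given a
// non-null exception_ptr, otherwise the rethrow is undefined behavior.
template <typename T>
class Buffer {
    std::mutex m;
    std::condition_variable cv;
    std::queue<T> values;
    bool stop = false;
    bool failed = false;
    std::exception_ptr eptr;
public:
    void fail(std::exception_ptr e) {
        {
            std::lock_guard<std::mutex> lock{m};
            failed = true;
            eptr = std::move(e);
        }
        cv.notify_all();
    }
    void done() {   // calling it twice is harmless
        {
            std::lock_guard<std::mutex> lock{m};
            stop = true;
        }
        cv.notify_all();
    }
    void push(T t) {
        {
            std::lock_guard<std::mutex> lock{m};
            if (stop || failed) throw std::runtime_error("enqueuing has stopped");
            values.push(std::move(t));
        }
        cv.notify_one();
    }
    std::optional<T> wait_and_pop() {
        std::unique_lock<std::mutex> lock{m};   // cv.wait needs unique_lock
        cv.wait(lock, [this] { return !values.empty() || stop || failed; });
        if (failed) std::rethrow_exception(eptr);   // assumed: failure wins
        if (values.empty()) return std::nullopt;    // stopped and drained
        T val = std::move(values.front());
        values.pop();
        return val;
    }
};
```

If the spec instead wants queued values to be delivered before the failure is reported, the failed check would move inside the values.empty() branch, as in the original code.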
Destruction:
Yep, we need a destructor. It's quite possible for the Buffer to be destroyed while consumer threads are still trying to read from it. A contrived example:
auto consumers = std::vector<std::thread>();
{
    auto buf = Buffer<int>();
    for (auto i = 0; i != 5; ++i)
        consumers.emplace_back([&] () {
            while (true) {
                auto value = buf.consume();
                if (!value) return;
                std::cout << value.value();
            }
        });
    for (auto i = 0; i != 5000; ++i)
        buf.next(i);
    buf.terminate();
} // buf goes out of scope here! but we don't know that consumers have finished consuming!
for (auto& c : consumers)
    c.join();
We have two choices:
- Make this an obvious programming error: print an error message to std::cerr and call std::terminate if necessary (when failed isn't set, or when stop is set but the queue isn't empty).
- Make it less dangerous: assume that input is finished and we want to finish processing it, so set stop in the destructor and then wait for the queue to be empty.
I think the second option is better: treating "the threads haven't finished their work yet" as an error condition may end up calling std::terminate quite randomly, depending on thread timing.
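Whichever option is chosen inside the class, the caller can also avoid the lifetime problem altogether. The sketch below is a different technique from the two destructor options above: it uses shared ownership through std::shared_ptr, so the buffer stays alive until the last consumer drops its copy. MiniBuffer and run_demo() are hypothetical names, and the queue is a cut-down stand-in for Buffer without fail().

```cpp
#include <atomic>
#include <condition_variable>
#include <memory>
#include <mutex>
#include <optional>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical minimal queue standing in for Buffer<T>.
template <typename T>
class MiniBuffer {
    std::mutex m;
    std::condition_variable cv;
    std::queue<T> values;
    bool stop = false;
public:
    void push(T t) {
        { std::lock_guard<std::mutex> lock{m}; values.push(std::move(t)); }
        cv.notify_one();
    }
    void done() {
        { std::lock_guard<std::mutex> lock{m}; stop = true; }
        cv.notify_all();
    }
    std::optional<T> wait_and_pop() {
        std::unique_lock<std::mutex> lock{m};
        cv.wait(lock, [this] { return !values.empty() || stop; });
        if (values.empty()) return std::nullopt;
        T val = std::move(values.front());
        values.pop();
        return val;
    }
};

// Same shape as the contrived example, but each consumer owns a share
// of the buffer. Returns how many values the consumers processed.
int run_demo() {
    std::atomic<int> consumed{0};
    std::vector<std::thread> consumers;
    {
        auto buf = std::make_shared<MiniBuffer<int>>();
        for (int i = 0; i != 5; ++i)
            consumers.emplace_back([buf, &consumed] {   // copy keeps buf alive
                while (buf->wait_and_pop()) ++consumed;
            });
        for (int i = 0; i != 5000; ++i) buf->push(i);
        buf->done();
    }   // only the producer's reference is released here
    for (auto& c : consumers) c.join();
    return consumed.load();
}
```

Here the closing brace no longer destroys the queue while consumers may still be inside wait_and_pop(); the last consumer thread to finish releases it.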
- I don't get why, using your example, the program exits with 0 and the expected output even though the buffer went out of scope. Note: I replaced the while(true) with a for iterating N times, and I only used 1 consumer and 1 producer. – Elle, Jul 14, 2021 at 11:34
- It's undefined behavior, so it might work, or it might not. Maybe the consumer finished quickly, maybe the consumer is still accessing memory that has the right values in it even though the buffer is destroyed... etc. – user673679, Jul 14, 2021 at 15:32
- You should probably get in the habit of using scoped_lock, rather than lock_guard. It usually doesn't make a difference, but when it does, it really does. So why have yet another special case? Why create the problem of having to explain when to use one or the other? Just "forget" lock_guard (remember it as yet another failed experiment, like auto_ptr, that ultimately became obsolete by new and better technologies) and now the only rule you need to remember is "just do auto lock = std::scoped_lock{mutex}". – indi, Jul 14, 2021 at 22:31
- @indi. I guess. I'm just following the advice here: stackoverflow.com/a/60172828 – user673679, Jul 15, 2021 at 8:02