I'm using this class for a producer-consumer setup in C++:
#pragma once
#include <queue>
#include <mutex>
#include <condition_variable>
#include <memory>
#include <atomic>

template <typename T> class SafeQueue
{
public:
    SafeQueue() :
        _shutdown(false)
    {
    }

    void Enqueue(T item)
    {
        std::unique_lock<std::mutex> lock(_queue_mutex);
        bool was_empty = _queue.empty();
        _queue.push(std::move(item));
        lock.unlock();
        if (was_empty)
            _condition_variable.notify_one();
    }

    bool Dequeue(T& item)
    {
        std::unique_lock<std::mutex> lock(_queue_mutex);
        while (!_shutdown && _queue.empty())
            _condition_variable.wait(lock);
        if(!_shutdown)
        {
            item = std::move(_queue.front());
            _queue.pop();
            return true;
        }
        return false;
    }

    bool IsEmpty()
    {
        std::lock_guard<std::mutex> lock(_queue_mutex);
        return _queue.empty();
    }

    void Shutdown()
    {
        _shutdown = true;
        _condition_variable.notify_all();
    }

private:
    std::mutex _queue_mutex;
    std::condition_variable _condition_variable;
    std::queue<T> _queue;
    std::atomic<bool> _shutdown;
};
Most of it looks fine to me, but the Shutdown function looks like it might have some problems. If there are problems, what is the best way to 'unblock' a thread waiting on Dequeue when I want to stop it?
2 Answers
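Your concerns are justified. The description of std::condition_variable::notify_one says that
Even if the shared variable is atomic, it must be modified under the mutex in order to correctly publish the modification to the waiting thread.
Shutdown sets _shutdown without holding _queue_mutex, so a consumer can test the flag in Dequeue, find it false, and begin waiting only after notify_all has already run, in which case it misses the wake-up.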
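A minimal sketch of what that rule implies for Shutdown, assuming the member names from the question. ShutdownFlagDemo, WaitForShutdown and unblocks_waiter are hypothetical names used only for illustration, and the queue itself is left out to keep the shutdown path visible.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical, stripped-down stand-in for SafeQueue: only the shutdown path.
class ShutdownFlagDemo
{
public:
    // Blocks until Shutdown() has been called.
    void WaitForShutdown()
    {
        std::unique_lock<std::mutex> lock(_mutex);
        while (!_shutdown)
            _condition_variable.wait(lock);
    }

    void Shutdown()
    {
        {
            // Modify the flag while holding the same mutex the waiter uses.
            std::lock_guard<std::mutex> lock(_mutex);
            _shutdown = true;
        }
        // Notifying after the lock has been released is allowed.
        _condition_variable.notify_all();
    }

private:
    std::mutex _mutex;
    std::condition_variable _condition_variable;
    bool _shutdown = false; // guarded by _mutex
};

// Returns true once a waiting thread has been released by Shutdown().
bool unblocks_waiter()
{
    ShutdownFlagDemo demo;
    bool released = false;
    std::thread waiter([&] {
        demo.WaitForShutdown();
        released = true;
    });
    demo.Shutdown();
    waiter.join(); // join() makes the waiter's write to 'released' visible here
    assert(released);
    return released;
}
```

Because every read and write of the flag now happens under the mutex, a plain bool is sufficient in this sketch; keeping std::atomic<bool> would also work but is no longer needed for correctness.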
Code-wise, return true / return false usually indicates room for improvement. What you are actually returning is the negated state of _shutdown. Do it explicitly:
if (!_shutdown) {
    item = std::move(_queue.front());
    _queue.pop();
}
return !_shutdown;
You may also notice that shutdown is always tested with negation. It indicates that the semantics are reversed. I recommend replacing shutdown with something like keep_running.
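One way the renamed flag could look, as a sketch rather than the answerer's exact code. KeepRunningQueue, Stop and keep_running_demo are hypothetical names, and the flag is a plain bool guarded by the mutex, which is an assumption layered on top of the suggestion.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <utility>

// Hypothetical variant of SafeQueue with the flag's meaning inverted.
template <typename T> class KeepRunningQueue
{
public:
    void Enqueue(T item)
    {
        {
            std::lock_guard<std::mutex> lock(_queue_mutex);
            _queue.push(std::move(item));
        }
        _condition_variable.notify_one();
    }

    // Returns true if an item was dequeued, false once Stop() has been called.
    bool Dequeue(T& item)
    {
        std::unique_lock<std::mutex> lock(_queue_mutex);
        while (_keep_running && _queue.empty())
            _condition_variable.wait(lock);
        if (_keep_running)
        {
            item = std::move(_queue.front());
            _queue.pop();
        }
        return _keep_running;
    }

    void Stop()
    {
        {
            std::lock_guard<std::mutex> lock(_queue_mutex);
            _keep_running = false;
        }
        _condition_variable.notify_all();
    }

private:
    std::mutex _queue_mutex;
    std::condition_variable _condition_variable;
    std::queue<T> _queue;
    bool _keep_running = true; // guarded by _queue_mutex
};

// Single-threaded walk-through of the two return values.
bool keep_running_demo()
{
    KeepRunningQueue<int> queue;
    queue.Enqueue(7);
    int value = 0;
    bool got_item = queue.Dequeue(value); // item available: returns true
    assert(got_item && value == 7);
    queue.Stop();
    return !queue.Dequeue(value);         // stopped: returns false without blocking
}
```

With the positive name, neither the wait loop nor the return statement needs a negation.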
Thumbs up for this detail: "You may also notice that shutdown is always tested with negation. It indicates that the semantics is reversed. I recommend to replace shutdown with something like keep_running." This is a general thing applicable various places. I will keep this one in mind :) – Stephan Møller, Mar 9, 2021 at 14:38
You should provide an overload of Enqueue that takes an rvalue reference:
void Enqueue(T&& item)
{
    std::unique_lock<std::mutex> lock(_queue_mutex);
    bool was_empty = _queue.empty();
    _queue.push(std::move(item));
    lock.unlock();
    if (was_empty)
        _condition_variable.notify_one();
}
The current code can call the move constructor twice, once to construct the by-value parameter and once to push it into the queue, for a call such as:
queue.Enqueue(std::move(item));
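The difference can be observed with a payload type that counts its move constructions. The sketch below is illustrative only: MoveCounter and the free functions are hypothetical names, locking is omitted, and a bare std::queue stands in for SafeQueue.

```cpp
#include <cassert>
#include <queue>
#include <utility>

// Hypothetical payload type that counts how often it is move-constructed.
struct MoveCounter
{
    static int moves;
    MoveCounter() = default;
    MoveCounter(const MoveCounter&) = default;
    MoveCounter(MoveCounter&&) noexcept { ++moves; }
};
int MoveCounter::moves = 0;

// Mirrors the question's by-value Enqueue (locking omitted for brevity).
void enqueue_by_value(std::queue<MoveCounter>& queue, MoveCounter item)
{
    queue.push(std::move(item));
}

// Mirrors the suggested rvalue-reference overload.
void enqueue_by_rvalue(std::queue<MoveCounter>& queue, MoveCounter&& item)
{
    queue.push(std::move(item));
}

// Number of move constructions caused by one by-value enqueue of an rvalue.
int moves_by_value()
{
    std::queue<MoveCounter> queue;
    MoveCounter item;
    MoveCounter::moves = 0;
    enqueue_by_value(queue, std::move(item)); // into the parameter, then into the queue
    return MoveCounter::moves;
}

// Number of move constructions caused by one rvalue-reference enqueue.
int moves_by_rvalue()
{
    std::queue<MoveCounter> queue;
    MoveCounter item;
    MoveCounter::moves = 0;
    enqueue_by_rvalue(queue, std::move(item)); // binds a reference, then moves into the queue
    return MoveCounter::moves;
}

// Sanity check of the counts described in the text.
void check_move_counts()
{
    assert(moves_by_value() == 2);
    assert(moves_by_rvalue() == 1);
}
```

For types that are cheap to move the extra move rarely matters, so whether the overload is worth the duplicated code is a judgment call.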
I would also suggest replacing or supplementing your Dequeue method with one that can take advantage of NRVO:
// std::runtime_error requires #include <stdexcept>
T Dequeue()
{
    std::unique_lock<std::mutex> lock(_queue_mutex);
    while (!_shutdown && _queue.empty())
        _condition_variable.wait(lock);
    if(_shutdown){
        throw std::runtime_error("shutdown"); //replace with your own exception
    }
    T item = std::move(_queue.front());
    _queue.pop();
    return item;
}
Since the user of the class is in control of both Shutdown() and Dequeue(), it does seem that calling Dequeue on a queue that is shutting down is an exceptional condition.
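A consumer built on the throwing Dequeue might look roughly like the sketch below. ThrowingQueue and consumer_stops_on_shutdown are hypothetical names, std::runtime_error is the placeholder exception from the snippet above, and setting the flag under the mutex is an assumption added here so that the wake-up cannot be missed.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <stdexcept>
#include <thread>
#include <utility>

// Hypothetical reduced queue: value-returning Dequeue that throws on shutdown.
template <typename T> class ThrowingQueue
{
public:
    void Enqueue(T item)
    {
        {
            std::lock_guard<std::mutex> lock(_queue_mutex);
            _queue.push(std::move(item));
        }
        _condition_variable.notify_one();
    }

    T Dequeue()
    {
        std::unique_lock<std::mutex> lock(_queue_mutex);
        while (!_shutdown && _queue.empty())
            _condition_variable.wait(lock);
        if (_shutdown)
            throw std::runtime_error("shutdown");
        T item = std::move(_queue.front());
        _queue.pop();
        return item;
    }

    void Shutdown()
    {
        {
            std::lock_guard<std::mutex> lock(_queue_mutex);
            _shutdown = true;
        }
        _condition_variable.notify_all();
    }

private:
    std::mutex _queue_mutex;
    std::condition_variable _condition_variable;
    std::queue<T> _queue;
    bool _shutdown = false; // guarded by _queue_mutex
};

// Returns true if a consumer calling Dequeue() in a loop leaves via the exception.
bool consumer_stops_on_shutdown()
{
    ThrowingQueue<int> queue;
    queue.Enqueue(1);
    int first = queue.Dequeue(); // normal path: the item is returned by value
    assert(first == 1);

    bool stopped_by_exception = false;
    std::thread consumer([&] {
        try
        {
            for (;;)
                queue.Dequeue(); // blocks until Shutdown(), or throws at once if it already ran
        }
        catch (const std::runtime_error&)
        {
            stopped_by_exception = true;
        }
    });
    queue.Shutdown();
    consumer.join();
    return stopped_by_exception;
}
```

Note that with this design any items still queued when Shutdown() is called are never delivered, which matches the behavior of the original Dequeue.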
if (_shutdown) return false; and add the return after it, as it is the longer command sequence