There's lots of code out there for basic adapters of std::deque
to provide a thread-safe queue. I've adopted that, but wanted to provide a relatively full analog to std::queue
, so I've added the whole set of constructors and operator=
. The queue works fine. My questions are mainly related to the constructors and operator=
:
- Have I applied the correct type traits for identifying whether the constructor will be
noexcept
? - I'm using
notify_once
forpush
andemplace
, andnotify_all
foroperator=
overloads. Is that correct? - Because I need to acquire a lock before altering the queue, the constructors have to be written a bit differently from the
std::queue
adapter. For example, the initializer list can't include the data copy/move. Does that code look correct? - The conditional
noexcept
syntax I've added forsize
andempty
looks bizarre. Am I doing that correctly?
Other comments not related to those questions are welcome. One note about the class: I did add some combination methods (e.g., clear_count_push
) because that is a common combination of calls that I use—such as at program shutdown when I push a close thread semaphore on the queue so that the thread taking work off the queue will know it's time to shut down.
Note: since posting I've updated this significantly. See https://gist.github.com/rsjaffe/59d22db0649d8276e42aca1061d7c08c for the latest version.
template<typename T, class Container = std::deque<T>> class BlockingQueue {
public:
using container_type = Container;
using value_type = typename Container::value_type;
using size_type = typename Container::size_type;
using reference = typename Container::reference;
using const_reference = typename Container::const_reference;
static_assert(std::is_same_v<T, value_type>, "container adaptors require consistent types");
// Constructors: see https://en.cppreference.com/w/cpp/container/queue/queue
// These are in same order and number as in cppreference
/*1*/ BlockingQueue() noexcept(std::is_nothrow_default_constructible_v<Container>){};
/*2*/ explicit BlockingQueue(const Container& cont) noexcept(
std::is_nothrow_copy_constructible_v<Container>)
: queue_{cont}
{
}
/*3*/ explicit BlockingQueue(Container&& cont) noexcept(
std::is_nothrow_move_constructible_v<Container>)
: queue_{std::move(cont)}
{
}
/*4*/ BlockingQueue(const BlockingQueue& other)
{
auto lock{std::scoped_lock(other.mutex_)};
queue_ = other.queue_;
}
/*5*/ BlockingQueue(BlockingQueue&& other) noexcept(
std::is_nothrow_move_constructible_v<BlockingQueue>)
{
auto lock{std::scoped_lock(other.mutex_)};
queue_ = std::move(other.queue_);
}
/*6*/ template<class Alloc, class = std::enable_if_t<std::uses_allocator_v<Container, Alloc>>>
explicit BlockingQueue(const Alloc& alloc) noexcept(
std::is_nothrow_constructible_v<Container, const Alloc&>)
: queue_{alloc}
{
}
/*7*/ template<class Alloc, class = std::enable_if_t<std::uses_allocator_v<Container, Alloc>>>
BlockingQueue(const Container& cont, const Alloc& alloc) : queue_{cont, alloc}
{
}
/*8*/ template<class Alloc, class = std::enable_if_t<std::uses_allocator_v<Container, Alloc>>>
BlockingQueue(Container&& cont, const Alloc& alloc) noexcept(
std::is_nothrow_constructible_v<Container, Container, const Alloc&>)
: queue_(std::move(cont), alloc)
{
}
/*9*/ template<class Alloc, class = std::enable_if_t<std::uses_allocator_v<Container, Alloc>>>
BlockingQueue(const BlockingQueue& other, const Alloc& alloc) : queue_(alloc)
{
auto lock{std::scoped_lock(other.mutex_)};
queue_ = other.queue_;
}
/*10*/ template<class Alloc,
class = std::enable_if_t<std::uses_allocator_v<Container, Alloc>>>
BlockingQueue(BlockingQueue&& other, const Alloc& alloc) noexcept(
std::is_nothrow_constructible_v<Container, Container, const Alloc&>)
: queue_(alloc)
{
auto lock{std::scoped_lock(other.mutex_)};
queue_ = std::move(other.queue_);
}
// operator=
BlockingQueue& operator=(const BlockingQueue& other)
{
{
auto lock{std::scoped_lock(mutex_, other.mutex_)};
queue_ = other.queue_;
}
condition_.notify_all();
return *this;
}
BlockingQueue& operator=(BlockingQueue&& other) noexcept(
std::is_nothrow_move_assignable_v<Container>)
{
{
auto lock{std::scoped_lock(mutex_, other.mutex_)};
queue_ = std::move(other.queue_);
}
condition_.notify_all();
return *this;
}
// destructor
~BlockingQueue() = default;
// methods
void push(const T& value)
{
{
auto lock{std::scoped_lock(mutex_)};
queue_.push_back(value);
}
condition_.notify_one();
}
void push(T&& value)
{
{
auto lock{std::scoped_lock(mutex_)};
queue_.push_back(std::move(value));
}
condition_.notify_one();
}
template<class... Args> void emplace(Args&&... args)
{
{
auto lock{std::scoped_lock(mutex_)};
queue_.emplace_back(std::forward<Args>(args)...);
}
condition_.notify_one();
}
T pop()
{
auto lock{std::unique_lock(mutex_)};
condition_.wait(lock, [this]() noexcept(noexcept(std::declval<Container&>().empty())){
return !queue_.empty();
});
T rc{std::move(queue_.front())};
queue_.pop_front();
return rc;
}
[[nodiscard]] std::optional<T> try_pop()
{
auto lock{std::scoped_lock(mutex_)};
if (queue_.empty())
return std::nullopt;
T rc{std::move(queue_.front())};
queue_.pop_front();
return rc;
}
void clear()
{
auto lock{std::scoped_lock(mutex_)};
queue_.clear();
}
[[nodiscard]] auto size() const noexcept(noexcept(std::declval<Container&>().size()))
{
auto lock{std::scoped_lock(mutex_)};
return queue_.size();
}
[[nodiscard]] auto clear_count()
{
auto lock{std::scoped_lock(mutex_)};
auto ret = queue_.size();
queue_.clear();
return ret;
}
auto clear_count_push(const T& value)
{
size_type ret;
{
auto lock{std::scoped_lock(mutex_)};
ret = queue_.size();
queue_.clear();
queue_.push_back(value);
}
condition_.notify_one();
return ret;
}
auto clear_count_push(T&& value)
{
size_type ret;
{
auto lock{std::scoped_lock(mutex_)};
ret = queue_.size();
queue_.clear();
queue_.push_back(std::move(value));
}
condition_.notify_one();
return ret;
}
template<class... Args> auto clear_count_emplace(Args&&... args)
{
size_type ret;
{
auto lock{std::scoped_lock(mutex_)};
ret = queue_.size();
queue_.clear();
queue_.emplace_back(std::forward<Args>(args)...);
}
condition_.notify_one();
return ret;
}
[[nodiscard]] bool empty() const noexcept(noexcept(std::declval<Container&>().empty()))
{
auto lock{std::scoped_lock(mutex_)};
return queue_.empty();
}
private:
Container queue_{};
mutable std::condition_variable condition_{};
mutable std::mutex mutex_{};
};
1 Answer 1
/*9*/ template<class Alloc, class = std::enable_if_t<std::uses_allocator_v<Container, Alloc>>>
BlockingQueue(const BlockingQueue& other, const Alloc& alloc) : queue_(alloc)
{
auto lock{std::scoped_lock(other.mutex_)};
queue_ = other.queue_;
}
This doesn't look quite right. queue_ = other.queue;
may cause the allocator from other.queue
to be used, depending on the allocator traits.
To ensure correct behavior, we probably need to use the relevant Container
constructor instead:
/*9*/ template<class Alloc, class = std::enable_if_t<std::uses_allocator_v<Container, Alloc>>>
BlockingQueue(const BlockingQueue& other, const Alloc& alloc)
{
auto lock{std::scoped_lock(other.mutex_)};
queue_ = Container(other.queue_, alloc);
}
(And the same for /*10*/
).
[[nodiscard]] auto size() const noexcept(noexcept(std::declval<Container&>().size()))
[[nodiscard]] auto clear_count()
... etc.
We could use size_type
rather than auto
.
[[nodiscard]] auto clear_count()
... etc.
Perhaps count_clear
would better reflect the order that this function does things in. (And the same for the other similar functions).
(I'm not very familiar with noexcept
, so I haven't checked that aspect).
-
\$\begingroup\$ Interesting. So operator= may result in the allocator being changed to the same as the source allocator. I'll change #9 and #10 to do what you suggest. Thanks. \$\endgroup\$rsjaffe– rsjaffe2019年10月23日 15:21:16 +00:00Commented Oct 23, 2019 at 15:21