I implemented a blocking queue, somewhat closely based on this implementation, with an emphasis on filling up first. So if one producer and one consumer thread are using the queue, the producing thread gets prioritized.
// BQueue.hpp
//#pragma once
#ifndef _BQUEUE_HPP
#define _BQUEUE_HPP
#include <condition_variable>
#include <mutex>
#include <queue>
template <class T> class BQueue {
public:
BQueue(size_t size);
void push(T item);
bool pop(T &item);
/* ALTERNATIVE 1
void push(std::unique_ptr<T> item);
bool pop(std::unique_ptr<T> &item);
*/
private:
std::mutex _mutex;
//std::queue<std::unique_ptr<T>> _queue; // ALTERNATIVE 1
std::queue<T> _queue;
size_t _size;
std::condition_variable _condition_full;
std::condition_variable _condition_empty;
};
#include "BQueue.hxx"
#endif // _BQUEUE_HPP
implementation
// BQueue.hxx
#include "BQueue.hpp"
#include <chrono>
#include <condition_variable>
#include <cstdlib>
#include <iostream>
#include <mutex>
#include <queue>
template <class T> BQueue<T>::BQueue(size_t size) : _size(size) {}
// ALTERNATIVE 1
// template <class T> void BQueue<T>::push(std::unique_ptr<T> item) {
template <class T> void BQueue<T>::push(T item) {
std::unique_lock<std::mutex> lock(_mutex);
while (_queue.size() >= _size) {
_condition_full.wait(lock, [&]() { return (_queue.size() < _size); });
}
_queue.push(std::move(item));
// _queue.push(item); // ALTERNATIVE 1
// if the queue is full, notify the consumer side first
if (_queue.size() >= _size) {
_condition_empty.notify_one();
}
_condition_full.notify_one();
}
// template <class T> bool BQueue<T>::pop(std::unique_ptr<T> &item) { // ALTERNATIVE 1
template <class T> bool BQueue<T>::pop(T &item) {
std::unique_lock<std::mutex> lock(_mutex);
while (_queue.empty()) {
if (!_condition_empty.wait_for(lock, std::chrono::seconds(1),
[&]() { return !_queue.empty(); })) {
// waited too long for input.
return false;
}
}
item = std::move(_queue.front());
// item = _queue.front(); // ALTERNATIVE 1
_queue.pop();
/* THIS FOLLOWING CODE MAY BE NEEDED;
* KEEP IT IN CASE
* _condition_empty.notify_one();
* // if queue is empty, notify production
* if (_queue.empty()) { //*/
_condition_full.notify_one();
// assert(_queue.size() < _size); //*/
return true;
}
main
#include <memory>
#include "BQueue.hpp"
// alternatively: class Resource
struct Resource {
int x;
};
int main() {
BQueue<std::unique_ptr<struct Resource>> q{40};
//BQueue<struct Resource> q_alternative1{40};
std::unique_ptr<struct Resource> res1{new struct Resource};
res1->x = 42;
q.push(std::move(res1));
q.push(std::move(std::unique_ptr<struct Resource>{new struct Resource}));
for (size_t i = 0; i < 30; i++) {
std::unique_ptr<struct Resource> res{new struct Resource};
res->x = i;
q.push(std::move(res));
}
for (size_t i = 0; i < 15; i++)
q.pop(res1);
return 0;
}
Now, I came across the question of resource handling. What makes more sense: using the code as shown, or implementing the whole thing with std::unique_ptr where necessary (see ALTERNATIVE 1)? std::move apparently works even with types that don't wrap a resource.
And is the main function leak-free, or did I miss anything? Does the queue work for shared_ptr types as well, or do I have to specialize because pop(T &item) takes a reference?
1 Answer
So if one producer and one consumer thread are using the queue, the producing thread gets prioritized.
I don't see anything in your code that corresponds to this sentence in your explanation. I think you have no "prioritization" going on here. The only thing determining which threads get to run when is the OS thread scheduler. And there's nothing at the algorithm level that could be described as "priorities"; this isn't like a rwlock where one side can "starve" the other. Contrariwise — each side here is "feeding" the other!
std::unique_ptr<struct Resource> res1{new struct Resource};
Two or three things about this line:
- Don't use raw new and delete; use std::make_unique here instead.
- Don't Repeat Yourself: use auto to avoid repetition here.
- In C++, writing struct (or class or union) in a type-name is not idiomatic.
Thus this line should become:
auto res1 = std::make_unique<Resource>();
and your main routine in total becomes:
int main() {
BQueue<std::unique_ptr<Resource>> q{40};
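// note: make_unique<Resource>(42) below assumes either a Resource(int) constructor
// or C++20 parenthesized aggregate initialization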
q.push(std::make_unique<Resource>(42));
q.push(std::make_unique<Resource>());
for (int i = 0; i < 30; ++i) {
q.push(std::make_unique<Resource>(i));
}
for (int i = 0; i < 15; ++i) {
std::unique_ptr<Resource> res1;
q.pop(res1);
}
}
Writing return 0; at the end of main is unnecessary and unidiomatic (unless you want to be explicit because you return a non-zero value by some other codepath).
About unique_ptr inside the container versus outside: instinctively prefer abstractions that are composable. So, if your options are:
- create a BQueue<T> that deals internally in std::unique_ptr<T>, or
- create a BQueue<T> that deals internally in T,
you should obviously prefer the latter. As a bonus, this means that your code will be useful ("reusable") even for people who are still stuck on boost::unique_ptr; they won't have to switch all their code from boost to std just to use your queue class.
while (_queue.size() >= _size) {
_condition_full.wait(lock, [&]() { return (_queue.size() < _size); });
}
This loop is extremely confusing. You're looping with one condition outside and a different condition in the inner lambda. I strongly recommend removing one or the other of these loops. Specifically, I would remove the inner loop.
while (_queue.size() >= _size) {
_condition_full.wait(lock);
}
The nice thing about removing the inner loop (as above) is that it's easily readable by a non-expert (say, someone coming from Java, C#, or Python). But if your target audience is C++ experts, this would also be okay:
_condition_full.wait(lock, [&]() { return (_queue.size() < _size); });
Your condition variables have mildly confusing names, IMO. Is _condition_full the thing I should wait on when the queue is full? Or is it the thing that gets notified when the queue is full? A little thought shows that it must be the former; but I don't like thinking, even a little bit. :)
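One possible renaming (purely a suggestion, not something the original code uses) is to name each condition variable after the state being waited for:

std::condition_variable _not_full;   // push waits on this; pop notifies it
std::condition_variable _not_empty;  // pop waits on this; push notifies it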
Notice that this line —
_queue.push(std::move(item));
— runs under the mutex lock. If the item's move-constructor does something sketchy, this could be dangerous.
If you generalize your BQueue far enough that the user-programmer is allowed to customize the underlying container (e.g. change std::queue<T> to std::queue<T, std::list<T, A>>), then be aware that the user-defined memory allocator will also be running under the mutex lock. This probably isn't a problem at your current level of caring-about-metaprogrammy-bits, but it's something to consider.
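A sketch of what such a generalization might look like (the extra template parameter and the class name here are hypothetical, not something the review requires):

template <class T, class Container = std::deque<T>>
class BQueueGeneric {
    // std::queue is an adaptor over Container, so whatever allocator the
    // user-supplied Container uses ends up running while _mutex is held.
    std::queue<T, Container> _queue;
    std::mutex _mutex;
    size_t _size;
    // ... push/pop as before ...
};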
if (!_condition_empty.wait_for(lock, std::chrono::seconds(1),
[&]() { return !_queue.empty(); })) {
// waited too long for input.
return false;
}
This (a function that blocks, but only for 1 second) is weird and dangerous. Don't do this.
If you want a pop function, write one. If you want a try_pop function (that can fail if the queue is empty), write one. If you want a try_pop_for function (that can fail by timing out), write one. But what you did was (see the sketch after this list):
- write a try_pop_for_1s function (?!)
- misname it pop (?!)
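A minimal sketch of that split-up interface, assuming the corresponding declarations are added to the class (the bodies reuse the member names from the question; none of this is from the original answer):

// blocks until an item is available
template <class T> void BQueue<T>::pop(T &item) {
    std::unique_lock<std::mutex> lock(_mutex);
    _condition_empty.wait(lock, [&] { return !_queue.empty(); });
    item = std::move(_queue.front());
    _queue.pop();
    _condition_full.notify_one();
}

// never blocks; returns false if the queue is currently empty
template <class T> bool BQueue<T>::try_pop(T &item) {
    std::unique_lock<std::mutex> lock(_mutex);
    if (_queue.empty()) return false;
    item = std::move(_queue.front());
    _queue.pop();
    _condition_full.notify_one();
    return true;
}

// blocks for at most `timeout`; returns false on timeout
template <class T> bool BQueue<T>::try_pop_for(T &item, std::chrono::milliseconds timeout) {
    std::unique_lock<std::mutex> lock(_mutex);
    if (!_condition_empty.wait_for(lock, timeout, [&] { return !_queue.empty(); }))
        return false;
    item = std::move(_queue.front());
    _queue.pop();
    _condition_full.notify_one();
    return true;
}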
/* THIS FOLLOWING CODE MAY BE NEEDED;
* KEEP IT IN CASE
* _condition_empty.notify_one();
I believe this code is not needed. Consider:
- The queue is empty. Thread C1 enters pop and blocks on _condition_empty.
- Thread C2 enters pop and blocks on _condition_empty.
- Thread P enters push, adds an item to the queue, and notifies-one on _condition_empty.
- Thread P enters push again, adds a second item, and notifies-one on _condition_empty.
At this point, if either thread C1 or C2 is still waiting, it's a bug in the standard library. They should both be awake and waiting their turns to grab the mutex lock and process one item each off of the queue.
Similarly, the _condition_full.notify_one() at the end of push should not be needed, because if you've got two threads waiting on _condition_full, you might as well wait for the second pop before you unblock the second producer thread.
So:
template <class T>
void BQueue<T>::push(T item) {
std::unique_lock<std::mutex> lock(_mutex);
_condition_full.wait(lock, [&]() { return (_queue.size() < _size); });
_queue.push(std::move(item)); // DANGER: running user code under a lock
_condition_empty.notify_one(); // the queue is no longer empty
}
template <class T>
bool BQueue<T>::pop(T &item) {
std::unique_lock<std::mutex> lock(_mutex);
if (_condition_empty.wait_for(lock, std::chrono::seconds(1),
[&]() { return !_queue.empty(); })) {
item = std::move(_queue.front()); // DANGER: running user code under a lock
_queue.pop(); // DANGER: running user code under a lock
_condition_full.notify_one(); // the queue is no longer full
return true;
}
// waited too long for input; the queue is still empty
return false;
}
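For illustration, a hedged usage sketch (not part of the original post) that drives the rewritten queue with one producer thread and one consumer thread; it assumes the Resource struct from the question and that <thread> and <memory> are included:

int main() {
    BQueue<std::unique_ptr<Resource>> q{40};

    std::thread producer([&q] {
        for (int i = 0; i < 100; ++i) {
            auto res = std::make_unique<Resource>();
            res->x = i;
            q.push(std::move(res));   // blocks while the queue is full
        }
    });

    std::thread consumer([&q] {
        std::unique_ptr<Resource> res;
        for (int received = 0; received < 100; ) {
            if (q.pop(res)) {         // pop returns false if it times out
                ++received;
            }
        }
    });

    producer.join();
    consumer.join();
}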
- Thank you very much for the answers; although I have already changed the code quite a bit, there was still much to learn from your input. I have addressed a lot of the more trivial criticisms like new/delete, variable names, and composability. The really interesting bits you mentioned, like try_pop and the loops, I still struggle with; I probably need some time to wrap my head around them. In any case, thanks. (Jonas K, Jan 8, 2018 at 10:54)
- Care to explain the danger about push(std::move(...))? Is it because I did not specify is_nothrow_constructible or similar? What's the scenario in which push(std::move(...)) can become a problem? Would just limiting it to is_trivially_constructible be safer? (Jonas K, Jan 8, 2018 at 11:05)
- "push(std::move(...)) runs under the mutex lock. If the item's move-constructor does something sketchy, this could be dangerous." For example, deadlock: the code inside our item's move-constructor attempts to take a global non-recursive lock on some mutex m, which is already held by some other thread that is currently waiting on this->_mutex. Or, silly example: if the move-constructor contains sleep(10), no other thread will be able to push or pop anything for 10 seconds! This is a special low-stakes case of "it's inappropriate to run untrusted code in a privileged environment." (Quuxplusone, Jan 8, 2018 at 20:57)
- Constraining T to be trivially move-constructible, trivially move-assignable, and trivially destructible would indeed remove the danger from all three // DANGER: comments in my rewrite above, as far as I can tell. Of course then you couldn't use your queue to store unique_ptrs or strings... (Quuxplusone, Jan 8, 2018 at 20:59)
- I see. C++ is hard to understand, but getting feedback like this is really helpful, cheers. (Jonas K, Jan 9, 2018 at 22:02)