I implemented the following class to dispatch std::function<void(void)>
objects to a thread pool. Multiple threads will block on the pop call until a task is available.
I am using the C++14 standard.
#pragma once
#include<memory>
#include<queue>
#include<mutex>
#include<condition_variable>
namespace alpha {
template<typename T>
class AsyncQueue {
private:
std::shared_ptr<std::queue<T>> queue;
std::shared_ptr<std::mutex> mutex;
std::shared_ptr<std::condition_variable> cond;
public:
/* Big 5 */
AsyncQueue(const AsyncQueue& other) = default;
AsyncQueue(AsyncQueue&& other) = default;
AsyncQueue& operator=(const AsyncQueue& other) = default;
AsyncQueue& operator=(AsyncQueue&& other) = default;
virtual ~AsyncQueue() = default;
/**
* @brief Default constructor
*/
AsyncQueue() :
queue(new std::queue<T>()),
mutex(new std::mutex),
cond(new std::condition_variable()) {
}
/**
* @brief Push a value to the async queue
*/
void push(const T& object) {
{
std::lock_guard<std::mutex> lock(*mutex);
queue->push(object);
}
cond->notify_one();
}
/**
* @brief Push a value to the async queue (move variant)
*/
void push(T&& object) {
{
std::lock_guard<std::mutex> lock(*mutex);
queue->push(object);
}
cond->notify_one();
}
/**
* @brief Pop a value from the queue
*/
T&& pop() {
std::unique_lock<std::mutex> lock(*mutex);
while(queue->empty()) {
cond->wait(lock);
}
T object = std::move(queue->front());
queue->pop();
return std::move(object);
}
};
}
Specific questions:
- Is there a way to keep both implementations of
push
DRY? The only difference is the type of the argument. - Is this safe? I tested it multiple times with
true; while [[ $? == 0 ]]; do ./test; done
, but i'd feel a lot better if someone told me that my idea of thread-safety is correct. - pop returns a rvalue-reference, which seems to cause problems when passing it directly to a function with the argument type
const T&
. Assigning it to a local variable first solves that, but is there a way to make push return a rvalue-reference only when it is needed? I am asking because i know C++ compilers implicitly move a lot.
EDIT: The version that has the suggestions included can be found here. I applied the BSD 3-clause license to it.
2 Answers 2
Separation of concerns
You have the members of your class as shared_ptr
s this indicates that you want shallow copy assignment and construction. This is not something that you should concern yourself when designing the class.
I would recommend that you design your queue as non-copyable and non-assignable. Because assigning a queue makes no sense. If the user needs to pass a queue around, it is their responsibility to make a shared_ptr<AsyncQueue>
. This has the benefit of making your implementation cleaner, but also performing better as your member variables will now get a better cache locality.
The big 5
Following the above the big 5 should now be:
AsyncQueue(const AsyncQueue& other) = delete;
AsyncQueue(AsyncQueue&& other) = delete;
AsyncQueue& operator=(const AsyncQueue& other) = delete;
AsyncQueue& operator=(AsyncQueue&& other) = delete;
virtual ~AsyncQueue() = default;
(note delete
instead of default
). The move assignment and construction cannot be implemented as moving a mutex is not supported (for good reason).
Use make_shared
If you still insist on using shared_ptr
, you should use make_shared
to construct them because this will co-allocate the reference count with the object data. This saves some memory but more importantly it improves data locality.
Forwarding references
You're better off using forwarding references (also known as universal reference but forwarding reference is the new preferred name) for the push
method. In fact I would go as far as to not provide push
but instead provide emplace
that constructs the object in place like this:
template<typename... Args>
void emplace(Args&&... args){
{
std::lock_guard<std::mutex> lock(*mutex);
queue->emplace(std::forward<Args>(args)...);
}
cond->notify_one();
}
emplace()
will construct the queued object in place using the arguments passed to emplace()
as constructor arguments to avoid a copy. This way you leave the decision whether or not to move to your user.
Obj o;
asyncqueue.emplace(o); // Will copy construct.
asyncqueue.emplace(std::move(o)); // Will move construct.
// Will evaluate o*3 into a temporary and move from it.
// Possibly eliding the copy.
asyncqueue.emplace( o * 3);
asyncqueue.emplace("whatevz"); // Will call constructor taking c-string.
Returning by move in pop
The reason you're crashing is that the local value you moved out of is destroyed when you leave the function scope. So you're moving from rubbish. Returning by r-value reference: Just don't. The compiler will likely perform (N)RVO and the copy will be optimized out if it is possible.
Condtional wait
You have an infinite conditional wait. It may be useful to have a functionality to "sever" the queue and release all threads waiting on jobs.
Typically you would have:
std::atomic<bool> still_running{true};
void stop(){ still_running = false; cond->notify_all();}
void push(T&&){
if(!still_running) throw error("severed pipe");
...
}
void pop(){
...
while(queue->empty() && still_running) {
cond->wait(lock);
}
if(queue->empty(){
// Check for queue empty here, as still_running could
// have been changed after we returned but we could still
// return useful data.
throw error("severed pipe");
}
...
}
Also prefer the more concise way of waiting on a condition variable:
while(queue->empty()) {
cond->wait(lock);
}
could be:
cond->wait(lock, [this](){ return !queue->empty();});
For reference, you might be interested in: my implementation of a similar class
-
\$\begingroup\$ "The following is not a function that will bind to a r-value reference" — Wrong;
T&&
here is not a universal (read: forwarding) reference.T&&
is a non-const rvalue reference toT
, which is a very specific type in this context — it's the type thatAsyncQueue<T>
was instantiated with. For example,AsyncQueue<int>::push(int&&)
. A forwarding reference would only exist here ifT
were a completely new template type parameter subject to type deduction. \$\endgroup\$Quuxplusone– Quuxplusone2015年07月20日 17:28:02 +00:00Commented Jul 20, 2015 at 17:28 -
\$\begingroup\$ Looks good to me now! FYI, I (and
wc
) disagree with you about which version of thecond.wait
code is "more concise". ;) \$\endgroup\$Quuxplusone– Quuxplusone2015年07月20日 22:24:05 +00:00Commented Jul 20, 2015 at 22:24 -
\$\begingroup\$ @Quuxplusone I prefer not to write explicit loops when I can, no particular reson I just think that if my logic can be expressed using a standard algorithm, it's better that way. For example I prefer to use
std::copy
over a raw loop to copy an array to another. \$\endgroup\$Emily L.– Emily L.2015年07月21日 07:17:40 +00:00Commented Jul 21, 2015 at 7:17
Is there a way to keep both implementations of push DRY? The only difference is the type of the argument.
I actually think you don't need the void push(const T& object)
, a universal reference should bind to a const value. You should use std::forward
to get the move semantics though, otherwise you copy on queue->push(object)
/**
* @brief Push a value to the async queue (move variant)
*/
void push(T&& object) {
{
std::lock_guard<std::mutex> lock(*mutex);
queue->push(std::forward<T>(object));
}
cond->notify_one();
}
Is this safe? I tested it multiple times with true; while [[ $? == 0 ]]; do ./test; done, but i'd feel a lot better if someone told me that my idea of thread-safety is correct.
Seems safe for pushing/poping but it could leak while construction.
std::shared_ptr<std::queue<T>> queue;
std::shared_ptr<std::mutex> mutex;
std::shared_ptr<std::condition_variable> cond;
Do you really need ptr's? I don't see a reason for them. If you really need them use std::make_shared
AsyncQueue() :
queue(std::make_shared<std::queue<T>>()),
mutex(std::make_shared<std::mutex>()),
cond(std::make_shared<std::condition_variable>()) {
}
pop returns a rvalue-reference, which seems to cause problems when passing it directly to a function with the argument type const T&. Assigning it to a local variable first solves that, but is there a way to make push return a rvalue-reference only when it is needed? I am asking because i know C++ compilers implicitly move a lot.
Thats because you can't return a dangling reference to a (to be cleaned-up) local variable. You should just return T
to get the move-out behavior. Also, you don't need to cast the variable to a rvalue std::move(object)
because a rule called '(named) return value optimization' should move it out anyways, or, if possible, creates the variable directly in-place. Your std::move prevents NRVO and RVO.
AsyncQueue(const AsyncQueue& other) = default;
AsyncQueue(AsyncQueue&& other) = default;
AsyncQueue& operator=(const AsyncQueue& other) = default;
AsyncQueue& operator=(AsyncQueue&& other) = default;
virtual ~AsyncQueue() = default;
Oh, now i see why you use shared_ptr's. But thats not a good idea in my opinion. Rather than sharing the state state between copies of this class use a shared_ptr<AsyncQueue>
and pass that around
-
\$\begingroup\$ I needed to make push a template for this to work (
template<typename U> void push(U&& object)
). When i used T&& it did not compile (cannot bind lvalue to rvalue reference
). With a template it works. Thank you for your review. \$\endgroup\$Richard– Richard2015年07月20日 13:10:14 +00:00Commented Jul 20, 2015 at 13:10 -
\$\begingroup\$ "a universal reference should bind to a const value" — Correct; but
T&&
here is not a universal (read: forwarding) reference.T&&
is a non-const rvalue reference toT
, which is a very specific type in this context — it's the type thatAsyncQueue<T>
was instantiated with. For example,AsyncQueue<int>::push(int&&)
. A forwarding reference would only exist here ifT
were a completely new template type parameter subject to type deduction. (As the OP has already noticed in the comment above.) \$\endgroup\$Quuxplusone– Quuxplusone2015年07月20日 17:27:07 +00:00Commented Jul 20, 2015 at 17:27
Explore related questions
See similar questions with these tags.
virtual
destructor is inheritance. Do other classes inherit publicly fromAsyncQueue
? If not, then don't give it a virtual destrutor and possibly also mark the class asfinal
. \$\endgroup\$