
One of the features missing from C++11 is a lock-free queue, so I decided to write one myself.

A few remarks:

  • I know the code isn't technically lock-free: a lock is used, but only for the optional blocking call, and it can be omitted.

  • I have tested the code and it seems to work. However, I had trouble coming up with good tests, so if anybody has suggestions I'd be happy to hear them.

  • I'm aware that there is a good implementation of this in Boost, but I don't want to pull Boost into all my projects just for this.

#include <atomic>
#include <condition_variable>
#include <memory>
#include <mutex>

namespace threading
{
    template<typename T>
    class lockless_queue
    {
    private:
        template<typename U>
        struct node
        {
            friend class lockless_queue;

            node(const U data) :
                data(new U(data)),
                next(nullptr),
                isDummy(false)
            {}

        private:
            node(bool isDummy) :
                data(nullptr),
                next(nullptr),
                isDummy(isDummy)
            {}

        public:
            const bool isDummy;
            std::shared_ptr<U> data;
            std::shared_ptr<node<U>> next;
        };

    public:
        lockless_queue() :
            m_head(new node<T>(true)),
            m_running(true)
        {}

        ~lockless_queue()
        {
            m_running = false;
            m_newDataWaiter.notify_all();
        }

        //adds a new element to the end of the queue
        void produce(const T &&data)
        {
            //whether a notification should be sent after adding
            bool l_notifyUponAdding;
            //the new node to be added at the end of the queue
            std::shared_ptr<node<T>> l_newNode(new node<T>(std::forward<const T&&>(data)));
            //pointer to the last node
            std::shared_ptr<node<T>> l_lastNode(std::atomic_load(&m_head));
            //value to compare the next of the last node with
            std::shared_ptr<node<T>> l_expectedNullPointer;
            //notify if this isn't the only node
            l_notifyUponAdding = l_lastNode->isDummy;
            do
            {
                l_expectedNullPointer.reset();
                while (l_lastNode->next)
                {
                    l_lastNode = std::atomic_load(&(l_lastNode->next));
                }
            } while (!std::atomic_compare_exchange_weak(&(l_lastNode->next), &l_expectedNullPointer, l_newNode));
            if (l_notifyUponAdding)
                m_newDataWaiter.notify_one();
        }

        //removes an element from the front of the queue
        std::shared_ptr<T> consume(bool blockingCall = false)
        {
            //the pointer to the element we will consume
            std::shared_ptr<node<T>> l_head = std::atomic_load(&m_head);
            std::shared_ptr<node<T>> l_snack = std::atomic_load(&(l_head->next));
            do
            {
                //check if the first node is null
                if (!l_snack)
                { //and if it is:
                    if (blockingCall && m_running) //and this is a blocking call,
                    {
                        std::unique_lock<std::mutex> l_newDataWaiterLock(m_newDataWaiterMutex);
                        while (!l_head->next)
                        {
                            m_newDataWaiter.wait(l_newDataWaiterLock); //we block until
                            if (!this || !m_running) //break if the object was destroyed during the wait
                                return nullptr;
                            l_snack = std::atomic_load(&(l_head->next));
                        } //the load yields a head that is not null (to avoid unnecessary calls on spurious wake-ups)
                    }
                    else //and if this is not a blocking call,
                    {
                        return nullptr;
                    }
                }
            }
            /*Note that if the atomic CAS fails, l_snack gets updated. Since it might also be updated
              to nullptr if another thread has consumed the last node, we have to check for this
              again. Hence the do-while loop.
            */
            while (!std::atomic_compare_exchange_weak(&(l_head->next), &l_snack, l_snack->next));
            if (l_snack)
                return l_snack->data;
            else
                return std::shared_ptr<T>();
        }

    private:
        //should be used as atomic
        std::shared_ptr<node<T>> m_head;
        std::mutex m_newDataWaiterMutex;
        std::condition_variable m_newDataWaiter;
        bool m_running;
    };
}
asked May 13, 2015 at 9:49
  • Your code doesn't compile. The template argument of the nested class is aliasing the template argument of the enclosing class. This is not legal. Commented May 13, 2015 at 12:29
  • @EmilyL. Actually, on my compiler it does compile; I did try this as a matter of fact. But you're right, I'll fix it. Thank you! Commented May 13, 2015 at 12:57
  • You might be interested in: drdobbs.com/cpp/lock-free-code-a-false-sense-of-security/… Commented May 13, 2015 at 13:47
  • Note: all C++ containers are lock-less (they don't take multi-threading into account and expect you to take the appropriate steps). Also, lockless does not mean faster. You should write a locking version of the queue as a reference and do throughput tests to make sure it attains the speed benefits you want (in multi-threaded consumer and multi-threaded producer scenarios). If it is not faster than a queue that uses locks, you may as well use the locking version, as it is easier to get correct. Commented May 13, 2015 at 18:40
  • @Loki That's true; I've updated the title to reflect the content better. Writing a mutex version is a nice idea: I will write one and post the differences. Commented May 13, 2015 at 18:55

1 Answer


General comments

  • The node structure is defined as a struct with private fields, which effectively makes it a class. If it has private fields, prefer declaring it as a class.
  • The node structure is an implementation detail that is not exposed, so you don't need to bother with access control here. Just remove the friend declaration and the private and public specifiers. Keep it simple, silly ;)
  • Writing lock-free data structures is difficult and error-prone at best, with a risk of very subtle bugs and race conditions that occur only rarely. Are you really sure you need a lock-free queue? Do you have profiling data to back this up? You mentioned Boost was out of the question, but for your own mental health and hair growth, please do consider using a well-tested lock-free queue implemented by experts.
  • The use of template<typename U> is unnecessary. A nested class can use the template parameters of the enclosing class directly. Simply change node<T> to node and remove template<typename U> from the nested class declaration.

Also, I'm pretty sure that this:

if (!this || !m_running) //break if the object was destroyed during the wait

is undefined behaviour. If the object has been destroyed, nothing says that this will have been set to nullptr; in fact, I'd wager it won't. At any rate, since your waiters read from this, you need to inhibit destruction of this until all waiters have returned; otherwise you risk reading freed memory.

You should initialize all variables when they are declared:

bool l_notifyUponAdding;

should be:

bool l_notifyUponAdding = l_lastNode->isDummy;

API and Naming

To me, the names produce and consume are not very apt: they don't reflect the way I think about a queue, and they don't match the naming of the STL queue.

I would much prefer if your class implemented the same API as std::queue where applicable. Or at least used the same terminology such as push and pop.

Performance

This:

 std::shared_ptr<node> l_newNode(new node(std::forward<const T&&>(data)));

should be:

 auto l_newNode = std::make_shared<node>(std::forward<const T&&>(data));

This does only one memory allocation and gives you better performance when using the shared_ptr, as the reference count is allocated together with the data.

Which brings me to my next point:

Use Forwarding Reference Correctly

This:

void produce(const T &&data){
 ...
 std::shared_ptr<node> l_newNode(new node(std::forward<const T&&>(data)));

really should be:

void produce(T&& data){
 ...
 std::shared_ptr<node> l_newNode(new node(std::forward<T>(data)));

One caveat: T&& only denotes a forwarding reference (universal reference to some) when T is deduced at the call site, e.g. when it is the function's own template parameter. Here T is fixed by the class, so T&& is a plain rvalue reference that only binds to rvalues. To accept lvalues and rvalues alike, give produce its own template parameter, say U, take U&&, and forward with std::forward<U>(data).

Edit

You should also properly forward the argument in the node constructor:

node(U&& data) :
    data(new U(std::forward<U>(data)))

Thread Safety

I'm not going to review thread safety as I'm not confident enough in the correct behaviour of the code.

Addendum: Graceful Shutdown

Requested in comments. To make a graceful shutdown when you may have other threads waiting on data on the queue you need two things:

  • Ability to determine if there are any waiters.
  • Defer destruction until no one is waiting.

I haven't tested the following but it shows the concept:

#include <atomic>
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <stdexcept>
#include <thread>

class counter_guard
{
public:
    counter_guard(std::atomic<int>& a) : v(a) { v++; }
    ~counter_guard() { v--; }
private:
    std::atomic<int>& v;
};

class blocking_pipe
{
public:
    ~blocking_pipe()
    {
        m_enabled = false;
        m_signal.notify_all();
        // Busy wait, or you can use another condition_variable.
        while (0 != m_users)
        {
            std::this_thread::yield();
        }
    }

    void push(int val)
    {
        counter_guard cg(m_users); // Prevents "this" from being destroyed until we leave the function body.
        assert(m_enabled); // It's the user's responsibility not to push to a pipe being destroyed.
        std::lock_guard<std::mutex> lg(m_mutex);
        m_queue.push(val);
        m_signal.notify_one(); // Wake a waiting pop.
    }

    int pop()
    {
        counter_guard cg(m_users); // Prevents "this" from being destroyed until we leave the function body.
        assert(m_enabled); // It's the user's responsibility not to pop from a pipe being destroyed.
        std::unique_lock<std::mutex> lg(m_mutex);
        m_signal.wait(lg, [this]() { return !m_enabled || !m_queue.empty(); });
        if (!m_queue.empty())
        {
            // Here m_enabled might be false, but the destructor has not run yet (we hold a user count),
            // so we can still return useful data to the caller.
            auto ans = m_queue.front();
            m_queue.pop();
            return ans;
        }
        else
        {
            // This means m_enabled == false definitively.
            throw std::runtime_error("Pipe severed!");
        }
    }

private:
    std::queue<int> m_queue;
    std::atomic<bool> m_enabled{ true };
    std::atomic<int> m_users{ 0 };
    std::condition_variable m_signal;
    std::mutex m_mutex;
};
answered May 13, 2015 at 13:03
  • Thank you very much for the fast and extensive review. On most accounts I agree with you. I will upload an updated version of the code as soon as I can. About the naming: I also initially named the functions push and pop. But a pop function usually doesn't return a value; that's also a difference between my implementation and std::queue, since std::queue has separate pop and front functions. Finally, what do you mean by your very last sentence? Commented May 13, 2015 at 14:18
  • To be honest, I'm not sure; I got cut off in my thoughts, lol. I'll just remove it. I would probably have used raw pointers to avoid the reference counting and all the memory barriers it entails, as it is all internal to the class, but that is just me. Commented May 13, 2015 at 14:22
  • Haha, I already had a shrewd suspicion that was just half a sentence. However, shared pointers are actually the only way to implement a queue like this as far as I know. This is a very interesting talk about that: youtube.com/watch?v=CmxkPChOcvw Commented May 13, 2015 at 14:25
  • If I am not mistaken, std::make_shared also helps with exception safety. It could be interesting to add this point to the std::make_shared paragraph. Commented May 13, 2015 at 14:26
  • Also, about the if (!this): I know it's not the best, and I'm pretty sure it's bound to fail. However, the problem I had was that when an external call to consume is made and the wait is used, it is impossible to make sure the thread has exited properly. Would you happen to have a bright idea for that? Commented May 13, 2015 at 14:27
