I have a simple lock-free queue that supports multiple producers but only one consumer. It is based on a fixed-size ring buffer and stores only pointers to a class, not actual instances. It is pre-C++11 code and uses GCC-specific builtins.
Is it implemented properly, or does it have a data race? I only care that it works correctly on x86/x64 Linux.
// header
class LockFreeQueue {
public:
    LockFreeQueue(unsigned long size);
    ~LockFreeQueue();
    bool Enqueue(Element* element);
    bool Dequeue(Element** element);
    bool Peek(Element** element);
    unsigned long Size();
private:
    unsigned long size_;
    volatile Element** elements_;
    volatile unsigned long idx_r_;     // index of read
    volatile unsigned long idx_w_;     // index of write
    volatile unsigned long idx_w_cas_;
};
// impl
LockFreeQueue::LockFreeQueue(unsigned long size) {
    size_ = size;
    idx_r_ = 0;
    idx_w_ = 0;
    idx_w_cas_ = 0;
    elements_ = (volatile Element**)(new Element*[size]);
}

LockFreeQueue::~LockFreeQueue() {
    delete[] elements_;
}

bool LockFreeQueue::Enqueue(Element* element) {
    while (true) {
        unsigned long prev_idx_w = idx_w_cas_;
        unsigned long next_idx_w = (prev_idx_w + 1) % size_;
        if (next_idx_w == idx_r_) return false;
        if (__sync_bool_compare_and_swap(&idx_w_cas_, prev_idx_w, next_idx_w)) {
            elements_[prev_idx_w] = element;
            while (!__sync_bool_compare_and_swap(&idx_w_, prev_idx_w, next_idx_w)) {}
            break;
        }
    }
    return true;
}

bool LockFreeQueue::Dequeue(Element** element) {
    if (idx_r_ == idx_w_) return false;
    unsigned long next_idx_r = (idx_r_ + 1) % size_;
    *element = (Element*) elements_[idx_r_];
    idx_r_ = next_idx_r;
    return true;
}

bool LockFreeQueue::Peek(Element** element) {
    if (idx_r_ == idx_w_) return false;
    *element = (Element*) elements_[idx_r_];
    return true;
}

unsigned long LockFreeQueue::Size() {
    if (idx_w_ >= idx_r_) return idx_w_ - idx_r_;
    return size_ - idx_r_ + idx_w_;
}
Note that I don't have any tests for the code. It was also written by someone else whom I cannot reach.
I know that volatile alone does not guarantee correctness, but I wonder if it is sufficient together with CAS.
EDIT:
Usage in existing code goes like this:
Producer thread (x4):
LockFreeQueue* theQueue = ...
while (true) {
    Element* element = new Element();
    // do some short work
    //
    while (!theQueue->Enqueue(element)) {}
}
Consumer thread:
LockFreeQueue* theQueue = ...
while (true) {
    Element* element;
    if (theQueue->Dequeue(&element)) {
        // do some work with the element
        //
        delete element;
    }
}
1 Answer
Some of the code seems to be missing. I needed to add a declaration of Element to get it to compile. I wonder if this class should actually be a template, with Element as a parameter?
Constructor should initialise elements, rather than assigning:
LockFreeQueue::LockFreeQueue(unsigned long size)
    : size_{size},
      elements_{new volatile Element*[size]},
      idx_r_{0},
      idx_w_{0},
      idx_w_cas_{0}
{
}
(I think that really ought to be new Element* volatile[size], since it's the pointers that should be volatile - really, atomic - not the pointed-to Element objects.)
Copying a queue would cause a double-free of elements_, so it's wise to prevent that:
LockFreeQueue(const LockFreeQueue&) = delete;
void operator=(const LockFreeQueue&) = delete;
Much depends on size_ not changing, so declare it const. The Size() function ought to be const:
unsigned long Size() const;
Most C++ programmers would expect it to be called size() and to return a std::size_t, like a standard container.
I think Peek() should also be const.
The modular reduction % size_ might be better as a separate conditional subtraction, given that the quotient is always 0 or 1. It's used often enough that it might be worth a small inline function next_index().
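A minimal sketch of that suggestion follows. The free-function form is illustrative only; inside the class it would presumably be a private const member function using size_ directly:

```cpp
#include <cassert>
#include <cstddef>

// Sketch of the suggested next_index() helper: since (i + 1) / size is
// always 0 or 1 here, a compare-and-reset avoids the general-purpose
// (and typically slower) % operator.
inline std::size_t next_index(std::size_t i, std::size_t size) {
    std::size_t next = i + 1;
    return next == size ? 0 : next;  // wrap around instead of reaching size
}
```

On common x86 compilers the ternary compiles to a branch or conditional move, whereas % on a runtime divisor is a genuine division.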
The volatile modifier doesn't give you any thread-safety guarantees - use std::atomic<> instead. Using the standard types and operations will make the code more portable and perhaps more performant too (if we can replace a full barrier with an appropriate read or write barrier). In any case, it appears that it's not the Element objects that should be atomic, but rather our pointers to them:
std::atomic<Element*> *elements_;
Or better, releasing us from having to provide a destructor:
std::vector<std::atomic<Element*>> elements_;
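Pulling these suggestions together, the class skeleton might look something like this. This is a sketch, not a drop-in replacement: member names follow the original, and size() here re-implements the original Size() logic over atomic loads (with the caveat that it is only a snapshot under concurrent use):

```cpp
#include <atomic>
#include <cstddef>
#include <vector>

struct Element;  // declaration added, as noted above, to make this compile

class LockFreeQueue {
public:
    explicit LockFreeQueue(std::size_t size)
        : size_{size},
          elements_(size),  // vector owns the storage; no destructor needed
          idx_r_{0}, idx_w_{0}, idx_w_cas_{0}
    {}

    // copying would double-free in the original; here it is simply disabled
    LockFreeQueue(const LockFreeQueue&) = delete;
    void operator=(const LockFreeQueue&) = delete;

    // same index arithmetic as the original Size(), renamed per convention;
    // the result is only a snapshot while other threads are active
    std::size_t size() const {
        std::size_t r = idx_r_.load();
        std::size_t w = idx_w_.load();
        return w >= r ? w - r : size_ - r + w;
    }

private:
    const std::size_t size_;  // never changes after construction
    std::vector<std::atomic<Element*>> elements_;
    std::atomic<std::size_t> idx_r_;
    std::atomic<std::size_t> idx_w_;
    std::atomic<std::size_t> idx_w_cas_;
};
```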
Consider whether a smart pointer of some kind (perhaps even std::weak_ptr) might be more appropriate than raw pointers for the contained elements. Or are we willing to impose a requirement on our users not to delete anything that's in the queue?
Comments:
- volatile doesn't protect your variables from race conditions. You can read more about that here.
- Use #include <atomic> rather than relying on compiler-specific extensions. BTW, you should consider including your test code in the question, so we can see how you expect this to be used (and perhaps identify tests you've missed).
- If volatile would be the right thing to use, it probably should have been Element *volatile *elements.
- There is a race in dequeue if another thread dequeues between when idx_r is read and when it updates.
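To illustrate that last point: the posted Dequeue is only safe with a single consumer. With several consumers, two threads can both read elements_[idx_r_] before either advances idx_r_. A multi-consumer version would need to claim the index with a CAS, along these lines (a standalone sketch with illustrative names - Ring, DequeueMC - not the original class):

```cpp
#include <atomic>
#include <cstddef>

struct Element { int value; };  // stand-in for the question's Element

// Minimal stand-in for the queue's state, just enough to show the fix.
struct Ring {
    std::size_t size;
    std::atomic<Element*>* elements;
    std::atomic<std::size_t> idx_r{0};
    std::atomic<std::size_t> idx_w{0};
};

bool DequeueMC(Ring& q, Element** out) {
    while (true) {
        std::size_t prev = q.idx_r.load();
        if (prev == q.idx_w.load()) return false;  // queue empty
        Element* e = q.elements[prev].load();
        std::size_t next = (prev + 1) % q.size;
        // Only one consumer wins this CAS; losers retry with fresh indices,
        // so no two threads can hand out the same slot.
        if (q.idx_r.compare_exchange_weak(prev, next)) {
            *out = e;
            return true;
        }
    }
}
```

With a single consumer the original plain read-then-write of idx_r_ is fine; the CAS only matters once a second consumer is added.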