I am currently reading the book C++ Concurrency in Action by Anthony Williams. In chapter 9, he implements a lock-based work-stealing queue and mentions that it is possible to implement a lock-free queue that allows the owner thread to push and pop at one end while other threads steal entries from the other. So I implemented one myself. It has three methods: push, try_pop_back, and try_steal_front.

push: always adds a new item to the back of the queue
try_pop_back: tries to pop an existing item from the back of the queue
try_steal_front: tries to steal from the front of the queue

This queue is thread local: push and try_pop_back are only ever called by a single thread, whereas try_steal_front runs in multiple threads and competes with push and try_pop_back. I wonder whether the code works correctly across all CPU architectures (e.g. Intel x86-64, AMD, ARM, etc.).

The code is also on GitHub. I have a simple test case there that runs this queue under multiple threads.

    #include <atomic>
    #include <array>
    #include <utility>  // std::move
    //#include "functionwrapper.h"

    // push and pop are accessed by only one single thread
    // thread local queue
    // push/pop by this single thread could compete with steal by multiple other threads
    class LockFreeWorkStealingQueue {
    private:
        // using DataType = FunctionWrapper;
        using DataType = int;
        // change to be template argument in the future
        static constexpr auto DEFAULT_COUNT = 2048u;
        static constexpr auto MASK = DEFAULT_COUNT - 1u;
        std::array<DataType, DEFAULT_COUNT> q;
        std::atomic<unsigned int> lock_front{0};
        std::atomic<unsigned int> lock_back{0};

    public:
        LockFreeWorkStealingQueue() {}
        LockFreeWorkStealingQueue(const LockFreeWorkStealingQueue&) = delete;
        LockFreeWorkStealingQueue& operator=(const LockFreeWorkStealingQueue&) = delete;

        /**
         * always add a new item to the back of the queue
         * runs sequentially with try_pop_back
         * runs in parallel with multiple threads' try_steal_front
         */
        void push(DataType data) {
            auto bk = lock_back.load(std::memory_order_acquire);
            // try resetting lock_front and lock_back to prevent
            // them from becoming too large
            if (bk == lock_front.load(std::memory_order_acquire)) {
                lock_front.store(0, std::memory_order_release);
                lock_back.store(0, std::memory_order_release);
            }
            q[bk & MASK] = std::move(data);
            lock_back.fetch_add(1, std::memory_order_release);
        }

        /**
         * tries to pop an existing item from the back of the queue
         * runs sequentially with push
         * runs in parallel with multiple threads' try_steal_front
         */
        bool try_pop_back(DataType& res) {
            auto ft = lock_front.load(std::memory_order_acquire);
            auto bk = lock_back.load(std::memory_order_acquire);
            if (bk > ft) {
                while(bk && !lock_back.compare_exchange_weak(bk, bk - 1, std::memory_order_release, std::memory_order_relaxed));
                res = std::move(q[(bk - 1) & MASK]);
                return true;
            }
            return false;
        }

        /**
         * tries to steal from the front of the queue
         * runs in multiple threads
         */
        bool try_steal_front(DataType& res) {
            auto ft = lock_front.load(std::memory_order_acquire);
            auto bk = lock_back.load(std::memory_order_acquire);
            // if there is only one item in the queue, do not try to steal:
            // stealing would contend with try_pop_back and fail anyway
            if (bk && ft < bk - 1) {
                while(!lock_front.compare_exchange_weak(ft, ft + 1, std::memory_order_release, std::memory_order_relaxed));
                // check again to see any changes by push or try_pop_back
                if (ft < lock_back.load(std::memory_order_acquire)) {
                    res = std::move(q[ft & MASK]);
                    return true;
                } else {
                    // nothing to steal, reset lock_front
                    lock_front.fetch_sub(1, std::memory_order_release);
                }
            }
            return false;
        }
    };

- I also added the GitHub link; there is a simple test there that runs this queue under multiple threads. (Elaine, Jul 4, 2018 at 18:50)
- Yeah, thanks for the comments. I commented out the FunctionWrapper and made DataType an int. @Toby Speight, you can also find the definition in the GitHub link I just provided in the description. (Elaine, Jul 4, 2018 at 19:04)

2 Answers

I'm not qualified to talk about memory orders, so I'll skip that part (except to note that I've never met anyone else who is, either, which is why I recommend going seq_cst all the way). I'll just pretend all your orders are seq_cst.
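
For reference, "seq_cst all the way" amounts to dropping the explicit order arguments: every std::atomic member function defaults to std::memory_order_seq_cst. A minimal sketch (the helper name bump_seq_cst is made up for illustration):

```cpp
#include <atomic>

// Hypothetical helper: increments the counter and returns the new value,
// using only the default (sequentially consistent) memory order.
unsigned bump_seq_cst(std::atomic<unsigned>& counter) {
    counter.fetch_add(1);   // same as fetch_add(1, std::memory_order_seq_cst)
    return counter.load();  // same as load(std::memory_order_seq_cst)
}
```

The trade-off is that seq_cst may cost more than acquire/release on weakly ordered hardware, but it is the easiest ordering to reason about.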
If I understand correctly, you have one "producer" thread and N "consumer" threads. The producer pushes items onto the back of the queue. The consumers steal items from the front of the queue. The producer also has one additional operation: he can pop items from the back of the queue.
I feel like your names could use some reshuffling. I would intuitively expect that the producer "owns" the back of the queue and the consumers "own" the front of the queue; this means that the name "steal" is wrong, because the consumers are merely popping off items that they are perfectly entitled to pop. (Compare std::deque::pop_front, or std::queue::pop, which removes from the front.)
Meanwhile, the producer owns the back of the queue, so he's not "stealing" anything; but he's not "popping" either, not from the back of a queue! (If you're pushing and popping from the same end, it's not a queue but a stack, and it's not the back but the top.) I'd say that the producer's special operation is to take back an item that he'd just put on the queue a moment ago.
Putting it all together, I'd call the operations:
push -> push_back
try_pop_back -> try_take_back
try_steal_front -> try_pop_front

Your push operation is not named try_push, implying that it cannot fail. But the function's block comment doesn't explain what you expect to happen if the queue is full! Is it supposed to overwrite, ring-buffer-style? Or is it just "undefined behavior"? I assume "undefined behavior", but the comment should explain.
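
One way to make the full-queue case explicit is a try_push that reports failure instead of overwriting. The following is only a sketch of the capacity check, not the class under review: the names BoundedQueueSketch, CAPACITY, and size are hypothetical, and it assumes the front index only advances when an item has really been removed and that neither index wraps around.

```cpp
#include <array>
#include <atomic>

// Hypothetical illustration of a fallible push; not the reviewed class.
class BoundedQueueSketch {
    static constexpr unsigned CAPACITY = 4;  // power of two, tiny for the demo
    static constexpr unsigned MASK = CAPACITY - 1;
    std::array<int, CAPACITY> slots{};
    std::atomic<unsigned> front{0};
    std::atomic<unsigned> back{0};

public:
    // Owner thread only. Returns false rather than overwriting a live slot.
    bool try_push(int value) {
        unsigned bk = back.load();
        unsigned ft = front.load();
        if (bk - ft >= CAPACITY) {
            return false;  // queue is full
        }
        slots[bk & MASK] = value;
        back.store(bk + 1);  // publish the new item
        return true;
    }

    unsigned size() const { return back.load() - front.load(); }
};
```

With this shape the block comment can simply say "returns false if the queue is full", and no undefined behavior needs documenting for that case.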
The try_steal_front (a.k.a. try_pop_front) function looks like the most interesting one, multithreading-wise, so I'll dig into it.

    bool try_steal_front(DataType& res) {
        auto ft = lock_front.load(std::memory_order_acquire);
        auto bk = lock_back.load(std::memory_order_acquire);
        // if there is only one item in the queue, try not steal
        // if stealing, contention with try_pop_back, failed anyway
        if (bk && ft < bk - 1) {
            while(!lock_front.compare_exchange_weak(ft, ft + 1, std::memory_order_release, std::memory_order_relaxed));

Isn't this cmpxchg loop just a verbose way of saying ft = lock_front++? (Or I suppose ft = lock_front.fetch_add(1, std::memory_order_release)?)
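
A minimal sketch of that equivalence, using default memory orders and two hypothetical helper names: both functions add one to the counter and return the value it held beforehand.

```cpp
#include <atomic>

// Increment via a compare-exchange loop; returns the pre-increment value.
unsigned increment_with_cas(std::atomic<unsigned>& counter) {
    unsigned seen = counter.load();
    // On failure, compare_exchange_weak refreshes `seen` with the current value,
    // so the loop retries with up-to-date data.
    while (!counter.compare_exchange_weak(seen, seen + 1)) {
    }
    return seen;
}

// The same effect as a single atomic read-modify-write.
unsigned increment_with_fetch_add(std::atomic<unsigned>& counter) {
    return counter.fetch_add(1);
}
```
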

            // check again to see any changes by push or try_pop_back
            if (ft < lock_back.load(std::memory_order_acquire)) {

This line scares me because it has an atomic operation on the same line with a bunch of other computation. I strongly recommend isolating atomic operations on their own lines, even when you think it's safe (as I think it is in this case).
Let me just rewrite it in my own inefficient style:

    bool try_pop_front(DataType& res) {
        unsigned ft = lock_front.load();
        unsigned bk = lock_back.load();
        if (bk && ft < bk - 1) {
            ft = lock_front++;
            bk = lock_back.load();
            if (ft < bk) {
                res = std::move(q[ft & MASK]);
                return true;
            } else {
                // nothing to steal, reset lock_front
                --lock_front;
            }
        }
        return false;
    }

Manipulating lock_front even in the case that there's nothing to steal strikes me as probably dangerous... but I'm too lazy to find out for sure. I'd definitely recommend digging into that, though.
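
One way to sidestep the question is never to publish an increment that might need undoing: a thief reads the candidate item first and then claims the slot with a single compare-exchange, leaving the front index untouched when the queue looks empty or another thread wins. The sketch below shows only that thief side. StealSketch and its members are hypothetical names, the owner's push/pop protocol is not shown (and would have to cooperate for this to be safe), and index wraparound is ignored.

```cpp
#include <array>
#include <atomic>

// Hypothetical thief-side sketch; not a complete work-stealing queue.
struct StealSketch {
    static constexpr unsigned MASK = 7;
    std::array<int, 8> slots{};
    std::atomic<unsigned> front{0};
    std::atomic<unsigned> back{0};

    bool try_steal(int& out) {
        unsigned ft = front.load();
        unsigned bk = back.load();
        if (ft >= bk) {
            return false;  // looks empty: front is left untouched
        }
        int candidate = slots[ft & MASK];  // read before claiming the slot
        // Claim the slot only if no other thread advanced front meanwhile.
        if (!front.compare_exchange_strong(ft, ft + 1)) {
            return false;  // lost the race; the caller may retry
        }
        out = candidate;
        return true;
    }
};
```

Because a failed attempt writes nothing, there is no "reset lock_front" step for another thread to interleave with.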
But I think I found my obligatory lock-free bug anyway! Look here in push:

        // try resetting the lock_front and lock_back to prevent
        // they being too large
        if (bk == lock_front.load(std::memory_order_acquire)) {
            lock_front.store(0, std::memory_order_release);
            lock_back.store(0, std::memory_order_release);
        }

If I understand correctly, this code is run by the producer thread. Elsewhere in the program, you still have a bunch of consumer threads which are eagerly watching the front of the queue, ready to pull items off of it as soon as lock_front < lock_back.

And what does the producer do here? It sets lock_front to 0 while lock_back is still very large! There's a brief instant where, to the consumers, it looks like Christmas: suddenly the queue is so full of items. And they'll begin pulling off those items... until the producer finally runs the next line and sets lock_back to 0 (which will now be less than lock_front).
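
The window can be replayed deterministically on a single thread by performing the steps in the problematic order. This is only an illustration of the interleaving, not a test of the real class: Indices, looks_stealable, and consumer_view_between_stores are hypothetical names, and default memory orders are used throughout.

```cpp
#include <atomic>

// Just the two indices of the queue; no real threads are involved.
struct Indices {
    std::atomic<unsigned> lock_front{0};
    std::atomic<unsigned> lock_back{0};
};

// The precondition that try_steal_front checks before stealing.
bool looks_stealable(const Indices& q) {
    unsigned ft = q.lock_front.load();
    unsigned bk = q.lock_back.load();
    return bk && ft < bk - 1;
}

// What a consumer concludes if it runs between the two stores in push.
bool consumer_view_between_stores(unsigned index) {
    Indices q;
    q.lock_front.store(index);       // empty queue: front == back == index
    q.lock_back.store(index);
    q.lock_front.store(0);           // producer: first half of the reset
    bool seen = looks_stealable(q);  // consumer runs in the gap
    q.lock_back.store(0);            // producer: second half of the reset
    return seen;
}
```

For a large index such as 1000 the consumer sees an apparently full queue even though it was empty a moment earlier.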

You can patch this particular bug by just switching the order of these two lines: set lock_back to 0 first. This fixes the immediate bug because the consumers are checking (bk && ...) before they pull anything off.

However, I'm fairly confident that this just makes the bug more subtle. Now you need two consumers in order to see the bug. Both consumers check (bk && ...) successfully. Then the first consumer pops an item, making the queue empty. Then the producer runs push (up through the resetting of lock_back but not lock_front). Now the second consumer runs:

        if (ft < lock_back.load(std::memory_order_acquire)) {

lock_back is 0 and ft is very large, so the second consumer skips to the "failure" branch. Now the producer finishes resetting lock_front to 0. And finally, the second consumer fails:

        // nothing to steal, reset lock_front
        lock_front.fetch_sub(1, std::memory_order_release);

So at the end of this cycle, we've got lock_back==0, lock_front==UINT_MAX.

Now a third consumer comes in, and gets an early Christmas... er, wait, no, a famine! lock_back < lock_front means the queue's size is "negative," right?

The producer thread is still suspended right before these lines:

        q[bk & MASK] = std::move(data);
        lock_back.fetch_add(1, std::memory_order_release);

It has no idea that all this is taking place.

I think this story holds together. Not 100% sure, though. You can never be 100% sure with lock-free code. :)
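
The story can be replayed on a single thread by executing the steps in the order described, starting from the point where the queue is empty at index n and the second consumer has already passed its first check. This is a sketch with hypothetical names (ReplayResult, replay_swapped_reset) and default memory orders, not a test of the real class.

```cpp
#include <atomic>
#include <climits>

struct ReplayResult {
    unsigned front;
    unsigned back;
    bool owner_can_pop;    // try_pop_back's test: bk > ft
    bool thief_can_steal;  // try_steal_front's test: bk && ft < bk - 1
};

ReplayResult replay_swapped_reset(unsigned n) {
    std::atomic<unsigned> lock_front{n};  // queue is empty at index n
    std::atomic<unsigned> lock_back{n};

    lock_back.store(0);                     // producer: reset back first (patched order)
    unsigned ft = lock_front.fetch_add(1);  // consumer 2: claims index n
    bool got_item = ft < lock_back.load();  // consumer 2: n < 0 is false
    lock_front.store(0);                    // producer: reset front
    if (!got_item) {
        lock_front.fetch_sub(1);            // consumer 2: the "undo" wraps 0 to UINT_MAX
    }
    lock_back.fetch_add(1);                 // producer: publishes the pushed item

    unsigned f = lock_front.load();
    unsigned b = lock_back.load();
    return {f, b, b > f, b != 0 && f < b - 1};
}
```

After the replay the freshly pushed item is stranded: neither the owner's test nor a thief's test considers the queue non-empty.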

Corroborating evidence: Remember how I said that manipulating lock_front seemed dangerous? That's exactly where we found our bug! The real red flag is related to my naming discussion. Remember that the producer owns the back of the queue; the consumers own the front of the queue. The one place we hit our data race is precisely when the producer tried to modify lock_front, because the producer does not own the front of the queue!

- Sharp-eyed! And also the follow-up analysis regarding "fairly confident that this just makes the bug more subtle"! I agree with you. When I worked on the code, I also suspected the resetting part in the push function. I guess I need to delete it for now and maybe find a better way to add it back in. I will just assume the queue is large enough that lock_front/lock_back never overflow unsigned int. (Elaine, Jul 4, 2018 at 23:16)
- (Continued from above) I guess I also need to change some names, lol. I'll keep the stealing name since it reflects that this is a work-stealing queue, not a general queue. But I totally agree with your point if it's intended to be a general single-producer/multi-consumer queue. Many thanks! I will change my code in the next answer to reflect your comments :) (Elaine, Jul 4, 2018 at 23:19)

I commented out the reset and changed the code to reflect Quuxplusone's comments. Many thanks to Quuxplusone :)

EDIT: Added comments to explain the undefined behavior in push_back.
EDIT: Changed fetch_add/fetch_sub to acq_rel in the stealing path.

    #include <atomic>
    #include <array>
    #include <utility>  // std::move
    #include "functionwrapper.h"

    // push and pop are accessed by only one single thread
    // thread local queue
    // push/pop by this single thread could compete with steal by multiple other threads
    class LockFreeWorkStealingQueue {
    private:
        using DataType = FunctionWrapper;
        // change to be template argument in the future
        static constexpr auto DEFAULT_COUNT = 2048u;
        static constexpr auto MASK = DEFAULT_COUNT - 1u;
        std::array<DataType, DEFAULT_COUNT> q;
        std::atomic<unsigned int> lock_front{0};
        std::atomic<unsigned int> lock_back{0};

    public:
        LockFreeWorkStealingQueue() {}
        LockFreeWorkStealingQueue(const LockFreeWorkStealingQueue&) = delete;
        LockFreeWorkStealingQueue& operator=(const LockFreeWorkStealingQueue&) = delete;

        /**
         * add a new item to the back of the queue
         * runs sequentially with try_pop_back
         * runs in parallel with multiple threads' try_steal_front
         *
         * NOTE: If this makes the queue's size 2048 or more, the behavior is undefined
         * (older contents are overwritten and will never get popped or stolen).
         * If this is the 2^32nd call to push_back, the behavior is undefined.
         */
        void push_back(DataType data) {
            auto bk = lock_back.load(std::memory_order_acquire);
            // try resetting lock_front and lock_back to prevent them from
            // becoming too large - commented out since it causes race conditions
            //if (bk == lock_front.load(std::memory_order_acquire)) {
            //    lock_front.store(0, std::memory_order_release);
            //    lock_back.store(0, std::memory_order_release);
            //}
            q[bk & MASK] = std::move(data);
            lock_back.fetch_add(1, std::memory_order_release);
        }

        /**
         * tries to pop an existing item from the back of the queue
         * runs sequentially with push_back
         * runs in parallel with multiple threads' try_steal_front
         */
        bool try_pop_back(DataType& res) {
            auto ft = lock_front.load(std::memory_order_acquire);
            auto bk = lock_back.load(std::memory_order_acquire);
            if (bk > ft) {
                while(bk && !lock_back.compare_exchange_weak(bk, bk - 1, std::memory_order_release, std::memory_order_relaxed));
                res = std::move(q[(bk - 1) & MASK]);
                return true;
            }
            return false;
        }

        /**
         * tries to steal from the front of the queue
         * runs in multiple threads
         */
        bool try_steal_front(DataType& res) {
            auto ft = lock_front.load(std::memory_order_acquire);
            auto bk = lock_back.load(std::memory_order_acquire);
            // if there is only one item in the queue, do not try to steal:
            // stealing would contend with try_pop_back and fail anyway
            if (bk && ft < bk - 1) {
                // while(!lock_front.compare_exchange_weak(ft, ft + 1, std::memory_order_release, std::memory_order_relaxed));
                ft = lock_front.fetch_add(1, std::memory_order_acq_rel);
                // check again to see any changes by push_back or try_pop_back
                bk = lock_back.load(std::memory_order_acquire);
                if (ft < bk) {
                    res = std::move(q[ft & MASK]);
                    return true;
                } else {
                    // nothing to steal, reset lock_front
                    lock_front.fetch_sub(1, std::memory_order_acq_rel);
                }
            }
            return false;
        }
    };

- You haven't yet addressed my "block comment" comment. "Always add a new item to the back of the queue" should be more like (I think) "Add a new item to the back of the queue. If this makes the queue's size 2048 or more, the behavior is undefined. If this is the 2^32nd call to push, the behavior is undefined." (Quuxplusone, Jul 5, 2018 at 17:06)
- Ahhh, sorry, I missed that. Thanks @Quuxplusone, just added. I guess this version should be correct if we do not consider the undefined behavior in push_back and the memory_order issues. I will work on verifying the memory orders and also try to eliminate the undefined behavior if possible. Thanks again :) (Elaine, Jul 5, 2018 at 21:04)
- "I guess this version should be correct": I didn't say that! ;) I'd still bet money there's a bug in it, because there's always a bug in lock-free code. But having grabbed the low-hanging fruit already, I'm quitting this question while I'm ahead. :) (Quuxplusone, Jul 6, 2018 at 0:49)