0

I have this little simple queue where a one task read from a file into the queue and several tasks unpack the content. I works for a while, but eventually crashes because the queue is empty even though I in the line before check that the queue is not empty! (see the comment in the code)

#pragma once
#include <optional>
#include <future>
#include <queue>
template<class T, uint32_t MAX_SIZE>
class spmc_fifo_queue
{
public:
 using optional_queue_pair = std::optional<T>;
 bool put(optional_queue_pair&& p)
 {
 if (my_queue.size() >= MAX_SIZE) {
 std::unique_lock my_lock(my_mutex);
 my_cv_remove.wait(my_lock, [this] { return my_queue.size() < MAX_SIZE; });
 }
 std::lock_guard<std::mutex> my_lock(my_mutex);
 my_queue.push(std::move(p));
 my_cv_add.notify_one();
 return true;
 }
 using optional_queue_pair = std::optional<T>;
 optional_queue_pair get()
 {
 if (my_queue.size() == 0) {
 std::unique_lock my_lock(my_mutex);
 my_cv_add.wait(my_lock, [this] { return my_queue.size() != 0; });
 }
 std::scoped_lock my_lock(my_mutex);
 optional_queue_pair ret = std::move(my_queue.front()); //some times my_queue.size() == 0 why????
 my_queue.pop();
 my_cv_remove.notify_one();
 return ret;
 }
private:
 using optional_queue_pair = std::optional<T>;
 std::queue<optional_queue_pair> my_queue;
 std::mutex my_mutex;
 std::condition_variable my_cv_add;
 std::condition_variable my_cv_remove;
};

I have tried to guard from this, but apparently I do not understand the details of the mutex locking!

asked Nov 10, 2023 at 16:54

2 Answers 2

2

This is wrong:

if (my_queue.size() == 0) {
 std::unique_lock my_lock(my_mutex);
 my_cv_add.wait(my_lock, [this] { return my_queue.size() != 0; });
}

You should lock the mutex first to check my_queue.size(). It is possible that size return > 0 but at the same time another thread pop from my_queue.

The correct would be:

std::unique_lock my_lock(my_mutex);
if (my_queue.size() == 0) {
 my_cv_add.wait(my_lock, [this] { return my_queue.size() != 0; });
}

UPD:

Thanks Ahmed AEK, no need in "if". We can do it much simplier:

std::unique_lock my_lock(my_mutex);
my_cv_add.wait(my_lock, [this] { return my_queue.size() != 0; });

See documentation: https://en.cppreference.com/w/cpp/thread/condition_variable

answered Nov 10, 2023 at 21:40
Sign up to request clarification or add additional context in comments.

4 Comments

at one moment two threads waiting on the condition variable will wakeup, and see the predicate true, and will be trying to modify the queue without the protection of the mutex, this is catastrophic.
no. From the docs (en.cppreference.com/w/cpp/thread/condition_variable) see how wait() with predicate works: 1. releases the mutex and waits. 2. wakes up 3. acquires the mutex again (before check condition). Only one thread will get the mutex and check condition. Other threads will wait for the mutex.
you are correct, well in that case you can remove the if altogether.
it seems you are right
0

condition variables may awake multiple threads when you use notify_one spurious wakeup

you need to check again that the queue is not empty after you lock the mutex, two threads may have awaken and are waiting on the lock, only the first one will find something in the queue, the other will find it empty, you also need to put the whole thing in a while (true) because the queue is not allowed to fail.

optional_queue_pair get()
{
 while (true) // retry until succeed
 {
 if (my_queue.size() == 0) {
 std::unique_lock my_lock(my_mutex);
 my_cv_add.wait(my_lock, [this] { return my_queue.size() != 0; });
 }
 std::lock_guard my_lock(my_mutex); // two threads awaken one waiting on lock
 if (my_queue.size() != 0) // first thread sees true, second sees false
 {
 optional_queue_pair ret = std::move(my_queue.front());
 my_queue.pop();
 my_cv_remove.notify_all(); // because notify_one may not awaken the producer.
 return ret;
 }
 }
}

for production i would probably use something like boost lockfree queue which is multiple producer-multiple consumer lock-free queue and extensively tested instead of rolling your own (slower) implementation of a multi-consumer queue.

answered Nov 10, 2023 at 17:02

Comments

Your Answer

Draft saved
Draft discarded

Sign up or log in

Sign up using Google
Sign up using Email and Password

Post as a guest

Required, but never shown

Post as a guest

Required, but never shown

By clicking "Post Your Answer", you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.