As an exercise in multithreaded programming, I am trying to implement a basic FIFO task queue. For this, I also implement a simplified version of lock_guard from the STL. Here is the code:
#include <mutex>
#include <iostream>
#include <thread>
#include <chrono>
#include <optional>

namespace cho {

template <typename M>
struct lock_guard {
    lock_guard() noexcept = default;
    lock_guard(M& m) noexcept {
        mtx = &m;
        mtx->lock();
    }
    lock_guard(lock_guard const& rhs) = delete;
    lock_guard(lock_guard&& rhs) = delete;
    lock_guard& operator=(lock_guard const& rhs) = delete;
    lock_guard& operator=(lock_guard&& rhs) = delete;
    ~lock_guard() {
        mtx->unlock();
    }
private:
    M *mtx = nullptr;
};

template <typename T>
struct queue {
    queue() noexcept = default;
    ~queue() {
        queue_node *cur = head;
        while(cur) {
            queue_node *tmp = cur;
            cur = cur->next;
            delete tmp;
        }
    }
    void push(T const& elem) {
        queue_node *node = new queue_node{};
        T *new_elem = new T(elem);
        node->data = *new_elem;
        node->next = nullptr;
        if(head == tail)
            one_elem_mtx.lock();
        lock_guard<std::mutex> grd(push_mtx);
        if(head) {
            tail->next = node;
        } else {
            head = node;
        }
        tail = node;
        one_elem_mtx.unlock();
    }
    auto pop() {
        if(!head) {
            return std::optional<T>{std::nullopt};
        }
        if(head == tail)
            one_elem_mtx.lock();
        lock_guard<std::mutex> grd(pop_mtx);
        std::optional<T> ret{head->data};
        queue_node *c = head;
        head = head->next;
        delete c;
        one_elem_mtx.unlock();
        return ret;
    }
private:
    struct queue_node {
        T data = T{};
        queue_node *next = nullptr;
    };
    queue_node *head = nullptr;
    queue_node *tail = nullptr;
    std::mutex push_mtx = std::mutex{};
    std::mutex pop_mtx = std::mutex{};
    std::mutex one_elem_mtx = std::mutex{};
};

} // namespace cho

struct log_task {
    void operator()(char const* msg) {
        std::cout << "Log: " << msg << "#" << log_id++ << "\n" << std::flush;
    }
private:
    static std::size_t log_id;
};
std::size_t log_task::log_id = 0;

int main() {
    cho::queue<log_task> log_queue;
    std::thread task_creator1([&]() {
        while(true) {
            using namespace std::chrono_literals;
            std::this_thread::sleep_for(300ms);
            log_queue.push(log_task());
        }
    });
    std::thread task_creator2([&]() {
        while(true) {
            using namespace std::chrono_literals;
            std::this_thread::sleep_for(500ms);
            log_queue.push(log_task());
        }
    });
    std::thread task_consumer1([&]() {
        while(true) {
            using namespace std::chrono_literals;
            std::this_thread::sleep_for(300ms);
            std::optional<log_task> t = log_queue.pop();
            if(t)
                t.value()("log_message");
        }
    });
    std::thread task_consumer2([&]() {
        while(true) {
            using namespace std::chrono_literals;
            std::this_thread::sleep_for(500ms);
            std::optional<log_task> t = log_queue.pop();
            if(t)
                t.value()("log_message");
        }
    });
    task_creator1.join();
    task_creator2.join();
    task_consumer1.join();
    task_consumer2.join();
    return 0;
}
The lock_guard and queue implementations live in my cho namespace. For queue, I only provide basic push and pop; it is not a production-ready implementation by any means. Then I implement a logger called log_task to be used as a test functor in my task queue. My queue's push and pop operations are guarded by mutexes to avoid corrupting the queue pointers. I want to be able to push tasks to the queue from multiple threads and consume those tasks from multiple threads as well. For this reason, I hold two different mutexes: push_mtx and pop_mtx. On the other hand, I realized that if there is only one element, the threads can still corrupt the values, since head and tail point to the same node. So I added another mutex, one_elem_mtx, that needs to be locked just on this occasion.

What are the code smells you can point out? I am, I don't know why, not happy with using three mutexes and feel like there is something wrong there.

I played with the delays in sleep_for() to try different values, and my tests are working fine. But still, can you see any chance of deadlock or livelock?

I've seen many implementations in other questions that use only one mutex, so when one thread pushes, popping threads need to wait as well, and vice versa. This seemed like a performance penalty to me, but I am not sure. Does my approach indicate a bad design, and should I abandon concurrent push/pop?
1 Answer
lock_guard
- The destructor never checks if mtx == nullptr, which will cause problems if lock_guard got default-constructed.
- The lock_guard(M&) constructor cannot be noexcept, as lock on a standard-library-compatible mutex class is allowed to throw. However, it could be made conditionally noexcept in case M::lock is noexcept itself:

      lock_guard(M& m) noexcept(noexcept(std::declval<M>().lock())) : mtx{&m} { mtx->lock(); }

- lock_guard(M&) should be made explicit in order to prevent unwanted conversions (see the sketch below).
Any specific reasons why std::lock_guard or std::unique_lock couldn't be used?
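For reference, a minimal sketch of how those points might be folded together. It drops the default constructor instead of null-checking in the destructor; either fixes the first point, but this way the "mtx is never null" invariant holds by construction:

#include <mutex>
#include <utility>  // std::declval

template <typename M>
struct lock_guard {
    // No default constructor: a guard that owns no mutex would force
    // a null check in the destructor.
    explicit lock_guard(M& m) noexcept(noexcept(std::declval<M&>().lock()))
        : mtx{&m} {
        mtx->lock();
    }
    lock_guard(lock_guard const&) = delete;
    lock_guard(lock_guard&&) = delete;
    lock_guard& operator=(lock_guard const&) = delete;
    lock_guard& operator=(lock_guard&&) = delete;
    ~lock_guard() {
        mtx->unlock();  // mtx can never be null here
    }
private:
    M* mtx;
};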
queue
- Memory leak: push(const T&) never deletes new_elem. Actually, why allocate new_elem at all? The whole first four lines could be simplified to:

      auto node = new queue_node{ elem, nullptr };

- push(const T&) and pop() may cause undefined behavior by calling one_elem_mtx.unlock() if it hasn't been locked before.
- pop() doesn't update tail in case the last element got removed, and lets it dangle instead. This will cause undefined behavior upon the next call to push(const T&). It also means that one_elem_mtx will not be locked in calls to push(const T&) or pop(), since head != tail while the queue is empty again. (A fixed pop() is sketched after this list.)
- Race condition: one_elem_mtx needs to be acquired after the lock on pop_mtx/push_mtx. Reason: assume we have a queue with two elements in it, and two threads A and B that both want to execute pop(). Thread A executes until just after if(head == tail) (which of course right now evaluates to false) and then gets interrupted by the OS. Thread B runs pop() to completion in the meantime, leaving the queue with one object. Now, assuming the missing tail update mentioned above gets added, we have a potential data race on access to tail if another thread C were to run push(const T&).
- Thread starvation: once the race condition is fixed, if pop() gets called much more frequently than push(const T&), threads waiting in pop() might starve threads trying to push(const T&) from getting access to one_elem_mtx. Maybe make pop a blocking operation (using a std::condition_variable to notify when new elements get inserted)?
- For a production-ready queue, you might want to think about adding a maximum capacity (so the queue doesn't grow too large if elements get added faster than they get removed).
- An overload push(T&&) might be nice.
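To make the tail fix concrete, here is a sketch of pop() with the update applied, assuming the members and includes from your class (locking questions set aside here; a single-mutex alternative is sketched in the Q & A section below):

std::optional<T> pop() {
    if (!head)
        return std::nullopt;
    std::optional<T> ret{std::move(head->data)};
    queue_node* old_head = head;
    head = head->next;
    if (!head)           // last element removed: don't let tail dangle
        tail = nullptr;
    delete old_head;
    return ret;
}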
log_task
operator()(const char*) is not thread safe: access to the static log_id isn't synchronized. Maybe add a lock, or make log_id a std::atomic<std::size_t> (see the sketch below)?
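The atomic variant might look like this; only the counter changes (whether whole lines can still interleave on std::cout is a separate question):

#include <atomic>
// plus the existing includes from the question

struct log_task {
    void operator()(char const* msg) {
        // fetch_add returns the previous value, so every call gets a distinct id
        std::cout << "Log: " << msg << "#" << log_id.fetch_add(1) << "\n" << std::flush;
    }
private:
    static std::atomic<std::size_t> log_id;
};

std::atomic<std::size_t> log_task::log_id{0};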
main
- Waiting for some hundred milliseconds on such a simple task that might otherwise take nanoseconds (or microseconds at most) to execute? This might make the whole "test" obsolete, especially since the chance of finding race conditions is pretty much zero.
- t.value() checks again whether the std::optional contains a value (and throws an exception if not). Since at that point t has already been checked for emptiness, why not use *t to access the value directly?
General stuff
- Any specific reason for not using std::unique_ptr?
- auto could be used more often.
- There's no way to stop the program other than forcefully terminating the process. This might surprise some people trying it. (A minimal shutdown sketch follows below.)
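One common shutdown pattern, sketched here with an atomic flag (the names done and worker are illustrative, not from your code):

#include <atomic>
#include <chrono>
#include <thread>

std::atomic<bool> done{false};  // shared stop signal

int main() {
    std::thread worker([] {
        while (!done.load()) {
            // ... push or pop tasks here ...
            std::this_thread::sleep_for(std::chrono::milliseconds(10));
        }
    });
    std::this_thread::sleep_for(std::chrono::seconds(2));  // let it run for a while
    done.store(true);  // request shutdown
    worker.join();     // join() now actually returns
}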
Q & A
- See above.
- I can't see any right now.
- It is a performance penalty, yes, though that might not matter if push and pop don't get called too frequently (e.g. because task creation and task execution take some time). It also makes reasoning about the whole set of operations a lot easier: if all access to head and tail is secured behind one single mutex, there are some advantages:
  - No race condition on either can occur.
  - No deadlock based upon locking order can occur (unless you make recursive calls).
  - No special care is needed for edge cases (empty/full queue, or off by one element).

  This makes it much easier to verify and trust that such a queue is working correctly (a minimal single-mutex version is sketched below). Now, if this performance penalty is too much, mutexes often get skipped entirely in favor of lock-free queues. These require some more expert knowledge, though.
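To illustrate, a minimal single-mutex queue with a blocking pop, backed by std::queue for brevity (the class name and exact interface are just for illustration, not a drop-in replacement for yours):

#include <condition_variable>
#include <mutex>
#include <queue>
#include <utility>

template <typename T>
class blocking_queue {
public:
    void push(T elem) {
        {
            std::lock_guard<std::mutex> lock(mtx);
            items.push(std::move(elem));
        }                  // release the lock before notifying
        cv.notify_one();   // wake one waiting consumer
    }

    // Blocks until an element is available.
    T pop() {
        std::unique_lock<std::mutex> lock(mtx);
        cv.wait(lock, [this] { return !items.empty(); });  // handles spurious wakeups
        T elem = std::move(items.front());
        items.pop();
        return elem;
    }

private:
    std::mutex mtx;
    std::condition_variable cv;
    std::queue<T> items;
};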
The reason to avoid std::lock_guard and std::unique_lock was to implement those myself as an exercise. Thanks for the points mentioned. – meguli, Aug 15, 2018