As an exercise in multithreaded programming, I am trying to implement a basic FIFO task queue. For this, I also implemented my own simplified version of lock_guard from the standard library. Here is the code:
#include <mutex>
#include <iostream>
#include <thread>
#include <chrono>
#include <optional>
namespace cho {
template <typename M>
struct lock_guard {
lock_guard() noexcept = default;
lock_guard(M& m) noexcept {
mtx = &m;
mtx->lock();
}
lock_guard(lock_guard const& rhs) = delete;
lock_guard(lock_guard && rhs) = delete;
lock_guard& operator=(lock_guard const& rhs) = delete;
lock_guard& operator=(lock_guard && rhs) = delete;
~lock_guard() {
mtx->unlock();
}
private:
M *mtx = nullptr;
};
template <typename T>
struct queue {
queue() noexcept = default;
~queue() {
queue_node *cur = head;
while(cur) {
queue_node *tmp = cur;
cur = cur->next;
delete tmp;
}
}
void push(T const& elem) {
queue_node *node = new queue_node{};
T *new_elem = new T(elem);
node->data = *new_elem;
node->next = nullptr;
if(head == tail)
one_elem_mtx.lock();
lock_guard<std::mutex> grd(push_mtx);
if(head) {
tail->next = node;
} else {
head = node;
}
tail = node;
one_elem_mtx.unlock();
}
auto pop() {
if(!head) {
return std::optional<T>{std::nullopt};
}
if(head == tail)
one_elem_mtx.lock();
lock_guard<std::mutex> grd(pop_mtx);
std::optional<T> ret{head->data};
queue_node *c = head;
head = head->next;
delete c;
one_elem_mtx.unlock();
return ret;
}
private:
struct queue_node {
T data = T{};
queue_node *next = nullptr;
};
queue_node *head = nullptr;
queue_node *tail = nullptr;
std::mutex push_mtx = std::mutex{};
std::mutex pop_mtx = std::mutex{};
std::mutex one_elem_mtx = std::mutex{};
};
}
struct log_task {
void operator()(char const* msg) {
std::cout << "Log: " << msg << "#" << log_id++<< "\n" << std::flush;
}
private:
static std::size_t log_id;
};
std::size_t log_task::log_id = 0;
int main() {
cho::queue<log_task> log_queue;
std::thread task_creator1([&]() {
while(true) {
using namespace std::chrono_literals;
std::this_thread::sleep_for(300ms);
log_queue.push(log_task());
}
});
std::thread task_creator2([&]() {
while(true) {
using namespace std::chrono_literals;
std::this_thread::sleep_for(500ms);
log_queue.push(log_task());
}
});
std::thread task_consumer1([&]() {
while(true) {
using namespace std::chrono_literals;
std::this_thread::sleep_for(300ms);
std::optional<log_task> t = log_queue.pop();
if(t)
t.value()("log_message");
}
});
std::thread task_consumer2([&]() {
while(true) {
using namespace std::chrono_literals;
std::this_thread::sleep_for(500ms);
std::optional<log_task> t = log_queue.pop();
if(t)
t.value()("log_message");
}
});
task_creator1.join();
task_creator2.join();
task_consumer1.join();
task_consumer2.join();
return 0;
}
The lock_guard and queue implementations live in my cho namespace. For the queue, I only provide basic push and pop; it is not a production-ready implementation by any means. I then implement a logger called log_task, to be used as a test functor in my task queue. The queue's push and pop operations are guarded by mutexes to avoid corrupting the queue pointers. I want to be able to push tasks to the queue from multiple threads and consume those tasks from multiple threads as well. For this reason, I hold two different mutexes: push_mtx and pop_mtx. On the other hand, I realized that if there is only one element, the two operations can still corrupt the pointers, since head and tail point to the same node. I therefore added a third mutex, one_elem_mtx, that is locked only in this case.
What code smells can you point out? For some reason I am not happy with using three mutexes, and I feel like there is something wrong there.
I played with the delays in sleep_for() to try different values, and my tests work fine. Still, can you see any chance of deadlock or livelock?
I've seen many implementations in other questions that use only one mutex, so while one thread pushes, popping threads have to wait as well, and vice versa. This seemed like a performance penalty to me, but I am not sure. Does my approach indicate a bad design, and should I abandon concurrent push/pop?
1 Answer
lock_guard
The destructor never checks whether mtx == nullptr, so it dereferences a null pointer (undefined behavior) if the lock_guard was default-constructed.
The lock_guard(M&) constructor cannot be unconditionally noexcept, as lock() on a standard-library-compatible mutex class is allowed to throw.
However, it could be made conditionally noexcept in case M::lock is noexcept itself:
lock_guard(M& m) noexcept(noexcept(std::declval<M>().lock())) : mtx{&m} { mtx->lock(); }
lock_guard(M&) should be made explicit in order to prevent unwanted conversions.
Any specific reasons why std::lock_guard or std::unique_lock couldn't be used?
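The points above can be combined into one small guard. This is only a sketch under a few assumptions: the default constructor is dropped entirely (so the null check becomes unnecessary), the guard stores a reference instead of a pointer, and counting_mutex and demo are hypothetical helpers that exist only to observe the lock/unlock calls.

```cpp
#include <cassert>
#include <mutex>
#include <utility>

namespace sketch_guard {

// Simplified guard without a default constructor, so it always refers to a mutex.
template <typename M>
class lock_guard {
public:
    // explicit prevents implicit conversions from M to lock_guard<M>.
    // The constructor is noexcept only if M::lock() is noexcept itself.
    explicit lock_guard(M& m) noexcept(noexcept(std::declval<M&>().lock()))
        : mtx{m} {
        mtx.lock();
    }

    lock_guard(lock_guard const&) = delete;
    lock_guard& operator=(lock_guard const&) = delete;

    ~lock_guard() { mtx.unlock(); }

private:
    M& mtx;
};

// Hypothetical mutex-like type that only counts calls (not a real mutex).
struct counting_mutex {
    int locks = 0;
    int unlocks = 0;
    void lock() noexcept { ++locks; }
    void unlock() noexcept { ++unlocks; }
};

// Usage: the guard locks on construction and unlocks at scope exit.
inline void demo() {
    counting_mutex m;
    {
        lock_guard<counting_mutex> guard(m);
        assert(m.locks == 1 && m.unlocks == 0);
    }
    assert(m.locks == 1 && m.unlocks == 1);
}

} // namespace sketch_guard
```

Deleting the copy operations also suppresses the implicit move operations, so the two deleted move declarations from the original are not strictly needed.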
queue
Memory leak: push(const T&) never deletes new_elem.
Actually, why allocate new_elem at all? The whole first 4 lines could be simplified to:
auto node = new queue_node{ elem, nullptr };
push(const T&) and pop() may cause undefined behavior by calling one_elem_mtx.unlock() if it hasn't been locked before.
pop() doesn't update tail in case the last element got removed and lets it dangle instead. This will cause undefined behavior upon the next call to push(const T&). This also means that one_elem_mtx will not be locked in calls to push(const T&) or pop(), since head != tail while the queue is empty again.
Race condition: one_elem_mtx needs to be acquired after the lock on pop_mtx/push_mtx.
Reason: Assume we have a queue with two elements in it, and two threads A and B that both want to execute pop. Thread A executes until just after if(head == tail) (which of course evaluates to false right now) and then gets interrupted by the OS. Thread B runs pop() to completion in the meantime, leaving the queue with one element. When thread A resumes, it continues without holding one_elem_mtx, even though head == tail is now true. Assuming the missing tail update mentioned above gets added, we then have a potential data race on access to tail if another thread C were to run push(const T&).
Thread starvation: Once the race condition is fixed, if pop() gets called much more frequently than push(const T&), threads waiting on pop() might starve threads trying to push(const T&) from getting access to one_elem_mtx. Maybe make pop a blocking operation (using a std::condition_variable to notify if new elements got inserted)?
For a production-ready queue, you might want to think about adding a maximum capacity (so the queue doesn't grow too large if elements get added faster than they get removed).
An overload push(T&&) might be nice.
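A blocking pop along the lines suggested above might look like the following sketch. It makes some assumptions that are not part of the original code: the names blocking_queue, wait_pop and demo are hypothetical, a single mutex is used, and std::queue stands in for the hand-written linked list to keep the example short. It also shows the push(T&&) overload.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>

namespace sketch_cv {

template <typename T>
class blocking_queue {
public:
    void push(T const& elem) {
        {
            std::lock_guard<std::mutex> lock(mtx);
            items.push(elem);
        }
        not_empty.notify_one();  // wake one waiting consumer, if any
    }

    void push(T&& elem) {
        {
            std::lock_guard<std::mutex> lock(mtx);
            items.push(std::move(elem));
        }
        not_empty.notify_one();
    }

    // Blocks until an element is available, then removes and returns it.
    T wait_pop() {
        std::unique_lock<std::mutex> lock(mtx);
        // The predicate form re-checks the condition after spurious wakeups.
        not_empty.wait(lock, [this] { return !items.empty(); });
        T front = std::move(items.front());
        items.pop();
        return front;
    }

private:
    std::mutex mtx;
    std::condition_variable not_empty;
    std::queue<T> items;  // stand-in for the hand-written linked list
};

// Usage: the consumer sleeps inside wait_pop() until the producer pushes.
inline void demo() {
    blocking_queue<int> q;
    int received = 0;
    std::thread consumer([&] { received = q.wait_pop() + q.wait_pop(); });
    q.push(1);        // rvalue: selects push(T&&)
    int two = 2;
    q.push(two);      // lvalue: selects push(T const&)
    consumer.join();
    assert(received == 3);
}

} // namespace sketch_cv
```

With this design consumers no longer need to poll with sleep_for; a maximum capacity could be added in the same way with a second condition variable that producers wait on.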
log_task
operator()(const char*) is not thread safe: access to the static log_id isn't synchronized. Maybe add a lock, or make log_id a std::atomic<std::size_t>?
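A minimal sketch of the std::atomic variant follows. As assumptions for the sake of a testable example, operator() returns the formatted line instead of writing to std::cout, and issued() and demo are hypothetical helpers that are not in the original.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <sstream>
#include <string>
#include <thread>
#include <vector>

namespace sketch_log {

struct log_task {
    // Returns the formatted line so callers decide where to write it.
    std::string operator()(char const* msg) const {
        // fetch_add hands every caller a distinct id, even under contention.
        std::size_t const id = log_id.fetch_add(1, std::memory_order_relaxed);
        std::ostringstream out;
        out << "Log: " << msg << "#" << id;
        return out.str();
    }

    // Hypothetical helper: number of ids handed out so far.
    static std::size_t issued() { return log_id.load(); }

private:
    static std::atomic<std::size_t> log_id;
};

std::atomic<std::size_t> log_task::log_id{0};

// Usage: two consecutive calls never share an id.
inline void demo() {
    log_task task;
    std::string const first = task("log_message");
    std::string const second = task("log_message");
    assert(first != second);
}

} // namespace sketch_log
```

The atomic counter only makes the id unique. If several threads write to std::cout, their output can still interleave, so formatting the whole line first and writing it in one call (or under a lock) keeps lines intact.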
main
Waiting for some hundred milliseconds on such a simple task that might otherwise take nanoseconds (or microseconds at most) to execute? This might make the whole "test" obsolete, especially since the chance of finding race conditions is pretty much zero.
t.value() checks again whether the std::optional contains a value (and throws an exception if not). Since at that point t has already been checked for emptiness, why not use *t to access the value directly?
General stuff
Any specific reason for not using std::unique_ptr?
auto could be used more often.
There's no way to stop the program other than forcefully terminating the process. This might surprise some people trying it.
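One common way to make the worker loops stoppable is a shared std::atomic<bool> flag that each loop checks. The sketch below is an assumption about how that could look, not part of the original program; run_until is a hypothetical helper and the counter increment stands in for a push or pop.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <thread>

namespace sketch_stop {

// Runs one worker until at least min_iterations loop passes have happened,
// then asks it to stop and returns the number of passes it completed.
inline std::size_t run_until(std::size_t min_iterations) {
    std::atomic<bool> stop{false};
    std::atomic<std::size_t> iterations{0};

    std::thread worker([&] {
        // Replaces while(true): the loop ends once the flag is set.
        while (!stop.load()) {
            iterations.fetch_add(1);  // stand-in for a push or pop
            std::this_thread::yield();
        }
    });

    // The controlling thread decides when the work is done.
    while (iterations.load() < min_iterations) {
        std::this_thread::yield();
    }
    stop.store(true);
    worker.join();  // now returns, because the loop terminates
    return iterations.load();
}

// Usage: the call returns instead of running forever.
inline void demo() {
    assert(run_until(10) >= 10);
}

} // namespace sketch_stop
```

In the original main, the same flag could be set after a fixed run time or on user input, after which all four join() calls would return.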
Q & A
See above.
I can't see any right now.
It is a performance penalty, yes, though that might not matter if push and pop don't get called too frequently (e.g. because task creation and task execution take some time). It also makes reasoning about the whole set of operations a lot easier. If all access to head and tail is secured behind one single mutex, there are some advantages:
No race condition on either can occur.
No deadlock based upon locking order can occur (unless you make recursive calls).
No special care is needed for edge cases (empty/full queue, or off by one element).
This makes it much easier to verify and trust that such a queue is working correctly.
Now, if this performance penalty is too much, often mutexes get skipped entirely in favor of lock-free queues. These require some more expert knowledge, though.
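For comparison, a single-mutex version of the linked-list queue could be sketched as follows. It is an illustration under assumptions rather than a drop-in replacement: it takes elements by value, owns its nodes through std::unique_ptr, and resets tail when the last element is removed.

```cpp
#include <cassert>
#include <memory>
#include <mutex>
#include <optional>
#include <utility>

namespace sketch_one {

template <typename T>
class queue {
public:
    queue() = default;

    // Unlink nodes one by one to avoid deep recursion on long lists.
    ~queue() {
        while (head) {
            head = std::move(head->next);
        }
    }

    void push(T elem) {
        // Allocate before locking to keep the critical section short.
        auto node = std::make_unique<queue_node>(std::move(elem));
        std::lock_guard<std::mutex> lock(mtx);
        queue_node* const raw = node.get();
        if (tail) {
            tail->next = std::move(node);
        } else {
            head = std::move(node);
        }
        tail = raw;
    }

    std::optional<T> pop() {
        std::lock_guard<std::mutex> lock(mtx);
        if (!head) {
            return std::nullopt;
        }
        std::unique_ptr<queue_node> old_head = std::move(head);
        head = std::move(old_head->next);
        if (!head) {
            tail = nullptr;  // last element removed: tail must not dangle
        }
        return std::optional<T>{std::move(old_head->data)};
    }

private:
    struct queue_node {
        explicit queue_node(T value) : data(std::move(value)) {}
        T data;
        std::unique_ptr<queue_node> next;
    };

    std::unique_ptr<queue_node> head;
    queue_node* tail = nullptr;  // non-owning: null or the last node
    std::mutex mtx;
};

// Usage: elements come out in FIFO order, and an empty queue yields nullopt.
inline void demo() {
    queue<int> q;
    q.push(1);
    q.push(2);
    assert(*q.pop() == 1);
    assert(*q.pop() == 2);
    assert(!q.pop());
}

} // namespace sketch_one
```

Because every access to head and tail happens under the same lock, the empty and one-element cases need no special mutex.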
- The reason to avoid std::lock_guard and std::unique_lock was to implement those myself as an exercise. Thanks for the points mentioned. – meguli, Aug 15, 2018 at 18:40