I am trying to write a SynchronizedQueue wrapper class that provides a simple synchronized interface to an underlying std::queue.
Could you review it and point out any pitfalls or possible improvements?
Are there too many mutexes/locks? I wanted to use condition variables, rather than mutexes alone, to wait for push and pop notifications.
Any insights are highly appreciated, as I have rarely written multi-threaded programs.
#pragma once
#include <mutex>
#include <condition_variable>
#include <queue>
#include <cstdint>

template<typename T>
class SynchronizedQueue
{
public:
    SynchronizedQueue(size_t maxItems)
        : synchronizedQueue_()
        , queueMutex_()
        , pushMutex_()
        , popMutex_()
        , pushCV_()
        , popCV_()
        , MAX_ITEMS(maxItems)
    {
    }

    void push(const T& item)
    {
        std::unique_lock<std::mutex> scopedLock(pushMutex_);
        while (isFull())
        {
            std::unique_lock<std::mutex> pushLock(queueMutex_);
            popCV_.wait(pushLock);
        }
        {
            std::unique_lock<std::mutex> scopedLock(queueMutex_);
            synchronizedQueue_.push(item);
            pushCV_.notify_one();
        }
    }

    T pop()
    {
        std::unique_lock<std::mutex> scopedLock(popMutex_);
        while (isEmpty())
        {
            std::unique_lock<std::mutex> scopedLock(queueMutex_);
            pushCV_.wait(scopedLock);
        }
        T t;
        {
            std::unique_lock<std::mutex> scopedLock(queueMutex_);
            t = synchronizedQueue_.front();
            synchronizedQueue_.pop();
            popCV_.notify_one();
        }
        return t;
    }

private:
    bool isEmpty()
    {
        std::unique_lock<std::mutex> scopedLock(queueMutex_);
        return synchronizedQueue_.empty();
    }

    bool isFull()
    {
        std::unique_lock<std::mutex> scopedLock(queueMutex_);
        return synchronizedQueue_.size() == MAX_ITEMS;
    }

    std::queue<T> synchronizedQueue_;
    std::mutex queueMutex_, pushMutex_, popMutex_;
    std::condition_variable pushCV_, popCV_;
    size_t MAX_ITEMS;
};
1 Answer
Synchronization
- pushMutex_ and popMutex_ aren't needed. Even worse, they prevent other threads from waiting on a condition variable! Just lock queueMutex_ instead (that also avoids having to re-lock queueMutex_).
  Assume that the queue is full, and threads 1 and 2 both want to push an item into the queue. Thread 1 was faster and got the lock on pushMutex_, so it proceeds to wait on popCV_. Thread 2, in the meantime, blocks trying to lock pushMutex_, which is still held by thread 1!
- Avoid calling std::condition_variable::notify_one() while still holding the corresponding lock! The notified thread will wake up, and the first thing that has to happen is reacquiring the lock that is still held by the caller of notify_one(), so it will instantly block again (this time on acquiring the lock though, not wait()).
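To make the two points above concrete, here is a minimal sketch of what a single-mutex version could look like. The names BoundedQueue, notFull_, notEmpty_, maxItems_ and sumThroughQueue are made up for illustration and are not taken from the question. It assumes the predicate overload of wait() is acceptable and that maxItems is greater than zero (with a capacity of 0, push() would block forever).

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>

// Hypothetical rewrite for illustration only, not the poster's final code.
template <typename T>
class BoundedQueue
{
public:
    explicit BoundedQueue(std::size_t maxItems) : maxItems_(maxItems) {}

    void push(const T& item)
    {
        {
            std::unique_lock<std::mutex> lock(mutex_);
            // The "is full" check runs under the same mutex the wait uses,
            // so a notification cannot slip in between check and wait.
            notFull_.wait(lock, [this] { return queue_.size() < maxItems_; });
            queue_.push(item);
        } // lock is released here
        notEmpty_.notify_one(); // notify after unlocking
    }

    T pop()
    {
        std::unique_lock<std::mutex> lock(mutex_);
        notEmpty_.wait(lock, [this] { return !queue_.empty(); });
        T item = std::move(queue_.front());
        queue_.pop();
        lock.unlock();         // release before notifying
        notFull_.notify_one();
        return item;
    }

private:
    std::queue<T> queue_;
    std::mutex mutex_;
    std::condition_variable notFull_, notEmpty_;
    const std::size_t maxItems_;
};

// Demo helper: one producer pushes 1..n through a queue of capacity 2
// while the calling thread pops n values and sums them.
inline long sumThroughQueue(int n)
{
    assert(n >= 0);
    BoundedQueue<int> q(2);
    std::thread producer([&q, n] {
        for (int i = 1; i <= n; ++i) q.push(i);
    });
    long sum = 0;
    for (int i = 0; i < n; ++i) sum += q.pop();
    producer.join();
    return sum;
}
```

Because each waiter only holds mutex_ while checking or modifying the queue, several producers (or consumers) can wait on the same condition variable at once, which the extra pushMutex_/popMutex_ made impossible.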
Implementation
- The outer scopedLock in push() and pop() gets shadowed by the inner scopedLock. This can add confusion to code that is already complex!
- Since MAX_ITEMS isn't supposed to change once the object is initialized, consider marking it as const. (It can still be set in the constructor's initialization list, but not changed afterwards.)
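As a small illustration of the const suggestion, here is a sketch with a hypothetical Capacity class (the class, its member maxItems_ and the demoCapacity helper are invented for this example). It also shows one side effect worth knowing about: a const data member removes the implicit copy assignment operator. That should not matter for the queue, since its std::mutex members already make it non-assignable.

```cpp
#include <cassert>
#include <cstddef>
#include <type_traits>

// Hypothetical stand-in for the queue's capacity member.
class Capacity
{
public:
    // A const member can only be set here, in the initialization list.
    explicit Capacity(std::size_t maxItems) : maxItems_(maxItems) {}

    std::size_t maxItems() const { return maxItems_; }

    // void grow() { ++maxItems_; }  // would not compile: maxItems_ is const

private:
    const std::size_t maxItems_;
};

// The const member deletes the implicit copy assignment operator.
static_assert(!std::is_copy_assignable<Capacity>::value,
              "a const data member makes the class non-assignable");

// Demo helper: the value given to the constructor is what we read back.
inline std::size_t demoCapacity()
{
    Capacity capacity(4);
    assert(capacity.maxItems() == 4);
    return capacity.maxItems();
}
```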
Further considerations
- Maybe add fast-returning bool try_push(const T&) and bool try_pop(T&) member functions, so users of the queue don't have to wait if they don't need to?
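A rough sketch of how such non-blocking members might look, using a hypothetical TryQueue class that contains only the try_ operations (all names besides try_push and try_pop are assumptions). In a queue that also offers blocking push() and pop(), a successful try_push() or try_pop() would additionally have to notify the matching condition variable, which is left out here.

```cpp
#include <cassert>
#include <cstddef>
#include <mutex>
#include <queue>
#include <utility>

// Hypothetical class showing only the non-blocking operations.
template <typename T>
class TryQueue
{
public:
    explicit TryQueue(std::size_t maxItems) : maxItems_(maxItems) {}

    // Returns false immediately if the queue is full.
    bool try_push(const T& item)
    {
        std::lock_guard<std::mutex> lock(mutex_);
        if (queue_.size() >= maxItems_) return false;
        queue_.push(item);
        return true;
    }

    // Returns false immediately if the queue is empty; otherwise moves
    // the front element into 'out' and removes it from the queue.
    bool try_pop(T& out)
    {
        std::lock_guard<std::mutex> lock(mutex_);
        if (queue_.empty()) return false;
        out = std::move(queue_.front());
        queue_.pop();
        return true;
    }

private:
    std::queue<T> queue_;
    std::mutex mutex_;
    const std::size_t maxItems_;
};

// Demo helper exercising the empty, full and normal cases on capacity 1.
inline bool demoTryOperations()
{
    TryQueue<int> q(1);
    int value = 0;
    bool ok = !q.try_pop(value);               // empty: nothing to pop
    ok = ok && q.try_push(7);                  // fits
    ok = ok && !q.try_push(8);                 // full: rejected
    ok = ok && q.try_pop(value) && value == 7; // pops the stored item
    assert(ok);
    return ok;
}
```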
- I agree with all the points, but need some clarification w.r.t. point 1 (Synchronization). If I understand correctly, you're suggesting removing the first lock on pushMutex_ and keeping everything else unchanged. Consider that there is only 1 empty slot in the queue, and t1 and t2 both want to insert items. Say t1 checks and finds it's not full, and before t1 inserts, t2 also checks and finds it's not full. Say t1 pushes an item while t2 waits for the lock on queueMutex_. After the scoped lock on the queue mutex is released by t1, t2 also inserts its item, so the queue holds more than MAX_ITEMS, which violates the contract, doesn't it? (Uchia Itachi, Nov 17, 2017 at 15:51)
- @UchiaItachi: No, I suggest locking queueMutex_ instead of pushMutex_ or popMutex_ (and removing the inner locks for queueMutex_). This would require some changes so isEmpty()/isFull() still work (inline them, or remove the locks inside them, or use a std::recursive_mutex, ...). (hoffmale, Nov 17, 2017 at 16:06)
- Yeah, I thought that empty() and full() would have to be changed if we removed the lock before the cv. Thanks! (Uchia Itachi, Nov 17, 2017 at 16:14)
- @LokiAstari: I'm not sure what you are trying to tell me. Yeah, I "misattributed" the reacquiring of the lock to the waiting thread instead of the condition_variable (it's the condition_variable's job to re-acquire the lock before continuing the waiting thread). Either way, the thread can only continue after the lock has been reacquired. Is that what you were trying to convey? (hoffmale, Nov 18, 2017 at 1:05)
- @LokiAstari: OK, my choice of words was poor (by "wait again" I didn't mean wait(), just that it will instantly block again). I've amended my answer for that. That said, I don't get where you took the "increments a count" from; the standard has nothing to say on that. (Maybe you meant something implementation-specific?) (hoffmale, Nov 18, 2017 at 4:39)