Job Title: Sarcastic Architect
Hobbies: Thinking Aloud, Arguing with Managers, Annoying HRs,
Calling a Spade a Spade, Keeping Tongue in Cheek
[[
This is Chapter 16(d) from “beta” Volume V of the upcoming book “Development&Deployment of Multiplayer Online Games”, which is currently being beta-tested. Beta-testing is intended to improve the quality of the book, and provides free e-copy of the “release” book to those who help with improving; for further details see “Book Beta Testing“. All the content published during Beta Testing, is subject to change before the book is published.
To navigate through the book, you may want to use Development&Deployment of MOG: Table of Contents.]]
We’ve already discussed things related to sockets; now let’s discuss the stuff which is often needed (in particular, it is of Utmost Importance when implementing Reactors), but which is not that common to be universally available as a part of operating system.
I’m speaking about queues in the context of inter-thread communications (where “threads” are usual preemptive threads able to run on different cores, and not cooperative ones a.k.a. fibers). And not only just about "some" implementation of queue, but about queues which have certain properties desirable for our Reactors a.k.a. ad-hoc Finite State Machines a.k.a. Event-Driven Programs.
Simple MWSR Queue
What we usually need from our Queue, is an ability to push asynchronous messages/events there (usually from different threads), and to get them back (usually from one single thread) – in FIFO order, of course. Such Multiple-Writer-Single-Reader queues are known as MWSR queues. In our case, reading from an empty queue MUST block until something appears there; this is necessary to avoid polling. On the other hand, writing MAY block if the queue is full, though in practice this should happen Really Rarely.
Let’s consider the following simple implementation (with no blocking, as our queue cannot become "full"):
template <class Collection>
class MWSRQueue {
private:
std::mutex mx;
std::condition_variable waitrd;
Collection coll;
bool killflag = false;
public:
using T = Collection::value_type;
MWSRQueue() {
}
void push_back(T&& it) {
//as a rule of thumb, DO prefer move semantics for queues
// it reduces the number of potential allocations
// which happen under the lock(!), as such extra
// unnecessary allocations due to unnecessary copying
// can have Bad Impact on performance
// because of significantly increased mutex contention
{//creating scope for lock
std::unique_lock<std::mutex> lock(mx);
coll.push_back(std::move(it));
}//unlocking mx
waitrd.notify_one();
//Yep, notifying outside of lock is usually BETTER.
// Otherwise the other thread would be released
// but will immediately run into
// our own lock above, causing unnecessary
// (and Very Expensive) context switch
}
pair<bool,T> pop_front() {
//returns pair<true,popped_value>,
// or – if the queue is being killed - <false,T()>
std::unique_lock<std::mutex> lock(mx);
while(coll.size() == 0 && !killflag) {
waitrd.wait(lock);
}
if(killflag)
return pair<bool,T>(false,T());
//creates an unnecessary copy of T(),
// but usually we won’t care much at this point
assert(coll.size() > 0);
T ret = std::move(coll.front());
coll.pop_front();
lock.unlock();
return pair<bool,T>(true, std::move(ret));
}
void kill() {
{//creating scope for lock
std::unique_lock<std::mutex> lock(mx);
killflag = true;
}//unlocking mx
waitrd.notify_all();
}
};
[[TODO!:test!]]
This is a rather naïve implementation of MWSR queues, but – it will work for quite a while, and it uses only very standard C++11, so it will work pretty much everywhere these days. More importantly, it does implement exactly the API which you need: you can push the items back from other threads, you can read your items from a single thread, and you can request that the wait (if any) is aborted, so your thread can terminate (for example, if you want to terminate your app gracefully). Moreover, our queue provides the whole API which you’ll ever need from your queue; this IS important as it means that you can re-implement your queue later if necessary in a better-performing manner, and nobody will notice the difference.
[画像:Hare thumb up:]“A nice (though side) implementation detail is that our template class MWSRQueue can use any collection which implements usual-for-std-containers functions push_back(), pop_front(), and front().A nice (though side) implementation detail is that our template class MWSRQueue can use any collection which implements usual-for-std-containers functions push_back(), pop_front(), and front(). It means that you can use std::list<> or std::deque<> directly, or make your own class which satisfies this API (for example, you can make your own prioritized queue1). Oh BTW, and (by pure accident) it seems to be exception-safe too (even in a strong sense2).
OTOH, this naïve implementation has several significant drawbacks, which MAY come into play as soon as we become concerned about performance and reliability. Let’s see these drawbacks one by one.
1 Note that std::priority_queue<> as such does NOT guarantee the order in case of elements with equal priority, so to make a FIFO-queue-with-priority out of it, you’ll need to make another adapter which assigns number-of-item-since-very-beginning as one of the parameters (and then sort by tuple (priority, number_of_item_since_very_beginning) – and DON’T forget about potential wraparounds too! – that is, unless you’re using uint64_t as your number_of_item_since_very_beginning, when in most practical cases you can demonstrate that wraparound will never happen
2 assuming that your type T has a moving constructor with a no-throw guarantee, which it usually does
Fixed-Size Queues
As our class MWSRQueue above is organized, queue size may grow indefinitely. This might look as a Good Thing from theoretical point of view ("hey, we don’t put any limits on our Queue"), but in the real world it often causes severe issues 🙁 . For example, if for some reason one of your servers/Reactors starts to delay processing (or even hangs), such infinite-sized queues can easily eat up all the available RAM, causing swap or denial of allocations, and potentially affecting MUCH more players than it should.
Flow Control
Let’s consider what will happen in the case of one of the Reactors hanging/slowing-down if we limit the size of ALL our queues within the system.
If we limit sizes of ALL our queues, AND all our connections are TCP, then in case of severe overload the following scenario will unfold. First, one queue (the one close to the slow Reactor) will get full; in turn, queue being full will cause TCP thread which fills it, to block.3 Then, the TCP thread on the other side of TCP connection will find that it cannot push data into TCP, so it will block too. Then, the queue which feeds that TCP thread on pushing side, will get full. Then, the sending Reactor’s supposedly-non-blocking function sendMessage(), will be unable to push the message into the queue-which-just-became-full, so our supposedly-non-blocking Reactor will block.
[画像:Surprised hare:]“As we can see, when working with all flow-controlled transports, severe delays tend to propagate from the target to the source. As we can see, when working with all flow-controlled transports (TCP is flow-controlled, and fixed-size queue is flow-controlled too), severe delays tend to propagate from the target to the source. Whether it is good or not – depends on specifics of your system, though from what I’ve seen, in most cases such propagating delays are at least not worse than exhausting RAM which happens in case of infinite queues.
Also, it gives us back the control over what-we-want-to-do in case of such a problem. For example, to avoid one Reactor which processes messages from pretty much independent channels and feeding them to different Reactors, from blocking all the channels in case of one of the target Reactors being slow or hanged, we MAY be able to "teach" our single Reactor to postpone just messages from the affected channel, while working with the other channels as usual. Implementing it would require two things: (a) adding a trySendMessage() function, which tries to send message and returns “send wait handle” if the sending is unsuccessful, and (b) adding a list_of_wait_handles parameter to pop_front() function, with understanding that if some space becomes available in any of “send wait handle”s, pop_front() stops the wait and returns “send wait handle” to the caller (and then infrastructure code will need to send a message/call a callback or continuation from our Reactor).
3 in case when there is no TCP between Reactors so that Reactors are interacting directly, sending Reactor’s supposedly-non-blocking sendMessage() will block immediately, as described below
Dropping Packets
When dealing with messages coming over TCP or over internal communications, we’re usually relying on ALL the messages being delivered (and in order too); that’s why dropping messages is usually not an option on these queues.4
[画像:Hare pointing out:]“for UDP packets, there is always an option to drop them if the incoming queue is fullHowever, for UDP packets, there is always an option to drop them if the incoming queue is full;5 this is possible because any UDP packets can be dropped anyway, so that our upper-level protocols need to handle dropped packets regardless of us dropping some packets at application level. Moreover, we can implement a selective packet drop if we feel like it (for example, we can drop less important traffic in favor of more important one).
4 strictly speaking, if you DO implement reliable inter-Server communications as described in Chapter III, you MAY be able to force-terminate TCP connection, AND to drop all the messages from that connection from the queue too. Not sure whether it is ever useful though.
5 Or even almost-full, see, for example,
[RED] family of congestion avoidance algorithms
Full Queues are Abnormal. Size. Tracking
[画像:Judging hare:]“full queues SHOULD NOT happen during normal operationRegardless of the choice whether-to-block-or-to-drop outlines above, full queues SHOULD NOT happen during normal operation; they’re more like a way to handle scenarios when something has Already Went Wrong, and to recover from them while minimizing losses. That’s why it is Really Important to keep track of all the queue blocks (due to the queue being full), and to report it to your monitoring system; for this purpose, our queues should provide counters so that infrastructure code can read them and report to a system-wide monitor (see more on monitoring in Vol.3).
Now let’s discuss a question of maximum size of our fixed-size queues. On the one hand, we obviously do NOT want to have any kind of swapping because of the memory allocated to our fixed-size queues. On the other hand, we cannot have our queues limited to maximum size of 2 or 3. If our queue is too small, then we can easily run into scenarios of starvation, when our Reactor is effectively blocked by the flow control mechanisms from doing things (while there is work somewhere in the system, it cannot reach our Reactor). In the extreme cases (and ultra-small sizes like 2 or 3), it is possible even to run into deadlocks (!).6
My recommendation when it comes to maximum size of the queues, goes as follows:
- DO test your system with all queue sizes set to 1
- see whether you have any deadlocks
- if yes – DO understand whether you really need those dependencies which are causing deadlocks
- if yes – DO establish such limits on minimum queue sizes, which guarantee deadlock-free operation
- Start with maximum size of between 100 and 1000; most of the time, it should be large enough to stay away from blocks and also to avoid allocating too much memory for them.
- DO monitor maximum sizes in production (especially "queue is full" conditions), and act accordingly
6 There is a strong argument that deadlocks SHOULD NOT happen even with all queue sizes == 1. I would not say that this qualifies as a firm rule, however, I do agree that if using flow-controlled queues, you SHOULD test your system with all queue sizes set to 1, see below
Implementing Fixed-Size Queue with Flow Control
Now, after we’ve specified what we want, we’re ready to define our own Fixed-Size Queues. Let’s start with a Fixed-Size Queue with Flow Control:
template <class FixedSizeCollection>
class MWSRFixedSizeQueueWithFlowControl {
private:
std::mutex mx;
std::condition_variable waitrd;
std::condition_variable waitwr;
FixedSizeCollection coll;
bool killflag = false;
//stats:
int nfulls = 0;
size_t hwmsize = 0;//high watermark on queue size
public:
using T = FixedSizeCollection::value_type;
MWSRFixedSizeQueueWithFlowControl() {
}
void push_back(T&& it) {
//if the queue is full, BLOCKS until some space is freed
{//creating scope for lock
std::unique_lock<std::mutex> lock(mx);
while(coll.is_full() && !killflag) {
waitwr.wait(lock);
++nfulls;
//this will also count spurious wakeups,
// but they’re supposedly rare
}
if(killflag)
return;
assert(!coll.is_full());
coll.push_back(std::move(it));
size_t sz = coll.size();
hwmsize = max(hwmsize,sz);
}//unlocking mx
waitrd.notify_one();
}
pair<bool,T> pop_front() {
std::unique_lock<std::mutex> lock(mx);
while(coll.size() == 0 && !killflag) {
waitrd.wait(lock);
}
if(killflag)
return pair<bool,T>(false,T());
assert(coll.size() > 0);
T ret = std::move(coll.front());
coll.pop_front();
lock.unlock();
waitwr.notify_one();
return pair<bool,T>(true, std::move(ret));
}
void kill() {
{//creating scope for lock
std::unique_lock<std::mutex> lock(mx);
killflag = true;
}//unlocking mx
waitrd.notify_all();
waitwr.notify_all();
}
};
Implementing Fixed-Size Queue with a Drop Policy
And here goes a Fixed-Size Queue with a Drop Policy:
template <class FixedSizeCollection, class DropPolicy>
// DropPolicy should have function
// pushAndDropOne(T&& t, FixedSizeCollection& coll)
// it MAY either to skip t,
// OR to drop something from coll, while pushing t
class MWSRFixedSizeQueueWithDropPolicy {
private:
DropPolicy drop;
std::mutex mx;
std::condition_variable waitrd;
FixedSizeCollection coll;
bool killflag = false;
//stats:
int ndrops = 0;
size_t hwmsize = 0;//high watermark on queue size
public:
using T = FixedSizeCollection::value_type;
MWSRFixedSizeQueueWithDropPolicy(const DropPolicy& drop_)
: drop(drop_) {
}
void push_back(T&& it) {
//if the queue is full, calls drop.pushAndDropOne()
{//creating a scope for lock
std::unique_lock<std::mutex> lock(mx);
if(coll.is_full()) {//you MAY want to use
// unlikely() here
++ndrops;
drop.pushAndDropOne(it, coll);
return;
}
assert(!coll.is_full());
coll.push_back(std::move(it));
size_t sz = coll.size();
hwmsize = max(hwmsize,sz);
}//unlocking mx
waitrd.notify_one();
}
pair<bool,T> pop_front() {
std::unique_lock<std::mutex> lock(mx);
while(coll.size() == 0 && !killflag) {
waitrd.wait(lock);
}
if(killflag)
return pair<bool,T>(false,T());
assert(coll.size() > 0);
T ret = std::move(coll.front());
coll.pop_front();
lock.unlock();
return pair<bool,T>(true, std::move(ret));
}
void kill() {
{//creating scope for lock
std::unique_lock<std::mutex> lock(mx);
killflag = true;
}//unlocking mx
waitrd.notify_all();
}
};
Performance Issues
As we’re running our system, we MAY run into performance issues; sometimes, it is those queues which cause us trouble.
[画像:Surprised hare:]“With queues-implemented-over-mutexes like the ones we’ve written above, the most annoying thing performance-wise is that there is a chance that the OS’s scheduler can force the preemptive context switch right when the thread-being-preempted-is-owning-our-mutex.With queues-implemented-over-mutexes like the ones we’ve written above, the most annoying thing performance-wise is that there is a chance that the OS’s scheduler can force the preemptive context switch right when the thread-being-preempted-is-owning-our-mutex. This will cause quite a few context switches going back and forth. Such unnecessary context switches have a Big Fat impact on the performance 🙁 (as discussed in [TODO], context switch can cost up to a million CPU clocks7).
7 Most of the time, such Bad Cases won’t apply to the kind of context switches we’re discussing here, but several context switches each costing 10K CPU clocks, is already Pretty Bad
To deal with it, two approaches are possible. Approach #1 would be simply to
Reduce Time Under Lock
As we reduce the time spent under the mutex lock, chances of that unfortunate-context-switch can be reduced to almost-zero (if we’re doing a Really Good Job, time-under-lock can be as little as a hundred CPU clocks under the lock, so chances of being forced-switched there, become very minimal). And without the lock being occupied, the time to acquire/release the lock usually becomes just two atomic/LOCK/Interlocked operations (and you cannot really do better than that).
Removing Allocations from Under the Lock
A mathematician is asked "how to boil water?" His answer goes as follows:
Let’s consider two cases. In the first case, there is no water in the kettle.
Then, we need to light a fire, put some water into the kettle,
place the kettle over the fire, and wait for some time.
In the second case, there is water in the kettle.
Then we need to pour the water out, and the problem is reduced to the previous case.
— A mathematician who Prefers to stay Anonymous —
Now, let’s see what we can do to reduce time under the lock. If we take a closer look at our class MWSRQueue, we’ll realize that all the operations under the lock are very minimal, except for potential allocations (and/or O(N) operations to move things around).
The problem is that none of the existing std:: containers provides a guarantee that there are neither allocations/deallocations nor O(N) operations within their respective push_back() and pop_front() operations.
std::list<>::push_back()/pop_front()
Allocation/deallocation; some implementations MAY use cache or pool allocations, but such optimizations are implementation-specific 🙁
std::vector<>::erase(begin()) (as a replacement for pop_front())
O(N)
std::deque<>::push_back()/pop_front()
Allocation/deallocation; some implementations MAY use cache or pool allocations, but such optimizations are implementation-specific 🙁
I know of two ways how to deal with this problem. First, it is possible to use some kind of pool allocation and feed pool allocator to std::list<> or std::deque<> (effectively guaranteeing that all the items are always taken from the pool and nothing else). However, IMO this solution, while workable, looks too much as a way mathematician gets the kettle boiled (see epigraph to this subsection).
Instead, I suggest to do the following:
- If you need an infinite-size queue, you can use “intrusive lists” (allocating list elements outside the mutex lock, and reducing contention)
- If you need a fixed-size queue, then you can create your own Collection based on circular buffer along the following lines:
template<class T, size_t maxsz_bits>
class CircularBuffer {
static constexpr size_t bufsz = 1 << maxsz_bits;
static constexpr size_t maxsz = bufsz - 1;
//-1 to make sure that head==tail always means ‘empty’
static constexpr size_t mask = maxsz;
size_t head = 0;
size_t tail = 0;
alignas(T) uint8_t buffer[bufsz*sizeof(T)];
//Having buffer as T[bufsz] is possible
// IF we'll replace placement move constructors with move assignments
// AND drop explicit destructor calls
//However, it will require T to have a default constructor,
// so at the moment I prefer to deal with pure buffers
// and to have the only requirement that T is move-constructible
public:
size_t size() {
return head – tail +
(((size_t)(head>=tail)-(size_t)1) & bufsz);
//trickery to avoid pipeline stalls via arithmetic
//supposedly equivalent to:
//if(head >= tail)
// return head – tail;
//else
// return head + bufsz - tail;
}
void push_back(T&& t) {
assert(size() < maxsz);
new(tbuffer(head)) T(std::move(t));
head = ( head + 1 ) & mask;
}
T pop_front() {
assert(size() > 0);
T* ttail = tbuffer(tail);
T ret = std::move(*ttail);
ttail->~T();
tail = ( tail + 1 ) & mask;
return ret;
}
private:
T* tbuffer(size_t idx) {
return reinterpret_cast<T*>(buffer + (idx*sizeof(T)));
}
};
Removing locks completely
The second approach is MUCH more radical – it is the one to remove locks completely. And at the first glance, it seems that it is easy to find an appropriate “lockless queue” library. However, there is a caveat:
We do NOT really need “completely lockless queue”. What we need, is a “queue which is lockless until it becomes empty or full”
In other words, our (almost)-lockless queue still needs to lock (otherwise we’d need to poll it, which puts us in a predicament between sacrificing latency and burning CPU cycle in a MUCH worse manner than any losses from the very-infrequent-context-switches on barely-loaded-locks).
[画像:Hare thumb down:]“Unfortunately, I do NOT know of any readily-available library which supports such 'blocking-only-when-necessary' queuesUnfortunately, I do NOT know of any readily-available library which supports such “blocking-only-when-necessary” queues 🙁 . Writing such a thing yourself is certainly possible, but keep in mind that it is going to be a Really Major Effort even if you’re proficient in writing synchro primitives 🙁 (and Even More Major Effort to debug/test it and to prove its correctness8). Overall, if considering complexity of writing such a “blocking-only-when-necessary” queue from the point of view of exercises from Knuth’ “The Art of Computer Programming”, I would rate is around 40 🙁 (with “50” being a “non-proven-yet theorem”).
One library which I didn’t try myself, but which MAY help in converting lockless algorithms into lock-when-necessary ones, is [EventCount] from Facebook’s folly library. Let me know whether it worked for you 🙂 .
8 yes, for non-trivial primitives such proof is necessary, even if it is done by an exhaustive analysis of all the context switches in all the substantially different points – of course, not forgetting about those nasty ABA problems
Waiting for Other Stuff
More often than not, in addition to waiting for incoming events, we MAY want to wait for “something else”. Examples of these “something else” things range from “something coming in from socket” to “user moving mouse”.
Of course, we could dedicate a thread to wait for several sockets (user input, DNS response, whatever-else) and pushing the result to one of our MWSR Queues, but it means extra context switches, and therefore is not always optimal.
In such cases, we MAY want to use some OS-specific mechanism which allows to wait for several such things simultaneously. Examples of such mechanisms include:
- [画像:Hare with an idea:]“To deal with those very-occasional other events (which cannot be handled via select()/poll()/epoll()), a separate anonymous pipe (or equivalent) can be created, which can be listened by the very same select()-like function.(not exactly that OS-specific, but still different enough to be mention here): using select() (poll()/epoll()) as a queue. If MOST of your IO is sockets, and everything-else (like “a message coming in from another thread”) happens very occasionally, then it often makes sense to use select() etc. to deal with sockets – and with anything else too (with absolutely no mutexes etc. in sight). To deal with those very-occasional other events (which cannot be handled via select()/poll()/epoll() because they’re not file handles, or because they’re regular files(!)), a separate anonymous pipe (or equivalent) can be created, which can be listened by the very same select()-like function. Bingo! Most of the things are handled with select()/poll()/epoll()/… without any unnecessary context switches, and the very-occasional stuff is occasional enough to ignore the associated (usually not-too-bad) overhead of sending it over the pipe.
- On Linux, instead of pipe, you can (and IMHO SHOULD) use eventfd() instead of anonymous pipe, to get an improvement in performance. For thread-to-thread communications, it makes select()-etc.-based queues rather efficient.
- Note however, that this approach does NOT work too well performance-wise when most of your events CANNOT be handled by select()-like function directly (and need to be simulated over that pipe). While such a thing WILL work, the time spent on simulating events over pipes, can become substantial :-(.
- kqueue(). On BSD, kqueue() allows to wait not only on file handles, and provides more flexibility than epoll(), and occasionally allows to avoid an extra-thread-with-an-anonymous-pipe which would be necessary otherwise.
- Win32 WaitForMultipleObjects(). WaitForMultipleObjects() can wait both for sockets and for “events”. This can be used to build a queue which can handle both sockets etc. and other stuff – all without those unnecessary context switches.[[TODO:MsgWaitForMultipleObjects()]]
- Win32 thread queues. Another Win32-specific mechanism is related to thread queues (and GetMessage() function). These come handy when you need to handle both Windows messages and something-else (especially when you need to do it in a UI thread).
On libuv
In a sense, [libuv] is The King when we speak about 3rd-party event handling libraries. It can take pretty much anything and make it asynchronous. However, being that universal comes at a price: libuv’s performance, while “pretty good”, is not “the best one possible”. In particular, the trickery described above, can often outperform libuv.
[[TODO: IPC/shared-memory]]
[[To Be Continued…
[画像:Tired hare:]This concludes beta Chapter 16 from the upcoming book “Development and Deployment of Multiplayer Online Games (from social games to MMOFPS, with social games in between)”. Stay tuned for beta Chapter 19, where we’ll start discussing RNG.]]
Acknowledgement
Cartoons by Sergey GordeevIRL from Gordeev Animation Graphics, Prague.