I have some kind of high thoughput message handler receiving ordered messages. Now the task is - adding monitoring stuff in it in order to recognize wrong input messages, more exactly: it must notify about missed messages and just ignore the stale ones (with smaller order).
Due to high thoughput - it mustn't block.
So it must:
- not use locks
- find missing messages
- find wrong ordered messages
Let's have some long counter in message for ordering - here is my initial version:
public class OrderedNonBlockingProcessor implements YetAnotherMessageHandler {
private static Logger log = Logger.getLogger(OrderedNonBlockingProcessor.class);
private final AtomicLong messageCounter = new AtomicLong(0);
@Override
public boolean onSomeHighThroughputMessage(final YetAnotherMessage message) {
final long expected = messageCounter.getAndSet(message.getCounter()) + 1;
if (expected == message.getCounter()) {
processBusinessStuff(message);
return true;
} else if (expected > message.getCounter()) {
/* wrong message, attempt to restore the sequence to prevent an error on next good message
* TODO: fix ABA problem here
*/
messageCounter.compareAndSet(message.getCounter(), expected - 1);
log.error(String.format("messaging system ordering bug: got stale message %s while expected %s!",
message.getCounter(), expected));
// some other notifying stuff...
} else if (expected < message.getCounter()) {
log.error(String.format("got forward message %s while expected %s, missed: %s",
message.getCounter(), expected, message.getCounter() - expected));
// some other notifying stuff...
}
return false;
}
private void processBusinessStuff(YetAnotherMessage message) {
log.info(String.format("process message %s", message.getCounter()));
// some business logic...
}
}
How is it? Are there any races/concurrency issues I missed?
P.S. Next step is fixing ABA problem there. I'm thinking about AtomicStampedReference[Long] for that.
1 Answer 1
I think that there is a certain amount of fuzziness to the question but I'll make a couple of suggestions and perhaps they will help.
First of all, I would recommend giving serious consideration to using Akka actors (http://akka.io/). Actors are a very straightforward way to handle multi-threaded processing without locks.
Just to be clear, I think that you are saying that the program receives messages that have an order but does not necessarily receive them IN order and so it needs to restore the correct order before they can be processed.
If this is true, then I would recommend using TWO actors. The first actor would get the messages from wherever they are coming from and dealing with putting them into the correct order. I assume that it would be ok to cache out-of-order messages until it finds the next message to be processed, perhaps on a priority heap.
Once the first actor finds the next message to be processed, it would send it on to the second actor who would then handle the actual processing (by calling processBusinessStuff in your example).
-
\$\begingroup\$ I've heard about Akka - it's cool and fancy now, but for this task - seems overkill: I need just add some monitoring to existing handler. Anyway current solution with just 2 CAS instructions is simpler and in-place (no additional memory for cache or messsage queues) \$\endgroup\$yetanothercoder– yetanothercoder2012年03月26日 10:50:47 +00:00Commented Mar 26, 2012 at 10:50
-
\$\begingroup\$ Is the third paragraph of my answer a correct description of the nature of the problem? \$\endgroup\$Donald.McLean– Donald.McLean2012年03月26日 12:48:30 +00:00Commented Mar 26, 2012 at 12:48
-
\$\begingroup\$ not exactly: it must notify about missed messages and just ignore the stale ones (with smaller order) \$\endgroup\$yetanothercoder– yetanothercoder2012年03月26日 13:03:07 +00:00Commented Mar 26, 2012 at 13:03
-
\$\begingroup\$ You're right then. That's a much simpler problem. \$\endgroup\$Donald.McLean– Donald.McLean2012年03月26日 13:07:44 +00:00Commented Mar 26, 2012 at 13:07