\$\begingroup\$

I have a high-throughput message handler that receives ordered messages. The task now is to add monitoring to it so that it recognizes bad input. More precisely, it must report missed messages and simply ignore stale ones (those with a smaller order number).

Because of the high throughput, it must not block.

So it must:

  1. not use locks
  2. find missing messages
  3. find wrongly ordered messages

Assume each message carries a long counter for ordering. Here is my initial version:

import java.util.concurrent.atomic.AtomicLong;
import org.apache.log4j.Logger; // assumed: log4j 1.x, inferred from Logger.getLogger(Class)

public class OrderedNonBlockingProcessor implements YetAnotherMessageHandler {
    private static Logger log = Logger.getLogger(OrderedNonBlockingProcessor.class);
    // counter of the most recently seen message
    private final AtomicLong messageCounter = new AtomicLong(0);

    @Override
    public boolean onSomeHighThroughputMessage(final YetAnotherMessage message) {
        // unconditionally store the incoming counter and derive the expected one from the previous value
        final long expected = messageCounter.getAndSet(message.getCounter()) + 1;
        if (expected == message.getCounter()) {
            processBusinessStuff(message);
            return true;
        } else if (expected > message.getCounter()) {
            /* wrong message, attempt to restore the sequence to prevent an error on next good message
             * TODO: fix ABA problem here
             */
            messageCounter.compareAndSet(message.getCounter(), expected - 1);
            log.error(String.format("messaging system ordering bug: got stale message %s while expected %s!",
                    message.getCounter(), expected));
            // some other notifying stuff...
        } else if (expected < message.getCounter()) {
            log.error(String.format("got forward message %s while expected %s, missed: %s",
                    message.getCounter(), expected, message.getCounter() - expected));
            // some other notifying stuff...
        }
        return false;
    }

    private void processBusinessStuff(YetAnotherMessage message) {
        log.info(String.format("process message %s", message.getCounter()));
        // some business logic...
    }
}

(Here is a JUnit test for that)

How is it? Are there any races/concurrency issues I missed?

P.S. The next step is fixing the ABA problem there. I'm thinking about using AtomicStampedReference<Long> for that.
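A minimal sketch of how the stamped variant might look, assuming the get-and-set step is replaced by a CAS loop (AtomicStampedReference has no getAndSet) and the restore step is guarded by the stamp. The class name `StampedSequenceTracker` and the `Outcome` enum are hypothetical, introduced only for illustration, and the tracker works on bare counters rather than on YetAnotherMessage.

```java
import java.util.concurrent.atomic.AtomicStampedReference;

// Hypothetical helper: mirrors the "swap, then undo if stale" idea with a stamp.
final class StampedSequenceTracker {
    enum Outcome { IN_ORDER, GAP, STALE }

    // Reference holds the last stored counter; the stamp is bumped on every write.
    private final AtomicStampedReference<Long> last =
            new AtomicStampedReference<>(0L, 0);

    Outcome accept(long counter) {
        // Box once: compareAndSet compares references, not values.
        final Long mine = counter;
        final int[] stampHolder = new int[1];
        Long previous;
        int stamp;
        // CAS loop standing in for getAndSet.
        do {
            previous = last.get(stampHolder);
            stamp = stampHolder[0];
        } while (!last.compareAndSet(previous, mine, stamp, stamp + 1));

        final long expected = previous + 1;
        if (counter == expected) {
            return Outcome.IN_ORDER;
        }
        if (counter > expected) {
            return Outcome.GAP;
        }
        // Stale: undo our write only if nobody has written since our swap.
        // The stamp check is what rules out the ABA case.
        last.compareAndSet(mine, previous, stamp + 1, stamp + 2);
        return Outcome.STALE;
    }
}
```

Note that the stamp only protects the undo. A thread that reads the counter between the swap and the undo still sees the stale value, so this narrows the race rather than removing it.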

Jamal
asked Mar 25, 2012 at 21:41
\$\endgroup\$

1 Answer

\$\begingroup\$

I think that there is a certain amount of fuzziness to the question but I'll make a couple of suggestions and perhaps they will help.

First of all, I would recommend giving serious consideration to using Akka actors (http://akka.io/). Actors are a very straightforward way to handle multi-threaded processing without locks.

Just to be clear, I think that you are saying that the program receives messages that have an order but does not necessarily receive them IN order and so it needs to restore the correct order before they can be processed.

If this is true, then I would recommend using TWO actors. The first actor would receive the messages from wherever they originate and put them into the correct order. I assume that it would be OK to cache out-of-order messages until the next message to be processed arrives, perhaps in a priority heap.

Once the first actor finds the next message to be processed, it would send it on to the second actor who would then handle the actual processing (by calling processBusinessStuff in your example).
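The reordering step of the first actor could be sketched roughly as follows. This is plain Java rather than Akka code, and it assumes a single owner (an actor handles one message at a time), so it is deliberately not thread-safe. The class name `Resequencer` is hypothetical, and bare long counters stand in for the real messages.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical single-threaded resequencer: buffers early counters on a
// min-heap and releases them once the sequence is contiguous again.
final class Resequencer {
    private final PriorityQueue<Long> pending = new PriorityQueue<>();
    private long next; // counter that must be delivered next

    Resequencer(long firstExpected) {
        this.next = firstExpected;
    }

    // Returns the counters that became deliverable, in order (possibly empty).
    List<Long> offer(long counter) {
        List<Long> ready = new ArrayList<>();
        if (counter < next) {
            return ready; // already delivered: stale or duplicate, drop it
        }
        pending.add(counter);
        // Drain the heap while its smallest element is not ahead of the sequence.
        while (!pending.isEmpty() && pending.peek() <= next) {
            long head = pending.poll();
            if (head == next) {
                ready.add(head);
                next++;
            }
            // head < next means a buffered duplicate; it is silently dropped.
        }
        return ready;
    }
}
```

Each counter returned by `offer` would then be forwarded to the second actor for processing. The heap is unbounded here, so a message that never arrives would make it grow without limit; a real implementation would need some cap or timeout.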

answered Mar 26, 2012 at 5:08
\$\endgroup\$
  • \$\begingroup\$ I've heard about Akka. It's cool and fancy now, but for this task it seems like overkill: I just need to add some monitoring to an existing handler. Anyway, the current solution with just 2 CAS instructions is simpler and in-place (no additional memory for a cache or message queues) \$\endgroup\$ Commented Mar 26, 2012 at 10:50
  • \$\begingroup\$ Is the third paragraph of my answer a correct description of the nature of the problem? \$\endgroup\$ Commented Mar 26, 2012 at 12:48
  • \$\begingroup\$ Not exactly: it must notify about missed messages and just ignore the stale ones (with smaller order) \$\endgroup\$ Commented Mar 26, 2012 at 13:03
  • \$\begingroup\$ You're right then. That's a much simpler problem. \$\endgroup\$ Commented Mar 26, 2012 at 13:07
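For the simpler problem settled on in the comments (report gaps, ignore stale messages), one possible lock-free sketch never moves the counter backwards, so no restore step is needed and the ABA question does not come up. It assumes Java 8 or later for `AtomicLong.getAndAccumulate`; the class name `ForwardOnlySequenceTracker` and the `Outcome` enum are hypothetical.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical forward-only tracker: the stored value only ever grows.
final class ForwardOnlySequenceTracker {
    enum Outcome { IN_ORDER, GAP, STALE }

    // Highest counter accepted so far; 0 means nothing has been seen yet.
    private final AtomicLong highest = new AtomicLong(0);

    Outcome accept(long counter) {
        // Atomically store max(previous, counter) and get the previous value back.
        long previous = highest.getAndAccumulate(counter, Math::max);
        if (counter <= previous) {
            return Outcome.STALE; // the stored value was left unchanged
        }
        return counter == previous + 1 ? Outcome.IN_ORDER : Outcome.GAP;
    }
}
```

One consequence of this design: a message that shows up late after its gap was already reported (say 5 arriving after 6) is classified as stale. On older JDKs the same effect can be had with an explicit `compareAndSet` retry loop.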
