Multiple consumers on same channel; message consumption order? #732
-
Let's suppose I add consumer c1 processing queue q1 and consumer c2 processing queue q2 on the same channel (c1 declared before c2). Acknowledgements are manual, and the prefetch count is pc1=5 for c1 and pc2=8 for c2. Both queues are initially empty, but at the same moment two parallel producers each start sending 800 messages at the same constant rate, let's say a fast one.
I'm trying to find out, for the Java client, which threads are used to make things work.
q1) Which thread downloads the messages? How does it know how many messages to get, especially if many consumers of the same queue exist with different prefetch counts (on different channels)?
q2) How does the channel dispatch the available messages to consumers, e.g. when 3 messages are available for c1 and 5 for c2?
-
This belongs to the Java client repository then.
You should not depend on any particular dispatch order of consumers, on a single channel or across multiple ones. It's like depending on a specific thread interleaving order.
All inbound protocol frames are read off the socket using the general socket API or NIO. Each carries a channel ID (number),
so connections know what channel to dispatch to.
Channels dispatch all inbound protocol methods to a JDK executor service (you can provide your own) in a way that guarantees that they are dispatched in the same order as received. Obviously, actual processing of different frames can take a different amount of time and resources, so we cannot assume that they also complete in that order.
Consumers are looked up using delivery's consumer tag value and their handler methods are invoked in the aforementioned execution service context.
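The dispatch path described above can be sketched with plain JDK classes. This is a minimal model under stated assumptions, not the Java client's actual code: `ChannelDispatchSketch`, `onDelivery`, `register`, and `demo` are hypothetical names, and the "frame reader" is simply the calling thread. It shows deliveries queued in arrival order, handlers looked up by consumer tag, and handler calls run one after another on a shared executor.

```java
import java.util.ArrayDeque;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Hypothetical model of one channel; not the RabbitMQ client's real classes. */
final class ChannelDispatchSketch {
    private final ExecutorService executor;  // shared pool, could be user-supplied
    private final Map<String, Consumer<String>> consumers = new ConcurrentHashMap<>();
    private final Queue<Runnable> pending = new ArrayDeque<>();  // guarded by "this"
    private boolean draining = false;                            // guarded by "this"

    ChannelDispatchSketch(ExecutorService executor) {
        this.executor = executor;
    }

    void register(String consumerTag, Consumer<String> handler) {
        consumers.put(consumerTag, handler);
    }

    /** Called in arrival order by the thread that reads frames off the socket. */
    synchronized void onDelivery(String consumerTag, String body) {
        pending.add(() -> {
            // Look the consumer up by the delivery's consumer tag.
            Consumer<String> handler = consumers.get(consumerTag);
            if (handler != null) {
                handler.accept(body);
            }
        });
        if (!draining) {  // at most one drain task per channel at any time
            draining = true;
            executor.execute(this::drain);
        }
    }

    /** Runs queued work in order; successive drain tasks may use different threads. */
    private void drain() {
        while (true) {
            Runnable next;
            synchronized (this) {
                next = pending.poll();
                if (next == null) {
                    draining = false;
                    return;
                }
            }
            next.run();
        }
    }

    /** Interleaves deliveries for two consumers and records the handling order. */
    static List<String> demo() {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        ChannelDispatchSketch channel = new ChannelDispatchSketch(pool);
        List<String> seen = new CopyOnWriteArrayList<>();
        channel.register("c1", body -> seen.add("c1:" + body));
        channel.register("c2", body -> seen.add("c2:" + body));
        for (int i = 0; i < 100; i++) {
            channel.onDelivery(i % 2 == 0 ? "c1" : "c2", "m" + i);
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("dispatch did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In this model the handling order equals the arrival order because only one drain task exists per channel at a time. Which pool thread runs a given handler is left to the executor, which is why application code should not rely on it.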
Prefetch is a channel-level feature first and foremost; consumer-level prefetch is supported but rarely used. Regardless, this value is only used by RabbitMQ nodes to decide whether a delivery
can be sent at the next "tick" (a queue run in RabbitMQ server codebase parlance). Prefetch does not change anything
about how client libraries work.
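To illustrate why prefetch is a server-side decision, here is a toy model of the check a node might make before sending a delivery. It is a sketch under stated assumptions, not RabbitMQ's implementation (the server is written in Erlang and its queue logic is more involved); `PrefetchSketch` and its methods are hypothetical names. A message is delivered only while the number of unacknowledged deliveries is below the prefetch value.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Toy model of a broker-side prefetch check; not RabbitMQ's actual code. */
final class PrefetchSketch {
    private final int prefetch;                              // max unacknowledged deliveries
    private final Deque<String> ready = new ArrayDeque<>();  // messages waiting in the queue
    private final List<String> delivered = new ArrayList<>();
    private int unacked = 0;

    PrefetchSketch(int prefetch) {
        this.prefetch = prefetch;
    }

    /** A producer enqueues a message; the broker delivers it if the limit allows. */
    void publish(String message) {
        ready.add(message);
        deliverWhilePossible();
    }

    /** The consumer acknowledges one delivery, which frees one slot. */
    void ack() {
        if (unacked > 0) {
            unacked--;
            deliverWhilePossible();
        }
    }

    private void deliverWhilePossible() {
        while (unacked < prefetch && !ready.isEmpty()) {
            delivered.add(ready.poll());
            unacked++;
        }
    }

    int deliveredCount() {
        return delivered.size();
    }

    int unackedCount() {
        return unacked;
    }

    public static void main(String[] args) {
        PrefetchSketch c1 = new PrefetchSketch(5);  // pc1=5 from the question
        for (int i = 0; i < 800; i++) {
            c1.publish("m" + i);
        }
        System.out.println("delivered before any ack: " + c1.deliveredCount());
        c1.ack();
        System.out.println("delivered after one ack: " + c1.deliveredCount());
    }
}
```

The client never asks for a number of messages in this model. It only acknowledges, and the broker side decides when the next delivery may go out.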
-
Well, it is; the fact that the RMQ documentation bothers to specify it (though obvious) makes me think there's something I'm missing, and in fact there is something special about RMQ that one could miss:
WorkPoolRunnable.run() = { ... for (Runnable runnable : block) { runnable.run(); } ... }
Under certain conditions this code will run multiple Consumer.handleDelivery calls in the same thread.
On the other hand, the "Each Channel has its own dispatch thread" part is still confusing: why "own", when in fact it's a thread allocated by the ExecutorService and hence not owned by the Channel?
-
@adrhc you keep mentioning this "Each Channel has its own dispatch thread" line without mentioning where it's coming from. I could not find that line in this repo or the website one using GitHub search, or with ag/grep locally.
The Channels design doc is from 2010. Docs can easily get out of date in over a decade of changes. You are welcome to submit a PR with corrections.
-
Messages for a given channel are dispatched to the application code sequentially (one after the other), but there's no guarantee this happens in the same thread.
-
@michaelklishin check Receiving Messages by Subscription ("Push API"), a section of the Java Client API Guide; just search for "Each Channel has its own dispatch thread". Sorry, I usually read the documentation very carefully and trust what is written in it, hence "Each Channel has its own dispatch thread" is very intriguing to me.
-
OK, updated in rabbitmq/rabbitmq-website@05fc4cc
-
In your example, the order of message consumption by two consumers with different prefetch values should be considered undefined.
It won't be easy to reason about. Both will get deliveries in natural batches (assuming they acknowledge at the same rate) and
one will have more messages in flight than the other at most points in time.
You have never stated the end goal, but if it is to build a priority mechanism for consumers on top of prefetch, this is not going to
work well. Not only are consumer priority and single active consumer more obvious choices for most cases, but I personally find
any design that depends on the priority of consumers on a single queue to be very questionable. Priorities of consumers on different
queues aren't much better, but at least I can recall some cases where they made some sense.
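For reference, the two alternatives mentioned above are configured through optional arguments rather than prefetch. The sketch below only builds the argument maps; `ConsumerArgsSketch` and its method names are hypothetical helpers, and the priority value is an arbitrary example. With the Java client such maps would typically be passed as the `arguments` parameter of `Channel.basicConsume` (for `x-priority`) and `Channel.queueDeclare` (for `x-single-active-consumer`).

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper that builds optional arguments; it opens no connection. */
final class ConsumerArgsSketch {

    /** Consumer argument: consumers with a higher x-priority are preferred for deliveries. */
    static Map<String, Object> consumerPriority(int priority) {
        Map<String, Object> args = new HashMap<>();
        args.put("x-priority", priority);
        return args;
    }

    /** Queue argument: only one of the queue's consumers receives deliveries at a time. */
    static Map<String, Object> singleActiveConsumer() {
        Map<String, Object> args = new HashMap<>();
        args.put("x-single-active-consumer", true);
        return args;
    }

    public static void main(String[] args) {
        System.out.println(consumerPriority(10));
        System.out.println(singleActiveConsumer());
    }
}
```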