Process batch messages concurrently in one consumer #924

jalr4ever started this conversation in General

Hi, this is an SDK usage question. I am using com.rabbitmq:amqp-client:5.14.2 in my project.

It consumes messages like this:

    public void listenAndCalculate() {
        Channel channel = getChannel();
        try {
            // autoAck is false, so each delivery must be acknowledged explicitly
            channel.basicConsume(PARAM_QUEUE, false, new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) {
                    // do something...
                }
            });
        } catch (IOException e) {
            log.error(e.getMessage(), e);
        }
    }

I found that messages are only processed serially, not concurrently. I could not find a way to change this in the client itself, whereas Spring AMQP provides one: new SimpleRabbitListenerContainerFactory().setConcurrentConsumers(30);

However, I don't want to use Spring in my project. Is there a way to process a batch of messages concurrently in one consumer?


Messages are delivered and dispatched to consumers one by one. What you then do with them (accumulate and batch process them, or handle them one by one) is entirely up to you.
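For the "accumulate and batch process" route, a minimal sketch could look like the following. BatchAccumulator is a hypothetical helper name, not part of the Java client, and it uses only the JDK: the idea is to call add(...) from handleDelivery and let the handler receive a full batch once batchSize items have arrived.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical helper (not part of amqp-client): groups items into fixed-size batches. */
final class BatchAccumulator<T> {
    private final int batchSize;
    private final Consumer<List<T>> batchHandler;
    private List<T> buffer = new ArrayList<>();

    BatchAccumulator(int batchSize, Consumer<List<T>> batchHandler) {
        if (batchSize < 1) {
            throw new IllegalArgumentException("batchSize must be >= 1");
        }
        this.batchSize = batchSize;
        this.batchHandler = batchHandler;
    }

    /** Call once per delivery, for example from handleDelivery. */
    synchronized void add(T item) {
        buffer.add(item);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    /** Hands over whatever is buffered; useful on shutdown or from a timer for partial batches. */
    synchronized void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        List<T> batch = buffer;
        buffer = new ArrayList<>();
        batchHandler.accept(batch);
    }
}
```

With manual acknowledgements, the batch handler is also where the acks for that batch would be sent. A partial batch stays buffered until flush() is called, so some timer or shutdown hook is probably needed in practice.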


Some options are to register the same consumer logic in different channels and/or use some concurrent utilities like ExecutorService in the consumer itself.
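The ExecutorService option could be sketched roughly as below. This is a JDK-only illustration under stated assumptions: ConcurrentDispatcher and Acker are hypothetical names, and Acker stands in for a call such as channel.basicAck(deliveryTag, false) in a real consumer. Handlers run on a thread pool, while acks go through a single lock so that only one thread touches the shared channel at a time.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;

/** Hypothetical stand-in for channel.basicAck(deliveryTag, false). */
interface Acker {
    void ack(long deliveryTag) throws Exception;
}

/** Hypothetical helper: runs message handlers on a pool and serializes acks. */
final class ConcurrentDispatcher {
    private final ExecutorService pool;
    private final ReentrantLock ackLock = new ReentrantLock();
    private final Consumer<byte[]> handler;
    private final Acker acker;

    ConcurrentDispatcher(int threads, Consumer<byte[]> handler, Acker acker) {
        this.pool = Executors.newFixedThreadPool(threads);
        this.handler = handler;
        this.acker = acker;
    }

    /** Call from handleDelivery; returns at once so the next delivery can be dispatched. */
    void onDelivery(long deliveryTag, byte[] body) {
        pool.submit(() -> {
            try {
                handler.accept(body);      // the slow, blocking work runs on a pool thread
                ackLock.lock();            // one thread at a time uses the shared channel
                try {
                    acker.ack(deliveryTag);
                } finally {
                    ackLock.unlock();
                }
            } catch (Exception e) {
                // A real consumer would likely nack or log here; omitted in this sketch.
                e.printStackTrace();
            }
        });
    }

    /** Stops accepting work and waits for in-flight handlers to finish. */
    boolean shutdown(long timeout, TimeUnit unit) throws InterruptedException {
        pool.shutdown();
        return pool.awaitTermination(timeout, unit);
    }
}
```

In a real consumer, handleDelivery would call onDelivery(envelope.getDeliveryTag(), body). Each message is acked individually (multiple set to false) because acks arrive out of order from the pool. Setting a prefetch limit with channel.basicQos is probably worth considering too, so that unacknowledged deliveries do not pile up in the pool's queue.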

> Some options are to register the same consumer logic in different channels and/or use some concurrent utilities like ExecutorService in the consumer itself.

Yeah, I am now using an ExecutorService, which makes the blocking logic asynchronous. Because I am using the same Connection and Channel from different threads inside the ExecutorService, I take a lock shared by all threads when acking a message.

But the question is, should the SDK provide a better way for users to do this? Spring supports new SimpleRabbitListenerContainerFactory().setConcurrentConsumers(30);, which is really easy to use.


The Java client remains a low-level and generic library; we leave advanced features like parallel processing to higher-level and sometimes more opinionated frameworks like Spring AMQP.

Generally speaking, we're open to providing small utilities that make developers' lives easier. The RPC classes are an example.

You can open a PR if you think of a parallel processing support class that other developers could reuse in their own applications.


This discussion was converted from issue #923 on January 06, 2023 10:42.
