channel.addConfirmListener(): some ackCallback callbacks are missing? #757
-
This is my code. With channel.addConfirmListener(), some ackCallback invocations appear to be lost. The messages are indeed sent to the RabbitMQ server and can be consumed normally. However, if I sleep for 2 ms after sending each message, all ack callbacks are received.
I don't know whether this is an error in my code or a RabbitMQ bug.
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import lombok.extern.log4j.Log4j2;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

@Log4j2
public class 异步确认发布 {
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("");
        connectionFactory.setPort(7005);
        connectionFactory.setUsername("");
        connectionFactory.setPassword("");
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();

        // Enable publisher confirms on this channel
        AMQP.Confirm.SelectOk selectOk = channel.confirmSelect();
        channel.queueDeclare("hello", true, false, false, null);

        // Callbacks for asynchronous publisher confirms
        channel.addConfirmListener(
            (deliveryTag, multiple) -> {
                log.info("message deliveryTag=>{}, send successful", deliveryTag);
            },
            (deliveryTag, multiple) -> {
                log.info("message deliveryTag=>{}, fail in send", deliveryTag);
            }
        );

        for (int i = 0; i < 5; i++) {
            String message = "Hello World!!! " + i;
            channel.basicPublish("", "hello", null, message.getBytes(StandardCharsets.UTF_8));
        }
    }
}
The console shows some callbacks missing
17:04:29.607 [AMQP Connection 27.11.210.232:7005] INFO me.demo.me.rabbitmq.consumer.发布确认.异步确认发布 - ackCallback, deliveryTag=>4, send successful
17:04:29.615 [AMQP Connection 27.11.210.232:7005] INFO me.demo.me.rabbitmq.consumer.发布确认.异步确认发布 - ackCallback, deliveryTag=>5, send successful
However, if I sleep for 2 ms after sending each message, all callbacks are received.
Example code:
for (int i = 0; i < 5; i++) {
    String message = "Hello World!!! " + i;
    channel.basicPublish("", "hello", null, message.getBytes(StandardCharsets.UTF_8));
    Thread.sleep(2); // with a 2 ms sleep after each publish, all ack callbacks are received
}
console log
17:05:18.037 [AMQP Connection 27.11.210.232:7005] INFO me.demo.me.rabbitmq.consumer.发布确认.异步确认发布 - ackCallback, deliveryTag=>1, send successful
17:05:18.043 [AMQP Connection 27.11.210.232:7005] INFO me.demo.me.rabbitmq.consumer.发布确认.异步确认发布 - ackCallback, deliveryTag=>2, send successful
17:05:18.043 [AMQP Connection 27.11.210.232:7005] INFO me.demo.me.rabbitmq.consumer.发布确认.异步确认发布 - ackCallback, deliveryTag=>3, send successful
17:05:18.043 [AMQP Connection 27.11.210.232:7005] INFO me.demo.me.rabbitmq.consumer.发布确认.异步确认发布 - ackCallback, deliveryTag=>4, send successful
17:05:18.043 [AMQP Connection 27.11.210.232:7005] INFO me.demo.me.rabbitmq.consumer.发布确认.异步确认发布 - ackCallback, deliveryTag=>5, send successful
My RabbitMQ server version is 3.9.14 (default configuration, nothing has been modified), running on Erlang 24.3.2.
Maven project dependency:
<dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-rabbit</artifactId>
    <version>2.2.18.RELEASE</version>
</dependency>
I tried to keep the main thread from exiting, but that does not seem to be the cause: once the connection is created, the program does not shut down on its own anyway.
-
RabbitMQ can acknowledge multiple published messages at a time; in that case the multiple flag on basic.ack will be set to true.
-
Thank you very much for your answer. I'm afraid I don't fully understand what you mean.
Do you mean something like this?
channel.waitForConfirms();
I would like to use
channel.addConfirmListener(ConfirmCallback ackCallback, ConfirmCallback nackCallback)
to find out which messages have been received by the RabbitMQ server.
-
You assume that one published message will result in one confirmation sent by RabbitMQ. That's not necessarily the case. RabbitMQ can acknowledge two, three or N number of messages at once.
For example, for 10 published messages under load you can get only one acknowledgement back, and it will have the multiple property set to true.
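To make the "10 messages, one acknowledgement" case concrete, here is a minimal sketch in plain Java that needs no broker. The class ConfirmFrames and the method coveredTags are hypothetical helpers, not part of the RabbitMQ client, and the sketch assumes confirmations arrive in increasing delivery-tag order so that a single "last confirmed tag" is enough to work out what a frame covers.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration only; not part of the RabbitMQ Java client.
class ConfirmFrames {

    // Returns the delivery tags acknowledged by one confirm frame.
    // Assumption: confirms arrive in increasing tag order, so everything up to
    // lastConfirmedTag has already been confirmed by earlier frames.
    static List<Long> coveredTags(long lastConfirmedTag, long deliveryTag, boolean multiple) {
        List<Long> tags = new ArrayList<>();
        // multiple=true covers every tag up to and including deliveryTag;
        // multiple=false covers deliveryTag only.
        long first = multiple ? lastConfirmedTag + 1 : deliveryTag;
        for (long tag = first; tag <= deliveryTag; tag++) {
            tags.add(tag);
        }
        return tags;
    }

    public static void main(String[] args) {
        // One frame with deliveryTag=10, multiple=true confirms all ten messages.
        System.out.println(coveredTags(0, 10, true));   // [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
        // A later frame with multiple=false confirms exactly one message.
        System.out.println(coveredTags(10, 11, false)); // [11]
    }
}
```

So counting callback invocations undercounts confirmed messages whenever multiple is true; the callback count and the message count only match when every frame has multiple=false.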
-
Thank you very much for your answer, which has solved my problem.
Because I found no explanation of the multiple parameter in com.rabbitmq.client.ConfirmCallback, I mistakenly assumed that the callback registered with channel.addConfirmListener is invoked once for every message sent.
I only understood it after reading the following explanation at https://www.rabbitmq.com/tutorials/tutorial-seven-java.html:
multiple: this is a boolean value. If false, only one message is confirmed/nack-ed, if true, all messages with a lower or equal sequence number are confirmed/nack-ed.
Console1:
11:43:54.969 [main] INFO me.demo.me.rabbitmq.consumer.发布确认.异步确认发布 - #method<confirm.select-ok>()
11:43:54.989 [main] INFO me.demo.me.rabbitmq.consumer.发布确认.异步确认发布 - #method<queue.declare-ok>(queue=hello, message-count=28, consumer-count=0)
11:43:54.999 [main] INFO me.demo.me.rabbitmq.consumer.发布确认.异步确认发布 - elapsed 17
11:43:55.005 [AMQP Connection 27.11.210.232:7005] INFO me.demo.me.rabbitmq.consumer.发布确认.异步确认发布 - message deliveryTag=>2, multiple=>true, content=>Hello World!!! 1, sent successfully
The Console1 log shows multiple=true.
Console2:
11:50:45.940 [main] INFO me.yaohu.me.rabbitmq.consumer.发布确认.异步确认发布 - #method<confirm.select-ok>()
11:50:45.955 [main] INFO me.yaohu.me.rabbitmq.consumer.发布确认.异步确认发布 - #method<queue.declare-ok>(queue=hello, message-count=30, consumer-count=0)
11:50:45.992 [AMQP Connection 27.11.210.232:7005] INFO me.yaohu.me.rabbitmq.consumer.发布确认.异步确认发布 - message deliveryTag=>1, multiple=>false, content=>Hello World!!! 0, sent successfully
11:50:46.010 [main] INFO me.yaohu.me.rabbitmq.consumer.发布确认.异步确认发布 - elapsed 64
11:50:46.018 [AMQP Connection 27.11.210.232:7005] INFO me.yaohu.me.rabbitmq.consumer.发布确认.异步确认发布 - message deliveryTag=>2, multiple=>false, content=>Hello World!!! 1, sent successfully
The Console2 log shows multiple=false.
multiple: this is a boolean value. If false, only one message is confirmed/nack-ed, if true, all messages with a lower or equal sequence number are confirmed/nack-ed.
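One way to act on that rule, and to know which messages the server has actually confirmed, is to keep published messages in a sorted map keyed by sequence number and clear a whole range when multiple is true. The sketch below is broker-free and only imitates the (deliveryTag, multiple) shape of ConfirmCallback. The class OutstandingConfirms and its methods are hypothetical names. In a real publisher the sequence number would come from channel.getNextPublishSeqNo() before each basicPublish, and handleAck would be called from the ack callback. The replayed frames assume the tag-4 confirm in the original run carried multiple=true, which that log did not print.

```java
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

// Hypothetical bookkeeping class for illustration; not part of the RabbitMQ Java client.
class OutstandingConfirms {

    // Published but not yet confirmed messages, ordered by sequence number.
    private final ConcurrentNavigableMap<Long, String> outstanding = new ConcurrentSkipListMap<>();

    // Record a message just before it is published.
    void published(long sequenceNumber, String body) {
        outstanding.put(sequenceNumber, body);
    }

    // Same parameters as an ack callback: the tag and the multiple flag.
    void handleAck(long deliveryTag, boolean multiple) {
        if (multiple) {
            // Confirms every message with a sequence number <= deliveryTag.
            outstanding.headMap(deliveryTag, true).clear();
        } else {
            // Confirms exactly one message.
            outstanding.remove(deliveryTag);
        }
    }

    int pending() {
        return outstanding.size();
    }

    public static void main(String[] args) {
        OutstandingConfirms tracker = new OutstandingConfirms();
        for (long seq = 1; seq <= 5; seq++) {
            tracker.published(seq, "Hello World!!! " + (seq - 1));
        }
        tracker.handleAck(4, true);   // assumed frame: covers tags 1 to 4
        System.out.println(tracker.pending()); // 1
        tracker.handleAck(5, false);  // covers tag 5 only
        System.out.println(tracker.pending()); // 0
    }
}
```

A sorted concurrent map is used because confirm callbacks run on the connection's thread while publishing happens on another, and because headMap makes the "lower or equal sequence number" range cheap to clear.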
-
Some frames will ack multiple messages at once (so multiple=true), others just a single message (multiple=false). I'm not sure if there are more questions at this point.