close method of the AMQConnection class #1083
-
Describe the bug
This concerns the close method of the AMQConnection class in the 5.9.x-stable version of the RabbitMQ Java client.
When sync is false (that is, when close is called on the main loop thread), the channels managed by the ChannelManager class are not destroyed.
When the client is used in NIO mode, calling close in this situation causes the event loop thread to block.
Here is the method:
    public void close(int closeCode,
                      String closeMessage,
                      boolean initiatedByApplication,
                      Throwable cause,
                      int timeout,
                      boolean abort)
        throws IOException
    {
        boolean sync = !(Thread.currentThread() == mainLoopThread);
        try {
            AMQP.Connection.Close reason =
                new AMQP.Connection.Close.Builder()
                    .replyCode(closeCode)
                    .replyText(closeMessage)
                    .build();
            final ShutdownSignalException sse = startShutdown(reason, initiatedByApplication, cause, true);
            if(sync){
                BlockingRpcContinuation<AMQCommand> k = new BlockingRpcContinuation<AMQCommand>(){
                    @Override
                    public AMQCommand transformReply(AMQCommand command) {
                        AMQConnection.this.finishShutdown(sse);
                        return command;
                    }};
                _channel0.quiescingRpc(reason, k);
                k.getReply(timeout);
            } else {
                _channel0.quiescingTransmit(reason);
            }
        } catch (TimeoutException tte) {
            if (!abort) {
                ShutdownSignalException sse = new ShutdownSignalException(true, true, null, this);
                sse.initCause(cause);
                throw sse;
            }
        } catch (ShutdownSignalException sse) {
            if (!abort)
                throw sse;
        } catch (IOException ioe) {
            if (!abort)
                throw ioe;
        } finally {
            if(sync) {
                _frameHandler.close();
            }
        }
    }
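The sync flag in the method above depends only on which thread calls close. As a rough illustration, here is a toy model of that thread check. It is not the client's code: CloseModel, closePath and closePathOnLoopThread are hypothetical names, and the strings "rpc" and "transmit" merely label the two branches.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Toy model of the thread check at the top of close(); not the client's code. */
final class CloseModel {
    // Hypothetical stand-in for the connection's mainLoopThread field.
    private volatile Thread mainLoopThread;

    /** Same expression as in the method above. */
    private boolean isSync() {
        return !(Thread.currentThread() == mainLoopThread);
    }

    /** Reports which branch close() would take on the calling thread. */
    String closePath() {
        // "rpc": send Close, wait for the reply, close the frame handler in finally.
        // "transmit": send Close only; nothing else happens inside close().
        return isSync() ? "rpc" : "transmit";
    }

    /** Runs closePath() on a fresh thread that is registered as the main loop. */
    String closePathOnLoopThread() {
        AtomicReference<String> result = new AtomicReference<>();
        Thread loop = new Thread(() -> result.set(closePath()), "model-loop");
        mainLoopThread = loop; // assigned before start(), so the loop thread sees it
        loop.start();
        try {
            loop.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the model loop", e);
        }
        return result.get();
    }
}
```

In this model, a call from an application thread takes the "rpc" branch, while a call made on the loop thread itself (for example from inside a listener callback) takes the "transmit" branch, where neither finishShutdown nor _frameHandler.close() is reached from close.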
Reproduction steps
1. In the "processControlCommand" method of the "AMQConnection" class, when the "AMQP.Connection.Blocked" command is received, the "handleBlocked" method of each registered "BlockedListener" is called. If a listener throws an exception at this point, the "close" method ends up being called.
Here is the method:
    public boolean processControlCommand(Command c) throws IOException
    {
        // Similar trick to ChannelN.processAsync used here, except
        // we're interested in whole-connection quiescing.
        // See the detailed comments in ChannelN.processAsync.
        Method method = c.getMethod();
        if (isOpen()) {
            if (method instanceof AMQP.Connection.Close) {
                handleConnectionClose(c);
                return true;
            } else if (method instanceof AMQP.Connection.Blocked) {
                AMQP.Connection.Blocked blocked = (AMQP.Connection.Blocked) method;
                try {
                    for (BlockedListener l : this.blockedListeners) {
                        l.handleBlocked(blocked.getReason());
                    }
                } catch (Throwable ex) {
                    getExceptionHandler().handleBlockedListenerException(this, ex);
                }
                return true;
            } else if (method instanceof AMQP.Connection.Unblocked) {
                try {
                    for (BlockedListener l : this.blockedListeners) {
                        l.handleUnblocked();
                    }
                } catch (Throwable ex) {
                    getExceptionHandler().handleBlockedListenerException(this, ex);
                }
                return true;
            } else {
                return false;
            }
        } else {
            if (method instanceof AMQP.Connection.Close) {
                // Already shutting down, so just send back a CloseOk.
                try {
                    _channel0.quiescingTransmit(new AMQP.Connection.CloseOk.Builder().build());
                } catch (IOException ignored) { } // ignore
                return true;
            } else if (method instanceof AMQP.Connection.CloseOk) {
                // It's our final "RPC". Time to shut down.
                _running = false;
                // If Close was sent from within the MainLoop we
                // will not have a continuation to return to, so
                // we treat this as processed in that case.
                return !_channel0.isOutstandingRpc();
            } else { // Ignore all others.
                return true;
            }
        }
    }
Expected behavior
The event loop thread should not block.
Additional context
No response
-
The latest release series is 5.18.x. Please upgrade.
-
I cannot share the individual project. The issue seems to be intermittent. It occurs when the AMQP.Connection.Blocked command is received from the server: a custom BlockedCallback throws an exception in its handling method, and as a result the close method is called.
-
From the code alone, I don't see how the event loop thread can get blocked, so I'll need to reproduce the issue. In the meantime, could you:
- provide a thread dump if the problem occurs again
- prevent the exception thrown by your custom BlockedListener from closing the connection: catch the exception in the listener or change the exception handler implementation
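The first option above, catching the exception in the listener, could look roughly like the sketch below. To keep it runnable without the client library, BlockedListenerLike is a local stand-in that only copies the two method signatures of com.rabbitmq.client.BlockedListener; GuardedBlockedListener and failureCount are hypothetical names, not part of the client.

```java
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;

/** Stand-in with the same two methods as com.rabbitmq.client.BlockedListener. */
interface BlockedListenerLike {
    void handleBlocked(String reason) throws IOException;
    void handleUnblocked() throws IOException;
}

/** Hypothetical wrapper: logs and counts whatever the delegate throws. */
final class GuardedBlockedListener implements BlockedListenerLike {
    private final BlockedListenerLike delegate;
    private final AtomicInteger failures = new AtomicInteger();

    GuardedBlockedListener(BlockedListenerLike delegate) {
        this.delegate = delegate;
    }

    @Override
    public void handleBlocked(String reason) {
        try {
            delegate.handleBlocked(reason);
        } catch (Exception e) {
            record("handleBlocked", e); // the exception never reaches the caller
        }
    }

    @Override
    public void handleUnblocked() {
        try {
            delegate.handleUnblocked();
        } catch (Exception e) {
            record("handleUnblocked", e);
        }
    }

    private void record(String method, Exception e) {
        failures.incrementAndGet();
        System.err.println(method + " failed in custom listener: " + e);
    }

    /** Number of exceptions swallowed so far. */
    int failureCount() {
        return failures.get();
    }
}
```

With the real client, the wrapper would implement com.rabbitmq.client.BlockedListener directly and be registered with Connection.addBlockedListener. For the second option, an exception handler can be supplied through ConnectionFactory.setExceptionHandler; whether a given handler closes the connection on listener errors should be checked against the client version in use.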
-
@tlglhlchat I pushed a fix, can you try the 5.19.0 snapshot?
-
I have set up a simple test project to reproduce the above issue.
Project link: https://github.com/tlglhlchat/testmq.git
Here are the steps to reproduce:
Adjust the configuration file of the RabbitMQ server so that the memory and disk usage limits are at the critical point.
Start the test project and send message push requests through a browser (http://127.0.0.1:8080/push/click) until the RabbitMQ server sends the AMQP.Connection.Blocked command. At this point, the test program prints the following log:
-------handleBlockedlow on memory-------
2023-08-03 15:10:57.593 ERROR 46228 --- [ rabbitmq-nio] c.r.c.impl.ForgivingExceptionHandler : BlockedListener threw an exception for connection amqp://gogo@127.0.0.1:8001/
java.lang.RuntimeException: null
    at example.mq.PushController$3.handleBlocked(PushController.java:51) ~[classes/:na]
    at com.rabbitmq.client.impl.AMQConnection.processControlCommand(AMQConnection.java:881) [amqp-client-5.18.0.jar:5.18.0]
    at com.rabbitmq.client.impl.AMQConnection$1.processAsync(AMQConnection.java:268) [amqp-client-5.18.0.jar:5.18.0]
    at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:185) [amqp-client-5.18.0.jar:5.18.0]
    at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:117) [amqp-client-5.18.0.jar:5.18.0]
    at com.rabbitmq.client.impl.AMQConnection.readFrame(AMQConnection.java:726) [amqp-client-5.18.0.jar:5.18.0]
    at com.rabbitmq.client.impl.AMQConnection.handleReadFrame(AMQConnection.java:695) [amqp-client-5.18.0.jar:5.18.0]
    at com.rabbitmq.client.impl.nio.NioLoop.run(NioLoop.java:160) [amqp-client-5.18.0.jar:5.18.0]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_291]
After waiting for a few minutes, check the RabbitMQ management interface and you will find that the connection has been closed. However, you can still send message push requests through the test program without any exceptions being thrown.
Sending a notify request can wake up the blocked event loop thread.
-
@tlglhlchat thanks for the sample. Can you try the latest snapshot? (See my comment above)