Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

close method of the AMQConnection class #1083

tlglhlchat started this conversation in General
Discussion options

Describe the bug

Regarding the close method of the AMQConnection class in RabbitMQ 5.9.x-stable version,
when the value of sync is false, the channels managed by the ChannelManager class are not destroyed.
When using RabbitMQ in nio mode, calling the close method will cause the event loop thread to block.
This is the method details:

public void close(int closeCode,
 String closeMessage,
 boolean initiatedByApplication,
 Throwable cause,
 int timeout,
 boolean abort)
 throws IOException
 {
 boolean sync = !(Thread.currentThread() == mainLoopThread);
 try {
 AMQP.Connection.Close reason =
 new AMQP.Connection.Close.Builder()
 .replyCode(closeCode)
 .replyText(closeMessage)
 .build();
 final ShutdownSignalException sse = startShutdown(reason, initiatedByApplication, cause, true);
 if(sync){
 BlockingRpcContinuation<AMQCommand> k = new BlockingRpcContinuation<AMQCommand>(){
 @Override
 public AMQCommand transformReply(AMQCommand command) {
 AMQConnection.this.finishShutdown(sse);
 return command;
 }};
 _channel0.quiescingRpc(reason, k);
 k.getReply(timeout);
 } else {
 _channel0.quiescingTransmit(reason);
 }
 } catch (TimeoutException tte) {
 if (!abort) {
 ShutdownSignalException sse = new ShutdownSignalException(true, true, null, this);
 sse.initCause(cause);
 throw sse;
 }
 } catch (ShutdownSignalException sse) {
 if (!abort)
 throw sse;
 } catch (IOException ioe) {
 if (!abort)
 throw ioe;
 } finally {
 if(sync) {
 _frameHandler.close();
 }
 }
 }

Reproduction steps

1.In the "processControlCommand" method of the "AMQConnection" class, when the "AMQP.Connection.Blocked" command is received, the "handleBlocked" method of the "BlockedListener" class is called. If an exception occurs at this point, it will trigger a call to the "close" method.
This is the method details:

public boolean processControlCommand(Command c) throws IOException
 {
 // Similar trick to ChannelN.processAsync used here, except
 // we're interested in whole-connection quiescing.
 // See the detailed comments in ChannelN.processAsync.
 Method method = c.getMethod();
 if (isOpen()) {
 if (method instanceof AMQP.Connection.Close) {
 handleConnectionClose(c);
 return true;
 } else if (method instanceof AMQP.Connection.Blocked) {
 AMQP.Connection.Blocked blocked = (AMQP.Connection.Blocked) method;
 try {
 for (BlockedListener l : this.blockedListeners) {
 l.handleBlocked(blocked.getReason());
 }
 } catch (Throwable ex) {
 getExceptionHandler().handleBlockedListenerException(this, ex);
 }
 return true;
 } else if (method instanceof AMQP.Connection.Unblocked) {
 try {
 for (BlockedListener l : this.blockedListeners) {
 l.handleUnblocked();
 }
 } catch (Throwable ex) {
 getExceptionHandler().handleBlockedListenerException(this, ex);
 }
 return true;
 } else {
 return false;
 }
 } else {
 if (method instanceof AMQP.Connection.Close) {
 // Already shutting down, so just send back a CloseOk.
 try {
 _channel0.quiescingTransmit(new AMQP.Connection.CloseOk.Builder().build());
 } catch (IOException ignored) { } // ignore
 return true;
 } else if (method instanceof AMQP.Connection.CloseOk) {
 // It's our final "RPC". Time to shut down.
 _running = false;
 // If Close was sent from within the MainLoop we
 // will not have a continuation to return to, so
 // we treat this as processed in that case.
 return !_channel0.isOutstandingRpc();
 } else { // Ignore all others.
 return true;
 }
 }
 }

Expected behavior

Non-blocking event loop thread.

Additional context

No response

You must be logged in to vote

Replies: 1 comment 7 replies

Comment options

The latest release series is 5.18.x, please upgrade.

You must be logged in to vote
7 replies
Comment options

I cannot provide individual projects. The issue you are experiencing seems to be intermittent. The problem occurs when receiving the AMQP.Connection.Blocked instruction from the server. This exception is triggered because you have customized the BlockedCallback and thrown an exception in the handling method. As a result, the close method is called.

Comment options

I don't see how the thread loop can get blocked by just looking at the code, I'll need to reproduce the issue. In the meantime, could you:

  • provide a thread dump if the problem occurs again
  • avoid that the exception from your custom BlockedListener closes the connection: catch the exception in the listener or change the exception handler implementation
Comment options

@tlglhlchat I pushed a fix, can you try the 5.19.0 snapshot?

Comment options

I have set up a simple test project to reproduce the above issue.
Project link: https://github.com/tlglhlchat/testmq.git
Here are the steps to reproduce:
Adjust the configuration file of the RabbitMQ server program to set the memory and disk usage limits to the critical point.
Start the test project and send message push requests through a browser until the RabbitMQ server sends the AMQP.Connection.Blocked command(http://127.0.0.1:8080/push/click) .At this point, the test program will print the following log:

-------handleBlockedlow on memory-------
2023年08月03日 15:10:57.593 ERROR 46228 --- [ rabbitmq-nio] c.r.c.impl.ForgivingExceptionHandler : BlockedListener threw an exception for connection amqp://gogo@127.0.0.1:8001/
java.lang.RuntimeException: null
	at example.mq.PushController3ドル.handleBlocked(PushController.java:51) ~[classes/:na]
	at com.rabbitmq.client.impl.AMQConnection.processControlCommand(AMQConnection.java:881) [amqp-client-5.18.0.jar:5.18.0]
	at com.rabbitmq.client.impl.AMQConnection1ドル.processAsync(AMQConnection.java:268) [amqp-client-5.18.0.jar:5.18.0]
	at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:185) [amqp-client-5.18.0.jar:5.18.0]
	at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:117) [amqp-client-5.18.0.jar:5.18.0]
	at com.rabbitmq.client.impl.AMQConnection.readFrame(AMQConnection.java:726) [amqp-client-5.18.0.jar:5.18.0]
	at com.rabbitmq.client.impl.AMQConnection.handleReadFrame(AMQConnection.java:695) [amqp-client-5.18.0.jar:5.18.0]
	at com.rabbitmq.client.impl.nio.NioLoop.run(NioLoop.java:160) [amqp-client-5.18.0.jar:5.18.0]
	at java.lang.Thread.run(Thread.java:748) [na:1.8.0_291]

After waiting for a few minutes, check the RabbitMQ management interface and you will find that the connection has been closed. However, you can still send message push requests through the test program without any exceptions being thrown.
Sending a notify request can wake up the blocked event loop thread.

Comment options

@tlglhlchat thanks for the sample. Can you try the latest snapshot? (See my comment above)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Converted from issue

This discussion was converted from issue #1082 on August 02, 2023 07:12.

AltStyle によって変換されたページ (->オリジナル) /