Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit 141ee1e

Browse files
rozzavbabanin
andauthored
Fix Netty reference leak. (#1762)
* Fix Netty reference leak. Fixes netty byte buffer releases in edge case scenarios: - Ensure async select server uses a callback if the cluster had been closed - Ensure that handleReadResponse checks to see if the cluster had been closed before retaining incoming buffers - Ensure closing the netty stream releases all references Test fixes - Ensure tests run using paranoid leak detection - Fail the testsuite if a leak is detected. - Fixed releasing buffers in the ByteBufferBsonOutputTest. JAVA-5901 Co-authored-by: Viacheslav Babanin <slav.babanin@mongodb.com>
1 parent a3c3857 commit 141ee1e

File tree

8 files changed

+44
-13
lines changed

8 files changed

+44
-13
lines changed

‎.evergreen/run-tests.sh‎

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -132,4 +132,9 @@ echo "Running tests with Java ${JAVA_VERSION}"
132132
${MULTI_MONGOS_URI_SYSTEM_PROPERTY} ${API_VERSION} ${GRADLE_EXTRA_VARS} \
133133
${JAVA_SYSPROP_ASYNC_TRANSPORT} ${JAVA_SYSPROP_NETTY_SSL_PROVIDER} \
134134
-Dorg.mongodb.test.fle.on.demand.credential.test.failure.enabled=true \
135-
--stacktrace --info --continue ${TESTS}
135+
--stacktrace --info --continue ${TESTS} | tee -a logs.txt
136+
137+
if grep -q 'LEAK:' logs.txt ; then
138+
echo "Netty Leak detected, please inspect build log"
139+
exit 1
140+
fi

‎buildSrc/src/main/kotlin/conventions/testing-base.gradle.kts‎

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@ tasks.withType<Test> {
3434

3535
useJUnitPlatform()
3636

37+
jvmArgs.add("-Dio.netty.leakDetection.level=paranoid")
38+
3739
// Pass any `org.mongodb.*` system settings
3840
systemProperties =
3941
System.getProperties()

‎driver-core/src/main/com/mongodb/internal/connection/BaseCluster.java‎

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,9 @@ public ServerTuple selectServer(final ServerSelector serverSelector, final Opera
183183
@Override
184184
public void selectServerAsync(final ServerSelector serverSelector, final OperationContext operationContext,
185185
final SingleResultCallback<ServerTuple> callback) {
186-
isTrue("open", !isClosed());
186+
if (isClosed()) {
187+
callback.onResult(null, new MongoClientException("Cluster was closed during server selection."));
188+
}
187189

188190
Timeout computedServerSelectionTimeout = operationContext.getTimeoutContext().computeServerSelectionTimeout();
189191
ServerSelectionRequest request = new ServerSelectionRequest(

‎driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnection.java‎

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,8 +75,8 @@
7575
import static com.mongodb.assertions.Assertions.assertNull;
7676
import static com.mongodb.assertions.Assertions.isTrue;
7777
import static com.mongodb.assertions.Assertions.notNull;
78-
import static com.mongodb.internal.async.AsyncRunnable.beginAsync;
7978
import static com.mongodb.internal.TimeoutContext.createMongoTimeoutException;
79+
import static com.mongodb.internal.async.AsyncRunnable.beginAsync;
8080
import static com.mongodb.internal.async.ErrorHandlingResultCallback.errorHandlingCallback;
8181
import static com.mongodb.internal.connection.Authenticator.shouldAuthenticate;
8282
import static com.mongodb.internal.connection.CommandHelper.HELLO;
@@ -355,7 +355,7 @@ private Compressor createCompressor(final MongoCompressor mongoCompressor) {
355355
public void close() {
356356
// All but the first call is a no-op
357357
if (!isClosed.getAndSet(true) && (stream != null)) {
358-
stream.close();
358+
stream.close();
359359
}
360360
}
361361

‎driver-core/src/main/com/mongodb/internal/connection/netty/NettyByteBuf.java‎

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -256,7 +256,7 @@ public ByteBuf asReadOnly() {
256256

257257
@Override
258258
public ByteBuf duplicate() {
259-
return new NettyByteBuf(proxied.duplicate().retain(), isWriting);
259+
return new NettyByteBuf(proxied.retainedDuplicate(), isWriting);
260260
}
261261

262262
@Override

‎driver-core/src/main/com/mongodb/internal/connection/netty/NettyStream.java‎

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -307,8 +307,7 @@ private void readAsync(final int numBytes, final AsyncCompletionHandler<ByteBuf>
307307
composite.addComponent(next);
308308
iter.remove();
309309
} else {
310-
next.retain();
311-
composite.addComponent(next.readSlice(bytesNeededFromCurrentBuffer));
310+
composite.addComponent(next.readRetainedSlice(bytesNeededFromCurrentBuffer));
312311
}
313312
composite.writerIndex(composite.writerIndex() + bytesNeededFromCurrentBuffer);
314313
bytesNeeded -= bytesNeededFromCurrentBuffer;
@@ -349,7 +348,11 @@ private boolean hasBytesAvailable(final int numBytes) {
349348
private void handleReadResponse(@Nullable final io.netty.buffer.ByteBuf buffer, @Nullable final Throwable t) {
350349
PendingReader localPendingReader = withLock(lock, () -> {
351350
if (buffer != null) {
352-
pendingInboundBuffers.add(buffer.retain());
351+
if (isClosed) {
352+
pendingException = new MongoSocketException("Received data after the stream was closed.", address);
353+
} else {
354+
pendingInboundBuffers.add(buffer.retain());
355+
}
353356
} else {
354357
pendingException = t;
355358
}
@@ -378,7 +381,9 @@ public void close() {
378381
for (Iterator<io.netty.buffer.ByteBuf> iterator = pendingInboundBuffers.iterator(); iterator.hasNext();) {
379382
io.netty.buffer.ByteBuf nextByteBuf = iterator.next();
380383
iterator.remove();
381-
nextByteBuf.release();
384+
// Drops all retains to prevent silent leaks; assumes callers have already released
385+
// ByteBuffers returned by that NettyStream before calling close.
386+
nextByteBuf.release(nextByteBuf.refCnt());
382387
}
383388
});
384389
}

‎driver-core/src/test/unit/com/mongodb/internal/connection/ByteBufferBsonOutputTest.java‎

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -495,6 +495,7 @@ void shouldWriteUtf8CString(final boolean useBranch, final BufferProvider buffer
495495
@ParameterizedTest(name = "should get byte buffers as little endian. Parameters: useBranch={0}, bufferProvider={1}")
496496
@MethodSource("bufferProvidersWithBranches")
497497
void shouldGetByteBuffersAsLittleEndian(final boolean useBranch, final BufferProvider bufferProvider) {
498+
List<ByteBuf> byteBuffers = new ArrayList<>();
498499
try (ByteBufferBsonOutput out = new ByteBufferBsonOutput(bufferProvider)) {
499500
byte[] v = {1, 0, 0, 0};
500501
if (useBranch) {
@@ -504,7 +505,11 @@ void shouldGetByteBuffersAsLittleEndian(final boolean useBranch, final BufferPro
504505
} else {
505506
out.writeBytes(v);
506507
}
507-
assertEquals(1, out.getByteBuffers().get(0).getInt());
508+
509+
byteBuffers = out.getByteBuffers();
510+
assertEquals(1, byteBuffers.get(0).getInt());
511+
} finally {
512+
byteBuffers.forEach(ByteBuf::release);
508513
}
509514
}
510515

@@ -1017,6 +1022,7 @@ void shouldWriteInt32WithinSpanningBuffers(
10171022
final int expectedLastBufferPosition,
10181023
final BufferProvider bufferProvider) {
10191024

1025+
List<ByteBuf> buffers = new ArrayList<>();
10201026
try (ByteBufferBsonOutput output =
10211027
new ByteBufferBsonOutput(size -> bufferProvider.getBuffer(Integer.BYTES))) {
10221028

@@ -1028,12 +1034,14 @@ void shouldWriteInt32WithinSpanningBuffers(
10281034

10291035
//then
10301036
//getByteBuffers returns ByteBuffers with limit() set to position, position set to 0.
1031-
List<ByteBuf> buffers = output.getByteBuffers();
1037+
buffers = output.getByteBuffers();
10321038
assertEquals(expectedBuffers.size(), buffers.size(), "Number of buffers mismatch");
10331039
assertBufferContents(expectedBuffers, buffers);
10341040

10351041
assertEquals(expectedLastBufferPosition, buffers.get(buffers.size() - 1).limit());
10361042
assertEquals(expectedOutputPosition, output.getPosition());
1043+
} finally {
1044+
buffers.forEach(ByteBuf::release);
10371045
}
10381046
}
10391047

@@ -1049,6 +1057,7 @@ void shouldWriteInt64WithinSpanningBuffers(
10491057
final int expectedLastBufferPosition,
10501058
final BufferProvider bufferProvider) {
10511059

1060+
List<ByteBuf> buffers = new ArrayList<>();
10521061
try (ByteBufferBsonOutput output =
10531062
new ByteBufferBsonOutput(size -> bufferProvider.getBuffer(Long.BYTES))) {
10541063

@@ -1060,12 +1069,14 @@ void shouldWriteInt64WithinSpanningBuffers(
10601069

10611070
//then
10621071
//getByteBuffers returns ByteBuffers with limit() set to position, position set to 0.
1063-
List<ByteBuf> buffers = output.getByteBuffers();
1072+
buffers = output.getByteBuffers();
10641073
assertEquals(expectedBuffers.size(), buffers.size(), "Number of buffers mismatch");
10651074
assertBufferContents(expectedBuffers, buffers);
10661075

10671076
assertEquals(expectedLastBufferPosition, buffers.get(buffers.size() - 1).limit());
10681077
assertEquals(expectedOutputPosition, output.getPosition());
1078+
} finally {
1079+
buffers.forEach(ByteBuf::release);
10691080
}
10701081
}
10711082

@@ -1081,6 +1092,7 @@ void shouldWriteDoubleWithinSpanningBuffers(
10811092
final int expectedLastBufferPosition,
10821093
final BufferProvider bufferProvider) {
10831094

1095+
List<ByteBuf> buffers = new ArrayList<>();
10841096
try (ByteBufferBsonOutput output =
10851097
new ByteBufferBsonOutput(size -> bufferProvider.getBuffer(Long.BYTES))) {
10861098

@@ -1092,12 +1104,14 @@ void shouldWriteDoubleWithinSpanningBuffers(
10921104

10931105
//then
10941106
//getByteBuffers returns ByteBuffers with limit() set to position, position set to 0.
1095-
List<ByteBuf> buffers = output.getByteBuffers();
1107+
buffers = output.getByteBuffers();
10961108
assertEquals(expectedBuffers.size(), buffers.size(), "Number of buffers mismatch");
10971109
assertBufferContents(expectedBuffers, buffers);
10981110

10991111
assertEquals(expectedLastBufferPosition, buffers.get(buffers.size() - 1).limit());
11001112
assertEquals(expectedOutputPosition, output.getPosition());
1113+
} finally {
1114+
buffers.forEach(ByteBuf::release);
11011115
}
11021116
}
11031117

‎driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/OperationExecutorImpl.java‎

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import static com.mongodb.MongoException.TRANSIENT_TRANSACTION_ERROR_LABEL;
4848
import static com.mongodb.MongoException.UNKNOWN_TRANSACTION_COMMIT_RESULT_LABEL;
4949
import static com.mongodb.ReadPreference.primary;
50+
import static com.mongodb.assertions.Assertions.isTrue;
5051
import static com.mongodb.assertions.Assertions.notNull;
5152
import static com.mongodb.internal.TimeoutContext.createTimeoutContext;
5253
import static com.mongodb.reactivestreams.client.internal.MongoOperationPublisher.sinkToCallback;
@@ -73,6 +74,7 @@ public class OperationExecutorImpl implements OperationExecutor {
7374
@Override
7475
public <T> Mono<T> execute(final AsyncReadOperation<T> operation, final ReadPreference readPreference, final ReadConcern readConcern,
7576
@Nullable final ClientSession session) {
77+
isTrue("open", !mongoClient.getCluster().isClosed());
7678
notNull("operation", operation);
7779
notNull("readPreference", readPreference);
7880
notNull("readConcern", readConcern);
@@ -109,6 +111,7 @@ public <T> Mono<T> execute(final AsyncReadOperation<T> operation, final ReadPref
109111
@Override
110112
public <T> Mono<T> execute(final AsyncWriteOperation<T> operation, final ReadConcern readConcern,
111113
@Nullable final ClientSession session) {
114+
isTrue("open", !mongoClient.getCluster().isClosed());
112115
notNull("operation", operation);
113116
notNull("readConcern", readConcern);
114117

0 commit comments

Comments
(0)

AltStyle によって変換されたページ (->オリジナル) /