Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit c64f547

Browse files
committed
Make LettucePoolingConnectionProvider implements SmartLifeCycle
See #3103 Signed-off-by: UHyeon Jeong <uh.jeong651@gmail.com> <authoremail@example.com>
1 parent adb23d6 commit c64f547

File tree

2 files changed

+69
-35
lines changed

2 files changed

+69
-35
lines changed

‎src/main/java/org/springframework/data/redis/connection/lettuce/LettuceConnectionFactory.java‎

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,7 @@
116116
* @author Chris Bono
117117
* @author John Blum
118118
* @author Zhian Chen
119+
* @author UHyeon Jeong
119120
*/
120121
public class LettuceConnectionFactory implements RedisConnectionFactory, ReactiveRedisConnectionFactory,
121122
InitializingBean, DisposableBean, SmartLifecycle {
@@ -979,6 +980,9 @@ public void stop() {
979980
dispose(reactiveConnectionProvider);
980981
reactiveConnectionProvider = null;
981982

983+
dispose(clusterCommandExecutor);
984+
clusterCommandExecutor = null;
985+
982986
if (client != null) {
983987
try {
984988
Duration quietPeriod = clientConfiguration.getShutdownQuietPeriod();
@@ -1012,20 +1016,7 @@ public void afterPropertiesSet() {
10121016

10131017
@Override
10141018
public void destroy() {
1015-
10161019
stop();
1017-
this.client = null;
1018-
1019-
ClusterCommandExecutor clusterCommandExecutor = this.clusterCommandExecutor;
1020-
1021-
if (clusterCommandExecutor != null) {
1022-
try {
1023-
clusterCommandExecutor.destroy();
1024-
this.clusterCommandExecutor = null;
1025-
} catch (Exception ex) {
1026-
log.warn("Cannot properly close cluster command executor", ex);
1027-
}
1028-
}
10291020

10301021
this.state.set(State.DESTROYED);
10311022
}
@@ -1043,6 +1034,16 @@ private void dispose(@Nullable LettuceConnectionProvider connectionProvider) {
10431034
}
10441035
}
10451036

1037+
private void dispose(@Nullable ClusterCommandExecutor commandExecutor) {
1038+
if (commandExecutor != null) {
1039+
try {
1040+
commandExecutor.destroy();
1041+
} catch (Exception ex) {
1042+
log.warn("Cannot properly close cluster command executor", ex);
1043+
}
1044+
}
1045+
}
1046+
10461047
@Override
10471048
public RedisConnection getConnection() {
10481049

‎src/main/java/org/springframework/data/redis/connection/lettuce/LettucePoolingConnectionProvider.java‎

Lines changed: 55 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -31,11 +31,13 @@
3131
import java.util.concurrent.CompletionStage;
3232
import java.util.concurrent.ConcurrentHashMap;
3333

34+
import java.util.concurrent.atomic.AtomicReference;
3435
import org.apache.commons.logging.Log;
3536
import org.apache.commons.logging.LogFactory;
3637
import org.apache.commons.pool2.impl.GenericObjectPool;
3738
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
3839
import org.springframework.beans.factory.DisposableBean;
40+
import org.springframework.context.SmartLifecycle;
3941
import org.springframework.data.redis.connection.PoolException;
4042
import org.springframework.util.Assert;
4143

@@ -56,13 +58,16 @@
5658
* @author Mark Paluch
5759
* @author Christoph Strobl
5860
* @author Asmir Mustafic
61+
* @author UHyeon Jeong
5962
* @since 2.0
6063
* @see #getConnection(Class)
6164
*/
62-
class LettucePoolingConnectionProvider implements LettuceConnectionProvider, RedisClientProvider, DisposableBean {
65+
class LettucePoolingConnectionProvider implements LettuceConnectionProvider, RedisClientProvider, DisposableBean,
66+
SmartLifecycle {
6367

6468
private static final Log log = LogFactory.getLog(LettucePoolingConnectionProvider.class);
6569

70+
private final AtomicReference<State> state = new AtomicReference<>(State.CREATED);
6671
private final LettuceConnectionProvider connectionProvider;
6772
private final GenericObjectPoolConfig<StatefulConnection<?, ?>> poolConfig;
6873
private final Map<StatefulConnection<?, ?>, GenericObjectPool<StatefulConnection<?, ?>>> poolRef = new ConcurrentHashMap<>(
@@ -76,6 +81,10 @@ class LettucePoolingConnectionProvider implements LettuceConnectionProvider, Red
7681
private final Map<Class<?>, AsyncPool<StatefulConnection<?, ?>>> asyncPools = new ConcurrentHashMap<>(32);
7782
private final BoundedPoolConfig asyncPoolConfig;
7883

84+
enum State {
85+
CREATED, STARTING, STARTED, STOPPING, STOPPED, DESTROYED;
86+
}
87+
7988
LettucePoolingConnectionProvider(LettuceConnectionProvider connectionProvider,
8089
LettucePoolingClientConfiguration clientConfiguration) {
8190

@@ -206,39 +215,51 @@ public CompletableFuture<Void> releaseAsync(StatefulConnection<?, ?> connection)
206215

207216
@Override
208217
public void destroy() throws Exception {
218+
stop();
219+
state.set(State.DESTROYED);
220+
}
209221

210-
List<CompletableFuture<?>> futures = new ArrayList<>();
211-
if (!poolRef.isEmpty() || !asyncPoolRef.isEmpty()) {
212-
log.warn("LettucePoolingConnectionProvider contains unreleased connections");
213-
}
214222

215-
if (!inProgressAsyncPoolRef.isEmpty()) {
223+
@Override
224+
public void start() {
225+
state.set(State.STARTED);
226+
}
216227

217-
log.warn("LettucePoolingConnectionProvider has active connection retrievals");
218-
inProgressAsyncPoolRef.forEach((k, v) -> futures.add(k.thenApply(StatefulConnection::closeAsync)));
219-
}
228+
@Override
229+
public void stop() {
230+
if (state.compareAndSet(State.STARTED, State.STOPPING)) {
231+
List<CompletableFuture<?>> futures = new ArrayList<>();
232+
if (!poolRef.isEmpty() || !asyncPoolRef.isEmpty()) {
233+
log.warn("LettucePoolingConnectionProvider contains unreleased connections");
234+
}
220235

221-
if (!poolRef.isEmpty()) {
236+
if (!inProgressAsyncPoolRef.isEmpty()) {
222237

223-
poolRef.forEach((connection, pool) -> pool.returnObject(connection));
224-
poolRef.clear();
225-
}
238+
log.warn("LettucePoolingConnectionProvider has active connection retrievals");
239+
inProgressAsyncPoolRef.forEach((k, v) -> futures.add(k.thenApply(StatefulConnection::closeAsync)));
240+
}
226241

227-
if (!asyncPoolRef.isEmpty()) {
242+
if (!poolRef.isEmpty()) {
228243

229-
asyncPoolRef.forEach((connection, pool) -> futures.add(pool.release(connection)));
230-
asyncPoolRef.clear();
231-
}
244+
poolRef.forEach((connection, pool) -> pool.returnObject(connection));
245+
poolRef.clear();
246+
}
247+
248+
if (!asyncPoolRef.isEmpty()) {
249+
250+
asyncPoolRef.forEach((connection, pool) -> futures.add(pool.release(connection)));
251+
asyncPoolRef.clear();
252+
}
232253

233-
pools.forEach((type, pool) -> pool.close());
254+
pools.forEach((type, pool) -> pool.close());
234255

235-
CompletableFuture
256+
CompletableFuture
236257
.allOf(futures.stream().map(it -> it.exceptionally(LettuceFutureUtils.ignoreErrors()))
237-
.toArray(CompletableFuture[]::new)) //
258+
.toArray(CompletableFuture[]::new)) //
238259
.thenCompose(ignored -> {
239260

240261
CompletableFuture[] poolClose = asyncPools.values().stream().map(AsyncPool::closeAsync)
241-
.map(it -> it.exceptionally(LettuceFutureUtils.ignoreErrors())).toArray(CompletableFuture[]::new);
262+
.map(it -> it.exceptionally(LettuceFutureUtils.ignoreErrors())).toArray(CompletableFuture[]::new);
242263

243264
return CompletableFuture.allOf(poolClose);
244265
}) //
@@ -248,6 +269,18 @@ public void destroy() throws Exception {
248269
}) //
249270
.join();
250271

251-
pools.clear();
272+
pools.clear();
273+
}
274+
state.set(State.STOPPED);
275+
}
276+
277+
@Override
278+
public boolean isRunning() {
279+
return State.STARTED.equals(this.state.get());
280+
}
281+
282+
@Override
283+
public boolean isAutoStartup() {
284+
return true;
252285
}
253286
}

0 commit comments

Comments
(0)

AltStyle によって変換されたページ (->オリジナル) /