Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit c88d6b1

Browse files
bandalgomsuonobc
authored andcommitted
Allow configuring lifecycle in listener containers.
This commit allows configuration of the `phase` and `autoStartup` lifecycle attributes on the `RedisMessageListenerContainer` and `DefaultStreamMessageListenerContainer`. Original Pull Request: #3224 Resolves: #3208 Signed-off-by: Su Ko <rhtn1128@gmail.com>
1 parent 396711e commit c88d6b1

File tree

5 files changed

+184
-4
lines changed

5 files changed

+184
-4
lines changed

‎src/main/java/org/springframework/data/redis/listener/RedisMessageListenerContainer.java‎

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@
103103
* @author Mark Paluch
104104
* @author John Blum
105105
* @author Seongjun Lee
106+
* @author Su Ko
106107
* @see MessageListener
107108
* @see SubscriptionListener
108109
*/
@@ -168,6 +169,9 @@ public class RedisMessageListenerContainer implements InitializingBean, Disposab
168169

169170
private @Nullable Subscriber subscriber;
170171

172+
private int phase = Integer.MAX_VALUE;
173+
private boolean autoStartup = true;
174+
171175
/**
172176
* Set an ErrorHandler to be invoked in case of any uncaught exceptions thrown while processing a Message. By default,
173177
* there will be <b>no</b> ErrorHandler so that error-level logging is the only result.
@@ -618,6 +622,40 @@ public void removeMessageListener(MessageListener listener) {
618622
removeMessageListener(listener, Collections.emptySet());
619623
}
620624

625+
@Override
626+
public int getPhase() {
627+
return this.phase;
628+
}
629+
630+
/**
631+
* Specify the lifecycle phase for this container.
632+
* Lower values start earlier and stop later.
633+
* The default is {@code Integer.MAX_VALUE}.
634+
*
635+
* @see SmartLifecycle#getPhase()
636+
* @since 4.0
637+
*/
638+
public void setPhase(int phase) {
639+
this.phase = phase;
640+
}
641+
642+
@Override
643+
public boolean isAutoStartup() {
644+
return this.autoStartup;
645+
}
646+
647+
/**
648+
* Configure if this Lifecycle connection factory should get started automatically by the container at the time that
649+
* the containing ApplicationContext gets refreshed.
650+
* The default is {@code true}.
651+
*
652+
* @see SmartLifecycle#isAutoStartup()
653+
* @since 4.0
654+
*/
655+
public void setAutoStartup(boolean autoStartup) {
656+
this.autoStartup = autoStartup;
657+
}
658+
621659
private void initMapping(Map<? extends MessageListener, Collection<? extends Topic>> listeners) {
622660

623661
// stop the listener if currently running

‎src/main/java/org/springframework/data/redis/stream/DefaultStreamMessageListenerContainer.java‎

Lines changed: 38 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
*
5151
* @author Mark Paluch
5252
* @author Christoph Strobl
53+
* @author Su Ko
5354
* @since 2.2
5455
*/
5556
class DefaultStreamMessageListenerContainer<K, V extends Record<K, ?>> implements StreamMessageListenerContainer<K, V> {
@@ -67,6 +68,9 @@ class DefaultStreamMessageListenerContainer<K, V extends Record<K, ?>> implement
6768

6869
private boolean running = false;
6970

71+
private int phase = Integer.MAX_VALUE;
72+
private boolean autoStartup = false;
73+
7074
/**
7175
* Create a new {@link DefaultStreamMessageListenerContainer}.
7276
*
@@ -90,6 +94,14 @@ class DefaultStreamMessageListenerContainer<K, V extends Record<K, ?>> implement
9094
} else {
9195
this.streamOperations = this.template.opsForStream();
9296
}
97+
98+
if(containerOptions.isAutoStartup().isPresent()){
99+
this.autoStartup = containerOptions.isAutoStartup().get();
100+
}
101+
102+
if(containerOptions.getPhase().isPresent()){
103+
this.phase = containerOptions.getPhase().getAsInt();
104+
}
93105
}
94106

95107
private static StreamReadOptions getStreamReadOptions(StreamMessageListenerContainerOptions<?, ?> options) {
@@ -123,9 +135,21 @@ private RedisTemplate<K, V> createRedisTemplate(RedisConnectionFactory connectio
123135

124136
@Override
125137
public boolean isAutoStartup() {
126-
return false;
138+
return this.autoStartup;
127139
}
128140

141+
/**
142+
* Configure if this Lifecycle connection factory should get started automatically by the container at the time that
143+
* the containing ApplicationContext gets refreshed.
144+
* The default is {@code false}.
145+
*
146+
* @see org.springframework.context.SmartLifecycle#isAutoStartup()
147+
* @since 4.0
148+
*/
149+
public void setAutoStartup(boolean autoStartup) {
150+
this.autoStartup = autoStartup;
151+
}
152+
129153
@Override
130154
public void stop(Runnable callback) {
131155

@@ -177,9 +201,21 @@ public boolean isRunning() {
177201

178202
@Override
179203
public int getPhase() {
180-
return Integer.MAX_VALUE;
204+
return this.phase;
181205
}
182206

207+
/**
208+
* Specify the lifecycle phase for this container.
209+
* Lower values start earlier and stop later.
210+
* The default is {@code Integer.MAX_VALUE}.
211+
*
212+
* @see org.springframework.context.SmartLifecycle#getPhase()
213+
* @since 4.0
214+
*/
215+
public void setPhase(int phase) {
216+
this.phase = phase;
217+
}
218+
183219
@Override
184220
public Subscription register(StreamReadRequest<K> streamRequest, StreamListener<K, V> listener) {
185221

‎src/main/java/org/springframework/data/redis/stream/StreamMessageListenerContainer.java‎

Lines changed: 47 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package org.springframework.data.redis.stream;
1717

1818
import java.time.Duration;
19+
import java.util.Optional;
1920
import java.util.OptionalInt;
2021
import java.util.concurrent.Executor;
2122
import java.util.function.Predicate;
@@ -107,6 +108,7 @@
107108
* @author Christoph Strobl
108109
* @author Christian Rest
109110
* @author DongCheol Kim
111+
* @author Su Ko
110112
* @param <K> Stream key and Stream field type.
111113
* @param <V> Stream value type.
112114
* @since 2.2
@@ -503,12 +505,14 @@ class StreamMessageListenerContainerOptions<K, V extends Record<K, ?>> {
503505
private final @Nullable HashMapper<Object, Object, Object> hashMapper;
504506
private final ErrorHandler errorHandler;
505507
private final Executor executor;
508+
private final @Nullable Integer phase;
509+
private final @Nullable Boolean autoStartup;
506510

507511
@SuppressWarnings({ "unchecked", "rawtypes" })
508512
private StreamMessageListenerContainerOptions(Duration pollTimeout, @Nullable Integer batchSize,
509513
RedisSerializer<K> keySerializer, RedisSerializer<Object> hashKeySerializer,
510514
RedisSerializer<Object> hashValueSerializer, @Nullable Class<?> targetType,
511-
@Nullable HashMapper<V, ?, ?> hashMapper, ErrorHandler errorHandler, Executor executor) {
515+
@Nullable HashMapper<V, ?, ?> hashMapper, ErrorHandler errorHandler, Executor executor,@NullableIntegerphase, @NullableBooleanautoStartup) {
512516
this.pollTimeout = pollTimeout;
513517
this.batchSize = batchSize;
514518
this.keySerializer = keySerializer;
@@ -518,6 +522,8 @@ private StreamMessageListenerContainerOptions(Duration pollTimeout, @Nullable In
518522
this.hashMapper = (HashMapper) hashMapper;
519523
this.errorHandler = errorHandler;
520524
this.executor = executor;
525+
this.phase = phase;
526+
this.autoStartup = autoStartup;
521527
}
522528

523529
/**
@@ -598,6 +604,21 @@ public Executor getExecutor() {
598604
return executor;
599605
}
600606

607+
/**
608+
* @return the phase.
609+
* @since 4.0
610+
*/
611+
public OptionalInt getPhase() {
612+
return phase != null ? OptionalInt.of(phase) : OptionalInt.empty();
613+
}
614+
615+
/**
616+
* @return the autoStartup.
617+
* @since 4.0
618+
*/
619+
public Optional<Boolean> isAutoStartup() {
620+
return autoStartup != null ? Optional.of(autoStartup) : Optional.empty();
621+
}
601622
}
602623

603624
/**
@@ -618,6 +639,8 @@ class StreamMessageListenerContainerOptionsBuilder<K, V extends Record<K, ?>> {
618639
private @Nullable Class<?> targetType;
619640
private ErrorHandler errorHandler = LoggingErrorHandler.INSTANCE;
620641
private Executor executor = new SimpleAsyncTaskExecutor();
642+
private @Nullable Integer phase;
643+
private @Nullable Boolean autoStartup;
621644

622645
@SuppressWarnings("NullAway")
623646
private StreamMessageListenerContainerOptionsBuilder() {}
@@ -679,6 +702,28 @@ public StreamMessageListenerContainerOptionsBuilder<K, V> errorHandler(ErrorHand
679702
return this;
680703
}
681704

705+
/**
706+
* Configure a phase for the {@link SmartLifecycle}
707+
*
708+
* @return {@code this} {@link StreamMessageListenerContainerOptionsBuilder}.
709+
* @since 4.0
710+
*/
711+
public StreamMessageListenerContainerOptionsBuilder<K, V> phase(int phase) {
712+
this.phase = phase;
713+
return this;
714+
}
715+
716+
/**
717+
* Configure a autoStartup for the {@link SmartLifecycle}
718+
*
719+
* @return {@code this} {@link StreamMessageListenerContainerOptionsBuilder}.
720+
* @since 4.0
721+
*/
722+
public StreamMessageListenerContainerOptionsBuilder<K, V> autoStartup(boolean autoStartup) {
723+
this.autoStartup = autoStartup;
724+
return this;
725+
}
726+
682727
/**
683728
* Configure a key, hash key and hash value serializer.
684729
*
@@ -796,7 +841,7 @@ public StreamMessageListenerContainerOptions<K, V> build() {
796841
Assert.notNull(hashValueSerializer, "Hash Value Serializer must not be null");
797842

798843
return new StreamMessageListenerContainerOptions<>(pollTimeout, batchSize, keySerializer, hashKeySerializer,
799-
hashValueSerializer, targetType, hashMapper, errorHandler, executor);
844+
hashValueSerializer, targetType, hashMapper, errorHandler, executor,phase,autoStartup);
800845
}
801846

802847
}

‎src/test/java/org/springframework/data/redis/listener/RedisMessageListenerContainerUnitTests.java‎

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -239,4 +239,27 @@ void shouldRemoveAllListenersWhenListenerIsNull() {
239239

240240
assertThatNoException().isThrownBy(() -> container.removeMessageListener(null, Collections.singletonList(topic)));
241241
}
242+
243+
@Test // GH-3208
244+
void defaultPhaseShouldBeMaxValue() {
245+
assertThat(container.getPhase()).isEqualTo(Integer.MAX_VALUE);
246+
}
247+
248+
@Test // GH-3208
249+
void shouldApplyConfiguredPhase() {
250+
container.setPhase(3208);
251+
assertThat(container.getPhase()).isEqualTo(3208);
252+
}
253+
254+
@Test // GH-3208
255+
void defaultAutoStartupShouldBeTrue() {
256+
assertThat(container.isAutoStartup()).isEqualTo(true);
257+
}
258+
259+
@Test // GH-3208
260+
void shouldApplyConfiguredAutoStartup() {
261+
container.setAutoStartup(false);
262+
assertThat(container.isAutoStartup()).isEqualTo(false);
263+
}
264+
242265
}

‎src/test/java/org/springframework/data/redis/stream/AbstractStreamMessageListenerContainerIntegrationTests.java‎

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -384,6 +384,44 @@ void containerRestartShouldRestartSubscription() throws InterruptedException {
384384
cancelAwait(subscription);
385385
}
386386

387+
@Test // GH-3208
388+
void defaultPhaseShouldBeMaxValue() {
389+
StreamMessageListenerContainer<String, MapRecord<String, String, String>> container = StreamMessageListenerContainer
390+
.create(connectionFactory, containerOptions);
391+
392+
assertThat(container.getPhase()).isEqualTo(Integer.MAX_VALUE);
393+
}
394+
395+
@Test // GH-3208
396+
void shouldApplyConfiguredPhase() {
397+
StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> options = StreamMessageListenerContainerOptions.builder()
398+
.phase(3208)
399+
.build();
400+
StreamMessageListenerContainer<String, MapRecord<String, String, String>> container = StreamMessageListenerContainer
401+
.create(connectionFactory, options);
402+
403+
assertThat(container.getPhase()).isEqualTo(3208);
404+
}
405+
406+
@Test // GH-3208
407+
void defaultAutoStartupShouldBeFalse() {
408+
StreamMessageListenerContainer<String, MapRecord<String, String, String>> container = StreamMessageListenerContainer
409+
.create(connectionFactory, containerOptions);
410+
411+
assertThat(container.isAutoStartup()).isEqualTo(false);
412+
}
413+
414+
@Test // GH-3208
415+
void shouldApplyConfiguredAutoStartup() {
416+
StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> options = StreamMessageListenerContainerOptions.builder()
417+
.autoStartup(true)
418+
.build();
419+
StreamMessageListenerContainer<String, MapRecord<String, String, String>> container = StreamMessageListenerContainer
420+
.create(connectionFactory, options);
421+
422+
assertThat(container.isAutoStartup()).isEqualTo(true);
423+
}
424+
387425
private static void cancelAwait(Subscription subscription) {
388426

389427
subscription.cancel();

0 commit comments

Comments
(0)

AltStyle によって変換されたページ (->オリジナル) /