Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit 798b5a1

Browse files
Add IDLE argument to XPENDING command.
Original pull request #3116 Closes #2046 Signed-off-by: Jeonggyu Choi <97666463+whatasame@users.noreply.github.com>
1 parent 06f3591 commit 798b5a1

15 files changed

+835
-31
lines changed

‎src/main/java/org/springframework/data/redis/connection/DefaultStringRedisConnection.java‎

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@
8282
* @author ihaohong
8383
* @author Dennis Neufeld
8484
* @author Shyngys Sapraliyev
85+
* @author Jeonggyu Choi
8586
*/
8687
@SuppressWarnings({ "ConstantConditions", "deprecation" })
8788
public class DefaultStringRedisConnection implements StringRedisConnection, DecoratedRedisConnection {
@@ -2968,12 +2969,26 @@ public PendingMessages xPending(String key, String groupName, String consumer,
29682969
Converters.identityConverter());
29692970
}
29702971

2972+
@Override
2973+
public PendingMessages xPending(String key, String groupName, String consumerName,
2974+
org.springframework.data.domain.Range<String> range, Long count, Duration idle) {
2975+
return convertAndReturn(delegate.xPending(serialize(key), groupName, consumerName, range, count, idle),
2976+
Converters.identityConverter());
2977+
}
2978+
29712979
@Override
29722980
public PendingMessages xPending(String key, String groupName, org.springframework.data.domain.Range<String> range,
29732981
Long count) {
29742982
return convertAndReturn(delegate.xPending(serialize(key), groupName, range, count), Converters.identityConverter());
29752983
}
29762984

2985+
@Override
2986+
public PendingMessages xPending(String key, String groupName, org.springframework.data.domain.Range<String> range,
2987+
Long count, Duration idle) {
2988+
return convertAndReturn(delegate.xPending(serialize(key), groupName, range, count, idle),
2989+
Converters.identityConverter());
2990+
}
2991+
29772992
@Override
29782993
public PendingMessages xPending(String key, String groupName, XPendingOptions options) {
29792994
return convertAndReturn(delegate.xPending(serialize(key), groupName, options), Converters.identityConverter());

‎src/main/java/org/springframework/data/redis/connection/ReactiveStreamCommands.java‎

Lines changed: 94 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@
6060
* @author Dengliming
6161
* @author Mark John Moreno
6262
* @author jinkshower
63+
* @author Jeonggyu Choi
6364
* @since 2.2
6465
*/
6566
public interface ReactiveStreamCommands {
@@ -747,6 +748,25 @@ default Mono<PendingMessages> xPending(ByteBuffer key, String groupName, Range<?
747748
.map(CommandResponse::getOutput);
748749
}
749750

751+
/**
752+
* Obtain detailed information about pending {@link PendingMessage messages} for a given {@link Range} within a
753+
* {@literal consumer group} and over a given {@link Duration} of idle time.
754+
*
755+
* @param key the {@literal key} the stream is stored at. Must not be {@literal null}.
756+
* @param groupName the name of the {@literal consumer group}. Must not be {@literal null}.
757+
* @param range the range of messages ids to search within. Must not be {@literal null}.
758+
* @param count limit the number of results. Must not be {@literal null}.
759+
* @param idle the minimum idle time to filter pending messages. Must not be {@literal null}.
760+
* @return pending messages for the given {@literal consumer group} or {@literal null} when used in pipeline /
761+
* transaction.
762+
* @see <a href="https://redis.io/commands/xpending">Redis Documentation: xpending</a>
763+
* @since 3.5
764+
*/
765+
default Mono<PendingMessages> xPending(ByteBuffer key, String groupName, Range<?> range, Long count, Duration idle) {
766+
return xPending(Mono.just(PendingRecordsCommand.pending(key, groupName).range(range, count).idle(idle))).next()
767+
.map(CommandResponse::getOutput);
768+
}
769+
750770
/**
751771
* Obtain detailed information about pending {@link PendingMessage messages} for a given {@link Range} and
752772
* {@link Consumer} within a {@literal consumer group}.
@@ -763,6 +783,23 @@ default Mono<PendingMessages> xPending(ByteBuffer key, Consumer consumer, Range<
763783
return xPending(key, consumer.getGroup(), consumer.getName(), range, count);
764784
}
765785

786+
/**
787+
* Obtain detailed information about pending {@link PendingMessage messages} for a given {@link Range} and
788+
* {@link Consumer} within a {@literal consumer group} and over a given {@link Duration} of idle time.
789+
*
790+
* @param key the {@literal key} the stream is stored at. Must not be {@literal null}.
791+
* @param consumer the name of the {@link Consumer}. Must not be {@literal null}.
792+
* @param range the range of messages ids to search within. Must not be {@literal null}.
793+
* @param count limit the number of results. Must not be {@literal null}.
794+
* @param idle the minimum idle time to filter pending messages. Must not be {@literal null}.
795+
* @return pending messages for the given {@link Consumer} or {@literal null} when used in pipeline / transaction.
796+
* @see <a href="https://redis.io/commands/xpending">Redis Documentation: xpending</a>
797+
* @since 3.5
798+
*/
799+
default Mono<PendingMessages> xPending(ByteBuffer key, Consumer consumer, Range<?> range, Long count, Duration idle) {
800+
return xPending(key, consumer.getGroup(), consumer.getName(), range, count, idle);
801+
}
802+
766803
/**
767804
* Obtain detailed information about pending {@link PendingMessage messages} for a given {@link Range} and
768805
* {@literal consumer} within a {@literal consumer group}.
@@ -783,6 +820,28 @@ default Mono<PendingMessages> xPending(ByteBuffer key, String groupName, String
783820
.next().map(CommandResponse::getOutput);
784821
}
785822

823+
/**
824+
* Obtain detailed information about pending {@link PendingMessage messages} for a given {@link Range} and
825+
* {@literal consumer} within a {@literal consumer group} and over a given {@link Duration} of idle time.
826+
*
827+
* @param key the {@literal key} the stream is stored at. Must not be {@literal null}.
828+
* @param groupName the name of the {@literal consumer group}. Must not be {@literal null}.
829+
* @param consumerName the name of the {@literal consumer}. Must not be {@literal null}.
830+
* @param range the range of messages ids to search within. Must not be {@literal null}.
831+
* @param count limit the number of results. Must not be {@literal null}.
832+
* @param idle the minimum idle time to filter pending messages. Must not be {@literal null}.
833+
* @return pending messages for the given {@literal consumer} in given {@literal consumer group} or {@literal null}
834+
* when used in pipeline / transaction.
835+
* @see <a href="https://redis.io/commands/xpending">Redis Documentation: xpending</a>
836+
* @since 3.5
837+
*/
838+
default Mono<PendingMessages> xPending(ByteBuffer key, String groupName, String consumerName, Range<?> range,
839+
Long count, Duration idle) {
840+
return xPending(
841+
Mono.just(PendingRecordsCommand.pending(key, groupName).consumer(consumerName).range(range, count).idle(idle)))
842+
.next().map(CommandResponse::getOutput);
843+
}
844+
786845
/**
787846
* Obtain detailed information about pending {@link PendingMessage messages} applying given {@link XPendingOptions
788847
* options}.
@@ -798,6 +857,7 @@ default Mono<PendingMessages> xPending(ByteBuffer key, String groupName, String
798857
* Value Object holding parameters for obtaining pending messages.
799858
*
800859
* @author Christoph Strobl
860+
* @author Jeonggyu Choi
801861
* @since 2.3
802862
*/
803863
class PendingRecordsCommand extends KeyCommand {
@@ -806,16 +866,18 @@ class PendingRecordsCommand extends KeyCommand {
806866
private final @Nullable String consumerName;
807867
private final Range<?> range;
808868
private final @Nullable Long count;
869+
private final @Nullable Duration idle;
809870

810871
private PendingRecordsCommand(ByteBuffer key, String groupName, @Nullable String consumerName, Range<?> range,
811-
@Nullable Long count) {
872+
@Nullable Long count, @NullableDurationidle) {
812873

813874
super(key);
814875

815876
this.groupName = groupName;
816877
this.consumerName = consumerName;
817878
this.range = range;
818879
this.count = count;
880+
this.idle = idle;
819881
}
820882

821883
/**
@@ -826,7 +888,7 @@ private PendingRecordsCommand(ByteBuffer key, String groupName, @Nullable String
826888
* @return new instance of {@link PendingRecordsCommand}.
827889
*/
828890
static PendingRecordsCommand pending(ByteBuffer key, String groupName) {
829-
return new PendingRecordsCommand(key, groupName, null, Range.unbounded(), null);
891+
return new PendingRecordsCommand(key, groupName, null, Range.unbounded(), null, null);
830892
}
831893

832894
/**
@@ -841,7 +903,7 @@ public PendingRecordsCommand range(Range<?> range, Long count) {
841903
Assert.notNull(range, "Range must not be null");
842904
Assert.isTrue(count > -1, "Count must not be negative");
843905

844-
return new PendingRecordsCommand(getKey(), groupName, consumerName, range, count);
906+
return new PendingRecordsCommand(getKey(), groupName, consumerName, range, count, null);
845907
}
846908

847909
/**
@@ -851,7 +913,20 @@ public PendingRecordsCommand range(Range<?> range, Long count) {
851913
* @return new instance of {@link PendingRecordsCommand}.
852914
*/
853915
public PendingRecordsCommand consumer(String consumerName) {
854-
return new PendingRecordsCommand(getKey(), groupName, consumerName, range, count);
916+
return new PendingRecordsCommand(getKey(), groupName, consumerName, range, count, idle);
917+
}
918+
919+
/**
920+
* Append given idle time.
921+
*
922+
* @param idle must not be {@literal null}.
923+
* @return new instance of {@link PendingRecordsCommand}.
924+
*/
925+
public PendingRecordsCommand idle(Duration idle) {
926+
927+
Assert.notNull(idle, "Idle must not be null");
928+
929+
return new PendingRecordsCommand(getKey(), groupName, consumerName, range, count, idle);
855930
}
856931

857932
public String getGroupName() {
@@ -881,6 +956,14 @@ public Long getCount() {
881956
return count;
882957
}
883958

959+
/**
960+
* @return can be {@literal null}.
961+
*/
962+
@Nullable
963+
public Duration getIdle() {
964+
return idle;
965+
}
966+
884967
/**
885968
* @return {@literal true} if a consumer name is present.
886969
*/
@@ -894,6 +977,13 @@ public boolean hasConsumer() {
894977
public boolean isLimited() {
895978
return count != null;
896979
}
980+
981+
/**
982+
* @return {@literal true} if idle is set.
983+
*/
984+
public boolean hasIdle() {
985+
return idle != null;
986+
}
897987
}
898988

899989
/**

0 commit comments

Comments
(0)

AltStyle によって変換されたページ (->オリジナル) /