Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit 5b50789

Browse files
Add overloads for XAddOptions in StreamOperations.
Closes #2915
1 parent 8b5f29e commit 5b50789

File tree

7 files changed

+311
-1
lines changed

7 files changed

+311
-1
lines changed

‎src/main/java/org/springframework/data/redis/connection/ReactiveStreamCommands.java‎

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.springframework.data.redis.connection.ReactiveRedisConnection.KeyCommand;
3333
import org.springframework.data.redis.connection.ReactiveRedisConnection.NumericResponse;
3434
import org.springframework.data.redis.connection.RedisStreamCommands.XClaimOptions;
35+
import org.springframework.data.redis.connection.RedisStreamCommands.XAddOptions;
3536
import org.springframework.data.redis.connection.RedisStreamCommands.XPendingOptions;
3637
import org.springframework.data.redis.connection.stream.ByteBufferRecord;
3738
import org.springframework.data.redis.connection.stream.Consumer;
@@ -58,6 +59,7 @@
5859
* @author Tugdual Grall
5960
* @author Dengliming
6061
* @author Mark John Moreno
62+
* @author jinkshower
6163
* @since 2.2
6264
*/
6365
public interface ReactiveStreamCommands {
@@ -394,11 +396,40 @@ default Mono<RecordId> xAdd(ByteBufferRecord record) {
394396
return xAdd(Mono.just(AddStreamRecord.of(record))).next().map(CommandResponse::getOutput);
395397
}
396398

399+
/**
400+
* Add stream record with the specified options.
401+
*
402+
* @param record must not be {@literal null}.
403+
* @param xAddOptions parameters for the {@literal XADD} call. Must not be {@literal null}.
404+
* @return {@link Mono} the {@link RecordId id}.
405+
* @see <a href="https://redis.io/commands/xadd">Redis Documentation: XADD</a>
406+
* @since 3.3
407+
*/
408+
default Mono<RecordId> xAdd(ByteBufferRecord record, XAddOptions xAddOptions) {
409+
410+
Assert.notNull(record, "Record must not be null");
411+
Assert.notNull(xAddOptions, "XAddOptions must not be null");
412+
413+
AddStreamRecord addStreamRecord = AddStreamRecord.of(record)
414+
.approximateTrimming(xAddOptions.isApproximateTrimming())
415+
.makeNoStream(xAddOptions.isNoMkStream());
416+
417+
if (xAddOptions.hasMaxlen()) {
418+
addStreamRecord = addStreamRecord.maxlen(xAddOptions.getMaxlen());
419+
}
420+
421+
if (xAddOptions.hasMinId()) {
422+
addStreamRecord = addStreamRecord.minId(xAddOptions.getMinId());
423+
}
424+
425+
return xAdd(Mono.just(addStreamRecord)).next().map(CommandResponse::getOutput);
426+
}
427+
397428
/**
398429
* Add stream record with given {@literal body} to {@literal key}.
399430
*
400431
* @param commands must not be {@literal null}.
401-
* @return {@link Flux} emitting the {@link RecordId} on by for for the given {@link AddStreamRecord} commands.
432+
* @return {@link Flux} emitting the {@link RecordId} on by for the given {@link AddStreamRecord} commands.
402433
* @see <a href="https://redis.io/commands/xadd">Redis Documentation: XADD</a>
403434
*/
404435
Flux<CommandResponse<AddStreamRecord, RecordId>> xAdd(Publisher<AddStreamRecord> commands);

‎src/main/java/org/springframework/data/redis/core/DefaultReactiveStreamOperations.java‎

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.springframework.data.redis.connection.Limit;
3434
import org.springframework.data.redis.connection.ReactiveStreamCommands;
3535
import org.springframework.data.redis.connection.RedisStreamCommands.XClaimOptions;
36+
import org.springframework.data.redis.connection.RedisStreamCommands.XAddOptions;
3637
import org.springframework.data.redis.connection.convert.Converters;
3738
import org.springframework.data.redis.connection.stream.ByteBufferRecord;
3839
import org.springframework.data.redis.connection.stream.Consumer;
@@ -60,6 +61,7 @@
6061
* @author Christoph Strobl
6162
* @author Marcin Zielinski
6263
* @author John Blum
64+
* @author jinkshower
6365
* @since 2.2
6466
*/
6567
class DefaultReactiveStreamOperations<K, HK, HV> implements ReactiveStreamOperations<K, HK, HV> {
@@ -146,6 +148,18 @@ public Mono<RecordId> add(Record<K, ?> record) {
146148
return createMono(streamCommands -> streamCommands.xAdd(serializeRecord(input)));
147149
}
148150

151+
@Override
152+
public Mono<RecordId> add(Record<K, ?> record, XAddOptions xAddOptions) {
153+
154+
Assert.notNull(record.getStream(), "Key must not be null");
155+
Assert.notNull(record.getValue(), "Body must not be null");
156+
Assert.notNull(xAddOptions, "XAddOptions must not be null");
157+
158+
MapRecord<K, HK, HV> input = StreamObjectMapper.toMapRecord(this, record);
159+
160+
return createMono(streamCommands -> streamCommands.xAdd(serializeRecord(input), xAddOptions));
161+
}
162+
149163
@Override
150164
public Flux<MapRecord<K, HK, HV>> claim(K key, String consumerGroup, String newOwner, XClaimOptions xClaimOptions) {
151165

‎src/main/java/org/springframework/data/redis/core/DefaultStreamOperations.java‎

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.springframework.data.domain.Range;
2727
import org.springframework.data.redis.connection.Limit;
2828
import org.springframework.data.redis.connection.RedisConnection;
29+
import org.springframework.data.redis.connection.RedisStreamCommands.XAddOptions;
2930
import org.springframework.data.redis.connection.RedisStreamCommands.XClaimOptions;
3031
import org.springframework.data.redis.connection.stream.ByteRecord;
3132
import org.springframework.data.redis.connection.stream.Consumer;
@@ -54,6 +55,7 @@
5455
* @author Christoph Strobl
5556
* @author Marcin Zielinski
5657
* @author John Blum
58+
* @author jinkshower
5759
* @since 2.2
5860
*/
5961
class DefaultStreamOperations<K, HK, HV> extends AbstractOperations<K, Object> implements StreamOperations<K, HK, HV> {
@@ -136,6 +138,21 @@ public RecordId add(Record<K, ?> record) {
136138
return execute(connection -> connection.xAdd(binaryRecord));
137139
}
138140

141+
@Nullable
142+
@Override
143+
@SuppressWarnings("unchecked")
144+
public RecordId add(Record<K , ?> record, XAddOptions options) {
145+
146+
Assert.notNull(record, "Record must not be null");
147+
Assert.notNull(options, "XAddOptions must not be null");
148+
149+
MapRecord<K, HK, HV> input = StreamObjectMapper.toMapRecord(this, record);
150+
151+
ByteRecord binaryRecord = input.serialize(keySerializer(), hashKeySerializer(), hashValueSerializer());
152+
153+
return execute(connection -> connection.streamCommands().xAdd(binaryRecord, options));
154+
}
155+
139156
@Override
140157
public List<MapRecord<K, HK, HV>> claim(K key, String consumerGroup, String newOwner, XClaimOptions xClaimOptions) {
141158

‎src/main/java/org/springframework/data/redis/core/ReactiveStreamOperations.java‎

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.springframework.data.domain.Range;
2727
import org.springframework.data.redis.connection.Limit;
2828
import org.springframework.data.redis.connection.RedisStreamCommands.XClaimOptions;
29+
import org.springframework.data.redis.connection.RedisStreamCommands.XAddOptions;
2930
import org.springframework.data.redis.connection.stream.ByteBufferRecord;
3031
import org.springframework.data.redis.connection.stream.Consumer;
3132
import org.springframework.data.redis.connection.stream.MapRecord;
@@ -54,6 +55,7 @@
5455
* @author Dengliming
5556
* @author Marcin Zielinski
5657
* @author John Blum
58+
* @author jinkshower
5759
* @since 2.2
5860
*/
5961
public interface ReactiveStreamOperations<K, HK, HV> extends HashMapperProvider<HK, HV> {
@@ -94,6 +96,48 @@ default Mono<Long> acknowledge(String group, Record<K, ?> record) {
9496
return acknowledge(record.getRequiredStream(), group, record.getId());
9597
}
9698

99+
/**
100+
* Append a record to the stream {@code key} with the specified options.
101+
*
102+
* @param key the stream key.
103+
* @param content record content as Map.
104+
* @param xAddOptions parameters for the {@literal XADD} call.
105+
* @return the {@link Mono} emitting the {@link RecordId}.
106+
* @see <a href="https://redis.io/commands/xadd">Redis Documentation: XADD</a>
107+
* @since 3.3
108+
*/
109+
default Mono<RecordId> add(K key, Map<? extends HK, ? extends HV> content, XAddOptions xAddOptions) {
110+
return add(StreamRecords.newRecord().in(key).ofMap(content), xAddOptions);
111+
}
112+
113+
/**
114+
* Append a record, backed by a {@link Map} holding the field/value pairs, to the stream with the specified options.
115+
*
116+
* @param record the record to append.
117+
* @param xAddOptions parameters for the {@literal XADD} call.
118+
* @return the {@link Mono} emitting the {@link RecordId}.
119+
* @see <a href="https://redis.io/commands/xadd">Redis Documentation: XADD</a>
120+
* @since 3.3
121+
*/
122+
@SuppressWarnings("unchecked")
123+
default Mono<RecordId> add(MapRecord<K, ? extends HK, ? extends HV> record, XAddOptions xAddOptions) {
124+
return add((Record) record, xAddOptions);
125+
}
126+
127+
/**
128+
* Append the record, backed by the given value, to the stream with the specified options.
129+
* The value will be hashed and serialized.
130+
*
131+
* @param record must not be {@literal null}.
132+
* @param xAddOptions parameters for the {@literal XADD} call. Must not be {@literal null}.
133+
* @return the {@link Mono} emitting the {@link RecordId}.
134+
* @see MapRecord
135+
* @see ObjectRecord
136+
* @see <a href="https://redis.io/commands/xadd">Redis Documentation: XADD</a>
137+
* @since 3.3
138+
*/
139+
Mono<RecordId> add(Record<K, ?> record, XAddOptions xAddOptions);
140+
97141
/**
98142
* Append one or more records to the stream {@code key}.
99143
*

‎src/main/java/org/springframework/data/redis/core/StreamOperations.java‎

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.springframework.data.domain.Range;
2626
import org.springframework.data.redis.connection.Limit;
2727
import org.springframework.data.redis.connection.RedisStreamCommands.XClaimOptions;
28+
import org.springframework.data.redis.connection.RedisStreamCommands.XAddOptions;
2829
import org.springframework.data.redis.connection.stream.ByteRecord;
2930
import org.springframework.data.redis.connection.stream.Consumer;
3031
import org.springframework.data.redis.connection.stream.MapRecord;
@@ -53,6 +54,7 @@
5354
* @author Dengliming
5455
* @author Marcin Zielinski
5556
* @author John Blum
57+
* @author jinkshower
5658
* @since 2.2
5759
*/
5860
public interface StreamOperations<K, HK, HV> extends HashMapperProvider<HK, HV> {
@@ -95,6 +97,53 @@ default Long acknowledge(String group, Record<K, ?> record) {
9597
return acknowledge(record.getRequiredStream(), group, record.getId());
9698
}
9799

100+
/**
101+
* Append a record to the stream {@code key} with the specified options.
102+
*
103+
* @param key the stream key.
104+
* @param content record content as Map.
105+
* @param xAddOptions additional parameters for the {@literal XADD} call.
106+
* @return the record Id. {@literal null} when used in pipeline / transaction.
107+
* @see <a href="https://redis.io/commands/xadd">Redis Documentation: XADD</a>
108+
* @since 3.3
109+
*/
110+
@SuppressWarnings("unchecked")
111+
@Nullable
112+
default RecordId add(K key, Map<? extends HK, ? extends HV> content, XAddOptions xAddOptions) {
113+
return add(StreamRecords.newRecord().in(key).ofMap(content), xAddOptions);
114+
}
115+
116+
/**
117+
* Append a record, backed by a {@link Map} holding the field/value pairs, to the stream with the specified options.
118+
*
119+
* @param record the record to append.
120+
* @param xAddOptions additional parameters for the {@literal XADD} call.
121+
* @return the record Id. {@literal null} when used in pipeline / transaction.
122+
* @see <a href="https://redis.io/commands/xadd">Redis Documentation: XADD</a>
123+
* @since 3.3
124+
*/
125+
@SuppressWarnings("unchecked")
126+
@Nullable
127+
default RecordId add(MapRecord<K, ? extends HK, ? extends HV> record, XAddOptions xAddOptions) {
128+
return add((Record) record, xAddOptions);
129+
}
130+
131+
/**
132+
* Append the record, backed by the given value, to the stream with the specified options.
133+
* The value will be hashed and serialized.
134+
*
135+
* @param record must not be {@literal null}.
136+
* @param xAddOptions parameters for the {@literal XADD} call. Must not be {@literal null}.
137+
* @return the record Id. {@literal null} when used in pipeline / transaction.
138+
* @see MapRecord
139+
* @see ObjectRecord
140+
* @see <a href="https://redis.io/commands/xadd">Redis Documentation: XADD</a>
141+
* @since 3.3
142+
*/
143+
@SuppressWarnings("unchecked")
144+
@Nullable
145+
RecordId add(Record<K, ?> record, XAddOptions xAddOptions);
146+
98147
/**
99148
* Append a record to the stream {@code key}.
100149
*

‎src/test/java/org/springframework/data/redis/core/DefaultReactiveStreamOperationsIntegrationTests.java‎

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.springframework.data.redis.connection.Limit;
3636
import org.springframework.data.redis.connection.RedisConnection;
3737
import org.springframework.data.redis.connection.RedisConnectionFactory;
38+
import org.springframework.data.redis.connection.RedisStreamCommands.XAddOptions;
3839
import org.springframework.data.redis.connection.stream.Consumer;
3940
import org.springframework.data.redis.connection.stream.MapRecord;
4041
import org.springframework.data.redis.connection.stream.ReadOffset;
@@ -61,6 +62,7 @@
6162
* @author Mark Paluch
6263
* @author Christoph Strobl
6364
* @author Marcin Zielinski
65+
* @author jinkshower
6466
*/
6567
@MethodSource("testParams")
6668
@SuppressWarnings("unchecked")
@@ -192,6 +194,102 @@ void addShouldAddReadSimpleMessageWithRawSerializer() {
192194
.verifyComplete();
193195
}
194196

197+
@ParameterizedRedisTest // GH-2915
198+
void addShouldAddMessageWithOptions() {
199+
200+
K key = keyFactory.instance();
201+
HK hashKey = hashKeyFactory.instance();
202+
HV value = valueFactory.instance();
203+
204+
streamOperations.add(key, Collections.singletonMap(hashKey, value)).block();
205+
206+
HV newValue = valueFactory.instance();
207+
XAddOptions options = XAddOptions.maxlen(1).approximateTrimming(false);
208+
209+
RecordId messageId = streamOperations.add(key, Collections.singletonMap(hashKey, newValue), options).block();
210+
211+
streamOperations.range(key, Range.unbounded()) //
212+
.as(StepVerifier::create) //
213+
.consumeNextWith(actual -> {
214+
215+
assertThat(actual.getId()).isEqualTo(messageId);
216+
assertThat(actual.getStream()).isEqualTo(key);
217+
assertThat(actual).hasSize(1);
218+
219+
if (!(key instanceof byte[] || value instanceof byte[])) {
220+
assertThat(actual.getValue()).containsEntry(hashKey, newValue);
221+
}
222+
223+
}) //
224+
.verifyComplete();
225+
}
226+
227+
@ParameterizedRedisTest // GH-2915
228+
void addShouldAddReadSimpleMessageWithOptions() {
229+
230+
assumeTrue(!(serializer instanceof Jackson2JsonRedisSerializer)
231+
&& !(serializer instanceof GenericJackson2JsonRedisSerializer)
232+
&& !(serializer instanceof JdkSerializationRedisSerializer) && !(serializer instanceof OxmSerializer));
233+
234+
K key = keyFactory.instance();
235+
HV value = valueFactory.instance();
236+
237+
streamOperations.add(StreamRecords.objectBacked(value).withStreamKey(key)).block();
238+
239+
HV newValue = valueFactory.instance();
240+
XAddOptions options = XAddOptions.maxlen(1).approximateTrimming(false);
241+
242+
RecordId messageId = streamOperations.add(StreamRecords.objectBacked(newValue).withStreamKey(key), options).block();
243+
244+
streamOperations.range((Class<HV>) value.getClass(), key, Range.unbounded())
245+
.as(StepVerifier::create) //
246+
.consumeNextWith(actual -> {
247+
assertThat(actual.getId()).isEqualTo(messageId);
248+
assertThat(actual.getStream()).isEqualTo(key);
249+
250+
assertThat(actual.getValue()).isEqualTo(newValue);
251+
}) //
252+
.expectNextCount(0)
253+
.verifyComplete();
254+
}
255+
256+
@ParameterizedRedisTest // GH-2915
257+
void addShouldAddReadSimpleMessageWithRawSerializerWithOptions() {
258+
259+
assumeTrue(!(serializer instanceof Jackson2JsonRedisSerializer)
260+
&& !(serializer instanceof GenericJackson2JsonRedisSerializer));
261+
262+
SerializationPair<K> keySerializer = redisTemplate.getSerializationContext().getKeySerializationPair();
263+
264+
RedisSerializationContext<K, String> serializationContext = RedisSerializationContext
265+
.<K, String> newSerializationContext(StringRedisSerializer.UTF_8).key(keySerializer)
266+
.hashValue(SerializationPair.raw()).hashKey(SerializationPair.raw()).build();
267+
268+
ReactiveRedisTemplate<K, String> raw = new ReactiveRedisTemplate<>(redisTemplate.getConnectionFactory(),
269+
serializationContext);
270+
271+
K key = keyFactory.instance();
272+
Person value = new PersonObjectFactory().instance();
273+
274+
raw.opsForStream().add(StreamRecords.objectBacked(value).withStreamKey(key)).block();
275+
276+
Person newValue = new PersonObjectFactory().instance();
277+
XAddOptions options = XAddOptions.maxlen(1).approximateTrimming(false);
278+
279+
RecordId messageId = raw.opsForStream().add(StreamRecords.objectBacked(newValue).withStreamKey(key), options).block();
280+
281+
raw.opsForStream().range((Class<HV>) value.getClass(), key, Range.unbounded())
282+
.as(StepVerifier::create) //
283+
.consumeNextWith(it -> {
284+
285+
assertThat(it.getId()).isEqualTo(messageId);
286+
assertThat(it.getStream()).isEqualTo(key);
287+
assertThat(it.getValue()).isEqualTo(newValue);
288+
}) //
289+
.expectNextCount(0)
290+
.verifyComplete();
291+
}
292+
195293
@ParameterizedRedisTest // DATAREDIS-864
196294
void rangeShouldReportMessages() {
197295

0 commit comments

Comments
(0)

AltStyle によって変換されたページ (->オリジナル) /