Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

GH-4941: Add batchSize option to RedisItemReader for optimizing N+1 problem using Redis MGET #4942

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
DevSeongmin wants to merge 1 commit into spring-projects:main
base: main
Choose a base branch
Loading
from DevSeongmin:GH-4941
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,12 @@
*/
package org.springframework.batch.item.redis;

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemStreamException;
import org.springframework.batch.item.ItemStreamReader;
Expand All @@ -39,38 +45,100 @@
*/
public class RedisItemReader<K, V> implements ItemStreamReader<V> {

private static final int DEFAULT_BATCH_SIZE = 1;

private final RedisTemplate<K, V> redisTemplate;

private final ScanOptions scanOptions;

private final int batchSize;

private Cursor<K> cursor;

private Queue<V> valueBuffer;

private boolean bufferInitialized = false;

public RedisItemReader(RedisTemplate<K, V> redisTemplate, ScanOptions scanOptions) {
this(redisTemplate, scanOptions, DEFAULT_BATCH_SIZE);
}

public RedisItemReader(RedisTemplate<K, V> redisTemplate, ScanOptions scanOptions, int batchSize) {
Assert.notNull(redisTemplate, "redisTemplate must not be null");
Assert.notNull(scanOptions, "scanOptions must no be null");
Assert.isTrue(batchSize > 0, "batchSize must be greater than 0");

this.redisTemplate = redisTemplate;
this.scanOptions = scanOptions;
this.batchSize = batchSize;
}

@Override
public void open(ExecutionContext executionContext) throws ItemStreamException {
this.cursor = this.redisTemplate.scan(this.scanOptions);

if (batchSize > 1) {
this.valueBuffer = new ConcurrentLinkedQueue<>();
this.bufferInitialized = true;
}
}

@Override
public V read() throws Exception {
if (batchSize == 1) {
return readSingle();
}
else {
return readBatched();
}
}

private V readSingle() throws Exception {
if (this.cursor.hasNext()) {
K nextKey = this.cursor.next();
return this.redisTemplate.opsForValue().get(nextKey);
}
else {
} else {
return null;
}
}

private V readBatched() throws Exception {
if (valueBuffer.isEmpty()) {
fillBuffer();
}
return valueBuffer.poll();
}

private void fillBuffer() {
if (!cursor.hasNext()) {
return;
}

List<K> keyBatch = new ArrayList<>(batchSize);
for (int i = 0; i < batchSize && cursor.hasNext(); i++) {
keyBatch.add(cursor.next());
}

if (!keyBatch.isEmpty()) {
List<V> values = redisTemplate.opsForValue().multiGet(keyBatch);

if (values != null) {
values.stream()
.filter(Objects::nonNull)
.forEach(valueBuffer::offer);
}
}
}

@Override
public void close() throws ItemStreamException {
this.cursor.close();
if (this.cursor != null) {
this.cursor.close();
}

if (bufferInitialized && valueBuffer != null) {
valueBuffer.clear();
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ public class RedisItemReaderBuilder<K, V> {

private ScanOptions scanOptions;

private int batchSize = 1;

/**
* Set the {@link RedisTemplate} to use in the reader.
* @param redisTemplate the template to use
Expand All @@ -53,12 +55,35 @@ public RedisItemReaderBuilder<K, V> scanOptions(ScanOptions scanOptions) {
return this;
}

/**
* Set the batch size for optimized Redis operations.
*
* <p>When batchSize is 1 (default), the reader operates in single-key mode
* for complete backward compatibility. When batchSize is greater than 1,
* the reader uses Redis MGET to fetch multiple keys in a single operation,
* significantly improving performance by reducing network round-trips.</p>
*
* <p>Higher batch sizes reduce network overhead but may increase memory usage.
* Consider your memory constraints when setting this value.</p>
*
* @param batchSize the number of keys to fetch in each Redis operation (must be > 0)
* @return the current builder instance for fluent chaining
* @throws IllegalArgumentException if batchSize is less than or equal to 0
*/
public RedisItemReaderBuilder<K, V> batchSize(int batchSize) {
if (batchSize <= 0) {
throw new IllegalArgumentException("Batch size must be greater than 0");
}
this.batchSize = batchSize;
return this;
}

/**
* Build a new {@link RedisItemReader}.
* @return a new item reader
*/
public RedisItemReader<K, V> build() {
return new RedisItemReader<>(this.redisTemplate, this.scanOptions);
return new RedisItemReader<>(this.redisTemplate, this.scanOptions, this.batchSize);
}

}

AltStyle によって変換されたページ (->オリジナル) /