Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit 183bc96

Browse files
committed
Fix KafkaItemReader ExecutionContext deserialization error when using Jackson2ExecutionContextStringSerializer
Signed-off-by: Hyunwoo Jung <hyunwoojung@kakao.com>
1 parent f90e965 commit 183bc96

File tree

2 files changed

+21
-12
lines changed

2 files changed

+21
-12
lines changed

‎spring-batch-infrastructure/src/main/java/org/springframework/batch/item/kafka/KafkaItemReader.java

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2019-2023 the original author or authors.
2+
* Copyright 2019-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -48,6 +48,7 @@
4848
*
4949
* @author Mathieu Ouellet
5050
* @author Mahmoud Ben Hassine
51+
* @author Hyunwoo Jung
5152
* @since 4.2
5253
*/
5354
public class KafkaItemReader<K, V> extends AbstractItemStreamItemReader<V> {
@@ -56,6 +57,8 @@ public class KafkaItemReader<K, V> extends AbstractItemStreamItemReader<V> {
5657

5758
private static final long DEFAULT_POLL_TIMEOUT = 30L;
5859

60+
private final String topicName;
61+
5962
private final List<TopicPartition> topicPartitions;
6063

6164
private Map<TopicPartition, Long> partitionOffsets;
@@ -110,6 +113,7 @@ public KafkaItemReader(Properties consumerProperties, String topicName, List<Int
110113
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG + " property must be provided");
111114
this.consumerProperties = consumerProperties;
112115
Assert.hasLength(topicName, "Topic name must not be null or empty");
116+
this.topicName = topicName;
113117
Assert.isTrue(!partitions.isEmpty(), "At least one partition must be provided");
114118
this.topicPartitions = new ArrayList<>();
115119
for (Integer partition : partitions) {
@@ -174,10 +178,10 @@ public void open(ExecutionContext executionContext) {
174178
}
175179
}
176180
if (this.saveState && executionContext.containsKey(TOPIC_PARTITION_OFFSETS)) {
177-
Map<TopicPartition, Long> offsets = (Map<TopicPartition, Long>) executionContext
178-
.get(TOPIC_PARTITION_OFFSETS);
179-
for (Map.Entry<TopicPartition, Long> entry : offsets.entrySet()) {
180-
this.partitionOffsets.put(entry.getKey(), entry.getValue() == 0 ? 0 : entry.getValue() + 1);
181+
Map<String, Long> offsets = (Map<String, Long>) executionContext.get(TOPIC_PARTITION_OFFSETS);
182+
for (Map.Entry<String, Long> entry : offsets.entrySet()) {
183+
this.partitionOffsets.put(newTopicPartition(this.topicName, Integer.parseInt(entry.getKey())),
184+
entry.getValue() == 0 ? 0 : entry.getValue() + 1);
181185
}
182186
}
183187
this.kafkaConsumer.assign(this.topicPartitions);
@@ -203,7 +207,11 @@ public V read() {
203207
@Override
204208
public void update(ExecutionContext executionContext) {
205209
if (this.saveState) {
206-
executionContext.put(TOPIC_PARTITION_OFFSETS, new HashMap<>(this.partitionOffsets));
210+
Map<String, Long> offsets = new HashMap<>();
211+
for (Map.Entry<TopicPartition, Long> entry : this.partitionOffsets.entrySet()) {
212+
offsets.put(String.valueOf(entry.getKey().partition()), entry.getValue());
213+
}
214+
executionContext.put(TOPIC_PARTITION_OFFSETS, offsets);
207215
}
208216
this.kafkaConsumer.commitSync();
209217
}

‎spring-batch-infrastructure/src/test/java/org/springframework/batch/item/kafka/KafkaItemReaderIntegrationTests.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2019-2023 the original author or authors.
2+
* Copyright 2019-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -59,6 +59,7 @@
5959
* @author Mahmoud Ben Hassine
6060
* @author François Martin
6161
* @author Patrick Baumgartner
62+
* @author Hyunwoo Jung
6263
*/
6364
@Testcontainers(disabledWithoutDocker = true)
6465
@ExtendWith(SpringExtension.class)
@@ -266,8 +267,8 @@ void testReadFromSinglePartitionAfterRestart() throws ExecutionException, Interr
266267
future.get();
267268
}
268269
ExecutionContext executionContext = new ExecutionContext();
269-
Map<TopicPartition, Long> offsets = new HashMap<>();
270-
offsets.put(newTopicPartition("topic3", 0), 1L);
270+
Map<String, Long> offsets = new HashMap<>();
271+
offsets.put("0", 1L);
271272
executionContext.put("topic.partition.offsets", offsets);
272273

273274
// topic3-0: val0, val1, val2, val3, val4
@@ -307,9 +308,9 @@ void testReadFromMultiplePartitionsAfterRestart() throws ExecutionException, Int
307308
}
308309

309310
ExecutionContext executionContext = new ExecutionContext();
310-
Map<TopicPartition, Long> offsets = new HashMap<>();
311-
offsets.put(newTopicPartition("topic4", 0), 1L);
312-
offsets.put(newTopicPartition("topic4", 1), 2L);
311+
Map<String, Long> offsets = new HashMap<>();
312+
offsets.put("0", 1L);
313+
offsets.put("1", 2L);
313314
executionContext.put("topic.partition.offsets", offsets);
314315

315316
// topic4-0: val0, val2, val4, val6

0 commit comments

Comments
(0)

AltStyle によって変換されたページ (->オリジナル) /