Skip counter in ItemWriter #4507

Unanswered
peterhalachan asked this question in Q&A

I am facing an issue with the skip count for a step I have implemented. I have an item writer that sends messages to Kafka. When an exception occurs in this writer, the item is removed from the chunk, and I expect the skip count to be updated. Instead, all the objects passed to the write method are recorded as successfully written.
If I rethrow the exception in the catch block, I get the correct count, but I noticed that more messages are sent to Kafka than expected and that processing takes longer.
Is there a way to update the skip count?
I am using Spring Batch version 5.0.3.

My item writer

@Component
@StepScope
@AllArgsConstructor
public class KafkaWriter implements ItemWriter<JsonNode> {

    private final KafkaTemplate<String, JsonNode> kafkaTemplate;

    @Override
    public void write(Chunk<? extends JsonNode> chunk) throws Exception {
        Chunk<? extends JsonNode>.ChunkIterator iterator = chunk.iterator();
        while (iterator.hasNext()) {
            JsonNode jsonItem = iterator.next();
            try {
                ProducerRecord<String, JsonNode> producerRecord = createProducerRecord(jsonItem);
                send(producerRecord);
            } catch (Exception e) {
                iterator.remove(e);
            }
        }
    }

    private void send(ProducerRecord<String, JsonNode> producerRecord) throws ExecutionException, InterruptedException {
        CompletableFuture<SendResult<String, JsonNode>> future = kafkaTemplate.send(producerRecord);
        future.get();
    }

    private ProducerRecord<String, JsonNode> createProducerRecord(JsonNode jsonNode) {
        List<Header> headers = List.of();
        return new ProducerRecord<>("foo", null, "bar", jsonNode, headers);
    }
}

Listener which reports the counts

@Slf4j
@AllArgsConstructor
public class FooListener implements StepExecutionListener {

    @Override
    public void beforeStep(StepExecution stepExecution) {
    }

    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        long readCount = stepExecution.getReadCount();
        long skipCount = stepExecution.getSkipCount();
        long writeCount = stepExecution.getWriteCount();
        log.info("ReadCount:[{}], SkipCount:[{}], WriteCount:[{}]", readCount, skipCount, writeCount);
        return stepExecution.getExitStatus();
    }
}

and my step configuration

@Bean
public Step fooStep(
        SynchronizedItemStreamReader reader,
        AsyncItemProcessor processor,
        AsyncItemWriter writer) {
    return new StepBuilder("Bar STEP", jobRepository)
            .listener(new FooListener())
            .<Object, JsonNode>chunk(10, transactionManager)
            .reader(reader)
            .processor(processor)
            .writer(writer)
            .faultTolerant()
            .skip(Exception.class)
            .skipPolicy(new AlwaysSkipItemSkipPolicy())
            .noRetry(Exception.class)
            .processorNonTransactional()
            .build();
}

Replies: 1 comment


Calling iterator.remove(e) in the catch block should be sufficient. I don't think the user is expected to rethrow the original exception for things to work. This could be a bug in Spring Batch, but I would prefer to validate that with a failing test or a complete example that reproduces the issue. If you manage to prepare that, I can debug the case and help you efficiently.
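
A note on the extra Kafka messages reported when the exception is rethrown: this is consistent with how a fault-tolerant step is generally understood to handle a skippable write failure. The chunk transaction is rolled back and the items are then written again one at a time to find the failing one, but a message already sent to Kafka is not undone by that rollback. The sketch below is a toy model in plain Java, not Spring Batch code. The class `ChunkScanSimulation`, the `RecordingSender` stand-in for the producer, and the item values are all hypothetical and only illustrate the counting.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Toy model only: none of these types exist in Spring Batch or Spring Kafka.
class ChunkScanSimulation {

    // Hypothetical stand-in for the Kafka producer: remembers every successful send.
    static final class RecordingSender {
        final List<Integer> sent = new ArrayList<>();
        final Set<Integer> failingItems;

        RecordingSender(Set<Integer> failingItems) {
            this.failingItems = failingItems;
        }

        void send(int item) {
            if (failingItems.contains(item)) {
                throw new IllegalStateException("cannot send item " + item);
            }
            sent.add(item); // a send cannot be rolled back afterwards
        }
    }

    // Simple holder for the counters reported at the end of the step.
    static final class Counts {
        final int written;
        final int skipped;
        final int sends;

        Counts(int written, int skipped, int sends) {
            this.written = written;
            this.skipped = skipped;
            this.sends = sends;
        }
    }

    // Models a writer that rethrows: one attempt on the whole chunk, then,
    // after a failure, a second pass that writes the items one at a time.
    static Counts writeWithRethrow(List<Integer> chunk, RecordingSender sender) {
        try {
            for (int item : chunk) {
                sender.send(item);
            }
            return new Counts(chunk.size(), 0, sender.sent.size());
        } catch (RuntimeException firstFailure) {
            int written = 0;
            int skipped = 0;
            for (int item : chunk) {
                try {
                    sender.send(item); // items sent before the failure go out again
                    written++;
                } catch (RuntimeException e) {
                    skipped++; // only the failing item is counted as skipped
                }
            }
            return new Counts(written, skipped, sender.sent.size());
        }
    }

    public static void main(String[] args) {
        RecordingSender sender = new RecordingSender(Set.of(4));
        Counts counts = writeWithRethrow(List.of(1, 2, 3, 4, 5), sender);
        System.out.println("written=" + counts.written
                + " skipped=" + counts.skipped
                + " sends=" + counts.sends
                + " sent=" + sender.sent);
    }
}
```

With a chunk of five items where item 4 fails, the model makes seven sends for four distinct messages: items 1 to 3 go out once in the first attempt and again during the item-by-item pass. The counters come out right (4 written, 1 skipped), but at the cost of duplicates and a second pass, which matches the longer run time described in the question.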

Category: Q&A
Labels: status: waiting-for-reporter (issues for which we are waiting for feedback from the reporter)
