Skip counter in ItemWriter #4507
I am facing an issue with the skip count of a step I have implemented. My item writer sends messages to Kafka. When an exception occurs in the writer, the item is removed from the chunk, and I expect the skip count to be updated. Instead, all the objects passed to the write method are recorded as successful.
If I rethrow the exception in the catch block, I get the correct count, but more messages are sent to Kafka than expected and processing takes longer.
Is there a way to update the skip count?
I am using Spring Batch version 5.0.3.
My item writer:

    @Component
    @StepScope
    @AllArgsConstructor
    public class KafkaWriter implements ItemWriter<JsonNode> {

        private final KafkaTemplate<String, JsonNode> kafkaTemplate;

        @Override
        public void write(Chunk<? extends JsonNode> chunk) throws Exception {
            Chunk<? extends JsonNode>.ChunkIterator iterator = chunk.iterator();
            while (iterator.hasNext()) {
                JsonNode jsonItem = iterator.next();
                try {
                    ProducerRecord<String, JsonNode> producerRecord = createProducerRecord(jsonItem);
                    send(producerRecord);
                } catch (Exception e) {
                    iterator.remove(e);
                }
            }
        }

        private void send(ProducerRecord<String, JsonNode> producerRecord)
                throws ExecutionException, InterruptedException {
            CompletableFuture<SendResult<String, JsonNode>> future = kafkaTemplate.send(producerRecord);
            future.get();
        }

        private ProducerRecord<String, JsonNode> createProducerRecord(JsonNode jsonNode) {
            List<Header> headers = List.of();
            return new ProducerRecord<>("foo", null, "bar", jsonNode, headers);
        }
    }
The listener that reports the counts:

    @Slf4j
    @AllArgsConstructor
    public class FooListener implements StepExecutionListener {

        @Override
        public void beforeStep(StepExecution stepExecution) {
        }

        @Override
        public ExitStatus afterStep(StepExecution stepExecution) {
            long readCount = stepExecution.getReadCount();
            long skipCount = stepExecution.getSkipCount();
            long writeCount = stepExecution.getWriteCount();
            log.info("ReadCount:[{}], SkipCount:[{}], WriteCount:[{}]", readCount, skipCount, writeCount);
            return stepExecution.getExitStatus();
        }
    }
And my step configuration:

    @Bean
    public Step fooStep(
            SynchronizedItemStreamReader reader,
            AsyncItemProcessor processor,
            AsyncItemWriter writer) {
        return new StepBuilder("Bar STEP", jobRepository)
                .listener(new FooListener())
                .<Object, JsonNode>chunk(10, transactionManager)
                .reader(reader)
                .processor(processor)
                .writer(writer)
                .faultTolerant()
                .skip(Exception.class)
                .skipPolicy(new AlwaysSkipItemSkipPolicy())
                .noRetry(Exception.class)
                .processorNonTransactional()
                .build();
    }
Replies: 1 comment
Calling iterator.remove(e); in the catch block should be sufficient. I don't think the user is expected to rethrow the original exception for things to work. This could be a bug in Spring Batch, but I would prefer to validate that with a failing test or a complete example that reproduces the issue. If you manage to prepare that, I can debug the case and help you efficiently.
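On the extra Kafka messages seen when the exception is rethrown, one likely explanation (an assumption, not something confirmed in this thread) is that a fault-tolerant step reacts to a skippable write failure by rolling back the chunk and then re-writing its items one at a time to locate the failing one. A Kafka send is not undone by that rollback, so items that were already sent before the failure would go out a second time. The sketch below is a plain Java model of that idea only; it uses no Spring Batch classes, and `ScanModel`, `RecordingSender`, and `writeWithScan` are hypothetical names made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class ScanModel {

    /** Hypothetical stand-in for the Kafka send: records every call and fails on items starting with "bad". */
    static final class RecordingSender implements Consumer<String> {
        final List<String> sent = new ArrayList<>();

        @Override
        public void accept(String item) {
            if (item.startsWith("bad")) {
                throw new IllegalStateException("send failed for " + item);
            }
            sent.add(item); // non-transactional side effect: a rollback cannot undo it
        }
    }

    /**
     * Simplified model: write the whole chunk once; if any item fails,
     * write every item again individually and count the ones that fail as skipped.
     */
    static int writeWithScan(List<String> chunk, Consumer<String> sender) {
        try {
            for (String item : chunk) {
                sender.accept(item);
            }
            return 0; // no failure, nothing skipped
        } catch (RuntimeException firstFailure) {
            int skipped = 0;
            for (String item : chunk) { // item-by-item second pass
                try {
                    sender.accept(item);
                } catch (RuntimeException e) {
                    skipped++;
                }
            }
            return skipped;
        }
    }

    public static void main(String[] args) {
        RecordingSender sender = new RecordingSender();
        int skipped = writeWithScan(List.of("a", "b", "bad", "c"), sender);
        // prints: skipped=1, sent=[a, b, a, b, c]
        System.out.println("skipped=" + skipped + ", sent=" + sender.sent);
    }
}
```

In this model the skip count is correct, but "a" and "b" are sent twice and the chunk is traversed twice, which would match both the duplicate messages and the longer processing time described in the question. Whether this is what happens in the reported setup would still need the reproducing example requested above.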