Spring Batch: EmptyResultDataAccessException on step status update in production job #4683

Unanswered
NikolasTrapp asked this question in Q&A
We're facing an issue with a Spring Batch job that only occurs in our production environment. The job fails with the following error when attempting to update the step status, which causes the entire job to halt and leaves inconsistencies in the application:

org.springframework.dao.EmptyResultDataAccessException: Incorrect result size: expected 1, actual 0 
 ...
 at org.springframework.batch.core.repository.dao.JdbcStepExecutionDao.updateStepExecution
 ...

Context:
We are building a multi-tenant Spring Cloud application. The issue happens during a job used to process customer file imports. The process consists of two jobs:

  • First job: reads records from the user-uploaded file and saves them to database tables.
  • Second job: once all data has been imported, reads the inserted records, maps them to domain entities, and sends the data via RabbitMQ to various microservices.

The error only occurs in the second job, specifically when reading the database entries and sending them to the microservices. Importantly, the first job, which mirrors the configuration of the second (it uses the same CustomDefaultBatchConfigurer, transaction manager, and data source), has never experienced this issue, suggesting that the problem stems from something specific to the second job's implementation.

We suspect the issue may be related to concurrency or transaction handling, but it’s challenging to replicate this locally. We've reviewed our configuration and tried various adjustments without success.

How can we better diagnose this issue in a production-like setting, given its environment-specific nature?
What configurations or strategies should we explore to manage concurrency and ensure consistent step execution status updates?

Any insights into what might cause this error in a production environment, particularly with a custom transaction manager in a multi-tenant setup? What approaches should we try to better identify the root cause?
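As a starting point for diagnosis, one thing we have considered is raising the log levels around the batch metadata access (a sketch, assuming a standard Spring Boot logging setup), so that production logs show which SQL the JobRepository issues against the BATCH_* tables and where the transaction boundaries fall:

```properties
# Hypothetical application.properties fragment; adjust to your logging setup.
# SQL issued by JdbcTemplate, including the BATCH_STEP_EXECUTION update:
logging.level.org.springframework.jdbc.core.JdbcTemplate=DEBUG
logging.level.org.springframework.batch.core.repository.dao=DEBUG
# Transaction creation, suspension, commit and rollback events:
logging.level.org.springframework.transaction=TRACE
```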

Code:
We have a custom DefaultBatchConfigurer that uses our MultiTenantConnectionProvider:

@Configuration
@EnableBatchProcessing
public class CustomDefaultBatchConfigurer extends DefaultBatchConfigurer {

    protected final ServiceMultiTenantConnectionProvider serviceMultiTenantConnectionProvider;
    private final PlatformTransactionManager transactionManager;

    public CustomDefaultBatchConfigurer(ServiceMultiTenantConnectionProvider serviceMultiTenantConnectionProvider,
            PlatformTransactionManager transactionManager) {
        super(serviceMultiTenantConnectionProvider.getDataSource());
        this.serviceMultiTenantConnectionProvider = serviceMultiTenantConnectionProvider;
        this.transactionManager = transactionManager;
    }

    @Override
    public PlatformTransactionManager getTransactionManager() {
        return transactionManager;
    }
}
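This configuration routes the Batch metadata tables (BATCH_JOB_*, BATCH_STEP_*) through the multi-tenant connection provider. One variation we could compare against (a sketch; the bean name `batchMetadataDataSource` is an assumption, not part of our code) is pinning the JobRepository to a single fixed DataSource, so that step-execution inserts and updates always hit the same schema regardless of the current tenant:

```java
// Sketch: pin Spring Batch's metadata to one non-tenant-routed DataSource.
// "batchMetadataDataSource" is a hypothetical bean name.
@Configuration
@EnableBatchProcessing
public class FixedMetadataBatchConfigurer extends DefaultBatchConfigurer {

    public FixedMetadataBatchConfigurer(
            @Qualifier("batchMetadataDataSource") DataSource batchMetadataDataSource) {
        // DefaultBatchConfigurer builds the JobRepository and JobExplorer
        // on top of the DataSource passed to this constructor.
        super(batchMetadataDataSource);
    }
}
```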

This is where the job is started:

@RequiredArgsConstructor
@Component
@Slf4j
public class SendDataImpl implements SendData {

    private final JobLauncher jobLauncher;
    private final JobSendData job;

    @Override
    public void execute(List<SendDataParams> params, UUID ticketImplantation, UUID ticketSendData) {
        var jobParams = new JobParametersBuilder()
                .addString("ticketImplantacao", ticketImplantation.toString())
                .addString("ticketEnvioDados", ticketSendData.toString())
                .toJobParameters();
        try {
            jobLauncher.run(job.create(params, ticketImplantation, ticketSendData), jobParams);
        } catch (JobExecutionAlreadyRunningException | JobRestartException | JobInstanceAlreadyCompleteException
                | JobParametersInvalidException e) {
            ...
        }
    }
}

Our JobSendData:

@Component
@RequiredArgsConstructor
@Slf4j
public class JobSendDataImpl implements JobSendData {

    public final JobBuilderFactory jobBuilderFactory;
    protected final StepBuilderFactory stepBuilderFactory;
    private final List<SenderStep> beanSteps;

    @Override
    public Job create(List<SendDataParams> params, UUID ticketImplantation, UUID ticketSendData) {
        var paramsMap = params.stream()
                .collect(Collectors.toMap(it -> it.getFile().getCategory(), Function.identity()));
        var job = jobBuilderFactory.get(format("SendData_ticket_{}", ticketSendData))
                .incrementer(new RunIdIncrementer())
                .listener(new SendDataJobExecutionListenerImpl(params))
                .start(createStartStep(ticketSendData));
        beanSteps.forEach(bean -> job.next(bean.createStep(paramsMap.get(bean.getCategory().name()))));
        job.next(createEndStep(ticketSendData));
        return job.build();
    }

    ...
}

We have a SenderStep interface for generic step implementations:

public interface SenderStep {

    Step createStep(SendDataParams params);

    ...
}

And we have an abstract class to create steps:

public abstract class BaseSenderStep<I, O> implements SenderStep {

    private final SendDataBatchDependencyProvider dependencyProvider;
    protected String updateCSVEntitySQL;

    protected BaseSenderStep(SendDataBatchDependencyProvider dependencyProvider) {
        this.dependencyProvider = dependencyProvider;
        initSQL();
    }

    private void initSQL() {
        // Would be something like:
        // update table_csv x set x.status = 'SENDING', x.target_item_id = '...', x.message = '...' where x.id = '...';
        updateCSVEntitySQL = format("UPDATE {} x SET x.{} = :{}, x.{} = :{}, x.{} = :{} WHERE x.{} = :{}",
                getInputClass().getSimpleName(),
                ControleCsv.STATUS, ControleCsv.STATUS,
                ControleCsv.TARGET_ITEM_ID, ControleCsv.TARGET_ITEM_ID,
                ControleCsv.MESSAGE, ControleCsv.MESSAGE,
                FIELD_CSV_ID, FIELD_CSV_ID);
    }

    @Override
    public Step createStep(SendDataParams params) {
        var name = getName(params, "Create");
        var step = dependencyProvider.getStepBuilderFactory().get(name)
                .<I, ItemWrapper<I, O>>chunk(getChunkSize())
                .reader(createReader(params))
                .processor(createItemProcessor(params))
                .writer(createWriter(params))
                .listener(getStepExecutionListener(params))
                .listener(getChunkListener(params));
        return step.build();
    }

    ...
 
    protected JdbcPagingItemReader<I> createReader(SendDataParams params) {
        try {
            var dataSource = dependencyProvider.getServiceMultiTenantConnectionProvider().getDataSource();
            var tableName = getTableName();
            var queryProvider = createQueryProvider(dataSource, tableName, params);
            return buildReader(dataSource, queryProvider);
        } catch (Exception e) {
            log.error("[CsvSender] - Error creating the JdbcPagingItemReader for paged reading in step: '{}'. Reason: '{}'", params, getMessage(e), e);
            return null;
        }
    }

    protected String getTableName() {
        return getInputClass().getAnnotation(Table.class).name();
    }

    private SqlPagingQueryProviderFactoryBean createQueryProvider(DataSource dataSource, String tableName, SendDataParams params) {
        var queryProvider = new SqlPagingQueryProviderFactoryBean();
        var where = new StringBuilder();
        where.append("WHERE status IN ('READY', 'ERROR') ")
                .append(format("AND implantacao = '{}' ", params.getTicketImplantacao()))
                .append(format("AND empresa = '{}'", params.getEmpresaId()));
        queryProvider.setDataSource(dataSource);
        queryProvider.setSelectClause("SELECT * ");
        queryProvider.setFromClause(format("FROM {} ", tableName));
        queryProvider.setWhereClause(where.toString());
        queryProvider.setSortKey(FIELD_CSV_ID);
        return queryProvider;
    }

    private JdbcPagingItemReader<I> buildReader(DataSource dataSource, SqlPagingQueryProviderFactoryBean queryProvider) throws Exception {
        var reader = new JdbcPagingItemReader<I>();
        var query = Objects.requireNonNull(queryProvider.getObject());
        reader.setQueryProvider(query);
        reader.setDataSource(dataSource);
        reader.setPageSize(getChunkSize());
        reader.setRowMapper(new CustomRowMapper<>(getInputClass()));
        reader.afterPropertiesSet();
        return reader;
    }
    protected ItemProcessor<I, ItemWrapper<I, O>> createItemProcessor(SendDataParams params) {
        return input -> {
            try {
                var output = mapToTarget(input, params);
                return ItemWrapper.of(input, output);
            } catch (Exception e) {
                ...
                return null;
            }
        };
    }

    protected ItemWriter<ItemWrapper<I, O>> createWriter(SendDataParams params) {
        return items -> {
            try {
                doWriter(items, params);
            } catch (Exception e) {
                // log
            }
        };
    }

    protected void doWriter(List<? extends ItemWrapper<I, O>> items, SendDataParams params) {
        items.forEach(item -> sendItem(item, params));
    }

    private void sendItem(ItemWrapper<I, O> item, SendDataParams params) {
        try {
            doBeforeSend(List.of(item.getInput()), params);
            var response = doSendItem(item.getOutput());
            item.setOutput(response);
            doSuccess(item);
        } catch (Exception e1) {
            try {
                doError(item.getInput(), e1);
            } catch (Exception e2) {
                // log
            }
        }
    }
    protected void doBeforeSend(List<@NonNull I> items, SendDataParams params) {
        var em = getDependencyProvider().getEm();
        boolean isManyRecords = items.size() > 1;
        if (isManyRecords) {
            var session = em.unwrap(Session.class);
            session.setJdbcBatchSize(items.size());
        }
        items.forEach(item -> updateCSVData(em, getIdFieldValue(item), null, ProcessStatusCsv.SENDING, null));
        if (isManyRecords) {
            em.flush();
            em.clear();
        }
    }

    protected void updateCSVData(EntityManager em, Object id, String targetItemId, ProcessStatusCsv status, String message) {
        try {
            em.createQuery(updateCSVEntitySQL)
                    .setParameter(ControleCsv.STATUS, status)
                    .setParameter(ControleCsv.TARGET_ITEM_ID, targetItemId)
                    .setParameter(ControleCsv.MESSAGE, message)
                    .setParameter(FIELD_CSV_ID, id)
                    .executeUpdate();
        } catch (Exception e) {
            ...
            throw e;
        }
    }

    protected O doSendItem(O item) {
        dependencyProvider.getBeanValidator().validate(item);
        var message = buildMessage(item);
        return dependencyProvider.getMessenger().requestSync(message, getOutputClass()); // Sends the entity to the service.
    }

    protected void doError(I input, Exception e) {
        var em = getDependencyProvider().getEm();
        updateCSVData(em, getIdFieldValue(input), null, ProcessStatusCsv.ERROR, null);
    }

    protected void doSuccess(ItemWrapper<I, O> item) {
        try {
            ...
            var status = ProcessStatusCsv.DONE;
            updateCSVData(em, id, targetItemId, status, null);
        } catch (Exception e) {
            ...
            throw e;
        }
    }
}

And we also have a ChunkListener that controls visual data:

@Slf4j
@RequiredArgsConstructor
public class SenderStepChunkListener implements ChunkListener {

    ...

    // This method basically updates the importation entity, which is a visual row
    // in our system showing the progress status of the process.
    @Override
    public void afterChunk(ChunkContext context) {
        var importation = doInTransaction.callReadOnly(
                () -> importationDao.findByTicketAndCategoria(ticketImportation, categoria, ticketImplantacao));
        var errors = getRowsWithStatus(ProcessStatusCsv.ERROR, importation.getTicket());
        var success = getRowsWithStatus(ProcessStatusCsv.DONE, importation.getTicket());
        importation.setRegistrosWithSuccess(success);
        importation.setErrors(errors);
        doInTransaction.run(() -> importationRepository.save(importation));
    }

    private long getRowsWithStatus(ProcessStatusCsv status, UUID ticket) {
        var sql = format("SELECT COUNT(*) FROM {} x WHERE x.{} = '{}' AND x.{} = '{}'",
                categoria.getNomeEntidade(), ControleCsv.STATUS, status, ControleCsv.TICKET, ticket);
        try {
            return (long) doInTransaction.callReadOnly(() -> em.createQuery(sql).getSingleResult());
        } catch (Exception e) {
            ...
            return 0L;
        }
    }

    ...
}
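For reference, the count query above could also be expressed with bound parameters instead of formatting the status and ticket values into the query string (a sketch of the same query; it assumes the entity attributes accept the enum and UUID values as bound parameters):

```java
// Sketch: same count, but with JPQL named parameters rather than
// interpolated literals.
private long getRowsWithStatus(ProcessStatusCsv status, UUID ticket) {
    var sql = format("SELECT COUNT(x) FROM {} x WHERE x.{} = :status AND x.{} = :ticket",
            categoria.getNomeEntidade(), ControleCsv.STATUS, ControleCsv.TICKET);
    return doInTransaction.callReadOnly(() -> (long) em.createQuery(sql)
            .setParameter("status", status)
            .setParameter("ticket", ticket)
            .getSingleResult());
}
```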

And our StepExecutionListener:

@Slf4j
@RequiredArgsConstructor
public class SenderStepExecutionListenerImpl implements StepExecutionListener {

    private final EntityManager em;
    private final DoInTransaction doInTransaction;
    private final ImportationDao importationDao;
    private final ImportationRepository importationRepository;

    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        var importation = doInTransaction.callReadOnly(
                () -> importationDao.findByTicketAndCategoria(ticketImportation, categoria, ticketImplantacao));
        try {
            doInTransaction.runAsNew(() -> updateVisualImportationData(importation));
        } catch (Exception e) {
            ...
        }
        return ExitStatus.EXECUTING;
    }

    private void updateVisualImportationData(ImportationEntity importation) {
        var invalidCount = importationDao.findInvalidCount(importation);
        var successCount = importationDao.findSuccessCount(importation);
        var total = importationDao.findTotalCount(importation);
        updateImportation(successCount, invalidCount, total, importation, false);
    }

    private void updateImportation(long success, long error, long total, ImportationEntity importation, boolean failed) {
        ... // set the importation data
        importationRepository.save(importation);
    }

    ...
}

DoInTransaction basically runs the code in a @Transactional method.
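Concretely, a minimal reconstruction of what DoInTransaction does (a sketch; our actual implementation may differ) would be:

```java
// Hypothetical reconstruction of DoInTransaction. Because each action runs
// inside a @Transactional method, Spring opens (or joins) a transaction
// around the lambda.
@Component
public class DoInTransaction {

    @Transactional(readOnly = true)
    public <T> T callReadOnly(Supplier<T> action) {
        return action.get();
    }

    @Transactional
    public void run(Runnable action) {
        action.run();
    }

    // Used by the StepExecutionListener: suspends the caller's transaction
    // and commits the update independently.
    @Transactional(propagation = Propagation.REQUIRES_NEW)
    public void runAsNew(Runnable action) {
        action.run();
    }
}
```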

Replies: 1 comment

Thank you for opening this discussion and for providing these details. I can't see from what you shared what could cause this issue (this kind of concurrency issue is hard to diagnose statically). Were you able to replicate behaviour similar to what you have in production with a Testcontainers-based test? We have some samples in the code base with different databases and brokers, so if you manage to create a minimal example that reproduces the issue, I will take a deeper look. Thank you.
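For example, a minimal Testcontainers-based setup (a sketch; the test class name, container image tag, and property names are assumptions about your project) could look like:

```java
// Sketch: run the job against a real PostgreSQL container instead of an
// in-memory database, to get closer to production behaviour.
@Testcontainers
@SpringBatchTest
@SpringBootTest
class SendDataJobIntegrationTest {

    @Container
    static PostgreSQLContainer<?> postgres = new PostgreSQLContainer<>("postgres:15");

    @DynamicPropertySource
    static void datasourceProps(DynamicPropertyRegistry registry) {
        registry.add("spring.datasource.url", postgres::getJdbcUrl);
        registry.add("spring.datasource.username", postgres::getUsername);
        registry.add("spring.datasource.password", postgres::getPassword);
    }

    @Autowired
    private JobLauncherTestUtils jobLauncherTestUtils;

    @Test
    void stepStatusUpdatesSucceed() throws Exception {
        var execution = jobLauncherTestUtils.launchJob();
        assertEquals(BatchStatus.COMPLETED, execution.getStatus());
    }
}
```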

Category: Q&A
Labels: status: waiting-for-reporter (issues for which we are waiting for feedback from the reporter)
