Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Configuration Spring Batch Bean #4440

Unanswered
CodingMakeWordBetter asked this question in Q&A
Discussion options

Hello, I am currently optimizing the code for a historical project that uses Spring batch. The logic is that each product has a job, step, listener, reader, processor, and writer. These are all fine, but there are over 20 products in the historical project, which has caused code redundancy in many places for these classes. I am considering making them configurable, with the aim of automatically creating beans by filling in a small amount of necessary configurations. This spring theory is feasible, but I encountered a problem during the process, which is that the current project reader has the @ StepScope annotation, This bean was created only when the step was used, but now I am creating the bean through configuration. How can I manually register the bean that was created only when the step was used in the spring boot container? The effect of creating a new step after injection is ensured. I would be happy if you could solve my confusion

You must be logged in to vote

Replies: 3 comments 5 replies

Comment options

Can you please share a sample code that shows the duplication you want to refactor and the issue with the step-scoped bean? This would help us understand the problem clearly and help you efficiently. Thank you.

You must be logged in to vote
3 replies
Comment options

My business code mainly generates corresponding product daily reports based on the corresponding product flow, and then generates customer daily reports and supplier daily reports based on the product daily reports. More than 20 products follow this logic.So there will be a job, under which there will be three steps: generating product daily reports, customer daily reports, and supplier daily reports.
The steps for generating product daily reports, customer daily reports, and supplier daily reports are all three steps. The reader aggregates the flow in SQL, the processor converts the aggregated results into the entity class of the daily report, and the writer writes them into the daily report. That means there will be

Product Daily: productReader, productProcessor, and productWriter.

Customer daily reports: productCustomer Reader, productCustomer Processor, and productCustomer Writer.

Supplier daily report: productVendorReader, productVendorProcessor, productVendorWriter.

So a product needs to go through the redundant process of creating beans, and my expectation is to implement an abstract class. In the future, when writing new products, people only need to implement this abstract class and fill in the necessary attributes. For example, the reader actually only needs to pass in the SQL string and queryArguments, the processor only needs to pass in an anonymous internal class of the implementation, and the writer only needs to pass in the SQL. In this way, I will help implement the product daily report class of this abstract class as a bean

The problem now is that the reader reads the pipeline product, where there are date conditional input parameters through @ Value ("# {jobParameters}") Map<String, Object>jobParameters, and this scope will only create beans in this step. So I can't create beans in advance

Comment options

@EnableBatchProcessing
@Configuration
public class WaDayReportBatch {
 @Bean
 public Job waDayReportJob(
 @Qualifier("waDayReportStep") Step waDayReportStep,
 @Qualifier("customerWaDayStep") Step customerWaDayStep,
 @Qualifier("vendorWaDayStep") Step vendorWaDayStep
 ){
 String funcName = Thread.currentThread().getStackTrace()[1].getMethodName();
 return jobBuilderFactory.get(funcName)
 .listener(waDayReportJobListener)
 .flow(waDayReportStep)
 .next(customerWaDayStep)
 .next(vendorWaDayStep)
 .end()
 .build();
 }
 @Bean
 public Step waDayReportStep(JdbcCursorItemReader<WaDayReportDto> waDayReader,
 WaDayReportProcessor waDayReportProcessor,
 JdbcBatchItemWriter<WaDayReport> waDayWriter){
 String funcName = Thread.currentThread().getStackTrace()[1].getMethodName();
 return stepBuilderFactory.get(funcName)
 .listener(waDayStepListener)
 .<WaDayReportDto, WaDayReport>chunk(1000)
 .reader(waDayReader)
 .processor(waDayReportProcessor)
 .writer(waDayWriter)
 .build();
 }
 @Bean
 @StepScope
 public JdbcCursorItemReader<WaDayReportDto> waDayReader(
 ApplicationContext applicationContext,
 @Value("#{jobParameters}") Map<String, Object> jobParameters) throws ParseException {
 return new JdbcCursorItemReaderBuilder<WaDayReportDto>()
 .name(funcName)
 .dataSource(dataSource)
 .queryTimeout(600)
 .sql(readSql).queryArguments(calcDay, startDate, endDate)
 .verifyCursorPosition(false)
 .rowMapper(BeanPropertyRowMapper.newInstance(WaDayReportDto.class))
 .build();
 }
 @Bean
 public WaDayReportProcessor waDayReportProcessor(){
 return e-> {
 WaDayReport dayReport = new WaDayReport();
 BeanUtils.copyProperties(dayReportDto, dayReport);
 dayReport.setSubjectCode(CrcUtil.crc32(dayReportDto.getCustomerSubject()));
 return dayReport;
 };
 }
 @Bean
 public JdbcBatchItemWriter<WaDayReport> waDayWriter(@Qualifier("financeDataSource")DataSource financeDataSource){
 return new JdbcBatchItemWriterBuilder<WaDayReport>()
 .itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>())
 .sql(insertSql)
 .dataSource(financeDataSource)
 .build();
 }
 @Bean
 public Step customerWaDayStep(
 ItemReader<CustomerDayReport> customerWaDayReader,
 ItemProcessor<CustomerDayReport, CustomerDayReport> customerDayProcessor,
 ItemWriter<CustomerDayReport> customerDayWriter
 ){
 String funcName = Thread.currentThread().getStackTrace()[1].getMethodName();
 return stepBuilderFactory.get(funcName)
 .listener(stepListener)
 .<CustomerDayReport, CustomerDayReport>chunk(20)
 .reader(customerWaDayReader)
 .processor(customerDayProcessor)
 .writer(customerDayWriter)
 .build();
 }
 @Bean
 @StepScope
 public JdbcCursorItemReader<CustomerDayReport> customerWaDayReader(
 @Autowired DataSource financeDataSource,
 @Value("#{jobParameters}") Map<String, Object> jobParameters
 ){
 return new JdbcCursorItemReaderBuilder<CustomerDayReport>()
 .name(funcName)
 .dataSource(financeDataSource)
 .sql(readSql).queryArguments(calcDay, calcDay)
 .verifyCursorPosition(false)
 .rowMapper(BeanPropertyRowMapper.newInstance(CustomerDayReport.class))
 .build();
 }
 @Bean
 public ItemProcessor<CustomerDayReport, CustomerDayReport> customerDayProcessor() {
 return item -> {
 return item;
 };
 }
 @Bean
 public JdbcBatchItemWriter<CustomerDayReport> customerDayWriter(@Qualifier("financeDataSource") DataSource financeDataSource) {
 return new JdbcBatchItemWriterBuilder<CustomerDayReport>()
 .itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>())
 .sql(insertSql)
 .dataSource(financeDataSource)
 .build();
 }
 @Bean
 public Step vendorWaDayStep(
 ItemReader<VendorDayReportDto> vendorWaDayReader,
 ItemProcessor<VendorDayReportDto, VendorDayReport> vendorDayProcess,
 ItemWriter<VendorDayReport> vendorDayWriter
 ) {
 String funcName = Thread.currentThread().getStackTrace()[1].getMethodName();
 return stepBuilderFactory.get(funcName)
 .listener(stepListener)
 .<VendorDayReportDto, VendorDayReport>chunk(20)
 .reader(vendorWaDayReader)
 .processor(vendorDayProcess)
 .writer(vendorDayWriter)
 .build();
 }
 @Bean
 @StepScope
 public JdbcCursorItemReader<VendorDayReportDto> vendorWaDayReader(
 @Autowired DataSource financeDataSource,
 @Value("#{jobParameters}") Map<String, Object> jobParameters
 ) {
 return new JdbcCursorItemReaderBuilder<VendorDayReportDto>()
 .name(funcName)
 .dataSource(financeDataSource)
 .sql(readSql).queryArguments(calcDay, calcDay)
 .verifyCursorPosition(false)
 .rowMapper(BeanPropertyRowMapper.newInstance(VendorDayReportDto.class))
 .build();
 }
 @Bean
 public ItemProcessor<VendorDayReportDto, VendorDayReport> vendorDayProcessor() {
 return report -> {
 return report;
 };
 }
 @Bean
 public JdbcBatchItemWriter<VendorDayReport> vendorDayWriter(@Qualifier("financeDataSource") DataSource financeDataSource) {
 return new JdbcBatchItemWriterBuilder<VendorDayReport>()
 .itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>())
 .sql(insertSql)
 .dataSource(financeDataSource)
 .build();
 }
}
Comment options

The above is an example code block. It can be seen that adding a product requires manually creating 9 beans: Product Daily: productReader, productProcessor, and productWriter

Customer daily reports: productCustomer Reader, productCustomer Processor, and productCustomer Writer

Supplier daily report: productVendorReader, productVendorProcessor, productVendorWriter

However, a paradigm can be specified, such as a reader that only needs to implement abstract class reading pipeline SQL and query parameters, a processor that implements lambda expressions, and a writer that only needs one SQL statement. These things can help generate corresponding beans and reduce code. Now, the reader generates classes because passing in parameters every time a job is run makes it impossible to generate beans from the beginning

Comment options

Thank you for the feedback. If I understand correctly, the product in this example is Wa, and you can have 20 products, therefore 20 files similar to the one you shared, something like WaDayReportBatch, WbDayReportBatch, WcDayReportBatch, etc. Is that correct?

Moreover, are the DTOs similar or is there a DTO type per product? This is important to generify expressions like .<WaDayReportDto, WaDayReport>chunk(1000) in the reader's definition.

For step-scoped beans, there is a way to generate them dynamically with GenericBeanDefinition#setScope(String), see examples here and here, but I believe there is another way to address your use case probably with custom FactoryBeans. If you share a github repo with two job definitions that I can compile and run, I will try to help you refactor the configuration to a generic one.

You must be logged in to vote
2 replies
Comment options

I have reviewed your code and it mainly focuses on setting the scope to step. But what if you need to obtain jobParameters? Let me give you the specific content of my code

 @Bean
 @StepScope
 public JdbcCursorItemReader<WaDayReportDto> waDayReader(
 ApplicationContext applicationContext,
 @Value("#{jobParameters}") Map<String, Object> jobParameters) throws ParseException {
 String funcName = Thread.currentThread().getStackTrace()[1].getMethodName();
 String dsName = jobParameters.get("data_source").toString();
 DataSource dataSource = applicationContext.getBean(dsName, DataSource.class);
 String calcDay = jobParameters.get("calc_day").toString();
 String startDate = calcDay+ " 00:00:00";
 SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
 Calendar cd = Calendar.getInstance();
 cd.setTime(sdf.parse(startDate));
 cd.add(Calendar.DATE, 1);//增加一天
 String endDate = sdf.format(cd.getTime());
 String msgTbl = " wa_cdr ";
 String customerTbl = " customer ";
 String where = " where cdr.send_time >= ? and cdr.send_time < ? " ;
 if (Constants.CDR_DATA_SOURCE.equals(dsName)) {
 // 数仓语音表按月分表
 msgTbl = " wa_cdr_" + LocalDateUtil.format(LocalDateUtil.parse(calcDay), "yyyyMM");
 where = " where cdr.send_time >= to_date(?,'YYYY-MM-DD HH24:MI:SS') and cdr.send_time < to_date(?,'YYYY-MM-DD HH24:MI:SS') " ;
 customerTbl = " nx_customer ";
 }
 String readSql = " select cdr.customer_id," +
 " cdr.customer_app_id," +
 " cdr.cus_quotation_currency as customer_currency," +
 " cdr.customer_unit_price customer_price," +
 " cdr.customer_unit_fprice customer_original_price," +
 " c.contract_subject customer_subject," +
 " cdr.vendor_id," +
 " IFNULL(cdr.vendor_unit_price, 0) vendor_price," +
 " IFNULL(cdr.vendor_unit_fprice, 0) vendor_original_price," +
 " cdr.country_id," +
 " cdr.give_flag," +
 " cdr.cdr_type type," +
 " cdr.source, "+
 " ? as calc_day," +
 " count( cdr.id ) total_size," +
 " sum( customer_unit_price ) total_customer_price," +
 " sum( customer_unit_fprice ) total_customer_original_price," +
 " IFNULL(sum( vendor_unit_price ), 0) total_vendor_price," +
 " IFNULL(sum( vendor_unit_fprice ), 0) total_vendor_original_price," +
 " ( SUM( customer_unit_price ) - IFNULL(SUM( vendor_unit_price ), 0)) total_profit " +
 " FROM "+msgTbl+" cdr LEFT JOIN "+customerTbl+" c ON cdr.customer_id = c.id " +
 where+
 " and cdr.cdr_status = 1 " +
 " GROUP BY cdr.customer_id, cdr.customer_app_id, cdr.cus_quotation_currency, cdr.customer_unit_price, cdr.customer_unit_fprice, c.contract_subject, cdr.vendor_id, cdr.vendor_unit_price,cdr.vendor_unit_fprice, cdr.country_id, cdr.give_flag, cdr.cdr_type, cdr.source ";
 logger.info("sql:"+readSql+"; param: calcDay:"+calcDay);
 return new JdbcCursorItemReaderBuilder<WaDayReportDto>()
 .name(funcName)
 .dataSource(dataSource)
 .queryTimeout(600)
 .sql(readSql).queryArguments(calcDay, startDate, endDate)
 .verifyCursorPosition(false)
 .rowMapper(BeanPropertyRowMapper.newInstance(WaDayReportDto.class))
 .build();
 }
Comment options

As can be seen, the main requirement for building a reader in the above code is the value of the jobParameters at the beginning of the job, which is to put the type thread variable into it at the beginning of the job. So I couldn't get it when I couldn't build the bean. So my main intention here is to define an abstract class, where users only need to provide SQL and queryArguments to help them generate reader beans

Comment options

I understand that the difficulty comes from step-scoped beans to get job parameters. As mentioned previously, if you can share a github repo with a minimal example with the duplication in place, then I can try to see if we could refactor it with an abstract class that users can extend and provide only the SQL query and its arguments.

You must be logged in to vote
0 replies
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Category
Q&A
Labels
status: waiting-for-reporter Issues for which we are waiting for feedback from the reporter
Converted from issue

This discussion was converted from issue #4439 on September 04, 2023 11:13.

AltStyle によって変換されたページ (->オリジナル) /