# Spring Batch: Concurrent Steps

Published 2022-12-03

This tutorial explains how to run concurrent steps in Spring Batch.

With Spring Batch, you can define and run jobs. A job consists of one or more steps.

Typically, batch jobs are long-running, non-interactive, and process large volumes of data, more than fits in memory or in a single transaction. During a step, the job performs a particular task; in the simplest case, that task is implemented as a tasklet. When you define a step, you can also define which step runs next, so a job can execute its steps in a defined order.

Spring Batch supports both sequential and parallel steps, and it can run a step only when a condition is true or false. Together, these features let you build workflows with Spring Batch.
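The difference between sequential and parallel execution can be pictured with a small plain-Java sketch. It does not use any Spring Batch classes: the "steps" are ordinary `Supplier`s, and the names `StepOrderingSketch`, `step`, `runSequentially` and `runInParallel` are assumptions made up for this illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

// Plain-Java illustration only: "steps" are modelled as Suppliers, not Spring Batch steps.
class StepOrderingSketch {

    // A hypothetical step that sleeps for a while and then returns its name.
    static Supplier<String> step(String name, long millis) {
        return () -> {
            try {
                Thread.sleep(millis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(name + " was interrupted", e);
            }
            return name;
        };
    }

    // Sequential: each step starts only after the previous one has finished.
    static List<String> runSequentially(List<Supplier<String>> steps) {
        List<String> finished = new ArrayList<>();
        for (Supplier<String> s : steps) {
            finished.add(s.get());
        }
        return finished;
    }

    // Parallel: all steps are started at once, then the caller waits for every one of them.
    static List<String> runInParallel(List<Supplier<String>> steps) {
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, steps.size()));
        try {
            List<CompletableFuture<String>> running = new ArrayList<>();
            for (Supplier<String> s : steps) {
                running.add(CompletableFuture.supplyAsync(s, pool));
            }
            List<String> finished = new ArrayList<>();
            for (CompletableFuture<String> f : running) {
                finished.add(f.join()); // blocks until this step is done
            }
            return finished;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<Supplier<String>> steps = Arrays.asList(
                step("step0", 200), step("step1", 200), step("step2", 200));

        long t0 = System.nanoTime();
        runSequentially(steps);
        long sequentialMs = (System.nanoTime() - t0) / 1_000_000;

        long t1 = System.nanoTime();
        runInParallel(steps);
        long parallelMs = (System.nanoTime() - t1) / 1_000_000;

        System.out.println("sequential: " + sequentialMs + " ms, parallel: " + parallelMs + " ms");
    }
}
```

With three steps of equal length, the parallel run should take roughly as long as one step, while the sequential run takes about the sum of all three.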

When you define a step, you can choose between two models:

  1. run a simple task (tasklet)
  2. run a step that follows the READ-PROCESS-WRITE pattern
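To make the contrast between the two models concrete, here is a minimal plain-Java sketch. It is not Spring Batch code: `StepModelsSketch`, `runTasklet` and `runChunked` are hypothetical names, and the chunk handling is a simplification of what a real READ-PROCESS-WRITE step does.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Plain-Java illustration of the two step models; none of these types are Spring Batch classes.
class StepModelsSketch {

    // Model 1: a tasklet-style step is a single unit of work.
    static void runTasklet(Runnable task) {
        task.run();
    }

    // Model 2: read items one by one, process each, and write them out chunk by chunk.
    // Returns the number of chunks handed to the writer.
    static <I, O> int runChunked(Iterator<I> reader, Function<I, O> processor,
                                 Consumer<List<O>> writer, int chunkSize) {
        int chunksWritten = 0;
        List<O> chunk = new ArrayList<>();
        while (reader.hasNext()) {
            chunk.add(processor.apply(reader.next()));
            if (chunk.size() == chunkSize) {
                writer.accept(new ArrayList<>(chunk)); // hand over a copy, then start a new chunk
                chunk.clear();
                chunksWritten++;
            }
        }
        if (!chunk.isEmpty()) { // write the last, possibly smaller, chunk
            writer.accept(new ArrayList<>(chunk));
            chunksWritten++;
        }
        return chunksWritten;
    }

    public static void main(String[] args) {
        runTasklet(() -> System.out.println("tasklet: one task, done"));

        List<String> input = Arrays.asList("a", "b", "c", "d", "e");
        Function<String, String> toUpper = s -> s.toUpperCase();
        Consumer<List<String>> printer = chunk -> System.out.println("write " + chunk);

        int chunks = runChunked(input.iterator(), toUpper, printer, 2);
        System.out.println("chunks written: " + chunks); // prints "chunks written: 3"
    }
}
```

The tasklet model fits one-off work such as the sleeping tasks used in this tutorial; the chunked model fits item-by-item data processing.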

This tutorial uses the first approach.

To create a simple example with two or more concurrent steps, first create a simple Maven Spring project whose pom.xml file declares the libraries the code imports: Spring Batch core, Apache Commons DBCP2, and the JDBC driver for your database.

This tutorial assumes that the Job Repository has already been configured.

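Next, create a batch.properties file on the classpath (the configuration class loads it from classpath:/com/examples/batch.properties). It must define the four properties the data source needs: dataSource.url, dataSource.driverClassName, dataSource.username and dataSource.password. The rest of this section lists the Java sources: the three tasklets, the job configuration, the Spring Batch configuration, and the main class.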
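To check such a file before starting the job, a small plain-Java sketch like the following could load it and report which of the keys read by `dataSource()` are missing. The key names come from the configuration class in this tutorial; the class name `BatchPropertiesCheck` and the sample values (a local PostgreSQL database) are assumptions for illustration only, not the author's settings.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

// Checks that a properties text defines the four keys read by the dataSource() bean.
class BatchPropertiesCheck {

    // Key names taken from the env.getRequiredProperty(...) calls in this tutorial.
    static final List<String> REQUIRED_KEYS = Arrays.asList(
            "dataSource.url",
            "dataSource.driverClassName",
            "dataSource.username",
            "dataSource.password");

    // Returns the required keys that are absent from the given properties text.
    static List<String> missingKeys(String propertiesText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(propertiesText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        List<String> missing = new ArrayList<>();
        for (String key : REQUIRED_KEYS) {
            if (props.getProperty(key) == null) {
                missing.add(key);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        // Placeholder values (assumption): replace them with your own database settings.
        String sample = String.join("\n",
                "dataSource.url=jdbc:postgresql://localhost:5432/batchdb",
                "dataSource.driverClassName=org.postgresql.Driver",
                "dataSource.username=batch_user",
                "dataSource.password=change-me");
        System.out.println("missing keys: " + missingKeys(sample)); // prints "missing keys: []"
    }
}
```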

The first tasklet, MyTasklet0.java:

    package com.examples.tasklets;

    import org.springframework.batch.core.StepContribution;
    import org.springframework.batch.core.scope.context.ChunkContext;
    import org.springframework.batch.core.step.tasklet.Tasklet;
    import org.springframework.batch.repeat.RepeatStatus;
    import org.springframework.stereotype.Component;

    // Registered as a plain bean: the tasklet declares no bean definitions of its own.
    @Component
    public class MyTasklet0 implements Tasklet {

        @Override
        public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
            System.out.println("MyTasklet#0 is started ...");
            Thread.sleep(2000); // simulate two seconds of work
            System.out.println("MyTasklet#0 is completed ...");
            return RepeatStatus.FINISHED; // the tasklet is done and must not be called again
        }
    }

The second tasklet, MyTasklet1.java:

    package com.examples.tasklets;

    import org.springframework.batch.core.StepContribution;
    import org.springframework.batch.core.scope.context.ChunkContext;
    import org.springframework.batch.core.step.tasklet.Tasklet;
    import org.springframework.batch.repeat.RepeatStatus;
    import org.springframework.stereotype.Component;

    @Component
    public class MyTasklet1 implements Tasklet {

        @Override
        public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
            System.out.println("MyTasklet#1 is started ...");
            Thread.sleep(2000); // simulate two seconds of work
            System.out.println("MyTasklet#1 is completed ...");
            return RepeatStatus.FINISHED;
        }
    }

The third tasklet, MyTasklet2.java:

    package com.examples.tasklets;

    import org.springframework.batch.core.StepContribution;
    import org.springframework.batch.core.scope.context.ChunkContext;
    import org.springframework.batch.core.step.tasklet.Tasklet;
    import org.springframework.batch.repeat.RepeatStatus;
    import org.springframework.stereotype.Component;

    @Component
    public class MyTasklet2 implements Tasklet {

        @Override
        public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
            System.out.println("MyTasklet#2 is started ...");
            Thread.sleep(2000); // simulate two seconds of work
            System.out.println("MyTasklet#2 is completed ...");
            return RepeatStatus.FINISHED;
        }
    }

The job configuration, Job1.java, wraps each tasklet step in its own flow and combines the flows with split():

    package com.examples.config;

    import org.springframework.batch.core.Job;
    import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
    import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
    import org.springframework.batch.core.job.builder.FlowBuilder;
    import org.springframework.batch.core.job.builder.JobBuilder;
    import org.springframework.batch.core.job.flow.Flow;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.core.task.SimpleAsyncTaskExecutor;

    import com.examples.tasklets.MyTasklet0;
    import com.examples.tasklets.MyTasklet1;
    import com.examples.tasklets.MyTasklet2;

    @Configuration
    public class Job1 {

        @Autowired
        private JobBuilderFactory jobs;

        @Autowired
        private StepBuilderFactory steps;

        @Autowired
        MyTasklet0 taskletStep0;

        @Autowired
        MyTasklet1 taskletStep1;

        @Autowired
        MyTasklet2 taskletStep2;

        // Each flow contains exactly one tasklet step.
        @Bean
        public Flow flow0() {
            return new FlowBuilder<Flow>("flow0")
                    .start(steps.get("step0").tasklet(taskletStep0).build())
                    .build();
        }

        @Bean
        public Flow flow1() {
            return new FlowBuilder<Flow>("flow1")
                    .start(steps.get("step1").tasklet(taskletStep1).build())
                    .build();
        }

        @Bean
        public Flow flow2() {
            return new FlowBuilder<Flow>("flow2")
                    .start(steps.get("step2").tasklet(taskletStep2).build())
                    .build();
        }

        // split() runs the flows on threads supplied by the task executor.
        @Bean
        public Job myJob() {
            JobBuilder jobBuilder = jobs.get("Spring Batch: parallel execution");
            return jobBuilder.start(flow0())
                    .split(new SimpleAsyncTaskExecutor()).add(flow1(), flow2())
                    .build()  // closes the flow definition
                    .build(); // builds the job
        }
    }

The Spring Batch configuration, SpringBatchConfiguration.java:

    package com.examples.config;

    import javax.sql.DataSource;

    import org.apache.commons.dbcp2.BasicDataSource;
    import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.ComponentScan;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.context.annotation.PropertySource;
    import org.springframework.core.env.Environment;

    @Configuration
    @EnableBatchProcessing
    @ComponentScan("com.examples.*")
    @PropertySource("classpath:/com/examples/batch.properties")
    public class SpringBatchConfiguration {

        @Autowired
        private Environment env;

        // The connection settings come from batch.properties.
        @Bean
        public DataSource dataSource() {
            BasicDataSource dataSource = new BasicDataSource();
            dataSource.setUrl(env.getRequiredProperty("dataSource.url"));
            dataSource.setDriverClassName(env.getRequiredProperty("dataSource.driverClassName"));
            dataSource.setUsername(env.getRequiredProperty("dataSource.username"));
            dataSource.setPassword(env.getRequiredProperty("dataSource.password"));
            return dataSource;
        }
    }

The main class, Main.java:

    package com.examples;

    import org.springframework.batch.core.Job;
    import org.springframework.batch.core.JobExecution;
    import org.springframework.batch.core.JobParameters;
    import org.springframework.batch.core.configuration.JobRegistry;
    import org.springframework.batch.core.launch.JobLauncher;
    import org.springframework.batch.core.repository.JobRepository;
    import org.springframework.context.ApplicationContext;
    import org.springframework.context.annotation.AnnotationConfigApplicationContext;

    import com.examples.config.SpringBatchConfiguration;

    public class Main {

        public static void main(String[] args) throws Exception {
            ApplicationContext context =
                    new AnnotationConfigApplicationContext(SpringBatchConfiguration.class);

            // Beans provided by @EnableBatchProcessing.
            JobRegistry jobRegistry = context.getBean("jobRegistry", JobRegistry.class);
            JobLauncher jobLauncher = context.getBean("jobLauncher", JobLauncher.class);
            JobRepository jobRepository = context.getBean("jobRepository", JobRepository.class);

            System.out.println(" jobRegistry: " + jobRegistry);
            System.out.println(" jobLauncher: " + jobLauncher);
            System.out.println(" jobRepository: " + jobRepository);

            // The job is launched with empty job parameters on every run.
            Job job = context.getBean("myJob", Job.class);
            JobExecution jobExecution = jobLauncher.run(job, new JobParameters());
            System.out.println(" status: " + jobExecution.getStatus());
        }
    }

When you run the job with concurrent (parallel) steps for the first time, you will see the following log:

but when you run it a second time, you will see something like this:

On the second run, Spring Batch finds the steps already completed in the job repository. Because completed steps are not restarted by default, it does not execute them again.
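One way to picture why the second run does nothing: Spring Batch identifies a job instance by the job name plus its identifying job parameters, and the main class launches the job with the same empty `JobParameters` every time. The following toy model is plain Java, not Spring Batch code; `JobInstanceSketch`, `launch` and the parameter name `run.id` are hypothetical and serve only to illustrate the idea.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Toy model, not Spring Batch code: it only mimics how a job instance is identified.
class JobInstanceSketch {

    // Keys of the job instances that have already completed.
    private final Set<String> completed = new HashSet<>();

    // Returns true if the job ran, false if the same instance had already completed.
    boolean launch(String jobName, Map<String, String> parameters) {
        // Sort the parameters so that the key does not depend on insertion order.
        String instanceKey = jobName + "|" + new TreeMap<>(parameters);
        return completed.add(instanceKey);
    }

    public static void main(String[] args) {
        JobInstanceSketch repository = new JobInstanceSketch();
        Map<String, String> noParameters = Collections.emptyMap();

        System.out.println(repository.launch("myJob", noParameters)); // true: first run executes
        System.out.println(repository.launch("myJob", noParameters)); // false: same instance again

        // A parameter whose value differs per launch ("run.id" is a hypothetical name).
        Map<String, String> unique =
                Collections.singletonMap("run.id", String.valueOf(System.currentTimeMillis()));
        System.out.println(repository.launch("myJob", unique)); // true: a new instance
    }
}
```

In the real application, the equivalent would be to pass parameters that change between launches, for example built with `JobParametersBuilder`, instead of an empty `new JobParameters()`.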