#
SPRING BATCH: Concurrent Steps
The tutorial explains how we can use concurrent steps in Spring Batch.
With Spring Batch, you can define and run jobs. A job may have a single step or many steps.
Typically, batch jobs are long-running, non-interactive and process large volumes of data, more than fits in memory or in a single transaction. During a step, the job performs one particular task, called a tasklet. When you define a step, you can also define the next step to be executed, so a job can execute its steps in a defined order.
Spring Batch supports both sequential steps and parallel steps. We can also execute some tasks (steps) only when a condition is true or false. Because of these features, you can create workflows using Spring Batch.
When you define a step you have 2 models:
- run a simple task (tasklet)
- run a step using the following pattern READ-PROCESS-WRITE
This tutorial explains how to use the first approach (tasklets).
In order to create a simple example with 2 or more concurrent steps, first you have to create a simple Maven Spring project with the pom.xml file having the following dependencies:
At this point it is supposed that the Job Repository has been configured already.
In order to see how to create a tasklet step, you have to create a batch.properties file with the following content:
package com.examples.tasklets;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.context.annotation.Configuration;
/**
 * Demo tasklet number 0.
 *
 * <p>Prints a start message, sleeps for two seconds to simulate work, prints a
 * completion message and then reports that the tasklet is finished.
 */
@Configuration
public class MyTasklet0 implements Tasklet {

    /** Duration of the simulated work, in milliseconds. */
    private static final long WORK_MILLIS = 2000L;

    /**
     * Runs the simulated unit of work once.
     *
     * @param contribution step contribution supplied by the caller; not used here
     * @param chunkContext chunk context supplied by the caller; not used here
     * @return {@link RepeatStatus#FINISHED}, always
     * @throws Exception if the sleeping thread is interrupted
     */
    @Override
    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
        announce("MyTasklet#0 is started ...");
        Thread.sleep(WORK_MILLIS);
        announce("MyTasklet#0 is completed ...");
        return RepeatStatus.FINISHED;
    }

    /** Writes a progress line to standard output. */
    private static void announce(String message) {
        System.out.println(message);
    }
}
package com.examples.tasklets;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.context.annotation.Configuration;
/**
 * Demo tasklet number 1.
 *
 * <p>Prints a start message, sleeps for two seconds to simulate work, prints a
 * completion message and then reports that the tasklet is finished.
 */
@Configuration
public class MyTasklet1 implements Tasklet {

    /** Duration of the simulated work, in milliseconds. */
    private static final long WORK_MILLIS = 2000L;

    /**
     * Runs the simulated unit of work once.
     *
     * @param contribution step contribution supplied by the caller; not used here
     * @param chunkContext chunk context supplied by the caller; not used here
     * @return {@link RepeatStatus#FINISHED}, always
     * @throws Exception if the sleeping thread is interrupted
     */
    @Override
    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
        announce("MyTasklet#1 is started ...");
        Thread.sleep(WORK_MILLIS);
        announce("MyTasklet#1 is completed ...");
        return RepeatStatus.FINISHED;
    }

    /** Writes a progress line to standard output. */
    private static void announce(String message) {
        System.out.println(message);
    }
}
package com.examples.tasklets;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.context.annotation.Configuration;
/**
 * Demo tasklet number 2.
 *
 * <p>Prints a start message, sleeps for two seconds to simulate work, prints a
 * completion message and then reports that the tasklet is finished.
 */
@Configuration
public class MyTasklet2 implements Tasklet {

    /** Duration of the simulated work, in milliseconds. */
    private static final long WORK_MILLIS = 2000L;

    /**
     * Runs the simulated unit of work once.
     *
     * @param contribution step contribution supplied by the caller; not used here
     * @param chunkContext chunk context supplied by the caller; not used here
     * @return {@link RepeatStatus#FINISHED}, always
     * @throws Exception if the sleeping thread is interrupted
     */
    @Override
    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
        announce("MyTasklet#2 is started ...");
        Thread.sleep(WORK_MILLIS);
        announce("MyTasklet#2 is completed ...");
        return RepeatStatus.FINISHED;
    }

    /** Writes a progress line to standard output. */
    private static void announce(String message) {
        System.out.println(message);
    }
}
package com.examples.config;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.job.builder.FlowBuilder;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.job.flow.Flow;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import com.examples.tasklets.MyTasklet0;
import com.examples.tasklets.MyTasklet1;
import com.examples.tasklets.MyTasklet2;
/**
 * Java configuration that assembles the demo job out of three single-step flows.
 *
 * <p>Each flow wraps exactly one tasklet step. The job starts from {@code flow0}
 * and then adds a split over {@code flow1} and {@code flow2} that is driven by a
 * {@link SimpleAsyncTaskExecutor}.
 */
@EnableAsync
@Configuration
public class Job1 {

    /** Factory used to obtain the job builder. */
    @Autowired
    private JobBuilderFactory jobs;

    /** Factory used to obtain the step builders. */
    @Autowired
    private StepBuilderFactory steps;

    /** Tasklet executed by "step0". */
    @Autowired
    MyTasklet0 taskletStep0;

    /** Tasklet executed by "step1". */
    @Autowired
    MyTasklet1 taskletStep1;

    /** Tasklet executed by "step2". */
    @Autowired
    MyTasklet2 taskletStep2;

    /**
     * Builds the flow named "flow0", whose only step ("step0") runs {@link MyTasklet0}.
     *
     * @return the assembled flow
     */
    @Bean
    public Flow flow0() {
        FlowBuilder<Flow> flowBuilder = new FlowBuilder<Flow>("flow0");
        return flowBuilder
                .start(steps.get("step0").tasklet(taskletStep0).build())
                .build();
    }

    /**
     * Builds the flow named "flow1", whose only step ("step1") runs {@link MyTasklet1}.
     *
     * @return the assembled flow
     */
    @Bean
    public Flow flow1() {
        FlowBuilder<Flow> flowBuilder = new FlowBuilder<Flow>("flow1");
        return flowBuilder
                .start(steps.get("step1").tasklet(taskletStep1).build())
                .build();
    }

    /**
     * Builds the flow named "flow2", whose only step ("step2") runs {@link MyTasklet2}.
     *
     * @return the assembled flow
     */
    @Bean
    public Flow flow2() {
        FlowBuilder<Flow> flowBuilder = new FlowBuilder<Flow>("flow2");
        return flowBuilder
                .start(steps.get("step2").tasklet(taskletStep2).build())
                .build();
    }

    /**
     * Builds the job "Spring Batch: parallel execution".
     *
     * <p>The job starts with {@link #flow0()} and adds a split containing
     * {@link #flow1()} and {@link #flow2()}; the split is handed a
     * {@link SimpleAsyncTaskExecutor} to run its flows.
     *
     * @return the assembled job
     */
    @Bean
    public Job myJob() {
        TaskExecutor splitExecutor = new SimpleAsyncTaskExecutor();
        return jobs.get("Spring Batch: parallel execution")
                .start(flow0())
                .split(splitExecutor)
                .add(flow1(), flow2())
                .build()   // closes the flow definition
                .build();  // produces the Job itself
    }
}
package com.examples.config;
import javax.sql.DataSource;
import org.apache.commons.dbcp2.BasicDataSource;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
import org.springframework.core.env.Environment;
/**
 * Root configuration of the example application.
 *
 * <p>Enables Spring Batch processing, scans the {@code com.examples} packages for
 * components and loads {@code batch.properties} from the classpath so that the
 * data source settings can be resolved through the {@link Environment}.
 */
@Configuration
@EnableBatchProcessing
@ComponentScan("com.examples.*")
@PropertySource("classpath:/com/examples/batch.properties")
public class SpringBatchConfiguration {

    /** Gives access to the values loaded from {@code batch.properties}. */
    @Autowired
    private Environment env;

    /**
     * Creates the pooled data source from the {@code dataSource.*} properties.
     *
     * @return a configured {@link BasicDataSource}
     * @throws IllegalStateException if any of the required properties is missing
     */
    @Bean
    public DataSource dataSource() {
        // Resolve all settings up front, in the same order as they are applied.
        String url = env.getRequiredProperty("dataSource.url");
        String driverClassName = env.getRequiredProperty("dataSource.driverClassName");
        String username = env.getRequiredProperty("dataSource.username");
        String password = env.getRequiredProperty("dataSource.password");

        BasicDataSource pool = new BasicDataSource();
        pool.setUrl(url);
        pool.setDriverClassName(driverClassName);
        pool.setUsername(username);
        pool.setPassword(password);
        return pool;
    }
}
package com.examples;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.configuration.JobRegistry;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import com.examples.config.SpringBatchConfiguration;
/**
 * Command-line entry point of the example: boots the Spring context, prints the
 * batch infrastructure beans and launches the job registered as "myJob".
 */
public class Main {

    /**
     * Starts the application context, runs "myJob" once with empty
     * {@link JobParameters} and then shuts the context down.
     *
     * @param args command-line arguments; ignored
     * @throws Throwable if the context cannot be created or the job cannot be launched
     */
    public static void main(String[] args) throws Throwable {
        // try-with-resources closes the context even when the launch fails, so that
        // bean destruction callbacks run and the pooled DataSource is released.
        try (AnnotationConfigApplicationContext context =
                new AnnotationConfigApplicationContext(SpringBatchConfiguration.class)) {
            JobRegistry jobRegistry = context.getBean("jobRegistry", JobRegistry.class);
            JobLauncher jobLauncher = context.getBean("jobLauncher", JobLauncher.class);
            JobRepository jobRepository = context.getBean("jobRepository", JobRepository.class);

            System.out.println(" jobRegistry: "+jobRegistry);
            System.out.println(" jobLauncher: "+jobLauncher);
            System.out.println(" jobRepository: "+jobRepository);

            Job job = context.getBean("myJob", Job.class);
            // The returned JobExecution is not inspected, so it is not stored.
            jobLauncher.run(job, new JobParameters());
        }
    }
}
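When you run the Spring Batch job with concurrent (parallel) steps for the first time, you will see the following log:
but when you run it a second time, you will see something like this:
Because the job is launched with empty JobParameters, the second launch refers to the same job instance as the first one. Its steps are already completed and are not restartable, so they are not run again.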