Spring Batch Step 파헤쳐보기

Step 개념

  • Batch Job 을 구성하는 독립적인 단계로 하나의 Job은 하나 이상의 Step으로 구성됩니다.
  • Spring Batch Job 구현체중 하나인 SimpleJob을 까보면 Job 은 내부 멤버변수로 Step List를 들고 있음을 확인할 수 있습니다.
    1
    2
    3
    4
    public class SimpleJob extends AbstractJob {
    private List<Step> steps = new ArrayList<>();
    //...
    }
    [2] Spring Batch Step

StepExecution 이란?

  • Step에 대한 한번의 시도를 의미하는 객체로서 Step 실행 중에 발생한 정보들을 저장하고 있는 객체입니다.
  • DB의 BATCH_STEP_EXECUTION 테이블과 1:1로 매핑됩니다.
  • Job이 실패해서 재수행하는 경우에는 실패한 Step에 대해서만 재시작하고, 성공한 Step은 생략합니다. ( 하지만 allowStartIfComplete 설정값을 변경하면 성공한 Step도 재시작되게 변경할 수 있습니다 )

예를 들어 Step 1,2,3이 존재하는데 Step2번이 실행중에 실패한다면 Step 1은 성공 , Step 2는 실패처리 됩니다. 그리고 Step 3는 실행되지 않습니다.
이 상태에서 Job을 재시작하면 Step2부터 재시작하여 Step2,Step3 가 수행됩니다.

  • Job을 구성하는 모든 Step의 실행 정보인 StepExecution이 완료 처리되어야만 JobExecution이 완료처리됩니다.

즉 Spring Batch 도메인 용어를 정리하면 하나의 Job은 여러개의 Step으로 구성되고, Job이 JobParameter를 주입받아 실행되는 객체가 JobInstance객체입니다.

JobInstance를 실행한 정보가 JobExecution이고, Job이 실행되면서 Job을 구성하는 Step들의 실행정보가 StepExecution입니다.

[1] Spring Batch Job과 Step에 관련된 도메인 객체들

Step간 데이터 공유하기 - ExecutionContext

  • ExecutionContext를 활용하면 Job내 Step간 데이터를 공유할 수 있습니다. 혹은 실패한 Step에서 Step 재시작시 실패 이전까지 작업했던 상태값들을 가져올 수 있습니다.
  • ExecutionContext는 Spring Batch에서 관리하는 key-value (Map) 컬렉션입니다.
  • StepExecution, JobExecution 객체의 멤버변수로 선언되고 , 각각 DB의 BATCH_JOB_EXECUTION_CONTEXT , BATCH_STEP_EXECUTION_CONTEXT 테이블에 1:1 매핑됩니다.
  • StepExecutionExecutionContext는 Step안에서만 공유됩니다. 즉 특정 Step에서만 접근이 가능합니다. 실패한 Step이 재시작된 경우도 이전까지 작업한 내용을 불러들일수 있습니다.
  • JobExeuctionExecutionContext는 모든 Step안에서 공유됩니다.
ExecutionContext

예시 코드

아래와 같이 ExecutionContextStepContribution 또는 ChunkContext를 통해 접근하고, 값을 넣어줄 수 있습니다.
넣어준 값은 BATCH_JOB_EXECUTION_CONTEXT , BATCH_STEP_EXECUTION_CONTEXT 테이블에 각각 직렬화되어 저장됩니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class ExecutionContextTasklet1 implements Tasklet {

@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) {

ExecutionContext jobExecutionContext = contribution.getStepExecution().getJobExecution().getExecutionContext();
ExecutionContext stepExecutionContext = contribution.getStepExecution().getExecutionContext();

jobExecutionContext.put("jobName", "developer"); //1. BATCH_JOB_EXECUTION_CONTEXT 에 저장 (모든 STEP에서 공유)
stepExecutionContext.put("stepName", "software"); // 2. BATCH_STEP_EXECUTION_CONTEXT 에 저장 (특정 STEP에서 공유)

return RepeatStatus.FINISHED;
}
}
BATCH_JOB_EXECUTION_CONTEXT 테이블
BATCH_STEP_EXECUTION_CONTEXT 테이블

Spring Batch에서 제공하는 Step 구현체(5)

  • Step 인터페이스를 AbstractStep이라는 추상 클래스에서 구현하고,AbstractStep 추상클래스를 구현하는 구조입니다.
  • Batch에서 제공하는 Step의 구현체는 아래와 같은 5개의 구현체가 존재합니다.
  1. Tasklet Step
  2. Partition Step
  3. Job Step
  4. Flow Step
  5. Decision Step
Spring Batch에서 제공하는 Step 구현체 종류

TaskletStep

  • RepeatTemplate을 사용해서 Tasklet 코드 block을 트랜잭션 경계 내(성공시 커밋,실패시 롤백)에서 반복해서 실행합니다.
    언제까지 반복해서 실행할것인가에 대한 판단은 Tasklet 객체에서 반환하는 RepeatStatus값에 의해 결정됩니다. RepeatStatus.FINISHED 와 같이 특정 RepeatStatus를 반환할떄까지 계속해서 실행합니다.

  • TaskletStep은 아래와 같은 tasklet 인터페이스를 구현하는 tasklet 구현체를 멤버변수로 가지고 있습니다.

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    /**
    * Strategy for processing in a step.
    */
    public interface Tasklet {

    /**
    * @return an {@link RepeatStatus} indicating whether processing is
    * continuable. Returning {@code null} is interpreted as {@link RepeatStatus#FINISHED}
    *
    * @throws Exception thrown if error occurs during execution.
    */
    @Nullable
    RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception;

    }
  • TaskletStep의 코드를 확인해보면 Tasklet 호출횟수를 조정하기 위한 RepeatTemplate 와 실행되야할 작업 그 자체 (tasklet) 구현체를 멤버변수로 들고있는것을 확인할 수 있습니다.

1
2
3
4
5
6
7
public class TaskletStep extends AbstractStep {

private RepeatOperations stepOperations = new RepeatTemplate();
//...
private Tasklet tasklet;
//...
}
  • 자주 사용되는 tasklet 구현체는 Spring Batch에서 이미 구현해놓았습니다. 이 중 ChunkOrientedTasklet구현체을 활용해 Chunk 단위로 배치 작업을 쪼개서 처리할 수 있습니다.
    Spring Batch에서 제공하는 Step 구현체 종류

Chunk 기반과 Task 기반

  • Spring Batch에서 Tasklet Step의 실행 단위는 크게 Chunk기반과 Task기반으로 2가지입니다.

Chunk 기반 Tasklet의 특징

  • 하나의 큰 덩어리를 n개씩 쪼개서 실행한다는 의미로 데이터 대용량 처리용으로 설계되었습니다.
  • ItemReader, ItemProcessor, ItemWriter를 사용합니다.
  • Spring Batch에서는 Chunk기반 Tasklet 실행을 위해 ChunkOrientedTasklet 구현체를 제공합니다.
    1
    2
    3
    4
    5
    6
    7
    8
    public Step chunkStep() {
    return stepBuilderFactory.get("chunkStep")
    .<String, String>chunk(2)
    .reader(new ListItemReader<>(List.of("a", "b", "c", "d", "e", "f")))
    .processor((ItemProcessor<String, String>) String::toUpperCase)
    .writer(items -> items.forEach(System.out::println))
    .build();
    }

Chunk 작업의 Buffering - StepContribution

  • Chunk Process의 변경 사항을 버퍼링 한 뒤, StepExecution 상태를 업데이트하는 도메인 객체입니다.

  • ItemReader, ItemWriterreadCount , writeCount와 같은 Chunk내 작업 정보들을 임시적으로 들고 있다가 Chunk Commit 직전에 update해주는 역할입니다.

  • 실제로 Chunk 작업 결과를 Buffering했다가 update해주는 코드는 StepExecution.apply method를 호출하면서 발생합니다. 아래 로직을 확인해보면 StepContribution의 상태값들을 StepExecution 에 누적시켜주는것을 확인할 수 있습니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
class StepExecution {
//...
/**
* On successful execution just before a chunk commit, this method should be
* called. Synchronizes access to the {@link StepExecution} so that changes
* are atomic.
*
* @param contribution {@link StepContribution} instance used to update the StepExecution state.
*/
public synchronized void apply(StepContribution contribution) {
readSkipCount += contribution.getReadSkipCount();
writeSkipCount += contribution.getWriteSkipCount();
processSkipCount += contribution.getProcessSkipCount();
filterCount += contribution.getFilterCount();
readCount += contribution.getReadCount();
writeCount += contribution.getWriteCount();
exitStatus = exitStatus.and(contribution.getExitStatus());
}
//...
}

Task 기반 Tasklet의 특징

  • Chunk 기반으로 작업을 n개로 쪼개서 처리하기보다, 단일 작업 기반이 더 효율적인 경우 사용됩니다.

  • 주로 Tasklet 인터페이스 구현체를 만들어서 사용합니다.

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    public Step taskStep(){
    return stepBuilderFactory.get("taskStep")
    .tasklet(new CustomTasklet())
    .listener(new StepExecutionListener() {
    @Override
    public void beforeStep(StepExecution stepExecution) {}

    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {return null;}
    }) // 1. Step 실행전과 후에 수행해야할 콜백 로직 설정
    .build();
    }
  • Chunk Buffering

Step API - startLimit / allowStartIfComplete

  • Step 재시작과 관련된 API로 startLimit , allowStartIfComplete API가 존재합니다.

startLimit API

  • step 실패시 step이 재수행될 수 있는데, 이떄 재수행횟수를 제한하는 API입니다.
  • 예를 들어서 startLimit 값을 3으로 주게 되면 3번까지만 step 재시작이 허용됩니다. 이후에는 org.springframework.batch.core.StartLimitExceededException 예외가 터집니다.
  • default값은 Integer.MAX_VALUE로 사실상 제한이 없습니다.

설정 예시

  • 아래 테스트 케이스는 startLimit을 3으로 주었을떄입니다.
    1
    2
    3
    4
    5
    6
    7
    8
    9
    public Step step(){
    return stepBuilderFactory.get("step")
    .tasklet((contribution, chunkContext) -> {
    System.out.println("step execute");
    throw new RuntimeException();
    })
    .startLimit(3)
    .build();
    }
  • tasklet 로직에서 의도적으로 RuntimeException을 던지고 Job을 여러번까지 재시작하면 3번까지는 BATCH_JOB_STEP_EXECUTION테이블에 저장됩니다.
startLimit 3으로 설정하고 STEP 재시작시 BATCH_STEP_EXECUTION 테이블 상태
  • 이후에 step은 재시작하더라도 아래와 같은 예외가 던져지게 됩니다.
    1
    org.springframework.batch.core.StartLimitExceededException: Maximum start limit exceeded for step: stepStartMax: 3
  • 주의해야할 부분은 해당 API와 설정과 무관하게 Job Execution은 계속해서 일어나게 됩니다. 즉 BATCH_JOB_EXECUTION 테이블의 행은 누적됩니다. 즉 Step 재시작이 안되는 것 뿐 입니다.
    아래 예시를 확인해보면 설정된 startLimit값 (3) 이후로는 StartLimitExceptionEXIT_MESSAGE로 기록되고 있습니다.
    startLimit 3으로 설정하고 STEP 재시작시 BATCH_JOB_EXECUTION 테이블 상태

allowStartIfComplete API

  • step 실패시 step이 재시작될 수 있는데, 이떄 기존에 완료된 step은 재시작할지 말지를 결정하는 api입니다.
  • 기본값은 false로 완료된 step은 재시작을 수행하지 않습니다. 만약 true로 주게 된다면 이미 COMPLETE된 step이더라도 항상 재시작합니다.

설정 예시

  • 아래 테스트 케이스는 Job을 첫번쨰 step은 allowStartIfComplete값을 true로 준 step이고, 두번째 step은 항상 실패하는 step입니다.
    즉 두번쨰 step에서 항상 실패함으로 step1은 allowStartIfComplete값이 true라면 같이 재시작됩니다.

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    @Bean
    public Job batchJob() {
    return this.jobBuilderFactory.get("Job")
    .start(restartAllowableStep())
    .next(alwaysFailStep())
    .build();
    }
    @Bean
    public Step restartAllowableStep(){
    return stepBuilderFactory.get("restartAllowableStep")
    .tasklet((contribution, chunkContext) -> RepeatStatus.FINISHED)
    .allowStartIfComplete(true)
    .build();
    }
    @Bean
    public Step alwaysFailStep(){
    return stepBuilderFactory.get("alwaysFailStep")
    .tasklet((contribution, chunkContext) -> {
    throw new RuntimeException();
    })
    .startLimit(3)
    .build();
    }
  • 5번 Job을 재시작해보면 allowStartIfCompletetrue로 준 첫번쨰 step은 이미 step이 완료되었음에도 불구하고 항상 재시작 되는 것을 확인할 수 있습니다.

  • 항상 실패하는 두번쨰 step은 startLimit 값을 초과하게 되면 재시작되지 않고, 첫번쨰 step만 계속해서 재시작됩니다.

    allowStartIfComplete값을 true로 설정하고 STEP 재시작시 BATCH_STEP_EXECUTION 테이블 상태

언제 allowStartIfComplete API를 사용할 수 있을까?

  • 시점에 따라 데이터 값이 변경될 수 있어 데이터 유효성 점검을 수행하는 step의 경우에는 이전에 성공했던 step이더라도 재실행이 필요할 것입니다.

JobStep

  • Job을 포함하고 있는 Step입니다. 즉, Step안에 또 다른 Job이 있습니다.
  • JobStep안에 있는 Job이 실패하면, 해당 JobStep을 포함하고 있는 Job 역시 당연히 실패합니다.
  • Job 실행 관련 메타 데이터는 JobStep안에 Job도 별도로 DB에 저장됩니다.
  • 커다란 Job 하나를 작은 Job으로 쪼개서 관리하고자 할 때 사용될 수 있습니다.

설정 예시

  • 먼저 가장 상위 Job을 설정하고, 하위에 step을 JobStep 으로 구성합니다.
  • JobStep에 들어갈 Job이 사용할 parameter를 정의하기 위해서 parametersExtractor를 정의합니다.
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    @Bean
    public Job parentJob(Step jobStep) {
    return this.jobBuilderFactory.get("parentJob") // 1. 가장 상위의 Job입니다.
    .start(jobStep)
    .next(parentStep())
    .build();
    }

    @Bean
    public Step jobStep(JobLauncher jobLauncher) {
    return stepBuilderFactory.get("jobStep") // 2. JobStep 입니다.
    .job(childJob()) // 3. JobStep이 포함할 Job을 명시합니다.
    .launcher(jobLauncher) // 4. 해당 Job이 사용할 JobLauncher를 명시합니다.
    .parametersExtractor(jobParmetersExtractor()) // 5. 해당 Job에 넘겨줄 파라미터를 어떻게 가져올것인지 정의합니다.
    .listener(new StepExecutionListener() { // 6. step 실행 전후로 실행할 콜백메소드를 정의합니다.
    @Override
    public void beforeStep(StepExecution stepExecution) {
    stepExecution.getExecutionContext().putString("name", "chansoo");
    }

    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
    return null;
    }
    })
    .build();
    }
  • Spring Batch에서는 parametersExtractor 인터페이스의 구현체인 DefaultJobParametersExtractor를 제공합니다.
  • DefaultJobParametersExtractor는 부모 Job의 parameter와 Step의 parameter를 가져올 수 있습니다. 이떄 가져올 parameter의 key값을 설정해줄 수 있습니다.
  • 위 예시 코드에서는 StepExecutionListener에서 JobStep이 실행되기전에 name parameter를 설정해서 넘겨주고 있습니다. 이럴때는 아래와 같이 key를 name으로 주면 설정된 value값을 들고 올 수 있습니다.
    1
    2
    3
    4
    5
    6
    // stepExecutionContext에 있는 key를 찾아서 job에게 값을 넘겨줄 수 있습니다.
    private JobParametersExtractor jobParmetersExtractor() {
    DefaultJobParametersExtractor extractor = new DefaultJobParametersExtractor();
    extractor.setKeys(new String[]{"name"}); // 1. name key를 가진 parameter가 존재하는 경우 Job에 넘겨준다.
    return extractor;
    }

배치 작업이 부모 Job,JobStep의 Job모두 성공한 경우가 아니라 JobStep의 Job만 성공한 case에는 어떻게 될까요? 아래와 같이 각각 3개의 케이스로 나눠볼 수 있을것입니다.

  1. 부모 Job, JobStep의 Job 모두 성공된 경우
  2. 부모 Job 실행 중 JobStep의 Job이 실패된 경우
  3. 부모 Job 실행 중 JobStep의 Job은 성공했으나, 부모 Job의 다른 Step 실패된 경우

부모 Job, JobStep의 Job 모두 성공된 케이스

  • 우선 성공/실패 케이스와 별개로 BATCH_JOB_INSTANCE 테이블에는 부모 Job와 JobStep의 Job 모두 데이터가 들어갑니다.
JobStep 구성한 Job의 BATCH_JOB_INSTANCE 테이블 상태
  • JobStep의 Job또한 별개의 Job Instance로 관리되기 때문에 BATCH_JOB_EXECUTION 테이블의 레코드도 2개가 생성됩니다.
JobStep 구성한 Job의 BATCH_JOB_EXECUTION 테이블 상태
JobStep 구성한 Job의 BATCH_STEP_EXECUTION 테이블 상태

부모 Job 실행 중 JobStep의 Job이 실패된 케이스

  • JobStep의 Job이 실패되면 부모의 Job도 실패처리가 됩니다. 즉 모든 Job이 실패된 것으로 간주됩니다.

아래와 같이 JobStep의 job의 step에서 무조건 RuntimeException을 던져서 실패하도록 설정하였습니다.

1
2
3
4
5
6
7
8
@Bean
public Step childStep() {
return stepBuilderFactory.get("childStep")
.tasklet((contribution, chunkContext) -> {
throw new RuntimeException("child job failed");
})
.build();
}
  • job 실행 후 BATCH_JOB_EXECUTION테이블을 확인해보면 부모 Job까지 모두 실패처리된것을 확인할 수 있고 부모 Job은 org.springframework.batch.core.UnexpectedJobExecutionException: Step failure: the delegate Job failed in JobStep. 예외가 던져집니다.
JobStep 구성한 Job의 BATCH_JOB_EXECUTION 테이블 상태
  • BATCH_STEP_EXECUTION테이블을 확인해보면 부모 Job의 다음 step은 실행처리가 안되고, JobStep의 job의 step까지만 처리하다가 실패된것으로 기록됩니다.
JobStep 구성한 Job의 BATCH_STEP_EXECUTION 테이블 상태

부모 Job 실행 중 JobStep의 Job은 성공했으나, 부모 Job의 다른 Step 실패된 경우

  • JobStep의 Job은 별개의 JobInstance로 취급되기 때문에 COMPLETED상태로 기록되고, 부모 Job만 FAILED상태로 기록됩니다. 즉 JobStep의 Job은 성공처리됩니다.
    BATCH_JOB_EXECUTION 테이블을 확인해보면 JobStep의 job은 성공처리된것을 확인할 수 있습니다.
JobStep 구성한 Job의 BATCH_JOB_EXECUTION 테이블 상태
JobStep 구성한 Job의 BATCH_STEP_EXECUTION 테이블 상태

Reference

[1] https://docs.spring.io/spring-batch/docs/current/reference/html/domain.html
[2] https://docs.spring.io/spring-batch/docs/current/reference/html/step.html#configureStep

Comments