ebson

[Spring Batch 5] 배치 잡의 Step 사이에 변수 공유하기 본문

HANDS-ON

[Spring Batch 5] 배치 잡의 Step 사이에 변수 공유하기

ebson 2025. 4. 1. 11:43

Spring Batch 에서 Step 개발 

Spring Batch를 사용한 Batch Application에서 대부분의 경우에는 단일 Step으로 요구사항을 충족할 수 있다. 그러나 비즈니스 로직을 분리하면서 모듈화하고 가독성을 향상하거나, 큰 작업을 작은 작업들로 나누면서 재사용성을 향상하려는 경우, 또는 필요에 따라 새로운 Step을 추가하고 제거, 순서 변경하는 등 유연성과 확장성을 가지려는 경우에 여러 Step으로 Job을 구성할 수 있다. 

여러 Step으로 Job을 구성하고 개발하다보면 Step 사이에 변수를 공유해야 하는 경우가 생긴다. 다른 스텝에서의 계산 결과나 데이터베이스 쿼리 결과, API 호출 결과를 사용하거나, 앞선 스텝의 결과 값으로 조건 분기 처리하여 다음 스텝을 결정해야 하는 경우, 또는 여러 스텝에 걸쳐서 하나의 집계 작업을 수행하면서 다른 스텝의 중간 집계 결과를 사용하려는 경우 등이다. 

 

 

실무 요구사항 1 : Step에서 API 호출 결과 정제한 Read Item Count를 다른 Step과 공유 

실무에서는 단순히 Step에서 API를 호출한 결과의 값을 다른 Step에서도 사용해야 했다. 물류센터로 API를 호출하여 응답받은 당일 주문건에 대해 출고 송장이 등록된 데이터들을 정제해 csv 데이터로 생성, sftp 파일서버로 업로드 및 실패건수를 알림 발송하는 배치 잡이었다. 다시 말해서, API를 호출하고 응답값을 정제하는 A Step에서 저장된 Read Item Count를 실패건수 알림을 발송하는 B Step에서도 사용해야 했다. Read Item Count는 해당 컨텍스트에서 관리되는 값이다. 

 

 

Job Execution Context를 사용해 Step 사이에 값을 공유 

이와 같은 경우에 Spring Batch에서 관리하는 Job Execution Context에서 관리하는 Map에 키-값을 저장하면 여러 개의 Step들 사이에 변수를 공유할 수 있다. Batch Application은 대용량 처리에서 긴 작업 시간 때문에 문제가 발생하는 것을 방지할 수 있어야 한다. 그래서 Step을 여러 개로 나누어 독립적인 트랜잭션으로 처리하는 것이 좋다. 그리고 이렇게 Step을 나누면 Step들 사이에 상태를 공유하는 매커니즘이 필요한데, Spring Batch는 그 필요성에 의해 Job Execution Context를 제공한다. Job Execution Context를 사용하면 Job 내부에서 값을 저장하고 Step들 사이에 공유하면서 Step들 사이에 경계를 유지함과 동시에 데이터 공유 수준을 유지할 수 있다. 

 

public class JobExecution extends Entity {
    private final JobParameters jobParameters;
    private JobInstance jobInstance;
    private volatile Collection<StepExecution> stepExecutions;
    private volatile BatchStatus status;
	...
    private volatile ExitStatus exitStatus;
    private volatile ExecutionContext executionContext;
    private transient volatile List<Throwable> failureExceptions;

    public JobExecution(JobExecution original) {
        this.stepExecutions = Collections.synchronizedSet(new LinkedHashSet());
        this.status = BatchStatus.STARTING;
        this.startTime = null;
        this.createTime = LocalDateTime.now();
        this.endTime = null;
        this.lastUpdated = null;
        this.exitStatus = ExitStatus.UNKNOWN;
        this.executionContext = new ExecutionContext();
        this.failureExceptions = new CopyOnWriteArrayList();
        this.jobParameters = original.getJobParameters();
        this.jobInstance = original.getJobInstance();
        this.stepExecutions = original.getStepExecutions();
        this.status = original.getStatus();
        this.startTime = original.getStartTime();
        this.createTime = original.getCreateTime();
        this.endTime = original.getEndTime();
        this.lastUpdated = original.getLastUpdated();
        this.exitStatus = original.getExitStatus();
        this.executionContext = original.getExecutionContext();
        this.failureExceptions = original.getFailureExceptions();
        this.setId(original.getId());
        this.setVersion(original.getVersion());
    }
	
	...

    public ExecutionContext getExecutionContext() { 
        return this.executionContext;
    }
    
    ...

}

 

[코드 1] Spring Batch의 JobExecution.class 는 ExecutionContext를 가진다. 

 

public class ExecutionContext implements Serializable {
    private volatile boolean dirty;
    private final Map<String, Object> map;

    public ExecutionContext() {
        this.dirty = false;
        this.map = new ConcurrentHashMap();
    }

    ...

    public void put(String key, @Nullable Object value) {
        Object result;
        if (value != null) {
            result = this.map.put(key, value);
            this.dirty = result == null || !result.equals(value);
        } else {
            result = this.map.remove(key);
            this.dirty = result != null;
        }

    }

 	...

    @Nullable
    public Object get(String key) {
        return this.map.get(key);
    }

	... 

}


[코드 2] Spring Batch의 ExecutionContext.class는 여러 Step 사이에서 공유할 수 있는 Map<String, Object> map을 가진다. 


Spring Batch 에서 제공하는 Listener 

한편, Spring Batch에서는 배치 작업의 생명주기 동안에 특정 시점에서 콜백 로직을 수행할 수 있도록 Listener를 제공한다. 다시 말해서, Spring Batch가 제공하는 Listener를 사용하면 배치 작업의 잡 실행 전후, 스텝 실행 전후, 청크 실행 전후와 같은 특정 시점에 개발자가 원하는 로직의 코드를 신속하고 편리하게 정의하고 실행할 수 있다.

예를 들어, JobExecutionListener는 Job의 전체 생명주기 동안 이벤트를 추적하는 beforJob, afterJob 메서드를 제공하고 StepExecutionListener는 Step의 전체 생명주기 동안 이벤트를 추적하는 beforeStep, afterStep 메서드를, ChunkListener는 chunk의 처리 과정에서 이벤트를 추적하는 beforeChunk, afterChunk, afterChunkError 메서드를 제공한다. 

public interface JobExecutionListener {
    default void beforeJob(JobExecution jobExecution) {
    }

    default void afterJob(JobExecution jobExecution) {
    }
}

 

[코드 3] Spring Batch의 JobExecutionListener

public interface StepExecutionListener extends StepListener {
    default void beforeStep(StepExecution stepExecution) {
    }

    @Nullable
    default ExitStatus afterStep(StepExecution stepExecution) {
        return null;
    }
}


[코드 4] Spring Batch의 StepExecutionListener 

public interface ChunkListener extends StepListener {
    String ROLLBACK_EXCEPTION_KEY = "sb_rollback_exception";

    default void beforeChunk(ChunkContext context) {
    }

    default void afterChunk(ChunkContext context) {
    }

    default void afterChunkError(ChunkContext context) {
    }
}


[코드 5] Spring Batch의 ChunkListener 


ExecutionContext와 StepExecutionListener를 사용해 Step사이에 값을 공유하기 

위와 같이 각각의 Listener들의 before, after 메서드는 Context를 가지는데, 각 Context는 상위 Context에 접근할 수 있다. 예를 들어, ChunkContext는 getStepContext를 가지고 StepContext는 getJobExectutionContext를 가진다. 다시 말해서 어떤 Listener를 사용해도 Job의 ExecutionContext에 접근할 수 있다. 그리고 Step 사이의 값을 공유하려면, StepExecutionListener의 afterStep메서드를 오버라이드하고 공유하려는 값을 ExecutionContext의 map에 저장하고 사용하면 된다. 

 

@Slf4j
@Component
public class ReadItemCountStepListener implements StepExecutionListener {

    @Override
    public void beforeStep(StepExecution stepExecution) {
        if(stepExecution.getStatus() == BatchStatus.STARTED) {
            log.info(">>>>> Step 시작 >>>>> step name : " + stepExecution.getStepName() + " start");
        }
    }

    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        if(stepExecution.getStatus() == BatchStatus.COMPLETED) {
            log.info(">>>>> Step 종료 >>>>> step name : " + stepExecution.getStepName()  + " end "+ " execution time : "+ stepExecution.getEndTime());
        }
        stepExecution.getJobExecution().getExecutionContext().put(stepExecution.getStepName().concat(CodeVal.READ_COUNT), stepExecution.getReadCount());

        return new ExitStatus("stepListener exit");
    }

}

 

[코드 6] Spring Batch 에서 StepExecutionListener를 사용해 Step의 Read Item Count를 공유하는 코드 


// Step1 에서 ### API 호출 결과를 정제한 데이터 총 건수 조회
public Long getStep1ReadCount(ChunkContext chunkContext) {
    JobExecution jobExecution = chunkContext.getStepContext().getStepExecution().getJobExecution();
    return jobExecution.getExecutionContext()
            .getLong(CodeVal.##_LOGISTICS_OUTBOUND_INVOICE + CodeVal.STEP_1 + CodeVal.READ_COUNT);
}


[코드 7] 잎선 Step에서 Listener를 사용해 JobExecution의 ExecutionContext에 저장한 값을 다른 Step에서 사용하는 코드 

 

실무 요구사항 2 : JOB의 실행 시각이 포함된 파일 경로를 여러 Step 에서 공유 

여러 Step을 가지는 배치 잡 개발 실무에서 두번째 요구사항은 csv 파일 경로를 여러 Step 사이에 공유하는 것이었다. 대용량의 출고 상품 데이터를 처리하기 때문에 실행시간이 길어질 수 있었고 파일 경로에 실행 시각이 포함되기 때문에 파일 경로를 매 Step에서 새로 만들면 문제가 생길 수 있었다. 예를 들어, 자바 코드로 LocalDateTime.now()를 호출하는 경우, 10시 59분경에 Step A에서는 10을 추출하고 11시 00분 경에 Step B에서는 11을 추출하는 경우가 없도록 대응해야 했다. 일단은 자바 코드로 LocalDateTime.now()를 호출하지 않기로 했다. Jenkins 스케줄러에서 JobParameter으로 전달한 고정된 실행시각 값을 사용하기로 했다. 그리고 이 값을 사용해 Job이 시작하기 전 한번 파일 경로를 생성했다. 

 

JobListener의 beforeJob 메서드에서 생성한 파일 경로를 Step 사이에 공유하기 위한 다양한 방법이 있다. 첫번째는 파일경로를 Job Config 파일의 static 변수에 저장하는 것, 두번째는 파일경로 관리를 위한 전용 singleton 인스턴스를 생성해서 마찬가지로 static 변수에 저장하는 것, 세번째는 Job Parameter를 관리하는 클래스에서 filePath도 저장하는 것이다. static 변수를 사용하면 멀티 스레딩 환경에서 데이터가 불일치하게 되거나 메모리 누수가 발생할 수 있는 위험이 있다. 그리고 Job Parameter 관리용 클래스는 이미 Job의 하위 컨텍스트들 사이에서 값을 공유하기 위해 작성되었기 때문에 singleton 클래스를 추가할 필요가 없었다. 

 

@Slf4j
@JobScope
@RequiredArgsConstructor
public class OutboundInvoiceJobListener implements JobExecutionListener {

    private final OutboundInvoiceParameter parameter;

    @Override
    public void beforeJob(JobExecution jobExecution) {
        log.info(">>>>> Job 시작 >>>>> job name : " + jobExecution.getJobInstance().getJobName() + " start : " + LocalDateTime.now());

        // .csv 경로 생성
        String filePath = createFilePath();
        log.info("ReadItemCountStepListener^^filePath : {}", filePath);

		...
        
    }

    @Override
    public void afterJob(JobExecution jobExecution) {
		...
	}

    // .csv 경로 생성
    public String createFilePath() {
        String releaseDate_HH = parameter.getReleaseDate().format(DefaultDateTimeFormat.DATE_NONE_DASH_FORMAT)
            .concat("_")
            .concat(parameter.getExecutionTime());

        // tracking_batch_yyyyMMdd_HH.csv (조회기준날짜_실행시각)
        return TRACKING_BATCH_CSV_PATH.concat(releaseDate_HH).concat(".csv");
    }

}

 

[코드 8] JobListener에서 Job Parameter를 관리하는 클래스로부터 releaseDate, executionTime 값을 추출해 filePath를 생성 

 

 

@Getter
@NoArgsConstructor
public class OutboundInvoiceParameter extends CommonParameter {

    @Value("#{ T(com.grit.batch.util.DateConverter).stringToDate(jobParameters[releaseDate])}")
    private LocalDate releaseDate;

    @Value("#{jobParameters[executionTime]}")
    private String executionTime;

    @Value("#{jobParameters[filePath]}")
    @Setter
    private String filePath;

}

 

[코드 9] jobParameters를 관리하는 클래스에 filePath 필드와 Setter를 추가 

 

 

 

 

맺음말 

이상으로 Spring Batch를 사용한 Batch Application에서 여러 Step으로 Job을 구성한 경우, StepExecutionListener와 ExecutionContext를 사용해 Step들 사이에 변수를 공유하는 방법에 대해서 살펴보았다. 그리고 job parameters를 관리하기 위한 클래스에서 Step에서 공유하기 위한 변수를 관리하는 방법을 살펴보았다.  Context와 각 단계의 이벤트를 추적하는 Listener를 사용하면 Spring Batch가 관리하는 생명 주기 내에서 각각 독립된 범위를 사용할 수 있을 뿐만 아니라 StepExecution에서 관리하는 read item count 같은 값을 여러 Step에서 공유할 수 있다. 그리고 job parameters는 job의 모든 하위 컨텍스트에서 사용 가능해야 하므로 이를 관리하는 클래스에서 Step 사이에 공유해야 하는 값들을 추가 관리하기에 적합하다. 

 

Comments