일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | 5 | ||
6 | 7 | 8 | 9 | 10 | 11 | 12 |
13 | 14 | 15 | 16 | 17 | 18 | 19 |
20 | 21 | 22 | 23 | 24 | 25 | 26 |
27 | 28 | 29 | 30 |
- 스프링 배치 5
- mybatis
- 아이템 리더 페이징 처리
- 스프링배치 csv
- 트랜잭션 분리
- JSONArray 분할
- Spring Batch
- spring batch 5
- step 사이 변수 공유
- 마이바티스 트랜잭션
- 읽기 작업과 쓰기 작업 분리
- aop proxy
- api 아이템 리더
- spring batch 변수 공유
- JSONObject 분할
- job parameter
- flatfileitemwriter
- executioncontext 변수 공유
- executioncontext
- 아이템 리더 커스텀
- JSON 분리
- JSON 분할
- 스프링 트랜잭션 관리
- stepexecutionlistener
- 스프링배치 메타테이블
- step 여러개
- abstractpagingitemreader
- 선언적 트랜잭션 관리
- 스프링배치 엑셀
- step 값 공유
- Today
- Total
ebson
[Spring Batch 5] API 호출 Item Reader 커스텀과 csv 데이터 생성 본문
Spring Batch Application에서 사용할 수 있는 데이터소스들
대부분의 경우 Spring Batch를 사용한 배치 어플리케이션에서는 데이터소스로 RDBMS를 연동해 사용하기 때문에, MyBatis와 JPA를 사용할 수 있으면 된다. 직접 MyBatis와 JPA를 사용하는 코드를 작성할 수도 있고 Spring Batch가 제공하는 전용 Item Reader 클래스와 Item Writer 클래스를 사용할 수도 있다. ex) MyBatisCursorItemReader, MyBatisPagingItemReader, MybatisBatchItemWirter, JpaPagingItemReader, JpaItemWriter ...
그러나 배치 어플리케이션의 읽기 데이터소스가 항상 MyBatis와 JPA으로 접근 가능한 RDBMS의 데이터인 것은 아니다. API 호출 결과 JSON 데이터일 수도 있고 헤더와 로우를 갖는 csv 데이터일 수도 있다. Tasklet 내에서 API를 호출하고 csv 데이터를 처리하기 위한 라이브러리들을 가져와 구현할 수도 있지만, Spring Batch의 범용 Item Reader를 커스텀하거나 전용 Item Reader를 사용할 수 있다.
실무 요구사항: 출고 상품 데이터의 API 호출 결과를 데이터소스로 갖는 배치 작업
실무에서는 물류센터로 API를 호출해 당일 누적된 주문에 대한 출고 상품 데이터들을 응답 받고 정제하는 Item Reader가 필요했다. 주문 데이터는 RDBMS으로 저장되지만 물류센터에서 이 주문 데이터들을 가지고 실물 상품들을 포장하고 송장번호를 생성한 출고 상품 데이터를 API 호출해 사용해야 했다. Tasklet에서 HTTP Client를 사용해서 처리할 수도 있지만, API를 요청하고 응답결과를 정제하는 작업만 담당하는 전용 Item Reader를 사용하고 페이징 처리하기 위해 페이징 처리의 범용 Item Reader 클래스를 확장해 커스텀하기로 했다.
Spring Batch의 AbstractPagingItemReader.class
AbstractPagingItemReader는 이름대로 페이징 처리가 구현된 Item Reader이다. doReadPage 메서드에서 읽은 데이터들에 대해 doRead 메서드에서 페이지 사이즈, 페이지 번호와 현재 아이템을 검사하고(페이지의 마지막 아이템이면 페이지 번호를 증감한다.) 처리하면서 하나씩 반환한 결과를 Read 메서드에서 처리(현재 아이템 카운트 세팅)하고 읽어 Processor 또는 Writer으로 전달한다.
public abstract class AbstractPagingItemReader<T> extends AbstractItemCountingItemStreamItemReader<T> implements InitializingBean {
protected Log logger = LogFactory.getLog(this.getClass());
private volatile boolean initialized = false;
private int pageSize = 10;
private volatile int current = 0;
private volatile int page = 0;
protected volatile List<T> results;
private final Lock lock = new ReentrantLock();
public AbstractPagingItemReader() {
this.setName(ClassUtils.getShortName(AbstractPagingItemReader.class));
}
public int getPage() {
return this.page;
}
public int getPageSize() {
return this.pageSize;
}
public void setPageSize(int pageSize) {
this.pageSize = pageSize;
}
public void afterPropertiesSet() throws Exception {
Assert.state(this.pageSize > 0, "pageSize must be greater than zero");
}
@Nullable
protected T doRead() throws Exception {
this.lock.lock();
Object var2;
try {
if (this.results == null || this.current >= this.pageSize) {
if (this.logger.isDebugEnabled()) {
this.logger.debug("Reading page " + this.getPage());
}
this.doReadPage();
++this.page;
if (this.current >= this.pageSize) {
this.current = 0;
}
}
int next = this.current++;
if (next < this.results.size()) {
var2 = this.results.get(next);
return var2;
}
var2 = null;
} finally {
this.lock.unlock();
}
return var2;
}
protected abstract void doReadPage();
protected void doOpen() throws Exception {
Assert.state(!this.initialized, "Cannot open an already opened ItemReader, call close first");
this.initialized = true;
}
protected void doClose() throws Exception {
this.lock.lock();
try {
this.initialized = false;
this.current = 0;
this.page = 0;
this.results = null;
} finally {
this.lock.unlock();
}
}
...
}
[코드 1] AbstractPagingItemReader.class
public abstract class AbstractItemCountingItemStreamItemReader<T> extends AbstractItemStreamItemReader<T> {
private static final String READ_COUNT = "read.count";
private static final String READ_COUNT_MAX = "read.count.max";
private int currentItemCount = 0;
private int maxItemCount = Integer.MAX_VALUE;
private boolean saveState = true;
public AbstractItemCountingItemStreamItemReader() {
}
@Nullable
protected abstract T doRead() throws Exception;
protected abstract void doOpen() throws Exception;
protected abstract void doClose() throws Exception;
protected void jumpToItem(int itemIndex) throws Exception {
for(int i = 0; i < itemIndex; ++i) {
this.read();
}
}
@Nullable
public T read() throws Exception {
if (this.currentItemCount >= this.maxItemCount) {
return null;
} else {
++this.currentItemCount;
T item = this.doRead();
if (item instanceof ItemCountAware) {
((ItemCountAware)item).setItemCount(this.currentItemCount);
}
return item;
}
}
...
}
[코드 2] AbstractItemCountingItemStreamItemReader.class
다시 말해서, doReadPage 메서드에서 페이지 사이즈 만큼의 데이터를 읽도록 구현하면, 읽어들인 데이터를 페이징 처리하면서 doRead 메서드와 read 메서드를 차례로 호출한다. AbstractPagingItemReader 클래스를 extends하고 doReadPage 메서드를 구현하면 데이터소스가 무엇이든 그 읽어온 데이터들에 대한 페이징 처리 기능을 제공하는 Item Reader 클래스를 커스텀할 수 있다.
@Slf4j
public class ###APIPageItemReader<T> extends AbstractPagingItemReader<T> {
protected ###ApiFetcher ###ApiFetcher;
protected LocalDate apiReleaseDate;
public ###APIPageItemReader(int pageSize
, ###ApiFetcher ###ApiFetcher
, LocalDate apiReleaseDate) {
setPageSize(pageSize);
this.###ApiFetcher = ###ApiFetcher;
this.apiReleaseDate = apiReleaseDate;
}
@Override
protected void doReadPage() {
initResults();
/** ### API Call - 물류센터 WMS 등록된 당일 출고 데이터 조회 */
List<ReleasedProductApiResponse> releasedProductsForThreeDays = getReleasedProductsForThreeDays(apiReleaseDate);
/** 주문번호 필터링, 송장번호 필터링, 주문번호-송장번호 중복 제거 */
List<ReleasedProductApiResponse> filtered = releasedProductsForThreeDays
.stream()
.filter(product -> !product.getSalesOrderNumber().startsWith("C") && !product.getSalesOrderNumber().startsWith("A"))
.filter(product -> StringUtils.isNumberic(product.getTrackingNumber()))
.distinct()
.toList();
int totalSize = filtered.size();
int page = getPage() >= 0 ? getPage() : 1;
int pageSize = getPageSize();
int fromIndex = page * pageSize;
int toIndex = Math.min(fromIndex + pageSize, totalSize);
List<ReleasedProductApiResponse> pagedReleasedProductsForThreeDays
= new ArrayList<>(filtered.subList(fromIndex, toIndex));
for (ReleasedProductApiResponse releasedProductApiResponse : pagedReleasedProductsForThreeDays) {
results.add((T) releasedProductApiResponse);
}
}
protected void initResults() {
if (CollectionUtils.isEmpty(results)) {
results = new CopyOnWriteArrayList<>();
} else {
results.clear();
}
}
/** releaseDate 당일치만 연동 */
public List<ReleasedProductApiResponse> getReleasedProductsForThreeDays(LocalDate releaseDate) {
PageResponse<ReleasedProductApiResponse> tempReleasedDay = ###ApiFetcher.getOrderReleaseBarcodeApiResponse(
releaseDate, 10, 1);
Long totalSize = tempReleasedDay.getTotalSize();
PageResponse<ReleasedProductApiResponse> AllReleasedDayResponse = ###ApiFetcher.getOrderReleaseBarcodeApiResponse(
releaseDate, totalSize.intValue(), 1);
List<ReleasedProductApiResponse> releasedProducts = AllReleasedDayResponse.getList();
return releasedProducts;
}
@Override
protected void doOpen() throws Exception {
...
}
@Override
protected void doClose() throws Exception {
...
}
}
[코드 3] AbstractPagingItemReader.class를 extends한 커스텀 클래스
이상은 doReadPage 메서드에서 API 호출 결과를 저장하고 주문번호와 송장번호를 필터링 및 중복 제거한 결과 리스트에서 페이지 번호와 페이지 사이즈에 해당하는 서브리스트를 추출하도록 구현한 것이다. 결과 리스트에 추가된 데이터들은 다음 단계 메서드에서 순서대로 처리된다. 이와 같이 API 호출 결과를 읽는 기능만을 담당하는 전용 클래스를 사용하면서 읽어들인 데이터를 페이징 처리 할 수 있다.
FlatFileItemWriter를 사용해 csv 데이터 생성하기
읽기 데이터소스가 RDBMS가 아닐 수 있는 것과 마찬가지로 쓰기 데이터소스도 RDBMS가 아닐 수 있다. csv 데이터를 생성하기 위해서 Spring Batch 에서는 전용 Item Writer 클래스를 제공한다. FlatFileItemWriterBuilder의 resource필드에서 생성할 csv 파일의 경로를 세팅하고 LineAggregator를 사용해 Item 데이터를 csv 데이터 로우로 생성한다. FlatFileHeaderCallback 을 implements 한 클래스를 세팅해 헤더를 추가할 수 있다.
@Bean(name="OutboundInvoiceStep1Writer")
@StepScope // .csv 데이터 생성
public FlatFileItemWriter<CSVRow> OutboundInvoiceStep1Writer() {
log.info("OutboundInvoiceStep1Writer started ... ");
String releaseDate_HH = parameter.getReleaseDate().format(DefaultDateTimeFormat.DATE_NONE_DASH_FORMAT)
.concat("_")
.concat(parameter.getExecutionTime());
// tracking_batch_yyyyMMdd_HH.csv (조회기준날짜_실행시각)
String filePath = TRACKING_BATCH_CSV_PATH.concat(releaseDate_HH).concat(".csv");
getInstance().setFilePath(filePath);
return new FlatFileItemWriterBuilder<CSVRow>()
.name("OutboundInvoiceStep1Writer")
.resource(new FileSystemResource(filePath))
.encoding("UTF-8")
.delimited()
.delimiter("\t")
.names("orderNo", "invoiceNo")
.lineAggregator(new OutboundInvoiceLineAggregator())
// .headerCallback(new OutboundInvoiceHeader()) // .csv 헤더 제거
.build();
}
[코드 4] FlatFileItemWriter를 사용해 csv 데이터를 생성
public class OutboundInvoiceLineAggregator implements LineAggregator<CSVRow> {
@Override
public String aggregate(CSVRow item) {
return item.getOrderNo() + "," + item.getInvoiceNo();
}
}
[코드 5] LineAggregator를 사용해 Item을 csv 로우로 변환
public class OutboundInvoiceHeader implements FlatFileHeaderCallback {
@Override
public void writeHeader(Writer writer) throws IOException {
writer.write("OrderNumber,TrackingNumber");
}
}
[코드 6] FlatFileHeaderCallback을 사용해 csv 데이터의 헤더 추가
맺음말
이상으로 Spring Batch 5를 사용한 배치 어플리케이션에서 API 호출 결과를 데이터소스로 하는 Item Reader를 커스텀하고 csv 데이터를 생성하기 위해 Spring Batch 에서 제공하는 FlatFileItemWriter를 사용하는 방법에 대해서 알아보았다. 배치 어플리케이션을 사용하는 여러가지 비즈니스 요구사항을 대응하기 위해서는 RDBMS를 데이터소스로 사용하는 코드 뿐만 아니라 다양한 데이터소스를 사용하는 코드를 구현할 수 있어야 할 것이다.
'HANDS-ON' 카테고리의 다른 글
[Spring Boot] DB 트랜잭션을 가지는 API에서 리스트를 처리하면서 실패한 요소만 롤백하기 (0) | 2025.04.01 |
---|---|
[Spring Batch 5] 배치 잡의 Step 사이에 변수 공유하기 (0) | 2025.04.01 |
[ Spring Framework Transaction, LG CNS DEVON-Framework batch-insert ] 데브온 프레임웍의 배치인서트 사용해 성능 개선하기 (0) | 2023.04.06 |
[ 스프링 부트 배치 속도개선 ] 마이바티스 배치업데이트, 스프링 HTTP API 비동기 요청 적용해 배치 잡 속도 개선하기 (0) | 2023.04.03 |
[JSON 라이브러리 pull request] JSON 분리 기능 개발해보기 (0) | 2023.02.24 |