ebson

[ 스프링 부트 배치 속도개선 ] 마이바티스 배치업데이트, 스프링 HTTP API 비동기 요청 적용해 배치 잡 속도 개선하기 본문

HANDS-ON

[ 스프링 부트 배치 속도개선 ] 마이바티스 배치업데이트, 스프링 HTTP API 비동기 요청 적용해 배치 잡 속도 개선하기

ebson 2023. 4. 3. 16:46

외부 API ENDPOINT 으로 JSON, FORM DATA 등의  데이터셋을 POST 요청하여 성공/실패 응답을 받고 처리하는 것 만큼이나 GET 요청을 통해 데이터셋을 받아 정제하고 사용하는 것도 실무에서 만날 수 있는 흔한 요구사항이다. 그리고 GET 요청을 통해 받는 데이터셋이 JSON 포맷인 경우도 많다. 그래서 외부 API ENDPOINT으로 HTTP GET 요청을 효율적으로 실행하며 결과 JSON 데이터셋을 용도에 맞게 수집, 정제, 활용하는 작업이 필요할 수 있다.

 

내가 실무에서 만난 작업은 데이터베이스의 특정 테이블의 정보를 바탕으로 brightcove 에서 제공하는 비디오정보 데이터셋을 특정 기간내 조회하여 데이터베이스의 특정 테이블에 해당 비디오정보의 상태정보를 업데이트 하는 배치 잡이었다. 그리고 변경된 API ENDPOINT를 적용하는 요구사항이 있었다.

 

brightcove으로 비디오 데이터를 요청하려면 비디오아이디 같은 파라미터들과 계정 아이디 뿐만 아니라 액세스 토큰이 필요하다. 그래서 API ENDPOINT가 변경되기 전에는 OAuthClient를 사용해 brightcove에서 제공하는 토큰 발급 서버에 요청하여 토큰을 발급받고 Authorization header에 추가해야 했다. 특히, HttpURLConnection을 사용해 블로킹방식으로 비디오정보 GET HTTP 요청을 한 결과 데이터를 바탕으로 테이블을 업데이트하는 방식이었다.

 

이 방식은 로컬호스트에서 테스트해본 결과, 30분 이상이 소요되었다. 테이블에서 조회한 약 2500개 이상의 비디오 아이디별로 각각 brigtcove 으로 데이터를 요청하고, 정제하고 처리해야 했기 때문이다. 그리고 동기방식이었기 때문에 하나의 HTTP CALL이 시작되면 무조건 끝날때까지 기다리고 다음 HTTP CALL을 순차적으로 실행해야 했다. 테이블 업데이트도 순차적으로 한건씩 업데이트하는 방식이었다.

 

이와 같은 방식은 가독성이 좋고 정확성을 높일 수도 있지만 400여 개의 배치 잡을 대상으로 테스트를 진행해야 하는 상황이었기 때문에 테스트 시간이 30분 이상 소요되면 부담되었다. 

 

게다가 변경된 API ENDPOINT으로 요청하는 경우에는 brightcove으로 요청하는 API으로 간접요청하는 방식이라 액세스 토큰이 필요없지만 추가 요구사항으로서 로케일코드가 추가되야 했고 이것은 더 많은 HTTP CALL을 필요로 했다. 테이블에서 조회되는 비디오 아이디-로케일코드 쌍은 약 4700개로 2배 가까이 더 많았고 테스트해본 결과 실행시간은 약 120분으로 4배 더 필요했다.

 

 

마이바티스 배치 업데이트 적용하기

 

그래서 최대한 기존 로직을 유지하면서 테스트시간을 단축시킬 방법을 찾기로 했다. 먼저, brightcove으로 요청한 데이터셋을 바탕으로 테이블을 업데이트하는 다른 비슷한 잡에서 마이바티스 배치업데이트를 적용해 약 25% 속도가 개선된 사례가 있었기 때문에, 이번 잡에도 마이바티스 배치업데이트를 적용했다. 마이바티스 배치업데이트란 자바 소스에서 for문을 돌면서 VO 파라미터를 전달해 한건씩 업데이트하는 방식 대신, SQL 문에서 List 파라미터를 전달받아 foreach 문을 순회하면서 List 사이즈만큼의 개수의 업데이트문을 한번에 실행하는 것이다. 

 

<update id="updateThumbNailUrlList" parameterType="java.util.List" >
    <foreach collection="list" item="item" index="index" open="" close="" separator=";">
        UPDATE COMPANY_MGR_PUBL.CST_VIDEO_M
        SET THUMBNAIL_URL_ADDR  = CASE WHEN #{item.thumbnailUrlAddr} = 'null' THEN null
        ELSE #{item.thumbnailUrlAddr}
        END
        , DURATION_TIME       = #{item.duration}
        , THUMBNAIL_FLAG      = 'Y'
        , LAST_UPDATE_USER_ID = #{item.batchId}
        , LAST_UPDATE_DATE    = NOW()
        WHERE VIDEO_ID  = #{item.id}
    </foreach>
</update>

[ 소스1 ] 마이바티스 배치업데이트 적용예시 1

<update id="updateStatusList" parameterType="java.util.List">
   <foreach collection="list" item="item" index="index" open="" close="" separator=";">
      UPDATE
      COMPANY_MGR_PUBL.CST_VIDEO_M
      <if test="item.complete != null and item.complete == true">
         SET VIDEO_STATUS_CODE   = 'COMPLETE'
         , ERROR_MESSAGE_DESC  = ''
      </if>
      <if test="item.complete == null or item.complete == ''">
         SET VIDEO_STATUS_CODE   = 'ERROR'
         , ERROR_MESSAGE_DESC  = 'false'
      </if>
      , LAST_UPDATE_USER_ID = #{item.batchId}
      , LAST_UPDATE_DATE    = NOW()
      WHERE VIDEO_ID  = #{item.videoId}
      AND LOCALE_CODE = #{item.localeCode}
   </foreach>
</update>

[ 소스2 ] 마이바티스 배치업데이트 적용예시 2

 

자바 소스에서 for문을 도는 것보다, 데이터베이스 솔루션 내에서 여러건을 간격없이 호출하는 것이 더 빠르게 처리되었다. 위 배치업데이트 적용예시 1의 경우, 약 1200여개의 데이터셋 리스트로 테스트해본 결과 전자가 후자보다 약 25% 빠른 결과(4m -> 3m)를 보였다. 마이바티스 배치업데이트 적용예시 2 의 배치 잡의 경우에는 비동기 논블로킹 요청을 함께 적용한 후 테스트했기 때문에 얼만큼의 속도 향상이 있는 것인지는 모르겠지만, vo 대신 list를 전달하는 동일한 방식이기 때문에 비슷한 향상률이었을 것이라고 생각한다.

 

 

논블로킹 비동기 요청방식 적용하기

 

마이바티스 배치업데이트 적용예시 2에 해당하는 잡의 경우는 비동기 논블로킹 방식을 함께 적용한 덕분에 25%가 아니라 약 4000%의 속도 향상이 있었다. 논블로킹 방식이란 HTTP 요청을 보낸 후 응답이 오기까지 무조건 기다리지 않고 다른 HTTP 요청이 있을 경우 이들을 병렬로 처리하는 것이다. 노드JS에서 이벤트루프 모델을 사용해 논블로킹 싱글스레드 방식으로 요청을 처리하는 것처럼 스프링에서도 이벤트루프 모델을 사용해 논블로킹 HTTP 요청을 지원하는 기술이 있다. 그것은 스프링 웹플럭스 웹클라이언트이다. 메이븐의 경우 다음과 같이 의존성을 추가하면 사용할 수 있다.

 

<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-webflux</artifactId>
   <version>2.7.7</version>
</dependency>

[ 소스3 ] 스프링 웹플럭스 의존성 

 

 

WebClient란 스프링 웹플럭스 리액티브 프로그래밍의 일부이다. 스프링 MVC 프로그래밍에서는 Thread Per Request 모델을 사용한다면 스프링 리액티브 프로그래밍에서는 Event Loop 모델을 사용한다. Event Loop 모델의 특징은 최소한의 스레드를 사용하여 컨텍스트 스위칭(스레드를 변경하는 것) 비용을 줄일 수 있다는 것과 논블로킹 방식으로 작업을 처리할 수 있다는 것이다. 그래서 WebClient는 블로킹 방식뿐만 아니라 논블로킹 방식으로 HTTP 요청을 보내고 리액티브 프로그래밍에서 사용하는 Mono, Flux 타입의 응답을 수집한다. 

 

위 기술을 적용하고자 먼저 HttpClient를 생성하고 응답시간 타임아웃, 커넥션 타임아웃 시간을 설정했다. WebClient가 null이 되지 않도록 하면서 ReactorClientHttpConnector로 HttpClient를 연결하고 baseUrl으로 변경된 API ENDPOINT를 갖도록 했다. JSON 데이터셋 응답을 받도록 미디어타입을 설정하고 baseurl과 uri를 추가해 GET 요청을 보내도록 했다.

 

기존 방식에서도 200 코드 또는 그 외 오류코드 두가지로만 구분해 테이블을 업데이트했기 때문에, WebClient 요청시 응답에서도 doOnSuccess, doOnError 메서드를 사용해 두가지 케이스로만 구분해 응답을 수집했다. doOnSucess, 즉 200 코드 응답을 받았을 때는 videoId, localeCode, batchId와 함께 complete 값을 true로 갖는 JSONObject를 응답리스트에 추가했다. 그리고 doOnError, 즉 4xx 또는 5xx 의 오류 코드 응답을 받았을 때는 complete 값을 공백으로 갖는 JSONObject를 응답리스트에 추가했다. 

 

이 WebClient 요청은 데이터베이스에서 조회한 요청 파라미터 데이터셋(videoId, localeCode) 리스트를 순회하면서 연속적으로 이루어졌다. 그러나 기존 블로킹 방식대로 했다면 속도 개선 효과가 적었을 것이다. 그래서 block 메서드 대신 subscribe 메서드를 사용해 비동기 논블로킹 방식으로 요청을 처리하도록 했다. 그리고 논블로킹 방식에서는 그 요청을 처리하는 데몬 스레드가 응답값을 받을 때까지 메인 스레드가 기다린다는 보장이 없기 때문에, CountDownLatch를 사용해서 초기값을 videoInfoList.size으로 주고, doOnTerminate 메서드에서(doOnSucess 또는 doOnError가 종료될 때마다) 1씩 카운트를 감소하도록 했다. videoInfoList 순회문 밖에서 CountDownLatch는 await하고 있으면서 순회문을 완전히 다 돌고 응답을 모두 수집할 때까지 메인스레드가 종료되지 않고 기다리도록 한 것이다. 

 

private ArrayList<Map<String, Object>> getStatusList(List<Map<String, Object>> videoInfoList, String batchId) throws Exception { /** 2023-03-28 getStatusList() - cmsApiUrl modified, non-blocking multi request applied */
   Map<String, Object> resultMap = new HashMap<>();
   ArrayList<org.json.JSONObject> responseList = new ArrayList<>();
   ArrayList<Map<String, Object>> statusList = new ArrayList<>();

   String nonBlockingResult = null;
   CountDownLatch cdl = new CountDownLatch(videoInfoList.size());

   try {
      HttpClient httpClient = HttpClient.create()
            .responseTimeout(Duration.ofSeconds(60 * 5))
            .proxyWithSystemProperties()
            .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 120 * 1000)
            .option(ChannelOption.SO_KEEPALIVE, true)
            ;

      for (int idx=0; idx<videoInfoList.size(); idx++) {
         String localeCode = videoInfoList.get(idx).get("localeCode").toString();
         String videoId = videoInfoList.get(idx).get("videoId").toString();
         String uri = "/catalog-service/v1/brightcove/"+localeCode.toLowerCase()+"/videos/"+videoId;

         // 2023-03-29 현재는 UK 만 오픈하므로 uk만 통과시킴
         if (!"UK".equals(localeCode)){
            cdl.countDown();
            continue;
         }

         WebClient webClient = null;
         while(webClient == null){
            webClient = WebClient.builder()
                  .clientConnector(new ReactorClientHttpConnector(httpClient))
                  .baseUrl(cmsApiUrl)
                  .build();
         }

         webClient
               .mutate()
               .defaultHeader("Content-Type", "application/json;charset=UTF-8")
               .baseUrl(cmsApiUrl)
               .build()
               .get()
               .uri(uri)
               .accept(MediaType.APPLICATION_JSON)
               .retrieve()
               .bodyToMono(String.class)
               .doOnError(e -> {
                  responseList.add(new org.json.JSONObject()
                        .put("complete", "")
                        .put("localeCode", localeCode)
                        .put("videoId", videoId)
                        .put("batchId", batchId)
                  );
               })
               .doOnSuccess(res -> {
                  responseList.add(new org.json.JSONObject(res.toString())
                        .put("complete", "true")
                        .put("localeCode", localeCode)
                        .put("videoId", videoId)
                        .put("batchId", batchId)
                  );
               })
               .doAfterTerminate(() -> cdl.countDown())
               .subscribe()
         ;
         updateCnt++;
      }

      cdl.await();
      resultMap.put("responseList", responseList);
      nonBlockingResult = new org.json.JSONObject(resultMap).toString();

      Gson gson = new Gson();
      Map<String, Object> m = gson.fromJson(nonBlockingResult, Map.class);
      statusList = (ArrayList) m.get("responseList");

   } catch(Exception e) {
      log.error("BrightcoveStatusMgrService^^getStatusList()^^Exception : " + ExceptionUtils.getMessage(e));
   }

   return statusList;
}

 

그리고 이렇게 해도 약 10% 확률로 응답 수집이 누락되는 것으로 보였다. 그래서 이 응답 수집이 누락된 videoInfoList를 한번 더 데이터베이스로부터 조회해서, 블로킹 방식으로 응답을 수집하고 마이바티스 배치업데이트를 한번더 실행했다. 이렇게 하고나니 결과적으로 2시간 이상 소요되던 테스트 시간이 3분 내외로 단축되었다. 약 4000% 속도향상하는 효과였다.

 

 

피드백

효율 좋은 기술 습득 방식을 선택해야 한다. 마이바티스 배치업데이트에 비해 스프링 리액티브 프로그래밍이 훨씬 공부할 내용이 많고 복잡했기 때문이다. 마이바티스 배치업데이트의 동작방식과 속도상 이점이 생길 수 있음을 이해하고 프로젝트에 적용하는 것에는 하루가 걸리지 않았지만 스프링 리액티브 프로그래밍 웹플럭스의 개념과 동작원리를 이해하는 것에는 몇배의 시간이 필요했다. 문서 독학도 좋지만 강의나 지인의 조언을 구하는 것이 좋은 방법이라고 생각된다.

 

덧붙여, 스프링 리액티브 프로그래밍의 특정 클래스만 위와 같이 사용하는 경우는 없다는 것을 알게 되었다. 해당 프로젝트에서는 WebClinet보다는 AsyncRestTemplate을 사용하는 것이 더 어울렸을 것이라는 것을 나중에 알게 되었다. WebClient를 사용하면서도 리액티브 프로그래밍 방식으로 구현된 프로젝트가 아닌 프로젝트에 리액티브 프로그래밍 기법을 부분적으로만 가져오는 것이 맞는가라는 의구심이 들었지만 그럼에도 불구하고 사용했던 이유는 이것 외에는 비동기 API 통신을 구현할 방법이 없는 것으로 알았기 때문이다. 다른 기술이 있는 줄 알았다면 더 검토해보았을 것 같다. 다음 번에는 조급하게 생각하지 말고 여러가지 선택지를 찾아보아야겠다. 

Comments