【試行錯誤編②】億り人をおくりびとした話

公開日:2023-07-17
最終更新日:2023-07-17

Java

Spring-boot

Spring Batch

MyBatis

直列処理から並列処理にしてみよう

前章でQueryTimeoutやOutOfMemoryErrorが発生していたバッチたち、実は問題はこれだけではなくてパフォーマンスの問題も抱えていました。

移行バッチとはいえ並行運用する中でデータをクリーンアップしてまたやり直したくなる場面も想定して、新システム側へ移行したデータの削除処理もやっていました。 まぁバッチとして重要な考え方の1つである冪等性の担保ってやつですね。

で、新システム側にしかないデータで移行時に紐付けが必要なものがあったりするので、そちらのデータ取得も当然ながらしなければならないと。 (新システム側のいわゆるマスターデータ的なやつ)

となるとこの移行バッチ、実はそれなりにやることがあって、

  • 新システム側のデータ削除
  • 新システム側のマスターデータ取得
  • 現行システム側の移行元データ取得
  • 新システム側へのデータ変換
  • 変換した移行データの新システム側への永続化

とまぁざっとこんなことをやらなければならないのですが、これらの処理が全て直列実行で実装されていました。

1日分だけで見ても数百万件オーダーになるというのに、これらを直列なんかでやっていたらいつまで経っても終わりません。

よし、並列処理したらええやんけと。

並列処理ってそんな簡単なものだっけ?

並列処理をしてやろうと鼻息荒くしたものの、ここ数年は並列処理のへの字もいらないくらいのぬるま湯に浸かりきっていた私は焦ります。 いうて並列処理ってめちゃくちゃ難しくなかったっけ?と。

まだJavaが1.6とか1.7がメインストリームだった頃、直接は関わってないのですがミリ秒の世界で戦っていた同期の話を聞いて「ほぇー、すごいなー」なんて鼻くそほじって頃を思い出します。 あんな難しいこと今の自分に出来るのか?と。

でも安心してください。出来ます。 というかJavaが進化して難しいことをあまり考えなくて良くなっていました。

Javaはもうオワコンだと一時期思ってた自分を殴りたい。 ごめんね、Java。

CompletableFutureとは

Javaの1.8で登場したCompletableFuture、今回はこいつを使うことにしました。 なんせSpring-bootと組み合わせるのがラクだったし、それでいてお手軽に並列処理が実装出来る。 なんていいやつなんだお前は。

https://docs.oracle.com/en/java/javase/17/docs/api/java.base/java/util/concurrent/CompletableFuture.html

Spring-bootと組み合わせるのであれば、 ThreadPoolTaskExecutor を適当にBean登録しておくだけで準備完了。 (スレッドプールのサイズ等々、パラメータの調整はインフラのリソースにもよるので適宜変えてください)

ServerConfiguration.java

@Configuration
@EnableAsync
public class ServerConfiguration {

  @Bean(name = "batchExecutor")
  public ThreadPoolTaskExecutor() {
    return new TaskExecutorBuilder()
        .corePoolSize(20)
    .threadNamePrefix("batch-")
    .build;
  }
}

あとはpublicなメソッドに @Async アノテーションをはって戻り値の型を CompletableFuture にしてやればおしまい。 なんて簡単なんだ・・・。

FetchSourceTable.java

@Component
@RequiredArgsConstructor
public class FetchSourceTable {

  private final SourceTableMapper sourceTableMapper;
  
  @Async("batchExecutor")
  @Transactional(readOnly = true)
  public CompletableFuture<List<SourceTable>> fetch(LocalDate targetDate) {
    List<SourceTable> sources = new ArrayList<>();
    try (var cursor = sourceTableMapper.findByTargetDate(targetDate)) {
      cursor.forEach(sources::add);
    } catch (Exception e) {
      returun CompletableFuture.failedFuture(e);
    }
    return CompletableFuture.completedFuture(sources);
  }
}

あとは呼び出し元で複数の CompletableFuture を待ち合わせるのであれば CompletableFutre#allOf を使ったり、単一の実行結果を待ちたいのであれば CompletableFuture#get を使えばよいと。 CompletableFuture そのものの使い方についてはググってみてください。かなりの数の記事が出てくるのでパクればいいです。

この CompletableFuture 、Lambda式で書くことも出来るし、メソッドチェーンしたり、入れ子にしていったりとやろうと思えばかなり複雑なロジックを組むことも可能です。

ただ個人的な好みとしてはクラス分割・メソッド分割してあげて、呼び出し元で必要に応じて結果を待ち合わせるような書き方の方が可読性は高いのかなと感じました。

並列処理する単位の最適解を探る

さて、並列処理が出来るようになったので(といっても CompletableFuture なので非同期処理と書く方が正確ですが)、並列処理で最も重要だと思われる どの単位で並列処理をするのか を考えていきます。

この現行システムの移行元データ、日付のカラムをキーに分割移行するようにしたのは既にお話している通りなのですが、さらにユーザー単位に割ることも出来るというのはこの時点で分かっていました。

エンティティクラスにするとこのような感じ。

SourceTableEntity.java

@AllArgsConstructor
@NoArgsConstructor
@Getter
@Setter
@EqualsAndHashCode
@ToString
@FieldDefaults(level = AccessLevel.PRIVATE)
public class SourceTable implements Serializable {

  LocalDate targetDate;
  
  String userId;
  
  String ticker;
  
  BigDecimal quantity;
  
  BigDecimal amount;
}

つまりここで考えられる方法2つ。

  • 日付のカラムをfrom / toの範囲指定でドカンと取得、さらにユーザーごとの単位に割った上で、変換処理を並列でぶん回す
  • データの取得、変換処理を日付単位で並列でぶん回す

日付・ユーザー単位で並列処理をぶん回す

この方法で狙っていたことは次の2つでした。

  • データ取得のSQLを1発に抑える
  • 1スレッドあたりの処理量を極小化する

分割移行をする上で使用する日付のカラムにはインデックスがはられていることが多かったので、BETWEEN句を使えばインデックススキャンを有効にしながら、1発でデータが取得できるのでDBに対する負荷を下げられるのではないかと睨んだわけです。

現行システムのDBは当然既に稼働しているものなので、おいそれと簡単にスケールアップだなんだと変更を加えることが出来ない状況だったので。 代わりに移行バッチはこれから稼働させるものなわけで、リソースを積もうと思えばいくら積んでもいいわけです。 つまりスケールアップをしやすい所に重たい処理を移すという作戦です。

なので日付の範囲指定でデータを取得して、

SourceTableMapper.java

@Mapper
@Repository
public interface SourceTableMapper {

  Cursor<SourceTableEntity> findByTargetDateBetween(@Param("from") LocalDate from, @Param("to") LocalDate to);
}

SourceTableMapper.xml

<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Config 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="okuribito.domain.repository.SourceTableMapper">
  <select
    id="findByTargetDateBetween"
    fetchSize="10000"
    resultType="okuribito.domain.entity.SourceTable"
  >
    SELECT *
    FROM SourceDB.dbo.SourceTable
    WHERE TargetDate BETWEEN #{from} AND #{to}
  </select>
</mapper>

データのユーザー単位を表すRecordクラスを定義して、

SourceTableKey.java

public record SourceTableKey(LocalDate targetDate, String userId) {}

取得したデータを分割して、

FetchSourceTable.java

@Component
@RequiredArgsConstructor
public class FetchSourceTable {

  private final SourceTableMapper sourceTableMapper;
  
  @Transactional(readOnly = true)
  public Map<SourceTableKey, List<SourceTableEntity>> fetch(LocalDate from, LocalDate to) throws IOException {
    Map<SourceTableKey, List<SourceTableEntity>> sources = new HashMap<>();
    try (var cursor = sourceTableMapper.findByTargetDateBetween(from, to)) {
      cursor.forEach(source -> {
        var key = new SourceTableKey(source.getTargetDate(), source.getUserId());
    sources.computeIfAbsent(key, k -> new ArrayList<>()).add(source);
      });
    }
    return sources;
  }
}

変換処理を並列実行すると。

DataConverter.java

@Component
@RequiredArgsConstructor
public class DataConverter {

  @Async("batchExecutor")
  public CompletableFuture<List<TargetTableEntity>> execute(Map.Entry<SourceTableKey, List<SourceTableEntity>> entry) {
    try {
      var key = entry.getKey();
      var sources = entry.getValue();
      var targets = sources.stream()
          .map(v -> {
            // 変換処理
          })
      .toList();
      return CompletableFuture.completedFuture(targets);
    } catch (Exception e) {
      return CompletableFuture.failedFuture(e);
    }
  }
}

MigrateTask.java

@StepScope
@Component
@RequiredArgsConstructor
public class MigrateTask implements Tasklet {

  @Value("#{jobParameters['from']}")
  private LocalDate from;

  @Value("#{jobParameters['to']}")
  private LocalDate to;

  private final FetchSourceTable fetchSourceTable;
  
  private final DataConverter dataConverter;

  @Override
  public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) {
    var convertTasks = fetchSourceTable.fetch(from, to)
        .entrySet()
    .stream()
    .map(dataConverter::execute)
    .toArray(CompletableFuture[]::new);
    
    CompletableFuture.allOf(convertTasks).join();
    
    var targets = Arrays.stream(convertTasks)
        .map(this::take)
    .flatMap(List::stream)
    .toList();
  }
  
  private List<TargetTableEntity> take(CompletableFuture<List<TargetTableEntity>> future) {
    try {
      return future.get();
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }
}

こうしてみると変換処理の部分だけ見れば、200万件〜300万件程度であればものの数分程度で変換できていることが確認できたのです。もちろん、ECSのリソースは潤沢に積みましたが。 さらに現行システムのSQLServerのリソース状況もウォッチしていましたが、逼迫することなくなんなら余裕あるなくらいの数値を見せていました。

日付単位で並列処理をぶん回す

変換処理の部分だけ見れば確実に高いパフォーマンスを叩き出せることは前項で分かったわけですが、そうは言っても結局データ取得の部分で数十分を要していたので、やはり何とかするべきはここなんだろうなと。 いうてSQLServerまだ負荷かけられそうなツラしてたし、お前もう少し可愛がってやろうじゃないかと。

つまり取得〜変換までを1セットとして、それを日付単位で並列実行してやろうというのが次の作戦です。

なので日付を1日指定でデータを取得して、

SourceTableMapper.java

@Mapper
@Repository
public interface SourceTableMapper {

  Cursor<SourceTableEntity> findByTargetDate(@Param("targetDate") LocalDate targetDate);
}

SourceTableMapper.xml

<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Config 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="okuribito.domain.repository.SourceTableMapper">
  <select
    id="findByTargetDate"
    fetchSize="10000"
    resultType="okuribito.domain.entity.SourceTable"
  >
    SELECT *
    FROM SourceDB.dbo.SourceTable
    WHERE TargetDate = #{targetDate}
  </select>
</mapper>

データの取得〜変換処理を並列実行すると。

FetchAndDataConverter.java

@Component
@RequiredArgsConstructor
public class FetchAndDataConverter {

  private final SourceTableMapper sourceTableMapper;

  @Async("batchExecutor")
  public CompletableFuture<List<TargetTableEntity>> execute(LocalDate targetDate) {
    try (var cursor = sourceTableMapper.findByTargetDate(targetDate)) {
      List<TargetTableEntity> targets = new ArrayList<>();
      cursor.forEach(source -> {
        // 変換処理
      });
      return CompletableFuture.completedFuture(targets);
    } catch (Exception e) {
      return CompletableFuture.failedFuture(e);
    }
  }
}

MigrateTask.java

@StepScope
@Component
@RequiredArgsConstructor
public class MigrateTask implements Tasklet {

  @Value("#{jobParameters['from']}")
  private LocalDate from;

  @Value("#{jobParameters['to']}")
  private LocalDate to;

  private final FetchAndDataConverter fetchAndDataConverter;

  @Override
  public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) {
    var fetchAndConvertTasks = from.datesUntil(to.plusDays(1))
        .toList()
    .stream()
    .map(fetchAndDataConverter::execute)
    .toArray(CompletableFuture[]::new);
    
    CompletableFuture.allOf(fetchAndConvertTasks).join();
    
    var targets = Arrays.stream(fetchAndConvertTasks)
        .map(this::take)
    .flatMap(List::stream)
    .toList();
  }
  
  private List<TargetTableEntity> take(CompletableFuture<List<TargetTableEntity>> future) {
    try {
      return future.get();
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }
}

こうしてみると仮にデータの取得に待たされているスレッドがいたとしても、データの取得が終わった別スレッドでは変換処理が走ってくれるので、効率良く処理をさばくことが出来ていました。 from / toの指定する範囲にもよりますが、適度にSQLServer側に負荷をかけつつ、全体で見ると前項の作戦よりもパフォーマンスを上げることに成功したわけです。

また次章で登場するLOAD DATAとの兼ね合いもあり、CompletableFutureによる並列処理は日付単位とする方針を選択することになります。

©︎ s-kugel All Rights Reserved.