Java
Spring-boot
Spring Batch
MyBatis
結局ボトルネックは最初と最後
性能テストなんかでもそうなりがちですが、最終的なボトルネックはDBになりました。 というかボトルネックがDBだという状況まで追い込めたのであれば、アプリケーションとしてのロジックは限界まで切り詰められた1つの証拠になるのではないかとすら思っています。
今回の移行バッチで言うなれば、移行元の現行システムのDBからデータを取得する処理と、移行先の新システムのDBへデータを永続化する処理の部分になります。
アプリケーションとしては間の変換処理をどこまで切り詰められるかの戦いになるわけで、ここまで持ってこれればあとはいよいよどれだけ札束で殴れるかの勝負になってきます。
最終的に辿り着いた結論
これまでの試行錯誤編で答えは書いているのですが、最後にまとめておくことにします。
Cursorでメモリセーフに大量データを読み出す
MyBatis使いであれば大量データを扱うのは Cursor
を使うのが常套手段になります。
https://mybatis.org/mybatis-3/apidocs/org/apache/ibatis/cursor/Cursor.html
fetchSize
オプションでどれだけのレコード数を1度にメモリに展開するのかを調整しつつ、SELECTの結果をJavaのオブジェクトでさばくことが出来るので、まさにメモリセーフに大量データを読み出すのにうってつけの代物です。
個人的には大量データを読み出す際、 Cursor#getCurrentIndex
を使ってどこまで読み出せているのか定期的にログに出してあげると精神衛生上ハッピーになれます。
FetchSourceTable.java
@Component
@RequiredArgsConstructor
@Slf4j
public class FetchSourceTable {
private final SourceTableMapper sourceTableMapper;
@Transactional(readOnly = true)
public void fetch(LocalDate targetDate) {
try (var cursor = sourceTableMapper.findByTargetDate(targetDate)) {
cursor.forEach(source -> {
var index = cursor.getCurrentIndex();
// 10万件ずつログに出力
if (index % 100000 == 0) {
log.info("fetch SouceTable in progress: index={}, userId={}, ticker={}", index, source.getUserId(), source.getTicker());
}
});
}
}
}
CompletableFutureで重い処理を並列処理する
試行錯誤編では現行システムのDBからのデータ取得から変換にかけて CompletableFuture
を使った並列処理の話をしました。
が、実は使い所はここだけではなくて新システムのDBのデータ削除についてもこの並列処理が有効になってきます。
つまりデータの削除は変換後の永続化の手前までに終わっていればいいわけで、言い方を変えると データの永続化まで削除の処理結果の評価を遅延させられる ということを意味します。
データの削除処理も並列処理の対象として切り出して、
DeleteTargetTable.java
@Component
@RequiredArgsConstructor
public class DeleteTargetTable {
private final TargetTableMapper targetTableMapper;
@Async("batchExecutor")
public CompletableFuture<Integer> delete(LocalDate from, LocalDate to) {
try {
var deleted = from.detesUntil(to.plusDays(1))
.toList()
.stream()
.mapToInt(targetTableMapper::deleteByTargetDate)
.sum();
return CompletableFuture.completedFuture(deleted);
} catch (Exception e) {
return CompletableFuture.failedFuture(e);
}
}
}
永続化の手前で削除処理の結果を待ち合わせればいいというわけです。
MigrateTask.java
@StepScope
@Component
@RequiredArgsConstructor
public class MigrateTask implements Tasklet {
@Value("#{jobParameters['from']}")
private LocalDate from;
@Value("#{jobParameters['to']}")
private LocalDate to;
private final DeleteSourceTable deleteSourceTable;
private final FetchAndDataConverter fetchAndDataConverter;
private final TargetTableMapper targetTableMapper;
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) {
var deleteTask = deleteSourceTable.delete(from, to);
var fetchAndConvertTasks = from.datesUntil(to.plusDays(1))
.toList()
.stream()
.map(fetchAndDataConverter::execute)
.toArray(CompletableFuture[]::new);
CompletableFuture.allOf(fetchAndConvertTasks).join();
// 永続化の手前まで削除処理の評価を遅延させる
var deleted = deleteTask.get();
var loadedCount = Arrays.stream(fetchAndConvertTasks)
.map(this::take)
.mapToInt(targetTableMapper::loadData)
.sum();
}
private Path take(CompletableFuture<Path> future) {
try {
return future.get();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
今回のデータの削除処理や変換処理に限った話ではありませんが、ある程度のまとまりで重たい処理 CompletableFuture
を使って切り出し、適切なタイミングで処理結果を待ち合わせることで(評価を遅延させる)、より効率的にリソースを使いながら様々な処理をしていくことが可能になります。
LOAD DATAで大量データを入れ込む
正直、LOAD DATAの威力がここまでとは思ってもいませんでした。
Auroraのバージョンだったり、インスタンスタイプにもよって処理性能はもちろん変わってきますが、200万件〜300万件のデータを入れるのにバルクインサートだと数十分かかっていたところが、10分もかからずに入れられたわけですからその差は歴然です。
ちなみに今回はAurora for MySQLのバージョンがv2系だったこともあり、LOAD DATAは直列処理とせざるを得なかったのですが、v3系(MySQL v8系)になるとこれが並列処理出来るようになるので、さらにパフォーマンスを上げられそうでした。
パラレルテーブルインポートユーティリティというものが、MySQL v8系で実装されていて、それを使うことになります。 これは今後同じような大量データの投入に迫られた際には是非とも試してみたい機能の1つです。
https://dev.mysql.com/doc/mysql-shell/8.0/ja/mysql-shell-utilities-parallel-table.html