Amazon Web Services ブログ
AWS DMS によるリアルタイムでの Iceberg の取り込み
本投稿は、 Caius Brindescu と Mahesh Kansara による記事 「Real-time Iceberg ingestion with AWS DMS」を翻訳したものです。
これは、AWS とのパートナーシップに基づいた、Etleap の主任エンジニアである Caius Brindescu によるゲスト投稿です。
タイムリーな意思決定には、低レイテンシーで最新のデータにアクセスすることが不可欠です。しかし、多くのチームにとって、データレイクへのレイテンシーを削減することは、更新の処理、パイプラインの複雑さ、ウェアハウスとの統合と格闘することを意味します。Apache Iceberg は、リアルタイムの取り込みとマルチエンジンアクセスを実用的かつ効率的にすることで、この方程式を変えます。
Etleap は、AWS データ&アナリティクスコンピテンシーと Amazon Redshift サービスレディの認定を受けた AWS アドバンストテクノロジーパートナーです。この投稿では、Etleap が、AWS Database Migration Service(AWS DMS)を使用して、オペレーション中の SQL データベースからIceberg テーブルにデータをストリーミングする、スケーラブルでほぼリアルタイムのパイプラインの構築にどのように役立つかを示します。AWS DMS は、すべての主要なデータベースから AWS への変更データキャプチャ(CDC)のための堅牢で構成可能なソリューションとして使用できます。
実際のユースケースを参考に、一貫性、耐障害性、クラウドデータウェアハウスとの統合をどのように提供しているかをご紹介します。これにより、最も重要な時にデータがアクションを促す原動力となります。
AWS DMS の概要
AWS DMS は、リレーショナルデータベース、データウェアハウス、NoSQL データベース、その他のタイプのデータストアの移行を簡単に実現するクラウドサービスです。
AWS DMS を使用して、データを AWS クラウドに移行したり、クラウドとオンプレミス環境の組み合わせ間で移行したりすることができます。
AWS DMS を使用すると、1 回限りの移行を実行できるほか、継続的な変更を複製して、ソースとターゲットを様々な要件に応じてほぼリアルタイムで同期させることができます。ソースで変更が行われてからターゲットに複製されるまでに、わずかな遅延が生じる可能性があります。この遅延、つまりレイテンシーは、AWS DMS の設定、ネットワーク帯域幅、ソースおよびターゲットデータベースの負荷など、さまざまな要因の影響を受ける可能性があります。
次の図は、AWS DMS のレプリケーションプロセスを示しています。

Iceberg の概要
Iceberg は、データレイク上の大規模分析用に設計されたオープンテーブルフォーマットです。
コストの高いテーブルの書き換えやダウンタイムを必要とせずに、ACID トランザクションの完全サポート、隠しパーティショニング、スキーマ進化、タイムトラベルを提供します。
Iceberg は、Hive のような古い形式とは異なり、メタデータとデータファイルを別々に追跡し、スナップショットの高速読み取りと効率的なファイルレベルの更新を可能にします。データファイルはイミュータブルで、新しいスナップショットを作成することで変更が適用されるため、upserts や delete などの操作が安全で効率的になります。次の図は、Iceberg テーブルのファイルレイアウトを示しています。

Iceberg は、エンジンに依存しないデータストレージフォーマットであり、Spark、Apache Flink、Trino (Amazon Athena などのサービスを含む)、および Hive と統合されています。
これは、複数のダウンストリームシステムに確実にサービスを提供できる、本番環境グレードの低レイテンシーデータレイクアーキテクチャを構築するための強固な基盤となります。
カタログをクエリすることで、各エンジンは格納されているデータの一貫したビューを持つことができます。
Etleap の顧客が Iceberg を必要とする理由
Etleap のお客様にとって、低レイテンシーは最優先事項です。
ここでいうレイテンシーとは、ソースデータとターゲットデータの間の時間差として定義しています。
例えば、ターゲットがソースより 5 分遅れている場合、レイテンシーは 5 分となります。
従来のデータウェアハウスやデータレイクでは、レイテンシーをどこまで低く抑えられるかに制限があります。
ウェアハウスでは、新しいデータがステージング領域にロードされ、その後、ターゲットテーブルの大部分をスキャンしてから書き換えるようなマージが実行されます。
データに 2 回アクセスする (1 回は読み取り、1 回は書き換え) ことが、遅延の大部分を引き起こしています。
一方、データレイクは通常追加専用で、更新を効率的に処理しません。すべての行バージョンを保存してクエリ時に最新のバージョンを解決する(クエリのパフォーマンスを犠牲にする)か、ロード時に最新の状態を事前に計算することができます。ロード時にコストの高いテーブルの書き換えが必要になり、待ち時間が長くなります。
Iceberg は、Etleap ユーザーにとってこれらの両方の課題を解決します。ある事例では、ヨーロッパの自転車シェアリング企業が、自転車ラックの空き状況を監視し、ラックが空の状態で放置される時間を制限する規制を遵守するために運用ダッシュボードを使用しています。Iceberg は、パイプラインのレイテンシーを 5 ~ 10 分から数秒に短縮することで、より リアルタイムなデータを可能にし、オペレーターがラックのバランスを取り直し、ペナルティを回避するための重要な追加時間を提供します。
別のケースでは、チームは 1 つのデータセットを複数のデータウェアハウスで利用できるようにする必要がありました。以前は、移行先ごとに別々のパイプラインを構築して維持する必要がありました。Icebergを使用すると、データを一度ロードして複数のエンジンからのクエリがサポートされるため、複雑さが軽減され、ツール間の一貫性が保たれます。
ソリューションの概要
最初のステップは、ソースデータベースからできるだけ迅速に更新を抽出することです。このために、高スループットのデータキャプチャ (CDC) に確実にスケールする AWS DMS を使用します。AWS DMS は変更を Amazon Kinesis Data Streams に書き込みます。Etleap はこのデータストリームを読み取り、Amazon EMR 上の Flink を使用して変更を処理します。
そこから、データは Amazon S3 に保存された Iceberg テーブルに書き込まれ、AWS Glue がデータカタログとして使用されます。これにより、複数のクエリエンジンからデータをクエリできるようになります。
以下の図は、パイプラインのアーキテクチャを示しています。

完全に一度だけの処理
低遅延のパイプラインでデータの整合性を維持するには、データを迅速に移動するだけでは不十分です。また、データを一度だけ処理する必要もあります。レコードが重複または欠落していると、特に更新が多いワークロードでは、誤った結果になる可能性があります。変更が AWS DMS によってキャプチャされ、Kinesis Data Streams にストリーミングされた後、Etleap は Flink の2フェーズコミットプロトコルを使用して1回だけ処理します。
フェーズ 1 では、Flink が各並列データストリームにチェックポイントバリアを挿入します(次の図を参照)。
バリアがパイプラインを流れる際、各オペレーターは一時停止し、同期的に状態を保存してから、バリアをダウンストリームに転送します。その後、状態のスナップショットを耐久性のあるストレージ(この場合は Amazon S3 )に非同期で永続化します。
この分離により、I/O 待ちの間も重要な処理がブロックされないことが保証されます。オペレーターが状態の保存に成功すると、チェックポイントが完了したことを Flink Job Manager に通知します。

フェーズ 2 では、パイプラインのすべての部分でチェックポイントを完了したことをジョブマネージャーが確認し、その後グローバルなコミット信号をブロードキャストします (次の図を参照)。
この時点で、オペレーターは外部システムへのデータ書き込みなどの副作用を安全に実行できます。
Etleap の場合、これには Iceberg テーブルへの新しいスナップショットのコミットや、モニタリング用のメトリクスの記録が含まれます。

耐障害性をサポートするため、コミット操作は冪等性を備えるように実装されています。これは、Iceberg テーブルに新しいスナップショットを書き込む場合と、パイプラインのメトリクスとログを記録する場合の両方に適用されます。
この段階でパイプラインが失敗し、再起動が必要になった場合、Flink は安全にコミットを再試行します。
各操作は、重複や不整合を生じることなく複数回実行できるため、障害が発生した場合でもデータの正確性を維持できます。
パズルの最後のピースは、チェックポイントの頻度、つまりパイプラインがどのくらいの頻度でデータを Iceberg にコミットするかです。
この設定は、パイプライン全体のレイテンシーを決定する上で重要な役割を果たします。
ユースケースを評価した結果、3 秒のチェックポイント間隔を選択しました。
これは効果的なバランスを取っています。エンドツーエンドのレイテンシーを 5 秒未満に抑えつつ、頻繁なコミットによるパフォーマンスのオーバーヘッドを最小限に抑えています。
この間隔は、次のセクションで説明するように、ウェアハウスのメタデータ更新サイクルともうまく一致しています。
テーブルのメンテナンスとデータウェアハウスの統合
Iceberg テーブルは時間の経過とともに、メタデータ、スナップショット、最適でないファイルレイアウトを蓄積し、パフォーマンスが低下する可能性があります。
クエリのパフォーマンスを高く保ち、ストレージを効率的に使用するためには、定期的なメンテナンスが不可欠です。
これには、データファイルの圧縮、スナップショットの有効期限切れ、メタデータのクリーンアップなどのタスクが含まれます。
Etleap は、取り込みを中断することなく、これらのメンテナンスタスクを自動的に処理します。
メンテナンスジョブはデータパイプラインと並行して実行され、ファイルサイズの分布やスナップショット数などの発見的手法に基づいてトリガーされます。
次のスクリーンショットは、データフローを中断することなく、並行してメンテナンス作業を実行している Etleap パイプラインの例を示しています。

最後の要素はウェアハウスの統合です。Iceberg の主な利点の1つは相互運用性です。Athena、Amazon Redshift、Snowflake、Databricks などのエンジンから同じテーブルをクエリできます。
手動でのセットアップも可能ですが、Etleap はパイプラインの作成時にこれらの統合を自動的に設定できます。
例えば、Iceberg テーブルを Snowflake でクエリ可能にするために、AWS Glue とのカタログ統合を作成し、テーブルの Amazon S3 ロケーションを指す外部ボリュームを定義します:
その後、Snowflake で Iceberg テーブルを指す新しいテーブルを作成できます:
Snowflake を最新の Iceberg スナップショットと同期させるために、Etleap は各コミット後に REFRESH 操作をトリガーします。
これにより、ユーザーは最新のデータを確認でき、Snowflake のビューが大きく遅れることを防ぎます。
リフレッシュの同期的な性質は、自然なレート制限メカニズムも提供し、スナップショットの可視性をウェアハウスのクエリパフォーマンスと整合させます。
まとめ
AWS DMS、Kinesis Data Streams、Amazon EMR などの AWS ツールと、Iceberg の更新、スキーマ進化、マルチエンジンアクセスのサポートを組み合わせることで、低レイテンシーで信頼性の高いデータパイプラインを Iceberg に構築することが可能です。
このポストでは、運用データベースからの変更を Iceberg にストリーミングし、エンドツーエンドのレイテンシーを 5 秒未満に抑えながら、データの整合性を維持し、Snowflake のようなツールでのダウンストリーム分析をサポートする方法を紹介しました。
このアーキテクチャは、データレイクの最新化、エンジン間のアクセス統合、リアルタイムの運用要件の達成を目指すチームにとって、強力な基盤となります。
Etleap がどのようにして自動パイプライン作成、耐障害性がある処理、Iceberg のメンテナンスを含め、これらをすぐに実現可能にするかを確認するには、サインアップして、パーソナライズされたデモをご覧ください。