Amazon Web Services ブログ
トランザクション処理可能なオープンテーブルフォーマット(OTF)のために Amazon Athena for Apache Spark と Spark SQL を活用する
AWS を活用したデータレイクは、きわめて高い可用性を誇るAmazon Simple Storage Service(Amazon S3)を土台としており、多様なデータとアナリティクスのアプローチを組み合わせるのに必要なスケール、敏捷性、柔軟性を提供することができます。データレイクがサイズと利用法の両面で成熟してくるにつれ、データをビジネスイベントに合わせて一貫性を保つのにかなりの労力が費やされることがあります。ファイルがトランザクションと一貫性を保って更新されることを確実にするために、Apache Iceberg、Apache Hudi、Linux Foundation Delta Lake などのオープンソースのトランザクション処理可能なテーブルフォーマット (Open Table Format – OTF) を利用する顧客が増えています。これらのフォーマットは高い圧縮率でデータを保存し、アプリケーションやフレームワークと連携し、Amazon S3 上に構築されたデータレイクでの差分(増分)データ処理を簡素化します。これらのフォーマットにより、 ACID (原子性、一貫性、分離性、持続性)トランザクション、アップサート、削除、タイムトラベルやスナップショットなどの高度な機能が可能になります。これらの機能は以前はデータウェアハウスでのみ利用できたものです。各テーブルフォーマットはこの機能を少しずつ異なる方法で実装しています。比較のためには、AWS 上のトランザクショナルデータレイクのためのオープンテーブルフォーマットの選択(英文)を参照してください。
2023年に、AWS はAmazon Athena for Apache Sparkにおいて、Apache Iceberg、Apache Hudi、Linux Foundation Delta Lake サポートの一般提供開始を発表しました。これにより、個別のコネクタや関連する依存関係をインストールしてパッケージ管理する必要がなくなり、これらのフレームワークを使用するために必要な設定手順が簡素化されます。
この投稿では、Amazon Athena ノートブックで Spark SQL を使用する方法と、Iceberg、Hudi、Delta Lake テーブルフォーマットを操作する方法を示します。
Athena の Spark SQL を使用した、データベースとテーブルの作成、テーブルへのデータの挿入、データのクエリ、Amazon S3 のテーブルスナップショットの確認など、一般的な操作をデモンストレーションします。
前提条件
次の前提条件を完了してください:
- 「Amazon Athena Spark で Spark SQL を実行する」(英文)に記載されているすべての前提条件を満たしていることを確認してください。
- 「Amazon Athena Spark で Spark SQL を実行する」で詳述されているように、AWS Glue Data Catalog に
sparkblogdb
というデータベースとnoaa_pq
というテーブルを作成してください。 - Athena ワークグループで使用される AWS Identity and Access Management (IAM) ロールに、S3 バケットとプレフィックスへの読み書きアクセス許可を付与してください。 詳細については、Amazon S3: Allows read and write access to objects in an S3 Bucket を参照してください。
- クリーンアップを実行するために、Athena ワークグループで使用される IAM ロールに、S3 バケットとプレフィックスへの
s3:DeleteObject
アクセス許可を付与してください。 詳細については、Amazon S3 アクション の オブジェクト削除のアクセス許可 セクションを参照してください。
Amazon S3 からのサンプルノートブックのダウンロードとインポート
この投稿で説明するノートブックは、次の場所からダウンロードできます。
- Iceberg チュートリアルノートブック: s3://athena-examples-us-east-1/athenasparksqlblog/notebooks/SparkSQL_iceberg.ipynb
- Hudi チュートリアルノートブック: s3://athena-examples-us-east-1/athenasparksqlblog/notebooks/SparkSQL_hudi.ipynb
- Delta チュートリアルノートブック: s3://athena-examples-us-east-1/athenasparksqlblog/notebooks/SparkSQL_delta.ipynb
ノートブックをダウンロードしたら、ノートブックファイルの管理 (※注:本稿翻訳時点ではリンク先のドキュメントが未翻訳であり、英語での提供です。以下同様。)の ノートブックのインポート方法 のセクションに従って、Athena Spark 環境にインポートしてください。
Open Table Format にあわせたセクションを参照してください
Iceberg テーブル形式に興味がある場合は、Apache Iceberg テーブルの利用セクションを参照してください。
Hudi テーブル形式に興味がある場合は、Apache Hudi テーブルの利用のセクションを参照してください。
Delta Lake テーブル形式に興味がある場合は、Linux Foundation Delta Lake テーブルの利用のセクションを参照してください。
Apache Iceberg テーブルの利用
Athena の Spark ノートブックを使用する際、PySpark のコードを使用することなく直接 SQL クエリを実行できます。
これは、セルの動作を変更するノートブックセルの特別なヘッダであるセルマジックを使用することで実現します。
SQL の場合、%%sql
マジックを追加できます。これにより、セルの内容全体が Athena 上で実行される SQL ステートメントとして解釈されます。
このセクションでは、Athena の Apache Spark SQL を使用して、Apache Iceberg テーブルを作成、分析、管理する方法を示します。
ノートブックセッションの設定
Athena で Apache Iceberg を使用するには、セッションの作成や編集中に、Apache Spark プロパティ セクションを展開し、Apache Iceberg オプションを選択します。
以下のスクリーンショットに示すように、プロパティが事前に設定されます。
ステップの詳細は、セッションの詳細を編集する または 独自のノートブックを作成する をご覧ください。
このセクションで使用されているコードは、SparkSQL_iceberg.ipynb ファイルで参照できます。
データベースとIcebergテーブルの作成
まず、AWS Glue データカタログにデータベースを作成します。
次の SQL を使用すると、icebergdb
というデータベースを作成できます。
次に、データベース icebergdb
で、データのロード先になる Amazon S3 の場所を指す noaa_iceberg
という Iceberg テーブルを作成します。次のステートメントを実行し、ロケーション s3://<your-S3-bucket>/<prefix>/ をご自身の S3 バケットとプレフィックスに置き換えてください:
テーブルへのデータの挿入
noaa_iceberg
Iceberg テーブルにデータを入力するために、前提条件として作成された Parquet テーブル sparkblogdb.noaa_pq
からデータを挿入します。
Spark の INSERT INTO ステートメントを使用してこれを行うことができます。
あるいは、CREATE TABLE AS SELECT に USING iceberg 句を使用することで、 Iceberg テーブルを作成し、ソーステーブルからデータを挿入する一連のステップを一度に実行できます。
Iceberg テーブルのクエリ
データが Iceberg テーブルに挿入されたので、分析を開始できます。
'SEATTLE TACOMA AIRPORT, WA US'
の場所について、年ごとの最低記録気温を見つけるために、Spark SQL を実行してみましょう。
次のような出力が得られます。
Iceberg テーブル内のデータの更新
テーブル内のデータを更新する方法を見ていきましょう。
ステーション名 'SEATTLE TACOMA AIRPORT, WA US'
を 'Sea-Tac'
に更新したいとします。
Spark SQL を使用すると、Iceberg テーブルに対して UPDATE ステートメントを実行できます。
また、前のSELECTクエリを実行して、'Sea-Tac'
ロケーションの最低記録温度を見つけることができます。
次のような出力が得られます。
データファイルの圧縮
Icebergのような Open Table Format における更新処理では、ファイルストレージ内の更新差分を作成し、マニフェストファイルを通じて行のバージョンをトラッキングすることでその機能を実現しています。 ファイル数が多くなるとマニフェストファイルに格納されるメタデータの量が多くなりますし、小さいデータが大量にあると不必要にメタデータを多くしがちで、これによりクエリ効率の低下と Amazon S3 アクセスコストの上昇を招きます。 Athena (for Spark) で Iceberg のrewrite_data_files
プロシージャを実行すると、データファイルが圧縮され、多数の小さな差分ファイルが、読み取りに最適化された少数の Parquet ファイルにまとめられます。 ファイルの圧縮により読み取り操作が高速化されます。 テーブルの圧縮を実行するには、次の Spark SQL を実行します。
rewrite_data_files には ソート戦略を指定するオプションがあり、これによりデータの再編成と圧縮を適切に指定することができます。
テーブルスナップショットのリスト表示
Iceberg テーブル上の各書き込み、更新、削除、アップサート、圧縮操作は、スナップショット分離とタイムトラベルを実現するため、古いデータとメタデータを保持しつつ、テーブルの新しいスナップショットを作成します。Iceberg テーブルのスナップショット一覧を得るには、次の Spark SQL ステートメントを実行します。
古いスナップショットの期限切れ
不要になったデータファイルを削除し、テーブルメタデータのサイズを小さく保つために、期限を指定したスナップショットの定期的な削除が推奨されます。期限切れではないスナップショットでまだ必要とされているファイルは削除されません。Athena for Spark では、次の SQL を実行して、特定のタイムスタンプよりも古い icebergdb.noaa_iceberg
テーブルのスナップショットの期限切れを設定できます。
timestamp の値は yyyy-MM-dd HH:mm:ss.fff
の形式の文字列で指定されていることに注意してください。
出力は削除されたデータファイルとメタデータファイルの数をカウントしたものになります。
テーブルとデータベースの削除
この演習で使用したIcebergテーブルとAmazon S3の関連データをクリーンアップするには、次のSpark SQLを実行できます。
次の Spark SQL を実行して、icebergdb データベースを削除します。
Athena で Spark を使用して Iceberg テーブルで実行できるすべての操作の詳細については、Iceberg ドキュメントの Spark クエリ と Spark プロシージャ を参照してください。
Apache Hudi テーブルの利用
次に、Athena の Spark SQL を使用して、Apache Hudi テーブルを作成、分析、管理する方法を示します。
ノートブックセッションの設定
Athena で Apache Hudi を使用するには、セッションの作成または編集中に、Apache Spark プロパティ セクションを展開し、Apache Hudi オプションを選択します。
ステップの詳細は、セッションの詳細を編集する または 独自のノートブックを作成する をご覧ください。
このセクションで使用されているコードは、SparkSQL_hudi.ipynb ファイルで利用できます。以下のステップを確認するためにご利用ください。
データベースとHudiテーブルの作成
まず、AWS Glue データカタログに格納される hudidb
というデータベースを作成します。この次に Hudi テーブルを作成します。
Amazon S3 のデータをロードする場所を指す Hudi テーブルを作成します。
このテーブルは、コピーオンライト型であることに注意してください。
これはテーブル DDL のtype='cow'
によって定義されています。
stationとdate を複合プライマリキー、preCombinedFieldをyearとして定義しました。
また、テーブルは year でパーティション化されています。
次のステートメントを実行し、ロケーションs3://<your-S3-bucket>/<prefix>/
をご自身の S3 バケットとプレフィックスに置き換えてください:
テーブルへのデータの挿入
Iceberg と同様に、前のステップで作成した sparkblogdb.noaa_pq
テーブルからデータを読み取ることによってテーブルにデータを入力するために、INSERT INTO ステートメントを使用します。
Hudi テーブルのクエリ
テーブルが作成されたので、'SEATTLE TACOMA AIRPORT, WA US'
ロケーションにおける最高気温を検索するクエリを実行してみましょう。
Hudi テーブル内のデータの更新
ステーション名(name)を 'SEATTLE TACOMA AIRPORT, WA US'
から 'Sea–Tac'
に変更しましょう。
Athena for Spark で アップデート ステートメントを実行することで、noaa_hudi
テーブルのレコードを更新できます。
「Sea-Tac」ロケーションで記録された最高気温を検索するために、前のSELECTの条件句を’Sea-Tac’に変えて実行します。
タイムトラベルクエリ
SQL でタイムトラベルクエリを使用することで、過去のデータスナップショットを分析できます。例:
このクエリは、過去の特定の時点でのシアトル空港の気温データをチェックします。
timestamp 句を使うことで、現在のデータを変更することなく過去に戻ることができます。
timestamp の値は yyyy-MM-dd HH:mm:ss.fff
のフォーマットの文字列で指定されていることに注意してください。
クラスタリングによるクエリ速度の最適化
Athena でのクエリパフォーマンスを改善するために、Spark SQL を使用して Hudi テーブルで クラスタリング を実行できます。
テーブルのコンパクション(compaction)
コンパクションはHudiに特有の Merge On Read (MOR) テーブルで採用されているテーブルサービスで、行ベースのログファイルからの更新を定期的に対応する列ベースのベースファイルにマージすることで、ベースファイルの新しいバージョンを生成します。コンパクションは Copy On Write (COW) テーブルには適用されず、 MOR テーブルにのみ適用されます。 Athena for Spark を使用して MOR テーブルのコンパクションを実行するには、次のクエリを実行できます。
テーブルとデータベースの削除
以下の Spark SQL を実行して、作成した Hudi テーブルと、Amazon S3 の場所に関連付けられたデータを削除してください:
次の Spark SQL を実行して、データベース hudidb
を削除します。
Athena で Spark を使用して Hudi テーブルで実行できるすべての操作については、Hudi ドキュメントの SQL DDL と Procedures を参照してください。
Linux Foundation Delta Lake テーブルの利用
次に、Athena の Spark SQL を使用して Delta Lake テーブルを作成、分析、管理する方法を示します。
ノートブックセッションの設定
Athena で Spark を使用して Delta Lake を利用するには、セッションの作成または編集中に、Apache Spark プロパティ セクションを展開し、Linux Foundation Delta Lake を選択します。
ステップの詳細は、セッションの詳細を編集する または 独自のノートブックを作成する をご覧ください。
このセクションで使用されているコードは、SparkSQL_delta.ipynb ファイルで利用できます。ためにご利用ください。
データベースとDelta Lakeテーブルの作成
このセクションでは、AWS Glue データカタログにデータベースを作成します。
次の SQL を使用すると、deltalakedb
というデータベースを作成できます。
次に、データベース deltalakedb
で、データをロードする Amazon S3 の場所を指す noaa_delta
という Delta Lake テーブルを作成します。次のステートメントを実行し、ロケーション s3://<your-S3-bucket>/<prefix>/
をご自身の S3 バケットとプレフィックスに置き換えてください:
テーブルへのデータの挿入
前の投稿で作成した sparkblogdb.noaa_pq
テーブルからデータを読み取ることにより、テーブルに入力するために INSERT INTO ステートメントを使用します。
CREATE TABLE AS SELECT を使用して、1 つのクエリで Delta Lake テーブルを作成し、ソーステーブルからデータを挿入することもできます。
Delta Lake テーブルのクエリ
Delta Lake テーブルにデータが挿入されたので、分析を開始することができます。
'SEATTLE TACOMA AIRPORT, WA US'
ロケーションの最低記録温度を見つけるために、Spark SQL を実行しましょう。
Deltaレイクテーブル内のデータの更新
ステーション名を 'SEATTLE TACOMA AIRPORT, WA US'
から 'Sea–Tac'
に変更しましょう。
Athena の Spark 上で noaa_delta
テーブルのレコードを更新する UPDATE ステートメントを実行できます。
前のSELECTクエリを実行して、'Sea-Tac'
ロケーションの最低記録温度を検索できます。結果は以前と同じはずです。
データファイルの圧縮 (optimize)
Athena for Spark では、Delta Lake テーブルに対して OPTIMIZE を実行できます。これにより、複数の小さいファイルが大きなファイルに圧縮されるため、小さいファイルがたくさんあることによるクエリへの負担を減らすことができます。圧縮を実行するには、次のクエリを実行します。
Delta Lake のドキュメントの最適化を参照して、OPTIMIZE の実行中に使用できるさまざまなオプションを確認してください。
Delta Lake テーブルで参照されなくなったファイルの削除
Athena の Spark を使用して Delta Lake テーブル上で VACUUM コマンドを実行することで、そのテーブルから参照されなくなった Amazon S3 に保存されたファイルで、保持期間を超えたものを削除できます。
Delta Lake のドキュメントの Delta テーブルで参照されなくなったファイルの削除 を参照して、VACUUM で利用できるオプションを確認してください。
テーブルとデータベースの削除
次の Spark SQL を実行して、作成した Delta Lake テーブルを削除します。
次の Spark SQL を実行して、データベース deltalakedb
を削除します。
Delta Lake テーブルとデータベースで DROP TABLE DDL を実行すると、これらのオブジェクトのメタデータが削除されますが、Amazon S3 のデータファイルは自動的には削除されません。
ノートブックのセルで次の Python コードを実行することで、S3 バケットからデータを削除できます。
Athena の Spark を使用して Delta Lake テーブルで実行できる SQL ステートメントの詳細については、Delta Lake ドキュメントのクイックスタートを参照してください。
まとめ
この投稿では、Athena ノートブックで Spark SQL を使用してデータベースとテーブルを作成し、データを挿入およびクエリを実行し、Hudi、Delta Lake、Iceberg テーブルでの更新、圧縮、タイムトラベルなどの一般的な操作を実行する方法を示しました。
Open Table Format (OTF) は、 ACID トランザクション、アップサート、削除といった操作をデータレイク上で可能にし、オブジェクトストレージ上でのより高度な操作を提供します。
Athena for Spark では、別途コネクタをインストールする必要がないので、Amazon S3 上に信頼できるデータレイクを構築す際のこれらの一般的な準備や管理のオーバーヘッドを削減することができます。
データレイク上の処理における Open Table Format の選択の詳細については、 AWS でのトランザクションデータレイクのためのオープンテーブルフォーマットの選択 を参照してください。
著者について
Pathik Shahは、Amazon Athena のシニアアナリティクスアーキテクトです。2015年にAWSに加入して以来、ビッグデータアナリティクスの分野に注力し、AWSのアナリティクスサービスを使用してスケーラブルで堅牢なソリューションの構築を支援しています。
Raj Devnath は、Amazon Athena のプロダクトマネージャーです。お客様に愛される製品の構築と、お客様のデータから価値を引き出すことに情熱を注いでいます。金融、小売、スマートビル、家庭オートメーション、データ通信システムなど、複数のエンドマーケット向けのソリューションの提供経験があります。
翻訳:ソリューションアーキテクト 下佐粉 昭 ( twitter – @simosako)
原文:Use Amazon Athena with Spark SQL for your open-source transactional table formats