Amazon Web Services ブログ

トランザクション処理可能なオープンテーブルフォーマット(OTF)のために Amazon Athena for Apache Spark と Spark SQL を活用する

AWS を活用したデータレイクは、きわめて高い可用性を誇るAmazon Simple Storage Service(Amazon S3)を土台としており、多様なデータとアナリティクスのアプローチを組み合わせるのに必要なスケール、敏捷性、柔軟性を提供することができます。データレイクがサイズと利用法の両面で成熟してくるにつれ、データをビジネスイベントに合わせて一貫性を保つのにかなりの労力が費やされることがあります。ファイルがトランザクションと一貫性を保って更新されることを確実にするために、Apache IcebergApache HudiLinux Foundation Delta Lake などのオープンソースのトランザクション処理可能なテーブルフォーマット (Open Table Format – OTF) を利用する顧客が増えています。これらのフォーマットは高い圧縮率でデータを保存し、アプリケーションやフレームワークと連携し、Amazon S3 上に構築されたデータレイクでの差分(増分)データ処理を簡素化します。これらのフォーマットにより、 ACID (原子性、一貫性、分離性、持続性)トランザクション、アップサート、削除、タイムトラベルやスナップショットなどの高度な機能が可能になります。これらの機能は以前はデータウェアハウスでのみ利用できたものです。各テーブルフォーマットはこの機能を少しずつ異なる方法で実装しています。比較のためには、AWS 上のトランザクショナルデータレイクのためのオープンテーブルフォーマットの選択(英文)を参照してください。

2023年に、AWS はAmazon Athena for Apache Sparkにおいて、Apache Iceberg、Apache Hudi、Linux Foundation Delta Lake サポートの一般提供開始を発表しました。これにより、個別のコネクタや関連する依存関係をインストールしてパッケージ管理する必要がなくなり、これらのフレームワークを使用するために必要な設定手順が簡素化されます。

この投稿では、Amazon Athena ノートブックで Spark SQL を使用する方法と、Iceberg、Hudi、Delta Lake テーブルフォーマットを操作する方法を示します。
Athena の Spark SQL を使用した、データベースとテーブルの作成、テーブルへのデータの挿入、データのクエリ、Amazon S3 のテーブルスナップショットの確認など、一般的な操作をデモンストレーションします。

前提条件

次の前提条件を完了してください:

Amazon S3 からのサンプルノートブックのダウンロードとインポート

この投稿で説明するノートブックは、次の場所からダウンロードできます。

ノートブックをダウンロードしたら、ノートブックファイルの管理 (※注:本稿翻訳時点ではリンク先のドキュメントが未翻訳であり、英語での提供です。以下同様。)の ノートブックのインポート方法 のセクションに従って、Athena Spark 環境にインポートしてください。

Open Table Format にあわせたセクションを参照してください

Iceberg テーブル形式に興味がある場合は、Apache Iceberg テーブルの利用セクションを参照してください。

Hudi テーブル形式に興味がある場合は、Apache Hudi テーブルの利用のセクションを参照してください。

Delta Lake テーブル形式に興味がある場合は、Linux Foundation Delta Lake テーブルの利用のセクションを参照してください。

Apache Iceberg テーブルの利用

Athena の Spark ノートブックを使用する際、PySpark のコードを使用することなく直接 SQL クエリを実行できます。
これは、セルの動作を変更するノートブックセルの特別なヘッダであるセルマジックを使用することで実現します。
SQL の場合、%%sql マジックを追加できます。これにより、セルの内容全体が Athena 上で実行される SQL ステートメントとして解釈されます。

このセクションでは、Athena の Apache Spark SQL を使用して、Apache Iceberg テーブルを作成、分析、管理する方法を示します。

ノートブックセッションの設定

Athena で Apache Iceberg を使用するには、セッションの作成や編集中に、Apache Spark プロパティ セクションを展開し、Apache Iceberg オプションを選択します。
以下のスクリーンショットに示すように、プロパティが事前に設定されます。

ステップの詳細は、セッションの詳細を編集する または 独自のノートブックを作成する をご覧ください。

このセクションで使用されているコードは、SparkSQL_iceberg.ipynb ファイルで参照できます。

データベースとIcebergテーブルの作成

まず、AWS Glue データカタログにデータベースを作成します。
次の SQL を使用すると、icebergdb というデータベースを作成できます。

%%sql
CREATE DATABASE icebergdb

次に、データベース icebergdb で、データのロード先になる Amazon S3 の場所を指す noaa_iceberg という Iceberg テーブルを作成します。次のステートメントを実行し、ロケーション s3://<your-S3-bucket>/<prefix>/ をご自身の S3 バケットとプレフィックスに置き換えてください:

%%sql
CREATE TABLE icebergdb.noaa_iceberg(
station string,
date string,
latitude string,
longitude string,
elevation string,
name string,
temp string,
temp_attributes string,
dewp string,
dewp_attributes string,
slp string,
slp_attributes string,
stp string,
stp_attributes string,
visib string,
visib_attributes string,
wdsp string,
wdsp_attributes string,
mxspd string,
gust string,
max string,
max_attributes string,
min string,
min_attributes string,
prcp string,
prcp_attributes string,
sndp string,
frshtt string)
USING iceberg
PARTITIONED BY (year string)
LOCATION 's3://<your-S3-bucket>/<prefix>/noaaiceberg/'

テーブルへのデータの挿入

noaa_iceberg Iceberg テーブルにデータを入力するために、前提条件として作成された Parquet テーブル sparkblogdb.noaa_pq からデータを挿入します。
Spark の INSERT INTO ステートメントを使用してこれを行うことができます。

%%sql
INSERT INTO icebergdb.noaa_iceberg select * from sparkblogdb.noaa_pq

あるいは、CREATE TABLE AS SELECT に USING iceberg 句を使用することで、 Iceberg テーブルを作成し、ソーステーブルからデータを挿入する一連のステップを一度に実行できます。

%%sql
CREATE TABLE icebergdb.noaa_iceberg
USING iceberg
PARTITIONED BY (year)
AS SELECT * FROM sparkblogdb.noaa_pq

Iceberg テーブルのクエリ

データが Iceberg テーブルに挿入されたので、分析を開始できます。
'SEATTLE TACOMA AIRPORT, WA US'の場所について、年ごとの最低記録気温を見つけるために、Spark SQL を実行してみましょう。

%%sql
select name, year, min(MIN) as minimum_temperature
from icebergdb.noaa_iceberg
where name = 'SEATTLE TACOMA AIRPORT, WA US'
group by 1,2

次のような出力が得られます。

Iceberg テーブル内のデータの更新

テーブル内のデータを更新する方法を見ていきましょう。
ステーション名 'SEATTLE TACOMA AIRPORT, WA US''Sea-Tac' に更新したいとします。
Spark SQL を使用すると、Iceberg テーブルに対して UPDATE ステートメントを実行できます。

%%sql
UPDATE icebergdb.noaa_iceberg
SET name = 'Sea-Tac'
WHERE name = 'SEATTLE TACOMA AIRPORT, WA US'

また、前のSELECTクエリを実行して、'Sea-Tac'ロケーションの最低記録温度を見つけることができます。

%%sql
select name, year, min(MIN) as minimum_temperature
from icebergdb.noaa_iceberg
where name = 'Sea-Tac'
group by 1,2

次のような出力が得られます。

データファイルの圧縮

Icebergのような Open Table Format における更新処理では、ファイルストレージ内の更新差分を作成し、マニフェストファイルを通じて行のバージョンをトラッキングすることでその機能を実現しています。 ファイル数が多くなるとマニフェストファイルに格納されるメタデータの量が多くなりますし、小さいデータが大量にあると不必要にメタデータを多くしがちで、これによりクエリ効率の低下と Amazon S3 アクセスコストの上昇を招きます。 Athena (for Spark) で Iceberg のrewrite_data_filesプロシージャを実行すると、データファイルが圧縮され、多数の小さな差分ファイルが、読み取りに最適化された少数の Parquet ファイルにまとめられます。 ファイルの圧縮により読み取り操作が高速化されます。 テーブルの圧縮を実行するには、次の Spark SQL を実行します。

%%sql
CALL spark_catalog.system.rewrite_data_files
(table => 'icebergdb.noaa_iceberg', strategy=>'sort', sort_order => 'zorder(name)')

rewrite_data_files には ソート戦略を指定するオプションがあり、これによりデータの再編成と圧縮を適切に指定することができます。

テーブルスナップショットのリスト表示

Iceberg テーブル上の各書き込み、更新、削除、アップサート、圧縮操作は、スナップショット分離とタイムトラベルを実現するため、古いデータとメタデータを保持しつつ、テーブルの新しいスナップショットを作成します。Iceberg テーブルのスナップショット一覧を得るには、次の Spark SQL ステートメントを実行します。

%%sql
SELECT *
FROM spark_catalog.icebergdb.noaa_iceberg.snapshots

古いスナップショットの期限切れ

不要になったデータファイルを削除し、テーブルメタデータのサイズを小さく保つために、期限を指定したスナップショットの定期的な削除が推奨されます。期限切れではないスナップショットでまだ必要とされているファイルは削除されません。Athena for Spark では、次の SQL を実行して、特定のタイムスタンプよりも古い icebergdb.noaa_iceberg テーブルのスナップショットの期限切れを設定できます。

%%sql
CALL spark_catalog.system.expire_snapshots
('icebergdb.noaa_iceberg', TIMESTAMP '2023-11-30 00:00:00.000')

timestamp の値は yyyy-MM-dd HH:mm:ss.fff の形式の文字列で指定されていることに注意してください。
出力は削除されたデータファイルとメタデータファイルの数をカウントしたものになります。

テーブルとデータベースの削除

この演習で使用したIcebergテーブルとAmazon S3の関連データをクリーンアップするには、次のSpark SQLを実行できます。

%%sql
DROP TABLE icebergdb.noaa_iceberg PURGE

次の Spark SQL を実行して、icebergdb データベースを削除します。

%%sql
DROP DATABASE icebergdb

Athena で Spark を使用して Iceberg テーブルで実行できるすべての操作の詳細については、Iceberg ドキュメントの Spark クエリSpark プロシージャ を参照してください。

Apache Hudi テーブルの利用

次に、Athena の Spark SQL を使用して、Apache Hudi テーブルを作成、分析、管理する方法を示します。

ノートブックセッションの設定

Athena で Apache Hudi を使用するには、セッションの作成または編集中に、Apache Spark プロパティ セクションを展開し、Apache Hudi オプションを選択します。

ステップの詳細は、セッションの詳細を編集する または 独自のノートブックを作成する をご覧ください。

このセクションで使用されているコードは、SparkSQL_hudi.ipynb ファイルで利用できます。以下のステップを確認するためにご利用ください。

データベースとHudiテーブルの作成

まず、AWS Glue データカタログに格納される hudidb というデータベースを作成します。この次に Hudi テーブルを作成します。

%%sql
CREATE DATABASE hudidb

Amazon S3 のデータをロードする場所を指す Hudi テーブルを作成します。
このテーブルは、コピーオンライト型であることに注意してください。
これはテーブル DDL のtype='cow'によって定義されています。
stationとdate を複合プライマリキー、preCombinedFieldをyearとして定義しました。
また、テーブルは year でパーティション化されています。
次のステートメントを実行し、ロケーションs3://<your-S3-bucket>/<prefix>/をご自身の S3 バケットとプレフィックスに置き換えてください:

%%sql
CREATE TABLE hudidb.noaa_hudi(
station string,
date string,
latitude string,
longitude string,
elevation string,
name string,
temp string,
temp_attributes string,
dewp string,
dewp_attributes string,
slp string,
slp_attributes string,
stp string,
stp_attributes string,
visib string,
visib_attributes string,
wdsp string,
wdsp_attributes string,
mxspd string,
gust string,
max string,
max_attributes string,
min string,
min_attributes string,
prcp string,
prcp_attributes string,
sndp string,
frshtt string,
year string)
USING HUDI
PARTITIONED BY (year)
TBLPROPERTIES(
primaryKey = 'station, date',
preCombineField = 'year',
type = 'cow'
)
LOCATION ''s3://<your-S3-bucket>/<prefix>/noaahudi/'

テーブルへのデータの挿入

Iceberg と同様に、前のステップで作成した sparkblogdb.noaa_pq テーブルからデータを読み取ることによってテーブルにデータを入力するために、INSERT INTO ステートメントを使用します。

%%sql
INSERT INTO hudidb.noaa_hudi select * from sparkblogdb.noaa_pq

Hudi テーブルのクエリ

テーブルが作成されたので、'SEATTLE TACOMA AIRPORT, WA US' ロケーションにおける最高気温を検索するクエリを実行してみましょう。

%%sql
select name, year, max(MAX) as maximum_temperature
from hudidb.noaa_hudi
where name = 'SEATTLE TACOMA AIRPORT, WA US'
group by 1,2

Hudi テーブル内のデータの更新

ステーション名(name)を 'SEATTLE TACOMA AIRPORT, WA US' から 'Sea–Tac' に変更しましょう。
Athena for Spark で アップデート ステートメントを実行することで、noaa_hudi テーブルのレコードを更新できます。

%%sql
UPDATE hudidb.noaa_hudi
SET name = 'Sea-Tac'
WHERE name = 'SEATTLE TACOMA AIRPORT, WA US'

「Sea-Tac」ロケーションで記録された最高気温を検索するために、前のSELECTの条件句を’Sea-Tac’に変えて実行します。

%%sql
select name, year, max(MAX) as maximum_temperature
from hudidb.noaa_hudi
where name = 'Sea-Tac'
group by 1,2

タイムトラベルクエリ

SQL でタイムトラベルクエリを使用することで、過去のデータスナップショットを分析できます。例:

%%sql
select name, year, max(MAX) as maximum_temperature
from hudidb.noaa_hudi timestamp as of '2023-12-01 23:53:43.100'
where name = 'SEATTLE TACOMA AIRPORT, WA US'
group by 1,2

このクエリは、過去の特定の時点でのシアトル空港の気温データをチェックします。
timestamp 句を使うことで、現在のデータを変更することなく過去に戻ることができます。
timestamp の値は yyyy-MM-dd HH:mm:ss.fff のフォーマットの文字列で指定されていることに注意してください。

クラスタリングによるクエリ速度の最適化

Athena でのクエリパフォーマンスを改善するために、Spark SQL を使用して Hudi テーブルで クラスタリング を実行できます。

%%sql
CALL run_clustering(table => 'hudidb.noaa_hudi', order => 'name')

テーブルのコンパクション(compaction)

コンパクションはHudiに特有の Merge On Read (MOR) テーブルで採用されているテーブルサービスで、行ベースのログファイルからの更新を定期的に対応する列ベースのベースファイルにマージすることで、ベースファイルの新しいバージョンを生成します。コンパクションは Copy On Write (COW) テーブルには適用されず、 MOR テーブルにのみ適用されます。 Athena for Spark を使用して MOR テーブルのコンパクションを実行するには、次のクエリを実行できます。

%%sql
CALL run_compaction(op => 'run', table => 'hudi_table_mor');

テーブルとデータベースの削除

以下の Spark SQL を実行して、作成した Hudi テーブルと、Amazon S3 の場所に関連付けられたデータを削除してください:

%%sql
DROP TABLE hudidb.noaa_hudi PURGE

次の Spark SQL を実行して、データベース hudidb を削除します。

%%sql
DROP DATABASE hudidb

Athena で Spark を使用して Hudi テーブルで実行できるすべての操作については、Hudi ドキュメントの SQL DDLProcedures を参照してください。

Linux Foundation Delta Lake テーブルの利用

次に、Athena の Spark SQL を使用して Delta Lake テーブルを作成、分析、管理する方法を示します。

ノートブックセッションの設定

Athena で Spark を使用して Delta Lake を利用するには、セッションの作成または編集中に、Apache Spark プロパティ セクションを展開し、Linux Foundation Delta Lake を選択します。

ステップの詳細は、セッションの詳細を編集する または 独自のノートブックを作成する をご覧ください。

このセクションで使用されているコードは、SparkSQL_delta.ipynb ファイルで利用できます。ためにご利用ください。

データベースとDelta Lakeテーブルの作成

このセクションでは、AWS Glue データカタログにデータベースを作成します。
次の SQL を使用すると、deltalakedb というデータベースを作成できます。

%%sql
CREATE DATABASE deltalakedb

次に、データベース deltalakedb で、データをロードする Amazon S3 の場所を指す noaa_delta という Delta Lake テーブルを作成します。次のステートメントを実行し、ロケーション s3://<your-S3-bucket>/<prefix>/ をご自身の S3 バケットとプレフィックスに置き換えてください:

%%sql
CREATE TABLE deltalakedb.noaa_delta(
station string,
date string,
latitude string,
longitude string,
elevation string,
name string,
temp string,
temp_attributes string,
dewp string,
dewp_attributes string,
slp string,
slp_attributes string,
stp string,
stp_attributes string,
visib string,
visib_attributes string,
wdsp string,
wdsp_attributes string,
mxspd string,
gust string,
max string,
max_attributes string,
min string,
min_attributes string,
prcp string,
prcp_attributes string,
sndp string,
frshtt string)
USING delta
PARTITIONED BY (year string)
LOCATION 's3://<your-S3-bucket>/<prefix>/noaadelta/'

テーブルへのデータの挿入

前の投稿で作成した sparkblogdb.noaa_pq テーブルからデータを読み取ることにより、テーブルに入力するために INSERT INTO ステートメントを使用します。

%%sql
INSERT INTO deltalakedb.noaa_delta select * from sparkblogdb.noaa_pq

CREATE TABLE AS SELECT を使用して、1 つのクエリで Delta Lake テーブルを作成し、ソーステーブルからデータを挿入することもできます。

Delta Lake テーブルのクエリ

Delta Lake テーブルにデータが挿入されたので、分析を開始することができます。
'SEATTLE TACOMA AIRPORT, WA US' ロケーションの最低記録温度を見つけるために、Spark SQL を実行しましょう。

%%sql
select name, year, max(MAX) as minimum_temperature
from deltalakedb.noaa_delta
where name = 'SEATTLE TACOMA AIRPORT, WA US'
group by 1,2

Deltaレイクテーブル内のデータの更新

ステーション名を 'SEATTLE TACOMA AIRPORT, WA US' から 'Sea–Tac' に変更しましょう。
Athena の Spark 上で noaa_delta テーブルのレコードを更新する UPDATE ステートメントを実行できます。

%%sql
UPDATE deltalakedb.noaa_delta
SET name = 'Sea-Tac'
WHERE name = 'SEATTLE TACOMA AIRPORT, WA US'

前のSELECTクエリを実行して、'Sea-Tac'ロケーションの最低記録温度を検索できます。結果は以前と同じはずです。

%%sql
select name, year, max(MAX) as minimum_temperature
from deltalakedb.noaa_delta
where name = 'Sea-Tac'
group by 1,2

データファイルの圧縮 (optimize)

Athena for Spark では、Delta Lake テーブルに対して OPTIMIZE を実行できます。これにより、複数の小さいファイルが大きなファイルに圧縮されるため、小さいファイルがたくさんあることによるクエリへの負担を減らすことができます。圧縮を実行するには、次のクエリを実行します。

%%sql
OPTIMIZE deltalakedb.noaa_delta

Delta Lake のドキュメントの最適化を参照して、OPTIMIZE の実行中に使用できるさまざまなオプションを確認してください。

Delta Lake テーブルで参照されなくなったファイルの削除

Athena の Spark を使用して Delta Lake テーブル上で VACUUM コマンドを実行することで、そのテーブルから参照されなくなった Amazon S3 に保存されたファイルで、保持期間を超えたものを削除できます。

%%sql
VACUUM deltalakedb.noaa_delta

Delta Lake のドキュメントの Delta テーブルで参照されなくなったファイルの削除 を参照して、VACUUM で利用できるオプションを確認してください。

テーブルとデータベースの削除

次の Spark SQL を実行して、作成した Delta Lake テーブルを削除します。

%%sql
DROP TABLE deltalakedb.noaa_delta

次の Spark SQL を実行して、データベース deltalakedb を削除します。

%%sql
DROP DATABASE deltalakedb

Delta Lake テーブルとデータベースで DROP TABLE DDL を実行すると、これらのオブジェクトのメタデータが削除されますが、Amazon S3 のデータファイルは自動的には削除されません。
ノートブックのセルで次の Python コードを実行することで、S3 バケットからデータを削除できます。

import boto3

s3 = boto3.resource('s3')
bucket = s3.Bucket('')
bucket.objects.filter(Prefix="/noaadelta/").delete()

Athena の Spark を使用して Delta Lake テーブルで実行できる SQL ステートメントの詳細については、Delta Lake ドキュメントのクイックスタートを参照してください。

まとめ

この投稿では、Athena ノートブックで Spark SQL を使用してデータベースとテーブルを作成し、データを挿入およびクエリを実行し、Hudi、Delta Lake、Iceberg テーブルでの更新、圧縮、タイムトラベルなどの一般的な操作を実行する方法を示しました。
Open Table Format (OTF) は、 ACID トランザクション、アップサート、削除といった操作をデータレイク上で可能にし、オブジェクトストレージ上でのより高度な操作を提供します。
Athena for Spark では、別途コネクタをインストールする必要がないので、Amazon S3 上に信頼できるデータレイクを構築す際のこれらの一般的な準備や管理のオーバーヘッドを削減することができます。
データレイク上の処理における Open Table Format の選択の詳細については、 AWS でのトランザクションデータレイクのためのオープンテーブルフォーマットの選択 を参照してください。

著者について

Pathik Shahは、Amazon Athena のシニアアナリティクスアーキテクトです。2015年にAWSに加入して以来、ビッグデータアナリティクスの分野に注力し、AWSのアナリティクスサービスを使用してスケーラブルで堅牢なソリューションの構築を支援しています。

Raj Devnath は、Amazon Athena のプロダクトマネージャーです。お客様に愛される製品の構築と、お客様のデータから価値を引き出すことに情熱を注いでいます。金融、小売、スマートビル、家庭オートメーション、データ通信システムなど、複数のエンドマーケット向けのソリューションの提供経験があります。

翻訳:ソリューションアーキテクト 下佐粉 昭 ( twitter – @simosako)

原文:Use Amazon Athena with Spark SQL for your open-source transactional table formats