Amazon Web Services ブログ
AWS Lambda と AWS Glue Iceberg REST エンドポイントを使用した PyIceberg による軽量な分析環境の実現
本記事は、2025/5/9 に公開された Accelerate lightweight analytics using PyIceberg with AWS Lambda and an AWS Glue Iceberg REST endpoint を翻訳したものです。翻訳は Solutions Architect の深見が担当しました。
データインサイトに基づき決定を行う現代の組織にとって、効果的なデータ管理は、高度な分析と効率的な機械学習の利用を実現するための重要な要素です。データ利用のユースケースがより複雑になるにつれ、データエンジニアリングチームには、複数のデータソースやアプリケーション全体でのバージョン管理、増加するデータ量、スキーマ変更に対処するための高度なツールが必要になります。
Apache Iceberg は、データレイクで人気の選択肢となっています。ACID (原子性、一貫性、独立性、永続性) トランザクション、スキーマ進化、タイムトラベル機能を提供します。Iceberg テーブルは、Apache Spark や Trino などの様々な分散データ処理フレームワークからアクセスできるため、多様なデータ処理のニーズに対して柔軟なソリューションとなります。そのような Iceberg を扱うためのツールの中で、PyIceberg は分散コンピューティングリソースを必要とせずに、Python スクリプト上でテーブルのアクセスと管理を可能にします。
この投稿では、AWS Glue Data Catalog と AWS Lambda と統合された PyIceberg が、直感的な Python インターフェースを通じて Iceberg の強力な機能を活用するための軽量なアプローチを提供する方法を示します。この統合により、チームはほとんどセットアップやインフラストラクチャの依存関係の設定を行わずとも Iceberg テーブルの操作や利用を開始できることを説明します。
PyIceberg の主要機能と利点
PyIceberg の主な利点の 1 つは、軽量であることです。分散コンピューティングフレームワークを必要とせず、チームは Python アプリケーションから直接テーブル操作を実行できるため、学習曲線が小さく、小規模から中規模のデータ探索と分析に適しています。さらに、PyIceberg は Pandas や Polars などの Python データ分析ライブラリと統合されているため、データユーザーは既存のスキルとワークフローを活用できます。
PyIceberg を Data Catalog と Amazon Simple Storage Service (Amazon S3) で使用すると、データチームはテーブルを完全にサーバーレスな環境で利用および管理できます。つまり、データチームはインフラストラクチャの管理ではなく、分析と洞察に集中することができます。
さらに、PyIceberg を通じて管理される Iceberg テーブルは、AWS のデータ分析サービスと互換性があります。PyIceberg は単⼀ノードで動作するため、⼤量のデータを扱う場合は性能に制限がありますが、Amazon Athena や AWS Glue などのサービスを使えば、同じテーブルをより大規模に効率的に処理できます。これにより、チームは PyIceberg を使って迅速な開発とテストを行い、その後、データ管理アプローチの一貫性を維持しながら、より大規模な処理エンジンを使った本番ワークロードにシームレスに移行できます。
代表的なユースケース
次のようなシナリオでは、PyIceberg が特に役立ちます:
- データサイエンスの実験と特徴量エンジニアリング – データサイエンスでは、信頼できる効率的な分析とモデルを維持するために、実験の再現性が重要です。しかし、組織のデータが継続的に更新されるため、重要なビジネスイベント、モデル学習、一貫した参照のためのデータスナップショットを管理することが難しくなります。データサイエンティストは、タイムトラベル機能を使ってデータの過去のスナップショットを照会し、タグ付け機能を使って重要なバージョンを記録できます。PyIceberg を使えば、Pandas などの馴染みのあるツールを使って Python 環境でこれらの利点を得られます。Iceberg の ACID 特性のおかげで、テーブルが積極的に更新されている場合でも整合性を担保したデータアクセスが可能になります。
- AWS Lambda によるサーバーレスデータ処理 – 組織は多くの場合、複雑なインフラストラクチャを管理せずに、データを効率的に処理し、分析テーブルを維持する必要があります。PyIceberg と Lambda を使えば、チームはサーバーレスな Lamnba 関数を使ってイベント駆動のデータ処理やスケジュールされたテーブル更新を構築できます。PyIceberg の軽量な性質は、サーバーレス環境に適しており、データ検証、変換、取り込みなどのシンプルなデータ処理タスクを可能にします。これらのテーブルは、さまざまな AWS サービスを通じて更新と分析の両方にアクセスできるため、チームはサーバーやクラスターを管理せずに効率的なデータパイプラインを構築できます。
PyIceberg を使用したイベント駆動のデータ取り込みと分析
このセクションでは、NYC yellow taxi trip dataを使用して、PyIceberg によるデータ処理と分析の実践的な例を探ります。リアルタイムのタクシー走行記録の処理をシミュレートするために、Lambda を使用してサンプルデータを Iceberg テーブルに挿入します。この例では、効率的なデータ取り込みと柔軟な分析機能を組み合わせることで、ワークフローをより合理的なものにする方法を示します。
チームが次のような複数の要件対応する必要がある場面を想像してください:
- データ処理ソリューションは、中規模のデータセットを処理するケースで分散コンピューティングクラスターの管理の複雑さを避けるため、コストパフォーマンスが良く、メンテナンス性が高い必要があります。
- アナリストは、Python のツールを使って柔軟なクエリと探索を行えるようにする必要があります。例えば、過去のスナップショットと現在のデータを比較して、時間の経過に伴うトレンドを分析する必要があるかもしれません。
- ソリューションは、将来的により拡張性の高いものにする能力を持つ必要があります。
これらの要件に対処するため、PyIceberg で動作する Lambda によるデータ処理と Jupyter ノートブックによる分析を組み合わせたソリューションを実装します。このアプローチにより、データの整合性を維持しながら柔軟な分析ワークフローを可能にする、軽量でありながら堅牢なアーキテクチャが実現されます。ウォークスルーの最後では、Athena を使用してこのデータを照会し、複数の Iceberg 対応ツールとの互換性を示すとともに、このアーキテクチャのスケール性を示します。
大まかな手順は以下の通りです:
- Lambda を使用して、AWS Glue Iceberg REST エンドポイント経由で PyIceberg を利用し、サンプルの NYC yellow taxi trip data を Amazon S3 上の Iceberg テーブルに書き込みます。実際のシナリオでは、この Lambda 関数は Amazon Simple Queue Service (Amazon SQS) などのキューイングコンポーネントからのイベントによってトリガーされます。詳細は、Lambda と Amazon SQS の併用を参照してください。
- Jupyter ノートブックで AWS Glue Iceberg REST エンドポイントを経由して PyIceberg を使用し、テーブルデータを分析します。
- Athena を使用してデータをクエリし、Iceberg の柔軟性を実証します。
次の図はアーキテクチャを示しています。
このアーキテクチャを実装する際に重要になる点は、Lambda 関数がイベントによってトリガーされると、複数の同時実行が発生する可能性があることです。この同時実行により、Iceberg テーブルへの書き込み時にトランザクション競合が発生する可能性があります。これを処理するには、適切な再試行メカニズムを実装し、トランザクション分離レベルを慎重に管理する必要があります。Amazon SQS をイベントソースとして使用する場合は、SQS イベントソースの最大同時実行設定を使って同時実行を制御できます。
前提条件
このユースケースには、次の前提条件が必要です。
- Lambda、AWS Glue、Amazon S3、Athena、および AWS CloudFormation へのアクセスを提供するアクティブな AWS アカウント。
- CloudFormation スタックを作成およびデプロイする権限。詳細については、自己管理権限を使用した CloudFormation StackSets の作成を参照してください。
- AWS CloudShell とその機能に対する完全なアクセス権を持つユーザー。詳細については、AWS CloudShell の概要を参照してください。
AWS CloudFormation によるリソースのセットアップ
次の CloudFormation テンプレートを使用して、以下のリソースをセットアップできます:
- Iceberg テーブルのメタデータとデータファイルを格納する S3 バケット
- Amazon Elastic Container Registry (Amazon ECR) のリポジトリ (Lambda 関数のコンテナイメージを格納)
- テーブルを格納するデータカタログデータベース
- Amazon SageMaker AI の ノートブックインスタンス (Jupyter ノートブック環境用)
- Lambda 関数と SageMaker AI ノートブックインスタンス用の AWS Identity and Access Management (IAM) ロール
次の手順に従ってリソースをデプロイしてください。
- Launch stack を選択します。
- スタックのパラメータでは、データベース名として
pyiceberg_lambda_blog_database
がデフォルトで設定されています。デフォルト値を変更することもできます。データベース名を変更した場合は、以降のすべてのステップでpyiceberg_lambda_blog_database
を選択した名前に置き換えることを忘れないでください。次に、次へを選択します。 - 次へを選択します。
- AWS CloudFormation によって IAM リソースがカスタム名で作成される場合があることを承認しますを選択します。
- 送信を選択します。
Lambda 関数の構築と実行
PyIceberg を使って着信レコードを処理する Lambda 関数を構築しましょう。この関数は、Data Catalog の pyiceberg_lambda_blog_database
データベース内に nyc_yellow_table
という Iceberg テーブルが存在しない場合に新規でテーブルを作成します。次に、サンプルの NYC yellow taxi trip data を生成して、レコードをシミュレートし、nyc_yellow_table
に挿入します。
この例では、この関数を手動で呼び出していますが、実際のシナリオでは、この Lambda 関数は Amazon SQS からのメッセージなどの実際のイベントによってトリガーされます。実際のユースケースを実装する際は、関数コードをイベントデータを受け取り、要件に基づいて処理するように変更する必要があります。
コンテナイメージをデプロイパッケージとして使用して、この関数をデプロイします。ここではコンテナイメージから Lambda 関数を作成するために、CloudShell でイメージをビルドし、ECR リポジトリにプッシュします。以下の手順を実行してください。
- AWS マネジメントコンソールにサインインし、CloudShell を起動します。
- 作業ディレクトリを作成します。
- Lambda スクリプト
lambda_function.py
をダウンロードします。
このスクリプトは以下のタスクを実行します:
- Data Catalog に NYC yellow taxi trip data のスキーマを持つ Iceberg テーブルを作成します
- ランダムな NYC yellow taxi trip data にもとづくデータセットを生成します
- このデータをテーブルに挿入します
この Lambda 関数の重要な部分を掘り下げてみましょう:
- Iceberg カタログの構成 – 次のコードは、AWS Glue Iceberg REST エンドポイントに接続する Iceberg カタログを定義しています:
- テーブルスキーマ定義 – 次のコードは、NYC taxi データセットの Iceberg テーブルスキーマを定義しています。このテーブルには以下が含まれています。
タイムスタンプ列に day 変換を適用する際、Iceberg は階層的に日付ベースのパーティション分割を自動的に処理します。つまり、単一の day 変換で、年、月、日のレベルでパーティションプルーニングを可能にするため、各レベルに明示的な変換を必要としません。Iceberg のパーティション分割の詳細については、パーティション分割を参照してください。
- データの生成と挿入 – 次のコードはランダムなデータを生成し、テーブルに挿入します。この例では、ビジネスイベントやトランザクションを追跡するために、新しいレコードが継続的に追加される append-only のパターンを想定しています。
Dockerfile
をダウンロードします。これは、Lambda 関数のコンテナイメージを定義します。
requirements.txt
をダウンロードします。これは、Lmbmda 関数に必要な Python パッケージを定義しています。
この時点で、作業ディレクトリには次の 3 つのファイルが含まれているはずです。
- 環境変数を設定します。
を自分の AWS アカウント ID に置き換えてください:
- Docker イメージをビルドします:
- イメージにタグを設定します:
- AWS CloudFormation によって作成された ECR リポジトリにログインします:
- イメージを ECR リポジトリにプッシュします:
- Amazon ECR にプッシュしたコンテナイメージを使用して、Lambda 関数を作成します:
- 次のセクションで確認するため、少なくとも 5 回関数を呼び出して複数のスナップショットを作成してください。今回は手動で関数を呼び出してイベント駆動型のデータ取り込みをシミュレートしていますが、実際のシナリオでは Lambda 関数がイベント駆動型で自動的に呼び出されます。
ここまでで、Lambda 関数をデプロイして実行しました。この関数は、pyiceberg_lambda_blog_database
データベース内に nyc_yellow_table
Iceberg テーブルを作成します。また、このテーブルにサンプルデータを生成して挿入します。後の手順で、このテーブルのレコードを確認します。
コンテナを使用した Lambda 関数の構築に関する詳細情報は、コンテナイメージを使用した Lambda 関数の作成をご覧ください。
Jupyter を使った PyIceberg によるデータ探索
このセクションでは、Data Catalog に登録された Iceberg テーブルのデータにアクセスし、分析する方法を示します。PyIceberg を使用した Jupyter ノートブックから、Lambda 関数によって作成されたタクシー運行データにアクセスし、新しいレコードが到着するたびに作成される異なるスナップショットを検査します。また、重要なスナップショットにタグを付けて保持し、さらなる分析のために新しいテーブルを作成します。
SageMaker AI ノートブックインスタンスで Jupyter を使ってノートブックを開くには、次の手順を実行してください。
- SageMaker AI コンソールで、ナビゲーションペインから Notebooks を選択します。
- CloudFormation テンプレートを使用して作成したノートブックインスタンスの横にある Open JupyterLab を選択します。
- ノートブックをダウンロードし、SageMaker AI ノートブックの Jupyter 環境で開いてください。
- アップロードした
pyiceberg_notebook.ipynb
を開きます。 - カーネル選択のダイアログでは、デフォルトのオプションのままにして Select を選択します。
ここからは、セルを順番に実行してノートブックを進めていきます。
カタログへの接続とテーブルのスキャン
PyIceberg を使用して Iceberg テーブルにアクセスできます。次のコードは、AWS Glue Iceberg REST エンドポイントに接続し、pyiceberg_lambda_blog_database
データベース上の nyc_yellow_table
テーブルを読み込みます。
Iceberg テーブルからフルデータを Apache Arrow テーブルとしてクエリし、Pandas の DataFrame に変換できます。
スナップショットの操作
Iceberg の重要な機能の 1 つがスナップショットベースのバージョン管理です。スナップショットは、テーブルのデータに変更があるたびに自動的に作成されます。次の例のように、特定のスナップショットからデータを取得できます。
スナップショットに基づいて、任意の時点の過去のデータと現在のデータを比較できます。この場合、最新のテーブルとスナップショットテーブルの間のデータ分布の違いを比較しています。
スナップショットのタグ付け
特定のスナップショットに任意の名前を付けてタグ付けし、後でその名前で特定のスナップショットを取得できます。これは、重要なイベントのスナップショットを管理する際に便利です。
この例では、checkpointTag タグを指定してスナップショットへクエリをしています。ここでは polars を使用して、既存のカラム tpep_dropoff_datetime
と tpep_pickup_datetime
に基づいて trip_duration
という新しいカラムを追加することで、新しい DataFrame を作成しています。
処理済みの DataFrame から trip_duration
列を使って新しいテーブルを作成します。このステップは、将来の分析のためにデータを準備する方法を示しています。基になるテーブルが変更されていても、タグを使うことで、処理済みデータが参照しているデータのスナップショットを明示的に指定できます。
処理済みデータから作成した新しいテーブルを Athena でクエリし、Iceberg テーブルの相互運用性を実証しましょう。
Amazon Athena からのデータクエリ
- 前のセクションのノートブックから作成された
pyiceberg_lambda_blog_database.processed_nyc_yellow_table
テーブルを、Athena クエリエディタでクエリできます:
これらのステップを通して、Lambda と AWS Glue Iceberg REST エンドポイントを使用したサーバーレスのデータ処理ソリューションを構築し実際に利用する流れを体験しました。また、PyIceberg を使用してスナップショット管理やテーブル操作を含む Python によるデータ管理と分析を行いました。さらに、別のエンジンである Athena を使用してクエリを実行し、Iceberg テーブルの互換性を示しました。
クリーンアップ
この記事で使用したリソースをクリーンアップするには、次の手順を実行してください。
- Amazon ECR コンソールで、リポジトリ
pyiceberg-lambda-repository
に移動し、リポジトリ内のすべてのイメージを削除します。 - CloudShell で、作業ディレクトリ
pyiceberg_blog
を削除します。 - Amazon S3 コンソールで、CloudFormation テンプレートを使用して作成した S3 バケット
pyiceberg-lambda-blog--
に移動し、バケットを空にします。 - リポジトリとバケットが空になったことを確認したら、CloudFormation スタック
pyiceberg-lambda-blog-stack
を削除します。 - Docker イメージを使用して作成した Lambda 関数
pyiceberg-lambda-function
を削除します。
結論
この記事では、AWS Glue Data Catalog と PyIceberg を使用することで、堅牢なデータ管理機能を維持しながら、効率的で軽量なデータワークフローを実現できることを示しました。チームがインフラストラクチャの依存関係を最小限に抑えながら、Iceberg の強力な機能を活用できることを紹介しました。このアプローチにより、組織は分散コンピューティングリソースのセットアップや管理の複雑さなしに、すぐに Iceberg テーブルの利用を開始できます。
Iceberg の機能を低いハードルで導入しようとしている組織にとって、これは特に価値があります。PyIceberg の軽量な性質により、チームはすぐに Iceberg テーブルを使い始めることができ、慣れ親しんだツールを使用し、追加の学習を最小限に抑えることができます。データニーズが高まれば、同じ Iceberg テーブルを Athena や AWS Glue などの AWS 分析サービスからシームレスにアクセスでき、将来的なスケーラビリティへの明確なパスが提供されます。
PyIceberg と AWS の分析サービスの詳細については、PyIceberg のドキュメントとApache Iceberg とは?を参照することをお勧めします。
著者について
Sotaro Hikita は、AWS でアナリティクスに特化したスペシャリストソリューションアーキテクトで、ビッグデータ技術とオープンソースソフトウェアを扱っています。仕事以外では、いつも美味しい食べ物を探し求め、最近はピザに夢中になっています。
Shuhei Fukami は、AWS におけるアナリティクスに特化したスペシャリストソリューションアーキテクトです。趣味で料理をするのが好きで、最近はピザ作りにはまっています。