Amazon Web Services ブログ

AWS Lambda と AWS Glue Iceberg REST エンドポイントを使用した PyIceberg による軽量な分析環境の実現

本記事は、2025/5/9 に公開された Accelerate lightweight analytics using PyIceberg with AWS Lambda and an AWS Glue Iceberg REST endpoint を翻訳したものです。翻訳は Solutions Architect の深見が担当しました。

データインサイトに基づき決定を行う現代の組織にとって、効果的なデータ管理は、高度な分析と効率的な機械学習の利用を実現するための重要な要素です。データ利用のユースケースがより複雑になるにつれ、データエンジニアリングチームには、複数のデータソースやアプリケーション全体でのバージョン管理、増加するデータ量、スキーマ変更に対処するための高度なツールが必要になります。

Apache Iceberg は、データレイクで人気の選択肢となっています。ACID (原子性、一貫性、独立性、永続性) トランザクション、スキーマ進化、タイムトラベル機能を提供します。Iceberg テーブルは、Apache Spark や Trino などの様々な分散データ処理フレームワークからアクセスできるため、多様なデータ処理のニーズに対して柔軟なソリューションとなります。そのような Iceberg を扱うためのツールの中で、PyIceberg は分散コンピューティングリソースを必要とせずに、Python スクリプト上でテーブルのアクセスと管理を可能にします。

この投稿では、AWS Glue Data CatalogAWS Lambda と統合された PyIceberg が、直感的な Python インターフェースを通じて Iceberg の強力な機能を活用するための軽量なアプローチを提供する方法を示します。この統合により、チームはほとんどセットアップやインフラストラクチャの依存関係の設定を行わずとも Iceberg テーブルの操作や利用を開始できることを説明します。

PyIceberg の主要機能と利点

PyIceberg の主な利点の 1 つは、軽量であることです。分散コンピューティングフレームワークを必要とせず、チームは Python アプリケーションから直接テーブル操作を実行できるため、学習曲線が小さく、小規模から中規模のデータ探索と分析に適しています。さらに、PyIceberg は Pandas や Polars などの Python データ分析ライブラリと統合されているため、データユーザーは既存のスキルとワークフローを活用できます。

PyIceberg を Data Catalog と Amazon Simple Storage Service (Amazon S3) で使用すると、データチームはテーブルを完全にサーバーレスな環境で利用および管理できます。つまり、データチームはインフラストラクチャの管理ではなく、分析と洞察に集中することができます。

さらに、PyIceberg を通じて管理される Iceberg テーブルは、AWS のデータ分析サービスと互換性があります。PyIceberg は単⼀ノードで動作するため、⼤量のデータを扱う場合は性能に制限がありますが、Amazon AthenaAWS Glue などのサービスを使えば、同じテーブルをより大規模に効率的に処理できます。これにより、チームは PyIceberg を使って迅速な開発とテストを行い、その後、データ管理アプローチの一貫性を維持しながら、より大規模な処理エンジンを使った本番ワークロードにシームレスに移行できます。

代表的なユースケース

次のようなシナリオでは、PyIceberg が特に役立ちます:

  • データサイエンスの実験と特徴量エンジニアリング – データサイエンスでは、信頼できる効率的な分析とモデルを維持するために、実験の再現性が重要です。しかし、組織のデータが継続的に更新されるため、重要なビジネスイベント、モデル学習、一貫した参照のためのデータスナップショットを管理することが難しくなります。データサイエンティストは、タイムトラベル機能を使ってデータの過去のスナップショットを照会し、タグ付け機能を使って重要なバージョンを記録できます。PyIceberg を使えば、Pandas などの馴染みのあるツールを使って Python 環境でこれらの利点を得られます。Iceberg の ACID 特性のおかげで、テーブルが積極的に更新されている場合でも整合性を担保したデータアクセスが可能になります。
  • AWS Lambda によるサーバーレスデータ処理 – 組織は多くの場合、複雑なインフラストラクチャを管理せずに、データを効率的に処理し、分析テーブルを維持する必要があります。PyIceberg と Lambda を使えば、チームはサーバーレスな Lamnba 関数を使ってイベント駆動のデータ処理やスケジュールされたテーブル更新を構築できます。PyIceberg の軽量な性質は、サーバーレス環境に適しており、データ検証、変換、取り込みなどのシンプルなデータ処理タスクを可能にします。これらのテーブルは、さまざまな AWS サービスを通じて更新と分析の両方にアクセスできるため、チームはサーバーやクラスターを管理せずに効率的なデータパイプラインを構築できます。

PyIceberg を使用したイベント駆動のデータ取り込みと分析

このセクションでは、NYC yellow taxi trip dataを使用して、PyIceberg によるデータ処理と分析の実践的な例を探ります。リアルタイムのタクシー走行記録の処理をシミュレートするために、Lambda を使用してサンプルデータを Iceberg テーブルに挿入します。この例では、効率的なデータ取り込みと柔軟な分析機能を組み合わせることで、ワークフローをより合理的なものにする方法を示します。

チームが次のような複数の要件対応する必要がある場面を想像してください:

  • データ処理ソリューションは、中規模のデータセットを処理するケースで分散コンピューティングクラスターの管理の複雑さを避けるため、コストパフォーマンスが良く、メンテナンス性が高い必要があります。
  • アナリストは、Python のツールを使って柔軟なクエリと探索を行えるようにする必要があります。例えば、過去のスナップショットと現在のデータを比較して、時間の経過に伴うトレンドを分析する必要があるかもしれません。
  • ソリューションは、将来的により拡張性の高いものにする能力を持つ必要があります。

これらの要件に対処するため、PyIceberg で動作する Lambda によるデータ処理と Jupyter ノートブックによる分析を組み合わせたソリューションを実装します。このアプローチにより、データの整合性を維持しながら柔軟な分析ワークフローを可能にする、軽量でありながら堅牢なアーキテクチャが実現されます。ウォークスルーの最後では、Athena を使用してこのデータを照会し、複数の Iceberg 対応ツールとの互換性を示すとともに、このアーキテクチャのスケール性を示します。

大まかな手順は以下の通りです:

  1. Lambda を使用して、AWS Glue Iceberg REST エンドポイント経由で PyIceberg を利用し、サンプルの NYC yellow taxi trip data を Amazon S3 上の Iceberg テーブルに書き込みます。実際のシナリオでは、この Lambda 関数は Amazon Simple Queue Service (Amazon SQS) などのキューイングコンポーネントからのイベントによってトリガーされます。詳細は、Lambda と Amazon SQS の併用を参照してください。
  2. Jupyter ノートブックで AWS Glue Iceberg REST エンドポイントを経由して PyIceberg を使用し、テーブルデータを分析します。
  3. Athena を使用してデータをクエリし、Iceberg の柔軟性を実証します。

次の図はアーキテクチャを示しています。

Overall Architecture

このアーキテクチャを実装する際に重要になる点は、Lambda 関数がイベントによってトリガーされると、複数の同時実行が発生する可能性があることです。この同時実行により、Iceberg テーブルへの書き込み時にトランザクション競合が発生する可能性があります。これを処理するには、適切な再試行メカニズムを実装し、トランザクション分離レベルを慎重に管理する必要があります。Amazon SQS をイベントソースとして使用する場合は、SQS イベントソースの最大同時実行設定を使って同時実行を制御できます。

前提条件

このユースケースには、次の前提条件が必要です。

AWS CloudFormation によるリソースのセットアップ

次の CloudFormation テンプレートを使用して、以下のリソースをセットアップできます:

次の手順に従ってリソースをデプロイしてください。

  1. Launch stack を選択します。

  1. スタックのパラメータでは、データベース名として pyiceberg_lambda_blog_database がデフォルトで設定されています。デフォルト値を変更することもできます。データベース名を変更した場合は、以降のすべてのステップで pyiceberg_lambda_blog_database を選択した名前に置き換えることを忘れないでください。次に、次へを選択します。
  2. 次へを選択します。
  3. AWS CloudFormation によって IAM リソースがカスタム名で作成される場合があることを承認しますを選択します。
  4. 送信を選択します。

Lambda 関数の構築と実行

PyIceberg を使って着信レコードを処理する Lambda 関数を構築しましょう。この関数は、Data Catalog の pyiceberg_lambda_blog_database データベース内に nyc_yellow_table という Iceberg テーブルが存在しない場合に新規でテーブルを作成します。次に、サンプルの NYC yellow taxi trip data を生成して、レコードをシミュレートし、nyc_yellow_table に挿入します。

この例では、この関数を手動で呼び出していますが、実際のシナリオでは、この Lambda 関数は Amazon SQS からのメッセージなどの実際のイベントによってトリガーされます。実際のユースケースを実装する際は、関数コードをイベントデータを受け取り、要件に基づいて処理するように変更する必要があります。

コンテナイメージをデプロイパッケージとして使用して、この関数をデプロイします。ここではコンテナイメージから Lambda 関数を作成するために、CloudShell でイメージをビルドし、ECR リポジトリにプッシュします。以下の手順を実行してください。

  1. AWS マネジメントコンソールにサインインし、CloudShell を起動します。
  2. 作業ディレクトリを作成します。
mkdir pyiceberg_blog 
 cd pyiceberg_blog
Bash
  1. Lambda スクリプト lambda_function.py をダウンロードします。
aws s3 cp s3://aws-blogs-artifacts-public/artifacts/BDB-5013/lambda_function.py .
Bash

このスクリプトは以下のタスクを実行します:

  • Data Catalog に NYC yellow taxi trip data のスキーマを持つ Iceberg テーブルを作成します
  • ランダムな NYC yellow taxi trip data にもとづくデータセットを生成します
  • このデータをテーブルに挿入します

この Lambda 関数の重要な部分を掘り下げてみましょう:

  • Iceberg カタログの構成 – 次のコードは、AWS Glue Iceberg REST エンドポイントに接続する Iceberg カタログを定義しています:
# Configure the catalog 
 catalog_properties = {
   "type": "rest",
   "uri": f"https://glue.{region}.amazonaws.com/iceberg",
   "s3.region": region,
   "rest.sigv4-enabled": "true",
   "rest.signing-name": "glue",
   "rest.signing-region": region 
}
 catalog = load_catalog(**catalog_properties)
Python
  • テーブルスキーマ定義 – 次のコードは、NYC taxi データセットの Iceberg テーブルスキーマを定義しています。このテーブルには以下が含まれています。
    • Schema で定義されたスキーマのカラム
    • vendoridtpep_pickup_datetime による パーティショニング (PartitionSpec を使用)
    • tpep_pickup_datetime に適用された day 変換
    • tpep_pickup_datetimetpep_dropoff_datetime による ソートオーダー

タイムスタンプ列に day 変換を適用する際、Iceberg は階層的に日付ベースのパーティション分割を自動的に処理します。つまり、単一の day 変換で、年、月、日のレベルでパーティションプルーニングを可能にするため、各レベルに明示的な変換を必要としません。Iceberg のパーティション分割の詳細については、パーティション分割を参照してください。

# Table Definition 
 schema = Schema(
    NestedField(field_id=1, name="vendorid", field_type=LongType(), required=False),
    NestedField(field_id=2, name="tpep_pickup_datetime", field_type=TimestampType(), required=False),
    NestedField(field_id=3, name="tpep_dropoff_datetime", field_type=TimestampType(), required=False),
    NestedField(field_id=4, name="passenger_count", field_type=LongType(), required=False),
    NestedField(field_id=5, name="trip_distance", field_type=DoubleType(), required=False),
    NestedField(field_id=6, name="ratecodeid", field_type=LongType(), required=False),
    NestedField(field_id=7, name="store_and_fwd_flag", field_type=StringType(), required=False),
    NestedField(field_id=8, name="pulocationid", field_type=LongType(), required=False),
    NestedField(field_id=9, name="dolocationid", field_type=LongType(), required=False),
    NestedField(field_id=10, name="payment_type", field_type=LongType(), required=False),
    NestedField(field_id=11, name="fare_amount", field_type=DoubleType(), required=False),
    NestedField(field_id=12, name="extra", field_type=DoubleType(), required=False),
    NestedField(field_id=13, name="mta_tax", field_type=DoubleType(), required=False),
    NestedField(field_id=14, name="tip_amount", field_type=DoubleType(), required=False),
    NestedField(field_id=15, name="tolls_amount", field_type=DoubleType(), required=False),
    NestedField(field_id=16, name="improvement_surcharge", field_type=DoubleType(), required=False),
    NestedField(field_id=17, name="total_amount", field_type=DoubleType(), required=False),
    NestedField(field_id=18, name="congestion_surcharge", field_type=DoubleType(), required=False),
    NestedField(field_id=19, name="airport_fee", field_type=DoubleType(), required=False),
)

# Define partition spec 
 partition_spec = PartitionSpec(
    PartitionField(source_id=1, field_id=1001, transform=IdentityTransform(), name="vendorid_idenitty"),
    PartitionField(source_id=2, field_id=1002, transform=DayTransform(), name="tpep_pickup_day"),
)

# Define sort order 
 sort_order = SortOrder(
    SortField(source_id=2, transform=DayTransform()),
    SortField(source_id=3, transform=DayTransform())
)

 database_name = os.environ.get('GLUE_DATABASE_NAME')
 table_name = os.environ.get('ICEBERG_TABLE_NAME')
 identifier = f"{database_name}.{table_name}"

# Create the table if it doesn't exist 
 location = f"s3://pyiceberg-lambda-blog-{account_id}-{region}/{database_name}/{table_name}"
 if not catalog.table_exists(identifier):
    table = catalog.create_table(
        identifier=identifier,
        schema=schema,
        location=location,
        partition_spec=partition_spec,
        sort_order=sort_order 
    )
 else:
    table = catalog.load_table(identifier=identifier)
Python
  • データの生成と挿入 – 次のコードはランダムなデータを生成し、テーブルに挿入します。この例では、ビジネスイベントやトランザクションを追跡するために、新しいレコードが継続的に追加される append-only のパターンを想定しています。
# Generate random data 
 records = generate_random_data()
# Convert to Arrow Table 
 df = pa.Table.from_pylist(records)
# Write data using PyIceberg 
 table.append(df)
Python
  1. Dockerfile をダウンロードします。これは、Lambda 関数のコンテナイメージを定義します。
aws s3 cp s3://aws-blogs-artifacts-public/artifacts/BDB-5013/Dockerfile .
Bash
  1. requirements.txt をダウンロードします。これは、Lmbmda 関数に必要な Python パッケージを定義しています。
aws s3 cp s3://aws-blogs-artifacts-public/artifacts/BDB-5013/requirements.txt .
Bash

この時点で、作業ディレクトリには次の 3 つのファイルが含まれているはずです。

  • Dockerfile
  • lambda_function.py
  • requirements.txt
  1. 環境変数を設定します。 を自分の AWS アカウント ID に置き換えてください:
export AWS_ACCOUNT_ID=<account_id>
Bash
  1. Docker イメージをビルドします:
docker build --provenance=false -t localhost/pyiceberg-lambda .

# Confirm built image 
 docker images | grep pyiceberg-lambda
Bash
  1. イメージにタグを設定します:
docker tag localhost/pyiceberg-lambda:latest ${AWS_ACCOUNT_ID}.dkr.ecr.${AWS_REGION}.amazonaws.com/pyiceberg-lambda-repository:latest
Bash
  1. AWS CloudFormation によって作成された ECR リポジトリにログインします:
aws ecr get-login-password --region ${AWS_REGION} | docker login --username AWS --password-stdin ${AWS_ACCOUNT_ID}.dkr.ecr.${AWS_REGION}.amazonaws.com
Bash
  1. イメージを ECR リポジトリにプッシュします:
docker push ${AWS_ACCOUNT_ID}.dkr.ecr.${AWS_REGION}.amazonaws.com/pyiceberg-lambda-repository:latest
Bash
  1. Amazon ECR にプッシュしたコンテナイメージを使用して、Lambda 関数を作成します:
aws lambda create-function \ 
--function-name pyiceberg-lambda-function \ 
--package-type Image \ 
--code ImageUri=${AWS_ACCOUNT_ID}.dkr.ecr.${AWS_REGION}.amazonaws.com/pyiceberg-lambda-repository:latest \ 
--role arn:aws:iam::${AWS_ACCOUNT_ID}:role/pyiceberg-lambda-function-role-${AWS_REGION} \ 
--environment "Variables={ICEBERG_TABLE_NAME=nyc_yellow_table, GLUE_DATABASE_NAME=pyiceberg_lambda_blog_database}" \ 
--region ${AWS_REGION} \ 
--timeout 60 \ 
--memory-size 1024
Bash
  1. 次のセクションで確認するため、少なくとも 5 回関数を呼び出して複数のスナップショットを作成してください。今回は手動で関数を呼び出してイベント駆動型のデータ取り込みをシミュレートしていますが、実際のシナリオでは Lambda 関数がイベント駆動型で自動的に呼び出されます。
aws lambda invoke \ 
--function-name arn:aws:lambda:${AWS_REGION}:${AWS_ACCOUNT_ID}:function:pyiceberg-lambda-function \ 
--log-type Tail \ 
 outputfile.txt \ 
--query 'LogResult' | tr -d '"' | base64 -d
Bash

ここまでで、Lambda 関数をデプロイして実行しました。この関数は、pyiceberg_lambda_blog_database データベース内に nyc_yellow_table Iceberg テーブルを作成します。また、このテーブルにサンプルデータを生成して挿入します。後の手順で、このテーブルのレコードを確認します。

コンテナを使用した Lambda 関数の構築に関する詳細情報は、コンテナイメージを使用した Lambda 関数の作成をご覧ください。

Jupyter を使った PyIceberg によるデータ探索

このセクションでは、Data Catalog に登録された Iceberg テーブルのデータにアクセスし、分析する方法を示します。PyIceberg を使用した Jupyter ノートブックから、Lambda 関数によって作成されたタクシー運行データにアクセスし、新しいレコードが到着するたびに作成される異なるスナップショットを検査します。また、重要なスナップショットにタグを付けて保持し、さらなる分析のために新しいテーブルを作成します。

SageMaker AI ノートブックインスタンスで Jupyter を使ってノートブックを開くには、次の手順を実行してください。

  1. SageMaker AI コンソールで、ナビゲーションペインから Notebooks を選択します。
  2. CloudFormation テンプレートを使用して作成したノートブックインスタンスの横にある Open JupyterLab を選択します。

notebook list

  1. ノートブックをダウンロードし、SageMaker AI ノートブックの Jupyter 環境で開いてください。

upload notebook

  1. アップロードした pyiceberg_notebook.ipynb を開きます。
  2. カーネル選択のダイアログでは、デフォルトのオプションのままにして Select を選択します。

select kernel

ここからは、セルを順番に実行してノートブックを進めていきます。

カタログへの接続とテーブルのスキャン

PyIceberg を使用して Iceberg テーブルにアクセスできます。次のコードは、AWS Glue Iceberg REST エンドポイントに接続し、pyiceberg_lambda_blog_database データベース上の nyc_yellow_table テーブルを読み込みます。

import pyarrow as pa 
 from pyiceberg.catalog import load_catalog 
 import boto3 

# Set AWS region 
 sts = boto3.client('sts')
 region = sts._client_config.region_name 

# Configure catalog connection properties 
 catalog_properties = {
    "type": "rest",
    "uri": f"https://glue.{region}.amazonaws.com/iceberg",
    "s3.region": region,
    "rest.sigv4-enabled": "true",
    "rest.signing-name": "glue",
    "rest.signing-region": region 
}

# Specify database and table names 
 database_name = "pyiceberg_lambda_blog_database"
 table_name = "nyc_yellow_table"

# Load catalog and get table 
 catalog = load_catalog(**catalog_properties)
 table = catalog.load_table(f"{database_name}.{table_name}")
Python

Iceberg テーブルからフルデータを Apache Arrow テーブルとしてクエリし、Pandas の DataFrame に変換できます。

scan table

スナップショットの操作

Iceberg の重要な機能の 1 つがスナップショットベースのバージョン管理です。スナップショットは、テーブルのデータに変更があるたびに自動的に作成されます。次の例のように、特定のスナップショットからデータを取得できます。

working with snapshots

# Get data from a specific snapshot ID 
 snapshot_id = snapshots.to_pandas()["snapshot_id"][3] 
 snapshot_pa_table = table.scan(snapshot_id=snapshot_id).to_arrow()
 snapshot_df = snapshot_pa_table.to_pandas()
Python

スナップショットに基づいて、任意の時点の過去のデータと現在のデータを比較できます。この場合、最新のテーブルとスナップショットテーブルの間のデータ分布の違いを比較しています。

# Compare the distribution of total_amount in the specified snapshot and the latest data.
 import matplotlib.pyplot as plt 

 plt.figure(figsize=(4, 3))
 df['total_amount'].hist(bins=30, density=True, label="latest", alpha=0.5)
 snapshot_df['total_amount'].hist(bins=30, density=True, label="snapshot", alpha=0.5)
 plt.title('Distribution of total_amount')
 plt.xlabel('total_amount')
 plt.ylabel('relative Frequency')
 plt.legend()
 plt.show()
Python

matplotlib graph

スナップショットのタグ付け

特定のスナップショットに任意の名前を付けてタグ付けし、後でその名前で特定のスナップショットを取得できます。これは、重要なイベントのスナップショットを管理する際に便利です。

この例では、checkpointTag タグを指定してスナップショットへクエリをしています。ここでは polars を使用して、既存のカラム tpep_dropoff_datetimetpep_pickup_datetime に基づいて trip_duration という新しいカラムを追加することで、新しい DataFrame を作成しています。

# retrive tagged snapshot table as polars data frame 
 import polars as pl 

# Get snapshot id from tag name 
 df = table.inspect.refs().to_pandas()
 filtered_df = df[df["name"] == tag_name] 
 tag_snapshot_id = filtered_df["snapshot_id"].iloc[0] 

# Scan Table based on the snapshot id 
 tag_pa_table = table.scan(snapshot_id=tag_snapshot_id).to_arrow()
 tag_df = pl.from_arrow(tag_pa_table)

# Process the data adding a new column "trip_duration" from check point snapshot.
 def preprocess_data(df):
    df = df.select(["vendorid", "tpep_pickup_datetime", "tpep_dropoff_datetime", 
                    "passenger_count", "trip_distance", "fare_amount"])
    df = df.with_columns(
        ((pl.col("tpep_dropoff_datetime") - pl.col("tpep_pickup_datetime"))
         .dt.total_seconds() // 60).alias("trip_duration"))
    return df 

 processed_df = preprocess_data(tag_df)
 display(processed_df)
 print(processed_df["trip_duration"].describe())
Python

processed-df

処理済みの DataFrame から trip_duration 列を使って新しいテーブルを作成します。このステップは、将来の分析のためにデータを準備する方法を示しています。基になるテーブルが変更されていても、タグを使うことで、処理済みデータが参照しているデータのスナップショットを明示的に指定できます。

# write processed data to new iceberg table 
 account_id = sts.get_caller_identity()["Account"] 

 new_table_name = "processed_" + table_name 
 location = f"s3://pyiceberg-lambda-blog-{account_id}-{region}/{database_name}/{new_table_name}"

 pa_new_table = processed_df.to_arrow()
 schema = pa_new_table.schema 
 identifier = f"{database_name}.{new_table_name}"

 new_table = catalog.create_table(
                identifier=identifier,
                schema=schema,
                location=location 
            )
            
# show new table's schema 
 print(new_table.schema())
# insert processed data to new table 
 new_table.append(pa_new_table)
Python

処理済みデータから作成した新しいテーブルを Athena でクエリし、Iceberg テーブルの相互運用性を実証しましょう。

Amazon Athena からのデータクエリ

  1. 前のセクションのノートブックから作成された pyiceberg_lambda_blog_database.processed_nyc_yellow_table テーブルを、Athena クエリエディタでクエリできます:
SELECT * FROM "pyiceberg_lambda_blog_database"."processed_nyc_yellow_table" limit 10 ;
SQL

query with athena

これらのステップを通して、Lambda と AWS Glue Iceberg REST エンドポイントを使用したサーバーレスのデータ処理ソリューションを構築し実際に利用する流れを体験しました。また、PyIceberg を使用してスナップショット管理やテーブル操作を含む Python によるデータ管理と分析を行いました。さらに、別のエンジンである Athena を使用してクエリを実行し、Iceberg テーブルの互換性を示しました。

クリーンアップ

この記事で使用したリソースをクリーンアップするには、次の手順を実行してください。

  1. Amazon ECR コンソールで、リポジトリ pyiceberg-lambda-repository に移動し、リポジトリ内のすべてのイメージを削除します。
  2. CloudShell で、作業ディレクトリ pyiceberg_blog を削除します。
  3. Amazon S3 コンソールで、CloudFormation テンプレートを使用して作成した S3 バケット pyiceberg-lambda-blog-- に移動し、バケットを空にします。
  4. リポジトリとバケットが空になったことを確認したら、CloudFormation スタック pyiceberg-lambda-blog-stack を削除します。
  5. Docker イメージを使用して作成した Lambda 関数 pyiceberg-lambda-function を削除します。

結論

この記事では、AWS Glue Data Catalog と PyIceberg を使用することで、堅牢なデータ管理機能を維持しながら、効率的で軽量なデータワークフローを実現できることを示しました。チームがインフラストラクチャの依存関係を最小限に抑えながら、Iceberg の強力な機能を活用できることを紹介しました。このアプローチにより、組織は分散コンピューティングリソースのセットアップや管理の複雑さなしに、すぐに Iceberg テーブルの利用を開始できます。

Iceberg の機能を低いハードルで導入しようとしている組織にとって、これは特に価値があります。PyIceberg の軽量な性質により、チームはすぐに Iceberg テーブルを使い始めることができ、慣れ親しんだツールを使用し、追加の学習を最小限に抑えることができます。データニーズが高まれば、同じ Iceberg テーブルを Athena や AWS Glue などの AWS 分析サービスからシームレスにアクセスでき、将来的なスケーラビリティへの明確なパスが提供されます。

PyIceberg と AWS の分析サービスの詳細については、PyIceberg のドキュメントApache Iceberg とは?を参照することをお勧めします。

著者について

Sotaro Hikita は、AWS でアナリティクスに特化したスペシャリストソリューションアーキテクトで、ビッグデータ技術とオープンソースソフトウェアを扱っています。仕事以外では、いつも美味しい食べ物を探し求め、最近はピザに夢中になっています。

Shuhei Fukami は、AWS におけるアナリティクスに特化したスペシャリストソリューションアーキテクトです。趣味で料理をするのが好きで、最近はピザ作りにはまっています。