Amazon Web Services ブログ

AWS Lake Formation による効果的なデータレイクの構築 パート 3: governed table の ACID トランザクションを使用する

本記事は Amazon Web Services, Senior Big Data Architect である 関山 宜孝 によって投稿されたものです。
Amazon Simple Storage Service(Amazon S3)のデータレイクは、あらゆるエンタープライズデータを扱うデフォルトのリポジトリになり、さまざまな分析ツールや ML ツールからクエリを実行する多くのユーザーにとって一般的な選択肢になっています。多くの場合、複数のソースからデータをデータレイクに継続的に取り込み、同時に多くの分析ツールからデータレイクに対してクエリを実行します。以前は、一貫性のある結果を得るためには、データの整合性を維持するためにカスタムパイプラインを構築し、その結果としてツールにデータが使用可能になるまでに遅延が発生していました。
AWS re: Invent 2021 で、Lake Formation のトランザクション、行レベルのセキュリティ、高速化の一般提供を発表しました。以前の記事では、Lake Formation の governed table を設定 ( part1) し、データレイクへの取り込みをストリーミングすることに焦点を当てました ( part2)。この記事では、原子性、一貫性、分離性、耐久性(ACID)トランザクションに焦点を当てています。S3 上の Lake Formation governed table で ACID トランザクションがどのように機能するかを説明します。

AWS Lake Formation を使用した効果的なデータレイク

Lake Formation における ACID トランザクション

AWS Lake Formation の ACID トランザクションを使用すると、新しいデータを継続的に取り込んで S3のデータを更新し続けながら、同時に一貫性のある最新の結果を返す分析クエリを実行することが容易になります。Lake Formation ACID トランザクションはスナップショット・アイソレーションにより、並行トランザクションで一貫したバージョンのデータベースを参照できます。これらのバージョンは互いに分離され、互いに影響を与えずに同時に実行されます。ACID トランザクションを使用すると、複数のユーザーが同時にアトミックな方法で信頼性高く S3 オブジェクトを追加あるいは削除をしながら、データレイクに対するクエリの読み取り整合性を保つことで、既存のクエリを分離できます。

Lake Formation では、S3 オブジェクトの ACID トランザクションをサポートする governed table と呼ばれる新しいテーブルタイプが導入されています。トランザクションは複数のオペレーションを含めることができ、複数の governed table にまたがることができます。さらに、トランザクションは、複数のツールを使用している複数のユーザーから実行できます。トランザクション内のオペレーションが失敗すると、トランザクション全体がロールバックされます。進行中のトランザクションは、別のトランザクションによって実行された変更から分離されます。トランザクションは、S3 のメタデータとデータの一貫したビューを常に表示できるようにすることで、governed table の整合性を保護します。

ACID トランザクションでの読み取りの一貫性について、次の例を考えてみましょう。ユーザー Andy が Amazon Athena で、テーブルのすべてのパーティションをスキャンして売上数を集計するクエリを開始するとします。Andy のクエリのスキャンが進行中であり、まだ完了していない間に、別のユーザー Betty が同じテーブルにパーティションを追加するとします。Andy がトランザクション内でクエリを実行した場合、クエリ開始時のテーブルの状態を反映したクエリ結果が表示されます。クエリの実行結果には、Betty が新しく追加したパーティションのデータは含まれません。

環境の準備

この記事では、AWS Glue DynamicFrames を使用して、新しい governed table を作成し、governed table を読み書きする方法を示します。Jupyter Notebook Editor に表示されるコード例を ETL スクリプトにコピーし、AWS Glue を使用して実行できます。DynamicFrames を使用しているため、このコードは Glue 以外の環境では実行されないことに注意してください。

このデモの設定は、複数の製品を販売する架空の e コマースです。ACID トランザクションを説明するために、2 つのノートブックを使用します。両方のノートブックが products テーブルを更新して読み取ります。Notebook1 は products テーブルを作成し、そのテーブルが Notebook2 によって読み込まれ、変更されます。同じトランザクション内で、Notebook1 が products テーブルを読み取ると、Notebook2 による変更から分離されています。Notebook2 がトランザクションを完了した後に、Notebook1 が新しいトランザクションを開始すると、変更されたデータを取得できます。

環境に必要なリソースは以下のとおりです。

  • AWS Identity and Access Management (IAM) ユーザー、ロール、ポリシー
  • Lake Formation データレイクの設定とパーミッション
  • Glue 環境でコードを実行できる JupyterLab 環境

IAM ユーザー、ロール、ポリシーの設定については、このブログ記事の最後に記載されています。

JupyterLab ノートブックでサンプルノートブックをセットアップする

ご利用の環境に同じコードを複製するために、2 つの Jupyter ノートブックを提供しています。

  1. DatalakeAdmin3 ユーザーを使用して、同じリージョンで Amazon SageMaker コンソールにサインインします。
  2. Terminal(ターミナル)を選択します。
  3. ターミナルで次のコマンドを実行します:
    $ cd /home/ec2-user/SageMaker/
    $ curl 'https://aws-bigdata-blog.s3.amazonaws.com/artifacts/lakeformation_acid_transactions/notebook1.ipynb' -o notebook1.ipynb
    $ curl 'https://aws-bigdata-blog.s3.amazonaws.com/artifacts/lakeformation_acid_transactions/notebook2.ipynb' -o notebook2.ipynb
    Bash

これで、JupyterLab の左ペインに、これら 2 つの新しいノートブックが表示されます。

 

Jupyter ノートブックで ACID トランザクションを実行する

ノートブックで Lake Formation のトランザクションがどのように機能するかを見てみましょう。

  1. notebook1.ipynb のタブを開きます。このノートブックは ETL ジョブをシミュレートし、会社カタログに製品を作成します。
  2. 最初のセルで、datalake-bucket を CloudFormation テンプレートの DataLakeBucketName セクションで使用したバケット名に置き換えます。
  3. 初期化するための最初のセルを実行します。

最初のセルが完了すると、Spark アプリケーションの情報が表示されます。

  1. 最初に必要な手順は、governed table を作成することです

  1. Lake Formation コンソールで、Tables(テーブル)を選択します。

Lake Formation コンソールで products テーブルを確認できます。

  1. Create a DynamicFrame from sample data の下のセルを実行して、Spark DataFrame としてサンプルデータを作成し、それを AWS Glue DynamicFrame に変換します。

これで、TelevisionUSB chargerBlender の 3 つの製品レコードを含む DynamicFrame のサンプルを見ることができます。

  1. Start a new transaction の下のセルを実行して、 start_transaction API を呼び出して、新しい Lake Formation トランザクションを開始します。

トランザクション ID が表示されます。このトランザクション ID は、データの書き込みに使用します。

  1. Write the DynamicFrame into a Governed table の下のセルを実行して、サンプルの DynamicFrame を新しい Lake Formation governed table に書き込み、トランザクションをコミットします。

  1. Amazon S3 コンソールで、データレイクバケットを選択します。

新しいデータが Amazon S3 に取り込まれていることがわかります。

  1. JupyterLab に戻り、Get transaction ID on notebook 1 の下のセルを実行して、別のトランザクションを開始します。
  2. Read original table from notebook1 の下のセルを実行して、governed table から読み取ります。

トランザクション ID を使用して、governed table から読み取られているデータを確認できます。

次に、別のノートブックを開いてセルを実行して、他ユーザーからクエリ実行をシミュレートしてみましょう。

  1. notebook2.ipynb のブラウザタブを開きます。
  2. 最初のセルで、datalake-bucket を CloudFormation テンプレートの DataLakeBucketName セクションで使用したバケット名に置き換えます。
  3. 初期化のために最初のセルを実行します。
  4. Get transaction ID on notebook 2 の下のセルを実行して、新しい Lake Formation トランザクションを開始します。これにより、notebook2 はすべてのオペレーションで新しいトランザクション ID を使用するようになります。
  5. Read original table from notebook 2 の下のセルを実行して、notebook1.ipynb で作成した governed table から読み取ります。この読み取りでは、notebook 1 で使用されているものとは異なる新しい トランザクション ID を使用します。

notebook1 で作成した governed table から読み取られているデータが確認できます。

  1. Notebook 2 – Write 2 additional rows の下の 2 つのセルを実行して、2 つの新しいレコード (product_id=00004および product_id=00005) を governed table に書き込みます。

このトランザクションはまだコミットされていないことに注意してください。

  1. 次のセルを実行して、まだコミットされていない同じトランザクション ID を使用して governed table から読み取ります。

レコードの作成時に使用したトランザクション ID と同じトランザクション ID を使用しているため、コミットされていない変更 (product_id=00004 および product_id=00005) を確認できます。

次に、notebook1.ipynb に移動して、コミットされていない変更が他のユーザーのノートブックでどのように見えるかを見てみましょう。

  1. notebook1.ipynb のブラウザタブに切り替えます。
  2. No change – waiting for notebook 2 の下のセルを実行して、2 つの新しいレコードがまだ表示されないことを確認します。

これは、notebook2.ipynb で実行したトランザクションがコミットされていないためです。

それでは、notebook2.ipynb に移動して進行中のトランザクションをコミットして、コミット後の notebook1.ipynb でどのように見えるかを見てみましょう。

  1. notebook2.ipynb のブラウザタブに切り替えます。
  2. Commit notebook 2 updates の下のセルを実行して、notebook2.ipynb の変更をコミットします。
  3. notebook1.ipynb のブラウザタブに切り替えます。
  4. Notebook 2 committed but still notebook 1 pointing original transaction ID の下のセルを実行して、ノートブック 2 が作成した新しいレコードがまだ表示されていないことを確認します。

これは、データを読み込むときに元のトランザクション ID txId2 を使用しているからです。こうすることで、このトランザクション外で発生する変更から分離されます。

  1. Notebook 1 gets new transaction ID which reflects the changes from notebook 2 の下に続くセルを実行して、 txId2 をコミットし、別の新しいトランザクション txId3 を開始し、Notebook 2 でコミットされたデータを読み取ります。

これで、コミットされた変更を確認できます。

  1. Notebook 2 のブラウザタブに切り替えます。
  2. Both notebooks using the most recent transaction ID の下のセルを実行して、新たなトランザクションを開始し、データを読み取ります。

これで、最新のトランザクション ID を使用すると、両方のノートブックに最新のデータが表示されていることが確認できます。

クリーンアップ

最後のステップとして、リソースをクリーンアップします。

  1. Amazon S3 データレイクバケットを空にして、バケットを削除します。
  2. CloudFormation スタックを削除します。

CloudFormation スタックを削除すると、作成した governed table も自動的に削除されます。

まとめ

この記事では、Lake Formation トランザクションの使用方法について説明し、同じテーブルの読み取りと書き込みを同時に行う 2 つのアプリケーション間の原子性と分離性について説明しました。Lake Formation トランザクションを使用すると、複数のテーブルやステートメントで ACID トランザクションを実現でき、一貫性のある最新の結果を得ることができます。


付録: コンソールを使用したリソースのセットアップ

IAM ロールとユーザーの設定

まず、2 つの IAM ロールを設定する必要があります。1 つは AWS Glue ETL ジョブ用、もう 1 つは Lake Formation データレイクのロケーション用です。

IAM ポリシー

ロールで使用する IAM ポリシーを作成するには、次の手順を実行します:

  1. IAM コンソールで、 Amazon S3 の新しいポリシーを作成します
  2. 以下の手順に従って、IAM ポリシーを S3DataLakePolicy として保存します (<datalake-bucket> をバケット名に置き換えてください):
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                    "s3:PutObject",
                    "s3:GetObject",
                    "s3:DeleteObject"
                ],
                "Resource": [
                    "arn:aws:s3:::<datalake-bucket>/*"
                ]
            },
            {
                "Effect": "Allow",
                "Action": [
                    "s3:ListBucket"
                ],
                "Resource": [
                    "arn:aws:s3:::<datalake-bucket>/*"
                ]
            }
        ]
    }
    JSON
  3. 次のステートメントを使用して、LFTransactionPolicy という名前の新しい IAM ポリシーを作成します:
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                    "lakeformation:StartTransaction",
                    "lakeformation:CommitTransaction",
                    "lakeformation:CancelTransaction",
                    "lakeformation:ExtendTransaction",
                    "lakeformation:DescribeTransaction",
                    "lakeformation:ListTransactions",
                    "lakeformation:UpdateTableObjects"
                ],
                "Resource": "*"
            }
        ]
    }
    JSON
  4. 次のステートメントを使用して、LFLocationPolicy という名前の新しい IAM ポリシーを作成します:
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                    "lakeformation:StartTransaction",
                    "lakeformation:CommitTransaction",
                    "lakeformation:CancelTransaction",
                    "lakeformation:GetTableObjects",
                    "lakeformation:UpdateTableObjects"
                ],
                "Resource": "*"
            },
            {
                "Action": [
                    "glue:GetDatabase",
                    "glue:GetDatabases",
                    "glue:GetTableVersions",
                    "glue:GetPartitions",
                    "glue:GetTable",
                    "glue:GetTables",
                    "glue:UpdateTable"
                ],
                "Resource": "*",
                "Effect": "Allow"
            }
        ]
    }
    JSON
  5. 次のステートメントを使用して、LFQueryPolicywith という名前の新しい IAM ポリシーを作成します:
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                    "lakeformation:StartTransaction",
                    "lakeformation:CommitTransaction",
                    "lakeformation:CancelTransaction",
                    "lakeformation:ExtendTransaction",
                    "lakeformation:StartQueryPlanning",
                    "lakeformation:GetTableObjects",
                    "lakeformation:GetQueryState",
                    "lakeformation:GetWorkUnits",
                    "lakeformation:GetWorkUnitResults"
                ],
                "Resource": "*"
            }
        ]
    }
    JSON

AWS Glue 開発エンドポイントの IAM ロール

AWS Glue 開発エンドポイント用の新しい IAM ロールを作成します:

  1. IAM コンソールで、AWS Glue との信頼関係を設定した GlueETLServiceRole という名前のロールを作成します:
    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Principal": {
            "Service": [
              "glue.amazonaws.com"
            ]
          },
          "Action": "sts:AssumeRole"
        }
      ]
    }
    JSON
  2. 次の AWS 管理ポリシーをアタッチします:
    1. AWSGlueServiceRole
    2. AWSLakeFormationDataAdmin
  3. 次のカスタマー管理ポリシーをアタッチします:
    1. S3DataLakePolicy
    2. LFTransactionPolicy

AWS Lake Formation のデータレイクロケーション用の IAM ロール

Lake Formation の IAM ロールを作成するには、次の手順を実行します:

  1. LFRegisterLocationServiceRolewith という新しい IAM ロールを作成し、Lake Formationとの信頼関係を設定します:
    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Principal": {
            "Service": [
              "lakeformation.amazonaws.com"
            ]
          },
          "Action": "sts:AssumeRole"
        }
      ]
    }
    JSON
  2. 次のカスタマー管理ポリシーをアタッチします:
    1. S3DataLakePolicy
    2. LFLocationPolicy

IAM ユーザー

IAM ユーザーを作成するには、次の手順を実行します:

  1. DatalakeAdmin3 という名前の IAM ユーザーを作成します
  2. 次の AWS 管理ポリシーをアタッチします:
    1. AWSLakeFormationDataAdmin
    2. AmazonAthenaFullAccess
    3. AWSGlueConsoleFullAccess
    4. AWSGlueConsoleSageMakerNotebookFullAccess
    5. IAMReadOnlyAccess
  3. カスタマー管理ポリシー LFQueryPolicy をアタッチします。

原文はこちらです。
本ブログは Solutions Architect の宮田が翻訳しました。