Amazon Web Services ブログ

AWS Lake Formation による効果的なデータレイクの構築 パート 2: ストリーミングデータソース用の governed table を作成する

本記事は Amazon Web Services, Senior Big Data Architect である 関山 宜孝 によって投稿されたものです。

AWS re: Invent 2021 で、Lake Formation のトランザクション、行レベルのセキュリティ、高速化の一般提供を発表しました。 このシリーズのパート 1 では、governed table を作成して、オブジェクトを追加する方法を説明しました。この記事では、この例を拡張し、Lake Formation トランザクションを使用して governed table にストリーミングデータを取り込む方法を示します。

一般的なストリーミングのユースケースでは、新しいデータが継続的にデータレイクに取り込まれます。これにより、クエリのパフォーマンスに影響する可能性のある多くの小さなファイルが作成されます。さらに一般的な要件として,ダウンタイムや長い待ち時間を伴わずに,更新処理と分離した形で,クエリ実行をしたいというものがあります。最後に、定期レポートや反復的な機械学習のトレーニングなどの一部のユースケースでは、特定日時に戻ってクエリを実行できることが必要です。
Lake Formation トランザクションは、処理中のクエリを分離しながら、データレイクにストリーミングデータセットを追加するシンプルなメカニズムを提供します。これにより、クエリをレイテンシーなしで同時に実行できます。Lake Formation アクセラレーションは、クエリのパフォーマンスを向上させるために、小さなファイルを自動的に大きなファイルへ集約します。
この記事では、Lake Formation トランザクションを使用してリアルタイムでストリーミングデータをデータレイクに取り込む方法を紹介します。

AWS Lake Formationによる効果的なデータレイクの構築:

アーキテクチャ

AWS CloudTrail は、AWS アカウントのガバナンス、コンプライアンス、運用およびリスク監査を可能にする AWS サービスです。ユーザー、ロール、または AWS サービスによって実行されたアクションは、CloudTrail にイベントとして記録されます。これらの CloudTrail イベントを Amazon Kinesis Data Streams に連携し 、AWS Glue ストリーミングジョブを使用してこのデータストリームを処理し、AWS Lake Formation トランザクションを使用してデータを Amazon Simple Storage Service (Amazon S3)に保存します。

次の図は、ソリューションのアーキテクチャを示しています。

AWS CloudFormation でリソースを準備する

この記事では、 すぐに開始できる AWS CloudFormation テンプレートを提供しています。ニーズに合わせて確認し、カスタマイズできます。このスタックによってデプロイされるリソースの一部は、使用時にコストが発生します。AWS マネジメントコンソールを使用してこれらのリソースを手動でセットアップする場合は、この記事の最後にある付録 1 の手順を参照してください。

CloudFormation テンプレートは、次のリソースを生成します:

  • AWS Lambda 関数 (Lambda-backed カスタムリソース)
  • S3 バケット
  • AWS Identity and Access Management(IAM)ユーザー、ロール、ポリシー
  • Lake Formation データレイクの設定とパーミッション
  • Kinesis Data Streams
  • Amazon EventBridge のルール
  • AWS Glue の ETL ジョブ

これらのリソースを作成するには、次の手順を実行します:

  1. us-east-1 リージョンで CloudFormation コンソールにサインインします。
  2. Launch Stack を選択します:
  3. Next(次へ) を選択します。
  4. DatalakeAdminUserNameDatalakeAdminUserPassword には、データレイク管理者ユーザーの IAM ユーザー名とパスワードを入力します。
  5. DataAnalystUserNameDataAnalystUserPassword には、データアナリストユーザーの IAM ユーザー名とパスワードを入力します。
  6. DataLakeBucketName に、データレイクバケットの名前を入力します。
  7. DatabaseName および TableName は、デフォルトのままにします。
  8. Next(次へ) を選択します。
  9. 次のページで、Next(次へ) を選択します。
  10. 最後のページの詳細を確認し、I acknowledge that AWS CloudFormation might create IAM resources(AWS CloudFormation によって IAM リソースがカスタム名で作成される場合があることを承認します。) を選択します。
  11. Create(スタックの作成)を選択します。

スタックの作成には約 3 分かかります。

governed table のセットアップ

これで、Lake Formation で governed table を作成して設定できます。

governed table の作成

governed table を作成するには、次の手順を実行します:

  1. DataLakeAdmin2 ユーザーを使用して、 us-east-1 リージョンの Lake Formation コンソールにサインインします。
  2. Tables を選択します。
  3. Create table を選択します。
  4. Name に、 cloudtrail_governed と入力します。
  5. Databaseに、 lakeformation_tutorial_cloudTrail と入力します。
  6. Enable governed data access and management を選択します。
  7. Data is located in で、Specified path in my account を選択します。
  8. s3://<datalake-bucket>/data/cloudtrail_governed/ のパスを入力します。
  9. Classification PARQUET を選択します。
  10. Upload Schema を選択します。
  11. テキストボックスに次の JSON 配列を入力します:
[
    {
        "Name": "eventversion",
        "Type": "string"
    },
    {
        "Name": "eventtime",
        "Type": "string"
    },
    {
        "Name": "awsregion",
        "Type": "string"
    },
    {
        "Name": "sourceipaddress",
        "Type": "string"
    },
    {
        "Name": "useragent",
        "Type": "string"
    },
    {
        "Name": "errorcode",
        "Type": "string"
    },
    {
        "Name": "errormessage",
        "Type": "string"
    },
    {
        "Name": "requestid",
        "Type": "string"
    },
    {
        "Name": "eventid",
        "Type": "string"
    },
    {
        "Name": "eventtype",
        "Type": "string"
    },
    {
        "Name": "apiversion",
        "Type": "string"
    },
    {
        "Name": "readonly",
        "Type": "string"
    },
    {
        "Name": "recipientaccountid",
        "Type": "string"
    },
    {
        "Name": "sharedeventid",
        "Type": "string"
    },
    {
        "Name": "vpcendpointid",
        "Type": "string"
    }
]
JSON
  1. Upload を選択します。

eventsourceeventname の2つのパーティション列を追加します。

  1. Add column を選択します。
  2. Column name に、 eventsource と入力します。
  3. Data type で、String を選択します。
  4. Partition Key を選択します。
  5. Add を選択します。
  6. Add column を選択します。
  7. Column name に、 eventname と入力します 。
  8. Data typeで、String を選択します。
  9. Partition Key を選択します。
  10. Add を選択します。
  11. Submit を選択します。

Lake Formation のパーミッションの設定

governed table に対して、Lake Formation のパーミッションを設定する必要があります。 以下の手順を実行します:

データのパーミッション

  1. DatalakeAdmin2 ユーザーを使用して、 us-east-1 リージョンの Lake Formation コンソールにサインインします 。
  2. PermissionsData permissions を選択します。
  3. Data permissions で、Grant を選択します。
  4. PrincipalsIAM users and roles を選択し、GlueETLServiceRole-<CloudFormation stack name>LFRegisterLocationServiceRole-<CloudFormation stack name>というロール、および DatalakeAdmin2 というユーザーを選択します。
  5. Policy tags or catalog resources で、Named data catalog resources を選択します。
  6. Database で、lakeformation_tutorial_cloudtrail を選択します。
  7. Table で、cloudtrail_governed を選択します。
  8. Table permissions で、Select, Insert, Delete, Describe, Alter, Drop を選択します。
  9. Data permissions で、All data access を選択します。
  10. Grant を選択します。
  11. PermissionsData permissions を選択します。
  12. Data permission で、Grant を選択します。
  13. Principals で、IAM users and roles を選択し、ユーザー DataAnalyst2 を選択します。
  14. Policy tags or catalog resources で、Named data catalog resources を選択します。
  15. Database で、lakeformation_tutorial_cloudtrail を選択します。
  16. Table で、cloudtrail_governed を選択します。
  17. Table permissions で、SelectDescribe を選択します。
  18. Data permissions で、All data access を選択します。
  19. Grant を選択します。

データロケーションのパーミッション

  1. Permissions で、Data locations を選択します。
  2. Data locations で、Grant を選択します。
  3. IAM users and roles で、ロール GlueETLServiceRole-<CloudFormation stack name> を選択します。
  4. Storage location に、 s3://<datalake-bucket>/ と入力します。
  5. Grant を選択します。

governed table へオブジェクトを追加する

既存のテーブル(JDBC や Amazon S3 など)から governed table にデータをコピーする場合は、標準の AWS Glue ジョブを実行してデータをロードすることをお勧めします。一方、ストリーミングソース(Kinesis または Kafka)から governed table にデータをストリーミングする場合は、AWS Glue ストリーミングジョブを実行してデータをロードすることをお勧めします。

governed table のテーブルロケーションにデータを配置しても、データはまだ認識されません。governed table にデータを認識させるには、Lake Formation トランザクションを使用して、データを含むファイルを governed table に追加する必要があります。

S3 オブジェクトを governed table に追加するには、 UpdateTableObjects API を呼び出します。 AWS コマンドラインインターフェイス(AWS CLI)と SDK、および AWS Glue ETL ライブラリ(ライブラリからAPIを呼び出します)を使用して呼び出すことができます。この記事では、AWS Glue ストリーミングジョブを使用して、 AWS Glue ETL ライブラリで CloudTrail のデータを Kinesis Data Streams から governed table に取り込み、Amazon S3 にオブジェクトを配置し、S3 オブジェクトの UpdateTableObjects API を呼び出します。CloudFormation テンプレートで自動的に作成される AWS Glue ジョブ cloudtrail_ingestion-<CloudFormation stack name> を実行してみましょう。

  1. AWS Glue コンソールで、Jobs (ジョブ) を選択します。
  2. cloudtrail_ingestion-<CloudFormation stack name> という名前のジョブを選択します。
  3. Actions (アクション) メニューの Run job (jジョブの実行) を選択します。

このジョブは、データストリームからデータを読み取り、100 秒間隔で governed table cloudtrail_governed に書き出します。数分後、新しいファイルがデータレイクバケットに書き込まれていることがわかります。

Amazon Athena を使用してgoverned tableのクエリを実行する

これですべての準備が完了です! Amazon Athena を使用して、governed table のクエリを開始しましょう。

Athena でクエリを初めて実行する場合は、クエリの結果が置かれる場所を設定する必要があります。詳細は、「クエリー結果の場所の指定」を参照してください。

Lake Formation のプレビュー機能を使用するには、 AmazonAthenaLakeFormation という名前の特別なワークグループを作成し、そのワークグループに参加する必要があります。詳細は、「ワークグループの管理」を参照してください。

シンプルなクエリの実行

DataAnalyst2 ユーザーを使用して、 us-east-1 リージョンの Athena コンソールにサインインします。まず、governed table に格納されている 10 個のレコードをクエリして、テーブルをプレビューしてみましょう。

SELECT * 
FROM lakeformation_tutorial_cloudtrail.cloudtrail_governed
LIMIT 10
Code

次のスクリーンショットは、クエリを実行した結果を示しています。

分析クエリの実行

次に、集計を使用する分析クエリを実行して、実際のユースケースを再現しましょう:

SELECT count (*) as TotalEvents, eventsource, eventname
FROM lakeformation_tutorial_cloudtrail.cloudtrail_governed
GROUP BY eventsource, eventname
ORDER BY TotalEvents desc
Code

次のスクリーンショットは、クエリを実行した結果を示しています。このクエリは、 eventsource および eventname ごとの API 呼び出しの合計を返します。

タイムトラベルを使用した分析クエリの実行

governed table を使用すると、特定のタイムスタンプに戻り、そのタイムスタンプに対してクエリを実行できます。タイムスタンプを含む governed table に対して Athena クエリを実行して、特定の日時のデータの状態をターゲットにすることができます。

Athena でタイムトラベルクエリを実行するには、Athena エンジンバージョン 2 を使用する必要があります。ワークグループがまだAthena エンジンのバージョン 1 を使用している場合は、Athena エンジンバージョン 2 を使用するようにワークグループを更新します

タイムトラベルクエリを送信するには、 次の構文の例のように、 SELECT 文でテーブル名の後ろに FOR SYSTEM_TIME AS OF timestamp を使用します:

SELECT * 
FROM database.table
FOR SYSTEM_TIME AS OF <timestamp>
SQL

timestamp パラメータは、タイムスタンプまたはタイムゾーン付きのタイムスタンプのいずれかです。タイムゾーンを指定しない場合、Athena は UTC 時間のタイムスタンプとして値を扱います。

5分前の時点のデータを取得するには、以下のようなタイムトラベルクエリを実行します:

SELECT count (*) as TotalEvents, eventsource, eventname
FROM lakeformation_tutorial_cloudtrail.cloudtrail_governed
FOR SYSTEM_TIME AS OF (current_timestampinterval '5' minute)
GROUP BY eventsource, eventname
ORDER BY TotalEvents desc
SQL

次のスクリーンショットは、クエリの実行結果を示しています。この結果は古いタイムスタンプを指しているため、 TotalEvents の数は前の結果よりも少なくなります。

governed table による自動コンパクション

governed table は、データの格納方法を監視し、小さいファイルを大きなファイルに圧縮することで自動的に最適化します。コンパクションジョブは非同期に実行され、新しい大きなファイルを生成して S3 のテーブルロケーションに追加します。その結果、クエリのパフォーマンスが向上し、時間が経過しても governed table の最適な動作が維持されます。詳細については、Lake Formation 開発者ガイドを参照してください。

現在、自動コンパクションは Apache Parquet 形式でデータを格納する governed table のみをサポートしています。テーブルでコンパクションが有効になっているかどうかを確認するには、以下のコマンドを使用します:

$ aws lakeformation list-table-storage-optimizers --database-name lakeformation_tutorial_cloudtrail --table-name cloudtrail_governed

次のようなレスポンスが表示されます:

{"StorageOptimizerList": [{"StorageOptimizerType": "compaction",
  "Config": {"is_enabled": "true"},
  "LastRunDetails": "Status=completed, RunTime=177685, StartTime=2021-10-29T20:29:12.251Z, CompactedFiles=1200, CompactedBytes=30553968"},
  .....
]}

コンパクションの is_enabled は true に設定する必要があります。コンパクションが実行されている場合は、前回の実行の詳細も表示されます。前回の実行が行われた時刻と、圧縮されたデータの量が表示されます。コンソールで Lake Formation コンソールのテーブルを見ると、コンパクションのステータスを確認することもできます。
コンパクションが無効になっている場合は、次のコマンドを実行してコンパクションを有効にできます。

$ aws lakeformation update-table-storage-optimizer --database-name lakeformation_tutorial_cloudtrail --table-name cloudtrail_governed --storage-optimizer-config '{"compaction": {"is_enabled":"true"}}'

コンパクションの実行が成功すると、以下のようなレスポンスが表示されます。

{"Result": "true"}

コンパクションを実行すると、多数の小さなファイルが読み込まれ、少数の大きなファイルに圧縮されます。S3 を確認すると、小さいファイルは削除されていないことがわかります。これは、コンパクション・ジョブがトランザクション内で実行され、小さいファイルを削除して大きいファイルを追加するよう、テーブルマニフェストファイルが更新さるからです。以下を実行すると、テーブルマニフェストに登録されているオブジェクトを表示できます:

$ aws lakeformation get-table-objects --database-name lakeformation_tutorial_cloudtrail --table-name cloudtrail_governed --query-as-of-time <current timestamp>

クリーンアップ

最後に、リソースをクリーンアップします。

  1. Amazon S3 データレイクバケットを空にしてから、バケットを削除します。
  2. CloudFormation スタックを削除します。作成した governed table も、スタックの削除時に自動的に削除されます。
  3. Athenaのワークグループ AmazonAthenaLakeFormation を削除します。

まとめ

この記事では、Lake Formation の governed table を作成し、 EventBridge から送信される CloudTrail のデータを Kinesis Data Streams 経由で追加する方法について説明しました。さらに、governed table に対してクエリを行う方法と、governed table に対してタイムトラベルクエリを実行する方法についても説明しました。Lake Formation で governed table を使用すると、トランザクション、行レベルのセキュリティ、クエリアクセラレーションを実現できます。シリーズのパート 3 では、 governed table で ACID トランザクションを使用して、複数の操作をアトミックに実行し、操作間の分離を実現する方法について説明します。


付録 1: コンソールを使用してリソースを準備する

IAM ロールと IAM ユーザーの設定

まず、2 つの IAM ロールを設定する必要があります。1 つは AWS Glue ETL ジョブ用、もう 1 つは Lake Formation のデータレイクロケーション用です。

IAM ポリシー

IAM ロールで利用する IAM ポリシーを作成するには、以下の手順を実行します:

  1. IAM コンソールで、Amazon S3 の新しいポリシーを作成します
  2. IAM ポリシーを S3DataLakePolicy として次のように保存します( <datalake-bucket> をバケット名に置き換えてください):
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                    "s3:PutObject",
                    "s3:GetObject",
                    "s3:DeleteObject"
                ],
                "Resource": [
                    "arn:aws:s3:::<datalake-bucket>/*"
                ]
            },
            {
                "Effect": "Allow",
                "Action": [
                    "s3:ListBucket"
                ],
                "Resource": [
                    "arn:aws:s3:::<datalake-bucket>/*"
                ]
            }
        ]
    }
    
    
    JSON
  3. 次のステートメントを使用して、 LFTransactionPolicy という名前の新しい IAM ポリシーを作成します:
    {
        "Veion": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                    "lakeformation:StartTransaction",
                    "lakeformation:CommitTransaction",
                    "lakeformation:CancelTransaction",
                    "lakeformation:ExtendTransaction",
                    "lakeformation:DescribeTransaction",
                    "lakeformation:ListTransactions",
                    "lakeformation:UpdateTableObjects"
                ],
                "Resource": "*"
            }
        ]
    }
    
    JSON
  4. 次のステートメントを使用して、 LFLocationPolicy という名前の新しい IAM ポリシーを作成します:
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "LFtransactions",
                "Effect": "Allow",
                "Action": [
                    "lakeformation:StartTransaction",
                    "lakeformation:CommitTransaction",
                    "lakeformation:CancelTransaction",
                    "lakeformation:GetTableObjects",
                    "lakeformation:UpdateTableObjects"
                ],
                "Resource": "*"
            }
        ]
    }
    
    JSON
  5. 次のステートメントを使用して、 LFQueryPolicy という名前の新しい IAM ポリシーを作成します:
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "LFtransactions",
                "Effect": "Allow",
                "Action": [
                    "lakeformation:StartTransaction",
                    "lakeformation:CommitTransaction",
                    "lakeformation:CancelTransaction",
                    "lakeformation:ExtendTransaction",
                    "lakeformation:PlanQuery",
                    "lakeformation:StartQueryPlanning",
                    "lakeformation:GetTableObjects",
                    "lakeformation:GetQueryState",
                    "lakeformation:GetWorkUnits",
                    "lakeformation:Execute",
                    "lakeformation:GetWorkUnitResults"
                ],
                "Resource": "*"
            }
        ]
    }
    JSON

AWS Glue ETL ジョブの IAM ロール

AWS Glue ETL ジョブを実行する新しい IAM ロールを作成します。

  1. IAM コンソールで、AWS Glue との信頼関係を設定した GlueETLServiceRole という名前のロールを作成します:
    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Principal": {
            "Service": [
              "glue.amazonaws.com"
            ]
          },
          "Action": "sts:AssumeRole"
        }
      ]
    }
    JSON
  2. 次の AWS 管理ポリシーをアタッチします:
    1. AWSGlueServiceRole
    2. AWSLakeFormationDataAdmin
    3. AmazonKinesisReadOnlyAccess
  3. 次のカスタマー管理ポリシーをアタッチします:
    1. S3DataLakePolicy
    2. LFTransactionPolicy

AWS Lake Formationのデータレイクロケーション用の IAM ロール

Lake Formation の IAM ロールを作成するには、以下の手順を実行します:

  1. LFRegisterLocationServiceRole という新しい IAM ロールを作成し、Lake Formation との信頼関係を設定します:
    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Principal": {
            "Service": [
              "lakeformation.amazonaws.com"
            ]
          },
          "Action": "sts:AssumeRole"
        }
      ]
    }
    JSON
  2. 次のカスタマー管理ポリシーをアタッチします:
    1. S3DataLakePolicy
    2. LFLocationPolicy

このロールは、ロケーションを Lake Formation に登録するために使用されます。このロールは、Athena がクエリを実行する時に 認証情報として利用されます。

IAM ユーザー

IAM ユーザーを作成するには、以下の手順を実行します:

  1. DatalakeAdmin2 という名前の IAM ユーザーを作成します
  2. 次の AWS 管理ポリシーをアタッチします:
    1. AWSLakeFormationDataAdmin
    2. AmazonAthenaFullAccess
    3. IAMReadOnlyAccess
  3. カスタマー管理ポリシー LFQueryPolicy をアタッチします。
  4. Athena を使用してデータをクエリできる DataAnalyst2 という名前の IAM ユーザーを作成します。
  5. AWS 管理ポリシー AmazonAthenaFullAccess をアタッチします。
  6. カスタマー管理ポリシー LFQueryPolicy をアタッチします。

Kinesis Data Streams の設定

データストリームを作成するには、次の手順を実行します:

  1. Kinesis Data Streams コンソールで、Create data stream (データストリームの作成) を選択します。
  2. Data stream name (データストリーム名)に、 cloudtrail と入力します。
  3. Number of open shards (開いているシャードの数) に 2 と入力します。
  4. Create data stream (データストリームの作成) を選択します。

EventBridgeの設定

この記事では、CloudTrail API ログをキャプチャし、Kinesis Data Streams に連携する EventBridge ルールを設定しました。

  1. EventBridge コンソールで、Rules (ルール) を選択します。
  2. Create rule (ルールを作成) を選択します。
  3. Name (名前) に、 cloudtrail-to-kinesis と入力します。
  4. Event pattern (イベントパターン) を選択します。
  5. Event matching pattern (イベント一致パターン) で、Custom pattern (カスタムパターン) を選択します。
  6. Event pattern (イベントパターン) に、次の JSON を入力します(<account-id> を AWS アカウント ID に置き換えてください)。
{
  "detail-type": [
    "AWS API Call via CloudTrail"
  ],
  "account": [
    "<account-id>"
  ]
}
JSON
  1. Target (ターゲット) で、Kinesis stream (Kinesis ストリーム) を選択します。
  2. Stream (ストリーム)cloudtrail を選択します。
  3. Configure input (入力の設定) で、Part of the matched event (一致したイベントの一部) を選択します。
  4. $.detail と入力します。
  5. Create (作成) を選択します。

Glue Data Catalog でのデータストリーム用テーブルの作成

このステップでは、AWS Glue Data Catalog でデータストリーム用テーブルを作成します。

  1. AWS Glue コンソールで、Table (テーブル) を選択します。
  2. Add tables (テーブルの追加) を選択します。
  3. Add table manually (手動でのテーブルの追加) を選択します。
  4. Table name (テーブル名) に、 cloudtrail_kinesis と入力します。
  5. Database (データベース) に、 lakeformation_tutorial_cloudtrail と入力します。
  6. Next (次へ) を選択します。
  7. Select the type of source で、Kinesis を選択します。
  8. Kinesis stream namecloudtrail を選択します。
  9. Region は、us-east-1 を選択します。
  10. Next (次へ) を選択します。
  11. Classification (分類) で JSON を選択します。
  12. Next (次へ) を選択します。
  13. Add column (列の追加) を選択します。
  14. Column name (列名) に、 eventversion と入力します。
  15. Column type (列のタイプ) で、string を選択します。
  16. Add (追加) を選択します。
  17. 以下の列について、手順 13-16 を繰り返します。
    1. Column name: useridentity, Column type: struct, StructSchema(Structスキーマ)STRUCT<type:STRING,principalid:STRING,arn:STRING,accountid:STRING,invokedby:STRING,accesskeyid:STRING,userName:STRING,sessioncontext:STRUCT<attributes:STRUCT<mfaauthenticated:STRING,creationdate:STRING>,sessionissuer:STRUCT<type:STRING,principalId:STRING,arn:STRING,accountId:STRING,userName:STRING>>>
    2. Column nameeventtimeColumn type: string
    3. Column nameeventsourceColumn type: string
    4. Column nameeventnameColumn typestring
    5. Column namesourceipaddressColumn typestring
    6. Column nameuseragentColumn typestring
    7. Column nameerrorcodeColumn typestring
    8. Column nameerrormessageColumn typestring
    9. Column namerequestparametersColumn typestring
    10. Column nameresponseelementsColumn typestring
    11. Column nameadditionaleventdataColumn typestring
    12. Column namerequestidColumn typestring
    13. Column nameeventidColumn typestring
    14. Column nameresourcesColumn typearray, ArraySchema(Arrayスキーマ)ARRAY<STRUCT<ARN:STRING,accountId:STRING,type:STRING>>
    15. Column nameeventtypeColumn typestring
    16. Column nameapiversionColumn typestring
    17. Column namereadonlyColumn typeboolean
    18. Column namerecipientaccountidColumn typestring
    19. Column nameserviceeventdetailsColumn typestring
    20. Column namesharedeventidColumn typestring
    21. Column namevpcendpointidColumn typestring
  18. Next (次へ) ボタンをクリックします。
  19. Finish (完了) ボタンをクリックします。

Lake Formation の設定

Lake Formation のパーミッションを次の手順で設定します:

データレイク設定

  1. Lake Formation コンソールの Permissions で、Administrative roles and tasks を選択します。
  2. Data lake administrators セクションで、Choose administrators を選択します。
  3. IAM users and roles に、IAM ユーザー DatalakeAdmin2 を選択します。
  4. Save を選択します。
  5. Database creators セクションで、Grant を選択します。
  6. IAM users and roles に、 LFRegisterLocationServiceRole を選択します。
  7. Create Database を選択します。
  8. Grant を選択します。

Data lake ロケーション

  1. Register and ingest から、Data lake locations を選択します。
  2. Register location を選択します。
  3. Amazon S3 paths3://<datalake-bucket>/と入力します。これは、 LFLocationPolicy にリストしたバケットと同じである必要があります 。 Lake Formation はこのロールを使用して、バケットとその下にあるすべてのプレフィックスへの読み取り/書き込みアクセスを必要とするクエリサービスに一時的な Amazon S3 認証情報を付与します。
  4. IAM role に、 LFRegisterLocationServiceRole を選択します。
  5. Register location を選択します。

Data catalog 設定

  1. Data catalog から、Settings を選択します。
  2. Use only IAM access control for new databasesUse only IAM access control for new tables in new databases両方のチェックボックスがオフになっていることを確認します 。
  3. Data catalog で、Databases を選択します。
  4. Create database を選択します。
  5. Database を選択します。
  6. Name に、 lakeformation_tutorial_cloudtrail と入力します。
  7. Create database を選択します。

データベースレベルのパーミッション

  1. Permissions から Data permissions を選択します。
  2. Data permissions で、Grant を選択します。
  3. Principals で、IAM users and roles を選択し、ロール GlueETLServiceRole を選択します。
  4. Policy tags or catalog resource で、Named data catalog resources を選択します。
  5. Database で、lakeformation_tutorial_cloudtrail を選択します。
  6. Database permissions を選択します。
  7. Database permissions で、Create Table, Alter, Drop, Describe を選択します
  8. Grant を選択します。

以上により、 GlueETLServiceRole ロールに対して、governed table を作成するのに必要なデータベース内のテーブル作成と変更の権限が付与されます。

テーブルレベルのパーミッション

  1. PermissionsData permissions を選択します。
  2. Data permission から、Grant を選択します。
  3. Principals で、IAM users and roles を選択し、ロール GlueETLServiceRole を選択します。
  4. Policy tags or catalog resources で、Named data catalog resources を選択します。
  5. Database で、lakeformation_tutorial_cloudtrail を選択します。
  6. Table で、 cloudtrail_kinesis を選択します。
  7. Table permissions を選択します。
  8. Table permissions で、SelectDescribe を選択します。
  9. Data permissions で、All data access を選択します。
  10. Grant を選択します。

AWS Glue ストリーミングジョブを作成する

AWS Glue ストリーミングジョブを作成するには、次の手順を実行します:

  1. AWS Glue コンソールで、Jobs (ジョブ) を選択します。
  2. Add job (ジョブの追加) を選択します。
  3. Name (名前) に、 cloudtrail_ingestion と入力します。
  4. IAM role (IAM ロール) に、 GlueETLServiceRole を選択します。
  5. Type で、Spark Streaming を選択します。
  6. Glue version に、Spark 2.4, Python 3 with improved job startup times (Glue Version 2.0) を選択します。
  7. This job runs (このジョブの実行) で、A new script authored by you (ユーザーが作成する新しいスクリプト) を選択します。
  8. S3 path where the script is stored (スクリプトが保存されている S3 パス)に、s3://aws-glue-scripts-<account-id>-us-east-1/lakeformation_tutorial_cloudtrail と入力します。
  9. Temporary directory (一時ディレクトリ) に、 s3://aws-glue-temporary-<account-id>-us-east-1/lakeformation_tutorial_cloudtrail と入力します。
  10. Security configuration, script libraries, job parameters (optional) (セキュリティ設定、スクリプトライブラリおよびジョブパラメータ (任意)) で、Job parameters (ジョブパラメータ)に次の項目を指定します。
    1. --datalake_bucket_name:<datalake-bucket>
    2. --database_name: lakeformation_tutorial_cloudtrail
    3. --src_table_name: cloudtrail_kinesis
    4. --dst_table_name: cloudtrail_governed
  11. Next (次へ) を選択します。
  12. Save job and edit script (ジョブを保存してスクリプトを編集する)を選択します。(ここでは接続を選択しないでください)
  13. 次のコードを入力します:
# Copyright 2021 Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: MIT-0
import sys
from awsglue.transforms import ApplyMapping
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql import DataFrame, Row
from awsglue.dynamicframe import DynamicFrameCollection
from awsglue.dynamicframe import DynamicFrame
from awsglue import DynamicFrame

""" Job Parameters ---------- datalake_bucket_name : string S3 bucket name for storing a governed table database_name : string A Lake Formation database which stores a governed table and the source Kinesis table src_table_name : string A Glue table name for a Kinesis Data Stream dst_table_name : string A Lake Formation governed table name """
args = getResolvedOptions(sys.argv, ['JOB_NAME', 'datalake_bucket_name', 'database_name', 'src_table_name', 'dst_table_name'])
datalake_bucket_name = args["datalake_bucket_name"]
database_name = args["database_name"]
src_table_name = args["src_table_name"]
dst_table_name = args["dst_table_name"]

sc = SparkContext()
glue_context = GlueContext(sc)
spark = glue_context.spark_session
job = Job(glue_context)
job.init(args['JOB_NAME'], args)


    def transaction_write(context, dfc) -> DynamicFrameCollection:
    """Write DynamicFrame into a governed table using Lake Formation transactions It is called in a processBatch method per a batch, so it makes one transaction per a batch. """
    dynamic_frame = dfc.select(list(dfc.keys())[0])
    tx_id = context.begin_transaction(read_only=False)
    sink = context.getSink(
        connection_type="s3", 
        path=f"s3://{datalake_bucket_name}/data/{dst_table_name}/",
        partitionKeys=["eventsource", "eventname"],
        transactionId=tx_id
    )
    sink.setFormat("glueparquet")
    sink.setCatalogInfo(catalogDatabase=database_name, catalogTableName=dst_table_name)
    try:
        sink.writeFrame(dynamic_frame)
        context.commit_transaction(tx_id)
    except Exception:
        context.abort_transaction(tx_id)
        raise 
    return dfc


# Create Spark DataFrame from the source Kinesis table
data_frame_kinesis = glue_context.create_data_frame.from_catalog(
    database=database_name,
    table_name=src_table_name,
    transformation_ctx="data_frame_kinesis",
    additional_options={
        "startingPosition": "TRIM_HORIZON", 
        "inferSchema": "true"
    }
)

# Applying a mapping to drop useridentity and resources column because struct and array are not supported yet.
cloudtrail_mappings = [
    ("eventversion", "string", "eventversion", "string"), 
    ("eventtime", "string", "eventtime", "string"), 
    ("eventsource", "string", "eventsource", "string"), 
    ("eventname", "string", "eventname", "string"), 
    ("awsregion", "string", "awsregion", "string"), 
    ("sourceipaddress", "string", "sourceipaddress", "string"), 
    ("useragent", "string", "useragent", "string"), 
    ("errorcode", "string", "errorcode", "string"), 
    ("errormessage", "string", "errormessage", "string"), 
    ("requestid", "string", "requestid", "string"), 
    ("eventid", "string", "eventid", "string"), 
    ("eventtype", "string", "eventtype", "string"), 
    ("apiversion", "string", "apiversion", "string"), 
    ("readonly", "boolean", "readonly", "string"), 
    ("recipientaccountid", "string", "recipientaccountid", "string"), 
    ("sharedeventid", "string", "sharedeventid", "string"), 
    ("vpcendpointid", "string", "vpcendpointid", "string")
]


def processBatch(data_frame, batchId):
    """Process each batch triggered by Spark Structured Streaming. It is called per a batch in order to read DataFrame, apply the mapping, and call TransactionWrite method """
    if data_frame.count() > 0:
        dynamic_frame = DynamicFrame.fromDF(data_frame, glue_context, "from_data_frame")
        dynamic_frame_apply_mapping = ApplyMapping.apply(
            frame=dynamic_frame,
            mappings=cloudtrail_mappings,
            transformation_ctx="apply_mapping"
        )
        dynamic_frame_collection = transaction_write(
            glue_context,
            DynamicFrameCollection({"dynamic_frame": dynamic_frame_apply_mapping}, glue_context)
        )

# Read from the DataFrame coming via Kinesis, and run processBatch method for batches in every 100 seconds 
glue_context.forEachBatch(
    frame=data_frame_kinesis,
    batch_function=processBatch,
    options={
        "windowSize": "100 seconds", 
        "checkpointLocation": f"{args['TempDir']}/checkpoint/{database_name}/{src_table_name}/"
    }
)
job.commit()
Python
  1. Save (保存) を選択します。
  2. Run (実行)を選択します。

付録 2: 大規模なデータレイクをシミュレートするために、CloudTrail のデータを増やす設定をする

CloudTrail でより多くのイベントを生成し、より大きなデータレイクをシミュレートするには、2 つの方法があります:

  • CloudTrail で S3 データイベントを有効にする
  • イベントが多い他のアカウントで EventBridge イベントバスをセットアップする

原文はこちらです。
本ブログは Solutions Architect の宮田が翻訳しました。