Amazon Web Services ブログ

AWS DMS と AWS Glue を使用して進行中のデータレイクの変更を読み込む

Amazon S3 にデータレイクを構築することは、組織に無数のメリットをもたらします。多様なデータソースへのアクセスし、ユニークな関係の決定、カスタマイズされたカスタマーエクスペリエンスを提供するための AI / ML モデルの構築、消費のための新しいデータセットのキュレーションの加速などを可能にします。ただし、オンプレミスであれ AWS であれ、運用データストアから継続的に変化する更新をキャプチャしてデータレイクに読み込むと、時間がかかり管理が困難になる可能性があります。

この投稿では、Oracle、SQL Server、PostgreSQL、MySQL などの定評があるデータベースソースから進行中の変更をデータレイクに読み込むソリューションをデプロイする方法を示します。このソリューションは、新規および変更されたデータを Amazon S3 にストリーミングします。また、適切なデータレイクオブジェクトを作成および更新し、設定したスケジュールに基づいてソースに似たデータのビューを提供します。その後、AWS Glue Data Catalog は、分析サービスが使用するために、新しく更新され重複除外されたデータを公開します。

ソリューションの概要

このソリューションを 2 つの AWS CloudFormation スタックに分割します。この投稿で参照する AWS CloudFormation テンプレートは、パブリックの S3 バケットからダウンロードすることもできますし、後で紹介するリンクを使用して起動することもできます。同様に、この投稿の後半で参照されている AWS Glue ジョブもダウンロードできます。

最初のスタックには、再利用可能なコンポーネントが含まれています。一度だけそれをデプロイする必要があります。以下の AWS リソースを起動します。

  • AWS Glue ジョブ: 未処理の S3 ファイルから重複排除され最適化された parquet ファイルへの読み込みプロセスのワークフローを管理します。
  • Amazon DynamoDB テーブル: それぞれのデータレイクテーブルのデータ読み込み状態を保持します。
  • IAM ロール: これらのサービスを実行して、S3 にアクセスします。このロールには、昇格された権限を持つポリシーが含まれています。これらのサービスにだけこのロールを割り当て、IAM ユーザーまたはグループには割り当てないでください。
  • AWS DMS レプリケーションインスタンス: レプリケーションタスクを実行し、AWS DMS を介して進行中の変更を移行します。

2 番目のスタックには、データレイクに取り込むソースごとにデプロイする必要があるオブジェクトが含まれています。以下の AWS リソースを起動します。

  • AWS DMS レプリケーションタスク: S3 バケットにデータを書き込む各テーブルおよびストリームのソースデータベースのトランザクションログから変更を読み取ります。
  • S3 バケット: 未処理の AWS DMS の初期ロードオブジェクトと更新オブジェクト、およびクエリに最適化されたデータレイクオブジェクトを保存します。
  • AWS Glue トリガー: AWS Glue ジョブをスケジュールします。
  • AWS Glue クローラー: AWS Glue Data Catalog をスケジュールに従って構築および更新します。

スタックパラメータ

AWS CloudFormation スタックでは、取り込みおよび変換のパイプラインを設定するためのパラメータを入力する必要があります。

  • DMS ソースデータベース設定: DB エンジン、サーバー、ポート、ユーザー、パスワードなど、DMS 接続オブジェクトに必要なデータベース接続設定です。
  • DMS タスク設定: レプリケーションインスタンスの ARN、テーブルフィルター、スキーマフィルター、AWS DMS S3 バケットの場所など、AWS DMS タスクに必要な設定です。テーブルフィルターとスキーマフィルターによって、レプリケーションタスクが同期するオブジェクトを選択できます。
  • データレイク設定: S3 データレイクの場所、データレイクデータベース名、実行スケジュールなど、スタックが AWS Glue ジョブとクローラーに渡す設定です。

デプロイ後

ソリューションをデプロイした後、AWS CloudFormation テンプレートは DMS レプリケーションタスクを開始し、DynamoDB コントローラテーブルにデータを入力します。DynamoDB コントローラテーブルを確認して更新するまで、データはデータレイクに伝達されません。

DynamoDB コンソールで、以下のフィールドを設定して、次のテーブルに示すデータロードプロセスをコントロールします。

フィールド 説明
ActiveFlag 必須です。 true に設定すると、このテーブルの読み込みが有効になります。
PrimaryKey 列名のコンマ区切りリストです。設定すると、AWS Glue ジョブは更新および削除のトランザクションの処理でこれらのフィールドを使用します。「null」に設定すると、AWS Glue ジョブは挿入だけを処理します。
PartitionKey 列名のコンマ区切りリストです。設定すると、AWS Glue ジョブはこれらのフィールドを使用して、S3 の出力ファイルを複数のサブフォルダに分割します。大きなテーブルを照会して処理する場合、パーティションは有益ですが、小さなテーブルでは複雑にする可能性があります。「null」に設定すると、AWS Glue ジョブはデータを 1 つのパーティションにだけ読み込みます。
LastFullLoadDate 最後のフルロードの日付です。AWS Glue ジョブは、この値を DMS で作成されたフルロードファイルの日付と比較します。このフィールドを以前の値に設定すると、AWS Glue はフルロードファイルを再処理します。
LastIncrementalFile 最後の増分ファイルのファイル名です。AWS Glue ジョブは、これを DMS で作成された新しい増分ファイルと比較します。このフィールドを以前の値に設定すると、AWS Glue はより大きな名前のファイルを再処理します。

この時点で、設定は完了です。次に予定されている間隔で、AWS Glue ジョブはすべての初期ファイルと増分ファイルを処理し、それらをデータレイクに読み込みます。次に予定されている AWS Glue クローラーの実行時に、AWS Glue はダウンストリーム分析アプリケーションで使用するためにテーブルを AWS Glue Data Catalog に読み込みます。

Amazon Athena および Amazon Redshift

これでパイプラインは自動的にテーブルを作成および更新します。Amazon Athena を使用している場合は、すぐにこれらのテーブルのクエリを開始できます。Amazon Redshift を使用している場合は、これらのテーブルを外部スキーマとして公開してクエリを開始できます。

これらのテーブルを直接分析することも、データウェアハウスに既に存在するテーブルに結合することも、あるいは抽出、変換、ロード (ETL) プロセスへの入力として使用することもできます。詳細については、Amazon Redshift Spectrum 用の外部スキーマの作成を参照してください。

AWS Lake Formation

この投稿の執筆時点では、AWS Lake Formation は発表されていますが、まだリリースはされていません。AWS Lake Formation を使用すると、安全なデータレイクを簡単に設定できます。このソリューションに Lake Formation を組み込むには、起動時に指定された S3 ロケーションを「データレイクストレージ」ロケーションとして追加し、Lake Formation を使用して IAM ユーザーに認証情報を提供します。

AWS Lake Formation を使用すると、ユーザー、グループ、バケットポリシーを介して S3 アクセスを許可する必要がなくなり、代わりにデータレイクへのアクセスを許可および監査するための集中型コンソールが提供されます。

主な機能

いくつかの AWS CloudFormation に組み込まれた主な機能により、このソリューションが可能になります。こうした機能を理解すると、この戦略を他の目的のために複製したり、ニーズに合わせてアプリケーションをカスタマイズしたりするのに役立ちます。

AWS DMS

  • 最初の AWS CloudFormation テンプレートは、AWS DMS レプリケーションインスタンスをデプロイします。2 番目の AWS CloudFormation テンプレートを起動する前に、レプリケーションインスタンスがオンプレミスデータソースに接続していることを確認します。
  • S3 ターゲットの AWS DMS エンドポイントには追加の接続属性があります: addColumnName=true。この属性は、出力ファイルに列ヘッダーを追加するように DMS に指示します。プロセスはこのヘッダーを使用して、parquet ファイルと AWS Glue Data Catalog のメタデータを構築します。
  • AWS DMS レプリケーションタスクが開始されると、初期ロードプロセスはファイルを次の場所に書き込みます: s3://<bucket>/<schema>/<table>/LOAD00000001.csv という名前の初期ロードに対して、テーブルごとに 1 つのファイルを書き込みます。<datetime>.csv という名前のデータの変更については、毎分 1 ファイルまで書き込みます。ロード処理では、これらのファイル名を使用して新しいデータを段階的に処理します。
  • AWS DMS データ変更キャプチャ (CDC) プロセスにより、データセット「Op」にさらにフィールドが追加されます。 このフィールドは、特定のキーに対する最後の操作を示します。変更検出ロジックは、DynamoDB テーブルに保存されているプライマリキーと共にこのフィールドを使用して、受信データに対して実行する操作を決定します。プロセスはこのフィールドをデータレイクに渡し、データを照会するときにそれを見ることができます。
  • AWS CloudFormation テンプレートは、以前に DMS を使用した場合はすでに配置されている可能性がある DMS 固有の 2 つのロール (DMS-CloudWatch-logs-role、DMS-VPC-role) をデプロイします。これらのロールが原因でスタックの構築に失敗した場合は、これらのロールをテンプレートから安全に削除できます。

AWS Glue

  • AWS Glue には、Python シェルと Apache Spark の 2 種類のジョブがあります。Python シェルジョブを使用すると、わずかなコンピューティングリソースとわずかなコストで小さなタスクを実行できます。Apache Spark ジョブを使用すると、分散処理フレームワークを使用して、コンピューティングやメモリを多く消費する中規模から大規模のタスクを実行できます。このソリューションは、Python シェルジョブを使用して、処理するファイルを決定し、DynamoDB テーブルの状態を維持します。また、データ処理と読み込みにも Spark ジョブを使用します。
  • リレーショナルデータベースから変更が送信されると、新しいトランザクションが特定のフォルダ内の新しいファイルとして表示されることがあります。このロードプロセスの動作により、すでに読み込まれているデータへの影響が最小限に抑えられます。これによってファイルサイズやクエリのパフォーマンスに矛盾が生じる場合は、圧縮 (ファイルマージ) プロセスを組み込むことを検討してください。
  • ジョブの実行の間に、AWS Glue シーケンスはトランザクションをファイル名と順序で同じプライマリキーに複製します (挿入、更新など)。最後のトランザクションを決定し、それを使用して影響を受けるオブジェクトを S3 に書き換えます。
  • 構成の設定により、Spark タイプの AWS Glue ジョブは最大 2 DPU の処理能力を実行できます。ロードジョブのパフォーマンスが不十分な場合は、この値を増やすことを検討してください。ジョブ DPU を増やすことは、パーティションキーで設定されたテーブル、または DMS プロセスが実行の間に複数のファイルを生成するときに最も効果的です。
  • 組織ですでに長期にわたって Amazon EMR クラスターが導入されている場合は、経費を最適化するために、AWS Glue ジョブを EMR クラスター内で実行されている Apache Spark ジョブに置き換えることを検討してください。

IAM

  • ソリューションは、DMSCDC_Execution_Role という名前の IAM ロールをデプロイします。ロールは AWS のサービスにアタッチされており、インラインポリシーだけでなく AWS 管理ポリシーにも関連付けられています。
  • ロールの AssumeRolePolicyDocument 信頼ドキュメントには、実行に必要な権限がジョブにあることを確認するために AWS Glue サービスと AWS DMS サービスにアタッチされる以下のポリシーが含まれています。AWS CloudFormation カスタムリソースも、AWS Lambda によってサポートされ、このロールを使用して環境を初期化します。
       Principal :
         Service :
           - lambda.amazonaws.com
           - glue.amazonaws.com
           - dms.amazonaws.com
       Action :
         - sts:AssumeRole
    
  • IAM ロールには、以下の AWS マネージドポリシーが含まれています。詳細については、マネージドポリシーとインラインポリシーを参照してください。
    ManagedPolicyArns:
         - arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole
         - arn:aws:iam::aws:policy/AmazonS3FullAccess
         - arn:aws:iam::aws:policy/service-role/AWSLambdaVPCAccessExecutionRole
  • IAM ロールには、以下のインラインポリシーが含まれています。このポリシーには、Lambda ベースの AWS CloudFormation カスタムリソースを実行し、DynamoDB テーブルを初期化および管理し、DMS レプリケーションタスクを初期化するためのアクセス許可が含まれています。
       Action:
         - lambda:InvokeFunction
         - dynamodb:PutItem
         - dynamodb:CreateTable
         - dynamodb:UpdateItem
         - dynamodb:UpdateTable
         - dynamodb:GetItem
         - dynamodb:DescribeTable
         - iam:GetRole
         - iam:PassRole
         - dms:StartReplicationTask
         - dms:TestConnection
         - dms:StopReplicationTask
       Resource:
         - arn:aws:dynamodb:${AWS::Region}:${AWS::Account}:table/DMSCDC_*
         - arn:aws:lambda:${AWS::Region}:${AWS::Account}:function:DMSCDC_*
         - arn:aws:iam::${AWS::Account}:role/DMSCDC_*
         - arn:aws:dms:${AWS::Region}:${AWS::Account}:*:*"
       Action:
         - dms:DescribeConnections
         - dms:DescribeReplicationTasks
       Resource: '*'

サンプルデータベース

以下の例は、サンプルデータベースを使用してこのソリューションをデプロイした後に表示される内容を示しています。

サンプルデータベースには、product、store、productorder の 3 つのテーブルがあります。AWS CloudFormation テンプレートをデプロイすると、未処理の S3 バケットの各テーブル用に作成されたフォルダが表示されます。

各フォルダには、初期ロードファイルが含まれています。

テーブルリストは、DynamoDB テーブルを生成します。

これらのテーブルのアクティブフラグ、プライマリキー、パーティションキーの値を設定します。この例では、product テーブルと store テーブルにプライマリキーを設定して、確実に更新を処理しています。更新トランザクションは想定していないので、productorder テーブルのプライマリキーだけはそのままにしておきます。ただし、日付によってデータが確実にパーティション化されるようにパーティションキーを設定しました。

次に予定されている AWS Glue ジョブが実行されると、データレイク S3バケット内の各テーブルのフォルダが作成されます。

次に予定されている AWS Glue クローラーが実行されると、AWS Glue Data Catalog にこれらのテーブルが一覧表示されます。これで、Athena を使って照会することができます。

同様に、最初に外部データベースをカタログした後に、Amazon Redshift クラスタ内からデータレイクを照会することができます。

以後の AWS Glue ジョブの実行時に、プロセスは初期ファイルのタイムスタンプを DynamoDB テーブルの [LastFullLoadDate] フィールドと比較し、初期ファイルを再度処理する必要があるかどうかを判断します。また、新しい増分ファイル名と DynamoDB テーブルの [LastIncrementalFile] フィールドを比較して、増分ファイルを処理する必要があるかどうかを判断します。以下の例では、product テーブル用の新しい増分ファイルを作成しています。

ファイルを調べると、更新と削除という 2 つのトランザクションがあります。

AWS Glue ジョブが再度実行されると、DynamoDB テーブルが更新されて [LastIncrementalFile] の新しい値が一覧表示されます。

最後に、このソリューションは parquet ファイルを再処理します。データを照会して、更新されたレコードの新しい値を確認し、それが削除済みレコードを確実に削除するようにすることができます。

まとめ

この投稿では、トランザクションデータベースを AWS データレイクと迅速かつ簡単に同期できるようにする一連の AWS CloudFormation テンプレートを提供しました。AWS データレイクのデータを使用して、複数のデータソースからのデータの分析を実行し、機械学習モデルを構築し、データコンシューマ向けに豊富な分析を作成することができます。

ご質問またはご提案については、以下でコメントを残してください。

 


著者について

Rajiv Gupta は、アマゾン ウェブ サービスのデータウェアハウス専門のソリューションアーキテクトです