Amazon Web Services ブログ

AWS Glue ジョブブックマークを使用して、さまざまなデータ取り込み頻度でデータを処理する

データの取り込み頻度がさまざまな複数のデータセットをマージする必要があるデータ処理の要件はよく見られます。これらのデータセットの一部は、一度にすべてが取り込まれ、受信頻度が低く、全体を通して常に使用されます。一方、その他のデータセットは増分で取り込まれ、一定の間隔で受信し、完全なデータセットと結合して出力を生成します。このような要件に対応するため、この投稿では、AWS Glue を使用して抽出、変換、ロード (ETL)パイプラインを構築する方法をご紹介します。

AWS Glue の使用

AWS Glue は、分析のために複数のソースから多数のデータセットを抽出、変換、およびロードするサーバーレス環境を提供します。AWS Glue には、スケジューリングされた間隔でジョブを再実行するときに増分データを処理するジョブブックマークと呼ばれる機能があります。ジョブブックマークは、ソース、変換、ターゲットなど、さまざまなジョブ要素の状態で構成されます。AWS Glue が古いデータを再度処理しないようにするジョブ実行からの状態情報を保持することで、これを実行します。

このユースケースでは、ジョブブックマークが有効になっている AWS Glue ジョブを使用して、さまざまな頻度で受信するファイル(一度に受信するファイルを示す完全なデータセットと、特定の定期的な間隔で受信するファイルを示す増分データセット)を処理します。これらのファイルは、まとめてマージされます。ジョブブックマークを有効にするだけでなく、AWS Glue PySpark 動的フレームでオプションのパラメータ transformation_ctx変換コンテキスト)も使用します。これは、ETL 演算子インスタンスの一意の識別子として機能するもので、特定の演算子のジョブブックマーク内の状態情報を識別します。AWS Glue は transformation_ctx を使用して、キーをブックマークの状態にインデックス付けします。

変換コンテキストを使用することで、増分データセットの状態情報を取得および維持し、再処理を回避できます。完全なデータセットファイルでは変換コンテキストが省略されるため、完全なデータセットに対するジョブ実行状態情報が取得されず、次の処理イベント全体で使用されます。AWS Glue ジョブレベルでジョブブックマークフラグが有効になっている場合でも、完全なデータセットの変換コンテキストが省略されるため、ジョブを実行するたびに、完全なデータセットのデータ全体がジョブの一部として使用されます。一方で、新しく到着したデータセットのみが増分データセットとして処理されます。

ソリューションの概要

AWS Glue のジョブブックマークのユーティリティをデモするのに、TLC 移動レコードデータのデータセットを使用します。増分データセットとしてニューヨーク市イエローキャブの移動データの月間ファイルを使用し、フルデータセットとしてニューヨーク市のタクシーゾーンルックアップを使用します。月間イエローキャブ移動データには、PULocationID (乗客を拾った場所)という名前のフィールドがあり、ニューヨーク市タクシーゾーンルックアップファイルの LocationID フィールドと結合して、ニューヨーク市のタクシーゾーンルックアップデータセットの BoroughZoneservice_zone と、月間のニューヨーク市タクシー移動データファイルのすべてのフィールド(PULocationID フィールドを除く)を含む出力データセットを作成します。

次の図は、処理のハイレベルアーキテクチャを示しています。

図の説明

  • 2 つの Amazon S3 Raw バケットの場所は、受信 CSV ソースデータ(ニューヨーク市タクシーの月間ファイル(増分データセット)とニューヨーク市タクシールックアップファイル(完全データセット))を格納するのに使用します。
  • ブックマークを有効にした Glue ジョブは、月間移動データファイルとタクシーゾーンルックアップファイル間のデータを結合して、出力 Parquet ファイルを作成し、Glue データカタログと Redshift データベースでニューヨーク市タクシー移動テーブルを作成します。
  • S3 選別バケットは、ニューヨーク市タクシーの毎月処理される Parquet ファイルを保存するのに使用します。

AWS CloudFormation スタックの作成

次の AWS CloudFormation テンプレートを使って、以下のリソースを優先 AWS アカウントとリージョンに作成します。

  • ネットワークリソース(VPC、サブネット、セキュリティグループ)
  • Amazon Redshift クラスター
  • Amazon Redshift クラスターにアクセスするためのプロキシとして使用する Amazon Elastic Compute Cloud (Amazon EC2) インスタンス
  • 必要なすべてのファイルが保存されている Amazon Simple Storage Service (Amazon S3) バケット
  • 通知用 Amazon Simple Notification Service (Amazon SNS) トピック
  • AWS Glue データカタログ
  • AWS Glue ジョブから Amazon Redshift クラスターに接続するためのAWS Glue JDBC 接続

加えて、スタックを起動するアカウントとリージョンで Amazon EC2 キーペアが作成されていることを確認してください。

スタックパラメータを提供するには、次の手順を実行します。

  1. [スタック名] で BigDataBlog-GlueBookmark-Stack と入力します。

  1. [RedshiftClusterIdentifier] で bigdatablogrscluster と入力します。
  2. [NodeType] で [ラージ] を選択します。
  3. [NumberOfNodes] で [2] を選択します。
  4. [DatabaseName] で bigdatablogdev と入力します。

  1. [MasterUserName] で bigdatabloguser と入力します。
  2. [MasterUserPassword] で、マスターユーザーアカウントのパスワードを入力します。
  3. [Maintenancewindow] で sun:05:00-sun:05:30 と入力します。
  4. [EC2InstanceType] で [マイクロ] を選択します。
  5. [SubscriptionEmail] で、ご希望のメールアドレスを入力します。
  6. [MyIPAddressCidr] で、IP アドレスを入力します。

https://www.whatismyip.com/ を参照し、My Public IPv4 is: の値を調べることで、IP アドレスを見つけることができます。末尾に /32 を追加して、CIDR 互換かつ最も制限的にします。

  1. [DestinationPrefixListId] で、プレフィックスリスト ID を入力します。

ID を見つけるには、コマンドプロンプトで aws configure と入力して AWS 認証情報を設定しますaws ec2 describe-prefix-lists を実行して、 出力から PrefixListNamecom.amazonaws.<<AWS region>>.s3PrefixListId を取得します。

  1. [NewS3BucketName] で、S3 バケットの名前を入力します。

  1. [gluedatabase] で bigdatabloggluedbと入力します。
  2. [EC2KeyName] で、キーペアの名前を入力します。

スタックの作成手順については、AWS CloudFormation コンソールでのスタックの作成をご参照ください。

次のステップに進む前に、スタックが完了していることを確認してください。

AWS Glue ジョブの作成

AWS Glue ジョブを作成するには、次の手順を実行します。

  1. 2019 年 10 月2019 年 11 月のニューヨーク市イエローキャブの月間移動データをダウンロードし、s3://<<Your S3 Bucket>>/tripdata/ のプレフィックスの下に保存します。
  2. ニューヨーク市タクシーゾーンルックアップテーブルをダウンロードし、s3://<<Your S3 Bucket>>/tripdata-lookup/ のプレフィックスの下に保存します。
  3. 次の PySpark script スクリプトを使用して、 <<…>> で囲まれたコードを変更します。

CloudFormation スタックの [出力] タブで、次のキーの値を確認できます。

    • S3Bucket
    • Snstopic

CloudFormation スタックの [パラメータ] タブで、次のキーの値を確認できます。

    • EC2KeyName
    • MyIPAddressCidr
    • NewS3BucketName
    • SubscriptionEmail

  1. AWS Glue スクリプトの準備ができたら、s3://<<Your S3 Bucket>>/glue-script/ のプレフィックスの下の S3 バケットにアップロードします。

AWS Glue ジョブを作成する際には、これを参照します。

  1. AWS Glue コンソールの [ETL] で、[ジョブ] を選択します。
  2. [ジョブを作成] を選択します。
  3. [名前] で、ジョブの名前を入力します。AWS Glue ジョブの命名の詳細については、ジョブをご参照ください。
  4. [IAM ロール] で、CloudFormation テンプレートが作成したロールを選択します。スタック出力のキー Glueaccessrole の値を使用します。
  5. [タイプ] で [Spark] を選択します。
  6. [Glue バージョン] で、Spark 2.4、Python 3 (Glue バージョン 1.0) を選択します。
  7. [このジョブを実行] で [自分が提供する既存のスクリプト] を選択します。
  8. [スクリプトを保存している S3 パス] で、 前に s3://<<Your S3 Bucket>>/Glue-script/ のプレフィックスの下に保存したスクリプトファイルを選択します。
  9. 高度なプロパティ 」セクションの [ジョブブックマーク] で [有効にする] を選択します。
  10. [カタログオプション] で、[Glue データカタログを Hive メタストアとして使用する] を選択します。
  11. [接続] で、スタック出力のキー GlueConnection の値を入力します。
  12. [ジョブを保存してスクリプトを編集] をクリックします。

Amazon Redshift データベーススキーマの作成

AWS Glue ジョブを実行する前に、Amazon Redshift クラスターに接続し、Glue_bookmark_redshift_schema という名前の Amazon Redshift データベーススキーマを作成する必要があります。クラスターに接続するには、SQL Workbench/J などの JDBC クライアントベースの SQL ツールのうちいずれかを使用します。手順については、ローカルマシンからプライベート Amazon Redshift クラスターにアクセスする方法を教えて下さいをご参照ください。

クラスターにアクセスするには、Amazon Redshift マスターユーザー bigdatabloguser(CloudFormation スタックの [パラメータ] タブの MasterUserName の値)と、スタックの作成時に指定したパスワードを使用します。

AWS Glue ジョブの実行

Glue ジョブは 1 つの引数(処理中のファイルの名前)のみを受け取ります。ファイルの処理中に、yellow_tripdata_2019-10.csv などのファイル名を渡します。これにより、特定のファイルに属するレコードを追跡できるため、さまざまなファイルを使用した複数のジョブ実行の結果を簡単に評価できます。

Glue ジョブの実行が成功すると、CloudFormation テンプレートを実行して作成した S3 バケット内の /tripdata-joined-output/ のプレフィックスの下に出力 Parquet ファイルが表示されます。Amazon Athena を使用して、データカタログで作成したテーブルのデータをクエリすることもできます。詳細については、Amazon Athena を使用した SQL クエリの実行をご参照ください。

redshift_bookmark_table という名前の Amazon Redshift データベーステーブルをクエリし、出力を確認します。

ソリューションの説明

ブックマークが有効な AWS Glue ジョブが(PySpark 内に)作成されて、ニューヨーク市イエローキャブの月間移動ファイルを読み込みます。ニューヨーク市タクシーゾーンルックアップファイルと結合し、Parquet 形式でファイルが作成され、Amazon S3 の場所に保存されます。

Amazon S3 の Parquet ファイルの場所を参照するデータカタログテーブルが作成されます。AWS Glue PySpark ジョブを使って、結果のデータセットが Amazon Redshift テーブルにも読み込まれます。

AWS Glue ジョブブックマーク変換コンテキストは、AWS Glue 動的フレームが毎月のニューヨーク市タクシーファイルを読み込んで作成されている間に使用されます。一方で、タクシーゾーンルックアップファイルの動的フレームを読み込んで作成している間は、変換コンテキストは無効になります(ファイル全体が毎月の移動ファイルの処理に必要となるからです)。このため、毎月の移動ファイルを 1 度処理するだけで、タクシーゾーンルックアップファイル全体を必要な回数だけ再利用できます。これは、ルックアップファイルに変換コンテキストがないので、ブックマークコンテキストをそのファイルに設定できないためです。

新しいニューヨーク市移動データの月間ファイルが到着して AWS Glue ジョブが実行される際は、新しく到着した月間ファイルのみが処理され、以前に処理した月間ファイルは無視されます。同様に、データカタログテーブルデータが Amazon Redshift にコピーされる際は、新しく処理した基礎となる Parquet ファイルのデータのみがコピーされ、Amazon Redshift テーブルに追加されます。この時点で、変換コンテキストがジョブブックマークを利用できるようになり、データカタログテーブルを読み込むことで AWS Glue 動的フレームが作成されます。

次の PySpark コードは、変換コンテキストを使用して、毎月の増分ファイルを読み込んで AWS Glue 動的フレームを作成します。

taxidata = GlueContext.create_dynamic_frame_from_options(connection_type="s3",connection_options = {"paths": [InputDir]},format="csv",format_options={"withHeader": True,"separator": ",","quoteChar": '"',"escaper": '"'},transformation_ctx = "taxidata")

ただし次のコードでは、ルックアップファイルの AWS Glue 動的フレームを作成する際に、変換コンテキストを省略しています。

Lookupdata  = GlueContext.create_dynamic_frame_from_options(connection_type="s3",connection_options = {"paths":[InputLookupDir]},format="csv",format_options={"withHeader": True,"separator": ",","quoteChar": '"',"escaper": '"'})

さらに、次のコードでは、Amazon Redshift テーブルにロードしたデータカタログテーブルを読み込む際に、変換コンテキストを使用しています。

datasource0 = GlueContext.create_dynamic_frame.from_catalog(database = Glue_catalog_database, table_name = Glue_table_name, transformation_ctx = "datasource0")

以下のスクリーンショットを見ると、2019 年 10 月のイエローキャブの移動データファイルが(増分データセットを)処理するために到着したことがわかります。

毎月のデータを処理するには、タクシーゾーンルックアップ(完全なデータセット)が必要です。

次のスクリーンショットは、Parquet 形式で保存した 2019 年 10 月の移動データを処理した後の AWS Glue ジョブの出力です。

次の 2 つのスクリーンショットは Amazon Redshift テーブルです。2019 年 10 月のタクシーデータのレコード数と、これまでに処理した 2019 年 10 月のタクシーデータファイルのレコード数のみを表示しています。

次のスクリーンショットは、2019 年 11 月のニューヨーク市タクシーデータファイルが処理のために到着したことを示しています。

次のスクリーンショットは、Parquet 形式で保存した 2019 年 11 月の移動データを処理した後の AWS Glue ジョブの出力です。ジョブのブックマークと変換コンテキストが有効になっているため、ジョブは 11 月のデータのみを処理し、10 月の(再処理される)データを無視しています。

次のスクリーンショットは、Amazon Redshift テーブルが 10 月と 11 月の両方のデータを含んでいることを表しており、合計レコード数を示しています。

次のスクリーンショットは、各月の個々のレコード数を示しています。

Athena を使ったクエリ

同じ Glue データカタログを使用する Athena のデータセットを確認することもできます。次の Athena のクエリのスクリーンショットには、データカタログテーブルに 10 月と 11 月の両方のデータがあり、合計レコード数が示されています。

次の Athena のクエリのスクリーンショットは、各月の個々のレコード数を示しています。

次のスクリーンショットは、行政区、ゾーン、サービスゾーンなどの位置情報を示しています。これは、タクシーゾーンルックアップで利用でき、10 月のタクシー移動データに結合しています。

次のスクリーンショットは、11 月のデータで同じクエリの出力を示しています。

クリーンアップ

このソリューションを使い終えたら、CloudFormation スタックを削除して、追加料金が発生しないようにする必要があります。

まとめ

この投稿では、AWS Glue ジョブブックマークを使った ETL パイプライン処理の一部として、さまざまな頻度で受信したデータセットをマージする方法について解説しました。今回のユースケースでは、ジョブブックマークと変換コンテキストを使用して、いくつかの増分データセットを処理する ETL パイプラインを構築する方法をご紹介しました。

 


著者について

Dipankar は AWS プロフェッショナルサービスのシニアデータアーキテクトで、お客様の分析プラットフォームとソリューションの構築を支援しています。分散コンピューティングに大きな関心を寄せています。チェスをしたり、昔のハリウッド映画を観たりするのが趣味です。

 

 

 

Ashok Padmanabhan は AWS プロフェッショナルサービスのビッグデータコンサルタントで、お客様のビッグデータと分析のプラットフォーム、ソリューションの構築を支援しています。データレイクの構築や設計をしていないときは、フロリダの自宅近くのビーチで過ごすのが好きです。