Amazon Web Services ブログ

AWS Glue の自動コード生成機能とワークフローを利用して、データパイプラインをシンプル化する



これまでの一連の記事では、AWS Glue のジョブブックマークを使用して Amazon S3 やリレーショナルデータベースからデータを増分ロードする方法についてご紹介しました。また、AWS Glue に最適化された Apache Parquet ライターを使用してパフォーマンスを向上させ、スキーマ進化を管理する方法についても説明しました。

3 つ目の記事となる今回は、次の 3 つのトピックを取り上げます。まず、特定の列を選択する、深くネストされたレコードを展開する、ネストされたフィールドを効率的に解析 (パース) する、列データの展開処理といった一般的なユースケースにおいて、AWS Glue でデータの変換に役立つコードを自動生成方法について説明します。

次に、AWS Glue のワークフローとCrawlersApache SparkPython Shell ETL ジョブといったさまざまな Glue コンポーネントを使用してデータパイプラインを構築し、オーケストレーションする方法について説明します。

最後に、ETL ジョブで SparkSQL を活用し、Amazon S3 とリレーショナルデータベースに保存されたデータセットで SQL ベースの変換を実行する方法について説明します。

自動コード生成と変換: ApplyMapping、Relationalize、Unbox、ResolveChoice

AWS Glue では、さまざまなデータ変換タスクの実行に使用するコードを自動的に生成できます。これらの変換では、複雑で深くネストされたデータセットの処理するための、使いやすいインターフェイスを提供します。たとえば、一部のリレーショナルデータベースやデータウェアハウスは、ネストされたデータ構造をネイティブにサポートしていません。AWS Glue を使用すると、データをターゲットデータベースにロードする前にネストされたデータ構造を展開するためのコードを自動生成できるので、時間が節約できるだけでなく、技術に詳しくないユーザーでもデータを扱うことができます。

AWS Glue が提供する、データ処理をシンプル化するための変換のうち、よく利用されるものを次に示します。

  1. ApplyMapping は、列の投影やデータ型の変更に使用される変換処理です。この例では、action.id などいくつかのフィールドのネストを解除し、トップレベルの action.id フィールドにマッピングします。また、id 列を long 型に変換します。
    medicare_output = medicare_src.apply_mapping(
        [('id, 'string', id, 'string'),
        ('type, string, type', string),
        ('actor.id, 'int', actor.id', int),
        ('actor.login', 'string', actor.login', 'string'),
        ('actor.display_login', 'string', 'actor.display_login', 'string'),
        ('actor.gravatar_id', 'long', 'actor.gravatar_id', 'long'),
        ('actor.url', 'string','actor.url', 'string'),
        ('actor.avatar_url', 'string', 'actor.avatar_url', string)]
    )
  1. Relationalize は、DynamicFrameto に保存されているネスト化されたデータセットを、リレーショナル (行と列) 形式に変換します。ネスト化された構造は、最上位の列や配列へとネスト解除され、適切なプライマリキーや外部キーを挿入してさまざまなテーブルへと分解されます。結果として、テーブルのセットを表す DynamicFrames のコレクションを、リレーショナルデータベースに直接挿入できます。Relationalize の詳細については、こちらをご参照ください。
    ## An example relationalizing and writing to Redshift
    dfc = history.relationalize("hist_root", redshift_temp_dir)
    ## Cycle through results and write to Redshift.
    for df_name in dfc.keys():
        df = dfc.select(df_name)
        print "Writing to Redshift table: ", df_name, " ..."
        glueContext.write_dynamic_frame.from_jdbc_conf(frame = df,
            catalog_connection = "redshift3",
            connection_options = {"dbtable": df_name, "database": "testdb"},
            redshift_tmp_dir = redshift_temp_dir)
  2. Unbox は、JSON などの特定の型の文字列フィールドを解析 (パース) して、対応するデータ型を持つ個別のフィールドにし、結果を DynamicFrame に保存します。たとえば、JSON 形式のフィールド {“a”: 3, “b”: “foo”, “c”: 1.2} を 1 つ持つ CSV ファイルがあるとします。Unbox により、JSON 文字列は 3 つのフィールド (int、string、double) に再フォーマットされます。Unbox 変換は、データの再フォーマットに必要な Python ユーザー定義関数の代わりによく利用されます。Python ユーザー定義関数は高コストで、かつ Apache Spark にメモリ不足の例外を発生させる恐れがあるためです。次の例は、Unbox の使用方法を示します。
    df_result = df_json.unbox('json', "json")
  3. ResolveChoice: AWS Glue Dynamic Frames は、1 つの列に異なる型の複数のフィールドを持つことができるデータをサポートします。これらの列は、Dynamic Frame のchoice で表されます。たとえば、medicare データセットの Dynamic Frame スキーマは次のように表示されます。
    root
     |-- drg definition: string
     |-- provider id: choice
     |    |-- long
     |    |-- string
     |-- provider name: string
     |-- provider street address: string

    その理由は、 “provider id” 列が、long 型または string 型のいずれかになるためです。Apache Spark Dataframe は、データセット全体を考慮するほか、最も一般的な型、すなわち string へ強制的に型変換 (キャスト) します。Dynamic Frames では、ResolveChoice を使用した型の変換が可能です。たとえば、次のように列を long 型に変換できます。

    medicare_res = medicare_dynamicframe.resolveChoice(specs = [('provider id','cast:long')])
    
    medicare_res.printSchema()
     
    root
     |-- drg definition: string
     |-- provider id: long
     |-- provider name: string
     |-- provider street address: string

    この変換では、値が変換できない文字列の場合、null が挿入されます。結果として、null 値に変換された string 型を持つレコードが特定できるようにもなりました。一方、choice 型も struct に変換可能ですが、この場合、両方の型の値が保持されます。

AWS Glue Workflows を使用してデータパイプラインを構築し、オーケストレーションする

AWS Glue ワークフローは、スキーマ検出のための Glue クローラと、データ変換のための Glue Spark ジョブおよび Python ジョブとを組み合わせることによってデータパイプラインを作成するためのビジュアルツールを提供します。タスクノード間でリレーションシップを定義したり、パラメータを渡したりできるので、さまざまな複雑さでパイプラインを構築できます。ワークフローは、定期的に実行させることも、プログラムによってトリガーさせることもできます。各ノードの進捗は個別での追跡やワークフロー全体での追跡が可能で、パイプラインのトラブルシューティングがさらに簡単になりました。

ETL ワークロードの典型的なワークフロー編成は次のとおりです。

  1. Glue Python コマンドが手動、スケジュール、または外部 CloudWatch イベントによってトリガーされます。これにより、前処理としてベースロケーションの下にあるテーブルに対して Amazon S3 のパーティションをリストします。たとえば、CloudTrail ログのパーティションは、 s3://AWSLogs/ACCOUNTID/CloudTrail/REGION/YEAR/MONTH/DAY/HOUR/ となります。Python コマンドは、各リージョンでさまざまな Glue Data Catalog テーブルを作成するための、すべてのリージョンとスケジュールクローラをリストできます。
  2. 次に、Glue クローラがトリガーされ、最近 Amazon S3 に取り込まれたグルーデータカタログの新しいパーティションに 1 時間ごとにデータが追加されまます。
  3. 複数の同時実行 Glue ETL ジョブがトリガーされ、各パーティションまたはパーティショングループが個別にフィルタリングし、処理します。たとえば、先週に対応している CloudTrail イベントは、Glue ETL ジョブの実行時にパーティションプレフィックス内で Glue ジョブパラメータとして渡すことで読み込むことができるほか、Glue ETL プッシュダウン述語を使用することで、そのプレフィックス内のすべてのパーティションを読み込むことができます。同時実行 Glue ETL ジョブのパーティション設定と調整は、Glue データカタログテーブル内のパーティションのサブセットのみを処理することで、個々の Apache Spark アプリケーションのスケーリングと確実な実行を可能にします。変換されたデータは、すべての個々の Glue ETL ジョブによって、Amazon S3 データレイク、AWS Redshift、またはその他のデータベース内の共通のターゲットテーブルに同時に書き戻されます。

最後に、Glue Python コマンドがトリガーされ、Glue クローラや並列 Glue ETL ジョブを含むさまざまな Glue エンティティの完了ステータスが取得されます。失敗したコンポーネントがあれば後処理されるか再試行されます。

AWS Glue で SparkSQL を使用して SQL を実行する

Hive 互換メタストアとしての AWS Glue データカタログ

AWS Glue データカタログは、Apache Hive Metastore API と互換性のあるマネージド型メタデータリポジトリです。Glue データカタログを使用するために AWS Glue ETL ジョブと開発エンドポイントを設定する手順の詳細については、こちらをご参照ください。対応するフォーマットに対してデータをシリアル化または逆シリアル化するには、AWS Glue ジョブのクラスパスに Hive SerDes を追加する必要があります。これにより、データカタログに保存されたテーブルに対して、Apache Spark SQL クエリをネイティブに実行できます。

次の例は、s3://awsglue-datasets/examples/us-legislators で入手可能な US legislators データセットをクロールしたと想定しています。 AWS Glue 開発者エンドポイントで実行されている Spark シェルを使用して、AWS Glue データカタログでカタログ化されている legislator テーブル上で直接 SparkSQL クエリを実行します。

>>> spark.sql("use legislators")
DataFrame[]
>>> spark.sql("show tables").show()
+-----------+------------------+-----------+
|   database|         tableName|isTemporary|
+-----------+------------------+-----------+
|legislators|        areas_json|      false|
|legislators|    countries_json|      false|
|legislators|       events_json|      false|
|legislators|  memberships_json|      false|
|legislators|organizations_json|      false|
|legislators|      persons_json|      false|

>>> spark.sql("select distinct organization_id from memberships_json").show()
+--------------------+
|     organization_id|
+--------------------+
|d56acebe-8fdc-47b...|
|8fa6c3d2-71dc-478...|
+--------------------+

S3 からデータを読み込む AWS Glue DynamicFrame API でも、上と同様のアプローチが使用されます。その後、DynamicFrame は toDF メソッドを使用して Spark DataFrame に変換されます。次に、DataFrame に対して一時ビューを登録できます。これは SparkSQL を使用してクエリ可能です。 2 つのアプローチ間の重要な違いは、1 つ目のアプローチでは Hive SerDes を使用し、2 つ目のアプローチではネイティブの Glue/Spark リーダーを使用している点です。ネイティブの Glue/Spark を使用すると、ランタイム時のスキーマの計算、スキーマの進化、Glue Dynamic Frames に対するジョブブックマークのサポートなど、パフォーマンスと柔軟性の面でメリットが得られます。

>>> memberships = glueContext.create_dynamic_frame.from_catalog(database="legislators", table_name="memberships_json")
>>> memberships.toDF().createOrReplaceTempView("memberships")
>>> spark.sql("select distinct organization_id from memberships").show()
+--------------------+
|     organization_id|
+--------------------+
|d56acebe-8fdc-47b...|
|8fa6c3d2-71dc-478...|
+--------------------+

ワークフローと S3 の整合性

S3 にデータを取り込む外部プロセスのワークフローがある場合、またはワークフロー内のダウンストリームジョブによって使用されたテーブルに対してアップストリームの AWS Glue ジョブがテーブルに入力を生成している場合、次の Apache Spark エラーが発生する場合があります。

Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 10 in stage 16.0 failed 4 times, most recent failure: Lost task 10.3 in stage 16.0 (TID 761, ip-<>.ec2.internal, executor 1): 
java.io.FileNotFoundException: No such file or directory 's3://<bucket>/fileprefix-c000.snappy.parquet'
It is possible the underlying files have been updated.
You can explicitly invalidate the cache in Spark by running 
'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.

これらのエラーは、ダウンストリームのジョブがリストまたは読み込みしている S3 オブジェクトを、アップストリームジョブが同時に上書きすると発生します。また、S3 の結果整合性により、ダウンストリームジョブの読み込み中に、上書きまたは削除されたオブジェクトが更新されることでも発生します。このエラーは、ダウンストリームジョブで SparkSQL ビューを作成したり SQL クエリを実行する際によく見られます。これらのエラーを回避するベストプラクティスは、アップストリームジョブとダウンストリームジョブを異なる時点にスケジュールし、時間に基づいて異なる S3 パーティションに読み取り/書き込みが実行されるワークフローを設定することです。

また、特殊ジョブパラメータである「–enable-s3-parquet-optimized-committer」を true に設定することで、ジョブの S3 最適化出力コミッターを有効化できます。このコミッターは、ジョブやタスクのコミットフェーズ中における Amazon S3 のリストおよび名前変更オペレーションを回避することで、アプリケーションのパフォーマンスを向上させます。さらに、ジョブやタスクのコミットフェーズ中に Amazon S3 の結果整合性に起因して発生する問題を回避し、タスク失敗を最小限に抑えます。

まとめ

この記事では、AWS Glue ETL の自動コード生成プロセスを活用して、データ型の変換や複雑な構造の展開といった一般的なデータ操作タスクをシンプル化する方法について説明しました。また、AWS Glue ワークフローを使用して、さまざまな複雑さを持つデータパイプラインを構築、およびオーケストレーションする方法について説明しました。さらに、AWS Glue ETL と Glue データカタログを使用し、SQL の機能を活用しながら、データのクエリと変換を行う方法について学びました。

次回の記事では、AWS Glue Apache Spark ジョブのパフォーマンス、拡張性、オペレーションをより良く管理するのに役立つ、AWS Glue の具体的な機能とベストプラクティスを紹介します。

 


著者について

Mohit Saxena は AWS Glue のテクニカルリードマネージャーです。彼はクラウドのデータの効率的な管理を目指して、スケーラブルな分散システム構築に熱心に取り組んでいます。また、映画鑑賞や最新テクノロジーについての本を読むことが好きです。