Amazon Web Services ブログ

データを段階的に読み込み、AWS Glue で最適化された Parquet ライター



AWS Glue では、Apache Spark ETL ジョブによる分析およびデータ処理のために、さまざまなソースから大規模なデータセットを準備 (抽出および変換) およびロードするためのサーバーレス環境がご利用いただけます。シリーズの最初の記事である「AWS Glue を使った Apache Spark ジョブのスケーリングとデータパーティショニングのベストプラクティス」では、Apache Spark アプリケーションや Glue ETL ジョブの開発者、ビッグデータアーキテクト、データエンジニア、ビジネスアナリストが、AWS Glue で実行するデータ処理ジョブを自動的にスケーリングするのに役立つベストプラクティスをご紹介しました。

この記事では、JDBC を使用して Amazon S3 データレイクおよびデータベースのデータソースからデータを段階的にロードする方法を示します。また、ジョブブックマークを使用して新しく追加されたデータのみを読み取り、さらにジョブブックマークを前回のジョブ実行の最後にリセットして遅れて到着するデータを処理することで、AWS Glue ETL ジョブをスケーリングする方法についても説明します。さらにこの記事では、複雑な AWS Glue ETL スクリプトとワークロードを備えたジョブブックマークを使用したベストプラクティスについても確認しています。

最後に、データの余分なパスを回避し、実行時にスキーマを計算することにより、パフォーマンスに最適化されたカスタム AWS Glue Parquet ライターを使用する方法について説明します。AWS Glue Parquet ライターを使用すると、列を追加または削除して、データセットのスキーマを進化させることもできます。

AWS Glue ジョブブックマーク

AWS Glue の Spark ランタイムには、状態を保存するメカニズムがあります。このメカニズムは、ETL ジョブを特定の方法で実行することで処理したデータを追跡するために用いられます。永続化された状態情報は、ジョブブックマークと呼ばれます。

上記のスナップショットは、同じ ETL ジョブの異なる時間インスタンスで複数のジョブを実行している Glue コンソールのビューを示しています。ジョブブックマークは、AWS Glue ジョブが最後のジョブの実行以降の増分データを処理するために使用します。ジョブブックマークは、ソース、変換、ターゲットなど、さまざまなジョブ要素の状態で構成されます。たとえば、AWS Glue ジョブは、S3 でバックアップされたテーブルの新しいパーティションを読み取る場合があります。AWS Glue は、ジョブが正常に処理されたパーティションを追跡して、重複処理を防ぎ、同じデータを複数回ターゲットデータストアに書き込みます。

ジョブブックマーク API

AWS Glue コンソールまたは AWS Glue API を使用してジョブを開始する場合、ジョブブックマークオプションがパラメータとして渡されます。

次の 3 つのオプションがあります。

  • 有効化 – このオプションを使用すると、ジョブが正常に実行されるたびにブックマークの状態が更新され、処理したデータを追跡します。同じデータソースで実行する後続のジョブは、最後のチェックポイント以降に新しく追加されたデータのみを処理します。
  • 無効化 – ジョブが常にデータセット全体を処理することになるジョブブックマークを使用しないようにします。これがデフォルトのオプションです。
  • 一時停止 – 状態情報を読み取り、最後のチェックポイント以降の増分データを処理しますが、更新はしません。このオプションでは、後続のすべての実行で同じ時点のデータを処理できます。

いずれの場合でも、前のジョブからの出力を管理する責任があります。詳細については、このシリーズの最初の記事である「AWS Glue を使った Apache Spark ジョブのスケーリングとデータパーティショニングのベストプラクティス」を参照してください。ジョブ、特にジョブブックマークに渡されるパラメータの詳細については、「AWS Glue で使用する特別なパラメータ」を参照してください。

次のコード例は、Amazon S3 の場所に基づいて AWS Glue テーブルから読み取る Glue ETL ジョブでジョブブックマークを使用する方法を示しています。ジョブは、JSON 形式の Kinesis Firehose イベントストリームから新しいファイルを受信し、2 つの列の名前を変更する変換を行い、Amazon Redshift に転換して書き出します。transformation_ctx は、このデータソースに関連付けられているジョブブックマークの識別子です。適切に操作するには、job.initjob.commit を使用して、ブックマークの状態を初期化および永続化する必要があります。

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session

job = Job(glueContext)
job.init(args['JOB_NAME'], args)
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "firehose_s3_db",
                table_name = "firehose_s3_raw_table",
                transformation_ctx = "datasource0")
applymapping = ApplyMapping.apply(frame = datasource0,
                mappings = [("col0", "string", "name", "string"), ("col1", "string", "number", "string")],
                transformation_ctx = "applymapping1")

glueContext.write_dynamic_frame.from_jdbc_conf(frame = applymapping, catalog_connection = "redshift", connection_options = {"dbtable": "name", "database": "kinesis_db"}, redshift_tmp_dir= "s3://redshift_tmp_dir_path")

job.commit()

API または CLI を使用してジョブの実行を開始する場合、次の引数を追加してジョブブックマークを有効にする必要があります。

Job Arguments :

--job-bookmark-option, job-bookmark-enable
--JOB_NAME, glue-job-incremental

S3 入力ソースでは、AWS Glue ジョブブックマークはオブジェクトの最終変更時刻をチェックして、再処理するオブジェクトを確認します。Kinesis Firehose から到着する新しいファイルがある場合、または既存のファイルが変更された場合、最後のジョブの実行以降、定期的な Glue ジョブトリガーまたは S3 トリガー通知を使用してジョブが再度実行されたときに、ファイルが再処理されます。

同じジョブを使用してすべてのデータを再処理する場合は、ジョブブックマークをリセットします。ジョブブックマークの状態をリセットするには、AWS Glue コンソール、ResetJobBookmark Action (Python: reset_job_bookmark) API オペレーション、または AWS CLI を使用します。たとえば、AWS CLI で次のコマンドを入力します。

aws glue reset-job-bookmark --job-name my-job-name

ジョブ実行 ID を渡すことにより、ResetJobBookmark API をスケジュールされたジョブ実行の特定のポイントに使用することもできます。ジョブブックマークの状態を、ジョブ実行 ID が完了した後の状態にリセットします。この機能はタイムトラベルに似ています。たとえば、過去の時間から入力データを再処理したり、ETL パイプラインの AWS Glue ワークフローで調整した ETL スクリプトまたはダウンストリームジョブで別の変換セットを使用したりできます。AWS Glue コンソールから、ジョブブックマークの Rewind オプションを使用して、ジョブブックマークの状態を以前のジョブ実行のコミットにリセットできます。

AWS Glue は、各ジョブのブックマークを追跡します。ジョブを削除すると、ジョブブックマークも削除されます。JSON、CSV、Apache Avro、XML、および JDBC ソースを含む一般的な S3 ベースのストレージ形式は、ジョブブックマークをサポートしています。AWS Glue バージョン 1.0 以降では、Apache Parquet や ORC などのカラムナストレージ形式もサポートしています。

ベストプラクティス 1: ジョブブックマークで開発する

場合によっては、AWS Glue ジョブブックマークを有効にしても、AWS Glue ジョブは以前の実行ですでに処理済みのデータを再処理してしまう可能性があります。これは、次の理由で発生する可能性があります。

  • ジョブコミットの欠落 – AWS Glue ETL スクリプトの最後にある job.commit() ステートメントは、ジョブブックマークの状態を更新します。これを含めないと、ジョブは以前に処理されたファイルと新しいファイルの両方を再処理します。ジョブコミットステートメントがユーザースクリプト内のすべての潜在的なコードパスで実行されて、ジョブの完了につながるようにしてください。
  • 変換コンテキストの欠落 – 変換コンテキストは、GlueContext のオプションのパラメータですが、ジョブブックマークが正しく機能するために必要になります。DynamicFrame を作成するときに、変換コンテキストパラメータを含めるようにしてください。次のコード例を参照してください。
    sample_dynF=glueContext.create_dynamic_frame_from_catalog(database,
                table_name,
                transformation_ctx="sample_dynF") 
  • JDBC ソース – ジョブブックマークでは、JDBC 接続を使用してリレーショナルデータベースにアクセスする際に、ソーステーブルにプライマリキー列または値が増加する列が必要です。これはソースオプションで指定する必要があります。ジョブブックマークは、新しく追加された行のみをキャプチャできます。この動作は、S3 に保存されているソーステーブルには適用されません。
  • 最終変更時刻 – S3 に保存されている処理対象のファイルを識別するため、ジョブブックマークはファイル名ではなくオブジェクトの最終変更時刻を確認します。ジョブが最後に実行されてから入力オブジェクトが変更された場合、ジョブが再度実行されるときに入力オブジェクトが再処理されます。

ベストプラクティス 2: ジョブブックマークをモニタリングする

ジョブブックマークのジョブ実行時の動作を検査するには、次の 3 つの方法があります。

  • tmp ディレクトリでのファイルリストの保存 – Apache Spark を実行し、DynamicFrames を使用してデータを読み取るすべての AWS Glue ETL ジョブは、パスごとに処理されたファイルのリストを含むマニフェストファイルを出力します。マニフェストファイルは、ジョブで指定された一時的な場所に保存されます。ファイルのパスは次のとおりです:<temporary location>/partitionlisting/<job name>/<run id>/<source transformation_ctx>.input-files.json。このファイルは、有効なジョブブックマークに関係なく、対応するデータソースについて読み取られたファイルのリストをキャプチャします。
  • ジョブメトリクス – AWS Glue ジョブメトリクスを使用して、S3 読み取りおよび書き込み操作を検査し、ブックマークを使用してジョブによって読み取られたバイト数を追跡できます。また、AWS Glue コンソールで行われる複数の実行にわたってジョブが読み取るデータを追跡することもできます。詳細については、「複数のジョブの進行状況のモニタリング」を参照してください。
  • Glue ジョブログ – AWS Glue ジョブは、S3 でのパーティションの処理とスキップに関連するログを Spark ドライバーログストリームに出力します。ログは Amazon CloudWatch に保存されます。

パーティションのスキップ

ジョブが空の場合は、またはAWS Glue データカタログ内の特定のパーティションの作成タイムスタンプが、ジョブブックマークによってキャプチャされた最後のジョブ実行のタイムスタンプよりも古い場合、パーティションがスキップされます。次のログメッセージの例は、スキップされたパーティションを示しています。

19/05/21 14:49:22 WARN HadoopDataSource: Skipping Partition
{"year": "2019", "month": "03", "day": "26", "hour": "13"}
has no new files detected 
@ s3://input-s3-prefix/Year=2019/Month=03/Day=26/Hour=13/ 
or path does not exist

パーティションの処理

ジョブが最後のジョブの実行後に作成されたS3 パーティションを見つけると、または処理する新しいファイルがあると、ログメッセージを生成します。ログメッセージは、特定のパーティション内のファイルの総数の割合も示します。現在のジョブ実行の最初と最後のジョブブックマークフィルターがこれらのファイルを処理します。次の例は、ジョブブックマークフィルタリングロジックを示しています。

パーティションが新しい場合 (パーティションが、パーティション作成時間に基づいて最新のジョブを実行した後に作成された場合)、ジョブはパーティション内のすべてのファイルを処理します。パーティションの作成時間は 1559235148000 で、最後のジョブの実行後です。次のログメッセージの例を参照してください。

19/05/31 10:39:55 INFO PartitionFilesListerUsingBookmark:
Found new partition DynamicFramePartition(com.amazonaws.services.glue.DynamicRecord@35309577,
s3://input-s3-prefix/Year=2018/Month=12/Day=05/Hour=13/,
1559235148000) 
with 47 files

既存のパーティションは、最初のブックマークフィルターをトリガーします。このフィルターは、最後のジョブの実行以降に変更タイムスタンプを持つファイルを選択します。次のログメッセージの例では、パーティション内の 47 個のファイルのうち 15 個が新しく、処理する必要があります。

19/05/31 10:40:31 INFO PartitionFilesListerUsingBookmark:
After initial job bookmarks filter,
processing 31.91% of 47 files 
in partition DynamicFramePartition(com.amazonaws.services.glue.DynamicRecord@aa39e364,
s3://input-s3-prefix/Year=2018/Month=12/Day=05/Hour=13/,
1559235148000)

最後のブックマークフィルターは追加のフィルター処理を実行して、S3 の結果整合性に関連する競合状態を回避します。同じ変更時間で非常に多くのファイルが到着した場合、このフィルターはそれらを処理から除外する場合があります。次のログメッセージの例では、フィルターは最初のブックマークフィルターによってキャプチャされた 15 個のファイルをすべて処理しました。

19/05/31 10:50:31 INFO PartitionFilesListerUsingBookmark:
After final job bookmarks filter, processing 100.00% of 15 files 
in partition DynamicFramePartition(com.amazonaws.services.glue.DynamicRecord@35309577,
s3://input-s3-prefix/Year=2018/Month=12/Day=05/Hour=13/,
1559235148000)

最適化された Apache Parquet ライター

AWS Glue は、DynamicFrames を使用してパフォーマンスを向上させるときに最適化された Apache Parquet ライターを提供します。一般的に、Apache Parquet 形式は、列方向のストレージレイアウトと、データとともにファイルに書き込まれる事前計算されたスキーマのため、書き込みよりも読み取りの方が高速です。AWS Glue の Parquet ライターは、高速の書き込みパフォーマンスと柔軟性を実現し、進化するデータセットを処理します。デフォルトの Apache Spark Parquet ライターとは異なり、事前に計算されたスキーマや、入力データセットの追加スキャンを実行することによって推測されるスキーマは必要ありません。

write_dynamic_frame.from_options 関数の format パラメータを glueparquet に設定することにより、AWS Glue Parquet ライターを有効にできます。データが AWS Glue ジョブを介してストリーミングされて S3 に書き込まれると、最適化されたライターが実行時にスキーマを動的に計算およびマージし、ジョブの実行時間を短縮します。AWS Glue Parquet ライターは、新しい列の削除と追加をサポートすることにより、スキーマの進化も可能にします。

format_options パラメータを設定することにより、AWS Glue Parquet ライターをさらに調整できます。次のコード例を参照してください。

block_size = 128*1024*1024
page_size = 1024*1024
glueContext.write_dynamic_frame.from_options(frame = dyFrame,
connection_type = "s3", connection_options = {"path": output_dir},
format = "glueparquet",
format_options = {"compression": "snappy",
                  blockSize = block_size, pageSize = page_size})

format_options のデフォルト値は次のとおりです。

  • compression"snappy"
  • blockSize は 128 MB
  • pageSize は 1 MB

blockSize は、メモリにバッファされる Parquet ファイル内の行グループのサイズを指定します。pageSize は、単一レコードにアクセスするために完全に読み取る必要がある Parquet ファイル内の最小単位のサイズを指定します。

まとめ

この記事では、AWS Glue のジョブブックマークが、S3 とリレーショナルデータベースから収集したデータを段階的に処理するのにどう役立つかについて説明しました。また、ジョブブックマークを使用して履歴データを簡単に埋めることができることも学びました。ジョブブックマークとのやり取りは簡単です。有効化、無効化、一時停止、および前の時点まで巻き戻すことができます。ジョブブックマークの進行状況と状態をモニタリングすることで、ジョブをより適切に調整し、チェックを行って、すべてのデータが正しく処理されることを確認できます。

AWS Glue Parquet ライターを使用して、データレイクに Apache Parquet ファイルを書き込むパフォーマンスを最適化することもできます。最適化されたライターにより、Parquet ファイルのスキーマを進化させることができるため、データの変更を自動的に管理できます。

これらの機能を試して、AWS Glue の Apache Spark アプリケーションにデータを読み込んだり書き込んだりすることをお勧めします。

このシリーズの 3 番目の記事では、AWS Glue の自動コード生成機能を使って、複雑なデータセットを簡単に処理および変換する方法について説明します。その記事ではまた、AWS Glue ETL スクリプトから直接データセットに対して SQL クエリを実行する方法、および AWS Glue ワークフローを使ってデータパイプラインをスケジュールして調整する方法も示します。

 


著者について

Mohit Saxena は AWS Glue のテクニカルリードマネージャーです。彼はクラウドのデータの効率的な管理を目指して、スケーラブルな分散システム構築に熱心に取り組んでいます。また、映画鑑賞や最新テクノロジーについての本を読むことが好きです。

 

 

 

Bijay Bisht は AWS のシニアソフトウェア開発エンジニアです。