Amazon Web Services ブログ

AWS GlueでApache Sparkジョブをスケーリングし、データをパーティション分割するためのベストプラクティス

AWS GlueはApache Spark ETLジョブでのデータ分析・データ処理を行うために、様々なデータソースから大量のデータセットを準備(抽出および変換)し、ロードするサーバーレスな環境を提供します。この投稿のシリーズでは、Apache SparkアプリケーションとGlueのETLジョブの開発者、ビッグデータアーキテクト、データエンジニア、およびビジネスアナリストが、AWS Glue上で実行するデータ処理のジョブを自動的にスケールするのに役に立つベストプラクティスについて説明します。

まず最初の投稿では、データ処理を行うジョブのスケーリングを管理する上で重要な2つのAWS Glueの機能について説明します。1つ目は、大規模に分割可能なデータセットに対して、Apache Sparkアプリケーションを水平にスケールアウトできるようにすることです。2つ目は新しいAWS Glueのワーカータイプを使用して、メモリインテンシブ(メモリを大量に消費する)なApache Sparkアプリケーションを垂直にスケールアップするということです。また、Amazon Kinesis Data Firehoseを使ったストリーミングアプリケーションから一般的に取り込まれる多数の小さなファイルに対して、Apache SparkアプリケーションをスケールさせるAWS Glueの使い方についても説明します。最後に、AWS GlueジョブがAmazon S3上に存在する大量のデータセットのパーティション構造を利用して、Apache Sparkアプリケーションの実行時間を短縮する方法についても説明します。

AWS Glueのワーカータイプを理解する

AWS Glueには3種類のワーカータイプがあり、お客様はジョブのレイテンシーやコスト要件を満たす設定を選択することができます。これらのワーカーは、Data Processing Unit(DPU) と呼ばれる単位で表され、ワーカーのタイプとして標準、G.1X、およびG.2Xが設定できます。

標準のワーカータイプの設定は、Sparkドライバーとエグゼキューターに5GB、spark.yarn.executor.memoryOverheadに512MB、EBSストレージ50GBが割り当てられます。G.1Xのワーカータイプは、Sparkドライバーとエグゼキューターに10GB、memoryOverheadに2GB、EBSストレージ64GBが割り当てられます。G.2Xのワーカータイプは、Sparkドライバーとエグゼキューターに20GB、memoryOverheadに4GB、EBSストレージ128GBが割り当てられます。

水平スケーリングできる計算の並列性(DPUあたりのApache Sparkタスク)は、ワーカータイプの種類に関係なく同じです。例えば、標準とG.1Xのひとつのワーカーは1DPUに対応し、8つのタスクを同時に実行することができます。G.2Xのひとつのワーカーは2DPUに対応し、16のタスクを同時に実行することができます。結果的に、高いデータ並列性が求められるコンピュートインテンシブ(計算量を集約した)なAWS Glueジョブは、水平スケーリング(より多くの標準またはG.1Xワーカーを利用)による恩恵を受けることができます。多くのメモリを必要とする、または中間シャッフルの出力を保存するために多くのディスク領域を必要とするAWS Glueジョブは、垂直スケーリング(G.1XまたはG.2Xワーカータイプ)による恩恵をうけることができます。

分割可能(splittable)なデータセットの水平スケーリング

AWS GlueはDynamicFrameを使って、S3から一般的なファイルフォーマット(CSVやJSON)やモダンなファイルフォーマット(ParquetやORC)を読む場合、ファイルスプリッティング(file splitting)に自動的に対応しています。DynamicFrameについてより情報が必要な場合は「AWS Glueでパーティションされたデータを操作する」(リンク先は英語です)を確認ください。

ファイルスプリット(file split)は、AWS Glueのワーカー上で実行されるSparkタスクが、個別に読み取り処理できるファイルの一部分をさします。デフォルトでは、ファイルスプリットは行区切りのネイティブなフォーマットで利用でき、これによりAWS Glueで実行されるApache Sparkジョブは複数のエグゼキューター間で計算を並列化できます。中規模(数百MB)または大規模(数GB)のファイルサイズを持ち、大規模かつ分割可能なデータセットを処理するAWS Glueジョブは、AWS Glueワーカーを追加することにより、水平スケーリングの効果で、より高速に実行できます。

ファイルスプリッティングは、bzip2のようなブロックベースの圧縮フォーマットでも効果があります。ファイルスプリットの境界線上の圧縮ブロックを読み取り、それぞれ個別に処理をすることができます。gzipのような分割ができない圧縮ファイルフォーマットはファイルスプリッティングの恩恵を受けることができません。分割不可能なファイルまたは圧縮形式を読み取るジョブを水平方向にスケーリングするには、あらかじめ複数の中規模のデータセットを用意します。(分割できないファイルの水平スケーリングは、単一の大きなファイルにするのではなく、あらかじめ複数の中規模ファイルにわけておくことが重要です。)

各ファイルスプリット(図の青い四角)はS3から読み込まれ、AWS Glue DynamicFrameパーティションにデシリアライズされた上で、Apache Sparkタスク(図の歯車アイコン)にて処理されます。デシリアライズされたパーティションのサイズは、Parquet などの高度に圧縮されたスプリット可能なファイル形式や gzip などの分割できない圧縮形式を使用する大きなファイルの場合、ディスク上の64MBのスプリットサイズより大幅に大きくなる可能性があります。通常、デシリアライズされたパーティションはあらかじめメモリにキャッシュされるのではなく、Apache SparkにおけるTransformationsの遅延評価に基づいて必要となったときにのみ使われるため、AWS Glueワーカーにメモリ負荷を与えることはありません。遅延評価に関してより情報が必要な場合は、Apache SparkのWebサイト上にある「RDDプログラミングガイド」(リンク先は英語です)をご確認ください。

ただし、AWS Glue ETL スクリプトまたは Apache Spark アプリケーションでパーティションをメモリに明示的にキャッシュしたり、ローカルディスクにスピルアウトしたりすると、メモリ不足(OOM)またはディスク不足の例外が発生する可能性があります。 AWS Glue では、ETL ジョブ用に垂直にスケールアップされた DPU インスタンスををベースとした、より大きな AWS Glue ワーカータイプを使用することにより、このようなユースケースに対応できます。(OOM:Out Of Memoryの略)

大きいワーカータイプを使ったApache Sparkジョブの垂直スケーリング

AWS Glue ETLジョブ、Apache Sparkアプリケーション、AWS Lake Formationで提供される新しい機械学習(ML)を用いた様々なGlueジョブには、高いメモリとディスク要件があります。 これらのワークロードを実行すると、実行エンジンに非常に大きなメモリ負荷がかかることがあります。 このメモリ負荷により、OOM またはディスク領域不足の例外が起こることで、ジョブの失敗につながる場合があります。 このような場合、メモリとディスク容量に関するYARNの例外が確認できるでしょう。

YARNのメモリオーバーヘッドを超過

YARNはSparkアプリケーションが動作するために必要なクラスターリソースを割り当ててくれます。アプリケーションには、Sparkドライバーと複数のエグゼキューターのJVMが含まれます。それぞれのエグゼキューターがジョブを実行するのに必要なメモリ割り当てに加え、YARN は、JVM オーバーヘッド、インターンされた文字列、および JVM が必要とするその他のメタデータに対応するために、追加のオーバーヘッドメモリを割り当てます。 その設定パラメータspark.yarn.executor.memoryOverhead のデフォルト値は、エグゼキュータのメモリ全体の 10% です。 大きなテーブルのJOINや特定列の値の分布が偏っているデータセットの処理など、メモリインテンシブな操作は、メモリ閾値を超える可能性があり、次のエラーメッセージが表示されることがあります。(原文の“skew”はここではデータの偏りを指しています。)

18/06/13 16:54:29 ERROR YarnClusterScheduler: Lost executor 1 on ip-xxx:
Container killed by YARN for exceeding memory limits. 5.5 GB of 5.5 GB physical memory used.
Consider boosting spark.yarn.executor.memoryOverhead.

ディスクスペース

Apache Spark は、spark.memory.fraction パラメータで定義されたヒープメモリから溢れたデータについて、Glue ワーカーのローカルディスクを使用します。 ジョブのソートまたはシャッフル段階で、Sparkは異なるワーカー間でデータを交換する前に、中間データをローカルディスクに書き込みます。 ディスク容量が残っていない場合、次の例外が原因でジョブが失敗することがあります。

java.io.IOException: No space left on device 
UnsafeExternalSorter: Thread 20 spilling sort data of 141.0 MB to disk (90 times so far)

AWS Glueジョブのメトリクス

これはジョブが処理しているデータセット内に大きな偏りがある場合の典型的な結果です。 AWS Glue ジョブメトリクスを使用して、さまざまな Apache Spark エグゼキュータの実行タイムラインを監視することで、偏りを特定できます。 詳細については、「要求の厳しいステージと Straggler タスクのデバッグ」を参照してください。

次の AWS Glue ジョブメトリクスのグラフは、AWS Glue ETL ジョブの様々なエグゼキュータの実行タイムラインとメモリプロファイルを示しています。 エグゼキュータの1つ(赤い線)は、大きなパーティションの処理が原因で実行時間が延びており、ジョブ実行期間の大半でメモリを大量に消費しています。

AWS Glueの垂直スケーリング機能により、メモリインテンシブなApache Sparkジョブは、より高いメモリと大容量のAWS Glueワーカーを使用して、これら2つの一般的な障害を克服できます。AWS Glue ジョブメトリクスを使用して、実行中のジョブのドライバとエグゼキュータのメモリ使用量を調べることによって、OOM をデバッグし、ジョブに最適なワーカータイプを決定できます。 詳細は、「OOM例外およびジョブ異常のデバッグ」を参照してください。

一般に、メモリを大量に消費するオペレーションを実行するジョブでは G.1X ワーカータイプが向いていて、AWS Glue の ML 変換または同様の ML ワークロードを使用するジョブは G.2X ワーカータイプが向いています。

AWS Glueジョブ用のApache Spark UI

また、AWS Glue の Spark UI を使用して、Spark の実行の有向非循環グラフ (DAG) を可視化して AWS Glue ETL ジョブを調査・スケールし、要求の厳しいステージ、大規模なシャッフル、および Spark SQL クエリプランをモニタリングすることもできます。 詳細については、「Apache Spark Web UI を使用したジョブの監視」を参照してください。

次の Spark SQL クエリプランは、S3 から 2 つのテーブルを読み取り、Spark のシャッフル処理による外部結合を実行し、結果を Parquet 形式で S3 に書き込む ETL ジョブの DAG を示しています。

実行計画からわかるように、JOINの変換を行うためのSparkのシャッフル処理および後続のソート操作は、ジョブ実行時間の大部分を占めています。AWS Glue 垂直スケーリングにより、各 AWS Glue ワーカー上でより多くの Spark タスクを実行できるため、 ネットワーク上のデータ交換の回数を削減できます。

多数の小さなファイルを処理するためのスケーリング

AWS Glue ETL ジョブは、S3 から数千または数百万のファイルを読み取る場合があります。 これは、Kinesis Data Firehose またはストリーミングアプリケーションで S3 にデータを書き込む場合に、典型的に起こります。 多数のファイルを読み取ろうとすると、Apache Spark ドライバがメモリ不足になることがあります。 この問題が発生すると、次のエラーメッセージが表示されます。

# java.lang.OutOfMemoryError: Java heap space 
# -XX:OnOutOfMemoryError="kill -9 %p" 
# Executing /bin/sh -c "kill -9 12039"...

Apache Spark v2.2 では、標準の AWS Glue ワーカータイプで約 650,000 個のファイルを管理できます。 より多くのファイルを処理するためには、AWS Glue では各ワーカーの Spark タスクごとにより大きなグループの入力ファイルを読み取るオプションを用意しています。 詳細については、「大きなグループの入力ファイルを読み込む」を参照してください。

AWS Glue ファイルのグループ化を使用することで、小さいファイルを処理する毎に1 つの Apache Spark タスクを起動するような、過剰な並列処理を抑えることができます。 また、Spark ドライバで OOM 例外が発生する可能性を減らします。 ファイルのグループ化を設定するには、groupFilesgroupSizeパラメータを設定する必要があります。 次のコード例では、これらのパラメータを持つ ETL スクリプトで 、AWS Glue DynamicFrame API を使用しています。

dyf = glueContext.create_dynamic_frame_from_options("s3",
    {'paths': ["s3://input-s3-path/"],
    'recurse':True,
    'groupFiles': 'inPartition',
    'groupSize': '1048576'}, 
    format="json")

groupFilesを設定することで、Hive形式の S3 パーティション内(inPartition)または S3 パーティション間(accrossPartition)ファイルをグループ化できます。 ほとんどのシナリオでは、パーティション内のグループ化は、Spark の同時タスクの数と Spark ドライバーのメモリフットプリントを削減するのに十分な設定です。 ベンチマークでは、パーティション内のグループ化オプション(inPartition)を設定した AWS Glue ETL ジョブは、160 の異なる S3 パーティションに配布された 320,000 個の小さな JSON ファイルを処理する場合に、ネイティブの Apache Spark v2.2 (リンク先は英語です)よりも約 7 倍高速でした。 Apache Spark の実行時間の大部分は、S3 ファイルの一覧を取得してメモリ上にインデックスを構築するのと、各ファイルを処理するための数多くの短いタスクのスケジューリングに費やされます。 グループ化を有効にしたベンチマーク対象のAWS Glue ETL ジョブでは、標準の AWS Glue ワーカータイプを使用して 、100 万を超えるファイルを処理できます。

groupSizeは、各 Spark タスクが読み取り・処理するデータ量を、単一の AWS Glue DynamicFrame パーティションとして設定できるオプションです。 ジョブを実行する前にファイルサイズの分布がわかっている場合は、groupSize を設定できます。 groupSize パラメータを使用すると、AWS Glue DynamicFrameパーティションの数や変換後の出力ファイル数も制御できます。 ただし、相当小さいまたは大きい groupSize を使用すると、タスクの並列性が大幅に増大したり、クラスタの利用率が低下したりする可能性があります。

デフォルトでは、AWS Glue では、入力ファイルまたはタスクの並列数が閾値の50,000 を超えた場合、手動による設定なしで自動的にグループ化が有効になります。 groupFiles パラメータのデフォルト値は inPartition であるため、各 Spark タスクは同じ S3 パーティション内のファイルだけ読み取ります。 AWS Glue は groupSize パラメータを自動で計算し、過剰な並列処理を減らすように設定することで、適切な数の Spark タスクが並行して実行するようにしてクラスタのコンピューティングリソースを有効に使用します。

データとPushdown Predicatesのパーティショニング

パーティショニングは、さまざまなビッグデータシステムがデータを効率的にクエリできるように、データセットを整理する重要な手法です。 階層的なディレクトリ構造により、1 つ以上の異なるカラムの値に基づいてデータを構成します。 たとえば、S3 のアプリケーションログを年、月、日ごとに分割できます。 1 日分のデータに対応するファイルには、次のようなプレフィックスが付きます。

s3://my_bucket/logs/year=2018/month=01/day=23/

パーティション列のPushdown Predicates

AWS Glue では、Pushdown Predicatesを利用できます。これは、AWS Glue データカタログのテーブルにおけるパーティション列のフィルタ条件を定義します。 実行時にすべてのデータを読み取って結果をフィルタリングする代わりに、パーティション列のカラムをSQLのWHERE 句に指定することができます。 たとえば、テーブルが年の列でパーティション化され、SELECT * FROM table WHERE year = 2019を実行するとします。年はパーティション列を表し、2019はフィルタ条件です。

AWS Glue は、述語の内容を満たし、処理に必要な S3 パーティションのファイルのみを一覧表示して読み込みます。

AWS Glue DynamicFrameのgetCatalogSource メソッドの追加パラメータとして、Spark SQLを使った述語を指定することでこれを実現します。 この述語は、フィルタリングにパーティション列を使用するだけで、ブール値に評価される任意の SQL 式またはユーザー定義関数に使えます。

この例では、年、月、日ごとに分割された Github イベントのデータセットを使用して、この機能を示しています。 次のコード例では、週末に発生したイベントに関連する S3 パーティションのみを読み取ります。

%spark

val partitionPredicate ="date_format(to_date(concat(year, '-', month, '-', day)), 'E') in ('Sat', 'Sun')"

val pushdownEvents = glueContext.getCatalogSource(
   database = "githubarchive_month",
   tableName = "data",
   pushDownPredicate = partitionPredicate).getDynamicFrame()

ここでは、日付文字列を構築するために、SparkSQLの文字列のconcat関数を使用することができます。to_date関数は、日付オブジェクトに変換し、'E'パターンを持つdate_format関数は、日付を3文字の曜日(例えば、MonやTue)に変換します。 これらの関数、Spark SQL の式、およびユーザー定義関数の詳細については、Apache Spark の Web サイトにある Spark SQL、データフレームおよびデータセットガイド(リンク先は英語です)および関数のリスト(リンク先は英語です)を参照してください。

AWS Glue データカタログでPartition Pruning(※1)すると、AWS Glue ETL ジョブのパフォーマンスが大幅に向上します。 Spark クエリエンジンが S3 にファイルを一覧表示し、実行時にデータを読み込んで処理するのに必要な時間を短縮します。 より選択性の高い述語を使用することにより、追加のパーティションを除外することができ、さらなる改善を実現できます。
(※1 Partition Pruningとは、クエリ実行時にSparkが読み取るファイルとパーティションの数を制限するパフォーマンス最適化のことです。)

S3への書き込み前と書き込み中のデータのパーティショニング

デフォルトでは、AWS Glue DynamicFrame からの結果を書き込む際は、データはパーティション化されません。すべての出力ファイルは、指定された出力パスの最上位レベルで書き込まれます。 AWS Glue では、シンクを作成するときに partitionKeys オプションを渡すことによって、DynamicFrame 結果のパーティション化が可能になります。 たとえば、次のコード例では、Parquet 形式でデータセットを type 列で分割された S3 に書き出します。

%spark

glueContext.getSinkWithFormat( 
connectionType = "s3", 
options = JsonOptions(Map("path" -> "$outpath", "partitionKeys" -> Seq("type"))), 
format = "parquet").writeDynamicFrame(projectedEvents)

この例では、$outpathはS3に出力先の基本となるプレースホルダーです。 partitionKeysパラメーターは、S3で出力を分割するために使用される列の名前に対応します。 書き込み操作を実行すると、個々のレコードからtype列が削除され、ディレクトリ構造にエンコードされます。 これを示すために、AWS CLIから次のaws s3 lsコマンドを使用して出力パスをリストできます

PRE type=CommitCommentEvent/ 
PRE type=CreateEvent/ 
PRE type=DeleteEvent/ 
PRE type=ForkEvent/ 
PRE type=GollumEvent/ 
PRE type=IssueCommentEvent/ 
PRE type=IssuesEvent/ 
PRE type=MemberEvent/ 
PRE type=PublicEvent/ 
PRE type=PullRequestEvent/ 
PRE type=PullRequestReviewCommentEvent/ 
PRE type=PushEvent/ 
PRE type=ReleaseEvent/ 
PRE type=WatchEvent/

詳細については、AWS CLI コマンドリファレンスの aws s3 ls (リンク先は英語です)を参照してください。

一般に、カーディナリティ(※2)が少なく、クエリ結果のフィルタまたはグループ化で最もよく使用される partitionKeys の列を選択する必要があります。 たとえば、AWS CloudTrail ログを分析する場合、特定の期間に発生したイベントを探すのが一般的です。 したがって、CloudTrail データを年、月、日ごとに分割すると、クエリのパフォーマンスが向上し、回答を返すためにスキャンするデータ量が減少します。
(※2 カーディナリティとは、カラムに格納されているデータの種類を指します。)

出力用のパーティショニングの利点は、2 つあります。 まず1つ目は、エンドユーザーのクエリ実行時間を短縮します。 2つ目は適切なパーティショニングスキームを使用することで、複数のジョブを 1 つのデータパイプラインに結合する際、AWS Glue ETL ジョブのダウンストリームで、コストがかかる Spark シャッフル操作を回避できます。 詳細については、「AWS Glue でパーティション化されたデータを操作する」を参照してください。

S3・Hive形式のパーティションは、Spark RDD・DynamicFrameのパーティションとは異なります。 Spark パーティショニングは、Spark または AWS Glue が大きなデータセットを小さくて管理しやすいチャンクに分割し、変換を並行して読み込みを可能にします。 AWS Glue ワーカーは、パーティションをメモリ内で管理します。 ジョブの実行中およびデータが S3 に書き込まれる前に、DynamicFrames のrepartition関数またはconcat関数を使用して、Spark パーティションをさらに制御することができます。 パーティション数を設定するには、パーティションの合計数を明示的に指定するか、データをパーティション化するための列を選択します。

repartition関数またはconcat関数を使用して、データセットを再パーティション化すると、AWS Glue ワーカー上でデータを交換(シャッフル)することがあります。これにより、ジョブの実行時間に影響を与え、メモリ負荷を増加させることがあります。 それとは対照的に、Hive形式のパーティション設定を使用して 、S3 にデータを書き込むにはデータのシャッフルは必要なく、各ワーカーノードでローカルにソートするだけです。 Hive形式のパーティショニングを使用しないS3の出力ファイル数は、Sparkパーティションの数とほぼ同じです。 対照的に、Hive 形式のパーティション分割を使用する S3 の出力ファイルの数は、各 AWS Glue ワーカーのパーティションキーの分散によって異なる場合があります。

まとめ

この投稿では、AWS Glue で ETL ジョブと Apache Spark アプリケーションをコンピューティングとメモリを大量に消費する両方のジョブでスケーリングする方法について説明しました。 AWS Glue では、データセットとさまざまな種類の AWS Glue ワーカーの並列処理を使用して、ジョブの実行時間を短縮し、効率的なメモリ管理を実現します。 また、ワークロードとクラスタの並列処理を自動的に調整することで、多数の小さなファイルを処理する際の課題を克服することもできます。 AWS Glue ETL ジョブは AWS Glue データカタログを使用し、述語のプッシュダウンを使用してシームレスなPartition Pruningを可能にします。 また、S3 のデータセットを効率的に分割して、ダウンストリームの Apache Spark アプリケーションや Amazon AthenaAmazon Redshift などの他の分析エンジンによるクエリの高速化も実現できます AWS Glue で Apache Spark アプリケーションに関するこれらのベストプラクティスを試してみてください。

このシリーズの 2 番目の投稿では、AWS Glue 機能を使用して、大規模な履歴データセットをバッチ処理し、S3 データレイクで差分を段階的に処理する方法を示します。 また、カスタムの AWS Glue Parquet Writerを使用してジョブの実行を高速化する方法についても示します。

 

原文はこちらをご覧ください。

このブログ記事はソリューションアーキテクトの倉光が翻訳しました。