Amazon Web Services ブログ

Amazon EMR 5.24 での Apache Spark のパフォーマンスが改善 – Amazon EMR 5.16 と比較して最大 13 倍のパフォーマンス向上

Amazon EMR のリリース 5.24.0 には Spark の最適化がいくつか含まれており、クエリのパフォーマンスが向上しました。パフォーマンスの向上を評価するため、Amazon S3 のデータを使用して、6 ノードの c4.8xlarge EMR クラスターで 3 TB 規模の TPC-DS ベンチマーククエリを実行しました。同様の設定で操作した EMR 5.16 と比較して、EMR 5.24 のクエリパフォーマンスが最大で 13 倍向上したことを確認しました。

大規模な変換からストリーミング、データサイエンス、そして機械学習に至るまでの幅広い分析ユースケースで、Sparkを使用できます。Spark を EMR で実行すると、EMR は安定した最新のオープンソースコミュニティといった革新をもたらし、さらに Amazon S3 での高性能ストレージスポットインスタンスAuto Scaling の独自のコスト削減機能も提供します。

毎月配信される EMR のリリースには、最新のオープンソースパッケージとともに、複数のマスターノードクラスターの再構成などの新機能も含まれています。各リリースで、パフォーマンスの改善も行っています。

これら各リリースの最適化はそれぞれ、より高速な実行とコスト削減を支援してきました。今回の EMR 5.24 でも新しい最適化をいくつか行っており、この記事では中でも重要な 3 つの最適化について詳しく説明します。

設定

EMR を開始するには、コンソールにサインインしクラスターを起動してデータを処理します

ベンチマーククエリの設定をレプリケートするには、次の設定を使用します。

  • クラスターにインストールしているアプリケーション: Ganglia、Hive、Spark、Hadoop (デフォルトでインストール済み)。
  • EMR リリース: EMR 5.24.0
  • クラスター設定
    • マスターインスタンスグループ: 512 GiB の GP2 EBS ストレージを備えた、c4.8xlarge インスタンス 1つ (それぞれ 128 GiB のもの 4 つ)
    • コアインスタンスグループ: 512 GiB の GP2 EBSストレージを備えた、c4.8xlarge インスタンス 5つ (それぞれ 128 GiB のもの 4 つ)
分類 プロパティ
yarn-site yarn.nodemanager.resource.memory-mb : 53248
yarn.scheduler.maximum-allocation-vcores : 36
spark-defaults spark.executor.memory : 4743m
spark.driver.memory : 2g
spark.sql.optimizer.distinctBeforeIntersect.enabled : true
spark.sql.dynamicPartitionPruning.enabled : true
spark.sql.optimizer.flattenScalarSubqueriesWithAggregates.enabled : true
spark.executor.cores : 4
spark.executor.memoryOverhead : 890m

TPC-DS のベンチマークを使用して見られた結果

次の 2 つのグラフは、EMR の各リリースでの TPC-DS 3TB クエリデータセットのすべてのクエリについて、合計集計ランタイムと幾何平均を比較したものです。

EMR 5.16 と EMR 5.24 での各クエリのランタイムが改善していることが、次の図にも表れています。横軸は TPC-DS 3 TB ベンチマークの各クエリを示しています。縦軸は、クエリの実行時間で測定した EMR 5.24.0 でのパフォーマンスの改善の程度 (EMR 5.16.0 と比較) の順位を示します。パフォーマンスの向上が最大となったクエリは 26 個あります。これらの各クエリでは、パフォーマンスが EMR 5.16 よりも少なくとも 2 倍優れたものになっていました。

EMR 5.24 でのパフォーマンスの最適化

今回 AWS は全体的な高速化に焦点を当て、パフォーマンス向上をいくつか行っていますが、この記事ではお客様の一般的なワークロードに最も影響を与える EMR 5.24 での 3 つの主要な改善点について解説します。

  • 動的パーティションプルーニング
  • スカラーサブクエリのフラット化
  • INTERSECT の前に DISTINCT

動的パーティションプルーニング

動的パーティションプルーニングでは、クエリの読み取りや処理の必要があるテーブル内にある特定のパーティションを選択することで、ジョブのパフォーマンスを向上します。読み込み処理するデータの量が減り、クエリがより迅速に実行されます。オープンソース版の Spark (2.4.2) は、計画時に解決できる静的述語のプッシュダウンのみをサポートしています。静的述語のプッシュダウンの例には、次のものがあります。

partition_col = 5

partition_col IN (1,3,5)

partition_col BETWEEN 1 AND 3

partition_col = 1 + 3

動的パーティションプルーニングを有効にすると、EMR の Spark はランタイム時に読み込みが必要なパーティションを推測します。動的パーティションプルーニングはデフォルトでは無効になっていますが、Spark 内からまたはクラスターの作成時に Spark プロパティ spark.sql.dynamicPartitionPruning.enabled を設定することで有効にできます。詳細については、「 Spark の設定」をご参照ください。

以下は、2 つのテーブルを結合し、動的パーティションプルーニングを採用して、パフォーマンスを向上した例です。store_sales テーブルにはリージョン別にパーティション化した売上合計データが、そして store_regions テーブルには各国のリージョンのマッピングが含まれています。この代表的なクエリでは、特定の国からのデータのみを取得します。

SELECT ss.quarter, ss.region, ss.store, ss.total_sales 
FROM store_sales ss, store_regions sr
WHERE ss.region sr.region AND sr.country = ’North America’

動的パーティションプルーニングを使用しないと、このクエリはサブクエリの結果と一致するリージョンのサブセットを除外する前に、すべてのリージョンを読み取ります。動的パーティションプルーニングでは、サブクエリで戻されたリージョンのパーティションのみを読み込み、処理します。このため、ストレージから読み込むデータや処理するレコードが少なくなり、時間とリソースを節約できるのです。

次のグラフでは、3 TB のデータでテストした TPC-DS スイートのクエリ 72、80、17、25 でパフォーマンスが向上したことを示しています。

スカラーサブクエリのフラット化

この最適化により、特定のテーブルの行に複数の条件を適用する必要がある場合のクエリパフォーマンスが向上しました。この最適化で、各条件でテーブルを何度も読み取ることを防ぎます。この最適化がこのようなケースを検出し、テーブルを 1 回だけ読み込むようにクエリを最適化します。

スカラサブクエリのフラット化はデフォルトで無効になっており、Spark 内から、またはクラスタの作成時に Spark プロパティ spark.sql.optimizer.flattenScalarSubqueriesWithAggregates.enabled を設定することで有効にできます。

これがどのように機能するかを示す例は、前の最適化と同じ total_sales テーブルを使用します。この例では、店舗の平均売上が特定の範囲内にあるときに、平均売上で店舗をグループ化します。

SELECT (SELECT avg(total_sales) FROM store_sales 
WHERE total_sales BETWEEN 5000000 AND 10000000) AS group1,
(SELECT avg(total_sales) FROM store_sales 
WHERE total_sales BETWEEN 10000000 AND 15000000) AS group2,
(SELECT avg(total_sales) FROM store_sales 
WHERE total_sales BETWEEN 15000000 AND 20000000) AS group3  

この最適化を無効にすると、total_sales テーブルを各サブクエリが読み込みます。最適化を有効にすると、クエリは次のように書き換えられて、テーブルを 1 回だけ読み込むことによって返される行に各条件が適用されるようになります。

SELECT c1 AS group1, c2 AS group2, c3 AS group3 
FROM (SELECT avg (IF(total_sales BETWEEN 5000000 AND 10000000, total_sales, null)) AS c1,
avg (IF(total_sales BETWEEN 10000000 AND 15000000, total_sales, null)) AS c2,
avg (IF(total_sales BETWEEN 15000000 AND 20000000, total_sales, null)) AS c3 FROM store_sales);  

この最適化により、ストレージから読み込むデータや処理するレコードが少なくなるため、時間とリソースを節約できます。

TPCDS スイートの Q9 の例で説明します。関連する Spark プロパティが有効となっている場合、バージョン 5.16 と比較して 5.24 はクエリを 2.9 倍速く実行します。

INTERSECT の前に DISTINCT

2 つのコレクションを交差させると、交差結果として各コレクションで見つかった一意の値がそろいます。大規模なコレクションを扱うときは、多くの重複レコードを処理し、ホスト間でシャッフルして、最後に交差を計算する必要があります。この最適化では、交差を計算する前に各コレクション内の重複する値を排除し、ホスト間でシャッフルされるデータ量を減らすことによってパフォーマンスを向上させます。

この最適化はデフォルトで無効になっており、Spark 内からまたはクラスターの作成時に Spark プロパティ spark.sql.optimizer.distinctBeforeIntersect.enabled を設定することで有効にできます。

たとえば (TPC-DS query14 を単純化した) 例でいうと、店舗販売とカタログ販売の両方で販売しているブランドをすべて見つけるとします。この例では、store_sales テーブルには店舗での販売が、catalog_sales テーブルにはカタログでの販売が、そして item テーブルには一意の各製品の情報 (ブランド、製造元など) が含まれます。

(SELECT item.brand ss_brand FROM store_sales, item
WHERE store_sales.item_id = item.item_id)
INTERSECT
(SELECT item.brand cs_brand FROM catalog_sales, item 
WHERE catalog_sales.item_id = item.item_id) 

この最適化が無効になっていると、最初の SELECT ステートメントは 1,200 個の一意のブランドのみがある 260 万個のレコード (store_sales と同じ数のレコード) を生成します。2 つ目の SELECT ステートメントは、300 個の一意のブランドがある 150 万個のレコード (catalog_sales と同じレコード数) を生成します。このため、410 万行すべてを交差演算に入力することになり、両方の結果に存在する 200 個のブランドが生成されます。

最適化が有効になっていると、交差演算子に渡される前に各コレクションに対して個別の操作を実行するため、交差演算子に渡されるレコードは 1,200 + 300 個のレコードのみになります。この最適化により、ホスト間でシャッフルするデータが減り、時間とリソースを節約できます。

まとめ

このように Apache Spark でのパフォーマンスを最適化することによって、EMR 5.16 と比較して EMR 5.24 のクエリパフォーマンスも向上します。これらの最適化が実際のワークロードにどのように役立っているか、ぜひフィードバックをお寄せください。

さらに、EMR で Apache Spark のパフォーマンスを向上する追加の更新が公開されるので、どうぞご期待ください。常に最新の状態に保つためにも、ビッグデータブログの RSS フィードに登録してください。より優れた Apache Spark の最適化や設定のベストプラクティスについて学んだり、チューニングに関するアドバイスも得られます。Spark で S3 Select を使用したり、以前の EMR リリースにあった EMRFS S3 向けに最適化されたコミッター のように優れた最適化方法は他にもありますので、お見逃しのないように。

 


著者について

Paul Codding はアマゾン ウェブ サービスの EMR シニアプロダクトマネージャです

 

 

 

 

Peter Gvozdjak はアマゾン ウェブ サービスの EMR シニアエンジニアリングマネージャです

 

 

 

 

Joseph Marques はアマゾン ウェブ サービスの EMR プリンシパルエンジニアです

 

 

 

 

Yuzhou Sun はアマゾン ウェブ サービスの EMR ソフトウェア開発エンジニアです

 

 

 

 

Atul Payapilly はアマゾン ウェブ サービスの EMR ソフトウェア開発エンジニアです

 

 

 

 

Surya Vadan Akivikolanu はアマゾン ウェブ サービスの EMR ソフトウェア開発エンジニアです.