Amazon Web Services ブログ

Amazon EMR で Dr. Elephant と Sparklens を使って、Hadoop と Spark のパフォーマンスを調整する



データエンジニアや ETL 開発者はさまざまなパラメータを使用しながら、かなりの時間を費やして Apache Spark ジョブを実行および調整し、パフォーマンスの評価を行うことがよくありますが、これは簡単ではなく、時間のかかる作業です。Dr.ElephantSparklens はワークロードをモニタリングしたり、推奨する変更を提案することで、Spark や Hive のアプリケーションの調整を支援し、必要とされるエグゼキューターノード、コアノード、ドライバーメモリおよび Hive (Tez または MapReduce) ジョブといったパフォーマンスパラメータをマッパー、レデューサー、メモリ、データスキューの構成で最適化します。Dr.Elephant はジョブメトリクスを収集し、そのメトリクス上で分析を行い、最適化のための推奨事項をシンプルに提示するため、使用や修正が簡単です。同様に Sparklens では、Spark アプリケーションとコンピューティングリソースのスケーラビリティの制限を簡単に把握できます。そのため、試行錯誤による方法ではなく明確に定義されたメソッドで効率的に実行でき、開発者の時間やコンピューティングに費やす時間を節約します。

この投稿は、Amazon EMR クラスターに Dr. Elephant と Sparklens をインストールし、ワークロードを実行して、これらのツールの機能を実証する方法をご紹介するためのものです。Amazon EMR は AWS が提供する Hadoop のマネージドサービスで、AWS で Hadoop やその他のオープンソースフレームワークを簡単かつコスト効率よく実行できます。

以下の図は、このソリューションのアーキテクチャを示しています。データエンジニアや ETL 開発者は Amazon EMR クラスターにジョブを送信し、Dr. Elephant と Sparklens ツールの推奨事項に基づいて、Spark アプリケーションとコンピューティングリソースを最適化し、パフォーマンスと効率を向上させることができます。

前提条件のステップ

新しい EMR クラスターの作成

Dr. Elephant または Sparklens で EMR クラスターを設定するには、希望する容量の EMR クラスターを起動します。この投稿では、r4.xlarge インスタンスの 10 個のコアノードと、デフォルト設定の r4.4xlarge の 1 つのマスターノードを使用します。

AWS マネジメントコンソールAWS CloudFormation テンプレート、または AWS CLI コマンドを使用して、クラスターを起動できます。次の CloudFormation スタックを使用します。

次のスクリーンショットは、CloudFormation スタックで起動した EMR クラスターの概要を示しています。

Dr. Elephantまたは Sparklens を有効にする

永続的なクラスターが既に実行されている場合は、これらの手順を追加して Dr. Elephant または Sparklens を有効にします。次のコードをご参照ください。

aws emr add-steps --cluster-id j-XXXXXXXX --steps '[{"Args":["s3://aws-bigdata-blog/artifacts/aws-blog-hadoop-spark-performance-tuning/install-dr-elephant-emr5.sh"],"Type":"CUSTOM_JAR","ActionOnFailure":"CONTINUE","Jar":" s3://elasticmapreduce/libs/script-runner/script-runner.jar","Properties":"","Name":"Install Dr.Elephant and Sparklens"}

Dr. Elephant ポータルおよび Sparklens 設定へのアクセスの検証

Dr. Elephant にアクセスするには、ポート 8087 のマスターノードアドレスにブラウザを向けます。

https://<< DNS Name.compute-1.amazonaws.com>>:8087

注: ダイナミックまたはローカルポートフォワーディングを使用して、マスターノードへの SSH トンネルをセットアップする必要があります。

次のスクリーンショットは、EMR クラスターで送信されたジョブの最新の分析を一覧表示した Dr. Elephant ダッシュボードを示しています。

Sparklens を検証するには、マスターノードに SSH で接続する必要があります。詳細については、「SSH を使用してマスターノードに接続する」をご参照ください。

コンソールで、次のコマンドを実行します。

cd /etc/spark/conf/

PySpark を起動し、設定が有効になっていることを確認します。次のコードをご参照ください。

[hadoop@ip-172-31-20-142]$ pyspark

コードに依存関係として追加した行の qubole#sparklens が表示されます。次のスクリーンショットは、EMR クラスターで有効化された Sparklens を示しています。

Sparklens – Spark ワークロードのテスト

EMR クラスターで Spark ワークロードをテストします。これは Sparklens ログで確認できます。

この投稿では、PySpark コードを使用して 1,000 億件のレコードのデータジェネレーターの例をテストし、Sparklens が最適化と推奨事項の提供を実施して、処理を微調整する様子を確認します。以下の手順を完了してください。

  1. EMR クラスターでコードをコピーする
  2. /home/hadoop/ に移動する
  3. test-spark.py で次のコードを入力する
    from pyspark.sql.functions import rand, randn
    from pyspark.sql import SparkSession
    from pyspark import SparkConf, SparkContext, SQLContext
    sc = SparkContext.getOrCreate()
    sqlContext = SQLContext(sc)
    
    df = sqlContext.range(0, 100000000000)
    df.count()
    df2 = df.select("id", rand(seed=1000).alias("uniform"), randn(seed=555).alias("normal"))
    row1 = df2.agg({"id": "max"}).collect()[0]
    print row1["max(id)"]
    df2.createOrReplaceTempView("df2")
    df_part1 = spark.sql("select * from df2 where id between 1 and 999999 order by id desc")
    row2 = df_part1.agg({"id": "max"}).collect()[0]
    print row2["max(id)"]
    df_part1.write.format("parquet").mode("overwrite").save("/home/hadoop/data/output1/")
    df_part2 = spark.sql("select * from df2 where id > 10000000 order by id desc")
    row2 = df_part2.agg({"id": "max"}).collect()[0]
    print row2["max(id)"]
    df_part2.write.format("parquet").mode("overwrite").save("/home/hadoop/data/output2/")
  4. spark-submit test-spark.py を実行する

次のスクリーンショットは、Sparklens ジョブの送信を示しています。

次のスクリーンショットは、Sparklens が収集したアプリケーションタスクのメトリクスを示しています。

 

次のスクリーンショットは、ジョブ期間のタイムラインメトリクスを示しています。

次のスクリーンショットは、送信したアプリケーションジョブの最小実行可能時間に関する Sparklens の推奨事項を示しています。

Sparklens は、実行時間が 50 秒のデフォルト設定と比較して、アプリリソースの最小実行可能時間は 23 秒であることを示しています。コンピューティング時間のうち 76.15% が無駄で、コンピューティング時間の 30.98% しか使用していません。

spark-submit ジョブのエグゼキューターカウントとエグゼキューターコアを減らすことで、結果がどのように変化するかを確認できます。

以下のコードを入力します。

spark-submit --num-executors 1 --executor-cores 1 test-spark.py

次のスクリーンショットは、ジョブを調整した後の Sparklens ジョブアプリケーションのメトリクスを示しています。

ジョブの完了時間が 45 秒に短縮し、ジョブを実行するには 1 つのエグゼキューターノードと 1 つのコアだけで十分になりました。これは、Spark アプリケーションを制限している特定の段階 (ドライバー、スキュー、タスクの欠如など) を識別するのに役立ち、これらの段階で問題となる可能性があるものに関するコンテキスト情報を提供します。

この投稿では、Scala、Java、Python の 3 つのネイティブサポートアプリケーションを使用して、前述の Pi の推定例をテストします。詳細については、「Spark アプリケーションの記述」をご参照ください。テストを実行するには、次の手順を実行します。

  1. 以下のコードを入力します。
    import sys
    from random import random
    from operator import add
    
    from pyspark import SparkContext
    
    if __name__ == "__main__":
        """
            Usage: pi [partitions]
        """
        sc = SparkContext(appName="PythonPi")
        partitions = int(sys.argv[1]) if len(sys.argv) > 1 else 2
        n = 100000 * partitions
    
        def f(_):
            x = random() * 2 - 1
            y = random() * 2 - 1
            return 1 if x ** 2 + y ** 2 < 1 else 0
    
        count = sc.parallelize(xrange(1, n + 1), partitions).map(f).reduce(add)
        print "Pi is roughly %f" % (4.0 * count / n)
    
        sc.stop()
  2. Spark コード例をローカルディレクトリにコピーします。
  3. spark-submit としてコードを実行します。次のコードをご参照ください。spark-submit test-spark2.py

次のスクリーンショットは、Sparklens ジョブの送信を示しています。

次のスクリーンショットは、Sparklens が収集したアプリケーションタスクのメトリクスを示しています。

次のスクリーンショットは、送信したアプリケーションジョブの最小実行可能時間に関する Sparklens の推奨事項を示しています。

次のスクリーンショットは、クラスターのコンピューティング使用率に関する Sparklens メトリクスを示しています。

次のスクリーンショットは、コアコンピューティングノードでのクラスター使用率と推定使用率に関する Sparklens の推奨事項を示しています。

このテストで、Sparklens は推奨事項を提案し、実行時間が 14 秒のデフォルト設定と比較して、アプリリソースの最小実行可能時間は 10 秒であることを示しています。コンピューティング時間のうち 87.13% が無駄で、コンピューティング時間の 12.87% しか使用していません。

Spark アプリケーションを 1 度実行すると、Sparklens は任意の数のエグゼキューターが与えられた場合におけるアプリケーションのパフォーマンスを推定します。これは、エグゼキューターを追加する ROI を理解するのに役立ちます。内部的には、Sparklens にはアナライザーという概念があります。これは、興味深いイベントを発行するための汎用コンポーネントです。アナライザーの詳細については、GitHub リポジトリをご参照ください

spark-submit ジョブのエグゼキューターカウントとエグゼキューターコアを減らすことで、結果がどのように変化するかを確認できます。次のコードをご参照ください。

spark-submit --num-executors 2 --executor-cores 2 test-spark2.py

次のスクリーンショットは、Sparklens ジョブの送信を示しています。

次のスクリーンショットは、ジョブを調整した後の Sparklens ジョブアプリケーションのメトリクスを示しています。

ジョブの完了時間が 12 秒に短縮し、ジョブを実行するには 1 つのエグゼキューターノードと 1 つのコアだけで十分になりました。

Dr.の Dr. Elephant テスト

EMR クラスターでシナリオをテストし、Dr. Elephant ポータルでシナリオを確認できます。

Hive の負荷とパフォーマンス分析のテスト

Hive CLI コンソールを使用してサンプルデータセットを取り込み、ワークロードをどのように実行し、どのようなパフォーマンス最適化を提案するかを確認できます。

この投稿では、Hiveを使用して Elastic Load Balancer アクセスログ (Amazon S3 に保存されている) を分析する方法を示します。以下の手順を完了してください。

  1. Hive CLI で、次のコードを入力します。
    CREATE EXTERNAL TABLE IF NOT EXISTS elb_logs (
    Ts STRING,
    ELBName STRING,
    RequestIP STRING,
    RequestPort INT,
    BackendIP STRING,
    BackendPort INT,
    RequestProcessingTime DOUBLE,
    BackendProcessingTime DOUBLE,
    ClientResponseTime DOUBLE,
    ELBResponseCode STRING,
    BackendResponseCode STRING,
    ReceivedBytes BIGINT,
    SentBytes BIGINT,
    RequestVerb STRING,
    URL STRING,
    Protocol STRING
    )
    ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.RegexSerDe'
    WITH SERDEPROPERTIES (
    "input.regex" = "([^ ]*) ([^ ]*) ([^ ]*):([0-9]*) ([^ ]*):([0-9]*) ([.0-9]*) ([.0-9]*) ([.0-9]*) (-|[0-9]*) (-|[0-9]*) ([-0-9]*) ([-0-9]*) \"([^ ]*) ([^ ]*) (- |[^ ]*)\"$"
    ) LOCATION 's3://us-east-1.elasticmapreduce.samples/elb-access-logs/data/';

    次のスクリーンショットは、Hive による外部テーブルの生成を示しています。

  2. 簡単なカウントクエリを実行し、Dr. Elephant の推奨事項を確認します。次のコードをご参照ください。
    SELECT RequestIP, COUNT(RequestIP) FROM elb_logs WHERE BackendResponseCode<>200 GROUP BY RequestIP;

  3. Dr.Elephant Portal を起動します。以下のスクリーンショットは、Dr.Elephant の出力を示しています。Hive ジョブには Tez マッパーメモリの調整が必要です。
  4. アプリケーションメトリクスの強調表示されたセクションで、Tez マッパーメモリセクションをクリックします。次のスクリーンショットは、過剰に割り当てられた Tez マッパーメモリに関する Dr.Elephant の推奨事項を示しています。
  5. マッパーメモリを減らして Hive クエリを実行します。次のコードをご参照ください。
    set hive.tez.container.size=1024;SELECT RequestIP, COUNT(RequestIP) FROM elb_logs WHERE BackendResponseCode<>200 GROUP BY RequestIP

次のスクリーンショットは、送信されたジョブのアプリケーションメトリクスを示しています。

以下のスクリーンショットは、Dr.Elephant がクエリからのエラーや警告がないことを示しています。

マップ削減ジョブの調整

マップ削減ジョブを調整するには、デフォルト設定で次の Pi コード例を実行します。

hadoop-mapreduce-examples pi -D mapreduce.map.memory.mb=4096 -D mapreduce.reduce.memory.mb=4096 200 1000000

次のスクリーンショットは、送信されたジョブのアプリケーションメトリクスを示しています。

Dr.Elephant の次のスクリーンショットは、マッパー時間、マップメモリ、レデューサーメモリの調整が必要であることを示しています。

次のスクリーンショットは、Dr.Elephant のマッパーメモリの改善に関する推奨事項を示しています。

次のスクリーンショットは、Dr.Elephant のレデューサーメモリの改善に関する推奨事項を示しています。

これで、マッパーおよびレデューサーのメモリを次の値に設定できます。

set mapreduce.map.memory.mb=4096

set mapreduce.reduce.memory.mb=4096

マッパーの数を減らし、マッパーごとのサンプル数を増やして、同じ Pi の結果を得ることができます。次のコードをご参照ください。

hadoop-mapreduce-examples pi -D mapreduce.map.memory.mb=4096 -D mapreduce.reduce.memory.mb=4096 100 2000000

次のスクリーンショットは、推奨事項によって改善したメトリクスを示しています。

このジョブでは、効率性が約 60 秒から 38 秒へと 50% 向上しています。

10 個のマッパーを使用して実行すると、さらに効率性が向上します。次のコードをご参照ください。

hadoop-mapreduce-examples pi -D mapreduce.map.memory.mb=4096 -D mapreduce.reduce.memory.mb=4096 10 2000000

次のスクリーンショットは、Dr.Elephant の推奨事項によって改善したメトリクスを示しています。

Dr.Elephant はモニタリングと分析情報の提供を行い、Hive および Hadoop ジョブを最適化しました。

本番ワークロードの設定

Dr. Elephant ツールを微調整するには、/mnt/dr-elephant-2.1.7/app-conf ディレクトリに移動し、状況に応じて設定ファイルを編集します。

たとえば、ヒューリスティックを作成し、Dr.Elephant ツールにプラグインして、特定の条件を設定し、タスクの数に基づいて重大度を変更することができます。さらに、クラスターの容量からの差分をマッピングまたは削減することも可能です。スレッドの数を変更して、完了ジョブ、またはリソースマネージャーからのフェッチの間隔を分析することもできます。

次のスクリーンショットは、要件に応じてさらに調整やカスタマイズを行うための Dr.Elephant 設定ファイルのリストを示しています。

メトリクスと設定の詳細については、GitHub リポジトリをご参照ください。

まとめ

この投稿では、Amazon EMR クラスターで Dr.Elephant ツールと Sparklens ツールを起動し、コンピューティングジョブと多くのメモリを使用するジョブの最適化とパフォーマンス調整を行う方法を解説しました。Dr.Elephant と Sparklens は、データセットを並列で処理し、かつ最適なコンピューティングノードを使用することで、ジョブ実行時間の短縮と効率的なメモリ管理を最適化および有効化できます。さらに、ワークロードとクラスターの並列処理を調整し、要求に対応することで、Spark と Hive の多くのジョブを処理する際の課題を克服する支援を行います。

 


著者について

Nivas は、アマゾン ウェブ サービスのシニアデータアーキテクトです。 AWS プラットフォームでデータレイクと分析アプリケーションを構築する企業のお客様と密接に連携しながら、サポートしています。物理学の修士号を取得している Nivas は、理論物理学概念にも夢中です。

 

 

Mert Hocanin は、AWS のビッグデータアーキテクトで、EMR、Athena、Managed Blockchain などのいくつかの製品に取り組んでいます。AWS で働く前は、シニアソフトウェア開発エンジニアとして Amazon.com の小売ビジネスに携わり、レポート作成のために会社全体から大量のデータを処理するデータレイクを構築しました。データレイクの構築や設計をしていないときは、旅行や食事を楽しんでいます。