AWS Big Data Blog

Tune Hadoop and Spark performance with Dr. Elephant and Sparklens on Amazon EMR

Update 7th July 2021: Dr. Elephant and Sparklens are open source applications that are currently not being maintained. As a result, this blog post is no longer being updated to support newer versions of EMR.

Data engineers and ETL developers often spend a significant amount of time running and tuning Apache Spark jobs with different parameters to evaluate performance, which can be challenging and time-consuming. Dr. Elephant and Sparklens help you tune your Spark and Hive applications by monitoring your workloads and suggesting changes to performance parameters, such as the number of executors, executor cores, and driver memory for Spark, and mapper, reducer, memory, and data-skew settings for Hive (Tez or MapReduce) jobs. Dr. Elephant gathers job metrics, analyzes them, and presents optimization recommendations in a simple format that is easy to consume and act on. Similarly, Sparklens makes it easy to understand the scalability limits of Spark applications and compute resources, and helps you tune with well-defined methods instead of learning by trial and error, which saves both developer and compute time.

This post shows how to install Dr. Elephant and Sparklens on an Amazon EMR cluster and run sample workloads to demonstrate these tools’ capabilities. Amazon EMR is a managed service that makes it easy and cost-effective to run Apache Hadoop, Apache Spark, and other open-source frameworks on AWS.

The following diagram illustrates the solution architecture. Data engineers and ETL developers submit jobs to the Amazon EMR cluster and, based on the recommendations from Dr. Elephant and Sparklens, optimize their Spark applications and compute resources for better performance and efficiency.

Prerequisite Steps

Creating a new EMR cluster

To configure an EMR cluster with Dr. Elephant or Sparklens, launch an EMR cluster with your desired capacity. This post uses 10 core nodes of r4.xlarge instances and one r4.4xlarge master node with the default settings.

You can launch the cluster via the AWS Management Console, an AWS CloudFormation template, or AWS CLI commands. Use the following CloudFormation Stack:

The following screenshot shows the EMR cluster summary after launching the CloudFormation stack.
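If you prefer the AWS CLI, a command along the following lines launches a similar cluster. This is a sketch rather than the exact command used for this post: the cluster name is arbitrary, the release label is an example EMR 5.x release, and the key pair and subnet values are placeholders you need to replace.

aws emr create-cluster \
  --name "emr-dr-elephant-sparklens" \
  --release-label emr-5.24.0 \
  --applications Name=Hadoop Name=Spark Name=Hive Name=Tez \
  --instance-groups InstanceGroupType=MASTER,InstanceCount=1,InstanceType=r4.4xlarge InstanceGroupType=CORE,InstanceCount=10,InstanceType=r4.xlarge \
  --ec2-attributes KeyName=<your-key-pair>,SubnetId=<your-subnet-id> \
  --use-default-roles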

Enabling Dr. Elephant or Sparklens

If you already have a persistent cluster running, add the following step to enable Dr. Elephant and Sparklens. See the following code:

aws emr add-steps --cluster-id j-XXXXXXXX --steps '[{"Args":["s3://aws-bigdata-blog/artifacts/aws-blog-hadoop-spark-performance-tuning/install-dr-elephant-emr5.sh"],"Type":"CUSTOM_JAR","ActionOnFailure":"CONTINUE","Jar":"s3://elasticmapreduce/libs/script-runner/script-runner.jar","Properties":"","Name":"Install Dr. Elephant and Sparklens"}]'
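After the step finishes, you can verify it from the CLI, reusing the placeholder cluster ID:

aws emr list-steps --cluster-id j-XXXXXXXX --step-states COMPLETED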

Validating access to the Dr. Elephant portal and Sparklens settings

To access Dr. Elephant, point your browser to the master node’s address on port 8087:

http://<<master-node-DNS>>.compute-1.amazonaws.com:8087

Note: You need to set up an SSH tunnel to the master node using dynamic or local port forwarding.
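For example, with local port forwarding you can map the portal to a port on your machine and then browse to http://localhost:8087. The key file and master public DNS name below are placeholders:

ssh -i ~/mykey.pem -N -L 8087:localhost:8087 hadoop@<master-public-DNS>

With dynamic forwarding instead (ssh -i ~/mykey.pem -N -D 8157 hadoop@<master-public-DNS>), configure your browser to use the resulting SOCKS proxy and keep the master node URL shown above.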

The following screenshot shows the Dr. Elephant dashboard with the latest analysis of jobs submitted to the EMR cluster.

To validate Sparklens, you need to SSH to the master node. For more information, see Connect to the Master Node Using SSH.
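A typical connection looks like the following (the key file and master public DNS name are placeholders):

ssh -i ~/mykey.pem hadoop@<master-public-DNS>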

On the console, run the following command:

cd /etc/spark/conf/
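While you are in this directory, you can also confirm that the install script added the Sparklens entries to spark-defaults.conf. The exact values depend on the install script, so treat the following as a typical pattern rather than exact output:

grep -i sparklens spark-defaults.conf
# Typically shows entries similar to:
#   spark.jars.packages    qubole:sparklens:<version>-s_2.11
#   spark.extraListeners   com.qubole.sparklens.QuboleJobListener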

Launch PySpark and check that settings are enabled. See the following code:

[hadoop@ip-172-31-20-142]$ pyspark

You should see the line qubole#sparklens added as a dependency in the startup output. The following screenshot shows Sparklens enabled on the EMR cluster:

Sparklens – Testing Spark workloads

You can now test Spark workloads in an EMR cluster and observe them through the Sparklens logs.

This post tests a data generator example that produces 100 billion records using PySpark code and observes how Sparklens provides recommendations to fine-tune the processing. Complete the following steps:

  1. Copy the code to the EMR cluster.
  2. Navigate to the /home/hadoop/ directory.
  3. Save the following code as test-spark.py:
    from pyspark.sql.functions import rand, randn
    from pyspark.sql import SparkSession

    # Create a SparkSession; it is also used for the SQL queries below
    spark = SparkSession.builder.getOrCreate()

    # Generate 100 billion sequential IDs
    df = spark.range(0, 100000000000)
    df.count()

    # Add uniformly and normally distributed random columns
    df2 = df.select("id", rand(seed=1000).alias("uniform"), randn(seed=555).alias("normal"))
    row1 = df2.agg({"id": "max"}).collect()[0]
    print(row1["max(id)"])

    df2.createOrReplaceTempView("df2")
    df_part1 = spark.sql("select * from df2 where id between 1 and 999999 order by id desc")
    row2 = df_part1.agg({"id": "max"}).collect()[0]
    print(row2["max(id)"])
    df_part1.write.format("parquet").mode("overwrite").save("/home/hadoop/data/output1/")

    df_part2 = spark.sql("select * from df2 where id > 10000000 order by id desc")
    row2 = df_part2.agg({"id": "max"}).collect()[0]
    print(row2["max(id)"])
    df_part2.write.format("parquet").mode("overwrite").save("/home/hadoop/data/output2/")
  4. Run spark-submit test-spark.py.
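Because the install step adds Sparklens to the cluster’s Spark configuration, a plain spark-submit picks it up automatically. If you ever need to enable it explicitly for a single job, the usual pattern looks like the following. The package version is an example from the public Sparklens releases, and some Spark versions also need --repositories https://repos.spark-packages.org:

spark-submit \
  --packages qubole:sparklens:0.3.2-s_2.11 \
  --conf spark.extraListeners=com.qubole.sparklens.QuboleJobListener \
  test-spark.py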

The following screenshot shows Sparklens job submission:

The following screenshot shows the application task metrics collected by Sparklens:

 

The following screenshot shows the job duration and timeline metrics:

The following screenshot shows the Sparklens recommendation on the minimum possible execution time for the submitted application jobs:

Sparklens suggests that the minimum possible runtime for the application is 23 seconds, compared to an execution time of 50 seconds with the default settings. Compute hours wasted are 76.15%, and only 30.98% of compute hours are used.

You can reduce the executor count and executor cores in the spark-submit job and see how it changes the outcome.

Enter the following code:

spark-submit --num-executors 1 --executor-cores 1 test-spark.py

The following screenshot shows Sparklens job application metrics after tuning the job:

The job completion time is reduced to 45 seconds, and a single executor with one core is sufficient to run the job. Sparklens also helps identify the specific factors (such as driver-side work, skew, or lack of tasks) that limit the Spark application, and provides contextual information about what could be going wrong in those stages.

This post next tests the Pi estimation example, which Spark provides natively in three languages: Scala, Java, and Python. This test uses the Python version. For more information, see Write a Spark Application. To run the test, complete the following steps:

  1. Enter the following code:
    import sys
    from random import random
    from operator import add

    from pyspark import SparkContext

    if __name__ == "__main__":
        """
            Usage: pi [partitions]
        """
        sc = SparkContext(appName="PythonPi")
        partitions = int(sys.argv[1]) if len(sys.argv) > 1 else 2
        n = 100000 * partitions

        def f(_):
            # Sample a point in the 2x2 square and test whether it falls inside the unit circle
            x = random() * 2 - 1
            y = random() * 2 - 1
            return 1 if x ** 2 + y ** 2 < 1 else 0

        count = sc.parallelize(range(1, n + 1), partitions).map(f).reduce(add)
        print("Pi is roughly %f" % (4.0 * count / n))

        sc.stop()
  2. Save the code as test-spark2.py in a local directory on the cluster.
  3. Run the code with spark-submit. See the following code:
    spark-submit test-spark2.py
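The script accepts an optional partitions argument (defaulting to 2), so you can also scale the amount of work. For example, to run with 10 partitions:

spark-submit test-spark2.py 10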

The following screenshot shows Sparklens job submission:

The following screenshot shows the application task metrics collected by Sparklens:

The following screenshot shows the Sparklens recommendation on the minimum possible execution time for the submitted application jobs:

The following screenshot shows Sparklens metrics on cluster compute utilization:

The following screenshot shows Sparklens recommendation on cluster utilization with core compute nodes and estimated utilization:

In this test, Sparklens suggests that the minimum possible runtime for the application is 10 seconds, compared to an execution time of 14 seconds with the default settings. Compute hours wasted are 87.13%, and only 12.87% of compute hours are used.

In a single run of a Spark application, Sparklens can estimate how your application performs given any arbitrary number of executors. This helps you understand the ROI of adding executors. Internally, Sparklens has the concept of an analyzer, which is a generic component for emitting interesting events. For more information about analyzers, see the GitHub repo.

You can reduce the executor count and executor cores in a spark-submit job and see how it changes the outcome. See the following code:

spark-submit --num-executors 2 --executor-cores 2 test-spark2.py

The following screenshot shows Sparklens job submission:

The following screenshot shows Sparklens job application metrics after tuning the jobs:

The job completion time is now reduced to 12 seconds, and one executor with a single core is sufficient to process the job.

Dr. Elephant – Testing Hive and MapReduce workloads

You can test scenarios in an EMR cluster and observe them through the Dr. Elephant portal.

Testing Hive load and performance analysis

You can load example datasets through the Hive CLI console and see how the workload performs and what performance optimizations it suggests.
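To follow along, SSH to the master node and start the Hive CLI:

hive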

This post demonstrates how to analyze Elastic Load Balancer access logs stored in Amazon S3 using Hive. Complete the following steps:

  1. On the Hive CLI, enter the following code:
    CREATE EXTERNAL TABLE IF NOT EXISTS elb_logs (
    Ts STRING,
    ELBName STRING,
    RequestIP STRING,
    RequestPort INT,
    BackendIP STRING,
    BackendPort INT,
    RequestProcessingTime DOUBLE,
    BackendProcessingTime DOUBLE,
    ClientResponseTime DOUBLE,
    ELBResponseCode STRING,
    BackendResponseCode STRING,
    ReceivedBytes BIGINT,
    SentBytes BIGINT,
    RequestVerb STRING,
    URL STRING,
    Protocol STRING
    )
    ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.RegexSerDe'
    WITH SERDEPROPERTIES (
    "input.regex" = "([^ ]*) ([^ ]*) ([^ ]*):([0-9]*) ([^ ]*):([0-9]*) ([.0-9]*) ([.0-9]*) ([.0-9]*) (-|[0-9]*) (-|[0-9]*) ([-0-9]*) ([-0-9]*) \"([^ ]*) ([^ ]*) (- |[^ ]*)\"$"
    ) LOCATION 's3://us-east-1.elasticmapreduce.samples/elb-access-logs/data/';

    The following screenshot shows the external table creation through Hive:

  2. Run a simple count query and check Dr. Elephant’s recommendations. See the following code:
    SELECT RequestIP, COUNT(RequestIP) FROM elb_logs WHERE BackendResponseCode<>200 GROUP BY RequestIP;

  3. Launch the Dr. Elephant portal. The following screenshot shows the Dr. Elephant output. The Hive job needs some tuning for Tez mapper memory.
  4. Choose the Tez mapper memory section in the highlighted application metrics. The following screenshot shows the Dr. Elephant recommendation on over-allocated Tez mapper memory:
  5. Run the Hive query with reduced mapper memory. See the following code:
    set hive.tez.container.size=1024;
    SELECT RequestIP, COUNT(RequestIP) FROM elb_logs WHERE BackendResponseCode<>200 GROUP BY RequestIP;
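You can also submit the tuned query non-interactively with the Hive CLI’s -e option, for example:

hive -e "set hive.tez.container.size=1024; SELECT RequestIP, COUNT(RequestIP) FROM elb_logs WHERE BackendResponseCode<>200 GROUP BY RequestIP;"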

The following screenshot shows application metrics for the submitted jobs:

The following screenshot shows that Dr. Elephant reports no more errors or warnings for the query:

Tuning a MapReduce job

To tune a MapReduce job, run the following Pi code example as a baseline with 200 mappers and 1,000,000 samples per mapper:

hadoop-mapreduce-examples pi -D mapreduce.map.memory.mb=4096 -D mapreduce.reduce.memory.mb=4096 200 1000000

The following screenshot shows application metrics for the submitted jobs:

The following screenshot of Dr. Elephant shows that the mapper time, map memory, and reducer memory need tuning:

The following screenshot shows Dr. Elephant recommendation on improving Mapper Memory:

The following screenshot shows Dr. Elephant recommendation on improving Reducer Memory:

Now you can set the mapper and reducer memory to the following values:

set mapreduce.map.memory.mb=4096

set mapreduce.reduce.memory.mb=4096

You can reduce the number of mappers and increase the number of samples per mapper to get the same Pi results. See the following code:

hadoop-mapreduce-examples pi -D mapreduce.map.memory.mb=4096 -D mapreduce.reduce.memory.mb=4096 100 2000000

The following screenshot shows the improved metrics after applying the recommendations:

The job runtime improves from approximately 60 seconds to 38 seconds, a reduction of roughly 37%.

You can run with 10 mappers and observe even better efficiency. See the following code:

hadoop-mapreduce-examples pi -D mapreduce.map.memory.mb=4096 -D mapreduce.reduce.memory.mb=4096 10 2000000

The following screenshot shows the improved metrics after applying the Dr. Elephant recommendation:

Dr. Elephant on the Amazon EMR cluster monitored these workloads and provided insights to help you optimize your Hive and Hadoop jobs.

Configuring your production workloads

To fine-tune the Dr. Elephant tool, navigate to the /mnt/dr-elephant-2.1.7/app-conf directory and edit the configuration files accordingly.

For example, you can write your own heuristics and plug them into Dr. Elephant to change the severity of findings based on conditions such as the number of tasks, or how far mapper and reducer counts deviate from cluster capacity. You can also change the number of threads that analyze completed jobs, or the interval between fetches from the resource manager.
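For example, the analysis thread count and the fetch interval mentioned above are controlled from GeneralConf.xml. The property names below come from the upstream Dr. Elephant documentation, and the values shown are illustrative defaults; verify them against the files shipped in your installation:

cd /mnt/dr-elephant-2.1.7/app-conf
# GeneralConf.xml (illustrative):
#   drelephant.analysis.thread.count    3       <- number of threads that analyze completed jobs
#   drelephant.analysis.fetch.interval  60000   <- interval (ms) between fetches from the resource manager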

The following screenshot shows the list of Dr. Elephant configuration files available for further tuning and customization according to your requirements:

For more information about the metrics and configurations, see the GitHub repo.

Conclusion

This post showed how to launch the Dr. Elephant and Sparklens tools on an Amazon EMR cluster and try performance tuning yourself for both compute- and memory-intensive jobs. Dr. Elephant and Sparklens can help you achieve faster job execution times and more efficient memory management by exploiting the parallelism of the dataset and optimal compute node usage. These tools also help you overcome the challenges of processing many Spark and Hive jobs by adjusting the parallelism of the workload and the cluster to meet your demands.


About the Author

Nivas Shankar is a Senior Data Architect at Amazon Web Services. He works closely with enterprise customers building data lakes and analytical applications on the AWS platform. He holds a master’s degree in physics and is highly passionate about theoretical physics concepts.

Mert Hocanin is a big data architect with AWS, covering several products, including EMR, Athena, and Managed Blockchain. Prior to working at AWS, he worked in Amazon.com’s retail business as a Senior Software Development Engineer, building a data lake to process vast amounts of data from across the company for reporting purposes. When not building and designing data lakes, Mert enjoys traveling and food.