Run Spark and Shark on Amazon Elastic MapReduce

Learn how to run Spark (in-memory MapReduce) and Shark (Hive on Spark) on Amazon EMR. This tutorial walks through installing Spark/Shark, running some sample queries, monitoring the cluster with CloudWatch, and processing real-time data in Kinesis.

Details

Submitted By: Parviz Deyhim
AWS Products Used: Elastic MapReduce
Created On: February 23, 2013 6:38 PM GMT
Last Updated: March 11, 2014 8:39 PM GMT

A common business scenario is the need to store and query large data sets. You can do this by running a data warehouse on a cluster of computers. Because the computers share the load of processing each query, distributing the data over many machines returns results quickly. One limit on query speed, however, is the time it takes to retrieve the data from disk.

You can increase the speed of queries returned from a data warehouse by using the Shark data warehouse system. Shark runs on top of Spark, an open-source cluster computing system optimized for speed. Spark speeds up data analytics by loading data into memory, providing much faster performance than a disk-based system like Hadoop. For more information on Spark, see http://spark-project.org/.

Shark is compatible with Apache Hive, which means that you can query it using the same HiveQL statements as you would a Hive data warehouse. The difference is that Shark can return results up to 30 times faster than the same queries run on Hive. Shark supports other features of Apache Hive as well, such as a metastore, serialization formats, and user-defined functions (UDFs). For more information about Shark, see http://shark.cs.berkeley.edu/.

In this article, we will explain how to install Shark and Spark on a cluster managed by Amazon EMR. By combining these technologies, you will be able to enjoy the speed enhancements of the Shark data warehouse as well as the operational and financial advantages of running your cluster on Amazon EMR.

Amazon EMR is a web service that makes it easy to run Hadoop clusters using AWS resources such as Amazon EC2 virtual server instances. This means that you can launch clusters without having to maintain a data center of physical hardware, and you pay only for the capacity you need. This is especially useful for applications that use the Shark framework. Because Shark stores data in memory, its CPU and memory requirements can vary depending on the workload. When you run your cluster on Amazon EMR, you can size your cluster to meet those needs, and change the number of nodes in the cluster as Shark's resource requirements change. For more information about Amazon EMR, see http://aws.amazon.com/elasticmapreduce/.

The following diagram illustrates running a Shark data warehouse on a cluster managed by Amazon EMR. When you launch a cluster, Amazon EMR provisions virtual server instances from Amazon EC2. You can then use an Amazon EMR bootstrap action to install Shark and Spark on the master node of the cluster. Initial data for the warehouse can be pulled in from a source such as an Amazon S3 bucket.

Install Spark/Shark on an Amazon EMR cluster

To install Spark/Shark, we will use an Amazon EMR bootstrap action. (You can use bootstrap actions to install applications, libraries, or software on your Amazon EMR cluster; for example, applications such as Matlab, Nutch, and Accumulo.) For more information about bootstrap actions, see http://docs.aws.amazon.com/ElasticMapReduce/latest/DeveloperGuide/Bootstrap.html. Note: The bootstrap action used in this tutorial assumes that you are using the default AMI. It may not work if you specify an alternate AMI.

  1. Sign up for AWS if you do not already have an account. You can do this at https://aws-portal.amazon.com/gp/aws/developer/registration/index.html.
  2. Install the Amazon EMR CLI.
  3. Optional: If you want to monitor Spark cluster metrics using Amazon CloudWatch or use Spark to process data in Amazon Kinesis (discussed below), you will need to create an IAM role that grants access to these services. For instructions on creating an IAM role, see the AWS Identity and Access Management documentation. See the Appendix of this article for an example IAM policy.
  4. Run the following command using the EMR CLI if you want to use Amazon CloudWatch or Amazon Kinesis with your Spark cluster (assumes the role you created is named spark):
    elastic-mapreduce --create --alive --name "Spark/Shark Cluster" --bootstrap-action s3://elasticmapreduce/samples/spark/0.8.1/install-spark-shark.sh --bootstrap-name "Spark/Shark" --instance-type m1.xlarge --instance-count 3 --jobflow-role spark

    Run the following command using the EMR CLI if you do not plan to use Amazon CloudWatch or Amazon Kinesis with your Spark cluster:
    elastic-mapreduce --create --alive --name "Spark/Shark Cluster" --bootstrap-action s3://elasticmapreduce/samples/spark/0.8.1/install-spark-shark.sh --bootstrap-name "Spark/Shark" --instance-type m1.xlarge --instance-count 3

The CLI will return a statement similar to the following. The j-xxxxxxxxxx value is the identifier for your cluster (also called a jobflow in Amazon EMR).

Created jobflow j-367J67T8QGKAD

The cluster should be ready in 5-10 minutes. You can check the status of the cluster as it is initializing by running the following command.

elastic-mapreduce --list

This returns a list of the clusters with their current status. When your cluster is in the WAITING status, you are ready for the next step.

Sample Data for our Queries

To test that the Spark infrastructure is installed and working properly, we will sort the Wikistat public data set. Wikistat contains Wikipedia article traffic statistics covering a 16-month period from October 1, 2008 to February 6, 2010. The full data set is online at http://aws.amazon.com/datasets/4182. The data in Wikistat is formatted as follows:

  • Each log file is named with the date and time of collection: pagecounts-20090430-230000.gz.
  • Each line in the log file has four fields: projectcode, pagename, pageviews, and bytes.
A sample of the type of data stored in Wikistat is shown below.

en Barack_Obama 997 123091092
en Barack_Obama%27s_first_100_days 8 850127
en Barack_Obama,_Jr 1 144103
en Barack_Obama,_Sr. 37 938821
en Barack_Obama_%22HOPE%22_poster 4 81005
en Barack_Obama_%22Hope%22_poster 5 102081
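
To make the format concrete, the following Scala sketch parses a single Wikistat line into its four fields. The PageStat case class and parseLine helper are hypothetical names used for illustration only; they are not part of the dataset or of Spark:

    // Each Wikistat line has four space-separated fields:
    //   projectcode pagename pageviews bytes
    case class PageStat(projectCode: String, pageName: String, pageViews: Int, bytes: Long)

    def parseLine(line: String): PageStat = {
      val fields = line.split(" ")
      PageStat(fields(0), fields(1), fields(2).toInt, fields(3).toLong)
    }

    // parseLine("en Barack_Obama 997 123091092")
    // returns PageStat(en, Barack_Obama, 997, 123091092)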

Because the full Wikistat dataset is fairly large, we have excerpted a single file and copied it to the Amazon S3 bucket at https://s3.amazonaws.com/bigdatademo/sample/wiki/pagecounts-20100212-050000.gz. In our queries, we will parse the data file and sort the dataset based on the number of pageviews.

Using Spark

To analyze this data using Spark:

  1. Connect to the master node of your Amazon EMR cluster using SSH. For information on how to do so, see http://docs.aws.amazon.com/ElasticMapReduce/latest/DeveloperGuide/emr-connect-master-node-ssh.html.
  2. From the command line, run the following command to open the Spark shell.

    SPARK_MEM="2g" /home/hadoop/spark/spark-shell

  3. From the Spark command line, run the following commands. The first line tells Spark which file to process. The second line splits each line of the dataset into fields, keeps the second and third fields (the page name and the pageview count), and sums the counts for each page name with reduceByKey. The third line caches the data in memory in case we need to re-run this job, which eliminates the need to read our dataset from Amazon S3 again. The last line sorts by pageview count in descending order and takes the top 50 results.

    // Point Spark at the sample Wikistat data in Amazon S3
    val file = sc.textFile("s3://bigdatademo/sample/wiki/")

    // Split each line, keep (pagename, pageviews), and sum the counts per page
    val reducedList = file.map(l => l.split(" ")).map(l => (l(1), l(2).toInt)).reduceByKey(_+_, 3)

    // Cache the result so re-running the job skips the read from Amazon S3
    reducedList.cache

    // Sort by pageview count in descending order and take the top 50
    val sortedList = reducedList.map(x => (x._2, x._1)).sortByKey(false).take(50)

Spark should return a result similar to the following:

sortedList: Array[(Int, java.lang.String)] = Array((328476,Special:Search), (217924,Main_Page), (73900,Special:Random), (65047,404_error/),(55814,%E3%83%A1%E3%82%A4%E3%83%B3%E3%83%9A%E3%83%BC%E3%82%B8), (21521,Special:Export/Where_Is_My_Mind), (19722,Wikipedia:Portada),(18312,%E7%89%B9%E5%88%A5:%E6%A4%9C%E7%B4%A2), (17080,Pagina_principale),
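
The take(50) call returns the top 50 results to the Spark shell as an array. If you would rather persist the complete sorted list, you can write it back to Amazon S3 instead. The following line is a minimal sketch; the output location s3://your-bucket/wikistat-sorted/ is a placeholder for a bucket you own:

    // Sort the full (pageviews, pagename) list in descending order and save it
    // to Amazon S3 rather than collecting it into the shell; the bucket name is hypothetical.
    reducedList.map(x => (x._2, x._1)).sortByKey(false).saveAsTextFile("s3://your-bucket/wikistat-sorted/")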

Using Shark

Working with Shark is similar to working with Apache Hive. You even write your queries in HiveQL, the query language used by Hive.

  1. Connect to the master node of your Amazon EMR cluster using SSH. For information on how to do so, see http://docs.aws.amazon.com/ElasticMapReduce/latest/DeveloperGuide/emr-connect-master-node-ssh.html.
  2. From the command line, launch Shark by running the following command.

    SPARK_MEM="2g" /home/hadoop/shark/bin/shark

  3. From the Shark command line, run the following statements. They use the same Wikistat data that we stored in Amazon S3 for the Spark queries: the first CREATE TABLE statement maps a table onto the data in Amazon S3, the second loads that data into a cached table, and the final query sorts the Wikistat data and selects the ten pages with the highest pageview count. Note that Shark stores tables whose names end in _cached in memory instead of on disk, so the final query runs against the in-memory wikistats_cached table.

    -- Use 10 reduce tasks for the aggregation and sort
    set mapred.reduce.tasks=10;

    -- Define a table over the Wikistat sample data in Amazon S3
    create table wikistat (projectcode string, pagename string, pageviews int, pagesize int) ROW FORMAT DELIMITED FIELDS TERMINATED BY ' ' location 's3://bigdatademo/sample/wiki/';

    -- The _cached suffix tells Shark to store this table in memory
    create table wikistats_cached as select * from wikistat;

    -- Top ten pages by total pageview count
    select pagename, sum(pageviews) c from wikistats_cached group by pagename order by c desc limit 10;

Monitoring Your Spark Cluster with Amazon CloudWatch

Amazon EMR provides the ability to monitor your Spark cluster with Amazon CloudWatch and to configure alarms and actions based on specified metrics.

Some of the metrics you can monitor include:

  • JVM.heap.init
  • JVM.heap.max
  • JVM.heap.usage
  • Master.apps.number
  • Master.workers.number
  • and many more

To see the complete list of available Spark metrics, open the Amazon CloudWatch console, select Custom Metrics and then select Spark.

It can be very useful to visualize trends in these metrics. For example, you can graph the trend in JVM.heap.used for a sample Spark cluster in the CloudWatch console.

You can set up alarms that fire when one or more metrics reach a specified threshold. For example, you can be notified, or take an action such as adding more nodes, when your Spark application approaches its JVM.heap.max limit. For more details on Amazon CloudWatch alarms, see the CloudWatch documentation: http://docs.aws.amazon.com/AmazonCloudWatch/latest/DeveloperGuide/AlarmThatSendsEmail.html.
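
As a sketch of how such an alarm can be created programmatically, the following Scala snippet uses the AWS SDK for Java (assumed to be on the classpath). The "Spark" namespace, metric name, threshold, and SNS topic ARN are assumptions for illustration; check the Custom Metrics view in the CloudWatch console for the exact namespace and metric names your cluster publishes:

    // Hedged sketch: create a CloudWatch alarm on a Spark heap metric from Scala.
    // The namespace, metric name, threshold, and SNS topic ARN are assumptions.
    import com.amazonaws.services.cloudwatch.AmazonCloudWatchClient
    import com.amazonaws.services.cloudwatch.model.{ComparisonOperator, PutMetricAlarmRequest, Statistic}

    val cloudWatch = new AmazonCloudWatchClient()  // picks up credentials from the environment/IAM role
    val alarm = new PutMetricAlarmRequest()
      .withAlarmName("spark-heap-usage-high")
      .withNamespace("Spark")                      // assumed custom-metric namespace
      .withMetricName("JVM.heap.usage")
      .withStatistic(Statistic.Average)
      .withPeriod(300)                             // evaluate over 5-minute periods
      .withEvaluationPeriods(1)
      .withThreshold(0.9)
      .withComparisonOperator(ComparisonOperator.GreaterThanThreshold)
      .withAlarmActions("arn:aws:sns:us-east-1:123456789012:spark-alerts")  // hypothetical SNS topic
    cloudWatch.putMetricAlarm(alarm)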

Real-time data processing with Spark streaming and Amazon Kinesis

Spark 0.8+ supports Spark Streaming as well as Amazon Kinesis, a fully managed AWS service for real-time ingest of streaming data at massive scale. Amazon Kinesis can collect hundreds of terabytes of data per hour from hundreds of thousands of sources, such as website clickstreams, machine/sensor data, social media, and operational logs. Using Amazon Kinesis and Spark 0.8+ on EMR, you can collect massive amounts of streaming data and process it in real time.

We will walk through an example to demonstrate the integration of Spark Streaming and Amazon Kinesis. This example assumes that you have already created an Amazon Kinesis stream and are able to submit records to it. If you have not yet created a stream, see the Amazon Kinesis documentation for a quick way to get started. We also assume you already have a Spark cluster running on Amazon EMR; if not, refer to the sections above.

Log in to the master node of your EMR cluster and run the following command. This is a real-time wordcount example with a rolling 60-second window:

SPARK_EXAMPLES=/home/hadoop/spark/examples/target/scala-2.9.3/spark-examples-assembly-0.8.1-incubating.jar /home/hadoop/spark/run-example org.apache.spark.streaming.examples/KinesisWordCount spark://SPARK_MASTER_IP:7077 myFirstStream 

Replace SPARK_MASTER_IP with the private IP address of your EMR master node (example: spark://10.1.2.3:7077), and replace myFirstStream with the name of your Kinesis stream.

A successful execution of the command above should produce a continuous stream of Spark console output similar to the following:

-------------------------------------------
Time: 1391404800000 ms
-------------------------------------------
(4,[paapp])
(4,[apppa])
(5,[pppaa])
(2,[apaaa])
(3,[aapaa])
(5,[apapp])
(5,[appap])
(6,[aaaap])
(3,[pappa])
(3,[papap]) 
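
Under the hood, KinesisWordCount follows the standard Spark Streaming pattern of building a DStream pipeline and starting the streaming context. The following Scala sketch shows that pattern with a plain socket text source standing in for the Kinesis receiver; the host, port, and application name are illustrative assumptions, not values taken from the example's source:

    // Minimal Spark Streaming word count with a rolling 60-second window.
    // A socket source stands in for the Kinesis receiver in this sketch.
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.StreamingContext._  // for reduceByKeyAndWindow

    val ssc = new StreamingContext("spark://SPARK_MASTER_IP:7077", "WindowedWordCount", Seconds(2))
    val lines = ssc.socketTextStream("localhost", 9999)   // hypothetical source host/port
    val counts = lines.flatMap(_.split(" "))
                      .map(word => (word, 1))
                      .reduceByKeyAndWindow(_ + _, Seconds(60))  // rolling 60-second window
    counts.print()
    ssc.start()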

You can learn more about Spark Streaming at http://spark.incubator.apache.org/docs/0.8.1/streaming-programming-guide.html.

Clean Up

When you are done running queries using Spark and Shark, you should shut down your cluster to avoid incurring further charges.

  1. Disconnect from the master node by terminating your SSH session.
  2. From the command line on your local machine, run the following command to terminate your Amazon EMR cluster. Replace j-367J67T8QGKAD with the identifier of your cluster.

    elastic-mapreduce --terminate -j j-367J67T8QGKAD
    

  3. Delete any log files stored in your Amazon S3 bucket, s3://yours3bucket, where yours3bucket is replaced by the name of the bucket you specified when you launched the jobflow. For more information, see http://docs.aws.amazon.com/AmazonS3/latest/UG/DeletinganObject.html.

Credits

We want to thank the individuals who helped us prepare this article:

  • Matei Zaharia, who helped us tremendously with every aspect of running and optimizing Spark on Amazon EMR
  • Reynold Xin, who helped us get Shark working on Amazon EMR
  • Benjamin Hindman, who provided guidance on installing Mesos on Amazon EMR

Appendix: Sample IAM Role

The following IAM policy grants full access to Amazon CloudWatch, Amazon Kinesis, Amazon SNS, and Amazon S3. You may want to use more restrictive permissions. (For example, you can restrict the S3 permissions to certain buckets only, disallow deletes, and so on.)

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Action": "cloudwatch:*",
      "Effect": "Allow",
      "Resource": "*"
    },
    {
      "Effect": "Allow",
      "Action": "kinesis:*",
      "Resource": "*"
    },
    {
      "Effect": "Allow",
      "Action": "sns:*",
      "Resource": "*"
    },
    {
      "Effect": "Allow",
      "Action": "s3:*",
      "Resource": "*"
    }
  ]
}

©2014, Amazon Web Services, Inc. or its affiliates. All rights reserved.