Run Spark and Spark SQL on Amazon Elastic MapReduce
Submitted By: Jonathan Fritz
AWS Products Used: Elastic MapReduce
Created On: February 23, 2013
With the proliferation of data today, a common scenario is the need to store large data sets, process them iteratively, and discover insights using low-latency relational queries. Using the Hadoop Distributed File System (HDFS) and Hadoop MapReduce components in Apache Hadoop, these workloads can be distributed over a cluster of computers. Because multiple computers share the storage and processing load, results return quickly even over large data sets. However, with Hadoop MapReduce, the speed and flexibility of querying a data set is constrained by the time it takes for disk I/O operations and by the two-step (map and reduce) batch processing framework.
Apache Spark, an open-source cluster computing system optimized for speed, can provide much faster performance and flexibility than Hadoop MapReduce. Apache Spark speeds up data analytics by using a directed acyclic graph (DAG) execution engine and in-memory computing. Spark's primary abstraction is a distributed collection of items called a Resilient Distributed Dataset (RDD), which is designed to be cached in-memory. For more information on Spark, see https://spark.apache.org/.
For those who don't want to use Scala or Python to process data with Spark, Spark SQL allows queries expressed in SQL and HiveQL, and can query data from a SchemaRDD or a table in the Hive metastore. SchemaRDDs can be created from existing RDDs, Parquet files, JSON objects, or by selecting data from a Hive table. Spark SQL can return results up to 30 times faster than the same queries run on Hive, making it a good application for a low-latency data warehouse. Spark SQL supports other features of Apache Hive as well, such as serialization formats and user-defined functions (UDFs). It also has JDBC and ODBC connectivity. For more information about Spark SQL, see https://spark.apache.org/sql/.
Amazon EMR is a web service that makes it easy to run Hadoop clusters using AWS resources such as Amazon EC2 virtual server instances. This means that you can launch clusters without having to maintain a data center of physical hardware, and you only pay for the capacity you need. This is especially useful for applications that use the Spark framework. Since Spark stores data in memory, its CPU and memory requirements can vary depending on the workload. When you run your cluster on Amazon EMR, you can size your cluster to meet those needs, and change the number of nodes in a cluster as Spark's resource requirements change. For more information about Amazon EMR, see https://aws.amazon.com/elasticmapreduce/.
In this article, we will explain how to install Spark on a Hadoop cluster managed by Amazon EMR and query data from Amazon S3 using Spark SQL. By combining these technologies, you can run low-latency queries on large datasets using Spark SQL while also taking advantage of the operational and cost benefits provided by Amazon EMR.
The following diagram illustrates running Spark on a Hadoop cluster managed by Amazon EMR. When you launch a cluster, Amazon EMR provisions virtual server instances from Amazon EC2. You can then use an Amazon EMR bootstrap action to install Spark on the cluster. Spark on Amazon EMR can process data stored in an Amazon S3 bucket or in HDFS, and query results can be persisted back to Amazon S3 or HDFS after analysis.
Spark with YARN on an Amazon EMR cluster
In order to install Spark, we will use an EMR bootstrap action. You can use Amazon EMR bootstrap actions to install applications, libraries, or software on your Amazon EMR cluster. For example, you can install applications such as Matlab, Nutch, Presto, and Accumulo. For more information about bootstrap actions, see https://docs.aws.amazon.com/ElasticMapReduce/latest/DeveloperGuide/Bootstrap.html.
Note: The bootstrap action used in this tutorial assumes that you are using AMI version 3.2. Running Spark on YARN requires AMIs with Hadoop 2 (YARN is Hadoop 2's resource manager). At the time of writing this article, the bootstrap will install:
- Hadoop 1.0.3 (AMI 2.x): Spark 0.8.1
- Hadoop 2.2.0 (AMI 3.0.x): Spark 1.0.0
- Hadoop 2.4.0 (AMI 3.1.x and 3.2.x): Spark 1.1.0
- Sign up for AWS, if you do not already have an account. You can do this at https://aws-portal.amazon.com/gp/aws/developer/registration/index.html
- Install the AWS CLI
- Optional: If you want to monitor Spark cluster metrics using Amazon CloudWatch, you will need to create an Instance Profile (IAM role for EC2) that grants access to these services. See the Appendix of this article for an example IAM policy. To use Instance Profiles with EMR, you will also need to create and use an EMR Service role. For instructions on creating an Instance Profile and an EMR Service role, see the Amazon EMR documentation.
- Create your Spark cluster with Amazon EMR using the AWS CLI. In the commands below, replace the "MYKEY" value of the KeyName parameter with the name of the EC2 key pair you want to use to SSH into the master node of your EMR cluster.
- a. Run the following command using the AWS CLI if you want to use Amazon CloudWatch with your Spark cluster (The command below assumes the Instance Profile you created is named SparkRole and the EMR Service Role is named EMR_DefaultRole):
aws emr create-cluster --name SparkCluster --ami-version 3.2 --instance-type m3.xlarge --instance-count 3 --service-role EMR_DefaultRole --ec2-attributes KeyName=MYKEY,InstanceProfile=SparkRole --applications Name=Hive --bootstrap-actions Path=s3://support.elasticmapreduce/spark/install-spark
- b. Or, run the following command using the AWS CLI if you do not plan to use Amazon CloudWatch with your Spark cluster:
aws emr create-cluster --name SparkCluster --ami-version 3.2 --instance-type m3.xlarge --instance-count 3 --ec2-attributes KeyName=MYKEY --applications Name=Hive --bootstrap-actions Path=s3://support.elasticmapreduce/spark/install-spark
The CLI will return a statement similar to the following. The j-xxxxxxxxxx value is the identifier for your cluster (called a Cluster ID or jobflow in Amazon EMR).
ClusterId j-367J67T8QGKAD
The cluster should be ready in 5-10 minutes. You can check the status of the cluster as it is initializing by running the following command.
aws emr describe-cluster --cluster-id j-xxxxxxxxxx
This returns the details of your cluster, including its current status. When your cluster is in the WAITING status, you are ready for the next step.
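If you would rather not re-run the status command by hand, the check can be scripted. The following is a minimal sketch, not part of the original tutorial: it assumes the AWS CLI is installed and configured, and it uses the CLI's global --query and --output options to extract only the cluster state. The function names, the 30-second polling interval, and the decision to stop on the terminated states are our own illustrative choices.

```python
import subprocess
import time


def cluster_state(cluster_id, run=subprocess.check_output):
    """Return the cluster state string (for example STARTING or WAITING)."""
    out = run([
        "aws", "emr", "describe-cluster",
        "--cluster-id", cluster_id,
        "--query", "Cluster.Status.State",  # keep only the state field
        "--output", "text",
    ])
    return out.decode().strip()


def wait_until_ready(cluster_id, get_state=cluster_state,
                     poll_seconds=30, sleep=time.sleep):
    """Poll until the cluster reaches WAITING; fail if it terminates first."""
    while True:
        state = get_state(cluster_id)
        if state == "WAITING":
            return state
        if state in ("TERMINATING", "TERMINATED", "TERMINATED_WITH_ERRORS"):
            raise RuntimeError("cluster %s ended in state %s" % (cluster_id, state))
        sleep(poll_seconds)
```

Calling wait_until_ready("j-xxxxxxxxxx") with your own cluster identifier blocks until the cluster is ready for the next step. The get_state and sleep parameters exist only so that the loop can be exercised without a real cluster.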
Sample Data for our Queries
To test that the Spark infrastructure is installed and working properly, we will sort the Wikistat public data set. Wikistat contains Wikipedia article traffic statistics covering a 16-month period from October 1, 2008 to February 6, 2010. The full data set is online at https://aws.amazon.com/datasets/4182. The data in Wikistat is formatted as follows:
- Each log file is named with the date and time of collection: pagecounts-20090430-230000.gz.
- Each line in the log file has four fields: projectcode, pagename, pageviews, and bytes.
A sample of the type of data stored in Wikistat is shown below.
en Barack_Obama 997 123091092
en Barack_Obama%27s_first_100_days 8 850127
en Barack_Obama,_Jr 1 144103
en Barack_Obama,_Sr. 37 938821
en Barack_Obama_%22HOPE%22_poster 4 81005
en Barack_Obama_%22Hope%22_poster 5 102081
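To make the record layout concrete, here is a small sketch in plain Python (no Spark required) that splits a record into the four fields described above. The PageCount type and the parse_line function are hypothetical names introduced for illustration, and size_bytes stands for the field the data set calls bytes.

```python
from typing import NamedTuple


class PageCount(NamedTuple):
    projectcode: str
    pagename: str
    pageviews: int
    size_bytes: int  # the "bytes" field of the record


def parse_line(line: str) -> PageCount:
    """Split one space-delimited Wikistat record into its four fields."""
    projectcode, pagename, pageviews, size = line.split(" ")
    return PageCount(projectcode, pagename, int(pageviews), int(size))


# Three of the sample records shown above.
sample = [
    "en Barack_Obama 997 123091092",
    "en Barack_Obama%27s_first_100_days 8 850127",
    "en Barack_Obama,_Jr 1 144103",
]
records = [parse_line(line) for line in sample]
print(records[0].pagename, records[0].pageviews)  # prints: Barack_Obama 997
```

Page names are URL-encoded and contain no spaces, which is why a plain split on a single space is enough for this format.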
Because the full Wikistat dataset is fairly large, we have excerpted a single file and copied it to the Amazon S3 bucket at s3://support.elasticmapreduce/bigdatademo/sample/wiki/pagecounts-20100212-050000.gz. In our queries, we will parse the data file and sort the dataset based on number of pageviews.
Using Spark
To analyze this data using Spark:
- Connect to the master node of your Amazon EMR cluster using SSH. For information on how to do so, see https://docs.aws.amazon.com/ElasticMapReduce/latest/DeveloperGuide/emr-connect-master-node-ssh.html and read the "Connect to the Master Node Using the AWS CLI or Amazon EMR CLI" section.
- From the command line, run the following command to open the Spark shell.
MASTER=yarn-client /home/hadoop/spark/bin/spark-shell
- From the Spark command line, run the following commands. The first line tells Spark which files to process. The second line splits each record into fields, keeps the second and third fields (page name and pageview count) as a key-value pair, and sums the pageview counts for each page name with reduceByKey, using three partitions. The third line caches the result in memory in case we need to re-run this job, which eliminates the need to read our dataset from Amazon S3 again. The last line swaps each pair so that the count becomes the key, sorts by count in descending order, and returns the top 50 entries.
val file = sc.textFile("s3://support.elasticmapreduce/bigdatademo/sample/wiki")
val reducedList = file.map(l => l.split(" ")).map(l => (l(1), l(2).toInt)).reduceByKey(_+_, 3)
reducedList.cache
val sortedList = reducedList.map(x => (x._2, x._1)).sortByKey(false).take(50)
Spark should return a result similar to the following:
INFO spark.SparkContext: Job finished: take at <console>:16, took 8.015384737 s
sortedList: Array[(Int, String)] = Array((328476,Special:Search), (217924,Main_Page), (73900,Special:Random), (65047,404_error/), (55814,%E3%83%A1%E3%82%A4%E3%83%B3%E3%83%9A%E3%83%BC%E3%82%B8), (21521,Special:Export/Where_Is_My_Mind), ...
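If the chain of transformations is hard to follow, the sketch below reproduces the same logic in plain Python on a single machine: sum the pageviews for each page name, swap each pair so that the count comes first, sort in descending order, and keep the first n entries. It is an illustration of what the Spark job computes, not of how Spark distributes the work. The top_pages function and the four input lines are made up for this example and are not taken from the Wikistat data.

```python
from collections import defaultdict


def top_pages(lines, n=50):
    """Return the n (pageviews, pagename) pairs with the highest totals."""
    totals = defaultdict(int)
    for line in lines:
        fields = line.split(" ")
        # Same role as map(l => (l(1), l(2).toInt)).reduceByKey(_+_)
        totals[fields[1]] += int(fields[2])
    # Same role as map(x => (x._2, x._1)).sortByKey(false).take(n)
    swapped = [(views, page) for page, views in totals.items()]
    swapped.sort(key=lambda pair: pair[0], reverse=True)
    return swapped[:n]


# Made-up records in the Wikistat format.
lines = [
    "en Main_Page 10 1000",
    "de Main_Page 5 500",
    "en Special:Search 20 2000",
    "en Barack_Obama 7 700",
]
print(top_pages(lines, n=2))  # prints: [(20, 'Special:Search'), (15, 'Main_Page')]
```

Note that the key is the page name alone, so views of the same page name in different projects (en and de above) are added together, exactly as in the Spark commands.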
Using Spark SQL
If you decide to use the Spark SQL command line, it's very similar to working with Apache Hive. You can even write your queries in HiveQL, the query language used by Hive, and utilize the Hive metastore. In this article, we will use the Spark SQL command line and HiveQL for querying the dataset. You can also use Spark SQL to query SchemaRDDs from the Spark command line instead.
- Return to the Hadoop command line. If you need to SSH back into your cluster, follow the first step in the previous section ("Using Spark").
- From the command line, launch Spark by running the following command.
MASTER=yarn-client /home/hadoop/spark/bin/spark-sql --executor-memory 4G
- From the Spark SQL command line, run the following statements. They create an external Hive table over the same Wikistat data in Amazon S3 that we used in the previous example, cache the table in memory, and then select the ten pages with the highest total pageview count. Because the table is cached with the CACHE TABLE command, subsequent queries run against the in-memory copy for faster response.
SET spark.sql.shuffle.partitions=10;
create external table wikistat (projectcode string, pagename string, pageviews int, pagesize int) ROW FORMAT DELIMITED FIELDS TERMINATED BY ' ' location 's3://support.elasticmapreduce/bigdatademo/sample/wiki';
CACHE TABLE wikistat;
select pagename, sum(pageviews) c from wikistat group by pagename order by c desc limit 10;
INFO spark.SparkContext: Job finished: collect at HiveContext.scala:415, took 0.038523897 s
Special:Search 328476
Main_Page 217924
Special:Random 73900
404_error/ 65047
%E3%83%A1%E3%82%A4%E3%83%B3%E3%83%9A%E3%83%BC%E3%82%B8 55814
Special:Export/Where_Is_My_Mind 21521
Wikipedia:Portada 19722
%E7%89%B9%E5%88%A5:%E6%A4%9C%E7%B4%A2 18312
Pagina_principale 17080
Alexander_McQueen 17067
Monitoring Your Spark Cluster with Amazon CloudWatch
Amazon EMR provides the ability to monitor your Spark cluster with Amazon CloudWatch and configure actions/alerts based on specified metrics.
Some of the metrics you can monitor include:
- JVM.heap.init
- JVM.heap.max
- JVM.heap.usage
- Master.apps.number
- Master.workers.number
To see the complete list of available Spark metrics, open the Amazon CloudWatch console, select Custom Metrics and then select Spark.
You can set up actions/alerts that trigger when one or more metrics reach a specified threshold. For example, you can be notified or take an action (such as adding more nodes) when your Spark application is approaching its JVM.heap.max limit. For more details on Amazon CloudWatch alerts, see the CloudWatch documentation: https://docs.aws.amazon.com/AmazonCloudWatch/latest/DeveloperGuide/AlarmThatSendsEmail.html.
Clean Up
When you are done running queries using Spark and Spark SQL, you should shut down your cluster to avoid incurring further charges.
- Disconnect from the master node by terminating your SSH session.
- From the command line on your local machine, run the following command to terminate your Amazon EMR cluster. Replace j-367J67T8QGKAD with the identifier of your cluster.
aws emr terminate-clusters --cluster-ids j-367J67T8QGKAD
- If you chose to enable logging on your Amazon EMR cluster (this wasn't in the example command to launch your cluster, but you could have chosen to use logging), delete any log files stored in your Amazon S3 bucket, s3://yours3bucket, where yours3bucket is replaced by the name of the bucket you specified when you launched the cluster. For more information, see https://docs.aws.amazon.com/AmazonS3/latest/UG/DeletinganObject.html
Credits
We want to thank Federico Baldo, Christopher Bozeman, Chad Schmutzer, Rahul Bhartia, and Manjeet Chayel from AWS who helped develop the latest bootstrap action for installing and running Spark on YARN with Amazon EMR. Also, a special thanks goes out to Parviz Deyhim who contributed the original article.
Appendix: Sample IAM Role
The following IAM policy grants full access to Amazon CloudWatch, Amazon SNS, and Amazon S3. You may want to use more restrictive permissions than this. (For example, you can restrict the S3 permissions to certain buckets only, disallow deletes, and so on.)
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Action": "cloudwatch:*",
      "Effect": "Allow",
      "Resource": "*"
    },
    {
      "Effect": "Allow",
      "Action": "sns:*",
      "Resource": "*"
    },
    {
      "Effect": "Allow",
      "Action": "s3:*",
      "Resource": "*"
    }
  ]
}
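As one possible way to tighten the S3 portion, the sketch below generates a variant of the policy that limits S3 access to a list of named buckets and grants only list, read, and write actions, so deletes are not allowed. This is an untested starting point rather than a verified policy: the restricted_policy helper is hypothetical, the CloudWatch and SNS statements are left as in the sample above, and we assume that the cluster needs access to your own bucket (yours3bucket is a placeholder) as well as the support.elasticmapreduce bucket that hosts the bootstrap action and sample data. Your cluster may need additional permissions.

```python
import json


def restricted_policy(buckets):
    """Build the sample policy with S3 access limited to the given buckets
    and without any delete permission."""
    bucket_arns = ["arn:aws:s3:::%s" % b for b in buckets]
    object_arns = ["arn:aws:s3:::%s/*" % b for b in buckets]
    return {
        "Version": "2012-10-17",
        "Statement": [
            {"Effect": "Allow", "Action": "cloudwatch:*", "Resource": "*"},
            {"Effect": "Allow", "Action": "sns:*", "Resource": "*"},
            # Listing applies to the bucket itself.
            {"Effect": "Allow", "Action": "s3:ListBucket",
             "Resource": bucket_arns},
            # Reading and writing apply to the objects in the bucket.
            {"Effect": "Allow", "Action": ["s3:GetObject", "s3:PutObject"],
             "Resource": object_arns},
        ],
    }


# Placeholder bucket name plus the bucket used in this article.
policy = restricted_policy(["yours3bucket", "support.elasticmapreduce"])
print(json.dumps(policy, indent=2))
```

The printed JSON can be pasted into the policy editor when you create the Instance Profile.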