AWS Big Data Blog

Installing and Running JobServer for Apache Spark on Amazon EMR

Derek Graeber is a senior consultant in big data analytics for AWS Professional Services.

Working with customers who are running Apache Spark on Amazon EMR, I often run into scenarios where data loaded into a SparkContext can and should be shared across multiple use cases. They ask a very valid question: “Once I load the data into Spark, how can I access the data to support ad hoc queries?” Open-source APIs are available that support doing this, such as JobServer, which exposes a RESTful interface to access the context.

“Okay, how do I put JobServer on EMR?” typically follows. There are a couple of ways. Amazon EMR recently started using Apache BigTop to support a much quicker release window (among other things) for out-of-the-box application components installed on EMR such as Spark, Pig, and Hive.

In this blog post, you will learn how to install JobServer on EMR using a bootstrap action (BA) derived from the JobServer GitHub repository. Then we’ll run JobServer using a sample dataset. To learn how to compile JobServer and install it on your Spark cluster, look at the JobServer readme file for EMR. All referenced code including the BA, Spark code, and commands, is located in this project’s GitHub repository.

Background and setup

For this approach, we assume that you have a working knowledge of Apache Spark running on EMR and can create a cluster with configurations using either the AWS Management Console or the AWS Command Line Interface (AWS CLI). For this exercise, we will define a cluster size and use the public airline flights dataset available on Amazon S3. This data is in Parquet format. We will create an Amazon EMR 4.7.1 cluster consisting of:

  • One r3.xlarge master instance
  • Five r3.xlarge core instances

Note that this cluster setup is arbitrary; it is the one I typically use for proof-of-concept work, and it is not optimized. (Optimization is outside the scope of this post.) You can modify your cluster nodes or Spark job configuration as you wish. For reference, see the AWS Big Data Blog post.

If you haven’t read the readme file for both JobServer and the EMR-JobServer configurations, do that before proceeding. Then, get the project located in GitHub and explore. The project is laid out in a typical Maven structure with additional directories for the configurations and BA.

Next, look at the version information in the JobServer readme file and determine the version of JobServer you’d like to use based on the version of Spark you are using. In this example, we are using Amazon EMR 4.7.1, which supports Spark 1.6.1. Thus, based on the readme, we will need version 0.6.2 (v0.6.2 branch) of JobServer. Make a note of this for later.

As you read the readme for EMR-JobServer, you’ll see there are two configuration files to be aware of:

  1. emr.sh – this file defines parameters related to your Spark installation. For our example, these points apply:
    1. You only need to modify the SPARK_VERSION value.
    2. We will use the emr_v1.6.1.sh file provided in this blog post’s sample code.
  2. emr.conf – this file defines the Spark run-time file used and some contexts that can be created. For our example, these points apply:
    1. We are creating a pre-defined spark-sql context for Hive.
    2. Researching the context definitions under the job-server-extras part of the GitHub project for JobServer helps a lot to understand these context factories.
    3. We will use the emr_contexts.conf file provided in this blog post’s sample code.

You can find copies of these configuration files in the GitHub project under <project-root>/jobserver-configs. Review and become familiar with them. We will be staging them on Amazon S3 for the cluster creation later on.

We also provide two BAs and an EMR configuration sample under <project-root>/BA:

  1. full_install_jobserver_BA.sh – this BA installs all necessary build components on your cluster, gets the project from GitHub, compiles, and creates a JobServer distribution. When installation is complete, this BA deploys JobServer and also puts the .tar file for the compiled code onto S3 for reuse.
  2. existing_build_jobserver_BA.sh – this BA looks for a precompiled distribution of JobServer in S3 and deploys that onto the cluster.
  3. configurations.json – this sample EMR configuration is provided for illustration purposes. Here, we’re setting the Spark cluster to use the maximumResourceAllocation option.

Why two BAs? When you determine the version of JobServer you want to use and begin to use it extensively, the overhead of installing the build frameworks and compiling the source code becomes redundant and time-consuming. You have built the distro and it is available on Amazon S3, so why not save time by reusing it and streamlining the build process of the cluster? This approach also means that your EMR cluster is cleaner, because it won’t have SBT and Git installed. However, this is just a matter of preference. We will walk through both approaches.

Compile and install JobServer from GitHub

In this section, we will work with full_install_jobserver_BA.sh, a bash script that performs the necessary tasks. We will put the BAs, EMR configurations, and JobServer configurations onto S3 for ease of access. For this to work, you’ll need to define which bucket on S3 you’d like to use as a staging area for the builds.

At the top of full_install_jobserver_BA.sh are the variables you need to configure:

  • JOBSERVER_VERSION – the GitHub version of JobServer you settled on (v0.6.2 in our case)
  • EMR_SH_FILE – the shell configuration file described earlier (emr.sh in our case), staged on Amazon S3 and referenced by its full object key
  • EMR_CONF_FILE – the context configuration file described earlier (emr_contexts.conf in our case), staged on Amazon S3 and referenced by its full object key
  • S3_BUILD_ARCHIVE_DIR – the location in Amazon S3 where you want the compiled distribution to live after the cluster is created for reuse
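As a rough illustration, the filled-in variables might look like the following. The bucket name, the jobserver prefix, and the STAGING_PREFIX helper variable are placeholders I made up for this sketch, and the real BA may lay out its variables differently, so treat it as a guide rather than a copy of the script.

```shell
# Hypothetical staging prefix (not a variable in the real BA): use your own bucket and path.
STAGING_PREFIX="s3://my-bucket/jobserver"

# Branch of JobServer that matches Spark 1.6.1, per the JobServer readme.
JOBSERVER_VERSION="v0.6.2"

# Full object keys of the two staged JobServer configuration files.
EMR_SH_FILE="${STAGING_PREFIX}/emr.sh"
EMR_CONF_FILE="${STAGING_PREFIX}/emr_contexts.conf"

# Where the compiled distribution is archived for reuse (assumed layout).
S3_BUILD_ARCHIVE_DIR="${STAGING_PREFIX}/dist"

# Echo the settings so a typo is easy to spot before you upload the BA.
printf '%s\n' "$JOBSERVER_VERSION" "$EMR_SH_FILE" "$EMR_CONF_FILE" "$S3_BUILD_ARCHIVE_DIR"
```

Deriving every location from one prefix keeps the four values consistent when you later switch buckets.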

Once you’ve defined these parameters in the BA, upload the following to S3: the BA (full_install_jobserver_BA.sh), the configuration for Amazon EMR (configurations.json), and the JobServer configurations (emr.sh and emr_contexts.conf in this case). I typically use the AWS CLI to move files to S3. Below are the CLI commands for this upload.

> aws s3 cp <localDir>full_install_jobserver_BA.sh s3://<my-bucket>/<my-object-path>/full_install_jobserver_BA.sh

> aws s3 cp <localDir>configurations.json s3://<my-bucket>/<my-object-path>/configurations.json

> aws s3 cp <localDir>emr.sh s3://<my-bucket>/<my-object-path>/emr.sh

> aws s3 cp <localDir>emr_contexts.conf s3://<my-bucket>/<my-object-path>/emr_contexts.conf
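If you stage these files often, a small loop saves some typing. The sketch below only prints the copy commands so you can review them first; the local directory and S3 prefix are placeholder values, and stage_cmd is a helper name I invented for this example.

```shell
# Hypothetical locations: adjust both to your environment.
LOCAL_DIR="./"
S3_PREFIX="s3://my-bucket/my-object-path"

# Print the copy command for one file instead of running it.
stage_cmd() {
  printf 'aws s3 cp %s%s %s/%s\n' "$LOCAL_DIR" "$1" "$S3_PREFIX" "$1"
}

# The BA, the EMR configuration, and the two JobServer configuration files.
for f in full_install_jobserver_BA.sh configurations.json emr.sh emr_contexts.conf; do
  stage_cmd "$f"
done
```

Once the printed commands look right, you can pipe the loop's output to sh to run them.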

For this example, we’ll streamline the process to create a cluster and highlight only the pertinent areas using the AWS console:

  1. In the Advanced Options section, choose EMR 4.7.1, Hive, Hadoop, and Spark 1.6.1.
  2. Select the Load JSON from S3 option, and browse to the configurations.json file you staged.
  3. Define your cluster size and counts as described earlier.
  4. Choose Additional Options, Bootstrap Actions, Custom Action, and then Configure and Add.
  5. Under Script Location, browse to your BA (full_install_jobserver_BA.sh) in S3.
  6. Make sure you have access to an Amazon EC2 key pair, because you will need to use Secure Shell (SSH) to get onto the master node later.
  7. Create your cluster.

You can find a sample AWS CLI command for this cluster under <project-root>/commands/commands-cluster.txt for reference.
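For orientation, here is a sketch of what an equivalent AWS CLI call could look like. It is not the command from the project repository: the cluster name, key pair, bucket path, and bootstrap action name are placeholders, and I am assuming the default EMR roles and a local copy of configurations.json. The function prints the command rather than running it.

```shell
# Hypothetical names: the key pair, bucket, and path are placeholders.
KEY_NAME="my-key-pair"
S3_PREFIX="s3://my-bucket/my-object-path"

# Print (not run) a create-cluster command. An instance count of 6
# gives one master node plus five core nodes.
create_cluster_cmd() {
  cat <<EOF
aws emr create-cluster --name jobserver-demo \\
  --release-label emr-4.7.1 \\
  --applications Name=Hadoop Name=Hive Name=Spark \\
  --instance-type r3.xlarge --instance-count 6 \\
  --ec2-attributes KeyName=${KEY_NAME} \\
  --use-default-roles \\
  --configurations file://./configurations.json \\
  --bootstrap-actions Path=${S3_PREFIX}/full_install_jobserver_BA.sh,Name=InstallJobServer
EOF
}

create_cluster_cmd
```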

The cluster takes around 15 minutes to create because the BA installs the build tools, compiles the code into a .tar file, and then extracts the .tar file onto the server. The BA also copies the .tar file to S3 for reuse.

When the cluster is in the waiting state, you can verify the build:

  • Use SSH to get onto the master node and verify that /mnt/lib/spark-jobserver exists.
  • Verify that job-server-$JOBSERVER_VERSION.tar.gz is in your S3 bucket.
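Both checks can be scripted. In this sketch the archive directory is a placeholder, and I am assuming the archive sits directly under S3_BUILD_ARCHIVE_DIR; archive_key and verify_build are helper names I made up. Only archive_key runs here, because verify_build is meant to be called on the master node.

```shell
JOBSERVER_VERSION="v0.6.2"
S3_BUILD_ARCHIVE_DIR="s3://my-bucket/jobserver/dist"   # hypothetical location

# Compose the expected object key of the archived distribution.
archive_key() {
  printf '%s/job-server-%s.tar.gz\n' "${S3_BUILD_ARCHIVE_DIR%/}" "$JOBSERVER_VERSION"
}

# Run on the master node once the cluster is in the waiting state.
verify_build() {
  [ -d /mnt/lib/spark-jobserver ] || { echo "install directory missing" >&2; return 1; }
  aws s3 ls "$(archive_key)" || { echo "archive not found in S3" >&2; return 1; }
}

archive_key
```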

To get familiar with the distribution, explore /mnt/lib/spark-jobserver.

Install JobServer from Amazon S3

In this section, we will work with the existing_build_jobserver_BA.sh, a bash script that performs the necessary tasks. As with the other BA, there are configuration values you need to edit:

  • EMR_SH_FILE – set this value to the full path and name of the shell configuration file described earlier, staged on Amazon S3
  • EMR_CONF_FILE – set this value to the full path and name of the context configuration file described earlier, staged on Amazon S3
  • S3_FULL_PATH_TAR – set this value to the location in Amazon S3 where the compiled distribution is located (from the full_install_jobserver_BA.sh BA)

Make sure the files are on S3 by running the following CLI commands.

> aws s3 cp <localDir>existing_build_jobserver_BA.sh s3://<my-bucket>/<my-object-path>/existing_build_jobserver_BA.sh

> aws s3 cp <localDir>configurations.json s3://<my-bucket>/<my-object-path>/configurations.json

> aws s3 cp <localDir>emr.sh s3://<my-bucket>/<my-object-path>/emr.sh

> aws s3 cp <localDir>emr_contexts.conf s3://<my-bucket>/<my-object-path>/emr_contexts.conf

Next, create the cluster as described earlier, but this time reference existing_build_jobserver_BA.sh. Make sure that the compiled JobServer .tar file matches the EMR version and Spark version you are using.

This cluster should take the normal amount of time to create (in my case, under 10 minutes). Your time will vary based on your setup and options, but this BA reduces the creation time compared to full_install_jobserver_BA.sh. The installation is also located at /mnt/lib/spark-jobserver.

Run JobServer

Now that we have the cluster created, we will do the following:

  1. Use the FlightsBatch job in yarn-client mode through spark-submit as a reference to baseline the flight data sample in Spark.
  2. Start JobServer with a predefined Hive context in SparkSQL.
  3. Load the flight dataset into that JobServer context.
  4. Run ad hoc queries by using the RESTful interface of JobServer.

We will be doing this on the master node, so make sure you have the SOCKS proxy enabled so that you can access the Spark and JobServer UIs. This great AWS Big Data Blog post covers the finer points of spark-submit. If you’re running this cluster in production, you can open port 8090 on your master node security group. Consult your security representative to verify that this approach is acceptable.
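If you have not set up the proxy before, the tunnel is typically opened with SSH dynamic port forwarding. The sketch below prints such a command; the key file, the master node DNS name, and the local port are placeholder values, and you still need to point your browser's SOCKS proxy settings at that local port.

```shell
# Placeholders: the key file and master public DNS name are hypothetical.
KEY_FILE="$HOME/my-key-pair.pem"
MASTER_DNS="ec2-203-0-113-10.compute-1.amazonaws.com"
SOCKS_PORT=8157   # any free local port works

# Print the SSH command: -N runs no remote command, -D opens a local SOCKS proxy.
tunnel_cmd() {
  printf 'ssh -i %s -N -D %s hadoop@%s\n' "$KEY_FILE" "$SOCKS_PORT" "$MASTER_DNS"
}

tunnel_cmd
```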

Once you have the proxy enabled and have reached the master node by using SSH, be aware of these locations:

  • /mnt/lib/spark-jobserver – the location where JobServer is installed and where you will start and stop JobServer
  • /mnt/var/log/spark-jobserver – the location of the logs for JobServer; it’s very helpful to tail these logs

Next, create the .jar file for the jobs we will execute. You will need Maven to build it. This .jar file will contain the traditional batch Spark job (com.amazonaws.proserv.blog.FlightsBatch) and also the JobServer artifacts that use the proper interfaces (com.amazonaws.proserv.blog.FlightsSql.scala). Create the .jar file with Maven.

> mvn clean package

Put the .jar file (jobserverEmr-1.0.jar) onto your master node. We will use this file for our JobServer execution. In this example, we will put it at /mnt/lib/spark-jobserver. We will skip the actual benchmarking execution (FlightsBatch), but you will find sample spark-submit commands and all commands we run on the master node at <project-root>/commands/commands-flights.txt. For this size of cluster without optimization, it takes around 7 minutes to load the data into the SparkSQL context. Once the data is there, SQL execution is quick, which is the point of using JobServer.

Loading JobServer

Now, let’s start JobServer with our predefined context.

> cd /mnt/lib/spark-jobserver

> ./server-start.sh

Verify that the server started by browsing to http://<master-node-dns>:8090 and confirming that hive-context has been created on the Contexts tab. The image below shows the JobServer UI with preloaded content.

 

Now, let’s add our .jar file so JobServer can have it available for execution. We will do this by using the REST interface. I use CURL, but you can use any REST-compliant interface.

> curl --data-binary @jobserverEmr-1.0.jar localhost:8090/jars/flightsample

Verify in the JobServer UI that the jar was loaded successfully by checking the Jars tab. The image below shows the JobServer UI with available jars.
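The same checks can be made from the command line, because JobServer also answers GET requests on /jars and /contexts. This is a minimal sketch: list_jars, list_contexts, and has_name are helper names I invented, and the name check is a plain text match on the JSON rather than real JSON parsing.

```shell
JOBSERVER="localhost:8090"

# List uploaded jars and running contexts; both are plain GET requests.
list_jars()     { curl -s "${JOBSERVER}/jars"; }
list_contexts() { curl -s "${JOBSERVER}/contexts"; }

# Succeed if the quoted name appears in the JSON read from stdin.
has_name() { grep -q "\"$1\""; }
```

On the master node, for example, `list_jars | has_name flightsample && echo "jar registered"` confirms the upload, and `list_contexts | has_name hive-context` confirms the context.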

Now we can load the data into hive-context and send ad hoc queries. We will separate these two steps. Typically, I open a second SSH window just to tail the logs. That is, I have 2 SSH sessions open at the same time:

> tail -200f /mnt/var/log/spark-jobserver/spark-job-server.log

First, load the data by using the RESTful interface:

> curl -d "loc = \"s3://us-east-1.elasticmapreduce.samples/flightdata/input\"" 'localhost:8090/jobs?appName=flightsample&classPath=com.amazonaws.proserv.blog.FlightsHiveLoad&context=hive-context'

Spark is now aware of the data and the directed acyclic graph (DAG), but the data hasn’t been acted on yet, so it hasn’t been loaded into the context. To do that, we run the first ad hoc query to perform an action that will load the data into an RDD:

> curl -d "sql = \"SELECT origin, count(*) AS total_departures FROM flights WHERE year >= '2000' GROUP BY origin ORDER BY total_departures DESC LIMIT 10\"" 'localhost:8090/jobs?appName=flightsample&classPath=com.amazonaws.proserv.blog.FlightsHiveTest&context=hive-context'
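Quoting these requests by hand is error prone, because the request body needs its own double quotes inside the shell's quotes. The sketch below wraps that in small helpers. The function names are mine, not part of JobServer, and job_body assumes the value itself contains no double quotes (true for the queries in this post).

```shell
JOBSERVER="localhost:8090"
APP_NAME="flightsample"
CONTEXT="hive-context"
PKG="com.amazonaws.proserv.blog"

# Build the /jobs URL for a job class; extra query parameters are optional.
job_url() {
  printf '%s/jobs?appName=%s&classPath=%s.%s&context=%s%s\n' \
    "$JOBSERVER" "$APP_NAME" "$PKG" "$1" "$CONTEXT" "${2:+&$2}"
}

# Build a request body of the form: key = "value"
job_body() {
  printf '%s = "%s"\n' "$1" "$2"
}

# Submit a SQL statement to FlightsHiveTest (defined here, not invoked).
run_sql() {
  curl -s -d "$(job_body sql "$1")" "$(job_url FlightsHiveTest "${2:-}")"
}
```

With these in place, `run_sql "SELECT count(*) FROM flights"` submits a query to the Hive context without any nested quoting on the command line.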

At this point, the benchmark comes in handy. We know that loading the entire dataset takes approximately 7 minutes, so we are not going to wait for the response to this first query. In the JobServer UI, we can see our initial request in the Running Jobs category. In the image below, you can see the JobServer UI with FlightsHiveLoad executed and the FlightsHiveTest SQL executing, in the Running state.

Once this query is in the Completed Jobs category, we can execute the same query (or other queries) and wait for the response because the data is already loaded in memory.

For example, if we rerun the query that loaded the data, this time with a sync flag and a timeout, we get a response very quickly:

> curl -d "sql = \"SELECT origin, count(*) AS total_departures FROM flights WHERE year >= '2000' GROUP BY origin ORDER BY total_departures DESC LIMIT 10\"" 'localhost:8090/jobs?appName=flightsample&classPath=com.amazonaws.proserv.blog.FlightsHiveTest&context=hive-context&sync=true&timeout=150'

{

  "result": ["[ATL,5683237]", "[ORD,5063315]", "[DFW,4402632]", "[LAX,3318774]", "[DEN,3038788]", "[PHX,2801223]", "[IAH,2701763]", "[LAS,2313143]", "[DTW,2126781]", "[SFO,2115094]"]

}

> curl -d "sql = \"SELECT origin, dest, count(*) AS total_flights FROM flights WHERE year >= '2000' GROUP BY origin, dest ORDER BY total_flights DESC LIMIT 10\"" 'localhost:8090/jobs?appName=flightsample&classPath=com.amazonaws.proserv.blog.FlightsHiveTest&context=hive-context&sync=true&timeout=150'

{

  "result": ["[LAX,LAS,192576]", "[LAS,LAX,189801]", "[SFO,LAX,189702]", "[LAX,SFO,187680]", "[SAN,LAX,171133]", "[LAX,SAN,171059]", "[PHX,LAX,164053]", "[LAX,PHX,162193]", "[LGA,ORD,161917]", "[ORD,LGA,159844]"]

}
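The result array is easy to flatten into one row per line for further processing. This sketch relies only on the response shape shown above (items such as "[ATL,5683237]") and on common tools like grep -o; parse_result is a helper name I made up, and the sample input is a shortened copy of the first response.

```shell
# Turn items like "[ATL,5683237]" read from stdin into lines like "ATL 5683237".
# Leading columns stay comma-separated; the trailing count is split off by a space.
parse_result() {
  tr -d '\n' | grep -o '\[[A-Z,]*,[0-9]*]' | tr -d '[]' | sed 's/,\([0-9]*\)$/ \1/'
}

# Shortened copy of the first response shown above.
printf '%s' '{"result": ["[ATL,5683237]", "[ORD,5063315]", "[DFW,4402632]"]}' | parse_result
```

You can pipe the output of a synchronous curl request straight into parse_result, then into sort or awk as needed.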

You can now issue ad hoc queries on the data currently in memory and correlate the requests by using the JobServer UI. The image shows the JobServer UI with multiple ad hoc queries executed.

Conclusion

In this blog post, you learned how to use a bootstrap action to install Spark JobServer on EMR, both by compiling fully from source code in GitHub and by deploying a precompiled distribution. We then demonstrated how to start JobServer with a predefined Hive context in SparkSQL and warm that context with the flights dataset publicly available in Amazon S3. Finally, we walked through how to pose ad hoc queries by using the RESTful JobServer interface with the flights dataset to get synchronous responses without incurring the overhead of reloading the data into memory.

If you have any questions or suggestions, please leave a comment below.

 
