AWS Big Data Blog

Using Spark SQL for ETL

Ben Snively is a Solutions Architect with AWS

With big data, you deal with many different formats and large volumes of data. SQL-style queries have been around for nearly four decades. Many systems support SQL-style syntax on top of the data layers, and the Hadoop/Spark ecosystem is no exception. This allows companies to try new technologies quickly without learning a new query syntax for basic retrievals, joins, and aggregations.

Amazon EMR is a managed service for the Hadoop and Spark ecosystem that allows customers to quickly focus on the analytics they want to run, not the heavy lifting of cluster management.

In this post, we demonstrate how you can leverage big data platforms and still write queries using a SQL-style syntax over data that is in different data formats within a data lake. We first show how you can use Hue within EMR to perform SQL-style queries quickly on top of Apache Hive. Then we show you how to query the dataset much faster using the Zeppelin web interface on the Spark execution engine. Lastly, we show you how to take the result from a Spark SQL query and store it in Amazon DynamoDB.

Hive and Spark SQL history

In versions 1.x and earlier, Apache Hive compiled SQL-style queries into native Hadoop MapReduce jobs, often chaining multiple jobs together in phases. This allowed massive datasets to be queried, but it was slow because of the overhead of launching Hadoop MapReduce jobs.

SparkSQL adds the same SQL interface to Spark that Hive added to Hadoop MapReduce. SparkSQL is built on top of Spark Core, whose in-memory computation and resilient distributed datasets (RDDs) allow it to be much faster than Hadoop MapReduce.

Spark integrates easily with many big data repositories.  The following illustration shows some of these integrations.

Using SparkSQL for ETL

In the second part of this post, we walk through a basic example using data sources stored in different formats in Amazon S3. Using a SQL syntax language, we fuse and aggregate the different datasets, and finally load that data into DynamoDB as a full ETL process.

This post uses three MovieLens datasets, all stored in Amazon S3:

UserMovieRatings – user ratings of movies, tab-delimited
MovieDetails – movie titles and genres, '#'-delimited with a '|'-delimited genre array
UserDetails – user demographics such as age, gender, occupation, and zip code, pipe-delimited

Create a table in Hive/Hue

Hive and SparkSQL let you share a metadata catalog. This allows you to create table definitions one time and then use either query execution engine as needed. You could also have created all of the table definitions exclusively in either tool.

First, launch an EMR cluster with Hive, Hue, Spark, and Zeppelin configured. If the default instance size is m3.xlarge, we recommend running a cluster with at least four core nodes.

Then open Hue in a browser and navigate to the query section. To learn how to enable web interface access to Hue, see View Web Interfaces Hosted on Amazon EMR Clusters.

The first table to create is the ratings table. The table definition specifies tab-separated values in the ROW FORMAT line below:

CREATE EXTERNAL TABLE IF NOT EXISTS UserMovieRatings (
    userId int,
    movieId int,
    rating int,
    unixTimestamp bigint
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
STORED AS TEXTFILE
LOCATION 's3://us-east-1.elasticmapreduce.samples/sparksql/movielens/user-movie-ratings';

After you create the table, select the row icon to the left of the table to refresh the table listing on the left side and see sample data.

Next, create the MovieDetails table to query over. This data has two delimiters: a hash ('#') for the columns and a pipe ('|') for the elements in the genre array.

CREATE EXTERNAL TABLE IF NOT EXISTS MovieDetails (
    movieId int,
    title string,
    genres array<string>
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '#'
COLLECTION ITEMS TERMINATED BY '|'
STORED AS TEXTFILE
LOCATION 's3://us-east-1.elasticmapreduce.samples/sparksql/movielens/movie-details';

After you create the table, the genre array appears in the sample data browser.
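To make the two-level delimiting concrete, here's a small Python sketch (illustrative only, with an invented sample line; the real parsing is handled by Hive's delimited SerDe) showing how one movie-details row splits into columns and a genre array:

```python
# Illustrative sketch of how a delimited movie-details row is split.
# The sample line and field layout mirror the MovieDetails table above.
def parse_movie_row(line):
    movie_id, title, genres = line.split("#")  # '#' separates columns
    return {
        "movieId": int(movie_id),
        "title": title,
        "genres": genres.split("|"),           # '|' separates array items
    }

row = parse_movie_row("1#Toy Story (1995)#Animation|Children|Comedy")
print(row["genres"])  # ['Animation', 'Children', 'Comedy']
```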

Transform the data using SparkSQL/Zeppelin

Now interact with SparkSQL through a Zeppelin UI, reusing the table definitions you created in the Hive metadata store. Later in this post, you'll create another table directly in SparkSQL to show how that would be done there.

Connect to the Zeppelin UI and create a new notebook under the Notebook tab. Run a query to show the tables; you can see that the two tables you created in Hive are also available in SparkSQL.

%sql SHOW TABLES

Using SparkSQL, you can perform the same query as you did in Hive in a previous step.

Note: The trailing semicolon at the end of the statement has been removed.

%sql SELECT title, count(*) numberOf5Ratings FROM usermovieratings r
JOIN moviedetails d ON (r.movieid = d.movieid)
WHERE rating = 5
GROUP BY title
ORDER BY numberOf5Ratings DESC LIMIT 5

This time, it usually takes less than 30 seconds for SparkSQL to query the data and return the results. The actual response time depends on the size of the EMR cluster.

Spark lets you cache data in memory as an RDD, so that data queried and iterated over repeatedly doesn't have to be re-read from S3. Tell Spark to do this with your usermovieratings table by executing the following command:

%sql CACHE TABLE usermovieratings

and:

%sql CACHE TABLE moviedetails

Now, execute the query again:

%sql SELECT title, count(*) numberOf5Ratings FROM usermovieratings r
JOIN moviedetails d ON (r.movieid = d.movieid)
WHERE rating = 5
GROUP BY title
ORDER BY numberOf5Ratings DESC LIMIT 5

This time, the query returns within a couple of seconds, so analysts can interact quickly with the large dataset in the RDD.

Suppose you want the same information as the previous query, but this time broken out into the top five movies for males and the top five for females. To do this, bring in the pipe-delimited user-details dataset, which contains information such as gender and occupation.

%sql CREATE EXTERNAL TABLE IF NOT EXISTS UserDetails (
    userId int,
    age int,
    gender CHAR(1),
    occupation string,
    zipCode string
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '|'
STORED AS TEXTFILE
LOCATION 's3://us-east-1.elasticmapreduce.samples/sparksql/movielens/user-details'

This query combines two subqueries in a UNION statement. The first subquery gets the five top-rated movies for males using all three datasets; the UNION then combines those results with the five top-rated movies for females:

%sql SELECT * FROM (SELECT 'M' AS gender, title, count(*) numberOf5Ratings FROM usermovieratings r
JOIN moviedetails d ON (r.movieid = d.movieid)
JOIN userdetails u ON (r.userid = u.userid)
WHERE rating = 5
AND gender = 'M'
GROUP BY title, gender
ORDER BY numberOf5Ratings DESC LIMIT 5) AS m
UNION
SELECT * FROM (SELECT 'F' AS gender, title, count(*) numberOf5Ratings FROM usermovieratings r
JOIN moviedetails d ON (r.movieid = d.movieid)
JOIN userdetails u ON (r.userid = u.userid)
WHERE rating = 5
AND gender = 'F'
GROUP BY title, gender
ORDER BY numberOf5Ratings DESC LIMIT 5) AS f
ORDER BY gender DESC, numberOf5Ratings DESC

Because the ratings table is still cached in the SparkContext, the query happens quickly (in this case, four seconds).
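The shape of that union is easy to see in miniature. The following Python sketch (invented sample data, not the MovieLens set) computes a per-gender top-N and concatenates the two lists, which is all the UNION of the two subqueries is doing:

```python
from collections import Counter

# Invented (gender, title) pairs standing in for 5-star ratings.
five_star_ratings = [
    ("M", "Star Wars"), ("M", "Star Wars"), ("M", "Fargo"),
    ("F", "Fargo"), ("F", "Fargo"), ("F", "Titanic"),
]

def top_n(gender, n=5):
    # GROUP BY title ... ORDER BY count DESC LIMIT n, for one gender
    counts = Counter(t for g, t in five_star_ratings if g == gender)
    return [(gender, title, c) for title, c in counts.most_common(n)]

# UNION of the two per-gender subquery results
result = top_n("M") + top_n("F")
print(result[0])  # ('M', 'Star Wars', 2)
```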

Load the transformed data into DynamoDB

Next, create a new DynamoDB table that stores the number of ratings users submitted, per genre and rating value. To compute this, you first need to figure out which movies were voted on, then combine that information with the movie details data to map each vote to the movie's genres and count how users are voting per genre.

The following SQL statement queries for that information and returns the counts:

SELECT genre, rating, count(*) ratingCount
FROM UserMovieRatings r
JOIN (SELECT movieid, title, explode(genres) AS genre FROM moviedetails) m
ON (r.movieid = m.movieid)
GROUP BY genre, rating

Notice that the query explodes the genre list in the moviedetails table: that column holds an array of genres for a single movie, and explode emits one row per array element.
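As a rough illustration of what explode is doing (plain Python with toy data, not Spark):

```python
from collections import Counter

# Toy stand-ins for the two tables; the real data lives in S3.
movie_details = {1: ["Action", "Adventure"], 2: ["Adventure"]}  # movieId -> genres
user_movie_ratings = [(1, 5), (1, 5), (2, 4), (2, 5)]           # (movieId, rating)

# explode(genres): one (movieId, genre) row per array element
exploded = [(mid, g) for mid, genres in movie_details.items() for g in genres]

# JOIN on movieId, then GROUP BY (genre, rating) with count(*)
rating_counts = Counter(
    (genre, rating)
    for mid, genre in exploded
    for rid, rating in user_movie_ratings
    if rid == mid
)
print(rating_counts[("Adventure", 5)])  # 3 (movie 1 twice, movie 2 once)
```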

Create a new DynamoDB table, named GenreRatingCounts, to store the results of the SQL query. Create it in the same region in which you are running; in this post, we use us-east-1. Use genre as the hash key and rating as the range key.

Note: Set the type of the range key to Number, because the code below stores the rating as a number.

Next, SSH to the master node of the EMR cluster. Here's how to use the EMR-DDB connector in conjunction with SparkSQL to store data in DynamoDB.

Start a Spark shell, using the EMR-DDB connector JAR file name:

spark-shell --jars /usr/share/aws/emr/ddb/lib/emr-ddb-hadoop.jar

To learn how this works, see the Analyze Your Data on Amazon DynamoDB with Apache Spark blog post.

Paste this code into the Spark shell prompt:

import org.apache.hadoop.io.Text
import org.apache.hadoop.dynamodb.DynamoDBItemWritable
import com.amazonaws.services.dynamodbv2.model.AttributeValue
import org.apache.hadoop.dynamodb.read.DynamoDBInputFormat
import org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.io.LongWritable
import java.util.HashMap
 
var ddbConf = new JobConf(sc.hadoopConfiguration)
ddbConf.set("dynamodb.output.tableName", "GenreRatingCounts")
ddbConf.set("dynamodb.throughput.write.percent", "0.5")
ddbConf.set("mapred.input.format.class", "org.apache.hadoop.dynamodb.read.DynamoDBInputFormat")
ddbConf.set("mapred.output.format.class", "org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat")

 
var genreRatingsCount = sqlContext.sql("select genre, rating, count(*) ratingCount from UserMovieRatings r join (select movieid, title, explode(genres) as genre from moviedetails) m on (r.movieid = m.movieid) group by genre, rating order by genre, rating")

var ddbInsertFormattedRDD = genreRatingsCount.map(a => {
    var ddbMap = new HashMap[String, AttributeValue]()

    var catValue = new AttributeValue()
    catValue.setS(a.get(0).toString)
    ddbMap.put("genre", catValue)

    var ratingValue = new AttributeValue()
    ratingValue.setN(a.get(1).toString)
    ddbMap.put("rating", ratingValue)

    var countValue = new AttributeValue()
    countValue.setN(a.get(2).toString)
    ddbMap.put("count", countValue)

    var item = new DynamoDBItemWritable()
    item.setItem(ddbMap)

    (new Text(""), item)
})

ddbInsertFormattedRDD.saveAsHadoopDataset(ddbConf)

After you run the code, notice that the DynamoDB table now has 95 entries, each containing a genre, a rating, and the number of ratings for that combination.

The ddbConf defines the Hadoop configuration that allows Spark to use a custom Hadoop input/output format for reading and writing the RDD being created.

The next major piece of code executes the SparkSQL statement. At this point, it queries the different datasets in S3 to get the data to store in DynamoDB.

var genreRatingsCount = sqlContext.sql("select genre, rating, count(*) ratingCount from UserMovieRatings r join (select movieid, title, explode(genres) as genre from moviedetails) m on (r.movieid = m.movieid) group by genre, rating order by genre, rating")

The query result is stored in a Spark DataFrame that you can use in your code.

After you have the DataFrame, perform a transformation to produce an RDD that matches the types the DynamoDB custom output format knows how to write. The custom output format expects a tuple containing the Text and DynamoDBItemWritable types.

Create a new RDD with those types in it, in the following map call:

var ddbInsertFormattedRDD = genreRatingsCount.map(a => {
    var ddbMap = new HashMap[String, AttributeValue]()
    <Lines omitted, complete version is above…>
    var item = new DynamoDBItemWritable()
    item.setItem(ddbMap)
    (new Text(""), item)
})

The ddbInsertFormattedRDD now contains elements whose DynamoDBItemWritable component of the tuple looks like this:

{count={N: 4049,}, genre={S: Action,}, rating={N: 3,}}
{count={N: 5560,}, genre={S: Action,}, rating={N: 4,}}
{count={N: 3718,}, genre={S: Action,}, rating={N: 5,}}
{count={N: 654,}, genre={S: Adventure,}, rating={N: 1,}}
{count={N: 1126,}, genre={S: Adventure,}, rating={N: 2,}}
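The attribute-value map the Scala code builds can be mimicked in plain Python (a hedged sketch of the data shape only, not the connector or SDK API): each result row becomes a dictionary of DynamoDB-style type descriptors, with numbers carried as strings:

```python
def to_ddb_item(genre, rating, count):
    # One attribute-value map per result row, mirroring the Scala map step:
    # S is a string attribute, N a number (DynamoDB transmits numbers as strings).
    return {
        "genre": {"S": str(genre)},
        "rating": {"N": str(rating)},
        "count": {"N": str(count)},
    }

item = to_ddb_item("Action", 3, 4049)
print(item)  # {'genre': {'S': 'Action'}, 'rating': {'N': '3'}, 'count': {'N': '4049'}}
```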

This last call uses the job configuration that defines the EMR-DDB connector to write out the new RDD you created in the expected format:

ddbInsertFormattedRDD.saveAsHadoopDataset(ddbConf)

Conclusion

EMR makes it easy to run SQL-style analytics in both Spark and Hive. As this post has shown, connectors within EMR and the open source community let you easily talk to many data sources, including DynamoDB. Rather than focusing on standing up the software and managing the cluster, with EMR you can quickly process and analyze your data and store the results in destinations such as NoSQL repositories and data warehouses.

If you have a question or suggestion, please leave a comment below.


Related

Querying Amazon Kinesis Streams Directly with SQL and Spark Streaming

Want to learn more about Big Data or Streaming Data? Check out our Big Data and Streaming data educational pages.