AWS Big Data Blog

Test drive two big data scenarios from the ‘Building a Big Data Platform on AWS’ bootcamp

Matt Yanchyshyn is a Sr. Manager for AWS Solutions Architecture

AWS offers a number of events during the year, such as our annual AWS re:Invent conference, the AWS Summit series, the AWS Pop-up Loft, and a variety of roadshows such as the upcoming AWS Big Data Solutions Days. All of these provide opportunities for AWS customers to attend talks focused on big data and participate in hands-on learning with AWS trainers, Solutions Architects and product teams. At re:Invent last year in Las Vegas, for example, there were more than twenty sessions in the big data track, including a full-day bootcamp and hands-on labs.

AWS Summit Audience

At the AWS Pop-up Loft and at a few of this year’s AWS Summits, we have been giving a full-day bootcamp called “Building a Big Data Platform on AWS” (formerly “Store, Manage and Analyze Big Data in the Cloud”). This bootcamp provides a broad, hands-on introduction to the collection, storage, and analysis of data using AWS services and third-party software. During the day, we dive deep into Amazon S3, Amazon EMR, Amazon Redshift and Amazon Kinesis, and show you how to combine them to build a comprehensive platform for both batch and streaming data analysis. Hands-on exercises help you learn to work with these services and test best practices for designing data analytics environments. This course is highly technical and is geared towards data analysts, DBAs, developers and solution architects.

What is covered in the ‘Building a Big Data Platform on AWS’ bootcamp?

Building a Big Data Platform on AWS covers the following:

  • Efficient storage and handling of large datasets with Amazon Simple Storage Service (S3)
  • Batch data processing with Apache Hadoop and Hive running on EMR
  • Interactive querying for large datasets with Presto and Spark running on EMR
  • Streaming data collection and real-time analysis with Amazon Kinesis and Spark Streaming
  • Best practices for deploying, data modeling, data distribution and querying with Amazon Redshift

The hands-on exercises start simple. For example, you’ll learn how to efficiently copy and aggregate data for batch processing with EMR and  S3. They gradually get more complex as the day progresses, including advanced Amazon Redshift query and table optimization and code for real-time stream processing at scale.

We can’t possibly cover all AWS big data services in a day, so we are hard at work on new, complementary bootcamps that focus on Amazon DynamoDB, Amazon Elastic Container Service/Docker, Amazon Machine Learning, Elasticsearch (Solr/CloudSearch), and more.

Sample exercises from the bootcamp

There are currently seven hands-on exercises in the Building a Big Data Platform on AWS bootcamp.  Below are two partial examples.  The first shows you how to process data in a Kinesis stream with the Spark Streaming framework running on EMR.  In the second, we explore query optimization with Amazon Redshift.

Processing data in your Amazon Kinesis Stream with Spark

Spark Streaming is an extension of the core Spark API that enables scalable, high-throughput and fault-tolerant processing of data streams. When Spark Streaming receives data, it divides it into small batches, which the Spark engine then processes to generate results. You can run your Spark Streaming cluster in several modes: local, standalone, or with cluster management frameworks such as Mesos and YARN. In the Building a Big Data Platform on AWS bootcamp, we use EMR clusters running Hadoop 2 and YARN to run Spark Streaming. This approach allows us to experiment with multiple applications on a single cluster, including Hive, Presto, Spark and more.
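To make the micro-batch model concrete before we bring Kinesis into the picture, here is a minimal, self-contained Spark Streaming sketch (not part of the bootcamp labs) that counts words arriving on a local TCP socket in 2-second batches. The application name, host and port are arbitrary placeholders.

// Minimal Spark Streaming word count over 2-second micro-batches (illustrative only)
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SocketWordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("SocketWordCount")
    val ssc = new StreamingContext(conf, Seconds(2)) // 2-second batch interval

    // Each micro-batch of lines read from the socket becomes an RDD in the DStream
    val lines = ssc.socketTextStream("localhost", 9999)
    val counts = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
    counts.print() // print the word counts computed for each batch

    ssc.start()
    ssc.awaitTermination()
  }
}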

This exercise shows you how to use Spark Streaming to extract information from incoming clickstream data. Specifically, we extract the “HTTP referrer” field (the website a visitor was on before landing on yours).

  1. From the Services menu on the AWS Management Console, select EMR.
  2. From the drop-down menu in the top-right corner of the console, select US West (Oregon) as your region.
  3. Click the “Create cluster” button. You can choose any Cluster name.
  4. In the Cluster Configuration section, set Termination Protection to “No” and uncheck Logging.
  5. In the Software Configuration section, ensure that the Hadoop distribution is set to Amazon and the AMI Version is the latest (3.8.0 as of June 2015), with Hive listed under applications to be installed. You can optionally remove Pig.
  6. Select Spark in the Additional Applications drop-down and then click Configure and add.
  7. In the “Add Application” pop-up, add -x -lWARN in the text box labeled Arguments (Optional) and click Add. (The -x argument configures Spark to use the optimal amount of cores and memory available in the cluster. The -lWARN argument sets the log level to display warnings only.)
  8. In the Hardware Configuration section, set the Network to a VPC in the list that you have access to.
  9. Set the instance type to m3.xlarge, with one (1) Master node and two (2) Core nodes.
  10. In the Security and Access section, change the EC2 key pair to one you own.
  11. Under IAM Roles, make sure that Default is selected in the Roles configuration.
  12. Leave all other options at their defaults and then click “Create Cluster”.
  13. On the next page, your cluster will be in the “Starting” state. The status will change from Starting to Running and then Waiting.
  14. From the Services menu on the AWS Management Console, select Kinesis. From the drop-down menu in the top-right corner of the console, select US West (Oregon) as your region.
  15. On the following screen, click Create Stream.
  16. For Stream Name, use “KinesisStream” and enter “1” for the Number of Shards. After that, select Create in the bottom right corner of the screen.
  17. Verify that your Kinesis stream is in the CREATING status. After a few moments, the status should transition to ACTIVE.

(To review the stream summary on the Stream Details page, click the stream name itself. You should see empty Kinesis CloudWatch graphs. The red lines above the Write Capacity and Read Capacity graphs indicate the maximum provisioned write capacity of 1 MB/sec and read capacity of 2 MB/sec for KinesisStream with a single shard.)
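If you prefer the command line, you can run the same status check with the AWS CLI (optional, and not part of the bootcamp steps):

# Optional: confirm the stream status from the AWS CLI
aws kinesis describe-stream --stream-name KinesisStream --region us-west-2 --query 'StreamDescription.StreamStatus'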

  18. Create a temporary S3 bucket with a unique name of your choice:

aws s3 mb s3://YOUR-S3-BUCKET --region us-west-2

  19. SSH into your EMR cluster and start the spark-shell:

spark/bin/spark-shell --jars spark/lib/spark-streaming-kinesis-asl_2.10-1.3.0.jar

  20. Set up the Spark shell for your streaming application. Replace YOUR-S3-BUCKET with the bucket name that you just created.

/* import dependencies */
import org.apache.spark.SparkContext
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions
import org.apache.spark.streaming.kinesis.KinesisUtils
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
import com.amazonaws.services.kinesis.AmazonKinesisClient
import com.amazonaws.services.kinesis.clientlibrary.lib.worker._
import java.util.Date
import org.apache.hadoop.io.compress._
import scala.util.parsing.json.JSON
import org.apache.log4j.Logger
import org.apache.log4j.Level

/* disable logging */
Logger.getLogger("org").setLevel(Level.OFF)
Logger.getLogger("akka").setLevel(Level.OFF)

/* Set up the variables as needed */
val streamName = "KinesisStream"
val endpointUrl = "https://kinesis.us-west-2.amazonaws.com"
val outputDir = "s3://YOUR-S3-BUCKET/raw-stream"
val outputBatchInterval = Seconds(60)
val inputBatchInterval = Seconds(2)

/* Reconfigure the spark-shell */
val sparkConf = sc.getConf
sparkConf.setAppName("KinesisReferrerCount")
sparkConf.remove("spark.driver.extraClassPath")
sparkConf.remove("spark.executor.extraClassPath")

sc.stop()
val sc = new SparkContext(sparkConf)

  21. Create a worker for each shard in your Amazon Kinesis stream. Each worker will create a Spark Streaming Discretized Stream (DStream; a continuous sequence of RDDs), which we will then merge into a single DStream.

/* Setup the KinesisClient */
val kinesisClient = new AmazonKinesisClient(new DefaultAWSCredentialsProviderChain())
kinesisClient.setEndpoint(endpointUrl)

/* Determine the number of shards from the stream */
val numShards = kinesisClient.describeStream(streamName).getStreamDescription().getShards().size()

/* Create a streaming context and then create one worker per shard */
val ssc = new StreamingContext(sc, inputBatchInterval)
val kinesisStreams = (0 until numShards).map { i =>
  KinesisUtils.createStream(ssc, streamName, endpointUrl,
    inputBatchInterval, InitialPositionInStream.LATEST,
    StorageLevel.MEMORY_ONLY)
}

/* Merge the worker DStreams and translate each byte array to a string */
val unionStreams = ssc.union(kinesisStreams)
val accessLogs = unionStreams.flatMap(byteArray => new String(byteArray).split(" "))

  22. With the records now available in the accessLogs DStream, we can parse each string into a JSON object and extract the HTTP referrer field. We then group the dataset by referrer, using the reduceByKey operation on every RDD in the DStream:

val jsonFields = accessLogs.map(JSON.parseFull(_)).map(_.get.asInstanceOf[scala.collection.immutable.Map[String,Any]])
val referrerCount = jsonFields.map(data => data("referrer").toString).map(word => (word.split('.')(1), 1)).reduceByKey(_ + _)
referrerCount.print()

  23. We can also derive a second DStream from the accessLogs DStream by applying the windowing function over a sliding interval of outputBatchInterval. We will store the resulting output in S3:

val batchLogs = accessLogs.window(outputBatchInterval, outputBatchInterval)
batchLogs.foreachRDD( (rdd, time) => {
  if (rdd.count > 0) {
    val outPartitionFolder = new java.text.SimpleDateFormat("yyyy/MM/dd/HH/mm").format(new Date(time.milliseconds))
    rdd.coalesce(1).saveAsTextFile("%s/%s".format(outputDir, outPartitionFolder) + "/logs_" + time.milliseconds.toString, classOf[GzipCodec])
  }
})

  24. Now that setup is complete, start the Spark Streaming context and leave the application running for 2-3 minutes to see the results. When you’re done, press Control-C to quit:

ssc.start()
ssc.awaitTermination()
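Note that these partial steps do not include a data producer for the stream. If nothing is writing to KinesisStream yet, you can push a few hand-made test records from a second terminal with the AWS CLI. The JSON below is a hypothetical record containing only the referrer field this exercise reads; keep it free of spaces, because the code above splits incoming records on spaces. Depending on your AWS CLI version, you may need to base64-encode the payload or pass --cli-binary-format raw-in-base64-out.

# Hypothetical test record, for illustration only
aws kinesis put-record --stream-name KinesisStream --region us-west-2 --partition-key user1 --data '{"referrer":"http://www.example.com/index.html"}'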

  25. After terminating the above program, check your S3 bucket. You should see compressed files under “raw-stream/year/month/day/hour/minute/logs_timestamp.”
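To confirm the output from a terminal instead of the S3 console, you can list the bucket contents with the AWS CLI (YOUR-S3-BUCKET is the bucket you created earlier):

# List the compressed output files written by Spark Streaming
aws s3 ls s3://YOUR-S3-BUCKET/raw-stream/ --recursive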

Using the Explain Plan to debug long-running queries in Amazon Redshift

The Explain Plan shows the logical steps that Amazon Redshift will perform for the query. The corresponding EXPLAIN command does not actually run the query; the output contains only the plan that Amazon Redshift will execute if the query is run under current operating conditions. If you change the schema of a table in some way or if you change the data in the table and run ANALYZE again to update the statistical metadata, the explain plan might be different.

Reading the explain plan from the bottom up, you can see a breakdown of logical operations needed to perform the query as well as an indication of their relative cost and the amount of data that needs to be processed. By analyzing the plan, you can often identify opportunities to improve query performance.

To illustrate this, let’s dig into a straightforward example of how to optimize joins:

  1. Start a single-node Amazon Redshift cluster:

aws redshift create-cluster --db-name bootcamp --node-type dw2.large \
--region us-west-2 --cluster-type single-node \
--master-username master --master-user-password Redshift123 \
--cluster-identifier bootcamp --port 8192
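Cluster creation takes a few minutes. As an optional convenience (not part of the bootcamp steps), you can wait for the cluster from the CLI and then print the endpoint you will connect to in the next step:

# Optional: block until the cluster is available, then print its endpoint
aws redshift wait cluster-available --cluster-identifier bootcamp --region us-west-2
aws redshift describe-clusters --cluster-identifier bootcamp --region us-west-2 --query 'Clusters[0].Endpoint'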

  2. Connect to your Amazon Redshift cluster. (Instructions here: https://docs.aws.amazon.com/redshift/latest/gsg/rs-gsg-connect-to-cluster.html)
  3. Create tables to hold our sample data:

CREATE TABLE cloudfront ( logtime TIMESTAMP, edge VARCHAR(40), bytes INT, client_ip VARCHAR(50), http_method VARCHAR(50), cfhost VARCHAR(512),  uri VARCHAR(2048), file_name VARCHAR(1024), content_type VARCHAR(256), user_id INT, order_id INT, status INT, referer VARCHAR(4096), referer_host VARCHAR(256), referer_protocol VARCHAR(16)) DISTKEY(user_id) SORTKEY(logtime);

CREATE TABLE user_profile ( user_id INT, age INT, gender char, city VARCHAR(256), country VARCHAR(256));

CREATE TABLE games ( game_id INT, gamename VARCHAR(256), price DECIMAL);

CREATE TABLE order_line ( customer_id INT, order_id INT, description VARCHAR(256), product_id INT, unitprice DECIMAL, quantity INT, extended_price DECIMAL, line_tax DECIMAL);
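The cloudfront table above declares a distribution key and a sort key, which matter for the join behavior explored later in this exercise. If you would like to confirm how your tables are distributed and sorted, one option (not part of the bootcamp steps) is to query the SVV_TABLE_INFO system view after the tables have been loaded in the next step; it only lists tables that contain data:

-- Optional: inspect distribution style, sort key and row counts for your tables
SELECT "table", diststyle, sortkey1, tbl_rows, skew_rows
FROM svv_table_info
ORDER BY "table";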

  4. Load the sample data into the tables, replacing YOUR-ACCESS-KEY and YOUR-SECRET-KEY with the credentials of an AWS IAM user in your account that has access to S3:

COPY cloudfront FROM 's3://aws-bigdata-bootcamp/data/logs-processed/' CREDENTIALS 'aws_access_key_id=YOUR-ACCESS-KEY;aws_secret_access_key=YOUR-SECRET-KEY' DELIMITER '\t' IGNOREHEADER 0 MAXERROR 0 GZIP;

COPY user_profile FROM 's3://aws-bigdata-bootcamp/data/users.lzo' CREDENTIALS 'aws_access_key_id=YOUR-ACCESS-KEY;aws_secret_access_key=YOUR-SECRET-KEY' DELIMITER ',' IGNOREHEADER 0 MAXERROR 0 LZOP;

COPY order_line FROM 's3://aws-bigdata-bootcamp/data/orders/' CREDENTIALS 'aws_access_key_id=YOUR-ACCESS-KEY;aws_secret_access_key=YOUR-SECRET-KEY' DELIMITER '|' IGNOREHEADER 0 MAXERROR 0 GZIP;

COPY games FROM 's3://aws-bigdata-bootcamp/data/gameList.csv' CREDENTIALS 'aws_access_key_id=YOUR-ACCESS-KEY;aws_secret_access_key=YOUR-SECRET-KEY' DELIMITER ',' IGNOREHEADER 0 MAXERROR 0;
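If any COPY command fails or loads fewer rows than expected, a useful first stop (not covered in the bootcamp steps themselves) is the STL_LOAD_ERRORS system table, which records the file, line number, column and reason for each rejected row:

-- Optional: show the most recent load errors
SELECT starttime, filename, line_number, colname, err_reason
FROM stl_load_errors
ORDER BY starttime DESC
LIMIT 10;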

  5. Use the EXPLAIN command below to determine how an inefficient query can be rewritten to perform better. The result includes a warning about an inefficient Nested Loop, which has a very high row count and very high relative cost. Nested Loop is the least-optimal join in Amazon Redshift and should be avoided in most cases. It is used mainly for cross-joins (Cartesian products) and some inequality joins. A by-product of the Nested Loop in the query below is the very high number of rows that must be evaluated before the results can be aggregated: the query causes Amazon Redshift to do a nested loop that scans over 4 million rows, 670 million times!

EXPLAIN SELECT u.gender, g.gamename, count(*) AS purchases
FROM user_profile AS u, games AS g, order_line AS ol
GROUP BY u.gender, g.gamename
ORDER BY purchases DESC;

  6. Rewrite the query using JOINs. WHERE or INNER JOIN clauses with equality conditions allow Amazon Redshift to use Hash Joins instead of Nested Loops. In the Explain Plan you will see a sharp decline in the number of rows that need to be accessed compared to the Nested Loop example above. You will also see a “Hash” operator, which builds the hash table for the inner table in the join; the Hash Join then reads the outer table, hashes the joining column, and finds matches in the inner hash table. Hash Joins are typically faster than Nested Loop joins. They are used for inner joins and left and right outer joins, and will generally be chosen when the joining columns for both tables are not both distribution keys and sort keys.

EXPLAIN SELECT u.gender, g.gamename, count(*) AS purchases
FROM user_profile AS u
JOIN order_line AS ol ON ol.customer_id = u.user_id
JOIN games AS g ON g.game_id = ol.product_id
GROUP BY u.gender, g.gamename
ORDER BY purchases DESC;

  7. Next, run the new query without EXPLAIN to see if it improves performance. Unlike the never-ending Nested Loop query in the first example, this second query should finish in less than 5 seconds:

SELECT u.gender, g.gamename, count(*) AS purchases
FROM user_profile AS u
JOIN order_line AS ol ON ol.customer_id = u.user_id
JOIN games AS g ON g.game_id = ol.product_id
GROUP BY u.gender, g.gamename
ORDER BY purchases DESC;

  8. Run the same query one more time. It should perform even faster the second time. Why? Amazon Redshift’s execution engine assembles a sequence of steps, segments, and streams to execute the query plan supplied by the optimizer. It then generates and compiles C++ code and sends the compiled code segments to the compute nodes. The compiled code executes much faster because it eliminates the overhead of using an interpreter, but there is always some overhead the first time the code is generated and compiled, even for the cheapest query plans. As a result, the performance of a query the first time you run it can be misleading. The compiled code is cached and shared across sessions in a cluster, so subsequent executions of the same query will run faster, even with different query parameters and in different sessions, because they can skip the initial generation and compilation steps.
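If you are curious how much of that first run went into code generation, one way to check (not part of the bootcamp steps) is the SVL_COMPILE system view, which records, for each segment of a query, whether compiled code was generated or reused from the cache:

-- Optional: per-segment compile activity for the query you just ran
-- (compile = 1 means the segment was compiled; 0 means cached code was reused)
SELECT query, segment, compile, datediff(ms, starttime, endtime) AS compile_ms
FROM svl_compile
WHERE query = pg_last_query_id()
ORDER BY segment;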

Conclusion

Hopefully the description and examples above gave you a good taste of the Building a Big Data Platform on AWS bootcamp. We encourage you to sign up for this bootcamp and others at the AWS Summits, the AWS Pop-up Loft and AWS re:Invent. We are also always looking for ideas for new full-day training sessions, so please leave any ideas in the comments.