AWS Big Data Blog

Integrating Amazon Kinesis, Amazon S3 and Amazon Redshift with Cascading on Amazon EMR

This is a guest post by Ryan Desmond, Solutions Architect at Concurrent. Concurrent is an AWS Advanced Technology Partner.

With Amazon Kinesis, developers can quickly store, collate and access large, distributed data streams such as access logs, click streams and IoT data in real time. The question then becomes: how can we access and leverage this data in data applications running on Amazon EMR? There are many ways to solve this problem, but the solution should be simple, fault tolerant and highly scalable. In this post, I demonstrate a micro-batch system that delivers this solution by processing a simulated real-time data stream from Amazon Kinesis with Cascading on Amazon EMR. We will also join the Amazon Kinesis stream with data residing in a file in Amazon S3 and write the results to Amazon Redshift using cascading-jdbc-redshift, which leverages Amazon Redshift’s COPY command.

Cascading is a proven, highly extensible application development framework for building massively parallelized data applications on EMR. A few key features of Cascading include the ability to implement comprehensive TDD practices, application portability across platforms such as MapReduce and Apache Tez, and the ability to integrate with a variety of external systems using out-of-the-box integration adapters.

The result of combining Amazon Kinesis, Amazon EMR, Amazon Redshift and Cascading is an architecture that enables end-to-end data processing of streaming data sources.

Thankfully, Amazon has made this a relatively simple and straightforward process for us. Here are the steps we will take:

  1. Review a sample Cascading application that joins data from Amazon Kinesis and S3, processes the data with a few operations on EMR and writes the results to Amazon Redshift.
  2. Create an Amazon Kinesis stream.
  3. Configure an Amazon DynamoDB table to manage data range values, or “windowing.”
  4. Download the Amazon Kinesis publisher sample application.
  5. Use the publisher sample application to populate our Amazon Kinesis stream with sample Apache web log data.
  6. Create an Amazon Redshift cluster.
  7. Use the AWS CLI to create an EMR cluster and run our Cascading application by adding it as a step.

First, let’s look at the directed acyclic graph (DAG) for the Cascading application we will run. You can explore the full application with Driven. If you are prompted to log in, use the username “guest” and the password “welcome.”

As you can see, we source data from Amazon Kinesis as well as S3. We perform several operations on the incoming data before joining the two streams and writing the output to Amazon Redshift.

I review the key components of this application below. You can also view the full source. To run this application on your local machine, clone the Cascading Tutorials repository and change into the cascading-aws/part4 directory.

$ git clone https://github.com/Cascading/tutorials.git
$ cd tutorials/cascading-aws/part4/

Review and compile Cascading application code

Before we can build and run this application, we must pull the latest EmrKinesisCascading connector down from an EMR instance. At the time of publishing, this connector is available only on the EMR instances themselves, at “/usr/share/aws/emr/kinesis/lib/”. Because this is a build dependency for our Cascading application, we need to pull the library down and install it in a local Maven repository.

Step 1: SCP library from EMR to your local machine

  1. If you do not have access to an EMR instance, follow these instructions to set up a simple EMR cluster. Be sure to enable SSH access to this cluster using your private pem file.
  2. On your local machine, cd to the directory of your choice.
$ scp -i ~/.ssh/<your-private>.pem hadoop@<your-emr-ip>.compute-1.amazonaws.com:/usr/share/aws/emr/kinesis/lib/EmrKinesisCascading* .

Step 2: Install this library into a local Maven repository

Remain in the same directory as the EmrKinesisCascading connector.

$ mvn install:install-file -Dfile=EmrKinesisCascading-<version>.jar -DgroupId=aws.kinesis -DartifactId=cascading-connector -Dversion=<version> -Dpackaging=jar

Now that we have installed the EmrKinesisCascading connector in our local Maven repository, we can review the sample application.

First, let’s look at how we instantiate our KinesisHadoopScheme and KinesisHadoopTap. With Cascading, a “Tap” is used wherever you read or write data; roughly 30 supported Taps are available for integration with the most widely used data stores and sources. We will also instantiate several Schemes, which are used with Taps to specify the format (and, where necessary, the types) of the incoming and outgoing data.

// instantiate incoming fields, in this case "data" to be used in the KinesisHadoopScheme
Fields columns = new Fields("data");
// instantiate KinesisHadoopScheme to be used with KinesisHadoopTap
KinesisHadoopScheme scheme = new KinesisHadoopScheme(columns);
// set the no-data timeout so the read ends once the stream stops producing data
scheme.setNoDataTimeout(1);
// apply our AWS access and secret keys – please see Disclaimer below regarding 
// the use of AccessKeys and SecretKeys in production systems
scheme.setCredentials([ACCESS_KEY],[SECRET_KEY]);
// instantiate Kinesis Tap to read “AccessLogStream”
Tap kinesisSourceTap = new KinesisHadoopTap("AccessLogStream", scheme);

Now we create our Tap to read a file from S3 that will be joined with the Amazon Kinesis stream, and we create the sink that will carry our final output to Amazon Redshift. For the S3 Taps we use Hfs, which is fully compatible with S3; all you need to do is provide an S3 path instead of an HDFS path.

// instantiate S3 source Tap using Hfs. This Tap will source a comma-delimited file of IP addresses
// found at the location of s3InStr
Tap s3SourceTap = new Hfs( new TextDelimited( new Fields("ip"), "," ), s3InStr );
// instantiate S3 sink Tap - comma delimited with fields "ip", "count". Using Redshift's COPY
// command we can load this data from S3 very quickly
Tap sinkTap = new Hfs( new TextDelimited( new Fields("ip","count"), "," ), s3OutStr, SinkMode.REPLACE );
// instantiate S3 trap Tap to catch any bad data - this data will be written to S3, and with Driven
// you can see whether, and how many, tuples are being trapped
Tap trapTap = new Hfs(new TextDelimited(true, "\t"), s3TrapStr, SinkMode.REPLACE);

Now that we have our necessary Taps, we can process the data. The Cascading processing model is built around a plumbing metaphor of pipes and flows: Pipes control the flow of data, applying operations to each Tuple or group of Tuples. Within these pipes, we perform several operations using Each and Every pipes. The operations we use include RegexParser, Retain, Rename, HashJoin, Discard, GroupBy and Count.

I highlight a few of these operations below. As mentioned earlier, you can view the full source code for this sample application.

RegexParser parser = new RegexParser(apacheFields, apacheRegex, allGroups);
// apply regex parser to each tuple in the Kinesis stream
processPipe = new Each(processPipe, columns, parser);
// retain only the field "ip" from Kinesis stream
processPipe = new Retain(processPipe, new Fields("ip"));
// in anticipation of the upcoming join, rename the S3 file's field to avoid a naming collision
joinPipe = new Rename( joinPipe, new Fields( "ip" ), new Fields( "userip" ) );
// right-join processPipe and joinPipe (the IPs from the S3 file) on "ip" and the renamed "userip"
Pipe outputPipe = new HashJoin(processPipe, new Fields("ip"), joinPipe, new Fields("userip"), new RightJoin());
// discard unnecessary "userip"
outputPipe = new Discard(outputPipe, new Fields("userip"));
// group all by "ip"
outputPipe = new GroupBy(outputPipe, new Fields("ip"));
// calculate the count of each group of IPs
outputPipe = new Every(outputPipe, new Count(new Fields("count")));
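
The processPipe, joinPipe, apacheFields, apacheRegex and allGroups values referenced above are declared earlier in the full source. As a rough sketch of what such declarations look like for Apache access log lines (the pipe names, field list and pattern below are illustrative assumptions; the sample project defines its own):

// illustrative declarations only - the actual pipe names, fields and regex live in the sample source
Pipe processPipe = new Pipe("process");   // tuples arriving from the Kinesis stream
Pipe joinPipe = new Pipe("join");         // IP list read from the S3 file
Fields apacheFields = new Fields("ip", "time", "request", "status", "size");
String apacheRegex = "^([^ ]*) \\S+ \\S+ \\[([^\\]]*)\\] \"([^\"]*)\" (\\d{3}) (\\S+).*$";
int[] allGroups = {1, 2, 3, 4, 5};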

Now all we have to do is compose the flow by connecting our Taps to our Pipes in a flow definition, and then connect and run the flow.

// assemble the flow definition
FlowDef flowDef = FlowDef.flowDef()
.addSource( processPipe, kinesisSourceTap )   		// connect processPipe to KinesisTap
.addSource( joinPipe, s3SourceTap )           		// connect joinPipe to s3Tap
.addTailSink( outputPipe, sinkTap )          		// connect outputPipe to S3 sink Tap
.setName( "Cascading-Kinesis-Flow" )          		// name the flow
.addTrap( processPipe, trapTap );             		// add the trap to catch any bad data in processPipe

// instantiate HadoopFlowConnector - other flowConnectors include:
//  -- Hadoop2Mr1FlowConnector
//  -- LocalFlowConnector
//  -- Hadoop2TezFlowConnector
//  -- Spark and Flink FlowConnectors under development
HadoopFlowConnector flowConnector = new HadoopFlowConnector( properties );
// attach the flow definition to the flow connector
Flow kinesisFlow = flowConnector.connect(flowDef);
// run the flow
kinesisFlow.complete();
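
Because the Taps, Pipes and FlowDef above are independent of the execution fabric, the same assembly can be planned onto another supported platform simply by swapping the connector. Here is a minimal sketch, assuming the cascading-hadoop2-tez dependency is on the classpath and the Taps in use are supported on that platform (the main class name is hypothetical):

// sketch only: plan the same flow definition onto Apache Tez instead of MapReduce
Properties tezProperties = new Properties();
// identify the application jar by a class it contains (hypothetical class name)
AppProps.setApplicationJarClass( tezProperties, SampleFlow.class );
FlowConnector tezConnector = new Hadoop2TezFlowConnector( tezProperties );
Flow tezFlow = tezConnector.connect( flowDef );
tezFlow.complete();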

Setting up Amazon Kinesis, DynamoDB, Amazon Redshift and the AWS CLI

Before we run the Cascading application, let’s take a moment to ensure that the necessary infrastructure is in place and the Amazon Kinesis stream is populated. For simplicity, we use this setup script, which handles the following tasks:

  • Create an Amazon Kinesis stream
  • Create two DynamoDB tables required by EMR to process Amazon Kinesis streams
  • Create an Amazon Redshift cluster
  • Download the kinesis-log4j-appender-1.0.0.jar to publish sample data to Amazon Kinesis
  • Download and unpack the sample data
  • Start the Amazon Kinesis Publisher for One-Time Publishing
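
If you prefer to set up these pieces by hand, the equivalent AWS CLI calls look roughly like the sketch below (the shard count, node type and identifiers are illustrative assumptions; the setup script’s actual parameters may differ):

$ aws kinesis create-stream --stream-name AccessLogStream --shard-count 2
$ aws redshift create-cluster --cluster-identifier <your-cluster-id> \
    --node-type dw2.large --cluster-type single-node --db-name <your-db> \
    --master-username <user> --master-user-password <password>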

As you can see, this script interacts with several AWS services via the command line. If you have not done so already, you must install the AWS Command Line Interface (CLI). The AWS CLI is a unified tool for managing your AWS services.
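
If you are setting up the CLI for the first time, a typical installation and configuration (assuming Python and pip are already available) looks like this; aws configure prompts for your access key, secret key, default region and output format:

$ pip install awscli
$ aws configure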

Now that we’ve installed the AWS CLI, we’ll add our AWS credentials to src/main/resources/AwsCredentials.properties. This is required for kinesis-log4j-appender to publish to our Amazon Kinesis stream. Once that is complete, let’s call the script from the root directory of our sample project.

# add preferred Redshift credentials to setup-script.sh
$ vi //tutorials/cascading-aws/part4/src/main/scripts/setup-script.sh
# add AWS keys for the Kinesis publisher - required by kinesis-log4j-appender-1.0.0.jar
$ vi //tutorials/cascading-aws/part4/AwsCredentials.properties
# cd to root of sample project
$ cd //tutorials/cascading-aws/part4/
# call setup script
$ src/main/scripts/setup-script.sh
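
The credentials file itself is a plain Java properties file. Assuming the appender reads the standard AWS SDK properties format, it contains two entries (placeholders shown; never commit real keys to source control):

accessKey=<your-access-key>
secretKey=<your-secret-key>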

Tying everything together

Now that we have all the right pieces in all the right places, we just need to update our configuration.properties file located in the Cascading sample application source code at “src/main/resources” with our respective values.

Disclaimer: In the interest of simplicity for this tutorial, we set our AccessKey and SecretKey values manually. However, it is highly recommended that all systems take advantage of the identity and access management afforded by AWS IAM. With IAM roles, your credentials are supplied through the instance profile and are not required anywhere in your code, resulting in a more robust and secure application. This does require that your IAM users and roles are properly configured, with appropriate permissions in place for all AWS services that you need to access. AWS documentation provides more information on creating an IAM policy and using IAM to control access to Amazon Kinesis resources.
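
As an illustration of that recommendation, here is a minimal sketch (not part of the sample source) that resolves credentials through the AWS SDK’s default provider chain instead of hard-coding them; scheme refers to the KinesisHadoopScheme instantiated earlier:

// sketch only: pull credentials from the environment, a local credentials profile or the
// instance profile via com.amazonaws.auth.DefaultAWSCredentialsProviderChain
AWSCredentials credentials = new DefaultAWSCredentialsProviderChain().getCredentials();
// note: instance-profile credentials are temporary and also carry a session token,
// which this two-argument setter does not pass along
scheme.setCredentials( credentials.getAWSAccessKeyId(), credentials.getAWSSecretKey() );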

USER_AWS_ACCESS_KEY=
USER_AWS_SECRET_KEY=
REDSHIFT_JDBC_URL=
REDSHIFT_USER_NAME=
REDSHIFT_PASSWORD=

NOTE: At the time of publication, cascading-jdbc-redshift uses the compatible PostgreSQL driver. With that in mind, please replace “jdbc:redshift://” with “jdbc:postgresql://” in the Amazon Redshift JDBC URL you add to configuration.properties.
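
For example, with Amazon Redshift’s default port of 5439, the rewritten URL takes a form like this (the endpoint and database name are placeholders):

REDSHIFT_JDBC_URL=jdbc:postgresql://<your-cluster>.<region>.redshift.amazonaws.com:5439/<your-database>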

Now we’re ready to run our application! In order to do so, let’s go back into the sample source code and take a look at the shell script that we will use to simplify the final execution steps. This script has two primary functions. The first is to compile the sample application and push it to your S3 bucket, along with a data file that we will join against the Amazon Kinesis stream.

# clean and compile the application
gradle clean fatjar
# create the bucket or delete the contents if it exists
aws s3 mb s3://$BUCKET || aws s3 rm s3://$BUCKET --recursive
# push built jar file to S3
aws s3 cp $BUILD/$NAME s3://$BUCKET/$NAME
# push data file to S3 - we will join this file against the Kinesis stream
aws s3 cp $DATA_DIR s3://$BUCKET/$DATA_DIR

The second function of the script launches an EMR cluster and submits our Cascading application (now located in your S3 bucket) as a step to be run on the cluster.

Disclaimer: For scheduled, operational, production systems, it is recommended that the following action be wrapped in an AWS Data Pipeline definition. In this tutorial, for simplicity, we use the CLI to create a small cluster for demonstration purposes. After the sample data has been processed, or if the Amazon Kinesis stream is empty, this cluster will terminate automatically.

aws emr create-cluster \
  --ami-version 3.8.0 \
  --instance-type m1.xlarge \
  --instance-count 1 \
  --name "cascading-kinesis-example" \
  --visible-to-all-users \
  --enable-debugging \
  --auto-terminate \
  --no-termination-protected \
  --log-uri s3n://$BUCKET/logs/ \
  --service-role EMR_DefaultRole --ec2-attributes InstanceProfile=EMR_EC2_DefaultRole \
  --steps Type=CUSTOM_JAR,Name=KinesisTest1,ActionOnFailure=TERMINATE_CLUSTER,Jar=s3n://$BUCKET/$NAME,Args=$BUCKET

To run the sample Cascading application, simply call this script and provide the following argument:

  1. S3_BUCKET                          // S3 bucket to hold application and data

For the auto-compile to work, you will need to call this script from the following directory: “/[PATH_TO]/tutorials/cascading-aws/part4/”. For example:

$ cd /[PATH_TO]/tutorials/cascading-aws/part4/
$ src/main/scripts/cascading-kinesis.sh <YOUR_S3_BUCKET>

After calling cascading-kinesis.sh, you will see the Cascading application compile; then the application jar and the data file that we will join against the Amazon Kinesis stream are transferred to S3. After that, an EMR cluster is created and our Cascading application jar is added as a step. You can verify that the cluster is booting up by visiting the EMR console. When the application has completed, you will find the final output in an Amazon Redshift table named CascadingResults.
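
If you prefer the command line to the console, you can make the same checks with the AWS CLI (the cluster ID is a placeholder returned by the create-cluster call):

$ aws emr list-clusters --active
$ aws emr list-steps --cluster-id <your-cluster-id>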

If you are already using Driven, you will also see the application appear on the landing page after logging in. With Driven, you can visualize the status, progress and behavior of your applications in real-time, as well as over time.

Driven lets developers build higher quality data applications and gives operators the ability to efficiently optimize and monitor these applications. If you’d like to explore Driven, you can take a tour or visit Driven’s website.

There you have it. With Cascading we now have a micro-batch system that sources streaming, real-time data from Amazon Kinesis, joins it with data in S3, processes this joined stream on EMR, loads the results into Amazon Redshift, and monitors the entire application lifecycle with Driven.

If you have questions or suggestions, please leave a comment below.

—————————–

Related:

Indexing Common Crawl Metadata on Amazon EMR Using Cascading and Elasticsearch