Set up streaming ETL pipelines

In this tutorial, you will learn how to create an Amazon Kinesis Data Analytics for Apache Flink application with Amazon Kinesis Data Streams as a source and an Amazon S3 bucket as a sink.
 
Random data is ingested using the Amazon Kinesis Data Generator (KDG). The Apache Flink application performs a word count on the streaming data using a 5-minute tumbling window and stores the resulting word counts in the specified Amazon S3 bucket. Finally, you use Amazon Athena to query the data in the S3 bucket and validate the results.

In this tutorial you will learn how to:

  • Create an Amazon Kinesis Data Stream
  • Set up an Amazon Kinesis Data Generator
  • Send sample data to a Kinesis Data Stream
  • Create an Amazon S3 bucket
  • Download code for a Kinesis Data Analytics application
  • Modify application code
  • Compile application code
  • Upload Apache Flink Streaming Java code to S3
  • Create, configure, and launch a Kinesis Data Analytics application
  • Verify results
  • Clean up resources
About this Tutorial
Time: 1 hour
Cost: Less than $17 (assuming all services are running for two hours)
Use Case: Internet of Things, Analytics
Products: Amazon Kinesis Data Generator (KDG), Amazon Kinesis Data Analytics
Audience: Developers
Level: Intermediate
Last Updated: November 16, 2021

Step 1: Create an Amazon Kinesis Data Stream

In this step, you will create a Kinesis Data Stream with one shard.

1.1 — Open the Amazon Kinesis services console and choose the Region where you want to create the Kinesis Data Stream resource.

1.2 — In the Getting Started section, choose Kinesis Data Streams, and select Create data stream.

1.3 — On the Create a data stream page, in the Data stream configuration section, enter a name for your data stream (for example, kda-word-count-tutorial).

1.4 – In the Data stream capacity section, choose 1 for the number of open shards, and then select Create data stream.

A single shard can ingest up to 1 MB of data per second (including partition keys) or 1,000 records per second for writes. If your application requires higher ingestion capacity, increase the shard count accordingly. The write and read capacity of the entire stream is updated dynamically in the Kinesis console under Total data stream capacity. For more information, see Quotas and Limits.
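
As a rough sizing aid, the following Java sketch estimates how many provisioned shards a write load needs, using only the two per-shard write limits quoted above. It is a back-of-the-envelope calculation under those assumptions (it ignores read limits, uneven partition keys, and bursts), and the class and method names are hypothetical.

```java
// Sketch: estimate the provisioned shard count from the expected write load.
// Assumes only the per-shard write limits quoted above:
// 1 MB/s (including partition keys) or 1,000 records/s.
final class ShardEstimator {
    static final double MAX_MB_PER_SECOND_PER_SHARD = 1.0;
    static final int MAX_RECORDS_PER_SECOND_PER_SHARD = 1_000;

    /** Smallest shard count that satisfies both write limits (at least one shard). */
    static int requiredShards(double megabytesPerSecond, int recordsPerSecond) {
        int byThroughput = (int) Math.ceil(megabytesPerSecond / MAX_MB_PER_SECOND_PER_SHARD);
        int byRecordRate = (int) Math.ceil(recordsPerSecond / (double) MAX_RECORDS_PER_SECOND_PER_SHARD);
        return Math.max(1, Math.max(byThroughput, byRecordRate));
    }

    public static void main(String[] args) {
        // This tutorial: one small record per second.
        System.out.println(requiredShards(0.001, 1));   // 1
        // Hypothetical load: 3.5 MB/s spread over 2,500 records/s.
        System.out.println(requiredShards(3.5, 2_500)); // 4
    }
}
```

The tutorial's load of one record per second fits comfortably in the single shard chosen in Step 1.4; whichever limit is hit first determines the shard count.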

1.5 — In the Stream details section, verify that the Status updates to Active.

Step 2: Set up Amazon Kinesis Data Generator

In this step, you will configure Amazon Kinesis Data Generator (KDG) to send sample data to a Kinesis Data Stream.

2.1 — Open the Amazon Kinesis Data Generator home page and select Create a Cognito User with CloudFormation. You will be redirected to the AWS CloudFormation console.

2.2 — On the Create stack page, in the Specify template section, verify that there is a valid HTTPS path for the Amazon S3 URL, and then choose Next.

2.3 — On the Specify stack details page, enter a name for the stack (for example, Kinesis-Data-Generator-Cognito-User).

2.4 — In the Parameters section, enter a Username and Password that you will use to sign in to the KDG console. Then, choose Next.

2.5 — On the Configure stack options page, keep the default settings, and choose Next.

2.6 — On the Review page, in the Capabilities section, select the checkbox I acknowledge that AWS CloudFormation might create IAM resources, and then choose Create stack.

2.7 — On the Stack details page, in the Stacks section, verify that the status shows CREATE_COMPLETE. Then, choose the Outputs tab and, under Value, copy the URL.

To view the resources deployed by the AWS CloudFormation stack, choose the Resources tab.

2.8 — Paste the copied URL into a web browser, enter the Username and Password that you created in Step 2.4, and choose Sign In.

2.9 — On the KDG home page, in the Region drop-down list, choose the Region where you created the data stream in Step 1.1. The Stream/delivery stream field should then populate automatically with the stream you created.

Step 3: Send sample data to an Amazon Kinesis Data Stream

The Amazon Kinesis Data Generator generates records using random data based on a template you provide. The KDG extends faker.js, an open source random data generator.

In this step, you will use the KDG console to send sample data, generated from a template written in faker.js syntax, to the Kinesis Data Stream you created earlier. Send one record per second and keep ingestion running until the end of this tutorial, so that enough data accumulates for analysis while you perform the remaining steps.

3.1 — On the KDG home page, perform the following actions:

  • For Records per second, choose the Constant tab, and change the value to 1.
  • For Record template, choose the Template 1 tab, and paste the following code sample into the text box.

"{{random.arrayElement([
    "Hi",
    "Hello there",
    "How are you",
    "Hi how are you"
])}}"
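
To see what records this template produces, the following Java sketch mimics it locally. It assumes that random.arrayElement picks one of the four phrases uniformly at random and that the double quotes around the template are kept, so each record is a JSON string such as "Hi how are you". The class and method names are hypothetical, and the sketch does not call KDG or any AWS API.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Random;

// Sketch: a local stand-in for the KDG record template above (assumed uniform choice).
final class TemplateSimulator {
    // The four phrases from the Template 1 text box.
    static final List<String> PHRASES =
            Arrays.asList("Hi", "Hello there", "How are you", "Hi how are you");

    /** Renders one record: a randomly chosen phrase wrapped in JSON string quotes. */
    static String nextRecord(Random random) {
        String phrase = PHRASES.get(random.nextInt(PHRASES.size()));
        return "\"" + phrase + "\"";
    }

    public static void main(String[] args) {
        Random random = new Random(42); // fixed seed so repeated runs print the same records
        for (int i = 0; i < 5; i++) {
            System.out.println(nextRecord(random)); // one quoted phrase per line
        }
    }
}
```

The quotes matter later: the application's tokenizer has to strip them before counting words.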

3.2 — Then, choose Test Template.

3.3 — Verify the structure of the sample JSON records and choose Close.

3.4 — Then, choose Send data.

Leave the KDG web page open to ensure sustained streaming of random records into the Kinesis Data Stream. The streaming will continue for the remainder of the steps to provide reasonable data for analysis.

3.5 — Open the Amazon Kinesis Data Streams console and, in the Data streams section, choose the Kinesis data stream that you created previously.

3.6 — Choose the Monitoring tab. In the Stream metrics section, find the Put records successful records - average (Percent) and Put records - sum (Bytes) metrics to validate record ingestion.

Step 4: Create Amazon S3 buckets

In this step, you will create the following two Amazon Simple Storage Service (Amazon S3) buckets:

  • kda-word-count-ka-app-code-location-<unique-name> to store the Amazon Kinesis Data Analytics application code
  • kda-word-count-tutorial-bucket-<unique-name> to store the word count output of the Amazon Kinesis Data Analytics application

Note: Amazon S3 bucket names must be globally unique. When you create each bucket, replace <unique-name> with a unique suffix of your choice.
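
Because <unique-name> is appended to a fixed prefix, the prefix limits how long your suffix can be. The following Java sketch checks a subset of the S3 general purpose bucket naming rules (3 to 63 characters; lowercase letters, digits, periods, and hyphens; starts and ends with a letter or digit; no two adjacent periods) and computes the longest suffix that still fits. It does not check every rule (for example, names formatted like IP addresses are not rejected), and the class and method names are hypothetical.

```java
import java.util.regex.Pattern;

// Sketch: checks only a subset of the S3 bucket naming rules.
final class BucketNameCheck {
    // Lowercase letters, digits, periods, hyphens; first and last character alphanumeric.
    private static final Pattern ALLOWED = Pattern.compile("[a-z0-9][a-z0-9.-]*[a-z0-9]");

    static boolean looksValid(String name) {
        return name.length() >= 3
                && name.length() <= 63
                && ALLOWED.matcher(name).matches()
                && !name.contains(".."); // no adjacent periods
    }

    /** Longest suffix that keeps prefix + suffix within the 63-character limit. */
    static int maxSuffixLength(String prefix) {
        return 63 - prefix.length();
    }

    public static void main(String[] args) {
        String codePrefix = "kda-word-count-ka-app-code-location-";
        String outputPrefix = "kda-word-count-tutorial-bucket-";
        System.out.println(maxSuffixLength(codePrefix));            // 27
        System.out.println(maxSuffixLength(outputPrefix));          // 32
        System.out.println(looksValid(outputPrefix + "jdoe-2021")); // true
        System.out.println(looksValid(outputPrefix + "JDoe_2021")); // false
    }
}
```

In practice, the code-location prefix leaves room for a suffix of at most 27 characters.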

4.1 — Open the Amazon S3 console and choose Create Bucket.

4.2 — On the Create bucket page, in the General configuration section, enter the following details.

  • For Bucket Name, type kda-word-count-ka-app-code-location-<unique-name>
  • For AWS Region, select the AWS Region that was chosen in the previous steps.

Then, keep the default settings, and choose Create Bucket.


4.3 — In the Buckets section, choose Create bucket.

4.4 — On the Create bucket page, in the General configuration section, enter the following details.

  • For Bucket Name, type kda-word-count-tutorial-bucket-<unique-name>
  • For AWS Region, select the AWS Region that was selected in the previous steps.

Then, keep the default settings, and choose Create Bucket.

 

Step 5: Download the code for the Amazon Kinesis Data Analytics application

In this step, you will download from GitHub the example Java application code that is used for the Kinesis Data Analytics application.

This step requires a Git client. For more information, see Installing Git.

5.1 — In a terminal window, run the following command to clone the remote repository.

git clone https://github.com/aws-samples/amazon-kinesis-data-analytics-java-examples 

5.2 — Then, run the following command to navigate to the directory that contains the application source code.

cd amazon-kinesis-data-analytics-java-examples/S3Sink/src/main/java/com/amazonaws/services/kinesisanalytics  

5.3 — In S3StreamingSinkJob.java, the following excerpts perform the word count:

input.flatMap(new Tokenizer())                // Tokenizer for generating words
        .keyBy(0)                             // Logically partition the stream for each word
        .timeWindow(Time.minutes(1))          // Tumbling window definition
        .sum(1)                               // Sum the number of words per partition
        .map(value -> value.f0 + " count: " + value.f1.toString() + "\n")
        .addSink(createS3SinkFromStaticConfig());


public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
    @Override
    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
        String[] tokens = value.toLowerCase().split("\\W+");
        for (String token : tokens) {
            if (token.length() > 0) {
                out.collect(new Tuple2<>(token, 1));
            }
        }
    }
}

The Tokenizer converts each incoming record from the Kinesis Data Stream to lowercase and splits it on any run of non-word characters (the regular expression \W+ matches spaces, tabs, punctuation, and the quotation marks around each KDG record). For each non-empty word, it emits a two-element tuple containing the word and a count of 1.

For example:

“Hi” becomes (hi,1)
“Hi how are you” becomes (hi,1) (how,1) (are,1) (you,1)

Then, keyBy(0) partitions the stream of tuples by the first field (the word), and sum(1) adds up the second field (the count) for each word within a one-minute tumbling window.

In the previous example, if both records arrive within the same one-minute window, the output of the sum operation is:

(hi,2) (how,1) (are,1) (you,1)
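
The following Java sketch reproduces, without Flink, what Tokenizer, keyBy(0), and sum(1) compute for the records of a single window. The tokenizing logic is copied from the Tokenizer class above; the surrounding class and method names are hypothetical, and the sketch ignores windowing and timing entirely.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Sketch: per-window word count without Flink (hypothetical helper class).
final class WindowedWordCount {
    /** Same rule as Tokenizer.flatMap: lowercase, split on \W+, drop empty tokens. */
    static List<String> tokenize(String record) {
        List<String> words = new ArrayList<>();
        for (String token : record.toLowerCase().split("\\W+")) {
            if (token.length() > 0) {
                words.add(token);
            }
        }
        return words;
    }

    /** Equivalent of keyBy(word) + sum(count) over the records of one window. */
    static Map<String, Integer> countWindow(List<String> recordsInWindow) {
        Map<String, Integer> counts = new LinkedHashMap<>(); // keeps first-seen word order
        for (String record : recordsInWindow) {
            for (String word : tokenize(record)) {
                counts.merge(word, 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // The two example records, as KDG sends them (with JSON quotes).
        List<String> window = Arrays.asList("\"Hi\"", "\"Hi how are you\"");
        System.out.println(countWindow(window)); // {hi=2, how=1, are=1, you=1}
    }
}
```

Note that splitting "\"Hi\"" on \W+ yields an empty leading token, which is why the length check in the Tokenizer is needed.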

Then, the map operation formats each summed tuple as a string and sends it to the S3 sink. As long as record ingestion is sustained, a new file is created in the kda-word-count-tutorial-bucket-<unique-name> S3 bucket every minute, containing new-line separated strings in the following format (the counts shown are illustrative):

hi count: 824 

how count: 124 

are count: 210 

you count: 100 

Step 6: Modify the application code

In this step, you will modify the application code to point to the resources you created in the previous steps and to start reading from the oldest untrimmed record in the Kinesis Data Stream.

You will also change the tumbling window to 5 minutes and change the content written to the S3 files to new-line separated, comma-delimited strings with the schema <word,count,window time>.

6.1 — Open the S3StreamingSinkJob.java file in a text editor. For example, run the following command:

vi S3StreamingSinkJob.java

6.2 — Then, modify the following variables:

  • For region, type the Region that you used in the previous steps.
  • For inputStreamName, type the name of the data stream that you created in Step 1.3 (the stream shown in Step 2.9).
  • For s3SinkPath, type the string "s3a://bucket-placeholder/wordCount", where bucket-placeholder is the name of the S3 bucket that you created in Step 4.4.
  • For STREAM_INITIAL_POSITION, set the value to TRIM_HORIZON.
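
For orientation, the following Java sketch shows what the edited values might look like, together with the consumer properties they feed. The field names mirror the variables named in Step 6.2, and the values are placeholders. The literal property keys aws.region and flink.stream.initpos are, to our understanding, the string values behind the Flink Kinesis connector's region and STREAM_INITIAL_POSITION constants that the real file uses; in S3StreamingSinkJob.java, keep using the constants. The class and method names are hypothetical.

```java
import java.util.Properties;

// Sketch of the Step 6.2 edits (hypothetical class; placeholder values).
final class SinkJobSettings {
    // Lowercase names mirror the variables in S3StreamingSinkJob.java.
    static final String region = "ap-south-1";                       // example Region from Step 1.1
    static final String inputStreamName = "kda-word-count-tutorial"; // example stream from Step 1.3
    static final String s3SinkPath =
            "s3a://kda-word-count-tutorial-bucket-<unique-name>/wordCount"; // bucket from Step 4.4

    /** Consumer properties; the literal keys are assumed to match the connector constants. */
    static Properties consumerProperties() {
        Properties inputProperties = new Properties();
        inputProperties.setProperty("aws.region", region);
        // TRIM_HORIZON: start from the oldest untrimmed record rather than only new records.
        inputProperties.setProperty("flink.stream.initpos", "TRIM_HORIZON");
        return inputProperties;
    }

    public static void main(String[] args) {
        System.out.println(inputStreamName + " -> " + s3SinkPath);
        System.out.println(consumerProperties());
    }
}
```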

Note:

The STREAM_INITIAL_POSITION value is set to TRIM_HORIZON, which makes the Kinesis Data Analytics application start reading from the oldest untrimmed data record. This ensures that all records ingested into the Kinesis Data Stream since you started sending data from the KDG in Step 3 (and still within the stream's retention period, 24 hours by default) are read by the application and stored in the kda-word-count-tutorial-bucket-<unique-name> S3 bucket.

6.3 — In the S3StreamingSinkJob.java file, replace line 52 with the following value.

.map(value -> value.f0 +","+ value.f1.toString() +","+ Timestamp.from(Instant.now()).toString())

6.4 — Replace line 50 with the following value.

.timeWindow(Time.minutes(5)) // Tumbling window definition

6.5 — Then, add the following lines of code after line 16.

import java.sql.Timestamp;
import java.time.Instant;
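
After Steps 6.3 to 6.5, each (word, count) tuple becomes one comma-delimited line. The following Java sketch factors the modified map expression into a hypothetical formatLine helper so it can run on its own. java.sql.Timestamp.toString() renders the instant in the JVM's default time zone as yyyy-mm-dd hh:mm:ss.fffffffff (trailing zeros of the fraction trimmed), which matches the java.sql.Timestamp-compatible text format that Athena reads for the TIMESTAMP column in Step 10.5.

```java
import java.sql.Timestamp;
import java.time.Instant;

// Sketch: the string built by the modified .map(...) from Step 6.3 (hypothetical helper).
final class OutputLineFormat {
    /** word,count,timestamp with no trailing newline, as in Step 6.3. */
    static String formatLine(String word, Integer count, Instant now) {
        return word + "," + count.toString() + "," + Timestamp.from(now).toString();
    }

    public static void main(String[] args) {
        // In the job, "now" is Instant.now() at the moment the map runs.
        System.out.println(formatLine("hi", 150, Instant.now()));
    }
}
```

Because Instant.now() is evaluated when the map runs, right after a window fires, window_time approximates the end of each window (assuming processing-time windows, the Flink 1.11 default) rather than an exact window boundary, and it is expressed in the application JVM's time zone.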

Step 7: Compile the application code

In this step, you will compile the application code to create a .jar file of the modified code. Kinesis Data Analytics requires this .jar file as the application code.

7.1 — In a terminal, navigate to the S3Sink directory. For example, from the directory where you cloned the repository in Step 5.1, run the following command.

cd amazon-kinesis-data-analytics-java-examples/S3Sink/ 

7.2 — Run the following command to compile the application code. This requires Apache Maven and a Java Development Kit (JDK).

mvn package -Dflink.version=1.11.1
Once completed, you should see a BUILD SUCCESS message.

The previous command creates a .jar file of the application at target/aws-kinesis-analytics-java-apps-1.0.jar.

Step 8: Upload Apache Flink Streaming Java code to S3

In this step, you will upload the Apache Flink Streaming Java code to the kda-word-count-ka-app-code-location-<unique-name> S3 bucket.

8.1 — Navigate to the Amazon S3 console and choose the kda-word-count-ka-app-code-location-<unique-name> bucket.

8.2 — On the kda-word-count-ka-app-code-location-<unique-name> page, choose the Objects tab, and then choose Upload.

8.3 — On the Upload page, in the Files and folders section, choose Add files.

8.4 — Select the aws-kinesis-analytics-java-apps-1.0.jar file that you built in Step 7.2 (in the target directory).

8.5 — Then, select Upload.

Step 9: Create, configure, and start the Kinesis Data Analytics application

In this step, you will create, configure, and start the Kinesis Data Analytics application.

9.1 — Open the Kinesis Data Analytics console and, on the Analytics applications page, choose Create application.

9.2 — On the Create application page, in the Choose a method to set up the application section, select Apache Flink - Streaming application.

9.3 — In the Application configuration section, for Application name, type kda-word-count. Then, choose Create/update IAM role <kinesis-analytics-kda-word-count-region> with required policies.

9.4 — In the Template for application settings section, in the Templates section, choose Development. Then, choose Create application.

9.5 — On the kda-word-count page, choose Configure.

9.6 — On the Configure kda-word-count page, in the Application code location section, make the following changes:

  • For Amazon S3 bucket, choose Browse.
  • Then, choose the kda-word-count-ka-app-code-location-<unique-name> bucket, and select Choose.
  • For Path to S3 object, type aws-kinesis-analytics-java-apps-1.0.jar to point the Kinesis Data Analytics application to your custom code.
  • Then, keep the default options, and select Save changes.

9.7 — On the kda-word-count page, in the Application details section, choose the IAM role hyperlink.

9.8 — In the IAM console for the kinesis-analytics-kda-word-count-region role, in the Summary section, choose the Permissions tab. Then, choose the kinesis-analytics-service-kda-word-count-region policy.

9.9 — In the IAM console for Policies, in the Summary section, choose the Permissions tab, and choose Edit policy.

9.10 — On the Edit kinesis-analytics-service-kda-word-count-region page, choose the JSON tab, and append the following statements to the end of the policy's Statement array (add a comma after the last existing statement first).

Then, make the following changes:

  • For the Kinesis “Resource”, replace the Region, Account ID, and stream name with your own. (For example: "Resource": "arn:aws:kinesis:ap-south-1:012345678901:stream/kda-word-count-tutorial".)
  • For the S3 “Resource” entries, replace the bucket names with the names of the buckets that you created in Step 4.

For example:

  • "arn:aws:s3:::kda-word-count-tutorial-bucket-<unique-name>"
  • "arn:aws:s3:::kda-word-count-tutorial-bucket-<unique-name>/*"
  • "arn:aws:s3:::kda-word-count-ka-app-code-location-<unique-name>"
  • "arn:aws:s3:::kda-word-count-ka-app-code-location-<unique-name>/*"
{
    "Sid": "ReadInputStream",
    "Effect": "Allow",
    "Action": "kinesis:*",
    "Resource": "arn:aws:kinesis:ap-south-1:012345678901:stream/kda-word-count-tutorial"
},
{
    "Sid": "WriteObjects",
    "Effect": "Allow",
    "Action": [
        "s3:Abort*",
        "s3:DeleteObject*",
        "s3:GetObject*",
        "s3:GetBucket*",
        "s3:List*",
        "s3:ListBucket",
        "s3:PutObject"
    ],
    "Resource": [
        "arn:aws:s3:::kda-word-count-tutorial-bucket-<unique-name>",
        "arn:aws:s3:::kda-word-count-tutorial-bucket-<unique-name>/*",
        "arn:aws:s3:::kda-word-count-ka-app-code-location-<unique-name>",
        "arn:aws:s3:::kda-word-count-ka-app-code-location-<unique-name>/*"
    ]
}

Then, choose Review policy, and then choose Save changes.

9.11 — Return to the kda-word-count page in the Kinesis Data Analytics console, and choose Run.

9.12 — Then, choose Run without snapshot and choose Run.

9.13 — Once the application starts successfully, the status shows as Running. Choose Open Apache Flink dashboard.

9.14 — In the Apache Flink Dashboard, in the left navigation pane, choose Running Jobs. Then, verify that the Status of the Flink S3 Streaming Sink Job is Running.

Step 10: Verify results

Because KDG sends random data into the Kinesis Data Stream at a sustained rate of one record per second, and the Kinesis Data Analytics application counts words over 5-minute tumbling windows, you should see a new file created in Amazon S3 about every 5 minutes.
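
To judge whether the counts you see later in Athena are plausible, you can estimate them. At one record per second, a 5-minute window receives about 300 records. Assuming random.arrayElement picks each of the four phrases with equal probability, each phrase appears about 75 times per window, so hi, how, are, and you each appear about 150 times, and hello and there about 75 times. The following Java sketch performs that calculation; the class and method names are hypothetical.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Sketch: expected per-window word counts under a uniform-choice assumption.
final class ExpectedWindowCounts {
    static Map<String, Double> expectedCounts(List<String> phrases, int recordsPerSecond, int windowSeconds) {
        double recordsPerWindow = (double) recordsPerSecond * windowSeconds;
        double perPhrase = recordsPerWindow / phrases.size(); // each phrase equally likely
        Map<String, Double> expected = new LinkedHashMap<>();
        for (String phrase : phrases) {
            // Tokenize the same way as the application's Tokenizer.
            for (String token : phrase.toLowerCase().split("\\W+")) {
                if (token.length() > 0) {
                    expected.merge(token, perPhrase, Double::sum);
                }
            }
        }
        return expected;
    }

    public static void main(String[] args) {
        List<String> phrases = Arrays.asList("Hi", "Hello there", "How are you", "Hi how are you");
        System.out.println(expectedCounts(phrases, 1, 5 * 60));
        // {hi=150.0, hello=75.0, there=75.0, how=150.0, are=150.0, you=150.0}
    }
}
```

Actual counts fluctuate around these values. The first windows after the application starts can be much larger, because TRIM_HORIZON makes the application read the backlog of records sent since Step 3.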

10.1 — Navigate to the Amazon S3 console and, in the Buckets section, select the kda-word-count-tutorial-bucket-<unique-name> bucket.

10.2 — On the kda-word-count-tutorial-bucket-<unique-name> page, choose the Objects tab and navigate to the wordCount folder. In the Last modified column, consecutive files should be about 5 minutes apart.

10.3 — Open the Amazon Athena console and choose Explore the query editor.

10.4 — Choose the Editor tab. In the Data section, for Data source, select AwsDataCatalog from the drop-down list. Then, for Database, select default.

10.5 — In the Query tab, paste the following query to create an external table for the data generated by the Kinesis Data Analytics application. Then, choose Run.

Note: Replace the LOCATION S3 bucket with the bucket that you created in Step 4.4.

CREATE EXTERNAL TABLE kda_word_count_table (
         word STRING,
         count INT,
         window_time TIMESTAMP 
) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n' LOCATION 's3://kda-word-count-tutorial-bucket-<unique-name>/wordCount/';
10.6 — In a new Query editor tab, paste the following query to list the word counts for all available windows. Then, choose Run.

SELECT *
FROM "default"."kda_word_count_table"
order by window_time desc, count desc; 
10.7 — In a new Query editor tab, paste the following query to find the total count of each word since a given point in time.

Modify the fourth line of the query to use a timestamp relevant to your use case. Then, choose Run.

SELECT word,
         sum(count) AS total_count
FROM "default"."kda_word_count_table"
WHERE window_time > TIMESTAMP '2021-07-xx xx:xx'
GROUP BY  word
ORDER BY  total_count desc;
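
If you download output files from the wordCount folder, the following Java sketch applies the same aggregation as the Step 10.7 query locally: it keeps lines whose window time is after a cutoff, sums the counts per word, and orders the totals in descending order. The class and method names and the sample lines are hypothetical.

```java
import java.sql.Timestamp;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Sketch: local equivalent of the Step 10.7 query over <word,count,window time> lines.
final class TotalCountSince {
    /** SUM(count) ... WHERE window_time > cutoff GROUP BY word. */
    static Map<String, Long> totalsAfter(List<String> lines, Timestamp cutoff) {
        Map<String, Long> totals = new LinkedHashMap<>();
        for (String line : lines) {
            if (line.trim().isEmpty()) {
                continue; // ignore blank lines
            }
            String[] fields = line.split(",", 3); // word, count, window time
            Timestamp windowTime = Timestamp.valueOf(fields[2].trim());
            if (windowTime.after(cutoff)) {
                totals.merge(fields[0], Long.parseLong(fields[1].trim()), Long::sum);
            }
        }
        return totals;
    }

    /** ORDER BY total_count DESC. */
    static List<Map.Entry<String, Long>> sortedByTotalDesc(Map<String, Long> totals) {
        return totals.entrySet().stream()
                .sorted(Map.Entry.<String, Long>comparingByValue().reversed())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Hypothetical sample lines in the format written after Step 6.
        List<String> lines = Arrays.asList(
                "hi,150,2021-07-15 10:05:00.123",
                "how,148,2021-07-15 10:05:00.123",
                "hi,149,2021-07-15 10:10:00.456");
        Timestamp cutoff = Timestamp.valueOf("2021-07-15 10:00:00");
        for (Map.Entry<String, Long> entry : sortedByTotalDesc(totalsAfter(lines, cutoff))) {
            System.out.println(entry.getKey() + " " + entry.getValue());
        }
        // prints "hi 299" then "how 148"
    }
}
```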
Step 11: Clean up resources

In this step, you will delete the resources you used in this tutorial.

Important: Deleting resources that are not actively being used reduces costs and is a best practice. Not deleting your resources will result in charges to your account.

11.1 — In the Kinesis Data Generator web page that you left open in Step 3.4, choose Stop Sending Data to Kinesis.

11.2 — In the AWS CloudFormation console, select the KDG stack that you created in Step 2.3 (for example, Kinesis-Data-Generator-Cognito-User), and choose Delete. When prompted to confirm, choose Delete stack.

11.3 — In the Amazon Kinesis Data Streams console, select the kda-word-count-tutorial stream, and choose Delete. When prompted to confirm, type delete in the text box, and then choose Delete.

11.4 — In the Amazon Kinesis Data Analytics applications console, select the kda-word-count application. Then, choose Actions, and choose Delete.

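11.5 — In the Amazon S3 console, delete the kda-word-count-ka-app-code-location-<unique-name> and kda-word-count-tutorial-bucket-<unique-name> buckets. A bucket must be empty before you can delete it, so for each bucket, first choose Empty and confirm, and then choose Delete and confirm.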

11.6 — In the Amazon Athena console, choose the ellipsis next to the kda_word_count_table table that you created in Step 10.5, and choose Delete table. When prompted to confirm, choose Yes.

Congratulations

You have created a streaming pipeline and performed ETL on top of it using Amazon Kinesis ecosystem services. Kinesis Data Analytics for Apache Flink enables you to author and run code against streaming sources to perform time-series analytics, feed real-time dashboards, and create real-time metrics.

Learn how to enhance your ETL pipeline

You can further enhance this pipeline by persisting data in S3 in Parquet format under meaningful partitions, which reduces the amount of data that Athena queries scan and therefore their cost.