AWS Big Data Blog

Joining and Enriching Streaming Data on Amazon Kinesis

Are you trying to move away from a batch-based ETL pipeline? You might do this, for example, to get real-time insights into your streaming data, such as clickstream, financial transactions, sensor data, customer interactions, and so on.  If so, it’s possible that as soon as you get down to requirements, you realize your streaming data doesn’t have all of the fields you need for real-time processing, and you are really missing that JOIN keyword!

You might also have requirements to enrich the streaming data based on a static reference, a dynamic dataset, or even with another streaming data source.  How can you achieve this without sacrificing the velocity of the streaming data?

In this blog post, I provide three use cases and approaches for joining and enriching streaming data:

Joining streaming data with a relatively static dataset on Amazon S3 using Amazon Kinesis Analytics

In this use case, Amazon Kinesis Analytics can be used to define a reference data input on S3, and use S3 for enriching a streaming data source.

For example, bike share systems around the world can publish data files about available bikes and docks, at each station, in real time.  On bike-share system data feeds that follow the General Bikeshare Feed Specification (GBFS), there is a reference dataset that contains a static list of all stations, their capacities, and locations.

Let’s say you would like to enrich the bike-availability data (which changes throughout the day) with the bike station’s latitude and longitude (which is static) for downstream applications.  The architecture would look like this:

o_joiningenriching_1

To illustrate how you can use Amazon Kinesis Analytics to do this, follow these steps to set up an AWS CloudFormation stack, which will do the following:

  • Continuously produce sample bike-availability data onto an Amazon Kinesis stream (with, by default, a single shard).
  • Generate a sample S3 reference data file.
  • Create an Amazon Kinesis Analytics application that performs the join with the reference data file.

To create the CloudFormation stack

  1. Open the AWS CloudFormation console and choose Create Stack. Make sure the US East (N. Virginia) region is selected.
  2. Choose Specify an Amazon S3 template URL, enter or paste the following URL, and then choose Next.
    https://s3.amazonaws.com/aws-bigdata-blog/artifacts/Joining-and-Enriching-Streaming-Data-on-Amazon-Kinesis/demo-data-enrichment-s3-reference-cfn-template.json
  1. For the stack name, type demo-data-enrichment-s3-reference-stack. Under the KinesisAnalyticsReferenceDataS3Bucket parameter, type the name of an S3 bucket in the US East (N. Virginia) region where the reference data will be uploaded.  This CloudFormation template will create a sample data file under KinesisAnalyticsReferenceDataS3Key. (Make sure the value of the KinesisAnalyticsReferenceDataS3Key parameter does not conflict with existing S3 objects in your bucket).You’ll also see parameters referencing an S3 bucket (LambdaKinesisAnalyticsApplicationCustomResourceCodeS3Bucket) and an associated S3 key. The S3 bucket and the S3 key represent the location of the AWS Lambda package (written in Python) that contains the AWS CloudFormation custom resources required to create an Amazon Kinesis application. Do not modify the values of these parameters.
  1. Follow the remaining steps in the Create Stack wizard (click “Next” button, and then the “Create” button). Wait until the status displayed for the stack is CREATE_COMPLETE.

You now have an Amazon Kinesis stream in your AWS account that has sample bike-availability streaming data which, if you followed the template, is named demo-data-enrichment-bike-availability-input-stream and an Amazon Kinesis Analytics application that performs the joining.

You might want to examine the reference data file generated under your bucket (its default name is demo_data_enrichment_s3_reference_data.json).  If you are using a JSON formatter/viewer, it will look like the following.  Note how the records are put together as top-level objects (no commas separating the records).

{
  "station_id": "s001",
  "name": "Liberty Ave",
  "lat": 40.6892,
  "lon": -74.0445
}
{
  "station_id": "s002",
  "name": "Empire St",
  "lat": 40.7484,
  "lon": -73.9857
}

In the CloudFormation template, the reference data has been added to the Amazon Kinesis application through the AddApplicationReferenceDataSource API method.

Next, open the Amazon Kinesis Analytics console and examine the application.  If you did not modify the default parameters, the application name should be demo-data-enrichment-s3-reference-application.

The Application status should be RUNNING.  If its status is still STARTING, wait until it changes to RUNNING. Choose Application details, and then go to SQL results. You should see a page similar to the following:

o_joiningenriching_2

In the Amazon Kinesis Analytics SQL code, an in-application stream (DESTINATION_SQL_STREAM) is created with five columns. Data is inserted into the stream using a pump (STREAM_PUMP) by joining the source stream (SOURCE_SQL_STREAM_001) and the reference S3 data file (REFERENCE_DATA) using the station_id field. For more information about streams and pumps, see In-Application Streams and Pumps in the Amazon Kinesis Analytics Developer Guide.

The joined (enriched) data fields – station_name, station_lat and station_lon – should appear on the Real-time analytics tab, DESTINATION_SQL_STREAM.

o_joiningenriching_3

The LEFT OUTER JOIN works just like the ANSI-standard SQL. When you use LEFT OUTER JOIN, the streaming records are preserved even if there is no matching station_id in the reference data.  You can delete LEFT OUTER (leaving only JOIN, which means INNER join), choose Save and then run SQL. Only the records with a matching station_id will appear in the output.

o_joiningenriching_4

You can write the results to an Amazon Kinesis stream or to S3, Amazon Redshift, or Amazon Elasticsearch Service with an Amazon Kinesis Firehose delivery stream by adding a destination on the Destination tab.  For more information, see the Writing SQL on Streaming Data with Amazon Kinesis Analytics blog post.

Note: If you have not modified the default parameters in the CloudFormation template, S3 reference data source should have been added to your application.  If you are interested in adding S3 reference data source on your own streams, the following code snippet shows what it looks like in Python:

import boto3
kinesisanalytics_client = boto3.client('kinesisanalytics')

application_name = 'YOUR KINESIS ANALYTICS APPLICATION NAME'
reference_data_s3_bucket = 'YOUR REFERENCE DATA S3 BUCKET'
reference_data_s3_key = 'YOUR REFERENCE DATA S3 KEY' #example: folder/sub-folder/reference.json
reference_data_role_arn = 'YOUR IAM ROLE ARN' #with s3:GetObject permission on the reference data s3 key. Example: arn:aws:iam::123456789012:role/service-role/role-name

response = kinesisanalytics_client.describe_application(
    ApplicationName=application_name
)
application_version_id = response['ApplicationDetail']['ApplicationVersionId']

kinesisanalytics_client.add_application_reference_data_source(
    ApplicationName=application_name,
    CurrentApplicationVersionId=application_version_id,
    ReferenceDataSource={
        'TableName': 'REFERENCE_DATA',
        'S3ReferenceDataSource': {
            'BucketARN': 'arn:aws:s3:::%s' % reference_data_s3_bucket,
            'FileKey': reference_data_s3_key,
            'ReferenceRoleARN': reference_data_role_arn
        },
        'ReferenceSchema': {
            'RecordFormat': {
                'RecordFormatType': 'JSON',
                'MappingParameters': {
                    'JSONMappingParameters': {
                        'RecordRowPath': '$'
                    }
                }
            },
            'RecordEncoding': 'UTF-8',
            'RecordColumns': [
                {
                    'Name': 'station_id',
                    'Mapping': '$.station_id',
                    'SqlType': 'VARCHAR(4)'
                },
                {
                    'Name': 'name',
                    'Mapping': '$.name',
                    'SqlType': 'VARCHAR(64)'
                },
                {
                    'Name': 'lat',
                    'Mapping': '$.lat',
                    'SqlType': 'REAL'
                },
                {
                    'Name': 'lon',
                    'Mapping': '$.lon',
                    'SqlType': 'REAL'
                },
            ]
        }
    }
)

 

This data enrichment approach works for a relatively static dataset because it requires you to upload the entire reference dataset to S3 whenever there is a change.  This approach might not work for cases where the dataset changes too frequently to catch up with the streaming data.

If you change the reference data stored in the S3 bucket, you need to use the UpdateApplication operation (using the API or AWS CLI) to refresh the data in the Amazon Kinesis Analytics in-application table.  You can use the following Python script to refresh the data. Although it will not be covered in this blog post, you can automate data refresh by setting up Amazon S3 event notification with AWS Lambda.

#!/usr/bin/env python
import json,boto3

# Name of the Kinesis Analytics Application
KINESIS_ANALYTICS_APPLICATION_NAME='demo-data-enrichment-s3-reference-application'

# AWS region
AWS_REGION='us-east-1'


def main_handler():

    kinesisanalytics_client=boto3.client('kinesisanalytics',AWS_REGION)

    # retrieve the current application version id
    response_describe_application = kinesisanalytics_client.describe_application(
        ApplicationName=KINESIS_ANALYTICS_APPLICATION_NAME
    )
    application_version_id=response_describe_application['ApplicationDetail']['ApplicationVersionId']

    # update the application
    response = kinesisanalytics_client.update_application(
        ApplicationName=KINESIS_ANALYTICS_APPLICATION_NAME,
        CurrentApplicationVersionId=application_version_id,
        ApplicationUpdate={
        }
    )

    print("Done")


if __name__ == "__main__":
    main_handler()

 

To clean up resources created during this demo

  1. To delete the Amazon Kinesis Analytics application, open the Amazon Kinesis Analytics console, choose the application (demo-data-enrichment-s3-reference-application), choose Actions, and then choose Delete application.
  2. To delete the stack, open the AWS CloudFormation console, choose the stack (demo-data-enrichment-s3-reference-stack), choose Actions, and then choose Delete Stack.

The S3 reference data file should have been removed from your bucket, but your other data files should remain intact.

Enriching streaming data with a dynamic dataset using AWS Lambda and Amazon DynamoDB

In many cases, the data you want to enrich is not static. Imagine a case in which you are capturing user activities from a web or mobile application and have to enrich the activity data with frequently changing fields from a user database table.

A better approach is to store reference data in a way that can support random reads and writes efficiently.  Amazon DynamoDB is a fully managed non-relational database that can be used in this case.  AWS Lambda can be set up to automatically read batches of records from your Amazon Kinesis streams, which can perform data enrichment like looking up data from DynamoDB, and then produce the enriched data onto another stream.

If you have a stream of user activities and want to look up a user’s birth year from a DynamoDB table, the architecture will look like this:

o_joiningenriching_5

To set up a demo with this architecture, follow these steps to set up an AWS CloudFormation stack, which continuously produces sample user scores onto an Amazon Kinesis stream. An AWS Lambda function enriches the records with data from a DynamoDB table and produces the results onto another Amazon Kinesis stream.

  1. Open the CloudFormation console and choose Create Stack. Make sure the US East (N. Virginia) region is selected.
  2. Choose Specify an Amazon S3 template URL, enter or paste the following URL, and then choose Next.
    https://s3.amazonaws.com/aws-bigdata-blog/artifacts/Joining-and-Enriching-Streaming-Data-on-Amazon-Kinesis/demo-data-enrichment-ddb-reference-cfn-template.json
  3. For the stack name, type demo-data-enrichment-ddb-reference-stack. Review the parameters. If none of them conflict with your Amazon Kinesis stream names or DynamoDB table name, you can accept the default values and choose Next.  The following steps are written with the assumption that you are using the default values.
  4. Complete the remaining steps in the Create Stack wizard. Wait until CREATE_COMPLETE is displayed for the stack status.This CloudFormation template creates three Lambda functions: one for setting up sample data in a DynamoDB table, one for producing sample records continuously, and one for data enrichment. The description for this last function is Data Enrichment Demo – Lambda function to consume and enrich records from Kinesis.
  1. Open the Lambda console, select Data Enrichment Demo – Lambda function to consume and enrich records from Kinesis, and then choose the Triggers
  2. If No records processed is displayed for Last result, wait for a few minutes, and then reload this page. If things are set up correctly, OK will be displayed for Last result.  This might take a few minutes.

o_joiningenriching_6

At this point, the Lambda function is performing the data enrichment and producing records onto an output stream. Lambda is commonly used for preprocessing the analytics app to handle more complicated data formats.

Although this data enrichment process doesn’t involve Amazon Kinesis Analytics, you can still use the Kinesis Analytics service to examine, or even process, the enriched records, by creating a new Kinesis Analytics application and connect it to demo-data-enrichment-user-activity-output-stream.

The records on the demo-data-enrichment-user-activity-output-stream will look like the following and will show the enriched birthyear field on the streaming data.

o_joiningenriching_7

You can use this birthyear value in your Amazon Kinesis Analytics application. Although it’s not covered in this blog post, you can aggregate by age groups and count the user activities.

You can review the code for the Lambda function on the Code tab. The code used here is for demonstration purpose only. You might want to modify and thoroughly test it before applying it to your use case.

Note the following:

  • The Lambda function receives records in batches (instead of one record per Lambda invocation). You can control this behavior through Batch size on the Triggers tab (in the preceding example, Batch size is set to 100). The batch size you specify is the maximum number of records that you want your Lambda function to receive per invocation.
  • The retrieval from the reference data source (in this case, DynamoDB) can be done in batch instead of record by record. This example uses the batch_get_item() API method.
  • Depending on how strict the record sequencing requirement is, the writing of results to the output stream can also be done in batch. This example uses the put_records() API method.

DynamoDB is not the only option.  What’s important is to have a data source that supports random data read (and write) at a high efficiency.  Because it’s a distributed database with built-in partitioning, DynamoDB is a good choice, but you can also use Amazon RDS as long as the data retrieval is fast enough (perhaps by having a proper index and by spreading the read workload over read replicas). You can also use an in-memory database like Amazon ElastiCache for Redis or Memcached.

To clean up the resources created for this demo

  1. If you created a Kinesis Analytics application: Open the Amazon Kinesis Analytics console, select your application, choose Actions, and then choose Delete application.
  2. Open the CloudFormation console, choose demo-data-enrichment-ddb-reference-stack, choose Actions, and then choose Delete Stack.

Joining multiple sets of streaming data using Amazon Kinesis Analytics

To illustrate this use case, let’s say you have two streams of temperature sensor data coming from a set of machines: one measuring the engine temperature and the other measuring the power supply temperature.  For the best prediction of machine failure, you’ve been told (perhaps by your data scientist) that the two temperature readings must be validated against a prediction model ─ not individually, but as a set.

Because this is streaming data, the time window for the data joining should be clearly defined.  In this example, the joining must occur on temperature readings within the same window (same minute) so that the temperature of the engine is matched against the temperature of the power supply in the same timeframe.

This data joining can be achieved with Amazon Kinesis Analytics, but has to follow a certain pattern.

First of all, an Amazon Kinesis Analytics application supports no more than one streaming data source. The different sets of streaming data have to be produced onto one Amazon Kinesis stream.

An Amazon Kinesis Analytics application can use this Amazon Kinesis stream as input and can process the data based on a certain field (in this example, sensor_location) into multiple in-application streams.

The joining can now occur on the two in-application streams.  In this example, the join fields are the machine_id and the one-minute tumbling window.

o_joiningenriching_8

The selection of the time field for the windowing criteria is a topic of its own.  For information, see Writing SQL on Streaming Data with Amazon Kinesis Analytics. It is important to get a well-aligned window for real-world applications.  In this example, the processing time (ROWTIME) will be used for the tumbling window calculation for simplicity.

Again, you can follow these steps to set up an AWS CloudFormation stack, which continuously produces two sets of sample sensor data onto a single Amazon Kinesis stream and creates an Amazon Kinesis Analytics application that performs the joining.

  1. Open the CloudFormation console and choose Create Stack. Make sure the US East (N. Virginia) region is selected.
  2. Choose Specify an Amazon S3 template URL, enter or paste the following URL, and then choose Next.
    https://s3.amazonaws.com/aws-bigdata-blog/artifacts/Joining-and-Enriching-Streaming-Data-on-Amazon-Kinesis/demo-data-joining-multiple-streams-cfn-template.json
  3. For the stack name, type demo-data-joining-multiple-streams-stack. Review the parameters. If none of them conflict with your Amazon Kinesis stream, you can accept the default values and choose Next. The following steps are written with the assumption that you are using the default values.
  4. You will also see parameters referencing an S3 bucket and S3 key. This is an AWS Lambda package (written in Python) that contains the AWS CloudFormation custom resources to create an Amazon Kinesis application.
  5. Complete the remaining steps in the Create Stack wizard. Wait until the status displayed for the stack is CREATE_COMPLETE.

Open the Amazon Kinesis Analytics console and examine the application.  If you have not modified the default parameters in the CloudFormation template, the application name should be demo-data-joining-multiple-streams-application.

The Application status should be RUNNING.  If its status is still STARTING, wait until it changes to RUNNING.

Choose Application details, and then go to SQL results.

You should see a page similar to the following:

o_joiningenriching_9

The SQL statements have been populated for you. The script first creates two in-application streams (for engine and power supply temperature readings, respectively).  DESTINATION_SQL_STREAM holds the joined results.

On the Real-time analytics tab, you’ll see the average temperature readings of the engine and power supply have been joined together using the machine_id and per-minute tumbling window.  The results can be written to an Amazon Kinesis stream or to S3, Amazon Redshift, or Amazon Elastisearch Service with a Firehose delivery stream.

o_joiningenriching_10

To clean up resources created for this demo

  1. To delete the Amazon Kinesis Analytics application, open the Amazon Kinesis Analytics console, choose demo-data-joining-multiple-streams-application, choose Actions, and then choose Delete application.
  2. Open the CloudFormation console, choose demo-data-joining-multiple-streams-stack, choose Actions, and then choose Delete Stack.

Summary

In this blog post, I shared three approaches for joining and enriching streaming data on Amazon Kinesis Streams by using Amazon Kinesis Analytics, AWS Lambda, and Amazon DynamoDB.  I hope this information will be helpful in situations when your real-time analytics applications require additional data fields from reference data sources or the real-time insights must be derived from data across multiple streaming data sources.  These joining and enrichment techniques will help you can get the best business value from your data.

If you have a question or suggestion, please leave a comment below.


About the author

 

assaf_mentzer_90Assaf Mentzer is a Senior Consultant in Big Data & Analytics for AWS Professional Services. He works with enterprise customers to provide leadership on big data projects, helping them reach their full data potential in the cloud. In his spare time, he enjoys watching sports, playing ping pong, and hanging out with family and friends.

 

 


Related

 

Writing SQL on Streaming Data with Amazon Kinesis Analytics

writingsql_image1