Build a data lake using Amazon Kinesis Data Streams for Amazon DynamoDB and Apache Hudi
Amazon DynamoDB helps you capture high-velocity data, such as clickstream data for building customized user profiles and online order transaction data for developing customer order fulfillment applications, improving customer satisfaction, and gaining insights into sales revenue that inform promotional offers. It’s essential to store these data points in a centralized data lake, where they can be transformed, analyzed, and combined with diverse organizational datasets to derive meaningful insights and make predictions.
A popular use case in order management is receiving, tracking, and fulfilling customer orders. The order management process begins when an order is placed and ends when the customer receives their package. When storing high-velocity order transaction data in DynamoDB, you can use Amazon Kinesis streaming to extract data and store it in a centralized data lake built on Amazon Simple Storage Service (Amazon S3).
Amazon Kinesis Data Streams for DynamoDB helps you publish item-level changes in any DynamoDB table to a Kinesis data stream of your choice. Additionally, you can take advantage of this feature for use cases that require longer data retention on the stream and fan-out to multiple concurrent stream readers. You can also integrate with Amazon Kinesis Data Analytics or Amazon Kinesis Data Firehose to publish data to downstream destinations such as Amazon OpenSearch Service, Amazon Redshift, or Amazon S3.
In this post, you use Kinesis Data Streams for DynamoDB and take advantage of managed streaming delivery of DynamoDB data to a Kinesis data stream by enabling the Kinesis streaming connection on the Amazon DynamoDB console. To process DynamoDB events from Kinesis, you have multiple options: Amazon Kinesis Client Library (KCL) applications, AWS Lambda, Kinesis Data Analytics for Apache Flink, and Kinesis Data Firehose. In this post, you use Kinesis Data Firehose to save the raw data in the S3 data lake and Apache Hudi to batch process the data.
The following diagram illustrates the order processing system architecture.
In this architecture, users buy products in online retail shops and internally create an order transaction stored in DynamoDB. The order transaction data is ingested to the data lake and stored in the raw data layer. To achieve this, you enable Kinesis Data Streams for DynamoDB and use Kinesis Data Firehose to store data in Amazon S3. You use Lambda to transform the data from the delivery stream to remove unwanted data and finally store it in Parquet format. Next, you batch process the raw data and store it back in the Hudi dataset in the S3 data lake. You can then use Amazon Athena to do sales analysis.
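The Lambda transformation step in this architecture can be sketched as follows. This is a minimal illustration, not the function shipped with the CloudFormation template: it relies on the standard Kinesis Data Firehose transformation contract (base64-encoded records in, a `recordId`, `result`, and `data` for each record out) and on the DynamoDB change record carrying a `dynamodb.NewImage` map. The `KEEP_FIELDS` list is a hypothetical choice of attributes, and the sketch assumes Parquet conversion is handled by the delivery stream rather than by the function.

```python
import base64
import json

# Hypothetical list of attributes to keep; the post does not enumerate them.
KEEP_FIELDS = ("order_id", "product", "amount", "transaction_date")


def flatten_image(image):
    """Turn a DynamoDB-typed image ({"amount": {"N": "10"}}) into plain values."""
    flat = {}
    for name, typed_value in image.items():
        if name in KEEP_FIELDS:
            # Each attribute is a one-entry dict of {type_code: value}.
            # Numbers stay as strings here, exactly as DynamoDB serializes them.
            flat[name] = next(iter(typed_value.values()))
    return flat


def lambda_handler(event, context):
    """Firehose transformation: keep only the selected fields of each new image."""
    output = []
    for record in event["records"]:
        try:
            change = json.loads(base64.b64decode(record["data"]))
            new_image = change["dynamodb"].get("NewImage")
        except (ValueError, KeyError):
            # Undecodable payloads are returned to Firehose as failed.
            output.append({"recordId": record["recordId"],
                           "result": "ProcessingFailed",
                           "data": record["data"]})
            continue
        if not new_image:
            # For example, delete events carry no new image.
            output.append({"recordId": record["recordId"],
                           "result": "Dropped",
                           "data": record["data"]})
            continue
        payload = json.dumps(flatten_image(new_image)) + "\n"
        output.append({"recordId": record["recordId"],
                       "result": "Ok",
                       "data": base64.b64encode(payload.encode("utf-8")).decode("utf-8")})
    return {"records": output}
```

Every input record is answered with exactly one output record carrying the same `recordId`, which is what Firehose expects from a transformation function.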
Complete the following steps to create AWS resources to build a data pipeline as mentioned in the architecture. For this post, we use the AWS Region us-west-1.
- On the Amazon Elastic Compute Cloud (Amazon EC2) console, create a key pair.
- Download the data files, Amazon EMR cluster code, and Athena DDL code from GitHub.
- Deploy the necessary AWS resources using the provided AWS CloudFormation template.
- For Stack name, enter a stack name of your choice.
- For Keypair name, choose a key pair.
A key pair is required to connect to the EMR cluster nodes. For more information, see Use an Amazon EC2 Key Pair for SSH Credentials.
- Keep the remaining default parameters.
- Acknowledge that AWS CloudFormation might create AWS Identity and Access Management (IAM) resources.
For more information about IAM, see Resources to learn more about IAM.
- Choose Create stack.
You can check the Resources tab for the stack after the stack is created.
The following table summarizes the resources that you created, which you use to build the data pipeline and analysis.
|Logical ID|Physical ID|Type|
Enable Kinesis streaming for DynamoDB
AWS recently launched Kinesis Data Streams for DynamoDB so you can send data from DynamoDB to Kinesis data streams. You can use the AWS Command Line Interface (AWS CLI) or the AWS Management Console to enable this feature.
To enable this feature from the console, complete the following steps:
- On the DynamoDB console, choose the table you created in the CloudFormation stack earlier (it begins with the prefix
- On the Overview tab, choose Manage streaming to Kinesis.
- Choose your input stream (it starts with
- Choose Enable.
- Choose Close.
- Make sure that Stream enabled is set to Yes.
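If you prefer to script the console steps above, the same result can be reached through the AWS SDK. The following sketch assumes a boto3 DynamoDB client is passed in and uses its `describe_kinesis_streaming_destination` and `enable_kinesis_streaming_destination` operations. The function name and its arguments are illustrative, and the table name and stream ARN must come from your own stack.

```python
def enable_kinesis_streaming(dynamodb_client, table_name, stream_arn):
    """Enable the Kinesis streaming destination unless it is already on.

    dynamodb_client is expected to be a boto3 DynamoDB client, for example
    boto3.client("dynamodb"). Returns the destination status string.
    """
    described = dynamodb_client.describe_kinesis_streaming_destination(
        TableName=table_name
    )
    for destination in described.get("KinesisDataStreamDestinations", []):
        same_stream = destination.get("StreamArn") == stream_arn
        if same_stream and destination.get("DestinationStatus") in ("ACTIVE", "ENABLING"):
            # Nothing to do: streaming is already enabled or being enabled.
            return destination["DestinationStatus"]

    response = dynamodb_client.enable_kinesis_streaming_destination(
        TableName=table_name, StreamArn=stream_arn
    )
    return response["DestinationStatus"]
```

Checking the current destinations first keeps the call safe to repeat, which is convenient when the step is part of a rerunnable setup script.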
Populate the sales order transaction dataset
To replicate a real-life use case, you need an online retail application. For this post, you upload raw data files to the S3 bucket and use a Lambda function to load the data into DynamoDB. You can download the order data CSV files from the AWS Samples GitHub repository. Complete the following steps to load the data into DynamoDB:
- On the Amazon S3 console, choose the bucket
- Choose Upload.
- Choose Add files.
- Choose the
- Choose Upload.
- On the Lambda console, choose the function
- Choose Test.
- For Event template, enter an event name.
- Choose Create.
- Choose Test.
This runs the Lambda function and loads the CSV file order_data_09_02_2020.csv into the DynamoDB table.
- Wait until the message appears that the function ran successfully.
You can now view the data on the DynamoDB console, in the details page for your table.
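The core of a loader like the Lambda function used above can be sketched in a few lines. This is not the function deployed by the stack: it assumes the CSV has a header row, that `table` is a boto3 DynamoDB `Table` resource (whose `batch_writer()` is a context manager), and it leaves every value as a string. The column names in the usage example are illustrative.

```python
import csv
import io


def rows_to_items(csv_text):
    """Parse CSV text (header row first) into a list of item dicts."""
    reader = csv.DictReader(io.StringIO(csv_text))
    items = []
    for row in reader:
        # Skip empty cells so blank attributes are not written to the table.
        item = {name: value for name, value in row.items() if value not in (None, "")}
        if item:
            items.append(item)
    return items


def load_items(table, items):
    """Write the items through the table's batch writer and return the count."""
    with table.batch_writer() as batch:
        for item in items:
            batch.put_item(Item=item)
    return len(items)
```

For example, `rows_to_items("order_id,product,amount\n3801,Laptop,1200\n")` yields one item with three string attributes. A real loader would typically also convert numeric columns to `Decimal` before writing.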
Because you enabled Kinesis streaming on the DynamoDB table, the table’s changes start flowing through the Kinesis data stream and Kinesis Data Firehose to Amazon S3. You can check the data by viewing the bucket on the Amazon S3 console. The following screenshot shows that a Parquet file is under the prefix in the bucket.
Use Apache Hudi with Amazon EMR
Now it’s time to process the streaming data using Hudi.
You can use the key pair you chose in the security options to SSH into the leader node.
- Use the following bash command to start the Spark shell to use it with Apache Hudi:
The Amazon EMR instance looks like the following screenshot.
- You can use the following Scala code to import the order transaction data from the S3 data lake to a Hudi dataset using the copy-on-write storage type. Change inputDataPath to match the file path in <stack-name>-raws3bucket-* in your environment, and replace the bucket name in
For more information about DataSourceWriteOptions, see Work with a Hudi Dataset.
- In the Spark shell, you can now count the total number of records in the Apache Hudi dataset:
You can check the processed Apache Hudi dataset in the S3 data lake via the Amazon S3 console. The following screenshot shows the prefix order_hudi_cow is in
When navigating into the order_hudi_cow prefix, you can find a list of Hudi dataset partitions keyed by transaction_date, one for each date in our dataset.
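The copy-on-write import described above is driven by a small set of Hudi write options. The sketch below collects them in a Python dictionary using Hudi's documented configuration keys. The table name `order_hudi_cow` and the partition field `transaction_date` come from this post; treating `order_id` as the record key is an assumption, and the precombine field is a placeholder because the post does not name one.

```python
def hudi_write_options(table_name, record_key, partition_field, precombine_field,
                       operation="upsert"):
    """Build the option map for a copy-on-write Hudi write."""
    return {
        "hoodie.table.name": table_name,
        # Older Hudi releases used hoodie.datasource.write.storage.type instead.
        "hoodie.datasource.write.table.type": "COPY_ON_WRITE",
        "hoodie.datasource.write.operation": operation,
        "hoodie.datasource.write.recordkey.field": record_key,
        "hoodie.datasource.write.partitionpath.field": partition_field,
        # When two records share a key, the one with the larger value wins.
        "hoodie.datasource.write.precombine.field": precombine_field,
    }


options = hudi_write_options(
    table_name="order_hudi_cow",
    record_key="order_id",                # assumption: unique per order
    partition_field="transaction_date",
    precombine_field="transaction_date",  # placeholder: use your own ordering column
)

# With PySpark on the cluster, the map would be applied roughly like this:
# df.write.format("org.apache.hudi").options(**options).mode("append").save(output_path)
```

Keeping the options in one map makes it easy to reuse the same settings for the initial load and for later upserts.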
Let’s analyze the data stored in Amazon S3 using Athena.
Analyze the data with Athena
To analyze your data, complete the following steps:
- On the Athena console, create the database order_db using the following command:
You use this database to create all the Athena tables.
- Create your table using the following command (replace the S3 bucket name with <stack-name>-processeds3bucket* created in your environment):
- Add partitions by running the following query on the Athena console:
- Check the total number of records in the Hudi dataset with the following query:
It should return a single row with a count of 1,000.
Now check the record that you want to update.
- Run the following query on the Athena console:
The output should look like the following screenshot. Note the values of product and amount.
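The add-partitions step above needs one partition clause per date, which is tedious to type by hand when the dataset spans many days. The following helper is a sketch that generates an Athena `ALTER TABLE ... ADD PARTITION` statement. The table name, bucket, and date format in the usage example are hypothetical, and the sketch assumes each partition lives in a folder named after its `transaction_date` value directly under the dataset prefix.

```python
def add_partitions_sql(table, s3_prefix, transaction_dates):
    """Return one ALTER TABLE statement that registers every given date."""
    base = s3_prefix.rstrip("/")
    clauses = [
        f"PARTITION (transaction_date = '{day}') LOCATION '{base}/{day}/'"
        for day in transaction_dates
    ]
    return f"ALTER TABLE {table} ADD IF NOT EXISTS\n  " + "\n  ".join(clauses)


# Hypothetical names: substitute your own table, bucket, and dates.
statement = add_partitions_sql(
    "order_db.orders_example",
    "s3://example-bucket/order_hudi_cow/",
    ["2020-09-02", "2020-10-02"],
)
print(statement)
```

`IF NOT EXISTS` lets you rerun the statement after new dates arrive without failing on partitions that are already registered.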
Analyze the change data capture
Now let’s test the change data capture (CDC) in streaming. Let’s take an example where the customer changed an existing order. We load the order_data_10_02_2020.csv file, where order_id 3801 has a different product and amount.
To test the CDC feature, complete the following steps:
- On the Lambda console, choose the stack
- In the Environment variables section, choose Edit.
- For key, enter
- Choose Save.
You can see another prefix has been created in
- In Amazon EMR, run the following code in the Scala shell prompt to update the data (change inputDataPath to the file path in
- Run the following query on the Athena console to confirm that the total number of records is still 1,000:
- Run the following query on the Athena console to test for the update:
The following screenshot shows that the product and amount values for the same order are updated.
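The reason the record count stays at 1,000 while the product and amount change is that the update is an upsert keyed on the order. The following plain-Python model illustrates that behavior only. It is not how Hudi is implemented (Hudi also applies a precombine field and rewrites the affected files), and the sample values are made up.

```python
def upsert(existing, incoming, key="order_id"):
    """Merge incoming rows into existing rows, matching on the record key."""
    merged = {row[key]: row for row in existing}
    for row in incoming:
        # Same key: the incoming row replaces the old one. New key: it is added.
        merged[row[key]] = row
    return list(merged.values())


# Made-up sample rows for illustration.
before = [
    {"order_id": 3801, "product": "Laptop", "amount": 1200},
    {"order_id": 3802, "product": "Mouse", "amount": 15},
]
changes = [{"order_id": 3801, "product": "Tablet", "amount": 450}]

after = upsert(before, changes)
print(len(before), len(after))
print(after[0])
```

Because the changed order shares its key with an existing row, the merge replaces that row instead of appending a second one, so the total count is unchanged.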
In a production workload, you can trigger the updates on a schedule or by S3 modification events. A fully automated data lake makes sure your business analysts are always viewing the latest available data.
Clean up the resources
To avoid incurring future charges, follow these steps to remove the example resources:
- If you created the EMR cluster with the CloudFormation template, delete the resources you created earlier in the prerequisite section by deleting the stack instances from your stack set.
- If you launched the EMR cluster manually, stop the cluster via the Amazon EMR console.
- Empty all the relevant buckets via the Amazon S3 console.
You can build an end-to-end data lake to get real-time insights from DynamoDB by using Kinesis Data Streams, all without writing any complex code. This allows your team to focus on solving business problems by getting useful insights immediately. Application developers have various use cases for moving data quickly through an analytics pipeline, and you can make this happen by enabling Kinesis Data Streams for DynamoDB.
If this post helps you or inspires you to solve a problem, we would love to hear about it! The code for this solution is available in the GitHub repository for you to use and extend. Contributions are always welcome!
About the Authors
Dhiraj Thakur is a Solutions Architect with Amazon Web Services. He works with AWS customers and partners to guide enterprise cloud adoption, migration, and strategy. He is passionate about technology and enjoys building and experimenting in the analytics and AI/ML space.
Saurabh Shrivastava is a solutions architect leader and analytics/ML specialist working with global systems integrators. He works with AWS Partners and customers to provide them with architectural guidance for building scalable architecture in hybrid and AWS environments. He enjoys spending time with his family outdoors and traveling to new destinations to discover new cultures.
Dylan Qu is an AWS solutions architect responsible for providing architectural guidance across the full AWS stack with a focus on data analytics, AI/ML, and DevOps.