AWS Big Data Blog

Sink Amazon Kinesis Data Analytics Apache Flink output to Amazon Keyspaces using Apache Cassandra Connector

August 30, 2023: Amazon Kinesis Data Analytics has been renamed to Amazon Managed Service for Apache Flink. Read the announcement in the AWS News Blog and learn more.

Amazon Keyspaces (for Apache Cassandra) is a scalable, highly available, and managed Apache Cassandra–compatible database service. With Amazon Keyspaces you don’t have to provision, patch, or manage servers, and you don’t have to install, maintain, or operate software. Amazon Keyspaces is serverless, so you only pay for the resources that you use. You can use Amazon Keyspaces to store large volumes of data, such as entries in a log file or the message history for a chat application as Amazon Keyspaces tables scale in response to actual application traffic, with virtually unlimited throughput and storage. You can also use Amazon Keyspaces to store information about devices for Internet of Things (IoT) applications or player profiles for games.

A popular use case in the wind energy sector is to protect wind turbines from wind speed. Engineers and analysts often want to see real-time aggregated wind turbine speed data to analyze the current situation out in the field. Furthermore, they need access to historical aggregated wind turbine speed data to build machine learning (ML) models which can help them take preventative actions on wind turbines. Customers often ingest high-velocity IoT data into Amazon Kinesis Data Streams and use Amazon Kinesis Data Analytics, AWS Lambda, or Amazon Kinesis Client Library (KCL) applications to aggregate IoT data in real-time and store it in Amazon Keyspaces, Amazon DynamoDB, or Amazon Timestream.

In this post, we demonstrate how to aggregate sensor data using Amazon Kinesis Data Analytics and persist aggregated sensor data in to Amazon Keyspaces using Apache Flink’s Apache Cassandra Connector.

Architecture

BDB-2063-kda-keyspaces-architecture

In the architecture diagram above, Lambda simulates wind speed sensor data and ingests sensor data into Amazon Kinesis Data Stream. Amazon Kinesis Data Analytics Apache Flink application reads wind speed sensor data from Amazon Kinesis Data Stream in real-time and aggregates wind speed sensor data using a five minutes tumbling window and storing aggregated wind speed sensor data into Amazon Keyspaces table. Aggregated wind speed sensor data stored in Amazon Keyspaces can be used by engineers and analysts to review real-time dashboards or to perform historical analysis on specific wind turbine.

Deploying resources using AWS CloudFormation

After you sign in to your AWS account, launch the AWS CloudFormation template by choosing Launch Stack:

BDB-2063-launch-cloudformation-stack

The CloudFormation template configures the following resources in your account:

  • One Lambda function which simulates wind turbine data
  • One Amazon Kinesis Data Stream
  • One Amazon Kinesis Data Analytics Apache Flink application
  • An AWS Identity and Access Management (IAM) role (service execution role) for Amazon Kinesis Data Analytics Apache Flink application
  • One Amazon Keyspaces Table: turbine_aggregated_sensor_data

After you complete the setup, sign in to the Kinesis Data Analytics console. On the Kinesis Data Analytics applications page, choose the Streaming applications tab, where you can see the Streaming application in the ready status. Select the Streaming application, choose Run, and wait until the Streaming application is in running status. It can take a couple of minutes for the Streaming application to get into running status.

Now that we have deployed all of the resources using CloudFormation template, let’s review deployed resources and how they function.

Format of wind speed sensor data

Lambda simulates wind turbine speed data every one minute and ingests it into Amazon Kinesis Data Stream. Each wind turbine sensor data message consists of two attributes: turbineId and speed.

{
  "turbineId": "turbine-0001",
  "speed": 60
}

Schema of destination Amazon Keyspaces table

We’ll store aggregated sensor data in to destination turbine_aggregated_sensor_data Amazon Keyspaces table. turbine_aggregated_sensor_data table has on-demand capacity mode enabled. Amazon Keyspaces (for Apache Cassandra) on-demand capacity mode is a flexible billing option capable of serving thousands of requests per second without capacity planning. This option offers pay-per-request pricing for read and write requests so that you pay only for what you use. When you choose on-demand mode, Amazon Keyspaces can scale the throughput capacity for your table up to any previously reached traffic level instantly, and then back down when application traffic decreases. If a workload’s traffic level hits a new peak, then the service adapts rapidly to increase throughput capacity for your table.

BDB-2063-keyspaces-table BDB-2063-keyspaces-table-def-1 BDB-2063-keyspaces-table-def-2

Apache Flink code to aggregate and persist data in Amazon Keyspaces Table

Apache Flink source code used by this post can be found on the KeyspacesSink section of Kinesis Data Analytics Java Examples public git repository.

The following code snippet demonstrates how incoming wind turbine messages are getting aggregated using a five-minute tumbling window and produces a DataStream of TurbineAggregatedRecord records.

DataStream<TurbineAggregatedRecord> result = input
.map(new WindTurbineInputMap())
.keyBy(t -> t.turbineId)
.window(TumblingProcessingTimeWindows.of(Time.minutes(5)))
.reduce(new AggregateReducer())
.map(new AggregateMap());

The following code snippet demonstrates how Amazon Keyspaces table name and column names are annotated on the TurbineAggregatedRecord class.

@Table(keyspace = "sensor_data", name = "turbine_aggregated_sensor_data", readConsistency = "LOCAL_QUORUM", writeConsistency = "LOCAL_QUORUM")
public class TurbineAggregatedRecord {

@Column(name = "turbineid")
@PartitionKey(0)
private String turbineid = "";

@Column(name = "reported_time")
private long reported_time = 0;

@Column(name = "max_speed")
private long max_speed = 0;

@Column(name = "min_speed")
private long min_speed = 0;

@Column(name = "avg_speed")
private long avg_speed = 0;

The following code snippet demonstrates the implementation of Apache Cassandra Connector to sink aggregated wind speed sensor data TurbineAggregatedRecord into Amazon Keyspaces table. We’re using SigV4AuthProvider with Apache Cassandra Connector. The SigV4 authentication plugin lets you use IAM credentials for users or roles when connecting to Amazon Keyspaces. Instead of requiring a user name and password, this plugin signs API requests using access keys.

CassandraSink.addSink(result)
                .setClusterBuilder(
                        new ClusterBuilder() {

                            private static final long serialVersionUID = 2793938419775311824L;

                            @Override
                            public Cluster buildCluster(Cluster.Builder builder) {
                                return builder
                                        .addContactPoint("cassandra."+ region +".amazonaws.com")
                                        .withPort(9142)
                                        .withSSL()
                                        .withAuthProvider(new SigV4AuthProvider(region))
                                        .withLoadBalancingPolicy(
                                                DCAwareRoundRobinPolicy
                                                        .builder()
                                                        .withLocalDc(region)
                                                        .build())
                                        .withQueryOptions(queryOptions)
                                        .build();
                            }
                        })
                .setMapperOptions(() -> new Mapper.Option[] {Mapper.Option.saveNullFields(true)})
                .setDefaultKeyspace("sensor_data")
                .build();

Review output in Amazon Keyspaces Table

Once Amazon Kinesis Data Analytics Apache Flink application aggregates wind turbine sensor data and persists aggregated data in Amazon Keyspaces table, we can query and review aggregated data using Amazon Keyspaces CQL editor as illustrated in the following.

select * from sensor_data.turbine_aggregated_sensor_data

BDB-2063-cql-editor BDB-2063-cql-editor-result

Clean up

To avoid incurring future charges, complete the following steps:

  1. Empty Amazon S3 bucket created by AWS CloudFormation stack.
  2. Delete AWS CloudFormation stack.

Conclusion

As you’ve learned in this post, you can build Amazon Kinesis Data Analytics Apache Flink application to read sensor data from Amazon Kinesis Data Streams, perform aggregations, and persist aggregated sensor data in Amazon Keyspaces using Apache Cassandra Connector. There are several use cases in IoT and Application development to move data quickly through the analytics pipeline and persist data in Amazon Keyspaces.

We look forward to hearing from you about your experience. If you have questions or suggestions, please leave a comment.


About the Author

Pratik Patel is a Sr Technical Account Manager and streaming analytics specialist. He works with AWS customers and provides ongoing support and technical guidance to help plan and build solutions using best practices and proactively helps in keeping customer’s AWS environments operationally healthy.