The Internet of Things on AWS – Official Blog

Injecting data into AWS IoT Analytics from Amazon Kinesis Data Streams

AWS IoT Analytics can easily ingest data directly from connected IoT devices through its integration with AWS IoT Core. However, AWS IoT Analytics also provides a public BatchPutMessage API action that you can use to inject data from other data sources.

In this post, I show how to inject data into AWS IoT Analytics from Amazon Kinesis Data Streams by using an AWS Lambda function.

 

AWS Services Used

This solution uses the following AWS services:

  • Amazon Kinesis Data Streams: Enables you to build custom applications that process or analyze streaming data for specialized needs. For more information, see the Amazon Kinesis Data Streams documentation.
  • AWS Lambda: Lets you run code without provisioning or managing servers. For more information, see the AWS Lambda documentation.
  • AWS IoT Analytics: A fully managed service that collects, transforms, cleanses, and stores IoT data. It lets you run sophisticated analytics, including machine learning. For more information, see the AWS IoT Analytics documentation.

BatchPutMessage API action

BatchPutMessage sends a batch of messages to an AWS IoT Analytics channel. It has a hard limit of 100 messages per batch, and each message must be smaller than 128 KB. This API action is available in the public AWS SDKs.

To send messages:

  • Create a collection of messages to send, specifying a message ID and a payload for each message:
    • messageId: A string of 1 to 128 characters. Each messageId must be unique within a batch.
    • payload: A ByteBuffer that contains the message body as JSON bytes.
  • Create a BatchPutMessageRequest that contains the messages and the name of the channel to send them to.
  • Call the API by invoking the batchPutMessage method with your request. Each request targets a single channel, so to send messages to several channels, create one BatchPutMessageRequest per channel.

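For information about common exceptions, see the Errors section of the BatchPutMessage API reference. Errors for individual messages are not thrown as exceptions; they are returned in the batchPutMessageErrorEntries list of the response.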
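Because the service rejects requests that break these limits, it can help to check a batch before sending it. The following Java sketch is hypothetical and not part of the AWS SDK: BatchLimits and its Msg class are illustrative stand-ins for the SDK's Message model, and "128 KB" is assumed to mean 128 × 1024 bytes. It checks the batch size, the messageId length and uniqueness, and the payload size.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical pre-flight check for one BatchPutMessage batch (not an AWS SDK class)
final class BatchLimits {
    static final int MAX_MESSAGES = 100;
    static final int MAX_ID_LENGTH = 128;
    // Assumption: "128 KB" is treated as 128 * 1024 bytes
    static final int MAX_PAYLOAD_BYTES = 128 * 1024;

    // Minimal stand-in for the SDK's Message model: an ID plus the raw payload bytes
    static final class Msg {
        final String messageId;
        final ByteBuffer payload;

        Msg(String messageId, ByteBuffer payload) {
            this.messageId = messageId;
            this.payload = payload;
        }
    }

    // Returns one description per violated limit; an empty list means the batch looks valid
    static List<String> validate(List<Msg> batch) {
        List<String> problems = new ArrayList<>();
        if (batch.size() > MAX_MESSAGES) {
            problems.add("too many messages: " + batch.size());
        }
        Set<String> seenIds = new HashSet<>();
        for (Msg m : batch) {
            int idLength = m.messageId == null ? 0 : m.messageId.length();
            if (idLength < 1 || idLength > MAX_ID_LENGTH) {
                problems.add("messageId length out of range: " + idLength);
            } else if (!seenIds.add(m.messageId)) {
                // HashSet.add returns false when the ID was already present
                problems.add("duplicate messageId: " + m.messageId);
            }
            // remaining() is the number of bytes that would be sent from the buffer
            if (m.payload.remaining() >= MAX_PAYLOAD_BYTES) {
                problems.add("payload too large for " + m.messageId + ": " + m.payload.remaining() + " bytes");
            }
        }
        return problems;
    }
}
```

A check like this only catches mistakes early; the service still enforces its own limits and reports rejected messages in the response.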

Getting started

The solution involves the following tasks:

  1. Create a channel, pipeline, and data store in AWS IoT Analytics.
  2. Create a Lambda function to ingest data from a Kinesis stream.
  3. Call BatchPutMessage from the Lambda function to inject data into your AWS IoT Analytics channel.
  4. Monitor your data with Amazon CloudWatch Logs.
  5. Query your data in AWS IoT Analytics and visualize it with Amazon QuickSight.

Prerequisites

To follow this solution, you need these resources:

  • An AWS account
  • An AWS IoT Core instance
  • A Kinesis stream containing IoT records with the following JSON payload:
{
    "sensor_uuid": "probe-8eeba36f",
    "ambient_temperature": "20.50",
    "radiation_level": "201",
    "humidity": "87.4040",
    "photosensor": "774.05",
    "timestamp": 1521241469
}
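While testing, you may want to put a few records of this shape on the stream yourself. The following Java sketch is illustrative only: SampleRecord and its methods are hypothetical, the field values are assumed to need no JSON escaping, and the timestamp is assumed to be Unix epoch seconds. It builds the sample payload as a UTF-8 JSON string and wraps it in a ByteBuffer, the type used for a Kinesis record's data and for a BatchPutMessage payload.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper for producing test records in the sample JSON shape.
// Assumes field values contain no characters that need JSON escaping (quotes, backslashes).
final class SampleRecord {

    // Sensor readings stay strings to match the sample payload; the timestamp is a number
    static String toJson(String sensorUuid, String ambientTemperature, String radiationLevel,
                         String humidity, String photosensor, long timestamp) {
        return String.format(
                "{\"sensor_uuid\":\"%s\",\"ambient_temperature\":\"%s\",\"radiation_level\":\"%s\","
                        + "\"humidity\":\"%s\",\"photosensor\":\"%s\",\"timestamp\":%d}",
                sensorUuid, ambientTemperature, radiationLevel, humidity, photosensor, timestamp);
    }

    // Encode the JSON text as UTF-8 bytes wrapped in a ByteBuffer
    static ByteBuffer toPayload(String json) {
        return ByteBuffer.wrap(json.getBytes(StandardCharsets.UTF_8));
    }
}
```

For example, SampleRecord.toPayload(SampleRecord.toJson("probe-8eeba36f", "20.50", "201", "87.4040", "774.05", 1521241469L)) yields the bytes of the sample record in compact form.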

Walkthrough

In the AWS IoT Analytics console, use the Quick start feature to create a channel, pipeline, and data store automatically. Specify a prefix, such as my_.

  • Channels ingest data, back it up, and publish it to a pipeline.
  • Pipelines ingest data from one or more channels and allow you to process the data using activities before storing it in a data store.
  • Data stores store data. They are scalable and queryable.
  • Datasets retrieve data from a data store. They are the result of a SQL query run against the data store.

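Next, create a Lambda function to process records from the Kinesis stream, and add the stream as the function's event source (trigger). The function's execution role needs permission to read from the stream and to call iotanalytics:BatchPutMessage. Use the following Java code in your function to inject data into AWS IoT Analytics by calling the BatchPutMessage API. Replace my_channel and us-east-1 with your own channel name and Region.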

package example.lambda.iot.data;

import com.amazonaws.regions.Regions;
import com.amazonaws.services.iotanalytics.AWSIoTAnalytics;
import com.amazonaws.services.iotanalytics.AWSIoTAnalyticsClientBuilder;
import com.amazonaws.services.iotanalytics.model.BatchPutMessageErrorEntry;
import com.amazonaws.services.iotanalytics.model.BatchPutMessageRequest;
import com.amazonaws.services.iotanalytics.model.BatchPutMessageResult;
import com.amazonaws.services.iotanalytics.model.Message;
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.KinesisEvent;

import java.util.List;
import java.util.stream.Collectors;

public class ProcessKinesisEvents implements RequestHandler<KinesisEvent, Void> {

    // Specify the service Region ID, for example, us-east-1
    private static final String REGION_NAME = "us-east-1";
    // Specify your AWS IoT Analytics channel name, for example, "my_channel"
    private static final String CHANNEL_NAME = "my_channel";
    // BatchPutMessage accepts at most 100 messages per request
    private static final int MAX_BATCH_SIZE = 100;

    // Build the client once per container so that warm invocations reuse it
    private static final AWSIoTAnalytics IOT_ANALYTICS_CLIENT = AWSIoTAnalyticsClientBuilder.standard()
            .withRegion(Regions.fromName(REGION_NAME))
            .build();

    @Override
    public Void handleRequest(KinesisEvent event, Context context) {
        System.out.println("Start to put message");

        // Convert each Kinesis record into a message; the event ID is unique per record
        List<Message> messages = event.getRecords().stream()
                .map(rec -> new Message()
                        .withMessageId(rec.getEventID())
                        .withPayload(rec.getKinesis().getData()))
                .collect(Collectors.toList());

        int failed = 0;
        // A Kinesis event can hold more than 100 records, so send the messages in chunks
        for (int start = 0; start < messages.size(); start += MAX_BATCH_SIZE) {
            List<Message> batch = messages.subList(start, Math.min(start + MAX_BATCH_SIZE, messages.size()));
            BatchPutMessageResult result = IOT_ANALYTICS_CLIENT.batchPutMessage(new BatchPutMessageRequest()
                    .withChannelName(CHANNEL_NAME)
                    .withMessages(batch));

            // Rejected messages are reported in the result instead of being thrown as exceptions
            List<BatchPutMessageErrorEntry> errors = result.getBatchPutMessageErrorEntries();
            if (errors != null) {
                for (BatchPutMessageErrorEntry error : errors) {
                    failed++;
                    System.out.println(String.format("Failed to put message %s: %s %s",
                            error.getMessageId(), error.getErrorCode(), error.getErrorMessage()));
                }
            }
        }

        System.out.println(String.format("Successfully injected %d of %d messages into IoT Analytics",
                messages.size() - failed, messages.size()));
        return null;
    }
}

Checking logs in CloudWatch for the Lambda function

The Lambda function handler code uses System.out.println() to write log entries, which Lambda sends to CloudWatch Logs. In the CloudWatch console, choose Logs, and then select the log group for your Lambda function (/aws/lambda/<function-name>). There you can see all the log entries that the function generated.

 

Querying your data

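To query your data, generate the dataset content and then call the GetDatasetContent API action. GetDatasetContent returns a presigned URL that points to the generated dataset content, a CSV file stored in Amazon S3. The following AWS CLI commands, which assume a dataset named mydataset, generate new dataset content, describe the dataset, and retrieve the content. If the results are not ready yet, run get-dataset-content again, and then download the file.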

aws iotanalytics create-dataset-content --dataset-name mydataset

aws iotanalytics describe-dataset --dataset-name mydataset

aws iotanalytics get-dataset-content --dataset-name mydataset

If your dataset contains data, the output from get-dataset-content looks similar to the following:

{
   "timestamp":1508189965.746,
   "entries":[
      {
         "dataURI":"https://aws-iot-analytics-datasets-f7253800-859a-472c-aa33-e23998b31261.s3.amazonaws.com/results/f881f855-c873-49ce-abd9-b50e9611b71f.csv?X-Amz-Security-Token=<TOKEN>&X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Date=20171016T214541Z&X-Amz-SignedHeaders=host&X-Amz-Expires=7200&X-Amz-Credential=<CREDENTIAL>&X-Amz-Signature=<SIGNATURE>"
      }
   ]
}

You can download the CSV file of your data by using the dataURI link. AWS IoT Analytics also provides direct integration with Amazon QuickSight. Amazon QuickSight is a fast business analytics service that you can use to build visualizations, perform ad hoc analysis, and quickly get business insights from your data. For more information, see Visualizing AWS IoT Analytics Data with QuickSight.
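Because the dataURI is a presigned Amazon S3 URL, it is valid only for a limited time. The X-Amz-Date parameter records when it was signed, and X-Amz-Expires gives its lifetime in seconds (7200, or two hours, in the example output above). The following Java 11 sketch uses a hypothetical DatasetDownload helper. It computes when a dataURI expires and downloads the CSV file with the JDK's built-in HTTP client. It assumes the URL carries both parameters, as in the example.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.file.Path;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper for the presigned dataURI returned by get-dataset-content
final class DatasetDownload {

    // X-Amz-Date uses the compact ISO 8601 basic format in UTC, for example 20171016T214541Z
    private static final DateTimeFormatter AMZ_DATE = DateTimeFormatter.ofPattern("uuuuMMdd'T'HHmmss'Z'");

    // Split the query string into name/value pairs; values are left URL-encoded
    static Map<String, String> queryParams(String url) {
        Map<String, String> params = new HashMap<>();
        int q = url.indexOf('?');
        if (q < 0) {
            return params;
        }
        for (String pair : url.substring(q + 1).split("&")) {
            int eq = pair.indexOf('=');
            if (eq > 0) {
                params.put(pair.substring(0, eq), pair.substring(eq + 1));
            }
        }
        return params;
    }

    // Expiry time = signing time (X-Amz-Date) + lifetime in seconds (X-Amz-Expires)
    static Instant expiresAt(String dataUri) {
        Map<String, String> params = queryParams(dataUri);
        Instant signedAt = LocalDateTime.parse(params.get("X-Amz-Date"), AMZ_DATE).toInstant(ZoneOffset.UTC);
        return signedAt.plusSeconds(Long.parseLong(params.get("X-Amz-Expires")));
    }

    // Download the CSV with a plain GET; the signature in the URL authorizes the request
    static Path download(String dataUri, Path target) throws Exception {
        if (Instant.now().isAfter(expiresAt(dataUri))) {
            throw new IllegalStateException("dataURI has expired; call get-dataset-content again");
        }
        HttpResponse<Path> response = HttpClient.newHttpClient().send(
                HttpRequest.newBuilder(URI.create(dataUri)).GET().build(),
                HttpResponse.BodyHandlers.ofFile(target));
        if (response.statusCode() != 200) {
            throw new IllegalStateException("Download failed with HTTP status " + response.statusCode());
        }
        return response.body();
    }
}
```

For the example output, X-Amz-Date=20171016T214541Z and X-Amz-Expires=7200 give an expiry of 2017-10-16T23:45:41Z. Once a URL has expired, calling get-dataset-content again returns a fresh one.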

Cleanup

To avoid recurring costs, delete the Kinesis stream, the Lambda function, and the AWS IoT Analytics channel, pipeline, data store, and dataset that you created.

Conclusion

This post has shown you the basics of injecting data into AWS IoT Analytics from a real-time streaming data source other than AWS IoT Core. With these few steps, you've assembled a set of fully managed services for collecting, injecting, and analyzing IoT data. This approach could scale to millions of things.