The Internet of Things on AWS – Official Blog

Ingesting enriched IoT data into Amazon S3 using Amazon Kinesis Data Firehose

Introduction

When sending data from Internet of Things (IoT) devices to a data lake, you may need to enrich the device data payload with additional metadata in the cloud for further data processing and visualization. There are multiple reasons this data might not exist in the device payload itself, such as keeping the payload small in limited-bandwidth environments, or because the data is business information that is maintained in the cloud. For example, a machine on the factory floor might be assigned to different operators during the day. This variable business data would be stored in a database. In your data lake, you might need this information to be stored along with the payload.

In this blog post, you will learn how to ingest enriched IoT data to a data lake in near real-time.

Prerequisites

  • An AWS account
  • AWS Command Line Interface (AWS CLI). See AWS CLI quick setup for configuration, or use the minimal setup sketched below.
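If you have not configured the AWS CLI yet, a minimal interactive setup looks like the following (the Region and output format shown are placeholders; use your own values):

aws configure
AWS Access Key ID [None]: <your-access-key-id>
AWS Secret Access Key [None]: <your-secret-access-key>
Default region name [None]: eu-west-1
Default output format [None]: json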

Use case definition

Let’s assume that in your logistics company, you have containers equipped with sensor-enabled IoT devices. When a container is loaded onto a ship, the container ID is associated with the ship ID. You need to store the IoT device payload with the ship ID in your data lake.

In such a use case, the sensor payload comes from the IoT device attached to the container. However, the associated ship ID is only stored in the metadata store. Therefore, the payload must be enriched with the ship ID before putting it into the data lake.

Solution architecture

Architecture diagram for ingesting enriched IoT data into Amazon S3 by using Amazon Kinesis Data Firehose

In the architecture diagram:

  1. The IoT devices stream payloads to a specific MQTT topic, device/data/DEVICE_ID, on the AWS IoT Core message broker. The AWS IoT Core message broker allows devices to publish and subscribe to messages by using supported protocols.
  2. The AWS IoT rule is triggered when a payload is published to its topic. In this use case, it is configured with an Amazon Kinesis Data Firehose action. You can use AWS IoT rules to interact with AWS services by calling them when there is a message in a specific MQTT topic, or directly by using the Basic Ingest feature.
  3. Amazon Kinesis Data Firehose buffers the device payloads before delivering them to the data store, based on a size or time threshold, whichever is reached first. Kinesis Data Firehose delivers real-time streaming data to destinations for storing or processing.
  4. Once the buffer hits the size or time threshold, Kinesis Data Firehose calls an AWS Lambda function to enrich the device payloads in batches with the metadata retrieved from an Amazon DynamoDB table. AWS Lambda is a serverless compute service that runs your code for any type of application. Amazon DynamoDB is a fully managed NoSQL database that provides fast performance.
  5. The enriched payloads are returned to Kinesis Data Firehose to deliver to the destination.
  6. The enriched payloads are put into an Amazon Simple Storage Service (Amazon S3) bucket as a destination. Amazon S3 is an object storage service which stores any amount of data for a range of use cases.
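To make this data flow concrete, the following sketch traces a single hypothetical record through the pipeline (the payload values are illustrative and match the example used later in this post):

Published by the device to the topic device/data/cont1:
  {"temperature": 20, "humidity": 80, "latitude": 0, "longitude": 0}
After the AWS IoT rule (containerId extracted from the topic):
  {"temperature": 20, "humidity": 80, "latitude": 0, "longitude": 0, "containerId": "cont1"}
After the transform Lambda function (shipId retrieved from DynamoDB):
  {"temperature": 20, "humidity": 80, "latitude": 0, "longitude": 0, "containerId": "cont1", "shipId": "ship1"}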

AWS CloudFormation template

Download the AWS CloudFormation template from the code repository.

The AWS CloudFormation template deploys all the necessary resources to run this example use case. Let’s have a closer look at AWS IoT rules, Kinesis Data Firehose, and AWS Lambda function resources.

AWS IoT rules resource

IoTToFirehoseRule:
  Type: AWS::IoT::TopicRule
  Properties:
    TopicRulePayload:
      Actions:
        -
          Firehose:
            RoleArn: !GetAtt IoTFirehosePutRecordRole.Arn
            DeliveryStreamName: !Ref FirehoseDeliveryStream
            Separator: "\n"
      AwsIotSqlVersion: '2016-03-23'
      Description: This rule logs IoT payloads to S3 Bucket by aggregating in Kinesis Firehose.
      RuleDisabled: false
      Sql: !Ref IotKinesisRuleSQL

The AWS IoT rule takes a SQL parameter which defines the IoT topic that triggers the rule and the data to extract from the payload.

  • In the example, the SQL parameter is set to SELECT *, topic(3) as containerId FROM 'device/data/+' by default. SELECT * means the whole payload is taken as it is, and containerId is generated from the third segment of the MQTT topic and included in the payload, as illustrated after this list.
  • FROM 'device/data/+' describes the IoT topic that will trigger the AWS IoT rule. + is a wildcard character for MQTT topics, and the IoT devices will publish data payloads to the device/data/DEVICE_ID topic to trigger this rule.
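For instance, when a payload is published to device/data/cont1, the topic() function resolves the topic segments as follows, so the rule adds "containerId": "cont1" to its output:

topic(1) = device
topic(2) = data
topic(3) = cont1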

The AWS IoT rule also defines actions. In the example, you can see a Kinesis Data Firehose action which defines the target Kinesis Data Firehose delivery stream and the AWS Identity and Access Management (IAM) role needed to put records into this delivery stream. A separator can be chosen to separate each record; in the given example, it is a newline character.

Kinesis Data Firehose delivery stream resource

FirehoseDeliveryStream:
  Type: AWS::KinesisFirehose::DeliveryStream
  Properties:
    ExtendedS3DestinationConfiguration:
      BucketARN: !GetAtt IoTLogBucket.Arn
      BufferingHints:
        IntervalInSeconds: 60
        SizeInMBs: 1
      Prefix: device-data/
      RoleARN: !GetAtt FirehosePutS3Role.Arn
      ProcessingConfiguration:
        Enabled: true
        Processors:
          - Type: Lambda
            Parameters:
              - ParameterName: LambdaArn
                ParameterValue: !Sub '${FirehoseTransformLambda.Arn}:$LATEST'
              - ParameterName: RoleArn
                ParameterValue: !GetAtt FirehoseLambdaInvokeRole.Arn

A Kinesis Data Firehose delivery stream must define a destination to put the stream into. It supports different types of destinations. You can find the available destination types and their usage in this documentation. In this example, you are going to use Amazon S3 as the destination.

The example Delivery Stream resource defines the following properties:

  • BucketARN: the destination bucket which will store the aggregated data. The destination bucket is created by the CloudFormation stack.
  • BufferingHints: the size and time thresholds for data buffering. In this example, they are set to 1 MB and 60 seconds respectively to see the results faster. They can be adjusted according to business needs. Keeping these thresholds low will cause the Lambda function to be invoked more frequently. If the thresholds are high, the data will be ingested into the data store less frequently, so it will take longer for the latest data to appear in the data store.
  • Prefix: the created objects will be put under this prefix. Kinesis Data Firehose partitions the data based on the timestamp by default. In this example, the objects will be put under the device-data/YYYY/MM/dd/HH folder. Kinesis Data Firehose has advanced features for data partitioning, such as dynamic partitioning. The partitioning of the data is important when querying the data lake. For example, if you need to query the data on a per-device basis by using Amazon Athena, scanning only the partition of the relevant device ID will significantly reduce the scan time and the cost. You can find details on partitioning in this documentation.
  • RoleARN: this is the IAM role that gives PutObject permission to Kinesis Data Firehose to be able to put aggregated data into the Amazon S3 bucket.
  • ProcessingConfiguration: as described in the use case, a transform Lambda function will enrich the IoT data with the metadata. The processing configuration defines the processor, which is a Lambda function in this example. For each batch of data, Kinesis Data Firehose will call this Lambda function to transform the data; the shapes of the invocation event and the expected response are sketched after this list. You can read more about data processing in this documentation.
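As a reference for the next section, the invocation event that Kinesis Data Firehose sends to the transform Lambda function, and the response it expects back, look roughly like the following (abbreviated sketch; the identifiers are illustrative, and result must be one of Ok, Dropped, or ProcessingFailed):

Invocation event:
{
  "invocationId": "...",
  "deliveryStreamArn": "arn:aws:firehose:...",
  "region": "...",
  "records": [
    {
      "recordId": "...",
      "approximateArrivalTimestamp": 1675939200000,
      "data": "<base64-encoded device payload>"
    }
  ]
}

Expected response:
{
  "records": [
    {
      "recordId": "...",
      "result": "Ok",
      "data": "<base64-encoded enriched payload>"
    }
  ]
}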

Transformation Lambda Function

As you can see in the following Python code, Kinesis Data Firehose passes a batch of records to the Lambda function, where each record contains a payload from the IoT devices. First, the base64-encoded payload data is decoded. Then, the corresponding ship ID is retrieved from the DynamoDB table based on the container ID. The payload is enriched with the ship ID and encoded back to base64. Lastly, the record list is returned to Kinesis Data Firehose.

Once Kinesis Data Firehose receives the records, it puts them as an aggregated file into the Amazon S3 bucket.

import os
import boto3
import json
import base64

dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table(os.environ['METADATA_TABLE'])
def function_handler(event, context):
  # Collect the transformed records for this invocation. Defining the list
  # inside the handler (instead of at module level) prevents records from
  # leaking between warm Lambda invocations and being delivered twice.
  records = []
  for record in event["records"]:
    try:
      # Get the data field of the record in json format. It is a base64 encoded string.
      json_data = json.loads(base64.b64decode(record["data"]))
      container_id = json_data["containerId"]

      # Get the corresponding shipId from the DynamoDB table
      res = table.get_item(Key={'containerId': container_id})
      ship_id = res["Item"]["shipId"]

      # Append shipId to the actual record data
      enriched_data = json_data
      enriched_data["shipId"] = ship_id

      # Encode the enriched record to base64
      json_string = json.dumps(enriched_data).encode("ascii")
      b64_encoded_data = base64.b64encode(json_string).decode("ascii")

      # Create a record with enriched data and return it back to Firehose
      records.append({'recordId': record["recordId"], 'result': 'Ok', 'data': b64_encoded_data})
    except (KeyError, json.JSONDecodeError):
      # If the payload is malformed or no metadata exists for the container,
      # mark the record as failed so Firehose writes it to the error prefix
      # instead of failing the whole batch.
      records.append({'recordId': record["recordId"], 'result': 'ProcessingFailed', 'data': record["data"]})
  return {'records': records}

Deployment

Run the following command in a terminal to deploy the stack.

aws cloudformation deploy --stack-name IoTKinesisDataPath --template-file IoTKinesisDataPath.yml --parameter-overrides IotKinesisRuleSQL="SELECT *, topic(3) as containerId FROM 'device/data/+'" --capabilities CAPABILITY_NAMED_IAM

After the deployment is complete, run the following command in a terminal to see the output of the deployment.

aws cloudformation describe-stacks --stack-name IoTKinesisDataPath

Note the IoTLogS3BucketName and MetadataTableName output parameters.
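To print only the stack outputs, you can add a JMESPath query to the same command:

aws cloudformation describe-stacks --stack-name IoTKinesisDataPath --query "Stacks[0].Outputs"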

Testing

After the deployment is complete, the first thing you need to do is create a metadata item for data enrichment. Run the following command to create an item in the DynamoDB table. It will create an item with cont1 as containerId and ship1 as shipId. Replace the IoTKinesisDataPath-MetadataTable-SAMPLE parameter with the MetadataTableName output parameter from the CloudFormation stack deployment.

aws dynamodb put-item --table-name IoTKinesisDataPath-MetadataTable-SAMPLE --item '{"containerId":{"S":"cont1"},"shipId":{"S":"ship1"}}'
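Optionally, you can verify that the item was written by reading it back (again, replace the table name with your own stack's output):

aws dynamodb get-item --table-name IoTKinesisDataPath-MetadataTable-SAMPLE --key '{"containerId":{"S":"cont1"}}'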

In a real-life scenario, the devices publish the payloads to a specific MQTT topic. In this example, instead of creating IoT devices, you will use the AWS CLI to publish payloads to MQTT topics. Run the following command in a terminal to publish a sample data payload to AWS IoT Core. Pay attention to the payload field of the command; the only data provided by the device is the dynamic data.

aws iot-data publish --topic "device/data/cont1" --payload '{"temperature":20,"humidity":80,"latitude":0,"longitude":0}' --cli-binary-format raw-in-base64-out

Now, navigate to Amazon S3 from the AWS Management Console and select the bucket that was created by the CloudFormation stack. You should see the device-data folder in this bucket. It may take up to one minute for the data to appear, due to the buffering configuration of the Firehose delivery stream. If you navigate into the device-data/YYYY/MM/dd/HH folder, you will see that an object has been created. Go ahead and open this file. You will see that the content of the file is the data payload with the enriched shipId field.

{"temperature": 20, "humidity": 80, "latitude": 0, "longitude": 0, "containerId": "cont1", "shipId": "ship1"}
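Alternatively, you can list the delivered objects from a terminal with the AWS CLI, replacing bucket-name with the IoTLogS3BucketName output value:

aws s3 ls s3://bucket-name/device-data/ --recursive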

Troubleshooting

In case of failure in the system, the following resources can be useful for analyzing the source of the problem.

To monitor AWS IoT Core Rules Engine, you need to enable AWS IoT Core logging. This will give detailed information about the events happening in AWS IoT Core.

AWS Lambda can be monitored by using Amazon CloudWatch. The example CloudFormation template includes the necessary permissions to create a log group for the Lambda function logging.
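For example, you can tail the function's logs from a terminal with the AWS CLI v2 (the log group name below is a placeholder; replace it with the name of the transform Lambda function created by your stack):

aws logs tail /aws/lambda/<transform-function-name> --follow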

In case of a failure in the transform Lambda function, Kinesis Data Firehose will create a processing-failed folder under the device-data prefix in the Amazon S3 bucket. The details of each failure can be read as JSON objects in the files under this folder. You can find more information in this documentation.
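Assuming the default layout described above, you can list the failed records from a terminal, replacing bucket-name with your bucket name:

aws s3 ls s3://bucket-name/device-data/processing-failed/ --recursive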

Clean up

To clean up the resources that have been created, first empty the Amazon S3 bucket. Run the following command, changing the bucket-name parameter to the name of the bucket deployed by the CloudFormation stack. Important: this command will irreversibly delete all the data inside the bucket.

aws s3 rm s3://bucket-name --recursive

Then, you can delete the CloudFormation stack by running the following command in a terminal.

aws cloudformation delete-stack --stack-name IoTKinesisDataPath

Conclusion

In this blog post, you have learned a common pattern for enriching IoT payloads with metadata and storing them cost-effectively in a data lake in near real-time by using the AWS IoT Rules Engine and an Amazon Kinesis Data Firehose delivery stream. The proposed solution and the CloudFormation template can be used as a baseline for a scalable IoT data ingestion architecture.

You can read further about the AWS IoT Core Rules Engine and Amazon Kinesis Data Firehose. Best practices for using MQTT topics in the AWS IoT Rules Engine will guide you in defining your topic structures.

Ozan Cihangir

Ozan is a Prototyping Engineer at AWS. He helps customers build innovative solutions for their emerging technology projects in the cloud. LinkedIn