The Internet of Things on AWS – Official Blog

Real-time metrics with AWS IoT Analytics and Amazon CloudWatch

Customers frequently ask how to build a real-time dashboard of the data flowing through AWS IoT Analytics. In this blog post, I’ll show you one pattern for achieving near real-time metrics by using Amazon CloudWatch and the AWS Lambda activity of a pipeline.

I’ve simulated 25 smart pump devices that send messages continuously by using IoT Device Simulator, a solution that enables you to create and simulate hundreds of virtual connected devices without having to configure and manage physical devices or develop time-consuming scripts. These smart pumps report temperature, humidity, pressure, and vibration data. Messages from these devices flow into my Amazon S3 bucket. I’d like to visualize the data coming from these smart pumps in real time to find any anomalous or alarming patterns so that I can proactively repair the devices.

IoT Analytics helps me build a dashboard and analytical workflow that I can set up in minutes to visualize the data from my smart pumps. I use the BatchPutMessage API of IoT Analytics to send these stored messages to a channel. Messages flow from the channel into the pipeline, where they are processed. After processing, the pipeline sends messages to the data store. The pipeline is the key to getting my near real-time metrics.
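As a rough illustration of the BatchPutMessage step, here is a minimal sketch that chunks stored device records and sends them to a channel with boto3. The helper names, the generated message IDs, and the chunk size of 100 are my assumptions for this example, so check the current service quotas before relying on them.

```python
import json
import uuid

# Assumption: BatchPutMessage accepts a limited number of messages per call.
# 100 is used here as the chunk size; verify it against the current quotas.
MAX_BATCH = 100


def to_batches(records, batch_size=MAX_BATCH):
    """Turn a list of dict payloads into lists of BatchPutMessage entries."""
    entries = [
        {"messageId": str(uuid.uuid4()), "payload": json.dumps(r).encode("utf-8")}
        for r in records
    ]
    return [entries[i:i + batch_size] for i in range(0, len(entries), batch_size)]


def send_to_channel(client, channel_name, records):
    """Send records to a channel and return any error entries the API reports."""
    failed = []
    for batch in to_batches(records):
        response = client.batch_put_message(channelName=channel_name, messages=batch)
        failed.extend(response.get("batchPutMessageErrorEntries", []))
    return failed
```

To try it, create a client with boto3.client("iotanalytics") and call send_to_channel(client, "smart_pump_channel", records), where smart_pump_channel is a hypothetical channel name and records is the list of messages read from the S3 bucket.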

First, I look at the attributes sent by my devices. The pipeline infers these message attributes from my channel.

Pipeline inferring my smart pump messages

 

Now the first step is to create a Lambda function that routes data to Amazon CloudWatch, where we will build our dashboard. In this example, I’ve named the Lambda function metrics_to_cloudwatch. Once the Lambda function is created, it can be added to our pipeline as an activity.

The Lambda activity in IoT Analytics
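If you prefer to define the pipeline programmatically rather than in the console, the activity chain might look like the following sketch. The activity names (from_channel, to_cloudwatch, to_datastore) and the function name build_pipeline_activities are hypothetical. The structure follows the pipelineActivities parameter of the IoT Analytics CreatePipeline API.

```python
def build_pipeline_activities(channel_name, lambda_name, datastore_name, batch_size=1):
    """Build a channel -> Lambda -> data store activity chain for a pipeline."""
    return [
        # Read messages from the channel and hand them to the Lambda activity.
        {"channel": {"name": "from_channel",
                     "channelName": channel_name,
                     "next": "to_cloudwatch"}},
        # Invoke the Lambda function with up to batch_size messages per call.
        {"lambda": {"name": "to_cloudwatch",
                    "lambdaName": lambda_name,
                    "batchSize": batch_size,
                    "next": "to_datastore"}},
        # Store whatever the Lambda function returns.
        {"datastore": {"name": "to_datastore",
                       "datastoreName": datastore_name}},
    ]
```

The resulting list can be passed as pipelineActivities to create_pipeline or update_pipeline on a boto3 iotanalytics client, together with a pipelineName of your choice.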

Let’s take a closer look at how this Lambda function works. As far as the pipeline is concerned, the function simply returns the messages it receives so that they can continue to be processed.

import json
import boto3
 
cloudwatch = boto3.client('cloudwatch')
 
# You can change these payload attributes based on your device message schema
ATTRIBUTES = ["temperature", "pressure", "vibration", "humidity"]
# The payload attribute which represents your device identifier
DEVICE_ID_ATTRIBUTE = "_id_"
# You can change this namespace name per your device and preferences
CLOUDWATCH_NAMESPACE = "SmartPump/Monitoring"
 
# Publish metric data to CloudWatch
def cw(deviceId, metricValue, metricName):
    metric_data = {
        'MetricName': metricName,
        'Dimensions': [{'Name': 'DeviceId', 'Value': deviceId}],
        'Unit': 'None', 
        'Value': metricValue
    }
 
    cloudwatch.put_metric_data(MetricData=[metric_data],Namespace=CLOUDWATCH_NAMESPACE) 
    return

# The handler loops through all the messages and evaluates if the message
#  has attributes for pressure, humidity, vibration or temperature.
#  These are the attributes that I want to graph in near real-time in
#  this example. If the attribute exists in the message, we call the cw() 
#  function to emit the custom metric to Amazon CloudWatch.
def lambda_handler(event, context):
    for e in event:
        print("Received a message: {}".format(str(e)))
        # Validate this event payload contains a device ID
        if DEVICE_ID_ATTRIBUTE in e:
            # Iterate through each attribute we want to publish to CloudWatch
            for attribute in ATTRIBUTES:
                # Validate the event payload contains the current attribute
                if attribute in e:
                    print("publishing {} to CloudWatch".format(attribute))
                    cw(e[DEVICE_ID_ATTRIBUTE],e[attribute], attribute)
 
    return event

 

In essence, the Lambda function in a pipeline is passed an array of messages, and the size of that array depends on the batch size that you configure for the activity. The default of 1 means that the Lambda function is invoked for each individual message, which is fine for scenarios where messages arrive in the channel only every few seconds. When you have a high frequency of incoming messages (every millisecond, for example) or when the BatchPutMessage API puts multiple messages into the channel at a time, you might want to set the batch size greater than 1 to reduce the number of Lambda invocations.
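With a batch size greater than 1, it can also make sense to consolidate the CloudWatch calls, because the function above calls put_metric_data once per attribute. The sketch below is one possible variant, not the function used in this post: it collects the metrics for the whole batch and publishes them in chunks. The helper names and the chunk size of 20 are assumptions, so check the current PutMetricData limits.

```python
ATTRIBUTES = ["temperature", "pressure", "vibration", "humidity"]
DEVICE_ID_ATTRIBUTE = "_id_"
CLOUDWATCH_NAMESPACE = "SmartPump/Monitoring"
# Assumption: a conservative chunk size; check the current PutMetricData limits.
MAX_METRICS_PER_CALL = 20


def build_metric_data(messages):
    """Collect one metric datum per known attribute of every message."""
    data = []
    for message in messages:
        if DEVICE_ID_ATTRIBUTE not in message:
            continue  # skip payloads without a device identifier
        for attribute in ATTRIBUTES:
            if attribute in message:
                data.append({
                    "MetricName": attribute,
                    "Dimensions": [{"Name": "DeviceId",
                                    "Value": str(message[DEVICE_ID_ATTRIBUTE])}],
                    "Unit": "None",
                    "Value": message[attribute],
                })
    return data


def publish(cloudwatch, messages):
    """Publish the batch in chunks and return the number of metrics sent."""
    data = build_metric_data(messages)
    for i in range(0, len(data), MAX_METRICS_PER_CALL):
        cloudwatch.put_metric_data(Namespace=CLOUDWATCH_NAMESPACE,
                                   MetricData=data[i:i + MAX_METRICS_PER_CALL])
    return len(data)
```

Inside lambda_handler, you would call publish(cloudwatch, event) and still return event so that the messages continue through the pipeline.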

An important permission step is that you need to make sure you have granted the Lambda function permission to access Amazon CloudWatch.
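As a hedged example of what that permission could look like, the sketch below builds an IAM policy document that allows cloudwatch:PutMetricData and restricts it to one namespace through the cloudwatch:namespace condition key. The function name is hypothetical, and you would attach the resulting policy to the execution role of your own Lambda function.

```python
import json


def cloudwatch_put_metric_policy(namespace):
    """Return an IAM policy document allowing PutMetricData for one namespace."""
    return {
        "Version": "2012-10-17",
        "Statement": [{
            "Effect": "Allow",
            "Action": "cloudwatch:PutMetricData",
            # PutMetricData is granted on all resources; the condition
            # below narrows it to a single metric namespace.
            "Resource": "*",
            "Condition": {
                "StringEquals": {"cloudwatch:namespace": namespace}
            },
        }],
    }


# Print the policy JSON for the namespace used in this post.
print(json.dumps(cloudwatch_put_metric_policy("SmartPump/Monitoring"), indent=2))
```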

Lambda function can send messages to Amazon CloudWatch

You also must make sure that you have granted IoT Analytics permission to invoke your Lambda function, and you can do this with the following AWS CLI command:

aws lambda add-permission --function-name metrics_to_cloudwatch \
    --statement-id metrics_to_cloudwatch_perms \
    --principal iotanalytics.amazonaws.com \
    --action lambda:InvokeFunction

If you forget this, and you have configured your logging options (see the Setting up CloudWatch Logs documentation), you’ll see error messages like this in your Amazon CloudWatch log stream “aws/iotanalytics/pipelines”.

[ERROR] Unable to execute Lambda function due to insufficient permissions; dropping 
the messages, number of messages dropped : 1, functionArn : arn:aws:lambda:us-west-2:
<accountid>:function:metrics_to_cloudwatch

That’s it! We can now use the dashboarding features of Amazon CloudWatch to plot our custom metrics on graphs and dashboards with fine-grained time resolution. For example, this is my dashboard of my data at 30-second intervals:

Example real-time metrics from the IoT Analytics Pipeline showing in Amazon CloudWatch
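I built my dashboard in the console, but a similar graph could be defined in code. The sketch below builds a dashboard body with a single time-series widget that plots one metric for several devices. The function name, widget layout, region, and device IDs are assumptions for illustration. As far as I know, CloudWatch only serves periods shorter than 60 seconds for metrics published with StorageResolution set to 1, so treat the 30-second period as dependent on how your metrics are published.

```python
import json


def dashboard_body(namespace, metric_name, device_ids, region, period=30):
    """Return a CloudWatch dashboard body (JSON string) with one metric widget."""
    widget = {
        "type": "metric",
        "x": 0, "y": 0, "width": 12, "height": 6,
        "properties": {
            "title": "{} per device".format(metric_name),
            "view": "timeSeries",
            "region": region,
            "stat": "Average",
            "period": period,
            # One line on the graph per device, keyed by the DeviceId dimension.
            "metrics": [[namespace, metric_name, "DeviceId", d] for d in device_ids],
        },
    }
    return json.dumps({"widgets": [widget]})
```

The returned string can be passed as DashboardBody to put_dashboard on a boto3 cloudwatch client, along with a DashboardName of your choice.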

Using this real-time dashboard, I created a CloudWatch Alarm for high temperatures on my smart pumps. This alarm notifies me when the temperature value exceeds an acceptable threshold:

Example alarm from Amazon CloudWatch
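An alarm like this one could also be created with the PutMetricAlarm API. The following sketch builds the parameters for a per-device temperature alarm. The alarm name, threshold, period, evaluation settings, and optional SNS topic ARN are placeholders that I chose for illustration, not the values behind the screenshot.

```python
def high_temperature_alarm(device_id, threshold, topic_arn=None):
    """Build put_metric_alarm parameters for one smart pump."""
    params = {
        "AlarmName": "SmartPump-HighTemperature-{}".format(device_id),
        "Namespace": "SmartPump/Monitoring",
        "MetricName": "temperature",
        "Dimensions": [{"Name": "DeviceId", "Value": device_id}],
        "Statistic": "Average",
        "Period": 60,              # evaluate one-minute averages
        "EvaluationPeriods": 3,    # alarm after three consecutive breaches
        "Threshold": float(threshold),
        "ComparisonOperator": "GreaterThanThreshold",
        "TreatMissingData": "notBreaching",
    }
    if topic_arn:
        # Notify an SNS topic when the alarm fires (hypothetical ARN).
        params["AlarmActions"] = [topic_arn]
    return params
```

With a boto3 cloudwatch client, the alarm is created by calling cloudwatch.put_metric_alarm(**high_temperature_alarm("pump-1", 90)), where the device ID and threshold are example values.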

In conclusion, we’ve seen how to use the Lambda activity that is available in the AWS IoT Analytics pipeline to route the message attributes we want to a near real-time dashboard in Amazon CloudWatch, and to trigger alarms when needed.

Happy dashboarding!