The Internet of Things on AWS – Official Blog

How to Use Substitution Templates in the Rules Engine to Enrich IoT Messages

Post by Marcos Ortiz, an AWS Solutions Architect

The AWS IoT platform allows you to use substitution templates to augment the JSON data returned when a rule is triggered and AWS IoT performs a rule action. The syntax for a substitution template is ${expression}, where expression can be any expression supported by AWS IoT in SELECT or WHERE clauses. For more information about supported expressions, see AWS IoT SQL Reference.
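
For example, a republish action can compute its output topic from the topic of the inbound message at run time. The following is a minimal sketch of a topic rule payload (the role ARN and topic names are placeholders):

{
    "sql": "SELECT * FROM 'sensors/+'",
    "actions": [
        {
            "republish": {
                "roleArn": "arn:aws:iam::123456789012:role/iot-republish-role",
                "topic": "${topic()}/processed"
            }
        }
    ]
}

A message published to “sensors/device1” is republished, unchanged, to “sensors/device1/processed”.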

Substitution templates are an important feature for AWS IoT customers, especially when you need to dynamically add contextual information that is not stored in the payload but is part of the MQTT communication (for example, the MQTT client ID or the MQTT topic structure) to some of your actions.

Background

In this blog post, we use a fictitious company called ACME Offshore, a rig contractor that leases drilling rigs to oil and gas operators. ACME Offshore wants to differentiate itself from its competitors through an ambitious plan to transform its rigs into what it calls next generation rigs. The idea is to provide all of a rig’s sensor data to customers in near real time. In this post, we show how to use substitution templates in AWS IoT rules to dynamically configure your IoT actions with the functions provided by the AWS IoT SQL Reference.

We provide an AWS CloudFormation template that creates all the AWS resources required to run the demo. At the end of the post, there are instructions for deleting all of those resources.

Architecture

The following diagram shows the overall architecture.

As the rigs operate, thousands of sensors continuously generate data. That data is collected, aggregated, used locally on the rig, and sent to the AWS IoT platform. All the data sent by a rig goes to a “myrigs/id” topic, where “id” is the unique identifier of the rig. A rig can send two types of data: data points and events.

The following is an example of a payload sent by a rig:

{
    "datapoints":[
        {"recorded_at":1498679312, "well_depth":10, "bit_depth":0.0},
        {"recorded_at":1498679313, "well_depth":10, "bit_depth":0.1},
        {"recorded_at":1498679314, "well_depth":10, "bit_depth":0.2}
    ],
    "events": {
        "errors": [
            {"recorded_at":1498679312, "code":1001, "message":"Error 1001"},
            {"recorded_at":1498679313, "code":1002, "message":"Error 1002"}
        ],
        "warnings": [
            {"recorded_at":1498679313, "code":1003, "message":"Error 1003"},
            {"recorded_at":1498679314, "code":1004, "message":"Error 1004"}
        ],
        "infos": [
            {"recorded_at":1498679314, "code":1005, "message":"Error 1005"},
            {"recorded_at":1498679314, "code":1006, "message":"Error 1006"}
        ]
    }
}

Each payload can have a combination of data points and events. There are three AWS IoT rules to process the data coming from the rigs.

1. Data Points Rule

The data points rule subscribes to the “myrigs/+” topic so it can process data points sent by any rig. It matches on the MQTT topic only and triggers two AWS IoT actions when new data points are available. The “+” and “#” characters are wildcards that can be used when subscribing to IoT topics. For more information about topic wildcards, see Topics in the AWS IoT Developer Guide.
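
A sketch of this rule as a topic rule payload might look like the following (the role ARN, stream names, and exact SELECT clause are placeholders; the real values are created by the CloudFormation template). Its two actions are described next.

{
    "sql": "SELECT *, topic(2) AS rig_id FROM 'myrigs/+'",
    "awsIotSqlVersion": "2016-03-23",
    "actions": [
        {
            "kinesis": {
                "roleArn": "arn:aws:iam::123456789012:role/acme-iot-action-role",
                "streamName": "acme-rig-datapoints",
                "partitionKey": "${newuuid()}"
            }
        },
        {
            "firehose": {
                "roleArn": "arn:aws:iam::123456789012:role/acme-iot-action-role",
                "deliveryStreamName": "acme-rig-datapoints-delivery",
                "separator": "\n"
            }
        }
    ]
}

Here topic(2) extracts the rig ID from the second topic segment (for “myrigs/99”, it returns “99”), and ${newuuid()} is evaluated for each message to produce the Kinesis partition key.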

1.1 Anomalies Action

This action sends all the data points to an Amazon Kinesis stream. An AWS Lambda function reads from that stream in order to detect any anomalies in the data points. In the demo portion of this post, the Lambda function checks for the following scenarios:

  • bit_depth values less than 0.
  • well_depth values less than 0.
  • bit_depth values greater than well_depth values, where both bit_depth and well_depth are greater than 0.

When an anomaly is detected, the Lambda function writes it to a DynamoDB table. Recording data anomalies is important not just for sensor maintenance and quality control, but also for operations and security.

1.2 Firehose Action

This action sends all the data points to an Amazon Kinesis Firehose delivery stream. The purpose of this action is to create a data lake of rig telemetry in near real time. That data can be replayed later, which makes it possible to reprocess it against new versions of data point-consuming systems. Replay is also important for auditing and reporting.

2. Events Rule

The events rule subscribes to the “myrigs/+” topic so it can process all events being sent by any rig. It queries only the events portion of the payload and triggers one AWS IoT action when new events are available.

Events can be generated manually or automatically in cases like the following:

  • A rig operator requests support from the onshore team.
  • The rig state changes.
  • Pumps are turned on.

2.1 Events Action

This action sends all the events received to a Kinesis Firehose delivery stream, which stores them in an S3 bucket.
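
Sketched the same way (names and ARNs are again placeholders), the events rule selects only the events portion of the payload and adds the rig ID before handing the result to Kinesis Firehose:

{
    "sql": "SELECT events, topic(2) AS rig_id FROM 'myrigs/+'",
    "awsIotSqlVersion": "2016-03-23",
    "actions": [
        {
            "firehose": {
                "roleArn": "arn:aws:iam::123456789012:role/acme-iot-action-role",
                "deliveryStreamName": "acme-rig-events-delivery",
                "separator": "\n"
            }
        }
    ]
}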

3. Error Events Rule

The error events rule subscribes to the “myrigs/+” topic so it can process all error events sent by any rig. It queries only the error events portion of the payload and triggers two AWS IoT actions when new error events are available.

3.1 Republish Action

This action republishes all error events coming from a given rig to a rig-specific MQTT topic. For example, error events coming from rig 99 are republished to “myrigs/99/errors”. This allows monitoring systems and remote support drilling engineers to be notified in real time of any errors occurring on a rig; all that’s required is a subscription to that rig’s error topic.

Systems can receive all errors coming from all rigs by subscribing to the “myrigs/+/errors” topic.

3.2 Notification Action

This action routes all error events to an Amazon SNS topic named “acme-rigs”, so the same remote support drilling engineers can receive notifications (e-mail or text) even when they are not in front of a computer. Amazon SNS can also notify external monitoring systems (for example, through an HTTP callback) whenever error events are received for a given rig.
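
A sketch of the error events rule combines both actions (the ARNs and exact SELECT clause are placeholders; the SNS topic is the “acme-rigs” topic mentioned above):

{
    "sql": "SELECT events.errors, topic(2) AS rig_id FROM 'myrigs/+'",
    "awsIotSqlVersion": "2016-03-23",
    "actions": [
        {
            "republish": {
                "roleArn": "arn:aws:iam::123456789012:role/acme-iot-action-role",
                "topic": "${topic()}/errors"
            }
        },
        {
            "sns": {
                "roleArn": "arn:aws:iam::123456789012:role/acme-iot-action-role",
                "targetArn": "arn:aws:sns:us-east-1:123456789012:acme-rigs",
                "messageFormat": "RAW"
            }
        }
    ]
}

Because ${topic()} resolves to the inbound topic at run time, a message published to “myrigs/99” has its errors republished to “myrigs/99/errors” without requiring a separate rule per rig.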

Provisioning the Demo

Click the Launch Stack button below.

This will redirect you to the AWS CloudFormation console. This demo is deployed in the US East (Northern Virginia) Region. Click Next.

On the Specify Details page, for SnsSubscriberEmail, type your e-mail address so you can receive e-mail notifications from Amazon SNS. Click Next.

You can customize options (tags, permissions, notifications) on the following page or simply click Next to continue with the default options.

On the Review page, select the I acknowledge that AWS CloudFormation might create IAM resources box, and then click Create to launch the AWS CloudFormation stack.

After a few minutes, the provisioning should be complete.

Select the AWS CloudFormation stack, and on the Outputs tab, copy the value for the S3BucketName key. The Kinesis Firehose delivery streams will write data points and events to this bucket.

After the provisioning is complete, you will receive an e-mail from Amazon SNS.

Click Confirm subscription so you can receive emails from Amazon SNS whenever a rig sends error events.

Before we start testing the demo, let’s review the AWS IoT substitution templates. On the Rules page in the AWS IoT console, you will see the three rules we created.

On the acmeRigDatapoints AWS IoT rule, we use the newuuid() AWS IoT function to set the value of our Kinesis stream partition key. The newuuid() function returns a random 16-byte UUID, so no matter how many payloads AWS IoT receives, traffic is distributed evenly across all the shards of our Kinesis stream.

We also use the topic AWS IoT function in the query statement so we can add the rig_id information when writing the data points to DynamoDB or S3.

On the acmeRigAllEvents AWS IoT rule, we use only the topic function in the query statement, so we can add the rig_id information when writing the events to the Amazon S3 bucket.

On the acmeRigErrorEvents rule, we use the topic function to dynamically set the topic for our AWS IoT republish action. This allows us to republish any errors published to the “myrigs/id” topic to the “myrigs/id/errors” topic. For example, rig 99 sends payloads to “myrigs/99” and any errors are republished to “myrigs/99/errors”. For rig 5, those topics would be “myrigs/5” and “myrigs/5/errors”, respectively.

We also use the same topic function to add rig_id context to the payload of our SNS notification.

Testing the Demo

Now you should be all set to test the demo. On the Test page in the AWS IoT console, in Subscription topic, type “myrigs/1” and then click Subscribe to topic.

Follow the same steps to subscribe to the “myrigs/1/errors” topic. We want to subscribe to this topic so we can test our AWS IoT republish action.

To simulate a rig sending a payload to your AWS IoT endpoint, copy the following JSON payload to your clipboard. (In this case, the rig ID is 1.)
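
Any payload that follows the structure shown earlier will work; the one below, for example, contains both anomalous data points and error events:

{
    "datapoints":[
        {"recorded_at":1498679312, "well_depth":-10, "bit_depth":0.0},
        {"recorded_at":1498679313, "well_depth":10, "bit_depth":-0.1},
        {"recorded_at":1498679314, "well_depth":10, "bit_depth":20.0}
    ],
    "events": {
        "errors": [
            {"recorded_at":1498679312, "code":1001, "message":"Error 1001"},
            {"recorded_at":1498679313, "code":1002, "message":"Error 1002"}
        ]
    }
}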

The sample payload we are using has the following anomalies:

  1. At 1498679312, well_depth is less than 0.
  2. At 1498679313, bit_depth is less than 0.
  3. At 1498679314, bit_depth is greater than well_depth.

Click the “myrigs/1” topic, delete all the content in the text area, and then paste in the payload you just copied. Now click Publish to topic.

If you click the “myrigs/1/errors” topic, you should see that it received the errors you published in the payload.

Navigate to the DynamoDB table. On the Items tab, you will see the three anomalies saved in the table.
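
The exact item schema is defined by the demo’s Lambda function, but each anomaly record will look something like this (the attribute names here are illustrative):

{
    "rig_id": "1",
    "recorded_at": 1498679312,
    "anomaly": "well_depth is less than 0",
    "well_depth": -10,
    "bit_depth": 0.0
}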

Check the e-mail address you provided when creating the AWS CloudFormation stack. You should receive a message with the errors sent in the sample payload.

After you publish the test payload, it should take about one minute for Amazon Kinesis Firehose to write the data points and events to the S3 bucket.

Cleaning Up

After you finish testing, be sure to clean up your environment so you are not charged for unused resources.

Go to the Amazon S3 console and delete the bucket contents.

Go to the AWS CloudFormation console, select the “acme-iot” stack, and then click Delete Stack.

Conclusion

We hope you found this demo useful. Feel free to leave your feedback in the comments.