The Internet of Things on AWS – Official Blog

Configuring near real-time notification for asset based monitoring with AWS IoT Events

Introduction

In today’s market, business success often depends on the ability to glean accurate insights and predictions. However, floor engineers and analysts often struggle to get the right information at the right moment to make an informed decision when a failure occurs during operations.

To make it easy to detect and respond to events from IoT sensors and applications, AWS IoT Events allows you to detect events across thousands of IoT sensors sending different telemetry data. Events are patterns of data that identify circumstances more complicated than a single reading can convey, such as changes in equipment when a belt is stuck, or motion detectors using movement signals to activate lights and security cameras. AWS IoT Events allows you to simply select the relevant data sources to ingest, define the logic for each event using simple ‘if-then-else’ statements, and select the alert or custom action to trigger when an event occurs. It continuously monitors data from multiple IoT sensors and applications, and integrates with other services, such as AWS IoT Core, to enable early detection and unique insights into events.

In this blog post, we explain how to design AWS IoT Events for two key issues faced by customers:

1. How to monitor and send alerts for assets at scale.

2. How to suppress repeated alerts during failures.

Solution overview

In this post, we describe the configuration of AWS IoT Events and Amazon Simple Notification Service (Amazon SNS) to send near real-time notifications during failures, and cover data analysis using Amazon Athena.

Architecture for near real-time notification using AWS IoT Events

Solution walkthrough

1) Prerequisites / Assumptions

We assume that the following services are configured in your AWS environment and that devices, for example a conveyor motor, are connected to AWS IoT Core.

a) An on-premises edge gateway server running AWS IoT Greengrass with the required AWS IoT certificates and connected to AWS IoT Core, as shown in the architecture. The edge gateway publishes data to the AWS IoT Core topic ‘iot/topic’, from which it can be routed to other permitted AWS services via AWS IoT rules.

b) An Amazon Simple Storage Service (Amazon S3) bucket exists in your AWS account that you can use throughout this blog post. We assume the bucket name is datalakes3bucket and use a sub-folder within that bucket that we call datastore. For security reasons, we recommend turning on server-side encryption for your Amazon S3 bucket.

c) An existing Amazon SNS topic with a subscription pointing to the desired email address or mobile number in your AWS account. Throughout this blog post we refer to this topic as <Topic Name>; note that SNS topic names cannot contain slashes, so the topic needs a name distinct from the ‘iot/topic’ MQTT topic. If you need to create one, see the sketch below.
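If you do not already have a topic, the following boto3 sketch creates one with an email subscription. The topic name iot-alert-topic and the email address are placeholders:

import boto3

sns = boto3.client("sns")

# Create the notification topic (name is a placeholder; SNS topic names
# may contain only letters, numbers, hyphens, and underscores).
topic = sns.create_topic(Name="iot-alert-topic")
topic_arn = topic["TopicArn"]

# Subscribe an email endpoint; the recipient must confirm the
# subscription email before notifications are delivered.
sns.subscribe(
    TopicArn=topic_arn,
    Protocol="email",
    Endpoint="alerts@example.com",  # placeholder address
)
print("Created topic:", topic_arn)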

2) Setting up the data lake

This section describes setting up the data lake used to run analytics on the stored data. Data received on a topic in the AWS IoT Core message broker is forwarded to Amazon Kinesis Data Firehose. Kinesis Data Firehose is an extract, transform, and load (ETL) service that reliably captures, transforms, and delivers streaming data to storage destinations such as Amazon S3.

Step 1: Configure AWS Key Management Service (AWS KMS) for encrypting data at rest in the Kinesis Data Firehose delivery stream.

a)  Sign in to the AWS Management Console

b)  Search for AWS KMS

c) Choose Create key

d) Choose Symmetric as the Key type

e)  Leave the rest as default and create the key

Creating a symmetric key in AWS Key Management Service

This KMS key will be used while configuring the Kinesis Data Firehose delivery stream in subsequent steps.
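If you prefer to script this step, here is a minimal boto3 sketch that creates an equivalent symmetric key; the description text is a placeholder:

import boto3

kms = boto3.client("kms")

# Create a symmetric key with default settings, matching the console steps.
response = kms.create_key(
    Description="Key for the Kinesis Data Firehose delivery stream",
    KeySpec="SYMMETRIC_DEFAULT",
    KeyUsage="ENCRYPT_DECRYPT",
)
key_arn = response["KeyMetadata"]["Arn"]
print("Created KMS key:", key_arn)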

Step 2: Configure an Amazon CloudWatch log group and log stream to store the Kinesis Data Firehose log information.

a)   Search Amazon CloudWatch

b)   Select Log groups

c) Supply the log group name; in this blog post we use /aws/kinesisfirehose/kinesis-log-group

d) Set the retention period according to your requirements

e) Within the log group, create a log stream; in this blog post we use kinesis-log-stream

Set up the Amazon CloudWatch log group for the Kinesis Data Firehose delivery stream
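The same log group and log stream can be created with boto3; this sketch assumes the names used in the IAM policy below and a 30-day retention period:

import boto3

logs = boto3.client("logs")

log_group = "/aws/kinesisfirehose/kinesis-log-group"
log_stream = "kinesis-log-stream"

logs.create_log_group(logGroupName=log_group)
# Keep Firehose error logs for 30 days; adjust to your requirements.
logs.put_retention_policy(logGroupName=log_group, retentionInDays=30)
logs.create_log_stream(logGroupName=log_group, logStreamName=log_stream)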

Step 3: Create a role with permissions in AWS Identity and Access Management (IAM) that allows the Kinesis Data Firehose delivery stream to access the AWS KMS key as well as the S3 bucket.

a)  Search AWS IAM

b)  Select Policies

c) Choose Create policy and paste the policy document below

Here and throughout this post, remember to replace the placeholder account ID, Region, and S3 bucket name with your own values.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "",
      "Effect": "Allow",
      "Action": [
        "s3:AbortMultipartUpload",
        "s3:GetBucketLocation",
        "s3:GetObject",
        "s3:ListBucket",
        "s3:ListBucketMultipartUploads",
        "s3:PutObjectAcl",
        "s3:PutObject"
      ],
      "Resource": [
        "arn:aws:s3:::datalakes3bucket",
        "arn:aws:s3:::datalakes3bucket/datastore/*"
      ]
    },
    {
      "Sid": "",
      "Effect": "Allow",
      "Action": [
        "kms:GenerateDataKey",
        "kms:DescribeKey"
      ],
      "Resource": "arn:aws:kms:<region>:123456789012:key/<s3 bucket kms key>"
    },
    {
      "Sid": "",
      "Effect": "Allow",
      "Action": "logs:PutLogEvents",
      "Resource": "arn:aws:logs:<region>:123456789012:log-group:/aws/kinesisfirehose/kinesis-log-group:log-stream:kinesis-log-stream"
    }
  ]
}

Next, we create the role that the Kinesis Data Firehose delivery stream will assume and attach the permission policy created above. A scripted sketch of the same steps follows the list below.

a)   Select Roles

b)  Choose Create role

c)   Within the Use case selection menu, choose Kinesis and then Kinesis Firehose

d) In the Permission policies selection field, search for and choose the permission policy created above.

e)   Give the role a name, leave the rest as default and choose Create role
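For reference, here is a boto3 sketch of the same role creation; the role name firehoserole matches the KMS key policy used later in this post, and the permission policy ARN is a placeholder:

import boto3
import json

iam = boto3.client("iam")

# Trust policy that lets Kinesis Data Firehose assume the role.
trust_policy = {
    "Version": "2012-10-17",
    "Statement": [{
        "Effect": "Allow",
        "Principal": {"Service": "firehose.amazonaws.com"},
        "Action": "sts:AssumeRole",
    }],
}

iam.create_role(
    RoleName="firehoserole",
    AssumeRolePolicyDocument=json.dumps(trust_policy),
)

# Attach the permission policy created above (placeholder ARN).
iam.attach_role_policy(
    RoleName="firehoserole",
    PolicyArn="arn:aws:iam::123456789012:policy/<permission policy name>",
)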

Step 4: Configure the Kinesis Data Firehose delivery stream to send data to the S3 bucket.

a)   Search Amazon Kinesis

b)   Choose Create delivery stream

c)   Set source as Direct PUT

d)  Set destination to Amazon S3

e)  Set the Delivery stream name

f)   Select your pre-created S3 bucket in the Destination settings

Supply Destination settings

g)   Go to Advanced settings

h)  Enable Amazon CloudWatch Error logging

i)  Enable Server-Side encryption with the AWS KMS key you created

j) In the Permissions section, select Choose existing IAM role and pick the role we created in Step 3

k)  Leave all the other options as default and choose Create delivery stream

Configure the Advanced settings in the Kinesis Data Firehose delivery stream creation
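The equivalent delivery stream can be created programmatically. In this sketch the stream name iot-datalake-stream is a placeholder, and the ARNs must be replaced with the role, bucket, and key from the previous steps:

import boto3

firehose = boto3.client("firehose")

firehose.create_delivery_stream(
    DeliveryStreamName="iot-datalake-stream",  # placeholder name
    DeliveryStreamType="DirectPut",
    # Server-side encryption with the customer managed KMS key.
    DeliveryStreamEncryptionConfigurationInput={
        "KeyType": "CUSTOMER_MANAGED_CMK",
        "KeyARN": "arn:aws:kms:<region>:123456789012:key/<s3 bucket kms key>",
    },
    ExtendedS3DestinationConfiguration={
        "RoleARN": "arn:aws:iam::123456789012:role/firehoserole",
        "BucketARN": "arn:aws:s3:::datalakes3bucket",
        "Prefix": "datastore/",
        "CloudWatchLoggingOptions": {
            "Enabled": True,
            "LogGroupName": "/aws/kinesisfirehose/kinesis-log-group",
            "LogStreamName": "kinesis-log-stream",
        },
    },
)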

Step 5: Set up an AWS IoT Core rule to filter data received on a topic (iot/topic) and route it to the Kinesis Data Firehose delivery stream. See the example data below that might arrive on the topic:

{
  "Plant": "demo",
  "MachineName": "conveyor-motor",
  "EngineName": "engine1",
  "MessageTimestamp": "2022-01-01",
  "temperature": 90,
  "amps": 15,
  "Vibration": 100
}

Sample data sent from the edge device to the AWS IoT Core message broker

a) In the console, search AWS IoT Core and select Rules under Act

b) Select Create rule and provide it with a Name and Description. Choose Next

Supply AWS IoT Core rule name

c) We use a SQL statement to filter the incoming data on the topic so that only the required fields are received from the messages. Copy and paste the query statement below into the SQL statement field

SELECT MachineName, EngineName, MessageTimestamp, temperature, amps, Vibration FROM 'iot/topic'

d) Choose Next

e) In the Rule actions section, select Kinesis Firehose stream, then search for the delivery stream we created earlier

f) Choose Create new role in the field below IAM role to create a new role with the appropriate permissions. A scripted sketch of this rule follows below.
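For completeness, the same rule can be created with boto3; the rule name and the rule role ARN are placeholders, and the delivery stream name is the one assumed earlier:

import boto3

iot = boto3.client("iot")

sql = (
    "SELECT MachineName, EngineName, MessageTimestamp, "
    "temperature, amps, Vibration FROM 'iot/topic'"
)

iot.create_topic_rule(
    ruleName="datalake_rule",  # placeholder rule name
    topicRulePayload={
        "sql": sql,
        "awsIotSqlVersion": "2016-03-23",
        "actions": [{
            "firehose": {
                # Role must allow firehose:PutRecord on the stream.
                "roleArn": "arn:aws:iam::123456789012:role/<iot rule role>",
                "deliveryStreamName": "iot-datalake-stream",
                "separator": "\n",  # newline-delimit records in S3
            }
        }],
    },
)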

Step 6: Allow the Kinesis Data Firehose delivery stream to use the AWS KMS key by adjusting the key's resource policy in AWS KMS.

a) Modify the resource policy of the AWS KMS key associated with the Amazon S3 bucket to allow the Kinesis Data Firehose delivery stream to use it

{
  "Version": "2012-10-17",
  "Id": "key-consolepolicy",
  "Statement": [
    {
      "Sid": "Allow Kinesis Data Firehose to use the key",
      "Effect": "Allow",
      "Principal": {
        "AWS": "arn:aws:iam::123456789012:role/firehoserole"
      },
      "Action": [
        "kms:GenerateDataKey*",
        "kms:DescribeKey"
      ],
      "Resource": "arn:aws:kms:<region>:123456789012:key/<s3 bucket kms key>"
    }
  ]
}
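If you script this change, note that kms.put_key_policy replaces the entire key policy, so merge the statement above into the key's existing policy document rather than submitting it alone. A minimal sketch, assuming the merged policy is stored in key_policy.json:

import boto3
import json

kms = boto3.client("kms")

# Load the merged policy document (the statement above plus the
# key's existing statements).
with open("key_policy.json") as f:
    key_policy = json.load(f)

kms.put_key_policy(
    KeyId="<s3 bucket kms key>",  # key ID from Step 1
    PolicyName="default",         # KMS key policies are always named "default"
    Policy=json.dumps(key_policy),
)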

3) Set up services for near real-time alerting and notification

Step 1: Configure AWS IoT Events for near real-time alerting.

a) Search AWS IoT Events

b) Select Inputs and choose Create input. For this blog post we use Alert_Input as the input name.

c) Below is the content of the JSON file you need to create and upload as part of the input configuration:

{
  "MachineName": "<Machine Name/ID>",
  "EngineName": "<Engine Name/ID>",
  "MessageTimestamp": "<TimeStamp>",
  "temperature": <temp>,
  "amps": <amps>,
  "Vibration": <vibration>
}

Input configuration within AWS IoT Events
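The same input can be created with boto3 by listing each message field as an attribute; these attributes are what the detector model later references as, for example, $input.Alert_Input.Vibration:

import boto3

iotevents = boto3.client("iotevents")

iotevents.create_input(
    inputName="Alert_Input",
    inputDefinition={
        "attributes": [
            {"jsonPath": "MachineName"},
            {"jsonPath": "EngineName"},
            {"jsonPath": "MessageTimestamp"},
            {"jsonPath": "temperature"},
            {"jsonPath": "amps"},
            {"jsonPath": "Vibration"},
        ]
    },
)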

Step 2: Set up the AWS IoT rule to route incoming data to AWS IoT Events.

a) Search AWS IoT Core and select Rules under Act

b) Enter the name for the AWS IoT Rule and choose Next

c) Paste the SQL query shown below into the SQL statement field; replace the topic name in the FROM clause with the topic where your data arrives

d) Choose Next

SELECT NodeID as MachineName, SubNodeID as EngineName, MessageTimestamp,
  get(get((SELECT Values FROM Data WHERE Name = 'engine1/temp'), 0).Values, 0).Value AS temperature,
  get(get((SELECT Values FROM Data WHERE Name = 'engine1/amps'), 0).Values, 0).Value AS amps,
  get(get((SELECT Values FROM Data WHERE Name = 'engine1/vib'), 0).Values, 0).Value AS Vibration
FROM 'iot/topic'

In the above query we select the column names from the topic. The get() function reads the value at the zeroth element of the nested JSON object.
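To make the get() navigation concrete, here is a hypothetical nested payload of the shape the query above assumes; the field names come from the query itself and may differ in your environment:

# Hypothetical nested payload matching the rule SQL above.
message = {
    "NodeID": "conveyor-motor",
    "SubNodeID": "engine1",
    "MessageTimestamp": "2022-01-01",
    "Data": [
        # (SELECT Values FROM Data WHERE Name = 'engine1/vib') selects the
        # Values arrays; get(..., 0).Value then yields 100 as the Vibration.
        {"Name": "engine1/vib", "Values": [{"Value": 100}]},
        {"Name": "engine1/temp", "Values": [{"Value": 90}]},
        {"Name": "engine1/amps", "Values": [{"Value": 15}]},
    ],
}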

e) As the rule action, select IoT Events and choose the previously created input (Alert_Input) from the dropdown menu.

Step 3. In this section, you define a detector model (a model of your equipment or process) using states. For each state, you define conditional (Boolean) logic that evaluates the incoming inputs to detect a significant event. When an event is detected, it changes the state and can trigger additional actions.

In your states, you also define events that can execute actions whenever the detector enters or exits that state or when an input is received (these are known as OnEnter, OnExit, and OnInput events). The actions are executed only if the event’s conditional logic evaluates to true. We will see how to send a notification when a threshold is breached and how to set a timer that suppresses subsequent alert notifications. In the following steps we go through all the states, their definitions, and the connections needed to create the whole detector model; a trimmed programmatic sketch of the finished model appears at the end of this step.

a) Search AWS IoT Events.

b) Select Create detector model, and then Create new

c) The first detector state has been created for you. To modify it, select the circle labeled State_1 in the main editing space and set the State name to Running

Configuring the first AWS IoT Event state

d) In the field of OnEnter choose Add event. On the OnEnter event page, enter an Event name and the Event condition. The name of the event is init and the event condition is true. This indicates that the event is always triggered when the state is entered.

e) Choose Add action

    • Select Set variable
    • For Variable operation, choose Assign value
    • For Variable name, enter vibThresholdBreached
    • For Variable value, enter the value 0 (zero)

Configuring OnEnter event for State Running

f) Choose Save

g) Create another state by dragging and dropping the State button. Change the state name to Danger_OverVibration. In this state, we evaluate whether the vibration exceeded the threshold; if so, an action sends an alert event to Amazon SNS

Configuring Danger_OverVibration State

h) Pause on the first state (Running). An arrow appears on the circumference of the state.

i) Click and drag the arrow from the first state (Running) to the second state (Danger_OverVibration). A directed line from the first state to the second state (labeled Untitled) appears.

j) Select the Untitled line. In the Transition event pane, enter an Event name and Event trigger logic.

 Event name: OverVibration

 Event Logic: $input.Alert_Input.Vibration > <Threshold limit>

 Event Actions:

  Set variable:

   Variable Operation: Assign Value

   Variable name: vibThresholdBreached

   Assign Value: $variable.vibThresholdBreached + 3

k) Choose Save

l) Pause on the second state (Danger_OverVibration). An arrow appears on the circumference of the state.

m) Click and drag the arrow from the second state (Danger_OverVibration) to the first state (Running). A directed line from the second state to the first state (labeled Untitled) appears.

n) Select the Untitled line. In the Transition event pane, enter an Event name and Event trigger logic.

 Event name: VibBackToNormal

 Event Logic: $input.Alert_Input.Vibration < <Threshold limit> && $variable.vibThresholdBreached <= 0

o) Choose Save

p) In the field of OnEnter choose Add event. On the OnEnter event page, enter an Event name and the Event condition. The name of the event is Vibration_Breached and the event condition is $variable.vibThresholdBreached > 1

1) Choose Add action

Event name: Vibration_Breached

Event condition: $variable.vibThresholdBreached > 1

Event actions:

 Set Variable:

  Variable operation: Assign Value

  Variable name: initVibNotification

  Assign value: 1

2) Choose Save

q) In the field of OnInput choose Add event. We need to create an event condition that sends an email alert, then starts a timer (5 minutes) during which subsequent email alerts are not triggered, and finally triggers an email when the timeout ends. This process continues until the vibration drops below the threshold, ensuring email notification, but only once within the time limit.

1) Choose Add action

Event name: Email Alert

Event condition: $variable.vibThresholdBreached > 2 && $variable.initVibNotification == 1

Event actions:

 Set Timer:

  Select operation: Create

  Timer name: vibTimer

  Enter duration: 5 Minutes (this is the time during which alerts are suppressed)

 Send SNS message

  SNS Topic: arn:aws:sns:<region>:123456789012:<Topic Name>

  Select Default Payload

  Payload Type: JSON

 Set Variable:

  Variable operation: Assign Value

  Variable name: initVibNotification

  Assign Value: 0

2)  Choose Save

3)  Choose Add action

Event name: OverVibration

Event condition: $input.Alert_Input.Vibration > <Threshold limit>

Event actions:

  Set Variable:

    Variable operation: Assign Value

    Variable name: vibThresholdBreached

    Assign Value: 3

4) Choose Save

5) Choose Add action

Event name: TimerCheck

Event condition: timeout("vibTimer")

Event actions:

  Set Variable:

    Variable operation: Assign Value

    Variable name: initVibNotification

    Assign Value: 1

6)  Choose Save

7)  Choose Add action

Event name: Normal

Event condition: $input.Alert_Input.Vibration < <Threshold limit>

Event actions:

  Set Timer:

    Select operation: Destroy

    Timer name: vibTimer

  Set Variable:

    Variable operation: Decrement

    Variable name: vibThresholdBreached

  Set Variable:

    Variable operation: Assign Value

    Variable name: initVibNotification

    Assign Value: 1

8)  Choose Save

r) In the field of OnExit choose Add event. On the OnExit event page, enter an Event name and the Event condition. The name of the event is Normal_Vibration_Restored and the event condition is true.

a) Choose Add action

Event actions:

  Send SNS message:

    SNS Topic: arn:aws:sns:<region>:123456789012:<Topic Name>

    Select Default Payload

    Payload Type: JSON

b)  Choose Save
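Having walked through the console, here is the trimmed programmatic sketch promised earlier: a boto3 create_detector_model call that keeps the two states, the threshold-breach transition, and the timer-based alert suppression, but omits the OnExit notification event shown above. The model name, role ARN, and the threshold value of 80 are placeholders:

import boto3

iotevents = boto3.client("iotevents")

THRESHOLD = 80  # placeholder vibration threshold
SNS_ARN = "arn:aws:sns:<region>:123456789012:<Topic Name>"

running = {
    "stateName": "Running",
    "onEnter": {"events": [{
        "eventName": "init",
        "condition": "true",
        "actions": [{"setVariable": {
            "variableName": "vibThresholdBreached", "value": "0"}}],
    }]},
    "onInput": {"transitionEvents": [{
        "eventName": "OverVibration",
        "condition": f"$input.Alert_Input.Vibration > {THRESHOLD}",
        "actions": [{"setVariable": {
            "variableName": "vibThresholdBreached",
            "value": "$variable.vibThresholdBreached + 3"}}],
        "nextState": "Danger_OverVibration",
    }]},
}

danger = {
    "stateName": "Danger_OverVibration",
    "onEnter": {"events": [{
        "eventName": "Vibration_Breached",
        "condition": "$variable.vibThresholdBreached > 1",
        "actions": [{"setVariable": {
            "variableName": "initVibNotification", "value": "1"}}],
    }]},
    "onInput": {
        "events": [
            {   # Send one alert, then suppress further alerts for 5 minutes.
                "eventName": "Email_Alert",
                "condition": ("$variable.vibThresholdBreached > 2 && "
                              "$variable.initVibNotification == 1"),
                "actions": [
                    {"setTimer": {"timerName": "vibTimer",
                                  "durationExpression": "300"}},
                    {"sns": {"targetArn": SNS_ARN}},
                    {"setVariable": {"variableName": "initVibNotification",
                                     "value": "0"}},
                ],
            },
            {   # Re-arm the notification flag when the timer expires.
                "eventName": "TimerCheck",
                "condition": 'timeout("vibTimer")',
                "actions": [{"setVariable": {
                    "variableName": "initVibNotification", "value": "1"}}],
            },
            {   # Count down toward normal while vibration stays below threshold.
                "eventName": "Normal",
                "condition": f"$input.Alert_Input.Vibration < {THRESHOLD}",
                "actions": [
                    {"clearTimer": {"timerName": "vibTimer"}},
                    {"setVariable": {
                        "variableName": "vibThresholdBreached",
                        "value": "$variable.vibThresholdBreached - 1"}},
                ],
            },
        ],
        "transitionEvents": [{
            "eventName": "VibBackToNormal",
            "condition": (f"$input.Alert_Input.Vibration < {THRESHOLD} && "
                          "$variable.vibThresholdBreached <= 0"),
            "actions": [],
            "nextState": "Running",
        }],
    },
}

iotevents.create_detector_model(
    detectorModelName="AssetMonitoringModel",  # placeholder name
    detectorModelDefinition={"states": [running, danger],
                             "initialStateName": "Running"},
    roleArn="arn:aws:iam::123456789012:role/<iot events role>",
)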

4) Analyzing Data using Amazon Athena

AWS Glue is a serverless data integration service that makes it easy to discover, prepare, and combine data for analytics, machine learning, and application development. The AWS Glue crawler recognizes the partition structure of the dataset and populates the AWS Glue Data Catalog. Once crawled, the AWS Glue Data Catalog groups data into logical tables and makes partition columns available for querying through Athena, which can be connected to any preferred business intelligence tool for visualization.

1. Search for AWS Glue

2. Navigate to Crawlers

3. Choose Add crawler

4. Create an AWS Glue table. See the screenshot below to get insights into the configuration used.

Crawler configuration
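A matching crawler can be created and started with boto3; the crawler name and role are placeholders, while the database name athenaad and the S3 path are the ones used in the Athena view below:

import boto3

glue = boto3.client("glue")

glue.create_crawler(
    Name="assetbasedmonitoring-crawler",  # placeholder name
    Role="arn:aws:iam::123456789012:role/<glue crawler role>",
    DatabaseName="athenaad",
    Targets={"S3Targets": [{"Path": "s3://datalakes3bucket/datastore/"}]},
)
glue.start_crawler(Name="assetbasedmonitoring-crawler")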

5) Query the data using Athena

Daily data view: In the example below, we create a view from the sample data by flattening the JSON data.

CREATE OR REPLACE VIEW view_name AS
WITH
dataset AS (
  SELECT
    schemaversion
  , nodeid
  , CAST("from_iso8601_timestamp"(messagetimestamp) AS timestamp) Message_TimeStamp
  , CAST("from_iso8601_timestamp"(messagetimestamp) AS date) Message_Date
  , subnodeid
  , compressed
  , "split_part"("replace"("replace"("replace"("json_format"(CAST(data AS json)), '"', ''), '[', ''), ']', ''), ',', 1) engine1_amp
  , CAST("split_part"("replace"("replace"("replace"("json_format"(CAST(data AS json)), '"', ''), '[', ''), ']', ''), ',', 2) AS double) value_engine1_amp
  , "split_part"("replace"("replace"("replace"("json_format"(CAST(data AS json)), '"', ''), '[', ''), ']', ''), ',', 3) engine1_vib
  , CAST("split_part"("replace"("replace"("replace"("json_format"(CAST(data AS json)), '"', ''), '[', ''), ']', ''), ',', 4) AS double) value_engine1_vib
  , "split_part"("replace"("replace"("replace"("json_format"(CAST(data AS json)), '"', ''), '[', ''), ']', ''), ',', 5) engine1_temp
  , CAST("split_part"("replace"("replace"("replace"("json_format"(CAST(data AS json)), '"', ''), '[', ''), ']', ''), ',', 6) AS double) value_engine1_temp
  FROM
    "athenaad"."assetbasedmonitoring"
  ORDER BY messagetimestamp DESC
)
SELECT *
FROM dataset
WHERE (Message_Date = current_date)
ORDER BY Message_TimeStamp DESC
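Once the view exists, you can also query it programmatically. A minimal sketch, assuming a placeholder athena-results/ prefix in the data lake bucket for query output:

import boto3
import time

athena = boto3.client("athena")

execution = athena.start_query_execution(
    QueryString="SELECT * FROM view_name LIMIT 10",
    QueryExecutionContext={"Database": "athenaad"},
    ResultConfiguration={"OutputLocation": "s3://datalakes3bucket/athena-results/"},
)
query_id = execution["QueryExecutionId"]

# Poll until the query finishes, then print the rows.
while True:
    status = athena.get_query_execution(QueryExecutionId=query_id)
    state = status["QueryExecution"]["Status"]["State"]
    if state in ("SUCCEEDED", "FAILED", "CANCELLED"):
        break
    time.sleep(1)

if state == "SUCCEEDED":
    results = athena.get_query_results(QueryExecutionId=query_id)
    for row in results["ResultSet"]["Rows"]:
        print([col.get("VarCharValue") for col in row["Data"]])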

Clean up the resources

To avoid incurring future charges, follow these steps to remove the example resources:

1. Delete the AWS IoT Events detector model. Search AWS IoT Events; under Detector models, select the model you created and choose Delete.

2. Delete the AWS IoT Events input. Search AWS IoT Events; under Inputs, select the input you created and choose Delete.

3. Delete the AWS Glue database and table. Search AWS Glue; under Tables, select the table that was created, open the Action drop-down, and choose Delete table.

4. Delete the Amazon Athena workgroup. Search Athena; under Workgroups, select the workgroup that was created, open the Action drop-down, and choose Delete.

Conclusion

The configuration of AWS IoT Events and Amazon SNS helps to achieve faster response times during failure. In this post, we used a conveyor motor as an example, but the solution extends to a wide breadth of industries, such as products in a retail context.

In a business context, the solution reduces downtime in manufacturing or retail operations by sending out timely notifications, and improves data analytics capabilities for better insights.

Please visit AWS IoT Events and Amazon SNS to learn more about configurations.

About the authors

Prashanth Shankar Kumar

Prashanth Shankar Kumar is a Data Lake Data Architect at Amazon Web Services. He specializes in building big data applications and helping customers modernize their applications on the cloud. He thinks data is the new oil and spends most of his time deriving insights out of data.

Vasanth Jeyaraj

Vasanth Jeyaraj is a Cloud Infra Architect with Amazon Web Services (AWS). He supports enterprise customers in building well-architected infrastructure with a focus on automation and infrastructure as code, and helps customers speed their cloud-native adoption journey by modernizing platform infrastructure and internal architecture using microservices, containerization, and DevOps. Outside of work, he loves spending time with family and traveling.