The Internet of Things on AWS – Official Blog

Determining state in systems with high-frequency updates using AWS IoT Greengrass

AWS IoT Greengrass extends AWS managed services to edge device systems, providing security, message routing, and local state processing. It also provides a central hub for connectivity with other AWS services using AWS Lambda functions.

Some edge systems with multiple devices produce high-frequency updates, such as device shadow updates. In these systems, it can be difficult to determine the definitive state of the system at a particular point in time. It can also be difficult to move data from the edge to the cloud for further processing.

Pre-processing data from AWS IoT Greengrass devices at the edge before sending it to the cloud can result in the following benefits:

  • Decreased risk of reporting an inaccurate state: evaluating all received data from devices allows for smart determination of system state, rather than accepting device updates at face value.
  • Decreased network traffic: pre-processing can reduce the size and volume of data that is transmitted to the cloud.
  • Decreased data processing (compute) costs: processing is performed on the edge device running AWS IoT Greengrass Core, which does not incur additional usage costs for invoking local Lambda functions.

Challenges associated with determining state in edge systems with multiple devices

In a system containing multiple devices that monitor the same real-life object, each one of these devices publishes an update event when it detects a state change in the real-life object.  In theory, these updates should all reflect the same change. However, in systems using unreliable methods to determine a state change (like ML models), device updates can disagree.

The problem of determining the state of a real-life object in a system such as this becomes more complex when considering systems with hundreds of devices.  For example, consider a use case wherein hundreds of IoT devices are being used to monitor safety conditions, such as carbon monoxide emissions, inside a factory.  In this scenario, whenever carbon monoxide emissions rise above a certain level, all devices should emit an update. However, device malfunctions or inaccurate readings might cause any single device to issue an “incorrect” update.

Consider the simple system in the following diagram. Four devices monitor the same real-life object.  When the state of the real-life object changes, one device publishes an update that does not agree with the majority of updates from other devices.  If these devices publish updates at a high frequency, it can be difficult to determine the state of a system at any point in time.

So—how can you reduce the risk of the system reporting inaccurate state information to downstream systems while simultaneously reducing network traffic and processing costs?

In this post, you create a local Lambda function running on an AWS IoT Greengrass core. The function subscribes to all device update events, collates the received data, and derives the state of the system at a given point in time.

If you are not familiar with AWS IoT Greengrass, check out Getting Started with AWS IoT Greengrass.

Solution overview

Consider a scenario: an edge system with 100 carbon monoxide detectors monitoring air quality in a factory.  Each detector publishes a device shadow update once every 1–5 seconds with 90% accuracy. The following is an example shadow document, with the “property” attribute representing the parts per million (ppm) of carbon monoxide measured by the device.

{
   "current":{
      "state":{
         "desired":{
            "property":"32"
         }
      },
      "version":1116,
      "metadata":{
         "desired":{
            "property":{
               "timestamp":1559069499
            }
         }
      }
   },
   "previous":{
      "state":{
         "desired":{
            "property":"8"
         }
      },
      "version":1115,
      "metadata":{
         "desired":{
            "property":{
               "timestamp":1559069498
            }
         }
      }
   },
   "clientToken":"027d760a-3854-4e9e-a7a2-e98e69389b57"
}

In this system, there are multiple device updates each second, each reporting slightly different amounts of carbon monoxide.  So what is the state of the system at any point in time?  Build a local Lambda function to average the received data and send a single, authoritative state that is stored in Amazon DynamoDB.
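As a minimal sketch of the averaging idea, the following snippet reads the ppm value and timestamp out of shadow updates shaped like the example document above and averages them. The helper names (extract_reading, average_ppm, make_update) and the sample readings are assumptions made for illustration and are not part of the function built later in this post. The sketch is written for Python 3.

```python
def extract_reading(shadow_update):
    """Return (timestamp, ppm) from a shadow update shaped like the example document."""
    current = shadow_update["current"]
    # The example document reports ppm as a string, so convert it to an integer
    ppm = int(current["state"]["desired"]["property"])
    timestamp = current["metadata"]["desired"]["property"]["timestamp"]
    return timestamp, ppm


def average_ppm(shadow_updates):
    """Average the ppm values of a non-empty list of shadow updates."""
    readings = [extract_reading(update)[1] for update in shadow_updates]
    return float(sum(readings)) / len(readings)


def make_update(ppm, timestamp):
    """Hypothetical helper that builds the subset of the shadow document used here."""
    return {
        "current": {
            "state": {"desired": {"property": str(ppm)}},
            "metadata": {"desired": {"property": {"timestamp": timestamp}}},
        }
    }


# Three invented readings that arrive within about one second of each other
updates = [
    make_update(32, 1559069499),
    make_update(30, 1559069499),
    make_update(35, 1559069500),
]
print(average_ppm(updates))
```

Three slightly different readings collapse into one value, which is the single state that the Lambda function later writes to DynamoDB.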

Create a Lambda function for pre-processing

The Lambda function uses the concept of a “decision window,” which is a time period bookended by two timestamps. Within the decision window, the average state of shadow updates is calculated and written to DynamoDB.

Consider a 10-second decision window—that is, publishing one state update every 10 seconds. This provides the following benefits:

  • More accurate system state observations.
    • You can now say “the PPM was 60.5 over this 10-second window.”
  • Reduced network traffic.
    • Transmits only one message every 10 seconds to DynamoDB.
  • Reduced processing costs.
    • Performs all message processing at the edge and only writes one row to DynamoDB.
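To put rough numbers on the traffic reduction, the following sketch applies the scenario described earlier (100 detectors, each publishing once every 1–5 seconds) to a 10-second window. The function name updates_per_window is an assumption for illustration, and the figures are simple arithmetic rather than measurements.

```python
DEVICES = 100          # detectors in the example scenario
WINDOW_SECONDS = 10    # decision window length
MIN_INTERVAL = 1       # fastest publish interval, in seconds
MAX_INTERVAL = 5       # slowest publish interval, in seconds


def updates_per_window(devices, interval_seconds, window_seconds):
    """Number of shadow updates received in one window at a fixed publish interval."""
    return devices * window_seconds // interval_seconds


fewest = updates_per_window(DEVICES, MAX_INTERVAL, WINDOW_SECONDS)
most = updates_per_window(DEVICES, MIN_INTERVAL, WINDOW_SECONDS)
print("Updates received per window: %d to %d" % (fewest, most))
print("Rows written to DynamoDB per window: 1")
```

Under these assumptions, between 200 and 1,000 shadow updates are reduced to a single DynamoDB write every 10 seconds.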

Upon receiving a shadow update, the Lambda function evaluates the timestamp and places the message in a queue with other received messages that exist within the same decision window. If there’s no existing decision window, the Lambda function creates a new one. When the decision window expires (that is, the current time is past the end of the decision window), the messages in the queue are evaluated, averaged, and written to DynamoDB.
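The window assignment described above can be sketched without any AWS dependencies. This is a simplified illustration, assuming a plain list of windows and a hypothetical assign_to_window helper. The timestamps are invented, and the Lambda function shown later in this post uses a deque and per-window queues instead.

```python
DECISION_WINDOW_LENGTH = 10  # seconds


def assign_to_window(timestamp, windows):
    """Return the (start, end) window containing timestamp, creating one if needed."""
    for start, end in windows:
        if start <= timestamp <= end:
            return (start, end)
    # No existing window covers this timestamp, so open a new one starting here
    window = (timestamp, timestamp + DECISION_WINDOW_LENGTH)
    windows.append(window)
    return window


windows = []
timestamps = [1559069498, 1559069499, 1559069505, 1559069509, 1559069512]
assignments = [assign_to_window(ts, windows) for ts in timestamps]

for ts, window in zip(timestamps, assignments):
    print(ts, "->", window)
```

The first three timestamps share the window that opens at 1559069498. The fourth falls one second past the end of that window, so it opens a second window, which the fifth timestamp then joins.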

Use two data structures to help calculate the average of a decision window:

  • UPDATE_TIMESTAMP_WINDOWS: A double-ended queue (deque) in which each entry is a pair holding the start and end timestamps of a decision window. For example: [1553485283, 1553485284]
  • CO_DETECTOR_UPDATE_QUEUES: A dictionary of queue objects containing shadow updates that fall into each decision window. The key for these queue objects is a hash of the sum of the start/stop timestamp of the decision window.
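The following sketch shows how these two structures behave in isolation. It uses a deque limited to 10 entries and derives the dictionary key from the sum of the start and stop timestamps with SHA-256, as the function in this post does. The helper name window_hash_key, the lowercase variable names, and the use of plain lists in place of queue objects are assumptions made to keep the example short.

```python
import collections
import hashlib


def window_hash_key(start_time, end_time):
    """Derive a dictionary key from the sum of a window's start and end timestamps."""
    hash_string = str(start_time + end_time)
    return int(hashlib.sha256(hash_string.encode("utf-8")).hexdigest(), 16) % 10**8


windows = collections.deque(maxlen=10)  # keeps only the 10 most recent windows
queues = {}                             # window hash key -> messages for that window

# Open 12 consecutive 10-second windows; the deque silently drops the 2 oldest
for start in range(1553485283, 1553485283 + 120, 10):
    end = start + 10
    windows.append([start, end])
    queues[window_hash_key(start, end)] = []  # a plain list stands in for a queue

print(len(windows))   # number of windows still tracked
print(windows[0])     # oldest window that is still tracked
```

Because the deque has a maximum length, it discards the oldest windows on its own. The dictionary has no such limit, so a long-running function may also want to remove entries for windows that have already been aggregated.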

In the following code, upon invocation of the Lambda function using function_handler(), perform this procedure:

1.     Check if the received shadow update timestamp falls into a decision window that currently exists, by checking the timestamp of the message against start/stop timestamps in UPDATE_TIMESTAMP_WINDOWS.  In this example, I have limited the deque to track only the 10 most recent decision windows.

2.     If the message is found to be within an existing decision window, add it to the queue corresponding to that decision window and return from the function.

3.     If the message is not within any existing decision windows, create a new decision window and corresponding hash key. Then, create a new queue, add the message, create a new Python Timer object, and return from the function.

Create the timer with the hash key and start/stop timestamps as arguments, and set aggregate_co_detector_state() as the function that it calls.  The Timer object runs that function after DECISION_WINDOW_LENGTH seconds.
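The timer mechanism can be tried on its own with Python's threading.Timer, which calls a function with the given arguments after a delay. In this sketch the delay is shortened to 0.1 seconds so that it finishes quickly, and the aggregate function, the "demo-window" key, and the readings are invented for illustration.

```python
import threading

results = []


def aggregate(window_key, start_time, end_time, readings):
    """Average whatever readings have accumulated by the time the timer fires."""
    average = float(sum(readings)) / len(readings)
    results.append((window_key, start_time, end_time, average))


readings = [32, 30, 35]

# Schedule the aggregation; the real function would wait DECISION_WINDOW_LENGTH seconds
timer = threading.Timer(
    0.1,
    aggregate,
    args=("demo-window", 1553485283, 1553485293, readings),
)
timer.start()

# A reading that arrives while the window is still open is included in the average
readings.append(31)

timer.join()  # wait for the timer thread to finish before reading the result
print(results)
```

The handler returns immediately after starting the timer, so later invocations can keep adding messages to the same window until the timer fires.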

"""
This module contains an example Lambda function (Python 2.7), deployed with AWS IoT Greengrass, that aggregates messages received from devices
"""
# pylint: disable=global-statement
import logging
import hashlib
import collections
from datetime import datetime
from decimal import Decimal
from threading import Timer
from multiprocessing import Queue
import boto3
from botocore.exceptions import ClientError

# Define decision window length
DECISION_WINDOW_LENGTH = 10

# Attempt to create a DynamoDB table if not already created
DYNAMO_DB = boto3.resource('dynamodb', region_name='us-east-1')
TABLE_NAME = "CODetectorStats"

try:
    TABLE = DYNAMO_DB.create_table(
        TableName=TABLE_NAME,
        KeySchema=[
            {
                'AttributeName': 'Time',
                'KeyType': 'HASH'
            }
        ],
        AttributeDefinitions=[
            {
                'AttributeName': 'Time',
                'AttributeType': 'S'
            }
        ],
        ProvisionedThroughput={
            'ReadCapacityUnits': 5,
            'WriteCapacityUnits': 5
        }
    )

    TABLE.meta.client.get_waiter('table_exists').wait(TableName=TABLE_NAME)
except ClientError as exception:
    if exception.response['Error']['Code'] == 'ResourceInUseException':
        print("Table already created")
    else:
        raise

# Initialize Logger
LOGGER = logging.getLogger()
LOGGER.setLevel(logging.INFO)

# Declare aggregation data structures
UPDATE_TIMESTAMP_WINDOWS = collections.deque(maxlen=10)
CO_DETECTOR_UPDATE_QUEUES = dict()

# Define aggregation function
def aggregate_co_detector_state(hash_key, start_time, end_time):
    """Aggregate CO Detector Data"""
    # Dequeue all messages in a time window and calculate average
    total_reported_ppm = 0
    total_messages = 0
    while not CO_DETECTOR_UPDATE_QUEUES[hash_key].empty():
        event = CO_DETECTOR_UPDATE_QUEUES[hash_key].get()
        ppm_value = event["current"]["state"]["desired"]["property"]
        total_reported_ppm += int(ppm_value)
        total_messages += 1
    # Nothing to average, so skip the write and avoid dividing by zero
    if total_messages == 0:
        LOGGER.info("No messages to aggregate for window %s-%s", start_time, end_time)
        return
    # Use Decimal so the average keeps its fractional part: Python 2.7 truncates
    # int / int, and the boto3 DynamoDB resource does not accept float values
    ppm_average = Decimal(total_reported_ppm) / Decimal(total_messages)
    # Update DynamoDB
    table = DYNAMO_DB.Table(TABLE_NAME)
    table.put_item(
        Item={
            'Time': str(datetime.utcnow()),
            'StartTime': start_time,
            'EndTime': end_time,
            'TotalMessages': total_messages,
            'PPMAverage': ppm_average,
        }
    )

# Define Lambda function handler
def function_handler(event, context):
    """Handle Lambda Invocations"""
    global DECISION_WINDOW_LENGTH
    global CO_DETECTOR_UPDATE_QUEUES
    global UPDATE_TIMESTAMP_WINDOWS
    LOGGER.info("Received Event: %s. Context: %s", str(event), str(context))
    # Check if event fits into existing decision window
    for update_timestamp_window in UPDATE_TIMESTAMP_WINDOWS:
        start_time = update_timestamp_window[0]
        end_time = update_timestamp_window[1]
        if end_time >= event["current"]["metadata"]["desired"]["property"]["timestamp"] >= start_time:
            hash_string = str(start_time + end_time)
            hash_key = int(hashlib.sha256(hash_string.encode('utf-8')).hexdigest(), 16) % 10**8
            CO_DETECTOR_UPDATE_QUEUES[hash_key].put(event)
            LOGGER.info("Enqueued event into decision window: %s", str(event))
            return
    # Create new decision window and add event
    try:
        start_time = event["current"]["metadata"]["desired"]["property"]["timestamp"]
        end_time = event["current"]["metadata"]["desired"]["property"]["timestamp"] + DECISION_WINDOW_LENGTH
        UPDATE_TIMESTAMP_WINDOWS.append([start_time, end_time])
        hash_string = str(start_time + end_time)
        hash_key = int(hashlib.sha256(hash_string.encode('utf-8')).hexdigest(), 16) % 10**8
        CO_DETECTOR_UPDATE_QUEUES[hash_key] = Queue()
        CO_DETECTOR_UPDATE_QUEUES[hash_key].put(event)
        new_timer = Timer(
            DECISION_WINDOW_LENGTH,
            aggregate_co_detector_state,
            args=(hash_key,
                  start_time,
                  end_time,)
            )
        new_timer.start()
        LOGGER.info("Created new decision window and enqueued event: %s", str(event))
    except: # pylint: disable=bare-except
        LOGGER.info("Unable to Process Event: %s", str(event))
        return

Configuring the Lambda function on AWS IoT Greengrass for pre-processing

To successfully perform pre-processing, you must ensure that the Lambda function is deployed with the following configurations:

  • The Lambda function lifecycle is set to “long-lived.”
  • The Lambda function is subscribed to all device updates.

Configuring the Lambda function as “long-lived” allows you to temporarily store received shadow updates in memory in the Lambda container before processing them and sending the result to DynamoDB.

This configuration creates a single persistent container that starts automatically when AWS IoT Greengrass Core starts, then runs indefinitely.  Any variables or pre-processing functions that are defined outside the function handler persist across invocations of the function handler.
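The persistence of module-level state can be illustrated with a small stand-alone sketch. The names INVOCATION_COUNT and RECENT_EVENTS and the event payloads are invented for this example, and the loop only simulates repeated invocations inside one container; it does not involve AWS IoT Greengrass itself.

```python
# Module-level state is created once, when the long-lived container starts
INVOCATION_COUNT = 0
RECENT_EVENTS = []


def function_handler(event, context):
    """Each invocation sees the state left behind by earlier invocations."""
    global INVOCATION_COUNT
    INVOCATION_COUNT += 1
    RECENT_EVENTS.append(event)
    return INVOCATION_COUNT


# Simulate three invocations arriving at the same container
for ppm in ("8", "32", "30"):
    function_handler({"property": ppm}, None)

print(INVOCATION_COUNT, len(RECENT_EVENTS))
```

This is the behavior that lets the decision windows and their queues accumulate messages between invocations. With an on-demand lifecycle, separate invocations may run in separate containers, so state held this way would not be shared reliably.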

Additionally, you must make sure that the Lambda function is subscribed to all device updates.  In this example, you use device shadow updates, so configure the Lambda function as follows:

Results

This Lambda function is able to capture updates from multiple devices and provide consolidated updates to DynamoDB. The following screenshot shows some sample results from this function. Note how updates from multiple devices are averaged into a single DynamoDB entry covering a given time period:

Summary

In systems with large numbers of IoT devices, performing pre-processing at the edge can provide a number of benefits:

  • Simplification of data streams involving large numbers of devices.
  • Increased system state accuracy.
  • Reduced network traffic.
  • Reduced compute costs.

This approach is not limited to the aggregation of shadow updates.  You can extend it to any type of message in an IoT system and to processing logic of varying complexity. For instance, this example does not account for extreme outliers, consensus among connected devices, or messages received after long delays. However, it does provide a good general example of the benefits of edge pre-processing and message aggregation.
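As one possible extension, and not something that the function in this post implements, a median is less sensitive to a single extreme reading than a mean. The following sketch uses invented readings and the Python 3 statistics module to compare the two.

```python
import statistics

# Invented readings for one decision window; the last value is a faulty outlier
readings = [32, 30, 35, 31, 900]

mean_ppm = statistics.mean(readings)      # pulled far upward by the outlier
median_ppm = statistics.median(readings)  # stays close to the typical reading

print("mean:", mean_ppm)
print("median:", median_ppm)
```

A single malfunctioning detector moves the mean well above every valid reading, while the median remains within the range that the other four detectors reported.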