AWS Database Blog

Patterns for AWS IoT time series data ingestion with Amazon Timestream

August 30, 2023: Amazon Kinesis Data Analytics has been renamed to Amazon Managed Service for Apache Flink. Read the announcement in the AWS News Blog and learn more.

Large-scale internet of things (IoT) applications generate data at fast rates, and many IoT implementations require data to be stored sequentially, based on date-time values generated either at sensor or at ingestion levels. Across IoT implementations in many business verticals, such as industrial, utilities, healthcare, oil and gas, logistics, consumer devices, and smart vehicles, time series data provides intrinsic operational and business value.

Time series data is unlike traditional data because it’s used to perform time-window queries across both large and small time frames, and the data is continuously appended at high rates. Amazon Timestream is a purpose-built, managed time series database that enables querying data using rolling time windows, has out-of-the-box abilities to cope with missing data, and is easily integrated with typical data processing, operational, and analytics pipelines, such as business intelligence (BI) and machine learning (ML).

This post walks through key architecture patterns and considerations around ingesting data through AWS IoT services into Timestream, and showcases several key native Timestream capabilities. It also highlights how to create analytical pipelines that take advantage of native Timestream features for quick dashboarding as well as for more advanced analytical purposes.

You might want to consider a time series database if your requirements for storing and querying the data abide by one of the following constraints:

  • You need to perform interpolation for missing data points at specific times. This can be the case when:
    • Data is not reliably collected or transmitted and there are gaps in data.
    • Your data source is using deadbanding, meaning it only emits a data point when the difference from the previous value is greater than a specific threshold.
  • You need to perform analyses across multiple data series that aren’t producing data at the same rate (frequency), or that are producing data at the same rate but aren’t synchronized. The heartbeat granularity of the data can vary from seconds to hours, days, or even weeks.
  • You need to compute and consider statistical values for your data series over different periods of time, such as averages, standard deviation, percentiles, and rankings.
  • You need to retrieve data at variable levels of granularity, for example to adapt to the zoom level in a particular analytical timeline.
  • You need to develop detailed timelines for specific events, such as before an automated shutoff of an industrial or assembly line.

Key Timestream concepts

The key concepts of Timestream are as follows:

  • Time series – A sequence of data points (or records) recorded over a time interval
  • Record – A single data point in the time series
  • Dimension – An attribute that describes the metadata of the time series
  • Measure – An attribute that describes the data of the series
  • Timestamp – Every record has a timestamp, which indicates when the measure was collected or ingested
  • Table – A container for related time series with timestamp, dimensions, and measures
  • Database – A top-level container for tables

For a more detailed explanation of these concepts, see Timestream Concepts.
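To make these concepts concrete, the following is a minimal sketch of a single record as you would pass it to the WriteRecords API in Python; the dimension and measure names are hypothetical, and a list of such records is what the later examples pass to the timestream-write client.

import time

# Two dimensions identify the time series, one measure carries the data,
# and the timestamp says when the value was observed.
record = {
    "Dimensions": [
        {"Name": "device_id", "Value": "sensor-001"},
        {"Name": "site", "Value": "plant-a"}
    ],
    "MeasureName": "temperature",
    "MeasureValue": "24.5",
    "MeasureValueType": "DOUBLE",
    "Time": str(int(time.time() * 1000)),
    "TimeUnit": "MILLISECONDS"
}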

AWS IoT to Timestream architecture overview

The following diagram illustrates a typical architecture that you can use to develop artifacts for ingestion and consumption of IoT data with Timestream.

In this post, we detail two of the ingestion options from the preceding diagram: ingesting data through AWS IoT Greengrass (Pattern 1) and ingesting data with an AWS IoT Core rule action (Pattern 2).

Pattern 1: Ingesting data into Timestream using AWS IoT Greengrass

When you’re designing ingestion paths for data generated by devices or sensors connected to AWS IoT Greengrass, you have several options that take advantage of the strengths of Timestream. Which option to choose depends on the nature of your IoT data.

When you’re ingesting low-volume (low-frequency) data, you can send it from AWS IoT Greengrass to AWS IoT Core over the MQTT protocol and ingest it into Timestream via an AWS IoT rule action.

For high-volume IoT data, the preferred option is to use AWS IoT Greengrass stream manager. Stream manager processes data streams locally and exports them to the AWS Cloud automatically. With stream manager, you don’t need to be concerned with intermittent or limited connectivity, because it’s designed to work in such environments. Also, with the recently released AWS IoT Greengrass v2.0, you can add and remove pre-built software components like stream manager based on your specific IoT use case requirements and your device memory and CPU capabilities.

You start by ingesting data into a message stream that is configured to export data to a consuming service like AWS IoT Analytics, AWS IoT SiteWise, or Amazon Kinesis Data Streams. When your data arrives at a Kinesis data stream in the cloud, the stream is consumed and written into Timestream. To orchestrate the actual data pipeline from a Kinesis data stream, you can use an AWS Lambda function or Amazon Kinesis Data Analytics for Apache Flink.

In terms of cost, with Lambda you’re charged based on the number of requests and their duration (the time it takes for your code to run). Amazon Kinesis Data Analytics is priced at an hourly rate based on the average number of Kinesis Processing Units (KPUs) used. The nature of your data flow helps you choose between a Lambda function and Kinesis Data Analytics: if you have a steady flow of data, Kinesis Data Analytics might be a better option than Lambda. Another point in favor of Kinesis Data Analytics is when you need to perform additional processing on the streaming data before writing the results to Timestream.

We don’t recommend ingesting your IoT data from AWS IoT Greengrass directly into Timestream using a Lambda function. If you do so, you also need to implement features that already exist in stream manager, such as handling intermittent or limited connectivity and buffering (holding on to data while the transport path is re-acquired). It’s good practice to avoid recreating functionality that already exists and meets your requirements.

Example use case

As an example, you can collect system metrics like CPU, disk, and memory usage from an AWS IoT Greengrass core. After you create a database and table in Timestream, you create a Kinesis data stream and attach a Lambda function to it. The Lambda function gets records from the data stream and writes them into a table in Timestream.
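The following is a minimal sketch of creating these resources with boto3; the database, table, stream names, and retention values are placeholders, and you can equally create them on the console or with the AWS CLI.

import boto3

ts_write = boto3.client('timestream-write')
kinesis = boto3.client('kinesis')

# Create the Timestream database and table (placeholder names and retention values)
ts_write.create_database(DatabaseName='iotdb')
ts_write.create_table(
    DatabaseName='iotdb',
    TableName='ggmetrics',
    RetentionProperties={
        'MemoryStoreRetentionPeriodInHours': 24,
        'MagneticStoreRetentionPeriodInDays': 7
    }
)

# Create the Kinesis data stream that stream manager will export to
kinesis.create_stream(StreamName='gg-metrics-stream', ShardCount=1)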

The following Python code snippet for the Lambda function writes data to Timestream. For this function, we defined the database name in the variable DATABASE_GGMETRICS and the table name in TABLE_GGMETRICS. The Lambda function must also have an AWS Identity and Access Management (IAM) role attached with permissions to write to the related Timestream table.

import base64
import json
import logging
import os

import boto3
from botocore.config import Config

logger = logging.getLogger()
logger.setLevel(logging.INFO)

# Database and table names for the Greengrass metrics (here assumed to be
# passed in through environment variables)
DATABASE_GGMETRICS = os.environ.get("DATABASE_GGMETRICS")
TABLE_GGMETRICS = os.environ.get("TABLE_GGMETRICS")

# Client configuration used for the Timestream write client
config = Config(retries={'max_attempts': 10})


def lambda_handler(event, context):
    logger.debug("event:\n{}".format(json.dumps(event, indent=2)))

    records_ggmetrics = []

    try:
        c_ts_write = boto3.client('timestream-write', config=config)

        for record in event["Records"]:
            logger.debug("record: {}".format(record))
            # Kinesis data is base64 encoded so decode here
            payload = json.loads(base64.b64decode(record["kinesis"]["data"]))
            logger.info("payload: {}".format((payload)))

            if 'ggcore_name' in payload and 'region' in payload:
                logger.info("ggmetrics payload")

                if not DATABASE_GGMETRICS or not TABLE_GGMETRICS:
                    logger.warn("database or table for ggmetrics not defined: DATABASE_GGMETRICS: {} TABLE_GGMETRICS: {}".format(DATABASE_GGMETRICS, TABLE_GGMETRICS))
                    return {"status": "warn", "message": "database or table for ggmetrics not defined"}

                dimensions = [
                    {"Name": "ggcore_name", "Value": payload["ggcore_name"]},
                    {"Name": "region", "Value": payload["region"]}
                ]

                for m in ["cpu_usage_percent", "disk_usage_percent", "mem_usage_percent", "load_1m", "load_5m", "load_15m"]:
                    if m in payload:
                        record = {
                            "Dimensions": dimensions,
                            "MeasureName": m,
                            "MeasureValue": str(payload[m]),
                            "MeasureValueType": "DOUBLE",
                            "Time": str(payload["time"])
                        }
                        records_ggmetrics.append(record)
                    else:
                        logger.warn("measure \"{}\" not found in payload".format(m))
            else:
                logger.warn("unknown payload")
                return {"status": "warn", "message": "unknow payload: {}".format(payload)}

        logger.info("records_ggmetrics: {}".format(records_ggmetrics))

        ret_val = {"status": "success"}
        
        if records_ggmetrics:
            logger.info("writing greengrass metric records to database.table {}.{}".format(DATABASE_GGMETRICS, TABLE_GGMETRICS))
            response = c_ts_write.write_records(
                DatabaseName=DATABASE_GGMETRICS,
                TableName=TABLE_GGMETRICS,
                Records=records_ggmetrics,
                CommonAttributes={})
            logger.info("wrote {} records to Timestream".format(len(records_ggmetrics)))
            logger.info("response - write_records: {}".format(response))
            ret_val["records ggmetrics processed"] = "{}".format(len(records_ggmetrics))

        return ret_val

    except Exception as e:
        logger.error("{}".format(e))
        return {"status": "error", "message": "{}".format(e)}

To complete the setup, you must create a long-running Lambda function for AWS IoT Greengrass. The function runs on an AWS IoT Greengrass core. Stream manager must be enabled in your AWS IoT Greengrass group.

Upon start, the Lambda function creates a new message stream. The stream is configured by the export definition to transfer data to the Kinesis data stream that you created earlier. The variable KINESIS_STREAM_NAME must be set to your stream name in the cloud. LOCAL_STREAM_NAME refers to the name of the message stream on AWS IoT Greengrass. See the following code:

import json
import logging
import os
import time
from threading import Timer

import psutil

# Requires the AWS IoT Greengrass Stream Manager SDK for Python
from stream_manager import (
    ExportDefinition,
    KinesisConfig,
    MessageStreamDefinition,
    ResourceNotFoundException,
    StrategyOnFull,
    StreamManagerClient,
)

logger = logging.getLogger()
logger.setLevel(logging.INFO)

# Intervals are cast to int so they can be used directly with threading.Timer
GGMETRICS_INTERVALL = int(os.environ.get("GGMETRICS_INTERVALL", 120))
STREAM_METRICS_INTERVALL = int(os.environ.get("STREAM_METRICS_INTERVALL", 60))
KINESIS_STREAM_NAME = os.environ.get("KINESIS_STREAM_NAME", None)
GGCORE_NAME = os.environ['AWS_IOT_THING_NAME']
AWS_REGION = os.environ['AWS_REGION']

LOCAL_STREAM_NAME = "MY_LOCAL_MESSAGE_STREAM_NAME"
C_STRM_MGR = None

logger.info("GGMETRICS_INTERVALL: {} STREAM_METRICS_INTERVALL: {} KINESIS_STREAM_NAME: {}".format(GGMETRICS_INTERVALL, STREAM_METRICS_INTERVALL, KINESIS_STREAM_NAME))


def setup_stream():
    global C_STRM_MGR
    stream_ready = False
    logger.info("setting up stream")

    while not stream_ready:
        try:
            logger.info("stream manager client")
            C_STRM_MGR = StreamManagerClient()
            logger.info("delete stream")
            try:
                C_STRM_MGR.delete_message_stream(stream_name=LOCAL_STREAM_NAME)
            except ResourceNotFoundException:
                pass

            exports = ExportDefinition(
                kinesis=[KinesisConfig(identifier="{}-export".format(LOCAL_STREAM_NAME),
                        kinesis_stream_name=KINESIS_STREAM_NAME,
                        batch_size=10,
                        batch_interval_millis=60000
                        )]
            )
            logger.info("create stream")
            C_STRM_MGR.create_message_stream(
                MessageStreamDefinition(
                    name=LOCAL_STREAM_NAME,
                    strategy_on_full=StrategyOnFull.OverwriteOldestData,
                    export_definition=exports
                )
            )
            logger.info("stream manager ready")
            stream_ready = True

        except Exception as e:
            logger.error("setting up stream: {}".format(e))
            time.sleep(3)

After the stream has been set up, you can start to collect system metrics. The following example code assumes that you use the psutil library to collect system metrics:

def stats_collector():
    try:
        gg_metrics = {
            "ggcore_name": GGCORE_NAME,
            "region": AWS_REGION,
            "time": "{}".format(int(round(time.time() * 1000))),
            "cpu_usage_percent": "{}".format(psutil.cpu_percent(interval=None)),
            "disk_usage_percent": "{}".format(psutil.disk_usage('/').percent),
            "mem_usage_percent": "{}".format(psutil.virtual_memory().percent),
            "load_1m": "{}".format(psutil.getloadavg()[0]),
            "load_5m": "{}".format(psutil.getloadavg()[1]),
            "load_15m": "{}".format(psutil.getloadavg()[2])
        }
        logger.info("gg_metrics: {}".format(gg_metrics))
        j_gg_metrics = json.dumps(gg_metrics)

        logger.info("appending gg_metrics to stream: {}".format(LOCAL_STREAM_NAME))
        sequence_number = C_STRM_MGR.append_message(stream_name=LOCAL_STREAM_NAME, data=j_gg_metrics.encode())
        logger.info("appended gg_metrics to stream: sequence_number: {}".format(sequence_number))

    except Exception as e:
        logger.error("appending gg_metrics to stream failed: {}".format(e))

    Timer(GGMETRICS_INTERVALL, stats_collector).start()
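The following sketch shows one way to wire these pieces together in the long-running Lambda function: the module-level calls run once when the function container starts on the core, and the handler itself does nothing.

setup_stream()
stats_collector()  # re-schedules itself every GGMETRICS_INTERVALL seconds


def function_handler(event, context):
    # A long-lived Greengrass Lambda function does its work in the background;
    # the handler is not expected to be invoked.
    return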

With a few simple queries, we can verify that data is written to Timestream:

-- select some data
SELECT * FROM "DATABASE_NAME"."TABLE_NAME" LIMIT 20

-- how many rows are stored
SELECT COUNT(*) FROM "DATABASE_NAME"."TABLE_NAME"

Pattern 2: Ingesting data into Timestream using an AWS IoT Core rule action

Some use cases may have a large number of memory-, CPU-, or bandwidth-constrained devices generating infrequent data that results in large aggregate volumes. In this situation, the devices can’t run AWS IoT Greengrass, let alone stream manager. In many cases, they’re already sending their telemetry data to AWS IoT Core via MQTT. To store this data in Timestream, you can use the Timestream AWS IoT rule action, which you can set up in a few steps. To simplify your setup, you can use this AWS CloudFormation template:

This template creates a Timestream database, a Timestream table, and an AWS IoT Core topic rule. The template also creates all of the required IAM roles and policies.

After your environment is created, you can clone the Amazon-Timestream-Tools GitHub repository and navigate to amazon-timestream-tools/integrations/iot_core for additional tools.

Let’s suppose your devices are generating a data payload similar to the following JSON object, which is published on the topic dt/device_id/measures:

{ 
  "dataFormat": 5, 
  "rssi": -88, 
  "temperature": 24.04, 
  "humidity": 43.605, 
  "pressure": 101082, 
  "accelerationX": 40, 
  "accelerationY": -20, 
  "accelerationZ": 1016, 
  "battery": 3007, 
  "txPower": 4, 
  "movementCounter": 219, 
  "measurementSequenceNumber": 46216 
}

The script sensordata.py in the iot_core folder generates data in that format and sends it to the dt/device_id/measures topic. To run the script, you need Python 3.6 and the boto3 library installed.
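If you just want to verify the topic and rule wiring without running the full script, the following minimal sketch publishes a single reading with boto3; the field values are arbitrary and the caller needs iot:Publish permissions on the topic.

import json

import boto3

iot_data = boto3.client('iot-data')

# Publish one sample reading to the topic the rule listens on
iot_data.publish(
    topic='dt/device_id/measures',
    qos=1,
    payload=json.dumps({"temperature": 24.04, "humidity": 43.605, "pressure": 101082}).encode()
)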

If you’re not using the CloudFormation template, the first step is to create a new AWS IoT rule with the following query:

SELECT temperature, humidity, pressure FROM 'dt/device_id/measures' 

If you used the template, the rule is already created for you.

This rule selects the three measures temperature, humidity, and pressure from the payload and makes them available to the selected action. In this case, we choose to use the Timestream action.

To configure the Timestream action, you must specify a database and table in the same account and Region in which you’re configuring the AWS IoT rule, as well as the values for the dimensions associated with the measures. The dimension values can be either static or extracted from the message context, such as fields in the payload or the topic, and you can use all of the AWS IoT rules SQL functions. When using a function in the action, you need to use substitution templates, which enclose the expression in curly brackets, such as ${clientid()}, or ${dataFormat} to get the value from one of the fields in the original payload.

Finally, you need to select an existing IAM role or create a new role that gives the action the necessary permission to access the Timestream table. If you let the system create a new role on your behalf, the role you use to create the IoT rule must have the correctly scoped-down permissions.
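If you prefer to create the rule programmatically rather than on the console or with the CloudFormation template, the following boto3 sketch shows the shape of the Timestream action; the rule name, role ARN, database, and table names are placeholders for your own resources.

import boto3

iot = boto3.client('iot')

iot.create_topic_rule(
    ruleName='SensorDataToTimestream',
    topicRulePayload={
        'sql': "SELECT temperature, humidity, pressure FROM 'dt/device_id/measures'",
        'awsIotSqlVersion': '2016-03-23',
        'actions': [{
            'timestream': {
                'roleArn': 'arn:aws:iam::123456789012:role/iot-timestream-role',
                'databaseName': 'sensor_db',
                'tableName': 'sensor_data',
                # Dimension values can be static or use substitution templates
                'dimensions': [
                    {'name': 'device_id', 'value': '${clientid()}'},
                    {'name': 'dataFormat', 'value': '${dataFormat}'}
                ]
            }
        }]
    }
)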

We have published an example on GitHub to generate and ingest data with AWS IoT Core into Timestream.

How data populates Timestream

Every field in the JSON payload resulting from the SELECT query is stored as a measure in Timestream: the measure name is equal to the field name, and the measure value is equal to the field value. For more information about how the Timestream action ingests data into Timestream, see Timestream.

The Timestream action can ingest multiple measures, but only for a single timestamp. If the devices are sending aggregated messages in the payload, such as in the following example, you need to use a Lambda function to decode the message and store the individual data points via the Timestream write API (a sketch of such a function follows the example payload).

{ 
  "sensor-id": "AAABB",
  "temperature": [
    { "ts": 122200022, "value": 24.0 },
    { "ts": 122200023, "value": 24.2 },
    { "ts": 122200024, "value": 24.3 },
    { "ts": 122200025, "value": 24.3 },
    { "ts": 122200026, "value": 24.1 }
  ] 
}
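The following is a minimal sketch of such a decoding function, assuming the ts values are millisecond timestamps; the database and table names are placeholders.

import boto3

ts_write = boto3.client('timestream-write')


def write_aggregated(message):
    # One dimension identifies the sensor; each array element becomes its own record
    dimensions = [{"Name": "sensor-id", "Value": message["sensor-id"]}]
    records = [
        {
            "Dimensions": dimensions,
            "MeasureName": "temperature",
            "MeasureValue": str(point["value"]),
            "MeasureValueType": "DOUBLE",
            "Time": str(point["ts"]),
            "TimeUnit": "MILLISECONDS"
        }
        for point in message["temperature"]
    ]
    ts_write.write_records(
        DatabaseName="sensor_db",
        TableName="sensor_data",
        Records=records,
        CommonAttributes={}
    )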

Consuming time series data

In this section, we discuss the various ways in which you can consume time series data.

Timestream console

The query editor on the Timestream console provides a simple yet powerful tool to query the data in your Timestream database. It’s useful for verifying that the data is being ingested and for trying out your SQL queries before embedding them in your solutions. After you define the query, you can use the API or any of the other integrations to get the data into your application or report.
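For example, once you’re happy with a query in the editor, you can run the same query from code with the timestream-query client; the database, table, and measure names below are placeholders.

import boto3

ts_query = boto3.client('timestream-query')

# Average CPU usage per 5-minute window over the last hour (placeholder names)
query = """
SELECT bin(time, 5m) AS binned_time,
       AVG(measure_value::double) AS avg_cpu
FROM "DATABASE_NAME"."TABLE_NAME"
WHERE measure_name = 'cpu_usage_percent'
  AND time > ago(1h)
GROUP BY bin(time, 5m)
ORDER BY binned_time
"""

paginator = ts_query.get_paginator('query')
for page in paginator.paginate(QueryString=query):
    for row in page['Rows']:
        print([datum.get('ScalarValue') for datum in row['Data']])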

Amazon API Gateway and AWS AppSync

If you’re making the data available to your own or third-party applications, you’re likely building a REST API or a GraphQL endpoint, either with Amazon API Gateway or AWS AppSync. In both cases, you can use Lambda to query the Timestream database and return the data to the user. Both services integrate with Amazon Cognito to provide additional control over the access level of each user.

These solutions are typically used in customer-facing applications such as a companion application for an IoT solution.
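As a rough illustration, a Lambda function behind an API Gateway proxy integration could run a parameterized query and return the rows as JSON; the following sketch uses placeholder names and skips the input validation you would want in production.

import json

import boto3

ts_query = boto3.client('timestream-query')


def handler(event, context):
    params = event.get('queryStringParameters') or {}
    device = params.get('device', 'my-gg-core')
    # In production, validate 'device' before building the query string
    result = ts_query.query(QueryString="""
        SELECT time, measure_name, measure_value::double
        FROM "DATABASE_NAME"."TABLE_NAME"
        WHERE ggcore_name = '{}' AND time > ago(15m)
        ORDER BY time DESC LIMIT 100
    """.format(device))
    return {
        'statusCode': 200,
        'headers': {'Content-Type': 'application/json'},
        'body': json.dumps(result['Rows'])
    }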

Amazon SageMaker

Amazon SageMaker is a fully managed service that provides every developer and data scientist with the ability to build, train, and deploy ML models quickly. You can access your data from Timestream with SageMaker to run algorithms for anomaly detection, forecasting, and more.
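One convenient way to pull Timestream data into a SageMaker notebook is the AWS SDK for pandas (awswrangler), which can read query results straight into a DataFrame, as in the following sketch; it assumes the library is installed in the notebook environment and uses placeholder names.

import awswrangler as wr

# Read the last seven days of data into a pandas DataFrame (placeholder names)
df = wr.timestream.query("""
    SELECT time, measure_name, measure_value::double AS value
    FROM "DATABASE_NAME"."TABLE_NAME"
    WHERE time > ago(7d)
""")

print(df.head())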

JDBC

If you have an existing tool that supports data sources via JDBC, you can access Timestream using a JDBC driver. One example is accessing data from database tools integrated with IDEs such as IntelliJ.

Visualizing and interacting with data

In most cases, data is ingested into a time series database to provide historical analysis capabilities, but you often also need to visualize the data in near-real time. For these two use cases, you might consider tools such as Amazon QuickSight or Grafana.

Amazon QuickSight

QuickSight is a fast, cloud-powered BI service that makes it easy to deliver insights to everyone in your organization. QuickSight is typically used to access data aggregates over time and dimensions. Typical use cases for QuickSight include providing the maximum count of devices per day or per hour, or the average temperature per day. For a more in-depth discussion of using QuickSight with Timestream, see Getting Started with Amazon Timestream and Amazon QuickSight.

Grafana

You can use Grafana to query and visualize your time series data and also create alerts. Grafana provides an easy-to-use tool to display time series data in near-real time and quickly zoom in and out of the data along the time axis. You can use Amazon Managed Service for Grafana (Preview) or install Grafana on an Amazon Elastic Compute Cloud (Amazon EC2) instance, in Amazon Elastic Container Service (Amazon ECS), on your laptop, or on an on-premises computer.

Grafana supports data sources that are storage backends for time series data. To access your data from Timestream, you need to install the Timestream plugin for Grafana.

For detailed instructions on using Timestream with Grafana, see Getting Started with Amazon Timestream and Grafana.

Conclusion

In this post, you have seen an overview of the key architectural options for IoT data ingestion into Timestream and how to develop data consumption pipelines. You can ingest time series data directly from IoT devices or from the edge by using AWS IoT Greengrass. You’ve also learned about several options to consume data from Timestream. You can visualize time series data, use persisted data with SageMaker, or integrate the data into your own custom applications with API Gateway or JDBC.

As a next step, we recommend trying our Timestream related quick starts and getting some hands-on experience with the Amazon Timestream tools and samples. If you have comments or questions about this solution, please submit them in the comments section.


About the Authors

Catalin Vieru is a Sr. Specialist Solutions Architect for ElastiCache.

Massimiliano Angelino is a Principal Specialist Solutions Architect for IoT.

Philipp Sacha is a Principal Specialist Solutions Architect for IoT.