AWS Database Blog

Build time-series applications faster with Amazon EventBridge Pipes and Timestream for LiveAnalytics

Amazon Timestream for LiveAnalytics is a fast, scalable, and serverless time-series database that makes it straightforward and cost-effective to store and analyze trillions of events per day. You can use Timestream for LiveAnalytics for use cases like monitoring hundreds of millions of Internet of Things (IoT) devices, industrial equipment, gaming sessions, streaming video sessions, financial, log analytics, and more. With Timestream for LiveAnalytics, you can ingest tens of gigabytes of time-series data per minute and run SQL queries on terabytes of time-series data in seconds with high availability.

Organizations are now looking to gain competitive advantage by providing real-time insights on their time-sensitive data. These use cases require data pipelines to support ingestion of real-time data using a data stream, and then processing and storing them in Timestream for LiveAnalytics. You might require out-of-the-box adapters such as Apache Flink Adapter, or undifferentiated yet specialized code to transfer time-series data from supported AWS sources to Timestream for LiveAnalytics.

To simplify time-series data ingestion, we launched Amazon EventBridge Pipes integration with Timestream for LiveAnalytics. Now you can ingest data from different sources, such as Amazon DynamoDB, Amazon Kinesis Data Streams, Amazon MQ, and Amazon Simple Queue Service (Amazon SQS) into Timestream for LiveAnalytics using EventBridge Pipes. With this launch, you have a flexible, low-code no-code (LCNC) configuration-based solution to ingest data into Timestream for LiveAnalytics.

EventBridge Pipes is intended for point-to-point integrations between supported sources and targets, with support for advanced transformations , enrichment, and filtering. EventBridge Pipes reduces the need of specialized knowledge and integration code when developing event-driven architectures, fostering consistency across your company’s applications.

In this post, we demonstrate how to configure EventBridge Pipes to ingest data from Kinesis Data Streams.

Solution overview

The following diagram illustrates the architecture we use in this post to configure EventBridge Pipes to ingest data from a Kinesis data stream. To explore other supported integrations, refer to Amazon EventBridge Pipes sources.

We generate sample vehicle data with the following JSON and stream it to Kinesis Data Streams using the Amazon Kinesis Data Generator (KDG). We set up pipes to ingest data from Kinesis Data Streams to Timestream for LiveAnalytics.

{
    "connectionTime": "{{date.now("YYYY-MM-DD HH:mm:ss.SSS")}}",
    "vehicle_id": "CAR_{{random.number({"min":1, "max":100})}}",
    "driver_id": "USER_{{random.number({"min":1, "max":1000})}}",
    "engine_status": "{{random.number({"min":0, "max":1})}}",
    "miles": "{{random.number({"min":0, "max":1000})}}.0"   
}

Prerequisites

For this post, we created the database VehicleMetricsDB and the table VehicleMetrics. We recommend selecting a partition key based on a dimension with a high cardinality column and that is frequently used as a predicate in queries for your table based on use case. This helps distribute the data evenly across partitions and avoid performance issues. For this post, we use data generated from vehicles and use vehicle_id as customer-defined partition key while creating this table. We also created the data stream vehicle-metrics-stream.

Note: This solution creates AWS resources that incur costs on your account. Make sure to delete the resources created as part of this blog once you’re done.

Create an EventBridge pipe

Complete the following steps to set up your pipe:

  1. Open EventBridge Pipes console and choose Create pipe.
  2. Choose Create pipe.
  3. For Pipe name, enter a name for your pipe.
  4. For Description, enter an optional description.
  5. On the Build pipe tab, for Source, choose Kinesis as the source, and select the desired stream.
  6. Set the batching and concurrency values in the Additional settings section based on your requirements:
    1. On partial batch item failure – The recommendation is to choose AUTOMATIC_BISECT. It automatically halves each batch, and retries each half until all the records are processed or only failed messages remain in the batch.
    2. Batch size – Choose 100 (default). In case of Amazon SQS source, the maximum batch size is 10.
  7. To configure a target, on the Build pipe tab, choose Target.
  8. For this demo, we don’t need filtering and enrichment options, so we set up the target directly. You can choose to apply filtering and enrichment based on your requirements. If you add an enrichment layer, different services support different levels of batching.
  9. Under Details, for Target service, choose the target as Amazon Timestream for LiveAnalytics, and then choose your database and table.

If your data is a valid JSON, your input templates or JSON paths for target parameters can reference the content directly. For example, you can reference a variable using <$.data.someKey>.

  1. For Time field type, we use TIMESTAMP_FORMAT (yyyy-MM-dd HH:mm:ss.SSS) with $.data.connectionTime as the default time value. You can choose EPOCH format for Time field type based on your requirement.
  2. For Version value, you can use the latest value to update data points. Each Kinesis record includes a value, ApproximateArrivalTimestamp, which is set when a stream successfully receives and stores a record. For this post, we set the version value to $.data.approximateArrivalTimestamp to handle updates, so any measure changes with the latest ApproximateArrivalTimestamp and same dimension and time values will get updated.

In the next step, you configure the Timestream for LiveAnalytics data model (dimension, measures, measure name, and data types for required columns). For this configuration, you can use either the visual builder or JSON edit. Follow Data Modeling Best Practices to Unlock the Value of your Time-series Data for effective data modelling.

  1. For this post, we use JSON editor and provide the following information:
    1. For DimensionValue (dimensions), use vehicle_id and driver_id.
    2. For MeasureValue (measures), use miles and engine_status.
    3. For the MultiMeasureName (measure name), use the static value ‘metric’. You can choose a specific source column as multi-measure name based on your requirement, but make sure it doesn’t exceed 8,192 distinct values.
      {
        "DimensionMappings": [
          {
            "DimensionValue": "$.data.vehicle_id",
            "DimensionValueType": "VARCHAR",
            "DimensionName": "vehicle_id"
          },
          {
            "DimensionValue": "$.data.driver_id",
            "DimensionValueType": "VARCHAR",
            "DimensionName": "driver_id"
          }
        ],
        "MultiMeasureMappings": [
          {
            "MultiMeasureName": "metric",
            "MultiMeasureAttributeMappings": [
              {
                "MeasureValue": "$.data.miles",
                "MeasureValueType": "DOUBLE",
                "MultiMeasureAttributeName": "miles"
              },
              {
                "MeasureValue": "$.data.engine_status",
                "MeasureValueType": "BIGINT",
                "MultiMeasureAttributeName": "engine_status"
              }
            ]
          }
        ]
      }

You can also use Visual builder for setting up the data model. The following screenshot is an example of setting up dimensions; you can do the same for MULTI_MEASURE_NAME and MULTI_MEASURE_VALUE accordingly.

Now you can configure the pipe settings (permissions, retry policies, and Amazon CloudWatch logs).

  1. On the Pipe settings tab, in the Permissions section, you can define a new AWS Identity and Access Management (IAM) role or use an existing role. If this is your first time creating a pipe, select Create a new role for specific resource.

To improve the developer experience, EventBridge Pipes figures out the IAM role, so you don’t need to manually configure required permissions. You can let EventBridge Pipes configure least-privilege permissions for the IAM role.

If you specified a Kinesis data stream or DynamoDB stream as the pipe source, you can optionally configure a retry policy and dead-letter queue (DLQ). You can discard records older than the specified retention period and specify the number of retry attempts in case of failures.

  1. Under Retry policy, for Maximum age of the event, the recommendation is to use the minimum of 30 minutes or higher. For this demo, we set it to 30 minutes, so records older than 30 minutes will not be processed and will directly move to the DLQ.
  2. For Retry attempts, we set it to 10 times, which is recommended to help address any transient issues. For persistent issues, after the retry attempts, records will be moved to the DLQ and unblock the rest of the stream.
  3. We strongly recommend configuring the DLQ for your pipe to avoid data loss in case of misconfigurations. You can reference the Kinesis event’s sequence number in the DLQ to fix the record and resend to Timestream. To enable the DLQ, toggle on Dead-letter queue, choose the method for your choice, and choose the queue or topic you’d like to use. For this demo, we use Amazon SQS, and choose a queue in the same AWS account and AWS Region.
  4. Under Logs, choose the destinations to which you want log records to be delivered. For this post, we choose CloudWatch Logs.
  5. For Log level, choose the level of information for EventBridge to include in the log records. The ERROR log level is selected by default. For this post, we change Log level from ERROR to INFO so we can see more details.
  6. We recommend selecting the Include execution data option when the target is Timestream for LiveAnalytics, so EventBridge can include event payload information, service requests, and response information in the log records.

EventBridge Pipes run data is useful for troubleshooting and debugging. The payload field contains the actual content of each event included in the batch, enabling you to correlate individual events to a specific pipe run. However, your incoming data may contain sensitive information, and enabling this option will result in actual event data getting logged on all destinations of your choice, so make sure to make the right decision based on the sensitivity of your data.

  1. Choose Create pipe to create your pipe.

You can navigate to the specific pipe you created on the EventBridge console and wait for the pipe to enter a Running state.

Validate the solution

We streamed the data using the KDG as explained in the sample architecture. To validate this ingestion, open the Amazon Timestream for LiveAnalytics query editor and run the following SQL query:

SELECT * FROM "VehicleMetricsDB"."VehicleMetrics" limit 5

The following screenshot shows the query results.

Based on your requirement, you can run queries using the Timestream query language.

Considerations

You can also interact with EventBridge Pipes and create a pipe using the AWS Command Line Interface (CLI) (AWS CLI), AWS CloudFormation, and the AWS Cloud Development Kit (AWS CDK).

You can monitor the progress of ingestion using CloudWatch metrics. Visualize the invocations and failures on the Monitoring tab on the EventBridge console. For troubleshooting, refer to Log Amazon EventBridge Pipes.

Clean up

To avoid incurring charges, use the AWS Management Console to delete the resources that you created while running this demo:

  1. On the Timestream console, delete the Timestream database and table.
  2. On the EventBridge console, choose Pipes in the navigation pane, select the pipe you created, and choose Delete.
  3. On the Kinesis Data Streams console, choose Data Streams in the navigation pane and select the stream you created. On the Actions menu, choose Delete.
  4. On the CloudFormation console, delete the CloudFormation stack you created for the KDG.

Conclusion

In this post, we demonstrated how to integrate EventBridge Pipes to ingest data from Kinesis Data Streams.This new integration helps simplify time-series data ingestion.

For more information, refer to the following resources:

Send your feedback to AWS re:Post for Amazon Timestream or through your usual AWS Support contact.


About the Authors

Balwanth Reddy Bobilli is a Timestream Specialist Solutions Architect at AWS based out of Utah. Prior to joining AWS, he worked at Goldman Sachs as a Cloud Database Architect. He is passionate about databases and cloud computing. He has great experience in building secure, scalable, and resilient solutions in cloud, specifically with cloud databases.

Nabarun Bandyopadhyay is a Sr. Data Architect at AWS ProServe based out of New York City, NY. Nabarun enables customers in the design and implementation of data analytics applications using AWS services. He is passionate about building modern data architecture for ML and generative AI platforms.