AWS Database Blog

Real-time serverless data ingestion from your Kafka clusters into Amazon Timestream using Kafka Connect

Organizations need systems and mechanisms to gather and analyze large amounts of data as it is created, so that they can get insights and respond in real time. Stream processing technologies enable organizations to ingest data as it is created, process it, and analyze it as soon as it is accessible. In this post, we show you how to stream events from your Amazon Managed Streaming for Apache Kafka (Amazon MSK) clusters to Amazon Timestream tables in real time using the Timestream Sink Connector, without having to manage the infrastructure.

You can securely scale your streaming data platform with hundreds and thousands of Kafka clusters using Amazon MSK, a fully managed service to build and run applications to process streaming data, which simplifies the setup, scaling, and management of clusters running Kafka. You can move your data between clusters and downstream systems such as databases, data lakes, file systems, and search indexes for further analytics and processing.

Amazon Timestream is a fast, scalable, and serverless time-series database service that makes it easier to store and analyze trillions of events per day for use cases like monitoring hundreds of millions of Internet of Things (IoT) devices, industrial equipment, gaming sessions, streaming video sessions, and more.

The following architectural patterns are available for real-time streaming of events from MSK clusters to Timestream database tables:

  • AWS Lambda – AWS Lambda is a serverless, event-driven compute service that can be triggered by a variety of AWS events, including events from Amazon MSK, and that can write records to Timestream tables. For more details, refer to Network Access Patterns of AWS Lambda for Confluent Cloud.
  • Kafka Connect – This open-source framework for connecting Kafka clusters with external systems makes it straightforward for developers to stream data to and from their Apache Kafka clusters. MSK Connect enables you to deploy, monitor, and automatically scale connectors that move data between your MSK or Kafka clusters and external systems.

In this post, we dive deep into the Kafka Connect pattern, where you can stream events from an MSK cluster to a Timestream table using the Timestream Sink Connector. You can find the source code of the connector in the GitHub repo.

The connector helps you perform real-time analysis on time-series events streaming from your MSK clusters for a wide range of use cases, including real-time bidding, tracking and reporting impressions in ad-tech services, analyzing player actions and establishing leaderboards in online gaming services, live tracking of flights, buses, and cabs in mobility and logistics services, clickstream analysis in ecommerce, and more.

Solution overview

In this section, we discuss the solution architecture for an end-to-end pipeline, starting with an Amazon Elastic Compute Cloud (Amazon EC2) instance that generates data and ending with the target Timestream table, where the data is ingested for further analysis.

When the Timestream Sink Connector is deployed in MSK Connect, it loads the target Timestream table schema definition from the configured Amazon Simple Storage Service (Amazon S3) bucket. The connector uses the schema definition to validate the incoming messages from the Kafka topic before inserting them as records into the Timestream table.

The data flow starts with an EC2 instance that acts as a Kafka producer and writes records to a Kafka topic. As data arrives, an instance of the Timestream Sink Connector for Apache Kafka writes the data to a Timestream table, as shown in the following diagram.

The connector uses VPC endpoints (powered by AWS PrivateLink) to securely connect to Timestream and S3 so that the traffic between your VPC and the connected services doesn’t leave the Amazon network, as shown in the following solution architecture diagram.

This post walks you through the following steps:

  1. Provision AWS resources using an AWS CloudFormation template, which enables you to model and manage infrastructure resources in an automated and secure manner.
  2. Configure the provisioned EC2 instance as a Kafka producer client.
  3. Configure MSK Connect for deploying the Timestream Sink Connector.
  4. Publish messages to the Kafka topic.
  5. Validate that data is written to the Timestream table.

I. Provision AWS resources

We use a CloudFormation template to provision the solution resources, such as a VPC, subnets, an MSK cluster spanning three Availability Zones, an EC2 instance, VPC endpoints, a Timestream database and table, and more. Choose Launch Stack to provision the resources in an AWS Region where Timestream is available.

Launch stack

Note: This solution creates AWS resources that incur costs on your account. Make sure you delete the stack once you’re done.

It takes 15–20 minutes for the stack to complete, after which you can find the following details on the stack’s Outputs tab on the AWS CloudFormation console:

  • KafkaPublisherEC2InstanceId – The instance ID of the EC2 instance that acts as the Kafka client and publisher
  • MSKClusterArn – The ARN of the MSK cluster created
  • MSKConnectCWLogGroupArn – The Amazon CloudWatch log group created for the Timestream Sink Connector
  • MSKConnectIAMRoleArn – The AWS Identity and Access Management (IAM) role created for the Timestream Sink Connector
  • MSKKafkaVersion – The MSK cluster’s Kafka version
  • S3BucketName – Name of the S3 bucket that stores the MSK Connect custom plugins and artifacts
  • S3KeyCSVData – An S3 key for the sample data to be published as messages to the MSK topic
  • S3KeyJMXTestPlan – An S3 key for the JMeter file that is used to publish the messages to the MSK topic
  • S3URIPlugin – An S3 URI for the custom plugin code JAR file
  • TimestreamDatabaseName – The Timestream database name
  • TimestreamIngestionVpcEndpoint – The Timestream ingestion VPC endpoint
  • TimestreamTableName – The Timestream table where the ingested data will be stored
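
If you prefer the command line to the console, the same outputs can be read with the AWS CLI. The following is a minimal sketch, assuming the AWS CLI is installed and your credentials allow cloudformation:DescribeStacks; the function name stack_output and the stack name in the usage line are hypothetical.

```shell
# Print the value of one CloudFormation stack output.
# $1 = stack name (whatever you named the stack at launch; an assumption here)
# $2 = output key, for example MSKClusterArn
stack_output() {
  aws cloudformation describe-stacks \
    --stack-name "$1" \
    --query "Stacks[0].Outputs[?OutputKey=='$2'].OutputValue" \
    --output text
}
```

For example, `stack_output my-timestream-stack MSKClusterArn` prints the cluster ARN, which you can capture in a shell variable for later steps.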

II. Configure the Kafka producer

Next, let’s configure the provisioned EC2 instance that acts as the Kafka producer. We use this instance to create a topic and then publish messages to the topic using Apache JMeter, an open-source application designed to load test functional behavior, measure performance, and more.

To connect to the EC2 instance, we use Session Manager, a capability of AWS Systems Manager. See Connect to your Linux instance with AWS Systems Manager Session Manager for more details.

  1. On the Amazon EC2 console, in the navigation pane, choose Instances.
  2. Select the instance referenced by KafkaPublisherEC2InstanceId in the stack’s outputs.
  3. Choose Connect.
  4. Choose Session Manager for Connection method, then choose Connect.

Create a Kafka topic

In this step, we create a Kafka topic in the MSK cluster referenced by MSKClusterArn, which runs the Kafka version referenced by MSKKafkaVersion.

  1. In the browser-based SSH shell, run the following command:
    sudo -u ec2-user -i
  2. Create a topic in the provisioned MSK cluster.
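
The steps above do not show the topic creation command itself. As a rough sketch only, and not necessarily the command the authors used, a topic can be created with the standard kafka-topics.sh tool and IAM authentication. This assumes the Apache Kafka command line tools are installed on the instance and the aws-msk-iam-auth JAR is on their classpath; the function names, the Kafka install path, and the partition and replication values are illustrative assumptions.

```shell
# Write the client settings that the Kafka CLI tools need for MSK IAM authentication.
# $1 = destination file
write_client_properties() {
  cat > "$1" <<'EOF'
security.protocol=SASL_SSL
sasl.mechanism=AWS_MSK_IAM
sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
EOF
}

# Create a topic. The partition count and replication factor are illustrative.
# $1 = Kafka install directory (assumed), $2 = one broker endpoint (HOSTNAME:PORT)
# $3 = topic name, $4 = path to the client properties file
create_topic() {
  "$1/bin/kafka-topics.sh" --create \
    --bootstrap-server "$2" \
    --command-config "$4" \
    --replication-factor 3 \
    --partitions 2 \
    --topic "$3"
}
```

A possible invocation is `write_client_properties ~/client.properties` followed by `create_topic ~/kafka "$BOOTSTRAP_SERVER" purchase-history ~/client.properties`, where ~/kafka stands for wherever the Kafka tools are unpacked.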

Configure JMeter

In this section, we use Apache JMeter to publish thousands of messages to the Kafka topic that we created.

  1. Download and extract the JMeter application:
    cd ~
    wget https://dlcdn.apache.org//jmeter/binaries/apache-jmeter-5.6.2.tgz
    tar -xf apache-jmeter-5.6.2.tgz
  2. Verify that JMeter is installed:
    cd apache-jmeter-5.6.2/bin/
    ./jmeter -v

    The output should look like the following screenshot.

    The screenshot displays that Apache JMeter is installed correctly.

  3. Download the kafka-clients.jar file, which contains classes for working with Kafka, and place it in the JMeter lib/ext directory. Replace {MSKKafkaVersion} with the value of the MSKKafkaVersion stack output:
    cd ~
    cd apache-jmeter-5.6.2/lib/ext 
    wget https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/{MSKKafkaVersion}/kafka-clients-{MSKKafkaVersion}.jar
  4. Download the msk-iam-auth-all.jar file, which contains classes for working with MSK clusters that are configured with IAM authentication, and place it in the JMeter lib/ext directory:
    cd ~ 
    cd apache-jmeter-5.6.2/lib/ext 
    wget https://github.com/aws/aws-msk-iam-auth/releases/download/v1.1.9/aws-msk-iam-auth-1.1.9-all.jar
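
The {MSKKafkaVersion} placeholder in step 3 appears twice in the download URL, which is easy to get wrong when editing by hand. A small helper, sketched here with a hypothetical function name, builds the URL from the version string.

```shell
# Build the Maven Central URL for the kafka-clients JAR.
# $1 = Kafka version, that is, the value of the MSKKafkaVersion stack output
kafka_clients_url() {
  printf 'https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/%s/kafka-clients-%s.jar\n' "$1" "$1"
}
```

You could then run `wget "$(kafka_clients_url <MSKKafkaVersion>)"` from the lib/ext directory.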

III. Deploy the Timestream Sink Connector

Next, we show you how to deploy the Timestream Sink Connector in MSK Connect.

Configure the Amazon MSK custom plugin

In this step, we create a custom plugin, an AWS resource that contains the code that defines the connector logic. The code is bundled as a JAR file called kafka-connector-timestream-1.0-SNAPSHOT-jar-with-dependencies.jar and is uploaded to the provisioned S3 bucket referenced by S3BucketName.

  1. On the Amazon MSK console, choose Custom plugins in the navigation pane.
  2. Choose Create custom plugin.
  3. For S3 URI, enter the connector file’s S3 URI, referenced by S3URIPlugin.
  4. Give it a name and choose Create custom plugin.

Configure the Amazon MSK worker configuration

A worker is a Java virtual machine (JVM) process that runs the connector logic. Next, we create a custom worker configuration that uses the JSON-based converter instead of the String-based converter that comes with the default worker configuration.

  1. On the Amazon MSK console, choose Worker configurations in the navigation pane.
  2. Choose Create worker configuration.
  3. In the Worker configuration, paste the following configuration:
    key.converter=org.apache.kafka.connect.json.JsonConverter
    value.converter=org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable=false
    value.converter.schemas.enable=false
  4. Give it a name and choose Create worker configuration.
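
The same worker configuration can likely be created from the command line. The following sketch assumes the AWS CLI kafkaconnect create-worker-configuration command, which to our understanding expects the properties content base64 encoded; check `aws kafkaconnect create-worker-configuration help` before relying on it. The function names are hypothetical.

```shell
# Write the JSON converter settings from the step above to a properties file.
# $1 = destination file
write_worker_properties() {
  cat > "$1" <<'EOF'
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
EOF
}

# Create the worker configuration from a properties file.
# $1 = worker configuration name, $2 = properties file
create_worker_configuration() {
  aws kafkaconnect create-worker-configuration \
    --name "$1" \
    --properties-file-content "$(base64 < "$2" | tr -d '\n')"
}
```

With schemas.enable set to false, the JsonConverter reads each message as a plain JSON document rather than a schema and payload envelope.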

Configure the Timestream Sink Connector

Now it’s time to configure the connector using the custom plugin that we created before.

  1. On the Amazon MSK console, choose Connectors in the navigation pane.
  2. Choose Create connector.
  3. Choose the plugin you created before and choose Next.
  4. Enter a name of your choice (for example, msk-timestream-connector).
  5. Choose the provisioned MSK cluster.
  6. Enter the following connector configuration, replacing the placeholder values with your own:
    #Kafka connect configurations
    connector.class=software.amazon.timestream.TimestreamSinkConnector
    tasks.max=2
    topics=<Topic that you created>
    aws.region=<AWS Region>
    timestream.ingestion.endpoint=<TimestreamIngestionVpcEndpoint>
    timestream.schema.s3.bucket.name=<S3BucketName>
    timestream.schema.s3.key=purchase_history.json
    timestream.database.name=<TimestreamDatabaseName>
    timestream.table.name=<TimestreamTableName>
  7. Under Worker configuration, select Use a custom configuration, and then select the custom worker configuration that you created in the previous step.
  8. Under Access permissions, choose the provisioned IAM role referenced by MSKConnectIAMRoleArn, then choose Next.
  9. In the Logs section, select Deliver to Amazon CloudWatch Logs and update Log Group Arn with the value referenced by MSKConnectCWLogGroupArn, then choose Next.
  10. Review all settings and choose Create connector.
    The connector creation takes 5–10 minutes to complete. The pipeline is ready when its status changes to Running, as shown in the following screenshot.

The screenshot shows that the connector status changes to Running once it is active.
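
To avoid typos when filling in the connector configuration from step 6, you can render it from the stack outputs. The following is a sketch with hypothetical function names; it reuses the property keys shown in step 6 and emits no trailing whitespace, which Java-style properties files can otherwise treat as part of the value. The second function assumes the AWS CLI kafkaconnect describe-connector command and its connectorState field.

```shell
# Render the connector configuration.
# Arguments, in order: topic, Region, ingestion endpoint, S3 bucket, database, table
render_connector_config() {
  cat <<EOF
connector.class=software.amazon.timestream.TimestreamSinkConnector
tasks.max=2
topics=$1
aws.region=$2
timestream.ingestion.endpoint=$3
timestream.schema.s3.bucket.name=$4
timestream.schema.s3.key=purchase_history.json
timestream.database.name=$5
timestream.table.name=$6
EOF
}

# Print the current connector state (for example, CREATING or RUNNING).
# $1 = connector ARN
connector_state() {
  aws kafkaconnect describe-connector \
    --connector-arn "$1" \
    --query 'connectorState' \
    --output text
}
```

Paste the output of render_connector_config into the console's configuration box, and poll connector_state until it reports RUNNING.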

IV. Publish messages

Now let’s publish messages to the created Kafka topic.

  1. Connect to the Kafka publisher EC2 instance using Systems Manager.
  2. In the browser-based SSH shell, run the following command:
    sudo -u ec2-user -i
  3. Set the bootstrap server URL and the name of the topic you created as environment variables by running the following commands:
    export BOOTSTRAP_SERVER=<BootstrapServerString>
    export TOPIC=<Created Topic>

    See Getting the bootstrap brokers for how to find the bootstrap server URL.

    Important: You get three endpoints, one for each broker, but you need only one broker endpoint for this step, as shown in the following sample command. Using an MSK bootstrap server connection string with more than one endpoint results in a connection error.

    export BOOTSTRAP_SERVER=b-3.mskcluster123456.9470qv.c4.kafka.eu-west-1.amazonaws.com:9098
    export TOPIC=purchase-history
  4. Download the sample data file (key referenced by S3KeyCSVData) and the JMeter test plan file (key referenced by S3KeyJMXTestPlan) from the S3 bucket referenced by S3BucketName. Both files are used for publishing the messages:
    cd ~ 
    cd apache-jmeter-5.6.2/bin/examples 
    mkdir timeseries 
    cd timeseries 
    aws s3api get-object --bucket <S3BucketName> --key <S3KeyCSVData> <S3KeyCSVData> 
    aws s3api get-object --bucket <S3BucketName> --key <S3KeyJMXTestPlan> <S3KeyJMXTestPlan>
  5. Verify that the Kafka publisher EC2 instance has all the prerequisites before running the test plan:
    • The client library files (kafka-clients-{MSKKafkaVersion}.jar and aws-msk-iam-auth-1.1.9-all.jar) are present in the JMeter classpath. Check by running the following command:
      cd ~
      ls apache-jmeter-5.6.2/lib/ext

      The screenshot displays that the JAR files kafka-clients and aws-msk-iam-auth are downloaded properly to apache-jmeter-5.6.2/lib/ext.

    • The JMeter test plan (S3KeyJMXTestPlan) and sample data (S3KeyCSVData) files are present in the examples/timeseries directory. Check by running the following command:
      cd ~
      ls apache-jmeter-5.6.2/bin/examples/timeseries
    • The JMeter test plan takes BOOTSTRAP_SERVER and TOPIC as command line arguments, so make sure the environment variables BOOTSTRAP_SERVER and TOPIC are set by running the following command:
      echo $BOOTSTRAP_SERVER
      echo $TOPIC
  6. Run the downloaded test plan:
    cd ~ 
    cd apache-jmeter-5.6.2/bin 
    ./jmeter.sh -n -t examples/timeseries/<S3KeyJMXTestPlan> -Jtopic=$TOPIC -Jbootstrap_server=$BOOTSTRAP_SERVER

    The JMeter test plan reads the records from the sample CSV file and publishes them as messages to the Kafka topic. While it runs, a summary is printed on the console, similar to the following:

    [ec2-user@ip-10-0-0-165 bin]$ ./jmeter.sh -n -t examples/timeseries/Purchase_History_Publishing.jmx -Jtopic=$TOPIC -Jbootstrap_server=$BOOTSTRAP_SERVER
    WARN StatusConsoleListener The use of package scanning to locate plugins is deprecated and will be removed in a future release
    Creating summariser <summary>
    Created the tree successfully using examples/timeseries/Purchase_History_Publishing.jmx
    Starting standalone test @ 2023 Dec 29 06:54:31 UTC (1703832871222)
    Waiting for possible Shutdown/StopTestNow/HeapDump/ThreadDump message on port 4445
    Warning: Nashorn engine is planned to be removed from a future JDK release
    summary + 196 in 00:00:25 = 7.7/s Avg: 121 Min: 78 Max: 1702 Err: 0 (0.00%) Active: 1 Started: 1 Finished: 0
    summary + 351 in 00:00:30 = 11.7/s Avg: 84 Min: 64 Max: 152 Err: 0 (0.00%) Active: 1 Started: 1 Finished: 0
    summary = 547 in 00:00:56 = 9.9/s Avg: 98 Min: 64 Max: 1702 Err: 0 (0.00%)
    summary + 389 in 00:00:30 = 13.0/s Avg: 76 Min: 63 Max: 177 Err: 0 (0.00%) Active: 1 Started: 1 Finished: 0
    summary = 936 in 00:01:26 = 10.9/s Avg: 89 Min: 63 Max: 1702 Err: 0 (0.00%)
    summary + 407 in 00:00:30 = 13.5/s Avg: 73 Min: 61 Max: 146 Err: 0 (0.00%) Active: 1 Started: 1 Finished: 0
    summary = 1343 in 00:01:56 = 11.6/s Avg: 84 Min: 61 Max: 1702 Err: 0 (0.00%)
    summary + 410 in 00:00:30 = 13.7/s Avg: 72 Min: 60 Max: 139 Err: 0 (0.00%) Active: 1 Started: 1 Finished: 0
    summary = 1753 in 00:02:26 = 12.0/s Avg: 81 Min: 60 Max: 1702 Err: 0 (0.00%)
    summary + 423 in 00:00:30 = 14.1/s Avg: 70 Min: 60 Max: 144 Err: 0 (0.00%) Active: 1 Started: 1 Finished: 0
    summary = 2176 in 00:02:55 = 12.4/s Avg: 79 Min: 60 Max: 1702 Err: 0 (0.00%)
    summary + 428 in 00:00:30 = 14.3/s Avg: 69 Min: 60 Max: 122 Err: 0 (0.00%) Active: 1 Started: 1 Finished: 0
    summary = 2604 in 00:03:26 = 12.7/s Avg: 77 Min: 60 Max: 1702 Err: 0 (0.00%)
    summary + 423 in 00:00:30 = 14.1/s Avg: 70 Min: 60 Max: 160 Err: 0 (0.00%) Active: 1 Started: 1 Finished: 0
    summary = 3027 in 00:03:56 = 12.9/s Avg: 76 Min: 60 Max: 1702 Err: 0 (0.00%)
    summary + 412 in 00:00:30 = 13.8/s Avg: 72 Min: 60 Max: 184 Err: 0 (0.00%) Active: 1 Started: 1 Finished: 0
    summary = 3439 in 00:04:25 = 13.0/s Avg: 76 Min: 60 Max: 1702 Err: 0 (0.00%)
    summary + 388 in 00:00:30 = 12.9/s Avg: 77 Min: 61 Max: 155 Err: 0 (0.00%) Active: 1 Started: 1 Finished: 0
    summary = 3827 in 00:04:56 = 12.9/s Avg: 76 Min: 60 Max: 1702 Err: 0 (0.00%)
    summary + 56 in 00:00:05 = 12.3/s Avg: 80 Min: 62 Max: 187 Err: 0 (0.00%) Active: 0 Started: 1 Finished: 1
    summary = 3883 in 00:05:00 = 12.9/s Avg: 76 Min: 60 Max: 1702 Err: 0 (0.00%)
    Tidying up ... @ 2023 Dec 29 06:59:34 UTC (1703833174620)
    ... end of run
    [ec2-user@ip-10-0-0-165 bin]$
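
The prerequisite checks in step 5 can be bundled into one script that you run before starting the test plan. This is a minimal sketch: the function names are hypothetical, and it assumes the test plan and sample data files end in .jmx and .csv respectively.

```shell
# Succeed when at least one file in directory $1 matches glob pattern $2.
has_match() {
  for f in "$1"/$2; do
    [ -e "$f" ] && return 0
  done
  return 1
}

# Check the JMeter prerequisites; print each problem and return non-zero if any.
# $1 = JMeter home directory, for example ~/apache-jmeter-5.6.2
preflight() {
  jmeter_home=$1
  rc=0
  has_match "$jmeter_home/lib/ext" 'kafka-clients-*.jar' || { echo "missing: kafka-clients JAR in lib/ext"; rc=1; }
  has_match "$jmeter_home/lib/ext" 'aws-msk-iam-auth-*-all.jar' || { echo "missing: aws-msk-iam-auth JAR in lib/ext"; rc=1; }
  has_match "$jmeter_home/bin/examples/timeseries" '*.jmx' || { echo "missing: JMeter test plan (.jmx)"; rc=1; }
  has_match "$jmeter_home/bin/examples/timeseries" '*.csv' || { echo "missing: sample data (.csv)"; rc=1; }
  [ -n "${BOOTSTRAP_SERVER:-}" ] || { echo "unset: BOOTSTRAP_SERVER"; rc=1; }
  [ -n "${TOPIC:-}" ] || { echo "unset: TOPIC"; rc=1; }
  return "$rc"
}
```

Running `preflight ~/apache-jmeter-5.6.2 && echo ready` prints ready only when every check passes.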

V. Verify the Timestream Sink Connector

Next, let’s verify that the published messages are written as time-series data to the target Timestream table referenced by TimestreamTableName.

  1. On the Timestream console, in the navigation pane, choose Query editor.
  2. In the left pane, choose the database kafkastream that you provisioned.
  3. In the left pane, choose the table purchase-history that you provisioned.
  4. In the query editor, run the following query:
    -- To see the latest 10 rows in the table, run:
    SELECT * FROM "kafkastream"."purchase-history"
    WHERE time BETWEEN AGO(1d) AND NOW()
    AND "user_id" IN ('68056','536099','316223') -- Example user_id values
    ORDER BY time DESC LIMIT 10

    The output of the query should look like the following screenshot.

The screenshot shows the query results from the target Timestream table.
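
As a quick cross-check outside the console, you can count the rows ingested in the last day and compare the result with the number of messages JMeter reported. The following is a sketch that assumes the AWS CLI timestream-query query command and credentials that are allowed to query the table; the function name is hypothetical.

```shell
# Count the rows written to a Timestream table during the last day.
# $1 = database name, $2 = table name
count_recent_rows() {
  aws timestream-query query \
    --query-string "SELECT COUNT(*) FROM \"$1\".\"$2\" WHERE time BETWEEN AGO(1d) AND NOW()" \
    --output json
}
```

For example, `count_recent_rows kafkastream purchase-history` returns a JSON document whose Rows element holds the count.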

Troubleshooting

The following information can help you troubleshoot problems that you might have while publishing messages using JMeter, or in the connector.

  1. Watch for any errors printed in the console while the JMeter test plan runs.
    – If you see either of the following messages, you might not have set the TOPIC and BOOTSTRAP_SERVER environment variables. Make sure that BOOTSTRAP_SERVER contains only one of the bootstrap server URLs, along with its port.

    ERROR: topic is not passed in the arguments
    ERROR: bootstrapServer is not passed in the arguments

    – If you see the message mentioned below, then the variable BOOTSTRAP_SERVER is supplied but is not valid. Provide only one of the bootstrap server URLs in the mentioned format.

    ERROR: bootstrapServer is NOT valid. It must be of format HOSTNAME:PORT
  2. Check the apache-jmeter-5.6.2/bin/jmeter.log log file if you find issues while publishing the messages.
  3. Check the CloudWatch Logs log group referenced by MSKConnectCWLogGroupArn for any configuration validation errors or runtime exceptions. For example, you might see the following error messages:
    – Unable to convert the sink record, check if you have configured the JsonConverter in the worker configuration: You get this error when the connector uses the default worker configuration with StringConverter. Create a custom worker configuration with JsonConverter as described in the section Configure the Amazon MSK worker configuration.
    – invalid.timestream.ingestion.endpoint: Given Timestream ingestion endpoint [] is not a valid URI: You get this error when the ingestion endpoint is not a valid URI.
  4. Check the following troubleshooting guides for further details:
    How do I troubleshoot errors when I’m trying to create a connector using Amazon MSK Connect?
    Troubleshooting Amazon MSK Connect
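
The BOOTSTRAP_SERVER errors in item 1 usually come from pasting the full comma-separated broker string. The following sketch shows one way to keep a single endpoint and check the HOSTNAME:PORT shape before running JMeter. The function names are hypothetical, and the check only approximates the validation in the test plan, whose exact rules are not shown in this post.

```shell
# Keep only the first endpoint of a comma-separated bootstrap broker string.
first_broker() {
  printf '%s\n' "${1%%,*}"
}

# Succeed when $1 looks like HOSTNAME:PORT: a single endpoint with a numeric port.
is_host_port() {
  printf '%s\n' "$1" | grep -Eq '^[A-Za-z0-9.-]+:[0-9]+$'
}
```

For example, `export BOOTSTRAP_SERVER=$(first_broker "<BootstrapServerString>")` followed by `is_host_port "$BOOTSTRAP_SERVER" || echo "not HOSTNAME:PORT"`.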

Clean up

After you complete testing, clean up the AWS resources you created: delete the Timestream Sink Connector and the custom plugin on the Amazon MSK console, and then delete the CloudFormation stack in the Region you chose.
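
The same cleanup can be scripted. The sketch below assumes the AWS CLI kafkaconnect delete-connector and delete-custom-plugin commands and the cloudformation delete-stack command; the function name and argument order are hypothetical. Connector deletion is asynchronous, so the plugin deletion may be rejected until the connector is gone. If that happens, wait and run the function again.

```shell
# Delete the connector, the custom plugin, and the stack, stopping at the first failure.
# $1 = connector ARN, $2 = custom plugin ARN, $3 = CloudFormation stack name
cleanup() {
  aws kafkaconnect delete-connector --connector-arn "$1" &&
    aws kafkaconnect delete-custom-plugin --custom-plugin-arn "$2" &&
    aws cloudformation delete-stack --stack-name "$3"
}
```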

Conclusion

In this post, we showed you how to stream events from your Amazon MSK clusters to Amazon Timestream using the Timestream Sink Connector, so that you can further analyze your time-series data. We also showed how to authenticate an Apache JMeter based Kafka producer client with an MSK cluster that has IAM authentication enabled, using the Simple Authentication and Security Layer (SASL) mechanism. To learn more about how IAM access control for Amazon MSK works, see Amazon MSK Library for AWS Identity and Access Management. Try out this solution to ingest data from MSK clusters into your Timestream tables, and share your comments.


About the Authors

Kayalvizhi Kandasamy works with digital-native companies to support their innovation. As a Senior Solutions Architect (APAC) at Amazon Web Services, she uses her experience to help people bring their ideas to life, focusing primarily on microservice and serverless architectures and cloud-native solutions using AWS services. Outside of work, she is a FIDE-rated chess player and coaches her daughters in the art of playing chess.

Norbert Funke is a Sr. Timestream Specialist Solutions Architect at AWS based out of New York. Outside of work, he is a Master in Tae Kwon Do martial arts and teaches children and adults self-defense.