Deliver data at scale to Amazon Managed Streaming for Apache Kafka (Amazon MSK) with AWS IoT Core

In this tutorial, you learn how to stream IoT data to an Amazon Managed Streaming for Apache Kafka (Amazon MSK) cluster by using AWS IoT Core rules.

 
Amazon MSK is a fully managed service that makes it easy for you to build and run applications that use Apache Kafka to process streaming data. Apache Kafka is an open-source platform for building real-time streaming data pipelines and applications. With Amazon MSK, you can use native Apache Kafka APIs to populate data lakes, stream changes to and from databases, and power machine learning and analytics applications.

 
AWS IoT Core lets you connect IoT devices to the AWS cloud without the need to provision or manage servers. AWS IoT Core can support billions of devices and trillions of messages, and can process and route those messages to AWS endpoints and to other devices reliably and securely.

In this tutorial you learn how to:

  • Set up Private Certificate Authority (CA) with AWS Certificate Manager
  • Set up an Apache Kafka cluster with Amazon MSK
  • Configure Kafka authentication and test the stream using AWS Cloud9
  • Prepare Java KeyStore and configure an AWS Secrets Manager secret
  • Configure an AWS IoT Core rule to deliver messages to the Kafka cluster
  • Set up error logging for AWS IoT Core rules and service
  • Clean up resources
About this Tutorial
Time: 1 hour
Cost: $7
Use Case: Internet of Things, Analytics
Products: Amazon MSK, AWS IoT Core
Audience: Developers
Level: Intermediate
Last Updated: April 21, 2021

Step 1: Set up a Private CA with AWS Certificate Manager

In this step, you set up a private certificate authority (Private CA) with AWS Certificate Manager (ACM). The Private CA lets you issue the TLS certificates that clients use to authenticate to the Apache Kafka cluster.
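If you prefer to script this step, you can create the Private CA from the AWS CLI as well. The following is a minimal sketch; the organization name is a placeholder, and the CA still has to be activated (steps 1.10 through 1.12) before it can issue certificates.

# Create a root CA with the same defaults the console flow uses (RSA 2048, SHA256WITHRSA).
aws acm-pca create-certificate-authority \
  --certificate-authority-type "ROOT" \
  --certificate-authority-configuration '{
    "KeyAlgorithm": "RSA_2048",
    "SigningAlgorithm": "SHA256WITHRSA",
    "Subject": {"Organization": "Example Corp"}
  }' \
  --region us-east-1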

1.1 — Open the AWS Certificate Manager console and sign in with your AWS account credentials. If you do not have an AWS account, create a new AWS account to get started.


1.2 — Choose Get Started. Then, under Private certificate authority, choose Get started. Choose the Region drop-down and choose the AWS Region where you want to create the Private CA. This tutorial uses the US East (N. Virginia) Region.


1.3 — On the Select the certificate authority type page, choose Root CA, and then choose Next.

1.4 — On the Configure the certificate authority (CA) name page, enter your organization details and choose Next.


1.5 — On the Configure the certificate authority (CA) key algorithm page, keep the default value of RSA 2048, and choose Next.

1.6 — On the Configure certificate revocation page, keep the default settings for the Certificate revocation list (CRL), and choose Next.


1.7 — On the Add tags page, optionally assign your own metadata to each resource in the form of tags, and then choose Next.


1.8 — On the Configure CA permissions page, keep the default settings, and choose Next.


1.9 — On the Review and create page, in the CA permissions section, verify the details of the Private CA, then select the CA permissions check box, and choose Confirm and create.


1.10 — On the confirmation page, choose Get started to activate the CA.


1.11 — On the Specify the root CA certificate parameters page, enter the root CA certificate parameters, and choose Next.


1.12 — On the Review, generate, and install root CA certificate page, review your choices, and then choose Confirm and install.


1.13 — Once the Private CA is successfully created, choose the Status tab, and in the Details section, copy the value of the ARN.
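You can also fetch the ARN from the terminal. The following sketch lists your private CAs and their status so you can copy the right ARN; it assumes the CA was created in us-east-1.

# List the ARN and status of every private CA in the Region.
aws acm-pca list-certificate-authorities \
  --region us-east-1 \
  --query "CertificateAuthorities[].{Arn:Arn,Status:Status}" \
  --output table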


Step 2: Set up an Apache Kafka cluster with Amazon MSK

AWS IoT Core rules can be configured to deliver messages to Amazon MSK. In this step, you set up a new Kafka cluster with the client authentication settings that AWS IoT Core rules support.

2.1 — Open the Amazon MSK console and sign in with your AWS account credentials. Verify you are in the same Region as your Private CA. Then, choose Create cluster.


2.2 — On the Create cluster page, choose Create cluster with custom settings.

2.3 — In the General section, enter a name for your cluster, and then keep the default settings for Apache Kafka version.

2.4 — In the Configuration section, select Use the MSK default configuration.

2.5 — In the Networking section, make the following selections:

  • For VPC, choose the ID of the VPC where your Kafka cluster will be deployed.
  • For Number of Zones, choose 2.
  • For First Zone and for Second Zone, choose the Availability zones and Subnets for each zone.

2.6 — In the Brokers section, for Broker type, choose kafka.t3.small, and for Number of brokers per Zone, choose 1.

2.7 — In the Tags - optional section, add tags as needed.

2.8 — In the Storage section, for EBS storage volume per broker, enter 5 GiB.

2.9 — In the Encryption section, keep the default settings.

2.10 — In the Authentication section, choose TLS client authentication through AWS Certificate Manager (ACM), and in the Private Certification Authorities (CA) from AWS Certificate Manager (ACM) drop-down box, choose the Private CA you created in Step 1.


2.11 — In the Monitoring section, keep the default settings.

2.12 — In the Advanced settings section, keep the default settings, and choose Create cluster.
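The same cluster can also be created from the AWS CLI. The following is a sketch under the settings chosen above; the cluster name and subnet IDs are placeholders you must replace, and $PRIVATE_CA_ARN is the ARN you copied in step 1.13.

# Two brokers total: one per subnet, matching the console choices above.
aws kafka create-cluster \
  --cluster-name "AWSKafkaTutorialCluster" \
  --kafka-version "2.2.1" \
  --number-of-broker-nodes 2 \
  --broker-node-group-info '{
    "InstanceType": "kafka.t3.small",
    "ClientSubnets": ["subnet-aaaa1111", "subnet-bbbb2222"],
    "StorageInfo": {"EbsStorageInfo": {"VolumeSize": 5}}
  }' \
  --client-authentication '{
    "Tls": {"CertificateAuthorityArnList": ["'"$PRIVATE_CA_ARN"'"]}
  }' \
  --region us-east-1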

Step 3: Prepare an AWS Cloud9 instance to configure Kafka authentication and test the stream

AWS Cloud9 comes with the AWS CLI and Java pre-installed, which makes the development experience much simpler. In this step, you configure Kafka authentication, install the Kafka tools, and then test the consumer and producer.

3.1 — Open the AWS Cloud9 console, and choose Create environment. Verify that you launched the AWS Cloud9 environment in the same VPC where the Kafka cluster is deployed.


3.2 — On the Name environment page, type a Name, and optionally provide a Description. Then, choose Next step.


3.3 — On the Configure settings page, in the Environment settings section, keep the default settings, then choose Next step.


3.4 — On the Review page, in the Environment name and settings section, review configuration details, and choose Create environment. Then, wait for the AWS Cloud9 environment to set up and start.


3.5 — Open the Amazon MSK console, and in the General section, verify that the cluster you created is Active. Copy the Cluster ARN.

Choose the View client information tab, and copy the Bootstrap servers and ZooKeeper connection strings to use in the following configuration.


3.6 — Open the Cloud9 IDE, and run the following commands in the terminal window.

a. Download and extract the Kafka CLI tools.

wget https://archive.apache.org/dist/kafka/2.2.1/kafka_2.12-2.2.1.tgz
tar -xzf kafka_2.12-2.2.1.tgz

b. Install jq to parse JSON responses from the AWS CLI.

sudo yum install jq -y

c. Set up variables for CLUSTER_ARN, PRIVATE_CA_ARN, REGION_NAME, and TOPIC according to your environment.

CLUSTER_ARN="[YOUR CLUSTER ARN]"
PRIVATE_CA_ARN="[YOUR PRIVATE CA ARN]"
REGION_NAME="[YOUR REGION]"
TOPIC="AWSKafkaTutorialTopic"

d. The following commands refer to the variables configured in the previous step, and use the AWS CLI and cluster ARN to fetch the ZooKeeper connection string and save it in the ZOOKEEPER_STRING variable.

ZOOKEEPER_STRING=$(aws kafka describe-cluster --region $REGION_NAME --cluster-arn $CLUSTER_ARN | jq -r ".ClusterInfo.ZookeeperConnectString") 
echo $ZOOKEEPER_STRING

3.7 — Configure Security Group inbound rules for your Kafka cluster.

a. Open the AWS Cloud9 console, and select the View details tab for your created environment.


b. On the Environment details page, under Security groups, copy the group ID for the instance.

c. Open the Amazon MSK console, and choose your Cluster name.


d. On the Details tab, in the Networking section, choose the link under Security groups applied.


e. On the Security Groups page, choose the Inbound rules tab, and choose Edit inbound rules.


f. On the Edit inbound rules page, add the copied security group ID as an inbound rule for the Kafka security group on ports 9094 and 2181.

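If you prefer to add these rules from the CLI, the following sketch authorizes the Cloud9 security group on both ports; the group IDs are placeholders for the values you copied above.

# Allow the Cloud9 instance's security group to reach the brokers (9094, TLS) and ZooKeeper (2181).
KAFKA_SG="sg-0kafka0000example"
CLOUD9_SG="sg-0cloud90000example"
for PORT in 9094 2181; do
  aws ec2 authorize-security-group-ingress \
    --group-id $KAFKA_SG \
    --protocol tcp --port $PORT \
    --source-group $CLOUD9_SG
done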

3.8 — Create a new topic named AWSKafkaTutorialTopic in the Kafka cluster. The producer and consumer use this topic to publish and subscribe to messages. The following commands use the TOPIC variable set in step 3.6.c; if you changed its value there, the topic is created with that name instead.

In your AWS Cloud9 terminal, run the following commands:
cd ~/environment/kafka_2.12-2.2.1
bin/kafka-topics.sh --create --zookeeper $ZOOKEEPER_STRING --replication-factor 2 --partitions 1 --topic $TOPIC 

This creates a new topic named AWSKafkaTutorialTopic. Verify that the command returns the following message: Created topic AWSKafkaTutorialTopic.

Note: If a Client session timed out error appears, check that your cluster's Security Group inbound rules were configured correctly.

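You can also confirm the topic exists by listing all topics on the cluster; this reuses the ZOOKEEPER_STRING variable set in step 3.6.d.

bin/kafka-topics.sh --list --zookeeper $ZOOKEEPER_STRING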

Step 4: Prepare a Java KeyStore and configure AWS Secrets Manager

A Java KeyStore is used to store private keys, public keys, certificates, and secret keys. In this step, you create a Java KeyStore, generate a certificate using keytool, sign the certificate with your Private CA, and save the KeyStore to Secrets Manager.

Note: The name of the Kafka directory changes with each downloaded Kafka version. Use the name that matches your version.


Note: The path for java-11-amazon-corretto.x86_64 changes with your Java version. Adjust this path to match your installed version of Java. Look for the file named cacerts in the JVM subdirectories and copy it to the client directory created below.

4.1 — In your AWS Cloud9 terminal, run the following commands to create a client folder to store certificate files, and copy Java KeyStore to this folder.

cd ~/environment/kafka_2.12-2.2.1/
mkdir client && cd client
cp /usr/lib/jvm/java-11-amazon-corretto.x86_64/lib/security/cacerts kafka.client.truststore.jks  

4.2 — Run the following commands to configure the Java KeyStore with ALIAS and PASSWORD.

Note: If you change ALIAS and PASSWORD variables, you must use the same in next steps.

ALIAS="keyAlias"
PASSWORD="myPassword"

4.3 — Run the following command to use keytool to generate a key with a Common Name (CN), ALIAS, and PASSWORD. Optionally, replace "Distinguished-Name" with a name that helps you identify your key; otherwise, you can leave it as is.

keytool -genkey -keystore kafka.client.keystore.jks -validity 300 -storepass $PASSWORD -keypass $PASSWORD -dname "CN=Distinguished-Name" -alias $ALIAS -storetype pkcs12

4.4 — Run the following command to create a Certificate Signing Request for the key generated in the previous step.

keytool -keystore kafka.client.keystore.jks -certreq -file client-cert-sign-request -alias $ALIAS -storepass $PASSWORD -keypass $PASSWORD

4.5 — Run the following command to edit the certificate signing request into the correct format.

sed -i 's/NEW //' client-cert-sign-request

4.6 — Run the following command to issue a certificate signed by your Private CA and save the certificate ARN to a variable.

In the following command, jq (installed earlier) parses the JSON output from the AWS CLI, and the certificate ARN is saved in the CERTIFICATE_ARN variable.

CERTIFICATE_ARN=$(aws acm-pca issue-certificate --certificate-authority-arn $PRIVATE_CA_ARN --csr fileb://client-cert-sign-request --signing-algorithm "SHA256WITHRSA" --validity Value=300,Type="DAYS" --region $REGION_NAME | jq -r ".CertificateArn")

4.7 — Run the following command to fetch the certificate using the certificate ARN received in the previous step. The response is in JSON format, which jq parses and saves into a file named signed-certificate-from-acm.

aws acm-pca get-certificate --certificate-authority-arn $PRIVATE_CA_ARN --certificate-arn $CERTIFICATE_ARN --region $REGION_NAME | jq -r '"\(.CertificateChain)\n\(.Certificate)"' > signed-certificate-from-acm

4.8 — Run the following command to import the signed certificate into the KeyStore.

keytool -keystore kafka.client.keystore.jks -import -file signed-certificate-from-acm -alias $ALIAS -storepass $PASSWORD -keypass $PASSWORD 

a. Enter Yes when prompted in the terminal.  

b. The client folder should now contain four files:

  • client-cert-sign-request
  • kafka.client.keystore.jks
  • kafka.client.truststore.jks
  • signed-certificate-from-acm

 


4.9 — The MSK cluster configured earlier requires all client communication to be TLS secured. The kafka.client.keystore.jks file has to be uploaded to Secrets Manager as a SecretBinary so that the AWS IoT Core rule can authenticate to the cluster.

Run the following command to create a secret named Kafka_Keystore in AWS Secrets Manager.

aws secretsmanager create-secret --name Kafka_Keystore --secret-binary fileb://kafka.client.keystore.jks --region $REGION_NAME

You can verify the newly created Secret in the Secrets Manager console.
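You can also verify it from the terminal; the following call returns the secret's metadata (not the binary itself).

aws secretsmanager describe-secret --secret-id Kafka_Keystore --region $REGION_NAME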


4.10 — Open a new terminal session while keeping the existing session active. You use one session for the Kafka producer and the other for the consumer.

Note: You can open a new terminal by choosing the + symbol or pressing Alt+T in the AWS Cloud9 console.

In the new terminal session, run the following commands to set the initial variables.

REGION_NAME="[YOUR REGION]"
TOPIC="[NAME OF THE TOPIC CREATED]"
CLUSTER_ARN="[YOUR CLUSTER ARN]"

4.11 — Run the following commands to create a file named client.properties with the contents below, which configure the Kafka producer and consumer for TLS. If applicable, adjust the truststore and keystore locations to the paths where you saved kafka.client.truststore.jks.

Note: If all your paths match the default values suggested in the previous steps, you don't need to modify the following commands.

cd ~/environment/kafka_2.12-2.2.1/client
sudo nano client.properties

a. Paste the following contents into the editor, then save and exit (in nano, press Ctrl+O, Enter, then Ctrl+X).

security.protocol=SSL
ssl.truststore.location=client/kafka.client.truststore.jks
ssl.keystore.location=client/kafka.client.keystore.jks
ssl.keystore.password=myPassword
ssl.key.password=myPassword

b. Run the following command to verify that the file contents were saved correctly.

cat client.properties

c. Run the following commands to start a console producer. The second command uses the AWS CLI to fetch the TLS bootstrap server list for the cluster ARN and saves it as a variable; this list is used to connect to the Kafka cluster.

cd ~/environment/kafka_2.12-2.2.1
BOOTSTRAP_SERVER=$(aws kafka get-bootstrap-brokers --region $REGION_NAME --cluster-arn $CLUSTER_ARN | jq -r ".BootstrapBrokerStringTls")
bin/kafka-console-producer.sh --broker-list $BOOTSTRAP_SERVER --topic $TOPIC --producer.config client/client.properties

4.12 — Run the following commands in the other terminal session to start a Kafka consumer session.

cd ~/environment/kafka_2.12-2.2.1
BOOTSTRAP_SERVER=$(aws kafka get-bootstrap-brokers --region $REGION_NAME --cluster-arn $CLUSTER_ARN | jq -r ".BootstrapBrokerStringTls")
bin/kafka-console-consumer.sh --bootstrap-server $BOOTSTRAP_SERVER --topic $TOPIC --consumer.config client/client.properties --from-beginning

You should now have a Kafka producer session and consumer session initiated.

4.13 — In the producer terminal, type any string and press Enter to publish it to the Kafka stream. The message appears in the consumer terminal.


Step 5: Configure an AWS IoT Core rule to deliver data to the Kafka stream

5.1 — Create an AWS IAM role with AWS Secrets Manager permissions to allow an AWS IoT Core rule to access the Kafka KeyStore stored in Secrets Manager.

a. Open the IAM console, choose Roles from the left navigation pane, and choose Create role.

b. For Select type of trusted entity, choose AWS service. For Choose a use case, choose EC2, then choose Next: Permissions.

 


5.2 — On the Create role page, in the search box, type SecretsManagerReadWrite, select the SecretsManagerReadWrite policy, and choose Next: Tags.


5.3 — On the Create role page, in the Add tags (optional) section, add any necessary tags. Then, choose Next: Review.


5.4 — On the Create role page, in the Review section, for Role name, type kafkaRole, and optionally add a description. Then, choose Create role.


5.5 — Once the role is created, choose the newly created kafkaRole. On the Summary page for the role, choose the Trust relationships tab, choose Edit trust relationship, and replace the policy document with the following.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Service": "iot.amazonaws.com"
      },
      "Action": "sts:AssumeRole"
    }
  ]
}

5.6 — Then, choose Update Trust Policy.
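If you prefer to create the role from the CLI, the following sketch is equivalent to steps 5.1 through 5.6; the trust-policy.json file name is arbitrary.

# Save the trust policy above to a file, create the role, and attach the managed policy.
cat > trust-policy.json <<'EOF'
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {"Service": "iot.amazonaws.com"},
      "Action": "sts:AssumeRole"
    }
  ]
}
EOF
aws iam create-role --role-name kafkaRole \
  --assume-role-policy-document file://trust-policy.json
aws iam attach-role-policy --role-name kafkaRole \
  --policy-arn arn:aws:iam::aws:policy/SecretsManagerReadWrite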


5.7 — Open the AWS IoT Core console, and in the left navigation pane, choose Act. Then, choose Rules, and choose Create a rule.


5.8 — On the Create a rule page, type a Name and Description. In the Rule query statement section, choose the SQL version, and enter a rule query statement. Then, choose Add action.
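For example, a rule query statement like the following forwards every message published to a hypothetical iot/telemetry MQTT topic; substitute the topic filter your devices publish to.

SELECT * FROM 'iot/telemetry'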


5.9 — On the Select an action page, choose Send a message to an Apache Kafka cluster, then, choose Configure action.


5.10 — On the Configure action page, choose Create a VPC destination.


5.11 — On the Create a VPC destination page, choose the same VPC ID where you created your Kafka cluster.

5.12 — Choose the Subnet IDs, and choose the Security group.

Note: Make sure you select a security group with access to the Kafka cluster's security group. Choose the security group of the AWS Cloud9 instance you created previously and/or the security group of the Kafka cluster itself.

5.13 — Choose Create role. In the Create a new role window, name the role AWSKafkaTutorialTopic, and then choose Create role.

5.14 — Choose Create Destination.

Note: It takes 5 to 10 minutes for the destination to become Enabled. Once the status is Enabled, return to the rule creation page.


5.15 — In your web browser, return to the previously opened AWS IoT rule Configure action page. Choose the newly created VPC destination from the drop-down list, and enter AWSKafkaTutorialTopic for the Kafka topic. Leave Key and Partition empty.


5.16 — On the Configure action page, in the Client properties section, enter the following details.

  • bootstrap.servers = the TLS bootstrap string for your Kafka cluster
  • security.protocol = SSL
  • ssl.truststore = leave empty
  • ssl.truststore.password = leave empty
  • ssl.keystore = ${get_secret('Kafka_Keystore','SecretBinary','arn:aws:iam::[YOUR AWS ACCOUNT ID]:role/kafkaRole')}
  • ssl.keystore.password = myPassword

    Note: The password is the PASSWORD value (myPassword) configured in step 4.2.

5.17 — Then, choose Add action.

5.18 — On the Create a rule page, choose Create rule.
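The entire rule can also be created in one call with the AWS CLI. The following is a sketch, not a substitute for the console flow above: the destination ARN, account ID, bootstrap string, and topic filter are placeholders for your own values.

# The heredoc is quoted so the ${get_secret(...)} template is passed through literally.
cat > kafka-rule.json <<'EOF'
{
  "sql": "SELECT * FROM 'iot/telemetry'",
  "awsIotSqlVersion": "2016-03-23",
  "actions": [{
    "kafka": {
      "destinationArn": "arn:aws:iot:us-east-1:123456789012:ruledestination/vpc/EXAMPLE-UUID",
      "topic": "AWSKafkaTutorialTopic",
      "clientProperties": {
        "bootstrap.servers": "[YOUR TLS BOOTSTRAP STRING]",
        "security.protocol": "SSL",
        "ssl.keystore": "${get_secret('Kafka_Keystore','SecretBinary','arn:aws:iam::123456789012:role/kafkaRole')}",
        "ssl.keystore.password": "myPassword"
      }
    }
  }]
}
EOF
aws iot create-topic-rule --rule-name kafkaTutorialRule \
  --topic-rule-payload file://kafka-rule.json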

5.19 — Use the following procedure to test the MQTT Stream.

a. In the left navigation pane of AWS IoT Core, choose Test, and choose MQTT test client.

b. On the MQTT test client page, choose the Publish to a topic tab.

c. Under Topic name, enter a name. Under Message payload, enter a message.

d. In the active AWS Cloud9 consumer session (opened in the previous step), you should see the data published to the MQTT topic streamed to the Kafka consumer.
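If you'd rather test from the terminal, the following sketch publishes a sample payload with the AWS CLI; the topic name and payload are placeholders, and the --cli-binary-format flag applies to AWS CLI v2 (omit it on v1).

aws iot-data publish \
  --topic "iot/telemetry" \
  --cli-binary-format raw-in-base64-out \
  --payload '{"deviceId": "sensor-1", "temperature": 22.5}' \
  --region $REGION_NAME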

 


Step 6: Error logging

To help with troubleshooting errors, you can set up error logging for AWS IoT Core rules and service.

6.1 — Open the AWS IoT console, and in the left navigation pane, choose Settings. Then, in the Logs section, choose Manage Logs.

a. In the Log level section, choose Debug (most verbosity) from the drop-down list, and choose Update.

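The same logging level can be set from the CLI; the role ARN below is a placeholder for an IAM role that allows AWS IoT to write to CloudWatch Logs.

aws iot set-v2-logging-options \
  --role-arn arn:aws:iam::123456789012:role/IoTLoggingRole \
  --default-log-level DEBUG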

6.2 — To configure rule-level error logging, open the AWS IoT console, and choose Rules. Then, choose the rule you created in Step 5.

6.3 — In the Error action section, choose Add action. Then, choose Republish message to an AWS IoT topic, and choose Configure action.

You can now view your AWS IoT logs in Amazon CloudWatch, under Log groups, in the AWS IoT log group.


6.4 — For Topic, type iot/errors, choose the quality of service, and choose kafkaRole. Then, choose Add action.

You can now subscribe to the iot/errors topic using the MQTT test client to monitor rule-level error messages.


Step 7: Clean up

In the following steps, you clean up the resources you created in this tutorial.

It is a best practice to delete resources that you are no longer using so that you are not continually charged for them.  

Delete Amazon MSK cluster

7.1 — Open the Amazon MSK console.

7.2 — Choose Clusters and choose the MSK cluster that you created for this tutorial.

7.3 — Choose Delete.

7.4 — Type delete to confirm and choose Delete.
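Alternatively, if your Cloud9 session still has the CLUSTER_ARN variable set, you can delete the cluster from the terminal.

aws kafka delete-cluster --cluster-arn $CLUSTER_ARN --region $REGION_NAME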

Delete IoT Core rule

7.5 — Sign in to the AWS Cloud9 environment, choose Open IDE, and run the following command, replacing myrule with the name of the rule you created.

aws iot delete-topic-rule --rule-name myrule

Delete IAM role

7.6 — Open the IAM console.

7.7 — In the navigation pane, choose Roles, and then select the check box next to the role that was created for this tutorial (kafkaRole).

7.8 — Choose Delete role.

7.9 — Choose Yes, Delete to delete the IAM role.

Delete AWS Secrets Manager secret

7.10 — Open the Secrets Manager console and in the left navigation pane, choose Secrets.

7.11 — Choose the Kafka_Keystore secret.

7.12 — In the Secret details section, choose Delete secret.
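Alternatively, you can schedule the deletion from the terminal; the recovery window below (7 days) is the minimum Secrets Manager allows.

aws secretsmanager delete-secret --secret-id Kafka_Keystore \
  --recovery-window-in-days 7 --region $REGION_NAME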

Delete ACM Private CA

7.13 — Open the ACM Private CA console.

7.14 — Choose Private CAs.

7.15 — Choose your private CA from the list.

7.16 — If your CA is in the ACTIVE state, you must disable it. On the Actions menu, choose Disable.

7.17 — On the Actions menu, choose Delete.

7.18 — If your CA is in the PENDING_CERTIFICATE, EXPIRED, or DISABLED state, specify a restoration period of seven to 30 days. Then choose Delete.

Note: If your private CA is not in one of these states, it cannot be restored later.

7.19 — If you are certain that you want to delete the private CA, choose Permanently delete when prompted. The status of the private CA changes to DELETED.
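The disable-and-delete sequence can also be run from the terminal; this sketch assumes PRIVATE_CA_ARN is still set and uses the minimum 7-day restoration period.

aws acm-pca update-certificate-authority \
  --certificate-authority-arn $PRIVATE_CA_ARN --status DISABLED
aws acm-pca delete-certificate-authority \
  --certificate-authority-arn $PRIVATE_CA_ARN \
  --permanent-deletion-time-in-days 7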

Delete AWS Cloud9 environment

7.20 — Open the AWS Cloud9 console.

7.21 — In the top navigation bar, choose the AWS Region where the environment is located.

7.22 — In the list of environments, for the environment you want to delete, choose the title of the card for the environment.

7.23 — Choose Delete.

7.24 — In the Delete dialog box, type Delete, and then choose Delete.

Congratulations

You have configured IoT Rules to deliver messages to an Apache Kafka cluster using AWS IoT Core and Amazon MSK. You can now securely deliver MQTT messages to a highly scalable, durable, and reliable system using Apache Kafka.


Learn how to migrate a self-managed Apache Kafka cluster

If you want to learn different ways of migrating a self-managed Apache Kafka cluster, whether on Amazon EC2 or on premises, to Amazon MSK, complete the Migration Lab.

Take a comprehensive, hands-on class that dives into Amazon MSK.