AWS Partner Network (APN) Blog

Analyze Streaming Data from Amazon Managed Streaming for Apache Kafka Using Snowflake 

By Srinivas Kesanapally, Partner Solution Architect at AWS
By Issac Kunen, Product Manager at Snowflake Computing


In modern data workflows, businesses are interested in learning about end users’ experiences and feedback in near real-time, and then acting on it expeditiously for better business outcomes and user experiences.

When streaming data comes in from a variety of sources—such as smartphones, Internet of Things (IoT) devices, or ad-click data—organizations should have the capability to ingest this data quickly and join it with other relevant business data to derive insights and provide positive experiences to customers.

In this post, we explain how organizations can ingest streaming data using Amazon MSK, a fully managed, Apache Kafka-compatible service. We’ll also explore how to use a Kafka Connect application to persist this data to Snowflake, enabling businesses to derive near real-time insights into end users’ experiences and feedback.

Amazon MSK is a fully-managed service that makes it easy for you to build and run applications that use Apache Kafka to process streaming data. Apache Kafka is an open-source platform for building real-time streaming data pipelines and applications.

With Amazon MSK, you can use native Apache Kafka APIs to populate data lakes, stream changes to and from databases, and power machine learning (ML) and analytics applications.

Snowflake Computing is an AWS Partner Network (APN) Advanced Technology Partner with AWS Competencies in Data & Analytics, Machine Learning, and Retail. A modern cloud data warehouse, Snowflake has built an enterprise-class SQL data warehouse designed for the cloud and today’s data.

Data Flow

Data flows from an Amazon MSK cluster to the Snowflake data warehouse through the Snowflake Connector for Kafka as follows. The ingest flow is shown in Figure 1.

  1. One or more applications publish JSON or Avro records to an Amazon MSK cluster. The records are split into one or more topic partitions.
     If you use Avro records, read the documentation on Installing and Configuring the Kafka Connector to install and configure the Avro parser and schema registry.
  2. The Snowflake Connector for Kafka buffers messages from the Kafka topics. In this post, the connector is configured to run on a separate Amazon Elastic Compute Cloud (Amazon EC2) instance in a different AWS Availability Zone.
     When a threshold (time, memory, or number of messages) is reached, the connector writes the messages to a temporary file in the internal stage and triggers Snowpipe to ingest the file; Snowpipe copies a pointer to the data file into a queue.
  3. A Snowflake-managed virtual warehouse loads data from the staged file into the target table (that is, the table specified in the configuration file for the topic) via the pipe created for the Kafka topic partition. One way to inspect the stage and pipe is sketched after Figure 1.
  4. Not shown in Figure 1: the connector monitors Snowpipe and deletes each file in the internal stage after confirming the file data was loaded into the table. If a failure prevented the data from loading, the connector moves the file into the table stage and produces an error message.
  5. The connector repeats steps 2-4.


Figure 1 – Ingest flow for Kafka with Snowflake connector.
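Once the connector is running (we configure it later in this post), a hedged way to inspect the pipe objects described in steps 2 and 3 is to list them with the snowsql client and key pair that are also set up later in this walkthrough. KAFKA_DB is the target database used in this post:

/root/bin/snowsql -a <snowflake account name> -u <snowflake user name> --private-key-path=/home/ec2-user/rsa_key.p8 -q "show pipes in database KAFKA_DB;"

The same pattern with "show stages in database KAFKA_DB;" lists the internal stage that holds the connector’s temporary files.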

Architecture

This post explains how to ingest real-time streaming data into Amazon MSK and persist it to Snowflake using the following architecture.


Figure 2 – Amazon MSK architecture with Snowflake.

You will build the Amazon MSK cluster using the provided AWS CloudFormation template. For simplicity, we are running a single cluster in one region across three AWS Availability Zones in three different subnets.

In the same region and same virtual private cloud (VPC), but in a different AWS Availability Zone, you will install a standalone Kafka Connect instance. This instance will be used to:

  • Create a topic in the MSK cluster.
  • Write test data to the Kafka topic from a producer.
  • Push test data to Snowflake.

The following steps walk you through configuring the cluster with Snowflake connectivity and generating test data that gets ingested into Snowflake.

Deploy the Amazon MSK Cluster Using AWS CloudFormation

To get started, launch the AWS CloudFormation template:

Launch Stack

Be sure to choose the US East (N. Virginia) Region (us-east-1). Then, enter the appropriate stack and key names to log in to the Kafka Connect instance and SSH location.


Accept the defaults on the next two pages, acknowledge that AWS CloudFormation will create resources in your account, and create the stack.

The CloudFormation template creates the following resources in your AWS account:

  • An Amazon MSK cluster in private subnets.
  • A Kafka Connect instance in a public subnet.
  • One VPC.
  • Three private subnets and one public subnet.

Ensure the CloudFormation stack was created successfully. Note the output values for KafkaClientEC2InstancePublicDNS and MSKClusterArn, which we’ll use in the following steps to configure the Snowflake connector and the Kafka topic.
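You can also read these outputs from the command line; the following assumes <stack name> is the stack name you chose at launch:

aws cloudformation describe-stacks --region us-east-1 --stack-name <stack name> --query "Stacks[0].Outputs"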

The rest of the steps in the post will enable you to:

  • Configure the Kafka Connect instance with connectivity to the MSK cluster.
  • Configure the connection to the Snowflake database.
  • Install the Snowflake client tool, snowsql, verify connectivity to Snowflake from the Kafka Connect instance, and configure private key-based authentication to Snowflake.

Log In to the Kafka Connect Instance

Log in to the Kafka Connect instance using the KafkaClientEC2InstancePublicDNS value from the CloudFormation template’s outputs.

ssh -i "<local key file>" ec2-user@<KafkaClientEC2InstancePublicDNS>
[replace the local key file with your key file and the DNS name with the value obtained from the previous step]

Next, we’ll create a Kafka topic from Kafka Connect instance.

Get Zookeeper information using the MSKClusterArn from the CloudFormation template. A Kafka topic is created via Zookeeper.

aws kafka describe-cluster --region us-east-1 --cluster-arn <MSKClusterArn value from the CloudFormation outputs> | grep -i ZookeeperConnectString

Using the Zookeeper connection, create a Kafka topic called MSKSnowflakeTestTopic.

/home/ec2-user/kafka/kafka_2.12-2.4.0/bin/kafka-topics.sh --create --zookeeper <ZookeeperConnectString value from previous step> --replication-factor 3 --partitions 1 --topic MSKSnowflakeTestTopic

The following output indicates successful completion:

Created topic MSKSnowflakeTestTopic

Now, we’ll configure the Snowflake environment in the Kafka Connect instance.

sudo su -  
# Install Maven
cd /opt
wget https://www.apache.org/dist/maven/maven-3/3.6.3/binaries/apache-maven-3.6.3-bin.tar.gz
tar xzf apache-maven-3.6.3-bin.tar.gz
ln -s apache-maven-3.6.3 maven
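
To confirm Maven is ready before fetching the jar files, you can optionally check the version:

/opt/maven/bin/mvn -version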

Fetch the Snowflake jar files.

#snowflake-kafka connector jar file
/opt/maven/bin/mvn org.apache.maven.plugins:maven-dependency-plugin:2.8:get -Dartifact=com.snowflake:snowflake-kafka-connector:1.1.0:jar

#bouncycastle libraries are used to decrypt the private keys
/opt/maven/bin/mvn org.apache.maven.plugins:maven-dependency-plugin:2.8:get -Dartifact=org.bouncycastle:bcprov-jdk15on:1.64:jar
/opt/maven/bin/mvn org.apache.maven.plugins:maven-dependency-plugin:2.8:get -Dartifact=org.bouncycastle:bc-fips:1.0.1:jar

/opt/maven/bin/mvn org.apache.maven.plugins:maven-dependency-plugin:2.8:get -Dartifact=org.bouncycastle:bcpkix-fips:1.0.3:jar

#if the input data format is of type Avro, the following jar is needed
/opt/maven/bin/mvn org.apache.maven.plugins:maven-dependency-plugin:2.8:get -Dartifact=org.apache.avro:avro:1.9.1:jar

Copy the Snowflake jar files into the Kafka Connect’s lib folder.

cp /root/.m2/repository/com/snowflake/snowflake-kafka-connector/1.1.0/snowflake-kafka-connector-1.1.0.jar /home/ec2-user/kafka/kafka_2.12-2.4.0/libs/
cp /root/.m2/repository/net/snowflake/snowflake-ingest-sdk/0.9.6/snowflake-ingest-sdk-0.9.6.jar /home/ec2-user/kafka/kafka_2.12-2.4.0/libs/
cp /root/.m2/repository/net/snowflake/snowflake-jdbc/3.11.1/snowflake-jdbc-3.11.1.jar /home/ec2-user/kafka/kafka_2.12-2.4.0/libs/
cp /root/.m2/repository/org/bouncycastle/bcprov-jdk15on/1.64/bcprov-jdk15on-1.64.jar /home/ec2-user/kafka/kafka_2.12-2.4.0/libs/
cp /root/.m2/repository/org/bouncycastle/bc-fips/1.0.1/bc-fips-1.0.1.jar /home/ec2-user/kafka/kafka_2.12-2.4.0/libs/
cp /root/.m2/repository/org/bouncycastle/bcpkix-fips/1.0.3/bcpkix-fips-1.0.3.jar /home/ec2-user/kafka/kafka_2.12-2.4.0/libs/

Verify the files are present in the /home/ec2-user/kafka/kafka_2.12-2.4.0/libs folder.
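
One quick way to check is to list the copied jars:

ls -l /home/ec2-user/kafka/kafka_2.12-2.4.0/libs/ | grep -Ei 'snowflake|bcprov|bc-fips|bcpkix'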


Change to the /home/ec2-user directory (cd /home/ec2-user), since later steps reference the key files there, and generate the public and private keys used to connect to Snowflake with openssl. Enter a passphrase at the prompt; when generating the public key, supply the passphrase you entered during private key generation.

#generate private key
[root@ip-10-0-0-34 ec2-user]# openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8
Generating RSA private key, 2048 bit long modulus
Enter Encryption Password:
Verifying - Enter Encryption Password:

#generate public key
[root@ip-10-0-0-34 ec2-user]# openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub
Enter pass phrase for rsa_key.p8:

Create a Snowflake connector configuration file and add the following information:

Please note that this step requires an active Snowflake account and the names of a Snowflake database and schema. The file contains sensitive information and needs to be secured with appropriate permissions (a chmod example follows the block below). The file is created in the current directory, /home/ec2-user, which is where the connector start command later in this post expects it.

echo "name=XYZCompanySensorData" >> .snowflake-connection.properties
echo "connector.class=com.snowflake.kafka.connector.SnowflakeSinkConnector" >> .snowflake-connection.properties
echo "tasks.max=8" >> .snowflake-connection.properties
echo "buffer.count.records=100" >> .snowflake-connection.properties
echo "buffer.flush.time=60" >> .snowflake-connection.properties
echo "buffer.size.bytes=65536" >> .snowflake-connection.properties

#replace snowflake account name with your snowflake account
echo "snowflake.url.name=<snowflake account name>.snowflakecomputing.com" >> .snowflake-connection.properties

#replace snowflake user name with your snowflake user name
echo "snowflake.user.name=<snowflake user name>" >> .snowflake-connection.properties

#replace snowflake database name with your snowflake database name
echo "snowflake.database.name=<SnowflakeDBName>" >> .snowflake-connection.properties

#replace snowflake schema name with your snowflake schema name
echo "snowflake.schema.name=<SnowflakeDBSchema>" >> .snowflake-connection.properties

echo "key.converter=org.apache.kafka.connect.storage.StringConverter" >> .snowflake-connection.properties

echo "value.converter=com.snowflake.kafka.connector.records.SnowflakeJsonConverter" >> .snowflake-connection.properties

#The following commands copy the Snowflake private key, with the header and footer removed and newlines stripped, into the .snowflake-connection.properties file

SNOWFLAKE_PRIVATE_KEY=$(echo `sed -e '2,$!d' -e '$d' -e 's/\n/ /g' /home/ec2-user/rsa_key.p8`|tr -d ' ') 
echo "snowflake.private.key=$SNOWFLAKE_PRIVATE_KEY" >> .snowflake-connection.properties


#replace with your private key passphrase
echo "snowflake.private.key.passphrase=<private key pass phrase>" >> .snowflake-connection.properties
#topic to subscribe to
echo "topics=MSKSnowflakeTestTopic" >> .snowflake-connection.properties

Install the Snowflake client, snowsql. This tool is needed to log in to Snowflake from a Kafka Connect instance and configure private key-based authentication.

cd /home/ec2-user
#get snowsql binary
curl -O https://sfc-repo.snowflakecomputing.com/snowsql/bootstrap/1.2/linux_x86_64/snowsql-1.2.3-linux_x86_64.bash

#install snowsql
bash snowsql-1.2.3-linux_x86_64.bash

#create snowsql client config file
echo "[connections]" > /home/ec2-user/.snowsql.config
#either edit the file to add password = <your password>,
#or run the following command if your password does not have special characters
echo "password=<enter your snowflake password>" >> /home/ec2-user/.snowsql.config

Log in to Snowflake using snowsql and set up the public key on the user’s account.

#run the following commands to parse the public key to remove the header and footer information and remove any new lines in the key.
SNOWFLAKE_PUBLIC_KEY=$(echo `sed -e '2,$!d' -e '$d' -e 's/\n/ /g' /home/ec2-user/rsa_key.pub`|tr -d ' ')

/root/bin/snowsql -a <snowflake account name> -u <snowflake user name> -q "alter user <snowflake user name> set rsa_public_key='$SNOWFLAKE_PUBLIC_KEY'" -r ACCOUNTADMIN
Output should be:
+----------------------------------+                                            
| status                           |
|----------------------------------|
| Statement executed successfully. |
+----------------------------------+
1 Row(s) produced. Time Elapsed: 0.170s
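
Optionally, you can confirm the key was registered by describing the user and checking the RSA_PUBLIC_KEY_FP property:

/root/bin/snowsql -a <snowflake account name> -u <snowflake user name> -r ACCOUNTADMIN -q "desc user <snowflake user name>" | grep -i RSA_PUBLIC_KEY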

Test Snowflake access using the private key.

[root@ip-10-0-0-34 ec2-user]#  /root/bin/snowsql -a <snowflake account name> -u <snowflake user name> --private-key-path=/home/ec2-user/rsa_key.p8
Private Key Passphrase: 
* SnowSQL * v1.2.4
Type SQL statements or !help

Edit the Kafka config file /home/ec2-user/kafka/kafka_2.12-2.4.0/config/connect-standalone.properties, and update the bootstrap.servers property to point to the MSK cluster brokers running in your account.

sudo su - ec2-user
aws kafka get-bootstrap-brokers --region=us-east-1 --cluster-arn <your msk cluster arn>

#It will have two output values:
{
"BootstrapBrokerString": "<server nodes>",
"BootstrapBrokerStringTls": "<server nodes>"
}

#Since we are using PLAINTEXT communication from the Kafka Connect instance to the Kafka cluster, use the value of BootstrapBrokerString here.
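
One hedged way to make this edit is to substitute the broker string into the worker config with sed; BOOTSTRAP_BROKERS below is a placeholder for the BootstrapBrokerString value returned above:

BOOTSTRAP_BROKERS="<BootstrapBrokerString value from the command above>"
sed -i "s|^bootstrap.servers=.*|bootstrap.servers=${BOOTSTRAP_BROKERS}|" /home/ec2-user/kafka/kafka_2.12-2.4.0/config/connect-standalone.properties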

Start the Kafka Connect instance.

/home/ec2-user/kafka/kafka_2.12-2.4.0/bin/connect-standalone.sh /home/ec2-user/kafka/kafka_2.12-2.4.0/config/connect-standalone.properties /home/ec2-user/.snowflake-connection.properties

#upon successful completion of the start operation, the output will
#look as follows:
[2020-02-07 05:46:30,535] INFO
[SF_KAFKA_CONNECTOR] SnowflakeSinkTask[ID:7]:put 0 records (com.snowflake.kafka.connector.SnowflakeSinkTask:200)

Start generating test data from a producer running on the Kafka Connect instance. Open another terminal window connection to the Kafka Connect instance, and run the following commands.

#Please ensure the correct Java version is used in the following cp command. Currently, the Java version
#that ships with the Amazon EC2 instance is java-1.8.0-openjdk-1.8.0.222.b10-0.amzn2.0.1.x86_64.
#You can cd to the /usr/lib/jvm folder to check the installed version.

cp /usr/lib/jvm/java-1.8.0-openjdk-1.8.0.222.b10-0.amzn2.0.1.x86_64/jre/lib/security/cacerts /home/ec2-user/kafka.client.truststore.jks

echo "ssl.truststore.location=/home/ec2-user/kafka.client.truststore.jks" >> /home/ec2-user/kafka/kafka_2.12-2.4.0/bin/client.properties

echo "security.protocol=ssl" >> /home/ec2-user/kafka/kafka_2.12-2.4.0/bin/client.properties

#The following command starts a console producer. At the prompt, send some test traffic. Because client.properties sets security.protocol=ssl, use the BootstrapBrokerStringTls value here.
/home/ec2-user/kafka/kafka_2.12-2.4.0/bin/kafka-console-producer.sh --broker-list <BootstrapBrokerStringTls> --producer.config /home/ec2-user/kafka/kafka_2.12-2.4.0/bin/client.properties --topic MSKSnowflakeTestTopic

For example:
>{"temperature":"45.6"}
>{"Humidity":"80%"}

Finally, verify that data is arriving in Snowflake. Log in to your Snowflake account and check whether a table with the same name as the Kafka topic, MSKSnowflakeTestTopic, exists in the KAFKA_DB database.

You can also log in to Snowflake with the snowsql client by opening another terminal connection to the Kafka Connect instance, as follows:

/root/bin/snowsql -u <snowflake user name> -a <snowflake account name> --private-key-path=/home/ec2-user/rsa_key.p8
	
<snowflake user name>##(no warehouse)@KAFKA_DB.PUBLIC>use warehouse KAFKA_TEST_WH;

<snowflake user name>#KAFKA_TEST_WH@KAFKA_DB.PUBLIC>select * from MSKSNOWFLAKETESTTOPIC;
+------------------------------------+-------------------------+                
| RECORD_METADATA                    | RECORD_CONTENT          |
|------------------------------------+-------------------------|
| {                                  | {                       |
|   "CreateTime": 1581057931179,     |   "temperature": "45.6" |
|   "offset": 0,                     | }                       |
|   "partition": 0,                  |                         |
|   "topic": "MSKSnowflakeTestTopic" |                         |
| }                                  |                         |
+------------------------------------+-------------------------+
1 Row(s) produced. Time Elapsed: 0.705s
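
If the table exists but no rows appear yet, one hedged way to check recent Snowpipe load activity for the target table is Snowflake’s COPY_HISTORY table function, run from the same snowsql session:

<snowflake user name>#KAFKA_TEST_WH@KAFKA_DB.PUBLIC>select file_name, row_count, status from table(information_schema.copy_history(table_name=>'MSKSNOWFLAKETESTTOPIC', start_time=>dateadd(hour, -1, current_timestamp())));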

Cleanup

Delete the CloudFormation stack from the AWS console and drop the database from your Snowflake account.
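
If you prefer the command line, an equivalent cleanup (assuming the stack name you chose at launch and the KAFKA_DB database used above) is:

#drop the Snowflake database used in this walkthrough
/root/bin/snowsql -a <snowflake account name> -u <snowflake user name> --private-key-path=/home/ec2-user/rsa_key.p8 -q "drop database KAFKA_DB;"

#delete the CloudFormation stack
aws cloudformation delete-stack --region us-east-1 --stack-name <stack name>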

Summary

In this post, you have learned how to ingest streaming data into an Amazon MSK cluster and persist that data in Snowflake using the Snowflake connector installed on a Kafka Connect instance.

We also walked through configuring the Snowflake connector on a Kafka Connect instance to ingest data from an MSK cluster by:

  1. Creating an MSK cluster and Kafka Connect instance using an AWS CloudFormation template.
  2. Configuring Snowflake on a Kafka Connect instance.
  3. Creating a Kafka topic from a Kafka Connect instance.
  4. Publishing test data to a Kafka topic from a Kafka producer running on the Kafka Connect instance.
  5. Testing that data published to the Kafka topic is persisted in Snowflake.

Next Steps

If you are an existing Snowflake customer and have an active AWS account, spin up an Amazon MSK cluster and Kafka Connect instance using the AWS CloudFormation template. You can then start ingesting streaming data into Snowflake.

If you’re new to Snowflake, create a test account to test the streaming solution.


Snowflake Computing – APN Partner Spotlight

Snowflake is an AWS Competency Partner. A modern cloud data warehouse, Snowflake has built an enterprise-class SQL data warehouse designed for the cloud and today’s data.

Contact Snowflake | Solution Overview | AWS Marketplace

*Already worked with Snowflake? Rate this Partner

*To review an APN Partner, you must be an AWS customer that has worked with them directly on a project.