How do I connect to my Amazon MSK cluster using the Kinesis-Kafka-Connector?

Last updated: 2020-06-11

When I try to use the Kinesis-Kafka-Connector to connect with Amazon Managed Streaming for Apache Kafka (Amazon MSK), I receive an error message. How do I connect to my Amazon MSK cluster using the Kinesis-Kafka-Connector?

Short Description

To connect to your MSK cluster using the Kinesis-Kafka-Connector, your setup must meet the following requirements:

  • An active AWS subscription.
  • A virtual private cloud (VPC) that is visible to both the client machine and MSK cluster. The MSK cluster and client must reside in the same VPC.
  • Connectivity to the MSK Apache Kafka brokers and Apache ZooKeeper servers (see the connectivity check after this list).
  • Two subnets associated to your VPC.
  • Topics created in MSK Kafka to send and receive messages from the server.
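
For example, you can confirm broker and ZooKeeper connectivity and create a topic from the client machine. The following is a sketch only: the cluster ARN, Region, topic name, and replication settings are placeholders, and the kafka-topics.sh path assumes an Apache Kafka distribution that is already downloaded to the client.

[ec2-user@ip-10-0-0-71 ~]$ aws kafka get-bootstrap-brokers --cluster-arn <Your-Cluster-ARN> --region us-east-1
[ec2-user@ip-10-0-0-71 ~]$ aws kafka describe-cluster --cluster-arn <Your-Cluster-ARN> --region us-east-1 --query 'ClusterInfo.ZookeeperConnectString'
[ec2-user@ip-10-0-0-71 kafka_2.12-2.2.1]$ ./bin/kafka-topics.sh --create --zookeeper <ZookeeperConnectString> --replication-factor 2 --partitions 1 --topic YOUR_TOPIC_NAME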

Resolution

Build your project file

1.    Clone the kinesis-kafka-connector project to download the Kinesis-Kafka-Connector.
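
For example, assuming that the connector is hosted in the awslabs GitHub repository, you can clone and enter the project directory on the client machine:

[ec2-user@ip-10-0-0-71 ~]$ git clone https://github.com/awslabs/kinesis-kafka-connector.git
[ec2-user@ip-10-0-0-71 ~]$ cd kinesis-kafka-connector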

2.    Use the mvn package command to build the amazon-kinesis-kafka-connector-X.X.X.jar file in the target directory:

[ec2-user@ip-10-0-0-71 kinesis-kafka-connector]$ mvn package
..
......

[INFO] Replacing /home/ec2-user/kinesis-kafka-connector/kinesis-kafka-connector/target/amazon-kinesis-kafka-connector-0.0.9-SNAPSHOT.jar with /home/ec2-user/kinesis-kafka-connector/kinesis-kafka-connector/target/amazon-kinesis-kafka-connector-0.0.9-SNAPSHOT-shaded.jar
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 28.822 s
[INFO] Finished at: 2020-02-19T13:01:31Z
[INFO] Final Memory: 26M/66M
[INFO] ------------------------------------------------------------------------

The Kinesis-Kafka-Connector looks for credentials in the following order: environment variables, Java system properties, and then the credentials profile file.

3.    Update your configuration to the DefaultAWSCredentialsProviderChain setting:

[ec2-user@ip-10-0-0-71 target]$ aws configure

Make sure that the access key attached to the AWS Identity and Access Management (IAM) user has the minimum required permissions, and that a policy is available to access Amazon Kinesis Data Streams or Amazon Kinesis Data Firehose. For more information about setting AWS credentials, see Working with AWS credentials.

Note: If you are using a Java Development Kit (JDK), you can also use the EnvironmentVariableCredentialsProvider class to provide credentials.
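
For example, the DefaultAWSCredentialsProviderChain and the EnvironmentVariableCredentialsProvider both read credentials that are exported as environment variables in the shell that runs the connector. The key values in the following sketch are placeholders:

[ec2-user@ip-10-0-0-71 target]$ export AWS_ACCESS_KEY_ID=<your-access-key-id>
[ec2-user@ip-10-0-0-71 target]$ export AWS_SECRET_ACCESS_KEY=<your-secret-access-key>
[ec2-user@ip-10-0-0-71 target]$ export AWS_SESSION_TOKEN=<your-session-token>   # Only needed for temporary credentials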

4.    If you are using Kinesis Data Streams, update your policy to the following:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "Stmt123",
      "Effect": "Allow",
      "Action": [
        "kinesis:DescribeStream",
        "kinesis:PutRecord",
        "kinesis:PutRecords",
        "kinesis:GetShardIterator",
        "kinesis:GetRecords",
        "kinesis:ListShards",
        "kinesis:DescribeStreamSummary",
        "kinesis:RegisterStreamConsumer"
      ],
      "Resource": [
        "arn:aws:kinesis:us-west-2:123xxxxxxxxx:stream/StreamName"
      ]
    }
  ]
}

If you are using Kinesis Data Firehose, update your policy to look like the following example:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "firehose:DeleteDeliveryStream",
                "firehose:PutRecord",
                "firehose:PutRecordBatch",
                "firehose:UpdateDestination"
            ],
            "Resource": [
                "arn:aws:firehose:us-west-2:123xxxxxxxxx:deliverystream/DeliveryStreamName"
            ]
        }
    ]
}

For more information about the Data Firehose delivery stream settings, see Configuration and credential file settings.
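
One way to make the policy available is to attach it as an inline policy to the IAM user whose access key you configured. The user name, policy name, and file path in the following sketch are placeholders:

[ec2-user@ip-10-0-0-71 target]$ aws iam put-user-policy --user-name <your-iam-user> --policy-name kinesis-kafka-connector-policy --policy-document file://policy.json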

Configure the connector

Note: You can configure the Kinesis-Kafka-Connector to publish messages from MSK to any of the following destinations: Amazon Simple Storage Service (Amazon S3), Amazon Redshift, or Amazon Elasticsearch Service (Amazon ES).

1.    If you are setting up Kinesis Data Streams, configure the connector with the following values:

name=YOUR_CONNECTOR_NAME
connector.class=com.amazon.kinesis.kafka.AmazonKinesisSinkConnector
tasks.max=1
topics=YOUR_TOPIC_NAME
region=us-east-1
streamName=YOUR_STREAM_NAME
usePartitionAsHashKey=false
flushSync=true
# Use a new Kinesis Producer for each partition
singleKinesisProducerPerPartition=true
# Whether to block new records from being put onto the Kinesis Producer if
# the threshold for outstanding records has been reached
pauseConsumption=true
outstandingRecordsThreshold=500000
# If outstanding records on producers are beyond threshold sleep for following period (in ms)
sleepPeriod=1000
# If outstanding records on producers are not cleared sleep for following cycle before killing the tasks
sleepCycles=10
# Kinesis Producer Configuration - https://github.com/awslabs/amazon-kinesis-producer/blob/master/java/amazon-kinesis-producer-sample/default_config.properties
# Not all Kinesis Producer configuration settings have been exposed
maxBufferedTime=1500
maxConnections=1
rateLimit=100
ttl=60000
metricsLevel=detailed
metricsGranuality=shard
metricsNameSpace=KafkaKinesisStreamsConnector
aggregration=true

--or--

If you are setting up Kinesis Data Firehose instead, configure the connector with the following delivery stream properties:

name=YOUR_CONNECTOR_NAME
connector.class=com.amazon.kinesis.kafka.FirehoseSinkConnector
tasks.max=1
topics=YOUR_TOPIC_NAME
region=us-east-1
batch=true
batchSize=500
batchSizeInBytes=3670016
deliveryStream=YOUR_DELIVERY_STREAM_NAME

2.    Configure the worker properties for either standalone or distributed mode:

bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
#internal.value.converter=org.apache.kafka.connect.storage.StringConverter
#internal.key.converter=org.apache.kafka.connect.storage.StringConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
internal.key.converter.schemas.enable=true
internal.value.converter.schemas.enable=true
offset.storage.file.filename=offset.log

3.    Copy the amazon-kinesis-kafka-connector-0.0.X.jar file to your directory and export the classpath, as shown in the example after the following note.

Note: You can also add the amazon-kinesis-kafka-connector-0.0.X.jar file to the JAVA_HOME/lib/ext directory.
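
For example, assuming the JAR file that was built earlier in the target directory, you can export the classpath like this:

[ec2-user@ip-10-0-0-71 kinesis-kafka-connector]$ export CLASSPATH=$CLASSPATH:/home/ec2-user/kinesis-kafka-connector/kinesis-kafka-connector/target/amazon-kinesis-kafka-connector-0.0.9-SNAPSHOT.jar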

4.    Run the kinesis-kafka-connector by using the following command syntax:

[ec2-user@ip-10-0-0-71 kafka_2.12-2.2.1]$ ./bin/connect-standalone.sh /home/ec2-user/kinesis-kafka-connector/kinesis-kafka-connector/worker.properties /home/ec2-user/kinesis-kafka-connector/kinesis-kafka-connector/kinesis-kafka-streams-connecter.properties
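
If you run Kafka Connect in distributed mode instead, start the worker with only the worker properties file and submit the connector configuration through the Kafka Connect REST API. Note that distributed mode requires additional worker settings (such as group.id and the offset, config, and status storage topics) that are not shown in the standalone example above. A minimal sketch, assuming the same file path:

[ec2-user@ip-10-0-0-71 kafka_2.12-2.2.1]$ ./bin/connect-distributed.sh /home/ec2-user/kinesis-kafka-connector/kinesis-kafka-connector/worker.properties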
