How do I use the Kafka-Kinesis-Connector to connect to my Amazon MSK cluster?

When I try to use the Kafka-Kinesis-Connector to connect with Amazon Managed Streaming for Apache Kafka (Amazon MSK), I receive an error message.

Short description

Prerequisites:

  • You have an active AWS subscription.
  • You have a virtual private cloud (VPC) that's visible to both the client machine and MSK cluster. The MSK cluster and client must reside in the same VPC.
  • You have connectivity to Amazon MSK and Apache ZooKeeper servers.
  • There are two subnets associated with your VPC.
  • You created topics in MSK to send and receive messages from the server.
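
The connectivity prerequisite can be checked from the client machine before you build anything. The following is a minimal sketch that uses only the Python standard library. The function names are hypothetical, and the ports mentioned in the note after the code (9092 for plaintext brokers, 9094 for TLS brokers, 2181 for ZooKeeper) are assumptions based on common defaults, so confirm them against your own cluster's connection strings.

```python
import socket


def can_connect(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and DNS resolution failures.
        return False


def check_endpoints(endpoints, timeout: float = 3.0) -> dict:
    """Map each "host:port" string to a reachability flag."""
    results = {}
    for endpoint in endpoints:
        # Split on the last colon so the host part is kept intact.
        host, _, port = endpoint.rpartition(":")
        results[endpoint] = can_connect(host, int(port), timeout)
    return results
```

To use it, pass the broker and ZooKeeper entries from your cluster's connection strings, for example check_endpoints(["YOUR_BROKER_HOST:9092", "YOUR_ZOOKEEPER_HOST:2181"]). A False value usually points to a security group, subnet, or VPC routing issue rather than a connector problem.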

Resolution

Build your project file

  1. To download the Kafka-Kinesis-Connector, clone the kafka-kinesis-connector project from the GitHub website.
  2. To build the amazon-kinesis-kafka-connector-X.X.X.jar file in the target directory, run the mvn package command:
    [ec2-user@ip-10-0-0-71 kinesis-kafka-connector]$ mvn package
    ..
    ......
    
    [INFO] Replacing /home/ec2-user/kafka-kinesis-connector/kafka-kinesis-connector/target/amazon-kinesis-kafka-connector-0.0.9-SNAPSHOT.jar with /home/ec2-user/kafka-kinesis-connector/kafka-kinesis-connector/target/amazon-kinesis-kafka-connector-0.0.9-SNAPSHOT-shaded.jar
    [INFO] ------------------------------------------------------------------------
    [INFO] BUILD SUCCESS
    [INFO] ------------------------------------------------------------------------
    [INFO] Total time: 28.822 s
    [INFO] Finished at: 2020-02-19T13:01:31Z
    [INFO] Final Memory: 26M/66M
    [INFO] ------------------------------------------------------------------------
    The Kafka-Kinesis-Connector looks for credentials in the following order: environment variables, Java system properties, and the credentials profile file.
  3. To update your configuration to the DefaultAWSCredentialsProviderChain setting, run the following command:
    [ec2-user@ip-10-0-0-71 target]$ aws configure
    The aws configure command stores the access key in the credentials profile file. It doesn't check permissions. Make sure that the access key that's attached to the AWS Identity and Access Management (IAM) user has the minimum required permissions, and that there's an available policy to access Amazon Kinesis Data Streams or Amazon Kinesis Data Firehose. For more information about setting AWS credentials, see Provide temporary credentials to the AWS SDK for Java.
    Note: If you use a Java Development Kit (JDK), then you can also use the EnvironmentVariableCredentialsProvider class to provide credentials.
  4. If you use Kinesis Data Streams, then update your policy to look similar to the following example:
    {
         "Version": "2012-10-17",
         "Statement": [{
              "Sid": "Stmt123",
              "Effect": "Allow",
              "Action": [
                   "kinesis:DescribeStream",
                   "kinesis:PutRecord",
                   "kinesis:PutRecords",
                   "kinesis:GetShardIterator",
                   "kinesis:GetRecords",
                   "kinesis:ListShards",
                   "kinesis:DescribeStreamSummary",
                   "kinesis:RegisterStreamConsumer"
              ],
              "Resource": [
                   "arn:aws:kinesis:us-west-2:123xxxxxxxxx:stream/StreamName"
              ]
         }]
    }
    If you use Kinesis Data Firehose, then update your policy to look similar to the following example:
    {
         "Version": "2012-10-17",
         "Statement": [{
              "Effect": "Allow",
              "Action": [
                   "firehose:DeleteDeliveryStream",
                   "firehose:PutRecord",
                   "firehose:PutRecordBatch",
                   "firehose:UpdateDestination"
              ],
              "Resource": [
                   "arn:aws:firehose:us-west-2:123xxxxxxxxx:deliverystream/DeliveryStreamName"
              ]
         }]
    }
    For more information about the Kinesis Data Firehose delivery stream settings, see Configuration and credential file settings.
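
If you manage several streams, then it can help to generate these policy documents instead of editing them by hand. The following is a minimal sketch, not an official tool. The build_policy function and its parameters are hypothetical names, the action lists are copied from the two examples above, and the account ID in the usage line is a placeholder.

```python
import json

# Action lists copied from the example policies in this article.
KINESIS_ACTIONS = [
    "kinesis:DescribeStream",
    "kinesis:PutRecord",
    "kinesis:PutRecords",
    "kinesis:GetShardIterator",
    "kinesis:GetRecords",
    "kinesis:ListShards",
    "kinesis:DescribeStreamSummary",
    "kinesis:RegisterStreamConsumer",
]
FIREHOSE_ACTIONS = [
    "firehose:DeleteDeliveryStream",
    "firehose:PutRecord",
    "firehose:PutRecordBatch",
    "firehose:UpdateDestination",
]


def build_policy(service: str, region: str, account_id: str, resource_name: str) -> dict:
    """Build a policy document scoped to one stream or delivery stream."""
    if service == "kinesis":
        actions, resource_type = KINESIS_ACTIONS, "stream"
    elif service == "firehose":
        actions, resource_type = FIREHOSE_ACTIONS, "deliverystream"
    else:
        raise ValueError(f"unsupported service: {service!r}")
    arn = f"arn:aws:{service}:{region}:{account_id}:{resource_type}/{resource_name}"
    return {
        "Version": "2012-10-17",
        "Statement": [
            {"Effect": "Allow", "Action": list(actions), "Resource": [arn]}
        ],
    }


# Placeholder account ID and stream name; replace with your own values.
print(json.dumps(build_policy("kinesis", "us-west-2", "123456789012", "StreamName"), indent=2))
```

Scoping the Resource element to a single ARN, as the examples above do, keeps the connector's access limited to the stream that it writes to.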

Configure the connector

Note: You can configure the Kafka-Kinesis-Connector to publish messages from MSK to the following destinations: Amazon Simple Storage Service (Amazon S3), Amazon Redshift, or Amazon OpenSearch Service.

  1. If you're setting up Kinesis Data Streams, then configure the connector with the following values:

    name=YOUR_CONNECTOR_NAME
    connector.class=com.amazon.kinesis.kafka.AmazonKinesisSinkConnector
    tasks.max=1
    topics=YOUR_TOPIC_NAME
    region=us-east-1
    streamName=YOUR_STREAM_NAME
    usePartitionAsHashKey=false
    flushSync=true
    # Use new Kinesis Producer for each Partition
    singleKinesisProducerPerPartition=true
    # Whether to block new records from putting onto Kinesis Producer if
    # threshold for outstanding records have reached
    pauseConsumption=true
    outstandingRecordsThreshold=500000
    # If outstanding records on producers are beyond threshold sleep for following period (in ms)
    sleepPeriod=1000
    # If outstanding records on producers are not cleared sleep for following cycle before killing the tasks
    sleepCycles=10
    # Kinesis Producer Configuration - https://github.com/awslabs/amazon-kinesis-producer/blob/main/java/amazon-kinesis-producer-sample/default_config.properties
    # All kinesis producer configuration have not been exposed
    maxBufferedTime=1500
    maxConnections=1
    rateLimit=100
    ttl=60000
    metricsLevel=detailed
    metricsGranuality=shard
    metricsNameSpace=KafkaKinesisStreamsConnector
    aggregation=true

    If you're setting up a Kinesis Data Firehose delivery stream instead, then configure the connector with the following values:

    name=YOUR_CONNECTOR_NAME
    connector.class=com.amazon.kinesis.kafka.FirehoseSinkConnector
    tasks.max=1
    topics=YOUR_TOPIC_NAME
    region=us-east-1
    batch=true
    batchSize=500
    batchSizeInBytes=3670016
    deliveryStream=YOUR_DELIVERY_STREAM_NAME
  2. Configure the worker properties for either standalone or distributed mode:

    bootstrap.servers=localhost:9092
    key.converter=org.apache.kafka.connect.storage.StringConverter
    value.converter=org.apache.kafka.connect.storage.StringConverter
    #internal.value.converter=org.apache.kafka.connect.storage.StringConverter
    #internal.key.converter=org.apache.kafka.connect.storage.StringConverter
    internal.value.converter=org.apache.kafka.connect.json.JsonConverter
    internal.key.converter=org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable=true
    value.converter.schemas.enable=true
    internal.key.converter.schemas.enable=true
    internal.value.converter.schemas.enable=true
    offset.storage.file.filename=offset.log

    For more information about standalone and distributed mode, see Kafka Connect on the Apache website.

  3. Copy the amazon-kinesis-kafka-connector-0.0.X.jar file to your directory, and then export the CLASSPATH environment variable to include it.
    Note: You can also add the amazon-kinesis-kafka-connector-0.0.X.jar file to the JAVA_HOME/lib/ext directory.

  4. To run the Kafka-Kinesis-Connector, use the following command syntax:

    [ec2-user@ip-10-0-0-71 kafka_2.12-2.2.1]$ ./bin/connect-standalone.sh /home/ec2-user/kafka-kinesis-connector/kafka-kinesis-connector/worker.properties /home/ec2-user/kafka-kinesis-connector/kafka-kinesis-connector/kinesis-kafka-streams-connecter.properties
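
Before you start the connector, it can be worth checking the connector properties file for missing keys. A common cause is two settings that end up on one line when the file is copied or pasted. The following is a minimal sketch under stated assumptions: the function names are hypothetical, the parser handles only simple key=value lines and # comments, and the list of expected keys is inferred from the examples in this article rather than taken from the connector's source code.

```python
# Keys that appear in both connector examples in this article.
COMMON_KEYS = ("name", "connector.class", "tasks.max", "topics", "region")

# Destination key expected for each connector class (inferred from the examples).
DESTINATION_KEY = {
    "com.amazon.kinesis.kafka.AmazonKinesisSinkConnector": "streamName",
    "com.amazon.kinesis.kafka.FirehoseSinkConnector": "deliveryStream",
}


def parse_properties(text: str) -> dict:
    """Parse simple key=value lines, skipping blank lines and # comments."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        key, sep, value = line.partition("=")
        if not sep:
            raise ValueError(f"not a key=value line: {line!r}")
        props[key.strip()] = value.strip()
    return props


def missing_keys(props: dict) -> list:
    """Return the expected keys that are absent from the parsed properties."""
    missing = [key for key in COMMON_KEYS if key not in props]
    destination = DESTINATION_KEY.get(props.get("connector.class", ""))
    if destination and destination not in props:
        missing.append(destination)
    return missing
```

For example, missing_keys(parse_properties(open("YOUR_CONNECTOR_FILE.properties").read())) returns an empty list when every expected key is present. If a line such as name=democonnector.class=... slipped in, then connector.class is reported as missing because everything after the first equals sign is read as the value of name.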

Related information

Creating an Amazon MSK cluster

AWS OFFICIAL
Updated 4 months ago