如何使用 Kafka-Kinesis-Connector 连接到我的 Amazon MSK 集群?

上次更新时间:2020 年 10 月 15 日

当我尝试使用 Kafka-Kinesis-Connector 连接到 Amazon Managed Streaming for Apache Kafka (Amazon MSK) 时,收到一条错误消息。如何使用 Kafka-Kinesis-Connector 连接到我的 Amazon MSK 集群?

简短描述

要使用 Kafka-Kinesis-Connector 连接到 MSK 集群,您的设置必须满足以下要求:

  • 有效的 AWS 订阅。
  • 对客户端计算机和 MSK 集群均可见的 Virtual Private Cloud (VPC)。MSK 群集和客户端必须驻留在同一 VPC 中。
  • 与 MSK Kafka 和 Apache Zookeeper 服务器的连接。
  • 与您的 VPC 关联的两个子网。
  • 在 MSK Kafka 中创建的主题,用于从服务器发送和接收消息。

解决方法

构建您的项目文件

1.    克隆 kafka-kinesis-connector 项目以下载 Kafka-Kinesis-Connector。

2.    使用 mvn package 命令在目标目录中构建 amazon-kinesis-kafka-connector-X.X.X.jar 文件:

[ec2-user@ip-10-0-0-71 kinesis-kafka-connector]$ mvn package
..
......

[INFO] Replacing /home/ec2-user/kafka-kinesis-connector/kafka-kinesis-connector/target/amazon-kinesis-kafka-connector-0.0.9-SNAPSHOT.jar with /home/ec2-user/kafka-kinesis-connector/kafka-kinesis-connector/target/amazon-kinesis-kafka-connector-0.0.9-SNAPSHOT-shaded.jar
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 28.822 s
[INFO] Finished at: 2020-02-19T13:01:31Z
[INFO] Final Memory: 26M/66M
[INFO] ------------------------------------------------------------------------

Kafka-Kinesis-Connector 按以下顺序查找凭证:环境变量、java 系统属性和凭证配置文件。

3.    将您的配置更新为 DefaultAWSCredentailsProviderChain 设置:

[ec2-user@ip-10-0-0-71 target]$ aws configure

此命令确保附加到 AWS Identity and Access Management (IAM) 用户的访问密钥具有所需的最低权限。aws configure 命令还确保存在可用于访问 Amazon Kinesis Data Streams 或 Amazon Kinesis Data Firehose 的策略。有关设置 AWS 凭证的更多信息,请参阅使用 AWS 凭证

注意:如果您使用的是 Java 开发工具包 (JDK),则也可以使用 EnvironmentVariableCredentialsProvider 类来提供凭证。

4.    如果您使用的是 Kinesis Data Streams,请将您的策略更新为以下内容:

{
"Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "Stmt123",
      "Effect": "Allow",
      "Action": [
        "kinesis:DescribeStream",
        "kinesis:PutRecord",
        "kinesis:PutRecords",
        "kinesis:GetShardIterator",
        "kinesis:GetRecords",
        "kinesis:ListShards",
        "kinesis:DescribeStreamSummary",
        "kinesis:RegisterStreamConsumer"
      ],
      "Resource": [
        "arn:aws:kinesis:us-west-2:123xxxxxxxxx:stream/SteamName"
      ]
    },
}

如果您使用的是 Kinesis Data Firehose,请将您的策略更新为类似于以下示例的内容:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "firehose:DeleteDeliveryStream",
                "firehose:PutRecord",
                "firehose:PutRecordBatch",
                "firehose:UpdateDestination"
            ],
            "Resource": [
                "arn:aws:firehose:us-west-2:123xxxxxxxxx:deliverystream/DeliveryStreamName"
            ]
        }
    ]

有关 Kinesis Data Firehose 传输流设置的更多信息,请参阅配置和凭证文件设置。

配置连接器

注意:您可以将 Kafka-Kinesis-Connector 配置为从 MSK 发布消息。消息可以发布到以下任何目标:Amazon Simple Storage Service (Amazon S3)、Amazon Redshift 或 Amazon Elasticsearch Service (Amazon ES)。

1.    如果要设置 Kinesis Data Streams,则可以使用以下值配置连接器:

name=YOUR_CONNECTER_NAME
connector.class=com.amazon.kinesis.kafka.AmazonKinesisSinkConnector
tasks.max=1
topics=YOUR_TOPIC_NAME
region=us-east-1
streamName=YOUR_STREAM_NAME
usePartitionAsHashKey=false
flushSync=true
# Use new Kinesis Producer for each Partition
singleKinesisProducerPerPartition=true
# Whether to block new records from putting onto Kinesis Producer if
# threshold for outstandings records have reached
pauseConsumption=true
outstandingRecordsThreshold=500000
# If outstanding records on producers are beyond threshold sleep for following period (in ms)
sleepPeriod=1000
# If outstanding records on producers are not cleared sleep for following cycle before killing the tasks
sleepCycles=10
# Kinesis Producer Configuration - https://github.com/awslabs/amazon-kinesis-producer/blob/main/java/amazon-kinesis-producer-sample/default_config.properties
# All kinesis producer configuration have not been exposed
maxBufferedTime=1500
maxConnections=1
rateLimit=100
ttl=60000
metricsLevel=detailed
metricsGranuality=shard
metricsNameSpace=KafkaKinesisStreamsConnector
aggregration=true

--或者--

如果要设置其他类型的流,请配置 Kinesis Data Firehose 传输流属性,如下所示:

name=YOUR_CONNECTER_NAME
connector.class=com.amazon.kinesis.kafka.FirehoseSinkConnector
tasks.max=1
topics=YOUR_TOPIC_NAME
region=us-east-1
batch=true
batchSize=500
batchSizeInBytes=3670016
deliveryStream=YOUR_DELIVERY_STREAM_NAME

2.    为独立分布式模式配置工作线程属性:

bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
#internal.value.converter=org.apache.kafka.connect.storage.StringConverter
#internal.key.converter=org.apache.kafka.connect.storage.StringConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
internal.key.converter.schemas.enable=true
internal.value.converter.schemas.enable=true
offset.storage.file.filename=offset.log

有关 Kafka-Kinesis-Connector 的独立分布式模式的详细信息,请参阅 Apache 网站上的 Kafka Connect

3.    将 amazon-kinesis-kafka-connector-0.0.X.jar 文件复制到您的目录并导出类路径。

注意:您也可以将 amazon-kinesis-kafka-connector-0.0.X.jar 文件添加到 JAVA_HOME/lib/ext 目录。

4.    使用以下命令语法运行 kafka-kinesis-connector

[ec2-user@ip-10-0-0-71 kafka_2.12-2.2.1]$ ./bin/connect-standalone.sh /home/ec2-user/kafka-kinesis-connector/kafka-kinesis-connector/

worker.properties /home/ec2-user/kafka-kinesis-connector/kafka-kinesis-connector/

kinesis-kafka-streams-connecter.properties

这篇文章对您有帮助吗?


您是否需要账单或技术支持?