Kafka-Kinesis-Connector を使用して Amazon MSK クラスターに接続する方法を教えてください。

最終更新日: 2020 年 12 月 23 日

Kafka-Kinesis-Connector を使用して Amazon Managed Streaming for Apache Kafka (Amazon MSK) に接続しようとすると、エラーメッセージが表示されます。Kafka-Kinesis-Connector を使用して Amazon MSK クラスターに接続する方法を教えてください。

簡単な説明

Kafka-Kinesis-Connector を使用して MSK クラスターに接続するには、設定が以下の要件を満たしている必要があります。

  • アクティブな AWS サブスクリプション。
  • クライアントマシンと MSK クラスターの両方で認識される Virtual Private Cloud (VPC)。MSK クラスターとクライアントは同じ VPC に格納されている必要がある。
  • MSK および Apache Zookeeper サーバーへの接続。
  • VPC に関連付けられた 2 つのサブネット。
  • サーバーからのメッセージの送受信を行うために MSK で作成されたトピック。

解決方法

プロジェクトファイルの構築

1.    kafka-kinesis-connector プロジェクトをクローンして、Kafka-Kinesis-Connector をダウンロードします。

2.    mvn package コマンドを使って、ターゲットディレクトリに amazon-kinesis-kafka-connector-X.X.X.jar ファイルを構築します。

[ec2-user@ip-10-0-0-71 kinesis-kafka-connector]$ mvn package
..
......

[INFO] Replacing /home/ec2-user/kafka-kinesis-connector/kafka-kinesis-connector/target/amazon-kinesis-kafka-connector-0.0.9-SNAPSHOT.jar with /home/ec2-user/kafka-kinesis-connector/kafka-kinesis-connector/target/amazon-kinesis-kafka-connector-0.0.9-SNAPSHOT-shaded.jar
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 28.822 s
[INFO] Finished at: 2020-02-19T13:01:31Z
[INFO] Final Memory: 26M/66M
[INFO] ------------------------------------------------------------------------

Kafka-Kinesis-Connector が、環境変数、Java システムプロパティ、認証情報プロファイルファイルの順で認証情報を検索します。

3.    設定を DefaultAWSCredentailsProviderChain 設定に更新します。

[ec2-user@ip-10-0-0-71 target]$ aws configure

このコマンドは、AWS Identity and Access Management (IAM) ユーザーにアタッチされたアクセスキーに、最低限必要なアクセス許可があることを確実にします。aws configure コマンドも、Amazon Kinesis Data Streams または Amazon Kinesis Data Firehose にアクセスするために使用できるポリシーがあることを確実にします。AWS 認証情報の設定に関する詳細については、AWS 認証情報の使用をご参照ください。

注: Java Development Kit (JDK) を使用している場合は、 EnvironmentVariableCredentialsProvider クラスを使って認証情報を提供することもできます。

4.    Kinesis Data Streams を使用している場合は、以下のようにポリシーを更新します。

{
"Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "Stmt123",
      "Effect": "Allow",
      "Action": [
        "kinesis:DescribeStream",
        "kinesis:PutRecord",
        "kinesis:PutRecords",
        "kinesis:GetShardIterator",
        "kinesis:GetRecords",
        "kinesis:ListShards",
        "kinesis:DescribeStreamSummary",
        "kinesis:RegisterStreamConsumer"
      ],
      "Resource": [
        "arn:aws:kinesis:us-west-2:123xxxxxxxxx:stream/SteamName"
      ]
    },
}

Kinesis Data Firehose を使用している場合は、次の例のようにポリシーを更新します。

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "firehose:DeleteDeliveryStream",
                "firehose:PutRecord",
                "firehose:PutRecordBatch",
                "firehose:UpdateDestination"
            ],
            "Resource": [
                "arn:aws:firehose:us-west-2:123xxxxxxxxx:deliverystream/DeliveryStreamName"
            ]
        }
    ]

Kinesis Data Firehose 配信ストリームの設定に関する詳細については、設定ファイルと認証情報ファイルの設定をご参照ください。

コネクタの設定

注: MSK からメッセージを発信するように Kafka-Kinesis-Connector を設定できます。Amazon Simple Storage Service (Amazon S3)、Amazon Redshift、または Amazon OpenSearch Service のいずれかの送信先に、メッセージを発信できます。

1.    Kinesis Data Streams を設定する場合は、次の値を用いてコネクタを設定できます。

name=YOUR_CONNECTER_NAME
connector.class=com.amazon.kinesis.kafka.AmazonKinesisSinkConnector
tasks.max=1
topics=YOUR_TOPIC_NAME
region=us-east-1
streamName=YOUR_STREAM_NAME
usePartitionAsHashKey=false
flushSync=true
# Use new Kinesis Producer for each Partition
singleKinesisProducerPerPartition=true
# Whether to block new records from putting onto Kinesis Producer if
# threshold for outstandings records have reached
pauseConsumption=true
outstandingRecordsThreshold=500000
# If outstanding records on producers are beyond threshold sleep for following period (in ms)
sleepPeriod=1000
# If outstanding records on producers are not cleared sleep for following cycle before killing the tasks
sleepCycles=10
# Kinesis Producer Configuration - https://github.com/awslabs/amazon-kinesis-producer/blob/main/java/amazon-kinesis-producer-sample/default_config.properties
# All kinesis producer configuration have not been exposed
maxBufferedTime=1500
maxConnections=1
rateLimit=100
ttl=60000
metricsLevel=detailed
metricsGranuality=shard
metricsNameSpace=KafkaKinesisStreamsConnector
aggregration=true

- または -

異なるタイプのストリームを設定する場合は、Kinesis Data Firehose 配信ストリームのプロパティを次のように設定します。

name=YOUR_CONNECTER_NAME
connector.class=com.amazon.kinesis.kafka.FirehoseSinkConnector
tasks.max=1
topics=YOUR_TOPIC_NAME
region=us-east-1
batch=true
batchSize=500
batchSizeInBytes=3670016
deliveryStream=YOUR_DELIVERY_STREAM_NAME

2.    ワーカープロパティをスタンドアロンモードまたは分散モードのいずれかに設定します。

bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
#internal.value.converter=org.apache.kafka.connect.storage.StringConverter
#internal.key.converter=org.apache.kafka.connect.storage.StringConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
internal.key.converter.schemas.enable=true
internal.value.converter.schemas.enable=true
offset.storage.file.filename=offset.log

Kafka-Kinesis-Connector のスタンドアロンモードまたは分散モードの詳細については、Apache ウェブサイトの Kafka Connect をご参照ください。

3.    amazon-kinesis-kafka-connector-0.0.X.jar ファイルをディレクトリにコピーし、classpath をエクスポートします。

注: amazon-kinesis-kafka-connector-0.0.X.jar ファイルを JAVA_HOME/lib/ext ディレクトリに追加することも可能です。

4.    以下のコマンド構文を使用して、kafka-kinesis-connector を実行します。

[ec2-user@ip-10-0-0-71 kafka_2.12-2.2.1]$ ./bin/connect-standalone.sh /home/ec2-user/kafka-kinesis-connector/kafka-kinesis-connector/

worker.properties /home/ec2-user/kafka-kinesis-connector/kafka-kinesis-connector/

kinesis-kafka-streams-connecter.properties

Amazon OpenSearch Service は、Amazon Elasticsearch Service の後継サービスです。


この記事は役に立ちましたか?


請求に関するサポートまたは技術サポートが必要ですか?