Kafka-Kinesis-Connector を使用して Amazon MSK クラスターに接続する方法を教えてください。
Kafka-Kinesis-Connector を使用して Amazon Managed Streaming for Apache Kafka (Amazon MSK) に接続しようとすると、エラーメッセージが表示されます。Kafka-Kinesis-Connector を使用して Amazon MSK クラスターに接続する方法を教えてください。
簡単な説明
Kafka-Kinesis-Connector を使用して MSK クラスターに接続するには、設定が以下の要件を満たしている必要があります。
- アクティブな AWS サブスクリプション。
- クライアントマシンと MSK クラスターの両方で認識される Virtual Private Cloud (VPC)。MSK クラスターとクライアントは同じ VPC に格納されている必要がある。
- MSK および Apache Zookeeper サーバーへの接続。
- VPC に関連付けられた 2 つのサブネット。
- サーバーからのメッセージの送受信を行うために MSK で作成されたトピック。
解決方法
プロジェクトファイルの構築
1. kafka-kinesis-connector プロジェクトをクローンして、Kafka-Kinesis-Connector をダウンロードします。
2. mvn package コマンドを使って、ターゲットディレクトリに amazon-kinesis-kafka-connector-X.X.X.jar ファイルを構築します。
[ec2-user@ip-10-0-0-71 kinesis-kafka-connector]$ mvn package .. ...... [INFO] Replacing /home/ec2-user/kafka-kinesis-connector/kafka-kinesis-connector/target/amazon-kinesis-kafka-connector-0.0.9-SNAPSHOT.jar with /home/ec2-user/kafka-kinesis-connector/kafka-kinesis-connector/target/amazon-kinesis-kafka-connector-0.0.9-SNAPSHOT-shaded.jar [INFO] ------------------------------------------------------------------------ [INFO] BUILD SUCCESS [INFO] ------------------------------------------------------------------------ [INFO] Total time: 28.822 s [INFO] Finished at: 2020-02-19T13:01:31Z [INFO] Final Memory: 26M/66M [INFO] ------------------------------------------------------------------------
Kafka-Kinesis-Connector が、環境変数、Java システムプロパティ、認証情報プロファイルファイルの順で認証情報を検索します。
3. 設定を DefaultAWSCredentailsProviderChain 設定に更新します。
[ec2-user@ip-10-0-0-71 target]$ aws configure
このコマンドは、AWS Identity and Access Management (IAM) ユーザーにアタッチされたアクセスキーに、最低限必要なアクセス許可があることを確実にします。aws configure コマンドも、Amazon Kinesis Data Streams または Amazon Kinesis Data Firehose にアクセスするために使用できるポリシーがあることを確実にします。AWS 認証情報の設定に関する詳細については、AWS 認証情報の使用をご参照ください。
注: Java Development Kit (JDK) を使用している場合は、 EnvironmentVariableCredentialsProvider クラスを使って認証情報を提供することもできます。
4. Kinesis Data Streams を使用している場合は、以下のようにポリシーを更新します。
{ "Version": "2012-10-17", "Statement": [{ "Sid": "Stmt123", "Effect": "Allow", "Action": [ "kinesis:DescribeStream", "kinesis:PutRecord", "kinesis:PutRecords", "kinesis:GetShardIterator", "kinesis:GetRecords", "kinesis:ListShards", "kinesis:DescribeStreamSummary", "kinesis:RegisterStreamConsumer" ], "Resource": [ "arn:aws:kinesis:us-west-2:123xxxxxxxxx:stream/StreamName" ] }] }
Kinesis Data Firehose を使用している場合は、次の例のようにポリシーを更新します。
{ "Version": "2012-10-17", "Statement": [{ "Effect": "Allow", "Action": [ "firehose:DeleteDeliveryStream", "firehose:PutRecord", "firehose:PutRecordBatch", "firehose:UpdateDestination" ], "Resource": [ "arn:aws:firehose:us-west-2:123xxxxxxxxx:deliverystream/DeliveryStreamName" ] }] }
Kinesis Data Firehose 配信ストリームの設定に関する詳細については、設定ファイルと認証情報ファイルの設定をご参照ください。
コネクタの設定
注: MSK からメッセージを発信するように Kafka-Kinesis-Connector を設定できます。Amazon Simple Storage Service (Amazon S3)、Amazon Redshift、または Amazon OpenSearch Service のいずれかの送信先に、メッセージを発信できます。
1. Kinesis Data Streams を設定する場合は、次の値を用いてコネクタを設定できます。
name=YOUR_CONNECTER_NAME connector.class=com.amazon.kinesis.kafka.AmazonKinesisSinkConnector tasks.max=1 topics=YOUR_TOPIC_NAME region=us-east-1 streamName=YOUR_STREAM_NAME usePartitionAsHashKey=false flushSync=true # Use new Kinesis Producer for each Partition singleKinesisProducerPerPartition=true # Whether to block new records from putting onto Kinesis Producer if # threshold for outstanding records have reached pauseConsumption=true outstandingRecordsThreshold=500000 # If outstanding records on producers are beyond threshold sleep for following period (in ms) sleepPeriod=1000 # If outstanding records on producers are not cleared sleep for following cycle before killing the tasks sleepCycles=10 # Kinesis Producer Configuration - https://github.com/awslabs/amazon-kinesis-producer/blob/main/java/amazon-kinesis-producer-sample/default_config.properties # All kinesis producer configuration have not been exposed maxBufferedTime=1500 maxConnections=1 rateLimit=100 ttl=60000 metricsLevel=detailed metricsGranuality=shard metricsNameSpace=KafkaKinesisStreamsConnector aggregation=true
- または -
異なるタイプのストリームを設定する場合は、Kinesis Data Firehose 配信ストリームのプロパティを次のように設定します。
name=YOUR_CONNECTER_NAME connector.class=com.amazon.kinesis.kafka.FirehoseSinkConnector tasks.max=1 topics=YOUR_TOPIC_NAME region=us-east-1 batch=true batchSize=500 batchSizeInBytes=3670016 deliveryStream=YOUR_DELIVERY_STREAM_NAME
2. ワーカープロパティをスタンドアロンモードまたは分散モードのいずれかに設定します。
bootstrap.servers=localhost:9092 key.converter=org.apache.kafka.connect.storage.StringConverter value.converter=org.apache.kafka.connect.storage.StringConverter #internal.value.converter=org.apache.kafka.connect.storage.StringConverter #internal.key.converter=org.apache.kafka.connect.storage.StringConverter internal.value.converter=org.apache.kafka.connect.json.JsonConverter internal.key.converter=org.apache.kafka.connect.json.JsonConverter key.converter.schemas.enable=true value.converter.schemas.enable=true internal.key.converter.schemas.enable=true internal.value.converter.schemas.enable=true offset.storage.file.filename=offset.log
Kafka-Kinesis-Connector のスタンドアロンモードまたは分散モードの詳細については、Apache ウェブサイトの Kafka Connect をご参照ください。
3. amazon-kinesis-kafka-connector-0.0.X.jar ファイルをディレクトリにコピーし、classpath をエクスポートします。
注意: amazon-kinesis-kafka-connector-0.0.X.jar ファイルは JAVA_HOME/lib/ext ディレクトリに追加することもできます。
4. 以下のコマンド構文を使用して、kafka-kinesis-connector を実行します。
[ec2-user@ip-10-0-0-71 kafka_2.12-2.2.1]$ ./bin/connect-standalone.sh /home/ec2-user/kafka-kinesis-connector/kafka-kinesis-connector/ worker.properties /home/ec2-user/kafka-kinesis-connector/kafka-kinesis-connector/ kinesis-kafka-streams-connecter.properties
関連情報
関連するコンテンツ
- 質問済み 5ヶ月前lg...
- 質問済み 20日前lg...
- AWS公式更新しました 1年前
- AWS公式更新しました 1年前