如何使用 Kafka-Kinesis-Connector 連線至我的 Amazon MSK 叢集?
當我嘗試使用 Kafka-Kinesis-Connector 與 Amazon Managed Streaming for Apache Kafka (Amazon MSK) 連線時,我收到錯誤訊息。如何使用 Kafka-Kinesis-Connector 連線至我的 Amazon MSK 叢集?
簡短說明
若要使用 Kafka-Kinesis-Connector 連線至 MSK 叢集,您的設定必須符合下列需求:
- 有效的 AWS 訂閱。
- 用戶端機器和 MSK 叢集都可見的虛擬私有雲端 (VPC)。MSK 叢集和用戶端必須位於相同的 VPC 中。
- 與 MSK 和 Apache Zookeeper 伺服器的連線。
- 與 VPC 相關聯的兩個子網路。
- 在 MSK 中建立的用於從伺服器傳送和接收訊息的主題。
解決方法
建置專案檔案
1、 複製 kafka-kinesis-connector 專案以下載 Kafka-Kinesis-Connector。
2、 使用 mvn package 命令,在目標目錄中建置 amazon-kinesis-kafka-connector-X.X.X.jar 檔案:
[ec2-user@ip-10-0-0-71 kinesis-kafka-connector]$ mvn package .. ...... [INFO] Replacing /home/ec2-user/kafka-kinesis-connector/kafka-kinesis-connector/target/amazon-kinesis-kafka-connector-0.0.9-SNAPSHOT.jar with /home/ec2-user/kafka-kinesis-connector/kafka-kinesis-connector/target/amazon-kinesis-kafka-connector-0.0.9-SNAPSHOT-shaded.jar [INFO] ------------------------------------------------------------------------ [INFO] BUILD SUCCESS [INFO] ------------------------------------------------------------------------ [INFO] Total time: 28.822 s [INFO] Finished at: 2020-02-19T13:01:31Z [INFO] Final Memory: 26M/66M [INFO] ------------------------------------------------------------------------
Kafka-Kinesis-Connector 會依下列順序尋找憑證:環境變數、Java 系統屬性和憑證設定檔。
3、 將您的組態更新為 DefaultAWSCredentailsProviderChain 設定:
[ec2-user@ip-10-0-0-71 target]$ aws configure
此命令可確保連接至 AWS Identity and Access Management (IAM) 使用者的存取金鑰,具有最低的必要權限。aws configure 命令也可確保有可用於存取 Amazon Kinesis Data Streams 或 Amazon Kinesis Data Firehose 的政策。如需設定 AWS 憑證的詳細資訊,請參閱使用 AWS 憑證。
**注意:**如果您使用的是 Java 開發套件 (JDK),您也可以使用 EnvironmentVariableCredentialsProvider 類別來提供憑證。
4、 如果您使用的是 Kinesis Data Streams,請將政策更新為以下內容:
{ "Version": "2012-10-17", "Statement": [{ "Sid": "Stmt123", "Effect": "Allow", "Action": [ "kinesis:DescribeStream", "kinesis:PutRecord", "kinesis:PutRecords", "kinesis:GetShardIterator", "kinesis:GetRecords", "kinesis:ListShards", "kinesis:DescribeStreamSummary", "kinesis:RegisterStreamConsumer" ], "Resource": [ "arn:aws:kinesis:us-west-2:123xxxxxxxxx:stream/StreamName" ] }] }
如果您使用的是 Kinesis Data Firehose,請將政策更新為類似以下範例的內容:
{ "Version": "2012-10-17", "Statement": [{ "Effect": "Allow", "Action": [ "firehose:DeleteDeliveryStream", "firehose:PutRecord", "firehose:PutRecordBatch", "firehose:UpdateDestination" ], "Resource": [ "arn:aws:firehose:us-west-2:123xxxxxxxxx:deliverystream/DeliveryStreamName" ] }] }
如需 Kinesis Data Firehose 交付串流設定的詳細資訊,請參閱組態與憑證檔案設定。
設定連接器
**注意:**您可以設定 Kafka-Kinesis-Connector 以從 MSK 發佈訊息。訊息可以發佈至下列目標: Amazon Simple Storage Service (Amazon S3)、Amazon Redshift 或 Amazon OpenSearch Service。
1、 如果您要設定 Kinesis Data Streams,則可以使用下列值來設定連接器:
name=YOUR_CONNECTER_NAME connector.class=com.amazon.kinesis.kafka.AmazonKinesisSinkConnector tasks.max=1 topics=YOUR_TOPIC_NAME region=us-east-1 streamName=YOUR_STREAM_NAME usePartitionAsHashKey=false flushSync=true # Use new Kinesis Producer for each Partition singleKinesisProducerPerPartition=true # Whether to block new records from putting onto Kinesis Producer if # threshold for outstanding records have reached pauseConsumption=true outstandingRecordsThreshold=500000 # If outstanding records on producers are beyond threshold sleep for following period (in ms) sleepPeriod=1000 # If outstanding records on producers are not cleared sleep for following cycle before killing the tasks sleepCycles=10 # Kinesis Producer Configuration - https://github.com/awslabs/amazon-kinesis-producer/blob/main/java/amazon-kinesis-producer-sample/default_config.properties # All kinesis producer configuration have not been exposed maxBufferedTime=1500 maxConnections=1 rateLimit=100 ttl=60000 metricsLevel=detailed metricsGranuality=shard metricsNameSpace=KafkaKinesisStreamsConnector aggregation=true
- 或 -
如果您要設定不同類型的串流,請設定 Kinesis Data Firehose 交付串流屬性,如下所示:
name=YOUR_CONNECTER_NAME connector.class=com.amazon.kinesis.kafka.FirehoseSinkConnector tasks.max=1 topics=YOUR_TOPIC_NAME region=us-east-1 batch=true batchSize=500 batchSizeInBytes=3670016 deliveryStream=YOUR_DELIVERY_STREAM_NAME
2、 設定獨立或分散式模式的 Worker 屬性:
bootstrap.servers=localhost:9092 key.converter=org.apache.kafka.connect.storage.StringConverter value.converter=org.apache.kafka.connect.storage.StringConverter #internal.value.converter=org.apache.kafka.connect.storage.StringConverter #internal.key.converter=org.apache.kafka.connect.storage.StringConverter internal.value.converter=org.apache.kafka.connect.json.JsonConverter internal.key.converter=org.apache.kafka.connect.json.JsonConverter key.converter.schemas.enable=true value.converter.schemas.enable=true internal.key.converter.schemas.enable=true internal.value.converter.schemas.enable=true offset.storage.file.filename=offset.log
如需 Kafka-Kinesis-Connector 的獨立或分散式模式的詳細資訊,請參閱 Apache 網站上的 Kafka Connect。
3、 將 amazon-kinesis-kafka-connector-0.0.X.jar 檔案複製到您的目錄並匯出類別路徑。
**注意:**您也可以將 amazon-kinesis-kafka-connector-0.0.X.jar 檔案新增至 JAVA_HOME/lib/ext 目錄。
4、 使用下列命令語法執行 kafka-kinesis-connector:
[ec2-user@ip-10-0-0-71 kafka_2.12-2.2.1]$ ./bin/connect-standalone.sh /home/ec2-user/kafka-kinesis-connector/kafka-kinesis-connector/ worker.properties /home/ec2-user/kafka-kinesis-connector/kafka-kinesis-connector/ kinesis-kafka-streams-connecter.properties
相關資訊
相關內容
- 已提問 9 個月前lg...
- 已提問 1 年前lg...
- 已提問 2 個月前lg...
- 已提問 2 個月前lg...
- AWS 官方已更新 2 年前
- AWS 官方已更新 1 年前
- AWS 官方已更新 1 年前
- AWS 官方已更新 1 年前