Como me conecto ao meu cluster do Amazon MSK usando o Kafka-Kinesis-Connector?
Quando tento usar o Kafka-Kinesis-Connector para me conectar ao Amazon Managed Streaming for Apache Kafka (Amazon MSK), recebo uma mensagem de erro. Como me conecto ao meu cluster do Amazon MSK usando o Kafka-Kinesis-Connector?
Breve descrição
Para se conectar ao seu cluster do MSK usando o Kafka-Kinesis-Connector, sua configuração deve atender aos seguintes requisitos:
- Uma assinatura ativa da AWS.
- Uma nuvem privada virtual (VPC) visível tanto para a máquina cliente quanto para o cluster do MSK. O cluster e o cliente do MSK devem residir na mesma VPC.
- Conectividade com servidores MSK e Apache Zookeeper.
- Duas sub-redes associadas à sua VPC.
- Tópicos criados no MSK para enviar e receber mensagens do servidor.
Resolução
Construir seu arquivo de projeto
1. Clone o projeto kafka-kinesis-connector para baixar o Kafka-Kinesis-Connector.
2. Use o comando mvn package para criar o arquivo amazon-kinesis-kafka-connector-X.X.X.jar no diretório de destino:
[ec2-user@ip-10-0-0-71 kinesis-kafka-connector]$ mvn package .. ...... [INFO] Replacing /home/ec2-user/kafka-kinesis-connector/kafka-kinesis-connector/target/amazon-kinesis-kafka-connector-0.0.9-SNAPSHOT.jar with /home/ec2-user/kafka-kinesis-connector/kafka-kinesis-connector/target/amazon-kinesis-kafka-connector-0.0.9-SNAPSHOT-shaded.jar [INFO] ------------------------------------------------------------------------ [INFO] BUILD SUCCESS [INFO] ------------------------------------------------------------------------ [INFO] Total time: 28.822 s [INFO] Finished at: 2020-02-19T13:01:31Z [INFO] Final Memory: 26M/66M [INFO] ------------------------------------------------------------------------
O Kafka-Kinesis-Connector procura credenciais na seguinte ordem: variáveis de ambiente, propriedades do sistema java e o arquivo de perfil de credenciais.
3. Atualize sua configuração para a configuração DefaultAWSCredentailsProviderChain:
[ec2-user@ip-10-0-0-71 target]$ aws configure
Esse comando garante que a chave de acesso anexada ao usuário do AWS Identity and Access Management (IAM) tenha as permissões mínimas necessárias. O comando aws configure também garante que haja uma política disponível para acessar o Amazon Kinesis Data Streams ou o Amazon Kinesis Data Firehose. Para mais informações sobre a configuração de credenciais da AWS, consulte Working with AWS credentials (Como trabalhar com credenciais da AWS).
Observação: se você estiver usando um Java Development Kit (JDK), também poderá usar a classe EnvironmentVariableCredentialsProvider para fornecer credenciais.
4. Se você estiver usando o Kinesis Data Streams, atualize sua política para o seguinte:
{ "Version": "2012-10-17", "Statement": [{ "Sid": "Stmt123", "Effect": "Allow", "Action": [ "kinesis:DescribeStream", "kinesis:PutRecord", "kinesis:PutRecords", "kinesis:GetShardIterator", "kinesis:GetRecords", "kinesis:ListShards", "kinesis:DescribeStreamSummary", "kinesis:RegisterStreamConsumer" ], "Resource": [ "arn:aws:kinesis:us-west-2:123xxxxxxxxx:stream/StreamName" ] }] }
Se você estiver usando o Kinesis Data Firehose, atualize sua política para se parecer com o exemplo a seguir:
{ "Version": "2012-10-17", "Statement": [{ "Effect": "Allow", "Action": [ "firehose:DeleteDeliveryStream", "firehose:PutRecord", "firehose:PutRecordBatch", "firehose:UpdateDestination" ], "Resource": [ "arn:aws:firehose:us-west-2:123xxxxxxxxx:deliverystream/DeliveryStreamName" ] }] }
Para mais informações sobre as configurações do fluxo de entrega do Kinesis Data Firehose, consulte Configuration and credential file settings (Definições e configurações do arquivo de credenciais).
Configurar o conector
Observação: você pode configurar o Kafka-Kinesis-Connector para publicar mensagens do MSK. As mensagens podem ser publicadas nos seguintes destinos: Amazon Simple Storage Service (Amazon S3), Amazon Redshift ou Amazon OpenSearch Service.
1. Se você estiver configurando o Kinesis Data Streams, poderá configurar o conector com os seguintes valores:
name=YOUR_CONNECTER_NAME connector.class=com.amazon.kinesis.kafka.AmazonKinesisSinkConnector tasks.max=1 topics=YOUR_TOPIC_NAME region=us-east-1 streamName=YOUR_STREAM_NAME usePartitionAsHashKey=false flushSync=true # Use new Kinesis Producer for each Partition singleKinesisProducerPerPartition=true # Whether to block new records from putting onto Kinesis Producer if # threshold for outstanding records have reached pauseConsumption=true outstandingRecordsThreshold=500000 # If outstanding records on producers are beyond threshold sleep for following period (in ms) sleepPeriod=1000 # If outstanding records on producers are not cleared sleep for following cycle before killing the tasks sleepCycles=10 # Kinesis Producer Configuration - https://github.com/awslabs/amazon-kinesis-producer/blob/main/java/amazon-kinesis-producer-sample/default_config.properties # All kinesis producer configuration have not been exposed maxBufferedTime=1500 maxConnections=1 rateLimit=100 ttl=60000 metricsLevel=detailed metricsGranuality=shard metricsNameSpace=KafkaKinesisStreamsConnector aggregation=true
-ou-
Se você estiver configurando um tipo diferente de fluxo, configure as propriedades do fluxo de entrega do Kinesis Data Firehose da seguinte forma:
name=YOUR_CONNECTER_NAME connector.class=com.amazon.kinesis.kafka.FirehoseSinkConnector tasks.max=1 topics=YOUR_TOPIC_NAME region=us-east-1 batch=true batchSize=500 batchSizeInBytes=3670016 deliveryStream=YOUR_DELIVERY_STREAM_NAME
2. Configure as propriedades do trabalhador para o modo autônomo ou distribuído:
bootstrap.servers=localhost:9092 key.converter=org.apache.kafka.connect.storage.StringConverter value.converter=org.apache.kafka.connect.storage.StringConverter #internal.value.converter=org.apache.kafka.connect.storage.StringConverter #internal.key.converter=org.apache.kafka.connect.storage.StringConverter internal.value.converter=org.apache.kafka.connect.json.JsonConverter internal.key.converter=org.apache.kafka.connect.json.JsonConverter key.converter.schemas.enable=true value.converter.schemas.enable=true internal.key.converter.schemas.enable=true internal.value.converter.schemas.enable=true offset.storage.file.filename=offset.log
Para mais informações sobre o modo autônomo ou distribuído do Kafka-Kinesis-Connector, consulte Kafka Connect no site da Apache.
3. Copie o arquivo amazon-kinesis-kafka-connector-0.0.X.jar para o seu diretório e exporte o classpath.
Observação: Você também pode adicionar o arquivo amazon-kinesis-kafka-connector-0.0.X.jar ao diretório JAVA_HOME/lib/ext.
4. Execute o kafka-kinesis-connector usando a seguinte sintaxe de comando:
[ec2-user@ip-10-0-0-71 kafka_2.12-2.2.1]$ ./bin/connect-standalone.sh /home/ec2-user/kafka-kinesis-connector/kafka-kinesis-connector/ worker.properties /home/ec2-user/kafka-kinesis-connector/kafka-kinesis-connector/ kinesis-kafka-streams-connecter.properties
Informações relacionadas
Conteúdo relevante
- AWS OFICIALAtualizada há 2 anos
- AWS OFICIALAtualizada há um ano
- AWS OFICIALAtualizada há um ano
- AWS OFICIALAtualizada há um ano