Como me conecto ao meu cluster do Amazon MSK usando o Kafka-Kinesis-Connector?

4 minuto de leitura
0

Quando tento usar o Kafka-Kinesis-Connector para me conectar ao Amazon Managed Streaming for Apache Kafka (Amazon MSK), recebo uma mensagem de erro. Como me conecto ao meu cluster do Amazon MSK usando o Kafka-Kinesis-Connector?

Breve descrição

Para se conectar ao seu cluster do MSK usando o Kafka-Kinesis-Connector, sua configuração deve atender aos seguintes requisitos:

  • Uma assinatura ativa da AWS.
  • Uma nuvem privada virtual (VPC) visível tanto para a máquina cliente quanto para o cluster do MSK. O cluster e o cliente do MSK devem residir na mesma VPC.
  • Conectividade com servidores MSK e Apache Zookeeper.
  • Duas sub-redes associadas à sua VPC.
  • Tópicos criados no MSK para enviar e receber mensagens do servidor.

Resolução

Construir seu arquivo de projeto

1.    Clone o projeto kafka-kinesis-connector para baixar o Kafka-Kinesis-Connector.

2.    Use o comando mvn package para criar o arquivo amazon-kinesis-kafka-connector-X.X.X.jar no diretório de destino:

[ec2-user@ip-10-0-0-71 kinesis-kafka-connector]$ mvn package
..
......

[INFO] Replacing /home/ec2-user/kafka-kinesis-connector/kafka-kinesis-connector/target/amazon-kinesis-kafka-connector-0.0.9-SNAPSHOT.jar with /home/ec2-user/kafka-kinesis-connector/kafka-kinesis-connector/target/amazon-kinesis-kafka-connector-0.0.9-SNAPSHOT-shaded.jar
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 28.822 s
[INFO] Finished at: 2020-02-19T13:01:31Z
[INFO] Final Memory: 26M/66M
[INFO] ------------------------------------------------------------------------

O Kafka-Kinesis-Connector procura credenciais na seguinte ordem: variáveis de ambiente, propriedades do sistema java e o arquivo de perfil de credenciais.

3.    Atualize sua configuração para a configuração DefaultAWSCredentailsProviderChain:

[ec2-user@ip-10-0-0-71 target]$ aws configure

Esse comando garante que a chave de acesso anexada ao usuário do AWS Identity and Access Management (IAM) tenha as permissões mínimas necessárias. O comando aws configure também garante que haja uma política disponível para acessar o Amazon Kinesis Data Streams ou o Amazon Kinesis Data Firehose. Para mais informações sobre a configuração de credenciais da AWS, consulte Working with AWS credentials (Como trabalhar com credenciais da AWS).

Observação: se você estiver usando um Java Development Kit (JDK), também poderá usar a classe EnvironmentVariableCredentialsProvider para fornecer credenciais.

4.    Se você estiver usando o Kinesis Data Streams, atualize sua política para o seguinte:

{
     "Version": "2012-10-17",
     "Statement": [{
          "Sid": "Stmt123",
          "Effect": "Allow",
          "Action": [
               "kinesis:DescribeStream",
               "kinesis:PutRecord",
               "kinesis:PutRecords",
               "kinesis:GetShardIterator",
               "kinesis:GetRecords",
               "kinesis:ListShards",
               "kinesis:DescribeStreamSummary",
               "kinesis:RegisterStreamConsumer"
          ],
          "Resource": [
               "arn:aws:kinesis:us-west-2:123xxxxxxxxx:stream/StreamName"
          ]
     }]
}

Se você estiver usando o Kinesis Data Firehose, atualize sua política para se parecer com o exemplo a seguir:

{
     "Version": "2012-10-17",
     "Statement": [{
          "Effect": "Allow",
          "Action": [
               "firehose:DeleteDeliveryStream",
               "firehose:PutRecord",
               "firehose:PutRecordBatch",
               "firehose:UpdateDestination"
          ],
          "Resource": [
               "arn:aws:firehose:us-west-2:123xxxxxxxxx:deliverystream/DeliveryStreamName"
          ]
     }]
}

Para mais informações sobre as configurações do fluxo de entrega do Kinesis Data Firehose, consulte Configuration and credential file settings (Definições e configurações do arquivo de credenciais).

Configurar o conector

Observação: você pode configurar o Kafka-Kinesis-Connector para publicar mensagens do MSK. As mensagens podem ser publicadas nos seguintes destinos: Amazon Simple Storage Service (Amazon S3), Amazon Redshift ou Amazon OpenSearch Service.

1.    Se você estiver configurando o Kinesis Data Streams, poderá configurar o conector com os seguintes valores:

name=YOUR_CONNECTER_NAME
connector.class=com.amazon.kinesis.kafka.AmazonKinesisSinkConnector
tasks.max=1
topics=YOUR_TOPIC_NAME
region=us-east-1
streamName=YOUR_STREAM_NAME
usePartitionAsHashKey=false
flushSync=true
# Use new Kinesis Producer for each Partition
singleKinesisProducerPerPartition=true
# Whether to block new records from putting onto Kinesis Producer if
# threshold for outstanding records have reached
pauseConsumption=true
outstandingRecordsThreshold=500000
# If outstanding records on producers are beyond threshold sleep for following period (in ms)
sleepPeriod=1000
# If outstanding records on producers are not cleared sleep for following cycle before killing the tasks
sleepCycles=10
# Kinesis Producer Configuration - https://github.com/awslabs/amazon-kinesis-producer/blob/main/java/amazon-kinesis-producer-sample/default_config.properties
# All kinesis producer configuration have not been exposed
maxBufferedTime=1500
maxConnections=1
rateLimit=100
ttl=60000
metricsLevel=detailed
metricsGranuality=shard
metricsNameSpace=KafkaKinesisStreamsConnector
aggregation=true

-ou-

Se você estiver configurando um tipo diferente de fluxo, configure as propriedades do fluxo de entrega do Kinesis Data Firehose da seguinte forma:

name=YOUR_CONNECTER_NAME
connector.class=com.amazon.kinesis.kafka.FirehoseSinkConnector
tasks.max=1
topics=YOUR_TOPIC_NAME
region=us-east-1
batch=true
batchSize=500
batchSizeInBytes=3670016
deliveryStream=YOUR_DELIVERY_STREAM_NAME

2.    Configure as propriedades do trabalhador para o modo autônomo ou distribuído:

bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
#internal.value.converter=org.apache.kafka.connect.storage.StringConverter
#internal.key.converter=org.apache.kafka.connect.storage.StringConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
internal.key.converter.schemas.enable=true
internal.value.converter.schemas.enable=true
offset.storage.file.filename=offset.log

Para mais informações sobre o modo autônomo ou distribuído do Kafka-Kinesis-Connector, consulte Kafka Connect no site da Apache.

3.    Copie o arquivo amazon-kinesis-kafka-connector-0.0.X.jar para o seu diretório e exporte o classpath.

Observação: Você também pode adicionar o arquivo amazon-kinesis-kafka-connector-0.0.X.jar ao diretório JAVA_HOME/lib/ext.

4.    Execute o kafka-kinesis-connector usando a seguinte sintaxe de comando:

[ec2-user@ip-10-0-0-71 kafka_2.12-2.2.1]$ ./bin/connect-standalone.sh /home/ec2-user/kafka-kinesis-connector/kafka-kinesis-connector/

worker.properties /home/ec2-user/kafka-kinesis-connector/kafka-kinesis-connector/

kinesis-kafka-streams-connecter.properties

Informações relacionadas

Criar um cluster do Amazon MSK

AWS OFICIAL
AWS OFICIALAtualizada há 3 anos