Come posso connettermi al mio cluster Amazon MSK utilizzando il connettore Kafka-Kinesis?

4 minuti di lettura
0

Quando provo a utilizzare il connettore Kafka-Kinesis per connettermi a Streaming gestito da Amazon per Apache Kafka (Amazon MSK) ricevo un messaggio di errore. Come posso connettermi al mio cluster Amazon MSK utilizzando il connettore Kafka-Kinesis?

Breve descrizione

Per connettersi al cluster MSK utilizzando Kafka-Kinesis-Connector, la configurazione deve soddisfare i seguenti requisiti:

  • Un abbonamento AWS attivo.
  • Un cloud privato virtuale (VPC) visibile sia alla macchina client che al cluster MSK. Il cluster MSK e il client devono risiedere nello stesso VPC.
  • Connettività ai server MSK e Apache Zookeeper.
  • Due sottoreti associate al tuo VPC.
  • Argomenti creati in MSK per inviare e ricevere messaggi dal server.

Soluzione

Creazione del file di progetto

1.    Clona il progetto kafka-kinesis-connector per scaricare il connettore Kafka-Kinesis.

2.    Usa il comando mvn package per creare il file amazon-kinesis-kafka-connector-X.X.X.jar nella directory di destinazione:

[ec2-user@ip-10-0-0-71 kinesis-kafka-connector]$ mvn package
..
......

[INFO] Replacing /home/ec2-user/kafka-kinesis-connector/kafka-kinesis-connector/target/amazon-kinesis-kafka-connector-0.0.9-SNAPSHOT.jar with /home/ec2-user/kafka-kinesis-connector/kafka-kinesis-connector/target/amazon-kinesis-kafka-connector-0.0.9-SNAPSHOT-shaded.jar
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 28.822 s
[INFO] Finished at: 2020-02-19T13:01:31Z
[INFO] Final Memory: 26M/66M
[INFO] ------------------------------------------------------------------------

Il connettore Kafka-Kinesis cerca le credenziali nel seguente ordine: variabili di ambiente, proprietà del sistema java e file del profilo delle credenziali.

3.    Aggiorna la tua configurazione all'impostazione DefaultAWSCredentailsProviderChain:

[ec2-user@ip-10-0-0-71 target]$ aws configure

Questo comando assicura che la chiave di accesso allegata all'utente AWS Identity and Access Management (IAM) disponga delle autorizzazioni minime richieste. Il comandoaws configure assicura inoltre che sia disponibile una policy per accedere al flusso di dati Amazon Kinesis o ad Amazon Kinesis Data Firehose. Per ulteriori informazioni sull'impostazione delle credenziali AWS, consulta Lavorare con le credenziali AWS.

Nota: Se utilizzi un Java Development Kit (JDK), puoi utilizzare anche la classe EnvironmentVariableCredentialsProvider per fornire le credenziali.

4.    Se utilizzi il flusso di dati Kinesis, aggiorna la tua policy come segue:

{
     "Version": "2012-10-17",
     "Statement": [{
          "Sid": "Stmt123",
          "Effect": "Allow",
          "Action": [
               "kinesis:DescribeStream",
               "kinesis:PutRecord",
               "kinesis:PutRecords",
               "kinesis:GetShardIterator",
               "kinesis:GetRecords",
               "kinesis:ListShards",
               "kinesis:DescribeStreamSummary",
               "kinesis:RegisterStreamConsumer"
          ],
          "Resource": [
               "arn:aws:kinesis:us-west-2:123xxxxxxxxx:stream/StreamName"
          ]
     }]
}

Se utilizzi Kinesis Data Firehose, aggiorna la tua policy in modo che assomigli all'esempio seguente:

{
     "Version": "2012-10-17",
     "Statement": [{
          "Effect": "Allow",
          "Action": [
               "firehose:DeleteDeliveryStream",
               "firehose:PutRecord",
               "firehose:PutRecordBatch",
               "firehose:UpdateDestination"
          ],
          "Resource": [
               "arn:aws:firehose:us-west-2:123xxxxxxxxx:deliverystream/DeliveryStreamName"
          ]
     }]
}

Per ulteriori informazioni sulle impostazioni del flusso di consegna di Kinesis Data Firehose, consulta Configurazione e impostazioni dei file di credenziali.

Configurazione del connettore

Nota: puoi configurare il connettore Kafka-Kinesis per pubblicare messaggi da MSK. I messaggi possono essere pubblicati nelle seguenti destinazioni: Amazon Simple Storage Service (Amazon S3), Amazon Redshift o servizio OpenSearch di Amazon.

1.    Se stai impostando il flusso di dati Kinesis, puoi configurare il connettore con i seguenti valori:

name=YOUR_CONNECTER_NAME
connector.class=com.amazon.kinesis.kafka.AmazonKinesisSinkConnector
tasks.max=1
topics=YOUR_TOPIC_NAME
region=us-east-1
streamName=YOUR_STREAM_NAME
usePartitionAsHashKey=false
flushSync=true
# Use new Kinesis Producer for each Partition
singleKinesisProducerPerPartition=true
# Whether to block new records from putting onto Kinesis Producer if
# threshold for outstanding records have reached
pauseConsumption=true
outstandingRecordsThreshold=500000
# If outstanding records on producers are beyond threshold sleep for following period (in ms)
sleepPeriod=1000
# If outstanding records on producers are not cleared sleep for following cycle before killing the tasks
sleepCycles=10
# Kinesis Producer Configuration - https://github.com/awslabs/amazon-kinesis-producer/blob/main/java/amazon-kinesis-producer-sample/default_config.properties
# All kinesis producer configuration have not been exposed
maxBufferedTime=1500
maxConnections=1
rateLimit=100
ttl=60000
metricsLevel=detailed
metricsGranuality=shard
metricsNameSpace=KafkaKinesisStreamsConnector
aggregation=true

-oppure-

Se stai impostando un tipo di flusso diverso, configura le proprietà del flusso di consegna di Kinesis Data Firehose in questo modo:

name=YOUR_CONNECTER_NAME
connector.class=com.amazon.kinesis.kafka.FirehoseSinkConnector
tasks.max=1
topics=YOUR_TOPIC_NAME
region=us-east-1
batch=true
batchSize=500
batchSizeInBytes=3670016
deliveryStream=YOUR_DELIVERY_STREAM_NAME

2.    Configura le proprietà worker, operatore, per la modalità autonoma o distribuita:

bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
#internal.value.converter=org.apache.kafka.connect.storage.StringConverter
#internal.key.converter=org.apache.kafka.connect.storage.StringConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
internal.key.converter.schemas.enable=true
internal.value.converter.schemas.enable=true
offset.storage.file.filename=offset.log

Per ulteriori informazioni sulla modalità autonoma o distribuita del connettore Kafka-Kinesis, consulta Kafka Connect sul sito web di Apache.

3.    Copia il file amazon-kinesis-kafka-connector-0.0.X.jar nella tua directory ed esporta il classpath.

Nota: puoi aggiungere anche il file amazon-kinesis-kafka-connector-0.0.X.jar alla directory JAVA_HOME/lib/ext.

4.    Esegui il connettore Kafka-Kinesis utilizzando la seguente sintassi di comando:

[ec2-user@ip-10-0-0-71 kafka_2.12-2.2.1]$ ./bin/connect-standalone.sh /home/ec2-user/kafka-kinesis-connector/kafka-kinesis-connector/

worker.properties /home/ec2-user/kafka-kinesis-connector/kafka-kinesis-connector/

kinesis-kafka-streams-connecter.properties

Informazioni correlate

Creazione di un cluster Amazon MSK

AWS UFFICIALE
AWS UFFICIALEAggiornata 3 anni fa