Come posso connettermi al mio cluster Amazon MSK utilizzando il connettore Kafka-Kinesis?
Quando provo a utilizzare il connettore Kafka-Kinesis per connettermi a Streaming gestito da Amazon per Apache Kafka (Amazon MSK) ricevo un messaggio di errore. Come posso connettermi al mio cluster Amazon MSK utilizzando il connettore Kafka-Kinesis?
Breve descrizione
Per connettersi al cluster MSK utilizzando Kafka-Kinesis-Connector, la configurazione deve soddisfare i seguenti requisiti:
- Un abbonamento AWS attivo.
- Un cloud privato virtuale (VPC) visibile sia alla macchina client che al cluster MSK. Il cluster MSK e il client devono risiedere nello stesso VPC.
- Connettività ai server MSK e Apache Zookeeper.
- Due sottoreti associate al tuo VPC.
- Argomenti creati in MSK per inviare e ricevere messaggi dal server.
Soluzione
Creazione del file di progetto
1. Clona il progetto kafka-kinesis-connector per scaricare il connettore Kafka-Kinesis.
2. Usa il comando mvn package per creare il file amazon-kinesis-kafka-connector-X.X.X.jar nella directory di destinazione:
[ec2-user@ip-10-0-0-71 kinesis-kafka-connector]$ mvn package .. ...... [INFO] Replacing /home/ec2-user/kafka-kinesis-connector/kafka-kinesis-connector/target/amazon-kinesis-kafka-connector-0.0.9-SNAPSHOT.jar with /home/ec2-user/kafka-kinesis-connector/kafka-kinesis-connector/target/amazon-kinesis-kafka-connector-0.0.9-SNAPSHOT-shaded.jar [INFO] ------------------------------------------------------------------------ [INFO] BUILD SUCCESS [INFO] ------------------------------------------------------------------------ [INFO] Total time: 28.822 s [INFO] Finished at: 2020-02-19T13:01:31Z [INFO] Final Memory: 26M/66M [INFO] ------------------------------------------------------------------------
Il connettore Kafka-Kinesis cerca le credenziali nel seguente ordine: variabili di ambiente, proprietà del sistema java e file del profilo delle credenziali.
3. Aggiorna la tua configurazione all'impostazione DefaultAWSCredentailsProviderChain:
[ec2-user@ip-10-0-0-71 target]$ aws configure
Questo comando assicura che la chiave di accesso allegata all'utente AWS Identity and Access Management (IAM) disponga delle autorizzazioni minime richieste. Il comandoaws configure assicura inoltre che sia disponibile una policy per accedere al flusso di dati Amazon Kinesis o ad Amazon Kinesis Data Firehose. Per ulteriori informazioni sull'impostazione delle credenziali AWS, consulta Lavorare con le credenziali AWS.
Nota: Se utilizzi un Java Development Kit (JDK), puoi utilizzare anche la classe EnvironmentVariableCredentialsProvider per fornire le credenziali.
4. Se utilizzi il flusso di dati Kinesis, aggiorna la tua policy come segue:
{ "Version": "2012-10-17", "Statement": [{ "Sid": "Stmt123", "Effect": "Allow", "Action": [ "kinesis:DescribeStream", "kinesis:PutRecord", "kinesis:PutRecords", "kinesis:GetShardIterator", "kinesis:GetRecords", "kinesis:ListShards", "kinesis:DescribeStreamSummary", "kinesis:RegisterStreamConsumer" ], "Resource": [ "arn:aws:kinesis:us-west-2:123xxxxxxxxx:stream/StreamName" ] }] }
Se utilizzi Kinesis Data Firehose, aggiorna la tua policy in modo che assomigli all'esempio seguente:
{ "Version": "2012-10-17", "Statement": [{ "Effect": "Allow", "Action": [ "firehose:DeleteDeliveryStream", "firehose:PutRecord", "firehose:PutRecordBatch", "firehose:UpdateDestination" ], "Resource": [ "arn:aws:firehose:us-west-2:123xxxxxxxxx:deliverystream/DeliveryStreamName" ] }] }
Per ulteriori informazioni sulle impostazioni del flusso di consegna di Kinesis Data Firehose, consulta Configurazione e impostazioni dei file di credenziali.
Configurazione del connettore
Nota: puoi configurare il connettore Kafka-Kinesis per pubblicare messaggi da MSK. I messaggi possono essere pubblicati nelle seguenti destinazioni: Amazon Simple Storage Service (Amazon S3), Amazon Redshift o servizio OpenSearch di Amazon.
1. Se stai impostando il flusso di dati Kinesis, puoi configurare il connettore con i seguenti valori:
name=YOUR_CONNECTER_NAME connector.class=com.amazon.kinesis.kafka.AmazonKinesisSinkConnector tasks.max=1 topics=YOUR_TOPIC_NAME region=us-east-1 streamName=YOUR_STREAM_NAME usePartitionAsHashKey=false flushSync=true # Use new Kinesis Producer for each Partition singleKinesisProducerPerPartition=true # Whether to block new records from putting onto Kinesis Producer if # threshold for outstanding records have reached pauseConsumption=true outstandingRecordsThreshold=500000 # If outstanding records on producers are beyond threshold sleep for following period (in ms) sleepPeriod=1000 # If outstanding records on producers are not cleared sleep for following cycle before killing the tasks sleepCycles=10 # Kinesis Producer Configuration - https://github.com/awslabs/amazon-kinesis-producer/blob/main/java/amazon-kinesis-producer-sample/default_config.properties # All kinesis producer configuration have not been exposed maxBufferedTime=1500 maxConnections=1 rateLimit=100 ttl=60000 metricsLevel=detailed metricsGranuality=shard metricsNameSpace=KafkaKinesisStreamsConnector aggregation=true
-oppure-
Se stai impostando un tipo di flusso diverso, configura le proprietà del flusso di consegna di Kinesis Data Firehose in questo modo:
name=YOUR_CONNECTER_NAME connector.class=com.amazon.kinesis.kafka.FirehoseSinkConnector tasks.max=1 topics=YOUR_TOPIC_NAME region=us-east-1 batch=true batchSize=500 batchSizeInBytes=3670016 deliveryStream=YOUR_DELIVERY_STREAM_NAME
2. Configura le proprietà worker, operatore, per la modalità autonoma o distribuita:
bootstrap.servers=localhost:9092 key.converter=org.apache.kafka.connect.storage.StringConverter value.converter=org.apache.kafka.connect.storage.StringConverter #internal.value.converter=org.apache.kafka.connect.storage.StringConverter #internal.key.converter=org.apache.kafka.connect.storage.StringConverter internal.value.converter=org.apache.kafka.connect.json.JsonConverter internal.key.converter=org.apache.kafka.connect.json.JsonConverter key.converter.schemas.enable=true value.converter.schemas.enable=true internal.key.converter.schemas.enable=true internal.value.converter.schemas.enable=true offset.storage.file.filename=offset.log
Per ulteriori informazioni sulla modalità autonoma o distribuita del connettore Kafka-Kinesis, consulta Kafka Connect sul sito web di Apache.
3. Copia il file amazon-kinesis-kafka-connector-0.0.X.jar nella tua directory ed esporta il classpath.
Nota: puoi aggiungere anche il file amazon-kinesis-kafka-connector-0.0.X.jar alla directory JAVA_HOME/lib/ext.
4. Esegui il connettore Kafka-Kinesis utilizzando la seguente sintassi di comando:
[ec2-user@ip-10-0-0-71 kafka_2.12-2.2.1]$ ./bin/connect-standalone.sh /home/ec2-user/kafka-kinesis-connector/kafka-kinesis-connector/ worker.properties /home/ec2-user/kafka-kinesis-connector/kafka-kinesis-connector/ kinesis-kafka-streams-connecter.properties
Informazioni correlate
Contenuto pertinente
- AWS UFFICIALEAggiornata 9 mesi fa
- AWS UFFICIALEAggiornata un anno fa
- AWS UFFICIALEAggiornata 2 anni fa
- AWS UFFICIALEAggiornata un anno fa