Comment me connecter à mon cluster Amazon MSK à l'aide du connecteur Kafka-Kinesis ?

Date de la dernière mise à jour : 23/12/2020

Lorsque j'essaie d'utiliser le connecteur Kafka-Kinesis pour me connecter à Amazon Managed Streaming for Apache Kafka (Amazon MSK), je reçois un message d'erreur. Comment me connecter à mon cluster Amazon MSK à l'aide du connecteur Kafka-Kinesis ?

Brève description

Pour vous connecter à votre cluster MSK à l'aide du connecteur Kafka-Kinesis, votre configuration doit répondre aux exigences suivantes :

  • Un abonnement AWS actif.
  • Un Virtual Private Cloud (VPC) visible à la fois de l'ordinateur client et du cluster MSK. Le cluster MSK et le client doivent résider dans le même VPC.
  • Connectivité aux serveurs MSK et Apache Zookeeper.
  • Deux sous-réseaux associés à votre VPC.
  • Rubriques créées dans MSK pour envoyer et recevoir des messages du serveur.

Résolution

Création de votre fichier de projet

1.    Clonez le projet kafka-kinesis-connector pour télécharger Kafka-Kinesis-Connector.

2.    Utilisez la commande mvn package pour créer le fichier amazon-kinesis-kafka-connector-X.X.X.jar dans le répertoire cible :

[ec2-user@ip-10-0-0-71 kinesis-kafka-connector]$ mvn package
..
......

[INFO] Replacing /home/ec2-user/kafka-kinesis-connector/kafka-kinesis-connector/target/amazon-kinesis-kafka-connector-0.0.9-SNAPSHOT.jar with /home/ec2-user/kafka-kinesis-connector/kafka-kinesis-connector/target/amazon-kinesis-kafka-connector-0.0.9-SNAPSHOT-shaded.jar
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 28.822 s
[INFO] Finished at: 2020-02-19T13:01:31Z
[INFO] Final Memory: 26M/66M
[INFO] ------------------------------------------------------------------------

Le connecteur Kafka-Kinesis recherche les informations d'identification dans l'ordre suivant: les variables d'environnement, les propriétés système Java et le fichier de profil d'informations d'identification.

3.    Mettez à jour votre configuration vers le paramètre DefaultAWSCredentailsProviderChain :

[ec2-user@ip-10-0-0-71 target]$ aws configure

Cette commande garantit que la clé d'accès attachée à l'utilisateur AWS Identity and Access Management (IAM) dispose des autorisations minimales requises. La commande aws configure garantit également qu'une stratégie est disponible pour accéder à Amazon Kinesis Data Streams ou Amazon Kinesis Data Firehose. Pour plus d'informations sur la configuration des informations d'identification AWS, consultez Utilisation des informations d'identification AWS.

Remarque : si vous utilisez un kit de développement Java (JDK), vous pouvez également utiliser la classe EnvironmentVariableCredentialsProvider pour fournir des informations d'identification.

4.    Si vous utilisez Kinesis Data Streams, mettez à jour votre stratégie comme suit :

{
"Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "Stmt123",
      "Effect": "Allow",
      "Action": [
        "kinesis:DescribeStream",
        "kinesis:PutRecord",
        "kinesis:PutRecords",
        "kinesis:GetShardIterator",
        "kinesis:GetRecords",
        "kinesis:ListShards",
        "kinesis:DescribeStreamSummary",
        "kinesis:RegisterStreamConsumer"
      ],
      "Resource": [
        "arn:aws:kinesis:us-west-2:123xxxxxxxxx:stream/SteamName"
      ]
    },
}

Si vous utilisez Kinesis Data Firehose, mettez à jour votre stratégie pour qu'elle ressemble à l'exemple suivant :

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "firehose:DeleteDeliveryStream",
                "firehose:PutRecord",
                "firehose:PutRecordBatch",
                "firehose:UpdateDestination"
            ],
            "Resource": [
                "arn:aws:firehose:us-west-2:123xxxxxxxxx:deliverystream/DeliveryStreamName"
            ]
        }
    ]

Pour plus d'informations sur les paramètres de flux de diffusion de Kinesis Data Firehose, consultez Paramètres du fichier de configuration et d'identification.

Configuration du connecteur

Remarque : vous pouvez configurer le connecteur Kafka-Kinesis pour publier des messages à partir de MSK. Les messages peuvent être publiés vers l'une des destinations suivantes: Amazon Simple Storage Service (Amazon S3), Amazon Redshift ou Amazon Elasticsearch Service (Amazon ES).

1.    Si vous configurez Kinesis Data Streams, vous pouvez configurer le connecteur avec les valeurs suivantes :

name=YOUR_CONNECTER_NAME
connector.class=com.amazon.kinesis.kafka.AmazonKinesisSinkConnector
tasks.max=1
topics=YOUR_TOPIC_NAME
region=us-east-1
streamName=YOUR_STREAM_NAME
usePartitionAsHashKey=false
flushSync=true
# Use new Kinesis Producer for each Partition
singleKinesisProducerPerPartition=true
# Whether to block new records from putting onto Kinesis Producer if
# threshold for outstandings records have reached
pauseConsumption=true
outstandingRecordsThreshold=500000
# If outstanding records on producers are beyond threshold sleep for following period (in ms)
sleepPeriod=1000
# If outstanding records on producers are not cleared sleep for following cycle before killing the tasks
sleepCycles=10
# Kinesis Producer Configuration - https://github.com/awslabs/amazon-kinesis-producer/blob/main/java/amazon-kinesis-producer-sample/default_config.properties
# All kinesis producer configuration have not been exposed
maxBufferedTime=1500
maxConnections=1
rateLimit=100
ttl=60000
metricsLevel=detailed
metricsGranuality=shard
metricsNameSpace=KafkaKinesisStreamsConnector
aggregration=true

-ou-

Si vous configurez un autre type de flux, configurez les propriétés du flux de diffusion Kinesis Data Firehose comme suit :

name=YOUR_CONNECTER_NAME
connector.class=com.amazon.kinesis.kafka.FirehoseSinkConnector
tasks.max=1
topics=YOUR_TOPIC_NAME
region=us-east-1
batch=true
batchSize=500
batchSizeInBytes=3670016
deliveryStream=YOUR_DELIVERY_STREAM_NAME

2.    Configurez les propriétés de travail pour le mode autonome ou distribué :

bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
#internal.value.converter=org.apache.kafka.connect.storage.StringConverter
#internal.key.converter=org.apache.kafka.connect.storage.StringConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
internal.key.converter.schemas.enable=true
internal.value.converter.schemas.enable=true
offset.storage.file.filename=offset.log

Pour plus d'informations sur le mode autonome ou distribué de Kafka-Kinesis-Connector, consultez Kafka Connect sur le site web Apache.

3.    Copiez le fichier amazon-kinesis-kafka-connector-0.0.X.jar vers votre répertoire et exportez chemin de classe.

Remarque : vous pouvez également ajouter le fichier amazon-kinesis-kafka-connector-0.0.X.jar au répertoire JAVA_HOME/lib/ext.

4.    Exécutez le connecteur kinesis-kafka connector à l'aide de la syntaxe de commande suivante :

[ec2-user@ip-10-0-0-71 kafka_2.12-2.2.1]$ ./bin/connect-standalone.sh /home/ec2-user/kafka-kinesis-connector/kafka-kinesis-connector/

worker.properties /home/ec2-user/kafka-kinesis-connector/kafka-kinesis-connector/

kinesis-kafka-streams-connecter.properties

Cet article vous a-t-il été utile ?


Besoin d'aide pour une question technique ou de facturation ?