Kinesis-Kafka-Connector를 사용하여 Amazon MSK 클러스터에 연결하려면 어떻게 해야 합니까?

최종 업데이트 날짜: 2020년 9월 1일

Kinesis-Kafka-Connector를 사용하여 Amazon Managed Streaming for Apache Kafka(Amazon MSK)에 연결하려고 하면 오류 메시지가 표시됩니다. Kinesis-Kafka-Connector를 사용하여 Amazon MSK 클러스터에 연결하려면 어떻게 해야 합니까?

간략한 설명

Kinesis-Kafka-Connector를 사용하여 MSK 클러스터에 연결하려면 설정이 다음 요구 사항을 충족해야 합니다.

  • 활성 AWS 구독.
  • 클라이언트 머신과 MSK 클러스터 모두에 표시되는 VPC(가상 프라이빗 네트워크). MSK 클러스터와 클라이언트는 동일한 VPC에 상주해야 합니다.
  • MSK Kafka 및 Apache Zookeeper 서버에 대한 연결.
  • VPC에 연결된 서브넷 2개.
  • 서버의 메시지를 송수신하기 위해 MSK Kafka에 생성된 주제.

해결 방법

프로젝트 파일 구축

1.    kinesis-kafka-connector 프로젝트를 복제하여 Kinesis-Kafka-Connector를 다운로드합니다.

2.    mvn package 명령을 사용하여 대상 디렉터리에 amazon-kinesis-kafka-connector-X.X.X.jar 파일을 구축합니다.

[ec2-user@ip-10-0-0-71 kinesis-kafka-connector]$ mvn package
..
......

[INFO] Replacing /home/ec2-user/kinesis-kafka-connector/kinesis-kafka-connector/target/amazon-kinesis-kafka-connector-0.0.9-SNAPSHOT.jar with /home/ec2-user/kinesis-kafka-connector/kinesis-kafka-connector/target/amazon-kinesis-kafka-connector-0.0.9-SNAPSHOT-shaded.jar
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 28.822 s
[INFO] Finished at: 2020-02-19T13:01:31Z
[INFO] Final Memory: 26M/66M
[INFO] ------------------------------------------------------------------------

Kinesis-Kafka-Connector는 환경 변수, Java 시스템 속성 및 자격 증명 프로파일 파일 순서로 자격 증명을 찾습니다.

3.    구성을 DefaultAWSCredentailsProviderChain 설정으로 업데이트합니다.

[ec2-user@ip-10-0-0-71 target]$ aws configure

이 명령은 AWS Identity and Access Management(IAM) 사용자에 연결된 액세스 키에 필요한 최소 권한이 있는지 확인합니다. 또한 aws configure 명령은 Amazon Kinesis Data Streams 또는 Amazon Kinesis Data Firehose에 액세스할 때 사용할 수 있는 정책이 있는지 확인합니다. AWS 자격 증명 설정에 대한 자세한 내용은 AWS 자격 증명 작업을 참조하십시오.

참고: JDK(Java Development Kit)를 사용하는 경우 EnvironmentVariableCredentialsProvider 클래스를 사용하여 자격 증명을 제공할 수도 있습니다.

4.    Kinesis Data Streams를 사용하는 경우 정책을 다음으로 업데이트합니다.

{
"Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "Stmt123",
      "Effect": "Allow",
      "Action": [
        "kinesis:DescribeStream",
        "kinesis:PutRecord",
        "kinesis:PutRecords",
        "kinesis:GetShardIterator",
        "kinesis:GetRecords",
        "kinesis:ListShards",
        "kinesis:DescribeStreamSummary",
        "kinesis:RegisterStreamConsumer"
      ],
      "Resource": [
        "arn:aws:kinesis:us-west-2:123xxxxxxxxx:stream/SteamName"
      ]
    },
}

Kinesis Data Firehose를 사용하는 경우 다음 예와 같이 정책을 업데이트합니다.

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "firehose:DeleteDeliveryStream",
                "firehose:PutRecord",
                "firehose:PutRecordBatch",
                "firehose:UpdateDestination"
            ],
            "Resource": [
                "arn:aws:firehose:us-west-2:123xxxxxxxxx:deliverystream/DeliveryStreamName"
            ]
        }
    ]

Data Firehose 전송 스트림 설정에 대한 자세한 내용은 구성 및 자격 증명 파일 설정을 참조하십시오.

커넥터 구성

참고: MSK의 메시지를 Amazon Simple Storage Service(Amazon S3), Amazon Redshift 또는 Amazon Elasticsearch Service(Amazon ES) 대상에 게시하도록 Kinesis-Kafka-Connector를 구성할 수 있습니다.

1.    데이터 스트림을 설정하는 경우 다음 값을 사용하여 커넥터를 구성할 수 있습니다.

name=YOUR_CONNECTER_NAME
connector.class=com.amazon.kinesis.kafka.AmazonKinesisSinkConnector
tasks.max=1
topics=YOUR_TOPIC_NAME
region=us-east-1
streamName=YOUR_STREAM_NAME
usePartitionAsHashKey=false
flushSync=true
# Use new Kinesis Producer for each Partition
singleKinesisProducerPerPartition=true
# Whether to block new records from putting onto Kinesis Producer if
# threshold for outstandings records have reached
pauseConsumption=true
outstandingRecordsThreshold=500000
# If outstanding records on producers are beyond threshold sleep for following period (in ms)
sleepPeriod=1000
# If outstanding records on producers are not cleared sleep for following cycle before killing the tasks
sleepCycles=10
# Kinesis Producer Configuration - https://github.com/awslabs/amazon-kinesis-producer/blob/main/java/amazon-kinesis-producer-sample/default_config.properties
# All kinesis producer configuration have not been exposed
maxBufferedTime=1500
maxConnections=1
rateLimit=100
ttl=60000
metricsLevel=detailed
metricsGranuality=shard
metricsNameSpace=KafkaKinesisStreamsConnector
aggregration=true

--또는--

다른 유형의 스트림을 설정하는 경우 다음과 같이 Data Firehose 전송 스트림 속성을 구성합니다.

name=YOUR_CONNECTER_NAME
connector.class=com.amazon.kinesis.kafka.FirehoseSinkConnector
tasks.max=1
topics=YOUR_TOPIC_NAME
region=us-east-1
batch=true
batchSize=500
batchSizeInBytes=3670016
deliveryStream=YOUR_DELIVERY_STREAM_NAME

2.    독립 실행형 또는 분산형 모드에 대한 작업자 속성을 구성합니다.

bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
#internal.value.converter=org.apache.kafka.connect.storage.StringConverter
#internal.key.converter=org.apache.kafka.connect.storage.StringConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
internal.key.converter.schemas.enable=true
internal.value.converter.schemas.enable=true
offset.storage.file.filename=offset.log

3.    amazon-kinesis-kafka-connector-0.0.X.jar 파일을 디렉터리에 복사하고 classpath를 내보냅니다.

참고: amazon-kinesis-kafka-connector-0.0.X.jar 파일을 JAVA_HOME/lib/ext 디렉터리에 추가할 수도 있습니다.

4.    다음 명령 구문을 사용하여 kinesis-kafka-connector를 실행합니다.

[ec2-user@ip-10-0-0-71 kafka_2.12-2.2.1]$ ./bin/connect-standalone.sh /home/ec2-user/kinesis-kafka-connector/kinesis-kafka-connector/

worker.properties /home/ec2-user/kinesis-kafka-connector/kinesis-kafka-connector/

kinesis-kafka-streams-connecter.properties

이 문서가 도움이 되었습니까?


결제 또는 기술 지원이 필요합니까?