Flink에서 Kinesis Data Streams로 작성 시 발생하는 시간 초과 오류를 해결하려면 어떻게 해야 합니까?

최종 업데이트 날짜: 2020년 5월 6일

Flink에서 Amazon Kinesis Data Streams로 데이터를 작성하려고 하는데 시간 초과 오류 또는 예외 오류가 발생합니다. 이런 일이 발생하는 이유는 무엇이고 이 오류를 해결하려면 어떻게 해야 합니까?

간략한 설명

FlinkKinesisProducer를 사용하는 Flink 응용 프로그램은 다음의 오류 메시지 중 하나를 생성할 수 있습니다.

Caused by: org.apache.flink.kinesis.shaded.org.apache.http.conn.ConnectTimeoutException: Connect to kinesis.us-east-1.amazonaws.com:443 [kinesis.us-east-1.amazonaws.com/xxx.xxx.xxxx.xxx] failed: connect timed out
[AWS Log: ERROR](CurlHttpClient)Curl returned error code 28

이 두 시간 초과 오류는 네트워크 문제와 Flink 응용 프로그램이 실행되는 환경의 시스템 리소스 부족으로 인해 발생합니다.

해결방법

Kinesis Data Streams 서비스 엔드포인트에 연결할 수 없음

Flink 응용 프로그램이 Data Streams 서비스 엔드포인트에 연결할 수 없는 경우 다음 오류가 발생합니다.

Caused by: org.apache.flink.kinesis.shaded.org.apache.http.conn.ConnectTimeoutException: Connect to kinesis.us-east-1.amazonaws.com: 443 [ kinesis.us-east-1.amazonaws.com/xxx.xxxx.xxx] failed:connect timed out

이 오류가 반복적으로 발생하면 네트워크 구성에 문제가 있을 수 있습니다.

이 문제를 해결하려면 다음 단계를 수행합니다.

1.    Flink 응용 프로그램이 인터넷에 연결할 수 있는지 확인합니다.

2.    Flink 응용 프로그램이 Virtual Private Cloud(VPC)의 AWS 리소스에서 실행 중인 경우 다음 VPC 기능이 올바르게 구성되어 있는지 확인합니다.
       라우팅 테이블
       보안 그룹
       네트워크 ACL(액세스 제어 목록)

3.    (선택 사항) 데이터 스트림의 VPC 엔드포인트를 사용하여 VPC 내에서 통신할 수도 있습니다.

제출된 요청에 대한 응답이 구성된 제한 시간 내에 반환되지 않음

다음 Curl 28 오류는 제출된 요청에 대한 응답이 구성된 제한 시간 내에 반환되지 않았음을 나타냅니다. 따라서 시간 초과가 발생했습니다.

[AWS Log: ERROR](CurlHttpClient)Curl returned error code 28

일시적인 네트워크 문제로 인해 시간 초과가 발생했습니다. 또한 레코드가 Kinesis Producer Library(KPL) 데몬으로 전송되는 데이터 스트림에 대해 대기 중인 요청이 너무 많아서 시간 초과가 발생할 수 있습니다. FlinkKinesisProducer는 KPL을 사용하여 Fink 스트림에서 Amazon Kinesis 스트림으로 데이터를 전송하기에 레코드는 KPL로 전송됩니다.

이 문제를 해결하려면 FlinkKinesisProducer 객체의 다음의 구성 매개 변수를 변경합니다.

Request timeout period: producerConfig.put (“RequestTimeout”, “****”); I
  • 내부 대기열 크기: FlinkKinesisProducer #setQueueLimit(queueLimit)

데이터 손실을 방지하기 위해 다음의 매개 변수를 업데이트하는 것도 모범 사례입니다.

Internal Queue Size: FlinkKinesisProducer #setQueueLimit (queueLimit)
time-to-live on records: producerConfig.put("RecordTtl", "*****");

setQueueLimit 값의 계산에 대한 자세한 내용은 Apache 웹 사이트의 Backpressure를 참조하십시오.


이 문서가 도움이 되었습니까?

AWS에서 개선해야 할 부분이 있습니까?


도움이 필요하십니까?