Flink から Kinesis Data Streams に書き込むときのタイムアウトエラーのトラブルシューティング方法を教えてください。

最終更新日: 2020 年 5 月 6 日

Flink から Amazon Kinesis Data Streams にデータを書き込もうとしましたが、タイムアウトまたは例外エラーが表示されます。タイムアウトまたは例外エラーが表示される理由は何ですか? また、これらのエラーをトラブルシューティングする方法を教えてください。

簡単な説明

FlinkKinesisProducer を使用する Flink アプリケーションは、次のいずれかのエラーメッセージを生成できます。

Caused by: org.apache.flink.kinesis.shaded.org.apache.http.conn.ConnectTimeoutException: Connect to kinesis.us-east-1.amazonaws.com:443 [kinesis.us-east-1.amazonaws.com/xxx.xxx.xxxx.xxx] failed: connect timed out
[AWS Log: ERROR](CurlHttpClient)Curl returned error code 28

これら 2 つのタイムアウトエラーは、ネットワークの問題と、Flink アプリケーションが実行されている環境でのシステムリソースの不足が原因で発生します。

解決方法

Kinesis Data Streams サービスエンドポイントに接続できない

次のエラーは、Flink アプリケーションが Data Streams サービスエンドポイントに接続できない場合に発生します。

Caused by: org.apache.flink.kinesis.shaded.org.apache.http.conn.ConnectTimeoutException: Connect to kinesis.us-east-1.amazonaws.com: 443 [ kinesis.us-east-1.amazonaws.com/xxx.xxxx.xxx] failed:connect timed out

このエラーが繰り返し発生する場合は、ネットワーク設定に問題がある可能性があります。

この問題を解決するには、以下の手順を実行します。

1.    Flink アプリケーションがインターネットに接続できることを確認します。

2.    Flink アプリケーションが Virtual Private Cloud (VPC) の AWS リソースで実行されている場合は、次の VPC 機能が正しく設定されていることを確認します。
       ルートテーブル
       セキュリティグループ
       ネットワークアクセスコントロールリスト (ACL)

3.    (オプション) Data Stream の VPC エンドポイントを使用して、VPC 内で通信することもできます。

送信されたリクエストのレスポンスが、設定されたタイムアウト期間内に返されませんでした

次の Curl 28 エラーは、送信されたリクエストのレスポンスが設定されたタイムアウト期間内に返されなかったことを示します。従って、タイムアウトが発生しました。

[AWS Log: ERROR](CurlHttpClient)Curl returned error code 28

タイムアウトは、一時的なネットワーク問題が原因で発生しました。また、タイムアウトは、レコードが Kinesis プロデューサーライブラリ (KPL) デーモンに送信される Data Streams への保留中のリクエストが多すぎることによって発生する場合があります。FlinkKinesisProducer は KPL を使用して Flink ストリームから Amazon Kinesis ストリームにデータを送信するため、レコードが KPL に送信されます。

この問題を解決するには、FlinkKinesisProducer オブジェクトの次の設定パラメータを変更します。

Request timeout period: producerConfig.put (“RequestTimeout”, “****”); I
  • 内部キューサイズ: FlinkKinesisProducer #setQueueLimit (queueLimit)

データ損失を避けるために、以下のパラメータを更新することもお勧めします。

Internal Queue Size: FlinkKinesisProducer #setQueueLimit (queueLimit)
time-to-live on records: producerConfig.put("RecordTtl", "*****");

setQueueLimit 値を計算する方法については、Apache ウェブサイトのバックプレッシャを参照してください。


この記事はお役に立ちましたか?

改善できることはありますか?


さらにサポートが必要な場合