Kinesis Data Streams の KCL アプリケーションがブロックまたはスタックされている場合のトラブルシューティング方法を教えてください。

最終更新日: 2020 年 5 月 6 日

Amazon Kinesis クライアントライブラリ (KCL) アプリケーションがスタックしたため、Amazon Kinesis Data Streams レコードを処理できません。この問題を解決するにはどうすれば良いですか。

簡単な説明

KCL アプリケーションは、以下の理由でスタックまたはブロックされることがあります。

  • レコードプロセッサ (ユーザーが実装したメソッド) がブロック操作を実行しているか、通常よりも時間がかかっています。
  • シャードに配置されたデータレコードはありません。
  • レコードの取得中に KCL がスタックされます。
  • KCL が処理をスケジュールできないか、チェックポイントに失敗します。

KCL の問題を検出してトラブルシューティングするには、次の操作を行います。

  • KCL メトリクスを分析します。
  • KCL アプリケーションの Amazon DynamoDB テーブルを分析します。
  • KCL 設定を確認します。
  • KCL 警告ログを有効にします。
  • KCL デバッグログを有効にします。

解決方法

KCL メトリクスを分析する

RecordProcessor.processRecords.Time メトリクスをモニタリングし、レコードプロセッサの processRecords メソッドにかかった時間が 60 秒を超えているかどうかを確認します。processRecords メソッドがブロックされている場合、KCL は待機しなければなりません。レコードプロセッサがジョブを完了したら、processRecords メソッドを最適化してみてください。

KCL 設定を確認する

KCL フリートの数を確認し、Kinesis データストリームのシャード数を書き留めます。シャード数が増えた場合は、KCL のシャード数に応じて maxLeasesPerWorker パラメータを増やします。

KCL アプリケーションの DynamoDB テーブルを分析する

すべての KCL アプリケーションは、KCL アプリケーションと同じ名前で DynamoDB テーブルを作成し、アプリケーションの状態を追跡します。KCL アプリケーションのトラブルシューティングを行うには、DynamoDB テーブルの列を分析します。

テーブルのチェックポイント列が更新されていない場合、processRecords メソッドロジックはスタックされます。checkpoint 列と leaseCounter 列の両方が更新されない場合、maxLeasesPerWorker=1 パラメータにより、他のワーカーがリースを引き受けることができなくなります。processRecords メソッドのブロックを解除するには、パラメータ値を増やします。

高度な KCL 警告ログを有効にする

レコードプロセッサがブロックされているかどうかを確認するには、KCL 設定の logWarningForTaskAfterMillis 値をミリ秒に設定します。その後、KCL はレコードプロセッサが完了するのを待ってから、処理時間に関する警告メッセージをログに出力します。警告メッセージが記録された場合、JVM から連続するスタックダンプをキャプチャすると、ブロックされている対象を検出するのに役立ちます。jstack コマンドを使用して、スタックトレースをキャプチャできます。

logWarningForTaskAfterMillis 値の詳細については、アマゾン ウェブ サービス - GitHub のラボを参照してください。

KCL デバッグログを有効にする

KCL デバッグログを有効にして、KCL が Kinesis Data Streams からデータの使用を停止する原因となった問題を特定できます。また、KCL アプリケーションを再起動して、他のアプリケーションの問題を解消することもお勧めします。

KCL を再起動してもまだスタックされている場合は、シャード所有権の移転が原因で問題が発生している可能性があります。これにより、再現しようとしているデータのログが KCL に存在しないという問題も発生します。KCL フリートでログ記録機能を有効にすれば、この問題を解決できます。

ログを有効にするには、以下の手順通りに実行します。

1.    [ロガー] を選択します。

2.    src/main/resources フォルダに log4.properties ファイルを作成し、ログメッセージをコンソールにリダイレクトします。

log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n
log4j.logger.httpclient.wire=DEBUG

注意: この例では、log4j を使用して Java でログをデバッグしています。

3.    ログメッセージをログファイルにリダイレクトします。

log4j.appender.file=org.apache.log4j.RollingFileAppender
log4j.appender.file.File=/Users/harshdev/Desktop/logfolder/    <== Give the log location where you want to create log files
log4j.appender.file.MaxFileSize=5MB
log4j.appender.file.MaxBackupIndex=10
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n
log4j.rootLogger=DEBUG, stdout, file

4.    POM ファイルに log4j 依存関係を含めます。

<dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.17</version>
</dependency>