如何为 Kinesis Data Streams 排查受阻或停滞的 KCL 应用程序?

上次更新时间:2020 年 5 月 6 日

我的 Amazon Kinesis 客户端库 (KCL) 应用程序停滞,并且无法处理任何 Amazon Kinesis Data Streams 记录。如何解决此问题?

简短描述

KCL 应用程序可能会出于以下原因停滞或受阻:

  • 记录处理器(用户实施的方法)执行了阻挡操作或需要的时间超过正常情况。
  • 没有数据记录放入分区。
  • 当检索记录时 KCL 发生停滞。
  • KCL 无法将处理或故障安排到检查点。

您可以通过下面的操作检测和排查 KCL 问题:

  • 分析 KCL 指标。
  • 分析 KCL 应用程序的 Amazon DynamoDB 表。
  • 检查 KCL 配置。
  • 启用 KCL 警告日志。
  • 启用 KCL 调试日志。

解决方法

分析 KCL 指标

监控 RecordProcessor.processRecords.Time 指标并确认记录处理器的 processRecords 方法所用的时间是否超过 60 秒。如果您的 processRecords 方法被阻挡,则 KCL 必须等待。在您的记录处理器完成其作业后,请尝试优化您的 processRecords 方法。

检查 KCL 配置

检查 KCL 队列的数量并记下 Kinesis 数据流中的分区数量。如果分区数量增加,则根据 KCL 中的分区数量升高 maxLeasesPerWorker 参数。

分析 KCL 应用程序的 DynamoDB 表

每个 KCL 应用程序创建一个 DynamoDB 表,名称与 KCL 应用程序相同,以便于跟踪应用程序的状态。要排查 KCL 应用程序,请分析 DynamoDB 表中的列。

如果表中的检查点列未更新,则表明 processRecords 方法逻辑停滞。如果检查点列和 leaseCounter 列都未更新,则表明 maxLeasesPerWorker=1 参数正在阻止其他工作线程占用租约。要取消阻挡 processRecords 方法,升高该参数的值。

启用高级 KCL 警告日志

要验证记录处理器是否已被阻挡,将 KCL 配置的 logWarningForTaskAfterMillis 值设置为毫秒级。KCL 随后会等待记录处理器完成任务,然后再向日志发出关于处理时间的警告消息。如果记录了警告消息,从 JVM 捕获连续的堆栈转储可能有助于揭示被阻挡的是什么。您可以使用 jstack 命令捕获任何堆栈跟踪。

如需了解更多关于 logWarningForTaskAfterMillis 值的信息,请参阅 GitHub 中的 Amazon Web Services – 实验室。

启用 KCL 调试日志

您可以启用 KCL 调试日志,以找出导致 KCL 停止消耗来自 Kinesis Data Streams 的数据的问题。重新启动 KCL 应用程序以清除任何其他应用程序问题也属于最佳实践。

如果您重新启动 KCL 后它仍然停滞,则可能存在分片所有权转移导致的问题。这还会导致 KCL 未为您尝试复制的数据保存日志的问题。为了解决此问题,您可以在 KCL 队列上启用日志记录功能。

要启用日志,请执行以下步骤:

1.    选择一个记录器。

2.    在 src/main/resources 文件夹中创建一个 log4.properties 文件以将日志消息重定向到控制台:

log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n
log4j.logger.httpclient.wire=DEBUG

注意:在本示例中,我们使用 log4j 在 Java 中调试日志。

3.    将日志消息重定向到日志文件:

log4j.appender.file=org.apache.log4j.RollingFileAppender
log4j.appender.file.File=/Users/harshdev/Desktop/logfolder/    <== Give the log location where you want to create log files
log4j.appender.file.MaxFileSize=5MB
log4j.appender.file.MaxBackupIndex=10
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n
log4j.rootLogger=DEBUG, stdout, file

4.    在 POM 文件中包含 log4j 依赖关系:

<dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.17</version>
</dependency>