亚马逊AWS官方博客

在 Amazon EMR 上监控 Spark Streaming 应用程序

Original URL:https://aws.amazon.com/cn/blogs/big-data/monitor-spark-streaming-applications-on-amazon-emr/

对于企业级的应用而言,在将其部署到生产环境之前,有许多和应用相关的方面需要进行全面的考虑,同时您需要确保应用的运维可见性。您可以衡量应用程序的运行状况与性能表现,并向应用程序仪表板与警报机制发送指标数据,从而实现运维可见性。

在流处理应用程序当中,您需要对程序不同阶段以及各阶段内的任务进行监控。Spark提供了多种接口,来帮助您实现对应用程序的实时监控。SparkListeners是一款功能强大并且灵活的、同时面向流处理和批处理应用程序的工具。用户可以将其与Amazon CloudWatch指标、仪表板以及警报机制配合使用来进行应用程序的运维工作,并在问题发生时生成通知,或者自动地扩展集群和服务。

本文将展示如何去实施一套简单的SparkListeners,用于监控Spark Streaming的应用程序并设置警报功能。本文还将阐述如何根据CloudWatch的自定义指标,使用警报功能来实现Amazon EMR集群的自动伸缩。

监控Spark Streaming应用程序

对于生产用例,您需要提前规划以确定Spark应用程序所需要的资源量。实时应用程序通常需要满足特定的SLA标准,例如每一个批处理能够安全运行多长时间,或者每个微批处理最多可承受多久的延迟。在应用程序的生命周期中,输入流中的数据量一旦突然增加,通常要求您为应用程序添加更多资源以满足数据流带来的高强度需求。

在这些用例中,您需要关注的主要是各类常见指标,例如每个微批处理任务中的记录数量、运行过程中微批处理任务的延迟以及每个批处理任务的运行时长等。以Amazon Kinesis Data Streams为例,您可以监控IteratorAge指标。如果是使用Apache Kafka作为流数据源,您则可以监控消费端的延迟,例如最新偏移量与消费端偏移量之间的差异。对于Kafka,目前有多种开源工具可以用来实现监控。

您可以通过配置更多资源或减少未使用的资源来优化成本,从而根据环境变化实时做出反应或发出警报。

目前可供选择的Spark流处理程序监控方法多种多样。Spark本身也提供一种高效的、开箱即用的功能,即Spark指标系统。此外,Spark还能够通过多种形式来展现指标,包括HTTP、JMX以及CSV文件等。

您也可以通过日志提交的方式监控并记录应用程序中的各项指标。这要求您运行count().print(),在 Map (映射)的进程中打印指标并读取可能导致延迟的数据,将其添加到应用程序的各个阶段,或者为了测试而去增加Shuffle,虽然从长远来看这些Shuffle是不必要的。

在本文中会讨论另一种的方法:使用SparkStreaming接口。下面的截图展示了Spark UI的Streaming选项卡中的部分可用指标。

Apache Spark监听器

Spark内部依赖于SparkListeners,以事件驱动的方式实现各个内部组件间的通信。另外,Spark Scheduler(调度器)还会在各项任务的阶段发生变化时向SparkListeners提交事件。SparkListeners监听来自Spark DAGScheduler的事件,后者是Spark执行引擎中的核心。您可以使用自定义Spark监听器拦截SparkScheduler事件,从而知道每个任务或阶段的开始与完成时间。

Spark Developer APISparkListener trait当中提供了8种方法,用于调用不同的SparkEvents,主要包括Start(启动)、Stop(停止)、Failure(失败)、Completion(完成)、submission of receivers(提交到接收器)和Outputs(输出)等操作。您可以通过使用这些方法在每一个事件中执行应用程序逻辑。关于更多详细信息,请参阅GitHub上的StreamingListener.scala

要注册您的自定义Spark监听器,请在应用程序启动时设置spark.extraListeners,或者在为应用程序创建SparkContext的时候调用addSparkListener

SparkStreaming微批处理模型

SparkStreaming默认提供一套微批处理执行模型。Spark会在连续的数据流上以一定间隔地启动作业。每个微批处理中包含多个阶段,每一阶段都有自己对应的任务。各个阶段都是基于DAG和应用程序代码所定义的操作执行的,而每个阶段中的任务数量则由DStream中分区数量决定的。

在流处理程序启动时,接收器(Receiver)将以长期运行任务的形式,通过轮转的方式被分配给执行器(Executor)。

Receiver(接收器)基于blockInterval来创建数据块。接收到的数据块由Receiver(接收器)的BlockManager来负责分发,并且运行在驱动程序上的Network Input Tracker(网络输入跟踪器)将获取到数据块的所在位置以供后续处理。

在驱动程序上,RDD会被创建去存储每个 batchInterval上的数据块。每个数据块会被转换为RDD中的一个分区,每个分区则将拥有负责对应处理的任务。

下图说明了该体系的架构。

创建一个自定义SparkListener并将指标发送至CloudWatch

您可以利用CloudWatch自定义指标的功能,通过从自定义Spark监听器中收集到的自定义Spark指标,从而来触发特定操作或发布警报。

如果使用Scala语言编写程序,您可以直接使用SparkListener trait来实现自定义的流数据监听器。同样的,也可以使用Java interface或者是PySpark的pyspark.streaming.listener

在本文中,您只需要收集关于微批处理任务的指标,因此只需要覆写onBatchCompletedonReceiverError

通过OnBatchCompleted,您可以提交以下指标:

  • 心跳(Heartbeat):每完成一个批处理则计数1。所以,您可以通过求和或者平均的方法了解一个时间段内运行了多少微批处理任务。
  • 记录数(Records):每个批处理中的记录数量。
  • 计划延迟(Scheduling Delay):从批处理任务提交到实际开始运行之间的延迟时长。
  • 处理延迟(Processing delay):批处理的执行时长。
  • 总延迟(Total delay):处理延迟与计划延迟的总和。

通过OnRecieverError,当接收器(Receiver)失败时,您可以在提交数字1。详见以下代码:

/**
    * This method executes when a Spark Streaming batch completes.
    *
    * @param batchCompleted Class having information on the completed batch
    */

  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    log.info("CloudWatch Streaming Listener, onBatchCompleted:" + appName)

    // write performance metrics to CloutWatch Metrics
    writeBatchStatsToCloudWatch(batchCompleted)

  }
  /**
  * This method executes when a Spark Streaming batch completes.
  *
  * @param receiverError Class having information on the reciever Errors
  */

  override def onReceiverError(receiverError: StreamingListenerReceiverError): Unit = { 
    log.warn("CloudWatch Streaming Listener, onReceiverError:" + appName)

    writeRecieverStatsToCloudWatch(receiverError)
  }

关于此示例中完整的Scala源代码以及一个Spark Kinesis的示例代码,请参阅AWSLabs GitHub repository。

要注册您的自定义监听器,请创建一个自定义监听器的对象实例,而后使用addStreamingListener方法在驱动程序代码中将该对象传递至StreamingContext。具体请参见以下代码:

val conf = new SparkConf().setAppName(appName)
val batchInterval = Milliseconds(1000)
val ssc = new StreamingContext(conf, batchInterval)
val cwListener = new CloudWatchSparkListener(appName)

ssc.addStreamingListener(cwListener)

在运行应用程序时,您可以在EMR集群所在的帐号中看到CloudWatch指标。详见以下截图。

使用示例代码

本文为您提供一套现成的AWS CloudFormation模板,其包含了所有的代码。您可以从GitHub代码库中下载emrtemplate.json文件。这套模板会在公有子网中启动一个EMR集群和一个包含3个Shard的Kinesis数据流,并为此数据流赋予了默认的AWS Identity and Access Management(IAM)角色。该Spark Kinesis流处理程序是一个简单的单词计数程序,其通过Amazon EMR的步骤脚本编译和打包而成,并且包含了自定义的StreamListener

在CloudWatch中使用应用程序警报

您需要根据应用程序对于SLA的要求来设置警报。一般来讲,您不希望批处理的执行时长超过微批处理任务之间的时间间隔,即处理速度落后于流的输入速度,因为这会导致计划的批处理任务出现排队。另外,如果由于数据的激增,导致接收器(Receiver)读取数据的速率超过了批处理任务的处理能力,这将会引发读取的记录溢出到磁盘上和更大的处理延迟。为此,您可以设置CloudWatch警报,保证系统在处理延迟接近应用程序的batchInterval时发出通知。关于设置警报的更多详细信息,请参阅使用Amazon CloudWatch警报

本文中的CloudFormation模板包含两个监控警报。一个是对于processingDelays指标的异常检测;另外一个是以schedulingDelay与totalDelay之比或者(schedulingDelay/totalDelay)*100作为阈值的监控警报。

对流处理程序进行规模伸缩

在扩张伸缩方面,在数据量增长时,DStream分区也会变的越来越多,这也取决于流处理程序的blockIntervals。除了需要保证系统性能从而可以处理完接收到的记录数,以及保证在批处理间隔之内完成每一个任务的处理之外,接收器(Receiver)还需要可以支持到数据记录的突然涌入。

源工作流应该为接收程序提供充足的带宽以从流中快速读取数据,并保证有充足的接收程序以合适的速率执行读取操作,及时消费源中存在的记录。

如果您的DStreams以接收器(Receiver)与Write Ahead Logging (WAL)作为底层的基础,则需要提前考虑接收器(Receiver)的数量。当应用程序启动之后,除非重新启动应用程序,否则将不能改变接收器(Receiver)的数量。

在默认情况下,当Spark Streaming应用程序启动之后,除非提前已经为接收器(Receiver)定义了偏好的位置,否则驱动程序将以循环方式在可用的执行器(Executor)上调度接收器(Receiver)。当所有执行器(Executor)都被分配到接收器(Receiver)时,剩余的接收器(Receiver)会在执行器(Executor)上继续接受调度,从而均衡每个执行器(Executor)上的接收器(Receiver)的数量。接收器(Receiver)会以长期运行任务的形式驻留在执行器(Executor)当中。关于在执行器(Executor)上调度接收器(Receiver)的更多详细信息,请参阅GitHub上的ReceiverSchedulingPolicy.scala以及Spark问题网站上的SPARK-8882

有时候,您可能希望降低接收器(Receiver)的速度以减少微批处理任务中的数据量,从而保证运行时间不超过微批处理操作的间隔时间。要降低接收器(Receiver)的速度,以保证在无法跟上记录增长时保留记录的内容,您可以启用BackPressure功能以适应接收器(Receiver)的当前输入速率。为了实现这个目的,可以将spark.streaming.backpressure.enabled设置为true。

另一个可以考虑的因素,是对流处理程序进行动态分配。在默认情况下,Amazon EMR会启用spark.dynamicAllocation,且此选项与spark.streaming.dynamicAllocation为互斥关系。如果大家希望让驱动程序为DStream任务请求更多执行器(Executor)的话,则需要将spark.dynamicAllocation.enabled设定为false并将spark.streaming.dynamicAllocation.enabled设定为true。Spark会定期查看平均的批处理持续时间,如果高于scale-up的比率,则会请求更多执行器(Executor);如果低于scale-down的比率,则会释放部分闲置的执行器(Executor)。关于更多详细信息,请参阅GitHub上的ExecutorAllocationManager.scala以及Spark流处理编程指南

ExecutorAllocationManager会持续查看批处理任务的平均运行时间,并根据scale-up和scale-down的比率对执行器(Executor)的数量做出调整。因此,你可以在Amazon EMR中设置自动规模伸缩,特别是对于任务实例组,借此根据ContainerPendingRatio实现节点的添加与删除,以及将接收器(Receiver)的PreferredLocation分配给各核心节点(Core Node)。在本文的示例代码中提供了一个自定义的KinesisInputDStream,其允许为每个请求的接收器(Receiver)分配偏好的位置。这项函数的基本功能就是返回一条最适合放置接收程序的主机名称。GitHub repo中还提供一个示例应用程序,其使用customKinesisInputDStreamcustomKinesisReciever为各接收程序请求preferredLocation

在对规模进行收缩时,Amazon EMR会在任务实例组中提名运行容器数量最少的节点以作为备选清退对象。

关于设置自动规模伸缩机制的更多详细信息,请参阅Using Automatic Scaling with a Custom Policy for Instance Groups。示例代码中包含一项schedulingDelay阈值。一般来讲,您应根据batchIntervalsprocessingDelay设定该阈值。schedulingDelay的增长通常意味着系统中的资源已经不足以完成任务的调度。

下表总结了启动Spark Streaming作业时,可能需要调整的各项配置属性。

配置属性 默认
spark.streaming.backpressure.enabled False
spark.streaming.backpressure.pid.proportional 1.0
spark.streaming.backpressure.pid.integral 0.2
spark.streaming.backpressure.pid.derived 0.0
spark.streaming.backpressure.pid.minRate 100
spark.dynamicAllocation.enabled True
spark.streaming.dynamicAllocation.enabled False
spark.streaming.dynamicAllocation.scalingInterval 60 Seconds
spark.streaming.dynamicAllocation.minExecutors max(1,numReceivers)
spark.streaming.dynamicAllocation.maxExecutors Integer.MAX_VALUE

使用监听器监控结构化数据流

结构化的数据流仍然是通过微批处理的方式处理记录,并在从接收器(Receiver)收到数据时触发查询。您可以使用另一个监听器接口StreamingQueryListener来监控这些查询。本文提供了一个用于监控Kafka上结构化数据流的示例监听器,同时还运行了一个示例应用程序。关于更多详细的信息,请参阅CloudWatchQueryListener.scala GitHub。下图展示了通过自定义StreamingQueryListerer收集到的部分CloudWatch自定义指标。

对EMR集群进行收缩

在启动一个Spark Streaming应用程序时,Spark会将接收器(Receiver)平均地调度至所有可用的执行器(Executor)之上。而在将EMR集群进行收缩时,Amazon EMR会依据自动伸缩的规则来提名当前实例组中任务运行量较少的节点。尽管Spark接收器(Receiver)是一个长期运行的任务,但Amazon EMR会等待yarn.resourcemanager.decommissioning.timeout所设置的超时时长,或者在NodeManagers下线时,去正常地终止节点来完成集群的收缩。当然,即使接收器(Receiver)存在,某些执行器(Executor)也有风险会被意外终止。因此,您应该考虑使用足够Spark数据块的复制和DStreams的CheckPointing机制,并且最好能定义一个PreferedLocation以消除接收器(Receiver)意外丢失的风险。

指标的计费

通常情况下,Amazon EMR指标不会导致CloudWatch的费用。但如果使用自定义指标,则会按照CloudWatch指标的定价产生额外的费用。关于更多详细信息,请参阅Amazon CloudWatch定价。此外,Spark Kinesis Streaming使用Kinesis Client Library,其发布的自定义CloudWatch指标同样会依据CloudWatch指标的定价产生费用。关于更多详细信息,请参阅利用Amazon CloudWatch 监控 Kinesis 客户端库

总结

如何监控和调优Spark Streaming实时应用程序是一项非常挑战的工作,您需要随时应对环境中发生的种种变化。另外,还需要监控源数据流和作业的输出,从而可以了解全面的情况。Spark是一套非常灵活并且丰富的框架,它能够提供多种方式来对任务进行监控。本文主要探讨了其中一种有效的方法,使用SparkListeners并将提取到的指标与CloudWatch指标相集成,从而实现监控Spark Streaming微批处理程序的性能。

本篇作者

Amir Shenavandeh

AWS公司Hadoop系统工程师。他与开源社区通力合作,通过各类开源应用程序、Hadoop生态系统应用的开发与改进等工作为客户提供架构指导与技术支持。