亚马逊AWS官方博客

Amazon EMR 对 Spark 集群提升弹性并增强恢复能力的实现

Original URL:https://aws.amazon.com/blogs/big-data/spark-enhancements-for-elasticity-and-resiliency-on-amazon-emr/

在工作流程完成或运行轻型作业时,客户可以利用 Amazon EMR 的弹性通过缩减集群规模来来节省成本。这同样适用于使用低成本 Amazon EC2 Spot 实例来启动集群。借助 Amazon EMR 中的 自动扩展功能,客户可以根据集群使用情况或其他与作业相关的指标来动态扩缩集群。虽然这些功能可帮助您有效利用资源,但也可能会导致 EC2 实例在作业运行过程中关闭。这可能会导致计算和数据丢失,从而影响作业的稳定性或者因重新计算产生重复工作。为了能够从容关闭节点而不影响正在运行的作业,Amazon EMR 使用了 Apache Hadoop 的停用机制,这是 Amazon EMR 团队开发并回馈社区的一项功能。对于大多数 Hadoop 工作负载来说效果很好,但不太适用于 Apache Spark。目前,Spark 在处理节点丢失方面存在各种缺陷。这可能会导致在尝试恢复和重新计算丢失的任务和数据时作业卡住,而且在某些情况下最终导致作业崩溃。有关 Spark 中某些未解决问题的更多信息,请访问以下链接:

为了避免其中的某些问题并帮助客户在使用 Spark 时充分利用 Amazon EMR 的弹性功能,Amazon EMR 对开源 Spark 进行了定制,增强了节点丢失的恢复能力。将重新计算的工作量将至最低,并且加快了作业从节点故障和 EC2 实例终止恢复的速度。Amazon EMR 发行版 5.9.0 及更高版本提供这些改进功能。

本博文概述了开源 Spark 处理节点丢失时存在的问题,以及为了解决这些问题对 Amazon EMR 做出的改进。

Spark 如何处理节点丢失

在 Spark 作业运行期间,如果节点关闭,会带来以下风险:

  • 该节点上正在运行的任务可能无法完成,而必须转移到其他节点上运行。
  • 节点上的Cached RDDs(弹性分布式数据集resilient distributed dataset)可能会丢失。虽然这确实会影响性能,但不会导致故障或影响应用程序的稳定性。
  • 内存中的Shuffle output文件或节点上写入磁盘的文件可能会丢失。由于 Amazon EMR 默认启用External Shuffle Service务,因此shuffle output将写入磁盘。丢失shuffle output可能会导致应用程序停止运行,直到在另一个活跃节点上重新计算才会继续运行,因为后续任务可能依赖于它们。有关shuffle操作的更多信息,请参阅Shuffle operations

要想从节点丢失恢复,Spark 应执行以下操作:

  • 如果丢失了正在运行的任务,则必须将这些任务安排到其他节点上。另外,还必须恢复对未安排的剩余任务进行计算。
  • 必须重新执行产生这些shuffle output的任务,从而重新计算在丢失的节点上计算的shuffle output。

以下是节点丢失后 Spark 恢复事件的顺序:

  • Spark 将节点上正在运行的任务视为失败,然后在另一个活跃节点上重新运行这些任务。
  • 如果该节点具有后续任务所需的shuffle output文件,则其他活跃节点上的目标执行程序在尝试从故障节点获取缺少的shuffle blocks时,会收到 FetchFailedException
  • 发生 FetchFailedException 时,目标执行程序会在一段时间内重新尝试从故障节点获取数据块,具体时间由 spark.shuffle.io.maxRetries 和 spark.shuffle.io.retryWait 配置值决定。执行完所有重试尝试后,会将故障传播到driver
  • Driver收到 FetchFailedException 后,会将故障发生时正在运行的shuffle stage标记为失败,然后停止执行。它还会把无法从中获取shuffle数据块的节点或执行程序上的shuffle output标记为不可用/丢失,以便可以对其执行重新计算。这将触发上一个map stage重新尝试重新计算缺少的shuffle数据块。
  • 计算出缺少的shuffle output后,将触发对失败的shuffle stage的重新尝试,以从停止的位置恢复作业。然后,它会运行失败或尚未安排的任务。

Spark 在处理节点丢失方面存在的问题

Spark 的恢复过程可帮助其恢复在任何云环境中可能发生的+执行程序和节点故障。但是,Spark 仅在节点已发生故障且在尝试获取shuffle数据块时收到了 FetchFailedException 之后,才会开始恢复进程。这会导致本部分所述的一些问题。

Amazon EMR 可以根据手动调整大小、EC2 触发的 Spot 实例终止或自动扩展事件,判断哪些节点即将关闭,因此可以尽早开始恢复。它可以立即向 Spark 发出有关这些节点的通知,以便 Spark 可以主动采取措施来妥善处理节点丢失问题并尽早开始恢复。但是,Spark 目前没有用来接收节点即将关闭(例如 YARN 停用)的通知的机制。所以,它不能立即采取相关措施来帮助更快地恢复。因此,Spark 恢复存在一些问题:

  • 节点在map stage过程中关闭,如下图所示:

在这种情况下,不必安排shuffle stage,且应用程序必须等收到 FetchFailedException 后,才能重新计算丢失的shuffle。这非常耗时。因此,最好是在map stage立即重新计算所有丢失的shuffle输出,然后再继续执行shuffle stage。

  • 节点在shuffle stage进行过程中关闭,如下图所示:

如果可以立即就节点丢失向 Spark 发出通知,而不是依靠 FetchFailedException 并重试获取,则可以节省恢复时间。

  • 当获得第一个 FetchFailedException 时,Spark Driver将开始重新计算。它将丢失的节点上的shuffle文件视为缺少的文件。但是,如果多个节点同时关闭,则在上一个map stage的第一次重试中,Spark Driver仅重新计算从其收到 FetchFailedException 的第一个节点shuffle输出。从首次收到获取失败消息到开始重试之间的短暂时间内,driver可能会从其他故障节点收到获取失败的消息。因此,它可以在同一次重试中为多个丢失的节点重新计算shuffle输出,但无法保证一定能做到。在大多数情况下,即使节点同时关闭,Spark 也需要对映射和shuffle stage进行多次重试,以重新计算所有丢失的shuffle输出。这很容易导致作业长时间阻塞。理想情况下,Spark 可以在一次重试中重新计算在大致在同一时间丢失的所有节点上的shuffle输出。
  • 只要 Spark 可以访问即将关闭的节点,就可以继续在其上安排更多任务。这导致要计算更多的shuffle输出,最终导致可能需要重新计算。理想情况下,可以将这些任务重定向到运行状况良好的节点,以防止重新计算并缩短恢复时间。
  • Spark 对中止作业之前某个stage允许的连续失败尝试设置了次数限制。次数可以通过 spark.stage.maxConsecutiveAttempts 进行配置。当节点发生故障并发生 FetchFailedException 时,Spark 会将正在运行的shuffle stage标记为失败,并在计算缺少的shuffle输出之后触发重试。在shuffle stage频繁扩展节点很容易导致stage故障,致使达到阈值并中止作业。理想情况下,当某个stage因有效理由(如手动缩减、自动扩展事件或 EC2 触发 Spot 实例终止)失败时,应该可以通知 Spark 不要将其计入该stage的spark.stage.maxConsecutiveAttempts

Amazon EMR 如何解决这些问题

 本部分介绍了 Amazon EMR 为解决上一部分指出的问题,而对 Spark 进行的三项主要改进。

与 YARN 的停用机制集成

Amazon EMR 上的 Spark 使用 YARN 作为集群资源的基础管理器。Amazon EMR 自身拥有适用于 YARN 的从容停用机制(Graceful Decommission),该机制无需在即将停用的节点上调度新容器即可从容关闭 YARN 节点管理器。对于正在运行的容器上的现有任务,Amazon EMR 通过在节点停用前等待其完成或超时来实现这一机制。目前,该停用机制已回馈给开源 Hadoop。

我们将 Spark 与 YARN 的停用机制集成在一起,以便当节点在 YARN 中进入即将停用已停用状态时通知 Spark Driver。如下图所示:

此通知使Driver可以采取适当的措施并尽早开始恢复,因为所有节点在删除之前都要经历停用过程。

扩展 Spark 的黑名单机制

YARN 的停用机制非常适合 Hadoop MapReduce 作业,因为它不会在即将停用的节点上再启动容器。这有助于防止在该节点上安排更多 Hadoop MapReduce 任务。但是,它不太适合 Spark 作业,因为在 Spark 中,为每一个执行程序都分配了一个 YARN 容器,YARN 容器是一直存在并持续接收任务的。

阻止启动新容器只会阻止将更多执行程序分配给节点。处于活跃状态的执行程序/容器将继续安排新任务,直到该节点关闭。它们最终可能会失败,并且必须重新运行。同样,如果这些任务写入shuffle输出,它们也会丢失。这增加了重新计算工作量和恢复所需的时间。

为了解决这个问题,Amazon EMR 扩展了 Spark 的黑名单机制,以在 Spark Driver收到 YARN 停用信号时将其列入黑名单。如下图所示:

 

这样可以防止在列入黑名单的节点上安排新任务,转而将任务安排在运行状况良好的节点上。节点上运行的任务完成后即可安全地停用该节点,而不会产生任务失败或丢失的风险。由于即将关闭的节点没有产生新的shuffle输出,因此加快了恢复过程。这有助于减少需要重新计算的shuffle输出的数量。如果节点从即将停用状态恢复正常,并再次处于活跃状态,则 Amazon EMR 将从黑名单中删除该节点,以便可以为其安排新任务。

在 Amazon EMR 中,此“黑名单扩展”默认是启用的,而且 spark.blacklist.decommissioning.enabled 属性设置为 true。您可以使用 spark.blacklist.decommissioning.timeout 属性控制将节点列入黑名单的时间,默认设置为 1 小时,等于 yarn.resourcemanager.nodemanager-graceful-decommission-timeout-secs 的默认值。我们建议将 spark.blacklist.decommissioning.timeout 设置为等于或大于 yarn.resourcemanager.nodemanager-graceful-decommission-timeout-secs 的值,以确保 Amazon EMR 在整个停用期间将节点列入黑名单。

适用于已停用节点的操作

节点停用后,将不会再为其安排新任务,且活跃容器会变为空闲(或超时到期),节点进入停用状态 当 Spark Driver接收到已停用的信号后,它可以额外采取以下措施来加快恢复过程,而不是等待获取失败:

  • 已停用节点上的所有shuffle输出均未注册,因此将其标记为不可用。Amazon EMR 默认启用该功能,而且 spark.resourceManager.cleanupExpiredHost 设置为 true。这具有以下优势:
    • 如果节点在map stage丢失并停用,Spark 会启动恢复过程并重新计算已停用节点上丢失的shuffle输出,然后再执行下一stage。这可以防止shuffle stage获取失败,因为 Spark 在map stage结束时已计算了所有shuffle数据块并且这些数据块处于可用状态,从而显著提高了恢复速度。
    • 如果节点在shuffle stage丢失,则尝试从丢失的节点获取shuffle数据块的目标执行程序会立即发出shuffle输出不可用的通知。然后,它会将故障发送给Driver,而不是多次重新尝试获取并多次失败。随后,Driver立即停止执行stage,并开始重新计算丢失的shuffle输出。这减少了尝试从丢失的节点获取shuffle数据块所花费的时间。
    • 当集群节点数量扩展到很大时,取消注册shuffle输出的优势最明显。因为所有节点都在大致相同的时间关闭,所以它们会在大致相同的时间停用,并且会取消注册它们的shuffle输出。当 Spark 安排第一次重新尝试计算缺少的数据块时,它将发出有关已停用节点缺少数据块的通知,并且仅尝试一次即可恢复。这可以显著加快开源 Spark 的恢复过程,在这一过程中可以多次重新安排stage,以重新计算所有节点上缺少的shuffle输出,并防止作业长时间卡在故障和重新计算状态。
  • 默认情况下,当由于从停用的节点获取失败而导致stage失败时,Amazon EMR 不会将stage失败计入 spark.stage.maxConsecutiveAttempts 设置的stage允许的最大故障数。这一点,由 spark.stage.attempt.ignoreOnDecommissionFetchFailure 的设置决定,设置为True可实现上述功能。在stage由于一些有效原因(例如,手动调整大小、自动扩展事件或 EC2 触发的 Spot 实例终止)而多次失败时,这可以防止作业失败。

小结

本文介绍了 Spark 如何处理节点丢失以及在 Spark 作业运行期间扩展集群时可能发生的一些问题。还介绍了 Amazon EMR 针对 Spark 定制的功能,以及可使 Spark 在 Amazon EMR 上提高恢复能力的配置,可帮助您充分利用 Amazon EMR 提供的弹性功能。

如果您有任何问题或建议,请留言。

本篇作者

Udit Mehrotra

Udit Mehrotra 是 Amazon Web Services 的一名软件开发工程师。他致力于开发 MER 前沿功能,并参与了 Apache Spark、Apache Hadoop 和 Apache Hive 等开源项目。闲暇之余,他喜欢弹吉他、旅行、刷剧以及和朋友一起出去玩。

校译作者

谷雷

AWS APN 合作伙伴解决方案架构师,主要负责 AWS (中国)合作伙伴的方案架构咨询和设计工作,同时致力于 AWS 云服务在国内的应用及推广。