亚马逊AWS官方博客

Verizon Media Group 如何将本地的 Apache Hadoop 和 Spark 迁移到 Amazon EMR

Original URL: https://aws.amazon.com/blogs/big-data/how-verizon-media-group-migrated-from-on-premises-apache-hadoop-and-spark-to-amazon-emr/
这是一篇由 Verizon Media Group 提供的客座博文。
在 Verizon Media Group (VMG),我们面对的主要问题之一是无法在所需时间内扩展计算能力,因为硬件资源的取得通常需要数个月才能完成。另外为适应工作负载变化而扩展和升级硬件在经济上不可行,升级大量的管理软件又需要长时间停机,而且要承受很大风险。在 VMG,我们依赖诸如 Apache Hadoop 和 Apache Spark 之类的技术来运行我们的数据处理管道。我们之前使用 Cloudera Manager 管理我们的集群,但它的发布周期较慢。因此,我们运行较旧的可用开放源版本,但却无法从 Apache 项目的最新错误修复和性能改进中获得好处。基于上述原因,再加上我们对 AWS 的现有投资,使我们探索将我们的分布式计算管道迁移到 Amazon EMR。Amazon EMR 是一个托管集群平台,可简化运行大数据框架的过程,如 Apache Hadoop 和 Apache Spark。本文将讨论我们在构建管道以满足我们的数据处理需求期间遇到并解决了哪些问题。

关于我们

Verizon Media 基本上是一家在线广告公司。当今大多数在线广告都采用展示广告的形式,也被称作横幅或视频广告。无论是哪种形式,所有互联网广告通常都会通过发出各种信标来追踪服务器,它们一般都是高度可扩展的网络服务器部署,其唯一的任务就是记录一个或多个事件接收器所收到的信标。

管道架构

在我们主要处理视频广告的小组中,使用在多个地理位置部署的 NGINX 网络服务器,它们会记录从视频播放器直接向 Apache Kafka 发出用于实时处理的事件,以及向 Amazon S3 发出用于批量处理的事件。我们的小组中的典型数据管道涉及处理(如输入源),应用验证和扩充数据,聚合结果,以及出于报告目的将其复制到其他目标位置。下图显示的是我们创建的典型管道架构图。

首先从 NGINX 信标服务器上获取数据。数据会采用 gzip 文件格式并以 1 分钟为间隔保存到本地磁盘。然后每分钟都会从 NGINX 服务器将数据移动到 S3 中的原始数据位置。在到达 S3 之前,文件会向 Amazon SQS 发送一条消息。Apache NiFi 侦听 SQS 消息,开始对文件进行处理。在这段时间里,NiFi 会将较小的文件分组到较大的文件,并将结果保存在 S3 的特殊路径下。特殊路径构成采用逆向时间戳加路径名称方式,确保我们将数据随机存储以避免读取瓶颈。

每个小时,我们都会在 Amazon EMR 上扩展 Spark 集群,以便对原始数据进行处理。这样的处理包括扩充与验证数据。此数据以 Apache ORC 列格式在S3中进行持久化存储。我们也会更新 AWS Glue 数据目录,在 Amazon Athena 中显示此类数据,以防止我们有可能需要针对某些问题对其进行调查。在原始数据处理结束以后,我们会缩小 Spark EMR 集群的规模,并且开始在 Amazon EMR 上依据采用 Presto 的预定义聚合模板聚合数据。聚合的数据会以 ORC 的格式保存在 S3 上专为聚合数据准备的特殊位置。

我们也会更新我们的数据目录及数据的位置,以便使用 Athena 对其进行查询。另外,我们会从 S3 将数据复制到 Vertica 以用于报告,向内部和外部客户展示此类数据。在这种情况下,我们使用 Athena 作为 Vertica 的灾难恢复 (DR) 解决方案。只要报告平台一发现 Vertica 的运行状况出现问题,我们会自动使用 Amazon Athena 进行故障切换。这一套解决方案被证明对我们来说极具成本效益。我们还将 Athena 用于另一个实时分析案例,但本文不会进行讨论。

迁移挑战

迁移至 Amazon EMR 要求我们做些设计更改,以获得最佳结果。当在云中运行大数据管道时,运营成本优化成了最关键的问题。其中两项主要的成本即存储和计算。在传统的本地 Hadoop 仓库中,它们通常采用存储节点和计算节点紧耦合模式。此种方式缺点在于任何对存储层的更改,如维护等都有可能会对计算层产生影响。在类似于 AWS 的环境中,我们可以将 S3 用于存储,Amazon EMR 用于计算,从而对存储和计算进行解耦。所有集群都是临时性的,这也为集群维护提供了重要的灵活性优势。

为进一步节约成本,我们必须想办法在最大程度上利用我们的计算层。这就意味着,我们必须切换平台,为不同管道使用多个集群,而每个集群又会根据管道的需求自动扩展。

切换到 S3

在 S3 上运行 Hadoop 数据仓库时需要考虑更多因素。S3 并不是 像HDFS 那样的文件系统,无法提供相同的一致性保证。您可以将 S3 视作可通过 REST API 访问的最终一致性对象存储。

重命名

S3 的一个关键差异是它的重命名不是一项原子操作。S3 上重命名对象时需要复制现有对象,然后进行删除操作。考虑到运行的时间成本,在 S3 上执行重命名并不可取。想要高效使用 S3,您必须弃用任何重命名操作。重命名常被用于不同提交阶段的 Hadoop 仓库,如作为一项原子操作将临时目录移动到其最终的目的地。最好的办法是避免任何重命名操作,而以一次性写入数据取而代之。

输出提交程序

Spark 和 MapReduce 作业都有提交阶段,它们将由多个分布式作业线程所产生的输出文件提交到最终输出目录。解释输出提交程序如何运作不在本文的讨论范围以内,但必须要了解的是,设计在 HDFS 上运作的标准默认输出提交程序依赖于重命名操作,它在类似 S3 的存储系统上有一套性能代偿机制,这一点我们在之前也做过说明。禁用推测执行,并切换输出提交程序的 算法版本,这对我们来说是一项有效而简单的策略。您还可以编写自己的不依赖于重命名的自定义提交程序。例如,从 Amazon EMR 5.19.0 开始,AWS 推出了适用于 Spark 的自定义 OutputCommitter,并对 S3 的写入进行优化。

最终一致性

使用 S3 的另一个重要挑战是最终一致性,而 HDFS 采用的是强一致性。S3 为新对象的 PUT 提供先写后读保证,但还不足以在它上方构建一致的分布式管道。在大数据处理中遇到的一种常见情况是,某项作业正在向目录输出一系列文件,而另一项作业正从该目录读取。对于要运行的第二项作业来说,它必须列出目录以查找必须读取的全部文件,而S3 中没有目录,我们只能列出具有相同前缀的文件,这意味着可能无法在第一项作业结束运行以后立即看到所有新文件。

要解决此问题,AWS 提供 EMRFS,它是建立在S3 之上的一致性层,使其能够像一致的文件系统那样运行。EMRFS 使用 Amazon DynamoDB,并在 S3 上保留与每一个文件有关的元数据。简而言之,在列出 S3 前缀时启用 EMRFS,实际的 S3 响应会与 DynamoDB 上的元数据进行比较。若不匹配,S3 驱动程序会轮询更长时间,并等待数据在 S3 上出现。

在一般情况下,我们发现 EMRFS 对于确保数据一致性来说很有必要。针对有些数据管道,我们使用 PrestoDB 来聚合保存在 S3 上的数据,而且我们会选择在 S3 上运行 PrestoDB 而不需要 EMRFS 支持。虽然它会使我们暴露在上游作业的最终一致性风险当中,但我们发现可以通过监控下游和上游数据的差异,并在必要时重新运行上游作业来规避此类问题。根据我们的经验,一致性问题很少发生,但可能性依然存在。如果选择在无 EMRFS 支持下运行,您应该对系统进行相应设计。

自动扩展策略

一项非常重要,但在某种程度上又显得微不足道的挑战是,如何想办法充分利用 Amazon EMR 的自动扩展功能。为了实现运营成本最优化,我们要确保所有服务器都不处于空闲状态。

要做到这一点,答案可能看起来非常简单:创建长时间运行的 EMR 集群,然后利用EMR集群自动扩展功能,以便基于参数(如集群上的可用内存)控制集群的大小。不过,有些批处理管道会在每个小时启动,并以计算密集型方式运行 20 分钟。由于处理时间十分重要,我们希望确保不会浪费任何时间。对我们来说最佳的策略是,在特定大批量处理管道启动前通过自定义脚本预先调整集群的大小。

除此以外,因为每个管道都有些差异,要在单个集群上运行多个管道并尝试在特定时刻保持最优容量可能有点困难。因此,我们转而选择在独立的 EMR 集群上运行我们的全部主要管道。这样做的好处很多,缺点寥寥无几。它的好处包括,可在必要时调整每个集群的大小,运行其管道所需的软件版本,以及在管理时不会对其他管道产生影响。它的缺点是,运行额外的名称节点和任务节点会造成少量的计算浪费。

在制定自动扩展策略时,我们最先尝试在每次需要运行管道时创建与删除集群。不过,我们很快就发现从头开始引导启动集群所花的时间比我们预期的更长。因此我们会让这些集群始终保持运行,然后通过在管道启动前添加任务节点扩大集群大小,并在管道结束时立即移除任务节点。我们发现只需通过添加任务节点,我们可以更快地启动运行我们的管道。如果遇到与长时间运行的集群有关的问题,我们可以快速回收,并从头开始新建一个。我们将继续与 AWS 合作解决这些问题。

我们的自定义自动扩展脚本为简单的 Python 脚本,它通常会在管道启动前运行。例如,假定我们的管道包括一项简单的 MapReduce 作业,包含一个Mapper和 一个reduce 阶段。而且假定该Mapper阶段的计算代价更高。我们可以编写一段简单的脚本,查看需要在下一个小时内处理的数据数量,并算出采用与 Hadoop 作业相同的方式处理此数据需要多少个Mapper。如果了解映射任务的数量,我们就可以决定需要多少虚拟机才能支撑运行全部Mapper任务。

在运行 Spark 实时管道时,情况会变得更棘手一些,因为我们有时需要在应用程序正在运行时移除计算资源。创建一个独立的实时集群与现有集群并行,根据最近一个小时内处理的数据量将其调整到所需的大小同时预留一些额外容量,并在新集群上重启实时应用程序,这对我们来说是一项有效而简单的策略。

运营成本

您可以通过 EC2 计算器评估全部 AWS 预先成本。在运行大数据管道时的主要成本来源于存储和计算成本,另外还有一些额外的少量成本,如在使用 EMRFS 时的 DynamoDB。

存储成本

首先要考虑的是存储成本。由于 HDFS 的默认副本数为 3,因此对于要存储1PB的文件,它需要 3 PB 的实际存储容量。

在 S3 上存储 1 GB 每个月会产生的成本为 ± 0.023 USD。S3 已高度冗余,所以您无需考虑副本,从而立即将成本降低 67%。您还应该思考其他写入或读取请求的成本,但这些成本通常较低。

计算成本

紧随存储之后的第二大成本支出为计算成本。要降低计算成本,您应该尽量利用预留实例定价。如果预留 3 年,在 AWS 上有 16 个 VCPU 的 m4.4xlarge 实例类型每小时会产生 0.301 USD 成本,而且全部都是预先付费。一个按需实例的每小时成本为 0.8 USD,中间存在 62% 价差。这在定期进行容量规划的较大型组织中更容易实现。若使用 Amazon EMR 平台,每台 Amazon EMR 系统都将增加一项每小时 0.24 USD 的额外费用。如果使用 Amazon EC2 Spot 实例,该成本还有可能进一步降低。如需更多信息,见实例购买选项

要实现运营成本最优化,尝试确保您的计算集群永远不会处于空闲状态,并试着基于集群的workload进行动态扩缩。

最后的思考

我们在 Amazon EMR 上运行大数据管道已超过一年时间,并将全部数据存储在 S3 上。我们的实时处理管道时常达到峰值,每秒处理超过 200 万个事件,总的处理延迟从初始事件发起到更新聚合完成大约1 分钟左右。我们喜欢 Amazon EMR 的灵活性和在数分钟内拆除与重新创建集群的能力。我们对 Amazon EMR 平台的整体稳定性也感到十分满意,并将继续与 AWS 合作加以改进。

正如我们在之前提到的那样,成本是需要考虑的一项重要因素,而且您可以认为在自己的数据中心运行 Hadoop 的成本可能会更低。不过,这样的结论要取决于所在组织高效行事的能力;其中可能有隐藏的运营成本以及弹性的减少。我们通过第一手经验得知在本地运行不是一项应该被轻忽的事情,需要对它进行大量规划与维护。我们相信诸如 Amazon EMR 之类的平台会在设计大数据系统时为我们带来不少好处。

免责声明:本博文中的内容和意见属于第三方作者,AWS 不对本博文的内容或准确性负责。


关于作者

Lev Brailovskiy 是 Verizon Media 的工程总监,领导着该公司供应方平台 (SSP) 的服务工程小组。他拥有 15 年以上设计与构建软件系统的经验。在过去六年里,Lev 花时间在私有数据中心和公有云领域设计、开发与运行大规模报告及数据处理软件。您可以通过 LinkedIn 与他联系。

 

 

 

Zilvinas Shaltys 是 Verizon 视频联合云数据仓库的技术主管。Zilvinas 拥有大规模部署各种大数据技术的多年经验。他负责从 AOL 数据中心将大数据管道迁移到 Amazon EMR。Zilvinas 目前正在致力于改善现有批处理和实时大数据系统的稳定性与可扩展性。您可以 通过 LinkedIn 与他联系。