亚马逊AWS官方博客
Verizon Media Group 如何从本地 Apache Hadoop 与 Spark 迁移至 Amazon EMR
本文为Verizon Media Group撰写的特约文章。
在Verizon Media Group(VMG),我们面临的一大主要问题,就是无法在理想的时间之内完成计算容量扩展——硬件采购通常需要几个月才能落实到位。这就意味着,我们无法让硬件的扩展与升级与工作负载变化匹配起来,这不仅造成了巨大的资金浪费,同时也给冗余管理软件的升级流程带来大量停机时间,进而极大提升运营风险。
在VMG,我们使用Apache Hadoop以及Apache Spark等技术方案运行我们自己的数据处理管道。我们之前曾经使用过Cloudera Manager进行集群管理,但其发布周期过慢,跟不上技术发展与业务需求的变化。结果就是,我们只能使用较为陈旧的开源版本,导致无法充分使用Apache项目上的最新bug修复与性能改进成果。出于以上原因,再加上我们对AWS的现有投资,最终促使VMG决定尝试将分布式计算管道迁移至Amazon EMR当中。
Amazon EMR是一套托管集群平台,能够简化各类大数据框架(例如Apache Hadoop与Apache Spark)的运行流程。
本文主要讨论我们在构建数据处理管道时,经常遇到的问题以及相关解决办法。
关于我们
Verizon Media在本质上属于一家在线广告企业。目前,大多数在线广告主要通过展示广告(亦称「横幅广告」或「视频广告」)形式实现。无论具体采取哪种方式,所有互联网广告都需要发送各种信标以实现服务器跟踪。这些服务器主要为具备高度可扩展性的Web服务器部署,负责将接收到的信标记录至一个或者多个事件接收器当中。
管道架构
在我们负责处理视频广告的专项小组中,我们主要使用部署在多个地理位置的NGINX Web服务器将由视频播放器触发的事件直接记录至Apache Kafka以供实时处理,而后将结果记录至Amazon S3接受后续批处理。我们使用的典型数据管道一般需要完成输入内容摘要、应用验证以及充实例程、结果数据汇总,并将其复制至其他目的地以供报表等具体操作任务。下图所示,为我们典型管道的基本架构。
我们首先在NGINX信标服务器上获取数据。数据以1分钟的间隔以gzip文件格式被存储在本地磁盘之上。每分钟,我们都会将数据从NGINX服务器进一步转移至S3中的原始数据位置。在登录S3时,文件将消息发送至Amazon SQS。Apache NiFi开始监听SQS消息并启动文件处理。在这段时间内,NiFi将把较小的文件组织成数个较大文件,并将结果存储在S3某特殊路径的临时位置当中。路径名称使用反向时间戳进行组合,以确保我们的数据始终使用随机存储位置,避免引发读取操作瓶颈。
每小时,我们都会在Amazon EMR上对一套Spark集群进行向外扩展,借此处理原始数据。整个处理过程包括数据的验证与充实。这些数据以Apache ORC列格式存储在S3上的永久位置文件夹当中。我们还更新了AWS Glue数据目录,用于在Amazon Athena中公开此数据,以供出现问题时进行调查取证。原始数据处理完成后,我们将削减Spark EMR集群的规模,并使用Amazon EMR上基于预定义的Presto聚合模板对数据进行聚合。聚合数据以ORC格式存储在S3之上用于保存聚合数据的特定位置。
我们还使用数据位置更新数据目录,以保证可以通过Athena随时执行查询。此外,我们将数据从S3处复制至Vertica中以进行报表生成,进而将数据公开给各内部与外部客户。在这种情况下,Athena将作为Vertica的灾难恢复(DR)解决方案。每当报表平台发现Vertica运行状态不佳时,我们都会将负载自动故障转移至Amazon Athena。事实证明,这套解决方案为我们带来了出色的成本效益。在实时分析当中,我们还拥有另一个Athena用例,但与本文主题无关,因此不具体讨论。
迁移的挑战
要迁移至Amazon EMR,要求我们在设计层面做出调整以获得最佳结果。在云端运行大数据管道时,运营成本优化无疑是最重要的目标之一。而成本的两大组成部分,分别为存储成本与计算成本。在传统的本地Hadoop仓库当中,各仓库会被耦合为存储节点,同时亦充当计算节点。这种耦合的缺点在于,指向存储层的一切变更(例如维护)都会给计算层造成影响。在AWS等云环境中,我们可以专门指定S3作为存储方案、Amazon EMR作为计算方案,借此轻松实现存储与计算资源的剥离。由于各集群仅在工作负载出现时临时运行,因此这将给集群维护工作带来巨大的灵活性优势。
为了进一步节约成本,我们还需要找到在计算层上实现资源利用率最大化的方法。这意味着我们需要将原本的整体平台转换为与多条不同管道对接的多个集群,其中各个集群都能够根据管道需求实现自动规模伸缩。
切换至S3
在S3上运行Hadoop数据仓库,我们需要留心以下注意事项。S3并不属于HDFS这类文件系统,而且并不提供同样的即时一致性保证。大家可以将S3理解成一种最终一致性对象存储库,可以使用REST API对其执行访问。
重命名
S3的一大核心差异,在于其重命名(rename)不属于原子操作。S3上的所有重命名操作都会先执行复制操作,而后再执行删除操作。考虑到运行时间成本,我们最好不要在S3上直接执行重命名。为了高效使用S3服务,大家应注意删除所有重命名操作。重命名通常被用于Hadoop仓库的各个提交阶段当中,例如通过原子操作将临时目录移动至其最终目的地。最好的方法就是避免一切重命名操作,转而选择一次性数据写入。
输出committers
Spark与Apache MapReduce作业中都包含提交阶段,这些阶段负责将多个由分布式工作程序生成的输出文件提交至最终输出目录。受篇幅所限,本文无法具体解释输出committers的实际工作原理,但最重要的是,默认输出committers在设计上基于HDFS标准、因此会在其中涉及重命名操作。如前所述,重命名操作会给S3这类存储系统造成严重的性能损失,因此最简单的策略就是禁用推测执行并切换输出committers的算法版本。当然,大家也可以自行编写定制化committers程序,确保其中不涉及任何重命名操作。例如,从Amazon EMR 5.19.0版本开始,AWS正式提供自定义OutputCommitter for Spark,其中针对S3进行了写入优化设计。
最终一致性
在S3的实际使用中,另一大挑战在于其具有最终一致性属性,这与HDFS的强一致性完全不同。S3虽然能够为新对象的PUTS操作提供写后读取保证,但这还不足以始终支撑起分布式管道的一致性需求。大数据处理中经常出现的一类情况在于,我们的某一项作业会将文件列表输出至目录,而另一项作业则对该目录执行读取操作。要运行第二项作业,其必须列出目录内容以查找需要读取的所有文件。在S3中不存在这样的目录。我们仅列出具有相同前缀的文件,意味着在第一项作业完成之后,我们可能无法立即看到所有新文件。
为了解决这个问题,AWS提供EMRFS,即添加在S3之上的一致性层,可以使S3拥有与一致性文件系统一致的运行表现。EMRFS使用Amazon DynamoDB,并在S3上保留关于各个文件的元数据。简单来说,在启用EMRFS之后,当我们列出某S3文件前缀时,实际S3响应结果将与DynamoDB上的元数据进行比较。如果存在不匹配,则S3驱动程序会延长轮询时间,以等待S3完成数据更新。
一般而言,EMRFS能够保证良好的数据一致性。对于一部分数据管道,我们会选择使用不支持EMRFS的PrestoDB,并使用PrestoDB对S3上的存储数据进行聚合。虽然这会给上游作业带来最终一致性不匹配的风险,但我们发现只要监控上游与下游数据间的差异,并在必要时重新运行上游作业即可解决这类问题。根据我们的经验,一致性问题的发生几率不高,但确实存在。如果选择不使用EMRFS,则应在系统设计中考虑到相关影响。
自动规模伸缩策略
另一个重要但却经常受到忽视的挑战,就是弄清楚该如何使用Amazon EMR的自动规模伸缩功能。为了获得最佳运营成本,我们需要尽可能减少处于闲置状态的服务器数量。
实现这一目标的方式看似简单:创建一套长期运行的EMR集群,并通过随时可用的自动规模伸缩功能根据参数(例如集群上的可用内存容量)控制集群的大小。但是,我们的某些批处理管道每小时只启动一次,每次运行20分钟,且带来巨大的计算量需求。由于处理速度非常重要,我们必须保证不浪费任何时间。对我们来说,最佳策略是在特定批处理管道启动之前,通过自定义脚本预先调整集群的大小。
另外,我们往往很难在单一集群之上运行多条数据管道,并在任意给定时间将其保持在最佳容量水平。这是因为不同的管道在负载需求方面总是有所区别。相反,我们选择在多个独立的EMR集群上对应运行各主要管道。这种作法有很多优势,缺点则只有小小的一个。优点在于,各个集群都能够根据速度需求精确调整大小,运行最适合其实际需求的软件版本,并在不影响其他管道的前提下接受管理。而小缺点是,多集群加多管道的组合会额外增加命名节点与任务节点,导致少量资源浪费。
在制定自动规模伸缩策略时,我们首先尝试在每次需要运行管道时创建并删除集群。但我们很快发现,从零开始引导集群所耗费的时间往往远超我们的预期。相反,我们应当让这些集群始终保持运行状态,并在管道启动前添加任务节点的方式扩展集群规模,并在管道结束时立即删除相应任务节点。我们发现,只需要添加更多任务节点,我们就能够快速开始运行管道。如果在集群的长期运行中出现其他问题,我们可以快速回收并从零开始创建新的集群。在解决这些问题的过程中,我们一直与AWS保持密切合作。
我们的自定义自动规模伸缩脚本是一套简单的Python脚本,通常会在管道启动之前运行。例如,假定我们的管道由具有单一映射与归约阶段的简单MapReduce作业构成,同时假设映射阶段的计算成本更高,那么我们可以编写出一个简单脚本,预测下一小时需要处理的数据量,并借助Hadoop作业的方式计算出处理这部分数据所需要的映射器数量。当我们了解当前映射任务数量后,即可判断出并行运行所有映射任务所需要的服务器数量。
在运行Spark实时管道时,事情要更棘手一些。这是因为我们有时候必须删除应用程序运行时中的计算资源。对我们来说,最简单有效的策略之一,是在现有集群之外并行创建一个独立的实时集群,根据最近一小时内的数据处理量将其扩展至所需的大小,而后增加部分容量再重新启动新集群上的实时应用程序。。
运营成本
我们可以使用EC2计算器提前预估所有AWS成本。运行大数据管道的主要成本为存储成本与计算成本,外加使用EMRFS时的DynamoDB等少部分额外成本。
存储成本
首先需要考虑的自然是存储成本。由于HDFS的默认复制倍数为3,因此其实际需要的存储容量为3 PB,而非1 PB。
在S3上存储1 GB数据的成本每月约为0.023美元左右。S3已经具有高度冗余性,因此我们无需考虑额外的复制倍数,可以将存储成本立即降低67%。当然,大家还需要考虑由写入或读取请求带来的其他开销,但这部分成本通常极低。
计算成本
在存储之外,第二大成本为计算成本。为了降低这部分成本,大家应尽可能使用预留实例以享受更低的资源费率。在AWS上,如果以3年为承诺使用周期,则具有16个vCPU的m4.4xlarge实例类型的预留实例成本每小时为0.301美元;相比之下,按需实例成本每小时为0.8美元,相差62%。在大型组织当中,容量规模往往更容易实现。在使用Amazon EMR平台时,每台Amazon EMR设备每小时将额外收取0.24美元的费用。如果选择Amazon EC2竞价实例,则可进一步降低使用成本。关于更多详细信息,请参阅实例购买选项。
为了获得最佳运营成本,请尽可能保证您的计算集群不致发生闲置,并努力根据给定时间点上的实际工作负载量对集群进行动态规模伸缩。
总结
我们的大数据管道已经在Amazon EMR上运行超过一年,且所有数据都存储在S3当中。在特定时段中,我们实时处理管道的峰值速率高达每秒200多万个事件,而从事件出现到汇总结果更新,总体处理延迟仅为1分钟。Amazon EMR为我们带来出色的灵活性,帮助我们在几分钟之内快速完成集群的清理与重新创建。我们对Amazon EMR平台的整体稳定性非常满意,也将继续与AWS一道探索EMR的进一步改善之道。
如前所述,成本已经成为当前业务运营中的核心因素。也许有些朋友认为在本地数据中心内运行Hadoop可能成本更低,但实际情况取决于具体组织执行运营任务的能力;此外,本地基础设施中可能还存在某些隐藏成本,且弹性水平也难以与公有云环境相媲美。结合第一手经验,我们意识到在本地运营业务是一项繁重且成本不菲的工作,总会带来很大的规划与维护压力。在我们看来,在设计大数据系统时充分考虑Amazon EMR等商用平台,将给企业的运营体系带来诸多优势。
免责声明:本文中的内容与观点来自第三方作者。AWS对本文的内容或正确性不承担任何责任。