亚马逊AWS官方博客
Spark SQL 任务迁移到Amazon EMR 及性能调优
背景介绍
IDC客户上云也会伴随将IDC中的一些大数据的工作负载迁移到云上来,和大多数线下Hadoop集群相比,亚马逊云科技也提供了更为适合云上的Hadoop服务:Amazon EMR。本文针对Hadoop常见的SQL任务迁移上云提供了一些优化总结,帮助客户规划整体迁移方案,包含使用EMR时候更多考虑长时间稳定运行还是临时工作负载和确认是否有更适合云上的性能优化,提升运行效率。客户的需求大致如下:
- 任务全部是Spark SQL,在IDC中运行时间大约1小时;
- 能兼容线下的调度方式,在IDC中支持SQL和Shell两种提交方式;
- 最终结果数据可以继续和线下数据无缝交互。
一、方案选择
考虑到对线下调度系统的兼容性,选择了EMR:EMR本身和线下Hadoop较为接近,客户上手更快;EMR可以通过多种方式直接提交Spark SQL,不需要经过转换或者封装;以及给客户提供了Spark,Hive,甚至Presto等更多的选择;同时还提供了对Spot Instance的支持,降低成本。
二、方案架构
架构说明:我们将整个架构无缝集成入客户的调度系统中,包含了数据,代码,流程三部分。
- 所有的数据和代码都存放在S3上,流程存放在客户的调度系统里,所以EMR是一个无状态的计算服务,每日动态启动,完成当天的ETL工作之后,再终止掉该集群,最大化发挥云上的弹性优势;
- 和客户的调度系统集成EMR的启动脚本,封装成Shell,并记录下集群id;附录1提供了EMR提交任务的几种方式供参考;
- 通过客户调度系统,往该集群id提交Steps的方式提交Spark SQL脚本,并给调度系统返回Steps的执行日志;这里我们选择了更云原生的Steps提交方式;
- 最终确认所有的任务执行完毕,通过客户调度关闭集群。
三、方案实施
- 存储:新建S3 Bucket和prefix,将所有的数据和依赖包和输出放在上面;
- 元数据管理:IDC中一般使用MySQL作为后端的Hive Metastore,但我们云上优先选择托管的Glue元数据目录服务,主要好处有两点:免于管理运维;成本较MySQL更为低廉;
- 计算:考虑到性能,优先选择EMR 5.32之后的版本,亚马逊云科技的服务团队为我们带来了大量的Spark和Hive性能调优;但考虑到兼容性,此次并没有选择使用了Hadoop 3/Hive 3/Spark 3的EMR 6.2.0,降低了迁移成本,最终选择了EMR 5.33.0作为本次上云的最终版本。当然, 由于在第一步和第二步已经做到了存算分离,后续我们可以随时同步跟随EMR升级的步伐,获取更好的性能和特性而不需要任何额外的负担和成本;
- 机型选择:根据到客户的任务性质和数据量,选择单Master方案,Master Node为r5d.2xlarge一台,Core Node为r5d.12xlarge两台;最终每天所有ETL任务跑完的费用只需要3-5美元。
- 集群建立:shell封装;
- 任务提交:EMR Steps提交方式,使用shell封装;提交任务的代码已经上传在github:https://github.com/hades1712/EMR_Steps
- UDF嵌入:一般用户会在提交查询语句的时候注册临时UDF,临时的UDF并不能在集群起停整个生命周期保留,所以在集群创建后将UDF统一上传到S3,进行统一注册:
create function if not EXISTS xxx as "UDF_name" USING JAR 's3://s3_UDF.jar_URL'
四、方案测试
我们将所需要的原始数据同步上了S3的目录,准备测试工作。在测试中发觉从线下迁上来任务可能存在一些小问题,主要有两个:
1.UDF注册:在线下IDC可能是数据平台完成的,但线上我们需要上传到S3并在任务SQL文件中声明;
2.SQL改造:这个也是有些意外的,大部分客户平台会弥补一些小的细节,迁移上云之后我们在创建临时表的时候增加了DROP table if EXIST的语句来避免报错,增加任务的幂等性。
最终将任务都搬上云后,我们测试了所有的任务,其中运行时间最长的任务从35分钟缩短到了25分钟左右,体现了云上的优势。
五、方案优化
亚马逊云科技在EMR上对Spark做了包含写入优化器,AQE等不同的优化,有些来自开源社区,有些是亚马逊云科技针对云上产品特性定制开发的。虽然目前整体的任务时间已经从一个小时以上缩短了20%,但我们希望能给客户在云上更好的体验,参考亚马逊云科技已经做的这些特性,对客户任务做进一步的优化。
首先,我们从输出入手。客户的任务已经在存储格式上选择了ORC这样针对数据仓库优化的列式存储,能观察到在执行过程中,EMR上的Spark针对ORC启用了DFOC这样的优化型输出:由于S3和HDFS不一样,对象存储会把mv操作变成一系列的copy-delete,导致在HDFS上原本秒级的操作会放大到分钟级别,同时在任务强度不大的情况下闲置大量计算资源,但EMR 5.32开始我们针对Spark输出ORC做了优化,DirectFileOutputCommitter会直接使用S3 Multipart-Upload的技术将copy-delete变成finalize文件,把时间从分钟级别变回秒级,提升任务运行速度和资源利用率。
虽然EMR Spark DFOC已经缩短了很多等待时间,但在检查输出文件的时候我们发觉了大量的小文件,重复跑了几次发觉最终输出都是1000个左右KB级别的文件,不管是对于HDFS还是S3,我们都建议把单个文件的大小落在MB级别,最好是64-128MB。但Spark调整输出的参数是Spark 3.0开始才正式引入的功能,如何在Spark 2.4.7上调整呢?
其实EMR一直在改进Spark的运行效率,很早就开始引如EMR Spark Runtime和EMR S3-optimized committer这样的特性。从EMR 5.32开始,更是把Spark 3.0上的Adaptive Query Engine 移植到EMR 5 系列上的Spark 2.4上,我们找到了Spark 3.0上的分区大小和数量的参数,根据客户的任务做了一些调整,打开动态调整的特性,并将分区数和分区大小分别调整为100和128MB:
再次执行任务后,输出文件降低到200个左右,并且任务时间也随之降低到了20分钟以内。第一次优化目标达成。
但最后在检查集群整体使用率的时候发觉另一个问题,我们把集群内存资源和运行时长乘积之后和所有任务的执行GB*seconds比较,有巨大的差异,如果光看数据,集群的利用率还不到30%,对于大数据来说是不可接受的,于是决定开始第二次优化。
重新检查了所有运行日志,发现日志中间出现了好几次rename的阶段,最长的一次甚至超过了5分钟,在rename阶段,整个集群都处于空闲状态。仔细检查了任务中的所有环节,原来问题在临时表,这个任务执行过程中会产生好几张临时表,这些临时表全部都是内表,且没有任何文件格式指定,导致了其实这些中间表都是以文本格式输出到S3上。
既然找到了问题所在,那么我们就可以继续第二次优化。在保证输入和输出的格式不变,和客户线下的ETL流程维持兼容的情况下,我们引入了Parquet作为中间表的存储格式,使用EMR S3-optimized committer的特性,看看是否能带来性能提升。仍然选择最长的任务作为实验对象,将左右的临时表建表语句变成CREATE table USING Parquet;
再次执行任务之后,观察运行状况,能看到所有rename阶段都已经去掉。实际这个任务能稳定在5分钟内完成,和原来IDC中35分钟比起来,获得了令人满意的进步,并且所有的改动几乎没有造成客户上云额外的代码变更和验证。
六、方案总结
Amazon EMR 在保证了对线下Hadoop兼容的情况下,很好的引入了存算分离的架构:集中式的数据存储(Data and Raw Data)和集中式的元数据管理(Meta Data),具备了以下几大特点:
- 使用无状态的EMR,极大的降低了每日使用的成本;
- 未来的升级方案和路径也很清晰,易于跟进新技术而不受到存储的限制;
- 保证了对开源产品的兼容性在使用,但又在EMR上引入大量的性能优化,包括对于Parquet写入的性能优势非常明显,值得大家升级。
大量从云下的Hive或者Spark SQL迁移上云,只需要简单通过存算分离的EMR+S3架构,并把迁移重点放在数据格式,中间临时表的状态,就可以便捷快速的完成迁移任务,获得巨大的性能提升,同时具备面向未来的架构无限升级可能性。
附录:
EMR任务提交方式:https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-commandrunner.html
EMR优化写入器:https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-spark-s3-optimized-committer.html
Spark性能优化:https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-spark-performance.html