亚马逊AWS官方博客

Spark 小文件合并功能在 AWS S3 上的应用与实践

序言

AWS S3,即Amazon Simple Storage Service (简称Amazon S3),是一种对象存储服务,可提供业界领先的可扩展性、数据可用性、安全性和性能,这意味着各行各业都可以使用它来存储和保护各种使用案例(诸如湖内数仓、网站、存档、企业级应用程序、IoT设备和大数据分析)的任意量的数据。目前,OPPO也逐步使用S3为核心,在海外构建PB级别的数据湖,以广泛支持OPPO在海外的各类数据服务。

而在OPPO的大数据使用场景中,随着数据量持续增大,特别是Hive,Spark的使用加剧了小文件的产生,操作涉及的文件数达到了百万级,对作业的执行时间和性能都造成了显著影响,在基于AWS S3做底层存储的数仓领域也不例外。对于提升S3上小文件场景的读写性能,常见的方法有两个:

1)增加S3分区,根据AWS官方最佳实践,可以通过在前缀增加hash key等的方式区分前缀,以让S3根据前缀生成更多分区,提升S3读写性能。

2)通过repartition/coalesce的方式,利用spark提供的参数实现小文件的合并,如AWS把Spark 3.0上的AQE移植到EMR 5.32+上,使用户能够通过相关参数控制partition的数量和大小。

但上述改造方式在OPPO的实施会存在不少挑战,一方面由于大量的存量数据,S3前缀的改造方式对业务侧会造成一定程度使用体验的变化,也难以应对单个分区下大量小文件产生的性能影响。同时OPPO运行上万的Spark作业,参数需要根据不同的使用场景进行调整,当上游业务发生细微的变化,可能也需要调整参数来进行适配,给业务增加维护成本。为了让业务能更为透明地解决小文件性能问题,OPPO 进一步调研了Spark写入文件的行为并开发相应的合并功能,相比于其它小文件优化方法,该方案主要具有如下两方面的优势:

  1. 对业务使用非常友好,只需在Spark作业中动态配置参数即可使用;
  2. 使用set spark.sql.sources.commitProtocolClass 的方式引入包含合并小文件功能的 CommitProtocol 协议,对Spark代码无侵入,便于Spark源代码的维护。

经过初步对Spark小文件合并功能的应用实践,我们发现,它能在多个方面给用户带来非常大的收益:其一,大幅减少底层存储中的小文件,对于HDFS,可以极大减小NameNode元数据管理的压力,对于AWS S3,可以极大提高对数据生命周期管理的效率;其二,有效提高MapReduce或Spark作业的数据读取效率,减少对内存和计算资源的消耗;其三,有效提高Spark作业的稳定性,以及对大型数据表的处理速度。

下文将基于Spark文件提交机制来介绍Spark小文件合并功能的基本原理,并进一步阐述我们在AWS S3上所进行的适配工作,以及应用Spark小文件合并功能带来的收益。

Spark小文件产生的原因

大数据领域的小文件,通常是指文件大小显著小于HDFS Block块(64MB或128MB)的文件,小文件过多会给HDFS带来严重的性能瓶颈(主要表现在NameNode节点元数据的管理以及客户端的元数据访问请求),并且对用户作业的稳定性和集群数据的维护也会带来很大的挑战。

Spark程序产生的文件数量直接取决于RDD中partition分区的数量和表分区的数量,需要注意的是,这里提到的两个分区,概念并不相同,RDD的partition分区与任务并行度高度相关,而表分区则是指的Hive表分区(对于Hive,一个分区对应一个目录,主要用于数据的归类存储),产生的文件数目一般是RDD分区数和表分区数的乘积。因此,当Spark任务并行度过高或表分区数目过大时,非常容易产生大量的小文件。

Spark小文件合并基本原理

Spark任务在执行过程中,通常都要经历从数据源获取数据,然后在内存中进行计算,最后将数据进行持久化的过程,其中有两个非常关键的操作:1、executor端的task任务执行commitTask方法,将数据文件从task临时目录转移到Job临时目录;2、driver端执行commitJob方法,将各个task任务提交的数据文件,从Job临时目录转移到Job的最终目标目录。

Spark小文件合并的基本原理:在executor端,各个task任务执行完commitTask方法提交数据后,先获取作业对应的所有小文件,然后按照分区对小文件进行分组合并,最后driver端执行commitJob方法,将合并后的数据文件转移到Job的最终目标目录。在Spark作业中,引入小文件合并功能的执行流程,如下图:

开源的Spark有两种不同的文件提交机制,即FileOutputCommitter V1和FileOutputCommitter V2,它们是Spark小文件合并功能的算法基础,只有充分了解FileOutputCommitter V1和FileOutputCommitter V2文件提交机制,才能更好地理解Spark小文件合并功能的工作原理。首先,我们来了解下,Spark写文件跟其他的程序写文件有什么不一样的地方。

Spark写文件不一样的地方

通常情况下,在单机上写文件时,都会生成一个指定文件名的文件,而调用Spark DataFrame的writer接口来写文件时,所得到的结果却不尽相同。如下图右侧所示,在指定路径下写入了3个数据文件。为什么会这样?这就与Spark的执行方式有关了,Spark是分布式计算系统,其RDD中的数据是分散在多个Partition中的,而每个Partition对应一个task来执行,这些task会根据vcores数量来并行执行。在下图示例中,分配了3个Partition,所以生成了part-00000、part-00001、part-00002共3个文件(文件名中间的一长串UUID是在job中统一生成的)。按照这样的执行方式,如果直接把数据写入到指定的路径下,会出现哪些问题呢?

  • 问题1:由于是多个task并行写文件,如何保证所有task写的所有文件要么同时对外可见,要么同时不可见?在上图示例中,3个task的写入速度是不同的,那就会导致不同时刻看到的文件个数是不一样的。此外,如果有一个task执行失败了,会导致有2个文件残留在这个路径下;
  • 问题2:同一个task可能因为Speculation推测执行或其他原因,导致某一时刻有多个task attempt并行执行,即同一个task有多个实例同时写相同的数据到相同的文件中,势必会造成冲突。如何来保证最终只有一个task是成功的并且数据是正确的呢?

FileOutputCommitter V1 和 FileOutputCommitter V2文件提交机制,很好地解决了上述问题,虽然它们在执行逻辑上非常相似,但在保证原子性和数据一致性等方面,差异很大。下面,我们就来详细了解这两种文件提交机制。

FileOutputCommitter V1文件提交机制

FileOutputCommitter V1文件提交机制的基本工作原理,需要经历两次rename过程,每个task先将数据写入到如下临时目录:

finalTargetDir/_temporary/appAttemptDir/_temporary/taskAttemptDir/dataFile

等到task完成数据写入后,执行commitTask方法做第一次rename,将数据文件从task临时目录转移到如下临时目录:

finalTargetDir/_temporary/appAttemptDir/taskDir/dataFile

最后,当所有task都执行完commitTask方法后,由Driver负责执行commitJob方法做第二次rename,依次将数据文件从job临时目录下的各个task目录中,转移到如下最终目标目录中,并生成_SUCCESS标识文件:

finalTargetDir/dataFile

FileOutputCommitter V1文件提交机制的执行流程,如下图:

FileOutputCommitter V2文件提交机制

FileOutputCommitter V1文件提交机制较好地解决了数据一致性的问题,因为只有在rename的过程中才可能出现数据一致性问题,而通常情况下,这种问题出现的概率非常低。但是,两次rename带来了性能上的问题,主要表现在:如果有大量task写入数据,即使所有task都执行完成了,仍需等待较长一段时间作业才结束,这些时间主要耗费在driver端做第二次rename,这个问题在对象存储中尤为突出。因此,在EMR上,基于文件格式和EMR版本的不同,默认使用的是基于FileOutputCommitter V2的DirectWriter或者EMR S3 Optimized Committer,而非FileOutputCommitter V1,以提高作业写入S3的性能。

FileOutputCommitter V2文件提交机制的出现,解决了两次rename存在的性能问题,其执行流程如下图。相比于FileOutputCommitter V1文件提交机制,主要去掉了在commitJob阶段做第二次rename来提高性能,但是牺牲了一部分的数据一致性。在FileOutputCommitter V2文件提交机制中,如果部分task已执行成功,而此时job执行失败,就会出现一部分数据对外可见,也就是出现了脏数据,需要数据消费者根据是否新生成了_SUCCESS标识文件来判断数据的完整性。

AWS官方EMRFS S3-optimized Committer

除了上述提到的两种开源committer,AWS也为基于S3的数据湖定制开发了EMRFS S3-optimized Committer,它是在FileOutputCommitter V2上的改进,利用S3的 multipart upload机制在保证数据一致性的同时,还进一步提升了Spark在S3的写入性能。在满足特定条件时,EMRFS S3-optimzied Committer在EMR 5.20及更高版本将被默认开启。从EMR 6.4.0版本开始,EMRFS S3-optimzied Committer将进一步适用于更多常见的文件格式,包括Parquet,ORC,CSV 及JSON等。在executor的日志中,我们也可以通过下图方式来确认其是否开启。

如下图executor的日志中可见,EMRFS S3-optimzied Committer会使用multipart upload机制将output file上传至具有唯一UUID的staging目录,同时标记destination key为最终路径下的文件。multipart upload会处于未完成状态,直到commitTask阶段时,对multipart upload发起complete操作,从而将task相关的output file的key全部替换成destination key。通过这个机制,进一步避免了FileOutputCommitter V2中,commitTask 的rename动作所引起的list-copy-delete开销。

Spark小文件合并功能在AWS S3上的特殊考量

基于FileOutputCommitter V2的文件提交机制下,无论底层是基于HDFS文件系统还是AWS S3对象存储服务,我们在on-premise环境中的Spark小文件合并方案都无法正常执行,主要原因在于,driver端在执行commitJob方法前,待写入数据文件就已经被task任务转移到Job的最终目标目录中,原先采用的获取所有数据文件的方式无法取得预期的效果。

FileOutputCommitter V2文件提交机制下如何获取所有数据文件

前面已经介绍过,在FileOutputCommitter V2文件提交机制下,executor端的task任务在执行完commitTask方法后,便会将数据文件直接转移到job的最终目标目录,不再由driver端执行commitJob方法来依次转移这些数据文件,以提高整个Spark作业的执行效率。前文提到的获取所有数据文件的方案,仅适用于FileOutputCommitter V1文件提交机制,那么,如何获取所有数据文件,来进一步实施小文件合并操作?针对这个问题,我们首先想到的是,从Spark框架API源码着手,寻找对应的解决方法但没有成功。

通过梳理Spark作业的执行流程,我们发现,executor端的task任务先执行newTaskTempFile方法以生成临时数据文件,然后执行commitTask方法来提交数据文件。于是,我们想到了利用中间存储介质来暂存相关数据,作为executor端和driver端通信的替代手段。executor端的task任务在执行newTaskTempFile方法时,将对应文件的分区目录结构等信息写入到中间存储介质上,在driver端执行commitJob方法前,先从中间存储介质上获得数据文件的分区目录结构等信息,再根据这些信息从job目标目录中找到对应的数据文件,最后执行小文件合并操作。

如何暂存文件的分区目录结构等信息

我们第一时间想到了利用第三方存储组件(如MySQL、Zookeeper、Redis等)来暂存数据文件的分区目录结构等信息。经过仔细斟酌,我们发现这种方案有太多不可取之处,Spark作为一个独立的计算框架,不应该过度依赖于第三方组件,主要原因有下面几个:

1、增加环境部署、系统架构的复杂性;

2、降低系统运行的稳定性;

3、提高用户系统使用的复杂度。

对于HDFS文件系统来说,创建、遍历目录和文件,执行效率都比较高,而由于EMR集群默认会使用core节点部署集群本地的HDFS,因此我们进一步考虑利用这个HDFS集群来暂存中间数据。此外,我们还发现,无论底层存储是基于HDFS文件系统还是AWS S3对象存储服务,Spark都会在诸如hdfs:///user/spark/.sparkStaging的目录中暂存作业相关的一些临时数据信息。最后,我们参考Spark框架的这种机制,在Spark小文件合并功能中,使用诸如hdfs:///user/spark/.sparkCompacting的目录来暂存小文件合并的临时数据信息,如下图所示。

暂存文件分区目录结构等信息的过程,其实现思路是:在executor端task任务执行完newTaskTempFile方法时,获取新生成数据文件的分区目录结构及文件名等信息,在执行完commitTask方法后,在当前作业的hdfs:///user/spark/.sparkCompacting/jobId小文件合并目录下,生成task目录(如task_20211128070355_000_m_000111),并根据获取的信息创建分区目录及空文件(这里需要注意的是,为了优化性能,这里的文件名是原始数据文件名和文件大小的拼接字符串),以供后续driver端搜索该目录来获得待写入数据文件的分区目录结构及文件名、文件大小等信息。

Spark小文件合并功能给用户带来的收益

经过反复的测试验证,无论底层存储是AWS S3还是HDFS,Spark小文件合并功能都能正常工作,并且二者在性能和效率上无明显差异。通过一段较长时间的线上应用实践,Spark小文件合并功能在多个方面给用户带来了较大的收益,主要归纳为如下四点,后文将展开进行详细介绍。

  • 减少文件数量,便于数据生命周期管理;
  • 提高用户作业的数据读取效率;
  • 降低Spark driver端出现OOM的概率;
  • 加快大型Spark作业的执行速度。

减少文件数量,利于数据生命周期的管理

对于HDFS这种文件系统,任何一个文件、目录或者数据块(Block)都会在NameNode机器节点的内存中保存一份元数据,受限于NameNode机器节点物理内存的大小,HDFS上存放的文件数量也将受到极大的制约,将大量小文件合并成为少数几个大文件,能很大程度地减小NameNode的负载压力,提高执行性能。此外,处理小文件并非HDFS的擅长之处,HDFS的设计目标是流式访问大数据集(TB级别),在HDFS上存放大量小文件,访问这些小文件获取数据时,需要频繁连接访问不同的DataNode节点,严重影响数据的读写效率。

对于AWS S3这类对象存储服务,其没有文件目录树这个概念,所有数据都在同一个层次,仅仅通过数据的唯一地址标识来识别并查找数据。基于这个特性,对于遍历目录下所有数据文件,AWS S3相比于HDFS性能要差上一个数量级。

Spark小文件合并功能的设计目标,就是在Spark作业的执行过程中,将属于同一分区的大量小文件,合并成为一个或多个大文件(这个合并后目标文件的大小阈值可由用户自行配置),在较大程度上减少小文件的数量,并且使文件大小的分布更合理、更均衡,这无论对于何种底层存储,都非常有利于数据生命周期的管理。通过实践测试发现,在对AWS S3上的数据进行生命周期管理时,多个分区目录下总共存放近百万小文件,删除这些分区目录需要花20分钟左右的时间,而在引入Spark小文件合并功能后,这些分区目录下的文件总数量会降低到2~3千,删除时间仅需要花3~5秒,效果非常显著。

提高用户作业的数据读取效率

对于MapReduce作业,Map任务通常是一次处理一个块大小的输入数据,如果文件非常小,并且拥有大量的这种小文件,那么每一个map task都仅仅处理非常少的输入数据,因此会产生大量的map task。默认情况下,每个map task会启动一个JVM进程,在其执行结束时会销毁该JVM进程,大量的map task会对应大量JVM进程的创建和销毁,显然,这样会增加很多额外的开销。例如,一个1GB的文件拆分成16个块大小的文件(默认块大小为64MB),相对于拆分成10000个100KB的小文件,后者每一个小文件启动一个map task,那么job的数据读取效率将会比前者下降一个数量级。

对于Spark作业,情况也是类似的,如果数据源分布着大量小文件,则Spark作业在从数据源读取数据时,也会产生大量的task,而Spark作业可用的Executor数量有限(相对应地,并行执行的task数量有限),在这种情况下,每个Executor会依次执行多个task,执行task的时候,会进行反序列化等一系列操作;如果数据源都是文件大小均衡、接近数据块大小的大文件,则Spark作业在从数据源读取数据时,产生的task数量相对会大幅减少,在这种情况下,每个Executor可能只需要依次执行少量几个task,避免在多个task之间切换,提高执行效率。

下面两个表展示了在AWS S3实际生产环境测试中,数据规模不同的两个用户表,引入Spark小文件合并功能相比未引入Spark小文件合并功能,Spark作业数据读取效率的提升情况。

表1 针对数据规模较小的用户表,Spark作业数据读取情况统计

A A 未引入小文件合并功能 引入小文件合并功能 提升情况
1 1 平均数据量(行) 1230319915 1230319915 平均数据量保持一致
2 2 文件平均数目(个) 3982 38 文件平均数目减少99.0%
3 3 task执行并行度(个) 40 40 task执行并行度保持一致
4 4 总内存平均消耗(MB) 11438718 8755048 总内存平均消耗减少23.5%
5 5 CPU核心数平均消耗(个) 1854 1416 CPU核心数平均消耗
减少23.6%
6 6 Spark作业数据读取
平均耗时(s)
65 54 Spark作业数据读取平均耗时
减少16.9%

表2 针对数据规模较大的用户表,Spark作业数据读取情况统计

A A 未引入小文件合并功能 引入小文件合并功能 提升情况
1 1 平均数据量(行) 1737216599 1737216599 平均数据量保持一致
2 2 文件平均数目(个) 235194 1919 文件平均数目减少99.2%
3 3 task执行并行度(个) 80 80 task执行并行度保持一致
4 4 总内存平均消耗(MB) 142561352 54868136 总内存平均消耗减少61.5%
5 5 CPU核心数平均消耗(个) 25447 8913 CPU核心数平均消耗
减少65.0%
6 6 Spark作业数据读取
平均耗时(s)
347 129 Spark作业数据读取平均耗时减少62.8%

从上述对比测试结果可以发现,在给Spark作业分配相同且数量有限的Executor的条件下,对于数据量较小且小文件问题不显著的用户表,通过引入Spark小文件合并功能减少小文件数量,Spark作业数据读取效率能提高16%左右;对于数据量较大且小文件问题显著的用户表,通过引入Spark小文件合并功能减少小文件数量,Spark作业数据读取效率能提高62%左右,效果非常突出。

降低Spark作业driver端出现OOM的概率

从前文的介绍可知,对于Spark作业,数据源分布的小文件数量越多,则Spark作业在从数据源读取数据时,产生的task数量也会更多,而数据分片信息以及对应产生的task元信息都保存在driver端的内存中,会给driver带来很大的压力,甚至引发OOM异常导致作业执行失败。

我们在实践中发现,有个Spark作业从源表对应分区查询数据(数据分散在多个分区目录,总计有二十~三十万个小文件),产生的task数量达到20万个,在Spark作业执行后不久,出现driver报OOM异常导致作业执行失败,反复尝试多次都是同样的结果,通过调大driver端内存进行重试,对应Spark作业才执行成功。后来,通过在源表Spark作业中引入Spark小文件合并功能,大幅减少了对应分区的文件数量,同样的Spark作业从源表查询数据(此时,数据分散在多个分区目录,总计有6000~7000个大文件),产生的task数量在7000左右,即使不调大driver端内存,该Spark作业也能够执行成功。

加快大型Spark作业的执行速度

从前文内容介绍可知,在基于FileOutputCommitter V1文件提交机制的应用场景下,Spark作业在所有task执行完commitTask方法提交数据后,driver端会继续执行commitJob方法来依次将数据文件转移到job的最终目标目录下,当文件数量非常大时,会成为一个严重的性能瓶颈,导致Spark作业迟迟不能结束。因此,AWS针对S3的写入也开发了性能优化的committer。

针对上述这种情况,如果引入Spark小文件合并功能,虽然会占用作业一部分执行时间,以及内存和CPU资源来执行小文件合并操作,但在driver端执行commitJob方法时,需要转移的文件数量大幅减少,在很大程度上能提高执行commitJob方法的效率,进而提高整个Spark作业的执行速度。

下表展示了在实践测试中,引入Spark小文件合并功能相比未引入Spark小文件合并功能,Spark作业执行速度的提升情况。

A A 未引入小文件合并功能 引入小文件合并功能 提升情况
1 1 Spark任务平均耗时(s) 2376 1486 任务平均耗时减少37.5%
2 2 commitJob操作
平均耗时(s)
1865 283 commitJob操作平均
耗时减少84.8%
3 3 文件平均数目(个) 318434 233 文件平均数目减少99.9%

从以上对比测试结果可以发现,在AWS S3 上,对于小文件问题非常突出的数据表,通过引入Spark 小文件合并功能,能够显著提高 driver 端执行 commitJob 方法的效率,进而大幅加快 Spark 作业的执行速度。

总结与展望

作为Spark计算框架的一个组件,Spark小文件合并功能使用简单、方便,并且非常灵活,用户可以根据实际业务情况,合理配置相关参数,使其发挥出最好的效果。截止到目前,我们开发实现的Spark小文件合并功能,初步取得了如下几个方面的成效:

  • 显著降低了存储系统上小文件的数量,让文件大小的分布更合理、均衡;
  • 有效提高了存储系统上数据的处理效率,更利于数据生命周期的管理;
  • 大幅降低了用户作业数据读取阶段的并发度,能有效节约集群的计算资源;

此外,Spark小文件合并功能,对于特定的存储系统,如AWS S3,能明显降低S3 503 Slow Down问题发生的次数,提高用户作业执行的稳定性;对于特定的计算框架,如Spark,能有效防止driver端发生OOM异常,提高Spark作业执行的成功率。

接下来,我们将在大规模线上用户作业的实践中,对Spark小文件合并功能进行全方位的检验,并针对出现的问题进行优化升级,以便其能更好地为线上作业做贡献。

本篇作者

申鹏

OPPO高级数据平台工程师,毕业于电子科技大学,目前就职于OPPO数据架构团队,主要负责Spark计算引擎和Yarn资源调度框架的开发。

David Fu

OPPO大数据计算平台架构师,负责大数据计算平台技术演进设计开发,曾供职于阿里云MaxCompute,去哪儿网大数据平台,拥有10年大数据架构,开发经验。

Huan Yang

OPPO大数据质量负责人,负责大数据平台支持维护及服务质量保证工作,曾供职于京东科技,有较丰富的大数据任务开发和性能优化经验,同时对产品体验和成本优化有较多兴趣和经验。

Aiden Huang

OPPO高级数据平台工程师,Apache Spark Contributor,负责大数据计算引擎开发和持续优化,曾供职于网易,BIGO等互联网公司,拥有较丰富大数据架构和开发经验。

Florence Liu

亚马逊云科技技术客户经理,负责企业级客户的架构,运维和成本管理的持续优化,曾供职于IBM,腾讯及跨国快消公司,拥有10年IT架构、运维经验。