亚马逊AWS官方博客

使用 Amazon EMR Studio 探索 Apache Hudi 核心概念 (2) – File Sizing

在本系列的上一篇文章中,我们通过 Notebook 探索了 COW 表和 MOR 表的文件布局,在数据的持续写入与更新过程中,Hudi 严格控制着文件的大小,以确保它们始终处于合理的区间范围内,从而避免大量小文件的出现,Hudi 的这部分机制就称作“File Sizing”。本文,我们就针对 COW 表和 MOR 表的 File Sizing 进行一次深度探索。

1. 运行 Notebook

本文将继续使用《Apache Hudi Core Conceptions (2) – COW: File Layouts & File Sizing》《Apache Hudi Core Conceptions (3) – MOR: File Layouts & File Sizing》两个 Notebook,对应文件分别是:2-cow-file-layouts-file-sizing.ipynb 和 3-mor-file-layouts-file-sizing.ipynb。运行前,请先修改 Notebook 中的环境变量 S3_BUCKET,将其设为您自己的 S3 桶,并确保用于数据准备的 Notebook:《Apache Hudi Core Conceptions (1) – Data Preparation》已经至少执行过一次。Notebook 使用的 Hudi 版本是 0.12.1,Spark 集群建议配置:32 vCore / 128 GB 及以上。

2. COW 表的 File Sizing

2.1. 关键配置

《Apache Hudi Core Conceptions (2) – COW: File Layouts & File Sizing》的第 1 个测试用例展示了 COW 表是如何控制文件大小的。测试用的数据表有三个关键配置项:

配置项 默认值 设定值
hoodie.parquet.max.file.size 125829120(120MB) Default Value
hoodie.parquet.small.file.limit 104857600(100MB) Default Value
hoodie.copyonwrite.record.size.estimate 1024 175

我们知道,在 Hudi 中有“大文件”和“小文件”之说,它们就来自于 hoodie.parquet.max.file.size(默认值 120MB)和 hoodie.parquet.small.file.limit(默认值 100MB)这两项配置。简单地说,在默认情况下:小于 100MB 的是小文件,100MB ~ 120MB 之间的是大文件,大文件的大小不一,但都不会超过 120MB[1]

当磁盘上有小于 100MB(即 hoodie.parquet.small.file.limit 规定的默认值)的文件时,Hudi 会将其视为“小文件”,在下次写入数据时,Hudi 会优先选择复制小文件的数据,然后与输入数据一起合并写入到新文件中(即 Copy On Write 操作),如果一个文件超过了 100MB,则它将不再参与新输入数据的 Copy On Write 操作。当磁盘上没有小文件的时候,Hudi 就会创建新的 File Group 承接新数据。

不管是上述的 Copy On Write 操作还是新开 File Group 写入新数据,单一 Parquet 文件的体积是有最大值限制的,这个最大值就是 120MB(即 hoodie.parquet.max.file.size 规定的默认值),如果单次写入的数据量超过了 120MB,Hudi 会保证单一文件最多写满 120MB,超出的部分会建新的 File Group写入。

回到 reviews_cow_layouts_sizing_1 表的配置,我们并没有修改小文件和文件上限的阈值,只是特意拿出来解释一下它们的作用,表中唯一一项修改值是 hoodie.copyonwrite.record.size.estimate,在我们这个测试用例中,它是一项很有必要的配置,至于其具体作用,我们放到后面解释。

2.2. 测试计划

在该测试用例中, 我们会先后插入四批数据,然后重点观察过程中文件大小的变化,整体测试计划如下表所示:

步骤 操作 数据量(单分区) 文件系统
1 Insert 96MB +1 Small File
2 Insert 14.6MB +1 Big File
3 Insert 3.7MB +1 Small File
4 Insert 188.5MB +1 Max File, +1 Small File

提示:我们将使用色块标识当前批次的 Instant 和对应存储文件,每一种颜色代表一个独立的 File Group。

2.3. 第 1 批次

第 1 批次单分区写入了 96MB 数据,Hudi 将其写入到一个 Parquet 文件中,第一个 File Group 随之产生。

2.4. 第 2 批次

第 2 批次单分区写入了 14MB 数据,由于上一批次创建的 96MB 文件尚未超过 100MB 的阈值,所以它被判定为了“小文件”,Hudi 会将这个文件中的 96MB 数据与新插入的 14MB 数据合并写入到一个新的 File Group 中。值得注意的是:新文件的 fileId 并没有变,两个 Parquet 文件是同一个 File Group 的新旧两个版本。

2.5. 第 3 批次

第 3 批次单分区只写入了 3.7MB,它没有和上一批次生成的那个 110M 的文件合并生成一个 113.7MB 的文件,而是“独享”了一个新文件,一个新的 File Group,因为它的文件名是一个新的 UUID。这一批次的写入印证了:“超过 100MB 后,大文件将不再参与 COW 操作的论断”,即上一批次生成的那个 110M 的文件再也不会和任何数据合并写入到新文件中了。

2.6. 第 4 批次

第 4 批次单分区写入 188.5MB,我们要通过这一批次观察:当单次写入超过 120MB 数据时,Hudi 如何分裂文件。通过输出的文件布局可以清晰地看到:首先,上一批次生成的 3.7MB 小文件必然会参与到这一轮的 COW 操作中,Hudi 会根据记录的平均大小和输入数据的条数估算出输入数据的体积,将其中约 116.3MB 的数据(实际是按条数切分的)与 3.7MB 的小文件数据一起合并写入到一个 120MB 的文件,将剩余 66MB 数据(实际是按条数切分的)写入到了一个新的 File Group 中(实际是预先划分好,并行写入,但由于较大的文件(例如本例中的 120MB 文件)写入长间更长,所以文件显示的最后更新时间也往往较晚)。由于此次操作更新了一个 File Group 又新建了一个 File Group,所以下图使用了两个色块。

2.7. 复盘

最后,让我们将此前的全部操作汇总在一起,重新看一下整体的时间线和最后的文件布局:

备注:在本节演示中,我们只使用了 Insert 操作来演示 COW 的核心逻辑,读者可以在 Notebook 的基础上自行尝试 Update,Delete 和 Merge 操作,会观察到 Hudi 更全面的行为逻辑。

3. MOR 表的 File Sizing

3.1. 关键配置

《Apache Hudi Core Conceptions (3) – MOR: File Layouts & File Sizing》的第 1 个测试用例展示了 MOR 表是如何控制文件大小的。测试用的数据表有一个关键配置项:

配置项 默认值 设定值
hoodie.logfile.max.size 1073741824(1GB) 262144000(250MB)

与 COW 中的 Parquet 文件有所不同,MOR 中的 Log File 只有最大值限制(默认 1GB),没有所谓的“小文件”阈值,即:Log File 不检查小文件。原因也不难理解,因为 Log File 可以看作是一种存在时间不会太长的临时文件,它们最终都会被 Compact 到 Parquet 文件中,所以不会累积大量的小文件。

此外,Log File 也没有类似 Parquet 文件的 hoodie.copyonwrite.record.size.estimate 配置项,Log File 的记录平均大小只能由 Hudi 根据上次提交的元数据进行估算,所以在后面的测试用例中,我们会看到 Hudi 对 Log File 的切分没有 Parquet 文件那么精准。注意,这里不是说 Hudi 对于 Parquet 和 Log 文件的 File Sizing 有什么差异,Hudi 对文件大小的切分都是基于估算的,只是对于 Parquet 文件来说由于多了一个 hoodie.copyonwrite.record.size.estimate 配置项,在我们的测试用例里可以看到非常精准的文件切割(因为我们的测试数据每条记录基本都是一样大的),而 Log 文件没有类似配置项的支持。

3.2. 测试计划

在该测试用例中, 我们会先后插入或更新四批数据,然后重点观察过程中文件大小的变化,整体测试计划如下表所示:

步骤 操作 数据量(单分区) 文件系统
1 Insert 96MB +1 Base File
2 Update 804KB +1 Log File
3 Update 1.2MB +1 Log File +1 Base File
4 Update 307MB +1 Log File +1 Max Log File

提示:我们将使用色块标识当前批次的 Instant 和对应存储文件,每一种颜色代表一个独立的 File Slice(不是 File Group,请注意和上一节的差别)。

3.3. 第 1 批次

第 1 批次单分区写入 96MB 数据,Hudi 将其写入到一个 Parquet 文件中,第一个 File Group 随之产生,它也是后续 Log File 的 Base File。需要注意的一个细节是:对于 MOR 表来说,只有进行 Compaction 的那次提交才会被称为“commit”,在 Compaction 之前的历次提交都被称作“deltacommit”,即使对于新建 Base File 写入数据的那次提交也是如此,就如同这里一样。

3.4. 第 2 批次

第 2 批次更新了一小部分数据,Hudi 将更新数据写入到了 Log 文件中,大小 804KB,fileVersion 是 1,它从属于上一步生成的 Parquet 文件,即 Parquet 文件是它的 Base File ,这个 Log 文件的 fileId 和尾部的时间戳(baseCommitTime)与 Parquet 文件是一样的。当前的 Parquet 文件和 Log 文件组成了一个 File Slice。

3.5. 第 3 批次

第 3 批次再次更新了一小部分数据,Hudi 将更新数据又写入到一个 Log 文件中,大小 1.2MB,fileVersion 是 2。与上一个 Log 文件一样,fileId 和尾部的时间戳(baseCommitTime)与 Parquet 文件一致,所以它也是 Parquet 文件的 Delta Log,且按 Timeline 排在上一个 Log 文件之后。当前的 File Slice 多了一个新的 Log 文件。

但是,不同于第 2 批次,第 3 批次的故事到这里还没有结束,在该测试用例中,当前测试表的设置是:每三次 Deltacommit 会触发一次 Compaction,因此,第 3 次更新操作后就触发了第 1 次的 Compaction 操作。于是,在 Timeline 上出现了一个 commit(No.3)。同时,在文件系统上,生成了一个新的 96MB 的 Parquet 文件,它是第一个 Parquet 文件连同它的两个 Log 文件重新压缩后得到的,这个新的 Parquet 文件 fileId 没变,但是 instantTime 变成了 Compaction 对应的 commit 时间,于是,在当前 File Group 里,第二个 File Slice 产生了,目前它还只有一个 Base File,没有 Log File。

3.6. 第 4 批次

第 4 批次更新了全部数据,总的更新数据量达到了 307MB[2],由于当前测试表的 Log File 体积上限被设定为 250MB,于是,Hudi 需要将这批更新数据分拆成两个 Log File 写入,其中一个大小会在 250MB 上下,另一个就应该在 57MB 上下,最终落地的文件一个是 268MB,另一个是 39MB,产生偏差的原因就是我们在 3.1 节解释的,这是 Hudi 的估算结果,无法做到精准切分,这种偏差不会有什么影响。两个 Log File 的 fileId 和尾部的时间戳(baseCommitTime)与第二个 Parquet 文件一致,它们属于第二个 File Slice。

3.7. 复盘

最后,让我们将此前的全部操作汇总在一起,重新看一下整体的时间线和最后的文件布局:

备注:在本节演示中,我们主要使用了 Update 操作来演示 MOR 的核心逻辑,读者可以在 Notebook 的基础上自行尝试 Insert,Delete 和 Merge 操作,会观察到 Hudi 更全面的行为逻辑。

4. 揭秘默认行为

相信应该有不少读者都做过向 Hudi 表中插入数据然后观察文件变化的实验,如果你使用的都是默认配置,那么大概率是无法复现像第 2 节那样“教科书式”的行为的,特别是在初始的几轮操作中,Hudi 的表现让人“琢磨不透”。在《Apache Hudi Core Conceptions (2) – COW: File Layouts & File Sizing》 的第 2 个测试用例中,我们就使用了全默认配置观察 COW 表的 File Sizing 行为,当第 1 批数据插入时,情形就已经很“迷惑”了,因为你将看到这样的结果:


第 1 批单分区插入了 99MB 数据,按照我们此前的理解,此时应该生成一个 99MB 的 Parquet 文件才对,而现在的状况是:Hudi 一次就创建了 5 个 Parquet 文件,分属 5 个 File Group,且每个文件都不超过 21MB,好像此前建立起来的秩序瞬间“崩塌”了。

是的,一定是哪里还有我们没掌握的知识盲区。此前我们一直留着一个配置项没有解释,如果对比一下:《Apache Hudi Core Conceptions (2) – COW: File Layouts & File Sizing》的第 1 和第 2 测试用例的 Hudi 配置,你就会发现,其实它们就差了一个配置项:hoodie.copyonwrite.record.size.estimate,正是前文没有解释的那项配置。该项配置用于指定记录的平均大小,Hudi 会利用该值和输入记录的条数计算落地文件可能的大小,以便提前对输入数据进行切分,保证落地的文件不会超过设定的阈值。如果用户没有显式地配置该项,在第 1 次插入时,Hudi 会使用默认值 1024(即 1KB)作为记录的平均大小进行估算,进而决定如何切分文件。

现在,我们来推导一下上一步默认配置下发生的“故事”: 初始插入的是 2003 年的全年数据,总计 1155127 条,划分到单分区是 577564 条,由于没有给出记录平均大小的参考值,且是第一次提交,没有历史数据可以借鉴,所以 Hudi 只能按默认的 1KB 进行估算,由于单个文件的体积上限是 120MB,所以单个文件最多只能写入 120MB ÷ 1KB = 122880 条记录,因此单分区的 577564 条记录需要划分成 5 份写入 5 个文件中,其中 4 份是 122880 条记录,第 5 份是 577564 – 122880 * 4 = 86044 条记录,以上就是 Hudi 基于记录平均大小为 1KB 的前提计算出的文件切分方案,如果记录的平均大小真得是 1KB,则插入后单分区应该出现 4 个 120MB 和 1 个 84MB 的文件,但我们测试数据的平均大小实际只有 175 个字节,是估算值 1KB 的 17%,因此实际落地的文件体积也“缩水”成了估算值的 17%,所以最终大家看到的是 4 个 21MB 和 1 个 15MB 的文件。

实际上,hoodie.copyonwrite.record.size.estimate 并不是一个非常重要的配置项,因为如果不显式地配置它,Hudi 会根据上次提交的元数据动态计算记录的平均大小,这一点在我们第 2 个测试用例的第 2 步就显现出来了:

在第 2 步测试中,我们向单分区插入了 417MB 的数据,正常情况下,这些数据应该能填充 3 个 Max File(120MB)和一个 Small File,从实际的输出结果来看,Hudi 较第一次的“迷惑”行为“理性”了很多,它确实生成了 3 个 116MB 的大文件和 1 个 68MB 的小文件。这次行为看上去合理很多的原因在于:由于此前已经进行了一次提交,所以 Hudi 可以从前一次提交的元数据中估算出纪录的平均大小,这个估算值要比一开始使用的默认值 1024“靠谱”的多,所以整体行为也合理了许多,至于为什么没有写到 120MB 是因为估算总会一定的误差,在实际系统中,我们很少能看到正正好好的 120MB 文件。

[1]有多种特殊情况会使得数据文件突破 120MB 的限制,例如使用了 SIMPLE HASHING 的 BUCKET INDEX,启用了 Clustering 等,此外,一个值得记录的情况是:如果在 COW 表里更新一个已经是大文件里的数据,在 COW 和索引机制的共同作用下,更新数据和原有数据(已经超过 100BM)会继续合并写入新文件,此时文件体积可能会突破 120MB。

[2]相同的记录数量下,Log 文件的体积往往要比 Parquet 文件大很多,这和 Log 文件的结构以及使用的 AVRO 格式有关。所以在这个 Case 里,单分区下存储全部数据的 Parquet 文件只有 96MB,而存储全部数据的 Log 文件却达到了 307MB。

关联阅读

使用 Amazon EMR Studio 探索 Apache Hudi 核心概念 (1) – File Layouts

使用 Amazon EMR Studio 探索 Apache Hudi 核心概念 (3) – Compaction

使用 Amazon EMR Studio 探索 Apache Hudi 核心概念 (4) – Clustering

本篇作者

Laurence

AWS 资深解决方案架构师,多年系统开发与架构经验,对大数据、云计算、企业级应用、SaaS、分布式存储和领域驱动设计有丰富的实践经验,著有《大数据平台架构与原型实现:数据中台建设实战》一书。