亚马逊AWS官方博客
使用 Amazon EMR Studio 探索 Apache Hudi 核心概念 (3) – Compaction
Compaction 是 MOR 表的一项核心机制,Hudi 利用 Compaction 将 MOR 表产生的 Log File 合并到新的 Base File 中。本文我们会通过 Notebook 介绍并演示 Compaction 的运行机制,帮助您理解其工作原理和相关配置。
1. 运行 Notebook
本文使用的 Notebook是:《Apache Hudi Core Conceptions (4) – MOR: Compaction》,对应文件是:4-mor-compaction.ipynb,请先修改 Notebook 中的环境变量 S3_BUCKET
,将其设为您自己的 S3 桶,并确保用于数据准备的 Notebook:《Apache Hudi Core Conceptions (1) – Data Preparation》已经至少执行过一次。Notebook 使用的 Hudi 版本是 0.12.1
,Spark 集群建议配置 32 vCore / 128 GB 及以上。
2. 核心概念
Compaction 负责定期将一个 File Slice 里的 Base File 和从属于它的所有 Log File 一起合并写入到一个新的 Base File 中(产生新的 File Slice),唯有如此,MOR 表的日志文件才不至于无限膨胀下去。以下是与 Compaction 有关的几项重要配置,在后面的介绍中我们会逐一介绍它们的作用:
配置项 | 默认值 |
hoodie.compact.inline | false |
hoodie.compact.schedule.inline | false |
hoodie.compact.inline.max.delta.commits | 5 |
2.1. 排期与执行
Compaction 的运行机制包括:排期(Schedule)和执行(Execute)两个阶段。排期阶段的主要工作是划定哪些 File Slices 将参与 Compaction,然后生成一个计划(Compaction Plan)保存到 Timeline 里,此时在 Timeline 里会出现一个名为 Compaction
的 Instant,状态是 REQUESTED
;执行阶段的主要工作是读取这个计划(Compaction Plan)并执行它,执行完毕后,Timeline 中的 Compaction
就会变成 COMPLETED
状态。
2.2. 同步与异步
从运行模式上看,Compaction 又分同步、异步以及半异步三种模式(“半异步”模式是本文使用的一种叫法,为的是和同步、异步两种模式的称谓对齐,Hudi 官方文档对这一模式有介绍,但没有给出命名),它们之间的差异主要体现在从(达到规定阈值的某次)提交(Commit)到排期(Schedule)再到执行(Execute)三个阶段的推进方式上。在 Hudi 的官方文档中,交替使用了 Sync/Async 和 Inline/Offline 两组词汇来描述推进方式,这两组词汇是有微妙差异的,为了表述严谨,我们使用同步/异步和立即/另行这两组中文术语与之对应。以下是 Compaction 三种运行模式的详细介绍:
- 同步模式(Inline Schedule,Inline Execute)
同步模式可概括为:立即排期,立即执行(Inline Schedule,Inline Execute)。在该模式下,当累积的增量提交(deltacommit
)次数到达一个阈值时,会立即触发 Compaction 的排期与执行(排期和执行是连在一起的),这个阈值是由配置项 hoodie.compact.inline.max.delta.commits
控制的,默认值是 5
,即:默认情况下,每 5 次增量提交就会触发并执行一次 Compaction。锁定同步模式的配置是:
配置项 | 设定值 |
hoodie.compact.inline | true |
hoodie.compact.schedule.inline | false |
- 异步模式(Offline Schedule,Offline Execute)
异步模式可概括为:另行排期,另行执行(Offline Schedule,Offline Execute)。在该模式下,任何提交都不会直接触发和执行 Compaction,除非使用了支持异步 Compaction 的 Writer,否则用户需要自己保证有一个独立的进程或线程负责定期执行 Compaction 操作。Hudi 提供了四种运行异步 Compaction 的方式:
- 通过 hudi-cli 或提交 Spark 作业驱动异步 Compaction
- 提交 Flink 作业驱动异步 Compaction
- 在 HoodieDeltaStreamer 中配置并运行异步 Compaction
- 在 Spark Structured Streaming 中配置并运行异步 Compaction
在后面的测试用例中,我们将使用第一种方式演示如何进行异步的 Compaction 排期与执行。和同步模式一样,在异步模式下,同样是当增量提交(deltacommit
)次数达到一定的阈值时才会触发排期,这个阈值依然是 hoodie.compact.inline.max.delta.commits
。
异步模式面临的场景要比同步模式复杂一些,同步模式下,每次提交时都会检查累积的提交次数是否已达规定阈值,所以在同步模式下,每次排期涵盖的增量提交数量基本是固定的,就是阈值设定的次数,但是在异步模式下,由于发起排期和增量提交之间没有必然的协同关系,所以在发起排期时,Timeline 中可能尚未积累到足够数量的增量提交,或者增量提交数量已经超过了规定阈值,如果是前者,不会产生排期计划,如果是后者,排期计划会将所有累积的增量提交涵盖进来。锁定异步模式的配置是:
配置项 | 设定值 |
hoodie.compact.inline | false |
hoodie.compact.schedule.inline | false |
- 半异步模式(Inline Schedule,Offline Execute)
半异步模式可概括为:立即排期,另行执行(Inline Schedule,Offline Execute),即:排期会伴随增量提交(deltacommit
)自动触发,但执行还是通过前面介绍的四种异步方式之一去完成。锁定半异步模式的配置是:
配置项 | 设定值 |
hoodie.compact.inline | false |
hoodie.compact.schedule.inline | true |
3. 同步 Compaction
3.1. 关键配置
《Apache Hudi Core Conceptions (4) – MOR: Compaction》的第 1 个测试用例演示了同步 Compaction 的运行机制。测试用的数据表有如下几项关键配置:
配置项 | 默认值 | 设定值 |
hoodie.compact.inline | false | true |
hoodie.compact.schedule.inline | false | false |
hoodie.compact.inline.max.delta.commits | 5 | 3 |
hoodie.copyonwrite.record.size.estimate | 1024 | 175 |
这些配置项在介绍概念时都已提及,通过这个测试用例将会看到它们组合起来的整体效果。
3.2. 测试计划
该测试用例会先后插入或更新三批数据,然后进行同步的 Compaction 排期和执行,过程中将重点观察时间线和文件布局的变化,整体测试计划如下表所示:
步骤 | 操作 | 数据量(单分区) | 文件系统 |
1 | Insert | 96MB | +1 Base File |
2 | Update | 788KB | +1 Log File |
3 | Update | 1.2MB | +1 Log File +1 Compacted Base File |
提示:我们将使用色块标识当前批次的 Instant 和对应存储文件,每一种颜色代表一个独立的 File Slice。
3.3. 第 1 批次
第 1 批次单分区写入 96MB 数据,Hudi 将其写入到一个 Parquet 文件中,第一个 File Group 随之产生,它也是后续 Log File 的 Base File。需要注意的一个细节是:对于 MOR 表来说,只有进行 Compaction 的那次提交才会被称为“commit”,在 Compaction 之前的历次提交都被称作“deltacommit”,即使对于新建 Base File 写入数据的那次提交也是如此,就如同这里一样。
3.4. 第 2 批次
第 2 批次更新了一小部分数据,Hudi 将更新数据写入到了 Log 文件中,大小 788KB,fileVersion 是 1,它从属于上一步生成的 Parquet 文件,即 Parquet 文件是它的 Base File ,这个 Log 文件的 fileId 和尾部的时间戳(baseCommitTime)与 Parquet 文件是一样的。当前的 Parquet 文件和 Log 文件组成了一个 File Slice。
3.5. 第 3 批次
第 3 批次再次更新了一小部分数据,Hudi 将更新数据又写入到一个 Log 文件中,大小 1.2MB,fileVersion 是 2。与上一个 Log 文件一样,fileId 和尾部的时间戳(baseCommitTime)与 Parquet 文件一致,所以它也是 Parquet 文件的 Delta Log,且按 Timeline 排在上一个 Log 文件之后。当前的 File Slice 多了一个新的 Log 文件。但是,不同于第 2 批次,第 3 批次的故事到这里还没有结束,在该测试用例中,当前测试表的设置是:每三次 deltacommit 会触发一次 Compaction,因此,第 3 次操作后就触发了第 1 次的 Compaction 操作:
于是,在 Timeline 上出现了一个 commit(No.3),同时,在文件系统上,生成了一个新的 96MB 的 Parquet 文件,它是第一个 Parquet 文件连同它的两个 Log 文件重新压缩后得到的,这个新的 Parquet 文件 fileId 没变,但是 instantTime 变成了 Compaction 对应的 commit 时间,于是,在当前 File Group 里,第二个 File Slice 产生了,目前它还只有一个 Base File,没有 Log File。
3.6. 复盘
最后,让我们将此前的全部操作汇总在一起,重新看一下整体的时间线和最后的文件布局:
4. 异步 Compaction
4.1. 关键配置
《Apache Hudi Core Conceptions (4) – MOR: Compaction》的第 2 个测试用例演示了异步 Compaction 的运行机制。测试用的数据表有如下几项关键配置:
配置项 | 默认值 | 设定值 |
hoodie.compact.inline | false | false |
hoodie.compact.schedule.inline | false | false |
hoodie.compact.inline.max.delta.commits | 5 | 3 |
hoodie.copyonwrite.record.size.estimate | 1024 | 175 |
这些配置项在介绍概念时都已提及,通过这个测试用例将会看到它们组合起来的整体效果。
4.2. 测试计划
该测试用例会先后插入或更新三批数据,然后进行异步的 Compaction 排期和执行,过程中将重点观察时间线和文件布局的变化,整体测试计划如下表所示:
步骤 | 操作 | 数据量(单分区) | 文件系统 |
1 | Insert | 96MB | +1 Base File |
2 | Update | 788KB | +1 Log File |
3 | Update | 1.2MB | +1 Log File |
4 | Offline Schedule | N/A | N/A |
5 | Offline Execute | 96.15MB | +1 Compacted Base File |
由于该测试用例的前三步操作与第 3 节(第 1 个测试用例)完全一致,所以不再赘述,我们会从第 4 步操作(Notebook 的 3.8 节)开始解读。
4.3. 异步排期
在完成了和第 3 节完全一样的前三批操作后,时间线和文件系统的情形如下:
这和 3.5 节执行后的状况非常不同,没有发生 Compaction,连排期也没有看到,因为我们关闭了 hoodie.compact.inline
。于是,在接下来的第 4 步操作中(Notebook 的 3.8 节),我们通过 spark-submit
手动发起了一个排期作业(--mode 'schedule'
):
执行后,文件布局没有变化,但是在时间线中出现了一个状态为 REQUESTED
的 Compaction
:
4.4. 异步执行
第 5 步操作(Notebook 的 3.9 节)通过 spark-submit
手动发起了一个执行作业(--mode 'execute'
):
执行后,原 Compaction
状态由 REQUESTED
变为 COMPLETED
,原 Base File 和两个 Log File 被合并打包成一个新的 Base File 文件,大小 96MB:
4.5. 异步排期 + 异步执行
异步的排期和执行可以通过一个命令一步完成,《Apache Hudi Core Conceptions (4) – MOR: Compaction》的第 3 个测试用例演示了这一操作。它的前三步操作与第 2 个测试用例一样,在第四步时,使用了“Schedule + Execute”一起执行的方式(--mode 'scheduleAndExecute'
)一步完成了 Compaction 操作,命令如下:
5. 半异步 Compaction
5.1. 关键配置
《Apache Hudi Core Conceptions (4) – MOR: Compaction》的第 4 个测试用例演示了半异步 Compaction 的运行机制。测试用的数据表有如下几项关键配置:
配置项 | 默认值 | 设定值 |
hoodie.compact.inline | false | false |
hoodie.compact.schedule.inline | false | true |
hoodie.compact.inline.max.delta.commits | 5 | 3 |
hoodie.copyonwrite.record.size.estimate | 1024 | 175 |
这些配置项在介绍概念时都已提及,通过这个测试用例将会看到它们组合起来的整体效果。
5.2. 测试计划
该测试用例会先后插入或更新三批数据,然后进行异步的 Compaction Execute,过程中将重点观察时间线和文件布局的变化,整体测试计划如下表所示:
步骤 | 操作 | 数据量(单分区) | 文件系统 |
1 | Insert | 96MB | +1 Base File |
2 | Update | 788KB | +1 Log File |
3 | Update | 1.2MB | +1 Log File |
4 | Offline Execute | 96.15MB | +1 Compacted Base File |
由于该测试用例的前三步操作与第 3 节(第 1 个测试用例)完全一致,所以不再赘述,我们会从第 3 步操作(Notebook 的 5.7 节)开始解读。
5.3. 同步排期
在完成了和第 3 节完全一样的前三批操作后,时间线和文件系统的情形如下:
在该模式下,第 3 次提交自动触发了 Compaction 排期,状态为 REQUESTED
。
5.4. 异步执行
在接下来的第 4 步操作中,通过 spark-submit
手动发起了一个执行作业,排期计划被 consume,原 REQUESTED
状态的 Compaction 变成了 COMPLETED
:
关联阅读
使用 Amazon EMR Studio 探索 Apache Hudi 核心概念 (1) – File Layouts
使用 Amazon EMR Studio 探索 Apache Hudi 核心概念 (2) – File Sizing
使用 Amazon EMR Studio 探索 Apache Hudi 核心概念 (4) – Clustering