亚马逊AWS官方博客

使用 Amazon EMR Studio 探索 Apache Hudi 核心概念 (1) – File Layouts

在以往众多介绍 Apcache Hudi 的文章中,对核心概念的解读大多会引用官方文档中的概念图,像 Timeline(时间线)、File Layouts(文件布局)这类结构清晰,逻辑严谨的概念,图解是很好的说明方式。但是,抽象概念与实际运行状况还是有不少差异的,相信很多学习和使用 Hudi 的开发者都曾尝试过:将文档中的概念和 Hudi 的实际运行状况结合起来推导每个动作背后的逻辑是什么。这个过程非常有意义,因为它可以帮助我们透彻地理解 Hudi 的工作原理。

在这方面,Notebook 是一个绝佳的工具,我们可以利用 Notebook 良好的交互能力,设计一系列有针对性的操作,让这些操作去触发 Hudi 的某些机制,然后观察 Hudi 数据集的状态(包括元数据和存储文件),再结合对 Hudi 相关概念的介绍,解读这些行为。利用 Notebook 提供的统一环境和上下文,我们可以非常生动地观察到 Hudi 的运行机制,并深刻体会到其背后的控制原理,这也正是本系列文章的写作灵感:我们希望借助 Notebook“探索,发现,思考,领悟”的思维模式,带领大家开启一段 Hudi 核心概念的探索之旅。

1. Notebook 介绍

本系列文章配备一组专门开发的 Notebook,文章的讲解与 Notebook 演示紧密结合,Notebook 项目地址如下:

项目名称 项目地址
Apache Hudi Core Conceptions https://github.com/bluishglc/apache-hudi-core-conceptions

Notebook 的运行环境使用的是 Amazon EMR Studio(一种面向 Amazon EMR 的托管 Notebook 环境),如果您没有 AWS 账号,可以自行修改 Notebook 适配到任何支持 Spark Kernel 的 Notebook 环境中。Notebook 还使用了一个公共数据集:Amazon Customer Reviews,它是 Amazon 购物网站上的用户评价数据,总体积 50GB,存放在 S3 上,地址:s3://amazon-reviews-pds[1],如果您没有 AWS 账号,可以通过 S3 客户端工具下载到本地使用。Notebook 使用的 Hudi 版本是 0.12.1,Spark 集群建议配置:32 vCore / 128 GB 及以上。

为了更好地演示 Hudi 的特性,我们对原始数据做了一些必要的裁剪,并将裁剪工作封装成一个独立的 Notebook:《Apache Hudi Core Conceptions (1) – Data Preparation》,对应文件:1-data-preparation.ipynb,这个 notebook 主要完成两项工作:

  1. 使用 Spark SQL 创建外部表:all_reviews,location 指向 s3://amazon-reviews-pds,便于通过数据表的形式读取原始数据。
  2. 使用 Spark SQL 创建正式的源数据表:reviews,该表在 all_reviews 基础上做了如下裁剪:
    • 为保证所有记录大小基本一致[2],仅选取 review_id, star_rating, review_body, review_date 和 year 五个必要字段,同时将长短不一的 review_id 和 review_body 两个字段的值使用定长的 uuid 字符串替换。
    • 为便于按年份读取数据,用 year 重新进行了分区。

其他的 Notebook 均以测试用例的形式组织,每个用例都会创建用例专属的数据表,并根据测试意图配置特定的 Hudi 属性。这些数据表的结构与数据准备时创建的 reviews 表基本一致,只是多了 Hudi 需要的 preCombineField 列 timestamp 和为演示特别设计的分区列 parity。parity 是基于 review_id 的 crc32 编码对 2 取余后得到的数值 0 或 1,设计该列作为分区列的考量是:保证测试表即不会没有分区,这样说明性会不足,又不会有太多分区,这样会干扰解读,所以使用这种只有 0、1 两种取值的列作分区是非常适合的。

提示:运行任何 Notebook 前,请先至少执行一次《Apache Hudi Core Conceptions (1) – Data Preparation》,以确保源数据表 reviews 被创建。所有 Notebook 都定义了一个环境变量:S3_BUCKET,需要您将其修改为自己的 S3 桶。

2. 运行 Notebook

本文会使用到两个 Notebook:《Apache Hudi Core Conceptions (2) – COW: File Layouts & File Sizing》《Apache Hudi Core Conceptions (3) – MOR: File Layouts & File Sizing》,对应文件分别是:2-cow-file-layouts-file-sizing.ipynb 和 3-mor-file-layouts-file-sizing.ipynb。运行前,请先修改 Notebook 中的环境变量 S3_BUCKET,将其设为您自己的 S3 桶,并确保用于数据准备的 Notebook:《Apache Hudi Core Conceptions (1) – Data Preparation》已经至少执行过一次。本文,我们只关注两个 Notebook 中和 COW 表以及 MOR 表文件布局有关的内容,对于运行过程中发生的 File Sizing 和 Compaction 动作会在后续文章中专门解读。

3. 核心概念

File Layouts(文件布局)是指 Hudi 的数据文件在存储介质上的分布,Hudi 会严格管理数据文件的命名、大小和存放位置,并会在适当时机新建、合并或分裂数据文件,这些逻辑都会体现在文件布局上。

Hudi 在文件操作上有一个重要“原则”:Hudi always creates immutable files on disk。 即:文件一旦创建,永远不会再更新,任何添加、修改或删除操作只会在现有文件数据的基础上合并输入数据一起写入到下一个新文件中,清楚这一点对我们解读文件布局很有帮助。

整体上,Hudi 文件布局的顶层结构是数据表对应的 base 目录,下一层是各分区目录,分区目录可根据分区列数量嵌套多层,这和 Hive/Spark 的表结构是一致的。在最底层分区文件夹上,Hudi 不再创建子文件夹,全部都是平铺的数据文件,但是这些文件在逻辑上依然有着清晰的层级关系。顶层的文件集合是 File Group,File Group 下面是 File Slice,File Slice 下面就是具体的数据文件了。我们反过来,按从下到上的顺序梳理一下这些文件和文件集合:

  • Base File

Base File 是存储 Hudi 数据集的主体文件,以 parquet 等列式格式存储,所以我们在 Hudi 中看到的 parquet 文件基本都是 Base File。实际上,Base File 的命名是为了呼应 Log File,在没有 Log File 的 COW 表里,Base File 就是基层的数据存储文件,没必要强调它的“Base”身份,直接叫 Parquet 文件就可以。Base File 遵循一致的命名规范,格式为:

<fileId>_<writeToken>_<instantTime>.parquet

以下是一个真实的 Base File 文件名:

fileId 部分是一个 uuid,我们会在多个文件中看到相同的 fileId,这些 fileId 相同的文件就组成了一个 File Group。instantTime 是写入这个文件对应的 instant 的时间,也是该文件的一个“版本”标注,因为一个 Base File 历经多轮增删改操作后就会产生多个版本,Hudi 就使用 instantTime 对它们进行标识。不管是 MOR 表还是 COW 表,都有 Base File,只是在 COW 表里只有 Base File,在 MOR 表里除了 Base File 还有 Log File。

  • Log File

Log File 是在 MOR 表中用于存储变化数据的文件,也常被称作 Delta Log,Log File 不会独立存在,一定会从属于某个 Parquet 格式的 Base File,一个 Base File 和它从属的若干 Log File 所构成的就是一个 File Slice。Log File 也遵循一致的命名规范,格式为:

.<fileId>_<baseCommitTime>.log.<fileVersion>_<writeToken>

以下是一个真实的 Log File 文件名:

不同于 Base File,Log File 文件名中时间戳部分并不是 Log File 自己对应的 instanceTime,而是它所从属的 Base File 的 instanceTime,即 baseCommitTime。如此一来,就没有办法通过时间戳来区分 Log File 提交的先后顺序了,所以 Hudi 在 Log File 文件名中加入了 fileVersion,它是一个从 1 开始单调递增的序列号,用于标识 Log File 产生的顺序。

  • File Slice

在 MOR 表里,由一个 Base File 和若干从属于它的 Log File 组成的文件集合被称为一个 File Slice。应该说 File Slice 是针对 MOR 表的特定概念,对于 COW 表来说,由于它不生成 Log File,所以 File Silce 只包含 Base File,或者说每一个 Base File 就是一个独立的 File Silce。总之,对于 COW 表来说没有必要区分 File Silce,也不没必要强调 Base File 的“Base”身份,只是为了概念对齐,大家会统一约定 Hudi 文件的三层逻辑布局为:File Group -> File Slice -> Base / Log Files。

  • File Group

在前面介绍 Base File 时,我们已经提到了 File Group,简单说,就是 fileId 相同的文件属于同一个 File Group。同一 File Group 下往往有多个不同版本(instantTime)的 Base File(针对 COW 表)或 Base File + Log File 的组合(针对 MOR 表),当 File Group 内最新的 Base File 迭代到足够大( >100MB)时,Hudi 就不会在当前 File Group 上继续追加数据了,而是去创建新的 File Group。

4. COW 表的 File Layouts

运行《Apache Hudi Core Conceptions (2) – COW: File Layouts & File Sizing》的第 1 个测试用例,可以在 2.8 节看到类似下图这样一个经由实际操作产生的 COW 表的文件布局:


图中:青色、紫色和红色标注的文件分属于三个 File Group,因为它们开头的 uuid 是三个不同的值,从文件名尾部的 instantTime 可以推断它们被创建的先后时间。当前的文件布局是历经 4 次操作演进而来,以分区 parity=0 为例,时间线(Timeline)如下:

①:第 1 次插入 96M 数据,生成了第一个 Parquet 文件;

②:第 2 次插入 14M 数据,在 Copy On Write 机制运作下,插入的 14M 数据与原 96M 数据合并写入新的 Parquet 文件,大小 110M,fileID 不变;

③:第 3 次插入 3.7M 数据,由于现有 Parquet 文件已经超过了 100M 的阈值,被 Hudi 判定为大文件,故不再选择它进行 Copy On Write 操作,转而生成新的 fileId,创建新的 File Group,并将数据写入新 File Group 的 Parquet 文件中,大小 3.7M;

④:第 4 次插入 182M 数据,现有 3.7M 的文件是一个小文件,Hudi 会选择以该文件为基础,将其 3.7M 的数据和新插入数据一起合并写入新的 Parquet 文件,由于这次插入的数据量较大,写入文件的体积将会超过 Hudi 规定的单一 Parquet 文件的上限(120M),所以 Hudi 将 182M 中的 116M 与现有 3.7M 数据合并,写满一个 Parquet 文件(120M),同时创建第 3 个 File Group,将另外 66M 数据写入第 3 个 File Group 下的 Parquet 文件中。注意:66M 的文件使用红色标记,属于一个独立的 File Group,但它却是第④次提交的产物,不存在第⑤次提交。

关于整条时间线上发生的与 File Sizing 有关的细节我们会在后续文章中单独介绍,本文先聚焦文件布局本身。

5. MOR 表的 File Layouts

运行《Apache Hudi Core Conceptions (3) – MOR: File Layouts & File Sizing》的第 1 个测试用例后,可以在 2.8 节看到类似下图这样一个经由实际操作产生的 MOR 表的文件布局:


图中:单一分区下的所有文件属于一个 File Group(uuid 全部相同),黄色和蓝色标注的文件分属于两个不同的 File Slice,其中两个 Parquet 文件是两个 File Slice 各自的 Base File,它们各带两个 Log File。当前的文件布局是历经 4 次操作(5 次提交)演进而来,以分区 parity=0 为例,时间线(Timeline)如下:

①:第 1 次插入 96M 数据,生成了第一个 Base File;

②:第 2 次更新了其中的一小部分数据,生成了从属于第一个 Base File 文件的第一个 Log File,大小 804K,fileVersion 是 1;

③:第 3 次又更新了其中的一小部分数据,生成了从属于第一个 Base File 文件的第二个 Log File,大小 1.2M,fileVersion 是 2;

④:由于该表启用了同步压缩(Inline Compaction),并将触发 Compaction 的 deltacommits 阈值设为了 3,所以第 3 次提交后触发了同步的 Compaction 操作,Hudi 将此前的 Base File 和两个 Log File 压缩成一个新的 Base File,大小 96M,fileId 不变,这样就出现了第二个 File Slice。每次 Compaction 都会进行一次独立的提交(即 commit,非 deltacommit),所以新 Base File 尾部的时间戳更新为 Compaction 这次提交对应的 instantTime;

⑤:第 4 次更新(但却已是第 5 次提交)的数据量达到了 307M,由于该测试表的 Log File 上限被设定为了 250M,此次提交触发了 Log File 的分裂,Hudi 将更新数据写入了两个 Log File,一个 268M,fileVersion 是 1;另一个 39M,fileVersion 是 2。

关于整条时间线上发生的与 File Sizing 和 Compaction 有关的细节我们会在后续文章中单独介绍,本文先聚焦文件布局本身。

[1]该桶部署于 AWS 海外区,如果您持有 AWS 海外区账号可直接访问,如果您是 AWS 中国区用户或没有 AWS 账号,可以通过 S3 客户端工具将数据集下载至本地使用。关于该数据源的更多信息请参考:https://s3.amazonaws.com/amazon-reviews-pds/readme.html

[2]Hudi 对文件进行分割时会根据此前提交的数据情况估算记录的平均大小,如果记录本身的大小不均,Hudi 的估算就会出现较大的偏差,这会对我们的演示和解读造成干扰,关于此话题,本系列的第 2 篇文章会进行专门的讨论。

关联阅读

使用 Amazon EMR Studio 探索 Apache Hudi 核心概念 (2) – File Sizing

使用 Amazon EMR Studio 探索 Apache Hudi 核心概念 (3) – Compaction

使用 Amazon EMR Studio 探索 Apache Hudi 核心概念 (4) – Clustering

本篇作者

Laurence

AWS 资深解决方案架构师,多年系统开发与架构经验,对大数据、云计算、企业级应用、SaaS、分布式存储和领域驱动设计有丰富的实践经验,著有《大数据平台架构与原型实现:数据中台建设实战》一书。