亚马逊AWS官方博客

新增功能 – 使用 Amazon EMR 和 Apache Hudi 插入、更新、删除 S3 上的数据

将数据存储在 Amazon S3 中会在扩展、可靠性和成本效益方面提供很多优势。 除此之外,您可以使用 Apache SparkHivePresto 等开源工具来利用 Amazon EMR 处理和分析您的数据。 尽管这些工具很强大,但若要处理需要您进行增量数据处理和记录级插入、更新和删除的使用案例,仍然具有挑战性。

我们与客户交谈发现,有些使用案例需要处理对个别记录的增量更改,例如:

  • 遵守数据隐私规定,在这些情况下,其用户选择行使他们被遗忘的权限或就如何使用他们的数据更改同意。
  • 在您必须处理指定数据插入和更新事件时,处理流数据。
  • 使用变更数据捕获 (CDC) 架构从企业数据仓库或运营数据存储跟踪和提取数据库更改日志。
  • 恢复延迟到达的数据,或分析截至特定时间点的数据。

从今天开始,EMR 版本 5.28.0 包含 Apache Hudi(孵化),因此,您不再需要构建自定义解决方案来执行记录级插入、更新和删除操作。Hudi 开发于 2016 年开始于 Uber,用于解决提取和 ETL 管道间的效率低下。 近几个月来,EMR 团队与 Apache Hudi 社区密切合作,贡献了很多修补程序,包括将 Hudi 更新为 Spark 2.4.4 (HUDI-12)、支持 Spark Avro (HUDI-91)、增加对 AWS Glue Data Catalog (HUDI-306) 的支持以及多个漏洞修复。

使用 Hudi,您可以在 S3 上执行记录级插入、更新和删除,以便能够符合数据隐私法、使用实时流和变更数据捕获、恢复延迟到达的数据及以开放的、与供应商无关的格式追踪历史记录和回滚。您将创建数据集和表,Hudi 则管理基础数据格式。 Hudi 使用 Apache ParquetApache Avro 进行数据存储,并且包括与 Spark、Hive 和 Presto 的内置集成,以便您能够使用与当前使用相同的工具来查询 Hudi 数据集,并能近乎实时地访问全新数据。

启动 EMR 集群时,适用于 Hudi 的库和工具会在至少选择了以下组件之一:Hive、Spark 或 Presto 的任何时候自动安装和配置。 您可以使用 Spark 创建新的 Hudi 数据集,并插入、更新和删除数据。每个 Hudi 数据集均在您的集群的配置元存储(包括 AWS Glue Data Catalog)中进行注册,并显示为可使用 Spark、Hive 和 Presto 进行查询的表。

Hudi 支持两种存储类型,这两种类型定义写入、索引和从 S3 中读取数据的方式:

  • 写入时复制 – 数据按分列格式 (Parquet) 存储,且更新会在写入期间创建文件的新版本。此存储类型最适用于读取密集型工作负载,因为最新版的数据集始终以有效的分列文件提供。
  • 读取时合并 – 数据以分列 (Parquet) 和分行 (Avro) 格式的组合形式存储;更新记录在分行的“delta 文件”中,随后再被压缩,以创建新版本的分列文件。 此存储类型最适用于写入密集型工作负载,因为新提交内容像 delta 文件一样被快速写入,但读取数据集需要将压缩的分列文件与 delta 文件合并。

我们来快速概览如何在 EMR 集群中设置和使用 Hudi 数据集。

将 Apache Hudi 与 Amazon EMR 结合使用
我开始从 EMR 控制台中创建集群。在高级选项中,我选择 EMR 版本 5.28.0(包含 Hudi 的第一个版本)和以下应用程序:Spark、Hive 和 Tez。在硬件选项中,我添加了 3 个任务节点,以确保拥有足够的容量来同时运行 Spark 和 Hive。

当集群准备就绪时,我使用我在安全选项中选择的密钥对来 SSH 到主节点并访问 Spark Shell。我使用下面的命令来启动 Spark Shell 以将其用于 Hudi:

$ spark-shell --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer"
              --conf "spark.sql.hive.convertMetastoreParquet=false"
              --jars /usr/lib/hudi/hudi-spark-bundle.jar,/usr/lib/spark/external/lib/spark-avro.jar

在这里,我使用下面的 Scala 代码使用“写入时复制”存储类型将一些示例 ELB 日志导入 Hudi 数据集中:

import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions._
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.hive.MultiPartKeysValueExtractor

//将各种输入值设置为变量
val inputDataPath = "s3://athena-examples-us-west-2/elb/parquet/year=2015/month=1/day=1/"
val hudiTableName = "elb_logs_hudi_cow"
val hudiTablePath = "s3://MY-BUCKET/PATH/" + hudiTableName

//设置我们的 Hudi 数据源选项
val hudiOptions = Map[String,String](
    DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "request_ip",
    DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "request_verb",
    HoodieWriteConfig.TABLE_NAME -> hudiTableName,
    DataSourceWriteOptions.OPERATION_OPT_KEY ->
        DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
    DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "request_timestamp",
    DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY -> "true",
    DataSourceWriteOptions.HIVE_TABLE_OPT_KEY -> hudiTableName,
    DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY -> "request_verb",
    DataSourceWriteOptions.HIVE_ASSUME_DATE_PARTITION_OPT_KEY -> "false",
    DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY ->
        classOf[MultiPartKeysValueExtractor].getName)

//从 S3 中读取数据并使用分区和记录密钥创建 DataFrame
val inputDF = spark.read.format("parquet").load(inputDataPath)

//将数据写入 Hudi 数据集中
inputDF.write
       .format("org.apache.hudi")
       .options(hudiOptions)
       .mode(SaveMode.Overwrite)
       .save(hudiTablePath)

在 Spark Shell 中,我现在可以对 Hudi 数据集中的记录进行计数:

scala> inputDF2.count()
res1: Long = 10491958

在选项中,我使用了与为集群配置的 Hive 元存储的集成,以便在默认数据库中创建表。用此方式,我可以使用 Hive 查询 Hudi 数据集中的数据:

hive> use default;
hive> select count(*) from elb_logs_hudi_cow;
...
OK
10491958
...

现在,我可以更新或删除数据集中的单个记录。在 Spark Shell 中,我准备了一些变量来查找我想要更新的记录,并准备了一个 SQL 语句来选择我想要更改的列值:

val requestIpToUpdate = "243.80.62.181"
val sqlStatement = s"SELECT elb_name FROM elb_logs_hudi_cow WHERE request_ip = '$requestIpToUpdate'"

我执行 SQL 语句来查看列的当前值:

scala> spark.sql(sqlStatement).show()
+------------+                                                                  
|    elb_name|
+------------+
|elb_demo_003|
+------------+

然后,我选择并更新记录:

//使用单个记录创建 DataFrame 并更新列值
val updateDF = inputDF.filter(col("request_ip") === requestIpToUpdate)
                      .withColumn("elb_name", lit("elb_demo_001"))

现在,我使用与我用于创建数据集的语法相似的语法来更新 Hudi 数据集。但此时,我编写的 DataFrame 仅包含一个记录:

//将 DataFrame 编写为现有 Hudi 数据集的更新
updateDF.write
        .format("org.apache.hudi")
        .options(hudiOptions)
        .option(DataSourceWriteOptions.OPERATION_OPT_KEY,
                DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
        .mode(SaveMode.Append)
        .save(hudiTablePath)

在 Spark Shell 中,我检查了更新的结果:

scala> spark.sql(sqlStatement).show()
+------------+                                                                  
|    elb_name|
+------------+
|elb_demo_001|
+------------+

现在,我想要删除相同的记录。要删除记录,我将 EmptyHoodieRecordPayload 有效负载传递到写入选项中:

//使用 EmptyHoodieRecordPayload 编写 DataFrame 以删除记录
updateDF.write
        .format("org.apache.hudi")
        .options(hudiOptions)
        .option(DataSourceWriteOptions.OPERATION_OPT_KEY,
                DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
        .option(DataSourceWriteOptions.PAYLOAD_CLASS_OPT_KEY,
                "org.apache.hudi.EmptyHoodieRecordPayload")
        .mode(SaveMode.Append)
        .save(hudiTablePath)

在 Spark Shell 中,我发现记录不再可用:

scala> spark.sql(sqlStatement).show()
+--------+                                                                      
|elb_name|
+--------+
+--------+

Hudi 如何管理所有这些更新和删除操作? 我们来使用 Hudi 命令行界面 (CLI) 连接到数据集,然后发现这些更改被解读为提交内容:

此数据集为“写入时复制”数据集,这意味着,每当对记录进行更新时,包含该记录的文件将被重新编写,以包含更新的值。您可以看到每个提交编写了多少记录。表格的底行描述了数据集的初始创建,其上面是单个记录的更新,顶行是单个记录的删除。

使用 Hudi,您可以回滚到每个提交。例如,我可以使用以下命令回滚删除操作:

hudi:elb_logs_hudi_cow->commit rollback --commit 20191104121031

在 Spark Shell 中,记录现在已返回原来的位置,刚好在更新之后:

scala> spark.sql(sqlStatement).show()
+------------+                                                                  
|    elb_name|
+------------+
|elb_demo_001|
+------------+

“写入时复制”为默认的存储类型。我可以重复上述步骤,通过将“读取时合并”数据集类型添加到我们的 hudiOptions 中来创建和更新该数据集类型:

DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY -> "MERGE_ON_READ"

如果您更新“读取时合并”数据集并使用 Hudi CLI 查看提交内容,您可以看到“读取时合并”与“写入时复制”相比有何不同。使用“读取时合并”,您只会写入更新的行,而不是像“写入时复制”一样写入整个文件。这就是“读取时合并”有助于需要更多写入或更新/删除密集型工作负载、具有较少读取次数的使用案例的原因。Delta 提交内容以 Avro 记录(基于行的存储)形式写入磁盘中,压缩的数据以 Parquet 文件(分列存储)的形式写入。为了避免创建太多 delta 文件,Hudi 将自动压缩您的数据集,以使您的读取尽可能具有高性能。

创建“读取时合并”数据集时,会创建两个 Hive 表:

  • 第一个表与数据集的名称匹配。
  • 第二个表的名称附加有字符 _rt;后缀 _rt 代表 real-time

查询时,第一个表会返回已压缩的数据,并且将不会显示最新的 delta 提交内容。使用此表会提供最佳性能,但会忽略最新的数据。查询实时表会将压缩的数据与读取时的 delta 提交内容合并,因此,此数据集被称为“读取时合并”。此操作将产生可用的最新数据,但会产生性能开销,且不会像查询压缩数据一样具有高性能。用此方式,数据工程师和分析师可以在性能与数据新鲜度之间灵活地进行选择。

现已推出
此新功能现已在具有 EMR 5.28.0 的所有区域推出。 将 Hudi 与 EMR 结合使用不会产生额外费用。您可以在 EMR 文档中了解有关 Hudi 的更多信息。这款新工具可以简化您处理、更新和删除 S3 中的数据的方式。请让我知道您打算将它用于哪些使用案例!

Danilo