亚马逊AWS官方博客
使用 Serverless 架构快速构建基于 Iceberg 的事务型实时数据湖
1. 背景介绍
随着业务的不断的发展,对海量数据分析场景也变的更加多样化,当前流行的大数据技术场景往往需要处理 PB 级的大规模数据,并且对分析场景的数据实时性要求更高。用户通常会面临以下问题:1. 在处理大规模数据时需要频繁的扩缩资源,带来繁重的运维工作;2. 对于数据库到数据湖的同步,需要对数据库事务性的数据变更进行处理才能够使得数据库与数据湖之间数据保持一致性。因此,本文提出基于 Serverless 服务来构建数据湖,该方案主要利用 MSK Connect,MSK Serverless,Glue,Athena 来构建无服务的数据湖方案,来帮助用户降低海量数据实时分析场景中的架构复杂度以及运维难度。
2. 架构设计
本方案包含以下几个步骤:
- 在 MSK Connect 中创建 MySQL 到 MSK 的连接器,以便将数据流入 Kafka 集群中。
- 配置 Glue Streaming Job,通过消费 MSK 中的 CDC 数据,将数据实时写入 Iceberg 表中。
- 在 Athena 中对存入 Iceberg 表的数据进行查询和分析,并且对 Iceberg 表进行优化。
方案架构
MSK Connect:MSK Connect 是 Amazon MSK 的一项功能,它使开发人员可以轻松地将数据流入和流出他们的 Apache Kafka 集群。MSK Connect 使用 Kafka Connect 2.7.1 版本,这是一个用于将 Apache Kafka 集群与数据库、搜索引擎和文件系统等外部系统连接起来的开源框架。使用 MSK Connect,可以部署专为 Kafka Connect 构建的完全托管的连接器,这些连接器将数据移入常用数据存储(如 Amazon S3 和亚马逊 OpenSearch 服务)或从中提取数据。可以部署第三方开发的连接器,例如 Debezium,用于将数据库中的更改日志流式传输到 Apache Kafka 集群,或者部署现有连接器,无需更改代码。Connector 会自动扩展以适应负载的变化,只需为使用的资源付费。
MSK Serverless:MSK Serverless 是 Amazon MSK 的一种集群类型,它使您无需管理和扩展集群容量即可运行 Apache Kafka。它可以在管理主题中的分区的同时自动预置和扩展容量,因此您可以流式传输数据,而无需考虑调整群集大小或扩展群集。MSK Serverless 提供基于吞吐量的定价模式,因此您只需按实际用量付费。如果您的应用程序需要自动向上和向下扩展的按需流式传输容量,请考虑使用无服务器集群。
Glue:AWS Glue 是一项无服务器数据集成服务,可让使用分析功能的用户轻松发现、准备、移动和集成来自多个来源的数据。您可以将其用于分析、机器学习和应用程序开发。它还包括用于编写、运行任务和实施业务工作流程的额外生产力和数据操作工具。
Athena:Amazon Athena 是一种交互式查询服务,能够轻松使用标准 SQL 直接分析 Amazon Simple Storage Service (Amazon S3) 中的数据。Athena 也支持 ACID 事务性数据的查询,可以使用标准的 SQL 对 Iceberg 表数据进行增、删、改、查的操作。
Apache Iceberg:Apache Iceberg 是 Amazon S3 中适用于大型数据集的开放表格式,可提供快速的大型表查询性能、原子提交、并发写入和 SQL 兼容表演进等功能。Glue 3.0/4.0 原生支持开源数据库框架 Iceberg。Iceberg 是为大数据平台设计的,开发者可以使用它来存储和查询 PB 级数据集。除了 Glue,Amazon Athena 也已经与 Iceberg 深度集成,借助 Glue Data Catalog 对元数据的管理,可以快速使用 Athena 对 Iceberg 的表进行查询分析,并且也可以通过 Athena 对 Iceberg 表进行写操作。
3. 方案实现
3.1 CDC
我们使用 MSK Connect 来收集 MySQL binlog 产生的 CDC 数据。在 MSK Connect 中,只需要简单的几步操作,即可完成 CDC 数据的采集任务的配置。
3.1.1 自定义插件
使用 MSK Connect 前,我们需要为 MSK Connect 添加一个 Debezium MySQL Source Connector 插件(使用 1.9.7 版本),插件下载。下载后,在 MSK Connect 控制台的自定义插件菜单中,创建一个自定义插件,并且下载的插件上传到此处。
3.1.2 配置 MSK Connect
在 MSK Connect 控制台,点击创建 Connector,选择已经创建好的 mysql 自定义插件。在 Connector configuration 中,如下为例子:
这里,我们需要注意以下几个配置参数:
- max:MySQL 连接器只使用一个 Task,因此不使用这个值,默认即可。
- creation.enable:由于 MSK Serverless 集群不能通过预置的方式设置集群参数,而 MSK Serverless 集群默认是不会自动创建 topic 的,因此在配置 MSK Connect 向 MSK Serverless 集群摄入数据的时候,需要指定自动创建 topic。
- on.delete:控制是否应在删除事件后生成一个 delete 事件。当设置为 “true “时,删除操作由一个删除事件和一个后续的 tombstone 事件表示。当设置为 false 时,只发送一个删除事件。发出墓碑事件(默认行为)允许 Kafka 在源记录被删除后完全删除所有与给定键有关的事件。 因此,如果设置为 true 时,会收到一个空记录的事件,由于,我们将数据写入 Kafka 之后,需要消费数据根据 I/U/D 处理,因此建议将这个值设置为 false。
- transforms:如果我们需要将 MySQL 中的多个表写入到一个 Topic 中,需要设置此参数,该参数通过使用正则表达式来将符合条件的表数据写入到同一个 Topic 中。
需要注意的是 MSK Serverless 目前只支持 IAM 认证,在配置 MSK Connect 时需要指定 IAM 认证配置参数。
记录 offset
当 MSK Connect 创建时,会自动创建 3 个 Topic (config.storage.topic,offset.storage.topic,status.storage.topic)用于记录当前 Connect 相关的配置、状态、offset 等。当把这个 Connect 删除后,重建时,需要从上一个 Connect 的最后的 offset 恢复,需要指定 offset.storage.topic
参数,这个参数需要配置在 Work Configuration 中。参考
另外,这个 topic 需要预先创建,并且设置 cleanup.policy=compact
。命令如下:
有序性
MSK Connect 通过配置 message.key.columns
作为消息体的 key, kafka 的消息体的 key 默认是所采集的表的主键,如果表没有主键,则消息体的 key 为空。
如果将多表摄入到一个 Topic 时,msk connect 会根据用户表名+主键字段的方式来记录 Key,并且按照 Key 分布到同一个 partition 中。
如果表没有主键,也可以单独根据表来设置该字段,作为数据的有序性。设置方式:
3.2 实时摄入
可以分别使用 Glue 或者 EMR Serverless 来实现数据的实时摄入。Glue 和 EMR Serverless 都支持 Spark,因此核心的业务实现都可以复用。下面分别来介绍这两种数据实时摄入的方法。
3.2.1 Glue 实现方案
3.2.1.1 在 Glue 中创建 Kafka connection
打开【AWS Glue Studio】,在左侧菜单中点击 Connectors,打开 Connectors 页面,单击【Create connection】
- Connection Type 选择 Kafka
- Connection access 选择 MSK,可以在下拉框中选择已经创建好的 MSK 集群。(如果需要关联自建的 Kafka 集群,可以选择 Customer managed Apache Kafka),如果 MSK 设置了身份认证方式,需要在 Authentication 中选择对应的身份认证类型
- 其他参数默认,点击【Create Connector】
3.2.1.2 Glue Streaming 任务
获取 Glue Streaming 代码,参考代码地址,在 Glue Studio 中创建一个 Spark script 任务,并且上传下载的代码。此代码实现了通过 Glue 消费 MSK 中 CDC 日志数据,解析 I/U/D,将数据实时写入 Iceberg 格式的表中。(也可以使用下面提交任务的脚本,自动下载样例代码并且在 Glue 中创建 Job)
多表的实现
在 CDC 进行数据采集时,在 3.2.1 章节中已经介绍了通过 MSK Connect 的配置可以实现将多表数据写入到一个 Topic 中,在 Glue 中,我们可以通过在 topicName
参数中设置多个 Topic,来实现多表的消费。而针对多表数据的处理,通过读取 CDC log 中的 source
的内容来识别多表。
Iceberg Automatic Schema Evolution
对于实时数仓的场景,源表到数据湖时的 automatic schema evolution 是非常重要的。当源端的数据库表结构发生变化,在目标端的数据湖中也需要做到实时的变更。目前 Spark 在 3.2 以上版本(Glue 4.0 支持 Spark 3.3)已经支持针对 Iceberg 表的 Schema 变更自适应处理方法。参考如下方法:
- 创建 Iceberg 表时,在表属性中设置
spark.accept-any-schema=true
Spark 默认在生成计划的时候会检查写入的 DataFrame 和表的 schema 是否匹配,不匹配就抛出异常。所以需要增加一个 TableCapability(TableCapability.ACCEPT_ANY_SCHEMA),这样 Spark 就不会做这个检查,交由具体的 DataFrame 来检查 - 通过 DataFrame 的 writeTo 方法向 Iceberg 表写数据时,设置参数
writeTo(tableName).option("merge-schema", "true").append()
这样,当数据写入 Iceberg 表时,Spark 会通过 DataFrame 对目标表进行表结构的变更处理。
表参数配置
在数据实时入湖时我们还需要注意以下几点:
- 对数据中 Update/Delete 的场景,在数据入湖时,需要根据表的主键字段来更新数据或者删除数据。
- 时间字段的处理。
- 从消息队列消费数据时,可能出现的乱序情况。
因此,增加了一个针对表属性的配置文件,需要在这个配置文件来设置参数。配置文件采用 Json 的格式,示例如下:
参数说明:
参数项 | 描述 | 默认 |
db | 写入到 Iceberg 的库名 | null |
table | 同步的表名,自动建表,因此需要保证这个表名和同步的数据源的表名一致 | null |
primary_key | 用于 UPDATE/DELETE 的主键字段 | null |
write.merge.mode | Iceberg 建表参数【copy-on-write/merge-on-read】 | copy-on-write |
write.update.mode | Iceberg 建表参数【copy-on-write/merge-on-read】 | copy-on-write |
write.delete.mode | Iceberg 建表参数【copy-on-write/merge-on-read】 | copy-on-write |
timestamp.fields | 如果表中存在 timestamp 的字段,将字段名配置在这里,数据处理过程中将会将这个字段在 spark 中设置成 timestamp 类型。 | |
precombine_key | 如果一批数据中同一条记录发生了多次更改,可以选择一个能从多次变化记录中取到最新的那条记录的字段,例如 updatetime 。 |
提交任务
为了方便测试,可以通过以下脚本,来启动一个 Glue Job。
首先,需要将代码依赖的 python 脚本打包成 whl。
创建一个 Glue Job(注意替换参数,例如 kafka-server,s3-bucket 为当前环境的服务地址)
创建成功后,我们可以在 Glue ETL Jobs 中看到它,在控制台中启动这个作业。
3.2.2 EMS Serverless 实现方案
出于成本或者技术栈的选择,我们也可以选择使用 EMR Serverless 来替代 Glue ETL。因为都是使用 Spark 代码实现,因此任务的配置参数已经表参数配置都与 Glue Job 保持一致。只是在 EMR Serverless 中有依赖的 python 库需要通过打包依赖项的方式导入。下面是打包和提交脚本的方法:
- 获取代码
- 打包公共类
- 打包 python 环境并且上传到 S3
- 提交 EMR Serverless 作业
在提交 EMR Serverless 前,需要确认已经在 EMR Studio 中创建了 EMR Serverless Application,并且获取到了 Application ID,需要作为下面提交脚本的输入参数。提交之后,就可以在 EMR Serverless 控制台观察作业的执行状态了。
3.3 使用 Athena 查询 Iceberg 表
3.3.1 查询
当同步任务运行起来之后,就可以在 Athena 中查询相关的数据写入情况。
3.3.2 Iceberg 表优化
对于实时摄入的场景,由于 Iceberg 元数据和版本管理的机制,会导致比较多的小文件,过多的小文件会导致查询变慢,也会带来更多的 S3 请求数量,导致成本的增加,因此需要定时对 Iceberg 表已经维护。通常可以在建表时设置以下参数来指定小文件合并的策略。
1. 优化参数
可以通过设置以下两个参数来控制 Iceberg 元数据膨胀的问题
快照过期
2. 合并小文件参数
- 从 Iceberg 0.11 开始,支持了流式小文件合并。可以通过建表时初始化表属性
TBLPROPERTIES ('write.distribution-mode'='hash')
,动态合并小文件。这样可以从源头直接合并文件,一个 Task 会处理某个分区的数据,提交自己的 Datafile 文件。 - 也可以利用 Athena(以下章节 3.3.3) 或者 Glue (以下章节 3.3.4)来维护 Iceberg 的小文件。
3.3.3 Athena 中优化 Iceberg 表的方法
我们在 Iceberg 表属性中添加快照过期、版本等参数,就可以通过 Athena,对 Iceberg 进行维护。
合并小文
可以通过以下命令对 Iceberg 的文件合并,执行后,它将按照设置的 write.target-file-size-bytes
(默认 512M)参数来对文件进行合并。
在 Athena 提供的合并小文件方法中提供了一个过滤条件,可以选择符合查询条件的数据进行合并。所以如果需要优化的表分区数超出了 100,在 Athena 中就无法对整张表进行优化合并,这是由于 Athena 对于分区查询的一个限制,这个时候可以在 WHERE 中添加分区条件,分批次的执行优化命令。
小文件合并完成后,会生成新的 snapshot,但是这个时候虽然小文件被合并成大文件了,但是历史的文件仍然存放在 S3 中,没有使用但是会占用存储空间。我们需要根据实际的业务需求,对历史变化数据的保留周期来设置合理的快照过期时间,并且设置版本相关参数。
可以在 Athena 中修改表的属性 vacuum_max_metadata_files_to_keep=3
(配置保留历史数量的版本数,默认 100),可以按照实际场景来设置版本。例如设置为 3,元数据和数据都保留 3 份历史数据和 1 份最新数据。
在 Athena 中设置快照过期以及保留元数据文件的数量
可以在 Athena 中修改表的属性
vacuum_max_snapshot_age_seconds
默认 432000 秒 (5 天)vacuum_max_metadata_files_to_keep
默认 100
然后,可以执行下面的命令,删除过期快照以及快照关联的历史数据文件
3.3.4 通过 Glue Data Catalog 优化 Iceberg 表
如果希望通过自动的方式对 Iceberg 表进行优化,那么可以使用 Glue Data Catalog 帮我们自动对 Iceberg 表进行压缩。前提条件是,创建的 Iceberg 表需要使用 Glue Data Catalog。
- 在 Glue Data Catalog 中选择一个 Iceberg 表。可以看到 Action 菜单中会多一个 Enable compation 的选项。
- 开启之后,当向 Iceberg 表中写入数据后,我们可以在 Glue Data Catalog 中观察到执行文件压缩的记录。当然,这里会利用 Glue 的资源进行压缩,所以我们可以看到会有 DPU 的消耗。
- 也可以通过 Iceberg 表的 snapshot 记录查看,如下,每次写入数据的 append 之后,都会有一次 replace 的操作,这就是由 Glue Data Catalog 的自动压缩功能触发的。
4. 总结
在当今数据分析领域,实时分析数据已经成为关键需求。实时数据湖能够提供即时的数据可见性,支持实时决策和业务洞察,为企业带来巨大价值。在这个方案中我们使用 Amazon MSK Connect 实现数据变更数据(CDC)的实时采集并且摄入 MSK Serverless, 无需管理 Kafka 集群。利用 AWS Glue 完成 CDC 数据的解析和处理,实现数据的增删改操作,让数据湖与数据库之间保持了数据一致。采用 Amazon Athena 提供交互式的数据查询。基于 Iceberg 表格式构建实时数据湖,提供统一的数据访问层。所有服务均采用 Serverless 服务,能够大幅降低运维成本和复杂度,提高开发效率。
该方案适用于各种实时数据分析场景,如监控、业务分析、IoT 数据处理等;能够快速构建可扩展的数据湖,满足企业对于数据实时性和分析灵活性的需求;另外也通过 Serverless 架构实现了高可用性、弹性扩展和按需付费等优势,大幅降低了运营成本。