亚马逊AWS官方博客

使用 Amazon MSK、Apache Flink 和 Apache Hudi 实现流批一体的数据湖架构

现在,越来越多的业务系统已经完成了全流程的数字化。这不仅意味着业务和行为数据的量级不断上升,也意味着数据的生产和消费变得更加实时。这对大数据,特别是数据湖的架构形成了一些挑战。

数据湖本身是基于对象存储以及松散数据格式,这不仅会影响实时性,也会造成数据更新的困难。针对前者,我们可以走大数据 Lambda 架构(区分高低速处理路线),或者把批处理当做是有界流数据来进行统一处理(Apache Flink 的理念)。针对数据更新,开源社区也贡献了 Apache Hudi 这样方便、自动化地数据文件更新方案。

这篇文章中,我们使用 Lambda 架构,介绍如何分析业务,以及如何以数据湖为本来形成流批一体的架构。

三种业务问题

所有的技术需求都从业务问题开始。根据数据联动要求不同,我们可以把业务问题分成三类。

第一类,完全孤立,即每条数据之间无依赖关系。比如:今天成交了多少笔订单,本日最热门的标签等等,它不依赖过往的数据。这类业务问题即只需要流式数据即可进行计算,答案可以做到即时更新。

第二类,数据拼接,即只需把存量和新进数据进行简单的聚合与拼接关系即可。比如:在流式数据只保存一天的情况下,统计过去一个月的收入。这类业务可以在对存量数据提前进行缓存计算的前提下做到即时更新,但是需要注意数据分割错误(流式与存量数据有重叠)以及缓存出错的问题。

第三类,混合运算,需要同时对存量和新进数据进行去重、窗口、分组等运算。比如:需要找出活跃用户在使用 App 时的行为漏斗,对跨越数个月的行为数据按某个起始行为进行窗口分析。还有就是数据维度变化造成的混合运算,比如:三个月前一辆汽车的销售录入错误,今天才做了修改;咨询公司之前的规则是只要签约就计算业绩,现在则修改成签约、开始执行、执行完毕分别按照不同比例来计算。这类分析很难做到即时出结果,除非使用复杂的缓存结构和逻辑。

处理这类问题的逻辑有两种。

一种以批处理为主,将流式数据落盘,再用批处理的方式来对所有数据进行统一的处理。这个也是大数据 Lambda 架构的默认策略。

另一种是以流式为主,将外部数据纳入到流式处理的逻辑中的 Kappa 架构。比如在 Flink 中,我们就可能会引用外部的维度表,甚至其他的流式表作为旁路数据,在流式数据过来时进行打宽、联合统计等。

从理念上讲,这种方式很优雅,把批处理的数据,也视作流式数据,只不过加上了边界,再统一处理。但不难推断,这会要求流式处理框架及时处理数据的分布式腾挪(Shuffle)问题,同时还得照顾流式计算特定的逻辑,比如水位线(Watermark)、检查点管理(Checkpointing)等,给框架形成了很大的挑战。

就目前的版本(v1.14)来看,使用 Flink 来大一统地做批流一体处理,仍然有诸多的问题和限制。所以,本次我们仍然使用 Lambda 架构的基本理念,使用 Flink 来做数据入湖,以及对实时要求高的处理,而其余部分仍然使用 Amazon Athena 等传统批处理工具。

场景介绍

接下来我们结合一个场景来介绍这套架构。

假设我们是一家房地产公司,现在已经有一套完整的 ERP 系统。因为 ERP 系统的复杂逻辑以及安全上的考虑,我们没有办法去直接抽取原始维度和事实表数据,而是只能去读取汇总后的业务事件日志。

这些数据由一个宽表来存储。它类似销售记录,会把每一次的客户交互事件的详情记录下来,包括经纪人、楼盘、单元、房户、状态,及签约的金额和交互发生的日期。

这样的日志是不会修改的。如果原来的信息有误或者有更新,它会生成一条新的记录,作为对原来数据的更新。

现在,我们的大数据团队已经有一套批处理的流程。这套流程使用 Apache Sqoop,半小时一次,增量抽取 SQL Server 中的数据到我们的数据仓库,然后数据仓库开始执行后续的计算。因为数据量大,并且为了从日志中拆出维度表,写了非常复杂的计算逻辑和各种中间表,SQL 数据多达数千行,中间表上百个,导致业绩看板的延迟高达一小时。

这时候,我们的 CIO 提出了一个要求,希望能相对及时地体现整体销售情况,并且梳理整个的数据处理逻辑,方便以后快速测试不同的指标和业绩的联动关系,并且能很好的控制成本。

问题梳理

这是我们经常会遇到的一种场景,它有这么一些特征。

  • 原始数据杂乱,即我们只能拿到一个结果,需要自行拆解维度和管理数据变更
  • 已经有一套现有的数仓或者类似架构,并且为了应对业务而分成了多层、多级的方式,效率较低,成本较高
  • 没有去系统地做规划,而是尝试使用优化批处理的方式来解决问题,使得 SQL 语句和中间表难以维护,新增业务落地缓慢

通过我们前面讲述的方式,拆解三种不同类别的业务,减少层次、层级和临时表,并且形成批流一体的数据湖,我们就可以解决这些问题。

业务设计

接下来,我们简单演示一下如何形成这样一条批流一体的数据处理流水线。先把业务分成如下三层。

  • 快速层,统计过去 1 天内每个小时的客户事件,以及全天的成交量
  • 拼接层,统计过去 1 年每个销售的合同总金额
  • 慢速层,回答一些复杂问题

这些复杂问题可能包含以下内容:

  • 哪些房不好卖?我们统计每个房有多少销售经手,并且他们平均的销售漏斗步骤数量是多少。不好卖的房,可能经手的销售会很多,并且销售漏斗的路径会很长。
  • 哪些销售的成功率更高?我们统计每个销售的销售漏斗路径,路径越短说明销售成功率就越高。
  • 哪些客户的要求更高?我们统计每个客户针对每套房子的销售路径,如果客户有非常多短的销售路径,说明可能客户要求比较高,在不停地做比较。

这当然是我们假想的规则,目标是尽量让它复杂,需要混合最新的数据和旧有的数据进行运算。

解决方案

整体的架构如下。

  • 数据源:SQL Server。当然它也可以是任意会持续产生数据的源,比如其他数据库、IoT 消息、不断产生的服务器日志等等。
  • 缓冲层:Kafka。它负责接收和存储源源不断产生的数据。
  • 计算层:Flink。它负责从缓冲中持续获取数据并且持续对数据做计算,也负责对原始数据进行入湖(落盘)操作。
  • 数据湖:S3 和 Glue。S3 是数据湖的本体,负责长期、稳妥地存储数据,并且可以接受高并发的查询;Glue 则负责元数据的存储,包括 S3 上的数据的存储格式、字段定义、分区定义等。
  • 服务层:Amazon Athena 和 MySQL。Athena 应对的是交互式、临时、探索式或者对实时性要求不高的查询,而需要实时更新的结果通过 Flink 计算后直接置入 MySQL 供展现层调用。
  • 展现层:Apache Superset。我们使用开源的 Superset 对数据进行简单展示。

接下来我们来看具体步骤。

① SQL Server 建表和开启 CDC

首先,创建一个 RDS 的 SQL Server 实例,然后在同一 VPC 的 EC2 实例使用 mssql-cli 命令行工具连接到这个实例。这个工具的安装方式见这个链接

注意:把 <server> 替换成你的 SQL Server 地址。

mssql-cli -S <server>

输入用户名和密码。登录成功后,创建数据库。

CREATE DATABASE MYCOMPANY

选择新创建的数据库。

USE MYCOMPANY

然后,创建业务日志表。

CREATE TABLE SALES (
  SaleId INT,
  AgentId INT,
  AgentName VARCHAR(32),
  UnitId INT,
  UnitName VARCHAR(64),
  BuildingId INT,
  BuildingName VARCHAR(128),
  ProjectId INT,
  ProjectName VARCHAR(128),
  StatusId INT,
  StatusName VARCHAR(32),
  ContractSubtotal DECIMAL(18,4),
  ContractSigningDate DATETIME
)

使用 RDS 提供的函数启用 SQL Server 的 CDC(Change Data Capture,数据变更捕捉)功能。

EXEC msdb.dbo.rds_cdc_enable_db MYCOMPANY

再启用 SALES 表的 CDC 功能。

EXEC sys.sp_cdc_enable_table @source_schema = N'dbo', @source_name = N'SALES', @role_name = NULL

此时,我们可以用下列命令来查看 CDC 情况。

EXEC sys.sp_cdc_help_change_data_capture

② 创建 MSK 集群

数据源准备好之后,我们来创建 MSK 集群。

进入 MSK 服务,选择创建 MSK。因为是演示,所以我们使用默认和较低的配置,具体配置如下。

  • Kafka 版本:6.2
  • Broker 实例类型:m5.large
  • 使用可用区的数量:3
  • 每可用区中的 Broker 数量:1
  • Broker 的存储:100GB
  • 配置:使用 MSK 默认配置(注意「自动创建 topic」功能是关闭的)

然后我们会需要选择 VPC,并且选择 3 个不同可用区的子网。选择私有子网即可,因为我们不会从公有子网访问 Kafka 集群。

因为我们是演示并且是内网访问,所以直接使用「无身份验证访问」(Unauthenticated access)。在生产或敏感环境,建议用户启用集群的身份验证。同理,我们也在客户端与集群之间启用明文数据传输(Plaintext)。数据的加密使用默认 MSK 的托管密钥即可。

如果想查看日志,我们也可以开启日志传输到 Amazon CloudWatch Logs 或者 Amazon S3 桶内。日志组和桶需要提前创建。

创建集群通常需要较长时间(10-15 分钟)。创建完成后,点击「查看客户端信息」按钮,然后在「Bootstrap servers」一栏下复制「明文」的「私有节点」备用。

③ 安装 Kafka 客户端和 Debezium 并创建 Topic

由于篇幅限制,Kafka、Kafka Connect 以及 Debezium 的安装不在本文赘述,请参考我另一篇文章。注意:我们本次将使用 Debezium 的 SQL Server 连接器,而不是原文中的 MySQL,下载时请注意区分。

编辑配置文件 /etc/opt/kafka/connect-standalone.properties,配置之前复制的 MSK 节点,并且配置好插件目录。

bootstrap.servers=<msk-servers>
plugin.path=/opt/kafka-connectors

因为是 SQL Server,所以 Debezium 的配置略有不同。

cat <<EOF > /etc/opt/kafka-connectors/debezium-connector-mssql.properties
name=mssql-to-s3
connector.class=io.debezium.connector.sqlserver.SqlServerConnector
database.hostname=<rds-mssql-server>
database.port=1433
database.user=<user>
database.password=<password>
database.server.id=12345
database.server.name=mssql
database.dbname=MYCOMPANY
database.history.kafka.bootstrap.servers=<msk-servers>
database.history.kafka.topic=dbhistory
decimal.handling.mode=string
include.schema.changes=false
value.converter=org.apache.kafka.connect.json.JsonConverter

在启动 Debezium 之前,我们需要手动创建 MYCOMPANY 这个 Topic。先确认 Kafka 可以正常连接。

cd /opt/kafka
bin/kafka-topics.sh --bootstrap-server <msk-servers> --list

此时应该会打出如下 MSK 内置 Topic。

__amazon_msk_canary
__consumer_offsets

然后我们可以创建 Topic。

bin/kafka-topics.sh --bootstrap-server <msk-servers> --create --topic mssql.dbo.SALES

接下来就按上一篇文章中的启动 Kafka Connect 服务即可。我们不需要配置 Sink Connector,因为数据后续会通过 Flink 来读取。

④ 启动 Amazon EMR 集群并配置 Flink

现在,新的数据会源源不断地注入 Kafka 集群,我们要做的就是用 Flink 来处理这些数据。我们直接使用 EMR 中的 Flink,因为它已经内置了对 S3、Glue 和 Hudi 的集成,而这些正是我们后续需要用到的服务。

我们先创建一个 S3 桶用作为数据湖的主题。然后,启动一个 EMR 集群,使用如下配置。

  • EMR 版本:6.4.0
  • 应用:选择 Hive 和 Flink
  • 元数据:选择使用 Glue 存储元数据
  • 权限:给应用读写数据湖 S3 桶的权限
  • 节点组:只需要 1 个主节点,不需要核心和任务节点
  • SSH 密钥:使用已有的 SSH 密钥
  • 安全组:允许本机 IP 段访问 22 端口

启动 EMR 集群后,我们 SSH 登录到主节点。注意 EMR 的用户是 hadoop 而不是常见的 ec2-user。

接下来,我们要下载 Flink Kafka 源连接器,以及 Flink Hudi 目标连接器。二者的已编译版本可以在 Maven Central 直接搜索下载。本次测试,我们下载如下版本:

  • flink-sql-connector-kafka_2.12-1.13.6.jar
  • hudi-flink-bundle_2.12-0.10.1.jar

下载好后,我们就可以测试 Flink。Flink 应用本身支持「会话」(Session)和「独立任务」(Per Job)两种部署模式。为简便起见,我们这次使用会话模式。

首先,启动一个 Flink YARN 会话。EMR 提供了一个脚本帮助我们启动会话,并且修改本地配置信息。

flink-yarn-session --detached

启动后,我们可以先用 Flink SQL 客户端来做测试。

bin/sql-client.sh embedded --library .

创建如下 Flink 表。

创建如下 Flink 表。
CREATE TABLE SOURCE__SALES (
  SaleId INT PRIMARY KEY,
  AgentId INT,
  AgentName VARCHAR(32),
  UnitId INT,
  UnitName VARCHAR(64),
  BuildingId INT,
  BuildingName VARCHAR(128),
  ProjectId INT,
  ProjectName VARCHAR(128),
  StatusId INT,
  StatusName VARCHAR(32),
  ContractSubtotal DECIMAL(18,4),
  ContractSigningDate BIGINT
)
WITH (
  'connector' = 'kafka',
  'topic' = 'mssql.dbo.SALES',
  'properties.bootstrap.servers' = '<msk-servers>',
  'scan.startup.mode' = 'earliest-offset',
  'properties.group.id' = 'testgroup1',
  'format' = 'debezium-json'
);

然后我们就可以直接把 Kafka 流中的数据当做普通的数据库表来查询。

可以看到 Flink 提交了一个任务,然后开始执行,数秒之后,就能看到数据。

SELECT * FROM default_catalog.default_database.SOURCE__SALES;

此时如果我们换一个窗口,继续往 SALES 库中插入数据,我们也会在数秒内看到 Flink SQL 客户端的数据实时进行了更新。

⑤ 执行 Flink 任务

测试完成后,我们就可以正式执行 Flink 任务了。为方便起见,我们使用 Flink 的 Python Table API,并且直接使用 SQL 语句来表达我们的流式计算逻辑。

在开始之前,我们需要确保安装好了 Python 3。

创建好后,我们写如下代码,存成 test.py。

from pyflink.table import EnvironmentSettings, StreamTableEnvironment
from pyflink.datastream import StreamExecutionEnvironment, CheckpointingMode

s_env = StreamExecutionEnvironment.get_execution_environment()
s_env.add_jars(
  "file:///home/hadoop/flink-sql-connector-kafka_2.12-1.13.6.jar",
  "file:///home/hadoop/hudi-flink-bundle_2.12-0.10.1.jar"
)

s_env.enable_checkpointing(10000, CheckpointingMode.EXACTLY_ONCE)

t_env = StreamTableEnvironment.create(s_env)
t_env.get_config() \
  .get_configuration() \
  .set_string('parallelism.default', '1')

my_source_ddl = """
CREATE TABLE SOURCE__SALES (
  SaleId INT PRIMARY KEY,
  AgentId INT,
  AgentName VARCHAR(32),
  UnitId INT,
  UnitName VARCHAR(64),
  BuildingId INT,
  BuildingName VARCHAR(128),
  ProjectId INT,
  ProjectName VARCHAR(128),
  StatusId INT,
  StatusName VARCHAR(32),
  ContractSubtotal DECIMAL(18,4),
  ContractSigningDate BIGINT
) WITH (
  'connector' = 'kafka',
  'topic' = 'test1.dbo.SALES',
  'properties.bootstrap.servers' = '<msk-servers>',
  'scan.startup.mode' = 'earliest-offset',
  'properties.group.id' = 'testgroup1',
  'format' = 'debezium-json'
)
"""

my_sink_units = """
CREATE TABLE UNITS (
  UnitId INT PRIMARY KEY,
  UnitName VARCHAR(64),
  BuildingId INT,
  ProjectId INT
) WITH (
  'connector' = 'hudi',
  'write.tasks' = '4',
  'path' = 's3://<hudi-bucket>/units/',
  'table.type' = 'COPY_ON_WRITE',
  'hoodie.parquet.compression.codec' = 'gzip'
)
"""

my_sink_buildings = """
CREATE TABLE BUILDINGS (
  BuildingId INT PRIMARY KEY,
  BuildingName VARCHAR(128)
) WITH (
  'connector' = 'hudi',
  'write.tasks' = '4',
  'path' = 's3://<hudi-bucket>/buildings/',
  'table.type' = 'COPY_ON_WRITE',
  'hoodie.parquet.compression.codec' = 'gzip'
)
"""

my_sink_projects = """
CREATE TABLE PROJECTS (
  ProjectId INT PRIMARY KEY,
  ProjectName VARCHAR(128)
) WITH (
  'connector' = 'hudi',
  'write.tasks' = '4',
  'path' = 's3://<hudi-bucket>/projects/',
  'table.type' = 'COPY_ON_WRITE',
  'hoodie.parquet.compression.codec' = 'gzip'
)
"""

my_sink_statuses = """
CREATE TABLE STATUSES (
  StatusId INT PRIMARY KEY,
  StatusName VARCHAR(32)
) WITH (
  'connector' = 'hudi',
  'write.tasks' = '4',
  'path' = 's3://<hudi-bucket>/statuses/',
  'table.type' = 'COPY_ON_WRITE',
  'hoodie.parquet.compression.codec' = 'gzip'
)
"""

my_sink_sales = """
CREATE TABLE SALES (
  SaleId INT PRIMARY KEY,
  UnitId INT,
  StatusId INT,
  ContractSubtotal DECIMAL(18,4),
  ContractSigningDate BIGINT,
  AgentId INT
) WITH (
  'connector' = 'hudi',
  'write.tasks' = '4',
  'path' = 's3://<hudi-bucket>/sales/',
  'table.type' = 'COPY_ON_WRITE',
  'hoodie.parquet.compression.codec' = 'gzip'
)
"""

my_sink_agents = """
CREATE TABLE AGENTS (
  AgentId INT PRIMARY KEY,
  AgentName VARCHAR(64)
) WITH (
  'connector' = 'hudi',
  'write.tasks' = '4',
  'path' = 's3://<hudi-bucket>/agents/',
  'table.type' = 'COPY_ON_WRITE',
  'hoodie.parquet.compression.codec' = 'snappy'
)
"""

t_env.execute_sql(my_source_ddl)
t_env.execute_sql(my_sink_units)
t_env.execute_sql(my_sink_buildings)
t_env.execute_sql(my_sink_projects)
t_env.execute_sql(my_sink_statuses)
t_env.execute_sql(my_sink_sales)
t_env.execute_sql(my_sink_agents)

my_insert_units = """
INSERT INTO `UNITS`
SELECT `UnitId`,
  LAST_VALUE(`UnitName`) AS `UnitName`,
  LAST_VALUE(`BuildingId`) AS `BuildingId`,
  LAST_VALUE(`ProjectId`) AS `ProjectId`
FROM `SOURCE__SALES`
GROUP BY `UnitId`
"""

my_insert_buildings = """
INSERT INTO `BUILDINGS`
SELECT `BuildingId`, LAST_VALUE(`BuildingName`) AS `BuildingName`
FROM `SOURCE__SALES`
GROUP BY `BuildingId`
"""

my_insert_projects = """
INSERT INTO `PROJECTS`
SELECT `ProjectId`, LAST_VALUE(`ProjectName`) AS ProjectName
FROM `SOURCE__SALES`
GROUP BY `ProjectId`
"""

my_insert_statuses = """
INSERT INTO `STATUSES`
SELECT `StatusId`, LAST_VALUE(`StatusName`) AS StatusName
FROM `SOURCE__SALES`
GROUP BY `StatusId`
"""

my_insert_sales = """
INSERT INTO `SALES`
SELECT SaleId,
  LAST_VALUE(UnitId) AS UnitId,
  LAST_VALUE(StatusId) AS StatusID,
  LAST_VALUE(ContractSubtotal) AS ContractSubtotal,
  LAST_VALUE(ContractSigningDate) AS ContractSigningDate,
  LAST_VALUE(AgentId) AS AgentId
FROM `SOURCE__SALES`
GROUP BY `SaleId`
"""

my_insert_agents = """
INSERT INTO `AGENTS`
SELECT `AgentId`, LAST_VALUE(`AgentName`) AS AgentName
FROM `SOURCE__SALES`
GROUP BY `AgentId`
"""

t_env.execute_sql(my_insert_buildings)
t_env.execute_sql(my_insert_projects)
t_env.execute_sql(my_insert_sales)
t_env.execute_sql(my_insert_statuses)
t_env.execute_sql(my_insert_units)
t_env.execute_sql(my_insert_agents)

在这段代码中,我们做了这么几件事情。

  • 创建一个 SOURCE__SALES 表,这个表直接对接 Kafka 中的数据
  • 创建出多个维度表,使用 Apache Hudi 格式,使用 GZIP 或者 Snappy 进行压缩,并对接 S3
  • 所有 Apache Hudi 表都使用 Copy-on-Write(有更新时重建数据文件)的方式存储
  • 从 SOURCE__SALES 表中实时拆解出维度,并实时插入到维度表中
  • 拆解维度的方式是只保留最后一个值,使用 LAST_VALUE() 这个内置函数
  • 提交数个 Flink 任务,分别执行各维度表的实时插入

注意,实际在运行时为了方便对任务的编辑、启停,我们可能会把任务拆成多个 Python 文件。此处为简便起见,我们使用同一个文件。

然后就可以使用 EMR 提供的便捷脚本启动 Flink YARN Session。

flink-yarn-session --detached

接着,运行 Flink。

PYFLINK_CLIENT_EXECUTABLE=/usr/bin/python3 flink run -py test.py

运行完成后,通过 SSH 转发 Flink Web UI 的端口,我们就可以在 Flink 的控制面板看到正在运行的 Flink 任务了。

使用 Athena 进行查询

在 S3 桶中,我们可以看到实际产生的 Parquet 文件和 Hudi 本身的一些元数据文件。随着新记录的生成,这些文件会不断更新、压紧(Compaction),而这些都是在线完成,无需停止流式处理任务。

因为 Athena 支持 Hudi 格式,所以我们可以非常容易地扫描这些文件中的数据。使用以下语句即可在 Athena 中针对 Hudi 格式的数据建表。

CREATE EXTERNAL TABLE `agents` (
  `_hoodie_commit_time` string,
  `_hoodie_commit_seqno` string,
  `_hoodie_record_key` string,
  `_hoodie_partition_path` string,
  `_hoodie_file_name` string,
    AgentId INT,
  AgentName VARCHAR(64))
ROW FORMAT SERDE
  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT
  'org.apache.hudi.hadoop.HoodieParquetInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
  's3://<hudi-bucket>/agents/'

我们可以分别针对不同的文件夹(也即是数据表)进行建表操作。上面带 __hoodie_ 前缀是的 Hudi 本身的原信息,后面则跟着实际的数据字段定义。

创建完成后,我们就可以直接针对这些表进行交互式查询。

可以看到,因为 Parquet 文件本身是高度压缩,所以数百条数据的查询语句,实际上只扫了数 MB 的文件。

有了这样一个查询引擎,我们就可以快速搭建起我们的数据探索和消费体系,比如用 Superset 快速搭建一套销售情况的展示图,进行分钟级的更新。这个延迟远低于原来一小时的数据延迟,并且不会浪费时间每次去汇总已经计算过的数据,在时延和成本上都达到了业务上的要求。

总结

现在,我们处于大数据时代的下半场——大部分希望能够构建大数据体系的公司都已经有一些基础。要真正使数据产生价值,我们需要从架构上支撑起两个需求。

  • 实时性。更及时地处理数据,并且能针对新旧数据进行联合查询。
  • 可变性。虽然维度和事务型数据的变化通常较小,但是这些变化往往会对数据意义有较大影响(比如业绩规则变化),我们应该有自动化的机制能允许这些变化,并且不影响在线的查询业务

在这篇文章中,我们展示了这样一个思路,即使用 Flink 来进行流式处理,并且使用 Hudi 来支持数据的变化。除了对数据变化的支持,Hudi 引擎本身还可以做到在线压紧、分区等方便的功能。

Flink 和 Hudi 的社区都非常活跃,二者也在不断发展之中,相信在不远的将来,流批一体的数据湖架构将会成为主流,帮助有丰富数据的公司更快地实现业务价值。

本篇作者

张玳

AWS 解决方案架构师。十余年企业软件研发、设计和咨询经验,专注企业业务与 AWS 服务的有机结合。译有《软件之道》《精益创业实战》《精益设计》《互联网思维的企业》,著有《体验设计白书》等书籍。