亚马逊AWS官方博客

使用 Amazon EMR 和 Apache Paimon 构建流式数据湖

1 背景

Apache Iceberg、Hudi、Delta Lake 等开源数据湖组件在企业中被广泛采用,Amazon EMR 以及其它相关的数据分析服务也对这些组件进行了原生支持。Apache Paimon 是近年来发展起来的一个流式数据湖平台,相比于其它的开源数据湖组件,其更加侧重数据湖上的流式数据处理。尽管目前(2023 年 10 月)其还处于孵化状态,但是由于其流批统一的设计理念、基于 LSM 的底层数据存储、高速流式数据摄取与分析能力,以及很好的系统稳定性,已经被一些企业用在生产环境中。结合 Apache Paimon 的特性,本文将使用 Amazon EMR 在 Amazon S3 上构建流式数据湖,验证 Apache Paimon 与 Amazon EMR 服务的适配性。

2 构建流式数据湖

Apache Iceberg、Hudi 和 Delta Lake 都支持时间旅行查询,支持增量读,但这三个数据湖组件目前主要以 Spark 生态为主,还不能很好的支持全链路的流式处理。Apache Paimon 先期以 Flink 生态为主,在数据处理层面很自然地实现了流批一体,同时通过流式数据摄取、Changlog Producer、Partial Update Merge Engine、双流 Join、Lookup Join 等特性,可以很好地实现全链路的流式数据处理。

以电商订单数据打宽为例,接下来我们探索利用 Amazon EMR 和 Apache Paimon 来构建流式数据湖。其中订单为主数据,打宽过程需近实时关联多个其他表的数据,如商品数据、类目数据、地域数据。打宽后的订单数据持续地写入下游数据应用层进行查询、统计和分析,整个过程不借助额外的其他流式系统进行数据处理,过程中产生的表和数据都保留在数据湖中,整个过程全部基于数据湖的流处理方式实现,如下图所示。

在数据摄取层,在 Amazon EMR 里面,利用 Flink 提交 Paimon Flink Action 应用,对来自 RDS MySQL 或者 Amazon MSK 中的 CDC 数据进行流式摄取,将订单、商品、类目、地域等数据写入 ODS 层的 Paimon 表中,随后以流读、流写的方式,通过 Paimon 的 Partial Update Merge Engine 以及双流 Look up Join 机制,将多个数据流中的数据合并更新到主表中,实现对订单数据打宽,形成 DWD 层的打宽订单表。同时 Paimon 支持多种方式(Input、Lookup、Full Compaction)对数据表的变更产生变更日志,这样基于打宽的订单数据表,同样可以采用流读、流写的方式将打宽后的变更数据写入到下游的 Amazon RDS、Redshift 或者 OpenSearch 中,整个过程完全由数据湖上的流式数据处理方式实现,不需要像传统流式应用那样需要借助额外的流式系统才能实现数据的流式处理。

2.1 创建 Amazon EMR 集群

在 Amazon EMR Web 控制台创建 EMR 集群,选择 Flink、Spark、Hive 等组件,并选择将 AWS Glue Data Catalog 作为 Hive 和 Spark 的元数据存储。

选择好 EMR 集群实例类型、数量以及 VPC 和子网后添加 Step,用于在集群启动后自动下载和复制 Apache Paimon 相关依赖包到 Flink 的 lib 目录。该 Step 为 Shell 脚本,内容如下所示:

要使用 AWS Glue Data Catalog 作为 Paimon 的元数据存储,需要对这个 Github Repo 做一点改动,也可以直接使用已经改好并编译的 Jar 包,具体改动见此。Shell 脚本准本好后,上传到 Amazon S3 目录。添加 Step 如下图所示:

在软件配置部分,设置 Flink 默认的 checkpoint 时间间隔以及 checkpoint 和 savepoint 的存储路径。

选择已有或则新建 Amazon EMR 服务角色和 EC2 实例配置文件后, 点击“创建集群”即完成支持 Apach Paimon 的 Amazon EMR 集群创建。

2.2 流式摄取数据

Paimon 支持通过相应数据库的 Flink SQL CDC Connector 直接从相应的源库进行流式的全、增量数据摄取, 也支持通过消费 Kafka 中的 CDC 数据进行流式数据摄取。本文使用 Flink SQL MySQL CDC Connector 方式直接从 MySQL 中摄取数据,在这之前,我们已经通过 EMR Step 将相应的 Jar 复制到 Flink lib 目录了。

在进行数据摄取之前, 我们使用 Hive Metastore 创建 Paimon Catalog,并创建数据库,SQL 如下:

CREATE CATALOG paimon_hive_catalog WITH (
  'type' = 'paimon',
  'metastore' = 'hive',
  'hive-conf-dir' = '/etc/hive/conf.dist',
  'metastore.client.class'='com.amazonaws.glue.catalog.metastore.AWSCatalogMetastoreClient',
  'warehouse' = 's3://Your/S3/Path /'
);
use catalog paimon_hive_catalog;
CREATE DATABASE paimon_db;

在创建集群时, 已经选用 AWS Glue Data Catalog 作为 Spark 和 Hive 的元数据存储,因此创建的 Paimon Catalog 实际就是使用 Glue Data Catalog 作为元数据存储,这样 Spark 和 Hive 都能读到 Paimon 创建的库、表信息。

随后提交 Paimon Flink Action 数据摄取任务来进行流式的全、增量数据摄取,代码如下:

/bin/flink run \
    --detached \
    /home/hadoop/paimon-flink-action.jar \
    mysql-sync-database \
    --warehouse s3://Your/S3/Path/ \
    --database paimon_db\
    --mysql-conf hostname=xxdb.xxxxx.region.rds.amazonaws.com \
    --mysql-conf username=YourUsername \
    --mysql-conf password=YourPassword \
    --mysql-conf database-name=YourDB \
    --mysql-conf server-time-zone=Asia/Shanghai \
    --catalog-conf metastore=hive \
    --catalog-conf hive-conf-dir=/etc/hive/conf.dist \
    --catalog-conf metastore.client.class=com.amazonaws.glue.catalog.metastore.AWSCatalogMetastoreClient \
    --catalog-conf warehouse=s3:// Your/S3/Path \
    --table-conf bucket=4 \
    --table-prefix ods_ \
    --table-conf changelog-producer=input \
    --table-conf sink.parallelism=4 \
   --including-tables 'order|item|catalog|location'

任务提交成功后,可以通过 Flink 的控制台查看任务的执行情况以及通过 Flink SQL 查看同步的数据表和数据。由于已经使用了 AWS Glue Data Catalog 作为元数据存储,我们在 Glue Data Catalog 中也能看到同步的数据表。

2.3 流式数据打宽

使用 Paimon 的“partial-update” Merge Engine 可以使用多个流来对同一个表进行写入,非常适合数据打宽的场景。在此基础上,通过 Paimon 提供的 Lookup Join,从维表中补充信息写入到主表中,实现从多个数据流获取数据来更新主表。另外,Paimon 也支持双流 Join,通过主流和其他流的 Join,从其他流中获取数据来更新主表,只不过这种方式相比 Lookup Join 来讲代价要大。在数据打宽的场景,大多都是将维表中的数据补充到主表中,通过 Lookup Join 结合“partial-update” Merge Engine 可以很好地实现多流数据打宽。接下来我们创建 DWD 层的订单表,并以 ODS 层的数据来对 DWD 层的表进行打宽。

创建 DWD 层订单表,建表时表属性“merge-engine”设置为“partial-update”,并采用“full-compaction”的方式产生表的 changlog 以供下游使用。

将 ODS 层的订单表信息写入 DWD 层订单表相关字段中,Flink SQL 如下:

INSERT INTO dwd_order(id, order_no, user_id, item_id, item_cnt, item_price, item_discount, loc_id, 
status, gmt_create, gmt_modified, dt) 
SELECT *, DATE_FORMAT(gmt_create, 'yyyyMMdd') as dt
FROM `ods_order`;

将 ODS 层的地域位置信息通过 Lookup Join 方式写入 DWD 层订单相关字段中,Flink SQL 如下:

INSERT INTO dwd_order(id, dt, province_code, province_name, city_code, city_name, county_code, county_name)
SELECT o.id, DATE_FORMAT(o.gmt_create, 'yyyyMMdd') as dt, l.province_code, l.province_name, l.city_code, l.city_name, l.county_code, l.county_name
FROM `dwd_order` o JOIN `ods_location`
FOR SYSTEM_TIME AS OF o.proc_time AS l
ON o.loc_id = l.id;

将 ODS 层的商品信息通过 Lookup Join 方式写入 DWD 层订单相关字段中,Flink SQL 如下:

INSERT INTO dwd_order(id, dt, item_name, item_spec, item_cat3_id)
SELECT o.id, DATE_FORMAT(o.gmt_create, 'yyyyMMdd') as dt, i.item_name as item_name, i.item_spec as item_spec, i.cat3_id as item_cat3_id
FROM `dwd_order` AS o JOIN `ods_item` 
FOR SYSTEM_TIME AS OF o.proc_time AS i
ON o.item_id = i.id;

最后,将 ODS 层的商品类目信息通过 Lookup Join 方式写入 DWD 层订单相关字段中,Flink SQL 如下:

INSERT INTO dwd_order(id, dt, item_cat1_code, item_cat1_name, item_cat2_code, item_cat2_name, item_cat3_code, item_cat3_name)
SELECT o.id, DATE_FORMAT(o.gmt_create, 'yyyyMMdd') as dt, c.cat1_code, c.cat1_name, c.cat2_code, c.cat2_name, c.cat3_code, c.cat3_name
FROM `dwd_order` AS o JOIN `ods_catalog` 
FOR SYSTEM_TIME AS OF o.proc_time AS c
ON o.item_cat3_id = c.id;

这些 Flink SQL 任务提交执行后,Paimon 将以流的方式从多个 Paimon 表中获取数据来填充 DWD 层的订单表数据。业务表中的订单数据有更新时,DWD 层的订单表数据也能够被更新。在 Flink SQL Client 中,可以通过以下 SQL 来查询被打宽的数据:

SET 'sql-client.execution.result-mode' = 'tableau';
SET 'execution.runtime-mode' = 'batch';
SELECT * FROM dwd_order where id = 100;

2.4 应用层数据流式写入

由于宽表 dwd_order 通过“full-compaction” 的方式生成表的变更日志,因此可以基于此表以流读、流写的方式写入应用层数据库,如 Amazon RDS、Redshift 或者 Amazon OpenSearch。简单起见,我们将打宽的数据写入 RDS MySQL。在 Paimon Catalog 中建立临时表,SQL 如下:

CREATE TEMPORARY TABLE ads_order (
    id BIGINT, order_no VARCHAR(32), user_id BIGINT,item_id BIGINT,item_cnt SMALLINT, 
    item_name VARCHAR(64),item_spec VARCHAR(64), item_price BIGINT, item_discount BIGINT, 
    item_cat1_code VARCHAR(16),item_cat1_name VARCHAR(32),
    item_cat2_code VARCHAR(16),item_cat2_name VARCHAR(32),
    item_cat3_code VARCHAR(16),item_cat3_name VARCHAR(32),
    item_cat3_id BIGINT,loc_id BIGINT,
    province_code VARCHAR(16),province_name VARCHAR(32),
    city_code VARCHAR(16), city_name VARCHAR(32),
    county_code VARCHAR(16),county_name VARCHAR(32),
    status SMALLINT,
    gmt_create TIMESTAMP(0),
    gmt_modified TIMESTAMP(0),
    dt STRING,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
   'connector' = 'jdbc',
   'url' = 'jdbc:mysql://YourRDSHost:3306/YourDB',
   'table-name' = 'ads_order',
   'username' = 'YourUserName',
   'password' = 'YourPassword'
);

将 DWD 层打宽的数据写入,SQL 如下:

INSERT INTO ads_order 
SELECT id, order_no, user_id, item_id, item_cnt, item_name, item_spec,item_price, item_discount,
item_cat1_code, item_cat1_name, item_cat2_code, item_cat2_name, item_cat3_code,item_cat3_name,
item_cat3_id, loc_id, province_code, province_name, city_code, city_name, county_code, county_name,
status, gmt_create, gmt_modified, dt 
FROM dwd_order;

任务提交之后,我们可以看到打宽后的订单数据持续地被写入 RDS 数据表中。上层应用可以基于此进行查询、统计和分析。查询 RDS MySQL 中的成品数据如下:

2.5 使用 Spark 进行数据查询

使用 Amazon EMR Flink 创建的 Paimon 表,可以使用其中的 Spark、Hive 组件或者 Athena Spark 来进行查询,并且 Spark 3.3+版本支持流读和流写。在 Amazon EMR 中使用 PySpark 查询 Paimon 表和数据命令如下:

pyspark --jars s3://path/of/paimon-spark-3.4.jar,s3://path/of/paimon-hive-connector-common.jar \
--conf spark.sql.catalog.paimon=org.apache.paimon.spark.SparkCatalog \
--conf spark.sql.catalog.paimon.warehouse=s3://path/of/warehouse/

执行后,即可执行 PySpark 代码查询 Paimon 数据信息,如下图所示,Spark 亦可以直接读取打宽后的 Paimon 表数据。

3 总结

本文介绍了使用 Amazon EMR 和 Apache Paimon 在 Amazon S3 上构建流式数据湖的过程和方法。Paimon 作为开源数据湖组件的后起之秀,其在构建流式数据湖方面具有独特的功能优势,并且能和 Amazon EMR 中的 Flink、Spark、Hive 组件,Athena Spark 以及 Amazon S3 等很好地协同工作。使用 Amazon EMR 构建基于 Apache Paimon 的流式数据湖是一个不错的选择。

4 参考资料

本篇作者

程亮

AWS 解决方案架构师,负责基于 AWS 云平台的解决方案咨询和设计。有多年的互联网软件研发、系统架构设计及大数据产品开发经验。