背景
大数据处理技术现今已广泛应用于各个行业,为业务解决海量存储和海量分析的需求。但数据量的爆发式增长,对数据处理能力提出了更大的挑战,同时对时效性也提出了更高的要求。业务通常已不再满足滞后的分析结果,希望看到更实时的数据,从而在第一时间做出判断和决策。典型的场景如电商大促和金融风控等,基于延迟数据的分析结果已经失去了价值。
如何构建一个统一的数据湖存储,并在其上进行多种形式的数据分析,成了企业构建大数据生态的一个重要方向。如何快速、一致、原子性地在数据湖存储上构建起 Data Pipeline,成了亟待解决的问题。
在构建数据湖的过程中遇到的一些痛点 Iceberg 恰好能解决:
- T+0 的数据落地和处理。传统的数据处理流程从数据入库到数据处理通常需要一个较长的环节、涉及许多复杂的逻辑来保证数据的一致性,由于架构的复杂性使得整个流水线具有明显的延迟。Iceberg 的 ACID 能力可以简化整个流水线的设计,降低整个流水线的延迟。
- 降低数据修正的成本。传统 Hive/Spark 在修正数据时需要将数据读取出来,修改后再写入,有极大的修正成本。Iceberg 所具有的修改、删除能力能够有效地降低开销,提升效率。
在今年亚马逊云大数据分析服务EMR6.5已经集成Iceberg,非常方便大家供快速进行大型表查询性能、原子提交、并发写入Amazon S3。
以用户行为分析为例,看看在亚马逊云上怎么快速搭建一个Iceberg准实时数仓
架构

架构说明
- 用户行为数据(ClickEvent)从前端手机或者Web页面提交到Amazon MSK
- 经过EMR-Flink清理数据和关联产品信息后存入Iceberg表
- 系统业务数据库用户和商品数据由于活动会产生变更,Amazon RDS DB开启binlog 通过EMR-FlinkCDC 同步到Iceberg 表
- 监听ClickEvent Iceberg table 实时计算集合数据
- 通过Spark读取统一HMS, 批量计算到结果表
- Trino ad-hoc 查询数据 Iceberg 流表数据
组件版本
组件 |
版本 |
备注 |
Java |
1.8 |
|
Scala |
2.12 |
|
Iceberg |
1.3.2 |
|
EMR |
6.6 |
|
Flink |
1.4.2 |
|
Spark |
3.2 |
|
Amazon RDS-Mysql |
8.0+ |
|
Trino |
367 |
|
环境搭建
首先下载demo工程github地址, 通过maven编译

工程说明:
gendata:动态产生测试数据input到MSK
flink-iceberg-demo: 实时消费MSK数据入湖
sql: mysql 建表ddl 和测试使用SQL
1. Amazon MSK 创建(参考)
创建一个集群名字为“click-stream” MSK集群

在config页面 Bootstrap servers endpoint 和 zookeeper connection

在一台EC2跳板机上通过kafka client 创建topic和调试
2.创建维表 Amazon RDS
a. 创建RDS并打开MySQL binlog配置

b. 创建Table&测试数据
测试数据
INSERT INTO products
VALUES ("1","test01","Small 2-wheel scooter" , 11.11, now()),
("2","test02","12V car battery",11.11,now()),
("3","test03","12-pack of drill bits with sizes ranging from #40 to #3",11.11,now()),
("4","test04","12oz carpenter's hammer",11.11,now()),
("5","test05","14oz carpenter's hammer",11.11,now()),
("6","test06","16oz carpenter's hammer",11.11,now()),
("7","test07","box of assorted rocks",11.11,now()),
("8","test08","water resistent black wind breaker",11.11,now()),
("9","test09","24 inch spare tire",11.11,now());
INSERT INTO user_member
VALUES ("513248","test01","label01" , 1, now()),
("10952","test02","label02",2,now()),
("555655","test03","label03",3,now()),
("795098","test04","label04",4,now()),
("603670","test05","label05",5,now());
3. 创建Amazon EMR
创建含有Flink,Spark等组件的EMR6.6 (使用Amazon EMR 6.6版本演示。启动 EMR 集群非常简单,这里不再赘述,可以参考亚马逊云科技官方文档)

a.EMR 启动配置
参数说明:
“hive-site”使用外接RDS作为Hive metadata, 配置JDBC连接
“iceberg-defaults” 开启EMR Iceberg 配置
“taskmanager.numberOfTaskSlots“ 配置taskmanager slot数量 (根据自己的集群机器设定)
b. 配置安全组
使其能访问RDS,在RDS安全组添加EMR master and slave权限

c.下载对应的第三方Jar(Flink-CDC, Kafka Flink connect 和Flink jdbc connect)
下载flink-sql-connector-mysql-cdc, flink-sql-connector-kafka ,flink-sql-connector-hive到 /usr/lib/flink/lib/
d.启动 flink yarn 集群
参数说明
创建iceberg data lake
1. 创建iceberg flink sql client 文件“start.sh”
启动含有iceberg runtime的flinksql client
2.创建user_member iceberg 表
业务系统user_member table 记录用户的会员等级和标签,由于它会根据业务的变化而变化,可以使用FlinkCDC实时同步的方式将数据同步到 iceberg table.
a. 创建iceberg catalog
在使用iceberg时候,必须创建一个iceberg catalog
参数说明:
“type”: 指名这个是一个Iceberg 的 catalog
“catalog-type”: 统一使用hive的metadata储存iceberg table schema
“warehouse” : iceberg catalog 需要指定一个S3路径存放数据,在S3创建一个bucket “s3://sg-emr-flink-iceberg/mywarehouse/“
b. 创建user_sinkiceberg,同步RDS数据
c. 创建Flink mysql-cdc table
d. 提交flink job
打开FlinkWebUI 检查job状态

在mysql client 提交 “update user_member set member_level =2 where userid = '555655'”
后,通过flinksql “select * from dim_db.user_member_sink;” 观察数据变化

3.Event数据入湖
a. Mock 前端数据通过MSK入湖
打开gendata 工程,配置MSK地址和Topic
打开项目并通过编译JAR上传到EC2 ,打包后上传到EC2 请使用JVM CMD执行
java -cp gendata-1.0-SNAPSHOT-jar-with-dependencies.jar com.demo.gendata.DataGen2 -c 100000 -s 10 -bootstrap {kafka_bootstrap}
参数说明:
-c 发送数据条数据
-s 每1000条数据 sleep毫秒数
b. 将MSK数据保存到iceberg table
编译打包flink-iceberg-demo,将jar包上传到EMR Master Node后,提交Flink job运行
这里需要将MSK数据和MySQL里的Products数据join 然后存储到iceberg table,Flink Temporary table join 可以帮助我们解决维表join 流表问题
public class Kafka2Iceberg {
public static void main(String[] args) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.disableOperatorChaining();
env.setParallelism(5);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
Configuration configuration = tableEnv.getConfig().getConfiguration();
configuration.setInteger("table.exec.resource.default-parallelism", 5);
configuration.setBoolean("table.dynamic-table-options.enabled", true);
Properties properties = new Properties();
properties.setProperty("max.partition.fetch.bytes", "10485760");
properties.setProperty("request.timeout.ms", "120000");
properties.setProperty("session.timeout.ms", "60000");
properties.setProperty("heartbeat.interval.ms", "10000");
env.enableCheckpointing(3000L, CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setCheckpointStorage(FLINK_CHECKPOINT);
String kafkasourceTable = "CREATE TABLE IF NOT EXISTS default_database.kafka_table (\n" +
" `webpageId` int,\n" +
" `uid` STRING,\n" +
" `productId` STRING,\n" +
" `cookieId` STRING,\n" +
" `expendTime` int,\n" +
" `updateTime` BIGINT,\n" +
" `proctime` as PROCTIME(), -- 通过计算列产生一个处理时间列\n" +
" `eventTime` AS TO_TIMESTAMP(FROM_UNIXTIME(updateTime/1000, 'yyyy-MM-dd HH:mm:ss')) -- 事件时间\n" +
") WITH (\n" +
" 'connector' = 'kafka',\n" +
" 'topic' = '"+ DEFAULT_KAFKA_TOPIC +"',\n" +
" 'properties.bootstrap.servers' = '"+ BOOTSTRAP_SERVERS_CONFIG +"',\n" +
" 'properties.group.id' = 'test-group02',\n" +
" 'scan.startup.mode' = 'earliest-offset',\n" +
" 'format' = 'json'\n" +
" )";
String productsTable = "CREATE TABLE IF NOT EXISTS default_database.products_jdbc (\n" +
" productId STRING PRIMARY KEY,\n" +
" name STRING,\n" +
" description STRING,\n" +
" product_price DECIMAL(10, 4),\n" +
" update_time TIMESTAMP(3)\n" +
") WITH (\n" +
" 'connector' = 'jdbc',\n" +
" 'url' = '" + MYSQL_URL + "',\n" +
" 'table-name' = 'products',\n" +
" 'username' = '"+ MYSQL_USER + "',\n" +
" 'password' = '"+ MYSQL_PSW +"',\n" +
" 'scan.fetch-size' = '100',\n" +
" 'lookup.cache.max-rows' = '5000',\n" +
" 'lookup.cache.ttl' = '10s',\n" +
" 'lookup.max-retries' = '3'\n" +
" )" ;
String flinkCatalogSQL = "create catalog iceberg_hive_catalog with(\n" +
" 'type'='iceberg',\n" +
" 'catalog-type'='hive',\n" +
" 'clients'='5',\n" +
" 'property-version'='1',\n" +
" 'hive-conf-dir'='/usr/lib/hive/conf',\n" +
" 'warehouse'='s3://sg-emr-flink-iceberg/mywarehouse/'\n" +
")";
String clickEventTable = "create table IF NOT EXISTS ods_behavior.clickevent_v5(\n" +
" `webpageId` int,\n" +
" `uid` STRING,\n" +
" `productId` STRING,\n" +
" `cookieId` STRING,\n" +
" `expendTime` int,\n" +
" `updateTime` TIMESTAMP(3),\n" +
" `name` STRING,\n" +
" `product_price` DECIMAL(10, 4),\n" +
" `dt` STRING,\n" +
" `eventTime` TIMESTAMP(3)\n" +
") PARTITIONED BY (dt) with(\n" +
" 'type'='iceberg',\n" +
" 'catalog-type'='hive',\n" +
" 'write.metadata.delete-after-commit.enabled'='true',\n" +
" 'write.metadata.previous-versions-max'='5',\n" +
" 'sink.parallelism' = '5',\n" +
" 'sink.partition-commit.policy.kind'='metastore,success-file', \n" +
" 'warehouse'='s3://sg-emr-flink-iceberg/mywarehouse/',\n" +
" 'write.upsert.enable'='true',\n" +
" 'format-version'='2'\n" +
")";
String insertETL = "insert into ods_behavior.clickevent_v5(\n" +
" webpageId,\n" +
" uid,\n" +
" productId,\n" +
" cookieId,\n" +
" expendTime,\n" +
" updateTime,\n" +
" name,\n" +
" product_price,\n" +
" eventTime,\n" +
" dt\n" +
") select\n" +
" aa.webpageId,\n" +
" aa.uid,\n" +
" aa.productId,\n" +
" aa.cookieId,\n" +
" aa.expendTime,\n" +
" aa.updateTime,\n" +
" bb.name,\n" +
" bb.product_price,\n" +
" aa.eventTime,\n" +
" DATE_FORMAT(LOCALTIMESTAMP, 'yyyyMMdd')\n" +
"from\n" +
" default_catalog.default_database.kafka_table AS aa\n" +
" left join default_catalog.default_database.products_jdbc FOR SYSTEM_TIME AS OF aa.proctime AS bb\n" +
" on aa.productId = bb.productId";
tableEnv.executeSql(kafkasourceTable);
tableEnv.executeSql(productsTable);
tableEnv.executeSql(flinkCatalogSQL);
tableEnv.useCatalog("iceberg_hive_catalog");
tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS ods_behavior");
tableEnv.executeSql(clickEventTable);
tableEnv.executeSql(insertETL);
}
}
修改myql jdbc,msk 地址和topic后,Flink CLI 提交job
检查Flink web UI

c. 使用FlinkSQL 实时统计
使用iceberg stream table统计产品客户浏览
实时发送mock data,可以看见Flink实时统计数据在变化

当更新 products table后,统计数据也发生变化

d. Spark批处理数据
Spark可以和Flink 共享Hive metadata,批流都可以通过同一套schema管理
这里使用EMR studio 提交测试脚本
大家可以查看具体的notebook

e. Trino ad-hoc 查询数据
配置trino iceberg connect
将如下配置加入 iceberg.properties
执行

总结
Apache Iceberg 与 Amazon S3,EMR集成, 适用于大型数据集的开放表格式,提供快速的大型表查询性能、原子提交、并发写入和 SQL等功能,能快速构建一个准实时数仓。
参考:
Iceberg 官网:https://iceberg.apache.org/
Flink Temporal Joins:https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/table/sql/queries/joins/
亚马逊云科技Iceberg 文档:https://docs.aws.amazon.com/zh_cn/emr/latest/ReleaseGuide/emr-iceberg.html
本篇作者