亚马逊AWS官方博客
多库多表场景下使用Amazon EMR CDC实时入湖最佳实践
一、前言
CDC(Change Data Capture)从广义上讲所有能够捕获变更数据的技术都可以称为CDC,但本篇文章中对CDC的定义限定为以非侵入的方式实时捕获数据库的变更数据。例如:通过解析MySQL数据库的Binlog日志捕获变更数据,而不是通过SQL Query源表捕获变更数据。 Hudi 作为最热的数据湖技术框架之一, 用于构建具有增量数据处理管道的流式数据湖。其核心的能力包括对象存储上数据行级别的快速更新和删除,增量查询(Incremental queries,Time Travel),小文件管理和查询优化(Clustering,Compactions,Built-in metadata),ACID和并发写支持。Hudi不是一个Server,它本身不存储数据,也不是计算引擎,不提供计算能力。其数据存储在S3(也支持其它对象存储和HDFS),Hudi来决定数据以什么格式存储在S3(Parquet,Avro,…), 什么方式组织数据能让实时摄入的同时支持更新,删除,ACID等特性。Hudi通过Spark,Flink计算引擎提供数据写入, 计算能力,同时也提供与OLAP引擎集成的能力,使OLAP引擎能够查询Hudi表。从使用上看Hudi就是一个JAR包,启动Spark, Flink作业的时候带上这个JAR包即可。Amazon EMR 上的Spark,Flink,Presto ,Trino原生集成Hudi, 且EMR的Runtime在Spark,Presto引擎上相比开源有2倍以上的性能提升。 在多库多表的场景下(比如:百级别库表),当我们需要将数据库(mysql,postgres,sqlserver,oracle,mongodb等)中的数据通过CDC的方式以分钟级别(1minute+)延迟写入Hudi,并以增量查询的方式构建数仓层次,对数据进行实时高效的查询分析时。我们要解决三个问题,第一,如何使用统一的代码完成百级别库表CDC数据并行写入Hudi,降低开发维护成本。第二,源端Schema变更如何同步到Hudi表。第三,使用Hudi增量查询构建数仓层次比如ODS->DWD->DWS(各层均是Hudi表),DWS层的增量聚合如何实现。本篇文章推荐的方案是: 使用Flink CDC DataStream API(非SQL)先将CDC数据写入Kafka,而不是直接通过Flink SQL写入到Hudi表,主要原因如下,第一,在多库表且Schema不同的场景下,使用SQL的方式会在源端建立多个CDC同步线程,对源端造成压力,影响同步性能。第二,没有MSK做CDC数据上下游的解耦和数据缓冲层,下游的多端消费和数据回溯比较困难。CDC数据写入到MSK后,推荐使用Spark Structured Streaming DataFrame API或者Flink StatementSet 封装多库表的写入逻辑,但如果需要源端Schema变更自动同步到Hudi表,使用Spark Structured Streaming DataFrame API实现更为简单,使用Flink则需要基于HoodieFlinkStreamer做额外的开发。Hudi增量ETL在DWS层需要数据聚合的场景的下,可以通过Flink Streaming Read将Hudi作为一个无界流,通过Flink计算引擎完成数据实时聚合计算写入到Hudi表。
二、架构设计与解析
2.1 CDC数据实时写入MSK
图中标号1,2是将数据库中的数据通过CDC方式实时发送到MSK(Amazon托管的Kafka服务)。flink-cdc-connectors是当前比较流行的CDC开源工具。它内嵌debezium引擎,支持多种数据源,对于MySQL支持Batch阶段(全量同步阶段)并行,无锁,Checkpoint(可以从失败位置恢复,无需重新读取,对大表友好)。支持Flink SQL API和DataStream API,这里需要注意的是如果使用SQL API对于库中的每张表都会单独创建一个链接,独立的线程去执行binlog dump。如果需要同步的表比较多,会对源端产生较大的压力。在需要整库同步表非常多的场景下,应该使用DataStream API写代码的方式只建一个binlog dump同步所有需要的库表。另一种场景是如果只同步分库分表的数据,比如user表做了分库,分表,其表Schema都是一样的,Flink CDC的SQL API支持正则匹配多个库表,这时使用SQL API同步依然只会建立一个binlog dump线程。需要说明的是通过Flink CDC可以直接将数据Sink到Hudi, 中间无需MSK,但考虑到上下游的解耦,数据的回溯,多业务端消费,多表管理维护,依然建议CDC数据先到MSK,下游再从MSK接数据写入Hudi。
2.2 CDC工具对比
图中标号3,除了flink-cdc-connectors之外,DMS(Amazon Database Migration Services)是Amazon 托管的数据迁移服务,提供多种数据源(mysql,oracle,sqlserver,postgres,mongodb,documentdb等)的CDC支持,支持可视化的CDC任务配置,运行,管理,监控。因此可以选择DMS作为CDC的解析工具,DMS支持将MSK或者自建Kafka作为数据投递的目标,所以CDC实时同步到MSK通过DMS可以快速可视化配置管理。当然除了DMS之外还有很多开源的CDC工具,也可以完成CDC的同步工作,但需要在EC2上搭建相关服务。下图列出了CDC工具的对比项,供大家参考
2.3 Spark Structured Streaming多库表并行写Hudi及Schema变更
图中标号4,CDC数据到了MSK之后,可以通过Spark/Flink计算引擎消费数据写入到Hudi表,我们把这一层我们称之为ODS层。无论Spark还是Flink都可以做到数据ODS层的数据落地,使用哪一个我们需要综合考量,这里阐述一些相对重要的点。首先对于Spark引擎,我们一定是使用Spark Structured Streaming 消费MSK写入Hudi,由于可以使用DataFrame API写Hudi, 因此在Spark中可以方便的实现消费CDC Topic并根据其每条数据中的元信息字段(数据库名称,表名称等)在单作业内分流写入不同的Hudi表
,封装多表并行写入逻辑,一个Job即可实现整库多表同步的逻辑。样例代码截图如下,完整代码点击Github获取 我们知道CDC数据中是带着I(insert)、U(update)、D(delete)信息的, 不同的CDC工具数据格式不同,但要表达的含义是一致的。使用Spark写入Hudi我们主要关注U、D信息,数据带着U信息表示该条数据是一个更新操作,对于Hudi而言只要设定源表的主键为Hudi的recordKey,同时根据需求场景设定precombineKey即可。这里对precombineKey做一个说明,它表示的是当数据需要更新时(recordKey相同), 默认选择两条数据中precombineKey的大保留在Hudi中。其实Hudi有非常灵活的Payload机制,通过参数hoodie.datasource.write.payload.class
可以选择不同的Payload实现,比如Partial Update(部分字段更新)的Payload实现OverwriteNonDefaultsWithLatestAvroPayload,也可以自定义Payload实现类,它核心要做的就是如何根据precombineKey指定的字段更新数据。所以对于CDC数据Sink Hudi而言,我们需要保证上游的消息顺序,只要我们表中有能判断哪条数据是最新的数据的字段即可,那这个字段在MySQL中往往我们设计成数据更新时间modify_time timestamp DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
。如果没有类似字段,建议定义设计规范加上这个字段,否则就必须保证数据有序(这会给架构设计和性能带来更多的阻力),不然数据在Hudi中Updata的结果可能就是错的。对于带着D信息的数据,它表示这条数据在源端被删除,Hudi是提供删除能力的,其中一种方式是当一条数据中包含_hoodie_is_deleted
字段,且值为true是,Hudi会自动删除此条数据,这在Spark Structured Streaming 代码中很容易实现,只需在map操作实现添加一个字段且当数据中包含D信息设定字段值为true即可。
2.4 Flink StatementSet多库表CDC并行写Hudi
对于使用Flink引擎消费MSK中的CDC数据落地到ODS层Hudi表,如果想要在一个JOB实现整库多张表的同步,Flink StatementSet
来实现通过一个Kafka的CDC Source表,根据元信息选择库表Sink到Hudi中。但这里需要注意的是由于Flink和Hudi集成,是以SQL方式先创建表,再执行Insert语句写入到该表中的,如果需要同步的表有上百之多,封装一个自动化的逻辑能够减轻我们的工作,你会发现SQL方式写入Hudi虽然对于单表写入使用上很方便,不用编程只需要写SQL即可,但也带来了一些限制,由于写入Hudi时是通过SQL先建表,Schema在建表时已将定义,如果源端Schema变更,通过SQL方式是很难实现下游Hudi表Schema的自动变更的。虽然在Hudi的官网并未提供Flink DataStream API写入Hudi的例子,但Flink写入Hudi是可以通过HoodieFlinkStreamer
以DataStream API的方式实现,在Hudi源码中可以找到。因此如果想要更加灵活简单的实现多表的同步,以及Schema的自动变更,需要自行参照HoodieFlinkStreamer代码以DataStream API的方式写Hudi。对于I,U,D信息,Flink的debezium ,maxwell,canal format
会直接将消息解析 为Flink的changelog流,换句话说就是Flink会将I,U,D操作直接解析成Flink内部的数据结构RowData,直接Sink到Hudi表即可,我们同样需要在SQL中设定recordKey,precombineKey,也可以设定Payload class的不同实现类。
2.5 Flink Streaming Read模式读Hudi实现ODS层聚合
图中标号5,数据通过Spark/Flink落地到ODS层后,我们可能需要构建DWD和DWS层对数据做进一步的加工处理,(DWD和DWS并非必须的,根据你的场景而定,你可以直接让OLAP引擎查询ODS层的Hudi表)我们希望能够使用到Hudi的增量查询能力,只查询变更的数据来做后续DWD和DWS的ETL,这样能够加速构建同时减少资源消耗。对于Spark引擎,在DWD层如果仅仅是对数据做map,fliter等相关类型操作,是可以使用增量查询的,但如果DWD层的构建有Join操作,是无法通过增量查询实现的,只能全表(或者分区)扫描。DWS层的构建如果聚合类型的操作没有去重,窗口类型的操作,只是SUM, AVG,MIN, MAX等类型的操作,可以通过增量查询之后和目标表做Merge实现,反之,只能全表(或者分区)扫描。 对于Flink引擎来构建DWD和DWS, 由于Flink 支持Hudi表的streaming read, 在SQL设定read.streaming.enabled= true,changelog.enabled=true
等相关流式读取的参数即可。设定后Flink把Hudi表当做了一个无界的changelog流表,无论怎样做ETL都是支持的,Flink会自身存储状态信息,整个ETL的链路是流式的。
2.6 OLAP引擎查询Hudi表
图中标号6, EMR Hive/Presto/Trino 都可以查询Hudi表,但需要注意的是不同引擎对于查询的支持是不同的,参见官网,这些引擎对于Hudi表只能查询,不能写入。 关于Schema的自动变更,首先Hudi自身是支持Schema Evolution,我们想要做到源端Schema变更自动同步到Hudi表,通过上文的描述,可以知道如果使用Spark引擎,可以通过DataFrame API操作数据,通过from_json动态生成DataFrame,因此可以较为方便的实现自动添加列。如果使用Flink引擎上文已经说明想要自动实现Schema的变更,通过HoodieFlinkStreamer
以DataStream API的方式实现Hudi写入的同时融入Schema变更的逻辑。
三、EMR CDC整库同步Demo
接下的Demo操作中会选择RDS MySQL作为数据源,Flink CDC DataStream API 同步库中的所有表到Kafka,使用Spark引擎消费Kafka中binlog数据实现多表写入ODS层Hudi,使用Flink引擎以streaming read
的模式做DWD和DWS层的Hudi表构建。
3.1 环境信息
3.2 创建源表
在MySQL中创建test_db库及user,product,user_order
三张表,插入样例数据,后续CDC先加载表中已有的数据,之后源添加新数据并修改表结构添加新字段,验证Schema变更自动同步到Hudi表。
3.3 Flink CDC发送数据到Kafka
使用DataStream API编写CDC同步程序。样例代码Github