前言
当前很多企业已经完成了大量业务系统的数字化建设,可以从业务面带给用户良好的体验。伴随着大量系统的线上化、业务范畴的不断拓展,企业会产生海量的业务或行为数据。数据正越来越成为企业的核心资产,与此同时,实时数据的注入、处理、存储和分析等对企业数据团队的挑战也与日俱增。
客户场景
客户几年前基于AWS EC2 搭建了开源社区版的CDH集群,用于业务数据的实时分析。不过随着业务的发展,客户的数据团队面临如下挑战:
第一,成本优化:数据量日益庞大,系统未实现存算分离,会面临计算资源浪费、成本增加的问题。
第二,系统演进:系统基于CDH开源版本,组件版本低,无法继续升级。
第三,系统稳定:系统基于EC2自建,维护成本高,稳定性较差。
因此,客户的数据团队对AWS有如下期望和需求:
- 业务诉求:
- 第一,实时性:生产系统数据5秒内摄入数据湖;考虑到未来需求,单表100亿数据,5秒内查询到结果。查询不涉及到多表的Join,仅涉及单表的明细和聚合查询。
- 第二,即席查询:开放查询界面给业务人员,方便进行即席查询。
- 技术诉求:
- 第一,系统演进:系统可高效、稳定升级,紧跟云厂商、开源社区最佳实践,快速满足业务需求。
- 第二,系统稳定:系统依托云托管服务,实现低维护成本并带来高稳定性。
通过分析客户的业务场景、技术背景和相关诉求,建议客户从自建CDH迁移到AWS S3、EMR、Presto等服务组成的AWS智能湖仓架构。
方案架构
客户生产系统部署于阿里云,数据流向如下图所示。有以下几个关注点:
- 数据注入采用Flink CDC,摒弃了诸如Debezium/Canal + Kafka的方式。主要原因是减少Kafka(MSK)的运维和费用,采用一套Flink技术栈完成数据的注入和处理。
- EMR Flink集群将处理后的数据分别放置于OpenSearch集群和S3。前者存储热点数据并用于报表的明细查询,OpenSearch集群可以提供极致性能;后者使用Hudi进行全量数据的存储,用于后续的数据查询。
- 数据展示阶段:Athena提供数据分析师即席查询能力;EMR Presto和OpenSearch提供前端页面的报表查询。通过OpenSearch JDBC Driver,实现两者的SQL和代码统一。
系统构建
数据摄入层
搭建EMR集群
硬件层面,数据摄入层和数据处理层是同一个EMR Flink集群。
创建EMR 6.4.0 Flink集群,选择Flink、Hive等组件,并开启hive的metadata支持。
创建Flink Session
例子中集群采用m5.xlarge机型,不同机型配置会有不同。
checkpoints=s3://xxxxxxxx/flink/checkpoints/
nohup flink-yarn-session -jm 3096 -tm 5096 -s 4 \
-D state.backend=rocksdb \
-D state.checkpoint-storage=filesystem \
-D state.checkpoints.dir=${checkpoints} \
-D execution.checkpointing.interval=5000 \
-D state.checkpoints.num-retained=5 \
-D execution.checkpointing.mode=EXACTLY_ONCE \
-D execution.checkpointing.checkpoints-after-tasks-finish.enabled=true \
-D execution.checkpointing.externalized-checkpoint-retention=RETAIN_ON_CANCELLATION \
-D state.backend.incremental=true \
-D execution.checkpointing.max-concurrent-checkpoints=1 \
-D rest.flamegraph.enabled=true \
-D parallelism.default=1 \
-D akka.ask.timeout=240s \
-d \
-t /etc/hive/conf/hive-site.xml &
创建MySQL源表
例子中源数据表存储于MySQL,使用Flink Table API。
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, EnvironmentSettings.newInstance().useBlinkPlanner().build());
String ddlMySQLSourceTable =
"CREATE TABLE source_mysql_source_table (\n" +
" id VARCHAR(32),\n" +
" chain_id VARCHAR(50),\n" +
" name VARCHAR(50),\n" +
" status TINYINT,\n" +
" created_by VARCHAR(50),\n" +
" created TIMESTAMP(3),\n" +
" last_modified_by VARCHAR(50),\n" +
" last_modified TIMESTAMP(3),\n" +
" is_deleted TINYINT,\n" +
" parent_id VARCHAR(32),\n" +
" level INT,\n" +
" PRIMARY KEY (id) NOT ENFORCED\n" +
" ) WITH (\n" +
" 'connector' = 'mysql-cdc',\n" +
" 'hostname' = 'mysqlHost',\n" +
" 'port' = 'mysqlPort',\n" +
" 'username' = 'mysqlUser',\n" +
" 'password' = 'mysqlPass',\n" +
" 'database-name' = 'mysql_database',\n" +
" 'table-name' = 'mysql_table',\n" +
" 'scan.incremental.snapshot.enabled' = 'true',\n" +
" 'scan.startup.mode' = 'initial'\n" +
" )";
tEnv.executeSql(ddlMySQLSourceTable);
数据处理和存储层
使用Flink + Hudi构建ODS层或DWD层,将数据存储到S3 Hudi和OpenSearch中。
搭建OpenSearch
版本选择opensearch 1.3,3个节点,单个节点部署于单个可用区以实现高可用。
创建Hudi目标表
例子中目的数据表存储于S3 Hudi(Copy On Write),使用Flink Table API。
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, EnvironmentSettings.newInstance().useBlinkPlanner().build());
String ddlHudiSinkTable =
"CREATE TABLE sink_hudi_ods_table (\n" +
" id VARCHAR(32),\n" +
" chain_id VARCHAR(50),\n" +
" name VARCHAR(50),\n" +
" status TINYINT,\n" +
" created_by VARCHAR(50),\n" +
" created TIMESTAMP(3),\n" +
" last_modified_by VARCHAR(50),\n" +
" last_modified TIMESTAMP(3),\n" +
" is_deleted TINYINT,\n" +
" parent_id VARCHAR(32),\n" +
" level INT,\n" +
" ts STRING,\n" +
" PRIMARY KEY (id) NOT ENFORCED\n" +
" ) " + "PARTITIONED BY (ts) \n" +
" WITH (\n" +
" 'connector' = 'hudi',\n" +
" 'write.tasks' = '1',\n"+
" 'path' = 's3://xxx/hudi-tables/xxx/',\n" +
" 'table.type' = 'COPY_ON_WRITE',\n" +
" 'write.bucket_assign.tasks' = '1',\n" +
" 'write.parquet.max.file.size' = '512',\n" +
" 'hoodie.parquet.small.file.limit' = '134217728',\n" +
" 'write.operation' = 'upsert',\n" +
" 'changelog.enabled' = 'true',\n" +
" 'hive_sync.enable' = 'true',\n" +
" 'hive_sync.db' = 'database',\n" +
" 'hive_sync.table' = 'table',\n" +
" 'hive_sync.mode' = 'HMS',\n" +
" 'hive_sync.use_jdbc' = 'false',\n" +
" 'hive_sync.username' = 'hadoop'\n" +
" )";
tEnv.executeSql(ddlSinkV2PatientSourceType);
创建OpenSearch目标表
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, EnvironmentSettings.newInstance().useBlinkPlanner().build());
String ddlAOSSinkTable =
"CREATE TABLE sink_aos_ods_table (\n" +
" id VARCHAR(32),\n" +
" chain_id VARCHAR(50),\n" +
" name VARCHAR(50),\n" +
" status TINYINT,\n" +
" created_by VARCHAR(50),\n" +
" created TIMESTAMP(3),\n" +
" last_modified_by VARCHAR(50),\n" +
" last_modified TIMESTAMP(3),\n" +
" is_deleted TINYINT,\n" +
" parent_id VARCHAR(32),\n" +
" level INT,\n" +
" PRIMARY KEY (id) NOT ENFORCED\n" +
") WITH (\n" +
" 'connector' = 'elasticsearch-7',\n" +
" 'hosts' = 'opensearchHost',\n" +
" 'index' = 'table_index_{created|yyyy-MM}',\n" +
" 'username' = 'opensearchUser',\n" +
" 'password' = 'opensearchPass',\n" +
" 'format' = 'json'\n" +
")";
tEnv.executeSql(ddlAOSSinkTable);
数据存储层
数据源只有MySQL,而Sink的目的地有两个:Hudi和OpenSearch。前者用于聚合查询,后者用于明细查询。
// Hudi Sink
StatementSet statementSet = tEnv.createStatementSet();
String odsSinkHudi =
"INSERT INTO sink_hudi_ods_table SELECT smst.*, DATE_FORMAT(smst.created, 'yyyy-MM-dd') FROM source_mysql_source_table AS smst\n";
statementSet.addInsertSql(odsSinkHudi);
statementSet.execute();
// OpenSearch Sink
StatementSet statementSet = tEnv.createStatementSet();
String odsSinkAOS =
"INSERT INTO sink_hudi_ods_table SELECT sink_aos_ods_table\n";
statementSet.addInsertSql(odsSinkAOS);
statementSet.execute();
元数据管理
EMR Flink会结合AWS Glue服务,所有S3 Hudi表的元数据均存储于Glue Catalog,可以方便用户进行元数据的管理,包括查询、更改、删除等。
数据展示层
数据展示层,可以通过OpenSearch Dashboard和Presto Cli进行页面交互查询;同时提供SDK(JDBC Driver)实现前后端联动查询。
OpenSearch Dashboard查询
Presto Cli查询
总结
很多公司的数字化转型已经完成了线上系统的建设,下半场的转型集中于数据的应用。真正使数据产生价值,我们需要在产品和服务方面精益求精,具备优异的架构支持和数据底座;除此之外,我们还可以跟合作伙伴进行更多合作尝试,为用户提供基于亚马逊云科技数据底座的数据应用,从业务层面给用户带来更多价值。
本篇作者