亚马逊AWS官方博客

基于 AWS S3、EMR Flink、Presto 和 Hudi 的实时数据湖仓 – 使用 EMR 迁移 CDH

前言

当前很多企业已经完成了大量业务系统的数字化建设,可以从业务面带给用户良好的体验。伴随着大量系统的线上化、业务范畴的不断拓展,企业会产生海量的业务或行为数据。数据正越来越成为企业的核心资产,与此同时,实时数据的注入、处理、存储和分析等对企业数据团队的挑战也与日俱增。

客户场景

客户几年前基于AWS EC2 搭建了开源社区版的CDH集群,用于业务数据的实时分析。不过随着业务的发展,客户的数据团队面临如下挑战:

第一,成本优化:数据量日益庞大,系统未实现存算分离,会面临计算资源浪费、成本增加的问题。

第二,系统演进:系统基于CDH开源版本,组件版本低,无法继续升级。

第三,系统稳定:系统基于EC2自建,维护成本高,稳定性较差。

因此,客户的数据团队对AWS有如下期望和需求:

  1. 业务诉求:
  • 第一,实时性:生产系统数据5秒内摄入数据湖;考虑到未来需求,单表100亿数据,5秒内查询到结果。查询不涉及到多表的Join,仅涉及单表的明细和聚合查询。
  • 第二,即席查询:开放查询界面给业务人员,方便进行即席查询。
  1. 技术诉求:
  • 第一,系统演进:系统可高效、稳定升级,紧跟云厂商、开源社区最佳实践,快速满足业务需求。
  • 第二,系统稳定:系统依托云托管服务,实现低维护成本并带来高稳定性。

通过分析客户的业务场景、技术背景和相关诉求,建议客户从自建CDH迁移到AWS S3、EMR、Presto等服务组成的AWS智能湖仓架构。

方案架构

客户生产系统部署于阿里云,数据流向如下图所示。有以下几个关注点:

  1. 数据注入采用Flink CDC,摒弃了诸如Debezium/Canal + Kafka的方式。主要原因是减少Kafka(MSK)的运维和费用,采用一套Flink技术栈完成数据的注入和处理。
  2. EMR Flink集群将处理后的数据分别放置于OpenSearch集群和S3。前者存储热点数据并用于报表的明细查询,OpenSearch集群可以提供极致性能;后者使用Hudi进行全量数据的存储,用于后续的数据查询。
  3. 数据展示阶段:Athena提供数据分析师即席查询能力;EMR Presto和OpenSearch提供前端页面的报表查询。通过OpenSearch JDBC Driver,实现两者的SQL和代码统一。

系统构建

数据摄入层

搭建EMR集群

硬件层面,数据摄入层和数据处理层是同一个EMR Flink集群。

创建EMR 6.4.0 Flink集群,选择Flink、Hive等组件,并开启hive的metadata支持。

创建Flink Session

例子中集群采用m5.xlarge机型,不同机型配置会有不同。

checkpoints=s3://xxxxxxxx/flink/checkpoints/

nohup flink-yarn-session  -jm 3096 -tm 5096 -s 4 \
-D state.backend=rocksdb \
-D state.checkpoint-storage=filesystem \
-D state.checkpoints.dir=${checkpoints} \
-D execution.checkpointing.interval=5000 \
-D state.checkpoints.num-retained=5 \
-D execution.checkpointing.mode=EXACTLY_ONCE \
-D execution.checkpointing.checkpoints-after-tasks-finish.enabled=true \
-D execution.checkpointing.externalized-checkpoint-retention=RETAIN_ON_CANCELLATION \
-D state.backend.incremental=true \
-D execution.checkpointing.max-concurrent-checkpoints=1 \
-D rest.flamegraph.enabled=true \
-D parallelism.default=1 \
-D akka.ask.timeout=240s \
-d \
-t /etc/hive/conf/hive-site.xml &

创建MySQL源表

例子中源数据表存储于MySQL,使用Flink Table API。

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, EnvironmentSettings.newInstance().useBlinkPlanner().build());

String ddlMySQLSourceTable =
    "CREATE TABLE source_mysql_source_table (\n" +
    " id               VARCHAR(32),\n" +
    " chain_id         VARCHAR(50),\n" +
    " name             VARCHAR(50),\n" +
    " status           TINYINT,\n" +
    " created_by       VARCHAR(50),\n" +
    " created          TIMESTAMP(3),\n" +
    " last_modified_by VARCHAR(50),\n" +
    " last_modified    TIMESTAMP(3),\n" +
    " is_deleted       TINYINT,\n" +
    " parent_id        VARCHAR(32),\n" +
    " level            INT,\n" +
    " PRIMARY KEY (id) NOT ENFORCED\n" +
    " ) WITH (\n" +
    "     'connector' = 'mysql-cdc',\n" +
    "     'hostname' = 'mysqlHost',\n" +
    "     'port' = 'mysqlPort',\n" +
    "     'username' = 'mysqlUser',\n" +
    "     'password' = 'mysqlPass',\n" +
    "     'database-name' = 'mysql_database',\n" +
    "     'table-name' = 'mysql_table',\n" +
    "     'scan.incremental.snapshot.enabled' = 'true',\n" +
    "     'scan.startup.mode' = 'initial'\n" +
    " )";

tEnv.executeSql(ddlMySQLSourceTable);

数据处理和存储层

使用Flink + Hudi构建ODS层或DWD层,将数据存储到S3 Hudi和OpenSearch中。

搭建OpenSearch

版本选择opensearch 1.3,3个节点,单个节点部署于单个可用区以实现高可用。

创建Hudi目标表

例子中目的数据表存储于S3 Hudi(Copy On Write),使用Flink Table API。

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, EnvironmentSettings.newInstance().useBlinkPlanner().build());

String ddlHudiSinkTable =
    "CREATE TABLE sink_hudi_ods_table (\n" +
    " id               VARCHAR(32),\n" +
    " chain_id         VARCHAR(50),\n" +
    " name             VARCHAR(50),\n" +
    " status           TINYINT,\n" +
    " created_by       VARCHAR(50),\n" +
    " created          TIMESTAMP(3),\n" +
    " last_modified_by VARCHAR(50),\n" +
    " last_modified    TIMESTAMP(3),\n" +
    " is_deleted       TINYINT,\n" +
    " parent_id        VARCHAR(32),\n" +
    " level            INT,\n" +
    " ts               STRING,\n" +
    " PRIMARY KEY (id) NOT ENFORCED\n" +
    " ) " + "PARTITIONED BY (ts) \n" +
    " WITH (\n" +
    "     'connector' = 'hudi',\n" +
    "     'write.tasks' = '1',\n"+
    "     'path' = 's3://xxx/hudi-tables/xxx/',\n" +
    "     'table.type' = 'COPY_ON_WRITE',\n" +
    "     'write.bucket_assign.tasks' = '1',\n" +
    "     'write.parquet.max.file.size' = '512',\n" +
    "     'hoodie.parquet.small.file.limit' = '134217728',\n" +
    "     'write.operation' = 'upsert',\n" +
    "     'changelog.enabled' = 'true',\n" +
    "     'hive_sync.enable' = 'true',\n" +
    "     'hive_sync.db' = 'database',\n" +
    "     'hive_sync.table' = 'table',\n" +
    "     'hive_sync.mode' = 'HMS',\n" +
    "     'hive_sync.use_jdbc' = 'false',\n" +
    "     'hive_sync.username' = 'hadoop'\n" +
    " )";
tEnv.executeSql(ddlSinkV2PatientSourceType);

创建OpenSearch目标表

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, EnvironmentSettings.newInstance().useBlinkPlanner().build());

String ddlAOSSinkTable =
    "CREATE TABLE sink_aos_ods_table (\n" +
    " id               VARCHAR(32),\n" +
    " chain_id         VARCHAR(50),\n" +
    " name             VARCHAR(50),\n" +
    " status           TINYINT,\n" +
    " created_by       VARCHAR(50),\n" +
    " created          TIMESTAMP(3),\n" +
    " last_modified_by VARCHAR(50),\n" +
    " last_modified    TIMESTAMP(3),\n" +
    " is_deleted       TINYINT,\n" +
    " parent_id        VARCHAR(32),\n" +
    " level            INT,\n" +
    " PRIMARY KEY (id) NOT ENFORCED\n" +
    ") WITH (\n" +
    "    'connector' = 'elasticsearch-7',\n" +
    "    'hosts' = 'opensearchHost',\n" +
    "    'index' = 'table_index_{created|yyyy-MM}',\n" +
    "    'username' = 'opensearchUser',\n" +
    "    'password' = 'opensearchPass',\n" +
    "    'format' = 'json'\n" +
    ")";

tEnv.executeSql(ddlAOSSinkTable);

数据存储层

数据源只有MySQL,而Sink的目的地有两个:Hudi和OpenSearch。前者用于聚合查询,后者用于明细查询。

// Hudi Sink
StatementSet statementSet = tEnv.createStatementSet();
String odsSinkHudi =
	"INSERT INTO sink_hudi_ods_table SELECT smst.*, DATE_FORMAT(smst.created, 'yyyy-MM-dd') FROM source_mysql_source_table AS smst\n";
statementSet.addInsertSql(odsSinkHudi);
statementSet.execute();

// OpenSearch Sink
StatementSet statementSet = tEnv.createStatementSet();
String odsSinkAOS =
	"INSERT INTO sink_hudi_ods_table SELECT sink_aos_ods_table\n";
statementSet.addInsertSql(odsSinkAOS);
statementSet.execute();

元数据管理

EMR Flink会结合AWS Glue服务,所有S3 Hudi表的元数据均存储于Glue Catalog,可以方便用户进行元数据的管理,包括查询、更改、删除等。

数据展示层

数据展示层,可以通过OpenSearch Dashboard和Presto Cli进行页面交互查询;同时提供SDK(JDBC Driver)实现前后端联动查询。

OpenSearch Dashboard查询

Presto Cli查询

总结

很多公司的数字化转型已经完成了线上系统的建设,下半场的转型集中于数据的应用。真正使数据产生价值,我们需要在产品和服务方面精益求精,具备优异的架构支持和数据底座;除此之外,我们还可以跟合作伙伴进行更多合作尝试,为用户提供基于亚马逊云科技数据底座的数据应用,从业务层面给用户带来更多价值。

本篇作者

高郁

亚马逊云科技解决方案架构师,主要负责企业客户上云,帮助客户进行云架构设计和技术咨询,专注于智能湖仓、AI/ML等技术方向。