亚马逊AWS官方博客

使用亚马逊云科技服务同步数据到 Amazon Redshift 的方案与实践

前言

Amazon Redshift 是亚马逊云科技的云数仓产品,相比于其他云数仓产品,具有很好的性价比优势,且产品功能丰富,用户可以使用 SQL 对来自于业务数据库、数据湖中的结构化和半结构化数据进行分析,被广泛用来构建企业内部数据仓库。企业内部需要分析的数据主要存在于关系数据库和 NoSQL 类型的数据库中,要使用 Amazon Redshift 对这些数据进行分析,首先需要将这些业务数据库中的数据同步到数仓中。本文将介绍使用亚马逊云科技服务同步数据到 Amazon Redshift 的方案以及相关实践。

数据同步方案

使用亚马逊云科技服务同步数据到 Amazon Redshift 主要有以下两种方案。

方案一

通过 AWS Database Migration Service(DMS)服务将业务数据库中的数据同步到 Amazon Redshift 中。如下图所示:

该方案的优点是:

  • 支持多种数据源,常见的各种关系数据库(如 MySQL,PostgreSQL,SQL Server 等)和 NoSQL 类型的数据库(如 MongoDB,Cassandra 等)都能很好的支持;
  • 支持多种同步模式,包括全量数据同步、全量加增量同步、仅增量同步模式,满足不同的数据同步需求;
  • 架构简单,只需要进行简单的配置就可以快速实现业务数据同步到 Amazon Redshift,用户不需要编写数据同步代码。

由于 Amazon Redshift 是专门针对大数据量聚合分析查询优化的 OLAP 类型数仓产品,而业务数据库基本都是实时更新的 OLTP 型数据库,Amazon Redshift 在适用场景以及功能特性方面与源库存在较大差异。在使用 AWS DMS 同步数据到 Amazon Redshift 时,为了更有效、更流畅地进行数据同步,需要重点注意以下几点:

  • 分析源库中表的数据量和更新频率,采用单独的数据同步任务来同步大数据量或数据更新频率高的表,以避免小数据量或更新频率比较低的表数据同步延迟增大;
  • 在全量数据同步阶段,对于大数据量的表,可以设置数据表的并行加载规则,并适当调整 ParallelLoadTreads 参数,以获得更好的数据同步效率;
  • 在增量数据同步阶段,DMS 会默认采用“Batch Apply” 模式来应用变更数据,以提升数据同步效率。根据数据变更频率,可以适当调整 ParallelApplyThreads,ParallelApplyBufferSize,BatchApplyTimeoutMin,BatchApplyTimeoutMax 等参数值,可以提升变更数据应用的效率。具体的配置可以参考此文档
  • 源表 schema 变更通常是造成数据同步任务终止的原因之一,需要做好同步任务的测试与监控;
  • 当采用“Document Mode” 同步 MongoDB / Amazon DocumentDB 中的 Collection 时,会将 Collection 中的整条记录存入到 VARCHAR 字段中,而 Amazon Redshift 中 VARCHAR 字段长度上限为 64KB。当记录长度超过 64KB 时,会导致数据同步异常。

该方案可以快速实现同步源表中的数据到 Amazon Redshift 中,但如果我们希望对整个数据同步过程进行进一步的控制,比如更好地支持上游数据表 schema 的变更、使用 Amazon Redshift 的 SUPER 字段来存储半结构化的长文本字段、希望将源表的数据变更给下游的多个应用使用等,那我们就需要用到方案二。

方案二

在本方案中,整个数据同步过程由两部分组成。首先将源库(表)的全量或者增量数据以某种格式同步到 Amazon MSK 中,随后通过消费 Kafka 中的数据,将这些全量和增量的数据应用到 Amazon Redshift 中。如下图所示:

第一部分将源库(表)的数据同步到 Amazon MSK 在亚马逊云科技中有以下实现方式:

  • 使用 Flink CDC 将源库(表)中的数据同步到 Amazon MSK,需要编写 Flink CDC 程序,并通过 Amazon EMR 或者 Amazon Managed Service for Apache Flink(原服务名为 Amazon KDA)来运行。这种方式可最大程度定制数据同步过程,如可实现按不同的源库同步到不同的 topic,数据 partition 路由设置,甚至是对数据同步内容进行定制等。这种方式数据会以 debezium 格式同步到 Amazon MSK;
  • 使用 Amazon MSK Connect 创建自定义的 Connector plugin,将不同的源库(表)同步到 Amazon MSK,不需要编写代码,只需进行配置。不同的 Connector plugin 有不同功能特性和配置方式,可以使用 debezium 相关的 connector 来进行数据同步;
  • 使用 AWS DMS 同步源库(表)数据到 Amazon MSK,不需要编写代码,只需要进行配置。这种方式使用 Amazon MSK 作为 AWS DMS 的目标,默认多个源表同步到一个 topic 中,AWS DMS 3.4.6 及以后的版本支持多个源表同步到不同的数据同步的格式见此文档

方案的第二部分通过消费同步到 Amazon MSK 中的全量和增量数据,将这些数据解析、处理后,写入到 Amazon Redshift 中。这部分的实现方式有:

  • 使用 Amazon Glue 或者 Amazon EMR 执行 Spark Structured Streaming 应用程序写入数据。Amazon Glue 4.0 和 Amazon EMR 6.9.0 及以上版本提供了原生的 Spark Connector for Amazon Redshift,可以方便、高效地往 Amazon Redshift 读取/写入数据。在这部分,可以根据源库的实际情况,充分利用 Spark Structured Streaming 的灵活性,来实现源表 schema 变更的处理、使用 Amazon Redshift SUPER 字段来存储半结构化长文本字段等操作。

方案二在架构上相较于方案一稍微复杂,过程相对繁琐,但带来的好处也是很明显的,就是整个过程完全自主可控,可以按需进行定制化处理。

接下来我们将以 Amazon DocumentDB 作为源库为例,来实际验证上述两种方案。

数据同步实践

在实际操作之前,  需要确保在亚马逊云科技账号中已经创建 Amazon DocumentDB 数据库实例以及 Amazon Redshift 集群。

方案一验证

主要操作过程包括以下步骤:

  1. 创建 AWS DMS 源端点和目标端点
  2. 创建 AWS DMS 复制实例
  3. 创建 AWS DMS 复制任务
  4. 运行AWS DMS 复制任务

各步骤操作与配置如下。

创建 AWSDMS 源端点和目标端点

创建源端点和目标端点目的是用来记录源库和目标库的连接及配置信息,被复制实例用来创建源库和目标库连接。

1)创建源端点

在创建 DocumentDB 源端点时,需要先创建连接用户,如果要进行增量数据同步,还需开启 DocumentDB 的 change stream,具体操作见文档说明,此处略过。进入 AWS DMS 控制台,开始创建源端点,端点类型选择“源端点”,填入源端点名称,源引擎选择 Amazon DocumentDB。

选择“手动提供访问信息”,并填入 Amazon DocumentDB 相关连接信息和要同步的数据库名称。“元数据模式” 选择“文档”模式,此模式会将整个 JSON 文档同步到一个字段中,对于 JSON 文档格式不定,存在嵌套结构的情况具有很好的适应性。选用此模式时,AWS DMS 会以 Amazon Redshift VARCHAR 字段类型来存储 JSON 文档,注意文档大小不要超过 64KB。

2)创建目标端点

AWS DMS 连接 Amazon Redshift 时,需要数据库用户名、密码以及相应的权限才能正常连接并将数据写入,我们通过 Redshift Query Editor v2 创建目标数据库、Schema、数据库用户以及赋予相应的权限,SQL 如下:

CREATE DATABASE ods;
CREATE SCHEMA chats;
CREATE USER dms_user WITH PASSWORD 'Your Password';
GRANT ALL ON ALL TABLES IN SCHEMA chats TO dms_user;
GRANT ALL ON DATABASE ods to dms_user;
GRANT ALL ON SCHEMA chats to dms_user;

在 AWS DMS 控制台创建端点,端点类型选择“目标端点”。填入目标端点名称,并选择“Amazon Redshift” 作为目标引擎。

选择 “手动提供访问信息”,填入 Amazon Redshift 集群连接信息、用户名、密码以及目标数据库。选择“创建端点” 完成目标端点的创建。

通过 AWS DMS Web 控制台创建的 Amazon Redshift 端点,AWS DMS 将自动创建一个名为 dms-access-for-endpoint 的 IAM 角色,通过该角色来创建 Amazon S3 桶,并调用 Copy 命令将此 S3 桶的数据应用到 Amazon Redshift 中。为此还需为此前创建的数据库用户授予 ASSUMEROLE 的权限,SQL 如下:

GRANT ASSUMEROLE ON 'arn:aws:iam::YourAccountId:role/dms-access-for-endpoint' TO dms_user FOR ALL;

同时将该角色关联到 Amazon Redshift 集群。

创建 AWS DMS 复制实例

AWS DMS 复制实例扮演从源库加载数据并将数据同步到目标数据库的角色。复制实例大小的选择可参考白皮书。在 AWS DMS 控制台创建复制实例,选择实例大小和引擎版本,在高可用性方面可选择是否进行高可用部署,一般生产环境建议多可用区部署,开发和测试时单可用区部署即可。

为复制实例创建专门的安全组,源库和目标库均需允许来自复制实例安全组的的流量。

创建 AWS DMS 迁移任务

AWS DMS 复制任务定义数据同步的规则,包括指定源端点、目标端点、同步源端点的哪些库和表、设置表和数据的转化规则等。创建迁移任务时,选择之前创建的复制实例、Amazon DocumentDB 源端点、Amazon Redshift 目标端点,迁移类型支持三种选项:迁移现有数据(即全量同步)、迁移现有数据并复制正在进行的更改(即全量加增量数据同步)、仅复制数据更改(即仅增量数据同步)。

在任务开发调试阶段, 建议打开 CloudWatch 日志,以便进行问题排查。

在表映射的选择规则部分,可以设置同步源库中的哪些数据库、表以及设置表的过滤条件。下图中的设置将同步“chats“数据库中的所有表。

在规则转换部分,可以针对表和列设置转换规则,如对表名和列名进行修改、增加列、修改列的字段类型等等。下图中的设置表示将“chats”数据库所有表同步到目标库时都加上“ods_” 这个前缀。选择规则和转换规则可以有多个。

选择“稍后手动启动”迁移任务,完成 AWS DMS 迁移任务的创建。

运行 AWS DMS 迁移任务

迁移任务创建完成后,可以手动启动迁移任务。

任务启动后,可以看到任务的执行状态、任务详细信息以及源表同步的统计信息。如果同步过程中出现错误,可以通过 CloudWatch 日志来进行排查。任务成功执行后,在 Amazon Redshift 中我们就能看到同步的表和数据。并且源库数据修改后,在目标库中能很快看到同步过来的变更数据。

方案二验证

使用 Flink CDC 同步源库数据到 Amazon MSK

在同步源库数据到 Amazon MSK 这部分,我们选择使用编写 Flink CDC 程序并在 Amazon Managed Service for Apache Flink 上运行这种方式。同步数据的 Flink CDC 相关代码见 Github 仓库,也可以直接下载编译好的 Jar 包直接使用。在代码中,我们使用“数据库名.表名.主键值”这种形式作为分区键,这样相同主键的数据变更被发送到同一个分区,保证数据局部有序。Java 代码如下:

JsonElement rootEle = JsonParser.parseString(line);
JsonElement nsEle = rootEle.getAsJsonObject().get("ns");

String db = nsEle.getAsJsonObject().get("db").getAsString();
String coll = nsEle.getAsJsonObject().get("coll").getAsString();
String pkVal = "no_pk";

if (rootEle.getAsJsonObject().has("documentKey")) {
    String documentKeyStr = rootEle.getAsJsonObject().get("documentKey").getAsString();

    JsonObject dkJson = JsonParser.parseString(documentKeyStr).getAsJsonObject();
    pkVal = dkJson.get("_id").toString();
}

String partitionKey = String.format("%s.%s.%s", db, coll, pkVal);
return CDCKafkaModel.of(db, coll, partitionKey, line);

编译或者下载好应用程序包后,我们将应用程序包上传到 Amazon S3 目录中。接下来我们创建托管的 Apache Flink 应用。

1)创建 Flink 应用程序

在亚马逊云科技 Web 控制台,搜索“Managed Apache Flink” 服务,打开其控制台,创建流式应用程序,选择 Flink 1.15 版本,在应用程序配置部分,填入应用名称并选择让 Managed Apache Flink 为应用程序创建相应的 IAM 角色。

在应用程序模板设置部分,可以选择“开发” 或“生产”这两种模板,来设置是否开启快照、应用的监控级别以及应用的并行度等参数,在开发调试阶段可以选择“开发”模板,在正式运行时可以选择“生产”模板,模板中的配置信息在后续可以修改。保存配置后完成 Flink 流式应用的创建。

2)配置 Flink 应用程序

Flink 应用程序创建完成后,需要对应用进行配置。首先指定应用代码存放的 Amazon S3 桶和对象路径,并选择让服务来自动更新 IAM 角色权限。

开启快照可以让应用程序实现 exactly-once 语义,在应用程序更新和重启时保证数据不丢失,在应用调试时可以不用开启,正式线上运行时建议开启。

检查点配置采用默认值即可,即 60 秒更新间隔。

在扩展性配置方面,根据源库源表的数量、数据量以及数据更新频率设置 Flink 应用的并行度,以及每个 KPU 资源(包括 1 个 vCPU,4GB 内存,50GB 磁盘容量)分配多少个并行度,总共需要的 KPU 数量 =(应用的并行度)除以(每个 KPU 的并行度)。可以开启“自动扩缩”选项,让服务自动进行并行度的弹性扩缩,增强应用的弹性。

在应用监控与日志方面,采用默认配置即可。在网络配置方面,选择应用部署的 VPC、子网以及安全组。由于 Flink 应用需要访问源库和 Amazon MSK,因此它们使用的安全组需要允许来自 Flink 应用所使用的安全组流量。

接下来配置应用程序运行时需要的属性配置,根据代码中的参数配置需要,我们按下面的方式设置各个属性的分组、键名以及键值信息。设置完后保存应用配置即完成 Flink 应用的配置。

3)运行 Flink 应用程序

Flink 应用程序配置完成后,就可以运行应用。运行 Flink 应用程序时,可以选择是否基于最新的快照来运行,如果 Flink 应用程序启用了快照,选择基于最新快照来运行可以让应用程序基于上次最新的运行状态启动,实现 exactly-once 的效果,而无快照运行则从头开始运行。测试时,可以选择“无快照运行”。

应用启动后,可以在 Web 控制台查看应用的运行状态、运行指标以及日志,也可以在控制台打开 Flink 控制面板,查看任务的运行情况。如下图所示,可以看到 Flink 应用程序正在运行,并正常同步数据。

使用 Amazon Glue 写入数据到 Amazon Redshift

1)创建 Glue Data Connection

由于 Glue 程序需要同时连接 Amazon MSK 和 Redshift 集群,因此我们首先在 Glue 中建立一个 Data Connection,保证 Glue 程序运行起来后,在网络层面能和 Amazon MSK 和 Redshift 集群相连通。

创建连接时,我们输入连接名称,“Connection type”  选择 “Network”。

在网络选项设置方面,选择 Amazon MSK 和 Redshift 集群所在的 VPC,并选择该 VPC 下的一个子网。在安全组选项方面,建议选择 Redshift 集群使用的安全组,同时在 MSK 集群的安全组中允许来自该安全组的流量。保存即完成 Data Connection 的创建。

2)创建 Glue Job

Github 上的这个文件是完整的消费本方案第一部分写入 Amazon MSK 的 CDC 数据,并将数据写入 Amazon Redshift 的 Glue 脚本。利用此脚本内容在 Amazon Glue 控制台创建一个 Spark Script Job,设置 Job 类型为“Spark Streaming”,选择 Glue 4.0,并选择 G 1X 作为 Worker type 以及设置 Worker 的数量。

在高级选项里,选择之前创建的 Data Connection。

添加额外的 redshift_connector python 模块。保存即完成 Glue Job 的创建。

3)创建配置文件并运行任务

该 Glue Job 需要的运行时参数存放在 JSON 文件中,并从 Amazon S3 中读取 JSON 文件内容。格式如下,填入 Amazon MSK 和 Redshift 相应的连接信息即可。

该任务使用了 Redshift Spark Connector,在向 Redshift 读、写数据时,其会以 Amazon S3 作为数据中转,需要配置 S3 路径作为 tmpdir 的值以及配置 IAM Role 给 Amazon Redshift 在加载 S3 中的数据使用,具体的 IAM Role 配置可以参考这两个链接:链接1链接2。配置完成后即可在 Glue 的控制台设置参数运行,或者通过如下命令行运行该 Glue Job:

aws glue start-job-run \
--job-name YourJobName \
--arguments='{"--S3_CONF_BUCKET": "conf-bucket-name","--S3_CONF_KEY": "path of glue-job.json"}'

在 Glue 控制台可以可以查看 Job 的运行情况,正常运行后,将会看到数据被写入到了 Amazon Redshift 库中,数据内容以 SUPER 字段类型存储,当源端数据表有变更时,也能快速地在目标端体现,并且可以支持源表的 schema 变更。

总结

本文介绍了使用亚马逊云科技服务同步数据到 Amazon Redshift 的两种大的方案,并针对每种方案进行了实例操作验证。方案一使用 AWS DMS 来进行数据同步,该方案架构简单,只需通过配置即可快速实现数据同步,但整个过程可控性较小;方案二首先将源库全量和增量数据同步到 Amazon MSK,然后通过 Spark Structured Steaming 程序消费 MSK 中的数据,并把数据写入 Amazon Redshift,此方案相对复杂,但整个过程都可以按照实际要求来进行定制。本文也给出了参考代码实现,用户可以根据实际情况来选用合适的方案。

参考资料

本篇作者

程亮

AWS 解决方案架构师,负责基于 AWS 云平台的解决方案咨询和设计。有多年的互联网软件研发、系统架构设计及大数据产品开发经验。