亚马逊AWS官方博客
使用 Amazon Data Firehose (预览版)将更改从数据库复制到 Apache Iceberg 表
今天,我们宣布在 Amazon Data Firehose 中推出一项新功能预览版,该功能可捕获 PostgreSQL 和 MySQL 等数据库所做的更改,并将更新复制到 Amazon Simple Storage Service (Amazon S3) 上的 Apache Iceberg 表中。
Apache Iceberg 是一种用于执行大数据分析的高性能开源表格式。Apache Iceberg 为 S3 数据湖带来 SQL 表的可靠性和简单性,使 Apache Spark、Apache Flink、Trino、Apache Hive 和 Apache Impala 等开源分析引擎能够同时处理相同的数据。
这项新功能提供了一个简单的端到端解决方案,能够在不降低数据库应用事务性能的前提下,实现数据库更新的流式传输。您只需几分钟就能配置一个 Data Firehose 数据流,它能够从数据库中提供更改数据捕获 (CDC) 更新。现在,您可以轻松地将来自不同数据库的数据复制到 Amazon S3 上的 Iceberg 表中,并将最新数据用于大规模分析和机器学习 (ML) 应用程序。
典型的 Amazon Web Services (AWS) 企业客户将数百个数据库用于事务应用程序。为了对最新数据进行大规模分析和机器学习,他们希望捕获数据库中的更改,如插入、修改或删除表中记录时的更改,并将更新以 Apache Iceberg 等开源表格式传送到数据仓库或 Amazon S3 数据湖。
为此,许多客户开发了提取、转换、加载 (ETL) 作业,定期从数据库读取数据。但是,ETL 读取器会影响数据库事务性能,并且批处理作业可能会在数据可用于分析之前增加数小时的延迟。为了减轻对数据库事务性能的影响,客户希望能够对数据库中的更改进行流式处理。该数据流被称为更改数据捕获 (CDC) 流。
我接触过不少客户,他们都在使用像 Debezium 之类的开源分布式系统,此类系统具有到流行数据库的连接器、Apache Kafka Connect 集群和 Kafka Connect Sink,用于读取事件并将其传输到目标。此类系统的初始配置和测试涉及安装和配置多个开源组件。这可能得花上几天到几周的时间。设置完成后,工程师必须监控和管理集群,验证和应用开源更新,这增加了运营开销。
借助这种新的数据流传输功能,Amazon Data Firehose 增加了从数据库获取 CDC 数据流并将其持续复制到 Amazon S3 上 Apache Iceberg 表的能力。您可以通过指定源和目标来设置 Data Firehose 流。Data Firehose 捕获并持续复制初始的数据快照,然后以数据流的形式复制对选定数据库表的所有后续更改。为了获取 CDC 流,Data Firehose 使用数据库复制日志,这样可以有效减少对数据库事务性能的影响。随着数据库更新量的增减,Data Firehose 能够自动对数据进行分区,并保持记录,直到这些数据被成功传输到目标。您无需预置容量,也无需管理和微调集群。除了数据本身,Data Firehose 可以使用与数据库表相同的模式自动创建 Apache Iceberg 表,作为初始 Data Firehose 流创建的一部分,并根据源模式更改自动发展目标模式,如添加新列。
因为 Data Firehose 是一项完全托管的服务,您无需依赖开源组件,也不用自己进行软件更新或承担运营成本。
使用 Amazon Data Firehose 持续将数据库更改复制到 Amazon S3 中的 Apache Iceberg 表,可为您提供了一个简单、可扩展的端到端托管解决方案,将 CDC 数据流传输到数据湖或数据仓库,并在其中运行大规模分析和机器学习应用程序。
让我们看看如何配置新管道
为了向您展示如何创建新的 CDC 管道,我使用 AWS 管理控制台设置了一个 Data Firehose 数据流。像往常一样,我还可以选择使用 AWS 命令行界面(AWS CLI)、AWS SDK、AWS CloudFormation 或 Terraform。
在本演示中,我选择 Amazon Relational Database Service (Amazon RDS) 上的 MySQL 数据库作为源。Data Firehose 还可与 Amazon Elastic Compute Cloud (Amazon EC2) 上的自行管理的数据库配合使用。为了在部署数据库的虚拟私有云 (VPC) 和 RDS API 之间建立连接,同时避免将流量暴露在互联网上,我创建了一个 AWS PrivateLink VPC 服务端点。您可以按照 Amazon RDS 文档中的说明,了解如何为 RDS API 创建 VPC 服务端点。
我还有一个用于托管 Iceberg 表的 S3 存储桶,并且设置了一个具有适当权限的 AWS Identity and Access Management (IAM) 角色。您可以参阅 Data Firehose 文档中的先决条件列表。
首先,我打开控制台并导航到 Amazon Data Firehose 部分。我可以看到已经创建的流。要创建新的流,我选择创建 Firehose 流。
我选择源和目标。在此示例中:MySQL 数据库和 Apache Iceberg 表。我还会为流输入 Firehose 流名称。
我输入数据库端点的完全限定的 DNS 名称和数据库 VPC 端点服务名称。我确认已选中 启用 SSL,然后在 密钥名称一栏,在安全存储数据库用户名和密码的 AWS Secrets Manager 中选择密钥的名称。
接下来,我配置了 Data Firehose,通过使用显式名称或正则表达式指定数据库、表和列来捕获特定数据。
我必须创建一个水印表。在这种情况下,水印是 Data Firehose 用来跟踪数据库表增量快照进度的标记。它有助于 Data Firehose 识别表中哪些部分已经捕获,哪些部分仍需处理。我可以手动创建水印表,也可以让 Data Firehose 自动创建水印表。在这种情况下,传递给 Data Firehose 的数据库凭证必须具有在源数据库中创建表的权限。
接下来,我配置要使用的 S3 存储桶区域和名称。如果 Iceberg 表还不存在,Data Firehose 可以自动创建。同样,当检测到数据库模式发生变化时,它可以更新 Iceberg 表模式。
最后一步是启用 Amazon CloudWatch 错误日志,以便获取有关流进程和最终错误的反馈。您可以在 CloudWatch 日志组上设置较短的保留期,用以降低日志存储成本。
查看配置后,我选择创建 Firehose 流。
创建流后,它就会开始复制数据。我可以监控流的状态,并检查是否有最终错误。
现在,是时候测试流了。
我打开与数据库的连接,在一个表中插入新行。
接着,我进入了配置为目标的 S3 存储桶,发现已经生成了一个文件来存放表中的数据。
我下载了这个文件,然后用 parq
命令检查内容(您可以通过 pip install parquet-cli
安装此命令)
当然,下载和检查 Parquet 文件只是为了演示目的。在实际操作中,您将使用 AWS Glue 和 Amazon Athena 来管理数据目录并对数据运行 SQL 查询。
注意事项
以下是您需要了解的一些其他事项。
这项新功能支持 Amazon EC2 上的自我管理 PostgreSQL 和 MySQL 数据库,以及 Amazon RDS 上的以下数据库:
在预览期间和正式发布之后,团队将继续增加对其他数据库的支持。他们告诉我,他们已经在积极推进对 SQL Server、Oracle 和 MongoDB 数据库的支持工作。
Data Firehose 使用 AWS PrivateLink 连接到Amazon Virtual Private Cloud (Amazon VPC) 中的数据库。
设置 Amazon Data Firehose 传输流时,您可以指定特定的表和列,也可以使用通配符指定表和列的类别。使用通配符时,如果在创建 Data Firehose 流后将新表和列添加到数据库,且这些表和列与通配符匹配,则 Data Firehose 会自动在目标中创建这些表和列。
定价和可用性
除中国区域、AWS GovCloud(美国)区域和亚太地区(马来西亚)区域外,所有 AWS 区域均提供新的数据流传输功能。我们期待您评估这项新功能,并向我们提供一些反馈意见。预览期间,这项功能是完全免费的。将来某个时刻,这项功能将根据实际使用量来计费,比如依据您读取和传输的数据量(以字节计)来定价。没有任何承诺要求,也不需要前期投资。请务必查看定价页面,以获取详细信息。
现在,去 Amazon S3 配置第一个持续复制到 Apache Iceberg 表的数据库,然后访问 http://aws.amazon.com/firehose。
*前述特定亚马逊云科技生成式人工智能相关的服务仅在亚马逊云科技海外区域可用,亚马逊云科技中国仅为帮助您了解行业前沿技术和发展海外业务选择推介该服务。