亚马逊AWS官方博客

通过集成Amazon Kinesis Service 在Amazon DocumentDB (兼容MongoDB)上实现实时数据同步

Amazon DocumentDB (兼容MongoDB)是一项快速、可扩展、具备高可用性的全托管文档数据库服务,可支持MongoDB工作负载。您可以使用相同的MongoDB应用程序代码、驱动程序与工具运行、管理并扩展Amazon DocumentDB上的工作负载,且不必分神于底层基础设施管理事务。作为一套文档数据库,Amazon DocumentDB极大简化了对JSON数据的存储、查询与索引流程。

实时数据同步是Amazon DocumentDB常见的需求场景。例如,场景1: 业务数据实时同步:有零售集团, 需要实时将产品目录更新信息, 从公司总部文档数据库下发和实时同步到子公司文档数据库, 使公司各层级销售部门, 实时获取产品最新信息, 上下协同,及时掌握和应对市场变化。场景2: 实时数据在线迁移:需要文档数据库之间在线数据迁移, 例如DocumentDB和MongoDB之间在线数据迁移, 可结合文档数据库离线方式数据导入和导出(Mongodump/resotre)和实时数据同步。

在本博文中,我们将向您介绍如何将Amazon DocumentDB与Amazon Kinesis相集成,实现Amazon DocumentDB之间或者Amazon DocumentDB和MongoDB之间的实时数据同步。具体来讲,我们将向您展示如何使用变化数据捕获Python程序将事件从Amazon DocumentDB集群的变更流传输至Amazon Kinesis Data Stream,再使用变化数据消费Python程序将事件从Amazon Kinesis Data Stream实时应用至目标文档数据库DocumentDB或者MongoDB。

下图所示,为这套解决方案的参考架构:

 

演示概述

本文具体涵盖以下任务:

  1. 部署一套AWS CloudFormation模板以启动以下组件:
    1. Amazon DocumentDB集群
    2. Amazon EC2 BastionHost环境
  2. 设置Amazon EC2 BastionHost环境。
  3. 在Amazon DocumentDB上启用变更流。
  4. 创建Kinesis Data Stream
  5. 设置并部署实时变化数据捕获和应用Python 应用程序
  6. 演示实时数据同步。

 

部署一套CloudFormation模板

AWS CloudFormation提供一种通用语言,供您在云环境中对AWS资源进行建模及配置。在本演练中,您将部署一套CloudFormation模板,用于创建以下内容:

  1. Amazon DocumentDB集群 – 模拟源和目标DocumentDB环境
  2. Amazon EC2 BastionHost环境 – 部署和运行实时数据捕获和应用程序

要部署此模板,请完成以下操作步骤:

  • 请使用自己帐号 (region请选择 us-east-1)
  • 下载Cloudformation Yaml文件:

https://documentdb-zhy.s3.ap-southeast-1.amazonaws.com/change_stream/docdb_changestream.yaml

  • 在AWS CloudFormation控制台上, 选择 Create stack。
  • 选择 Upload a template file。
  • 选择 choose file。
  • 上传之前下载到本地docdb_changestream.yaml 文件。
  • 选择 Next。
  • 为您的Amazon DocumentDB集群输入名称、用户名、密码与标识符。
  • 如果您已经拥有角色,请选择 true。如果还没有角色,请选择 false,AWS CloudFormation模板将为您创建新角色。
  • 其他部分保留默认设置,之后选择 Next。
  • 选中复选框,允许栈为您创建角色选择 Create stack。

CloudFormation一般在几分钟内完成。
CloudFormation 创建成功后, 请访问CloudFormation output Tab, 查看后续需要访问EC2 BastionHost 访问地址和DocumentDB Cluster Endpoint:

 

设置Amazon EC2 BastionHost环境

连接到新建的Ec2堡垒机:

修改证书文件权限

chmod 0600 [path to downloaded .pem file]

ssh -i [path to downloaded .pem file] ec2-user@[bastionEndpoint]

执行aws configure:

aws configure

default region name, 输入: “us-east-1”,其它选择缺省设置

安装Mongo Shell

安装4.0 mongo shell,请在命令提示符中使用以下命令创建repo文件:

echo -e "[mongodb-org-4.0] \nname=MongoDB Repository\nbaseurl=https://repo.mongodb.org/yum/amazon/2013.03/mongodb-org/4.0/x86_64/\ngpgcheck=1 \nenabled=1 \ngpgkey=https://www.mongodb.org/static/pgp/server-4.0.asc" | sudo tee /etc/yum.repos.d/mongodb-org-4.0.repo

完成之后,使用以下命令安装mongo shell:

sudo yum install -y mongodb-org-shell

下载DocumentDB数据库证书

wget https://s3.amazonaws.com/rds-downloads/rds-combined-ca-bundle.pem

安装Pymongo:

sudo pip3 install pymongo

安装boto3:

sudo pip3 install boto3

 

Amazon DocumentDB上启用变更流

Amazon DocumentDB变更流提供一条按时间顺序排列的更新事件序列,用以囊括发生在集群内各集合与数据库中的更新事件。可以轮询DocumentDB集群上的变更流,并在发生变更事件(INSERTS, UPDATES以及DELETES)时进行读取。我们使用变更流,将变更事件从您的Amazon DocumentDB集群流式传输至Amazon Kinesis Data Stream。

要在集群上启用变更流,请输入以下代码(请将相应部分替换为您的集群值)。首先,我们使用mongo shell登录数据库:

export USERNAME=<DocumentDB cluster username>

echo "export USERNAME=${USERNAME}" >> ~/.bash_profile

export PASSWORD=<DocumentDB cluster password>

echo "export PASSWORD=${PASSWORD}" >> ~/.bash_profile

export CLUSTERENDPOINT= 【上图Cloudformation Outputs ClusterEndpoint】

echo "export CLUSTERENDPOINT=${CLUSTERENDPOINT}" >> ~/.bash_profile

登录至您的Amazon DocumentDB集群

mongo --ssl --host $CLUSTERENDPOINT:27017 --sslCAFile rds-combined-ca-bundle.pem --username $USERNAME --password $PASSWORD

接下来,在集群上启用DocumentDB变更流:

在集群级别启用DocumentDB变更流, 集群上所有数据库上的数据变化都将被捕获至DocumentDB Change Stream:

db.adminCommand({modifyChangeStreams: 1, database: "", collection: "", enable: true});

您应得到以下响应结果:

{ "ok" : 1 }

 

创建Kinesis Data Stream

aws kinesis create-stream --stream-name 请输入您的Kinesis Data Stream Name】 --shard-count 1

 

设置并部署实时数据捕获和应用Python程序

登录到EC2 BastionHost, 下载Python程序:

下载实时数据捕获Python程序:

wget https://documentdb-zhy.s3.ap-southeast-1.amazonaws.com/change_stream/changestream_capture.py .

下载实时数据应用Python程序

wget https://documentdb-zhy.s3.ap-southeast-1.amazonaws.com/change_stream/changestream_apply.py .

下载运行实时数据捕获和数据应用程序所需要的系统环境变量sample文件:

wget https://documentdb-zhy.s3.ap-southeast-1.amazonaws.com/change_stream/.bash_profile_sample .

修改和设置操作系统环境变量

修改实时变化数据捕获应用相关操作系统变量:

操作系统变量名 操作系统变量值示例 备注
USERNAME masteruser 源端DocumentDB用户名
PASSWORD password 源端DocumentDB用户密码
CLUSTERENDPOINT clusterendpoint 源端DocumentDB终端节点
SSL True or False 源端DocumentDB SSL是否启用
WATCHED_DB_NAME bar 数据同步的源端DocumentDB 数据库  
STATE_DB bar_stat 存放同步状态的源端DocumentDB 数据库
STATE_COLLECTION bar_stat 存放同步状态的源端DocumentDB collection
KINESIS_STREAM test 前面新建Kinesis Data Stream名字
STATE_SYNC_COUNT 1 源端同步n条记录后 记录同步状态
MAX_LOOP 100000 执行数据捕获和数据应用的循环次数

修改实时数据应用程序相关操作系统变量:

操作系统变量名 操作系统变量值示例 备注
TARGET_USERNAME masteruser 目标端DocumentDB或者MongoDB用户名
TARGET_PASSWORD password 目标端DocumentDB或者MongoDB用户密码
TARGET_CLUSTERENDPOINT clusterendpoint 目标端DocumentDB或者MongoDB终端节点
Target SSL True or False 目标端DocumentDB或者MongoDB SSL是否启用
TARGET_DB_NAME bar_new 数据应用的目标端DocumentDB或者MongoDB数据库  
SEQ_DB bar_new 目标端DocumentDB或者MongoDB存放同步状态的数据库
SEQ_COLLECTION sequence 目标端DocumentDB或者MongoDB存放同步状态的collection
KINESIS_STREAM test 前面新建Kinesis Data Stream名字
MAX_LOOP 100000 执行数据捕获和数据应用的循环次数

修改您实时数据捕获和实时数据应用相关的操作系统环境变量:

 根据您的实际环境,修改.bash_profile sample文件

更改.bash_profile sample 为ec2-user用户下的.bash_profile

mv .bash_profile .bash_profile.bak

mv .bash_profile_sample .bash_profile

. ~/.bash_profile

演示实时数据同步

为了简化演示环境, 源和目标文档数据库均设置为同一套DocumentDB集群, 在实际环境,根据需求设置分别的源端DocumentDB数据库和目标端DocumentDB或者MongoDB数据库。

以下演示将模拟场景1: 业务数据实时同步:有零售集团, 需要实时将产品目录更新信息, 从公司总部文档数据库下发和实时同步到子公司文档数据库,将模拟总部文档数据库的数据的增删改如何实时同步到子公司文档数据库。关于场景2:在线数据迁移也可以采用同样的实时数据同步的方案,在此博文中不再花篇幅赘述。

演示环境的实时数据捕获和实时数据应用程序采用Python程序部署,在实际环境中部署也可以采用Lambda无服务架构,来进一步优化部署架构和成本。

演示环境实时消费和应用Kinesis数据采用Kinesis Data Stream API开发, 在实际开发和部署生产环境,也可参考Kinesis KCL实现实时消费和应用Kinesis数据。

运行实时数据捕获程序:

打开终端窗口1, 连接到Ec2堡垒机,运行实时数据捕获应用:

python3 changestream_capture.py

运行实时数据应用程序:

打开终端窗口2, 连接到Ec2堡垒机,运行实时数据应用程序:

python3 changestream_apply.py

打开终端窗口3, 连接到Ec2堡垒机,运行Mongo Shell模拟连接到源端和目标端文档数据库:

mongo --ssl --host $CLUSTERENDPOINT:27017 --sslCAFile rds-combined-ca-bundle.pem --username $USERNAME --password $PASSWORD

在源端文档数据库插入一条产品信息:

运行mongo shell

use bar

db.products.insert({

"name" : "RayBan Sunglass Pro",

"sku" : "1590234",

"description" : "RayBan Sunglasses for professional sports people",

"inventory" : 100

})

观测实时数据捕获应用输出, 插入记录已被捕获:

观测实时数据apply应用输出, 插入记录已被应用:

连接到目标端文档数据库, 查看新插入产品信息是否被同步:

运行mongo shell

use bar_new

db.products.find()

{ "_id" : ObjectId("60d97230a2e692800e1a2b76"), "name" : "RayBan Sunglass Pro", "sku" : "1590234", "description" : "RayBan Sunglasses for professional sports people", "inventory" : 100 }

在源端文档数据库更改一条产品信息:

运行mongo shell

use bar

db.products.update(
{"sku":"1590234"},
{
$set: {
"reviews" : [{
"rating" :4,
"review":"perfect glasses"
},{
"rating" :4.5,
"review":"my priced posession"
},{
"rating" :5,
"review":"Just love it"
}]}
}
)

观测实时数据捕获应用输出, 更新记录已被捕获:

观测实时数据apply应用输出, 更新记录已被应用:

连接到目标端文档数据库, 查看更新产品信息是否被同步:

运行mongo shell

use bar_new

db.products.find()

{ "_id" : ObjectId("60d97230a2e692800e1a2b76"), "name" : "RayBan Sunglass Pro", "sku" : "1590234", "description" : "RayBan Sunglasses for professional sports people", "inventory" : 100, "reviews" : [ { "rating" : 4, "review" : "perfect glasses" }, { "rating" : 4.5, "review" : "my priced posession" }, { "rating" : 5, "review" : "Just love it" } ] }

在源端文档数据库删除产品信息:

运行mongo shell

use bar

db.products.remove({"sku":"1590234"}) 

观测实时数据捕获应用输出, 删除记录已被捕获:

观测实时数据apply应用输出, 删除记录已被应用:

连接到目标端文档数据库, 查看更新产品信息是否被同步:

运行mongo shell

use bar_new

db.products.find()

记录也被删除,没有记录被查询

如果在运行上述演示步骤,出现异常情况,可以重置演示环境,重新再执行上述演示步骤:

  1. 删除之前创建的Kinesis Data Stream

aws kinesis delete-stream --stream-name 请输入您的Kinesis Data Stream Name】

  1. 重新创建Kinesis Data Stream和重置同步状态表

wget  https://documentdb-zhy.s3.ap-southeast-1.amazonaws.com/change_stream/reinitiate-new.py .

python3 reinitiate-new.py

 

清理资源

要清理本次演练中创建的资源,请导航至AWS CloudFormation控制台,找到您为本次演练创建的栈,而后一一将其删除。再请删除之前创建的Kinesis Data Steam,操作之后,即可删除与演练相关的所有资源。

 

总结

在本博文中,我们将向您介绍如何将Amazon DocumentDB与Amazon Kinesis相集成,实现Amazon DocumentDB之间或者Amazon DocumentDB和MongoDB之间的实时数据同步。

 

参考资源

  • DocumentDB 开发者指南:https://docs.aws.amazon.com/documentdb/latest/developerguide/index.html
  • Kinesis 开发者指南:https://docs.aws.amazon.com/streams/latest/dev/introduction.html

 

本篇作者

刘冰冰

AWS数据库解决方案架构师,负责基于AWS的数据库解决方案的咨询与架构设计,同时致力于大数据方面的研究和推广。在加入AWS 之前曾在Oracle工作多年,在数据库云规划、设计运维调优、DR解决方案、大数据和数仓以及企业应用等方面有丰富的经验。

李迎峰

AWS 解决方案架构师。负责基于 AWS 云计算解决方案架构的咨询和设计,同时致力于 AWS 云服务在国内半导体行业的应用和推广。在加入 AWS 前,拥有超过18年的IT项目经验,曾就职于Oracle,主要服务于大中型企事业单位客户。