亚马逊AWS官方博客

利用 Amazon DocumentDB 变更流(Change Streams)实现事件驱动架构

概览

Amazon DocumentDB 中的变更流功能(兼容 MongoDB)提供了集群集合中发生的一系列按时间排序的变更事件。应用可以通过监听这种事件来完成多种业务场景。例如数据的同步,实时数据分析,或者当某种特定事件发生时发送告警等。在此文章中我们利用 Amazon DocumentDB 对 AWS IoT Chat Application 的聊天室数据进行持久化,并通过变更流功能对聊天信息进行分析,检测其中是否存在欺诈信息。

注意:文章的主要目的是演示 Amazon DocumentDB 的事件流功能,对欺诈信息的检测采用简单的关键词匹配方式进行模拟,实际当中欺诈检测可能需要更加复杂的实现方式。

整体架构如下图:

1. 虚线的左半部分主要由 AWS IoT Chat Application 提供,我们仅需要增加相应的 IoT 消息路由规则使得 IoT 消息可以被 Lambda 接收到。

2. 右半部分是用来演示 Amazon DocumentDB 事件流的主要部分。

3. 文章中用到的代码可以从 Github 中找到。

实现步骤

在此演示中我们将在默认的 VPC 中创建 Amazon DocumentDB 以及 Lambda 等资源。

1. 按照 AWS IoT Chat Application 中的提示执行其中的 CloudFormation Template 来部署后端服务。注意,在测试过程中我使用了 Node 14.20.1 版本才能正常部署此应用。而用于演示变更流的 Lambda 采用 Node 16.15.0 版本。

2. 为 DocumentDB 创建安全组以及安全组规则,执行以下 AWS CLI 命令:

VPC=$(aws ec2 describe-vpcs --region us-east-2 --filters Name=is-default,Values=true --query 'Vpcs[].VpcId'  --output text)
aws ec2 create-security-group --group-name docdb-sg --description "security group for DocumentDB" --vpc-id $VPC --region us-east-2

sg=$(aws ec2 describe-security-groups \
    --filters Name=group-name,Values=docdb-sg \
--query "SecurityGroups[*].GroupId" --region us-east-2 --output text)
	aws ec2 authorize-security-group-ingress \
    --group-id $sg \
    --protocol tcp \
    --port 27017 \
    --cidr `aws ec2 describe-vpcs --region us-east-2 --filters Name=is-default,Values=true --query 'Vpcs[].CidrBlockAssociationSet[].CidrBlock'  --output text`

3. 创建 Amazon DocumentDB

下载 CloudFormation Template 并为其中的 DBCluster 增加 VpcSecurityGroupIds 参数。修改后的 CloudFormation Template 片段注意先执行注释中的命令,然后将结果填入 Default 对应的值中。

4. 为 Amazon DocumentDB 的 chatdb 的 chatroom 集合(集合的概念类似于关系数据库中的表)激活变更流功能。在 DocumentDB 的控制台中提供了如何以命令行方式访问数据库的说明。

rs0:PRIMARY> use chatdb
switched to db chatdb
rs0:PRIMARY> db
chatdb
rs0:PRIMARY> db.adminCommand({modifyChangeStreams: 1,
database: "chatdb",
collection: "chatroom", 
enable: true});
{ "ok" : 1, "operationTime" : Timestamp(1684735194, 1) }

5. 将 Amazon DocumentDB 的相关访问信息注册到 Secrets Manager 中便于后续从 Lambda 中安全访问。配置好后记录下相应的 Secret ARN。

6. 创建用于持久化聊天信息的 Lambda Function,名为 persistChatMessage。注意为其配置 VPC 和安全组。

7. 为 persistChatMessage Function创建环境变量来访问 Secrets Manager 中 DocumentDB 的认证信息。

aws lambda update-function-configuration --function-name persistChatMessageV1 \
    --environment "Variables={SecretArn=在第3步中记录的ARN}"

8. 为了让 persistChatMessage Function 正常访问 Secrets Manager 和 DocumentDB,需要为其对应的 Role 增加如下权限,可以采用 Inline Policy 的方式:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "rds:DescribeDBClusters",
                "rds:DescribeDBClusterParameters",
                "rds:DescribeDBSubnetGroups",
                "ec2:CreateNetworkInterface",
                "ec2:DescribeNetworkInterfaces",
                "ec2:DescribeVpcs",
                "ec2:DeleteNetworkInterface",
                "ec2:DescribeSubnets",
                "ec2:DescribeSecurityGroups",
                "kms:Decrypt",
                "secretsmanager:GetSecretValue"
            ],
            "Resource": "*"
        }
    ]
}

9. 为了能够在私有子网中访问到相应的服务,需要为 Default VPC 创建 VPC Endpoints,执行如下 AWS CLI 命令,注意替换命令中的参数为您所在环境的参数:

aws ec2 create-vpc-endpoint \
    --vpc-id vpc-0d68bd7c9c61695eb \
    --vpc-endpoint-type Interface \
    --service-name com.amazonaws.us-east-2.secretsmanager \
    --subnet-ids "subnet-04fad0e27c6233e0b" "subnet-0fe5bb83384625f05" "subnet-06b9f5c745fecffe8" \
    --security-group-id "sg-013501d5ae3482555" "sg-0f806f50eb3125191"

aws ec2 create-vpc-endpoint \
    --vpc-id vpc-0d68bd7c9c61695eb \
    --vpc-endpoint-type Interface \
    --service-name com.amazonaws.us-east-2.lambda \
    --subnet-ids "subnet-04fad0e27c6233e0b" "subnet-0fe5bb83384625f05" "subnet-06b9f5c745fecffe8" \
    --security-group-id "sg-013501d5ae3482555" "sg-0f806f50eb3125191"
    
aws ec2 create-vpc-endpoint \
    --vpc-id vpc-0d68bd7c9c61695eb \
    --vpc-endpoint-type Interface \
    --service-name com.amazonaws.us-east-2.sts \
    --subnet-ids "subnet-04fad0e27c6233e0b" "subnet-0fe5bb83384625f05" "subnet-06b9f5c745fecffe8" \
    --security-group-id "sg-013501d5ae3482555" "sg-0f806f50eb3125191"

aws ec2 create-vpc-endpoint \
    --vpc-id vpc-0d68bd7c9c61695eb \
    --vpc-endpoint-type Interface \
    --service-name com.amazonaws.us-east-2.sns \
    --subnet-ids "subnet-04fad0e27c6233e0b" "subnet-0fe5bb83384625f05" "subnet-06b9f5c745fecffe8" \
    --security-group-id "sg-013501d5ae3482555" "sg-0f806f50eb3125191"

10. 为 persistChatMessage 部署代码,用 npm install 之后可以用其目录下的 sh 脚本将代码上传到 Lambda 中。

11. 按如下方式创建一条 IoT 消息路由规则,注意其中的 SQL statement 中的 topic 为‘room/public/#’,这是在 AWS IoT Chat Application 中定义的 topic 前缀 room/public 决定的,这里的‘#’通配符可以确保所有聊天室的消息都会被接收到。如果这里指定为明确的 topic 名称,例如 room/public/room1 则只有聊天室名称为 room1 的聊天室消息会被接收到。

12. 执行到这里已经可以做一些验证工作了,确保前面的配置是正确的。MQTT 提供了一个 test client,可以利用此 client 向任意 topic 发送消息。例如可以向名为/room/public/room1 的 topic 发送如下消息,如果一切正常我们将看到在 DocumentDB 的 chatdb 的 chatroom 集合中多了一条数据。

{
  "message": "Hello from AWS IoT console"
}

13. 创建另一个 Lambda Function 用于处理 Amazon DocumentDB 变更事件。

Lambda 函数创建好以后还需要为其角色增加权限,注意修改下面的角色名称为你环境中的角色名称:

aws iam put-role-policy --role-name eventsDetectorLambdaExecutionRole --policy-name inlinePolicy --policy-document file://detector-lambda-role-permission-policy.json
aws iam attach-role-policy --role-name eventsDetectorLambdaExecutionRole --policy-arn arn:aws:iam::aws:policy/AmazonSNSFullAccess

14. 为 Lambda 配置事件源,这里我们将选择 DocumentDB 作为事件源,并选择正确的数据库和用于存储聊天信息的集合。

事件源配置好以后你可能会注意到 Last processing result 中有报连接错误:

这有可能是状态还没有刷新出来,可以继续下面的测试而不需要等待,如果一切正常,状态会在几分钟后变为 OK,如下:

15. eventsDetector 对应的代码在这里,同样先用 npm install 安装依赖包,然后通过期目录下的 sh 脚本来上传代码至 Lambda 中。

16. 如前面所介绍的,当 Lambda 函数检测到有可能存在欺诈的情况会通过 SNS 发送邮件,所以还需要创建 SNS Topic,注意 SNS 会向您的邮箱发送确认邮件,只有您同意接收通知后 SNS 才会发送邮件通知。

aws sns create-topic --name chat_event_topic
{
    "TopicArn": "arn:aws:sns:us-east-2:account-id:chat_event_topic"
}
aws sns list-topics

aws sns subscribe --topic-arn arn:aws:sns:us-east-2: account-id:chat_event_topic --protocol email --notification-endpoint your-email@company.com
{
    "SubscriptionArn": "pending confirmation"
}

最后还需要将创建好的 SNS Topic ARN 配置为 eventsDetector 函数的环境变量,因为在函数的代码中会根据此环境变量的值来触发 SNS 通知。

aws lambda update-function-configuration --function-name eventsDetector \
    --environment "Variables={TopicArn=arn:aws:sns:us-east-2: account-id:chat_event_topic}"

现在,所有的配置和部署工作都已经完成了,您可以在本机启动 AWS IoT Chat Application 的前端 UI,并创建一个聊天室开始聊天。

17. 下面例子中模拟了多人聊天的情况,当其中名为“Jackson”的聊天者发送了一条与预先定义的关键字“money”相匹配的消息时,欺诈检测条件被触发。订阅了SNS的邮箱将收到提醒邮件并提示:

Warning: Please note that the message received is highly potential of fruad. Message  is “I know some business that can bring you money quickly with little investment”

下面是用于检测欺诈逻辑的 Lambda 函数,可以看出其定义的方式并没有什么特别,只要将 DocumentDB 作为此 Lambda 函数的触发事件源(不体现在 Lambda 代码中),DocumentDB 的数据操作事件就会被当作参数传递给 Lambda 函数。在此模拟函数中,其检测逻辑只是简单的单词匹配然后通过 SNS 服务触发通知,在实际当中可以通过调用其他检测服务来实现更复杂检测逻辑。

const fs = require('fs');
const { SNSClient, PublishCommand } = require("@aws-sdk/client-sns")
var words = JSON.parse(fs.readFileSync('words.json', 'utf8')); 

function replaceAll(search, replace) {
    return this.split(search).join(replace);
}
String.prototype.replaceAll = replaceAll

exports.handler = async (event) => {
    
    const message = event.events[0].event.fullDocument.message;
    const normalizedMsg = message.replaceAll("'", "").replaceAll("\"", "").replaceAll("'", "");
    const list = normalizedMsg.split(" ")
    const matchedWords = (words.filter(value => list.includes(value))).length;
    console.log(`message is ${message}, matched words is ${matchedWords}`);
    if (matchedWords > 0) {
        const snsClient = new SNSClient({ region: "us-east-2" });
        var params = {
            Message: "Warning: Please note that the message received is highly potential of fruad. Message is " + event.events[0].event.fullDocument.message, // MESSAGE_TEXT
            TopicArn: process.env.TopicArn
        };
        const data = await snsClient.send(new PublishCommand(params));
    }

    const response = {
        statusCode: 200,
        body: JSON.stringify('Hello from Lambda!'),
    };
    return response;
};

总结

这是一个完全基于假设的场景,不过它足以用来说明 DocumentDB 变更事件的能力,在实际项目当中,数据可能是由各种业务逻辑的代码写入到数据库中的,而通过监控事件流可以很好的实现事件驱动的架构,将数据写入和修改的逻辑与事件处理逻辑完全解耦,实现更优美的架构设计。例如您可能有一个订单管理服务在持续接收客户的订单,而另一边通过监听订单创建或修改的事件流可以触发对货物的库存进行修改或者启动邮寄流程。

本篇作者

王崇

亚马逊解决方案架构师,负责企业架构设计,解决方案设计,协助企业加速上云流程和数字化转型。