亚马逊AWS官方博客
在北京/宁夏区部署 MSK 集群触发跨账号 Lambda 函数的架构
针对企业客户的流式数据处理场景,亚马逊云科技推荐采用 Amazon MSK + Lambda 的无服务器事件驱动处理架构,可以为客户带来诸多的云原生优势受益:
- 保持原生 Apache Kafka 生态的支持,可以通过 Kafka API 将实时数据流导入数据湖仓。
- 基于消息驱动的 Lambda 运行时,可以提供极致的资源弹性和数据流处理时效性。同时兼顾高并发波峰的响应处理能力和资源开销成本。
- 提供广泛的后处理扩展能力,包括多种类型的数据存储服务,全流程安全控制、数据加密存储传输,监控告警日志,以及后续的机器学习和大数据分析应用等。
但目前的企业级客户大多都采用多账号的组织架构,与 MSK 集群相关联的消息生产者和消费者往往分布在不同的账号内。因此,跨账号的 MSK 消费成为了比较普遍的架构需求。在亚马逊云科技的海外区域,客户可以采用 Amazon MSK 和 Lambda ESM for MSK 事件源映射功能,构建跨账号的消息触发处理流程。请参考 Triggering AWS Lambda function from a cross-account Amazon Managed Streaming for Apache Kafka 部署。
在中国大陆的北京/宁夏区域,Lambda ESM for MSK 功能暂未发布,用户在按照海外部署的方式配置基于 MSK 的触发器时,在保存时则会收到“Feature not support in the current region”的信息提示。因此需要采用 Apache Kafka URL 作为跨账号 Lambda trigger 源的方式实现 Amazon MSK + 跨账号 Lambda 的数据流处理架构。本博客将详细介绍在北京/宁夏区,配置 Kafka 原生的 SASL 认证和基于 Apache Kafka URL 事件源,构建 MSK 触发跨账号 Lambda 函数的消息处理架构,实现组织内数据流的自动化处理分析能力。
一、架构描述
在北京/宁夏区域,构建跨账号的 MSK 消息自动化处理,需要利用 Lambda trigger 触发器调用 Apache Kafka URL 的方式下实现跨账号的 Lambda 函数触发。所关联的限制是跨账号的 MSK 集群访问需要采用 SASL/SCRAM 认证方式。同时,还需要为 MSK 集群开启 Kafka ACL 功能。实现的验证环境架构如下图所示。
如图所示,完成本架构的验证部署,需要采用 2 个亚马逊云科技账号,账号 1 为生产者账号,完成消息的采集和 MSK 集群托管;账号 2 为消费者账号,完成消息的自动化订阅和消费处理。架构实现的主要过程步骤包括:
- 在账号 1 生产者账号部署 Amazon MSK 集群,MSK 消息生产者应用。
- 在账号 2 消费者账号部署 MSK 消费者 Lambda 函数,函数的执行过程是从 MSK 订阅消息,并将消息数据写入到目标 S3 桶,实现流数据的持久化存储。
- 在账号 2 的 Amazon MSK 服务创建托管 VPC 连接,通过 Amazon PrivateLink 网络服务构建跨账号的托管私有网络通道。
- 在账号 2 的 Lambda 函数中将 Apache Kafka URL 作为 Lambda 触发器源,创建 Lambda 触发器,实现消息数据的跨账号消费处理。
注意:
为了帮助快速验证整个流程的有效性,本博客还为实验用户提供了 MSK 生产者和消费者的模拟应用,以及样本数据。用户可以通过 AWS CloudFormation 的 yaml 模版文件,快速部署实验环境。源代码已经全部托管在 GitHub。
二、验证部署
1、部署生产者/消费者应用
用户可以通过所提供的 AWS CloudFormation 模版来部署生产者/消费者应用。其中生产者应用为一个 Kafka Client 应用程序,通过配置后可以向 MSK 的终端节点发送消息流。消费者应用为一个 Python Lambda 函数,同样可以通过 AWS Cloud Formation 模版部署,负责将订阅的消息写入到目标 S3 存储桶。源代码可查阅 GitHub。
在北京/宁夏区域部署上述应用,首先需要对 CloudFormation yaml 文件进行代码微调,以适应在不同区域的部署规范。
- 将 producer-account.yaml 模版文件下载到本地,修改的如下段落的代码,将“arn:aws”替换为“arn:aws-cn”。
- 下载并修改 consumer-account.yaml 模版文件的如下代码段,将“arn:aws”替换为“arn:aws-cn”。
- 下载并修改 producer-msk-cluster-policy.yaml 模版文件的如下代码段,将“arn:aws”替换为“arn:aws-cn”。
- 在生产者账号里面,通过 AWS CloudFormation 服务的控制台界面,创建 Stack,上传 producer-account.yaml 模版文件,执行环境部署。成功部署后,可从堆栈的输出中,获取到 MSKClusterArn 和 MSKVPC 等信息,用于后续的其他部署配置。
- 在消费者账号的 CloudFormation 服务的控制台界面,创建 Stack 堆栈,上传 consumer-account.yaml 模版文件,部署创建消费者应用 lambda 函数和 S3 对象存储桶。“MSKAccountId”入参请填入生产者账号的 12 位数字串。部署成功后,可从 Stack 堆栈的输出中获取 LambdaRoleArn 等信息。
2、为集群配置 Kafka ACL 功能和 SASL/SCRAM 认证
AWS Cloudformation 模版文件创建的 MSK 集群默认采用了 IAM 的认证方式,在北京/宁夏区域部署需要添加更新 SASL/SCR 认证方式。同时还需要启动 Kafka ACL 功能,以提供更好的客户端访问安全控制。Kafka ACL 功能启动需提前创建授权用户的白名单。执行配置操作的 EC2 role 需具备 MSK 集群的以下权限:
“kafka:DescribeCluster“
“kafka:GetBootstrapBrokers”
可以通过编辑 EC2 role 的 policy,添加如下 Json 权限代码。
- 通过 EC2 控制台,为 procducer-KafkaClient 实例创建一个内网 SSH 的 endpoint,采用与实例一致的安全组,VPC 和子网。完成创建后,通过 EC2 的 session connect 功能,可以登陆到 EC2 命令行。
注意:如果连接过程中出现错误,请检查 endpoint 的状态是否为可用,安全组是否允许 SSH 端口和来源访问。
- 为 MSK 集群创建一个 ACL 白名单 root 用户。
- 通过 MSK 集群控制台页面,修改 cluster config,启用 ACL,添加如下配置项。
- 为 MSK 集群应用新的 Cluster config 配置,启用 Kafka ACL 功能。
- 通过 MSK 控制台,在集群属性页面,为 MSK 集群开启 SASL 认证。
- 为 MSK 集群关联 SASL/SCRAM 认证的密钥,密钥是通过 Amazon Secrets Manager 托管,命名必须以“AmazonMSK_”开头,采用 Amazon KMS 服务来管理加解密密钥,在 Amazon KMS 服务中创建一个默认对称加密的密钥即可。详细过程步骤请查阅为 Amazon MSK 集群配置 SASL/SCRAM 认证。注意:需要在消费者账号内同样创建 Amazon MSK 密钥文件和 KMS 密钥,在后续的 Lambda 触发器配置时对应配置。
- 在 MSK 集群属性的控制台界面,为 MSK 集群关联 Amazon MSK 密钥文件。
- 为 MSK 集群的安全组添加 9096 端口的 inbound role。同时可以一并添加 14001,14002,14003 端口的 inbound role,为后续的托管 VPC 连接端口放开限制。
- 登陆生产者 EC2 实例,为 Producter Client 配置 SASL/SCRAM 认证方式,详细的操作步骤请查阅为 Amazon MSK 集群配置 SASL/SCRAM 认证的“通过 sign-in 凭证连接到集群”的章节内容。
- 通过 Producter Client 获取 SASL 验证 9096 端点信息,验证 SASL/SCRAM 连接成功。
3、配置 MSK 托管 VPC 连接
通过在消费者账号创建 MSK 托管 VPC 连接,通过 Amazon PrivateLink 服务构建跨账号的私有网络通道,让消费者账号可以访问到生产者账号的 MSK 集群端点信息。
- 开通 MSK 集群的 PrivateLink 功能,并配置认证类型为 SASL/SCRAM 方式。
- 在生产者账号的 Cloud formation 服务控制台部署 producer-msk-cluster-policy.yaml 模版,开通 MSK 集群的跨账号 role 访问权限。其中参数配置来自于前述的 Cloudformation 堆栈。
- 登陆消费者账号的 Amazon MSK 控制台,按照导航步骤创建 MSK 托管 VPC 连接。
注意:
1. 需要填入的是 MSK 集群 arn 字符串,认证方式选择 SASL/SCRAM 等。
2. 生产者账号与消费者账号的 VPC 子网数量和可用区需要一一对应,如果存在消费者账号的子网找不到,可以为消费者账号的其他可用区添加新的子网补充对应关系。 - 为安全组添加 14001,14002,14003 端口的 inbound role,为后续的托管 VPC 连接端口放开限制。
4、配置 Lambda 函数的触发器
- 登陆消费者账号的 Lambda 服务控制台,选择自动创建的 Lambda 函数。为其添加 Trigger 触发器。
- 有别于海外区域部署的触发器类型选择,北京/宁夏区域部署,选择 Apache Kafka,然后依次配置认证方式,Secrets Manager 密钥等信息。
- 其中 Bootstrap servers 一栏每个端点 URL 为一行独立记录。完成配置后的触发器信息类似如下截图。
配置至此,我们整个配置过程就结束。接下来可以开展 MSK + Lambda 处理的全过程测试。
5、验证测试
- 登陆生产者 EC2 实例,让 Producter Client 向 MSK 发送消息数据。
- 直接刷新消费者账号的 S3 目标存储桶,查看成功写入的 S3 对象文件。对比文件数据,可以发现与 Producter Client 的样本数据一致。
注意:如果 S3 桶没有对象文件生成,可以通过查看 Lambda 函数的 log 日志执行分析。
三、资源清理
本博客实验部署会带来一些云计算费用成本,包括 Amazon MSK,Lambda,EC2,EBS 等资源费用。实验结束后:
1. 请登录生产者账号,清理手工创建的 VPC endpoint。
2. 分别登录生产者账号,消费者账号的 Amazon Cloudformation 控制台,将 producer-account.yaml,consumer-account.yaml,producer-msk-cluster-policy.yaml 所创建的堆栈清理掉即可。