亚马逊AWS官方博客
利用 Lambda 将 Kinesis Data Stream 数据批量自动写入 MSK
背景
在混合云架构中,用户的一些应用原本运行在自建的数据中心。这些应用程序统一从 Kafka 中拉取实时数据做分析和处理,例如监控系统、大数据分析平台等。由于业务发展需要,用户将这些工作负载部分迁移到了 AWS 上,或者在 AWS 上构建新的应用。
由于 AWS 部分服务仅支持以 Kinesis Data Stream 方式输出日志或数据,例如 Pinpoint。Kinesis Data Stream 数据投递一般使用 Kinesis Firehose 或者自有应用拉取。而 Kinesis Firehose 的数据投递目标并不支持 Kafka,因此需要用户修改应用拉取数据的方式修改应用去拉取数据。如果这个应用需要同时运行数据中心和 AWS 云上,用户则需要维护支持 2 个接口的应用;或者假如这些应用的数量非常多,用户改造和测试它需要大量的人力和时间的时候;我们会希望通过一种无需运维管理的方式,能自动将 Kinesis Data Stream 中的数据导入到 Kafka 的方案。这样所有的应用无需修改,便能平滑迁移到 AWS 上,同时运维人力成本上也基本不会增加。
AWS Lambda 是一项无服务器事件驱动型计算服务。利用 AWS Lambda 的无服务器特性,我们可以做到无论需要对接多少个 Kinesis Data Stream 都可以支持;每个 Kinesis Data Stream 需要接收多大的数据流量,我们都能自动扩展;而底层资源无需运维管理。
解决方案
架构图
架构说明
本方案完全采用 Serverless 架构,主要通过 Lambda 实现 Kinesis 和 MSK 数据相互传输到目的。当 Lambda 执行失败的时候,会触发 SNS(可替换为SQS)以发送通知信息。
一键部署代码
参考 Github 开源代码
部署方式
先安装 SAM-cli,具体步骤参考:安装 AWS SAMLinux 上的 CLI – AWS Serverless Application Model
然后下载代码修改参数后部署
关键参数
主要配置文件 template.yaml
,MSK 配置在 kinesis_to_msk/config.py
中
环境配置说明
全局配置
函数超时时间默认为:5秒。
函数内存:128MB。如果加大每次批处理的数据量可适当提高内存量。
VPC 配置
注意:除非 MSK 配置为公网可访问,否则必须将 Lambda 配置在和 MSK 同一个 VPC 或者跟 MSK 所在 VPC 已经打通的其他 VPC 内,否则会 lambda 无法正常连接 MSK,引起连接超时。
安全组和子网推荐使用预定义好的,子网至少选择 2 个,保证其高可用。
Kinesis 源配置
详细参数说明参考文档:将 AWS Lambda 与 Amazon Kinesis 结合使用
其中 DestinationConfig
的 Destination
,可以设置成 SQS/SNS。示例中采用 SNS。
Stream
设置为目标 Kinesis 的 ARN
网络
如果 Lambda 和 MSK 不在同一个 VPC 内(包括跨 region 的情况),2 个 VPC 之间需要做 Peering,Lambda 才能访问 MSK。VPC Peering 创建方法参考:创建 VPC 对等连接
安全性
安全组设置
同 region 访问:
- MSK 设置入站规则为,允许 9092-9093 端口访问,源为 Lambda 的安全组
- Lambda 设置出战规则为,允许 9092-9093 端口访问,目标为 MSK 的安全组
跨 region 访问:
- MSK 设置入站规则为,允许 9092-9093 端口访问,源为 Lambda 的所在 VPC 的 subnet CDIR。如果 Lambda 部署在多个 subnet 中,需要允许多个 CIDR 访问。
- Lambda 设置出战规则为,允许 9092-9093 端口访问,目标为 MSK 的所在 VPC 的 subnet CDIR。
安全组配置方法参考:使用安全组控制到资源的流量
MSK 认证设置
推荐安全策略:MSK 集群使用 IAM Role 进行认证,然后对 Lambda 附加相关的 Policy。参考 Amazon MSK 如何与 IAM 协同工作
其他安全策略:如果 MSK 使用其他认证方式,除了用户名密码/pem 证书外,访问控制也受到安全组规则设置限制。
常见问题
问题一
KafkaTimeoutError: Failed to update metadata after 60.0 secs
原因分析
目标 Topic 不存在
可能的解决方案:
- 修改 MSK 集群的配置,设置
create.topics.enable=true
- 手动创建目标 Kafka 的 Topic
问题二
Lambda 执行超时
原因分析
Kafka 访问失败,失败原因可能是因为网络无法连通;也可能是问题一引起的超时。
可能的解决方案
- 检查 Lambda 和 MSK 所在的 VPC,是否为同一 VPC 或者 2 个 VPC 之间是否已经 Peering。VPC Peering 创建方法参考:创建 VPC 对等连接
- 检查 Lambda 使用的安全组,确保在出站规则中,允许
9092-9093
端口,目标为 MSK 使用的安全组。 - 检查 MSK 使用的安全组,确保在入站规则中,允许
9092-9093
端口,源包含 Lambda 使用的安全组。 - 如果 MSK 使用用户名密码方式访问,请确保用户名密码正确。
其他优化
在数据量足够大的情况下,需要调节 BatchSize,加大 ParallelizationFactor。在调整参数后需要测试实际数据传输的时间,并且调整 Lambda 执行的超时时间。