亚马逊AWS官方博客

利用 Lambda 将 Kinesis Data Stream 数据批量自动写入 MSK

背景

在混合云架构中,用户的一些应用原本运行在自建的数据中心。这些应用程序统一从 Kafka 中拉取实时数据做分析和处理,例如监控系统、大数据分析平台等。由于业务发展需要,用户将这些工作负载部分迁移到了 AWS 上,或者在 AWS 上构建新的应用。

由于 AWS 部分服务仅支持以 Kinesis Data Stream 方式输出日志或数据,例如 Pinpoint。Kinesis Data Stream 数据投递一般使用 Kinesis Firehose 或者自有应用拉取。而 Kinesis Firehose 的数据投递目标并不支持 Kafka,因此需要用户修改应用拉取数据的方式修改应用去拉取数据。如果这个应用需要同时运行数据中心和 AWS 云上,用户则需要维护支持 2 个接口的应用;或者假如这些应用的数量非常多,用户改造和测试它需要大量的人力和时间的时候;我们会希望通过一种无需运维管理的方式,能自动将 Kinesis Data Stream 中的数据导入到 Kafka 的方案。这样所有的应用无需修改,便能平滑迁移到 AWS 上,同时运维人力成本上也基本不会增加。

AWS Lambda 是一项无服务器事件驱动型计算服务。利用 AWS Lambda 的无服务器特性,我们可以做到无论需要对接多少个 Kinesis Data Stream 都可以支持;每个 Kinesis Data Stream 需要接收多大的数据流量,我们都能自动扩展;而底层资源无需运维管理。

解决方案

架构图

架构说明

本方案完全采用 Serverless 架构,主要通过 Lambda 实现 Kinesis 和 MSK 数据相互传输到目的。当 Lambda 执行失败的时候,会触发 SNS(可替换为SQS)以发送通知信息。

一键部署代码

参考 Github 开源代码 

部署方式

先安装 SAM-cli,具体步骤参考:安装 AWS SAMLinux 上的 CLI – AWS Serverless Application Model

然后下载代码修改参数后部署

git clone https://github.com/yourlin/Kinesis2MSK
cd Kinesis2MSK
sam build
sam deploy --guided

关键参数

主要配置文件 template.yaml,MSK 配置在 kinesis_to_msk/config.py

环境配置说明

全局配置

Globals:
  Function:
    Timeout: 5
    MemorySize: 128

函数超时时间默认为:5秒。

函数内存:128MB。如果加大每次批处理的数据量可适当提高内存量。

VPC 配置

VpcConfig:
        SecurityGroupIds:
          - sg-087f1d9e26d9140ad # 该安全组,是指定的 VPC 内提前创建好的
        SubnetIds:
          - subnet-0eafba9fee295d045 # 提前创建好的子网 1
          - subnet-093cb4ccabff0d3d5 # 提前创建好的子网 2
          - subnet-08c55a13b324420b8 # 提前创建好的子网 3

注意:除非 MSK 配置为公网可访问,否则必须将 Lambda 配置在和 MSK 同一个 VPC 或者跟 MSK 所在 VPC 已经打通的其他 VPC 内,否则会 lambda 无法正常连接 MSK,引起连接超时。

安全组和子网推荐使用预定义好的,子网至少选择 2 个,保证其高可用。

Kinesis 源配置

Events:
        Kinesis2MSK:
          Type: Kinesis # More info about API Event Source: https://github.com/awslabs/serverless-application-model/blob/master/versions/2016-10-31.md#api
          Properties:
            Stream: arn:aws:kinesis:us-east-1:784675006790:stream/poc-kinesis # Create kinesis stream before run it
            StartingPosition: TRIM_HORIZON
            BatchSize: 10
            MaximumBatchingWindowInSeconds: 10
            Enabled: true
            ParallelizationFactor: 8
            MaximumRetryAttempts: 100
            BisectBatchOnFunctionError: true
            MaximumRecordAgeInSeconds: 604800
            DestinationConfig:
              OnFailure:
                Type: SNS
                Destination: arn:aws:sns:us-east-1:784675006790:email # Create SNS Topic before run it
            TumblingWindowInSeconds: 0
            FunctionResponseTypes:
              - ReportBatchItemFailures

详细参数说明参考文档:将 AWS Lambda 与 Amazon Kinesis 结合使用

其中 DestinationConfigDestination,可以设置成 SQS/SNS。示例中采用 SNS。

Stream 设置为目标 Kinesis 的 ARN

网络

如果 Lambda 和 MSK 不在同一个 VPC 内(包括跨 region 的情况),2 个 VPC 之间需要做 Peering,Lambda 才能访问 MSK。VPC Peering 创建方法参考:创建 VPC 对等连接

安全性

安全组设置

同 region 访问:

  • MSK 设置入站规则为,允许 9092-9093 端口访问,源为 Lambda 的安全组
  • Lambda 设置出战规则为,允许 9092-9093 端口访问,目标为 MSK 的安全组

跨 region 访问:

  • MSK 设置入站规则为,允许 9092-9093 端口访问,源为 Lambda 的所在 VPC 的 subnet CDIR。如果 Lambda 部署在多个 subnet 中,需要允许多个 CIDR 访问。
  • Lambda 设置出战规则为,允许 9092-9093 端口访问,目标为 MSK 的所在 VPC 的 subnet CDIR。

安全组配置方法参考:使用安全组控制到资源的流量

MSK 认证设置

推荐安全策略:MSK 集群使用 IAM Role 进行认证,然后对 Lambda 附加相关的 Policy。参考 Amazon MSK 如何与 IAM 协同工作

其他安全策略:如果 MSK 使用其他认证方式,除了用户名密码/pem 证书外,访问控制也受到安全组规则设置限制。

常见问题

问题一

KafkaTimeoutError: Failed to update metadata after 60.0 secs

原因分析

目标 Topic 不存在

可能的解决方案:

  1. 修改 MSK 集群的配置,设置 create.topics.enable=true
  2. 手动创建目标 Kafka 的 Topic

问题二

Lambda 执行超时

原因分析

Kafka 访问失败,失败原因可能是因为网络无法连通;也可能是问题一引起的超时。

可能的解决方案

  1. 检查 Lambda 和 MSK 所在的 VPC,是否为同一 VPC 或者 2 个 VPC 之间是否已经 Peering。VPC Peering 创建方法参考:创建 VPC 对等连接
  2. 检查 Lambda 使用的安全组,确保在出站规则中,允许 9092-9093 端口,目标为 MSK 使用的安全组。
  3. 检查 MSK 使用的安全组,确保在入站规则中,允许 9092-9093 端口,源包含 Lambda 使用的安全组。
  4. 如果 MSK 使用用户名密码方式访问,请确保用户名密码正确。

其他优化

在数据量足够大的情况下,需要调节 BatchSize,加大 ParallelizationFactor。在调整参数后需要测试实际数据传输的时间,并且调整 Lambda 执行的超时时间。

本篇作者

林业

AWS 解决方案架构师,负责基于 AWS 的云计算方案的咨询与架构设计。拥有超过 14 年研发经验,曾打造千万级用户 APP,多项 Github 开源项目贡献者。在游戏、IoT、智慧城市、汽车、电商等多个领域都拥有丰富的实践经验。

Dora Gui

AWS 技术客户经理,主要支持游戏,互联网行业客户的架构优化、成本管理、技术咨询等工作,并专注在 IAAS,大数据和容器等方向的技术选型,方案落地和实践。在加入 AWS 之前,曾就职于 EMC 和微软,腾讯等科技公司,拥有近 10 年虚拟化与公有云领域的架构优化和技术支持经验。