亚马逊AWS官方博客

在北京/宁夏区部署 MSK 集群触发跨账号 Lambda 函数的架构

针对企业客户的流式数据处理场景,亚马逊云科技推荐采用 Amazon MSK + Lambda 的无服务器事件驱动处理架构,可以为客户带来诸多的云原生优势受益:

  1. 保持原生 Apache Kafka 生态的支持,可以通过 Kafka API 将实时数据流导入数据湖仓。
  2. 基于消息驱动的 Lambda 运行时,可以提供极致的资源弹性和数据流处理时效性。同时兼顾高并发波峰的响应处理能力和资源开销成本。
  3. 提供广泛的后处理扩展能力,包括多种类型的数据存储服务,全流程安全控制、数据加密存储传输,监控告警日志,以及后续的机器学习和大数据分析应用等。

但目前的企业级客户大多都采用多账号的组织架构,与 MSK 集群相关联的消息生产者和消费者往往分布在不同的账号内。因此,跨账号的 MSK 消费成为了比较普遍的架构需求。在亚马逊云科技的海外区域,客户可以采用 Amazon MSK 和 Lambda ESM for MSK 事件源映射功能,构建跨账号的消息触发处理流程。请参考 Triggering AWS Lambda function from a cross-account Amazon Managed Streaming for Apache Kafka 部署。

在中国大陆的北京/宁夏区域,Lambda ESM for MSK 功能暂未发布,用户在按照海外部署的方式配置基于 MSK 的触发器时,在保存时则会收到“Feature not support in the current region”的信息提示。因此需要采用 Apache Kafka URL 作为跨账号 Lambda trigger 源的方式实现 Amazon MSK + 跨账号 Lambda 的数据流处理架构。本博客将详细介绍在北京/宁夏区,配置 Kafka 原生的 SASL 认证和基于 Apache Kafka URL 事件源,构建 MSK 触发跨账号 Lambda 函数的消息处理架构,实现组织内数据流的自动化处理分析能力。

一、架构描述

在北京/宁夏区域,构建跨账号的 MSK 消息自动化处理,需要利用 Lambda trigger 触发器调用 Apache Kafka URL 的方式下实现跨账号的 Lambda 函数触发。所关联的限制是跨账号的 MSK 集群访问需要采用 SASL/SCRAM 认证方式。同时,还需要为 MSK 集群开启 Kafka ACL 功能。实现的验证环境架构如下图所示。

如图所示,完成本架构的验证部署,需要采用 2 个亚马逊云科技账号,账号 1 为生产者账号,完成消息的采集和 MSK 集群托管;账号 2 为消费者账号,完成消息的自动化订阅和消费处理。架构实现的主要过程步骤包括:

  1. 在账号 1 生产者账号部署 Amazon MSK 集群,MSK 消息生产者应用。
  2. 在账号 2 消费者账号部署 MSK 消费者 Lambda 函数,函数的执行过程是从 MSK 订阅消息,并将消息数据写入到目标 S3 桶,实现流数据的持久化存储。
  3. 在账号 2 的 Amazon MSK 服务创建托管 VPC 连接,通过 Amazon PrivateLink 网络服务构建跨账号的托管私有网络通道。
  4. 在账号 2 的 Lambda 函数中将 Apache Kafka URL 作为 Lambda 触发器源,创建 Lambda 触发器,实现消息数据的跨账号消费处理。

注意
为了帮助快速验证整个流程的有效性,本博客还为实验用户提供了 MSK 生产者和消费者的模拟应用,以及样本数据。用户可以通过 AWS CloudFormation 的 yaml 模版文件,快速部署实验环境。源代码已经全部托管在 GitHub

二、验证部署

1、部署生产者/消费者应用

用户可以通过所提供的 AWS CloudFormation 模版来部署生产者/消费者应用。其中生产者应用为一个 Kafka Client 应用程序,通过配置后可以向 MSK 的终端节点发送消息流。消费者应用为一个 Python Lambda 函数,同样可以通过 AWS Cloud Formation 模版部署,负责将订阅的消息写入到目标 S3 存储桶。源代码可查阅 GitHub

在北京/宁夏区域部署上述应用,首先需要对 CloudFormation yaml 文件进行代码微调,以适应在不同区域的部署规范。

  1. 将 producer-account.yaml 模版文件下载到本地,修改的如下段落的代码,将“arn:aws”替换为“arn:aws-cn”。
    EC2Role: 
        Type: AWS::IAM::Role
        Properties:
          AssumeRolePolicyDocument: 
            Version: 2012-10-17
            Statement:
              - Effect: Allow
                Principal:
                  Service: ec2.amazonaws.com
                Action: "sts:AssumeRole"
          Path: "/"
          Policies:
            - PolicyName: msk-access
              PolicyDocument:
                Version: 2012-10-17
                Statement:
                  - Effect: Allow
                    Action: 
                      - kafka-cluster:Connect
                    Resource: !Sub 'arn:aws-cn:kafka:${AWS::Region}:${AWS::AccountId}:cluster/*/*'
                  - Effect: Allow
                    Action: 
                      - kafka-cluster:DescribeTopic
                      - kafka-cluster:CreateTopic
                      - kafka-cluster:WriteData
                      - kafka-cluster:ReadData
                    Resource: !Sub 'arn:aws-cn:kafka:${AWS::Region}:${AWS::AccountId}:topic/*/*'
                  - Effect: Allow
                    Action: 
                      - kafka-cluster:AlterGroup
                      - kafka-cluster:DescribeGroup
                    Resource: !Sub 'arn:aws-cn:kafka:${AWS::Region}:${AWS::AccountId}:group/*/*'
          ManagedPolicyArns:
            - arn:aws-cn:iam::aws:policy/AmazonSSMManagedInstanceCore
    
  2. 下载并修改 consumer-account.yaml 模版文件的如下代码段,将“arn:aws”替换为“arn:aws-cn”。
                  - Effect: Allow
                    Action:
                      - logs:CreateLogGroup
                      - logs:CreateLogStream
                      - logs:PutLogEvents
                    Resource:
                      - "arn:aws-cn:logs:*:*:*"
     - PolicyName: "lambda-msk-esm"
              PolicyDocument:
                Version: '2012-10-17'
                Statement:
                  - Effect: Allow
                    Action:
                      - logs:CreateLogGroup
                      - logs:CreateLogStream
                      - logs:PutLogEvents
                      - ec2:CreateNetworkInterface
                      - ec2:DescribeNetworkInterfaces
                      - ec2:DescribeVpcs
                      - ec2:DeleteNetworkInterface
                      - ec2:DescribeSubnets
                      - ec2:DescribeSecurityGroups
                      - kafka:DescribeCluster
                      - kafka:DescribeClusterV2
                      - kafka:GetBootstrapBrokers
                      - kafka:ListScramSecrets
                      - secretsmanager:GetSecretValue
                      - kms:Decrypt
                      - kafka:DescribeVpcConnection
                    Resource: "*"
                  - Effect: Allow
                    Action:
                      - kafka-cluster:Connect
                      - kafka-cluster:DescribeClusterDynamicConfiguration
                    Resource: !Sub 'arn:aws-cn:kafka:${AWS::Region}:${MSKAccountId}:cluster/*/*'
                  - Effect: Allow
                    Action:
                      - kafka-cluster:DescribeTopic
                      - kafka-cluster:ReadData
                    Resource: !Sub 'arn:aws-cn:kafka:${AWS::Region}:${MSKAccountId}:topic/*/*'
                  - Effect: Allow
                    Action:
                      - kafka-cluster:AlterGroup
                      - kafka-cluster:DescribeGroup
                    Resource: !Sub 'arn:aws-cn:kafka:${AWS::Region}:${MSKAccountId}:group/*/*'
          ManagedPolicyArns:
            - arn:aws-cn:iam::aws:policy/service-role/AWSLambdaMSKExecutionRole
  3. 下载并修改 producer-msk-cluster-policy.yaml 模版文件的如下代码段,将“arn:aws”替换为“arn:aws-cn”。
    MSKClusterPolicy:
        Type: AWS::MSK::ClusterPolicy
        Properties:
          ClusterArn: !Ref MSKClusterArn
          Policy: 
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Principal:
                  AWS: !Sub arn:aws-cn:iam::${LambdaAccountId}:root
                Action:
                  - kafka:CreateVpcConnection
                  - kafka:GetBootstrapBrokers
                  - kafka:DescribeCluster
                  - kafka:DescribeClusterV2
                Resource: !Ref MSKClusterArn
              - Effect: Allow
                Principal:
                  AWS: !Ref LambdaRoleArn
                Action:
                  - kafka-cluster:Connect
                  - kafka-cluster:AlterCluster
                  - kafka-cluster:DescribeCluster
                Resource: !Ref MSKClusterArn
              - Effect: Allow
                Principal:
                  AWS: !Ref LambdaRoleArn
                Action:
                  - kafka-cluster:*Topic*
                  - kafka-cluster:WriteData
                  - kafka-cluster:ReadData
                Resource: !Sub arn:aws-cn:kafka:${AWS::Region}:${AWS::AccountId}:topic/*
              - Effect: Allow
                Principal:
                  AWS: !Ref LambdaRoleArn
                Action:
                  - kafka-cluster:AlterGroup
                  - kafka-cluster:DescribeGroup
                Resource: !Sub arn:aws-cn:kafka:${AWS::Region}:${AWS::AccountId}:group/*
    
  4. 在生产者账号里面,通过 AWS CloudFormation 服务的控制台界面,创建 Stack,上传 producer-account.yaml 模版文件,执行环境部署。成功部署后,可从堆栈的输出中,获取到 MSKClusterArn 和 MSKVPC 等信息,用于后续的其他部署配置。
  5. 在消费者账号的 CloudFormation 服务的控制台界面,创建 Stack 堆栈,上传 consumer-account.yaml 模版文件,部署创建消费者应用 lambda 函数和 S3 对象存储桶。“MSKAccountId”入参请填入生产者账号的 12 位数字串。部署成功后,可从 Stack 堆栈的输出中获取 LambdaRoleArn 等信息。

2、为集群配置 Kafka ACL 功能和 SASL/SCRAM 认证

AWS Cloudformation 模版文件创建的 MSK 集群默认采用了 IAM 的认证方式,在北京/宁夏区域部署需要添加更新 SASL/SCR 认证方式。同时还需要启动 Kafka ACL 功能,以提供更好的客户端访问安全控制。Kafka ACL 功能启动需提前创建授权用户的白名单。执行配置操作的 EC2 role 需具备 MSK 集群的以下权限:

“kafka:DescribeCluster“
“kafka:GetBootstrapBrokers”

可以通过编辑 EC2 role 的 policy,添加如下 Json 权限代码。

{
"Action": [
"kafka:DescribeCluster",
"kafka:GetBootstrapBrokers"
],
"Resource": "arn:aws-cn:kafka:cn-northwest-1:500363364825:group/*",
"Effect": "Allow"
}
  1. 通过 EC2 控制台,为 procducer-KafkaClient 实例创建一个内网 SSH 的 endpoint,采用与实例一致的安全组,VPC 和子网。完成创建后,通过 EC2 的 session connect 功能,可以登陆到 EC2 命令行。

    注意:如果连接过程中出现错误,请检查 endpoint 的状态是否为可用,安全组是否允许 SSH 端口和来源访问。

  2. 为 MSK 集群创建一个 ACL 白名单 root 用户。
    export ZKS=<Apache ZooKeeper connection:2181>  ##也可以通过MSK集群的客户端信息界面中获取
    ~/kafka/bin/kafka-acls.sh —authorizer-properties zookeeper.connect=$ZKS —add —allow-principal User:william —operation All —cluster *
    ~/kafka/bin/kafka-acls.sh —authorizer-properties zookeeper.connect=$ZKS —add —allow-principal User:william —operation All —group "*" —topic "*" —allow-host "*"
    
  3. 通过 MSK 集群控制台页面,修改 cluster config,启用 ACL,添加如下配置项。
    allow.everyone.if.no.acl.found=false
  4. 为 MSK 集群应用新的 Cluster config 配置,启用 Kafka ACL 功能。
  5. 通过 MSK 控制台,在集群属性页面,为 MSK 集群开启 SASL 认证。
  6. 为 MSK 集群关联 SASL/SCRAM 认证的密钥,密钥是通过 Amazon Secrets Manager 托管,命名必须以“AmazonMSK_”开头,采用 Amazon KMS 服务来管理加解密密钥,在 Amazon KMS 服务中创建一个默认对称加密的密钥即可。详细过程步骤请查阅为 Amazon MSK 集群配置 SASL/SCRAM 认证注意:需要在消费者账号内同样创建 Amazon MSK 密钥文件和 KMS 密钥,在后续的 Lambda 触发器配置时对应配置。
  7. 在 MSK 集群属性的控制台界面,为 MSK 集群关联 Amazon MSK 密钥文件。
  8. 为 MSK 集群的安全组添加 9096 端口的 inbound role。同时可以一并添加 14001,14002,14003 端口的 inbound role,为后续的托管 VPC 连接端口放开限制。
  9. 登陆生产者 EC2 实例,为 Producter Client 配置 SASL/SCRAM 认证方式,详细的操作步骤请查阅为 Amazon MSK 集群配置 SASL/SCRAM 认证的“通过 sign-in 凭证连接到集群”的章节内容。
    ## 创建客户端凭证文件
    sudo cat <<EOF > /home/ec2-user/users_jaas.conf
    KafkaClient {
       org.apache.kafka.common.security.scram.ScramLoginModule required
       username="william"
       password="Zangying1!az";
    };
    EOF
    ## 配置认证文件路径
    export KAFKA_OPTS=-Djava.security.auth.login.config=/home/ec2-user/users_jaas.conf
    cp /usr/lib/jvm/java-17-amazon-corretto.x86_64/lib/security/cacerts /tmp/kafka.client.truststore.jks
    sudo cp /tmp/kafka.client.truststore.jks /home/ec2-user/kafka/libs/
    ## 配置用户属性文件和认证方式
    sudo cat <<EOF > /home/ec2-user/kafka/bin/client_sasl.properties
    security.protocol=SASL_SSL
    sasl.mechanism=SCRAM-SHA-512
    ssl.truststore.location=/home/ec2-user/kafka/libs/kafka.client.truststore.jks
    EOF
  10. 通过 Producter Client 获取 SASL 验证 9096 端点信息,验证 SASL/SCRAM 连接成功。
    aws kafka get-bootstrap-brokers —cluster-arn ClusterArn 
    export BS=<BootstrapBrokerStringSaslScram>     ## 替换为集群的端点字符串
    ~/kafka/bin/kafka_create_topic.sh             ## 创建一个topic。默认为"customer"
    ~/kafka/bin/kafka_produce_events.sh —broker-list BootstrapBrokerStringSaslScram —topic ExampleTopicName —producer.config client_sasl.properties
    

3、配置 MSK 托管 VPC 连接

通过在消费者账号创建 MSK 托管 VPC 连接,通过 Amazon PrivateLink 服务构建跨账号的私有网络通道,让消费者账号可以访问到生产者账号的 MSK 集群端点信息。

  1. 开通 MSK 集群的 PrivateLink 功能,并配置认证类型为 SASL/SCRAM 方式。
  2. 在生产者账号的 Cloud formation 服务控制台部署 producer-msk-cluster-policy.yaml 模版,开通 MSK 集群的跨账号 role 访问权限。其中参数配置来自于前述的 Cloudformation 堆栈。
    MSKClusterArn:   ## 来自于Producer-account.yaml 的 CloudFormation 输出中
    LambdaRoleArn:   ## 来自于Consumer-account.yaml 的 CloudFormation 输出中
    LambdaAccountId: ## 消费者账号的12位数字串
    
  3. 登陆消费者账号的 Amazon MSK 控制台,按照导航步骤创建 MSK 托管 VPC 连接。

    注意:
    1. 需要填入的是 MSK 集群 arn 字符串,认证方式选择 SASL/SCRAM 等。
    2. 生产者账号与消费者账号的 VPC 子网数量和可用区需要一一对应,如果存在消费者账号的子网找不到,可以为消费者账号的其他可用区添加新的子网补充对应关系。

  4. 为安全组添加 14001,14002,14003 端口的 inbound role,为后续的托管 VPC 连接端口放开限制。

4、配置 Lambda 函数的触发器

  1. 登陆消费者账号的 Lambda 服务控制台,选择自动创建的 Lambda 函数。为其添加 Trigger 触发器。
  2. 有别于海外区域部署的触发器类型选择,北京/宁夏区域部署,选择 Apache Kafka,然后依次配置认证方式,Secrets Manager 密钥等信息。
  3. 其中 Bootstrap servers 一栏每个端点 URL 为一行独立记录。完成配置后的触发器信息类似如下截图。

    配置至此,我们整个配置过程就结束。接下来可以开展 MSK + Lambda 处理的全过程测试。

5、验证测试

  1. 登陆生产者 EC2 实例,让 Producter Client 向 MSK 发送消息数据。
    aws kafka get-bootstrap-brokers --cluster-arn ClusterArn
    ~/Kafka/bin/kafka_produce_events.sh --broker-list BootstrapBrokerStringSaslScram --topic ExampleTopicName --producer.config client_sasl.properties
    
  2. 直接刷新消费者账号的 S3 目标存储桶,查看成功写入的 S3 对象文件。对比文件数据,可以发现与 Producter Client 的样本数据一致。

注意:如果 S3 桶没有对象文件生成,可以通过查看 Lambda 函数的 log 日志执行分析。

三、资源清理

本博客实验部署会带来一些云计算费用成本,包括 Amazon MSK,Lambda,EC2,EBS 等资源费用。实验结束后:

1. 请登录生产者账号,清理手工创建的 VPC endpoint。

2. 分别登录生产者账号,消费者账号的 Amazon Cloudformation 控制台,将 producer-account.yaml,consumer-account.yaml,producer-msk-cluster-policy.yaml 所创建的堆栈清理掉即可。

本篇作者

William Yee

亚马逊云科技资深解决方案架构师。主要技术方向为云基础设施,容器和安全。具备全面的云计算技术知识和丰富的企业上云实践经验。