一、 业务背景
 
       当客户端或 IoT 设备的应用程序需要传递日志等消息时,传统方式一般要通过 HTTP 的方式请求到服务端应用程序进行接受,处理,转发或存储,再进行分析处理(架构如下图)。此架构方案,对实时性、扩展性以及安全性上都有很大的挑战。
 
        
       原始架构图
 
       二、改造后架构图
 
        
       改造后架构图
 
       三、架构优势
 
        
        - 客户端及 IoT 设备的应用程序,无需改造,可保持使用 HTTP 协议的方式。
  
        - 不支持 REST API 调用,将 Kafka API 转换为 REST API。
  
        - 从原始的离线处理方式处理转换为实时分析处理。
  
        - 使用 Kafka REST Proxy 镜像部署于 Amazon ECS Fargate 的方式,使用 ELB 提供对外的 HTTP REST 服务解决上述问题,并且可以根据流量负载的情况实现自动伸缩,承载终端设备的海量请求。
  
        - 通过使用托管服务提升安全性。
  
       
 
       四、环境准备
 
       1. 在 cn-northwest-1 控制台创建对应的 VPC(kafka-ecs-proxy)。
 
        
       2.在 VPC(kafka-ecs-proxy)中准备一台 EC2 实例,并使用 aws configure 配置 Access Key 和 Secret Access Key。
 
        
       五、MSK Cluster
 
       1.在 VPC(kafka-ecs-proxy)中按需创建 MSK 集群。
 
        
        
       2.创建完成后点击查看客户端信息获取集群 bootstrap-server。
 
        
       3.配置 MSK 集群安全组以使 EC2 可访问。
 
       4.在 EC2 中,安装并使用命令行工具创建对应 Topic。
 
        
        $ sudo yum install java
$ wget https://archive.apache.org/dist/kafka/2.7.2/kafka_2.12-2.7.2.tgz
$ tar -zxf kafka_2.12-2.7.2.tgz   
$ kafka_2.12-2.7.2/bin/kafka-topics.sh --create --topic {topic_name} --bootstrap-server {broker_ip}:{port} --partitions {partition_num} --replication-factor {replication_num}  
 
         
        
       六、ECR
 
       1.在 ECR 控制台创建存储库。
 
        
       2.在 EC2 中安装 docker,拉取 confluentinc/cp-kafka-rest 镜像并推送至 ECR(官方镜像地址:https://hub.docker.com/r/confluentinc/cp-kafka-rest)。
 
        
        $ sudo yum update 
$ sudo yum install docker   
$ sudo systemctl start docker  
$ sudo docker pull confluentinc/cp-kafka-rest 
$ sudo docker tag confluentinc/cp-kafka-rest:latest <YOUR_ACCOUNT_ID>.dkr.ecr.cn-northwest-1.amazonaws.com.cn/kafka-rest:latest
$ sudo docker push <YOUR_ACCOUNT_ID>.dkr.ecr.cn-northwest-1.amazonaws.com.cn/kafka-rest:latest
 
         
        
       七、ALB
 
       1.在控制台中创建 ALB
 
        
        
        
        
       2.创建目标组
 
        
        
       3.返回 ALB 创建页面刷新目标组后完成创建
 
       4.ALB 安全组需添加入站 8082 端口
 
       八、ECS
 
       1.在 ECS 控制台中创建 ECS 集群
 
        
        
       2.创建任务定义
 
        
        
       3.添加容器后,完成创建任务定义
 
        
        
        
       4.创建 ECS 服务
 
       a)创建基于 Fargate 的服务。
 
        
       b)选择私有子网,并配置安全组。
 
        
        
       c)配置步骤 5 中创建的 ALB 及注册目标组。
 
        
       d)配置 AutoScaling,并选择基于哪项指标扩展(可配置多个策略)。
 
        
       e)完成创建。
 
        
       f)配置 MSK 安全组,添加 ECS Service 的安全组 Searvi-4828 入站规则。
 
        
       九、测试
 
       1.在公网环境下进行公网测试访问 ALB 地址:
 
        
        # Get a list of topics
$ curl http://<YOUR_ALB_URL>:8082/topics
 
         
        
        
        # Produce a message with JSON data
$ curl -X POST -H "Content-Type: application/vnd.kafka.json.v2+json" \
  	--data '{"records":[{"value":{"name": "testUser"}}]}' \
      "http://<YOUR_ALB_URL>:8082/topics/http-logs"
 
         
        
       2.使用压力测试工具 Siege 进行压测,并观测 AutoScaling 情况(默认为 2,最小任务数 2,最大任务数 20)。
 
        
        #安装siege
$ sudo amazon-linux-extras install epel
$ sudo yum install siege
#并发200持60秒
$ siege -c200 -t60S --content-type "application/vnd.kafka.json.v2+json" 'http:// <YOUR_ALB_URL>:8082/topics/http-logs POST {"records":[{"value":{"name": "testUser"}}]}' 
 
         
        
       十、监控
 
       1.打开 CloudWatch 控制台,导航 Container Insights,即可对 ECS 集群、服务以及任务进行监控。
 
        
        
       总结
 
       这篇文章呈现了使用 ALB 配合 Amazon ECS Fargate为Amazon MSK 设置 REST API 的快速构建方式。 该解决方案可以帮助您从任何 IoT 设备、客户端或任意编程语言向 Amazon MSK 生成或使用消息,并且可以使用 Amazon ECS Fargate 自动伸缩的能力灵活扩展,以满足海量实时消息的流失处理。
 
       本篇作者