亚马逊AWS官方博客
使用 Kafka Connect 简化数据采集管道
1 背景介绍
在当今的商业和技术领域中,数据始终是不可或缺的组成部分。大数据正在全面重构商业生产、流通、分配、消费等领域。由知名咨询公司IDC定义的大数据四个特征却受到业界的广泛接受,也就是4V特征——数据量大(Volume)、数据种类多(Variety)、数据价值密度低(Value) 以及数据产生和处理速度快(Velocity)。由于不同服务、互联网技术框架、数据库及数据生产软硬件的繁荣发展,我们面临的处理各种异构数据的困难与日俱增。
对于刚刚起步的商业公司,面对繁杂庞大的数据处理体系,往往望而却步,企业专职数据开发人员缺少,对技术选型摇摆不定,担心生产面临问题时无法及时处理等等一系列问题。其中,数据采集在数据处理体系工作占有最大比重,将数据以高质量高效率进行收集,往往是投入工作最大的部分。当然,有需求的地方就有不断的创新。我们看到围绕Hadoop生态的Apache Flume,Elasticsearch为早起发起者的logstash,新起的开源项目Apache Seatunnel,AWS提供的AWS Glue,Amazon Kinesis Data Firehose等都在解决数据采集的问题。
Apache Kafka是一个分布式流处理平台,在数据处理中,Kafka用于构建实时流数据管道和实时流应用程序。数据管道在不同系统之间可靠处理和移动数据,而流应用程序是消耗数据流的应用程序。Kafka Connect是一种在Apache Kafka和其他数据系统之间实现可靠地流式传输数据的工具。快速定义将大型数据集进出Kafka的连接器变得简单。Kafka Connect可以摄入整个数据库或从所有应用程序服务器中收集指标,使其成为Kafka主题数据,从而使数据可用于低延迟的流处理。本文将介绍如何使用Kafka Connect简化数据采集管道架构。
2 使用Kafka Connect的数据采集管道架构
Kafka Connect是开源组件,我们可以在开源Kafka发行版本及托管的AWS MSK Connect使用它,在全球开发人员的共同参与下,目前可使用的Kafka Connector有几百种,在典型ETL场景中,我们可以使用如文件系统采集Filepulse Connector,数据库cdc日志采集Debezium Connector,支持Amazon Cloudwatch、Amazon SQS的Connector方便将数据写入Kafka集群Topic。同样的,也可以直接使用对应的Connector将Kafka中的数据消费写入到S3存储、Redshift数仓、Amazon Kinesis及使用开源的流计算引擎Apache Flink,数据湖存储Apache Hudi等等。典型的数据采集架构如图示例。
![]() |
3 服务器日志数据采集使用指南
3.1 准备MSK集群
使用AWS Management Console创建 Amazon MSK集群,本文测试选用2.8.2.tiered版本预置集群,无认证无加密(生产中可选需要的版本及配置),位于两个可用区子网的端点。
![]() |
3.2 在EC2安装启动Kafka Connect分布式进程
3.2.1 下载Kafka客户端
3.2.2 编辑Connect配置文件并启动Connect进程
配置文件路径kafka_2.12-2.8.2/config/connect-distributed.properties
3.3 编辑日志监听Task配置文件
更多配置信息可以参考https://streamthoughts.github.io/kafka-connect-file-pulse/docs/developer-guide/configuration/
3.4 启动日志监听任务观察数据写入
测试文件数据是否可以正常写入Kafka Topic
3.5 在AWS MSK创建Connect插件
下载S3 Sink Connector,访问网站 https://www.confluent.io/hub/confluentinc/kafka-connect-s3解压安装包,将lib包压缩为zip文件,并上传S3桶。
进入Console创建自定义插件
![]() |
填写Connector S3 URL ,创建插件。
3.6 在AWS MSK创建连接器任务
进入创建好的自定义插件,点击创建连接器
![]() |
选择插件
![]() |
选择MSK集群
![]() |
填写连接器配置配置连接器容量
![]() |
更多配置信息,参考https://docs.confluent.io/kafka-connectors/s3-sink/current/configuration_options.html
3.7 观察数据写入S3存储
![]() |
3.8 创建Athena关联表进行数据查询
创建表,因为数据写入包含了带有转义字符的json数据,我们直接简单创建csv格式表,后续使用SQL进行格式转换
加载分区数据
数据查询测试
![]() |
4 总结
本文介绍了使用Kafka Connect/MSK Connect通过简单配置构建数据流管道的架构设计,并对典型场景服务器日志采集入湖S3进行示例讲解。使用Kafka Connect无需编写程序代码即可轻松完成数据采集工作,并可以扩充多种数据源,支持数据库change log,文件流式读取,对接SQS、Cloudwatch等消息队列及日志系统并兼容开源及云服务。
参考链接
Kafka Connect官方文档 https://docs.confluent.io/platform/current/connect/index.html
Kafka Connect Hub https://www.confluent.io/hub/#
AWS MSK Connect https://docs.aws.amazon.com/msk/latest/developerguide/msk-connect.html
Debezium特性深入介绍 https://aws.amazon.com/cn/blogs/china/debezium-deep-dive/
Amazon MSK Connect简介 https://aws.amazon.com/cn/blogs/china/introducing-amazon-msk-connect-stream-data-to-and-from-your-apache-kafka-clusters-using-managed-connectors/