亚马逊AWS官方博客

使用 Kafka Connect 简化数据采集管道

1 背景介绍

在当今的商业和技术领域中,数据始终是不可或缺的组成部分。大数据正在全面重构商业生产、流通、分配、消费等领域。由知名咨询公司IDC定义的大数据四个特征却受到业界的广泛接受,也就是4V特征——数据量大(Volume)、数据种类多(Variety)、数据价值密度低(Value) 以及数据产生和处理速度快(Velocity)。由于不同服务、互联网技术框架、数据库及数据生产软硬件的繁荣发展,我们面临的处理各种异构数据的困难与日俱增。

对于刚刚起步的商业公司,面对繁杂庞大的数据处理体系,往往望而却步,企业专职数据开发人员缺少,对技术选型摇摆不定,担心生产面临问题时无法及时处理等等一系列问题。其中,数据采集在数据处理体系工作占有最大比重,将数据以高质量高效率进行收集,往往是投入工作最大的部分。当然,有需求的地方就有不断的创新。我们看到围绕Hadoop生态的Apache Flume,Elasticsearch为早起发起者的logstash,新起的开源项目Apache Seatunnel,AWS提供的AWS Glue,Amazon Kinesis Data Firehose等都在解决数据采集的问题。

Apache Kafka是一个分布式流处理平台,在数据处理中,Kafka用于构建实时流数据管道和实时流应用程序。数据管道在不同系统之间可靠处理和移动数据,而流应用程序是消耗数据流的应用程序。Kafka Connect是一种在Apache Kafka和其他数据系统之间实现可靠地流式传输数据的工具。快速定义将大型数据集进出Kafka的连接器变得简单。Kafka Connect可以摄入整个数据库或从所有应用程序服务器中收集指标,使其成为Kafka主题数据,从而使数据可用于低延迟的流处理。本文将介绍如何使用Kafka Connect简化数据采集管道架构。

2 使用Kafka Connect的数据采集管道架构

Kafka Connect是开源组件,我们可以在开源Kafka发行版本及托管的AWS MSK Connect使用它,在全球开发人员的共同参与下,目前可使用的Kafka Connector有几百种,在典型ETL场景中,我们可以使用如文件系统采集Filepulse Connector,数据库cdc日志采集Debezium Connector,支持Amazon Cloudwatch、Amazon SQS的Connector方便将数据写入Kafka集群Topic。同样的,也可以直接使用对应的Connector将Kafka中的数据消费写入到S3存储、Redshift数仓、Amazon Kinesis及使用开源的流计算引擎Apache Flink,数据湖存储Apache Hudi等等。典型的数据采集架构如图示例。

3 服务器日志数据采集使用指南

3.1 准备MSK集群

使用AWS Management Console创建 Amazon MSK集群,本文测试选用2.8.2.tiered版本预置集群,无认证无加密(生产中可选需要的版本及配置),位于两个可用区子网的端点。

3.2 在EC2安装启动Kafka Connect分布式进程

3.2.1 下载Kafka客户端


# 使用2.8.2 版本为例
$ wget https://archive.apache.org/dist/kafka/2.8.2/kafka_2.12-2.8.2.tgz
$ tar -xvf kafka_2.12-2.8.2.tgz
$ cd kafka_2.12-2.8.2.

3.2.2 编辑Connect配置文件并启动Connect进程

配置文件路径kafka_2.12-2.8.2/config/connect-distributed.properties


### 修改以下配置
# A list of host/port pairs to use for establishing the initial connection to the Kafka cluster.
bootstrap.servers=b-1.testprovision.xxxxx.us-east-1.amazonaws.com:9092,b-2.testprovision.xxx.us-east-1.amazonaws.com:9092
# unique name for the cluster, used in forming the Connect cluster group. Note that this must not conflict with consumer group IDs
group.id=connect-cluster
# Topic to use for storing offsets. This topic should have many partitions and be replicated and compacted.
offset.storage.topic=connect-offsets
# Topic to use for storing connector and task configurations; note that this should be a single partition, highly replicated,
config.storage.topic=connect-configs
# Topic to use for storing statuses. This topic can have multiple partitions and should be replicated and compacted.
status.storage.topic=connect-status
#Specify hostname as 0.0.0.0 to bind to all interfaces.
listeners=HTTP://:8083
启动connect 进程
$ sh kafka_2.12-2.8.2/bin/connect-distributed.sh -daemon config/connect-distributed.properties

3.3 编辑日志监听Task配置文件


"connector.class":"io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector",
"fs.listing.directory.path":"/opt/data/",   # 监听文件路径
"fs.scan.interval.ms":"10000",   
"read.max.wait.ms":"3600000",               # 文件更新最长等待时间
"fs.scan.filters":"io.streamthoughts.kafka.connect.filepulse.scanner.local.filter.RegexFileListFilter",
"file.filter.regex.pattern":".*\\.json$",    # 监听文件名正则表达式
"task.reader.class":"io.streamthoughts.kafka.connect.filepulse.reader.BytesArrayInputReader",
"offset.strategy":"name",                    # 文件读取记录唯一标识符
"fs.listing.class":"io.streamthoughts.kafka.connect.filepulse.fs.LocalFSDirectoryListing", 
"tasks.reader.class":"io.streamthoughts.kafka.connect.filepulse.fs.reader.LocalRowFileInputReader",
"topic":"test",
"internal.kafka.reporter.bootstrap.servers":"b-1.testprovision.xxx.us-east-1.amazonaws.com:9092,b-2.testprovision.xxxx.kafka.us-east-1.amazonaws.com:9092",
"internal.kafka.reporter.topic":"connect-status",
"fs.cleanup.policy.class":"io.streamthoughts.kafka.connect.filepulse.fs.clean.LogCleanupPolicy",
"filters":"ParseJSON, ExcludeFieldMessage",
"filters.ParseJSON.type":"io.streamthoughts.kafka.connect.filepulse.filter.JSONFilter",
"filters.ParseJSON.source":"message",
"filters.ParseJSON.merge":"true",
"filters.ParseJSON.explode.array":"true",
"filters.ExcludeFieldMessage.type":"io.streamthoughts.kafka.connect.filepulse.filter.ExcludeFilter",
"tasks.file.status.storage.bootstrap.servers":" b-1.testprovision.xxx.us-east-1.amazonaws.com:9092,b-2.testprovision.xxxx.kafka.us-east-1.amazonaws.com:9092",
"filters.ExcludeFieldMessage.fields":"message",
"tasks.max":1

更多配置信息可以参考https://streamthoughts.github.io/kafka-connect-file-pulse/docs/developer-guide/configuration/

3.4 启动日志监听任务观察数据写入


# 向connect 服务发送rest api请求创建监听任务
$ curl -sX PUT http://localhost:8083/connectors/connect-file-pulse-quickstart-log4j/config --header "Content-Type: application/json"  -d “{\"connector.class\":\"io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector\",\"fs.listing.directory.path\":\"/opt/data/\",\"fs.scan.interval.ms\":\"10000\",\"fs.scan.filters\":\"io.streamthoughts.kafka.connect.filepulse.scanner.local.filter.RegexFileListFilter\",\"file.filter.regex.pattern\":\".*\\\\.json$\",\"task.reader.class\":\"io.streamthoughts.kafka.connect.filepulse.reader.BytesArrayInputReader\",\"offset.strategy\":\"name\",\"fs.listing.class\":\"io.streamthoughts.kafka.connect.filepulse.fs.LocalFSDirectoryListing\",\"tasks.reader.class\":\"io.streamthoughts.kafka.connect.filepulse.fs.reader.LocalRowFileInputReader\",\"topic\":\"test\",\"internal.kafka.reporter.bootstrap.servers\":\"b-1.testprovision.xxx.us-east-1.amazonaws.com:9092,b-2.testprovision.xxx.kafka.us-east-1.amazonaws.com:9092\",\"internal.kafka.reporter.topic\":\"connect-status\",\"fs.cleanup.policy.class\":\"io.streamthoughts.kafka.connect.filepulse.fs.clean.LogCleanupPolicy\",\"filters\":\"ParseJSON, ExcludeFieldMessage\",\"filters.ParseJSON.type\":\"io.streamthoughts.kafka.connect.filepulse.filter.JSONFilter\",\"filters.ParseJSON.source\":\"message\",\"filters.ParseJSON.merge\":\"true\",\"filters.ParseJSON.explode.array\":\"true\",\"filters.ExcludeFieldMessage.type\":\"io.streamthoughts.kafka.connect.filepulse.filter.ExcludeFilter\",\"tasks.file.status.storage.bootstrap.servers\":\"b-1.testpronxxxx.kafka.us-east-1.amazonaws.com:9092,b-2.testxxxxkafka.us-east-1.amazonaws.com:9092\",\"filters.ExcludeFieldMessage.fields\":\"message\",\"tasks.max\":1}”

测试文件数据是否可以正常写入Kafka Topic


#追加文件数据
$ echo "{\"id\":26,\"first_name\":\"Rozanna\",\"last_name\":\"Goch\",\"email\":\"rgochp@buzzfeed.com\",\"gender\":\"Female\",\"ip_address\":\"25.101.49.192\",\"last_login\":\"2018-05-06T21:38:32Z\",\"account_balance\":21289.06,\"country\":\"RU\",\"favorite_color\":\"#65e577\"}" >> /opt/data/test.json
#消费 topic 数据
$ sh bin/kafka-console-consumer.sh --bootstrap-server  b-1.xxx.kafka.us-east-1.amazonaws.com:9092,b-2.xxxx.c5.kafka.us-east-1.amazonaws.com:9092 --topic test-json --from-beginning 
{"schema":{"type":"struct","fields":[{"type":"double","optional":true,"field":"account_balance"},{"type":"string","optional":true,"field":"country"},{"type":"string","optional":true,"field":"email"},{"type":"string","optional":true,"field":"favorite_color"},{"type":"string","optional":true,"field":"first_name"},{"type":"string","optional":true,"field":"gender"},{"type":"int64","optional":true,"field":"id"},{"type":"string","optional":true,"field":"ip_address"},{"type":"string","optional":true,"field":"last_login"},{"type":"string","optional":true,"field":"last_name"}],"optional":false},"payload":{"account_balance":21289.06,"country":"RU","email":"rgochp@buzzfeed.com","favorite_color":"#65e577","first_name":"Rozanna","gender":"Female","id":26,"ip_address":"25.101.49.192","last_login":"2018-05-06T21:38:32Z","last_name":"Goch"}}

3.5 在AWS MSK创建Connect插件

下载S3 Sink Connector,访问网站 https://www.confluent.io/hub/confluentinc/kafka-connect-s3解压安装包,将lib包压缩为zip文件,并上传S3桶。

进入Console创建自定义插件

填写Connector S3 URL ,创建插件。

3.6 在AWS MSK创建连接器任务

进入创建好的自定义插件,点击创建连接器

选择插件

选择MSK集群

填写连接器配置配置连接器容量


connector.class=io.confluent.connect.s3.S3SinkConnector
s3.region=us-east-1
partition.duration.ms=3600000
flush.size=10
schema.compatibility=NONE
tasks.max=1
topics=test-json
s3.part.size=5242880
timezone=Asia/Chongqing
rotate.interval.ms=5000
aws.access.key.id=xxxx
aws.secret.access.key=xxxx
locale=ii_CN
partition.field.name=country
format.class=io.confluent.connect.s3.format.json.JsonFormat
partitioner.class=io.confluent.connect.storage.partitioner.TimeBasedPartitioner
storage.class=io.confluent.connect.s3.storage.S3Storage
s3.bucket.name=kafka-connect-data-2023
path.format='year'=YYYY/'month'=MM/'day'=dd   
s3.compression.type=gzip

更多配置信息,参考https://docs.confluent.io/kafka-connectors/s3-sink/current/configuration_options.html

3.7 观察数据写入S3存储

3.8 创建Athena关联表进行数据查询

创建表,因为数据写入包含了带有转义字符的json数据,我们直接简单创建csv格式表,后续使用SQL进行格式转换


CREATE EXTERNAL TABLE IF NOT EXISTS `test`.`test-json` (`text` string)
PARTITIONED BY (`year` int, `month` int, `day` int)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
WITH SERDEPROPERTIES ('field.delim' = '\n')
STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION 's3://kafka--xxx-2023/topics/test-json/'
TBLPROPERTIES (
  'classification' = 'csv',
  'write.compression' = 'GZIP'
);

加载分区数据


MSCK REPAIR TABLE `test`.`test-json`

数据查询测试


select json_extract_scalar(json_parse(regexp_replace(regexp_replace(regexp_replace(text,'\\'),'^"' ),'"$') ),'$.payload.country') from "test"."test-json"

4 总结

本文介绍了使用Kafka Connect/MSK Connect通过简单配置构建数据流管道的架构设计,并对典型场景服务器日志采集入湖S3进行示例讲解。使用Kafka Connect无需编写程序代码即可轻松完成数据采集工作,并可以扩充多种数据源,支持数据库change log,文件流式读取,对接SQS、Cloudwatch等消息队列及日志系统并兼容开源及云服务。

参考链接

Kafka Connect官方文档 https://docs.confluent.io/platform/current/connect/index.html

Kafka Connect Hub https://www.confluent.io/hub/#

AWS MSK Connect https://docs.aws.amazon.com/msk/latest/developerguide/msk-connect.html

Debezium特性深入介绍 https://aws.amazon.com/cn/blogs/china/debezium-deep-dive/

Amazon MSK Connect简介 https://aws.amazon.com/cn/blogs/china/introducing-amazon-msk-connect-stream-data-to-and-from-your-apache-kafka-clusters-using-managed-connectors/

本篇作者

张鑫

AWS解决方案架构师,负责基于AWS云平台的解决方案咨询和设计,在系统架构、数仓和实时离线计算领域有丰富的研发和架构实践经验。