亚马逊AWS官方博客
使用 Amazon Kinesis 快速构建流式数据分析架构
简介
Amazon Kinesis 可让您轻松收集、处理和分析实时流数据,以便您及时获得见解并对新信息快速做出响应。Amazon Kinesis 提供多种核心功能,可以经济高效地处理任意规模的流数据,同时具有很高的灵活性,让您可以选择最符合应用程序需求的工具。借助 Amazon Kinesis,您可以获取视频、音频、应用程序日志和网站点击流等实时数据,也可以获取用于机器学习、分析和其他应用程序的 IoT 遥测数据。借助 Amazon Kinesis,您可以即刻对收到的数据进行处理和分析并做出响应,无需等到收集完全部数据后才开始进行处理。
实际上,Amazon Kinesis包含以下四个服务:
- Amazon Kinesis Video Streams – 利用 Amazon Kinesis Video Streams,您可以轻松而安全地将视频从互联设备流式传输到 AWS,用于分析、机器学习 (ML) 和其他处理
- Amazon Kinesis Data Streams – Amazon Kinesis Data Streams 是一种可扩展且持久的实时数据流服务,可以从成千上万个来源中以每秒数 GB 的速度持续捕获数据。
- Amazon Kinesis Data Firehose – Amazon Kinesis Data Firehose 是将流数据可靠地加载到数据湖、数据存储和分析服务中的最简单方式。该服务可以捕获和转换流数据并将其传输给 Amazon S3、Amazon Redshift、Amazon Elasticsearch Service、通用 HTTP 终端节点和服务提供商(如 Datadog、New Relic、MongoDB 和 Splunk)
- Amazon Kinesis Data Analytics – Amazon Kinesis Data Analytics 是通过 SQL 或 Apache Flink 实时处理数据流的最简单方法,您无需了解新的编程语言或处理框架。
本篇文章将完整展示如何使用Kinesis构建流式数据分析架构,如下图所示:
提前准备
进入Demo之前,先提前创建好名字为kinesis-demo-us2的S3存储桶,创建过程可以参看:
https://docs.aws.amazon.com/quickstarts/latest/s3backup/step-1-create-bucket.html
然后,再创建Redshift集群,具体步骤可以参看:
https://docs.aws.amazon.com/zh_cn/redshift/latest/gsg/rs-gsg-launch-sample-cluster.html
注意: 在Redshift的安全组里,需要加上对Firehose放行的规则,示例为AWS美西2区域的IP段
其他区域的IP可以参看此链接:
https://docs.aws.amazon.com/zh_cn/firehose/latest/dev/controlling-access.html
1. 创建Kinesis Data Stream
输入名字demo_ds,由于是测试使用,分片数量写1即可,实际生产环境的分片数量要根据数据量的吞吐决定。
2. 在EC2上安装配置Kinesis Agent:
2.1 从https://mockaroo.com/准备一份样本数据,其中包含id,first_name,last_name,email,gender,ip_address字段,数据格式为CSV
2.2 创建一台EC2,然后按照如下文档,安装Kinesis Agent https://docs.aws.amazon.com/zh_cn/firehose/latest/dev/writing-with-agents.html#download-install
修改配置文件如下,在此配置文件里默认数据是CSV格式,配置里把CSV转换为JSON格式,详细配置请参考链接https://docs.aws.amazon.com/zh_cn/firehose/latest/dev/writing-with-agents.html#agent-config-settings
2.3 配置后启动Agent
[ec2-user@ip-172-31-52-232 ~]$ sudo /etc/init.d/aws-kinesis-agent start
2.4 通过如下脚本持续生成数据
[ec2-user@ip-172-31-52-232 ~]$ while true;do sleep 5;cat sample.csv >> /tmp/app1.csv;done
2.5 通过查看日志,可以看出数据被解析后,发送成功
[ec2-user@ip-172-31-52-232 ~]$ tail /var/log/aws-kinesis-agent/aws-kinesis-agent.log
2020-11-05 06:34:03.557+0000 (Agent.MetricsEmitter RUNNING) com.amazon.kinesis.streaming.agent.Agent [INFO] Agent: Progress: 5750745 records parsed (355242075 bytes), and 5750575 records sent successfully to destinations. Uptime: 236670045ms
3. 使用Kinesis Data Analytics分析Data Stream里的数据:
3.1 创建Kinesis Data Analytics
3.2 连接流数据,即之前创建的demo_ds
3.3 创建新的IAM权限会自动创建一个IAM角色,点击发现架构,系统会自动识别数据结构,并进入SQL Editor页面
3.4 编写SQL语句对流数据进行分析,主要分4个语句
创建应用流DEMO_STREAM
创建泵从现有的数据流持续插入到应用流DEMO_STREAM里
创建统计流,统计每分钟的gender的分布统计
创建统计泵,从应用流里把数据汇总统计,并持续插入到统计流里
详细解释可以参看
https://docs.aws.amazon.com/zh_cn/kinesisanalytics/latest/dev/streams-pumps.html
3.5 将结果导出,点击连接新目标,这里选择Firehose作为目标,最终把数据传送到Redshift上
4. 使用Kinesis Firehose把Kinesis Data Analytics的结果发送到Redshift上:
4.1 点击上面的连接新目标后,可以选择Kinesis Firehose,创建一个新的Firehose交付流,同时输出格式选择CSV
4.2 选择Direct PUT或其他源
4.3 这次实验里我们不需要再处理数据,也不需要转换格式,禁用这两项后点击下一步
4.4 选择目标Amazon Redshift
4.5 选择已经创建好的Redshift集群
4.6 选择中间的S3目标,选择之前创建的kinesis-demo-us2的S3桶,并在COPY选项里加上delimiter ‘,’ (因为4.1步选择的是CSV的输出)
4.7 修改默认缓冲区时间和大小,加速数据导出,下面选择创建新的IAM角色,系统会自动为Firehose创建相应的权限
5. 在Redshift上创建表以接受数据:
5.1 在Redshift数据库中执行以下SQL语句
5.2 再通过SQL语句select * from demo_table limit 10进行查询;可以看到每分钟的数据在不断的流入
总结
从上述的实验可以看出来,使用Kinesis Data Stream和Kinesis Data Analytics构建流式数据架构非常简单,用Kinesis Firehose和Redshift 可以把流式处理后的结果及时的发送到数仓,为业务提供见解。同时Kinesis是一项完全托管的服务,不需要额外的维护工作量,大大减少了运维的成本,数据工程师只需要专注通过SQL语句实现业务的需求即可。