亚马逊AWS官方博客

海纳百川:基于 Redshift Streaming Ingestion 实现日志实时入仓

1. 背景

在一些场景下会期望用户行为相关的事件/信息尽快进入数仓,以支撑实时大屏、运营决策等需求。Amazon Redshift 作为高性价比的云数仓,在 2022 年 2 月推出了流式摄取功能。该功能消除了在将数据摄入到 Amazon Redshift 之前在 Simple Storage Service(Amazon S3)中暂存数据的要求,使客户能够以秒级延迟将每秒数百兆的流数据存储到 Redshift。在传统应用中,用户端上报的事件/信息通常会以日志的方式保存至服务器。本文介绍如何使用 fluent bit 将日志文件推送至 Amazon Kinesis 数据流,再借助 Redshift streaming ingestion 将数据实时存储到 Redshift。其总体架构图如下:

各组件用途说明如下:

  • fluent bit:读取日志并推送至 Amazon Kinesis 数据流(KDS)
  • KDS:无服务器服务,缓存数据
  • Redshift:数仓,通过 streaming ingestion 功能流式从 KDS 摄取数据
  • Managed Apache Airflow(MWAA):可选组件,托管的 Airflow,用于调度 ETL 任务

详细配置说明见下文。

2. 配置示例

2.1 Kinesis 数据流环境准备

1)在控制台搜索 “Kinesis” ,点击搜索结果中的 Kinesis

2)在 Kinesis 控制台左侧导航栏选择“数据流”,右侧控制面板点击“创建数据流”

3)根据需要定义名称,容量模式选择“按需”(处理能力会根据数据量自动调整),其余保持默认创建数据流

4)返回控制界面,确认数据量已创建完成

2.2 Redshift 环境准备

1)控制台搜索“Redshift”,在搜索结果中点击“Amazon Redshift”

2)在 Redshift 界面左侧导航栏选择“Redshift Serverless”菜单,开始创建 Serverless 集群

3)在 Redshift 无服务器控制面板点击“创建工作组”

4)根据需要填写工作组名称,配置 RPU 容量

5)根据需要选择网络、安全组

6)选择创建新的命名空间,根据需要数据命名空间名称

7)设置管理员用户名及密码

8)创建 IAM 角色,并根据提示设置为默认角色

9)根据需要设置 Redshift 角色 S3 访问权限

10)返回 Redshift 无服务器控制面板,确认工作组创建完成

11)进入 IAM 界面,为步骤 8 创建角色添加权限访问 Kinesis 数据流,为简化配置可考虑为该角色添加 AmazonKinesisFullAccess 权限

2.3 日志摄取示例

1)启动 EC2,参考 fluent bit 官方文档安装 fluent bit

2)配置权限,允许 fluent bit 访问 Kinesis 数据流,推荐给 EC2 配置角色,该角色包含 AmazonKinesisFullAccess 策略

3)编辑 fluent-bit.conf 文件,设置 KDS 名称、需要采集的日志位置

[SERVICE]
    Flush   5   # 结合日志量微调,KDS 按需模式处理能力为 200MB/s, 20万条记录/s。注意:达到峰值处理能力需要预热时间
    log_file    /tmp/fluent-bit.log # fluent-bit 日志,开启便于调试后续可关闭

[INPUT]
    name    tail
    Parser  json
    path    /tmp/pickup*.log
    db      test.db     # check point持久化
    Mem_Buf_Limit 4096MB    # fluent bit能够只用的内存上限,结合服务器总内存配置


[OUTPUT]
    Name  kinesis_streams
    Match *
    region ap-northeast-1
    stream demo-log-ingestion # KDS 名称
    Retry_Limit False   # 不限制重试次数

4)为便于演示,可以使用如下脚本生成模拟日志生成,其格式如下。使用方式为下载该 python 文件后,运行`python3 fakelog.py > /tmp/pickup.log`

  • 日志生成脚本

https://gist.github.com/ensean/03779121383e3ed7e46417b56c298559

  • 样例日志
{"vendorId":1,"pickupDate":"2023-07-28T02:40:58.014248","googleDuration":9847,"gcDistance":1,"tripDuration":9847,"storeAndFwdFlag":0,"dropoffLongitude":"-73.97737885","passengerCount":8,"googleDistance":1,"dropoffLatitude":"40.75883865","dropoffDate":"2023-07-28T03:35:58.014253","pickupLatitude":"40.77410507","pickupLongitude":"-73.87303925","id":"id8422372"}

5)开启 Redshift 查询编辑器

6)创建 Redshift 连接,根据提示输入用户名、密码

7)确保编辑器的连接的集群、数据库无误

8)在编辑器内输入如下内容,创建 schema、物化视图摄取 Kinesis 数据流中的数据

-- 创建schema绑定到kinesis data streams

CREATE EXTERNAL SCHEMA kds_demo
FROM KINESIS
IAM_ROLE  default ;

-- 创建物化视图接收数据

CREATE MATERIALIZED VIEW mv_log_ingestion
sortkey(approximate_arrival_timestamp, pt, vendor_id) 
AUTO REFRESH YES
AS
    SELECT
        approximate_arrival_timestamp,
        refresh_time,
        partition_key,
        shard_id,
        sequence_number,
        json_parse(kinesis_data) as payload,
        json_extract_path_text(from_varbyte(kinesis_data,'utf-8'),'vendorId', true)::varchar(64) as vendor_id,
        to_date(json_extract_path_text(from_varbyte(kinesis_data,'utf-8'),'pickupDate', true)::varchar(64), 'YYYY-MM-DD') as pt,
        md5(from_varbyte(kinesis_data,'utf-8')) as md5_key
    FROM
        kds_demo."demo-log-ingestion"
    WHERE
        can_json_parse(kinesis_data);

-- 参考数据摄取情况
select * from mv_log_ingestion order by approximate_arrival_timestamp desc limit 10;

至此,我们已经实现了通过 fluent bit 将日志实时传输至 Kinesis 数据流,然后使用 Redshift streaming ingestion 功能实时摄取至数仓。

3. 总结

本文展示了如何使用 fluent bit 及 Redshift streaming ingestion 特性将日志信息实时摄取到 Redshift 数仓,为后续实时大屏、运营决策提供支撑。为进一步降低延迟,还可以考虑改造应用将日志信息直接推送至 Kinesis 数据流。对于后续基于实时数据的 ETL 需求,还可以使用托管的 Airflow 对 ETL 任务进行调度编排。

4. 参考资料

1. Redshift streaming ingestion 功能介绍:https://aws.amazon.com/cn/redshift/redshift-streaming-ingestion/

2. Kinesis 数据流介绍:https://aws.amazon.com/cn/kinesis/data-streams/?nc1=h_ls

3. fluent bit 安装指引:https://docs.fluentbit.io/manual/installation/linux/amazon-linux

4. EC2 角色配置:https://repost.aws/zh-Hans/knowledge-center/assign-iam-role-ec2-instance

本篇作者

李寅祥

AWS 解决方案架构师,负责基于 AWS 云计算方案架构的咨询和设计,在国内推广 AWS 云平台技术和各种解决方案。曾就职于 IBM,负责企业私有云方案咨询和架构设计,在基础架构方面有丰富经验。