亚马逊AWS官方博客

利用 Amazon MSK,Amazon Redshift 和 Amazon Quicksight 搭建简易实时数仓

概述

很多客户有快速搭建简易实时数仓的需求,例如内控审计平台,希望从公司各业务线实时收集要监控的数据,并不需要复杂的 ETL 处理,而进到数仓后,希望既支持近实时的多维度查询,又支持后续的离线分析。那么,利用 Amazon MSK,Amazon Redshift 和 Amazon Quicksight 搭建简易实时数仓,是一个很好的选择,能实现项目的快速落地。

Amazon MSK 是 AWS 完全托管的、可用性高且安全的 Apache Kafka 服务,能用于处理流数据的应用程序。

Amazon MSK Serverless 于 2022 年 5 月正式上线,它使您可以轻松地运行 Apache Kafka,而无需管理和扩展集群容量。

Amazon Redshift 是 AWS 的云数仓产品,使用 SQL 在数据仓库、运营数据库和数据湖间分析结构化和半结构化数据。

Amazon Redshift Serverless 于 2022 年 7 月正式上线,它让你只需加载数据然后开始查询,无需设置和管理集群,且只为用量付费。Amazon Redshift 推出的流式摄取的功能,可以每秒从 Amazon MSK 原生地将数百兆字节的数据摄取到 Amazon Redshift 的物化视图中,并在几秒钟内进行查询。

Amazon QuickSight 是 AWS 的云原生 Serverless BI 服务,连接到您的云端数据,合并来自不同来源的数据,例如 Amazon Redshift、Amazon RDS、Amazon Athena 和 Amazon S3 等。

整体实验的架构图如下:

架构图说明:

  1. Amazon MSK 创建 topic,设定消息格式,实时接收各个生产者发送的消息。
  2. Amazon Redshift 创建 topic 对应的物化视图 MBO,自动流式摄取被发送到 Amazon MSK 的消息,实现数仓的实时数据集成,并通过 SQL 来做简单的数据转换。
  3. Amazon Quicksight 设置 Amazon Redshift 为数据源,来制作各种图表,实现数仓的可视化。

详细步骤说明

1.新建 Amazon MSK 集群

Amazon MSK 入门 按照步骤 1-6,来创建 MSK 集群、创建 IAM 角色、创建客户端计算机、创建主题、生成和使用数据、查看指标。记录下您选择的 VPC、安全组、kafka 版本号。

例如我选择的 VPC 是 vpc-09c6a2895b6e3a1c1(172.31.0.0/16),安全组叫 mskgroup,kafka 版本号是 2.8.1。

注意:我们创建 MSK 集群时,默认勾选了基于 IAM 角色的身份验证。

也可以通过 CloudFormation 快速部署 MSK:

https://us-east-1.console.aws.amazon.com/msk/home?region=us-east-1#/streamingDataSolutions

2.新建 Amazon Redshift 集群

1)创建集群子网组

如图所示:

我们点击“创建集群子网组”,例如叫 cluster-subnet-group-1,选择 Amazon MSK 所在的 VPC。

2)创建 Redshift 集群

创建选项 1:Redshift 支持常规的集群,适用于用量相对稳定的工作负载,根据使用示例数据集 – Amazon Redshift 的步骤 1 来创建集群,步骤 2 来尝试示例查询。

创建选项 2:Redshift 也支持 Serverless 的集群,适用于用量波动较大的工作负载,根据 Amazon Redshift Serverless 的步骤来配置 Serverless 集群,并尝试示例查询

作为实验,我们选择 Redshift Serverless,仅为使用量付费。我们使用自定义设置进行配置,创建并关联 IAM 角色,选择 Amazon MSK 所在的 VPC 和子网,创建并关联安全组(我的叫 redshiftgroup,设置入站规则为:允许名为 redshiftgroup 的安全组访问端口 5439,出站规则默认不变),点击“保存设置”。

然后,我们打开 MSK 集群的安全组(我的叫 mskgroup),设置入站规则为:安全组为 mskgroup 访问所有流量,安全组为 redshiftgroup 访问端口 9098,出站规则默认不变(说明:在 Amazon MSK 服务页面,  查看客户端信息,能看到 9098 是其 bootstrap 服务器的端口,redshift 需要访问它来获取 MSK 的元数据)。

开启增强型 VPC 路由,启用此选项会强制集群和数据存储库之间的网络流量通过 VPC,而不是 Internet。

等待数分钟后 Redshift Serverless 完成初始化,我们就可以点击 Redshift 的“查询编辑器 V2”,做示例查询。

3.配置 MSK 到 Redshift 的连接

1)在 Redshift 的 IAM 角色上添加 Amazon MSK IAM 策略

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "MSKIAMpolicy",
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:ReadData",
                "kafka-cluster:DescribeTopic",
                "kafka-cluster:Connect"
            ],
            "Resource": [
                "arn:aws:kafka:*:0123456789:cluster/*/*/",
                "arn:aws:kafka:*:0123456789:topic/*/*/*"
            ]
        },
        {
            "Sid": "MSKPolicy",
            "Effect": "Allow",
            "Action": [
                "kafka:GetBootstrapBrokers"
            ],
            "Resource": "*"

        }
    ]
}

将上述策略中的 0123456789 替换为当前账号的 ID:

“arn:aws:kafka:us-east-1:0123456789:cluster/*/*/”

2)在 Amazon Redshift 的查询编辑器中,创建一个外部 Schema 以映射到 Amazon MSK 集群

CREATE EXTERNAL SCHEMA myschema
FROM MSK
IAM_ROLE 'iam-role-arn'
AUTHENTICATION iam
CLUSTER_ARN 'msk-cluster-arn';

将 iam-role-arn 替换为 redshift 集群的 IAM 角色,将 msk-cluster-arn 替换为 msk 集群的 ARN。运行后,可以看到左侧菜单栏出现了“myschema”。

3)在 Amazon Redshift 的查询编辑器中,创建一个物化视图以使用来自 MSK 主题的数据

先在创建 MSK 集群的客户端机器上执行 kafka-topics.sh,创建测试主题为 demotopic(替换 BootstrapServerString):

<path-to-your-kafka-installation>/bin/kafka-topics.sh --create --bootstrap-server Bb-3.kafkacluster0123456789.8z90kh.c8.kafka.us-east-1.amazonaws.com:9098,b-2.kafkacluster0123456789.8z90kh.c8.kafka.us-east-1.amazonaws.com:9098,b-1.kafkacluster0123456789.8z90kh.c8.kafka.us-east-1.amazonaws.com:9098 --command-config client.properties --replication-factor 3 --partitions 1 --topic demotopic 

继续执行 kafka-console-producer.sh,向 demotopic 发送样例 JSON 消息(替换BootstrapServerString):

<path-to-your-kafka-installation>/bin/kafka-console-producer.sh --bootstrap-server <BootstrapServerString> --producer.config client.properties --topic demotopic

输入以下几条消息:

{"sensor_id": 66, "current_temperature": 69.67, "status": "OK", "event_time": "2022-11-20T18:31:30.693395"}
{"sensor_id": 45, "current_temperature": 122.57, "status": "OK", "event_time": "2022-11-20T18:31:31.486649"}
{"sensor_id": 15, "current_temperature": 101.64, "status": "OK", "event_time": "2022-11-20T18:31:31.671593"}

然后在 Redshift 的查询编辑器中,执行以下命令来创建对应的物化视图。

CREATE MATERIALIZED VIEW demoview AUTO REFRESH YES AS
    SELECT "kafka_partition",
        "kafka_offset",
        "kafka_timestamp_type",
        "kafka_timestamp",
        "kafka_key",
        JSON_EXTRACT_PATH_TEXT(FROM_VARBYTE(kafka_value, 'utf-8'),'sensor_id')::VARCHAR(8) as sensor_id,
        JSON_EXTRACT_PATH_TEXT(FROM_VARBYTE(kafka_value, 'utf-8'),'current_temperature')::DECIMAL(10,2) as current_temperature,
        JSON_EXTRACT_PATH_TEXT(FROM_VARBYTE(kafka_value, 'utf-8'),'status')::VARCHAR(8) as status,
        JSON_EXTRACT_PATH_TEXT(FROM_VARBYTE(kafka_value, 'utf-8'),'event_time')::CHARACTER(26) as event_time,       
        "kafka_headers"
FROM myschema.demotopic;

注意,这个样例视图因为调用了 JSON_EXTRACT_PATH_TEXT 将 JSON 消息中的数据转换为 Redshift 的相应数据类型,所以会确认 MSK 的消息数据是有效 JSON 和 utf8;它也通过 auto refresh yes,开启了自动刷新。

4)在 Amazon Redshift 的查询编辑器中,刷新物化视图并查询数据

然后在 Redshift 的查询编辑器中,执行以下命令来刷新物化视图,并查询数据。

REFRESH MATERIALIZED VIEW demoview;
select * from demoview;

可以看到 JSON 数据流被成功抽取到了不同的数据列(sensor_id, current_temperature, status, event_time),如下图所示:

通过以上的几个步骤,就成功建立了 MSK 到 Redshift 的实时通道。之后,您可以在 Redshift 中创建额外的物化视图和数据表,从而逐步搭建起完整的数仓。

4.配置 QuickSight 集群到 Redshift 的连接

1)创建 QuickSight 集群

从 AWS 的菜单栏搜索 QuickSight 服务,注册默认的企业版并打开 QuickSight 控制台。然后,点击右上角的菜单,将 QuickSight 切换到 Redshift 所在的 AWS 区域,例如 us-west-2 https://us-west-2.quicksight.aws.amazon.com/sn/admin

2)配置安全组

在 AWS 控制台,创建名为 quicksight-admin 的安全组,选择 MSK 所在的 VPC,添加入站规则为:redshiftgroup 的安全组的所有流量,出站规则为:redshiftgroup 的安全组的访问端口 5439。

在 redshiftgroup 的安全组,添加入站规则为:quicksight-admin 的安全组访问端口 5439,出站规则保持不变。

3)创建 VPC 连接

在 QuickSight 控制台,点击“Manage VPC connection”,创建私有 VPC 连接:选择 Redshift 所在的 VPC,子网 ID,输入 quicksight-admin 的安全组 ID。默认的 execution role 例如 aws-quicksight-service-role-v0,要添加创建 VPC 连接的 policy,例如:

{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"ec2:CreateNetworkInterface",
"ec2:ModifyNetworkInterfaceAttribute",
"ec2:DeleteNetworkInterface",
"ec2:DescribeSubnets",
"ec2:DescribeSecurityGroups"
],
"Resource": "*"
}
]
}

等待片刻后,VPC 连接创建成功。

4)创建数据集

在 QuickSight 控制台,点击左上角“QuickSight”切换到应用视图,再点击“Datasets”,数据源选择 Redshift,  然后输入数据源的名称,连接类型为刚才创建的 VPC 连接,填写 Redshift 的服务端点、数据库名、用户名、密码。如下图所示:

然后,我们就可以使用这个 Redshift 数据源来创建图表了。点击“Analysis”,选择这个 demoview 数据集,就能编辑各种图表了,如下图所示:

当然,QuickSight 也可以接入其他的各种数据源。

通过以上的步骤,就成功建立了 Redshift 到 QuickSight 的可视化能力。

清理

1)删除 Amazon MSK 资源

Amazon MSK 入门 按照步骤 7,删除 MSK 集群、EC2 实例、IAM policy 和角色。

2)删除 Amazon Redshift 资源

在 Redshift 的控制面板,点击左侧菜单栏的“工作组配置”,然后选择工作组并删除,勾选“删除关联的命名空间”,删除所有 Redshift Serverless 资源。

3)删除 Amazon QuickSight 资源

访问地址:https://us-east-1.quicksight.aws.amazon.com/sn/console/unsubscribe ,删除所有 QuickSight 资源。

结论

在这篇文章中,介绍了如何利用 Amazon MSK,Amazon Redshift 和 Amazon QuickSight 来快速搭建简易实时数仓。在实验中,介绍了具体的搭建步骤,Redshift 如何从 MSK 摄入流式数据,QuickSight 如何将 Redshift 配置为数据源。建议用户在搭建完成后,进一步探索 Redshift 的数仓功能,QuickSight 的可视化功能。

参考资料

https://docs.aws.amazon.com/zh_cn/msk/latest/developerguide/getting-started.html

https://docs.aws.amazon.com/zh_cn/redshift/latest/gsg/new-user-serverless.html

https://docs.aws.amazon.com/zh_cn/redshift/latest/dg/materialized-view-streaming-ingestion-getting-started-MSK.html

https://aws.amazon.com/cn/blogs/aws/new-for-amazon-redshift-general-availability-of-streaming-ingestion-for-kinesis-data-streams-and-managed-streaming-for-apache-kafka/

https://docs.aws.amazon.com/zh_cn/quicksight/latest/user/working-with-aws-vpc.html

本篇作者

余骏

AWS 解决方案架构师,负责基于 AWS 的云计算方案架构咨询及落地。在加入 AWS 前,拥有多年 IT 咨询和互联网研发经验,在云原生微服务、机器学习应用等方向有丰富的实践经验。

迟锦丰

AWS 解决方案架构师,负责基于 AWS 云计算方案的架构设计以及咨询,在加入 AWS 前,曾就职于多家大型互联网公司,十年以上 SRE 以及运维经验,在企业 IT 基础架构、云计算网络、流媒体直播等方向有着丰富的经验。

曹增轩

AWS 解决方案架构师,负责 AWS Startup 初创生态的架构设计以及咨询,在数据分析领域拥有丰富的架构设计及咨询经验。