亚马逊AWS官方博客

Amazon Redshift 的新增功能 – 适用于 Kinesis Data Streams 的流式摄取和适用于 Apache Kafka 的托管流式处理正式推出

十年前,在我加入 AWS 仅几个月后,Amazon Redshift 就正式推出。多年来,该工具加入诸多功能以提高性能并更易于使用。借助 Amazon Redshift,您现在可以使用 SQL 跨数据仓库、运营数据库和数据湖分析结构化和半结构化数据。最近,Amazon Redshift Serverless 正式发布,可让用户更轻松地运行和扩展分析,而无需管理数据仓库基础设施。

为了尽快处理来自实时应用程序的数据,客户正在采用流式处理引擎,例如 Amazon KinesisAmazon Managed Streaming for Apache Kafka。以前,要将流式数据加载到您的 Amazon Redshift 数据库中,您必须配置一个流程,以便于加载之前在 Amazon Simple Storage Service (Amazon S3) 中暂存数据。这样做会产生一分钟或更长时间的延迟,具体取决于数据量。

今天,我很高兴与大家分享 Amazon Redshift Streaming Ingestion 的正式发布。借助这项新功能,Amazon Redshift 每秒可以从 Amazon Kinesis Data Streams 和 Amazon MSK 将数百兆字节的数据提取到 Amazon Redshift 实体化视图中,然后在几秒钟内对其进行查询。

架构图。

流式摄取受益于通过实体化视图优化查询性能的能力,并可让用户更有效地使用 Amazon Redshift 进行运营分析和作为实时控制面板的数据源。流式摄取的另一个有趣使用案例是分析来自游戏玩家的实时数据,以优化他们的游戏体验。这种全新的集成还有助于实现物联网设备分析、点击流分析、应用程序监控、欺诈检测和实时排行榜。

我们来看看该方法的实际应用。

配置 Amazon Redshift Streaming Ingestion
除了管理权限外,可以在 Amazon Redshift 中完全使用 SQL 配置 Amazon Redshift 流式摄取。这对于缺少 AWS 管理控制台访问权限或配置 AWS 服务之间集成的专业知识的企业用户特别有用。

您可以分三个步骤设置流式摄取:

  1. 创建或更新 AWS Identity and Access Management (IAM) 角色以允许访问您使用的流式平台(Kinesis Data Streams 或 Amazon MSK)。请注意,IAM 角色应具有允许 Amazon Redshift 担任该角色的信任策略。
  2. 创建外部架构以连接到流式服务。
  3. 创建引用外部架构中流对象(Kinesis 数据流或 Kafka 主题)的实体化视图。

之后,您可以查询实体化视图以在分析工作负载中使用来自流的数据。流式摄取适用于 Amazon Redshift 预置的集群和新的无服务器选项。为了最大限度地提升简单性,我将在本演练中使用 Amazon Redshift Serverless。

为了准备环境,我需要一个 Kinesis 数据流。在 Kinesis 控制台中,我在导航窗格中选择 Data streams(数据流),然后选择 Create data stream(创建数据流)。对于 Data stream name(数据流名称),我使用 my-input-stream,然后将所有其他选项设置为默认值。几秒钟后,Kinesis 数据流就准备就绪。请注意,默认情况下我使用的是按需容量模式。在开发或测试环境中,您可以选择具有一个分片预置容量模式来优化成本。

现在,我创建了一个 IAM 角色来授予 Amazon Redshift 访问 my-input-stream Kinesis 数据流的权限。在 IAM 控制台中,我使用以下策略创建了一个角色:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "kinesis:DescribeStreamSummary",
                "kinesis:GetShardIterator",
                "kinesis:GetRecords",
                "kinesis:DescribeStream"
            ],
            "Resource": "arn:aws:kinesis:*:123412341234:stream/my-input-stream"
        },
        {
            "Effect": "Allow",
            "Action": [
                "kinesis:ListStreams",
                "kinesis:ListShards"
            ],
            "Resource": "*"
        }
    ]
}

为了让 Amazon Redshift 担任此角色,我使用了以下信任策略:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "Service": "redshift.amazonaws.com"
            },
            "Action": "sts:AssumeRole"
        }
    ]
}

Amazon Redshift 控制台中,我从导航窗格中选择 Redshift serverless,然后创建一个新的工作组和命名空间,类似于我在此博客文章执行的操作。创建命名空间时,在 Permissions(权限)部分中,我从下拉菜单中选择 Associate IAM roles(关联 IAM 角色)。然后,我选择刚才创建的角色。请注意,只有当信任策略允许 Amazon Redshift 担任该角色时,该角色才会可供选择。之后,我使用默认选项完成命名空间的创建。几分钟后,无服务器版数据库就可供使用。

在 Amazon Redshift 控制台中,我在导航窗格中选择 Query editor v2(查询编辑器 v2)。我通过从资源列表中选择来连接新的无服务器版数据库。现在,我可以使用 SQL 来配置流式摄取。首先,我创建一个映射到流式服务的外部架构。因为我将使用模拟的 IoT 数据作为示例,因此称之为外部架构传感器

CREATE EXTERNAL SCHEMA sensors
FROM KINESIS
IAM_ROLE 'arn:aws:iam::123412341234:role/redshift-streaming-ingestion';

为了访问流中的数据,我创建了一个从流中选择数据的实体化视图。通常,实体化视图包含基于查询结果的预先计算的结果集。在这种情况下,查询正在从流中读取,而 Amazon Redshift 是数据流的使用者。

流式数据将作为 JSON 数据进行摄取,因此我有两个选择:

  1. 将所有 JSON 数据保留在一列中,然后使用 Amazon Redshift 功能查询半结构化数据。
  2. 将 JSON 属性提取到自己的单独列中。

让我们看看这两种选择的优缺点。

SELECT 语句中的 approximate_arrival_timestamppartition_keyshard_idsequence_number 列由 Kinesis Data Streams 提供。流中的记录位于 kinesis_data 列中。refresh_time 列由 Amazon Redshift 提供。

为了将 JSON 数据保留在 sensor_data 实体化视图的单列中,我使用 JSON_PARSE 函数:

CREATE MATERIALIZED VIEW sensor_data AUTO REFRESH YES AS
    SELECT approximate_arrival_timestamp,
           partition_key,
           shard_id,
           sequence_number,
           refresh_time,
           JSON_PARSE(kinesis_data, 'utf-8') as payload    
      FROM sensors."my-input-stream";
CREATE MATERIALIZED VIEW sensor_data AUTO REFRESH YES AS
SELECT approximate_arrival_timestamp,
partition_key,
shard_id,
sequence_number,
refresh_time,
JSON_PARSE(kinesis_data) as payload 
FROM sensors."my-input-stream";

我使用了 AUTO REFRESH YES 参数,因此当流中有新数据时,实体化视图的内容会自动刷新。

为了将 JSON 属性提取到 sensor_data_extract 实体化视图的单独列中,我使用 JSON_EXTRACT_PATH_TEXT 函数:

CREATE MATERIALIZED VIEW sensor_data_extract AUTO REFRESH YES AS
    SELECT approximate_arrival_timestamp,
           partition_key,
           shard_id,
           sequence_number,
           refresh_time,
           JSON_EXTRACT_PATH_TEXT(FROM_VARBYTE(kinesis_data, 'utf-8'),'sensor_id')::VARCHAR(8) as sensor_id,
           JSON_EXTRACT_PATH_TEXT(FROM_VARBYTE(kinesis_data, 'utf-8'),'current_temperature')::DECIMAL(10,2) as current_temperature,
           JSON_EXTRACT_PATH_TEXT(FROM_VARBYTE(kinesis_data, 'utf-8'),'status')::VARCHAR(8) as status,
           JSON_EXTRACT_PATH_TEXT(FROM_VARBYTE(kinesis_data, 'utf-8'),'event_time')::CHARACTER(26) as event_time
      FROM sensors."my-input-stream";

将数据加载到 Kinesis 数据流中
要将数据放入 my-input-stream Kinesis Data Stream 中,我使用以下 random_data_generator.py Python 脚本模拟来自 IoT 传感器的数据:

import datetime
import json
import random
import boto3

STREAM_NAME = "my-input-stream"


def get_random_data():
    current_temperature = round(10 + random.random() * 170, 2)
    if current_temperature > 160:
        status = "ERROR"
    elif current_temperature > 140 or random.randrange(1, 100) > 80:
        status = random.choice(["WARNING","ERROR"])
    else:
        status = "OK"
    return {
        'sensor_id': random.randrange(1, 100),
        'current_temperature': current_temperature,
        'status': status,
        'event_time': datetime.datetime.now().isoformat()
    }


def send_data(stream_name, kinesis_client):
    while True:
        data = get_random_data()
        partition_key = str(data["sensor_id"])
        print(data)
        kinesis_client.put_record(
            StreamName=stream_name,
            Data=json.dumps(data),
            PartitionKey=partition_key)


if __name__ == '__main__':
    kinesis_client = boto3.client('kinesis')
    send_data(STREAM_NAME, kinesis_client)

我启动该脚本并查看正在放入流中的记录。它们使用 JSON 语法并包含随机数据。

$ python3 random_data_generator.py
{'sensor_id': 66, 'current_temperature': 69.67, 'status': 'OK', 'event_time': '2022-11-20T18:31:30.693395'}
{'sensor_id': 45, 'current_temperature': 122.57, 'status': 'OK', 'event_time': '2022-11-20T18:31:31.486649'}
{'sensor_id': 15, 'current_temperature': 101.64, 'status': 'OK', 'event_time': '2022-11-20T18:31:31.671593'}
...

查询来自 Amazon Redshift 的流式数据
为了比较两个实体化视图,我从每个视图中选择前十行:

  • sensor_data 实体化视图中,流中的 JSON 数据位于 payload 列中。我可以使用 Amazon Redshift JSON 函数来访问以 JSON 格式存储的数据。控制台屏幕截图。
  • sensor_data_extract 实体化视图中,流中的 JSON 数据已提取到不同的列中:sensor_idcurrent_temperaturestatusevent_time控制台屏幕截图。

现在,我可以在分析工作负载中使用这些视图中的数据,并且结合我的数据仓库、操作数据库和数据湖中的数据。我可以将这些视图中的数据与 Redshift ML 结合使用,以此训练机器学习模型或使用预测分析。由于实体化视图支持增量更新,因此这些视图中的数据可以有效地用作控制面板的数据源,例如,使用 Amazon Redshift 作为 Amazon Managed Grafana 的数据源。

可用性和定价
适用于 Kinesis Data Streams 的 Amazon Redshift 流式摄取和适用于 Apache Kafka 的托管流式处理现已在所有商用 AWS 区域正式推出。

使用 Amazon Redshift 流式摄取不会产生额外费用。有关更多信息,请参阅 Amazon Redshift 定价

在数据仓库和数据湖中使用低延迟流式数据从未如此简单。欢迎与我们分享您使用此新功能构建了什么!

Danilo