亚马逊AWS官方博客

Kinesis Data Analytics Studio 和Python交互式开发自定义聚合查询

Amazon Kinesis Data Analytics Studio 使客户能够轻松地实时分析流数据,并使用标准SQL、Python和Scala构建由Apache Flink 支持的流处理应用程序。 只需在AWS管理控制台中单击几下,客户就可以启动serverless notebook来查询数据流并在几秒钟内获得结果。

Apache Flink是一个用于处理数据流的开源框架和引擎。它具有高可用性和可扩展性,为流处理应用程序提供了高吞吐量和低延迟。Kinesis Data Analytics降低了构建和管理Apache Flink应用程序的复杂性。

运行Apache Flink工作负载的客户面临着一个艰巨的挑战: 开发他们的分布式流处理应用程序,却无法真正了解他们的应用程序执行的数据处理步骤。Kinesis Data Analytics Studio将Apache Zeppelin 笔记本电脑的易用性与Apache Flink处理引擎的强大功能相结合,在完全管理的产品中提供高级流式分析功能。此外,它还加快了流处理应用程序的开发和运行,从而不断生成实时见解。

在本文中,我们将向您介绍Kinesis Data Analytics Studio,并开始使用Apache Flink(Pyflink)的Python API从Amazon Kinesis数据流交互式查询数据。本文我们使用Kinesis Data Stream作为数据源。Kinesis Data Analytics Studio还与Amazon Managed Streaming for Apache Kafka(Amazon MSK)、Amazon Simple Storage Service(Amazon S3)以及Apache Flink支持的各种其他数据源兼容。

准备:

  • Kinesis Data Stream
  • Cloud9

创建 Kinesis Data Stream

进入Cloud9, 新建一个terminal,并执行下列cli创建一个名为teststream的消息队列:

$ aws kinesis create-stream \
--stream-name teststream \
--shard-count 1 \
--region ap-northeast-1

创建一个Kinesis Data Analytics Studio notebook

您可以通过以下步骤开始与数据流交互:

  • 打开AWS管理控制台并导航至Amazon Kinesis/Data Analytics/Streaming applications
  • 选择主页上的Studio选项卡,然后选择Create Studio Notebook。
  • 选择Create with custom settings, 输入Studio笔记本的名称,并让Kinesis Data Analytics Studio为此创建AWS IAM角色。
  • 选择一个AWS Glue数据库来存储Kinesis Data Analytics Studio使用的源和目标周围的元数据。
  • 选择创建Studio notebook。

创建应用程序后,选择Run以启动Apache Flink应用程序。这将需要几分钟的时间来完成,此后可以点击Open in Apache Zeppelin打开。

在Cloud9中创建样本数据

在cloud9中创建ticker.py文件,并复制如下代码到文件内并保存

import datetime
import json
import random
import boto3
import time

STREAM_NAME = "teststream"
price = 100

def get_data():
    global price 
    price = price + (random.random()*2-1)*10
    price = 0 if price < 0 else price
    return {
        #'EVENT_TIME': datetime.datetime.now().isoformat(),
        'ticker': random.choice(['BTC','ETH','BSC','SOL']),
        'price': price,
        'event_time': datetime.datetime.now().isoformat()
    }

def generate(stream_name, kinesis_client):
    while True:
        data = get_data()   
        print(data)
        time.sleep(1)
        kinesis_client.put_record(
            StreamName=stream_name,
            Data=json.dumps(data),
            PartitionKey="partitionkey")

if __name__ == '__main__':
    generate(STREAM_NAME, boto3.client('kinesis'))

运行代码,程序会将模拟的数据发送到Kinesis中。这里注意,由于Cloud9使用临时凭证,所以有可能会出现token过期的问题,重新运行即可。

编写Studio代码实现自定义聚合

首先我们创建一个source table,用于定义数据的schema以及watermark:

%flink.ssql(type=update)

create table stock_from_flink (
    ticker varchar(6),
    price double,
    event_time TIMESTAMP(3),
    WATERMARK for event_time AS event_time - INTERVAL '5' SECOND
)
PARTITIONED BY (ticker)
WITH(
    'connector' = 'kinesis',
    'stream' = 'teststream',
    'aws.region' = 'ap-northeast-1',
    'scan.stream.initpos' = 'LATEST',
    'format' = 'json',
    'json.timestamp-format.standard' = 'ISO-8601'
)

点击右上角的执行按钮,然后添加新的paragraph并拷贝自定义聚合函数,这里我们定义计算窗口内的蜡烛图四元组,最大值,最小值,初始值和最新值:

%flink.pyflink

class CountAndSumAggregateFunction(AggregateFunction):

    def get_value(self, accumulator):
        return Row(accumulator[0], accumulator[1], accumulator[2], accumulator[3])

    def create_accumulator(self):
        return Row(-1, 0,-1,0)

    def accumulate(self, accumulator, row: Row):
        accumulator[0] = min(accumulator[0],row[1]) if accumulator[0] > 0 else row[1]
        accumulator[1] = max(accumulator[1],row[1])
        accumulator[2] = accumulator[2] if  accumulator[2] > 0 else row[1]
        accumulator[3] = row[1]

    def retract(self, accumulator, row: Row):
        pass

    def merge(self, accumulator, accumulators):
        pass

    def get_accumulator_type(self):
        return DataTypes.ROW(
            [DataTypes.FIELD("minp", DataTypes.DOUBLE()),
             DataTypes.FIELD("maxp", DataTypes.DOUBLE()),
             DataTypes.FIELD("initialp", DataTypes.DOUBLE()),
             DataTypes.FIELD("lastedp", DataTypes.DOUBLE())])

    def get_result_type(self):
        return DataTypes.ROW(
             [DataTypes.FIELD("minp", DataTypes.DOUBLE()),
             DataTypes.FIELD("maxp", DataTypes.DOUBLE()),
             DataTypes.FIELD("initialp", DataTypes.DOUBLE()),
             DataTypes.FIELD("lastedp", DataTypes.DOUBLE())])

function = CountAndSumAggregateFunction()
agg = udaf(function,
           result_type=function.get_result_type(),
           accumulator_type=function.get_accumulator_type(),
           name=str(function.__class__.__name__))

创建新的paragraph,粘贴主程序,调用自定义聚合函数,我们定义一个10秒钟的窗口方便数据观察:

%flink.pyflink

input_table = st_env.from_path("stock_from_flink")
new_table3 = input_table.window(Tumble.over("10.seconds").on("event_time").alias("ten_seconds_window")) \
            .group_by("ten_seconds_window, ticker") \
            .aggregate(agg.alias("minp","maxp","initialp","lastedp")) \
            .select(" ticker as ticker, minp as min_price, maxp as max_price, initialp as initial_price, lastedp as latest_price,  ten_seconds_window.end as epoch_time")

最后一个paragraph我们用来展示我们聚合后的结果:

%flink.pyflink

z.show(new_table3, stream_type="update")

点击执行后,我们会很快接收到数据并生成窗口:

我们还可以通过不同的可视化图形来观察数据

同时,同传统的应用一样,Studio提供了Apache Flink Dashboard, 方便查询程序运行时的状况。

总结:

Kinesis Data Analytics Studio使Apache Flink应用程序开发的速度大大加快。此外,所有这些都是通过丰富的可视化、可扩展且用户友好的界面来实现的,并能协作开发,灵活选择语言,使得流式工作负载性能强大。用户可以按照本文所述段落,或者选择将他们升级为具有持久状态的针对Apache Flink的Kinesis Data Analytics应用程序。

参考文档:

https://zeppelin.apache.org/docs/0.9.0/interpreter/flink.html

https://docs.aws.amazon.com/kinesisanalytics/latest/java/how-notebook.html

本篇作者

孙标

亚马逊云科技资深解决方案架构师。拥有多年金融,移动互联网研发及数字货币交易所架构经验。