亚马逊AWS官方博客

在 Amazon Kinesis Data Analytics Studio 中尝试的十大 Flink SQL 查询

通过 Amazon Kinesis Data Analytics Studio,您可以轻松地实时分析流数据并使用标准 SQL、Python 和 Scala 构建流处理应用程序。只需在亚马逊云科技管理控制台上单击几下,就可以启动无服务器笔记本来查询数据流,只需几秒钟即可获得结果。Kinesis Data Analytics 降低了构建和管理 Apache Flink 应用程序的复杂性。Apache Flink 是一个用于处理数据流的开源框架和引擎。它具有高可用性和可扩展性,为流处理应用程序提供了高吞吐量和低延迟。

Apache Flink SQL 支持 Apache Calcite,它执行 SQL 标准,使您能够编写简单的 SQL 语句来创建、转换数据并将其插入 Apache Flink 中定义的流数据表中。在本文中,我们将讨论一些可以在 Kinesis Data Analytics Studio 中运行的 Flink SQL 查询。

Flink SQL 接口可与 Apache Flink Table API 以及 Apache Flink DataStream 和 Dataset API 无缝协作。为了以适合当前操作的方式处理流数据,流工作负载通常会在这些抽象层中切换。一个简单的过滤器模式可能需要 Flink SQL 语句,而涉及以对象为导向的状态控制的更复杂的聚合可能需要 DataStream API。工作负载可使用 DataStream API 从数据流中提取模式,然后使用 Flink SQL API 对模式进行分析、扫描、过滤和聚合。

有关 Flink SQL 和 Table API 的更多信息,请参阅概念和常见 API,尤其是关于解释器使用的不同计划器以及如何构建 Apache Flink SQL 或 Table API 程序的章节。

 

Kinesis Data Analytics Studio 编写 Apache Flink SQL 应用程序

通过Amazon Kinesis Data Analytics Studio,您每秒可查询数百万条流记录,笔记本也可以得到相应的扩展。通过 Apache Flink 强大的Amazon Kinesis Data Analytics 功能,只需几个简单的 SQL 语句,您就能够拥有强大的 Apache Flink 应用程序或分析控制面板。

需要入门指导? Amazon Kinesis Data Analytics Studio 很容易上手。在接下来的部分中,我们将介绍多种与传入数据流交互的方法 — 在Amazon Kinesis Data Analytics Studio 笔记本中查询、聚合、接收和处理数据。首先,为数据流创建一个内存表。

 

为传入的数据创建一个内存表

首先使用 CREATE 语句注册内存表。您可以将这些语句配置为连接到 Amazon Kinesis Data StreamsAmazon Managed Streaming for Apache Kafka (Amazon MSK) 集群或 Apache Flink 中目前受支持的任何其他连接器,例如 Amazon Simple Storage Service(Amazon S3)。

您需要在段落开头指出使用的是 Flink SQL 解释器,该解释器由 Zeppelin magic % 指示,后跟 flink.ssql 和段落的类型。在大多数情况下是更新段落 type=update,会持续更新输出。如果查询的结果只有一行,可以使用 type=single;如果需要将查询的输出附加到现有结果后面,则可以使用 type=append。请参阅以下代码:

%flink.ssql(type=update)

 

CREATE TABLE stock_table (

ticker VARCHAR(6),

price DOUBLE,

event_time TIMESTAMP(3),

WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND

)

PARTITIONED BY (ticker)

WITH (

'connector' = 'kinesis',

'stream' = 'input-stream',

'aws.region' = 'us-east-1',

'scan.stream.initpos' = 'LATEST',

'format' = 'json',

'json.timestamp-format.standard' = 'ISO-8601')

此示例展示了创建一个名为 stock_table 的表,其中包含股票、价格和表示记录股票价格时间的 event_time 列。WATERMARK 子句根据 event_time (row_time) 列定义了用于生成水位线的水位线策略。event_time 列被定义为 Timestamp(3),是与水位线一起使用的顶级列。WATERMARK定义后面的语法 — FOR event_time AS event_time – INTERVAL ‘5’ SECOND — 声明水位线是根据 bounded-out-of-orderness 策略发出的,允许 event_time 数据有 5 秒的延迟。该表使用 Kinesis 连接器从最新的流位置读取 us-east-1 区域中名为 input-stream 的 Kinesis 数据流。

在 Zeppelin 笔记本中运行此语句时,将会根据 CREATE 语句中的声明创建一个 Amazon Glue 数据目录表,该表可立即用于来自 Kinesis Data Streams 的查询。

如果数据目录已包含该表,则无需完成此步骤。您可以如前文所述创建表,也可以使用现有的数据目录表。

以下屏幕截图展示了在 Amazon Glue 数据目录中创建的表。


 

使用实时更新查询数据流

创建表后,可以通过编写 SELECT 语句来执行简单的数据流查询,该语句允许以表格形式或条形图、饼图等显示数据:

%flink.ssql(type=update)

SELECT * FROM stock_table;

从不同图表中选择不同的可视化效果非常简单,可以直接从结果集的左上角选择。

要删除或重新创建此表,您可以导航到 Amazon Glue 控制台中的表目录,手动将其从数据目录中删除,也可以从Amazon Kinesis Data Analytics Studio 笔记本中显式地删除此表:

%flink.ssql(type=update)

DROP TABLE stock_table;

 

过滤函数

您可以使用关键字“WHERE”对数据流执行简单的过滤操作。在以下代码示例中,从所有股票代码记录过滤出以“AM”开头的流数据:

%flink.ssql(type=update)

 

SELECT * FROM stock_table WHERE ticker LIKE 'AM%'

如下屏幕截图显示了结果。

 

用户定义函数

您可以在笔记本中注册用户定义函数 (UDF),以便在我们的 Flink SQL 查询中使用。必须在表环境中注册才能供Amazon Kinesis Data Analytics Studio 应用程序中的 Flink SQL 使用。UDF 是可以在 Flink SQL 范围之外定义的函数,它们使用自定义逻辑或频繁的转换,通常这些内容不方便在 SQL 中表达出来。

UDF在Amazon Kinesis Data Analytics Studio 中是通过 Scala 实现的,其中 Python UDF 支持即将推出。UDF 可使用任意库处理数据。

让我们定义一个将股票符号转换为小写字母的 UDF 和一个将 event_time 转换为epoch seconds的 UDF:

%flink

 

import java.time.LocalDateTime

import java.time.format.DateTimeFormatter._

import java.time.ZoneOffset

 

class DateTimeToEpoch extends ScalarFunction {

def eval(datetime: LocalDateTime) = datetime.toEpochSecond(ZoneOffset.UTC)

}

stenv.registerFunction("dt_to_epoch", new DateTimeToEpoch())

 

class ScalaLowerCase extends ScalarFunction {

def eval(str: String) = str.toLowerCase

}

stenv.registerFunction("to_lower", new ScalaLowerCase())

在每个 UDF 定义的底部,Scala 中的 stenv(StreamingTableEnvironment) 用于注册指定名称的函数。

注册后,您只需在 Flink SQL 段落中调用 UDF 即可转换我们的数据:

%flink.ssql(type=update)

SELECT to_lower(ticker) as lowercase_ticker, price, dt_to_epoch(event_time) as epoch_time from stock_table;

如下屏幕截图显示了结果。

 

使用外部数据源(join)来扩充数据

您可能需要使用存储在数据流之外的静态或参考数据来扩充流数据。例如,除了股票交易之外,公司地址和元数据可能流入到关系数据库或Amazon S3上的文件。为了扩充数据流,Flink SQL 允许您将参考数据连接到流数据源。这种扩充静态数据可能有或没有与之关联的时间元素。如果没有关联的时间元素,您可能需要向从外部读取的数据添加时间处理元素,以便将其与基于时间的流连接起来。这是为了避免得到过时的数据,在扩充数据时应注意这一点。

让我们定义一个扩充文件作为数据源,该文件位于 Amazon S3 中。存储桶包含一个 CSV 文件,其中包含股票代码和关联公司的元数据 — 全称、城市和州:

%flink.ssql(type=update)

 

CREATE TABLE company_details_table (

  ticker_symbol VARCHAR(6),

  company_name VARCHAR,

  company_city VARCHAR,

  company_state_abbrev VARCHAR

)  WITH (

  'connector' = 'filesystem',        

  'path' = 's3a://interactive-applications/data-mapping-stock-enrichment.csv',

  'format' = 'csv'                  

)

此 CSV 文件被一次性读取,且任务被标记为已完成。现在,您可以将它与现有的 stock_table 连接:

%flink.ssql(type=update)

SELECT ticker, price, company_name, event_time, company_city, company_state_abbrev FROM (SELECT CAST(event_time AS TIMESTAMP) as event_time, ticker, price from stock_table)

JOIN company_details_table cd

ON ticker=ticker_symbol;

在撰写本文时,Flink 存在一个限制,它无法区分间隔连接(两个表都需要时间戳)和常规连接。因此,您需要将 rowtime 列 event_time 显式转换为常规时间戳,以免其被纳入常规连接中。如果两个表都有时间戳,理想的情况是将它们包含在连接语句的 WHERE 子句中。如下屏幕截图显示了结果。

 

滚动(Tumbling)窗口

可将滚动窗口视为在不重叠的时间窗口中的小批次聚合。例如,计算 30 秒内的最高价格,或10 秒内的股票计数。要使用 Apache Flink SQL 执行此功能,请使用以下代码:

%flink.ssql(type=update)

 

SELECT ticker_symbol, COUNT(ticker_symbol) AS ticker_symbol_count

FROM stock_ticker_table

GROUP BY TUMBLE(processing_time, INTERVAL '10' second), ticker_symbol;

以下截图显示了我们的输出。

 

滑动(Sliding)窗口

滑动窗口(也称为跳跃窗口)与滚动窗口基本相同,区别在于这些窗口可能会重叠。滑动窗口可以每隔 X 秒发出窗口大小为 Y 秒的数据。例如,对于上述使用案例,您可以每隔 5 秒发出一次 10 秒的数据统计:

%flink.ssql(type=update)

 

SELECT ticker_symbol, COUNT(ticker_symbol) AS ticker_symbol_count

FROM stock_ticker_table

GROUP BY HOP(processing_time, INTERVAL '5' second, INTERVAL '10' second), ticker_symbol;

如下屏幕截图显示了结果。

 

带过滤警报的滑动窗口

要过滤数据流中的记录,以触发某种警报或在下游使用它们,以下示例展示了如何将过滤出的滑动窗口插入聚合计数表中,该表的配置为写入数据流。之后可以使用 Amazon CloudWatch 或其他触发机制来发出高交易率或其他指标的警报。

以下 CREATE TABLE 语句连接到 Kinesis 数据流,其后的插入语句将过滤出所有以 AM 开头的代码记录,其中 1 分钟间隔内有 750 条记录:

%flink.ssql(type=update)

 

CREATE TABLE stock_ticker_count_table (

    ticker_symbol VARCHAR(4),

    ticker_symbol_count INTEGER

)

WITH (

'connector' = 'kinesis',

'stream' = 'output-stream',

'aws.region' = 'us-east-1',

'scan.stream.initpos' = 'LATEST',

'format' = 'json',

'json.timestamp-format.standard' = 'ISO-8601');

 

INSERT INTO  stock_ticker_count_table

SELECT * FROM

    (SELECT ticker_symbol, CAST(COUNT(ticker_symbol) AS INTEGER) AS ticker_symbol_count

    FROM stock_ticker_table

    WHERE ticker_symbol like 'AM%'

    GROUP BY HOP(processing_time, INTERVAL '30' second, INTERVAL '1' minute), ticker_symbol)

WHERE ticker_symbol_count > 750;

 

事件时间

如果传入的数据包含时间戳信息,您的数据管道将使用事件时间而不是处理时间,从而更好地反映实际情况。这两种时间的区别在于,事件时间反映的是生成记录的时间,而处理时间指 Apache Flink 的Amazon Kinesis Data Analytics 收到记录的时间。

要在 Flink SQL 创建语句中指定事件时间,用于事件时间的元素必须为 TIMESTAMP(3)类型,并且必须伴随水位线策略表达式。如果事件时间列不是 TIMESTAMP(3)类型,也可以计算出来。定义水位线策略表达式将事件时间字段标记为事件时间属性,并说明如何处理迟到的数据。

水位线策略表达式定义了水位线策略。计算为每条记录生成的水位线,并相应地处理数据的顺序。

流数据工作负载中的迟到数据很常见,大多数情况下是不可避免的。数据之所以迟到,可能是因为网络滞后、数据缓冲或处理速度缓慢以及介于它们之间的任何其他原因。对于可能引入迟到数据的升序的时间戳工作负载,您可以使用以下水位线策略:

WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL '0.001' SECOND

此代码发出观察到的最大时间戳减去一条记录的水位线。时间戳早于或等于最大时间戳的行不会被视为迟到。

 

Bounded-out-of-orderness 时间戳

要发出观察到的最大时间戳减去指定延迟的水位线,可以通过Bounded-out-of-orderness时间戳定义允许的数据流中的记录延迟:

WATERMARK FOR rowtime_column AS rowtime_column – INTERVAL ‘3’ SECOND

上面的代码发出 3 秒延迟水位线。可以在本文的简介部分找到这个例子。水位线指示数据流处理迟到的数据。思考以下场景:股票代码每 5 秒钟用实时数据更新实时控制面板。如果数据延迟 10 秒(根据事件时间)到达数据流,我们会丢弃该数据,而不会反映在控制面板中。水位线指导 Apache Flink 处理迟到的数据。,

 

MATCH_RECOGNIZE

流数据中的一个常见模式是能够检测模式。Apache Flink 具有复杂的事件处理库,能够检测数据中的模式,而且 Flink SQL API 允许在关系查询语法中进行检测。

通过 Flink SQL 中的 MATCH_RECOGNIZE 查询能够实现逻辑分区并识别流表中的模式。以下示例演示了如何操作我们的股票表:

%flink.ssql(type=update)

 

SELECT *

FROM stock_table

    MATCH_RECOGNIZE(

        PARTITION BY ticker

        ORDER BY event_time

        MEASURES

            A.event_time AS initialPriceTime,

            C.event_time AS dropTime,

            A.price - C.price AS dropDiff,

            A.price as initialPrice,

            C.price as lastPrice

        ONE ROW PER MATCH

        AFTER MATCH SKIP PAST LAST ROW

        PATTERN (A B* C) WITHIN INTERVAL '10' MINUTES

        DEFINE

            B AS B.price > A.price - 500

    )

在此查询中,我们识别了 10 分钟内某只下跌了 500 美元的股票。让我们将 MATCH_RECOGNIZE 查询细分为几个组件。

以下代码查询我们现有的 stock_table:

SELECT * FROM stock_table

MATCH_RECOGNIZE 关键字开始将模式与查询子句相匹配。这表示我们正在识别表中的模式。

下面的代码定义了表的逻辑分区,类似于 GROUP BY 表达式:

PARTITION BY ticker

以下代码定义了如何对传入数据进行排序。所有 MATCH_RECOGNIZE 模式都需要分区和排序方案才能识别模式。

ORDER BY event_time

MEASURES 定义了查询的输出。您可以将其视为 SELECT 语句,因为这是模式的最终结果。

在下面的代码中,我们从模式识别中选择要输出的行:

A.event_time AS initialPriceTime,

C.event_time AS dropTime,

A.price - C.price AS dropDiff,

A.price as initialPrice,

C.price as lastPrice

我们使用以下参数:

  • A.event_time — 记录在模式中的第一个时间,500 美元的价格从这个时间开始下跌
  • C.event_time — 记录在模式中的最后一个时间,这个时间的价格比 A.price 下跌了至少 500 美元
  • A.price – C.price — 记录在模式中的第一个时间到最后一个时间的价差
  • A.price — 记录在模式中的第一个价格,500 美元的价格开始下跌
  • C.price — 记录在模式中的最后一个价格,比 A.price 下跌了至少 500 美元

ONE ROW PER MATCH 定义了输出模式 — 每找到一个匹配应发出多少行。从 Apache Flink 1.12 开始,这是唯一受支持的输出模式。有关当前不支持的替代方案,请参阅输出模式

以下代码定义了匹配后策略

AFTER MATCH SKIP PAST LAST ROW

此代码指导 Flink SQL 在找到匹配后启动新的匹配过程。这个特定的定义跳过当前模式中的所有行,然后转到数据流中的下一行。这可以确保模式事件中不存在重叠。有关 AFTER MATCH SKIP 替代策略,请参阅匹配后策略。可将这种策略视为一种滚动窗口类型的聚合,因为模式的结果相互不重叠。

在下面的代码中,我们定义了模式 A B* C,表示我们将有一个序列的连接记录:

PATTERN (A B* C) WITHIN INTERVAL '10' MINUTES

我们使用以下顺序:

  • A — 序列中的第一条记录
  • B* — 与 DEFINE 子句中定义的约束相匹配的零或多条记录
  • C — 序列中的最后一条记录

这些变量的名称在 PATTERN 子句中得到定义,并遵循类似正则的语法。有关详细信息,请参见定义模式

在下面的代码中,我们将 B 模式变量定义为记录的价格,只要该价格大于模式中第一条记录减 500 的值:

DEFINE

    B AS B.price > A.price - 500

例如,假设我们有以下模式。

row ticker price event_time
1 AMZN 800 10:00 am
2 AMZN 400 10:01 am
3 AMZN 500 10:02 am
4 AMZN 350 10:03 am
5 AMZN 200 10:04 am

我们定义以下内容:

  • A — 第 1 行
  • B — 第 2—4 行,它们都匹配 DEFINE 子句中的条件
  • C — 第 5 行,它打破了匹配 B 条件的模式,因此是模式中的最后一行

以下屏幕截图展示了完整的例子。

 

Top-N

Top-N 查询识别按列排序的 N 个最小值或最大值。例如,在需要识别数据流中前 10 个项目或最后 10 个项目的情况下,此查询非常有用。

Flink 可使用 OVER 窗口子句和筛选表达式的组合来生成 Top-N 查询。OVER / PARTITION BY 子句还可以支持每组的 Top-N。请参阅以下代码:

 SELECT * FROM (

    SELECT *, ROW_NUMBER() OVER (PARTITION BY ticker_symbol ORDER BY price DESC) as row_num

    FROM stock_table)

WHERE row_num <= 10;

 

数据去重

如果生成到数据流中的数据可能存在重复条目,有多种策略可消除这些条目。要实现这一目的,最简单的方法是数据去重,您可以在窗口中删除行,并根据时间戳仅保留第一个或最后一个元素。

Flink 可使用 ROW_NUMBER 来删除重复项,正如 Top-N 示例所展示的方法一样。只需编写 OVER / PARTITION BY 查询,然后在 WHERE 子句中指定第一行的编号:

SELECT * FROM (

    SELECT *, ROW_NUMBER OVER (PARTITION BY ticker_symbol ORDER BY price DESC) as row_num

    FROM stock_table)

WHERE row_num = 1;

 

最佳实践

与任何数据流工作负载一样,为了解工作负载的进展情况,您需要测试和监控策略。

以下是需要监控的关键领域:

  •  — 确保源数据流具有足够的吞吐量,并且在使用Amazon Kinesis 的情况下,您没有收到 ThroughputExceededExceptions,或源系统的高内存或 CPU 使用率。
  • 目标 — 与数据源一样,确保 Flink SQL 应用程序的输出不会塞满下游系统。在使用Amazon Kinesis 的情况下,请确保您没有收到任何 ThroughputExceededExceptions。如果收到,应添加分片或者更均匀地分配数据。否则可能会对管道造成背压。
  • 扩缩 — 在分配和扩缩Amazon Kinesis Data Analytics Studio 应用程序时,请确保数据管道有足够的Amazon Kinesis 处理单元。您可以启用基于 CPU 的自动扩缩功能,或者实施自定义自动扩缩程序,以便在大量数据流入时扩展应用程序。
  • 测试 — 在将新的数据管道部署到生产规模数据之前,先开展小规模的测试。如果可能,请使用真实的生产数据来测试管道,或者使用模拟生产数据来了解应用程序的反应,之后再将其部署到生产环境中。
  • 笔记本内存 — 运行应用程序的 Zeppelin 笔记本受浏览器可用内存量的限制,因此,不要向控制台发出太多行,这会导致浏览器的内存冻结笔记本。虽然不会丢失数据和计算,但表示层会变得无法访问。相反,在将数据传到表示层之前,应尝试聚合数据,获取代表性样本,或者限制返回的记录量,以缓解笔记本内存不足的情况。

 

总结

只需几分钟,您就可以使用 Flink SQL 在Amazon Kinesis Data Analytics Studio 中查询数据流并创建数据管道。在本文中,我们讨论了多种不同的查询数据流的方法,Apache Flink SQL 文档还提供了大量的其他示例。

您可以将这些样本传入自己的Amazon Kinesis Data Analytics Studio 笔记本中,然后在自己的流数据上试用! 务必让 AWS 了解您对这项新功能的体验,我们期待看到用户使用Amazon Kinesis Data Analytics Studio 从数据中获得见解。

 

本篇作者

Jeremy Ber

在过去 5 年中一直从事于遥测数据领域,担任软件工程师、机器学习工程师,最近还担任数据工程师。过去,Jeremy 支持并构建了每天流式传输 TB 级数据的系统,并实时处理复杂的机器学习算法。在 亚马逊云科技,他是解决方案架构师和流数据处理专家,为 Managed Streaming for Kafka (Amazon MSK) 和 Amazon Kinesis 提供支持。