亚马逊AWS官方博客

Amazon Kinesis Analytics 中的实时热点检测

今天,我们将在 Amazon Kinesis Data Analytics 中发布新的 Machine Learning 功能,用以检测流式数据中的“热点”。2016 年 8 月,我们推出了 Kinesis Data Analytics,并在此后不断添加功能。您或许已经知道,Kinesis Data Analytics 是一种完全托管的流式数据实时处理引擎,可让您编写 SQL 查询以便从您的数据中推演含义,并将结果输出到 Kinesis Data FirehoseKinesis Data Streams 甚至是 AWS Lambda。全新 HOTSPOT 函数将补充 Kinesis 中的现有 Machine Learning 功能,允许客户利用基于无人监管流的 Machine Learning 算法。客户不需要是数据科学或 Machine Learning 领域的专家也能得心应手地利用这些功能。

热点

HOTSPOTS 函数是一种全新的 Kinesis Data Analytics SQL 函数,可供您用于识别数据中相对密集的区域,而不必显式构建和训练复杂的 Machine Learning 模型。您可以识别需要立即关注的数据子节点,并通过将热点流式传输到 Kinesis 数据流、Firehose 传输流或者通过调用 AWS Lambda 函数,以编程方式执行操作。

这一函数有很多超酷的应用场合,可以让您的操作更轻松。想象一下,传达交通拥堵情况时空数据的骑乘共享项目或无人驾驶车辆车队,或者某个数据中心内的大量服务器过热,说明暖通空调 (HVAC) 系统存在问题。 HOTSPOTS 不仅限于时空数据,还可以应用于众多问题领域。

该函数遵循简单的语法,接受 DOUBLEINTEGERFLOATTINYINTSMALLINTREALBIGINT 数据类型。

HOTSPOT 函数获取游标作为输入,并返回描述热点的 JSON 字符串。通过示例可以更轻松地理解这一函数。

利用 Kinesis Data Analytics 检测热点

我们以纽约市出租车和豪华轿车委员会的一个简单数据集为例,该数据集用于跟踪黄色出租车的接客和送客地点。 其中大多数数据已经存储在 S3 中,可通过 s3://nyc-tlc/ 公开访问。我们将创建一个小型 Python 脚本,为 Kinesis 数据流载入出租车数据,并将这些数据提供给 Kinesis Data Analytics。最后,我们会将此信息输出到与 Amazon Elasticsearch Service 集群相连的 Kinesis Data Firehose,以通过 Kibana 呈现。根据我自己在纽约 5 年的生活经历,我们或许能在这些数据中找到一两个热点。

首先,我们将创建一个 Kinesis 输入流,并开始将我们的纽约市出租车载客数据发送到此流中。我编写了一个简单的 Python 脚本来读取其中一个 CSV 文件,并使用 boto3 将记录推送到 Kinesis。您可以按照自己的方式处理记录。

import csv
import json
import boto3
def chunkit(l, n):
    """Yield successive n-sized chunks from l."""
    for i in range(0, len(l), n):
        yield l[i:i + n]

kinesis = boto3.client("kinesis")
with open("taxidata2.csv") as f:
    reader = csv.DictReader(f)
    records = chunkit([{"PartitionKey": "taxis", "Data": json.dumps(row)} for row in reader], 500)
    for chunk in records:
        kinesis.put_records(StreamName="TaxiData", Records=chunk)

接下来,我们将创建一个 Kinesis Data Analytics 应用程序,并将我们的输入流与出租车数据一起添加为源。

随后我们要自动检测 schema。

现在,我们将创建一个简单的 SQL 脚本来检测热点,并将其添加到应用程序的实时分析部分。

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
    "pickup_longitude" DOUBLE,
    "pickup_latitude" DOUBLE,
    HOTSPOTS_RESULT VARCHAR(10000)
); 
CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" 
    SELECT "pickup_longitude", "pickup_latitude", "HOTSPOTS_RESULT" FROM
        TABLE(HOTSPOTS(
            CURSOR(SELECT STREAM * FROM "SOURCE_SQL_STREAM_001"),
            1000,
            0.013,
            20
        )
    );


我们的 HOTSPOTS 函数将获取输入流、窗口大小、扫描半径以及要计为热点的最小点数。这些值取决于应用程序,但您可以轻松在控制台中修改它们,直至获得所需的结果。文档中提供了有关参数的更多详细信息。 HOTSPOTS_RESULT 会返回一些有用的 JSON,让我们可以在热点周围绘制边界框:

{
  "hotspots": [
    {
      "density": "elided",
      "minValues": [40.7915039, -74.0077401],
      "maxValues": [40.7915041, -74.0078001]
    }
  ]
}

获得所需结果后,即可保存脚本,并将应用程序连接到 Amazon Elastic Search Service Firehose 传输流。我们可以在 Firehose 中运行一个中间 Lambda 函数,将记录转换成更符合地理工作需求的格式。然后可以在 Elasticsearch 中更新映射,以将热点对象的索引编制为 Geo-Shapes。

最后,我们可以连接到 Kibana 并呈现结果。

看起来曼哈顿的交通十分繁忙!

现已推出
此功能现已在提供 Kinesis Data Analytics 的所有区域推出。我认为这是 Kinesis Data Analytics 一项十分有趣的新功能,可以直接帮助许多应用程序创造价值。在 Twitter 或评论中告诉我们您构建的内容!

Randall