Amazon Web Services ブログ

Amazon Kinesis Analytics でのリアルタイムのホットスポット検知

AWS では本日、Amazon Kinesis Data Analytics でストリーミングデータの「ホットスポット」を検知するための新しい機械学習機能をリリースしました。AWS は Kinesis Data Analytics を 2016 年8月に公開して以来、機能を追加し続けています。すでにご存じかもしれませんが、Kinesis Data Analytics はストリーミングデータ用の完全マネージド型リアルタイム処理エンジンで、データから意味を引き出し、結果を Kinesis Data FirehoseKinesis Data Streams、または AWS Lambda 関数にさえも出力する SQL クエリを記述することができます。新しい HOTSPOT 関数は、お客様がストリーミングベースの教師なし機械学習アルゴリズムを活用することを可能にする、Kinesis の既存機械学習能力を高めます。これらの能力を利用するために、お客様がデータサイエンスや機械学習の専門家である必要はありません。

ホットスポット

HOTSPOTS 関数は、複雑な機械学習モデルを明示的に構築して訓練することなく、データの比較的高密度な領域を特定するために使用できる新しい Kinesis Data Analytics SQL 関数です。早急な対応が必要なデータのサブセクションを特定し、Kinesis Data ストリーム、Firehose 配信ストリームにストリーミングする、または AWS Lambda 関数を呼び出すことによって、プログラム的にアクションを実行することができます。

これが業務を容易にし得る非常に素晴らしいシナリオが数多くあります。交通渋滞に関する時空間データを伝えるライドシェアプログラムや自動化された車両運行、または多数のサーバーが過熱状態になり始め、HVAC 問題を示しているデータセンターを想像してみてください。 HOTSPOTS は、時空間データ以外にも数多くの問題領域にわたる適用が可能です。

この関数はいくつかのシンプルな構文に従い、 DOUBLEINTEGERFLOATTINYINTSMALLINTREAL、および BIGINT データ型を受け入れます。

HOTSPOT 関数はカーソルを入力として受け取り、ホットスポットを説明する JSON 文字列を返します。これは、例を使うと理解しやすいでしょう。

ホットスポットを検知するための Kinesis Data Analytics の使用

タクシーの乗車地と降車地を追跡するニューヨーク市タクシー・リムジン委員会からのシンプルなデータセットを見てみましょう。 このデータのほとんどはすでに S3 上にあり、s3://nyc-tlc/ で一般に公開されています。 Kinesis Data Analytics にデータを供給する、タクシー記録の Kinesis Data Stream をロードするために小さな python スクリプトを作成します。最後に、Kibana を使った可視化のために、すべてを Amazon Elasticsearch Service クラスターに接続された Kinesis Data Firehose に出力します。5 年間のニューヨーク生活から、このデータにホットスポットがいくつか発見されることは了承済です。

まず、入力用の Kinesis ストリームを作成し、それにニューヨーク市のタクシー乗車データを送信し始めます。私は CSV ファイルの 1 つから読み取りを行うための簡単な python スクリプトを書き、boto3 を使って記録を Kinesis にプッシュしました。この記録は、あなたに都合の良い方法で投入できます。

import csv
import json
import boto3
def chunkit(l, n):
    """Yield successive n-sized chunks from l."""
    for i in range(0, len(l), n):
        yield l[i:i + n]

kinesis = boto3.client("kinesis")
with open("taxidata2.csv") as f:
    reader = csv.DictReader(f)
    records = chunkit([{"PartitionKey": "taxis", "Data": json.dumps(row)} for row in reader], 500)
    for chunk in records:
        kinesis.put_records(StreamName="TaxiData", Records=chunk)

次に、Kinesis Data Analytics アプリケーションを作成し、タクシーデータの入力ストリームをソースとして追加します。

その後、スキーマを自動で検知します。

ここで、ホットスポットを検知するための簡単な SQL スクリプトを作成し、それをアプリケーションの Real Time Analytics セクションに追加します。

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
    "pickup_longitude" DOUBLE,
    "pickup_latitude" DOUBLE,
    HOTSPOTS_RESULT VARCHAR(10000)
); 
CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" 
    SELECT "pickup_longitude", "pickup_latitude", "HOTSPOTS_RESULT" FROM
        TABLE(HOTSPOTS(
            CURSOR(SELECT STREAM * FROM "SOURCE_SQL_STREAM_001"),
            1000,
            0.013,
            20
        )
    );


この HOTSPOTS 関数は入力ストリーム、ウィンドウサイズ、スキャン範囲、およびホットスポットとしてカウントするポイントの最小数を受け取ります。これらの値はアプリケーションに依存しますが、希望する結果を得るまでコンソール内で簡単に操作することができます。ドキュメントには、パラメーター自体に関する詳細が記載されています。 HOTSPOTS_RESULT は、ホットスポット四方にバウンディングボックスを描くことを可能にする便利な JSON を返します。

{
  "hotspots": [
    {
      "density": "elided",
      "minValues": [40.7915039, -74.0077401],
      "maxValues": [40.7915041, -74.0078001]
    }
  ]
}

希望する結果を得られたら、スクリプトを保存して、アプリケーションを Amazon Elastic Search Service Firehose 配信ストリームに接続できます。記録を地理的な作業により適したフォーマットに変換するために、Firehouse で中間的な Lambda 関数を実行することも可能です。その後、Elasticsearch でマッピングを更新して、ホットスポットオブジェクトを Geo-Shape としてインデックスできます。

最後に、Kibana に接続して結果を可視化できます。

マンハッタンは相当賑わっているようですね!

今すぐ利用可能
この機能は、Kinesis Data Analytics の全既存リージョンで今すぐご利用いただけます。私は、この機能が多くのアプリケーションに即時的な価値をもたらす非常に興味深い Kinesis Data Analytics の新機能であると考えています。この機能を使って何を構築するか、ツイッターやコメントでお知らせください!

Randall