AWS News Blog
Real-Time Hotspot Detection in Amazon Kinesis Analytics
September 8, 2021: Amazon Elasticsearch Service has been renamed to Amazon OpenSearch Service. See details.
Today we’re releasing a new machine learning feature in Amazon Kinesis Data Analytics for detecting “hotspots” in your streaming data. We launched Kinesis Data Analytics in August of 2016 and we’ve continued to add features since. As you may already know, Kinesis Data Analytics is a fully managed real-time processing engine for streaming data that lets you write SQL queries to derive meaning from your data and output the results to Kinesis Data Firehose, Kinesis Data Streams, or even an AWS Lambda function. The new HOTSPOTS function adds to the existing machine learning capabilities in Kinesis that let customers use unsupervised, streaming-based machine learning algorithms. Customers don’t need to be experts in data science or machine learning to take advantage of these capabilities.
Hotspots
The HOTSPOTS function is a new Kinesis Data Analytics SQL function you can use to identify relatively dense regions in your data without having to explicitly build and train complicated machine learning models. You can identify subsections of your data that need immediate attention and take action programmatically by streaming the hotspots out to a Kinesis data stream or a Firehose delivery stream, or by invoking an AWS Lambda function.
There are a ton of really cool scenarios where this could make your operations easier. Imagine a ride-share program or autonomous vehicle fleet communicating spatiotemporal data about traffic jams and congestion, or a datacenter where a number of servers start to overheat, indicating an HVAC issue. HOTSPOTS is not limited to spatiotemporal data, and you could apply it across many problem domains.
The function follows some simple syntax and accepts the DOUBLE, INTEGER, FLOAT, TINYINT, SMALLINT, REAL, and BIGINT data types.
The HOTSPOTS function takes a cursor as input and returns a JSON string describing the hotspot. This will be easier to understand with an example.
Using Kinesis Data Analytics to Detect Hotspots
Let’s take a simple data set from the NYC Taxi and Limousine Commission that tracks yellow cab pickup and drop-off locations. Most of this data is already on S3 and publicly accessible at s3://nyc-tlc/. We will create a small Python script to load our Kinesis data stream with taxi records, which will feed our Kinesis Data Analytics application. Finally, we’ll output all of this to a Kinesis Data Firehose delivery stream connected to an Amazon Elasticsearch Service cluster for visualization with Kibana. I know from living in New York for 5 years that we’ll probably find a hotspot or two in this data.
First, we’ll create an input Kinesis stream and start sending our NYC taxi ride data into it. I just wrote a quick Python script to read from one of the CSV files and used boto3 to push the records into Kinesis. You can put the records into the stream in whatever way works for you.
import csv
import json

import boto3


def chunkit(l, n):
    """Yield successive n-sized chunks from l."""
    for i in range(0, len(l), n):
        yield l[i:i + n]


kinesis = boto3.client("kinesis")

with open("taxidata2.csv") as f:
    reader = csv.DictReader(f)
    # PutRecords accepts at most 500 records per call, so send the rows in chunks of 500.
    records = chunkit([{"PartitionKey": "taxis", "Data": json.dumps(row)} for row in reader], 500)
    for chunk in records:
        kinesis.put_records(StreamName="TaxiData", Records=chunk)
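One thing the quick loader above glosses over: a put_records call can succeed overall while individual records are rejected (for example, when a shard is throttled), and the response reports that through FailedRecordCount. Here is a minimal sketch of resending only the failed entries. The helper name put_records_with_retry, the retry count, and the backoff values are my own illustrative choices, not part of boto3.

```python
import time


def put_records_with_retry(client, stream_name, records, max_attempts=5, base_delay=0.1):
    """Send one batch (up to 500 records) and resend only the entries that failed.

    Returns the records still failing after max_attempts (an empty list on success).
    This helper is a hypothetical illustration, not a boto3 API.
    """
    pending = list(records)
    for attempt in range(max_attempts):
        response = client.put_records(StreamName=stream_name, Records=pending)
        if response.get("FailedRecordCount", 0) == 0:
            return []
        # The response "Records" list is in request order; failed entries
        # carry an "ErrorCode" key instead of a sequence number.
        pending = [
            record
            for record, result in zip(pending, response["Records"])
            if "ErrorCode" in result
        ]
        time.sleep(base_delay * (2 ** attempt))  # simple exponential backoff
    return pending
```

In the loader, you could call put_records_with_retry(kinesis, "TaxiData", chunk) in place of the bare put_records call and log whatever comes back.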
Next, we’ll create the Kinesis Data Analytics application and add our input stream with our taxi data as the source.
Then we’ll automatically detect the schema.
Now we’ll create a quick SQL script to detect our hotspots and add that to the Real Time Analytics section of our application.
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
    "pickup_longitude" DOUBLE,
    "pickup_latitude" DOUBLE,
    HOTSPOTS_RESULT VARCHAR(10000)
);

CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
    SELECT "pickup_longitude", "pickup_latitude", "HOTSPOTS_RESULT" FROM
        TABLE(HOTSPOTS(
            CURSOR(SELECT STREAM * FROM "SOURCE_SQL_STREAM_001"),
            1000,
            0.013,
            20
        )
    );
Our HOTSPOTS function takes an input stream, a window size, a scan radius, and a minimum number of points to count as a hotspot. The values for these are application dependent, but you can easily tinker with them in the console until you get the results you want. There are more details about the parameters themselves in the documentation. The HOTSPOTS_RESULT column returns some useful JSON that would let us plot bounding boxes around our hotspots:
{ "hotspots": [ { "density": "elided", "minValues": [40.7915039, -74.0077401], "maxValues": [40.7915041, -74.0078001] } ] }
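To build intuition for what the scan radius and minimum-point parameters mean, here is a deliberately naive Python sketch. It is not the algorithm the HOTSPOTS function runs; it just brute-forces the idea of "enough points close together" over one window of records. The function name, the sample coordinates, and the choice to count a point as its own neighbor are all assumptions for illustration.

```python
import math


def naive_hotspot_points(points, scan_radius, min_points):
    """Return the points that have at least min_points points (including
    themselves) within scan_radius, using plain Euclidean distance.

    Illustrative only: this is NOT how the HOTSPOTS function is implemented.
    """
    dense = []
    for p in points:
        neighbors = sum(1 for q in points if math.dist(p, q) <= scan_radius)
        if neighbors >= min_points:
            dense.append(p)
    return dense


# One "window" of made-up (longitude, latitude) pairs:
# a tight cluster of three pickups plus two strays.
window = [
    (-73.990, 40.750),
    (-73.991, 40.751),
    (-73.989, 40.749),
    (-73.800, 40.640),
    (-74.100, 40.900),
]

# Only the three clustered points survive the density test.
print(naive_hotspot_points(window, scan_radius=0.013, min_points=3))
```

Raising min_points or shrinking scan_radius makes the test stricter, which is the same trade-off you are tuning in the console.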
When we have our desired results, we can save the script and connect our application to our Amazon Elasticsearch Service Firehose delivery stream. We can run an intermediate Lambda function in the Firehose delivery stream to transform our records into a format more suitable for geographic work. Then we can update our mapping in Elasticsearch to index the hotspot objects as Geo-Shapes.
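As a rough idea of what that intermediate Lambda function could look like, here is a minimal sketch of a Firehose data-transformation handler. It rests on several assumptions: that each record arrives as a JSON object with a HOTSPOTS_RESULT key holding the JSON string shown earlier, that minValues and maxValues are ordered [latitude, longitude] as the sample output appears to be (swap the index arguments if yours differ), and that the shapes go into a field I am calling hotspot_shapes, a made-up name. The envelope layout follows the Elasticsearch geo_shape convention of upper-left then lower-right corners.

```python
import base64
import json


def hotspot_to_envelope(hotspot, lat_index=0, lon_index=1):
    """Convert one hotspot's minValues/maxValues into a geo_shape envelope:
    [[min_lon, max_lat], [max_lon, min_lat]].

    The [latitude, longitude] ordering is an assumption; adjust the indexes if needed.
    Each axis is sorted so the box is well-formed whichever way the bounds arrive.
    """
    lows, highs = hotspot["minValues"], hotspot["maxValues"]
    lats = sorted([lows[lat_index], highs[lat_index]])
    lons = sorted([lows[lon_index], highs[lon_index]])
    return {
        "type": "envelope",
        "coordinates": [[lons[0], lats[1]], [lons[1], lats[0]]],
    }


def handler(event, context):
    """Firehose transformation handler: decode, reshape, and re-encode each record."""
    output = []
    for record in event["records"]:
        try:
            row = json.loads(base64.b64decode(record["data"]))
            result = json.loads(row["HOTSPOTS_RESULT"])
            # "hotspot_shapes" is a hypothetical field name for the mapped geo_shape field.
            row["hotspot_shapes"] = [hotspot_to_envelope(h) for h in result["hotspots"]]
            data = base64.b64encode(json.dumps(row).encode("utf-8")).decode("utf-8")
            output.append({"recordId": record["recordId"], "result": "Ok", "data": data})
        except (KeyError, ValueError, TypeError):
            # Return unparseable records unchanged and flagged as failed.
            output.append({
                "recordId": record["recordId"],
                "result": "ProcessingFailed",
                "data": record["data"],
            })
    return {"records": output}
```

The Elasticsearch mapping would then need to declare whichever field you choose as type geo_shape before the first document is indexed.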
Finally, we can connect to Kibana and visualize the results.
Looks like Manhattan is pretty busy!
Available Now
This feature is available now in all Regions where Kinesis Data Analytics is available. I think this is a really interesting new feature of Kinesis Data Analytics that can bring immediate value to many applications. Let us know what you build with it on Twitter or in the comments!
– Randall