AWS Big Data Blog

Anomaly Detection Using PySpark, Hive, and Hue on Amazon EMR

Veronika Megler, Ph.D., is a Senior Consultant with AWS Professional Services

We are surrounded by more and more sensors – some of which we’re not even consciously aware. As sensors become cheaper and easier to connect, they create an increasing flood of data that’s getting cheaper and easier to store and process.

However, sensor readings are notoriously “noisy” or “dirty”. To produce meaningful analyses, we’d like to identify anomalies in the sensor data and remove them before we perform further analysis. Or we may wish to analyze the anomalies themselves, as they may help us understand how our system really works or how our system is changing. For example, throwing away (more and more) high temperature readings in the Arctic because they are “obviously bad data” would cause us to miss the warming that is happening there.

The use case for this post is from the domain of road traffic: freeway traffic sensors. These sensors report three measures (called “features”): speed, volume, and occupancy, each of which is sampled several times a minute (see “Appendix: Measurement Definitions” at the end of this post for details). Each reading from the sensors is called an observation. Sensors of different types (radar, in-road, Bluetooth) are often mixed in a single network and may be installed in varied configurations. For in-road sensors, there’s often a separate sensor in each lane; in freeways with a “carpool” lane, that lane will have different traffic characteristics from the others. Different sections of the freeway may have different traffic characteristics, such as rush hour on the inbound vs. outbound side of the freeway.

Thus, anomaly detection is frequently an iterative process where the system, as represented by the data from the sensors, must first be segmented in some way and “normal” characterized for each part of the system, before variations from that “normal” can be detected. After these variations or anomalies are removed, we can perform various analyses of the cleaned data such as trend analysis, model creation, and predictions. This post describes how two popular and powerful open-source technologies, Spark and Hive, were used to detect anomalies in data from a network of traffic sensors. While it’s based on real usage (see “References” at the end of this post), here you’ll work with similar, anonymized data.

The same characteristics and challenges apply to many other sensor networks. Specific examples I’ve worked with include weather stations, such as Weather Underground (www.wunderground.com), that report temperature, air pressure, humidity, wind and rainfall, amongst other things; ocean observatories such as CMOP (http://www.stccmop.org/datamart/observation_network) that collect physical, geochemical and biological observations; and satellite data from NOAA (http://www.nodc.noaa.gov/).

Detecting anomalies

An anomaly in a sensor network may be a single variable with an unreasonable reading (speed = 250 m.p.h.; for a thermometer, air temperature = 200F). However, each traffic sensor reading has several features (speed, volume, occupancy). There can be situations where each feature on its own has a reasonable value, but the combination is highly unlikely (an anomaly). For traffic sensors, a speed of more than 100 m.p.h. is possible during times of low congestion (that is, low occupancy and low volume) but extremely unlikely during a traffic jam.

Many of these “valid” or “invalid” combinations are situational, as is the case here. Common combinations often have descriptive terms, such as “traffic jam”, “congested traffic”, or “light traffic”. These terms are representative of a commonly seen combination of characteristics, which would be represented in the data as a cluster of observations.

So, to detect anomalies: first, identify the common situations (each represented by a large cluster of observations with similar combinations of features), and then identify observations that are sufficiently different from those clusters. You essentially apply two methods from basic statistics: cluster the data using the most common clustering algorithm, k-means; then measure the distance from each observation to its closest cluster center, and classify the observations that are “far away” as anomalies. (Note that other anomaly detection techniques exist, some of which could be used against the same data, but would reflect a different model or understanding of the problem.)

This post walks through the three major steps:

  1. Clustering the data.
  2. Choosing the number of clusters.
  3. Detecting probable anomalies.

For the project, you process the data using Spark, Hive, and Hue on an Amazon EMR cluster, reading input data from an Amazon S3 bucket.

Clustering the data

To perform k-means clustering, you first need to know how many clusters exist in the data. However, in most cases, as is true here, you don’t know the “right” number to use. A common solution is to repeatedly cluster the data, each time using a different number (“k”) of clusters. For each “k”, calculate a metric: the sum of the squared distances of each point from its closest cluster center, known as the Within Set Sum of Squared Error (WSSSE). (My code extends this sample.) The smaller the WSSSE, the better your clustering is considered to be – within limits: more clusters will almost always give a smaller WSSSE, but having more clusters may distract from, rather than add to, your analysis.

Here, the input data is a CSV format file stored in an S3 bucket. Each row contains a single observation taken by a specific sensor at a specific time, and consists of 9 numeric values. There are two versions of the input:

s3://aws-bigdata-blog/artifacts/anomaly-detection-using-pyspark/sensorinput/, with 24M rows

s3://aws-bigdata-blog/artifacts/anomaly-detection-using-pyspark/sensorinputsmall/, an extract with 50,000 rows

In this post, I show how to run the programs here with the smaller input. However, the exact same code runs over the 24M row input. Here’s the Hive SQL definition for the input:

CREATE EXTERNAL TABLE sensorinput (
       highway int,		-- highway id
       sensorloc int,		-- one sensor location may have 
       -- multiple sensors, e.g. for different highway lanes
       sensorid int, 		-- sensor id
       dayofyear bigint,		-- yyyyddd 
       dayofweek bigint,		-- 0=Sunday, 1=Monday, etc 
       time decimal(10,2), 	-- seconds since midnight
				-- e.g. a value of 185.67 is 00:03:05.67 a.m.
       volume int, 		-- a count
       speed int, 			-- average, in m.p.h.
       occupancy int		-- a count
)  ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n'
LOCATION 's3://aws-bigdata-blog/artifacts/anomaly-detection-using-pyspark/sensorinput/';

Start an EMR cluster in us-west-2 (where this bucket is located), specifying Spark, Hue, Hive, and Ganglia. (For more information, see Getting Started: Analyzing Big Data with Amazon EMR.) I’ve run the same program on two different clusters: a small cluster with 1 master and 2 core nodes, all m3.xlarge; and a larger cluster, with 1 master and 8 core nodes, all m4.xlarge.
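If you’d rather script the cluster creation than use the console, a Boto3 call along the following lines creates a comparable cluster. This is a minimal sketch: the release label, key pair, and log location are placeholders, not values used in this post.

import boto3

emr = boto3.client('emr', region_name='us-west-2')

response = emr.run_job_flow(
    Name='anomaly-detection',
    ReleaseLabel='emr-5.4.0',                      # placeholder release label
    LogUri='s3://<your_bucket>/emr-logs/',         # placeholder log location
    Applications=[{'Name': 'Spark'}, {'Name': 'Hive'},
                  {'Name': 'Hue'}, {'Name': 'Ganglia'}],
    Instances={
        'MasterInstanceType': 'm3.xlarge',
        'SlaveInstanceType': 'm3.xlarge',
        'InstanceCount': 3,                        # 1 master + 2 core nodes
        'KeepJobFlowAliveWhenNoSteps': True,
        'Ec2KeyName': '<your_key_pair>',           # placeholder key pair
    },
    JobFlowRole='EMR_EC2_DefaultRole',
    ServiceRole='EMR_DefaultRole',
    VisibleToAllUsers=True,
)
print(response['JobFlowId'])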

Spark has two interfaces that can be used to run a Spark/Python program: an interactive interface, pyspark, and batch submission via spark-submit. I generally begin my projects by reviewing my data and testing my approach interactively in pyspark, while logged on to the cluster master. Then, I run my completed program using spark-submit (see also Submitting User Applications with spark-submit). After the program is ready to operationalize, I start submitting the jobs as steps to a running cluster using the AWS CLI for EMR or from a script such as a Python script using Boto3 to interface to EMR, with appropriate parameterization.
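As an illustration of that last option, submitting the first program as a step from Python might look something like the following sketch; the cluster ID and output bucket are placeholders, and in cluster deploy mode the numpy/scipy packages would need to be installed on the core nodes as well.

import boto3

emr = boto3.client('emr', region_name='us-west-2')

# Submit kmeanswsssey.py as a step on an already-running cluster
emr.add_job_flow_steps(
    JobFlowId='j-XXXXXXXXXXXXX',                   # placeholder cluster ID
    Steps=[{
        'Name': 'kmeans-wssse',
        'ActionOnFailure': 'CONTINUE',
        'HadoopJarStep': {
            'Jar': 'command-runner.jar',
            'Args': [
                'spark-submit', '--deploy-mode', 'cluster',
                's3://aws-bigdata-blog/artifacts/anomaly-detection-using-pyspark/code/kmeanswsssey.py',
                's3://aws-bigdata-blog/artifacts/anomaly-detection-using-pyspark/sensorinputsmall/',
                '10', 'run1',
                's3://<your_bucket>/sensorclusterssmall/',
            ],
        },
    }],
)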

I’ve written two PySpark programs: one to repeatedly cluster the data and calculate the WSSSE using different numbers of clusters (kmeanswsssey.py); and a second one (kmeansandey.py) to calculate the distances of each observation from the closest cluster. The other parts of the anomaly detection—choosing the number of clusters to use, and deciding which observations are the outliers—are performed interactively, using Hue and Hive. I also provide a file (traffic-hive.hql), with the table definitions and sample queries.

For simplicity, I’ll describe how to run the programs using spark-submit while logged on to the master instance console.

To prepare the cluster for executing your programs, install some Python packages:

sudo yum install python-numpy python-scipy -y

Copy the programs from S3 onto the master node’s local disk; I often run this way while I’m still editing the programs and experimenting with slightly different variations:

aws s3 cp s3://aws-bigdata-blog/artifacts/anomaly-detection-using-pyspark/code/kmeanswsssey.py /home/hadoop
aws s3 cp s3://aws-bigdata-blog/artifacts/anomaly-detection-using-pyspark/code/kmeansandey.py /home/hadoop
aws s3 cp s3://aws-bigdata-blog/artifacts/anomaly-detection-using-pyspark/code/traffic-hive.hql /home/hadoop  

My first PySpark program (kmeanswsssey.py) calculates WSSSE repeatedly, starting with 1 cluster (k=1), then for 2 clusters, and so on, up to some maximum k that you define. It outputs a CSV file; for each k, it appends a set of lines containing the WSSSE and some statistics that describe each of the clusters. This program takes four arguments: the input file location, the maximum k to use, a run identifier that is prepended to the output for this run (useful when I’m testing multiple variations), and the output location: <infile> <maxk> <runId> <outfile>. For example:

spark-submit /home/hadoop/kmeanswsssey.py s3://aws-bigdata-blog/artifacts/anomaly-detection-using-pyspark/sensorinputsmall/ 10 run1 s3://<your_bucket>/sensorclusterssmall/
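For orientation, the core of that calculation might look something like the following sketch, written against the RDD-based MLlib API. This is not the full kmeanswsssey.py program: the column positions follow the sensorinput table definition above, and the output handling and per-cluster statistics are omitted.

from pyspark import SparkContext
from pyspark.mllib.clustering import KMeans

sc = SparkContext(appName="wssse-sketch")

# Parse the CSV input and keep the three features: volume, speed, occupancy
lines = sc.textFile("s3://aws-bigdata-blog/artifacts/anomaly-detection-using-pyspark/sensorinputsmall/")
features = lines.map(lambda line: line.split(',')) \
                .map(lambda f: [float(f[6]), float(f[7]), float(f[8])]) \
                .cache()

# Cluster for k = 1..maxk and report the WSSSE for each k
for k in range(1, 11):
    model = KMeans.train(features, k, maxIterations=20)
    wssse = model.computeCost(features)   # sum of squared distances to closest center
    print("k=%d, WSSSE=%f" % (k, wssse))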

When run on the small cluster with the small input, this program took around 5 minutes. The same program, run on the 24M row input on the larger cluster, took 2.5 hours. Running the large input on the smaller cluster produces correct results, but takes over 24 hours to complete.

Choosing the number of clusters

Next, review the clustering results and choose the number of clusters to use for the actual anomaly detection.

A common and easy way to do that is to graph the WSSSE calculated for each k, and to choose “the knee in the curve”. That is, look for a point where the total distance has dropped sufficiently that increasing the number of clusters does not drop the WSSSE by much. If you’re very lucky, each cluster has characteristics that match your mental model for the problem domain such as low speed, high occupancy, and high volume, matching “congested traffic”.

Here you use Hue and Hive, conveniently selected when you started the cluster, for data exploration and simple graphing. In Hue’s Hive Query Editor, define a table that describes the output file you created in the previous step. Here, I’m pointing to a precomputed version calculated over the larger dataset:

CREATE EXTERNAL TABLE kcalcs (
     run string, 
     wssse decimal(20,3),
     k decimal, 
     clusterid decimal, 
     clustersize decimal, 
     volcntr decimal(10,1),
     spdcntr decimal(10,1),
     occcntr decimal(10,1),
     volstddev decimal(10,1),
     spdstddev decimal(10,1),
     occstddev decimal(10,1)
)  ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n'
LOCATION 's3://aws-bigdata-blog/artifacts/anomaly-detection-using-pyspark/sensorclusters/'
tblproperties ("skip.header.line.count"="2");

To decide how many clusters to use for the next step, use Hue to plot a line graph of the number of clusters versus WSSSE. First, select the information to display:

SELECT DISTINCT k, wssse FROM kcalcs ORDER BY k;

In the results panel, choose Chart. Choose the icon representing a line graph, choose “k” for X-Axis and “wssse” for Y-Axis, and Hue builds the following chart. Hover your cursor above a particular point, and Hue shows the X and Y values for that point.

Chart built by Hue

For the “best number of clusters”, you’re looking for the “knee in the curve”: the place where going to a higher number of clusters does not significantly reduce the total distance function (WSSSE). For this data, it looks as though around 4 is a good choice, as the gains from going to 5 or 6 clusters look minimal.

You can explore the characteristics of the identified clusters with the following SELECT statement:

SELECT DISTINCT k, clusterid, clustersize, volcntr, spdcntr, occcntr, volstddev, spdstddev, occstddev
FROM kcalcs
ORDER BY k, spdcntr;

By looking at, for example, the lines for three clusters (k=3), you can see a “congestion” cluster (17.1 m.p.h., occupancy 37.7 cars), a “free-flowing heavy-traffic” cluster, and a “light traffic” cluster (65.2 m.p.h., occupancy 5.1). With k=4, you still see the “congestion” and “fast, light traffic” clusters, but the “free-flowing heavy-traffic” cluster from k=3 has been split into two distinct clusters with very different occupancy. Stay with 4 clusters.

Detecting anomalies

Use the following method with these clusters to identify anomalies:

  1. Assign each sensor reading to the closest cluster.
  2. Calculate the distance (using some measure) for each reading to the assigned cluster center.
  3. Filter for the entries with a greater distance than some chosen threshold.

I like to use Mahalanobis distance as the distance measure, as it compensates for differences in units (speed in m.p.h., while volume and occupancy are counts), averages, and scales of the several features I’m clustering across.

Run the second PySpark program, kmeansandey.py (you copied this program onto local disk earlier, during setup). Give this program the number of clusters to use, decided in the previous step, and the input data. For each input observation, this program does the following:

  1. Identifies the closest cluster center.
  2. Calculates the Mahalanobis distance from this observation to the closest center.
  3. Creates an output record consisting of the original observation, plus the cluster number, the cluster center, and the distance.

The program takes the following parameters: <infile> <k> <outfile>. The output is a CSV file, placed in an S3 bucket of your choice. To run the program, use spark-submit:

spark-submit /home/hadoop/kmeansandey.py s3://aws-bigdata-blog/artifacts/anomaly-detection-using-pyspark/sensorinputsmall/ 4 s3://<your_bucket>/sensoroutputsmall/
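For orientation, here’s a rough sketch of the per-observation logic, again using the RDD-based MLlib API. This is not the actual kmeansandey.py code: in particular, it uses a simplified Mahalanobis distance that treats the three features as independent, normalizing each feature by its per-cluster standard deviation, and the output bucket is a placeholder.

import numpy as np
from pyspark import SparkContext
from pyspark.mllib.clustering import KMeans

sc = SparkContext(appName="distance-sketch")

lines = sc.textFile("s3://aws-bigdata-blog/artifacts/anomaly-detection-using-pyspark/sensorinputsmall/")
rows = lines.map(lambda line: line.split(',')).cache()
features = rows.map(lambda f: [float(f[6]), float(f[7]), float(f[8])]).cache()

model = KMeans.train(features, 4, maxIterations=20)        # k=4, chosen above
centers = [list(c) for c in model.clusterCenters]

# Per-cluster, per-feature standard deviations, computed from the assigned points
def feature_stddevs(points):
    return np.maximum(np.array(list(points)).std(axis=0), 1e-6)   # avoid divide-by-zero

stddevs = features.map(lambda p: (model.predict(p), p)) \
                  .groupByKey() \
                  .mapValues(feature_stddevs) \
                  .collectAsMap()

def with_distance(fields):
    obs = np.array([float(fields[6]), float(fields[7]), float(fields[8])])
    c = model.predict(obs)                                 # closest cluster center
    center = np.array(centers[c])
    maldist = float(np.sqrt(np.sum(((obs - center) / stddevs[c]) ** 2)))
    return fields + [str(c)] + ["%.2f" % v for v in center] + ["%.2f" % maldist]

# Original observation + clusterid, cluster center, and distance, written as CSV
rows.map(with_distance) \
    .map(lambda f: ','.join(f)) \
    .saveAsTextFile("s3://<your_bucket>/sensoroutputsmall/")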

On the small cluster with the small input, this job finished in under a minute; on the bigger cluster, the 24M dataset took around 17 minutes. In the next step, you review the observations that are “distant” from the closest cluster as calculated by that distance calculation. Because these observations are unlike the majority of the other observations, they are considered outliers, and probable anomalies.

Exploring identified anomalies

Now you’re ready to look at the probable anomalies and decide whether they really should be considered anomalies. In Hive, you define a table that describes the output file created in the previous step. Use an output file from the S3 bucket, which contains the original 9 columns (highway through occupancy) plus 5 new ones (clusterid through maldist). The smaller dataset’s output is less interesting to explore as it only contains data from one sensor, so I’ve precomputed the output over the large dataset for this exploration. Here is the modified table definition:

CREATE EXTERNAL TABLE sensoroutput (
       highway int,		        -- highway id etc. (as before)
       …
       occupancy int,		    -- a count
       clusterid int,           -- cluster identifier
       volcntr decimal(10,2),	-- cluster center, volume
       spdcntr decimal(10,2),	-- cluster center, speed
       occcntr decimal(10,2),	-- cluster center, occupancy
       maldist decimal(10,2) 	-- Mahalanobis distance to this cluster
)  ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n'
LOCATION 's3://aws-bigdata-blog/artifacts/anomaly-detection-using-pyspark/sensoroutput/';

Explore your results. To look at the number of observations assigned to each cluster for each sensor, try the following query:

SELECT sensorid, clusterid, concat(cast(sensorid AS string), '.', cast(clusterid AS string)) AS senclust, count(*) AS howmany, max(maldist) AS dist
FROM sensoroutput
GROUP BY sensorid, clusterid
ORDER BY sensorid, clusterid; 

The “concat” statement creates a compound column, senclust, that you can use in Hue’s built-in graphing tool to compare the clusters visually for each sensor. For this chart, choose a bar graph, choose the compound column “senclust” for X-Axis and “howmany” for Y-Axis, and Hue builds the following chart.

You can now easily compare the sizes, and the largest and average distances, for each cluster across the different sensors. The smaller clusters probably bear investigation; they represent either unusual traffic conditions or clusters of bad readings. Note that an additional cluster of known bad readings (0 speed, volume, and occupancy) was identified using a similar process during a prior run; these observations are all assigned to a dummy clusterid of “-1” and have a high maldist. The following query summarizes the size and distance statistics of each cluster:

SELECT clusterid, volcntr, spdcntr, occcntr, count(*) AS num, max(maldist) AS maxmaldist, avg(maldist) AS avgmaldist, 
stddev_pop(maldist) AS stddevmal
FROM sensoroutput
GROUP BY clusterid, volcntr, spdcntr, occcntr
ORDER BY spdcntr;

How do you choose the threshold for defining an observation as an anomaly? This is another black art. I chose 2.5 through a combination of standard practice, discussion of the graphs, and looking at how much and which data I’d be throwing away by using that threshold. To explore the distribution of outliers across the sensors, use a query like the following:

SELECT sensorid, clusterid, count(*) AS num_outliers, avg(spdcntr) AS spdcntr, avg(maldist) AS avgdist
FROM sensoroutput
WHERE  maldist > 2.5
GROUP BY sensorid, clusterid
ORDER BY sensorid, clusterid;  

The number of outliers varies quite a bit by sensor and cluster. You can explore the top 100 entries for sensor 44, cluster 0:

SELECT *
FROM sensoroutput
WHERE  maldist > 2.5
AND sensorid = 44 AND clusterid = 0
ORDER BY maldist desc
LIMIT 100;

The query results show some entries that look reasonable (volume 6, occupancy 1), and others that look less so (volume of 3 and occupancy of 10). Depending on your intended use, you may decide that the number of observations that might not really be anomalies is small enough that you should just exclude all these entries – but perhaps you want to study these entries further to find a pattern, such as whether this state often occurs during transitions from one traffic pattern to another.

After you understand your clusters and the flagged “potential anomalies” sufficiently, you can choose which observations to exclude from further analysis.

Conclusion

This post describes anomaly detection for sensor data, and works through a case of identifying anomalies in traffic sensor data. You’ve dived into some of the complexities that come with deciding which subsets of sensor data are dirty, and the tools used to ask those questions. I showed how an iterative approach is often needed, with each analysis leading to further questions and further analyses.

In the real use case (see “References” below), we iteratively clustered subsets of the data: for different highways, days of the week, different sensor types, and so on, to understand the data and anomalies. We’ve seen here some of the challenges in deciding whether or not something is an anomaly in the data, or an anomaly in our approach. We used Amazon EMR, along with Apache Spark, Apache Hive and Hue to implement the approach and explore the results, allowing us to quickly experiment with a number of alternative clusters before settling on the combination that we felt best identified the real anomalies in our data.

Now, you can move forward: providing “clean data” to the business users; combining this data with weather, school holiday calendars, and sporting events to identify the causes of specific traffic patterns and pattern changes; and then using that model to predict future traffic conditions.

Appendix: Measurement Definitions

Volume measures how many vehicles have passed this sensor during the given time period. Occupancy measures the number of vehicles at the sensor at the measurement time. The combination of volume and occupancy gives a view of overall traffic density. For example: if the traffic is completely stopped, a sensor may have very high occupancy – many vehicles sitting at the sensor – but a volume close to 0, as very few vehicles have passed the sensor. This is a common circumstance for sensors at freeway entrances that limit freeway entry, often via lights that only permit one car from one lane to pass every few seconds.

Note that different sensor types may have different capabilities to detect these situations, such as radar vs. in-road sensors, and different sensor types or models may have different defaults for how they report various situations. For example, “0,0,0” may mean no traffic, or known bad data, or assumed bad data based on hard limits, such as traffic above a specific density (ouch!). Thus, sensor type, capability, and context are all important factors in identifying “bad data”. In this study, the analysis of which sensors were “similar enough” for the data to be analyzed together was performed prior to data extraction. The anomaly detection steps described here were performed separately for each set of similar sensors, as defined by the pre-analysis.

References

V. M. Megler, K. A. Tufte, and D. Maier, “Improving Data Quality in Intelligent Transportation Systems (Technical Report),” Portland, OR, Jul. 2015 [Online]. Available: http://arxiv.org/abs/1602.03100

If you have questions or suggestions, please leave a comment below.

—————————–

Related

Large-Scale Machine Learning with Spark on Amazon EMR