Finding Similar Items with Amazon Elastic MapReduce, Python, and Hadoop Streaming
Data Wrangling blogger and AWS developer Peter Skomoroch gives us an introduction to Amazon Elastic MapReduce.
Submitted By: Jai@AWS
AWS Products Used: Amazon Elastic MapReduce
Language(s): Python
Created On: April 02, 2009
Introduction
Amazon Elastic MapReduce is a web service that enables businesses, researchers, data analysts, and developers to easily and cost-effectively process vast amounts of data. It utilizes a hosted Hadoop framework running on the web-scale infrastructure of Amazon Elastic Compute Cloud (Amazon EC2) and Amazon Simple Storage Service (Amazon S3). This sample application uses Amazon Elastic MapReduce to run a multiple-step JobFlow that calculates pairwise similarity in a large database of items. In this example, we'll apply the sample code to music and film recommendations, but it could potentially be run on other datasets such as document term counts, product sales, or website logs. This article assumes some familiarity with MapReduce and Hadoop Streaming. Python and Hadoop Streaming were used to make the algorithm code as clear as possible; better performance can be obtained by porting the example code to Java.
Running an Amazon Elastic MapReduce Streaming Job from the AWS Management Console
Before we get to the main similarity computation, we will take a detour to learn more about our dataset using the AWS Management Console. The AWS Management Console makes tasks that involve a single map and reduce step simple to execute. We will test drive the console by running a Python streaming job from our code sample on the May 2005 Audioscrobbler dataset to analyze user listening statistics. The Audioscrobbler data contains the number of times each user listened to a particular artist. This means that, unlike Netflix star ratings, Audioscrobbler ratings are "implicit". The assumption is that the more someone listens to an artist, the higher they would rate them.
The Audioscrobbler input data has the following format:
user_artist_data.txt (3 columns: userid artistid playcount)

2 4164 23
3 4164 4
2 1002471 1
3 1002471 643
4 1000324 1

artist_data.txt (2 columns: artistid artist_name)

4164 Britney Spears
1002471 Elton John
1000324 Guns N' Roses
Here is the Python code for the streaming mapper:
user_count_mapper.py

import sys

for line in sys.stdin:
    (user, item, rating) = line.strip().split()
    print "LongValueSum:%s\t1" % user
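The LongValueSum: prefix is an instruction to Hadoop Streaming's built-in aggregate reducer, which sums the values emitted for each key. To try the mapper on your own machine before launching a job, you can approximate the aggregate behavior with a short Python reducer run over sorted mapper output. This hypothetical user_count_reducer.py is just a local stand-in and is not part of the sample code:

import sys

# Local approximation of Hadoop's "aggregate" reducer for LongValueSum
# keys: sum the values for each key in the sorted mapper output.
current_user = None
total = 0
for line in sys.stdin:
    key, value = line.strip().split("\t")
    # Drop the "LongValueSum:" function prefix to recover the user id
    user = key.split(":", 1)[1]
    if user != current_user:
        if current_user is not None:
            print "%s\t%d" % (current_user, total)
        current_user = user
        total = 0
    total += int(value)
if current_user is not None:
    print "%s\t%d" % (current_user, total)

To test locally:

$ cat user_artist_data.txt | python user_count_mapper.py | sort | python user_count_reducer.py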
Create a Streaming JobFlow
At this point, we have logged into the console and clicked "Create JobFlow". Now we need to decide what to name our job or select a sample application:
Specify the Job Parameters
The next step involves filling in a number of parameters. For the sample application, most of these are already filled in for you. See the Hadoop Streaming 18.3 Documentation for more details about the Extra Args you can supply to Hadoop.
To count the number of artists listened to by each user we used the following parameters:
-input    elasticmapreduce/samples/similarity/lastfm/input
-output   aws-hadoop/lastfm/output/user-counts
-mapper   elasticmapreduce/samples/similarity/user_count_mapper.py
-reducer  aggregate
Remember to replace the default "yourbucket" with a desired Amazon S3 output path, and ensure that the path doesn't exist before the job runs.
Configure the Number and Type of Instances
For this job we will just stick with the default of 4 small instances, which will cost $0.40 per hour. When running larger JobFlows, I usually use a cluster of c1.medium instances.
Review the Job Details and Start the Job
Everything looks correct, so we kick off the job and wait for it to complete. By default, the Amazon Elastic MapReduce cluster you launch will shut itself down when the job is complete and the output files are copied to Amazon S3. You can monitor the job status using the dashboard, the API, or by using FoxyProxy to monitor the Hadoop Web UI as described in the Amazon Elastic MapReduce documentation. Hadoop logs are also uploaded to Amazon S3 if you specify a path in the Job parameters.
Fetch the Results from Amazon S3
When our job is complete, we can download the output from the Amazon S3 bucket we indicated and analyze the results. For small files, I usually just use S3Fox to pull the output down to my local machine.
Our job produced a set of files named part-0000*, which together contain the number of unique artists listened to by each user. We can cat these together and use SciPy to run some simple statistics on the aggregated data (NumPy and SciPy are also preinstalled on the Amazon Elastic MapReduce AMIs if you want to use these modules in streaming jobs):
$ cd user-counts/
$ ls
part-00000  part-00002  part-00004  part-00006
part-00001  part-00003  part-00005
$ cat part-0000* > user_counts.txt

>>> from numpy import *
>>> from scipy import stats
>>> data = loadtxt('user_counts.txt', unpack=True)
>>> shape(data)
(2, 147160)
>>> max(data[1,:])
3331.0
>>> stats.scoreatpercentile(data[1,:], 25)
29.0
>>> stats.scoreatpercentile(data[1,:], 50)
79.0
>>> stats.scoreatpercentile(data[1,:], 99)
695.0
>>> stats.scoreatpercentile(data[1,:], 99.99)
1989.443899999198
The version of the Audioscrobbler dataset we processed has been cleaned to remove bad records and filtered to contain only artists with at least 100 unique users. The resulting dataset contains 147160 total users, and the maximum number of artists listened to by one user is 3331. The function stats.scoreatpercentile tells us that 50% of users listened to fewer than 79 artists.
Using MapReduce to Compute Item Similarity
The item similarity sample application uses a MapReduce approach, implemented with Python streaming, to compute the pairwise similarity of the items in the Audioscrobbler dataset. Similarity is measured by the Pearson correlation coefficient of the item ratings for the subset of users who rated both items. We won't go into all the details of the MapReduce algorithm or the similarity calculations here, but each step of the algorithm is documented in the Python code sample.
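For reference, the Pearson correlation for an item pair can be computed from a handful of running sums (counts, sums, sums of squares, and the dot product of the co-ratings), which is what makes it a natural fit for MapReduce. The following is a minimal sketch of the per-pair calculation, not the exact code from similarity.py; the function name and layout are illustrative:

from math import sqrt

def pearson_sim(x_ratings, y_ratings):
    """Pearson correlation of two rating vectors, taken over the
    subset of users who rated both items (the vectors are aligned)."""
    n = len(x_ratings)
    if n == 0:
        return 0.0
    sum_x = float(sum(x_ratings))
    sum_y = float(sum(y_ratings))
    sum_xx = sum(x * x for x in x_ratings)
    sum_yy = sum(y * y for y in y_ratings)
    sum_xy = sum(x * y for x, y in zip(x_ratings, y_ratings))
    numerator = sum_xy - sum_x * sum_y / n
    denominator = sqrt((sum_xx - sum_x * sum_x / n) *
                       (sum_yy - sum_y * sum_y / n))
    return numerator / denominator if denominator else 0.0

In the MapReduce version, these sums are accumulated as key/value pairs across the reduce steps rather than held in memory for a single pair.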
Pearson similarity in MapReduce won't win the Netflix Prize, but it will quickly generate an item similarity matrix and baseline recommendations for many different types of data. With the right parameter settings, you should be able to throw more data at the algorithm and scale the computation by adding compute nodes. If you have serious performance demands, it would probably be worthwhile to port the Python streaming example code to Java or C++ and take advantage of other Hadoop optimizations we won't cover in this article (including Hadoop sequence files and Combiners).
Testing the Sample Code Locally on a Small Dataset
One of the nice features of Hadoop Streaming is that you can test your application at the command line on a single machine. A small 7-artist dataset is included with the source code for testing purposes:
Artists in the sample dataset:
ID       ARTIST                TOTAL_PLAYS
1001820  2Pac                  12950
700      De La Soul            7243
1011819  A Tribe Called Quest  7601
1003557  Garth Brooks          2976
1000143  Toby Keith            1905
1012511  Kenny Rogers          1881
1000418  Mark Chesnutt         184
Run the following to test locally on the small 7-artist Audioscrobbler dataset:
$ python similarity.py
Running sample Lastfm data for 7 artists...

real    0m2.110s
user    0m2.082s
sys     0m0.456s

Top 25 related artists:

x        y        sim     count  name
1000143  1000143  1       0      Toby Keith
1000143  1003557  0.2434  809    Garth Brooks
1000143  1000418  0.1068  120    Mark Chesnutt
1000143  1012511  0.0758  237    Kenny Rogers
1000418  1000418  1       0      Mark Chesnutt
1000418  1000143  0.1068  120    Toby Keith
1000418  1003557  0.056   114    Garth Brooks
1000418  1012511  0.0385  50     Kenny Rogers
1001820  1001820  1       0      2Pac
1001820  1011819  0.1391  2428   A Tribe Called Quest
1001820  700      0.1012  2250   De La Soul
1001820  1003557  0.0629  878    Garth Brooks
1003557  1003557  1       0      Garth Brooks
1003557  1000143  0.2434  809    Toby Keith
1003557  1012511  0.1217  360    Kenny Rogers
1003557  1001820  0.0629  878    2Pac
1011819  1011819  1       0      A Tribe Called Quest
1011819  700      0.2972  3050   De La Soul
1011819  1001820  0.1391  2428   2Pac
1011819  1003557  0.0298  288    Garth Brooks
1012511  1012511  1       0      Kenny Rogers
1012511  1003557  0.1217  360    Garth Brooks
1012511  1000143  0.0758  237    Toby Keith
1012511  1001820  0.0543  468    2Pac
700      700      1       0      De La Soul
700      1011819  0.2972  3050   A Tribe Called Quest
700      1001820  0.1012  2250   2Pac
700      1000143  0.0479  114    Toby Keith
The main method of the Python script actually executes the following MapReduce chain (the numeric arguments are parameters passed to the individual steps, such as the total user count of 147160 we computed earlier, and artist_playcounts.txt is passed to mapper3 as a side file):
$ cat input/sample_user_artist_data.txt | ./similarity.py mapper1 \
    | sort | ./similarity.py reducer1 > artist_playcounts.txt

$ cat input/sample_user_artist_data.txt | ./similarity.py mapper2 log | sort \
    | ./similarity.py reducer2 | ./similarity.py mapper3 100 artist_playcounts.txt \
    | sort | ./similarity.py reducer3 147160 > artist_similarities.txt

$ cat artist_similarities.txt | ./similarity.py mapper4 20 | sort \
    | ./similarity.py reducer4 3 artist_data.txt > related_artists.tsv
You can also inspect the intermediate output of each stage:
$ cat input/sample_user_artist_data.txt | ./similarity.py mapper2 log \
    | sort | ./similarity.py reducer2 | tail
8390 1|1|1003557,1
8619 1|1|1011819,1
8816 1|2|1001820,2
8873 1|2|700,2
9025 2|6|1011819,4 700,2
9037 1|1|1011819,1
9059 2|4|1001820,1 700,3
9711 2|3|1001820,1 700,2
9838 1|1|700,1
9876 1|3|1011819,3

Each line of the reducer2 output is keyed on a user id, followed by the number of items that user rated, the sum of those ratings, and the user's item,rating pairs (the ratings here are the playcounts rescaled by the log transform requested as the argument to mapper2).
Running JobFlows with Multiple Steps on Amazon EC2 Using the Amazon Elastic MapReduce API
The AWS Management Console is great for single-stage jobs, but the Amazon Elastic MapReduce API is much more powerful if we need to run a complex MapReduce algorithm involving multiple steps, add steps at runtime, or have JobFlows that need to be run automatically by another application.
To run our item similarity code for the Last.fm data on Amazon Elastic MapReduce, we will use the Ruby CLI client supplied by Amazon to launch a JobFlow based on a JSON JobFlow description. If you already launch multi-stage Hadoop jobs using bash scripts, you will find it easy to translate your existing scripts. Make sure you have downloaded the Ruby client and configured your environment as described in the Amazon Elastic MapReduce Documentation.
To run the example on the entire Audioscrobbler dataset, download the sample code and use the JSON file lastfm_jobflow.json with the Ruby CLI. lastfm_jobflow.json uses a placeholder for your Amazon S3 bucket name, which you fill in with the --param flag when creating the JobFlow.
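We won't reproduce the full lastfm_jobflow.json here; the authoritative version ships with the sample code. As a rough, hypothetical illustration, a single streaming step in such a file takes the same shape as the StepConfig entries returned by DescribeJobFlows (shown in the next section), with a bucket-name placeholder, written here as <yourbucketname>, that --param fills in:

[
  {
    "Name": "MR Step 1: Count number of ratings for each item, use single reducer",
    "ActionOnFailure": "TERMINATE_JOB_FLOW",
    "HadoopJarStep": {
      "Jar": "/home/hadoop/contrib/streaming/hadoop-0.18-streaming.jar",
      "Args": [
        "-input",     "s3n://<yourbucketname>/lastfm/input/",
        "-output",    "s3n://<yourbucketname>/lastfm/item-counts/",
        "-mapper",    "python similarity.py mapper1",
        "-reducer",   "python similarity.py reducer1",
        "-cacheFile", "s3n://<yourbucketname>/similarity.py#similarity.py",
        "-jobconf",   "mapred.reduce.tasks=1"
      ]
    }
  }
]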
We can launch the JobFlow as follows using the Ruby client and get back a JobFlowId:
$ ./elastic-mapreduce --create --name 'Last FM JobFlow' --num-instances 10 \
    --instance-type c1.medium --json ~/similarity/lastfm_jobflow.json \
    --param '=yourbucketname'
Created job flow j-I114OYCUSTB1
We can check on the status of this job in the AWS Management Console or by using the API. The DescribeJobFlows command can be passed the id returned by RunJobFlow to check on the job status (the response below is truncated to save space). Within a few minutes, our job will be running on a 10-node cluster.
$ ./elastic-mapreduce --describe -j j-I114OYCUSTB
{
  "JobFlows": [
    {
      "Name": "lastfm_large_jobflow_50.json",
      "LogUri": "s3n:\/\/aws-hadoop\/lastfm\/logs\/",
      "ExecutionStatusDetail": {
        "EndDateTime": null,
        "StartDateTime": null,
        "LastStateChangeReason": "Starting master",
        "CreationDateTime": 1238499756.0,
        "State": "STARTING"
      },
      "Steps": [
        {
          "ExecutionStatusDetail": {
            "EndDateTime": null,
            "StartDateTime": null,
            "LastStateChangeReason": null,
            "CreationDateTime": 1238499756.0,
            "State": "PENDING"
          },
          "StepConfig": {
            "Name": "MR Step 1: Count number of ratings for each item, use single reducer",
            "HadoopJarStep": {
              "Jar": "\/home\/hadoop\/contrib\/streaming\/hadoop-0.18-streaming.jar",
              "Args": [
                "-input", "s3n:\/\/aws-hadoop\/lastfm\/input\/",
                "-output", "s3n:\/\/aws-hadoop\/lastfm\/large-item-counts-50\/",
                "-mapper", "python similarity.py mapper1",
                "-reducer", "python similarity.py reducer1",
                "-cacheFile", "s3n:\/\/aws-hadoop\/similarity.py#similarity.py",
                "-jobconf", "mapred.map.tasks=36",
                "-jobconf", "mapred.reduce.tasks=1",
                "-jobconf", "mapred.compress.map.output=true"
              ],
              "Properties": [],
              "MainClass": null
            },
            "ActionOnFailure": "TERMINATE_JOB_FLOW"
          }
        },
        ....
Sample Code Output: Related Item Recommendations
The Last.fm JobFlow launched in the last section finishes in just under an hour using 10 c1.medium Amazon EC2 instances and costs $2.00 to run. The final output of the process is a tab-delimited file, related_artists.tsv, containing the top 25 related items for each item in our input data. The first column contains the item ID (item_x) and the second column contains the item ID for a similar item (item_y). The remaining columns are the similarity score, the number of users the items had in common, and the name of the similar item.
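Because the output is a plain tab-delimited file, it is easy to slice with a few lines of Python. Here is a small illustrative helper (not part of the sample code) that assumes the five-column layout described above:

import csv

def related_items(path, item_id, k=10):
    """Return the top k related items for item_id from the
    tab-delimited output (item_x, item_y, sim, count, name),
    skipping the self-similarity row."""
    results = []
    for row in csv.reader(open(path), delimiter='\t'):
        if row[0] == item_id and row[1] != item_id:
            results.append((float(row[2]), row[1], row[4]))
    results.sort(reverse=True)
    return results[:k]

# Example: top 10 artists related to The Cure (id 190)
for sim, item, name in related_items('related_artists.tsv', '190'):
    print "%s\t%.4f\t%s" % (item, sim, name)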
Similar Artists in the Audioscrobbler Data
Here is a sample of our Audioscrobbler results, showing the top 10 most similar artists for The Cure, Toby Keith, and 2Pac:

$ grep ^190 related_artists.tsv
item_x   item_y   sim     num   item_y_name
190      190      1       0     The Cure
190      464      0.2457  3482  The Smiths
190      536      0.23    2597  Joy Division
190      582      0.2219  2947  Depeche Mode
190      29       0.2063  2393  New Order
190      235      0.2063  3690  Pixies
190      980      0.2031  5751  Radiohead
190      1001910  0.1967  3568  Interpol
190      314      0.1937  3538  David Bowie
190      1008486  0.1932  789   Siouxsie and the Banshees
190      1001647  0.1922  3749  The Smashing Pumpkins
1000143  1000143  1       0     Toby Keith
1000143  1001001  0.2279  294   Tim McGraw
1000143  1000414  0.1864  276   Kenny Chesney
1000143  1000410  0.184   164   George Strait
1000143  1100370  0.1706  118   Montgomery Gentry
1000143  1000088  0.1674  249   Alan Jackson
1000143  1000910  0.1595  193   Lonestar
1000143  1003557  0.157   238   Garth Brooks
1000143  1238925  0.1546  191   Brooks & Dunn
1000143  1062331  0.1532  225   Brad Paisley
1000143  1000655  0.1323  100   Tracy Byrd
1001820  1001820  1       0     2Pac
1001820  931      0.2602  2978  Eminem
1001820  2815     0.2592  2412  50 Cent
1001820  830      0.2201  1664  Nas
1001820  1004029  0.217   1226  Notorious B.I.G.
1001820  1007615  0.2148  2030  Jay-Z
1001820  740      0.2089  1298  DMX
1001820  4606     0.2076  2001  Snoop Dogg
1001820  1003250  0.2076  1499  Ludacris
1001820  1812     0.1939  1739  Dr. Dre
1001820  1300643  0.1926  1347  The Game
Similar Movies in the Netflix Prize Data
The same JobFlow was run on an input bucket containing the Netflix Prize data. You will need to download a copy of the data yourself from the Netflix site and upload it to Amazon S3 to run with the sample code. Scripts are provided in the code sample accompanying this article to convert the Netflix data into a format ready for loading into Hadoop; a minimal sketch of that conversion follows the listing below. Here is a sample of the results for a few movies:
$ grep ^299 related_movies.tsv
item_x  item_y  sim     num    item_y_name
299     299     1       0      Bridget Jones's Diary
299     16948   0.29    8263   Bridget Jones: The Edge of Reason
299     4472    0.2231  14818  Love Actually
299     3320    0.1988  11052  About a Boy
299     6206    0.1796  18567  My Big Fat Greek Wedding
299     2000    0.1777  5864   Four Weddings and a Funeral
299     15058   0.1761  9863   Notting Hill
299     2660    0.163   10476  When Harry Met Sally
299     1470    0.1596  12726  Bend It Like Beckham
299     17308   0.1584  12998  Legally Blonde
299     1754    0.1581  7171   Sixteen Candles
8846    8846    1       0      Election
8846    5226    0.2467  12409  Rushmore
8846    2122    0.2204  14948  Being John Malkovich
8846    5614    0.2092  12545  Best in Show
8846    14382   0.2075  11698  Raising Arizona
8846    8782    0.2045  17208  The Royal Tenenbaums
8846    5237    0.2034  3140   To Die For
8846    12232   0.1935  15496  Lost in Translation
8846    3355    0.1927  3367   The Ice Storm
8846    571     0.189   17640  American Beauty
8846    12084   0.187   13274  Adaptation
8782    8782    1       0      The Royal Tenenbaums
8782    6690    0.3759  1301   The Royal Tenenbaums: Bonus Material
8782    12232   0.3712  47207  Lost in Translation
8782    5226    0.3689  22545  Rushmore
8782    1719    0.3602  14804  The Life Aquatic with Steve Zissou
8782    1865    0.3224  28605  Eternal Sunshine of the Spotless Mind
8782    7904    0.3216  20349  Punch-Drunk Love
8782    12084   0.3211  34926  Adaptation
8782    2122    0.3026  34167  Being John Malkovich
8782    14274   0.28    14137  I Heart Huckabees
8782    15425   0.277   24183  The Big Lebowski
9940    9940    1       0      Night of the Living Dead
9940    10627   0.3306  2100   Dawn of the Dead
9940    3168    0.2498  1797   Evil Dead 2: Dead by Dawn
9940    15465   0.2277  2363   Friday the 13th
9940    8627    0.2115  1835   The Evil Dead
9940    17004   0.2047  2111   Halloween
9940    9368    0.2015  1594   The Texas Chainsaw Massacre
9940    15597   0.2002  984    Day of the Dead
9940    16793   0.198   3266   The Exorcist
9940    4402    0.1943  2725   A Nightmare on Elm Street
9940    10832   0.1935  2914   Psycho
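For reference, the Netflix Prize training set stores one file per movie (mv_0000001.txt, mv_0000002.txt, and so on), with the movie id on the first line followed by userid,rating,date rows. The sketch below shows the kind of flattening the provided conversion scripts perform, emitting user/item/rating triples in the same layout as the Audioscrobbler data; the paths and output format here are illustrative, and the actual scripts in the code sample may differ:

import glob

# Flatten the per-movie Netflix files into "userid movieid rating" rows,
# mirroring the userid/itemid/rating layout of user_artist_data.txt.
# Adjust the glob pattern to wherever you unpacked the training set.
for path in glob.glob('training_set/mv_*.txt'):
    f = open(path)
    movie_id = f.readline().strip().rstrip(':')  # first line is "movieid:"
    for line in f:
        user_id, rating, date = line.strip().split(',')
        print "%s %s %s" % (user_id, movie_id, rating)
    f.close()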
Resources
- Amazon Elastic MapReduce Documentation
- Ruby Client for Amazon Elastic MapReduce
- Hadoop Streaming 18.3 Documentation
- Cloudera Hadoop Training Videos
- Cloudera Hadoop Hacks Code
- Pairwise Document Similarity in Large Collections with MapReduce, by Tamer Elsayed, Jimmy Lin, and Douglas Oard
- Recommending Jira Issues at Dumbotics
- Running Hadoop MapReduce on Amazon EC2 and Amazon S3, by Tom White
- Hadoop: The Definitive Guide, by Tom White
- Hadoop streaming libraries for use with R, Ruby and Python
- NLTK Hadoop code for running iterative MapReduce steps with Python
- Installing additional Python modules at runtime using Hadoop's distributed cache
- MapReduce: Simplified Data Processing on Large Clusters, by Jeffrey Dean and Sanjay Ghemawat
- The Netflix Prize Site and Netflix related links from PyCon
- BellKor Homepage - Papers from the current leaders in the Netflix Competition
Pete Skomoroch is a consultant at Data Wrangling in Arlington, VA where he mines large datasets to solve problems in search, finance, and recommendation systems.