Finding Similar Items with Amazon Elastic MapReduce, Python, and Hadoop Streaming

Data Wrangling blogger and AWS developer Peter Skomoroch gives us an introduction to Amazon Elastic MapReduce.


Submitted By: Jai@AWS
AWS Products Used: Amazon Elastic MapReduce
Language(s): Python
Created On: April 02, 2009


Introduction

Amazon Elastic MapReduce is a web service that enables businesses, researchers, data analysts, and developers to easily and cost-effectively process vast amounts of data. It utilizes a hosted Hadoop framework running on the web-scale infrastructure of Amazon Elastic Compute Cloud (Amazon EC2) and Amazon Simple Storage Service (Amazon S3). This sample application uses Amazon Elastic MapReduce to run a multiple-step JobFlow that calculates pairwise similarity in a large database of items. In this example, we'll apply the sample code to music and film recommendations, but the example could potentially be run on other datasets such as document term counts, product sales, or website logs. This article assumes some familiarity with MapReduce and Hadoop Streaming. Python and Hadoop Streaming were used to make the algorithm code as clear as possible; better performance can be obtained by porting the example code to Java.

Running an Amazon Elastic MapReduce Streaming Job from the AWS Management Console

Before we get to the main similarity computation, we will take a detour to learn more about our dataset using the AWS Management Console. The AWS Management Console makes tasks that involve a single map and reduce step simple to execute. We will test-drive the console by running a Python streaming job from our code sample on the May 2005 Audioscrobbler dataset to analyze user listening statistics. The Audioscrobbler data contains the number of times each user listened to a particular artist. This means that, unlike Netflix star ratings, Audioscrobbler ratings are "implicit". The assumption is that the more someone listens to an artist, the higher they would rate them.
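One practical wrinkle with implicit ratings is that raw playcounts are heavily skewed: a handful of obsessive listeners can dominate the statistics. A common remedy is to dampen playcounts with a log transform before treating them as ratings. The sketch below is my own illustration rather than code from the sample, though the "log" argument passed to mapper2 later in this article suggests the sample code applies a similar scaling.

import math

# Illustrative sketch (not from the sample code): dampen raw playcounts
# with a log transform so a user with 1000 plays of an artist doesn't
# swamp a user with 50 plays.
def playcount_to_rating(playcount):
    return math.log(1 + playcount)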

The Audioscrobbler input data has the following format:

user_artist_data.txt
   3 columns: userid artistid playcount
   2        4164     23
   3        4164     4
   2        1002471  1
   3        1002471  643
   4        1000324  1

artist_data.txt
   2 columns: artistid artist_name
   4164     Britney Spears
   1002471  Elton John
   1000324  Guns N' Roses

Here is the Python code for the streaming mapper:

user_count_mapper.py
import sys

# Emit a count of 1 for each (user, artist, playcount) record; Hadoop's
# built-in "aggregate" reducer sums the counts for each user key.
for line in sys.stdin:
    (user, item, rating) = line.strip().split()
    print "LongValueSum:%s\t1" % user
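Because the reducer is Hadoop Streaming's built-in aggregate reducer, which simply sums the values for each LongValueSum-prefixed key, the whole job can be simulated at the command line. The stand-in reducer below is a hypothetical helper for local testing, not part of the sample code; it expects the mapper output to already be sorted:

sum_reducer.py
import sys

# Minimal local stand-in for Hadoop's "aggregate" reducer: strip the
# LongValueSum: prefix and sum the values for each key in sorted input.
current_key, total = None, 0
for line in sys.stdin:
    key, value = line.strip().split("\t")
    key = key.replace("LongValueSum:", "")
    if key != current_key:
        if current_key is not None:
            print "%s\t%d" % (current_key, total)
        current_key, total = key, 0
    total += int(value)
if current_key is not None:
    print "%s\t%d" % (current_key, total)

$ cat user_artist_data.txt | python user_count_mapper.py | sort | python sum_reducer.py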
 

Create a Streaming JobFlow

At this point, we have logged into the console and clicked "Create JobFlow". Now we need to decide what to name our job or select a sample application.

Specify the Job Parameters

The next step involves filling in a number of parameters. For the sample application, most of these are already filled in for you. See the Hadoop 0.18.3 Streaming documentation for more details about the Extra Args you can supply to Hadoop.

To count the number of artists listened to by each user we used the following parameters:

   -input   elasticmapreduce/samples/similarity/lastfm/input
   -output  aws-hadoop/lastfm/output/user-counts
   -mapper  elasticmapreduce/samples/similarity/user_count_mapper.py
   -reducer aggregate

Remember to replace the default "yourbucket" with your own Amazon S3 output path, and ensure that the path doesn't already exist before the job runs: Hadoop will fail the job rather than overwrite existing output.
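If you want to verify the output path programmatically before submitting, a quick pre-flight check along these lines works (a sketch that assumes the boto library, AWS credentials in your environment, and the bucket and prefix used in this example):

from boto.s3.connection import S3Connection

# Sketch: list any keys already under the intended output prefix
# (bucket and prefix here match the example parameters above).
conn = S3Connection()  # reads AWS credentials from the environment
bucket = conn.get_bucket("aws-hadoop")
existing = list(bucket.list(prefix="lastfm/output/user-counts"))
if existing:
    print "Output path already exists; delete it or pick a new one."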

Configure the number and type of instances

For this job we will just stick with the default of 4 small instances, which will cost $0.40 per hour. When running larger JobFlows, I usually use a cluster of c1.medium instances.

Review the Job Details and Start the Job

Everything looks correct, so we kick off the job and wait for it to complete. By default, the Amazon Elastic MapReduce cluster you launch will shut itself down when the job is complete and the output files are copied to Amazon S3. You can monitor the job status using the dashboard, the API, or by using FoxyProxy to monitor the Hadoop Web UI as described in the Amazon Elastic MapReduce documentation. Hadoop logs are also uploaded to Amazon S3 if you specify a path in the Job parameters.

Fetch the Results from Amazon S3

When our job is complete, we can download the output from the Amazon S3 bucket we indicated and analyze the results. For small files, I usually just use S3Fox to pull the output down to my local machine.

Our job produced a set of files named part-0000*, which together contain the number of unique artists listened to by each user. We can cat these together and use SciPy to run some simple statistics on the aggregated data (NumPy and SciPy are also preinstalled on the Amazon Elastic MapReduce AMIs if you want to use these modules in streaming jobs):

$ cd user-counts/
$ ls
part-00000  part-00002  part-00004  part-00006
part-00001  part-00003  part-00005
$ cat part-0000* > user_counts.txt
$ python
>>> from numpy import *
>>> from scipy import stats
>>> data = loadtxt('user_counts.txt', unpack=True)
>>> shape(data)
(2, 147160)
>>> max(data[1,:])
3331.0
>>> stats.scoreatpercentile(data[1,:], 25)
29.0
>>> stats.scoreatpercentile(data[1,:], 50)
79.0
>>> stats.scoreatpercentile(data[1,:], 99)
695.0
>>> stats.scoreatpercentile(data[1,:], 99.99)
1989.443899999198

The version of the Audioscrobbler dataset we processed has been cleaned to remove bad records and filtered to contain only artists with at least 100 unique users. The resulting dataset contains 147160 total users, and the maximum number of artists listened to by one user is 3331. The function stats.scoreatpercentile tells us that 50% of users listened to 79 or fewer artists.

Using MapReduce to Compute Item Similarity

The item similarity sample application uses a MapReduce approach, implemented with Python streaming, to compute the pairwise similarity of the items in the Audioscrobbler dataset. Similarity is measured by the Pearson correlation coefficient of the item ratings over the subset of users who rated both items. We won't go into all the details of the MapReduce algorithm or the similarity calculations here, but each step of the algorithm is documented in the Python code sample.

Pearson similarity in MapReduce won't win the Netflix Prize, but it will quickly generate an item similarity matrix and baseline recommendations for a number of different types of data. With the right parameter settings, you should be able to throw more data at the algorithm and scale the computation by adding compute nodes. If you have serious performance demands, it would probably be worthwhile to port the Python streaming example code over to Java or C++ and take advantage of other Hadoop optimizations we won't cover in this article (including Hadoop sequence files and Combiners).
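For reference, here is what the Pearson correlation between two items looks like when computed directly. This standalone function is just an illustration of the similarity measure; the sample code computes the same quantity incrementally from sums accumulated inside the MapReduce steps.

from math import sqrt

def pearson_sim(ratings_x, ratings_y):
    # ratings_x and ratings_y map userid -> rating for two items;
    # the correlation is computed over users who rated both.
    common = [u for u in ratings_x if u in ratings_y]
    n = len(common)
    if n == 0:
        return 0.0
    x = [ratings_x[u] for u in common]
    y = [ratings_y[u] for u in common]
    sum_x, sum_y = sum(x), sum(y)
    sum_xx = sum(v * v for v in x)
    sum_yy = sum(v * v for v in y)
    sum_xy = sum(a * b for a, b in zip(x, y))
    denom = sqrt(n * sum_xx - sum_x ** 2) * sqrt(n * sum_yy - sum_y ** 2)
    if denom == 0:
        return 0.0
    return (n * sum_xy - sum_x * sum_y) / denom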

Testing the Sample Code Locally on a Small Dataset

One of the nice features of Hadoop Streaming is that you can test your application at the command line on a single machine. A small 7-artist dataset is included with the source code for testing purposes:

Artists in the sample dataset:

   ID       ARTIST                TOTAL_PLAYS
   1001820  2Pac                  12950
   700      De La Soul            7243
   1011819  A Tribe Called Quest  7601
   1003557  Garth Brooks          2976
   1000143  Toby Keith            1905
   1012511  Kenny Rogers          1881
   1000418  Mark Chesnutt         184

Run the following to test locally on the small 7-artist Audioscrobbler dataset:

   $ python similarity.py
   Running sample Lastfm data for 7 artists...
   real    0m2.110s
   user    0m2.082s
   sys     0m0.456s
   Top 25 related artists:
   x        y        sim     count  name
   1000143  1000143  1       0      Toby Keith
   1000143  1003557  0.2434  809    Garth Brooks
   1000143  1000418  0.1068  120    Mark Chesnutt
   1000143  1012511  0.0758  237    Kenny Rogers
   1000418  1000418  1       0      Mark Chesnutt
   1000418  1000143  0.1068  120    Toby Keith
   1000418  1003557  0.056   114    Garth Brooks
   1000418  1012511  0.0385  50     Kenny Rogers
   1001820  1001820  1       0      2Pac
   1001820  1011819  0.1391  2428   A Tribe Called Quest
   1001820  700      0.1012  2250   De La Soul
   1001820  1003557  0.0629  878    Garth Brooks
   1003557  1003557  1       0      Garth Brooks
   1003557  1000143  0.2434  809    Toby Keith
   1003557  1012511  0.1217  360    Kenny Rogers
   1003557  1001820  0.0629  878    2Pac
   1011819  1011819  1       0      A Tribe Called Quest
   1011819  700      0.2972  3050   De La Soul
   1011819  1001820  0.1391  2428   2Pac
   1011819  1003557  0.0298  288    Garth Brooks
   1012511  1012511  1       0      Kenny Rogers
   1012511  1003557  0.1217  360    Garth Brooks
   1012511  1000143  0.0758  237    Toby Keith
   1012511  1001820  0.0543  468    2Pac
   700      700      1       0      De La Soul
   700      1011819  0.2972  3050   A Tribe Called Quest
   700      1001820  0.1012  2250   2Pac
   700      1000143  0.0479  114    Toby Keith

The main method of the Python script actually executes the following MapReduce chain:

$ cat input/sample_user_artist_data.txt | ./similarity.py mapper1 \
  | sort | ./similarity.py reducer1 > artist_playcounts.txt
$ cat input/sample_user_artist_data.txt | ./similarity.py mapper2 log | sort \
  | ./similarity.py reducer2 | ./similarity.py mapper3 100 artist_playcounts.txt \
  | sort | ./similarity.py reducer3 147160 > artist_similarities.txt
$ cat artist_similarities.txt | ./similarity.py mapper4 20 | sort \
  | ./similarity.py reducer4 3 artist_data.txt > related_artists.tsv

You can also inspect the intermediate output of each stage:

$ cat input/sample_user_artist_data.txt | ./similarity.py mapper2 log \
  | sort | ./similarity.py reducer2 | tail
8390    1|1|1003557,1
8619    1|1|1011819,1
8816    1|2|1001820,2
8873    1|2|700,2
9025    2|6|1011819,4 700,2
9037    1|1|1011819,1
9059    2|4|1001820,1 700,3
9711    2|3|1001820,1 700,2
9838    1|1|700,1
9876    1|3|1011819,3
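Each reducer2 line groups one user's ratings: a record like 9025 followed by 2|6|1011819,4 700,2 appears to encode the number of items the user rated, the sum of those ratings, and the item,rating pairs themselves. The next stage fans each user's item list out into item pairs so that co-rating statistics can be accumulated under an (item_x, item_y) key. The sketch below illustrates that fan-out; it is a simplification of the real mapper3, which also takes a playcount threshold and the artist_playcounts.txt file as arguments and emits a richer record.

import sys

# Illustrative fan-out (simplified mapper3): for every pair of items a
# user rated, emit the pair's ratings keyed by the item pair. Input is
# the tab-delimited reducer2 output shown above.
for line in sys.stdin:
    user, grouped = line.strip().split("\t")
    count, rating_sum, pairs = grouped.split("|")
    items = [p.split(",") for p in pairs.split()]
    for item_x, rating_x in items:
        for item_y, rating_y in items:
            if item_x < item_y:  # emit each unordered pair once
                print "%s|%s\t%s,%s" % (item_x, item_y, rating_x, rating_y)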

Running JobFlows with Multiple Steps on Amazon EC2 Using the Amazon Elastic MapReduce API

The AWS Management Console is great for single stage jobs, but the Amazon Elastic MapReduce API is much more powerful if we need to run a complex MapReduce algorithm involving multiple steps, add steps at runtime, or have JobFlows that need to be run automatically by another application.

To run our item similarity code for the Last.fm data on Amazon Elastic MapReduce, we will use the Ruby CLI client supplied by Amazon to launch a JobFlow based on a JSON JobFlow description. If you already launch multiple-stage Hadoop jobs using bash scripts, you will find it easy to translate your existing scripts. Make sure you have downloaded the Ruby client and have configured your environment as described in the Amazon Elastic MapReduce Documentation.

To run the example on the entire Audioscrobbler dataset, download the sample code and use the lastfm_jobflow.json file with the Ruby CLI. lastfm_jobflow.json uses a replaceable token for the Amazon S3 bucket names; the Ruby CLI supports token substitution in the JSON file via the --param option. Remember that the output path should not already exist on Amazon S3. When submitted, this JobFlow will launch 10 c1.medium instances and begin processing the data as described in Step 1 of the JobFlow.

We can launch the JobFlow as follows using the Ruby client and get back a JobFlowId:

$ ./elastic-mapreduce --create --name 'Last FM JobFlow' --num-instances 10 \
  --instance-type c1.medium --json ~/similarity/lastfm_jobflow.json --param '=yourbucketname'
Created job flow j-I114OYCUSTB1

We can check on the status of this job in the AWS Management Console, or by using the API. The DescribeJobFlows command can be passed the id returned from RunJobFlow to check on the job status (the response below is truncated to save space). Within a few minutes, our job will be running on a 10 node cluster.

$ ./elastic-mapreduce --describe -j j-I114OYCUSTB1
{
  "JobFlows": [
    {
      "Name": "lastfm_large_jobflow_50.json",
      "LogUri": "s3n:\/\/aws-hadoop\/lastfm\/logs\/",
      "ExecutionStatusDetail": {
        "EndDateTime": null,
        "StartDateTime": null,
        "LastStateChangeReason": "Starting master",
        "CreationDateTime": 1238499756.0,
        "State": "STARTING"
      },
      "Steps": [
        {
          "ExecutionStatusDetail": {
            "EndDateTime": null,
            "StartDateTime": null,
            "LastStateChangeReason": null,
            "CreationDateTime": 1238499756.0,
            "State": "PENDING"
          },
          "StepConfig": {
            "Name": "MR Step 1: Count number of ratings for each item, use single reducer",
            "HadoopJarStep": {
              "Jar": "\/home\/hadoop\/contrib\/streaming\/hadoop-0.18-streaming.jar",
              "Args": [
                "-input",
                "s3n:\/\/aws-hadoop\/lastfm\/input\/",
                "-output",
                "s3n:\/\/aws-hadoop\/lastfm\/large-item-counts-50\/",
                "-mapper",
                "python similarity.py mapper1",
                "-reducer",
                "python similarity.py reducer1",
                "-cacheFile",
                "s3n:\/\/aws-hadoop\/similarity.py#similarity.py",
                "-jobconf",
                "mapred.map.tasks=36",
                "-jobconf",
                "mapred.reduce.tasks=1",
                "-jobconf",
                "mapred.compress.map.output=true"
              ],
              "Properties": [
              ],
              "MainClass": null
            },
            "ActionOnFailure": "TERMINATE_JOB_FLOW"
          }
        },
        ....

Sample Code Output: Related Item Recommendations

The Last.fm JobFlow launched in the last section finishes in just under an hour using 10 c1.medium Amazon EC2 instances and costs $2.00 to run. The final output of the process is a tab-delimited file related_artists.tsv containing the top 25 related items for each item in our input data. The first column contains the item ID (item_x) and the second column contains the item ID for a similar item (item_y). The remaining columns are the similarity score, the number of users the items had in common, and the name of the similar item.
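Once the file is local, looking up recommendations for an item is straightforward. Here is a quick sketch, assuming the five tab-separated columns described above:

# Build an item -> [(similarity, name), ...] index from the output file
# and print the top recommendations for one item (2Pac, id 1001820).
related = {}
for line in open("related_artists.tsv"):
    item_x, item_y, sim, num, name = line.rstrip("\n").split("\t")
    if item_x != item_y:  # skip the self-similarity row
        related.setdefault(item_x, []).append((float(sim), name))

for sim, name in sorted(related["1001820"], reverse=True)[:5]:
    print "%.4f\t%s" % (sim, name)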

Similar Artists in the Audioscrobbler Data

Here is a sample of our Audioscrobbler results, showing the top 10 most similar artists for The Cure, Toby Keith, and 2Pac:
   
   $ grep ^190 related_artists.tsv
   item_x   item_y   sim     num   item_y_name
   190      190      1       0     The Cure
   190      464      0.2457  3482  The Smiths
   190      536      0.23    2597  Joy Division
   190      582      0.2219  2947  Depeche Mode
   190      29       0.2063  2393  New Order
   190      235      0.2063  3690  Pixies
   190      980      0.2031  5751  Radiohead
   190      1001910  0.1967  3568  Interpol
   190      314      0.1937  3538  David Bowie
   190      1008486  0.1932  789   Siouxsie and the Banshees
   190      1001647  0.1922  3749  The Smashing Pumpkins

   1000143  1000143  1       0     Toby Keith
   1000143  1001001  0.2279  294   Tim McGraw
   1000143  1000414  0.1864  276   Kenny Chesney
   1000143  1000410  0.184   164   George Strait
   1000143  1100370  0.1706  118   Montgomery Gentry
   1000143  1000088  0.1674  249   Alan Jackson
   1000143  1000910  0.1595  193   Lonestar
   1000143  1003557  0.157   238   Garth Brooks
   1000143  1238925  0.1546  191   Brooks & Dunn
   1000143  1062331  0.1532  225   Brad Paisley
   1000143  1000655  0.1323  100   Tracy Byrd

   1001820  1001820  1       0     2Pac
   1001820  931      0.2602  2978  Eminem
   1001820  2815     0.2592  2412  50 Cent
   1001820  830      0.2201  1664  Nas
   1001820  1004029  0.217   1226  Notorious B.I.G.
   1001820  1007615  0.2148  2030  Jay-Z
   1001820  740      0.2089  1298  DMX
   1001820  4606     0.2076  2001  Snoop Dogg
   1001820  1003250  0.2076  1499  Ludacris
   1001820  1812     0.1939  1739  Dr. Dre
   1001820  1300643  0.1926  1347  The Game
 

Similar Movies in the Netflix Prize Data

The same JobFlow was run on an input bucket containing the Netflix Prize data. You will need to download a copy of the data yourself from the Netflix site and upload it to Amazon S3 to run with the sample code. The code sample accompanying this article includes scripts that convert the Netflix data into a format ready for loading into Hadoop; the sketch below shows the essence of that conversion (the file and directory names are assumptions based on the standard Netflix training set layout, where each per-movie file starts with the movie id followed by userid,rating,date rows).
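import glob

# Sketch of the Netflix conversion step (the bundled scripts do the real
# work): flatten the per-movie training files into the
# "userid itemid rating" triples the similarity job expects.
out = open("netflix_user_item_data.txt", "w")
for path in glob.glob("training_set/mv_*.txt"):
    f = open(path)
    movie_id = f.readline().strip().rstrip(":")
    for row in f:
        user_id, rating, date = row.strip().split(",")
        out.write("%s %s %s\n" % (user_id, movie_id, rating))
    f.close()
out.close()

Here is a sample of the results for a few movies: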

   $ grep ^299 related_movies.tsv
   item_x  item_y  sim     num    item_y_name
   299     299     1       0      Bridget Jones's Diary
   299     16948   0.29    8263   Bridget Jones: The Edge of Reason
   299     4472    0.2231  14818  Love Actually
   299     3320    0.1988  11052  About a Boy
   299     6206    0.1796  18567  My Big Fat Greek Wedding
   299     2000    0.1777  5864   Four Weddings and a Funeral
   299     15058   0.1761  9863   Notting Hill
   299     2660    0.163   10476  When Harry Met Sally
   299     1470    0.1596  12726  Bend It Like Beckham
   299     17308   0.1584  12998  Legally Blonde
   299     1754    0.1581  7171   Sixteen Candles

   8846    8846    1       0      Election
   8846    5226    0.2467  12409  Rushmore
   8846    2122    0.2204  14948  Being John Malkovich
   8846    5614    0.2092  12545  Best in Show
   8846    14382   0.2075  11698  Raising Arizona
   8846    8782    0.2045  17208  The Royal Tenenbaums
   8846    5237    0.2034  3140   To Die For
   8846    12232   0.1935  15496  Lost in Translation
   8846    3355    0.1927  3367   The Ice Storm
   8846    571     0.189   17640  American Beauty
   8846    12084   0.187   13274  Adaptation

   8782    8782    1       0      The Royal Tenenbaums
   8782    6690    0.3759  1301   The Royal Tenenbaums: Bonus Material
   8782    12232   0.3712  47207  Lost in Translation
   8782    5226    0.3689  22545  Rushmore
   8782    1719    0.3602  14804  The Life Aquatic with Steve Zissou
   8782    1865    0.3224  28605  Eternal Sunshine of the Spotless Mind
   8782    7904    0.3216  20349  Punch-Drunk Love
   8782    12084   0.3211  34926  Adaptation
   8782    2122    0.3026  34167  Being John Malkovich
   8782    14274   0.28    14137  I Heart Huckabees
   8782    15425   0.277   24183  The Big Lebowski

   9940    9940    1       0      Night of the Living Dead
   9940    10627   0.3306  2100   Dawn of the Dead
   9940    3168    0.2498  1797   Evil Dead 2: Dead by Dawn
   9940    15465   0.2277  2363   Friday the 13th
   9940    8627    0.2115  1835   The Evil Dead
   9940    17004   0.2047  2111   Halloween
   9940    9368    0.2015  1594   The Texas Chainsaw Massacre
   9940    15597   0.2002  984    Day of the Dead
   9940    16793   0.198   3266   The Exorcist
   9940    4402    0.1943  2725   A Nightmare on Elm Street
   9940    10832   0.1935  2914   Psycho

About the Author

Peter Skomoroch is a consultant at Data Wrangling in Arlington, VA, where he mines large datasets to solve problems in search, finance, and recommendation systems.