Similar Item Recommendations with Python, Amazon Elastic MapReduce, and Hadoop Streaming

Data Wrangling blogger and AWS developer Peter Skomoroch gives us an introduction to Amazon Elastic MapReduce. In this sample application, we use the Elastic MapReduce Ruby client to run a multiple step Python streaming job that identifies similar artists based on Last.fm Audioscrobbler playlists. We also run the MapReduce job on Netflix Prize data to identify similar movies based on user ratings and take the Elastic MapReduce Console for a test drive.

Details

Submitted By: Peter N. Skomoroch
AWS Products Used: Amazon EC2
Language(s): Python
License: Apache License 2.0
Created On: March 31, 2009 12:42 PM GMT
Last Updated: April 24, 2009 4:20 PM GMT

Introduction

This sample application uses Amazon Elastic MapReduce to run a multi-step JobFlow that calculates pairwise similarity in a large database of items. We apply the sample code to music and film recommendations, but the example could be run on other datasets such as document term counts, product sales, or website logs. This article assumes some familiarity with MapReduce and Hadoop Streaming. Python and Hadoop Streaming were used to make the algorithm code as clear as possible; better performance can be obtained by porting the example code to Java.

Running an Elastic MapReduce Streaming Job From the Console

Before we get to the main similarity computation, we will take a detour to learn more about our dataset using the Elastic MapReduce Console. The Elastic MapReduce Console makes tasks that involve a single map and reduce step simple to execute. We will test drive the console by running a Python streaming job from our code sample on the May 2005 Audioscrobbler dataset to analyze user listening statistics. The Audioscrobbler data contains the number of times each user listened to a particular artist. This means that, unlike Netflix star ratings, Audioscrobbler ratings are "implicit". The assumption is that the more someone listens to an artist, the higher they would rate them.

The Audioscrobbler input data has the following format:

user_artist_data.txt
    3 columns: userid artistid playcount

	2 4164 23
	3 4164 4
	2 1002471 1
	3 1002471 643
	4 1000324 1

artist_data.txt
    2 columns: artistid artist_name

	4164	Britney Spears
	1002471	Elton John
	1000324	Guns N' Roses	

Here is the Python code for the streaming mapper:

user_count_mapper.py

	# Each input line is "userid artistid playcount"; emit one count per
	# record with a "LongValueSum:" prefix so Hadoop's built-in aggregate
	# reducer sums the 1s, giving the number of artists each user played.
	import sys

	for line in sys.stdin:
	  (user, item, rating) = line.strip().split()
	  print "LongValueSum:%s\t1" % user

Create a Streaming JobFlow

At this point, we have logged into the console and clicked "Create JobFlow". Now we need to name our job or select one of the sample applications.

Specify the Job Parameters

The next step involves filling in a number of parameters. For the sample application, most of these are already filled in for you. See the Hadoop 0.18.3 streaming documentation for more details about the Extra Args you can supply to Hadoop.

To count the number of artists listened to by each user we used the following parameters:

-input elasticmapreduce/samples/similarity/lastfm/input 
-output aws-hadoop/lastfm/output/user-counts 
-mapper elasticmapreduce/samples/similarity/user_count_mapper.py 
-reducer aggregate

Remember to replace the default "yourbucket" with a desired S3 output path, and ensure that the path doesn't exist before the job runs.
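
Because Hadoop Streaming mappers and reducers simply read stdin and write stdout, you can sanity-check this mapper and reducer combination at the command line before running anything in the console. The script below is our own stand-in for the built-in aggregate reducer (it only handles the LongValueSum prefix used here), and the file name local_aggregate_reducer.py is purely illustrative; it lets you preview the per-user counts the real job will produce:

	# local_aggregate_reducer.py: minimal local stand-in for the built-in
	# aggregate reducer, summing "LongValueSum:key\tvalue" records per key.
	import sys
	from itertools import groupby

	def parse(line):
	    key, value = line.rstrip("\n").split("\t", 1)
	    return key.split(":", 1)[1], int(value)   # drop the LongValueSum: prefix

	pairs = (parse(line) for line in sys.stdin)
	for user, group in groupby(pairs, key=lambda kv: kv[0]):
	    print "%s\t%d" % (user, sum(v for _, v in group))

$ cat user_artist_data.txt | python user_count_mapper.py | sort \
| python local_aggregate_reducer.py > user_counts.txt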

Configure the number and type of instances

For this job we will just stick with the default of 4 small instances, which will cost $0.40 per hour. When running larger JobFlows, I usually use a cluster of c1.medium instances.

Review the Job Details and Start the Job

Everything looks correct, so we kick off the job and wait for it to complete. By default, the Elastic MapReduce cluster you launch will shut itself down when the job is complete and the output files are copied to S3. You can monitor the job status using the dashboard, the API, or by using FoxyProxy to monitor the Hadoop Web UI as described in the Elastic MapReduce documentation. Hadoop logs are also uploaded to S3 if you specify a path in the Job parameters.

Fetch the Results from S3

When our job is complete, we can download the output from the S3 bucket we indicated and analyze the results. For small files, I usually just use S3Fox to pull the output down to my local machine.

Our job produced a set of files named part-0000*, which together contain the number of unique artists listened to by each user. We can cat these together and use SciPy to run some simple statistics on the aggregated data (NumPy and SciPy are also preinstalled on the Elastic MapReduce AMIs if you want to use these modules in streaming jobs):

$ cd user-counts/
$ ls
part-00000	part-00002	part-00004	part-00006
part-00001	part-00003	part-00005	

$ cat part-0000* > user_counts.txt

>>> from numpy import *
>>> from scipy import stats
>>> data = loadtxt('user_counts.txt', unpack=True)
>>> shape(data)
(2, 147160)
>>> max(data[1,:])
3331.0
>>> stats.scoreatpercentile(data[1,:], 25)
29.0
>>> stats.scoreatpercentile(data[1,:], 50)
79.0
>>> stats.scoreatpercentile(data[1,:], 99)
695.0
>>> stats.scoreatpercentile(data[1,:], 99.99)
1989.443899999198

The version of the Audioscrobbler dataset we processed has been cleaned to remove bad records and filtered to contain only artists with at least 100 unique users. The resulting dataset contains 147160 users in total, and the maximum number of artists listened to by a single user is 3331. The stats.scoreatpercentile calls tell us that half of the users listened to 79 or fewer artists.

Using MapReduce to Compute Item Similarity

The item similarity sample application uses a MapReduce approach, implemented with Python streaming, to compute the pairwise similarity of the items in the Audioscrobbler dataset. Similarity is measured by the Pearson correlation coefficient of the item ratings for the subset of users who rated both items. We won't go into all the details of the MapReduce algorithm or the similarity calculations here, but each step of the algorithm is documented in the Python code sample.
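
To make the similarity measure concrete, here is a minimal standalone sketch of the calculation (our own illustration, not an excerpt from similarity.py, which computes the same quantity in streaming form). It takes two {user: rating} dictionaries and returns the Pearson correlation over the users who rated both items:

	from math import sqrt

	def pearson(ratings_x, ratings_y):
	    # Restrict the calculation to users who rated both items
	    common = [u for u in ratings_x if u in ratings_y]
	    n = len(common)
	    if n == 0:
	        return 0.0
	    x = [float(ratings_x[u]) for u in common]
	    y = [float(ratings_y[u]) for u in common]
	    sum_x, sum_y = sum(x), sum(y)
	    sum_xx = sum(v * v for v in x)
	    sum_yy = sum(v * v for v in y)
	    sum_xy = sum(a * b for a, b in zip(x, y))
	    den_sq = (sum_xx - sum_x ** 2 / n) * (sum_yy - sum_y ** 2 / n)
	    if den_sq <= 0:
	        return 0.0
	    return (sum_xy - sum_x * sum_y / n) / sqrt(den_sq)

Roughly speaking, the MapReduce steps exist to build those co-rated vectors at scale: the mappers group each user's ratings and emit co-occurring item pairs, and the reducers accumulate the sums needed for this formula for every pair.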

Pearson similarity in MapReduce won't win the Netflix Prize, but it will quickly generate an item similarity matrix and baseline recommendations for a number of different types of data. With the right parameter settings, you should be able to throw more data at the algorithm and scale the computation by adding compute nodes. If you have serious performance demands, it would probably be worthwhile to port the Python streaming example code to Java or C++ and take advantage of other Hadoop optimizations we won't cover in this article (including Hadoop sequence files and combiners).

Testing the Sample Code Locally on a Small Dataset

One of the nice features of Hadoop Streaming is that you can test your application at the command line on a single machine. A small 7 artist dataset is included with the source code for testing purposes:

Artists in the sample dataset:

ID  	ARTIST  		TOTAL_PLAYS
1001820 2Pac    		12950
700     De La Soul      	7243
1011819 A Tribe Called Quest  	7601
1003557 Garth Brooks  		2976
1000143 Toby Keith  		1905
1012511 Kenny Rogers  		1881
1000418 Mark Chesnutt   	184

Run the following to test locally on a small 7 artist Audioscrobbler dataset:

	
	$ python similarity.py 
	Running sample Lastfm data for 7 artists...

	real	0m2.110s
	user	0m2.082s
	sys	0m0.456s

	Top 25 related artists:
	x	y	sim	count	name
	1000143	1000143	1	0	Toby Keith
	1000143	1003557	0.2434	809	Garth Brooks
	1000143	1000418	0.1068	120	Mark Chesnutt
	1000143	1012511	0.0758	237	Kenny Rogers
	1000418	1000418	1	0	Mark Chesnutt
	1000418	1000143	0.1068	120	Toby Keith
	1000418	1003557	0.056	114	Garth Brooks
	1000418	1012511	0.0385	50	Kenny Rogers
	1001820	1001820	1	0	2Pac
	1001820	1011819	0.1391	2428	A Tribe Called Quest
	1001820	700	0.1012	2250	De La Soul
	1001820	1003557	0.0629	878	Garth Brooks
	1003557	1003557	1	0	Garth Brooks
	1003557	1000143	0.2434	809	Toby Keith
	1003557	1012511	0.1217	360	Kenny Rogers
	1003557	1001820	0.0629	878	2Pac
	1011819	1011819	1	0	A Tribe Called Quest
	1011819	700	0.2972	3050	De La Soul
	1011819	1001820	0.1391	2428	2Pac
	1011819	1003557	0.0298	288	Garth Brooks
	1012511	1012511	1	0	Kenny Rogers
	1012511	1003557	0.1217	360	Garth Brooks
	1012511	1000143	0.0758	237	Toby Keith
	1012511	1001820	0.0543	468	2Pac
	700	700	1	0	De La Soul
	700	1011819	0.2972	3050	A Tribe Called Quest
	700	1001820	0.1012	2250	2Pac
	700	1000143	0.0479	114	Toby Keith

The main method of the Python script is actually executing the following MapReduce chain:

$ cat input/sample_user_artist_data.txt | ./similarity.py mapper1  \
| sort | ./similarity.py reducer1 > artist_playcounts.txt

$ cat input/sample_user_artist_data.txt | ./similarity.py mapper2 log | sort \
|./similarity.py reducer2 | ./similarity.py mapper3 100 artist_playcounts.txt  \
|sort |./similarity.py reducer3 147160 > artist_similarities.txt

$ cat artist_similarities.txt  | ./similarity.py mapper4 20 | sort | \
./similarity.py reducer4 3 artist_data.txt > related_artists.tsv	

You can also inspect the intermediate output of each stage:

$ cat input/sample_user_artist_data.txt | ./similarity.py mapper2 log \
| sort |./similarity.py reducer2 | tail
8390	1|1|1003557,1
8619	1|1|1011819,1
8816	1|2|1001820,2
8873	1|2|700,2
9025	2|6|1011819,4 700,2
9037	1|1|1011819,1
9059	2|4|1001820,1 700,3
9711	2|3|1001820,1 700,2
9838	1|1|700,1
9876	1|3|1011819,3

Running JobFlows with Multiple Steps on EC2 Using the Elastic MapReduce API

The console is great for single stage jobs, but the Elastic MapReduce API is much more powerful if we need to run a complex MapReduce algorithm involving multiple steps, add steps at runtime, or have JobFlows that need to be run automatically by another application.

To run our item similarity code for the Last.fm data on Elastic MapReduce, we will use the Ruby CLI client supplied by Amazon to launch a JobFlow based on a JSON JobFlow description. If you already launch multiple stage Hadoop jobs using bash scripts, you will find it easy to translate your existing scripts. Make sure you have downloaded the Ruby client and have configured your environment as described in the Elastic MapReduce Documentation.

To run the example on the entire Audioscrobbler dataset, download the sample code and edit the file lastfm_jobflow.json, filling in your bucket name and keypair where indicated. Remember that the output path should not already exist on S3. When submitted, this JobFlow will launch 10 c1.medium instances and begin processing the data as described in Step 1 of the JobFlow.
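
For orientation, here is a heavily abridged sketch of the shape of lastfm_jobflow.json. The step entries mirror the StepConfig blocks you can see in the DescribeJobFlows response later in this article; the instance and keypair field names shown here are illustrative, so defer to the actual file in the sample download when editing:

{
  "Name": "lastfm_similarity_jobflow",
  "LogUri": "s3n://yourbucket/lastfm/logs/",
  "Instances": {
    "Ec2KeyName": "your-keypair",
    "InstanceCount": 10,
    "MasterInstanceType": "c1.medium",
    "SlaveInstanceType": "c1.medium"
  },
  "Steps": [
    {
      "Name": "MR Step 1: Count number of ratings for each item, use single reducer",
      "ActionOnFailure": "TERMINATE_JOB_FLOW",
      "HadoopJarStep": {
        "Jar": "/home/hadoop/contrib/streaming/hadoop-0.18-streaming.jar",
        "Args": ["-input", "s3n://yourbucket/lastfm/input/",
                 "-output", "s3n://yourbucket/lastfm/item-counts/",
                 "-mapper", "python similarity.py mapper1",
                 "-reducer", "python similarity.py reducer1",
                 "-cacheFile", "s3n://yourbucket/similarity.py#similarity.py"]
      }
    }
  ]
}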

We can launch the JobFlow as follows using the Ruby client and get back a JobFlowId:

	
$ ./aws157-client.rb RunJobFlow ~/similarity/lastfm_jobflow.json 
{
  "JobFlowId": "j-I114OYCUSTB1"
}

We can check on the status of this job in the console or by using the API. The DescribeJobFlows command can be passed the JobFlowId returned from RunJobFlow to check on the job status (the response below is truncated to save space). Within a few minutes, our job will be running on a 10-node cluster.

$ ./aws157-client.rb DescribeJobFlows -j j-I114OYCUSTB1
{
  "JobFlows": [
    {
      "Name": "lastfm_large_jobflow_50.json",
      "LogUri": "s3n:\/\/aws-hadoop\/lastfm\/logs\/",
      "ExecutionStatusDetail": {
        "EndDateTime": null,
        "StartDateTime": null,
        "LastStateChangeReason": "Starting master",
        "CreationDateTime": 1238499756.0,
        "State": "STARTING"
      },
      "Steps": [
        {
          "ExecutionStatusDetail": {
            "EndDateTime": null,
            "StartDateTime": null,
            "LastStateChangeReason": null,
            "CreationDateTime": 1238499756.0,
            "State": "PENDING"
          },
          "StepConfig": {
            "Name": "MR Step 1: Count number of ratings for each item, use single reducer",
            "HadoopJarStep": {
              "Jar": "\/home\/hadoop\/contrib\/streaming\/hadoop-0.18-streaming.jar",
              "Args": [
                "-input",
                "s3n:\/\/aws-hadoop\/lastfm\/input\/",
                "-output",
                "s3n:\/\/aws-hadoop\/lastfm\/large-item-counts-50\/",
                "-mapper",
                "python similarity.py mapper1",
                "-reducer",
                "python similarity.py reducer1",
                "-cacheFile",
                "s3n:\/\/aws-hadoop\/similarity.py#similarity.py",
                "-jobconf",
                "mapred.map.tasks=36",
                "-jobconf",
                "mapred.reduce.tasks=1",
                "-jobconf",
                "mapred.compress.map.output=true"
              ],
              "Properties": [

              ],
              "MainClass": null
            },
            "ActionOnFailure": "TERMINATE_JOB_FLOW"
          }
        },
	....

Sample Code Output: Related Item Recommendations

The Last.fm JobFlow launched in the previous section finishes in just under an hour using 10 c1.medium EC2 instances and costs $2.00 to run. The final output of the process is a tab-delimited file, related_artists.tsv, containing the top 25 related items for each item in our input data. The first column contains the item ID (item_x) and the second column contains the item ID of a similar item (item_y). The remaining columns are the similarity score, the number of users the items had in common, and the name of the similar item.
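
Once the output is copied down from S3, pulling the recommendations for a single item is straightforward. Here is a small helper of our own (not part of the sample code) that reads related_artists.tsv and prints the top related artists for a given item ID:

	import csv

	def related_items(path, item_id, n=10):
	    # Collect (similarity, common_users, name) rows for item_id,
	    # skipping the self-match row where item_x == item_y.
	    rows = []
	    for row in csv.reader(open(path), delimiter='\t'):
	        item_x, item_y, sim, num, name = row[:5]
	        if item_x == item_id and item_y != item_id:
	            rows.append((float(sim), int(num), name))
	    rows.sort(reverse=True)
	    return rows[:n]

	for sim, num, name in related_items('related_artists.tsv', '190'):
	    print "%.4f\t%d\t%s" % (sim, num, name)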

Similar Artists in the Audioscrobbler Data

Here is a sample of our Audioscrobbler results, showing the top 10 most similar artists for The Cure, Toby Keith, and 2Pac:

	
	$ grep ^190 related_artists.tsv
	item_x	item_y	sim	num	item_y_name
	190	190	1	0	The Cure
	190	464	0.2457	3482	The Smiths
	190	536	0.23	2597	Joy Division
	190	582	0.2219	2947	Depeche Mode
	190	29	0.2063	2393	New Order
	190	235	0.2063	3690	Pixies
	190	980	0.2031	5751	Radiohead
	190	1001910	0.1967	3568	Interpol
	190	314	0.1937	3538	David Bowie
	190	1008486	0.1932	789	Siouxsie and the Banshees
	190	1001647	0.1922	3749	The Smashing Pumpkins
		

	1000143	1000143	1	0	Toby Keith
	1000143	1001001	0.2279	294	Tim McGraw
	1000143	1000414	0.1864	276	Kenny Chesney
	1000143	1000410	0.184	164	George Strait
	1000143	1100370	0.1706	118	Montgomery Gentry
	1000143	1000088	0.1674	249	Alan Jackson
	1000143	1000910	0.1595	193	Lonestar
	1000143	1003557	0.157	238	Garth Brooks
	1000143	1238925	0.1546	191	Brooks & Dunn
	1000143	1062331	0.1532	225	Brad Paisley
	1000143	1000655	0.1323	100	Tracy Byrd
		

	1001820	1001820	1	0	2Pac
	1001820	931	0.2602	2978	Eminem
	1001820	2815	0.2592	2412	50 Cent
	1001820	830	0.2201	1664	Nas
	1001820	1004029	0.217	1226	Notorious B.I.G.
	1001820	1007615	0.2148	2030	Jay-Z
	1001820	740	0.2089	1298	DMX
	1001820	4606	0.2076	2001	Snoop Dogg
	1001820	1003250	0.2076	1499	Ludacris
	1001820	1812	0.1939	1739	Dr. Dre
	1001820	1300643	0.1926	1347	The Game
		

Similar Movies in the Netflix Prize Data

The same JobFlow was run on an input bucket containing the Netflix Prize data. You will need to download a copy of the data from the Netflix site yourself and upload it to S3 to run the sample code. Scripts included in the code sample accompanying this article convert the Netflix data into a format ready for loading into Hadoop.
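
For reference, the conversion itself is simple. Below is a minimal sketch of the same idea (our own illustration; the scripts in the download may differ in detail), assuming the standard Netflix Prize training_set layout in which each mv_*.txt file starts with a "movieid:" line followed by "userid,rating,date" lines, and writing the same space-delimited "userid itemid rating" records used for the Audioscrobbler input above:

	import glob, os, sys

	def convert(training_set_dir, out_path):
	    out = open(out_path, 'w')
	    for path in sorted(glob.glob(os.path.join(training_set_dir, 'mv_*.txt'))):
	        f = open(path)
	        movie_id = f.readline().strip().rstrip(':')   # e.g. "8846:" -> "8846"
	        for line in f:
	            user_id, rating, date = line.strip().split(',')
	            out.write('%s %s %s\n' % (user_id, movie_id, rating))
	        f.close()
	    out.close()

	if __name__ == '__main__':
	    convert(sys.argv[1], sys.argv[2])

Here is a sample of the results for a few movies: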

	$ grep ^299 related_movies.tsv 	
	item_x	item_y	sim	num	item_y_name	
	299	299	1	0	Bridget Jones's Diary
	299	16948	0.29	8263	Bridget Jones: The Edge of Reason
	299	4472	0.2231	14818	Love Actually
	299	3320	0.1988	11052	About a Boy
	299	6206	0.1796	18567	My Big Fat Greek Wedding
	299	2000	0.1777	5864	Four Weddings and a Funeral
	299	15058	0.1761	9863	Notting Hill
	299	2660	0.163	10476	When Harry Met Sally
	299	1470	0.1596	12726	Bend It Like Beckham
	299	17308	0.1584	12998	Legally Blonde
	299	1754	0.1581	7171	Sixteen Candles
	

	8846	8846	1	0	Election
	8846	5226	0.2467	12409	Rushmore
	8846	2122	0.2204	14948	Being John Malkovich
	8846	5614	0.2092	12545	Best in Show
	8846	14382	0.2075	11698	Raising Arizona
	8846	8782	0.2045	17208	The Royal Tenenbaums
	8846	5237	0.2034	3140	To Die For
	8846	12232	0.1935	15496	Lost in Translation
	8846	3355	0.1927	3367	The Ice Storm
	8846	571	0.189	17640	American Beauty
	8846	12084	0.187	13274	Adaptation
	

	8782	8782	1	0	The Royal Tenenbaums
	8782	6690	0.3759	1301	The Royal Tenenbaums: Bonus Material
	8782	12232	0.3712	47207	Lost in Translation
	8782	5226	0.3689	22545	Rushmore
	8782	1719	0.3602	14804	The Life Aquatic with Steve Zissou
	8782	1865	0.3224	28605	Eternal Sunshine of the Spotless Mind
	8782	7904	0.3216	20349	Punch-Drunk Love
	8782	12084	0.3211	34926	Adaptation
	8782	2122	0.3026	34167	Being John Malkovich
	8782	14274	0.28	14137	I Heart Huckabees
	8782	15425	0.277	24183	The Big Lebowski
	

	9940	9940	1	0	Night of the Living Dead
	9940	10627	0.3306	2100	Dawn of the Dead
	9940	3168	0.2498	1797	Evil Dead 2: Dead by Dawn
	9940	15465	0.2277	2363	Friday the 13th
	9940	8627	0.2115	1835	The Evil Dead
	9940	17004	0.2047	2111	Halloween
	9940	9368	0.2015	1594	The Texas Chainsaw Massacre
	9940	15597	0.2002	984	Day of the Dead
	9940	16793	0.198	3266	The Exorcist
	9940	4402	0.1943	2725	A Nightmare on Elm Street
	9940	10832	0.1935	2914	Psycho
		

Resources

Pete Skomoroch is a consultant at Data Wrangling in Arlington, VA where he mines large datasets to solve problems in search, finance, and recommendation systems.
