Processing and Loading Data from Amazon S3 to the Vertica Analytic Database

The Amazon Elastic MapReduce (EMR) service allows users to create massively distributed data processing tasks built on Map and Reduce functions. Amazon Elastic Compute Cloud (EC2) allows users to run any software on a scale-out compute platform. EC2 can, for example, be used for large-scale data analysis by running an analytic database management system. Data analysis tasks often start with a processing phase in which unstructured or semi-structured data must be processed or transformed before it is loaded into a relational database. In this example we show how to use EMR to process and load a data set from S3 into the Vertica Analytic Database running on EC2.

Details

Submitted By: PeterS@AWS
Created On: May 29, 2009 12:45 AM GMT
Last Updated: May 30, 2009 2:08 PM GMT
Source: hadoop-vertica/demo/FreeBaseLoader.tgz
File Download: hadoop-vertica/demo/FreeBaseLoader.jar
Source License: Apache License, Version 2.0
Terms and Conditions for Associated Sample Data: Freebase Data Dump provided by Freebase.com. Data aggregated, processed and reconciled by freebase.com using data from Wikipedia.org, the freebase community, and many other open data sets. Snapshots prepared by the infochimps.org team using community curated metadata. Released under Creative Commons Attribution (CC-BY) license and the Freebase Terms of Service and Licensing Policy.

Setting up the Input Dataset

In this example we use the Freebase TSV export from March 2009. This is the same dataset used in the FreeBase example, which loads data from the football data set into SimpleDB. To prepare the data we first download the bzip2 export, decompress it, and then sync it to a bucket in S3 using the s3cmd command line tool:

$ wget http://download.freebase.com/datadumps/2009-03-23/freebase-datadump-tsv.tar.bz2
$ tar xjf freebase-datadump-tsv.tar.bz2
$ s3cmd mb s3://hadoop-vertica
$ s3cmd sync data s3://hadoop-vertica/demo/

The data in FreeBase TSV files has the following format:

name	id	attr1	attr2
abc	/guid123	value1	value2a,value2b

Note that values are tab delimited (hence TSV) and that the first line lists the attribute names. Also, some fields have multiple values, which are separated by commas. To process these files we create a map job that looks for input files that have the word "name" on the first line and uses that line to define the target table. Since we don't know the data types we're loading, every column uses a maximum-sized VARCHAR (in Vertica this is 65000 characters) and we assume that there will be some post-processing in the database. The remaining lines are parsed into a SQL INSERT format and sent to the reducer to load.
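
As a rough illustration of how a header line can define the target table, the sketch below turns a tab-delimited header into a CREATE TABLE statement in which every column is a VARCHAR(65000). The class and method names are hypothetical and are not taken from FreeBaseLoader; the sketch also assumes that the attribute names are already valid SQL identifiers.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of the original FreeBaseLoader source.
class HeaderToDdl {
    // Maximum VARCHAR length in Vertica, as stated in the article.
    static final int MAX_VARCHAR = 65000;

    /** Builds a CREATE TABLE statement from a tab-delimited header line. */
    static String createTableSql(String tableName, String headerLine) {
        List<String> columns = new ArrayList<>();
        // The -1 limit keeps trailing empty fields instead of dropping them.
        for (String attr : headerLine.split("\t", -1)) {
            // Assumption: attr is usable as a column name without quoting.
            columns.add(attr + " VARCHAR(" + MAX_VARCHAR + ")");
        }
        return "CREATE TABLE " + tableName + " (" + String.join(", ", columns) + ")";
    }
}
```

For a header of name, id and attr1 in a file loaded as football_team, this produces a three-column table whose columns are all VARCHAR(65000).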

Since this is a simple example we don't do any further processing but additional Map and Reduce jobs could be chained together to check for datatypes, link related files (e.g. teams and coaches) to define constraints or even pre-aggregate interesting statistics such as the current coach for each team and the total number of previous coaches (more on that later).

Setting up the Map Reduce Job

We construct a basic MapReduce Java program by extending Configured and implementing Tool. Our run function is similar to the WordCount example with a few tweaks. Our Map task reads Text keys and values, and our Reduce task outputs Text keys and Long values (the table name and loaded record count). We also use the standard file format classes to read from and write to S3. Note that in this job the data load is effectively a side effect. Newer versions of Hadoop have introduced database output formats, so that a database can be the declared target of a MapReduce job rather than being loaded as a side effect. The process differs, but the result is the same.

Before running the job we also check the database connection — this keeps us from having dozens of Mappers all complain about a typo or a database that is unreachable. The code snippet is used elsewhere as well so we'll look at it here:

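Connection conn = null;
// The driver class (args[2]) and the JDBC URL (args[3]) are both required.
if (args.length > 3)
{
	String jdbcdriver = args[2];
	// Load the driver class so that it registers with the DriverManager.
	Class.forName(jdbcdriver);
	conf.set("mapred.jdbc.driver.class", jdbcdriver);
	String jdbcurl = args[3];
	// Fail early if the driver is missing or the database is unreachable.
	conn = DriverManager.getConnection(jdbcurl);
	conf.set("mapred.jdbc.url", jdbcurl);
}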

The third argument to the job (args[2]) is the driver class name, which for Vertica is "com.vertica.Driver". The Class.forName function tells the class loader to load the class, which then registers itself with the driver manager. This needs to run in every virtual machine instance, so you will see it in both the Map and Reduce functions. We also store the driver class name in the job configuration, as the conf.set call on the next line shows. The fourth argument to our job (args[3]) contains the JDBC connection URL. For Vertica this takes the following format:

jdbc:vertica://<hostname>:5433/<db>?user=<username>&password=<password>

We pass this to the driver manager, which will throw an exception if the driver does not exist or if the database cannot be reached. Finally we save the url so that the mapper and reducer know where to find our database.
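
The same check can be tried outside of a job. The following is a minimal sketch using only the standard java.sql classes; the ConnectionCheck class and its check method are hypothetical names introduced here for illustration, and the sketch reports the two failure cases separately instead of letting the exception propagate.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

// Hypothetical helper, not part of the original FreeBaseLoader source.
class ConnectionCheck {
    /** Returns null if a connection can be opened, otherwise a short reason. */
    static String check(String driverClass, String jdbcUrl) {
        try {
            // Loading the class lets the driver register with DriverManager.
            Class.forName(driverClass);
        } catch (ClassNotFoundException e) {
            return "driver class not found: " + driverClass;
        }
        // try-with-resources closes the probe connection again.
        try (Connection conn = DriverManager.getConnection(jdbcUrl)) {
            return null;
        } catch (SQLException e) {
            return "cannot connect: " + e.getMessage();
        }
    }
}
```

Calling check("com.vertica.Driver", url) before submitting the job separates a missing driver jar from an unreachable database or a mistyped URL.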

The Mapper

Both our mapper and reducer contain basic configuration information. The mapper looks up the input file name and uses it as the table name. It also checks whether the table exists and, if it does, counts the table's columns using the database metadata (you'll see why next).
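
One way to derive a table name from an input path is sketched below. This is an assumption about the approach rather than the code in FreeBaseLoader: the TableNames class is hypothetical, and the sketch assumes that input files are named like football_team.tsv, so that stripping the directory and the extension leaves a usable table name.

```java
// Hypothetical helper, not part of the original FreeBaseLoader source.
class TableNames {
    /** Derives a table name from an input path such as ".../football_team.tsv". */
    static String fromPath(String inputPath) {
        // Keep only the part after the last slash.
        String file = inputPath.substring(inputPath.lastIndexOf('/') + 1);
        // Drop the extension, if there is one.
        int dot = file.lastIndexOf('.');
        String base = dot > 0 ? file.substring(0, dot) : file;
        // Replace anything that is not a letter, digit or underscore.
        return base.replaceAll("[^A-Za-z0-9_]", "_");
    }
}
```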

We write our mapper to look at the first line in a file (or file fragment) and, if it begins with the word "name", to check that the table exists and create it if it does not. The map function keeps track of the line count and, if it creates the table, counts the columns as well. We need to know the column count because trailing attributes are optional in TSV files. For example, if a TSV file header contains 5 attributes, a line that contains only 4 values implicitly has a NULL value for the final attribute. With the column count in hand, we can load NULLs for any missing values.

For every line (excluding the title line if it exists), the mapper splits the tab-delimited values and formats them for INSERT into the database. Since every value is a string, it is wrapped in single quotes as required by SQL, and any single quote inside the string is escaped with a backslash (in Java source we need four backslashes, since the backslash is an escape character both in string literals and in replaceAll replacement strings). That conversion looks like this:

"'" + token.replaceAll("'", "\\\\'") + "'"

We also do a simple conversion for a common pattern in the TSV input where there are only three attributes (name, id and an attr) and the third attribute is comma delimited. For example, the football_positions file includes a comma delimited list of each player that has played in a particular position. It is common to represent this in a database using multiple records, one for each position/player pair.
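
The expansion can be pictured with the following sketch, which turns one three-attribute line into one row per value. The MultiValueExpander class is hypothetical, and the sketch assumes that a comma never appears inside an individual value; a name that itself contains a comma would be split incorrectly.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of the original FreeBaseLoader source.
class MultiValueExpander {
    /** Expands "name\tid\ta,b" into one [name, id, value] row per value. */
    static List<String[]> expand(String line) {
        String[] fields = line.split("\t", -1);
        List<String[]> rows = new ArrayList<>();
        if (fields.length != 3) {
            // Not the three-attribute pattern: pass the line through unchanged.
            rows.add(fields);
            return rows;
        }
        // Assumption: commas only ever separate values in the third field.
        for (String value : fields[2].split(",")) {
            rows.add(new String[] { fields[0], fields[1], value });
        }
        return rows;
    }
}
```

A line with a position name, its id and two comma-separated players therefore yields two rows that share the same name and id.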

Finally, any line that has fewer values than there are attributes is padded with NULL in the final positions. The formatted line is sent to the reducer as the value, with the table name as the key.
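
Putting the quoting and the NULL padding together, a line formatter might look like the sketch below. The ValueFormatter class is hypothetical and the exact output format of the real mapper is not shown in this article, so treat the comma-separated value list as an assumption. Note also that backslash escaping is not the SQL-standard way to escape a quote (the standard doubles it), so this relies on the database accepting the backslash form.

```java
// Hypothetical helper, not part of the original FreeBaseLoader source.
class ValueFormatter {
    /** Formats a tab-delimited line as a SQL value list, padding with NULL. */
    static String toValueList(String line, int columnCount) {
        String[] tokens = line.split("\t", -1);
        StringBuilder sb = new StringBuilder();
        // Emit exactly columnCount entries; tokens beyond that are ignored.
        for (int i = 0; i < columnCount; i++) {
            if (i > 0) {
                sb.append(", ");
            }
            if (i < tokens.length) {
                // Four backslashes in source yield one backslash in the output.
                sb.append("'").append(tokens[i].replaceAll("'", "\\\\'")).append("'");
            } else {
                // Missing trailing attribute: load NULL.
                sb.append("NULL");
            }
        }
        return sb.toString();
    }
}
```

With a four-column table, a line holding only three values comes out as three quoted strings followed by NULL.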

The Reducer

Our reduce function now has a very simple job. Since the key output from the mapper is the table name, each reducer sees all of the values for a given table in its value iterator. Using a simple SQL statement, it takes the formatted input and, for each value, adds an INSERT to the statement's batch. When a table completes, it executes the batch.

There's a little bit of magic going on under the covers. Since Vertica is designed to load large volumes of data, the JDBC driver automatically converts batch statements into bulk load operations. Each addBatch is transformed into a stream that gets sent to the database and the final executeBatch flushes the remaining buffer.
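
In plain JDBC terms, the batching described above can be sketched as follows. The BatchLoader class is hypothetical, and it assumes that each value arriving from the mapper is a ready-made SQL value list; the real reducer may receive a different format. Only the standard java.sql.Statement methods addBatch and executeBatch are used, so nothing here depends on Vertica-specific APIs.

```java
import java.sql.SQLException;
import java.sql.Statement;

// Hypothetical helper, not part of the original FreeBaseLoader source.
class BatchLoader {
    /** Queues one INSERT per value list, executes the batch, returns rows queued. */
    static long load(Statement stmt, String table, Iterable<String> valueLists)
            throws SQLException {
        long rows = 0;
        for (String values : valueLists) {
            // Assumption: values is already quoted and escaped by the mapper.
            stmt.addBatch("INSERT INTO " + table + " VALUES (" + values + ")");
            rows++;
        }
        if (rows > 0) {
            // Sends whatever is still buffered to the database.
            stmt.executeBatch();
        }
        return rows;
    }
}
```

The returned count is what a reducer could emit alongside the table name.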

The reducer emits the table name and the total number of rows loaded, which are stored in the output file on S3.

Putting it all Together

Now that we have our source data uploaded to S3 and we have constructed our MapReduce job, we're ready to package it up and run the job. There's a bit of a trick to running on EMR, since our job cannot run standalone: it requires the Vertica JDBC driver. To package everything together we use ant to create a jar file that includes both our built class files and the JDBC driver jar. The task that pulls everything together looks like this:

<target depends="build-project" name="dist">
	<mkdir dir="dist/bin"/>
	<jar jarfile="dist/bin/FreeBaseLoader.jar" basedir="bin">
		<fileset dir="${VERTICA_JDBC_DIR}" includes="**/*.jar"/>
		<manifest>
			<attribute name="Built-By" value="${user.name}"/>
			<attribute name="Implementation-Vendor"
				value="Vertica Systems, Inc."/>
			<attribute name="Implementation-Title"
				value="FreeBaseLoader"/>
			<attribute name="Implementation-Version"
				value="0.1"/>
			<attribute name="Main-Class"
				value="com.vertica.demo.FreeBaseLoader"/>
			<attribute name="Class-Path"
				value="lib/vertica_3.0_jdk_5.jar"/>
		</manifest>
	</jar>
</target>

In this build.xml, VERTICA_JDBC_DIR points to a directory that contains a subdirectory "lib", which in turn contains "vertica_3.0_jdk_5.jar". Running "ant dist" creates FreeBaseLoader.jar in the dist/bin directory, which we then upload to S3 using s3cmd:

$ s3cmd put dist/bin/FreeBaseLoader.jar s3://hadoop-vertica/demo/

Since the mapper takes care of creating tables that don't exist, all you need is a running Vertica database. If you don't have a Vertica database instance, you can get one by going to http://www.vertica.com/cloud or by starting a Vertica AMI.

To run the EMR job using the Ruby client tools, issue the following command:

$ elastic-mapreduce --create --num-instances 10 \
    --instance-type m1.small \
    --jar s3n://hadoop-vertica/demo/FreeBaseLoader.jar \
    --arg s3n://hadoop-vertica/demo/data/football,... \
    --arg s3n://hadoop-vertica/demo/output/ \
    --arg com.vertica.Driver \
    --arg "jdbc:vertica://<hostname>:5433/<db>?user=<user>&password=<pass>"

This assumes you have configured your credentials.json file to include the key-pair name and a location for logs. We are asking EMR to create a job with ten small instances using our uploaded jar file. The first argument is the list of input directories. Since the EMR S3 file system class does not recurse into subdirectories, we need to list each of the directories we wish to load (it will process all files in each directory).

Note: if you have an object named s3://bucket/dir/ with the trailing slash, this throws off the S3 file system reader and you should remove that object to avoid premature termination of your job. Also, do not include directories with no files in them as this seems to stall the job on occasion.

The second argument is the name of an object that does not yet exist, which the S3 class will create and use to store output. The third argument is the JDBC driver class, and the final argument is the JDBC URL, both described earlier.
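
To make the argument order concrete, here is a small sketch of how a job could unpack these arguments. The JobArgs class is hypothetical, and splitting the first argument on commas is an assumption based on the form of the command above rather than on the FreeBaseLoader source.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of the original FreeBaseLoader source.
class JobArgs {
    final List<String> inputDirs;  // args[0], assumed to be comma separated
    final String outputPath;       // args[1], must not exist yet
    final String driverClass;      // args[2], e.g. com.vertica.Driver
    final String jdbcUrl;          // args[3], the JDBC connection URL

    JobArgs(String[] args) {
        if (args.length < 4) {
            throw new IllegalArgumentException(
                "usage: <input dirs> <output> <jdbc driver> <jdbc url>");
        }
        inputDirs = Arrays.asList(args[0].split(","));
        outputPath = args[1];
        driverClass = args[2];
        jdbcUrl = args[3];
    }
}
```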

Your own FreeBase

If everything runs as described, the result will be your very own Vertica database loaded with the contents of FreeBase in a form you can query with SQL. So now, if you want to know the current coach for each team and the total number of previous coaches, you can run the following query:

SELECT name, current_head_coach, cnt FROM football_team t JOIN (
	SELECT COUNT(DISTINCT coach) as cnt, team 
	FROM football_historical_coach_position GROUP BY team) c 
	ON t.name = c.team;

Comments

Good theory, where is the source code?
It sounds good in theory, but where is the source code???
omardr on October 11, 2010 7:01 PM GMT
Files?
The information is good, however, it'd be nice to see the files? Unless I'm missing something, they're on s3, however trying to download them results in a not authorized message.
Andrew Watkins on October 29, 2009 7:08 PM GMT
©2013, Amazon Web Services, Inc. or its affiliates. All rights reserved.