Operating a Data Warehouse with Hive, Amazon Elastic MapReduce and Amazon SimpleDB

This article shows how to use Amazon Elastic MapReduce and Hive to process logs uploaded to Amazon S3 from a fleet of boxes which are serving online advertising. The logs are processed and the resulting information is stored in a collection of relational tables persisted in Amazon S3 and queryable using Hive. Summaries of the data are pushed to Amazon SimpleDB where they are accessible to monitoring tools.


Submitted By: Richard@AWS
AWS Products Used: Amazon Elastic MapReduce
Created On: September 25, 2009


The data used in this article was artificially generated and the use case is fictional; nevertheless, the article is intended to illustrate how to use Amazon Elastic MapReduce and Hive in practice.

Use Case

Our business consists of serving online advertisements. We receive two types of web request: (i) requests for advertisements within a context, and (ii) clicks on advertisements, which we must record and then redirect to the advertised site.

Our fleet is deployed within Amazon EC2. Both types of web request are served by boxes running Jetty. Each time a request is received, an entry is written to a log file on local disk. The log files are rotated every 5 minutes and pushed to Amazon S3.
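To make this concrete, the rotate-and-push step on each box might look something like the sketch below, driven from cron every 5 minutes. The local paths, the bucket name, and the s3-put helper are assumptions for illustration and are not part of the sample.

  #!/bin/bash
  # Sketch only: rotate the current log and push it to S3 under a dt= prefix.
  # Assumes a hypothetical s3-put command and log directory.
  TIME=$(date -u +%Y-%m-%d-%H-%M)
  HOST=$(hostname)
  mv /var/log/adserver/impressions.log /var/log/adserver/$HOST-$TIME.log
  s3-put /var/log/adserver/$HOST-$TIME.log \
    s3://mybucket/tables/impressions/dt=$TIME/$HOST-$TIME.log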

An Amazon Elastic MapReduce job flow pulls the logs from Amazon S3 every 5 minutes and processes them, calculating statistics such as the average response time, the number of calls per second, and the number of clicks per second.

For an example of how to use this data to generate a statistically motivated model for contextual advertising, see https://developer.amazonwebservices.com/connect/entry.jspa?externalID=2855. That article also explains how to get started using Hive on Amazon Elastic MapReduce.

Data Sources

The web server that handles requests for advertisements produces a file in JSON format that looks like this:

  {
    requestBeginTime: "19191901901",  
    requestEndTime:   "19089012890",
    browserCookie:    "xFHJK21AS6HLASLHAS",
    userCookie:       "ajhlasH6JASLHbas8",
    searchPhrase:     "digital cameras",
    adId:             "jalhdahu789asashja",
    impressionId:     "hjakhlasuhiouasd897asdh",
    referrer:         "https://cooking.com/recipe?id=10231",
    hostname:         "ec2-12-12-12-12.ec2.amazonaws.com",
    modelId:          "asdjhklasd7812hjkasdhl",
    processId:        "12901",
    threadId:         "112121",
    timers: {
      requestTime: "1910121",
      modelLookup: "1129101"
    },
    counters: {
      heapSpace: "1010120912012"
    }
  }

The JSON records are stored one per line in the log file. This makes it easy to import the log files into Hive because they can be read using Hive's default text input format. The example above has been reformatted to make it easier to read.
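In the log file itself, each record therefore occupies a single line, for example (truncated here for readability):

  {requestBeginTime: "19191901901", requestEndTime: "19089012890", browserCookie: "xFHJK21AS6HLASLHAS", ...}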

These log files are pushed to S3 in 5 minute intervals at the following location:

  s3://elasticmapreduce/samples/hive-ads/tables/impressions/
    dt=$time/$hostname-$time.log

The "dt=" prefix in the path is required for Hive to be able to import the log files as a partitioned table. Using a partitioned table allows us to write queries that read only a subset of the data rather than forcing Hive to do a full table scan.
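For example, once the impressions table is declared later in this article, a query that filters on the partition column reads only the files under the matching dt= directory:

  -- Only the partition dt=2009-04-13-08-05 is read, not the whole table
  select count(1)
  from impressions
  where dt = '2009-04-13-08-05' ;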

The web server boxes also produce click logs which have the following form.

  {
    requestBeginTime: "19191901901",
    requestEndTime:   "19089012890",
    browserCookie:    "xFHJK21AS6HLASLHAS",
    userCookie:       "ajhlasH6JASLHbas8",
    adId:             "jalhdahu789asashja",
    impressionId:     "hjakhlasuhiouasd897asdh",
    clickId:          "ashda8ah8asdp1uahipsd",
    referrer:         "https://recipes.com/",
    directedTo:       "https://cooking.com/"
  }

Like the impression logs, these files are also pushed into Amazon S3 at the following location:

  s3://elasticmapreduce/samples/hive-ads/tables/clicks/
    dt=$time/$hostname-$time.log

Starting an Interactive Job Flow

In this article we'll develop a Hive script. To follow along with an interactive Hive session, go to the Elastic MapReduce AWS Management Console, click "Run Job Flow", choose Hive, and then choose "Interactive Session". Once the job flow has entered the waiting state, ssh to the master node. A more detailed description of this process is available at https://developer.amazonwebservices.com/connect/entry.jspa?externalID=2729&categoryID=265; simply substitute Hive for Pig.

To follow along right to the end of this tutorial, you'll also need to be signed up for Amazon SimpleDB, because the last steps in this sample upload data to a domain in Amazon SimpleDB. You can sign up for Amazon SimpleDB on the Amazon Web Services website at https://aws.amazon.com.

Alternatively, if you have the Elastic MapReduce Command Line Client installed, you can start an interactive Hive session with:

  $ elastic-mapreduce --create --alive \
      --name "My Hive Job Flow" --hive-interactive \
      --num-instances 5 --instance-type m1.large

Wait for the job flow to enter the waiting state by checking the state of the job flow from time to time with:

  $ elastic-mapreduce --list --active

Once the job flow has entered the waiting state, you can ssh to the master node with:

  $ elastic-mapreduce --jobflow <jobflow-id> --ssh

replacing <jobflow-id> with the ID of the job flow you started earlier. Once you're logged onto the master node, start Hive with the following:

  $ hive \
      -d SAMPLE=s3://elasticmapreduce/samples/hive-ads \
      -d DATE=2009-04-13-08-05

This will make the variables ${SAMPLE} and ${DATE} available to us as we develop the script.

Data Declarations

To import the impressions and clicks tables into Hive we need to make use of a SerDe. SerDe stands for Serializer/Deserializer; it is the mechanism that allows Hive to operate on data that is stored in a custom format.

We start our Hive script by loading a jar containing our SerDe.

  add jar ${SAMPLE}/libs/jsonserde.jar ;

This SerDe is designed for processing files consisting of one JSON object per line. There are two things to note about this statement: first, we're using the variable ${SAMPLE}, which we'll pass in when we run this script as a job flow step; second, the path to the jar is not a quoted string.

Adding a jar is local to a Hive session, so you must add the jar at the beginning of every script that queries tables backed by the SerDe.

Next we declare the impressions table:

  create external table impressions (
    requestBeginTime string, requestEndTime string, hostname string
  )
  partitioned by (
    dt string
  )
  row format 
    serde 'com.amazon.elasticmapreduce.JsonSerde'
    with serdeproperties ( 
      'paths'='requestBeginTime, requestEndTime, hostname'
    )
  location '${SAMPLE}/tables/impressions' ;

This declaration makes use of the JsonSerde to extract fields from the JSON input. The JsonSerde has a property called "paths", which is a list of path expressions used to provide values for the columns declared in the table. There is one path expression for each column in the table.

Next we recover the partitions from the directories that exist under the table's location in Amazon S3.

  alter table impressions recover partitions ;

Since we are going to use a single partition in this task we could also have just declared that one partition:

  alter table impressions add partition (dt='${DATE}') ;
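Although the rest of this article only uses the impressions table, the clicks table described earlier could be declared in the same way. The columns below are an illustrative subset of the click log fields, not a declaration taken from the sample:

  create external table clicks (
    impressionId string, clickId string, adId string
  )
  partitioned by (
    dt string
  )
  row format
    serde 'com.amazon.elasticmapreduce.JsonSerde'
    with serdeproperties (
      'paths'='impressionId, clickId, adId'
    )
  location '${SAMPLE}/tables/clicks' ;

  alter table clicks recover partitions ;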

Data Transformation

Next we declare a local table on the job flow to store intermediate data.

  create table local_metrics (requestTime double, hostname string) ;

And we insert into this table the requestTime rows for each host:

  from impressions
    insert overwrite table local_metrics
      select
        (cast(requestEndTime as bigint) -
          cast(requestBeginTime as bigint)
        ) / 1000.0 as requestTime,
        hostname
      where
        dt = '${DATE}'
  ;
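Before aggregating, it can be worth sanity-checking the intermediate table, for example with a query such as:

  select hostname, count(1), avg(requestTime)
  from local_metrics
  group by hostname ;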

Next we aggregate the data from this table to extract maximum, minimum, and average response times per host and insert the result into SimpleDB using a reducer script.

  from local_metrics
    select 
      transform ( 
        '${DATE}', 
        max(requestTime), min(requestTime), avg(requestTime),
        hostname 
      )
      using 
        '${SAMPLE}/libs/upload-to-simple-db hostmetrics metrics date,hostname date tmax tmin tavg hostname' 
      as (output string)
    group by hostname
  ;

This query calculates aggregates of the request time per host for the time interval stored in local_metrics. It passes these aggregates to a reducer script called upload-to-simple-db, which uploads them into an Amazon SimpleDB domain called hostmetrics.

The arguments passed to upload-to-simple-db are of the form:

  domain name_prefix name_fields field_name_1 ... field_name_n

Each row passed to the script will be uploaded to Amazon SimpleDB as an item whose name is formed from the name_prefix and the values of the name fields. The attributes of the Amazon SimpleDB item are taken from the field_name arguments, which label the columns of the rows passed to the script via STDIN.
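For the invocation above, the arguments therefore break down as follows; Hive passes each row to the script as tab-separated values on STDIN in the same column order (the sample row uses illustrative values):

  domain       = hostmetrics
  name_prefix  = metrics
  name_fields  = date,hostname
  field names  = date tmax tmin tavg hostname

  example STDIN row:  2009-03-01 <tab> 302 <tab> 101 <tab> 212 <tab> ec-23-23-23-23-101.amazon.com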

The reducer script will create items in Amazon SimpleDB of the form:

  { 'ItemName': 'metrics:2009-03-01:ec-23-23-23-23-101.amazon.com', 
    'Attributes': { 
       'date': '2009-03-01', 'hostname': 'ec-23-23-23-23-101.amazon.com', 
       'tmax': '302', 'tmin': '101', 'tavg': '212'
    }
  }

Once these metrics have been uploaded to Amazon SimpleDB they can be retrieved by monitoring tools that plot charts and trigger alarms in response to various conditions.
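For example, a monitoring tool could fetch the metrics for one host with a SimpleDB Select expression along these lines (a sketch; SimpleDB compares attribute values as strings):

  select date, tmax, tmin, tavg
  from hostmetrics
  where hostname = 'ec-23-23-23-23-101.amazon.com' and date > '2009-03-01'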

Our final statement will compute the same metrics but aggregate across all hosts.

  from local_metrics
    select 
      transform ( 
        '${DATE}', 
        max(requestTime), min(requestTime), avg(requestTime),
        'ALL'
      )
      using 
        '${SAMPLE}/libs/upload-to-simple-db hostmetrics metrics date,hostname date tmax tmin tavg hostname' 
      as (output string)
  ;
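Because the literal 'ALL' is passed in place of a hostname, the resulting item is named accordingly and sits alongside the per-host items. With the same illustrative values as before, it would look like:

  { 'ItemName': 'metrics:2009-03-01:ALL',
    'Attributes': {
       'date': '2009-03-01', 'hostname': 'ALL',
       'tmax': '302', 'tmin': '101', 'tavg': '212'
    }
  }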

Running the Script

We put these statements together in a script and upload it to S3 at the location:

  s3://elasticmapreduce/samples/hive-ads/scripts/response-time-stats.q 

We can create a job flow to execute this script using the Amazon Elastic MapReduce Command Line Client:

  $ SAMPLE=s3://elasticmapreduce/samples/hive-ads
  $ elastic-mapreduce --create                    \
      --name "Calculate Monitoring Summary"       \
      --instance-type m1.large --num-instances 5  \
      --hive-script                               \
      --arg $SAMPLE/scripts/response-time-stats.q \
      --args -d,SAMPLE=$SAMPLE                    \
      --args -d,DATE=2009-03-01-22-10-05

This uses a job flow to calculate aggregates for a single 5-minute period. Instead, we would like to run a job flow that calculates aggregates for all of the 5-minute periods in an hour.

To do this we create a job flow and then add steps to it:

  $ SAMPLE=s3://elasticmapreduce/samples/hive-ads
  $ JOBFLOW_ID=$(
      elastic-mapreduce --create \
        --name "Calculate Monitoring Summary"      \
        --instance-type m1.large --num-instances 5 | \
      sed -e 's|.*\(j-[^ ]*\).*|\1|'
    )
  $ for i in {0..11} ; do
      sleep 20
      elastic-mapreduce --jobflow $JOBFLOW_ID \
        --jar s3://elasticmapreduce/libs/script-runner/script-runner.jar \
        --arg $SAMPLE/libs/wait-for.sh \
        --arg 2009-03-01-22-10-$((($i+1) * 5))
      sleep 20
      elastic-mapreduce --jobflow $JOBFLOW_ID \
        --hive-script --arg $SAMPLE/scripts/response-time-stats.q \
        --args -d,SAMPLE=$SAMPLE \
        --args -d,DATE=2009-03-01-22-10-$(($i * 5))
    done

Before each Hive script step we add a step that waits for the corresponding time period to be over, so that the logs for that period have been pushed to Amazon S3 before the calculation is performed.
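The wait-for.sh script ships with the sample and is not reproduced here, but a minimal sketch of such a wait step, assuming its argument is a UTC timestamp of the form YYYY-MM-DD-HH-MM, could be:

  #!/bin/bash
  # Sketch only, not the actual sample script: sleep until the UTC time
  # given as the first argument (assumed form YYYY-MM-DD-HH-MM) has passed.
  IFS=- read year month day hour minute <<< "$1"
  target=$(date -u -d "$year-$month-$day $hour:$minute" +%s)
  now=$(date -u +%s)
  if [ "$now" -lt "$target" ] ; then
    sleep $(( target - now ))
  fi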

The example so far has run job flows and added steps to them from the command line. In practice, however, you'll likely want to run job flows and add steps from within a workflow engine. To do this you'll want to make calls directly to the Amazon Elastic MapReduce web service, and several libraries are available for this purpose. The command line client includes a Ruby library and is available from https://developer.amazonwebservices.com/connect/entry.jspa?externalID=2264. Libraries for other languages such as Java and Perl are available at https://developer.amazonwebservices.com/connect/entry.jspa?externalID=2305.

Operating in a Multi-User Environment

Storing Hive tables in Amazon S3 makes it possible for many batch processes to interact with the same data. In this article we've seen an example process that calculates performance metrics across the fleet. One can imagine many more processes calculating a diverse range of results, from statistical models supporting contextual advertising to billing and payment summaries.

By operating several accounts within AWS and setting bucket permissions appropriately, it is possible to make some tables read-only to some accounts. For example, one account could be responsible for creating a set of tables while another account is given read-only access to the bucket containing them. This way, dependent processes are not able to modify important data sources.

Additional Hive Resources

Additional Hive and Amazon Elastic MapReduce resources are listed at https://developer.amazonwebservices.com/connect/entry.jspa?externalID=2857.