Analyze Log Data with Apache Hive, Windows PowerShell, and Amazon EMR

In this tutorial, you will learn how to use Apache Hive to join ad impression logs with click-through logs to determine which advertisement a given user is most likely to click on. This article also demonstrates how to manage Amazon Web Services using Windows PowerShell.


Submitted By: Chris Keyser
AWS Products Used: Elastic MapReduce
Created On: February 21, 2013


The example used in this tutorial is known as contextual advertising, and it is one example of what you can do with Amazon Elastic MapReduce (Amazon EMR). It is an adaptation of an earlier article at https://aws.amazon.com/articles/2855 that used the Amazon EMR Command Line Interface (CLI) and the AWS Management Console instead of Windows PowerShell.

Storing Logs on Amazon S3

An ad server produces two types of log files: impression logs and click logs. Every time the server displays an advertisement to a customer, it adds an entry to the impression log. Every time a customer clicks on an advertisement, it adds an entry to the click log.

Every five minutes, the ad server pushes a JSON file containing the latest set of logged data to Amazon S3. Pushing logs at five-minute intervals allows you to produce a timely analysis of the logs.

The ad server machines push their impression logs into Amazon S3. For example:

   s3://elasticmapreduce/samples/hive-ads/tables/impressions/dt=2009-04-14-06-15/ec2-50-32-48-14.amazon.com-2009-04-14-06-15.log
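
For illustration, the following is a hypothetical sketch of how an ad server could push such a log file using the AWS Tools for Windows PowerShell (the module setup is described later in this article); the bucket name, key, and local file path are placeholders, not part of the sample data set.

  # Upload one five-minute log file to the bucket the ad server writes to.
  Write-S3Object -BucketName "my-ad-server-logs" `
                 -Key "tables/impressions/dt=2009-04-14-06-15/server-01-2009-04-14-06-15.log" `
                 -File "C:\adserver\logs\server-01-2009-04-14-06-15.log"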

Log data for the tutorial is available in the elasticmapreduce Amazon S3 bucket in a subdirectory called /samples/hive-ads/tables/impressions. The /impressions directory contains additional directories named such that you can access the data as a partitioned table within Hive. The naming syntax is [Partition column]=[Partition value]. For example: dt=2009-04-14-06-15.
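
You can browse the sample partition directories directly from PowerShell. The following is a small sketch; it assumes the AWSPowerShell module is loaded and your AWS credentials are configured (setup is covered later in this article), and that the sample bucket is read from the us-east-1 region.

  # List a few of the partition directories under the impressions table.
  Get-S3Object -BucketName "elasticmapreduce" `
               -KeyPrefix "samples/hive-ads/tables/impressions/" `
               -Region us-east-1 |
      Select-Object -First 10 -ExpandProperty Key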

Importing Logs and Creating Tables

First, you need to describe the click and impression log data stored in Amazon S3. This log data is later combined into a single table that records, for each ad impression, whether it resulted in a click. However, if you are going to process log data on a schedule, it makes sense to automate the process. In the next few sections, you'll verify the Hive commands, use them to create a script, store the script on Amazon S3, and create a job flow that executes the script. Script execution can be automated using a task scheduler or other scheduling software.

Launching an Interactive Job Flow

Before you process the log data, start an interactive job flow so that you can test the validity of Hive commands one at a time. To run an interactive Hive job flow, you need an Amazon EC2 key pair so that you can connect over SSH to the master node. If you don't have an Amazon EC2 key pair, you can create one using the AWS Management Console (or with PowerShell, as sketched after the following steps).

  1. Sign in to the AWS Management Console.
  2. Select EC2 from Services.
  3. Select Key Pairs on the navigation pane.
  4. In the region selector, select the region where you will run your cluster.
  5. Click Create Key Pair.
  6. Enter a name for your key pair and click Submit.
  7. Download the private key (.pem) file and save it in a secure location; you need it later. AWS does not store the private key, so you cannot retrieve it after this step.
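
Alternatively, you can create the key pair from PowerShell. The following is a minimal sketch, assuming the AWSPowerShell module is loaded and your credentials are configured (as described in the next section); the key pair name and output file are placeholders, and the region should match the one where you plan to run your cluster.

  # Create the key pair and save the private key material to a .pem file.
  $keypair = New-EC2KeyPair -KeyName "my-key-pair-name" -Region us-east-1
  $keypair.KeyMaterial | Out-File -Encoding ascii "my-key-pair-name.pem"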

There are several ways to start an interactive job flow:

Starting a Job Flow with Windows PowerShell

You can use Windows PowerShell to start an interactive job flow. For more information about installing and configuring the AWS Tools for Windows PowerShell, see the "Setting up the AWS Tools for Windows PowerShell" article (https://docs.aws.amazon.com/powershell/latest/userguide/pstools-getting-set-up.html). The article describes how to enable script execution and how to set up your AWS credentials.

The remainder of this tutorial assumes that your Windows PowerShell environment is installed and configured to use your default AWS credentials. The PowerShell module for AWS must be loaded before any AWS commands are issued. The easiest way to load the module is to use the Windows PowerShell for AWS shortcut available in the Start menu under the Amazon Web Services folder. Alternatively, you can load the module manually in a PowerShell window, as shown below, before you execute any AWS commands.

 import-module AWSPowerShell
 

After the module is loaded, you can get details about the Amazon EMR cmdlets at any time by using the get-help emr command. If the command doesn't return anything, the module probably did not load correctly in your environment. The following example lists all of the cmdlets for Amazon EMR in PowerShell.

 PS C:> get-help emr
 
 Name                              Category  Module                    Synopsis
 ----                              --------  ------                    --------
 Add-EMRInstanceGroup              Cmdlet    AWSPowerShell             Invokes the AddInstanceGroups method against A...
 Add-EMRJobFlowStep                Cmdlet    AWSPowerShell             Invokes the AddJobFlowSteps method against Ama...
 Edit-EMRInstanceGroup             Cmdlet    AWSPowerShell             Invokes the ModifyInstanceGroups method agains...
 Get-EMRJobFlow                    Cmdlet    AWSPowerShell             Invokes the DescribeJobFlows method against Am...
 Set-EMRTerminationProtection      Cmdlet    AWSPowerShell             Invokes the SetTerminationProtection method ag...
 Set-EMRVisibleToAllUsers          Cmdlet    AWSPowerShell             Invokes the SetVisibleToAllUsers method agains...
 Start-EMRJobFlow                  Cmdlet    AWSPowerShell             Invokes the RunJobFlow method against Amazon E...
 Stop-EMRJobFlow                   Cmdlet    AWSPowerShell             Invokes the TerminateJobFlows method against A...
 

To list details about a specific cmdlet, use get-help with the cmdlet name, for example, get-help Start-EMRJobFlow. Now you can start an Amazon EMR job flow using PowerShell.

 PS C:> $jobid = Start-EMRJobFlow -Name "Hive Job Flow" `
                           -Instances_MasterInstanceType "m1.large" `
                           -Instances_SlaveInstanceType "m1.large" `
                           -Instances_KeepJobFlowAliveWhenNoSteps $true `
                           -Instances_Placement_AvailabilityZone us-east-1b `
                           -Instances_InstanceCount 1 `
                           -LogUri "s3n://my-log-file-bucket/" `
                           -VisibleToAllUsers $true `
                           -AmiVersion "latest" `
                           -Instances_Ec2KeyName "my-key-pair-name"
       Write-Host "Job started with JobFlowID"  $jobid
 
 Job started with JobFlowID [JobFlowID]
 

In the above example, replace my-log-file-bucket and my-key-pair-name with a valid bucket and key pair in your account. Make a note of the [JobFlowID] value that is returned; you use this value in subsequent operations. The job flow takes a few minutes to transition from the STARTING state to the WAITING state. Specifying Instances_KeepJobFlowAliveWhenNoSteps keeps the job flow running even when no steps are executing; otherwise, the job flow would complete immediately because no steps were defined. An interactive Hive session requires the job flow to remain running so that you can open an SSH session to the master node and issue commands.

You can monitor the progress of the job flow by checking the status of the JobFlowDetail object. For more information about the objects that the PowerShell cmdlets return, see the Amazon.ElasticMapReduce.Model namespace section of the AWS SDK documentation at https://docs.amazonwebservices.com/sdkfornet/latest/apidocs/Index.html. The $jobid variable, created in the previous example when the job flow was started, contains the job flow ID and is used in many of the subsequent examples. In this case, you are inspecting an instance of the JobFlowDetail class.

 PS C:>$jobs = Get-EMRJobFlow -JobFlowId $jobid
       Write-Host $jobs[0].ExecutionStatusDetail.State
 
 WAITING
 

If the job flow had not completed initializing, then the above example would have displayed STARTING rather than WAITING.
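
If you prefer not to check the state manually, the following is a minimal polling sketch (not part of the original workflow) that waits for the new job flow to reach the WAITING state. It assumes the job flow starts successfully; production code should also handle the FAILED and TERMINATED states.

  do {
      Start-Sleep 15
      $jobflow = Get-EMRJobFlow -JobFlowId $jobid
      $state = $jobflow[0].ExecutionStatusDetail.State
      Write-Host "Job flow state:" $state
  } while ($state -ne "WAITING")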

The Amazon EMR CLI has additional logic built in to simplify several operations, including installing Hive on the Amazon EMR cluster. An Amazon EMR job flow is composed of steps, and the Amazon EMR CLI runs an additional step that executes a script to install Hive. When using PowerShell, you need to run this step explicitly.

The Amazon EMR API defines two components used to describe a job flow step: StepConfig and HadoopJarStepConfig. A StepConfig object contains the HadoopJarStepConfig object to execute and defines additional metadata that the job flow needs to execute the step. There are two types of HadoopJarStepConfig objects defined by the API. The first type, HadoopJarStepConfig, is used to define a Java-based step to execute. The second type, StreamingStep, is a specialized version of the HadoopJarStepConfig object that executes scripts or logic coded in other programming languages rather than a Java-based implementation. For more information on these classes, see the API definitions in the Amazon.ElasticMapReduce.Model documentation (https://docs.aws.amazon.com/sdkfornet/latest/apidocs/Index.html#).

The following PowerShell helper functions simplify the process of adding steps to a job flow in subsequent examples.

 Function CreateJavaStep
 {
     param([string]$jar, 
         [string[]] $jarargs 
     )
     
     $jarstep=New-Object Amazon.ElasticMapReduce.Model.HadoopJarStepConfig
     $jarstep.Jar=$jar
     
     # add arguments and values as individual items
     foreach($jararg in $jarargs)
     {
         $jarstep.Args.Add($jararg);
     }
     
     return $jarstep
 }
 
 Function CreateStepConfig
 {
     param([string]$name, 
             [Amazon.ElasticMapReduce.Model.HadoopJarStepConfig] $stepToAdd, 
             [string]$actionOnFailure="CANCEL_AND_WAIT"
     )
      
     $stepconfig=New-Object  Amazon.ElasticMapReduce.Model.StepConfig
     $stepconfig.HadoopJarStep=$stepToAdd
     $stepconfig.Name=$name
     $stepconfig.ActionOnFailure=$actionOnFailure
 
     return $stepconfig
 }
 
 Function AddJavaStep
 {
     param([string]$name, 
         [string]$jar, 
         [string]$jobid, 
         [string[]] $jarargs, 
         [string]$actionOnFailure="CANCEL_AND_WAIT"
     )
 
     $step = CreateJavaStep $jar $jarargs
     $stepconfig = CreateStepConfig $name $step $actionOnFailure
     Add-EMRJobFlowStep -JobFlowId $jobid -Steps $stepconfig
 }

The PowerShell helper function CreateJavaStep creates a HadoopJarStepConfig object that executes Java code contained in a custom JAR file. CreateStepConfig creates and configures a StepConfig object for a provided HadoopJarStepConfig object. Finally, the AddJavaStep function uses both functions to create and add a Java-based step to a job flow. The following code shows how to use these functions to add a step that sets up Hive for the interactive job flow.

 
 $scriptjar = "s3://us-east-1.elasticmapreduce/libs/script-runner/script-runner.jar"
 $setuphiveargs = @("s3://us-east-1.elasticmapreduce/libs/hive/hive-script", 
     "--base-path", 
     "s3://us-east-1.elasticmapreduce/libs/hive", 
     "--install-hive", 
     "--hive-versions","latest")
 
 AddJavaStep "Test Interactive Hive" $scriptjar $jobid $setuphiveargs

The API also contains a StepFactory class for creating commonly used steps. Hive is a common environment on Amazon EMR, and the factory simplifies the creation of the Hive installation step. The following example is an equivalent way to create and add the step that installs Hive.

 
 $stepFactory = New-Object  Amazon.ElasticMapReduce.Model.StepFactory
 $hiveVer = [Amazon.ElasticMapReduce.Model.StepFactory+HiveVersion]::Hive_Latest
 $hiveSetupStep = $stepFactory.NewInstallHiveStep($hiveVer)
 $hiveStepConfig = CreateStepConfig "Test Interactive Hive" $hiveSetupStep
 Add-EMRJobFlowStep -JobFlowId $jobid -Steps $hiveStepConfig

The job flow is in the RUNNING state while the Hive installation step executes. To monitor for completion, retrieve the job flow status and wait for the state to change from RUNNING back to WAITING.

 
 # Initialize a counter so you can see how long the step has been running.
 $waitcnt = 0
 do {
     Start-Sleep 10
     # While the step runs, the job flow is RUNNING and Get-EMRJobFlow returns it;
     # once the step completes, the job flow returns to WAITING and nothing is returned.
     $running = Get-EMRJobFlow -JobFlowStates ("RUNNING") -JobFlowId $jobid
     $waitcnt = $waitcnt + 10
     Write-Host "Setting up Hive..." $waitcnt
 } while ($running.Count -eq 1)

In this example, as long as the job flow is in the RUNNING state, a job flow is returned. After the step completes, the job flow moves to the WAITING state, no job flow is returned by Get-EMRJobFlow, and the loop exits. You can then connect over SSH to the master node using the PEM file that you downloaded when you created your Amazon EC2 key pair. To connect to the master node from Microsoft Windows, you need a tool such as PuTTY. For more information about how to connect to the master node of a job flow using PuTTY, see Connect to the Master Node Using SSH in the Amazon Elastic MapReduce Developer Guide (https://docs.aws.amazon.com/ElasticMapReduce/latest/DeveloperGuide/emr-connect-master-node-ssh.html).

To connect to the master node using either PuTTY or SSH, you need the DNS name of the master node. You can obtain this by running the following PowerShell command:

  
 $jobflow = Get-EMRJobFlow -JobFlowStates ("WAITING") -JobFlowId $jobid
 $masterdns = $jobflow[0].Instances.MasterPublicDnsName
 Write-Host "Ready for interactive session, master dns: " $masterdns

Running a Hive Session on the Master Node

After you are connected to the master node using SSH, run Hive with the following command. Replace mybucket with the name of an Amazon S3 bucket that you own. You can paste into the PuTTY console using the right mouse button.

 
   hadoop@domU-12-31-39-07-D2-14:~$ hive \
     -d SAMPLE=s3://elasticmapreduce/samples/hive-ads \
     -d DAY=2009-04-13 -d HOUR=08 \
     -d NEXT_DAY=2009-04-13 -d NEXT_HOUR=09 \
     -d OUTPUT=s3://mybucket/samples/output
   hive>

In addition to launching Hive, this command sets values for the variables SAMPLE, DAY, HOUR, NEXT_DAY, NEXT_HOUR, and OUTPUT. These variables store values that you use in subsequent interactive Hive commands. Later in this tutorial, when you run the Hive statements from a script stored in Amazon S3, you pass these variables into the job flow.

Check whether Hive has any tables defined by running the following command:

 
   hive> show tables;
   OK
   Time taken: 3.51 seconds
   hive>

The empty output indicates that there are no Hive tables defined. Next, create a table from the impression and click-through logs stored in Amazon S3.

Declaring Tables in Amazon S3

Hadoop reads and splits your data into records for processing by your mapper logic. When you execute a Hive query, this data may be streamed from Amazon S3 or read from the Hadoop file system (HDFS) for processing by the EMR mapper and reducer tasks. The default loader processes input text files in which each line is a record and fields are separated by tabs. The impression and click data for this tutorial are stored in JSON format, which requires a custom SerDe (serializer/deserializer) to process. A SerDe enables Hive to read data stored in a custom format such as JSON and parse it into records that your mapper can process. The tutorial provides a custom SerDe, located in Amazon S3, that parses the JSON data; you tell Hive about it with the following statement.

  
   ADD JAR ${SAMPLE}/libs/jsonserde.jar ;

Notice that you are using the variable ${SAMPLE} defined when you invoked the Hive interpreter in the previous section. Unlike the syntax for other Hive statements, the JAR location supplied in the ADD statement is not a quoted string.

Now that your SerDe is defined, you tell Hive about your clicks and impressions data by creating an external table. An external table tells Hive that the data is owned by another system, so Hive does not delete the data if the table is dropped.

 
   CREATE EXTERNAL TABLE impressions (
     requestBeginTime string, adId string, impressionId string, referrer string, 
     userAgent string, userCookie string, ip string
   )
   PARTITIONED BY (dt string)
   ROW FORMAT 
     serde 'com.amazon.elasticmapreduce.JsonSerde'
     with serdeproperties ( 'paths'='requestBeginTime, adId, impressionId, referrer, userAgent, userCookie, ip' )
   LOCATION '${SAMPLE}/tables/impressions' ;

The data for the impressions table resides in Amazon S3. Creating the table is a quick operation because you are telling Hive about the existence of the data, not copying it. Amazon EMR implements optimizations for the AWS infrastructure, including high-speed parallel streaming of data to and from Amazon S3 when Amazon S3 is used as the data store for an external table. When you execute queries against the external table, Amazon EMR streams the data from Amazon S3 directly to the SerDe implementation as the first step in processing the query. This approach bypasses disk storage and performs better than writing the data out to disk as an intermediate stage. However, it also means the query doesn't take advantage of the data locality offered by HDFS. Storage of intermediate results (such as mapper output and the shuffle between the map and reduce stages) still takes place in HDFS.

The impressions table is partitioned based on time. As yet, Hive doesn't know which partitions exist in the table. You can tell Hive about the existence of a single partition using the following statement.

   ALTER TABLE impressions ADD PARTITION (dt='2009-04-13-08-05') ;

If you query the table at this point, the results contain data from just this partition. You can instruct Hive to recover all partitions by inspecting the data stored in Amazon S3 using the RECOVER PARTITIONS statement.

   ALTER TABLE impressions RECOVER PARTITIONS ;

You follow the same process to define the clicks table and recover its partitions.

   CREATE EXTERNAL TABLE clicks (
     impressionId string
   )
   PARTITIONED BY (dt string)
   ROW FORMAT 
     SERDE 'com.amazon.elasticmapreduce.JsonSerde'
     WITH SERDEPROPERTIES ( 'paths'='impressionId' )
   LOCATION '${SAMPLE}/tables/clicks' ;
   
   ALTER TABLE clicks RECOVER PARTITIONS ;
 

Combining the Clicks and Impressions Tables

You combine the clicks and impressions tables so that you have a record of whether each impression resulted in a click. This data should be stored in Amazon S3 so that it can be used as input to other job flows.

   CREATE EXTERNAL TABLE joined_impressions (
     requestBeginTime string, adId string, impressionId string, referrer string, 
       userAgent string, userCookie string, ip string, clicked Boolean
     )
     PARTITIONED BY (day string, hour string)
     STORED AS SEQUENCEFILE
     LOCATION '${OUTPUT}/joined_impressions'
   ;

This table is partitioned as well. An advantage of partitioning tables stored in Amazon S3 is that if Hive needs only some of the partitions to answer the query, then only the data from these partitions will be downloaded from Amazon S3.

The joined_impressions table is stored in SEQUENCEFILE format, a native Hadoop file format that is more compact and performs better than plain JSON text files.

Next, you create some temporary tables in the job flow's local HDFS file system to store intermediate impression and click data. These tables are considered temporary because, after you have defined the queries and automated their execution, the job flow runs periodically and terminates, at which point any data stored in HDFS is destroyed.

 CREATE TABLE tmp_impressions (
     requestBeginTime string, adId string, impressionId string, referrer string, 
     userAgent string, userCookie string, ip string
   )
   STORED AS SEQUENCEFILE;

You insert data from the impressions table for the time duration you're interested in. Note that because the impressions table is partitioned, only the relevant partitions are read.

   INSERT OVERWRITE TABLE tmp_impressions 
     SELECT 
       from_unixtime(cast((cast(i.requestBeginTime as bigint) / 1000) as int)) requestBeginTime, 
       i.adId, i.impressionId, i.referrer, i.userAgent, i.userCookie, i.ip
     FROM 
       impressions i
     WHERE 
       i.dt >= '${DAY}-${HOUR}-00' and i.dt < '${NEXT_DAY}-${NEXT_HOUR}-00'
   ; 

The start of the time period is DAY-HOUR and the end of the period is NEXT_DAY-NEXT_HOUR. NEXT_DAY is the day of the next time period. It differs from ${DAY} only when you're processing the last hour of a day, in which case the time period ends on the next day. For example, when processing the 23:00 hour of 2009-04-13, DAY is 2009-04-13, HOUR is 23, NEXT_DAY is 2009-04-14, and NEXT_HOUR is 00.

For clicks, you extend the period of time over which you join by 20 minutes (meaning that you accept a click that occurred up to 20 minutes after the impression).

  CREATE TABLE tmp_clicks (
     impressionId string
   ) STORED AS SEQUENCEFILE;
 
  INSERT OVERWRITE TABLE tmp_clicks 
     SELECT 
       impressionId
     FROM 
       clicks c  
     WHERE 
       c.dt >= '${DAY}-${HOUR}-00' AND c.dt < '${NEXT_DAY}-${NEXT_HOUR}-20'
   ;

Now you combine the impressions and clicks tables using a left outer join. This way, any impressions that did not result in a click are preserved. This join also enables you to search for clicks that occurred after the time period; however, the query excludes any clicks that did not originate from an impression in the selected time period.

   INSERT OVERWRITE TABLE joined_impressions PARTITION (day='${DAY}', hour='${HOUR}')
   SELECT 
     i.requestBeginTime, i.adId, i.impressionId, i.referrer, i.userAgent, i.userCookie, 
     i.ip, (c.impressionId is not null) clicked
   FROM 
     tmp_impressions i LEFT OUTER JOIN tmp_clicks c ON i.impressionId = c.impressionId
   ;

Because the joined_impressions table is located in Amazon S3, the results were streamed into the Amazon S3 location, and this data is now available for other job flows to use. Note that Hadoop creates an output file for each reducer, and as a result you may have multiple files in your Amazon S3 location after the query executes.

Terminate an Interactive Session

This tutorial returns to an interactive session in subsequent sections, so if you plan to continue on with the article, you can choose to leave your job flow running. Because the job flow is interactive, it does not shut down until you terminate it.

At any time, you can terminate your job flow by using the Stop-EMRJobFlow command in PowerShell.

   Stop-EMRJobFlow -JobFlowID $jobid
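
If you want to confirm that termination is underway, you can check the job flow state again; the following is a quick sketch that reuses the $jobid variable from earlier.

  # The state moves through SHUTTING_DOWN before reaching TERMINATED.
  $jobflow = Get-EMRJobFlow -JobFlowId $jobid
  Write-Host "Job flow state:" $jobflow[0].ExecutionStatusDetail.State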
 

Running in Script Mode

Typically, you create a script with these queries that execute periodically to generate metrics. Because you only pay for your cluster for as long as it takes the script to execute, this saves money. You pay by the hour, so if you have multiple, related queries that in total take less than an hour, consider consolidating them into one script. For example, if you run two job flows that take 25 minutes each, then you pay for two hours of processing. If you run one job flow that executes a consolidated script for 50 minutes, then you only pay for one hour of processing.

You can collect all of the Hive statements developed so far in this tutorial and place them into a script file. For your convenience, a script consolidating these statements is provided in Amazon S3 in a file called join-clicks-to-impressions.q.

   s3://elasticmapreduce/samples/hive-ads/libs/join-clicks-to-impressions.q

You can launch a job flow that joins clicks to impressions for a particular time period using the PowerShell commands below (replace my-output-bucket and my-log-bucket with buckets in your account). For a production system, you would typically use a script like this to automate the process after you've developed your logic.

To create an automation script based upon PowerShell, you need to add the function definitions and AWS PowerShell module import described previously.

 
 $stepFactory = New-Object  Amazon.ElasticMapReduce.Model.StepFactory
 $hiveVersion = [Amazon.ElasticMapReduce.Model.StepFactory+HiveVersion]::Hive_Latest
 $hiveSetupStep = $stepFactory.NewInstallHiveStep($hiveVersion)
 $createHiveStepConfig = CreateStepConfig "Test Interactive Hive" $hiveSetupStep
 
 $runhivescriptargs = @("s3://us-east-1.elasticmapreduce/libs/hive/hive-script", `
             "--base-path", "s3://us-east-1.elasticmapreduce/libs/hive", `
             "--hive-versions","latest", `
             "--run-hive-script", `
             "--args", `
             "-f", "s3://elasticmapreduce/samples/hive-ads/libs/join-clicks-to-impressions.q", `
             "-d", "SAMPLE=s3://elasticmapreduce/samples/hive-ads",`
             "-d", "DAY=2009-04-13", `
             "-d", "HOUR=08", `
             "-d", "NEXT_DAY=2009-04-13", `
             "-d", "NEXT_HOUR=09",`
             "-d", "INPUT=s3://elasticmapreduce/samples/hive-ads/tables", `
             "-d", "OUTPUT=s3://my-output-bucket/joinclick1", `
             "-d", "LIB=s3://elasticmapreduce/samples/hive-ads/libs")
                       
 $adsProcessingStep = CreateJavaStep $scriptjar $runhivescriptargs
 $runAdsScriptStepConfig = CreateStepConfig "Processing Ads" $adsProcessingStep
 
 $jobsteps = @($createHiveStepConfig, $runAdsScriptStepConfig)
 
 $jobid = Start-EMRJobFlow -Name "Join Clicks PS" `
                           -Instances_MasterInstanceType "m1.large" `
                           -Instances_SlaveInstanceType "m1.large" `
                           -Instances_KeepJobFlowAliveWhenNoSteps $false `
                           -Instances_Placement_AvailabilityZone us-east-1b `
                           -Instances_InstanceCount 1 `
                           -Steps $jobsteps `
                           -LogUri "s3://my-log-bucket/" `
                           -VisibleToAllUsers $true `
                           -AmiVersion "latest" 

To run job flows regularly, such as every hour, you use a workflow or task scheduling system, as sketched below.
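
For example, Windows PowerShell 3.0 and later include the PSScheduledJob module, which can run a script on a schedule. The following is a sketch only; the script path is a placeholder, and you would point it at a script containing the commands shown above (for example, a local copy of RunHiveDemoJob.ps1).

  # Run the job-flow script at the start of the next hour and every hour thereafter.
  $trigger = New-JobTrigger -Once -At (Get-Date).Date.AddHours((Get-Date).Hour + 1) `
                            -RepetitionInterval (New-TimeSpan -Hours 1) `
                            -RepetitionDuration ([TimeSpan]::MaxValue)
  Register-ScheduledJob -Name "JoinClicksToImpressions" `
                        -FilePath "C:\scripts\RunHiveDemoJob.ps1" `
                        -Trigger $trigger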

Contextual Advertising Model

In the previous section, you created a regular process to extract clicks and impressions data from log files and to join that data in a table called joined_impressions. Now you can experiment with a new algorithm that implements contextual advertising. In this scenario, you create a simple, statistically inspired, model for ad serving.

Given an advertising context consisting of user agent, user IP, and page URL, you would like to predict which of your available advertisements is most likely to result in a click. Assume that an advertising context consists of a number of features that are true. For example, a feature could be that the user agent contains the keyword Mozilla or that the IP address begins with the prefix 23.12.

You'd like to estimate the probability of a click given the context.

   P[click|context]

One heuristic for doing this is the following formula.

   product_{f in context} Pr[click|f=true]

This heuristic multiplies the probability of a click for each feature that is true in the advertising context. If you take the negative log of this formula, you get the following formula.

   - sum_{f in context} log ( count[click,f=true] / count[f=true] )

Because the log of zero is negative infinity, features whose observed click-through probability is zero would make the sum undefined. For these cases, you substitute a minimum value of 0.0001.
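
Putting the pieces together, the score computed for each ad can be written as follows (a restatement of the formulas above, with the 0.0001 floor applied inside the logarithm).

   score(ad | context) = - sum_{f in context} log( max( count[click,f=true] / count[f=true], 0.0001 ) )

Lower scores correspond to a higher estimated chance of a click, which is why the query at the end of this tutorial orders its results by value ascending.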

Declaring External Tables in the Interactive Job Flow

For this part of the tutorial, you run again in interactive mode. If you terminated the interactive job flow you created earlier, you can start another one; otherwise, continue with the job flow started earlier.

After the interactive session is running, connect over SSH to the master node and start Hive again with the following command:

   hadoop@domU-12-31-39-07-D2-14:~$ hive \
     -d SAMPLE=s3://elasticmapreduce/samples/hive-ads

Your first task is to declare the joined_impressions table again and to recover partitions.

  CREATE EXTERNAL TABLE IF NOT EXISTS joined_impressions (
   request_begin_time string, ad_id string, impression_id string, 
   page string, user_agent string, user_cookie string, ip_address string,
   clicked boolean 
  )
  PARTITIONED BY (day STRING, hour STRING)
  STORED AS SEQUENCEFILE
  LOCATION '${SAMPLE}/tables/joined_impressions';

    ALTER TABLE joined_impressions RECOVER PARTITIONS;

Check that the partitions are in order.

  SHOW PARTITIONS joined_impressions;

Producing the Feature Matrix

You need to do some transformations on your impression data to produce Boolean features. For the user agent, you extract keywords. For IP addresses, you take only the first two octets. For page URLs, you convert them to lowercase. In this section, you examine each of these transformations in turn.

User Agent

Every time a customer visits a website, their browser identifies itself with a user agent. The user agent contains information about the browser and machine the customer is using, for example "Mozilla/5.0 (Macintosh; U; Intel Mac OS X 10.5; en-US; rv:1.9.1) Gecko/20090624 Firefox/3.5".

An easy way to convert the user agent string into a sequence of keywords is to use a Python script. As you'll see shortly, you can call this script directly from within a Hive statement.

   #!/usr/bin/python
  
   import sys
   import re
  
   for line in sys.stdin:
     user_agent, ad, clicked = line.strip().split('\t')
     components = re.split('[;/,\(\) ]', user_agent)
     for component in components:
       if len(component) != 0:
          print '\t'.join([component, ad, clicked])

This script reads table rows passed to sys.stdin one line at a time. Each line is tab separated and has three columns: user_agent, ad, and clicked. The script outputs one record per keyword found in the user agent field.

The output of this script is a table with columns: keyword, ad, and clicked. The script outputs multiple records if a keyword occurs more than one time in an impression. Possible improvements to the script include removing duplicate keywords and sharpening the recognition of keywords.

To call this script from within Hive, issue a MAP statement.

 
   MAP 
    joined_impressions.user_agent, joined_impressions.ad_id, 
    joined_impressions.clicked
   USING 
    '${SAMPLE}/libs/split_user_agent.py' AS 
    feature, ad_id, clicked
   FROM 
     joined_impressions
   LIMIT 10;

The columns user_agent, ad_id, and clicked from the joined_impressions table are passed to the script, and the result is a table with the columns feature, ad_id, and clicked.

The output of the statement is displayed on the console, so you limit the output to ten lines. You can see from the output that the keywords contain spaces and are not lowercased. To normalize the output, apply the Hive functions trim and lower, and prefix each keyword with 'ua:' so that these features can be mixed with other features.

   SELECT concat('ua:', trim(lower(temp.feature))) as feature, temp.ad_id, temp.clicked
   FROM (
     MAP joined_impressions.user_agent, joined_impressions.ad_id, joined_impressions.clicked
     USING '${SAMPLE}/libs/split_user_agent.py' as feature, ad_id, clicked
     FROM joined_impressions
   ) temp
   LIMIT 10;

IP Address

To normalize the IP address, extract the first two octets of the IP address. Using a regular expression (regex) makes this easier. The regex below says start matching at the beginning of the field, find one to three digits followed by a period, and then one to three more digits and capture that pattern. The regexp_extract UDF takes the string to match, the regex to use, and then the capturing group to return. In this case, you want the first captured group.

   SELECT 
     concat('ip:', regexp_extract(ip_address, '^([0-9]{1,3}\.[0-9]{1,3}).*', 1)) AS 
       feature, ad_id, clicked
   FROM 
     joined_impressions
   LIMIT 10;

URL

To extract a feature from the URL of the page on which the advertisement displays, make the URLs all lowercase and add "page:" to the beginning.

   SELECT 
     concat('page:', lower(page)) as feature, ad_id, clicked
   FROM 
     joined_impressions
   LIMIT 10;

Combining the Features

Now that you have written queries to normalize each of the feature types, combine them into one table. You can do this using Hive's UNION ALL operator. Keep in mind that all subqueries in the union must return the same number of columns, with exactly the same names and types.

   SELECT *
   FROM (
     SELECT concat('ua:', trim(lower(ua.feature))) as feature, ua.ad_id, ua.clicked
     FROM (
       MAP joined_impressions.user_agent, joined_impressions.ad_id, joined_impressions.clicked
       USING '${SAMPLE}/libs/split_user_agent.py' as (feature STRING, ad_id STRING, clicked BOOLEAN)
       FROM joined_impressions
     ) ua
    
    UNION ALL
     
    SELECT concat('ip:', regexp_extract(ip_address, '^([0-9]{1,3}\.[0-9]{1,3}).*', 1)) as feature, ad_id, clicked
    FROM joined_impressions
     
    UNION ALL
     
    SELECT concat('page:', lower(page)) as feature, ad_id, clicked
      FROM joined_impressions
    ) temp
    limit 50;
 

Note that you had to modify the user agent query slightly. Passing data through a mapper strips the columns of their types and returns them as strings. To merge with the other subqueries, you declare clicked as a BOOLEAN in the AS clause of the MAP statement.

Index Table

Now that you have compiled a logical table of tuples (feature, ad_id, clicked), it is time to process them to form the heuristic table. Logically, this is a sparse matrix with axes feature and ad_id, where each value represents the percentage of impressions with that feature that resulted in a click on the ad. You store these values in the following table.

   CREATE TABLE feature_index (
     feature STRING,
     ad_id STRING,
     clicked_percent DOUBLE )
   STORED AS SEQUENCEFILE;

Now, extend the query from above:

   INSERT OVERWRITE TABLE feature_index
     SELECT
       temp.feature,
       temp.ad_id,
       sum(if(temp.clicked, 1, 0)) / cast(count(1) as DOUBLE) as clicked_percent
     FROM (
       SELECT concat('ua:', trim(lower(ua.feature))) as feature, ua.ad_id, ua.clicked
       FROM (
         MAP joined_impressions.user_agent, joined_impressions.ad_id, joined_impressions.clicked
         USING '${SAMPLE}/libs/split_user_agent.py' as (feature STRING, ad_id STRING, clicked BOOLEAN)
       FROM joined_impressions
     ) ua
     
     UNION ALL
     
     SELECT concat('ip:', regexp_extract(ip_address, '^([0-9]{1,3}\.[0-9]{1,3}).*', 1)) as feature, ad_id, clicked
     FROM joined_impressions
     
     UNION ALL
     
     SELECT concat('page:', lower(page)) as feature, ad_id, clicked
     FROM joined_impressions
   ) temp
   GROUP BY temp.feature, temp.ad_id;

There are a few new aspects to this Hive statement. The first is the GROUP BY at the end of the query. You group by feature and ad_id because these are the keys of your output.

To find the percentage, you need the total number of rows in the grouping and the number of rows in which clicked is true. For the total, use the standard SQL function count. However, count returns an integer and you want a double for the division, so cast the result:

   cast(count(1) as DOUBLE)

To calculate the number of impressions for each feature that resulted in a click, use the conditional function "if". The "if" function takes three parameters: the condition, the value to return when true, and the value to return when false. In this case, you return 1 when clicked is true and 0 when it is false, and then sum these values.

   sum(if(clicked, 1, 0))

Finally, divide the number where clicked is true by the total count to obtain Pr[click|feature].
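
In other words, for each (feature, ad_id) group, the INSERT statement stores the following value.

   clicked_percent(feature, ad_id) = sum(if(clicked, 1, 0)) / cast(count(1) as DOUBLE)
                                   = count[click, f=true] / count[f=true]

This is the Pr[click|f=true] term used by the heuristic described in the previous section.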

Applying the Heuristic

Now that you have the heuristic table, try a few sample tests to see how it performs for the features 'ua:safari' and 'ua:chrome'.

   SELECT 
     ad_id, -sum(log(if(0.0001 > clicked_percent, 0.0001, clicked_percent))) AS value
   FROM 
     feature_index
   WHERE 
     feature = 'ua:safari' OR feature = 'ua:chrome'
   GROUP BY 
     ad_id
   ORDER BY 
     value ASC
   LIMIT 100
   ;

The result is advertisements ordered by a heuristic estimate of the chance of a click. At this point, you could look up the advertisements and see, perhaps, a predominance of advertisements for Kindle products.

If your interactive Hive job flow is still running at this point, don't forget to terminate it. For more information about cleaning up a job flow, see Step 8: Clean Up in the Analyzing Big Data with AWS Getting Started Guide (https://docs.aws.amazon.com/gettingstarted/latest/emr/getting-started-emr-clean-up.html).

Summary

In this tutorial, you have seen how to develop a job flow to process impression and click logs uploaded to Amazon S3 by an ad server. The result of this job flow is a table in Amazon S3 that can be analyzed to develop and test a model for contextual advertising. The Hive statements collected by an analyst could be used within a job flow to generate a model file. The analyst could upload the file to Amazon S3 and thus make it available to ad servers to serve ads contextually.

Additional Hive Resources

For more information, see "Running Hive on Amazon Elastic MapReduce" (https://aws.amazon.com/articles/2857 ).

You can download two PowerShell scripts that demonstrate the commands used in this article at https://s3.amazonaws.com/emr-website/articles/powershell/Create-InteractiveHiveJob.ps1 and https://s3.amazonaws.com/emr-website/articles/powershell/RunHiveDemoJob.ps1 . Depending on your security settings, you may need to unblock the file after download because the scripts are not signed. To unblock the file, you need to navigate to the file in Windows Explorer, right-click on the file and select Properties, and then click Unblock.
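
Alternatively, on PowerShell 3.0 and later you can unblock the downloaded scripts from PowerShell itself. The following is a minimal sketch, assuming the files were saved to the current directory.

  # Remove the "downloaded from the Internet" zone marker from both scripts.
  Unblock-File -Path .\Create-InteractiveHiveJob.ps1, .\RunHiveDemoJob.ps1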