AWS Big Data Blog

How Expedia Implemented Near Real-time Analysis of Interdependent Datasets

This is a guest post by Stephen Verstraete, a manager at Pariveda Solutions. Pariveda Solutions is an AWS Premier Consulting Partner.

Common patterns exist for batch processing and real-time processing of Big Data. However, we haven't seen patterns that allow us to process batches of dependent data in real time. Expedia's marketing group needed to analyze interdependent data sets as soon as all of the data arrived in order to deliver operational direction to partners. The existing system ran on an on-premises Hadoop cluster, but the team was struggling to meet its internal SLAs. The information was also time-sensitive; getting the data faster meant giving better operational direction to partners.

The Pariveda team working at Expedia engaged with Solutions Architects at AWS to solve three distinct challenges: how to deliver analysis results as rapidly as the source data becomes available; how to process data sets that are interdependent but produced at different times; and how to manage the dependencies between data sets that arrive at different times.

In this blog post, I describe how Pariveda, working with Expedia, figured out a unique approach to real-time data processing using AWS Lambda, Amazon DynamoDB, Amazon EMR, and Amazon S3 as building blocks. You’ll learn how to implement a similar pipeline without managing any infrastructure.

Resolving interdependencies between data sets

One problem we needed to resolve was the interdependencies between the data sets. Our goal was to provide consolidated, clean feeds to a second system so that a more detailed analysis could be performed. To create these feeds (hundreds per day), data sets from multiple partners and internal systems, comprising different categories of data, had to be ingested, aggregated, and queried. Each type of data and each partner's data arrived at different times. This meant processing had to be held until all sets required for a single feed had arrived.

The solution outlined below was the result. We used an AWS Lambda function, triggered by events from an S3 bucket, to update a task defined in DynamoDB. The task definition includes the name, the list of dependent files and their status (received or not), and the parameters needed to run that task in EMR. Once all files for a given task arrive, the Lambda function updates a work queue and starts a cluster in EMR. EMR pushes the results back to S3, where the consuming application can retrieve them as needed.

Configuring the tasks

The hub of the system is the task. The task object keeps all information necessary to determine the data dependencies, their status, and what processing must occur. By defining tasks, you can configure all of the work that needs to happen. From the task table, it is easy to see what the data dependencies are.

Mapping S3 events to tasks

When we get events from S3, the event raised in AWS Lambda only has the context for the modified object in S3. We don’t have the information about the task directly in the data from S3. We do have one, very valuable piece of information though: the new S3 object’s key. In Node.js that gives us a one-line piece of code:

var srcKey = event.Records[0].s3.object.key;

From there we need a way to get to the task information. For this purpose, we create the FileUnit table. The table essentially turns the task on its head and has the S3 key as the range key into the table, with the task key as the data payload. This lets us take the source key and figure out what task we have with a single DynamoDB query.

// Module-level requires and clients, declared once at the top of the Lambda function's file
var AWS = require('aws-sdk');
var util = require('util');
var dynamodb = new AWS.DynamoDB();

function getTaskKey(srcKey, dateKey, callback) {
   // Build the FileUnit lookup key from the S3 object key
   var path = decodeURIComponent(srcKey);
   console.log('checking for FileUnit {\nDate: ' + dateKey + '\nFile: ' + path + '\n}');

   var fileUnitParams = {
      Key: {Date: {S: dateKey}, Filename: {S: path}},
      TableName: 'FileUnit',
      AttributesToGet: ['Task'],
      ConsistentRead: false
   };
   // Get the Task key from the FileUnit item
   dynamodb.getItem(fileUnitParams, function (err, data) {
      if (err) {
         console.log('Error reading file unit: ' + err); // an error occurred
         callback(err);
      } else if (data === undefined || data.Item === undefined || data.Item.Task === undefined) {
         console.log('File Unit key does not exist: Date: ' + dateKey + ', File: ' + path);
         callback('File Unit key does not exist');
      } else {
         console.log('Found key: ' + util.inspect(data.Item));
         var taskKey = data.Item.Task.S;
         callback(null, path, taskKey);
      }
   });
}

From here, we can update the task, determine if all of the dependencies have arrived, and start Amazon EMR.

Resulting workflow

Now that we’ve got a table describing what we want, and a table to help us reference that task, what does the overall flow look like?

Workflow

Creating DynamoDB tables

We create the following three tables in DynamoDB:

The Task table

For the Task table, we'll use a HashKey/RangeKey primary key setup with the date as the HashKey and a string called TaskKey as the RangeKey. The TaskKey can be any name as long as it is unique among your tasks. For the Expedia project, it was a predictable name that we could use for tracking in other parts of the system. From a HashKey perspective, Date doesn't conform to the guidelines for time series data, so you may want to create another predictable hash key for yourself. But Date is a convenient hash key for searching later on if your tasks repeat from day to day.

Here’s a sample entry:

{
  "Date": "20150525",
  "input/inputFile1.txt": "NULL",
  "input/inputFile2.txt": "NULL",
  "input/inputFile3.txt": "NULL",
  "ScriptParameters": "['s3://us-west-2.elasticmapreduce/libs/hive/hive-script ', '--run-hive-script', '--hive-versions', '0.13.1', '--args','-f', 's3://YOUR_BUCKET_NAME/input/script/taskQuery.hql']",
  "TaskKey": "Task1",
  "TaskStatus": "New"
}

At table creation, we need only worry about the TaskKey and Date, but the input file attributes (notice the S3 paths) and the ScriptParameters are necessary for the full system to function. This task was created in the console. In production, this configuration should be loaded from a file on a set schedule, before the data files themselves are loaded.
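As a rough illustration of that load, a small script could read the task definitions from a JSON file and write them with putItem. The file name (taskConfig.json) and its layout below are assumptions for illustration, not the Expedia implementation.

// Sketch: load task definitions from a JSON configuration file into the Task table.
var AWS = require('aws-sdk');
var dynamodb = new AWS.DynamoDB();

// Assumed layout: [{Date: '20150525', TaskKey: 'Task1', Files: ['input/inputFile1.txt', ...], ScriptParameters: '...'}]
var tasks = require('./taskConfig.json');

tasks.forEach(function (task) {
   var item = {
      Date: {S: task.Date},
      TaskKey: {S: task.TaskKey},
      TaskStatus: {S: 'New'},
      ScriptParameters: {S: task.ScriptParameters}
   };
   // One attribute per dependent file, initialized to 'NULL' (not yet received)
   task.Files.forEach(function (file) {
      item[file] = {S: 'NULL'};
   });

   dynamodb.putItem({TableName: 'Task', Item: item}, function (err) {
      if (err) { console.log('Error loading task ' + task.TaskKey + ': ' + err); }
   });
});

The FileUnit entries for each file could be loaded the same way.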

The FileUnit table

The FileUnit table is our reference into the Task table using an S3 path. It has three attributes:

  • Date (HashKey)
  • Filename (RangeKey) – the S3 path for the file
  • Task – the task to reference

In practice, Date isn't a necessity for this table, and you may be better off without it, but it suits our illustration. If your task names don't repeat on a daily basis, a better option is to use the S3 path as the HashKey and the task name as the RangeKey. This makes it easy to manage data sets that are dependencies for multiple tasks while keeping your query on the HashKey only.

The Batch table

The Batch table should be created with a “Date” HashKey and a “Task” RangeKey. The Task value will be the same as the TaskKey value in the Task table. For querying, we also add a Global Secondary Index to the table with a HashKey of “Date” and a RangeKey of “ProcessingState”. This helps us easily query for unprocessed items.
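If you would rather script the table creation than use the console, a createTable call for the Batch table might look like the following sketch; the index name and throughput values are assumptions.

var AWS = require('aws-sdk');
var dynamodb = new AWS.DynamoDB();

var batchTableParams = {
   TableName: 'Batch',
   AttributeDefinitions: [
      {AttributeName: 'Date', AttributeType: 'S'},
      {AttributeName: 'Task', AttributeType: 'S'},
      {AttributeName: 'ProcessingState', AttributeType: 'S'}
   ],
   KeySchema: [
      {AttributeName: 'Date', KeyType: 'HASH'},
      {AttributeName: 'Task', KeyType: 'RANGE'}
   ],
   GlobalSecondaryIndexes: [
      {
         IndexName: 'Date-ProcessingState-index', // name is an assumption
         KeySchema: [
            {AttributeName: 'Date', KeyType: 'HASH'},
            {AttributeName: 'ProcessingState', KeyType: 'RANGE'}
         ],
         Projection: {ProjectionType: 'ALL'},
         ProvisionedThroughput: {ReadCapacityUnits: 1, WriteCapacityUnits: 1}
      }
   ],
   ProvisionedThroughput: {ReadCapacityUnits: 1, WriteCapacityUnits: 1}
};

dynamodb.createTable(batchTableParams, function (err, data) {
   if (err) { console.log('Error creating Batch table: ' + err); }
   else { console.log('Created table ' + data.TableDescription.TableName); }
});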

Test Data

For testing, create an entry in the Task table similar to the one above. Be sure to use the input paths as attribute names and enter NULL for their values. Next, take these same input paths and make entries in your FileUnit table. The path names must match the value in the Filename column exactly (including case), and the Task value in the FileUnit table must match the TaskKey of the task. Using the Task example from above, you would create the following three FileUnit entries:

Date       | Filename              | Task
20150525   | input/inputFile1.txt  | Task1
20150525   | input/inputFile2.txt  | Task1
20150525   | input/inputFile3.txt  | Task1

With the tables created and some test data loaded, we can implement the AWS Lambda function.

Write the AWS Lambda function

Code skeleton

The code skeleton looks a lot like the workflow shown earlier.

// The async module (installed with npm) provides the waterfall helper
var async = require('async');

function processDataFiles(srcKey, date, completionCallback) {
    async.waterfall([
            function (callback) {
                // 1. Look up the task for this file in the FileUnit table
                getTaskKey(srcKey, date, callback);
            },
            function (path, taskKey, callback) {
                // 2. Mark the file as received on the task
                updateTask(path, date, taskKey, callback);
            },
            function (taskItem, callback) {
                // 3. If all of the task's files have arrived, add it to the Batch table
                sendToQueue(taskItem, callback);
            },
            function (callback) {
                // 4. Gather the tasks that are ready but not yet processed
                getUnprocessedItems(date, callback);
            },
            function (batchData, callback) {
                // 5. Start EMR and submit the processing steps
                processBatch(batchData, callback);
            }
        ],
        function (err) {
            if (err) {
                console.error('Processing stopped: ' + err);
                completionCallback(err);
            } else {
                console.log('Success');
                completionCallback();
            } // Success
        });
}

Query and Update DynamoDB Tables

Querying the DynamoDB tables is straightforward; see the code in the Mapping S3 events to tasks section above. Updating the Task table is also straightforward, but we add a small twist: we ask DynamoDB to return the new values so we can check the task for completeness. This way we make only one call to DynamoDB for the task, not two.

function updateTask(path, date, taskKey, updateTaskCallback)
{
   var taskUpdateExpression = 'SET TaskStatus = :val1, #P = :val2';
   var expressionAttributeNames = {'#P': path};
   var taskUpdateExpressionValues = {
      ':val1': {'S': 'FilesReceived'},
      ':val2': {'S': 'Received'}
   };
   var taskParams = {
      Key: {Date: {S: date}, TaskKey: {S: taskKey}},
      TableName: 'Task',
      UpdateExpression: taskUpdateExpression,
      ExpressionAttributeNames: expressionAttributeNames,
      ExpressionAttributeValues: taskUpdateExpressionValues,
      ReturnValues: 'ALL_NEW' // return the updated item so we can check for completeness
   };

   dynamodb.updateItem(taskParams, function (err, data) {
      if (err) {
         console.log('Error updating task: ' + err, err.stack); // an error occurred
         updateTaskCallback(err);
      } else {
         console.log('received: ', util.inspect(data, {depth: 4}));
         updateTaskCallback(null, data.Attributes);
      }
   });
}//updateTask

We'll gloss over the code that adds completed items to the Batch table, as it is very similar to the updateTask function above; the main difference is that it uses the putItem function instead of the updateItem function.
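If it helps to see that step, here is a minimal sketch; the attribute checks and the Batch item layout are assumptions based on the tables described above rather than the exact code in the repository.

// Sketch: check whether every file attribute on the task is 'Received',
// and if so, add the task to the Batch table. Reuses the module's dynamodb client.
function sendToQueue(taskItem, callback) {
   var bookkeeping = ['Date', 'TaskKey', 'TaskStatus', 'ScriptParameters'];
   var allReceived = Object.keys(taskItem).every(function (attr) {
      // Only the file attributes carry the 'NULL'/'Received' flag
      if (bookkeeping.indexOf(attr) !== -1) { return true; }
      return taskItem[attr].S === 'Received';
   });

   if (!allReceived) {
      console.log('Task ' + taskItem.TaskKey.S + ' is still waiting on files');
      return callback(null);
   }

   var batchParams = {
      TableName: 'Batch',
      Item: {
         Date: {S: taskItem.Date.S},
         Task: {S: taskItem.TaskKey.S},
         ProcessingState: {S: 'Unprocessed'},
         ScriptParameters: {S: taskItem.ScriptParameters.S}
      }
   };
   dynamodb.putItem(batchParams, function (err) {
      callback(err);
   });
}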

Start EMR

Starting the task in EMR is straightforward. We start an EMR cluster and then add a job flow step. There's a lot of configuration code, but the essence is simple. You must install the appropriate applications for your work, in this case Hive, so the job flow includes a Hive installation step when we start EMR.

// EMR client and constants, declared at the top of the module.
// fconst comes from FunctionConstants.js (see the deployment section), and
// scriptRunnerJar points at the script-runner jar that EMR provides in each region.
var emr = new AWS.EMR();
var fconst = require('./FunctionConstants.js');
var scriptRunnerJar = 's3://' + fconst.zone_name + '.elasticmapreduce/libs/script-runner/script-runner.jar';

function startEMR(callback)
{
   var installHiveArgs = [
      's3://' + fconst.zone_name + '.elasticmapreduce/libs/hive/hive-script',
      '--base-path',
      's3://' + fconst.zone_name + '.elasticmapreduce/libs/hive/',
      '--install-hive', '--hive-versions', '0.13.1' ];

   var installHiveStep = {
      Jar : scriptRunnerJar,
      Args : installHiveArgs
   };

   var params = {
      Instances: { /* required */
         InstanceGroups: [
            {
               InstanceCount: 1, /* required */
               InstanceRole: 'MASTER', /* required */
               InstanceType: 'm1.medium', /* required */
               Market: 'ON_DEMAND',
               Name: 'Master'
            },
            {
               InstanceCount: 1, /* required */
               InstanceRole: 'CORE', /* required */
               InstanceType: 'm1.medium', /* required */
               Market: 'ON_DEMAND',
               Name: 'Core'
            }
         ]
      },
      Name: 'Near Realtime Big Data Test',
      AmiVersion: 'latest', // the AMI determines the Hadoop version
      LogUri: fconst.logsPath, // set in FunctionConstants.js (see the deployment section)
      ServiceRole: 'EMR_DefaultRole',
      JobFlowRole: 'EMR_EC2_DefaultRole',
      Steps: [
         {
            HadoopJarStep: installHiveStep,
            Name: 'Hive setup',
            ActionOnFailure: 'TERMINATE_CLUSTER'
         }
      ],
      VisibleToAllUsers: true
   };

   console.log('Starting job: ' + params.Name);
   console.log(util.inspect(params, {depth: 4}));
   emr.runJobFlow(params, callback);
}

From here we add the processing tasks to the job flow by calling the ‘addJobFlowSteps’ function with our script parameters. There’s a small transformation step that needs to take place here. You can find the transformation code in the GitHub repository.
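Conceptually, the transformation turns the ScriptParameters value stored on each task into the Args of a step and submits it with addJobFlowSteps. Here is a minimal sketch, assuming the cluster ID comes from the runJobFlow callback and the parameters have already been parsed into an array of strings:

// Sketch: add one task's Hive script as a step on the running cluster.
function addTaskStep(jobFlowId, taskKey, scriptArgs, callback) {
   var stepParams = {
      JobFlowId: jobFlowId,
      Steps: [
         {
            Name: 'Process ' + taskKey,
            ActionOnFailure: 'CONTINUE',
            HadoopJarStep: {
               Jar: scriptRunnerJar, // the same script-runner jar used for the Hive install step
               Args: scriptArgs      // e.g. the parsed ScriptParameters value from the Task table
            }
         }
      ]
   };
   emr.addJobFlowSteps(stepParams, callback);
}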

Deploying the AWS Lambda function

To deploy this application in AWS Lambda:

  1. Download the source code from GitHub.
  2. Use “npm install” to install the async dependency.
  3. Update the ‘logsPath’ value in the FunctionConstants.js file to point to a bucket and prefix where you’d like EMR to place log files.
  4. Package and deploy the function as shown in this example or these walkthroughs.
  5. Ensure your Lambda execution IAM role has the following permissions (a sample policy is shown below):
    1. DynamoDB – getItem, updateItem, putItem
    2. EMR – runJobFlow, addJobFlowSteps
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "logs:*"
      ],
      "Resource": "arn:aws:logs:*:*:*"
    },
    {
      "Effect": "Allow",
      "Action": [
        "s3:GetObject",
        "s3:PutObject"
      ],
      "Resource": "arn:aws:s3:::*"
    },
    {
      "Effect": "Allow",
      "Action": [
        "dynamodb:*"
      ],
      "Resource": "*"
    },
    {
      "Effect": "Allow",
      "Action": [
        "elasticmapreduce:AddJobFlowSteps",
        "elasticmapreduce:RunJobFlow"
      ],
      "Resource": [
        "*"
      ]
    },
    {
      "Sid": "Stmt1432573611000",
      "Effect": "Allow",
      "Action": [
        "iam:PassRole"
      ],
      "Resource": [
        "arn:aws:iam::XXXXXXXXXXXX:role/EMR_DefaultRole",
        "arn:aws:iam::XXXXXXXXXXXX:role/EMR_EC2_DefaultRole"
      ]
    }
  ]
}

Run a quick test in the console

To make sure everything is configured correctly, you can simulate new files being added to S3 with the Edit/Test page for your AWS Lambda function in the AWS Console:

Simulate new files being added to S3

Use the S3 sample event and change the parameters in the “s3” section and the “object” section to fire an event that simulates the files in your task.

{
  "Records": [
    {
      ...
      "s3": {
        "s3SchemaVersion": "1.0",
        "configurationId": "testConfigRule",
        "bucket": {
          "name": "YOUR_BUCKET_NAME",
          "ownerIdentity": {
            "principalId": "EXAMPLE"
          },
          "arn": "arn:aws:s3:::YOUR_BUCKET_NAME"
        },
        "object": {
          "key": "input/inputFile3.txt",
          "size": 1024,
          "eTag": "d41d8cd98f00b204e9800998ecf8427e"
        }
      }
    }
  ]
}

In the Execution results window you should see the message “Files processed successfully.”

Publish the S3 bucket events to the AWS Lambda function

From the console, connect the S3 bucket “Object Created” events to your AWS Lambda function by selecting “Add event source” from the Actions menu (shown above) and filling in the event source information for your S3 bucket:

Publish the S3 bucket events to the AWS Lambda function

Test end-to-end

Now that you’ve got everything wired up, you can create new files in your S3 bucket. You should see something similar to the screen below:

Test end-to-end
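If you prefer to script the test rather than upload files by hand, a few putObject calls against the paths defined in your task exercise the same flow; the bucket name and file bodies below are placeholders.

// Sketch: upload the three dependent files to trigger the Lambda function.
var AWS = require('aws-sdk');
var s3 = new AWS.S3();

var bucket = 'YOUR_BUCKET_NAME'; // placeholder
var keys = ['input/inputFile1.txt', 'input/inputFile2.txt', 'input/inputFile3.txt'];

keys.forEach(function (key) {
   s3.putObject({Bucket: bucket, Key: key, Body: 'test data'}, function (err) {
      if (err) { console.log('Error uploading ' + key + ': ' + err); }
      else { console.log('Uploaded ' + key); }
   });
});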

If an EMR cluster does not start, review the CloudWatch Logs for your function to see what went wrong. You can also simulate all of the file arrivals through the console to see where the function encounters an error.

Congratulations! You’ve got a working system!

Optimizations

There are a few ways to optimize this system depending on your use case. Below are a few examples.

One-File to Many Tasks

If one file is a dependency for many tasks, you must adjust the FileUnit table appropriately and then adjust the get-tasks query to handle the structure change. You can use the format described above, where the filename is the HashKey, or you can keep the same format but make the Task entries a set of values instead of a single value.
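As a sketch of the second option, the Task attribute on the FileUnit item becomes a string set (SS), and the Lambda function loops over it, running the existing update (and the completeness check that follows it) once per task. The helper below is illustrative and reuses updateTask and sendToQueue from earlier.

// Sketch: fan out one file arrival to every task that depends on it.
// fileUnitItem is the FileUnit item with Task stored as a string set,
// for example Task: {SS: ['Task1', 'Task2']}.
function updateDependentTasks(fileUnitItem, path, date, callback) {
   async.each(fileUnitItem.Task.SS, function (taskKey, done) {
      updateTask(path, date, taskKey, function (err, taskItem) {
         if (err) { return done(err); }
         sendToQueue(taskItem, done);
      });
   }, callback);
}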

Sweeper

If your task size is fairly small and you're expecting some frequency of arrival, you can adjust the batch size to be more than 1 (it's in the configuration file). If you do this, you might want to add a sweeper function that runs on a timer to clean up any tasks that might not have run. That way you get the extra efficiency, but tasks that are ready for processing don't wait too long for a full batch before running.
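A sweeper along those lines could be a second Lambda function on a scheduled event that queries the ProcessingState index for anything still unprocessed and hands it to the same processBatch step used by the main flow. In the sketch below, the index name, the date formatting, and the shape that processBatch accepts are assumptions.

// Sketch: scheduled sweeper that picks up Batch items still marked 'Unprocessed'.
function sweep(date, callback) {
   var queryParams = {
      TableName: 'Batch',
      IndexName: 'Date-ProcessingState-index', // name is an assumption
      KeyConditionExpression: '#d = :date AND ProcessingState = :state',
      ExpressionAttributeNames: {'#d': 'Date'}, // Date is a DynamoDB reserved word
      ExpressionAttributeValues: {
         ':date': {S: date},
         ':state': {S: 'Unprocessed'}
      }
   };

   dynamodb.query(queryParams, function (err, data) {
      if (err) { return callback(err); }
      if (data.Items.length === 0) { return callback(null); } // nothing waiting
      processBatch(data, callback);
   });
}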

Conclusion

By using DynamoDB and Lambda, we’re able to track the status of incoming files and relate them to tasks. We can track which files have arrived for the task, and once all of the files necessary to complete it have arrived we can process them in EMR. This lets us process interdependent data sets as quickly as the data arrives. For Expedia, this means the potential for improved partner direction and revenue.

If you have questions or suggestions, please leave a comment below.

———————

Related:

Powering Gaming Applications with DynamoDB

 
