Node.js Streaming MapReduce with Amazon EMR
Learn how to build and deploy a Node.js application on Amazon EMR
Submitted By: Ian Meyers
Language(s): Node.js
Created On: August 22, 2013
Introduction
Node.js is a server-side JavaScript platform for running high performance applications based upon non-blocking I/O and an asynchronous, event-driven processing model.
When customers need to process large volumes of complex data, Node.js offers a runtime that natively supports the JSON data structure. Languages such as Python and Ruby have excellent support for JSON data, but typically map its structures onto generic lists and dictionaries. Node.js offers a high performance and scalable alternative that works with JSON data as native objects. An AWS SDK is now available for Node.js (https://aws.amazon.com/sdkfornodejs), which allows for integration between Node.js applications and AWS services.
In this article, you'll see how to install Node.js on Amazon Elastic MapReduce (Amazon EMR); how to build and integrate a Node.js application with the Hadoop Streaming parallel processing architecture; and finally, how to deploy and run our Node.js MapReduce application on AWS. For more information about Amazon EMR and Hive, see the tutorials at https://aws.amazon.com/articles/2854.
This article assumes you have a good understanding of Hadoop, Hive, and Amazon EMR.
Use Case
In this article, we will consume data from Twitter that contains a complex graph of information about tweets, re-tweets, replies, and direct messages. Each file contains a single block of Twitter interactions that we read and write to Amazon Simple Storage Service (Amazon S3). We want to transform the data for Hive to aggregate metrics such as re-tweet rate, tweets per second, and direct messages per user.
Sample Input Data
Our data represents a complex interaction graph between multiple users of Twitter, for both re-tweets and replies, as shown here:
- Data
  - Interaction[]
    - Demographic Information
    - Interaction
      - Author
        - Name, Avatar, ID, etc.
      - Content, ID, Link
    - Twitter Information
      - ID, Reply to Tweet, Reply to Status
      - Mentions[]
    - User
      - Description, Name, Date, Friend Count, Language
For the purposes of data discovery, we could investigate this data using Hive with the JsonSerde (as outlined in https://aws.amazon.com/articles/2854). However, because this graph is complex and self-referential, most Hive JsonSerdes are unable to expose this data as a table. By processing this data in Node.js, we can easily navigate the data graph with simple syntax.
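For example, once a file's contents have been parsed with JSON.parse, the nested interaction graph can be walked with plain dot and array notation. The following minimal sketch uses field names from the sample structure above and the appendix code; it simply prints the author and text of the first interaction:
// 'obj' is the result of JSON.parse on the contents of one input file
var first = obj.interactions[0].interaction;

process.stdout.write(first.author.username + ' said "' + first.content +
        '" at ' + first.created_at + '\n');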
Installing Node.js
We can install Node.js on an Amazon EMR cluster node with a bootstrap action ( https://docs.aws.amazon.com/ElasticMapReduce/latest/DeveloperGuide/Bootstrap.html ). We install Node.js with a Linux binary distribution that lets us simply download the package, unzip it, and use it without compiling or linking, to help the cluster start faster. This install script is now available from Amazon EMR directly, using a bootstrap action. Add the following line to your cluster start-up script:
--bootstrap-action s3n://elasticmapreduce/samples/node/install-node-bin-x86.sh
Writing MapReduce Code
Now that Node.js is on the Amazon EMR cluster and Hadoop Streaming is correctly configured, we need to be able to run our Node.js application for map and reduce operations. Using the Hadoop Streaming processing architecture ( https://hadoop.apache.org/docs/stable/streaming.html), we can specify the mapper and reducer code to use for a streaming job.
Like any other Hadoop Streaming compatible language, we must read data from the Amazon S3 data storage or HDFS file system using standard input (stdin). In Node.js, stdin is accessible using the process global object (https://nodejs.org/api/process.html). This gives us access to several controls to manage the input and output streams for reading and writing data, such as process.stdin and process.stdout.
There are five main functions that our MapReduce program must perform to read data from Hadoop Streaming and output results:
Configure Standard Input
By default, the process.stdin input channel is paused and does not raise events. Before enabling it, we must configure the desired character set encoding. UTF-8 handles both single-byte and multi-byte characters, so the main processing of our mapper or reducer should start with:
process.stdin.setEncoding('utf8');
Node.js also supports other encodings, such as UTF-16 ('utf16le'), if the input data requires it. Afterward, we must enable stdin to raise events by resuming it:
process.stdin.resume();
Handle Input from STDIN
The process.stdin channel notifies our Node.js application using the data event, which fires whenever some amount of data is available to be read. Our Node.js application must buffer this data for later use, because each chunk delivered by the event may contain only a portion of the input. Because we configured Hadoop Streaming to use the non-splittable FileInputFormat, a single mapper receives the complete JSON document and we can process the file as a whole. Therefore, we buffer chunks of data in the data event handler:
var buffer = ''; process.stdin.on('data', function(chunk) { buffer += chunk });
This appends all chunks of data to the buffer variable for use when all standard input has been read.
Process Completed stdin Data
After all data has been read from stdin, the end event is raised on process.stdin. Since we've collected all the data into a buffer, we can now generate a JSON object from the file contents using:
process.stdin.on('end', function() {
    var obj;

    try {
        obj = JSON.parse(buffer);
    } catch (err) {
        process.stderr.write('Error Processing Content ' + buffer + '\n');
        process.stderr.write(err + '\n');
        // any other logic for error handling
    }
});
Afterward, the obj variable is a fully-fledged JSON object containing the contents of the input file. We may choose to flatten the JSON data output for loading by a Hive JsonSerde, or we might choose to generate CSV or XML data instead.
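As a minimal sketch of the first option, a nested interaction could be flattened into a single-level record before being written out (the field names again follow the sample structure used in the appendix):
// flatten one nested interaction into a single-level object suitable for a Hive JsonSerde
var interaction = obj.interactions[0].interaction;
var flat = {
    author_username : interaction.author.username,
    content : interaction.content,
    created_at : interaction.created_at
};
process.stdout.write(JSON.stringify(flat) + '\n');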
Write Data in Hadoop-Compatible Format
For certain types of MapReduce operations, we want to ensure that a single reducer receives all of the data for a given key. To enable this, we must emit a key on which Hadoop sorts and partitions the output before calling the reducer. In plain-text processing, the key is everything on an output line up to the first tab character (\t).
To write data for storage or reduction, write to stdout using process.stdout.write().
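For example, the mapper in the appendix emits the interaction date as the key, followed by a tab and the JSON record as the value, so that the reducer receives all interactions for a given date together:
// write <key>\t<value>\n so that Hadoop Streaming can sort and group records by key
process.stdout.write(interaction.key_date + '\t' + JSON.stringify(interaction) + '\n');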
Make the File Executable
Amazon EMR runs mapper and reducer scripts by invoking them with command line syntax, such as ./mapper.js. As such, we need to ensure that the Node.js module we've built can be called from the command line. To do this, we add a standard 'shebang' line at the top of the mapper and reducer files so that the shell uses Node.js to run the contents of the script:
#!/usr/bin/env node
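The file must also have its executable permission bit set so that it can be invoked directly, for example:
chmod +x mapper.js reducer.js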
Next, you can test your mapper code (in a file called mapper.js, for example) by calling it on the command line:
./mapper.js < input-file-path
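To approximate the full job locally, you can also pipe the mapper output through sort, which stands in for Hadoop's shuffle-and-sort phase, and then into the reducer:
./mapper.js < input-file-path | sort | ./reducer.js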
Deploying & Running
After we've written our mappers and reducers, we transfer them to Amazon S3, and run MapReduce against some input data using Amazon EMR.
The following example shows how to perform the steps using the Amazon EMR CLI (https://aws.amazon.com/developertools/2264), but you can also achieve the same results using the equivalent commands on the Amazon EMR console (console.aws.amazon.com/elasticmapreduce) or the Amazon EMR API ( https://docs.aws.amazon.com/ElasticMapReduce/latest/API/Welcome.html?r=8857).
We build our Amazon EMR CLI launch command by calling elastic-mapreduce or ruby elastic-mapreduce, and use the --create command to start a cluster using Hadoop Streaming:
elastic-mapreduce --create --stream --name NodeJSMapReduce
We add the location of the input and output files:
--input s3n://path-to-input-files --output s3n://path-to-output-files
And we add our mapper and reducer JavaScript files:
--mapper s3n://path-to-mapper.js --reducer s3n://path-to-reducer.js
Finally, we tell Amazon EMR to install Node.js using a bootstrap action:
--bootstrap-action s3n://elasticmapreduce/samples/node/install-node-bin-x86.sh
This gives us the full command line invocation:
elastic-mapreduce --create --stream \
  --name NodeJSMapReduce \
  --input s3n://path-to-input-files \
  --output s3n://path-to-output-files \
  --mapper s3n://path-to-mapper.js \
  --reducer s3n://path-to-reducer.js \
  --bootstrap-action s3n://elasticmapreduce/samples/node/install-node-bin-x86.sh
This command performs the following steps:
- Start an EMR cluster.
- Install Node.js.
- Process all input data files by writing the output of mappers and reducers to the output path.
- Shut down the cluster.
You only pay for the amount of time the process is running, and no other interaction with the Hadoop cluster is required to generate data.
Conclusion
Node.js can provide fast execution of MapReduce applications with terse, native syntax for processing complex JSON data. With Amazon EMR configuration options, you can easily run Node.js-based applications that scale as your input data volumes grow.
Appendix - Sample Map Reduce Application
The following MapReduce program outputs the tweet count by day for complex JSON-structured data; in this case, Twitter data collected using DataSift (datasift.com). It also escapes special characters such as newlines and tabs, and outputs the tweet created_at field as the key. The reducer then rolls up this data by date, outputting the total number of tweets.
Mapper
#!/usr/bin/env node

var events = require('events');
var emitter = new events.EventEmitter();
var line = '';

// escape all control characters so that they are plain text in the output
String.prototype.escape = function() {
    return this.replace('\n', '\\n').replace('\'', '\\\'').replace('\"', '\\"')
        .replace('\&', '\\&').replace('\r', '\\r').replace('\t', '\\t')
        .replace('\b', '\\b').replace('\f', '\\f');
}

// append an array to this one
Array.prototype.appendArray = function(arr) {
    this.push.apply(this, arr);
}

// data is complete, write it to the required output channel
emitter.on('dataReady', function(arr) {
    var dateComponents = arr[9].split(' ');
    var d = [ dateComponents[1], dateComponents[2], dateComponents[3] ].join(' ');

    var interaction = {
        key_date : d,
        content : {
            objectId : arr[0],
            hash : arr[1],
            id : arr[2],
            author_id : arr[3],
            author_avatar : arr[4],
            author_link : arr[5],
            author_name : arr[6],
            author_username : arr[7],
            content : arr[8],
            created_at : arr[9],
            link : arr[10],
            schema_version : arr[11],
            source : arr[12]
        }
    };

    process.stdout.write(interaction.key_date + '\t' + JSON.stringify(interaction) + '\n');
});

// generate a JSON object from the captured input data, and then generate
// the required output
var buildOutputSet = function() {
    var obj;

    // create the JSON object from the input file. if we cannot, then we discard
    // this file
    //
    // TODO Generate an exception here instead?
    if (!line || line == '') {
        return;
    }

    try {
        obj = JSON.parse(line);
    } catch (err) {
        process.stderr.write('Error Processing Line ' + line + '\n');
        process.stderr.write(err + '\n');
        return;
    }

    // generate an output set per interaction object
    for (var i = 0; i < obj.interactions.length; i++) {
        // create some convenience objects for syntax
        var int = obj.interactions[i];
        var a = int.interaction.author;

        // pull out the bits of the object model we want to retain
        var output = [ obj.id, obj.hash, int.interaction.id, a.id, a.avatar,
                a.link, a.name, a.username, int.interaction.content.escape(),
                int.interaction.created_at, int.interaction.link,
                int.interaction.schema.version, int.interaction.source ];

        // raise an event that the output array is completed
        emitter.emit('dataReady', output);
    }
}

// fires on every block of data read from stdin
process.stdin.on('data', function(chunk) {
    line += chunk;
});

// fires when stdin is completed being read
process.stdin.on('end', function() {
    buildOutputSet();
});

// set up the encoding for STDIN
process.stdin.setEncoding('utf8');

// resume STDIN - paused by default
process.stdin.resume();
Reducer
#!/usr/bin/env node

var events = require('events');
var emitter = new events.EventEmitter();
var remaining = '';
var interactionSummary = {
    day : '',
    count : 0
};

// escape all control characters so that they are plain text in the output
String.prototype.escape = function() {
    return this.replace('\n', '\\n').replace('\'', '\\\'').replace('\"', '\\"')
        .replace('\&', '\\&').replace('\r', '\\r').replace('\t', '\\t')
        .replace('\b', '\\b').replace('\f', '\\f');
}

// append an array to this one
Array.prototype.appendArray = function(arr) {
    this.push.apply(this, arr);
}

// data is complete, write it to the required output channel
emitter.on('dataReady', function(o) {
    process.stdout.write(JSON.stringify(o) + '\n');
});

// generate a JSON object from the captured input data, and then generate
// the required output
emitter.on('lineReady', function(data) {
    if (!data || data == '') {
        return;
    }

    var obj;
    try {
        obj = JSON.parse(data.split('\t')[1]);
    } catch (err) {
        process.stderr.write('Error Processing Line ' + data + '\n');
        process.stderr.write(err + '\n');
        return;
    }

    if (interactionSummary.day == '') {
        interactionSummary.day = obj.key_date;
        interactionSummary.count = 1;
    } else {
        if (obj.key_date != interactionSummary.day) {
            // raise an event that the reduced array is completed
            emitter.emit('dataReady', interactionSummary);

            interactionSummary.day = obj.key_date;
            interactionSummary.count = 1;
        } else {
            interactionSummary.count += 1;
        }
    }
});

// fires on every block of data read from stdin - split the chunk into lines,
// keeping any partial trailing line in 'remaining' for the next chunk
process.stdin.on('data', function(chunk) {
    var capture = (remaining + chunk).split('\n');
    remaining = capture.pop();

    for (var i = 0; i < capture.length; i++) {
        emitter.emit('lineReady', capture[i]);
    }
});

// fires when stdin is completed being read - process any final partial line
// and flush the summary for the last day seen
process.stdin.on('end', function() {
    emitter.emit('lineReady', remaining);

    if (interactionSummary.day != '') {
        emitter.emit('dataReady', interactionSummary);
    }
});

// set up the encoding for STDIN
process.stdin.setEncoding('utf8');

// resume STDIN - paused by default
process.stdin.resume();

Please note that this code is for example purposes only and should not be used in a production context.
Running the Example
We've published sample input files at s3://elastic-mapreduce/samples/node/data that you can use to run the above MapReduce program on Amazon EMR. To run the application, use the elastic-mapreduce command line client as follows:
elastic-mapreduce --create --stream \
  --name NodeJSMapReduce \
  --input s3n://elastic-mapreduce/samples/node/data/*.json \
  --output s3n://<bucket>/node/output \
  --mapper s3n://elastic-mapreduce/samples/node/sample-mapper.js \
  --reducer s3n://elastic-mapreduce/samples/node/sample-reducer.js \
  --bootstrap-action s3n://elasticmapreduce/samples/node/install-node-bin-x86.sh \
  --instance-type m1.large --instance-count 3
where <bucket> is the name of the bucket where you would like the output to be created. When the job completes, multiple files will reside in the configured output bucket and path, containing a rollup of the number of tweets for the single day in the sample data set: {"day":"14 Feb 2013","count":1071}