AWS Big Data Blog
Use Apache Oozie Workflows to Automate Apache Spark Jobs (and more!) on Amazon EMR
Mike Grimes is an SDE with Amazon EMR
As a developer or data scientist, you rarely want to run a single serial job on an Apache Spark cluster. More often, to gain insight from your data you need to process it in multiple, possibly tiered steps, and then move the data into another format and process it even further. Perhaps you have a constant stream of data coming in and must kick off a job periodically, with dependent jobs that aggregate or further process the output.
This problem is easy to solve, right? You can write scripts that run jobs in sequence, and use the output of one program as the input to another—no problem. But what if your workflow is complex and requires specific triggers, such as specific data volumes or resource constraints, or must meet strict SLAs? What if parts of your workflow don’t depend on each other and can be run in parallel?
Building your own infrastructure around this problem can seem like an attractive idea, but doing so can quickly become laborious. If, or rather when, those requirements change, modifying such a tool isn't easy. And what if you need monitoring around these jobs? Monitoring requires yet another set of tools, and more headaches.
Enter Apache Oozie. Oozie is a workflow engine that can execute directed acyclic graphs (DAGs) of specific actions (think Spark job, Apache Hive query, and so on) and action sets. Oozie can also send notifications through email or Java Message Service (JMS) on events such as job state changes, or hit or missed SLAs.
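For example, an SLA can be declared directly on a workflow action. The snippet below is only a rough sketch, assuming a workflow schema version that supports action-level SLAs and the uri:oozie:sla:0.2 schema; the nominalTime parameter and the alert address are hypothetical, and this block isn't used later in the post:

<action name="spark-node">
    <!-- action definition omitted -->
    <ok to="end"/>
    <error to="fail"/>
    <!-- nominalTime is assumed to be supplied through job.properties -->
    <sla:info xmlns:sla="uri:oozie:sla:0.2">
        <sla:nominal-time>${nominalTime}</sla:nominal-time>
        <sla:should-end>${30 * MINUTES}</sla:should-end>
        <sla:alert-events>end_miss</sla:alert-events>
        <sla:alert-contact>oncall@example.com</sla:alert-contact>
    </sla:info>
</action>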
Amazon EMR offers Oozie 4.2.0, the latest version, with examples completely set up and ready to go right out of the box. We’ve added a number of user experience improvements that make starting and checking up on Oozie jobs simple. Also, our tools allow you to easily install the Oozie examples, which lets you quickly bootstrap your learning. In a few minutes, you can have everything you need to start prototyping (or just playing with) Oozie workflows.
We’ve made a few improvements that make Oozie great on EMR:
- Installing the Oozie examples is as easy as running a single command on-cluster.
- Examples are pre-configured for your cluster, meaning most of them can be run immediately after installation.
- The Oozie CLI is automatically configured to point to the correct Oozie Server URL, so you don’t have to repeatedly add long arguments.
In this post, I’ll show how to get up and running on an EMR cluster with Oozie 4.2.0 and set up a simple Spark workflow. I’ll provide the steps needed to explore the on-cluster examples, which will put you on track toward building robust, automated workflows that notify you of progress and errors, are flexible and easily scalable, and remain maintainable and modifiable.
Anatomy of an Oozie workflow
Most Oozie workflows consist of a .properties file (in this post we’ll refer to it as job.properties) and a workflow.xml file. These two files are the key components of any Oozie workflow definition. The .properties entries act like arguments that are brought into the workflow.xml flow definition, letting you parameterize your workflows more easily.
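As a tiny illustration (the inputDir property here is hypothetical), a value defined once in job.properties can be referenced anywhere in workflow.xml with ${} syntax:

# job.properties
inputDir=hdfs:///user/hadoop/input

<!-- workflow.xml -->
<arg>${inputDir}</arg>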
In workflow.xml, the workflow logic is defined. Oozie has its own XML schema for defining jobs, which can be found in detail in the Apache Oozie documentation.
At the most basic level, all workflows are made up of a graph of nodes. Each of these has specific criteria that define how it should behave, such as what will initiate it, where its input and output should come from and go to, what to do in case of failure, and whether or not an SLA exists for it.
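Put together, a bare-bones workflow graph, in the same shape as the examples later in this post, looks roughly like the following sketch (the action body is elided):

<workflow-app xmlns="uri:oozie:workflow:0.4" name="skeleton">
    <start to="my-action"/>
    <action name="my-action">
        <!-- a Spark, Hive, MapReduce, or other action definition goes here -->
        <ok to="end"/>
        <error to="fail"/>
    </action>
    <kill name="fail">
        <message>Failed: [${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <end name="end"/>
</workflow-app>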
Running the Oozie Spark example on EMR
First off, if you haven’t already, start up a small EMR cluster and select the Oozie-Sandbox and Spark applications (there will be a small cost associated), along with any other apps you’d like to try running through Oozie. For more information on starting an EMR cluster, see the EMR documentation.
The Oozie distribution on EMR comes with a number of great example workflows to help you build your skill set and understand how Oozie jobs are composed. Another upside of using Oozie on EMR is that the example workflows are easy to install. Normally, if you maintain your own cluster, you have to extract the examples tarball, configure the job properties files to match the specifics of your cluster, and then upload those files into HDFS. We’ve made this very simple: once you’ve connected to the master node of the EMR cluster over SSH, install the Oozie examples:
$ install-oozie-examples
…
Oozie examples installed to /usr/share/doc/oozie-4.2.0/examples
To make it easier, and to avoid having to use sudo to edit these files, copy them into your home directory:
$ cp -r /usr/share/doc/oozie-4.2.0/examples ~/
$ cd ~/examples/apps/spark
Take a look at the workflow.xml file to get a sense of how the example works. Next, we’ll edit the example so that it runs the Spark job in yarn-cluster mode.
Edit the job.properties file to look like the following. Just edit the master property; the nameNode and jobTracker fields will already be configured to the correct address of your cluster’s master node.
nameNode=hdfs://ip-10-146-23-45.ec2.internal:8020
jobTracker=ip-10-146-23-45.ec2.internal:8032
master=yarn-cluster
queueName=default
examplesRoot=examples
oozie.use.system.libpath=true
oozie.wf.application.path=${nameNode}/user/${user.name}/${examplesRoot}/apps/spark
As shipped, the example was configured to run only in local mode, meaning the job ran only on the master node (you can read more about the different cluster modes in the Spark documentation). By swapping the master out for yarn-cluster, you can coordinate Spark jobs that run across the entire cluster using Oozie.
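Under the hood, the master value ends up on the spark-submit command line that Oozie constructs. As an illustration only (the class and jar names here are placeholders, not the exact command Oozie runs), the difference between the two modes is roughly:

# local mode: the driver and executors run only on the node issuing the command
spark-submit --master 'local[*]' --class com.example.MyApp my-app.jar

# yarn-cluster mode: the driver and executors are scheduled by YARN across the cluster
spark-submit --master yarn-cluster --class com.example.MyApp my-app.jar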
One final piece is missing before we can run Spark jobs in yarn-cluster mode through Oozie. Oozie’s Sharelib is a set of libraries that live in HDFS and allow jobs to run on any node (master or slave) of the cluster. By default, the Sharelib doesn’t include a Spark assembly jar compiled with support for YARN, so we need to give Oozie access to the one that’s already on the cluster. Find where spark-assembly.jar lives on the cluster however you prefer; here I’ve used the locate command:
$ sudo yum install -y mlocate 2>&1 > /dev/null && sudo updatedb && locate spark-assembly.jar
/usr/lib/spark/lib/spark-assembly.jar
Oozie creates its Sharelib in a time-stamped directory in HDFS during cluster start-up. To find out where that directory is, tell Oozie to update the Sharelib, which prints the current Sharelib location:
$ oozie admin -sharelibupdate
[ShareLib update status]
    host = http://ip-10-236-0-121.ec2.internal:11000/oozie
    status = Successful
    sharelibDirOld = hdfs://ip-10-236-0-121.ec2.internal:8020/user/oozie/share/lib/lib_20160602184804
    sharelibDirNew = hdfs://ip-10-236-0-121.ec2.internal:8020/user/oozie/share/lib/lib_20160602184804
Put the spark-assembly jar into spark/ under the Sharelib directory:
$ hdfs dfs -put /usr/lib/spark/lib/spark-assembly.jar hdfs://ip-10-236-0-121.ec2.internal:8020/user/oozie/share/lib/lib_20160602184804/spark/
And finally, instruct Oozie to update the Sharelib so it can make use of the new jar.
$ oozie admin -sharelibupdate
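If you want to confirm that Oozie now sees the new jar, you can list the contents of the spark Sharelib (output omitted here):

$ oozie admin -shareliblist spark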
Now that the cluster is set up to execute Spark jobs through Oozie, go ahead and run the example to verify that Spark on YARN works:
$ oozie job -config ~/examples/apps/spark/job.properties -run
job: 0000004-160412001348535-oozie-oozi-W
Wait a few moments, then run this command:
$ oozie job -info 0000004-160412001348535-oozie-oozi-W
You’ll see that the job is running.
Wait a few more moments, and run that same oozie job command to see that the job completed successfully. Let’s take a closer look at what happened on the cluster:
- Oozie took the job.properties file we submitted, and used the oozie.wf.application.path property to look for a job definition named workflow.xml.
- All the other parameters from the job.properties file were then interpolated into this workflow definition.
- Oozie found a start node and followed that through to the Spark node.
- Oozie fired up a YARN application on a random node of the cluster, and invoked a spark-submit command with the arguments provided through the job.properties and workflow.xml files.
- Once this application completed, Oozie looked at its return status, and moved to the ok node as the application finished successfully.
- Oozie then followed this through to the end node, denoting the end of the workflow execution.
Writing your own Oozie workflow to run a simple Spark job
Here, we’ll work from scratch to build a different Spark example job, showing how a simple spark-submit invocation can be turned into a Spark job in Oozie.
Somewhere in your home directory, create a folder where you’ll build your workflow and put a lib directory in it. Let’s call this folder emr-spark.
$ mkdir -p ~/emr-spark/lib
Create the file workflow.xml in emr-spark:
<workflow-app xmlns='uri:oozie:workflow:0.4' name='SparkWordCount'>
    <start to='spark-node'/>
    <action name='spark-node'>
        <spark xmlns="uri:oozie:spark-action:0.1">
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <master>${master}</master>
            <mode>${mode}</mode>
            <name>Spark on Oozie - WordCount</name>
            <class>org.apache.spark.examples.JavaWordCount</class>
            <jar>${nameNode}/user/${wf:user()}/emr-spark/lib/spark-examples.jar</jar>
            <arg>${wordCountInput}</arg>
        </spark>
        <ok to="end"/>
        <error to="fail"/>
    </action>
    <kill name="fail">
        <message>Workflow failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <end name='end'/>
</workflow-app>
Create a job.properties file:
nameNode=hdfs://ip-10-102-176-38.ec2.internal:8020
jobTracker=ip-10-102-176-38.ec2.internal:8032
master=yarn-cluster
mode=cluster
wordCountInput=s3://my-bucket/word-count-input.txt
oozie.use.system.libpath=true
oozie.wf.application.path=/user/${user.name}/emr-spark
The values you use for the nameNode and jobTracker properties will be different for your cluster. To find those addresses, use hostname:
$ echo hdfs://$(hostname -f):8020
hdfs://ip-10-5-171-2.ec2.internal:8020
$ echo $(hostname -f):8032
ip-10-5-171-2.ec2.internal:8032
Next, we need to make the spark-examples.jar file that comes with the Spark distribution (which contains the JavaWordCount class referenced in our workflow definition) available to Oozie. Using locate, find where spark-examples.jar is on the cluster, and copy it into emr-spark/lib.
$ locate spark-examples.jar
/usr/lib/spark/lib/spark-examples.jar
$ cp /usr/lib/spark/lib/spark-examples.jar ~/emr-spark/lib/
Finally, load the entire workspace up into HDFS so all the nodes of the cluster have access to it:
$ hdfs dfs -put -f ~/emr-spark .
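If you’d like a quick sanity check that the workflow definition and the jar made it into HDFS, a recursive listing does the trick:

$ hdfs dfs -ls -R emr-spark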
Now you should be able to run the job and watch it succeed:
$ oozie job -config ~/emr-spark/job.properties -run
job: 0000003-160505232152319-oozie-oozi-W
$ oozie job -info 0000003-160505232152319-oozie-oozi-W
...
Now that you’ve built your own workflow from scratch, let’s link it up with another job.
Linking jobs with sequential, parallel, or other types of logic
Suppose you have another job that you want to run once the Spark job described earlier finishes. Perhaps you want to kick off a separate map-reduce job afterwards. Linking the two jobs is as easy as pointing the ok node from the Spark node to another action node. Here’s the full workflow XML with the two jobs linked together:
<?xml version="1.0" encoding="UTF-8"?>
<workflow-app xmlns="uri:oozie:workflow:0.4" name="SparkWordCount">
    <start to="spark-node"/>
    <action name="spark-node">
        <spark xmlns="uri:oozie:spark-action:0.1">
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <master>${master}</master>
            <mode>${mode}</mode>
            <name>Spark on Oozie - WordCount</name>
            <class>org.apache.spark.examples.JavaWordCount</class>
            <jar>${nameNode}/user/${wf:user()}/emr-spark/lib/spark-examples.jar</jar>
            <arg>${wordCountInput}</arg>
        </spark>
        <ok to="mr-node"/>
        <error to="fail"/>
    </action>
    <action name="mr-node">
        <map-reduce>
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <prepare>
                <delete path="${nameNode}/user/${wf:user()}/output-data"/>
            </prepare>
            <configuration>
                <property>
                    <name>mapred.mapper.new-api</name>
                    <value>true</value>
                </property>
                <property>
                    <name>mapred.reducer.new-api</name>
                    <value>true</value>
                </property>
                <property>
                    <name>mapred.job.queue.name</name>
                    <value>${queueName}</value>
                </property>
                <property>
                    <name>mapreduce.map.class</name>
                    <value>org.apache.hadoop.examples.WordCount$TokenizerMapper</value>
                </property>
                <property>
                    <name>mapreduce.reduce.class</name>
                    <value>org.apache.hadoop.examples.WordCount$IntSumReducer</value>
                </property>
                <property>
                    <name>mapreduce.combine.class</name>
                    <value>org.apache.hadoop.examples.WordCount$IntSumReducer</value>
                </property>
                <property>
                    <name>mapred.output.key.class</name>
                    <value>org.apache.hadoop.io.Text</value>
                </property>
                <property>
                    <name>mapred.output.value.class</name>
                    <value>org.apache.hadoop.io.IntWritable</value>
                </property>
                <property>
                    <name>mapred.map.tasks</name>
                    <value>1</value>
                </property>
                <property>
                    <name>mapred.input.dir</name>
                    <value>${wordCountInput}</value>
                </property>
                <property>
                    <name>mapred.output.dir</name>
                    <value>/user/${wf:user()}/output-data</value>
                </property>
            </configuration>
        </map-reduce>
        <ok to="end"/>
        <error to="fail"/>
    </action>
    <kill name="fail">
        <message>Workflow failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <end name="end"/>
</workflow-app>
And the updated job.properties file which includes the new parameters:
nameNode=hdfs://ip-10-102-176-38.ec2.internal:8020
jobTracker=ip-10-102-176-38.ec2.internal:8032
master=yarn-cluster
mode=cluster
wordCountInput=s3://my-bucket/word-count-input.txt
queueName=default
oozie.use.system.libpath=true
oozie.wf.application.path=/user/${user.name}/emr-spark
When you run this code and check the status using the oozie job -info command, you should see that once the Spark job finishes, the map-reduce job will start.
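When the whole workflow finishes, the word-count results from the map-reduce action land under output-data in your HDFS home directory (the path comes from the mapred.output.dir property above); you can peek at them with something like:

$ hdfs dfs -cat output-data/part-r-* | head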
Oozie’s powerful DAG engine can also run jobs in parallel using fork nodes. Here’s an example that shows the basic structure. Try dropping the jobs we defined earlier into it and watch both of them execute concurrently.
<start to="fork-node"/>
<fork name="fork-node">
    <path start="mr-node"/>
    <path start="spark-node"/>
</fork>
<action name="spark-node">
    …
    <ok to="join-node"/>
</action>
<action name="mr-node">
    …
    <ok to="join-node"/>
</action>
<join name="join-node" to="end"/>
<end name="end"/>
This post provided an introduction to using Oozie on EMR. If you like what you see and want more examples of what Oozie can help you accomplish, I highly recommend looking through the examples installed on-cluster. Each one demonstrates the use of an action or tool that Oozie brings to the table.
If you have questions or suggestions, please leave a comment below.
—————————————————
Related
Sharpen your Skill Set with Apache Spark on the AWS Big Data Blog
Want to learn more about Big Data or Streaming Data? Check out our Big Data and Streaming data educational pages.