AWS Big Data Blog

Build a Concurrent Data Orchestration Pipeline Using Amazon EMR and Apache Livy

Many customers use Amazon EMR and Apache Spark to build scalable big data pipelines. For large-scale production pipelines, a common use case is to read complex data originating from a variety of sources. This data must be transformed to make it useful to downstream applications, such as machine learning pipelines, analytics dashboards, and business reports. Such pipelines often require Spark jobs to be run in parallel on Amazon EMR. This post focuses on how to submit multiple Spark jobs in parallel on an EMR cluster using Apache Livy, which is available in EMR version 5.9.0 and later.

Apache Livy is a service that enables easy interaction with a Spark cluster over a REST interface. It lets you submit simple Scala or Python code through REST API calls instead of having to manage and deploy large JAR files. This makes it easier to scale data pipelines by running multiple Spark jobs in parallel, rather than running them serially through the EMR Step API. You can also continue to take advantage of transient clusters as part of the workflow, resulting in cost savings.

For the purposes of this blog post, we use Apache Airflow to orchestrate the data pipeline. Airflow is an open-source task scheduler that helps manage ETL tasks. Customers love Apache Airflow because workflows can be scheduled and managed from one central location. With Airflow's Configuration as Code approach, automating the generation of workflows, ETL tasks, and dependencies is easy. It lets customers shift their focus from building and debugging data pipelines to the business problems they are solving.

High-level Architecture

Following is a detailed technical diagram showing the configuration of the architecture to be deployed.

We use an AWS CloudFormation script to launch the AWS services required to create this workflow. CloudFormation is a powerful service that allows you to describe and provision all the infrastructure and resources required for your cloud environment in simple JSON or YAML templates. In this case, the template provisions, among other resources, an EC2 instance that runs the Airflow server, a database for the Airflow metadata store (the DBSecurityGroup noted in Step 2), an S3 bucket for the output data, and the IAM roles that the pipeline needs.

The Airflow server uses a LocalExecutor (tasks are executed as a subprocess), which helps to parallelize tasks locally. For production workloads, you should consider scaling out with the CeleryExecutor on a cluster with multiple worker nodes.

For demonstration purposes, we use the MovieLens dataset and concurrently convert its CSV files to Parquet format, saving the results to Amazon S3. MovieLens is a popular open dataset that is widely used for exploring data science algorithms. Each dataset file is a comma-separated file with a single header row. The following table describes each file in the dataset.

Dataset Description
movies.csv Has the title and list of genres for each movie being reviewed.
ratings.csv Shows how users rated movies, on a scale from 1 to 5. The file also contains the time stamp for each rating.
tags.csv Shows a user-generated tag for each movie. A tag is user-generated metadata about a movie and can be a word or a short phrase. The file also contains the time stamp for the tag.
links.csv Contains identifiers that link each movie to the IDs used by IMDB and MovieDB.
genome-scores.csv Shows the relevance of each tag for each movie.
genome-tags.csv Provides the tag descriptions for each tag in the genome-scores.csv file.

Building the Pipeline

Step 0: Prerequisites

Make sure that you have a bash-enabled machine with the AWS CLI installed.

Step 1: Create an Amazon EC2 key pair

To build this ETL pipeline, you connect to an EC2 instance using SSH. This requires access to an Amazon EC2 key pair in the AWS Region where you're launching your CloudFormation stack. If you have an existing key pair in that Region, go ahead and use it for this exercise. If not, open the AWS Management Console and navigate to the EC2 console. In the EC2 console left navigation pane, choose Key Pairs.

Choose Create Key Pair, type airflow_key_pair (make sure to type it exactly as shown), and then choose Create. This downloads a file called airflow_key_pair.pem. Be sure to keep this file in a safe and private place. Without access to this file, you lose the ability to connect to your EC2 instance over SSH.
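
If you prefer to script this step, the key pair can also be created programmatically. The following is a minimal boto3 sketch, not part of the original walkthrough; the Region is an assumption, so adjust it to match where you plan to launch the stack.

# Sketch: create the key pair with boto3 instead of the console.
import os
import boto3

ec2 = boto3.client('ec2', region_name='us-east-1')  # assumption: use your own Region
key_pair = ec2.create_key_pair(KeyName='airflow_key_pair')

# Save the private key locally and restrict its permissions, as SSH requires.
with open('airflow_key_pair.pem', 'w') as f:
    f.write(key_pair['KeyMaterial'])
os.chmod('airflow_key_pair.pem', 0o400)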

Step 2: Execute the CloudFormation Script

Now, we’re ready to run the CloudFormation script!

Note: The CloudFormation script uses a DBSecurityGroup, which is NOT supported in all Regions.

On the next page, choose the key pair that you created in the previous step (airflow_key_pair) and provide an S3 bucket name. The bucket should not already exist, because the CloudFormation template creates a new bucket with the name you supply. Default values for the other parameters have been chosen for simplicity.

After filling out these parameters to fit your environment, choose Next. Finally, review all the settings on the next page. Select the box marked I acknowledge that AWS CloudFormation might create IAM resources (this is required since the script creates IAM resources), then choose Create. This creates all the resources required for this pipeline and takes some time to run. To view the stack’s progress, select the stack you created and choose the Events section or panel.

It takes a couple of minutes for the CloudFormation template to complete.
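
If you would rather launch the stack from code than from the console, the following boto3 sketch shows the general shape of the call. The stack name, template URL, and parameter names here are placeholders, not values from the actual template; use the ones that accompany the template you deploy.

# Sketch: launch the CloudFormation stack with boto3 (placeholder names and URL).
import boto3

cfn = boto3.client('cloudformation', region_name='us-east-1')  # assumption: your Region
cfn.create_stack(
    StackName='airflow-emr-livy',                                      # placeholder stack name
    TemplateURL='https://s3.amazonaws.com/your-bucket/template.yaml',  # placeholder template URL
    Parameters=[
        {'ParameterKey': 'KeyName', 'ParameterValue': 'airflow_key_pair'},     # assumed parameter name
        {'ParameterKey': 'S3BucketName', 'ParameterValue': 'your-new-bucket'},  # assumed parameter name
    ],
    Capabilities=['CAPABILITY_IAM'],  # required because the template creates IAM resources
)
# Block until the stack has finished creating.
cfn.get_waiter('stack_create_complete').wait(StackName='airflow-emr-livy')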

Step 3: Start the Airflow scheduler in the Airflow EC2 instance

To do this, we use SSH to connect to the EC2 instance created by the CloudFormation script. Assuming your local machine has an SSH client, you can do this from the command line. Navigate to the directory that contains the airflow_key_pair.pem file you downloaded earlier and run the following commands, replacing your-public-ip and your-region with the relevant values from your EC2 instance. The public DNS name of the EC2 instance can be found on the Outputs tab of the CloudFormation stack.

Type yes when prompted after the SSH command.

chmod 400 airflow_key_pair.pem
ssh -i "airflow_key_pair.pem" ec2-user@ec2-your-public-ip.your-region.compute.amazonaws.com

For more information or help with issues, see Connecting to Your Linux Instance Using SSH.

Now we need to run some commands as the root user.

# Switch to the root user
sudo su
# Navigate to the airflow directory created by the CloudFormation template (see the user-data section of the template)
cd ~/airflow
source ~/.bash_profile

The following is an image of how the /root/airflow/ directory should look.

Now we need to start the Airflow scheduler. The Airflow scheduler monitors all tasks and all directed acyclic graphs (DAGs), and triggers the task instances whose dependencies have been met. In the background, it stays in sync with the folder that contains the DAG definitions, and it periodically (about once a minute) inspects active tasks to see whether they can be triggered.

The Airflow scheduler is designed to run as a persistent service in an Airflow production environment. It uses the configuration specified in airflow.cfg. To start the scheduler, run the following command in your terminal.

airflow scheduler

Your screen should look like the following with the scheduler running.

Step 4: View the transform_movielens DAG on the Airflow Webserver

The Airflow webserver should be running on port 8080. To see it, open a browser and enter <EC2-public-dns-name>:8080. The public EC2 DNS name is the same one that you used in Step 3.

You should see a list of DAGs on the Airflow dashboard. The example DAGs are left there in case you want to experiment with them, but for the purposes of this blog we focus on the transform_movielens DAG. Toggle the ON button next to the name of the DAG.

The following example shows how the dashboard should look.

Choose the transform_movielens DAG, then choose Graph View to view the following image.

This image shows the overall data pipeline. In the current setup, there are six transform tasks that convert each CSV file in the MovieLens dataset to Parquet format. Parquet is a popular columnar storage format for big data applications. The DAG also takes care of spinning up the EMR cluster and terminating it after the workflow is completed.

The DAG code can also be viewed by choosing the Code button.
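
The actual DAG definition lives in the repository linked at the end of this post. The following is a simplified, illustrative sketch (placeholder callables, not the repository code) of how such a DAG fans the transform tasks out in parallel between the cluster creation and termination tasks.

# Simplified sketch of the transform_movielens DAG structure (Airflow 1.x style).
# The callables here are placeholders; the real implementations are in the repository.
from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator


def create_cluster(**kwargs):
    pass  # placeholder: calls the run_job_flow boto3 API and pushes the cluster ID through XCom


def wait_for_cluster(**kwargs):
    pass  # placeholder: waits until the EMR cluster is ready to take on new tasks


def transform_to_parquet(**kwargs):
    pass  # placeholder: submits the Scala transform to Apache Livy, as shown later in this post


def terminate_cluster(**kwargs):
    pass  # placeholder: terminates the EMR cluster


dag = DAG('transform_movielens_sketch', schedule_interval=None,
          start_date=datetime(2018, 1, 1), concurrency=3)

start = PythonOperator(task_id='create_cluster', python_callable=create_cluster,
                       provide_context=True, dag=dag)
ready = PythonOperator(task_id='wait_for_cluster', python_callable=wait_for_cluster,
                       provide_context=True, dag=dag)
end = PythonOperator(task_id='terminate_cluster', python_callable=terminate_cluster,
                     provide_context=True, dag=dag)

# Fan out one transform task per MovieLens file; Airflow runs up to three at a time.
for name in ['movies', 'ratings', 'tags', 'links', 'genome_scores', 'genome_tags']:
    transform = PythonOperator(task_id='transform_' + name,
                               python_callable=transform_to_parquet,
                               provide_context=True, dag=dag)
    ready.set_downstream(transform)
    transform.set_downstream(end)

start.set_downstream(ready)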

Step 5: Run the Airflow DAG

To run the DAG, go back to the Airflow dashboard, and choose the Trigger DAG button for the transform_movielens DAG.

When the Airflow DAG runs, the first task calls the run_job_flow boto3 API to create an EMR cluster. The second task waits until the EMR cluster is ready to take on new tasks. As soon as the cluster is ready, the transform tasks are kicked off in parallel using Apache Livy, which runs on port 8998. Concurrency in the current Airflow DAG is set to 3, so three transform tasks run in parallel. You can raise this value to run more tasks (and therefore more Spark sessions) in parallel, throttling it as needed so that the EMR cluster isn't overwhelmed.
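
The exact run_job_flow arguments used by the DAG are in the repository; the following sketch shows the general pattern of those first two tasks, with illustrative cluster name, release label, and instance settings.

# Sketch: create a transient EMR cluster with Spark and Livy, then wait until it is ready.
import boto3

emr = boto3.client('emr', region_name='us-east-1')  # assumption: your Region

response = emr.run_job_flow(
    Name='airflow-livy-demo',                  # assumed cluster name
    ReleaseLabel='emr-5.13.0',                 # any release from 5.9.0 onward includes Livy
    Applications=[{'Name': 'Spark'}, {'Name': 'Livy'}],
    Instances={
        'MasterInstanceType': 'm4.xlarge',     # assumed instance types and count
        'SlaveInstanceType': 'm4.xlarge',
        'InstanceCount': 3,
        'KeepJobFlowAliveWhenNoSteps': True,   # keep the cluster alive for Livy sessions
    },
    JobFlowRole='EMR_EC2_DefaultRole',
    ServiceRole='EMR_DefaultRole',
)
cluster_id = response['JobFlowId']

# Block until the cluster is up and ready to take on new tasks.
emr.get_waiter('cluster_running').wait(ClusterId=cluster_id)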

How does Apache Livy run the Scala code on the EMR cluster in parallel?

Once the EMR cluster is ready, the transform tasks are triggered by the Airflow scheduler. Each transform task triggers Livy to create a new interactive Spark session. Each POST request brings up a new Spark context with a Spark interpreter. This remote Spark interpreter is used to receive and run code snippets, and to return the result.

Let’s use one of the transform tasks as an example to understand the steps in detail.

# Converts one of the MovieLens data files to Parquet format
def transform_movies_to_parquet(**kwargs):
    # ti is the Task Instance
    ti = kwargs['ti']
    cluster_id = ti.xcom_pull(task_ids='create_cluster')
    cluster_dns = emr.get_cluster_dns(cluster_id)
    headers = emr.create_spark_session(cluster_dns, 'spark')
    session_url = emr.wait_for_idle_session(cluster_dns, headers)
    statement_response = emr.submit_statement(session_url, '/root/airflow/dags/transform/movies.scala')
    emr.track_statement_progress(cluster_dns, statement_response.headers)
    emr.kill_spark_session(session_url)

The first three lines of this code look up the EMR cluster details, which are then used to create an interactive Spark session on the EMR cluster using Apache Livy.
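
The emr helper module referenced here is available in the repository linked at the end of this post. As an illustration, get_cluster_dns can be implemented with a single boto3 call; the following is a sketch, not the exact repository code.

# Sketch of the get_cluster_dns helper: look up the public DNS name of the EMR
# master node, which is where the Livy server listens on port 8998.
import boto3

def get_cluster_dns(cluster_id):
    client = boto3.client('emr')
    response = client.describe_cluster(ClusterId=cluster_id)
    return response['Cluster']['MasterPublicDnsName']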

create_spark_session

Apache Livy creates an interactive Spark session for each transform task; the code is shown below. SparkSession provides a single point of entry for interacting with the underlying Spark functionality and allows programming Spark with the DataFrame and Dataset APIs. The Spark session is created by calling the POST /sessions API.

Note: You can also change parameters such as driverMemory, executorMemory, and the number of driver and executor cores as part of the API call, as shown in the example that follows the next code listing.

import json
import logging

import requests


# Creates an interactive Scala Spark session.
# Python (kind=pyspark), R (kind=sparkr), and SQL (kind=sql) Spark sessions can also be created by changing the value of kind.
def create_spark_session(master_dns, kind='spark'):
    # 8998 is the port on which the Livy server runs
    host = 'http://' + master_dns + ':8998'
    data = {'kind': kind}
    headers = {'Content-Type': 'application/json'}
    response = requests.post(host + '/sessions', data=json.dumps(data), headers=headers)
    logging.info(response.json())
    return response.headers
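
For example, a variant of create_spark_session that also requests specific driver and executor resources might look like the following sketch. The function name and resource values are illustrative, not taken from the repository.

# Sketch: request specific driver and executor resources when creating the session.
def create_sized_spark_session(master_dns, kind='spark'):
    host = 'http://' + master_dns + ':8998'
    data = {
        'kind': kind,
        'driverMemory': '2g',      # example values; tune to your workload
        'driverCores': 1,
        'executorMemory': '4g',
        'executorCores': 2,
        'numExecutors': 4,
    }
    headers = {'Content-Type': 'application/json'}
    response = requests.post(host + '/sessions', data=json.dumps(data), headers=headers)
    logging.info(response.json())
    return response.headers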

submit_statement

Once the session has finished starting up, it transitions to the idle state. The transform task is then submitted to the session. The Scala code is submitted as a REST API call to the Livy server, rather than directly to the EMR cluster, which provides better fault tolerance and concurrency.
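
The wait_for_idle_session helper isn't shown in this post. One way to implement it, sketched below and reusing the requests and logging imports from the previous listing, is to poll the session URL returned in the Location header until the session state becomes idle.

# Sketch of the wait_for_idle_session helper: poll the new session until it is idle.
import time

def wait_for_idle_session(master_dns, response_headers):
    host = 'http://' + master_dns + ':8998'
    session_url = host + response_headers['location']
    state = ''
    while state != 'idle':
        time.sleep(5)
        response = requests.get(session_url, headers={'Content-Type': 'application/json'})
        state = response.json()['state']
        logging.info('Session state: ' + state)
    return session_url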

# Submits the scala code as a simple JSON command to the Livy server
def submit_statement(session_url, statement_path):
    statements_url = session_url + '/statements'
    with open(statement_path, 'r') as f:
        code = f.read()
    data = {'code': code}
    response = requests.post(statements_url, data=json.dumps(data), headers={'Content-Type': 'application/json'})
    logging.info(response.json())
    return response

track_statement_progress

The progress of the statement can also be tracked easily, and the logs are centralized on the Airflow webserver.

# Function to help track the progress of the scala code submitted to Apache Livy
def track_statement_progress(master_dns, response_headers):
    statement_status = ''
    host = 'http://' + master_dns + ':8998'
    session_url = host + response_headers['location'].split('/statements', 1)[0]
    # Poll the status of the submitted scala code
    while statement_status != 'available':
        # If a statement takes longer than a few milliseconds to execute, Livy returns early and provides a statement URL that can be polled until it is complete:
        statement_url = host + response_headers['location']
        statement_response = requests.get(statement_url, headers={'Content-Type': 'application/json'})
        statement_status = statement_response.json()['state']
        logging.info('Statement status: ' + statement_status)

        # Surface the Livy session logs in the Airflow task log
        lines = requests.get(session_url + '/log', headers={'Content-Type': 'application/json'}).json()['log']
        for line in lines:
            logging.info(line)

        if 'progress' in statement_response.json():
            logging.info('Progress: ' + str(statement_response.json()['progress']))
        time.sleep(10)
    final_statement_status = statement_response.json()['output']['status']
    if final_statement_status == 'error':
        logging.info('Statement exception: ' + statement_response.json()['output']['evalue'])
        for trace in statement_response.json()['output']['traceback']:
            logging.info(trace)
        raise ValueError('Final Statement Status: ' + final_statement_status)
    logging.info('Final Statement Status: ' + final_statement_status)
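
The final call in the transform task, emr.kill_spark_session, isn't shown above. In Livy, a session is closed by sending a DELETE request to its session URL; a minimal sketch follows.

# Sketch of the kill_spark_session helper: close the interactive session once the statement has finished.
def kill_spark_session(session_url):
    requests.delete(session_url, headers={'Content-Type': 'application/json'})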

The following is a snapshot of the centralized logs from the Airflow webserver.

Once the job runs successfully, the Spark session is ended and the EMR cluster is terminated.
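
Terminating the cluster from the final DAG task comes down to a single boto3 call. The following is a sketch, assuming the cluster ID pushed through XCom by the create_cluster task is passed in.

# Sketch: shut down the transient EMR cluster once all the transform tasks have finished.
import boto3

def terminate_cluster(cluster_id):
    client = boto3.client('emr')
    client.terminate_job_flows(JobFlowIds=[cluster_id])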

Analyze the data in Amazon Athena

The output data in S3 can be analyzed in Amazon Athena by creating a crawler on AWS Glue. For information about automatically creating the tables in Athena, see the steps in Build a Data Lake Foundation with AWS Glue and Amazon S3.
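
If you prefer to script the crawler rather than create it in the console, a minimal boto3 sketch might look like the following; the crawler name, IAM role, database, and S3 path are all placeholders.

# Sketch: crawl the Parquet output so the tables appear in the Glue Data Catalog and Athena.
import boto3

glue = boto3.client('glue', region_name='us-east-1')  # assumption: your Region

glue.create_crawler(
    Name='movielens-parquet-crawler',                  # placeholder crawler name
    Role='AWSGlueServiceRoleDefault',                  # placeholder IAM role
    DatabaseName='movielens',                          # placeholder database name
    Targets={'S3Targets': [{'Path': 's3://your-output-bucket/movielens/'}]},  # placeholder path
)
glue.start_crawler(Name='movielens-parquet-crawler')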

Summary

In this post, we explored orchestrating a Spark data pipeline on Amazon EMR using Apache Livy and Apache Airflow. We created a simple Airflow DAG to demonstrate how to run Spark jobs concurrently, and you can modify it to scale your ETL data pipelines and improve latency. Additionally, we saw how Livy hides the complexity of submitting Spark jobs over REST while making efficient use of EMR resources.

For more information about the code shown in this post, see AWS Concurrent Data Orchestration Pipeline EMR Livy. Feel free to leave questions and other feedback in the comments.

Additional Reading

If you found this post useful, be sure to check out Orchestrate Apache Spark applications using AWS Step Functions and Apache Livy.

About the Author

Binal Jhaveri is a Big Data Engineer at Amazon Web Services. Her passion is to build big data products in the cloud. During her spare time, she likes to travel, read detective fiction and loves exploring the abundant nature that the Pacific Northwest has to offer.