AWS Big Data Blog
Manage and process your big data workflows with Amazon MWAA and Amazon EMR on Amazon EKS
Many customers are gathering large amounts of data, generated from different sources such as IoT devices, clickstream events from websites, and more. To efficiently extract insights from the data, you have to perform various transformations and apply different business logic to your data. These processes require complex workflow management to schedule jobs and manage dependencies between jobs, and require monitoring to ensure that the transformed data is always accurate and up to date.
One popular orchestration tool for managing workflows is Apache Airflow. Airflow is a platform created by the community to programmatically author, schedule, and monitor workflows. At AWS re:Invent 2020, we announced Amazon Managed Workflows for Apache Airflow (Amazon MWAA), a managed orchestration service for Airflow that makes it easy to build data processing workflows on AWS.
To perform the various data transformations, you can use Apache Spark, a widely used cluster-computing software framework that is open source, fast, and general purpose. One of the most popular cloud-based solutions to process vast amounts of data is Amazon EMR.
You may be looking for a scalable, containerized platform to run your Apache Spark workloads. With the proliferation of Kubernetes, it’s common to migrate your workloads to the platform in order to orchestrate and manage your containerized applications and benefit from the scalability, portability, extensibility, and speed that Kubernetes promises to offer.
Amazon EMR on Amazon EKS and Amazon MWAA help you remove the cost of managing your own Airflow environments and use Amazon Elastic Kubernetes Service (Amazon EKS) to run your Spark workloads with the EMR runtime for Apache Spark. This post talks about the architecture design of the environment and walks through a demo to showcase the benefits these new AWS services can offer.
In this post, we show you how to build a Spark data processing pipeline that uses Amazon MWAA as a primary workflow management service for scheduling the job. The compute layer is managed by the latest EMR on EKS deployment option, which allows you to configure and run Spark workloads on Amazon EKS.
In our demo, we use the New York Citi Bike dataset, which includes rider demographics and trip data. The data pipeline is triggered on a schedule to analyze ridership as new data comes in. The results provide up-to-date insights into ridership across demographic groups as well as station utilization. This kind of information helps the city improve Citi Bike to satisfy the needs of New Yorkers.
Architecture overview
The following diagram shows the high-level architecture of our solution. First, we copy data from the Citi Bike data source Amazon Simple Storage Service (Amazon S3) bucket into the S3 bucket in our AWS account. Then we submit a Spark job through EMR on EKS that creates the aggregations from the raw data. The solution allows us to submit Spark jobs to the Amazon EKS cluster with Amazon Elastic Compute Cloud (Amazon EC2) node groups as well as with AWS Fargate.
Orchestrate the big data workflow
With Airflow, data engineers define Directed Acyclic Graphs (DAGs). Airflow’s rich scheduling support allows you to trigger this DAG on a monthly basis according to the Citi Bike dataset update frequency.
This DAG includes the following tasks:
- PythonOperator downloads an updated Citi Bike dataset from a public repository and uploads it to an S3 bucket.
- The EmrContainersStartJobRun operator submits a Spark job to an Amazon EKS cluster through the new Amazon EMR containers API. The Spark job converts the raw CSV files into Parquet format and performs analytics with SparkSQL.
- The EmrContainersJobRunSensor monitors the Spark job for completion.
For more information about operators, see Amazon EMR Operators.
Submit the Apache Spark job
AWS has recently launched an Airflow plugin for EMR on EKS that you can use with Amazon MWAA by adding it to the custom plugin location or with a self-managed Airflow. The plugin includes an operator and a sensor that interact with the new Amazon EMR containers API, which was introduced as part of the new EMR on EKS deployment option.
The Amazon EKS namespace is registered with an Amazon EMR virtual cluster. To submit a Spark job to the virtual cluster, the Airflow plugin uses the start-job-run command offered by the Amazon EMR containers API.
For each job submitted to a virtual cluster, Amazon EMR creates a container with everything that is required and submits a Spark application to an Amazon EKS cluster through Spark on Kubernetes support.
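For reference, here is a minimal sketch of the underlying API call as issued with the AWS CLI. The virtual cluster ID, account ID, bucket, and job name are placeholders, and the release label is one example of a supported EMR release:

aws emr-containers start-job-run \
  --virtual-cluster-id <virtual-cluster-id> \
  --name citibike-ridership-analytics \
  --execution-role-arn arn:aws:iam::<account-id>:role/EMROnEKSExecutionRole \
  --release-label emr-6.2.0-latest \
  --job-driver '{
      "sparkSubmitJobDriver": {
        "entryPoint": "s3://<your-bucket>/scripts/citibike-spark-all.py",
        "sparkSubmitParameters": "--conf spark.executor.instances=2 --conf spark.executor.memory=2G --conf spark.driver.cores=1"
      }
    }'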
The EmrContainersStartJobRun Airflow operator exposes the arguments of the start-job-run command and can override the default Spark properties such as driver memory or number of cores. You can configure these properties in the DAG by specifying the sparkSubmitParameters in the jobDriver or the configuration-overrides argument.
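The following is an illustrative sketch only: the import path and keyword argument names are assumptions chosen to mirror the start-job-run API, and should be adjusted to match the plugin you actually install. The bucket, account ID, and virtual cluster ID are placeholders.

from airflow import DAG
from airflow.utils.dates import days_ago
# The module name is assumed from the plugin archive emr_containers_airflow_plugin.zip.
from emr_containers_airflow_plugin import EmrContainersStartJobRun

# Spark application and tuning parameters passed to start-job-run.
JOB_DRIVER = {
    "sparkSubmitJobDriver": {
        "entryPoint": "s3://<your-bucket>/scripts/citibike-spark-all.py",
        "sparkSubmitParameters": "--conf spark.executor.instances=2 "
                                 "--conf spark.executor.memory=2G "
                                 "--conf spark.driver.cores=1",
    }
}

# Optional overrides, including where job logs are shipped in CloudWatch.
CONFIGURATION_OVERRIDES = {
    "monitoringConfiguration": {
        "cloudWatchMonitoringConfiguration": {
            "logGroupName": "/emr-containers/jobs",
            "logStreamNamePrefix": "citibike",
        }
    }
}

with DAG("citibike_example", start_date=days_ago(1), schedule_interval=None) as dag:
    # Argument names below are assumed to mirror the start-job-run API.
    start_job = EmrContainersStartJobRun(
        task_id="start_citibike_ridership_analytics",
        virtual_cluster_id="<virtual-cluster-id>",
        execution_role_arn="arn:aws:iam::<account-id>:role/EMROnEKSExecutionRole",
        release_label="emr-6.2.0-latest",
        job_driver=JOB_DRIVER,
        configuration_overrides=CONFIGURATION_OVERRIDES,
    )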
Choose Amazon EKS data plane options
The Amazon EKS data plane supports EC2 node groups as well as Fargate (for more information, see Amazon EKS on AWS Fargate Now Generally Available). In our demo, the Amazon EKS cluster contains an EC2 node group, a Fargate profile, and two Kubernetes namespaces. One of the namespaces is declared in the Fargate profile pod selector and the pods deployed into this namespace are launched on Fargate. The pods deployed into the other namespace are launched on the EC2 node group. We register each Amazon EKS namespace with a virtual cluster.
When you choose a data plane option, you should consider the following:
- Compute provisioning
- Storage provisioning
- Job initialization time
Compute provisioning
With EC2 node groups, you can share EC2 resources like vCPUs and memory between different Spark jobs or with other workloads within the same Amazon EKS cluster.
Running a Spark job on Fargate removes the need to keep EC2 worker nodes running and allows you to provision right-sized capacity as the job is submitted. Unlike Amazon EC2, Fargate allocates each Kubernetes pod its own virtual machine, so the pod runs on the dedicated resources of that VM. When the job is complete and the pod exits, you're not billed for any resources.
For EC2 node groups, you can also use Kubernetes node selectors to run the Spark driver and executor pods on a subset of available nodes, such as nodes in a single Availability Zone.
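For example, Spark's Kubernetes node selector settings can be passed through sparkSubmitParameters; a minimal sketch, assuming a zone value of us-east-1a:

# Added to sparkSubmitParameters in the jobDriver; pins the pods to nodes
# in a single Availability Zone (the zone value is an example).
--conf spark.kubernetes.node.selector.topology.kubernetes.io/zone=us-east-1a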
Storage provisioning
By default, with EC2 node groups Spark uses ephemeral storage for intermediate outputs and data that doesn’t fit in RAM. If you need more storage or need to share data across applications, you can use Kubernetes persistent volumes to mount a volume on Spark pods.
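A sketch of mounting an existing PersistentVolumeClaim through Spark's Kubernetes volume properties, supplied via sparkSubmitParameters (the volume name, claim name, and mount path are assumptions):

# Mounts the claim spark-data-pvc on every executor pod at /data.
--conf spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-data.options.claimName=spark-data-pvc
--conf spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-data.mount.path=/data
--conf spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-data.mount.readOnly=false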
With Fargate, local storage disk space is limited and fits those Spark workloads that don’t need a significant amount of storage for shuffling operations.
Job initialization time
Using EC2 node groups allows the Spark jobs to start immediately as you submit them.
Spark jobs on Fargate add several minutes to the startup time. You may choose to run a Spark driver on Amazon EC2 for quicker startup time while running Spark executors on Fargate. In this case, configure the Fargate profile pod selector to include pods that match the Kubernetes label emr-containers.amazonaws.com/component: executor.
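A sketch of creating such a Fargate profile with eksctl (the cluster, profile, and namespace names are assumptions; the label is the one noted above, and you can use an eksctl ClusterConfig file instead if your eksctl version doesn't support the --labels flag):

eksctl create fargateprofile \
  --cluster my-eks-cluster \
  --name emr-spark-executors \
  --namespace <your-emr-namespace> \
  --labels emr-containers.amazonaws.com/component=executor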
Deploy the resources with AWS CloudFormation
This post provides an AWS CloudFormation template for a one-click deployment experience to set up all the necessary resources for our demo.
The following diagram illustrates the infrastructure of our solution.
The Amazon EKS EC2 node group is deployed into two private subnets in a VPC, and we have a NAT gateway in the public subnets. The Fargate profile is configured to launch pods into the same private subnets. An Amazon EC2 host instance preconfigured with the relevant tools is launched into one of the private subnets to help you configure the EKS cluster after the CloudFormation deployment.
The main stack creates an S3 bucket to store the plugins and Python libraries for the Amazon MWAA environment and to store the result files from the Citi Bike analytics workflow. The main stack then calls three nested stacks sequentially to create the following resources:
- Amazon EKS stack – Creates a VPC with two public and two private subnets, and an Amazon EKS cluster with three worker nodes in the private subnets and a control plane with a public API server.
- Prep stack – Creates a host EC2 instance, preinstalled with the AWS Command Line Interface (AWS CLI), kubectl, and eksctl, in one of the two private subnets; it can only be accessed through Session Manager in AWS Systems Manager. The host EC2 instance also has scripts available to configure the kubectl client and register the Amazon EC2 and Fargate namespaces with EMR virtual clusters.
- Amazon MWAA stack – Creates an Amazon MWAA environment with the EMR on EKS plugin and the appropriate Boto3 library installed.
To get started, you need to have an AWS account. If you don’t have one, sign up for one before completing the following steps:
- Sign in to the AWS Management Console as an AWS Identity and Access Management (IAM) power user, preferably an admin user.
- Choose Launch Stack:
This template has been tested in the US East (N. Virginia), US West (Oregon), and Europe (Ireland) Regions. If you want to deploy in a Region other than these three, check service availability.
- Choose Next.
- For Stack name, enter a name for the stack, for example, mwaa-emr-on-eks-blog.
Don’t use a stack name longer than 20 characters.
- You can specify your choice of Amazon EKS cluster name, Amazon EKS node instance type, Fargate namespace, and Amazon MWAA environment name, or use the default values.
- Choose Create Stack.
The stack takes about 35 minutes to complete. After the stack is deployed successfully, navigate to the Outputs tab of the stack details in the main stack and save the key-value pairs for reference.
- On the Systems Manager console, choose Session Manager in the navigation pane.
- Choose Start session.
- Select the EC2 host instance named <stack name>-xxxx-PrepStack-xxxx-jumphost.
- Choose Start session.
A terminal of the host instance opens up.
- Export the AWS_SECRET_ACCESS_KEY and AWS_ACCESS_KEY_ID environment variables for the user that deployed the CloudFormation stack.
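For example (placeholder values shown):

export AWS_ACCESS_KEY_ID=<your-access-key-id>
export AWS_SECRET_ACCESS_KEY=<your-secret-access-key>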
For more information, see Environment variables to configure the AWS CLI.
- At the prompt, enter the following command to configure the Kubernetes client (kubectl) and download the certificate from the Amazon EKS control plane for authentication:
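Based on the kubeconfig.sh script referenced below, the command likely takes a form similar to the following (the /tmp path mirrors the emroneks.sh example in the next step and is an assumption):

sh /tmp/kubeconfig.sh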
You should see the script running with an Amazon EKS node availability status update.
Fargate logging is configured in this step too. You can download the kubeconfig.sh script for reference.
Next, we register an Amazon EKS namespace with an EMR virtual cluster.
- Issue a command by entering sh /tmp/emroneks.sh <namespace> <EMR virtual cluster name>. For example:
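Both values below are illustrative; ec2-ns matches the EC2-backed namespace referenced later in this post, and the virtual cluster name is an arbitrary placeholder:

sh /tmp/emroneks.sh ec2-ns emr-eks-ec2-cluster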
This command automates the steps required to set up EMR on EKS. It also generates a DAG file called citibike_all_dag.py and copies it to the dags folder of the S3 bucket provisioned by the CloudFormation stack.
The DAG file is picked up by the Airflow scheduler and displayed in the Airflow UI. Choose the output URL link in the terminal to go to the Amazon MWAA console and open the Airflow UI.
You can download the emroneks.sh script for reference. It takes about 10–15 seconds for the DAG to show up on the Airflow UI; refresh the browser if it doesn’t appear.
At this point, if you go to the Amazon EMR console and choose Virtual clusters, you should see a virtual cluster created accordingly.
You can also submit the job to an Amazon EKS namespace (fargate-ns) backed by Fargate. To do so, go back to the terminal session and enter the command sh emroneks-fargate.sh <EMR Fargate virtual cluster>. For example:
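The virtual cluster name below is an arbitrary placeholder:

sh emroneks-fargate.sh emr-eks-fargate-cluster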
This command registers the Fargate namespace on Amazon EKS with an EMR virtual cluster and uploads the DAG file to the S3 bucket for Airflow to pick up. You can download emroneks-fargate.sh for reference.
Build the environment manually in your account
Alternatively, you can build this solution manually in your AWS account by following the instructions in this section. You can skip this section and go directly to the next section if you want to start exploring the Airflow UI right away to run the DAG.
This post uses an AWS Cloud9 IDE, but you can use any machine with access to AWS. Use an IAM user or role with the AdministratorAccess policy in your AWS credentials chain. To use AWS Cloud9, complete the following steps:
- Set up a workspace.
- Create an IAM role with administrator access and either attach the IAM role to the EC2 instance or export your AWS credentials before running the commands.
- Set the environment variables that are used throughout this guide: the EKS cluster name, EMR virtual cluster names, and Kubernetes namespaces:
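A sketch of the exports; the variable names and example values are assumptions, and the namespaces match those used elsewhere in this post:

export EKS_CLUSTER_NAME=my-eks-cluster
export EMR_EC2_VIRTUAL_CLUSTER=emr-eks-ec2-cluster
export EMR_FARGATE_VIRTUAL_CLUSTER=emr-eks-fargate-cluster
export EC2_NAMESPACE=ec2-ns
export FARGATE_NAMESPACE=fargate-ns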
Now you’re ready to set up EMR on EKS.
- Install the AWS CLI (already preinstalled on AWS Cloud9) and kubectl:
- Install eksctl:
- Create an Amazon EKS cluster with an EC2 node group, Fargate profile, and OIDC provider (this process takes 15 minutes):
To make access easier in the manual steps, the nodes are in public subnets, but you can modify the configuration to use private networking and make them private.
- Create two namespaces:
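A sketch using the namespace names referenced elsewhere in this post:

kubectl create namespace ec2-ns       # namespace backed by the EC2 node group
kubectl create namespace fargate-ns   # namespace declared in the Fargate profile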
- Create a Kubernetes role, bind the role to a Kubernetes user, and map the Kubernetes user with the service-linked role AWSServiceRoleForAmazonEMRContainers:
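These three actions are handled by a single eksctl command in the EMR on EKS setup; a sketch for the EC2-backed namespace (the cluster name is a placeholder; repeat for the Fargate namespace):

eksctl create iamidentitymapping \
  --cluster my-eks-cluster \
  --namespace ec2-ns \
  --service-name "emr-containers"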
- Create a job execution role for Amazon EMR:
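A sketch of creating the role with the AWS CLI; the role name and policy name match those removed in the cleanup section later in this post, while the trust and permissions policy documents are assumed to be local JSON files you author:

aws iam create-role \
  --role-name EMROnEKSExecutionRole \
  --assume-role-policy-document file://emr-trust-policy.json
aws iam put-role-policy \
  --role-name EMROnEKSExecutionRole \
  --policy-name Permissions-Policy-For-EMR-EKS \
  --policy-document file://emr-job-permissions.json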
- Update the trust policy:
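A sketch using the names assumed above (repeat with the Fargate namespace):

aws emr-containers update-role-trust-policy \
  --cluster-name my-eks-cluster \
  --namespace ec2-ns \
  --role-name EMROnEKSExecutionRole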
- Create two virtual clusters:
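A sketch of registering the EC2-backed namespace with the AWS CLI; the names and EKS cluster ID are placeholders, and you repeat the call with the Fargate namespace for the second virtual cluster:

aws emr-containers create-virtual-cluster \
  --name emr-eks-ec2-cluster \
  --container-provider '{
      "id": "my-eks-cluster",
      "type": "EKS",
      "info": { "eksInfo": { "namespace": "ec2-ns" } }
    }'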
- On the Amazon S3 console, create a new bucket called airflow-bucket-<your-account-id>-my-mwaa-env.
Make sure that the bucket has Block Public Access enabled.
- In AWS Cloud9, export the name of the new bucket:
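For example (the variable name is an assumption):

export AIRFLOW_BUCKET=airflow-bucket-<your-account-id>-my-mwaa-env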
- Populate the bucket with the Airflow custom operator plugin, the requirements.txt file for dependencies to be installed on Airflow worker nodes, the dags folder with two DAGs, and the Spark application code.
Before we copy the DAG, we replace the placeholder IDs in the DAG template file with the actual bucket name and virtual cluster IDs for Fargate and EC2 namespaces.
At this point, your bucket should be populated with everything it needs.
- On the Amazon MWAA console, choose Create environment.
- For Name, enter my-mwaa-env.
- For S3 bucket, enter s3://<your_airflow_bucket_name>.
- For DAGs folder, enter s3://<your_airflow_bucket_name>/dags.
- For Plugins file, enter s3://<your_airflow_bucket_name>/emr_containers_airflow_plugin.zip.
- For Requirements file, enter s3://<your_airflow_bucket_name>/requirements.txt.
- Choose Next.
- On the Configure advanced settings page, under Networking, choose the VPC of the EKS cluster.
To find the VPC, navigate to the Amazon EKS console, choose your cluster, and then choose Configuration and Networking.
- Under Subnets, select the private subnets.
- Under Web server access, select Public Network.
- For Execution Role and Security Group, select Create New.
- Keep the remaining values at their defaults and choose Create new environment.
For more details, see Create an Amazon MWAA environment.
As part of the Amazon MWAA environment, an IAM role is created. You need to add permissions to this IAM role in order to access the public tripdata bucket, as well as permission to invoke jobs on EMR on EKS. You can find the role on the Edit environment page under Permissions and Execution role.
- Replace the bucket name below and add the following privileges to the policy attached to the role.
{ "Effect": "Allow", "Action": [ "s3:ListBucket", "s3:GetObject*" ], "Resource": [ "arn:aws:s3:::tripdata", "arn:aws:s3:::tripdata/*" ] }, { "Effect": "Allow", "Action": [ "s3:*" ], "Resource": [ "arn:aws:s3:::<your bucket name>", "arn:aws:s3:::<your bucket name>/*" ] }, { "Action": [ "emr-containers:StartJobRun", "emr-containers:ListJobRuns", "emr-containers:DescribeJobRun", "emr-containers:CancelJobRun" ], "Resource": "*", "Effect": "Allow" }
Run the Citi Bike ridership analytics DAG on Airflow
On the Airflow UI, you can see two DAGs: Citibike_Ridership_Analytics and Citibike_ridership_Analytics_Fargate, which were copied to the S3 bucket. Switch them on or off by choosing On or Off.
Choose the DAG name and then Graph View to visualize the job workflow (see the following screenshot). Citi Bike publishes their monthly datasets in .zip format to a public S3 bucket (s3://tripdata), so Airflow spins up 12 parallel tasks to copy and unzip the files to the csv folder in the S3 bucket (each task handles a month of data), then kicks off a PySpark task that transforms the CSV files into the Parquet columnar storage format (saved in the parquet folder). Then it spins up a task named start_citibike_ridership_analytics that uses SparkSQL to query the dataset, and finally it saves the results to the results folder.
You can download the PySpark script citibike-spark-all.py for your reference.
Choose the Code tab to see the sample source code of the DAG. You can also download the DAG citibike_all_dag.py for reference.
By default, no schedule is created to run the DAG, so we need to trigger it manually. Choose the Trigger DAG tab to start the job flow. Switch to Tree View and choose Refresh to monitor the progress. The task squares change colors to indicate status (light green for running, dark green for success). You should see all squares in dark green when the job completes successfully.
While the start_citibike_ridership_analytics task is running, you can go back to the host instance terminal and enter the command watch kubectl get pod --namespace ec2-ns to see the Spark driver and executor containers get spun up to process the data.
You can also tail the log of job progress with the following command:
kubectl logs -c spark-kubernetes-driver -n ec2-ns -f <spark driver pod name from above command>
These logs are also sent to the Amazon CloudWatch log group named /emr-containers/jobs/. To view them, go to the CloudWatch console and choose Log groups in the navigation pane. You can find /emr-containers/jobs/ in the list and choose it to see the detailed logs produced by this job run.
You can change the log group name by modifying the logGroupName of the CONFIGURATION_OVERRIDES section in the JOB_DRIVER definition (see the following code snippet of the DAG citibike_all_dag.py). You have to resubmit the DAG (copying the modified DAG to the dags folder of the S3 bucket) for Airflow to pick it up.
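The original snippet isn't reproduced here; a sketch of what that section likely looks like, consistent with the earlier operator sketch and assumed to mirror the start-job-run API:

CONFIGURATION_OVERRIDES = {
    "monitoringConfiguration": {
        "cloudWatchMonitoringConfiguration": {
            "logGroupName": "/emr-containers/jobs",   # change this value to use a different log group
            "logStreamNamePrefix": "citibike",
        }
    }
}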
Finally, to view the analytics results, go to the Amazon S3 console and choose the bucket that was provisioned by CloudFormation, usually with the name pattern airflow-bucket-xxxxx-<stack name>-xxxxx. Go to the citibike folder, and then into the results subfolder. Choose the ridership subfolder to see a CSV file with the name pattern part-xxxxx.csv. This is the query result of total trips by month in 2020. You can see that March and April have the lowest numbers, when the city was hit hardest by the COVID-19 pandemic.
To view the Spark history server UI, navigate to the Virtual clusters section on the Amazon EMR console, choose the job, and choose View logs.
This launches the web UI of the Spark history server with jobs, stages, Spark event logs, and other details. The Spark history server UI is available during job runs and remains available for 30 days after job creation.
Clean up the resources deployed by CloudFormation stack
You may want to clean up the demo environment and any resources you deployed when you’re done. On the AWS CloudFormation console, select the template and choose Delete. Make sure that the delete operation is successful and all the resources were removed.
This action also deletes the S3 bucket and any data in it. If you want to retain the data for future use, make a copy of the bucket before you delete it. However, the virtual clusters and related resources that were created by scripts must be deleted with the code in manual Step 3 of the following section.
Clean up the resources deployed by the manual procedure
- On the Amazon MWAA console, delete the environment created in Step 11.
- On the Amazon S3 console, empty and delete the bucket created in Step 8.
- In your terminal, run the following script to delete the resources created by the manual steps:
aws iam delete-role-policy --role-name EMROnEKSExecutionRole --policy-name Permissions-Policy-For-EMR-EKS
aws iam delete-role --role-name EMROnEKSExecutionRole
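To fully clean up, you also need to remove the virtual clusters and the Amazon EKS cluster; a sketch of those calls (the virtual cluster IDs and cluster name are placeholders):

aws emr-containers delete-virtual-cluster --id <ec2-virtual-cluster-id>
aws emr-containers delete-virtual-cluster --id <fargate-virtual-cluster-id>
eksctl delete cluster --name my-eks-cluster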
Conclusion
In this post, we showed how you can orchestrate an ETL pipeline using Amazon MWAA with EMR on EKS. We created an Airflow DAG to trigger scheduled, periodic Spark jobs that process data using a custom Airflow operator.
You can use the provided CloudFormation template or the manual procedure to get started today running your Spark jobs on Amazon EKS.
About the Authors
James Sun is a Senior Solutions Architect with Amazon Web Services. James has several years of experience in information technology. Prior to AWS, he held several senior technical positions at MapR, HP, NetApp, Yahoo, and EMC. He holds a PhD from Stanford University.
Dima Breydo is a Senior Solutions Architect with Amazon Web Services. He helps Startups to architect solutions in the cloud. He is passionate about container-based solutions and big data technologies.
Alon Gendler is a Senior Startup Solutions Architect with Amazon Web Services. He works with AWS customers to help them architect secure, resilient, scalable and high-performance applications in the cloud.