Containers

Dynamic Spark Scaling on Amazon EKS with Argo Workflows and Events

Introduction

Kubernetes has gained widespread adoption in the field of data processing because of its ability to package and deploy applications as containers with all required dependencies, as well as its support for running data frameworks. This makes it easy for developers to run their Data Analytics/Machine Learning (ML) applications within a Kubernetes cluster and allows them to run multiple versions of these apps within the same cluster. Additionally, Kubernetes offers strong scaling options for both nodes and pods, which enables users to easily scale batch jobs as needed, and a wide variety of tools for observability, logging, batch scheduling, and job orchestration, making it a popular choice for running data platforms.

Effective orchestration is essential for running batch jobs like Spark efficiently. For instance, generating a ready-to-consume golden dataset often requires the execution of multiple Spark jobs, either in parallel or sequentially. Without proper orchestration, managing and automating these jobs can be difficult and time-consuming because it requires coordinating the execution of multiple tasks and ensuring that they are executed in the correct order. One popular tool for managing workflows is Apache Airflow, and Amazon offers a managed version of this tool called Amazon Managed Workflows for Apache Airflow (Amazon MWAA), which allows users to orchestrate workflows without having to worry about managing the underlying infrastructure. Another orchestration tool is Argo Workflows, which is built on top of Kubernetes as a custom resource definition (CRD)-based workflow engine. It allows you to define the workflows where each step in the workflow is a container. Argo Workflows can be managed using kubectl and is natively integrated with other Kubernetes services such as volumes, secrets, and Role-Based Access Control (RBAC). Argo Workflows includes a feature called Argo Events, which allows users to trigger workflow execution based on external events such as the completion of another workflow or the arrival of new data. This can be especially useful for building dynamic and event-driven data pipelines. Argo Workflows is a powerful tool for managing and automating complex workflows within a Kubernetes cluster.

In this post, we demonstrate how to build and deploy a data processing platform on Amazon Elastic Kubernetes Service (Amazon EKS) with Data on EKS Blueprints. The platform includes all the necessary Kubernetes add-ons like Argo Workflows, Argo Events, Spark Operator for managing Spark jobs, Apache YuniKorn for Batch Scheduler, Fluent Bit for logging, and Prometheus for metrics. We demonstrate how to build data processing jobs and pipelines using Argo Workflows and Argo Events, and show how to trigger workflows on-demand by listening to Amazon Simple Queue Service (Amazon SQS).

Solution overview

In this solution, we create an Amazon EKS cluster with one managed node group and install Argo Workflows and Argo Events in their own dedicated namespaces (argo-workflows and argo-events). An Amazon SQS queue is created to receive requests from users, and an SQS event source object in the Argo events namespace is set up to fetch messages from this external queue. When a new message is added to the queue, the sensor triggers the execution of the specified workflow. A sensor in Argo Workflow is a special type of workflow that continually runs and waits for a certain condition to be met before triggering the execution of another workflow. This workflow creates a Spark application with Spark Operator in the data-team-a namespace, consisting of one Spark driver pod and two executor pods.

The following diagram illustrates the high-level design of using Argo Events sources to trigger the execution of Spark jobs.

 high-level architecture diagram

The following diagram shows the detailed design that will be implemented in this post.

low-level architecture diagram

Deploying the Solution

In this post, we provision the followings resources using terraform modules with Data on Amazon EKS (DoEKS) blueprints:

  • A sample Virtual Private Cloud (VPC) with three Private Subnets and 3 Public Subnets.
  • Internet gateway for Public Subnets and Network Address Translation (NAT) Gateway for Private Subnets, VPC endpoints for Amazon Elastic Container Registry (Amazon ECR), Amazon Elastic Compute Cloud (Amazon EC2), Security Token Service (STS), etc.
  • Amazon EKS Cluster Control plane with one managed node group with one application team data-team-a.
  • Amazon EKS Managed Add-ons: VPC_CNI, CoreDNS, Kube_Proxy, EBS_CSI_Driver
  • K8S metrics server, cluster autoscaler, yunikorn scheduler, Spark operator, Fluent Bit, Prometheus, and Argo workflows.
  • K8s and Amazon Identity and Access Management (AWS IAM) roles and rolebindings for Argo Workflows and Argo Events.

In addition, we deploy Argo Events and Event Bus using kubectl commands and create Amazon SQS queue using AWS CLI.

Prerequisites

Ensure that you have the following tools installed locally:

  1. AWS CLI
  2. kubectl
  3. Terraform >=1.0.0
  4. Argo CLI

Deploy infrastructure

Let’s start by cloning the code repo to your local machine.

git clone https://github.com/awslabs/data-on-eks.git
cd data-on-eks/schedulers/terraform/argo-workflow

You need to setup your AWS credentials profile locally before running Terraform or AWS CLI commands. Update the region and execute the following commands to deploy the blueprint.

region=<your region> #set region
terraform init 
terraform apply -var region=$region --auto-approve #defaults to us-west-2

Verify the resources

This process may roughly take 20 minutes. Once done, you’ll see the output similar to the following. You need to modify your deployment yaml files with those output values in the following steps.

terraform apply output screenshot

The following command will update the kubeconfig on your local machine and allow you to interact with your Amazon EKS Cluster using kubectl to validate the deployment.

aws eks --region $region update-kubeconfig --name argoworkflows-eks

List the nodes and the namespaces in Amazon EKS cluster.

kubectl get nodes
# Output should look like below
NAME                                        STATUS   ROLES    AGE   VERSION
ip-10-1-131-99.us-west-2.compute.internal   Ready    <none>   26h   v1.23.9-eks-ba74326
ip-10-1-16-117.us-west-2.compute.internal   Ready    <none>   26h   v1.23.9-eks-ba74326
ip-10-1-80-41.us-west-2.compute.internal    Ready    <none>   26h   v1.23.9-eks-ba74326
kubectl get ns
# Output should look like below
NAME                 STATUS   AGE
argo-events          Active   73m
argo-workflows       Active   73m
aws-for-fluent-bit   Active   73m
data-team-a          Active   73m
default              Active   78m
kube-node-lease      Active   78m
kube-public          Active   78m
kube-system          Active   78m
prometheus           Active   73m
spark-operator       Active   73m
yunikorn             Active   73m

Deploy Argo Events and EventBus

Let’s install the Argo Events with yaml files which is recommended by the Argo Events project (i.e., the helm chart installation is possible but maintained solely by the community). The installation includes two steps. First step is to install Argo Events controller-manager and events-webhook.

# install Argo Events Controllers
kubectl apply -f https://raw.githubusercontent.com/argoproj/argo-events/stable/manifests/install.yaml

kubectl apply -f https://raw.githubusercontent.com/argoproj/argo-events/stable/manifests/install-validating-webhook.yaml

kubectl  get pods -n argo-events

# Output should look like below
NAME                                      READY STATUS RESTARTS AGE
controller-manager-6f9c6cffc8-jmb4n        1/1  Running   0    3h45m
events-webhook-7f6dcd9bb-lfkp7             1/1  Running   0    3h45m

And the second step is to set up the EventBus. EventBus is a Kubernetes Custom Resource that’s used for event transmission from EventSources to Sensors. Currently, EventBus is backed by (Networked Approach To Technology Services) NATS, including both their NATS Streaming service and their newer Jetstream service. NATS is a high-performance messaging system that’s designed to be simple, lightweight, and easy to use. It’s often used as a messaging system for microservices architectures, and it’s well-suited for use in cloud-native environments. In the yaml file below, you will create a NATS service with three replicas.

File Location: argo-events/eventbus.yaml.

code sample

Let’s apply the manifest to deploy the EventBus.

kubectl apply -f argo-events/eventbus.yaml

You should get the following resources in namespace argo-events so far.

kubectl get pods -n argo-events
# Output should look like below
NAME                                      READY STATUS RESTARTS AGE
controller-manager-6f9c6cffc8-jmb4n        1/1  Running   0    3h45m
eventbus-default-stan-0                    2/2  Running   0    3h45m
eventbus-default-stan-1                    2/2  Running   0    3h45m
eventbus-default-stan-2                    2/2  Running   0    3h45m
events-webhook-7f6dcd9bb-lfkp7             1/1  Running   0    3h45m

Set up Amazon SQS Queue as eventsource

The Argo Events system requires an eventsource object to monitor external events and initiate subsequent operations. As an example, we will configure an Amazon SQS eventsource in Argo Events and connect it to an external Amazon SQS queue. The following image shows the SQS event source config yaml.

File location: argo-events/eventsource-sqs.yaml.

code sample

Let’s run the following command to create the Amazon SQS event source.

kubectl apply -f argo-events/eventsource-sqs.yaml
kubectl  get pods -n argo-events
# Output should look like below
NAME                                                  READY   STATUS    RESTARTS   AGE
aws-sqs-eventsource-9l8kd-ff6dd646d-bwlx2             1/1     Running   0          3h44m
controller-manager-6f9c6cffc8-jmb4n                   1/1     Running   0          3h45m
eventbus-default-stan-0                               2/2     Running   0          3h45m
eventbus-default-stan-1                               2/2     Running   0          3h45m
eventbus-default-stan-2                               2/2     Running   0          3h45m
events-webhook-7f6dcd9bb-lfkp7                        1/1     Running   0          3h45m

For this example, we have set up an eventsource to monitor the test1 Amazon SQS queue in the us-east-1 Region. It’s important to note that the eventsource is capable of monitoring events across regions, so the Amazon EKS cluster and Amazon SQS queue don’t need to be located in the same Region.

Now, let’s create that queue in your account and capture the QueueUrl output for the next steps.

# create a queue
aws sqs create-queue --queue-name test1 --region us-east-1

# result
QueueUrl: https://sqs.us-east-1.amazonaws.com/xxxxxxxxxx/test1

In order to enable the eventsource to retrieve messages from the Amazon SQS queue, we need to update the argo-events/sqs-accesspolicy.json file by replacing both <QueueArn> and <your_event_irsa_arn> (which can be found in the Terraform output values). After making these replacements, run the following command, making sure to first replace <QueueUrl> in the command with the URL of your queue. This updates the access policy of the Amazon SQS queue to allow the eventsource to fetch messages from it.

aws sqs set-queue-attributes --queue-url <QueueUrl> --attributes file://argo-events/sqs-accesspolicy.json --region us-east-1 

To confirm that the necessary permissions have been granted, you can go to the AWS Console, navigate to the test1 SQS queue in the us-east-1 Region, and then go to the Access Policy tab. Here, you should see that the queue has been granted access permissions to IAM Roles for Service Accounts (IRSA) in your Amazon EKS cluster.

SQS policy screenshot

Create an Argo workflow for running Spark application

The last step is to create a sensor object (of Kind: Sensor) to trigger a workflow that runs a spark application. This sensor object is run by a service account called operate-workflow-sa (defined in sensor-rbac.yaml) with the permission to run the workflow in argo-workflows namespace. The workflow creates a Spark application by submitting a SparkApplication Object to the Spark Operator. The following image shows the sample code yaml code snippet for creating Argo Workflow for running multiple Jobs.

File location: argo-events/sensor-sqs-sparkjobs.yaml

Let’s run the following commands to deploy Argo Worklfow and the RBAC permissions.

kubectl apply -f argo-events/sensor-sqs-sparkjobs.yaml 
kubectl apply -f argo-events/sensor-rbac.yaml

Trigger the data pipeline

To test the data pipeline, we trigger it by inserting a message into the Amazon SQS queue. To do this, open the Amazon SQS console, navigate to the Queues page, and then select the test1 queue that you created. From the Actions menu, Select the queue then select Send and receive messages, enter the message in JSON format{“message”: “hello”} in the Message body, and select Send message.

SQS send message UI

Go back to the Terminal and execute the following command to verify the new Argo workflow.

kubectl get wf -A
# Output should look like below
NAMESPACE        NAME                           STATUS      AGE    MESSAGE
argo-workflows   aws-sqs-spark-workflow-qr88j   Succeeded   4m9s   

Run the command below to check spark application driver pods and executor pods under data-team-a namespace.

kubectl get po -n data-team-a
# Output should look like below
NAME                               READY   STATUS      RESTARTS   AGE
event-wf-sparkapp-26xr6-driver     1/1     Running     0          13s
pythonpi-880138851d302cc2-exec-1   1/1     Running     0          3s
pythonpi-880138851d302cc2-exec-2   1/1     Running     0          3s

Access Argo Workflows web user interface

To monitor the progress of an Argo Workflow job, you can use the web interface provided by Argo Workflows. This graphical user interface (GUI) allows you to submit workflows and view their status and logs. To access the web UI, follow these steps. Alternatively, you can also manage workflows using the Argo Workflows CLI.

Execute the following command to extract the bearer token.

$ argo auth token # get login token

# example result:
Bearer k8s-aws-v1.aHR0cHM6Ly9zdHMudXMtd2VzdC0yLmFtYXpvbmF3cy5jb20vP0FjdGlvbj1HZXRDYWxsZXJJZGVudGl0eSZWZXJzaW9uPTIwMTEtMDYtMTUmWC1BbXotQWxnb3JpdGhtPUFXUzQtSE1BQy1TSEEyNTYmWC1BbXotQ3JlZGVudGlhbD1BS0lBVkNWNFhDV1dLUjZGVTRGMiUyRjIwMjIxMDEzJTJGdXMtd2VzdC0yJTJGc3RzJTJGYXdzNF9yZXF1ZXN0JlgtQW16LURhdGU9MjAyMjEwMTNUMDIyODAyWiZYLUFtei1FeHBpcmVzPTYwJlgtQW16LVNpZ25lZEhlYWRlcnM9aG9zdCUzQngtazhzLWF3cy1pZCZYLUFtei1TaWduYXR1cmU9NmZiNmMxYmQ0MDQyMWIwNTI3NjY4MzZhMGJiNmUzNjg1MTk1YmM0NDQzMjIyMTg5ZDNmZmE1YzJjZmRiMjc4OA

Let’s run port-forward command to access the Argo Workflow Web UI from localhost.

$ kubectl -n argo-workflows port-forward deployment.apps/argo-workflows-server 2746:2746

Open browser and enter http://localhost:2746/ and paste the token (whole text including Bearer) into the yellow circle as shown in the following details.

Argo workflows login UI

To view the entire data pipeline, click the Event Flow button on the left panel, as illustrated below. This displays the data pipeline visually.

data pipeline in Argo Workflows UI

In the image below, you can see that the jobs helloworld-job1 and helloworld-jobs2 are executed sequentially, while helloworld-jobs2 and spark-operator-job are executed in parallel. When a job is successful, it’s indicated by a green circle with a tick, indicating that the job is complete. To view the detailed steps within a job, click on the green circle. This is shown in the following image.

data processing job running status in Argo Workflows UI

Cleaning up

To clean up the resources created in the post, you can follow these steps:

aws sqs delete-queue --queue-url <QueueUrl> --region us-east-1 #Delete sqs test1 

kubectl delete -f argo-events/. #Delete the Argo Events resources

terraform destroy -target="module.eks_blueprints_kubernetes_addons" -target="module.irsa_argo_events" -auto-approve -var region=$region
terraform destroy -target="module.eks_blueprints" -auto-approve -var region=$region
terraform destroy -auto-approve -var region=$region

Conclusion

In this post, we explored the use of Argo Workflows and Argo Events for managing and scaling Spark applications on Amazon EKS. These tools offer a number of benefits, including the ability to define and automate complex pipelines with Argo Workflows and the ability to trigger actions in response to events with Argo Events. By utilizing these features, it’s possible to optimize the performance of Spark applications and ensure that they are running at optimal capacity. Additionally, we looked at the benefits of using Data on EKS blueprints to quickly deploy a full infrastructure using Terraform. If you would like to learn more, you can visit the Data on EKS GitHub repository to collaborate with others and access additional resources.

Victor Gu

Victor Gu

Victor Gu is a Containers & Serverless Architect at Amazon Web Services. He works with AWS customers to design microservices and cloud native solutions using Amazon EKS/ECS and AWS serverless services. His specialties are Kubernetes, Spark on Kubernetes, MLOps and DevOps.

Vara Bonthu

Vara Bonthu

Vara Bonthu is a dedicated technology professional and Worldwide Tech Leader for Data on EKS, specializing in assisting AWS customers ranging from strategic accounts to diverse organizations. He is passionate about open-source technologies, Data Analytics, AI/ML, and Kubernetes, and boasts an extensive background in development, DevOps, and architecture. Vara's primary focus is on building highly scalable Data and AI/ML solutions on Kubernetes platforms, helping customers harness the full potential of cutting-edge technology for their data-driven pursuits.