Best practices for running Spark on Amazon EKS
Amazon EKS is becoming a popular choice among AWS customers for scheduling Spark applications on Kubernetes. It’s fully managed but still offers full Kubernetes capabilities for consolidating different workloads, and it provides a flexible scheduling API for optimizing resource consumption.
But Kubernetes is complex, and not all data engineers are familiar with how to set up and maintain Kubernetes. Following Spark best practices requires advanced configuration of both Kubernetes and Spark applications.
In a previous blog post, we reviewed how to deploy a Spark job on Amazon EKS using a Kubernetes Job; another blog post covers performance optimizations and considerations. In this blog post, we will go through the different best practices related to Amazon EKS scheduling and provide an end-to-end Spark application example that implements them. We will cover different ways to configure Kubernetes parameters in Spark workloads to achieve resource isolation with dedicated nodes, flexible single Availability Zone deployments, auto scaling, high-speed and scalable volumes for temporary data, Amazon EC2 Spot usage for cost optimization, fine-grained permissions with AWS Identity and Access Management (IAM), and AWS Fargate integration.
We will use Spark 3.1.2 for this blog post because it comes with useful features for Kubernetes deployment like:
- Deeper capabilities to customize Kubernetes scheduling parameters
- A new dynamic allocation algorithm optimized for Kubernetes
- A new S3A committer to efficiently write data to S3
- A decommissioning mechanism adapted to Amazon EC2 Spot nodes
Pre-requisite: build a Spark 3 image optimized for Amazon S3 and Amazon EKS
When Spark workloads write data to Amazon S3 using the S3A connector, it’s recommended to use Hadoop > 3.2 because it comes with new committers. Committers are algorithms, bundled in the S3A connector, responsible for committing writes to Amazon S3 and ensuring no duplicate or partial outputs.
One of the new committers, the magic committer, is a good choice for Spark on Amazon EKS because it’s simple, scalable, and compatible with the Amazon S3 strong consistency model. With the magic committer, each Spark task uses a multipart upload to write its output, and the Spark driver commits all the individual task writes by finalizing the multipart uploads. A bucket lifecycle rule is recommended to ensure incomplete multipart uploads are aborted and removed from Amazon S3 after some delay.
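As an illustrative sketch (the rule ID and the seven-day delay are arbitrary choices), such a lifecycle rule can be applied with the AWS CLI:
aws s3api put-bucket-lifecycle-configuration \
  --bucket <BUCKET> \
  --lifecycle-configuration '{
    "Rules": [{
      "ID": "abort-incomplete-mpu",
      "Status": "Enabled",
      "Filter": {},
      "AbortIncompleteMultipartUpload": {"DaysAfterInitiation": 7}
    }]
  }'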
By default, the Spark 3.1.2 distribution available on the official website uses Hadoop 3.2 but doesn’t contain the required libraries to use the S3A magic committer. Therefore, you need to build a custom Spark distribution with the -Phadoop-cloud and -Phadoop-3.2 profiles.
Additionally, if the Amazon EKS cluster is running multiple workloads requiring different permissions, Spark needs to be customized to support fine-grained permissions via the IAM roles for service accounts feature. This Amazon EKS feature maps Kubernetes service accounts to AWS IAM roles, providing fine-grained permissions at the Pod level, which is mandatory to share nodes across multiple workloads with different permissions requirements. If fine-grained access control is required, Spark 3.1.2 needs to be built with Hadoop 3.3.1 to meet the minimum version requirement of the AWS SDK for Java for this feature.
To get an optimized Spark base image, you can:
- Download the Apache Spark source code (select Source Code in package type) and extract it.
- If using Spark < 3.2, edit the pom.xml file in the root folder and bump the Hadoop version from 3.2.0 to 3.3.1:
<hadoop.version>3.3.1</hadoop.version>
- Build a Spark distribution with the Hadoop Cloud maven profile:
./dev/make-distribution.sh --name spark-cloud --pip --tgz -Phadoop-cloud -Phadoop-3.2 -DskipTests -Phive -Phive-thriftserver -Pkubernetes
- From the dist folder created, build a Docker Spark base image. Before building, edit the base image in the Dockerfile to match the Java version used for building the distribution (Java 8 by default, but Java 11 can be used):
cd dist
docker build -f kubernetes/dockerfiles/spark/Dockerfile -t <REPO>/spark:v3.1.2 .
The Spark base image is now ready to be used in the application, and all dependencies are available to use S3A magic committer and IAM role for service accounts.
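Note that building the libraries in isn’t enough: the magic committer still has to be enabled in the Spark configuration. A minimal sketch of the relevant spark-submit flags (the two spark.sql settings bind Spark SQL and Parquet output to the committer classes shipped in the hadoop-cloud module):
"/opt/spark/bin/spark-submit \
…
--conf spark.hadoop.fs.s3a.committer.name=magic \
--conf spark.hadoop.fs.s3a.committer.magic.enabled=true \
--conf spark.sql.sources.commitProtocolClass=org.apache.spark.internal.io.cloud.PathOutputCommitProtocol \
--conf spark.sql.parquet.output.committer.class=org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter \
…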
Configure Spark workloads for Amazon EKS
Native Kubernetes integration in Spark provides two different options to set Kubernetes parameters:
- The spark-submit command allows defining some, but not all, Kubernetes parameters. For example, Kubernetes labels can be set for the Spark driver specifically, but a Kubernetes node selector can only be set for the entire Spark application; it’s the same for the driver and the executors.
"/opt/spark/bin/spark-submit \
...
--conf spark.kubernetes.driver.label.spark-app=spark-eks \
--conf spark.kubernetes.node.selector.noderole=spark \
...
- The Spark driver and executor Pods can be fully customized using Pod templates. Pod templates are YAML files overriding the Kubernetes Pod specifications for the Spark driver and/or executors. They can be defined in a Kubernetes ConfigMap, mounted as a volume in the spark-submit Pod, and passed to spark-submit as parameters:
kind: ConfigMap
apiVersion: v1
metadata:
  name: spark-eks-pod-template
data:
  driver: |-
    apiVersion: v1
    kind: Pod
    spec:
      nodeSelector:
        disk: none
  executor: |-
    apiVersion: v1
    kind: Pod
    spec:
      nodeSelector:
        disk: nvme
# In the spark-submit Container spec
volumeMounts:
  - name: spark-pod-template
    mountPath: /opt/spark/conf/driver_pod_template.yml
    subPath: driver
  - name: spark-pod-template
    mountPath: /opt/spark/conf/executor_pod_template.yml
    subPath: executor
volumes:
  - name: spark-pod-template
    configMap:
      name: spark-eks-pod-template
      defaultMode: 420
# In the spark-submit command
"/opt/spark/bin/spark-submit \
...
--conf spark.kubernetes.driver.podTemplateFile='/opt/spark/conf/driver_pod_template.yml' \
--conf spark.kubernetes.executor.podTemplateFile='/opt/spark/conf/executor_pod_template.yml' \
...
For this example, we will use a combination of the two approaches: spark-submit parameters when possible, and Pod templates when the feature is not exposed through the Spark configuration:
| Features | Spark-submit conf | Pod Template |
| --- | --- | --- |
| Different labels for Spark driver and executors | Yes | Yes |
| Identical NodeSelector for Spark driver and executor | Yes | Yes |
| Dedicated node selector for Spark executors | No | Yes |
| Taints and Tolerations | No | Yes |
| PodAntiAffinity | No | Yes |
| NodeAffinity with AZ and node TopologyKey | No | Yes |
| Kubernetes volumes | Yes | Yes |
| InitContainer for preparing volume and IAM role for service accounts token | No | Yes |
Deploy Spark cluster in the same Availability Zone
When using an Amazon EKS cluster deployed across multiple Availability Zones, it’s important to schedule all the components of a Spark job in the same Availability Zone to avoid performance degradation and additional costs from cross-zone network traffic.
The first component to be deployed is the Spark driver Pod; the driver then requests additional Pods for the Spark executors. A driver label can be used in a Pod affinity rule to force all the executors to be scheduled in the same Availability Zone as the driver (via the topologyKey) without configuring a static Availability Zone. A dynamic Availability Zone is more flexible, for example, to handle failover:
- The driver can be configured with a unique label via the spark-submit command in the Job command:
"/opt/spark/bin/spark-submit \
…
--conf spark.kubernetes.driver.label.spark/app=<UNIQUE_APPLICATION_NAME> \
…
- The executors can be configured with a Pod affinity rule in the ConfigMap used for the executor Pod template (the Pod template is mounted in a Kubernetes volume as described previously). The affinity rule enforces executors’ co-location with the driver:
executor: |-
  apiVersion: v1
  kind: Pod
  spec:
    affinity:
      podAffinity:
        requiredDuringSchedulingIgnoredDuringExecution:
          - labelSelector:
              matchExpressions:
                - key: spark/app
                  operator: In
                  values:
                    - <UNIQUE_APPLICATION_NAME>
            topologyKey: topology.kubernetes.io/zone
Dedicate Amazon EKS node groups to Spark workloads
Resource isolation is critical for Spark workloads. Spark consumes lots of disk and network I/O during the shuffle phase (reduce step), when executors redistribute data across the cluster to process the next stage. But Kubernetes doesn’t provide resource isolation for disk and network I/O. Depending on the SLA requirements, it can be valuable to:
- Isolate Spark workloads on dedicated nodes so they don’t interfere with other kinds of workloads
- Isolate Spark applications on dedicated nodes so they don’t interfere with other Spark workloads that have different SLAs
Amazon EKS provides lots of flexibility to implement these requirements. For example:
- Amazon EKS node groups can be labeled with a Spark-specific node role:
nodeGroups:
  - name: spark-nodes
    labels:
      noderole: spark
- The Spark application can be configured with a node selector in the spark-submit command:
"/opt/spark/bin/spark-submit \
…
--conf spark.kubernetes.node.selector.noderole=spark \
…
Amazon EKS will schedule both the Spark driver and executors on the targeted nodes, but other workloads might still be scheduled on these nodes if they don’t target other specific nodes with their own selectors. Taints and Tolerations can provide stronger constraints by only admitting Pods that explicitly tolerate the Spark nodes, as shown in the sketch below.
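As a sketch (the dedicated taint key and spark value below are illustrative choices, not taken from the example repository), the node group can be tainted via eksctl and the Spark Pod templates given a matching toleration:
# eksctl node group with a taint in addition to the label
nodeGroups:
  - name: spark-nodes
    labels:
      noderole: spark
    taints:
      dedicated: "spark:NoSchedule"
# In the driver and executor Pod templates
tolerations:
  - key: dedicated
    operator: Equal
    value: "spark"
    effect: NoSchedule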
Isolate Spark applications
To isolate Spark workloads from each other, Pod anti-affinity rules can be used to reject nodes where a different Spark application is already running:
- The executors can be configured with a unique application label and a component label via the spark-submit command in the Job command:
"/opt/spark/bin/spark-submit \
…
--conf spark.kubernetes.executor.label.spark/app=<UNIQUE_APPLICATION_NAME> \
--conf spark.kubernetes.executor.label.spark/component=executor \
…
- The executors can be configured with a Pod anti-affinity rule in the ConfigMap used for the executor Pod template, with the hostname topologyKey:
executor: |-
  apiVersion: v1
  kind: Pod
  spec:
    affinity:
      podAntiAffinity:
        requiredDuringSchedulingIgnoredDuringExecution:
          - labelSelector:
              matchExpressions:
                - key: spark/app
                  operator: NotIn
                  values:
                    - <UNIQUE_APPLICATION_NAME>
                - key: spark/component
                  operator: In
                  values:
                    - executor
            topologyKey: kubernetes.io/hostname
With this rule, the executor Pods of this Spark application won’t be scheduled on a node where a Spark executor (spark/component label) of a different application (spark/app label) is already running.
Use Kubernetes volumes for temporary data
Spark is a distributed compute engine that needs to exchange data between nodes when performing joins, aggregations, and sorts across multiple executors. It uses a shuffle process that writes data to local temporary disks before sending it over the network when other executors require it.
This mechanism is critical for Spark: slow disks degrade the overall performance of the job, and full disks make the job fail. By default, a temporary file system is created and mounted in the Spark Pods, but it’s located on the root volume of the node, an Amazon EBS volume with a default size of 20GB.
It’s recommended to use faster and bigger disks for Spark applications that require optimized performance. This can be achieved by using Amazon EC2 nodes with NVMe instance stores and mounting the disks as hostPath volumes in the Spark Pods:
- You can configure Amazon EKS node groups dedicated to Spark to use instance store-backed types like R5d, R5ad, and R5dn, and to format and mount the NVMe disks during bootstrap:
nodeGroups:
  - name: spark-nvme
    instancesDistribution:
      instanceTypes: ["r5d.xlarge", "r5ad.xlarge", "r5dn.xlarge"]
    preBootstrapCommands:
      - "IDX=1 && for DEV in /dev/disk/by-id/nvme-Amazon_EC2_NVMe_Instance_Storage_*-ns-1; do mkfs.xfs ${DEV};mkdir -p /pv-disks/local${IDX};echo ${DEV} /pv-disks/local${IDX} xfs defaults,noatime 1 2 >> /etc/fstab; IDX=$((${IDX} + 1)); done"
      - "mount -a"
- The Spark executors are configured to mount the local disks as hostPath volumes and to use this mount point as the Spark local directory:
"/opt/spark/bin/spark-submit \
…
--conf spark.kubernetes.executor.volumes.hostPath.spark-local-dir-1.mount.path='/tmp/spark' \
--conf spark.kubernetes.executor.volumes.hostPath.spark-local-dir-1.options.path=/pv-disks/local \
--conf spark.local.dir='/tmp/spark' \
…
- For security reasons, it’s not recommended to run Spark as root. An init container is used to set the permissions of the mounted volumes for the Spark user in the driver and executor Pods. This configuration is provided in the ConfigMap used for the executor Pod template (the Pod template is mounted in a Kubernetes volume as described previously):
executor: |-
  apiVersion: v1
  kind: Pod
  spec:
    initContainers:
      - name: volume-permissions
        image: public.ecr.aws/y4g4v0z7/busybox
        command: ['sh', '-c', 'chown -R 185 /tmp/spark']
        volumeMounts:
          - mountPath: /tmp/spark
            name: spark-local-dir-1
Autoscale Spark applications on Amazon EKS
Auto scaling Spark applications is a common requirement to adapt resource consumption to unpredictable workloads. With auto scaling, AWS customers don’t pay for resources they don’t need because the Spark application consumes transient resources dynamically sized for the workload. They don’t have to size for peaks and can still meet processing SLAs regardless of the amount of data to process.
Auto scaling Spark applications involves scale-out and scale-in mechanisms at two different layers:
- The Spark driver can request additional Amazon EKS Pod resources to add Spark executors based on the number of tasks to process in each stage of the Spark job
- The Amazon EKS cluster can request additional Amazon EC2 nodes to add resources in the Kubernetes pool and answer Pod requests from the Spark driver
This solution can be achieved using the Kubernetes Cluster Autoscaler and Spark dynamic resource allocation:
- The Spark application is configured with dynamic resource allocation settings in the spark-submit command. Spark 3.x provides fine control over auto scaling on Kubernetes: it allows setting a precise minimum and maximum number of executors, tracks executors with shuffle data so it doesn’t scale in and delete shuffle data (which would trigger recomputation or job failure), adds a safeguard timeout to avoid keeping those executors indefinitely, and allows controlling the speed of scaling:
"/opt/spark/bin/spark-submit \
…
--conf spark.dynamicAllocation.enabled=true \
--conf spark.dynamicAllocation.shuffleTracking.enabled=true \
--conf spark.dynamicAllocation.shuffleTracking.timeout=120 \
--conf spark.dynamicAllocation.minExecutors=10 \
--conf spark.dynamicAllocation.maxExecutors=40 \
--conf spark.kubernetes.allocation.batch.size=10 \
--conf spark.dynamicAllocation.executorAllocationRatio=1 \
--conf spark.dynamicAllocation.schedulerBacklogTimeout=1 \
…
- The Kubernetes Cluster Autoscaler is deployed in the Amazon EKS cluster and configured with auto-discovery tags so it can manage the Spark-specific Amazon EKS node groups:
command:
  - ./cluster-autoscaler
  …
  - --node-group-auto-discovery=asg:tag=k8s.io/cluster-autoscaler/enabled,k8s.io/cluster-autoscaler/spark
  …
- The Amazon EKS node groups are tagged with the corresponding tags so they are discovered and considered during the auto scaling process. For example, with eksctl cluster configuration file:
nodeGroups:
  - name: spark-nvme-1a
    …
    tags:
      k8s.io/cluster-autoscaler/spark: owned
      k8s.io/cluster-autoscaler/enabled: "true"
    …
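To verify that the node groups are discovered, the autoscaler logs can be tailed (the Deployment name below assumes the standard Cluster Autoscaler manifest):
kubectl -n kube-system logs -f deployment/cluster-autoscaler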
Use Amazon EC2 Spot with Spark workloads
Amazon EC2 Spot is an efficient way to reduce the cost of Spark workloads by leveraging unused compute capacity at a deep discount. It comes with the risk of instances being terminated with a two-minute notice when On-Demand demand increases, but Spark is resilient to executor loss: the Spark driver keeps track of job metadata and reprocesses lost data.
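The decommissioning mechanism introduced in Spark 3.1 (mentioned earlier) can further reduce recomputation by migrating shuffle and cached blocks off an instance that receives a Spot interruption notice. A sketch of the relevant flags:
"/opt/spark/bin/spark-submit \
…
--conf spark.decommission.enabled=true \
--conf spark.storage.decommission.enabled=true \
--conf spark.storage.decommission.shuffleBlocks.enabled=true \
--conf spark.storage.decommission.rddBlocks.enabled=true \
…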
For critical Spark workloads, it’s recommended to schedule the Spark driver on On-Demand Instances to secure the job metadata, while Spark executors can be scheduled on Spot Instances as described in Running cost optimized Spark workloads on Kubernetes using EC2 Spot Instances. Because not all workloads are resilient to node loss, it’s also recommended to enforce this placement with Kubernetes Taints and Tolerations:
- Amazon EKS node groups are automatically labeled with node-lifecycle: on-demand or node-lifecycle: spot depending on their purchasing option
- The Spot node groups are also tainted via the eksctl configuration with a NoSchedule taint effect to ensure no Pods are scheduled there unless they explicitly tolerate Spot:
nodeGroups:
  - name: spark-spot-1c
    …
    taints:
      spot: "true:NoSchedule"
    …
- The ConfigMap used for the driver and executor Pod templates contains two Pod specs with different node selectors for the Spark driver and executors, and a Spot toleration for the executors (the Pod templates are mounted in a Kubernetes volume as described previously):
kind: ConfigMap
apiVersion: v1
metadata:
  name: spark-eks-pod-template
data:
  driver: |-
    apiVersion: v1
    kind: Pod
    spec:
      nodeSelector:
        node-lifecycle: on-demand
  executor: |-
    apiVersion: v1
    kind: Pod
    spec:
      nodeSelector:
        node-lifecycle: spot
      tolerations:
        - key: spot
          operator: Equal
          value: "true"
          effect: NoSchedule
Enable fine-grained permissions with IAM role for service accounts
Using a multi-tenant Amazon EKS cluster to schedule multiple Spark workloads optimizes resource consumption and reduces costs, but it comes with the challenge of setting fine-grained permissions at the Spark job level to ensure security isolation between workloads with different requirements. Spark jobs can leverage the IAM roles for service accounts feature to get temporary credentials and access AWS resources with the right permissions:
- An OIDC provider is set up for the Amazon EKS cluster. eksctl can do it automatically with this cluster configuration
iam:
  withOIDC: true
- An AWS IAM policy is created with the permissions required to run the job
- A Kubernetes service account and an AWS IAM role are created via eksctl in the namespace that is used to submit Spark jobs:
eksctl create iamserviceaccount \
--name <service_account_name> \
--namespace <service_account_namespace> \
--cluster <cluster_name> \
--attach-policy-arn <IAM_policy_ARN> \
--approve \
--override-existing-serviceaccounts
- The Spark job is configured to use the Kubernetes Namespace and the service account associated with the IAM role for service accounts configuration provided above, and the S3A connector is configured to use the WebIdentityTokenCredentialsProvider
"/opt/spark/bin/spark-submit \
…
--conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
--conf spark.kubernetes.namespace=spark \
--conf spark.hadoop.fs.s3a.aws.credentials.provider=com.amazonaws.auth.WebIdentityTokenCredentialsProvider \
--conf spark.kubernetes.authenticate.submission.caCertFile=/var/run/secrets/kubernetes.io/serviceaccount/ca.crt \
--conf spark.kubernetes.authenticate.submission.oauthTokenFile=/var/run/secrets/kubernetes.io/serviceaccount/token \
…
Use AWS Fargate for Spark executors
Amazon EKS integrates Kubernetes with AWS Fargate for running serverless Pods and removing the configuration, provisioning, and maintenance of compute resources. AWS Fargate can be used to dynamically provide compute resources for Spark drivers and/or executors.
AWS Fargate comes with some limitations and shouldn’t be used for all Spark workloads:
- The default disk space in an AWS Fargate Pod is 20GB, which offers limited space for Spark temporary data. Storage can be extended using an Amazon EFS volume, but the performance baseline is limited to approximately 0.05 MiB/s/GiB. If more storage is required, it’s preferable to use small executors and scale horizontally so the disk/compute ratio is higher
- The Pod bootstrap time is longer than with managed or self-managed Amazon EKS node groups, so it can add extra latency to highly elastic Spark workloads when auto scaling is required
An example use case is providing on-demand Spark resources to data engineers or data scientists via Jupyter notebooks. For this, Amazon EKS needs to be configured with an AWS Fargate profile:
- The Amazon EKS cluster is configured with an AWS Fargate profile attached to a specific Kubernetes namespace and, optionally, to specific labels for fine-grained selection, for example, scheduling only Spark executors:
fargateProfiles:
  - name: spark-fargate
    selectors:
      # Workloads in the "spark-fargate" Kubernetes namespace matching
      # the following label selectors will be scheduled onto Fargate:
      - namespace: spark-fargate
        labels:
          spark/component: executor
- Spark executors are labeled with a different spark/component value from the driver so that AWS Fargate schedules only the Spark executors:
"/opt/spark/bin/spark-submit \
…
--conf spark.kubernetes.driver.label.spark/component=driver \
--conf spark.kubernetes.executor.label.spark/component=executor \
…
End-to-end example walkthrough
Now that we have covered the best practices for running Spark on Amazon EKS, we will go through an end-to-end example that demonstrates them. In this example, we process the New York taxi public data set and analyze the most profitable pickup locations so drivers can look for customers around them. We launch a Spark job that reads the CSV files from the Amazon S3 public bucket, processes the data in Spark, and writes two versions of the data in Parquet format: the raw records cleaned and parsed, and the aggregated records analyzing profitability per geolocation.
To run this demo, we have previously created an Amazon S3 Bucket where we will write the results.
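For example, with the AWS CLI (bucket name and region are placeholders):
aws s3 mb s3://<RESULTS_BUCKET> --region <REGION>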
We will use the code available in this repository.
- First, we need to build the Spark application image and upload it to a Docker repository. Before building it, we can edit the Dockerfile to use a custom Spark base image built as described here:
cd spark-application
vim Dockerfile
docker build -t <YOUR_REPO>/spark-eks:v3.1.2 .
docker push <YOUR_REPO>/spark-eks:v3.1.2
- Then we create the Amazon EKS cluster using eksctl. To install eksctl, we have followed these instructions. We can change the region in the eksctl.yaml file to install the cluster in a different one
eksctl create cluster -f ../kubernetes/eksctl.yaml
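Once the cluster is up, a quick sanity check (the -L flag adds label values as columns) confirms the node groups joined with the expected labels:
kubectl get nodes -L noderole,node-lifecycle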
- After the Amazon EKS cluster has been created, we deploy the Kubernetes Cluster Autoscaler. This autoscaler is configured to automatically discover the Amazon EKS node groups we have previously created with eksctl
kubectl apply -f ../kubernetes/cluster_autoscaler.yaml
- Before we can launch the Job, we need to configure the IAM role for service accounts:
- We create a policy in the IAM console providing read access to the source bucket and read/write access to the target bucket
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "SourcePermissions",
      "Effect": "Allow",
      "Action": [
        "s3:GetObject",
        "s3:ListBucket"
      ],
      "Resource": [
        "arn:aws:s3:::nyc-tlc/*",
        "arn:aws:s3:::nyc-tlc"
      ]
    },
    {
      "Sid": "TargetPermissions",
      "Effect": "Allow",
      "Action": [
        "s3:PutObject",
        "s3:GetObject",
        "s3:AbortMultipartUpload",
        "s3:DeleteObject",
        "s3:ListMultipartUploadParts",
        "s3:ListBucketMultipartUploads",
        "s3:ListBucket"
      ],
      "Resource": [
        "arn:aws:s3:::<RESULTS_BUCKET>/*",
        "arn:aws:s3:::<RESULTS_BUCKET>"
      ]
    }
  ]
}
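If you prefer the AWS CLI over the console, a sketch (the policy name and file path are illustrative):
aws iam create-policy \
  --policy-name spark-eks-nyctaxi \
  --policy-document file://policy.json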
- We create the IAM roles and the Kubernetes service accounts via eksctl (one for self-managed Amazon EKS node groups and one for AWS Fargate) using the ARN from the previously created Amazon IAM policy
eksctl create iamserviceaccount \
--name spark \
--namespace spark \
--cluster spark-eks-best-practices \
--attach-policy-arn <POLICY_ARN> \
--approve --override-existing-serviceaccounts
eksctl create iamserviceaccount \
--name spark-fargate \
--namespace spark-fargate \
--cluster spark-eks-best-practices \
--attach-policy-arn <POLICY_ARN> \
--approve --override-existing-serviceaccounts
- We can now launch the Spark job using Amazon EKS self-managed node groups. This Spark job is configured for:
- Using dedicated nodes for Spark
- Using the same Availability Zone for all its components
- Colocating executors on the same nodes
- Using on-demand nodes for the Spark driver and Spot nodes for the Spark executors
- Using NVMe instance stores for Spark temporary storage in the executors
- Using IAM role for service account to get the least privileges required for processing
Before submitting the job, we modified the examples/spark-job-hostpath-volume.yaml file for our environment:
- The image repository in the container image field
- The image repository in the spark.kubernetes.container.image parameter
- The bucket name in the Spark job parameter s3a://<RESULTS_BUCKET>/nyctaxi
kubectl apply -f examples/spark-job-hostpath-volume.yaml
- We can monitor Kubernetes Nodes and Pods via the Kubernetes Dashboard (or from the command line, as shown after this list)
- We can also check the Spark job progress via the Spark UI. To do that, we can forward the Spark UI port to localhost and access it via our browser
- Get the Spark driver Pod name
kubectl get pod -n=spark
- Forward the 4040 port from the Spark driver Pod
kubectl port-forward -n=spark <SPARK_DRIVER_NAME> 4040:4040
- Access the Spark UI via this URL: http://localhost:4040
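As a command-line alternative to the Kubernetes Dashboard, the nodes and Pods can also be watched directly:
kubectl get nodes --watch
kubectl get pods -n spark -o wide --watch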
Clean up
To avoid any additional costs, we clean up the resources used to run this example with the following commands:
eksctl delete cluster -n spark-eks-best-practices
aws s3 rm --recursive s3://<RESULTS_BUCKET>/nyctaxi
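If the IAM policy created earlier is no longer needed, it can be deleted as well (ARN placeholder):
aws iam delete-policy --policy-arn <POLICY_ARN>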
Summary
In this blog post, we have seen how to configure Apache Spark and Amazon EKS to support common requirements, including resource isolation, cost reduction, dynamic scaling, performance optimization, and fine-grained access control. We have also seen that configuring these best practices requires customization and maintenance effort. AWS also provides a managed product, Amazon EMR on EKS, that supports all these features. Additionally, it removes the maintenance effort on the Docker image and provides additional features, including an optimized Spark runtime for performance, automatic logging with Amazon CloudWatch, debugging with a serverless Spark History Server, Amazon S3 integration with the EMRFS optimized connector, AWS Glue Data Catalog integration for synchronizing catalog tables, and an Apache Airflow Operator for data pipelines.