Optimizing Spark performance on Kubernetes
Apache Spark is an open source project that has achieved wide popularity in the analytical space. It is used for well-known big data and machine learning workloads such as streaming, processing a wide array of datasets, and ETL, to name a few.
Kubernetes is a popular open source container management system that provides basic mechanisms for deployment, maintenance, and scaling of applications. Amazon EKS is a managed Kubernetes service that offers a highly available control plane to run production-grade workloads on AWS. Customers can run a variety of workloads on EKS, such as microservices, batch, and machine learning.
This blog is for engineers and data scientists who prefer to run Spark workloads on Kubernetes as a platform.
Why you should consider running Spark on Kubernetes
Let’s understand why customers should consider running Spark on Kubernetes.
Kubernetes is a native option for Spark resource manager
Starting with Spark 2.3, you can use Kubernetes to run and manage Spark resources. Prior to that, you could run Spark using Hadoop YARN, Apache Mesos, or a standalone cluster. Running Spark on Kubernetes reduces the time it takes to experiment. In addition, you can use a variety of optimization techniques with minimal complexity.
Advantages of running in containers and Kubernetes ecosystem
By packaging a Spark application as a container, you reap the benefits of containers because you package your dependencies along with your application as a single entity. Concerns around library version mismatches, for example with respect to Hadoop version compatibility, become easier to manage using containers. Another advantage is that you can version control and apply tags to your container images. This way, if you need to experiment with different versions of Spark or its dependencies, you can easily do so.
By leveraging Kubernetes in your stack, you can tap into the advantages of the Kubernetes ecosystem. You can leverage Kubernetes add-ons for things like monitoring and logging. Most Spark developers choose to deploy Spark workloads into an existing Kubernetes infrastructure that is used by the wider organization, so there is less maintenance and uplift to get started. Cluster operators can give you access to the cluster and apply resource limits using Kubernetes namespaces and resource quotas. This way, you get your own piece of infrastructure and avoid stepping on other teams’ resources. You can also use Kubernetes node selectors to secure infrastructure dedicated to Spark workloads. In addition, since driver pods create executor pods, you can use a Kubernetes service account together with a Role or ClusterRole to define fine-grained access control and run your workload securely alongside other workloads.
Whether you run a Spark workload that is transient or you have a business requirement to run a real-time Spark workload, leveraging multi-tenant Kubernetes infrastructure helps avoid cluster spin-up time. In addition, it’s better to run Spark along with other data-centric applications that manage the lifecycle of your data rather than running siloed clusters. This way, you can build an end-to-end lifecycle solution using a single orchestrator and easily reproduce the stack in other Regions or even run it in an on-premises environment.
Having said that, Kubernetes scheduler support for Spark is still experimental. According to the documentation, end users need to be aware that there may be behavioral changes around configuration, container images, and entrypoints.
TPC-DS benchmark
TPC-DS is the de-facto standard benchmark for measuring the performance of decision support solutions. It defines decision support systems as those that examine large volumes of data, give answers to real-world business questions, execute SQL queries of various operational requirements and complexities (e.g., ad hoc, reporting, iterative OLAP, data mining), and are characterized by high CPU and I/O load.
We ran the TPC-DS benchmark on Amazon EKS and compared it against Apache Hadoop YARN. This benchmark includes 104 queries that use a large part of the SQL 2003 standard: the 99 queries from the TPC-DS benchmark, four of which have two variants (14, 23, 24, 39), plus an “s_max” query that performs a full scan and aggregation of the biggest table (store_sales).
You can view benchmark results in this link.
To summarize, we ran the TPC-DS benchmark with a 1 TB dataset and saw comparable performance between Kubernetes (which takes ~5% less time to finish) and YARN in this setup.
I’m convinced! How do I run Spark on Kubernetes?
There are two ways to run Spark on Kubernetes: by using spark-submit or the Spark operator. With the spark-submit CLI, you can submit Spark jobs with the various configuration options supported by Kubernetes.
Spark-submit
./bin/spark-submit \
  --master k8s://https://<KUBERNETES_CLUSTER_ENDPOINT> \
  --deploy-mode cluster \
  --name spark-pi \
  --class org.apache.spark.examples.SparkPi \
  --conf spark.executor.instances=3 \
  --conf spark.kubernetes.container.image=aws/spark:2.4.5-SNAPSHOT \
  --conf spark.kubernetes.driver.pod.name=sparkpi-test-driver \
  --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
  local:///opt/spark/examples/jars/spark-examples_2.11-2.4.5-SNAPSHOT.jar
Spark-submit is the easiest way to run Spark on Kubernetes. If you review the code snippet, you’ll notice two values that you need to change. The first is the Kubernetes cluster endpoint, which you can get from your EKS console (or via the AWS CLI). The second is the container image that hosts your Spark application. There are two ways of submitting jobs: client mode and cluster mode, and there is a subtle difference between the two. If you use client mode, you can tell the driver to run on dedicated infrastructure (separate from the executors), whereas if you choose cluster mode, both the driver and the executors run in the same cluster. You can use Spark configurations as well as Kubernetes-specific options within your command.
Spark operator
The preferred method of running Spark on Kubernetes is to use the Spark operator. The main reason is that the Spark operator provides a native Kubernetes experience for Spark workloads. In addition, you can use kubectl and sparkctl to submit Spark jobs.
apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
  name: spark-pi
spec:
  mode: cluster
  image: "aws/spark:2.4.5"
  mainClass: org.apache.spark.examples.SparkPi
  mainApplicationFile: "local:///opt/spark/examples/jars/spark-examples_2.11-2.4.5-SNAPSHOT.jar"
  restartPolicy:
    type: Never
  driver:
    cores: 1
    coreLimit: "1024m"
    memory: "512m"
    serviceAccount: spark
  executor:
    cores: 1
    instances: 2
    memory: "512m"
Optimization tips
Below is a list of optimization tips that can improve performance for Spark workloads. For the specific configuration to tune, you can check out the eks-spark-benchmark repo.
Kubernetes cluster
Because Kubernetes is a general-purpose container orchestration platform, you may need to tweak certain parameters to achieve the performance you want from the system. Broadly speaking, they can be divided into three categories: infrastructure layer (EC2 instance, network, storage, file system, etc.), platform layer (Kubernetes, add-ons), and application layer (Spark, S3A committers). Customers should treat these tips as a set of options for increasing both the performance and the reliability of the system, and weigh them against the amount they want to spend on a particular workload.
NVMe instance store
We recommend using AWS Nitro EC2 instances for running Spark workloads because they benefit from AWS innovations such as faster I/O to block storage, enhanced security, and a lightweight hypervisor. Block-level storage is offered to a Nitro EC2 instance in two ways: EBS-only and NVMe-based SSDs. For example, r5.24xlarge comes with EBS-only SSD volumes, which have significant EBS bandwidth (19,000 Mbps), and r5d.24xlarge comes with four 900 GiB NVMe SSD volumes. We used EBS-backed SSD volumes in our TPC-DS benchmark tests, but it’s important to evaluate them against NVMe-based instance store volumes, because these are physically connected to the host server and can drive a lot more I/O when used as scratch space. Using local NVMe SSDs as Spark scratch space is discussed in the Shuffle performance section. Keep in mind that you have to configure local NVMe SSDs before you can use them, because they are not mounted on your instances by default. If you use eksctl to build your cluster, you can use the sample cluster config below to bootstrap EKS worker nodes.
nodeGroups:
  - name: spark-nodes
    instanceType: r5d.xlarge
    availabilityZones: ["us-west-2b"]
    desiredCapacity: 1
    minSize: 0
    maxSize: 4
    volumeSize: 50
    ssh:
      allow: true
      publicKeyPath: '~/.ssh/id_rsa.pub'
    preBootstrapCommands:
      - IDX=1
      - for DEV in /dev/disk/by-id/nvme-Amazon_EC2_NVMe_Instance_Storage_*-ns-1; do mkfs.xfs ${DEV};mkdir -p /local${IDX};echo ${DEV} /local${IDX} xfs defaults,noatime 1 2 >> /etc/fstab; IDX=$((${IDX} + 1)); done
      - mount -a
Single-AZ node groups
Typically, one of the best practices is to run a microservice across multiple AZs for availability. By default, Kubernetes on AWS will try to launch your workload onto nodes spread across multiple AZs. There are two potential problems with this pattern for Spark workloads. First, cross-AZ latency is typically in the single-digit milliseconds, compared with microsecond latency between nodes within a single AZ, and this difference impacts shuffle performance. Spark shuffle is an expensive operation involving disk I/O, data serialization, and network I/O, so choosing nodes in a single AZ will improve your performance. Second, cross-AZ communication carries data transfer costs. Data transferred “in” to and “out” from Amazon EC2 across AZs is charged at $0.01/GB in each direction. Because Spark shuffle is a high network I/O operation, customers should account for these data transfer costs.
Customers should therefore validate that running in a single AZ does not compromise the availability of their system. For Spark workloads that are transient in nature, Single-AZ is a no-brainer, and you can choose to run Kubernetes node groups that are confined to a single AZ. If you have a multi-tenant Kubernetes cluster, you can use advanced scheduling such as node affinity, or review the advanced features supported by the Volcano scheduler described in a later section.
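To get a rough feel for the data transfer cost described above, the following sketch estimates the charge for shuffle traffic that crosses AZ boundaries. It assumes the $0.01/GB per-direction rate quoted above (check current pricing) and a hypothetical `cross_az_fraction` parameter for the share of shuffle data that leaves its AZ; the function name and the sample numbers are illustrative only.

```python
# Rough estimate of cross-AZ data transfer cost for Spark shuffle traffic.
# Assumption: the $0.01/GB per-direction rate quoted in the text.
RATE_USD_PER_GB_PER_DIRECTION = 0.01


def cross_az_shuffle_cost_usd(shuffle_gb: float, cross_az_fraction: float = 1.0) -> float:
    """Estimate the charge for shuffle data that crosses AZ boundaries.

    Each gigabyte is billed twice: once "out" of the source AZ and once
    "in" to the destination AZ.
    """
    if not 0.0 <= cross_az_fraction <= 1.0:
        raise ValueError("cross_az_fraction must be between 0 and 1")
    cross_az_gb = shuffle_gb * cross_az_fraction
    return cross_az_gb * RATE_USD_PER_GB_PER_DIRECTION * 2


if __name__ == "__main__":
    # Hypothetical job: 5,000 GB of shuffle data, half of which crosses AZs.
    print(f"${cross_az_shuffle_cost_usd(5000, 0.5):.2f}")
```

With a Single-AZ node group the cross-AZ fraction is zero, so this component of the bill disappears.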
Autoscaling considerations
Spark architecture includes driver and executor pods working together in a distributed context to provide job results. The driver pod performs several activities such as acquiring executors on worker nodes, sending application code (defined in JAR or Python) to executors, and sending tasks to executors. When the application completes, executor pods terminate, but the driver pod persists and remains in the “completed” state until it’s garbage collected or manually cleaned up. You can access logs through the driver pod to check for results. Therefore, it is important to keep the driver pod from failing due to a scale-in action in your cluster. You can prevent this by adding a Cluster Autoscaler (CA) annotation to your driver pod manifest, such as the one below.
"cluster-autoscaler.kubernetes.io/safe-to-evict": "false"
In addition, if you choose to autoscale your nodes based on Spark workload usage in a multi-tenant cluster, you can do so by using Kubernetes Cluster Autoscaler (CA). CA infers the target cluster capacity from pods that fail to schedule due to a lack of resources, so new nodes are added only after executor pods are already waiting to be scheduled, which increases scaling latency. To mitigate this issue, you can overprovision your cluster by running pause pods with low priority (see priority preemption). When resources are limited, the scheduler preempts these pause pods in order to place executor pods. The preempted pause pods then become pending, which causes CA to add nodes to accommodate them.
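As an illustration of the overprovisioning approach, the sketch below builds a low-priority PriorityClass and a Deployment of pause pods and prints them as a JSON list, which kubectl accepts in the same way as YAML. The resource names, the priority value of -1, the `registry.k8s.io/pause:3.9` image tag, and the replica count and sizes are all assumptions chosen for the example, not values from the benchmark.

```python
import json


def overprovisioning_manifests(replicas: int, cpu: str, memory: str) -> list:
    """Build a low-priority PriorityClass and a pause-pod Deployment.

    All names and the priority value are illustrative assumptions.
    """
    priority_class = {
        "apiVersion": "scheduling.k8s.io/v1",
        "kind": "PriorityClass",
        "metadata": {"name": "overprovisioning"},
        # Lower than the default pod priority (0), so executor pods can preempt these.
        "value": -1,
        "globalDefault": False,
        "description": "Placeholder pods that reserve spare capacity.",
    }
    deployment = {
        "apiVersion": "apps/v1",
        "kind": "Deployment",
        "metadata": {"name": "overprovisioning", "namespace": "default"},
        "spec": {
            "replicas": replicas,
            "selector": {"matchLabels": {"run": "overprovisioning"}},
            "template": {
                "metadata": {"labels": {"run": "overprovisioning"}},
                "spec": {
                    "priorityClassName": "overprovisioning",
                    "containers": [
                        {
                            "name": "reserve-resources",
                            "image": "registry.k8s.io/pause:3.9",
                            # Each replica reserves roughly one executor's worth of capacity.
                            "resources": {"requests": {"cpu": cpu, "memory": memory}},
                        }
                    ],
                },
            },
        },
    }
    return [priority_class, deployment]


if __name__ == "__main__":
    items = overprovisioning_manifests(replicas=2, cpu="1", memory="2Gi")
    print(json.dumps({"apiVersion": "v1", "kind": "List", "items": items}, indent=2))
```

Sizing each pause pod like one executor keeps the reserved headroom easy to reason about: the number of replicas is the number of executors that can start without waiting for a new node.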
Memory management
It’s important to understand how Kubernetes handles memory management to better manage resources for your Spark workload. Kubernetes nodes typically run many OS system daemons in addition to Kubernetes daemons. System daemons use a non-trivial amount of resources, and their availability is critical for the stability of Kubernetes nodes. Kubelet exposes Node Allocatable so that you can reserve system resources for critical daemons. For example, using kube-reserved, you can reserve compute resources for Kubernetes system daemons like the kubelet and the container runtime. By using system-reserved, you can reserve resources for OS system daemons like sshd and udev. If you’d like to learn more, you can check here on reserve compute resources for system daemons. The Allocatable value that remains after these reservations is the amount of compute resources available to pods.
By default, Kubernetes does memory allocation using cgroups based on the request/limit defined in your pod definition. For executors, this value is derived from spark.executor.memory. In addition, Spark adds spark.kubernetes.memoryOverheadFactor * spark.executor.memory, with a minimum of 384 MiB, as an additional cushion for non-JVM memory, which includes off-heap memory allocations, non-JVM tasks, and various system processes. If you want to change the default settings, you can override this behavior by assigning a value to spark.executor.memoryOverhead.
With regard to heap settings, Spark on Kubernetes assigns both -Xms (minimum heap) and -Xmx (maximum heap) to be the same as spark.executor.memory. In this case, Xmx is slightly lower than the pod memory limit, which helps to avoid executors getting killed due to out of memory (OOM) errors.
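The relationship between the executor heap and the pod memory limit described above can be sketched as a small calculation. This is a simplified model that assumes a JVM job with the default overhead factor of 0.1 and ignores any additional PySpark memory; the function name is hypothetical.

```python
from typing import Optional

# Minimum overhead applied by Spark, as stated in the text (384 MiB).
MIN_OVERHEAD_MIB = 384


def executor_pod_memory_mib(
    executor_memory_mib: int,
    overhead_factor: float = 0.1,  # assumed default for JVM jobs
    memory_overhead_mib: Optional[int] = None,
) -> int:
    """Approximate the memory request/limit of an executor pod in MiB.

    If spark.executor.memoryOverhead is set explicitly, it replaces the
    factor-based overhead; otherwise the overhead is the larger of
    factor * executor memory and 384 MiB.
    """
    if memory_overhead_mib is None:
        memory_overhead_mib = max(
            int(executor_memory_mib * overhead_factor), MIN_OVERHEAD_MIB
        )
    return executor_memory_mib + memory_overhead_mib


if __name__ == "__main__":
    for heap in (512, 4096, 8192):
        limit = executor_pod_memory_mib(heap)
        print(f"-Xmx {heap} MiB -> pod limit {limit} MiB (headroom {limit - heap} MiB)")
```

For small executors the 384 MiB floor dominates, so the headroom between Xmx and the pod limit is proportionally much larger than for large executors, where it shrinks to roughly 10% of the heap.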
There are three main reasons for pod getting killed due to OOM errors:
- If your Spark application uses more heap memory than allowed while staying under the pod limit (xmx < usage < pod.memory.limit), the container OS kernel kills the Java program.
- If memory usage > pod.memory.limit, your host OS cgroup kills the container. Kubelet will try to restart the OOMKilled container either on the same or another host.
- If worker nodes experience memory pressure, Kubelet will try to protect the node by killing random pods until it frees up memory. This doesn’t necessarily mean that only the pods that consume more memory will be killed.
Hence, it’s important to keep this in mind to avoid OOM errors. If you are not familiar with these settings, you can review the Java docs and the Spark on Kubernetes configuration documentation. If you want to proactively monitor Spark memory consumption, we recommend monitoring memory metrics (container_memory_cache and container_memory_rss) from cAdvisor in Prometheus or similar monitoring solutions.
Using Spot Instances
A Spot Instance is an unused EC2 instance that is available at a significant discount (up to 90%) compared to the On-Demand price. Spark workloads that are transient as well as long-running are compelling use cases for Spot Instances. Because Spot Instances are interruptible, proper mitigation should be used for Spark workloads to ensure timely completion. It is important to run the driver pod on On-Demand Instances because if it gets interrupted, the entire job has to restart from the beginning. You can run two node groups, On-Demand and Spot, and use node affinity to schedule driver pods on the On-Demand node group and executor pods on the Spot node group.
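A minimal sketch of the driver/executor split follows. It builds the `affinity` fragments that could be merged into the `driver` and `executor` sections of a SparkApplication spec. The node label key `lifecycle` and its values `OnDemand` and `Ec2Spot` are hypothetical and must match whatever labels your node groups actually carry; depending on your Spark operator setup, pod affinity may also require the operator's mutating webhook to be enabled.

```python
import json


def node_affinity(label_key: str, label_value: str) -> dict:
    """Return a required node affinity term matching a single node label."""
    return {
        "nodeAffinity": {
            "requiredDuringSchedulingIgnoredDuringExecution": {
                "nodeSelectorTerms": [
                    {
                        "matchExpressions": [
                            {"key": label_key, "operator": "In", "values": [label_value]}
                        ]
                    }
                ]
            }
        }
    }


def spot_placement_spec(
    label_key: str = "lifecycle",       # hypothetical node label key
    on_demand_value: str = "OnDemand",  # hypothetical label on the On-Demand node group
    spot_value: str = "Ec2Spot",        # hypothetical label on the Spot node group
) -> dict:
    """Pin the driver to On-Demand nodes and the executors to Spot nodes."""
    return {
        "driver": {"affinity": node_affinity(label_key, on_demand_value)},
        "executor": {"affinity": node_affinity(label_key, spot_value)},
    }


if __name__ == "__main__":
    print(json.dumps({"spec": spot_placement_spec()}, indent=2))
```

Using a required (rather than preferred) affinity for the driver means the job stays pending instead of silently landing the driver on an interruptible node when On-Demand capacity is short.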
Amazon FSx for Lustre
Amazon FSx for Lustre provides a high-performance file system optimized for fast processing of workloads such as machine learning, high performance computing (HPC), video processing, financial modeling, and electronic design automation (EDA). These workloads commonly require data to be presented via a fast and scalable file system interface, and typically have datasets stored on long-term data stores like Amazon S3. You can follow the GitHub instructions to install the CSI driver in your Kubernetes cluster. Amazon FSx for Lustre is deeply integrated with Amazon S3. You can specify import and export paths when you define the storage class for data stored in S3 and make that data accessible to Kubernetes pods. For Spark workloads, drivers and executors can interact directly with S3 to minimize complexity with I/O operations. We recommend building the Lustre client into your container image if you intend to export files to Amazon S3.
Working with Amazon S3
Amazon S3 is not a file system because data is stored as objects within resources called ‘buckets.’ This data can be accessed via Amazon S3 APIs or using the Amazon S3 console. Hadoop, on the other hand, was written for distributed storage that is available as a file system, where features such as file locking, renames, and ACLs are important for its operation.
S3 limitations with respect to Hadoop
Even though Hadoop’s S3A client can make an S3 bucket appear to be a Hadoop-compatible filesystem, it is still an object store and has some limitations when acting as a Hadoop-compatible filesystem. The key things to be aware of are:
- Operations on directories are potentially slow and non-atomic.
- Not all file operations are supported, like rename().
- Data is not visible in the object store until the entire output stream has been written.
- Amazon S3 offers eventual consistency for overwrite PUTS and DELETES and read-after-write consistency for PUTS of new objects. Objects are replicated across servers for availability, but changes to a replica take time to propagate to the other replicas; the object store is inconsistent during this process. The inconsistency issues surface when listing, reading, updating, or deleting files. To mitigate the inconsistency issues, you can configure S3Guard. To learn more, refer to S3Guard: consistency and metadata caching for S3A.
- Neither the per-file and per-directory permissions supported by HDFS nor its more sophisticated ACL mechanism are supported.
- Bandwidth between your workload clusters and Amazon S3 is limited and can vary significantly depending on network and VM load.
For these reasons, while Amazon S3 can be used as the source and store for persistent data, it cannot be used as a direct replacement for a cluster-wide filesystem such as HDFS, or be used as fs.defaultFS. To address these problems, there is now explicit support for committing work to S3 via the S3A filesystem client in the hadoop-aws module, called S3A committers.
S3A committers
There are two types of committers, staging and magic. Staging committers were developed by Netflix and come in two forms, directory and partitioned. Magic committers were developed by the Hadoop community and require S3Guard for consistency.
- Directory committer: Buffers working data to the local disk, uses HDFS to propagate commit information from tasks to job committer, and manages conflict across the entire destination directory tree.
- Partitioned committer: Identical to the directory committer except that conflict is managed on a partition-by-partition basis. This allows it to be used for in-place updates of existing datasets. It is only suitable for use with Spark.
- Magic committer: Data is written directly to S3, but “magically” retargeted at the final destination. Conflict is managed across the directory tree. It requires a consistent S3 object store, which means S3Guard is a mandatory pre-requisite.
For example, if you would like to configure the directory S3A committer for Spark 2.4.5 with Hadoop 3.1, you can use these options.
"spark.hadoop.mapreduce.outputcommitter.factory.scheme.s3a": "org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory"
"spark.hadoop.fs.s3a.committer.name": "directory"
"spark.hadoop.fs.s3a.committer.staging.conflict-mode": "append"
To learn how to configure S3A committers for specific Spark and Hadoop versions, you can read more here.
Shuffle performance
Spark jobs often spend most of their time in the shuffle phase, because it involves a large amount of disk I/O, serialization, network data transmission, and other operations. Both the Spark driver and executors use directories inside the pods for storing temporary files. There are several optimization tips associated with how you define storage options for these pod directories. For the specific shuffle configuration to tune, you can check out the eks-spark-benchmark repo.
Use EmptyDir as scratch space
Docker uses copy-on-write (CoW) whenever new data is written to a container’s writable layer. Ideally, little data is written to this layer because of the performance impact. The best practice is to offload writes to volumes, which bypass the Docker storage driver. Kubernetes provides a storage abstraction that you can present to your container using an emptyDir volume. EmptyDir can be backed by volumes attached to your host, a network file system, or memory on your host.
Use tmpfs backed emptyDir volumes
Kubernetes allocates memory to pods as scratch space if you define tmpfs in the emptyDir specification. Using built-in memory can significantly speed up Spark’s shuffle phase and improve overall job performance. This also means that your Spark job’s scratch space will be limited by the memory available on the host, but it works great for diskless hosts or hosts with a low storage footprint. In the case of spark-operator, you can configure Spark to use tmpfs using the configuration option below.
# spark-operator example
spec:
  sparkConf:
    "spark.kubernetes.local.dirs.tmpfs": "true"
    ...
Use hostPath for emptyDir volumes
Likewise, as mentioned before, you can configure emptyDir to be backed by volumes that are mounted on the host. For EC2 instances that are backed by NVMe SSD instance store volumes, such a configuration can provide a significant boost over volumes that are backed by EBS. This scenario is a good fit because the instance store is used only as scratch space for Spark jobs. It also helps for jobs that are large and require heavy I/O, such as TPC-DS. Keep in mind that data stored in instance store volumes (or tmpfs) is only available as long as the node is not rebooted or terminated. Here is an example of a spark-operator configuration using instance store volumes.
# spark-operator example
spec:
  ...
  mainApplicationFile: local:///opt/spark/examples/jars/eks-spark-examples-assembly-1.0.jar
  volumes:
    - name: "spark-local-dir-1"
      hostPath:
        path: "/tmp/spark-local-dir"
  executor:
    instances: 10
    cores: 2
    ....
    volumeMounts:
      - name: "spark-local-dir-1"
        mountPath: "/tmp/spark-local-dir"
Use multiple disks for emptyDir volumes
If your instance has multiple disks, you can specify all of them in your configuration to boost I/O performance. Configuring multiple disks is similar to the single-disk configuration, with a small variation: each disk gets its own volume and volume mount.
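As a sketch of that variation, the helper below generates one `hostPath` volume and one executor volume mount per disk, following the same shape as the single-disk spark-operator example. The host paths `/local1` and `/local2` assume the NVMe mount points created by the eksctl bootstrap commands shown earlier, and the `spark-local-dir-N` naming simply extends the name used in the single-disk example; the helper itself is hypothetical.

```python
import json


def local_dir_volumes(host_paths: list) -> dict:
    """Build the volumes and executor volumeMounts for several scratch disks."""
    volumes = []
    mounts = []
    for idx, path in enumerate(host_paths, start=1):
        name = f"spark-local-dir-{idx}"
        # One hostPath volume per physical disk mounted on the node.
        volumes.append({"name": name, "hostPath": {"path": path}})
        # Mount each volume into the executor at the same path.
        mounts.append({"name": name, "mountPath": path})
    return {"volumes": volumes, "executor": {"volumeMounts": mounts}}


if __name__ == "__main__":
    # Assumed mount points; they must already exist on the worker nodes.
    print(json.dumps({"spec": local_dir_volumes(["/local1", "/local2"])}, indent=2))
```

Spreading scratch space over several volumes lets shuffle writes from different tasks land on different devices instead of queuing behind a single disk.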
Job scheduling using Volcano scheduler
By default, the Kubernetes scheduler schedules pod by pod, which is not well suited to batch workloads. The Volcano scheduler can help fill the gap with the features described below.
Gang scheduling
If there are not enough resources in the cluster, Spark jobs might experience deadlock where they are constantly waiting for Kubernetes to scale and add additional nodes to the cluster. Gang scheduling is a way to schedule a number of pods all at once. It ensures that Kubernetes never launches partial applications, which resolves resource deadlocks between different jobs. For example, if a job requiring X pods is requested and there are only enough resources to schedule X-2 pods, then that job will wait until resources to accommodate all X pods are available. At the same time, if there’s a job with Y pods and the cluster has resources to schedule Y pods, then that job can be scheduled. Gang scheduling is available in the Volcano scheduler. You can install the Volcano scheduler by following the instructions in the GitHub repo.
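The difference between pod-by-pod and all-or-nothing admission can be illustrated with a toy model. This is not Volcano's algorithm; it is a deliberately simplified sketch in which every pod needs exactly one "slot" of capacity, and all function names are hypothetical.

```python
def pod_by_pod(free_slots: int, jobs: list) -> dict:
    """Default-style scheduling: start as many pods of each job as fit.

    jobs is a list of (name, pods_required); returns pods started per job.
    """
    started = {}
    for name, pods_required in jobs:
        running = min(pods_required, free_slots)
        free_slots -= running
        started[name] = running
    return started


def gang_schedule(free_slots: int, jobs: list) -> tuple:
    """All-or-nothing scheduling: admit a job only if all of its pods fit.

    Returns (admitted job names, waiting job names).
    """
    admitted, waiting = [], []
    for name, pods_required in jobs:
        if pods_required <= free_slots:
            free_slots -= pods_required
            admitted.append(name)
        else:
            waiting.append(name)
    return admitted, waiting


if __name__ == "__main__":
    # Job "x" needs 10 pods but only 8 slots are free (the X-2 case in the text);
    # job "y" needs 6 pods and fits.
    jobs = [("x", 10), ("y", 6)]
    print("pod-by-pod:", pod_by_pod(8, jobs))
    print("gang:", gang_schedule(8, jobs))
```

In the pod-by-pod case job "x" occupies all eight slots without being able to run to full size, and job "y" gets nothing; with gang scheduling "x" waits as a whole and "y" runs immediately.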
Task topology and advanced binpacking
You can further enhance job scheduling using task topology and an advanced binpacking strategy. With this strategy, you can reduce both the network overhead of instance-to-instance communication and resource fragmentation. Keep in mind that this might work for some use cases but not all, because binpacking more executors onto a single EC2 instance can lead to a network performance bottleneck.
Conclusion
Running Spark workloads requires high I/O between compute, network, and storage resources, and customers are always curious to know the best way to run these workloads in the cloud with maximum performance and lower costs. Kubernetes offers multiple choices to tune, and this blog explains several optimization techniques to choose from. We hope readers benefit from this blog and apply these best practices to improve Spark performance. We are also keen to hear what you want us to work on. Please leave us your feedback by creating issues on the eks-spark-benchmark GitHub repo.