How do I resolve the error "Container killed by YARN for exceeding memory limits" in Spark on Amazon EMR?

Last updated: 2019-04-09

Short Description

Use one of the following methods to resolve this error:

  • Increase memory overhead
  • Reduce the number of executor cores
  • Increase the number of partitions
  • Increase driver and executor memory

Resolution

The root cause and the appropriate solution for this error depend on your workload. You might have to try each of the following methods, in order, until the error is resolved. Before you move on to the next method, reverse any changes that you made to spark-defaults.conf in the preceding step.

Increase memory overhead

Memory overhead is the amount of off-heap memory allocated to each executor. By default, memory overhead is set to either 10% of executor memory or 384 MB, whichever is higher. Memory overhead is used for Java NIO direct buffers, thread stacks, shared native libraries, and memory-mapped files.
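As a sanity check, the default overhead for a given executor size can be computed with shell arithmetic. The executor size below is illustrative, not a recommendation:

```shell
# Default memoryOverhead = max(10% of executor memory, 384 MB).
executor_memory_mb=4096   # illustrative executor size
default_overhead_mb=$(( executor_memory_mb / 10 > 384 ? executor_memory_mb / 10 : 384 ))
echo "$default_overhead_mb"
```

For a 4 GB executor, 10% (409 MB) exceeds the 384 MB floor, so 409 MB is the default.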

Consider making gradual increases in memory overhead, up to 25%. Be sure that the sum of the driver or executor memory plus the driver or executor memory overhead is always less than the value of yarn.nodemanager.resource.memory-mb for your Amazon Elastic Compute Cloud (Amazon EC2) instance type:

spark.driver/executor.memory + spark.driver/executor.memoryOverhead < yarn.nodemanager.resource.memory-mb
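The constraint above can be checked with simple shell arithmetic before applying new settings. All sizes below are hypothetical; the actual yarn.nodemanager.resource.memory-mb value varies by instance type:

```shell
# Hypothetical sizes in MB; check that memory + overhead fits under the NodeManager limit.
executor_memory_mb=4096
executor_overhead_mb=512
yarn_limit_mb=12288   # yarn.nodemanager.resource.memory-mb for a hypothetical instance
if (( executor_memory_mb + executor_overhead_mb < yarn_limit_mb )); then
  echo "fits"
else
  echo "exceeds NodeManager limit"
fi
```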

If the error occurs in the driver container or executor container, consider increasing memory overhead for that container only. You can increase memory overhead while the cluster is running, when you launch a new cluster, or when you submit a job.

On a running cluster:

Modify spark-defaults.conf on the master node. Example:

sudo vim /etc/spark/conf/spark-defaults.conf

spark.driver.memoryOverhead 512
spark.executor.memoryOverhead 512

On a new cluster:

Add a configuration object similar to the following when you launch a cluster:

[
  {
    "Classification": "spark-defaults",
    "Properties": {
      "spark.driver.memoryOverhead": "512",
      "spark.executor.memoryOverhead": "512"
    }
  }
]

For a single job:

Use the --conf option to increase memory overhead when you run spark-submit. Example:

spark-submit --class org.apache.spark.examples.SparkPi --master yarn --deploy-mode cluster --conf spark.driver.memoryOverhead=512 --conf spark.executor.memoryOverhead=512 /usr/lib/spark/examples/jars/spark-examples.jar 100

If increasing memory overhead does not solve the problem, reduce the number of executor cores.

Reduce the number of executor cores

Reducing the number of executor cores reduces the maximum number of tasks that the executor can run concurrently, which reduces the amount of memory that the executor requires. Depending on whether the driver container or an executor container is throwing the error, consider decreasing the number of cores for the driver or for the executor.
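The memory effect can be sketched with rough shell arithmetic. The executor size is illustrative, and actual per-task usage depends on the workload:

```shell
# Fewer cores means fewer concurrent tasks sharing the executor's memory.
executor_memory_mb=4096   # illustrative executor size
for cores in 5 3; do
  echo "$cores cores -> ~$(( executor_memory_mb / cores )) MB per concurrent task"
done
```

Dropping from 5 cores to 3 raises the rough per-task share from about 819 MB to about 1365 MB.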

On a running cluster:

Modify spark-defaults.conf on the master node. Example:

sudo vim /etc/spark/conf/spark-defaults.conf
spark.driver.cores  3
spark.executor.cores  3

On a new cluster:

Add a configuration object similar to the following when you launch a cluster:

[
  {
    "Classification": "spark-defaults",
    "Properties": {"spark.driver.cores" : "3",
      "spark.executor.cores": "3"
    }
  }
]

For a single job:

Use the --executor-cores and --driver-cores options to reduce the number of cores when you run spark-submit. Example:

spark-submit --class org.apache.spark.examples.SparkPi --master yarn --deploy-mode cluster --executor-cores 3 --driver-cores 3 /usr/lib/spark/examples/jars/spark-examples.jar 100

If you still get the error message, increase the number of partitions.

Increase the number of partitions

To increase the number of partitions, increase the value of spark.default.parallelism for raw Resilient Distributed Datasets (RDDs), or execute a .repartition() operation. Increasing the number of partitions reduces the amount of memory required per partition. Because Spark uses cluster RAM heavily to maximize speed, it's important to monitor memory usage with Ganglia, and then verify that your cluster settings and partitioning strategy meet your growing data needs. If you still get the "Container killed by YARN for exceeding memory limits" error message, then increase driver and executor memory.
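The effect of more partitions on per-partition memory can be sketched with shell arithmetic. The dataset size below is illustrative:

```shell
# Doubling the partition count roughly halves the data each task must hold.
dataset_mb=20480   # illustrative dataset size
for partitions in 200 400; do
  echo "$partitions partitions -> $(( dataset_mb / partitions )) MB per partition"
done
```

For a single job, parallelism can also be raised at submit time with a flag similar to `--conf spark.default.parallelism=400` (the value 400 is an example, not a recommendation).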

Increase driver and executor memory

If the error occurs in either a driver container or an executor container, consider increasing memory for either the driver or the executor, but not both. Be sure that the sum of driver or executor memory plus driver or executor memory overhead is always less than the value of yarn.nodemanager.resource.memory-mb for your EC2 instance type:

spark.driver/executor.memory + spark.driver/executor.memoryOverhead < yarn.nodemanager.resource.memory-mb
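The largest executor memory that satisfies this constraint for a given overhead can be derived directly. All values below are illustrative:

```shell
# Back out the memory headroom available for the executor itself.
yarn_limit_mb=12288   # yarn.nodemanager.resource.memory-mb for a hypothetical instance
overhead_mb=1024      # illustrative memoryOverhead
max_executor_memory_mb=$(( yarn_limit_mb - overhead_mb ))
echo "executor memory must stay below ${max_executor_memory_mb} MB"
```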

On a running cluster:

Modify spark-defaults.conf on the master node. Example:

sudo vim /etc/spark/conf/spark-defaults.conf

spark.executor.memory  1g
spark.driver.memory  1g

On a new cluster:

Add a configuration object similar to the following when you launch a cluster:

[
  {
    "Classification": "spark-defaults",
    "Properties": {
      "spark.executor.memory": "1g",
      "spark.driver.memory":"1g",
    }
  }
]

For a single job:

Use the --executor-memory and --driver-memory options to increase memory when you run spark-submit. Example:

spark-submit --class org.apache.spark.examples.SparkPi --master yarn --deploy-mode cluster --executor-memory 1g --driver-memory 1g /usr/lib/spark/examples/jars/spark-examples.jar 100

If you still get the error message, try the following:

  • Benchmarking: It's a best practice to run your application against a sample dataset. This can help you spot slowdowns and skewed partitions, which can lead to memory problems.
  • Data filtration: Be sure that you are processing the minimum amount of data. If you don't filter your data, or if you filter late in the application run, excess data might slow down the application and increase the chance of a memory exception.
  • Dataset size: It's a best practice to process the minimum required data. Partition your data so that only the required data is ingested.
  • Partitioning strategy: Consider using a different partitioning strategy. For example, partition on an alternate key to avoid large partitions and skewed partitions.
  • EC2 instance type: It's possible that your EC2 instance doesn't have the memory resources required for your workload. Switching to a larger memory optimized instance type might resolve the error. If you still get memory exceptions after changing instance types, try the methods mentioned earlier in this article on the new instance.