How do I resolve "Container killed on request. Exit code is 137" errors in Spark on Amazon EMR?

Last updated: 2020-01-08

My Apache Spark job on Amazon EMR fails with a "Container killed on request" stage failure:

Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 3.0 failed 4 times, most recent failure: Lost task 2.3 in stage 3.0 (TID 23, ip-xxx-xxx-xx-xxx.compute.internal, executor 4): ExecutorLostFailure (executor 4 exited caused by one of the running tasks) Reason: Container marked as failed: container_1516900607498_6585_01_000008 on host: ip-xxx-xxx-xx-xxx.compute.internal. Exit status: 137. Diagnostics: Container killed on request. Exit code is 137

Short Description

When a container (Spark executor) runs out of memory, YARN automatically kills it. Exit code 137 indicates that the container process was terminated with a SIGKILL signal (128 + 9), which results in the "Container killed on request. Exit code is 137" error. These errors can occur in different job stages, in both narrow and wide transformations.
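
Container memory matters here because the YARN container for an executor is approximately spark.executor.memory plus the memory overhead (spark.executor.memoryOverhead, which defaults to 10% of executor memory, with a 384 MB minimum). A rough sketch, assuming default settings:

spark.executor.memory            10g
spark.executor.memoryOverhead    ~1g (10% of 10g)
Approximate YARN container size  ~11g

If the processes inside the container use more than this allocation, YARN terminates the container.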

Resolution

Use one or more of the following methods to resolve "Exit status: 137" stage failures:

Increase driver or executor memory

Increase container memory by tuning the spark.executor.memory or spark.driver.memory parameters (depending on which container caused the error).

On a running cluster:

Modify spark-defaults.conf on the master node. Example:

sudo vim /etc/spark/conf/spark-defaults.conf
spark.executor.memory 10g
spark.driver.memory 10g

For a single job:

Use the --executor-memory or --driver-memory option to increase memory when you run spark-submit. Example:

spark-submit --executor-memory 10g --driver-memory 10g ...
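
Executor memory can also be set from application code when the SparkSession is built, as in the following minimal sketch (the application name is illustrative, and the setting only takes effect if it's applied before the SparkSession is created). Note that spark.driver.memory must still be set before the driver JVM starts, for example in spark-defaults.conf or with --driver-memory.

import org.apache.spark.sql.SparkSession

// Sketch: request larger executor containers from application code.
// This must run before the SparkSession (and SparkContext) is created.
val spark = SparkSession.builder()
  .appName("executor-memory-example") // illustrative application name
  .config("spark.executor.memory", "10g")
  .getOrCreate()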

Add more Spark partitions

If you can't increase container memory (for example, if you're using maximizeResourceAllocation on the cluster), increase the number of Spark partitions. This reduces the amount of data that's processed by a single Spark task, which in turn reduces the total memory used by a single executor. Use the following Scala code to add more Spark partitions:

// Repartition into more, smaller partitions so that each task processes less data
// (repartition performs a full shuffle to redistribute the rows).
val numPartitions = 500
val newDF = df.repartition(numPartitions)
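
To help choose a partition count, you can check how many partitions the DataFrame currently has. A quick sketch, assuming df is the DataFrame from the example above:

// Print the current number of partitions before repartitioning
println(df.rdd.getNumPartitions)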

Increase the number of shuffle partitions

If the error happens during a wide transformation (for example, a join or groupBy), increase the number of shuffle partitions by raising the spark.sql.shuffle.partitions value. The default value is 200.

On a running cluster:

Modify spark-defaults.conf on the master node. Example:

sudo vim /etc/spark/conf/spark-defaults.conf
spark.sql.shuffle.partitions 500

For a single job:

Use the --conf spark.sql.shuffle.partitions option to add more shuffle partitions when you run spark-submit. Example:

spark-submit --conf spark.sql.shuffle.partitions=500 ...
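
Because spark.sql.shuffle.partitions can be changed at runtime, you can also set it from within the job itself. A minimal Scala sketch, assuming spark is the active SparkSession (as in spark-shell):

// Wide transformations that run after this call use 500 shuffle partitions
spark.conf.set("spark.sql.shuffle.partitions", "500")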

Reduce the number of executor cores

Reducing spark.executor.cores lowers the maximum number of tasks that the executor processes simultaneously, which reduces the amount of memory that the container uses. For example, an executor with 10g of memory and four cores can run up to four tasks at the same time, all sharing that memory; with one core, a single task has the full amount to itself.

On a running cluster:

Modify spark-defaults.conf on the master node. Example:

sudo vim /etc/spark/conf/spark-defaults.conf
spark.executor.cores 1

For a single job:

Use the --executor-cores option to reduce the number of executor cores when you run spark-submit. Example:

spark-submit --executor-cores 1 ...
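
As a closing illustration, the following Scala sketch combines several of the adjustments from this article in application code. All values, the application name, and the input path are illustrative assumptions, not recommendations:

import org.apache.spark.sql.SparkSession

// Sketch: apply several tuning options before the SparkSession is created.
val spark = SparkSession.builder()
  .appName("exit-137-tuning-example")            // illustrative name
  .config("spark.executor.memory", "10g")        // larger executor containers
  .config("spark.executor.cores", "1")           // fewer concurrent tasks per executor
  .config("spark.sql.shuffle.partitions", "500") // more, smaller shuffle partitions
  .getOrCreate()

// Repartition the input so that each task processes less data.
val df = spark.read.parquet("s3://amzn-s3-demo-bucket/input/") // illustrative path
val newDF = df.repartition(500)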