How do I resolve ExecutorLostFailure "Slave lost" errors in Spark on Amazon EMR?

Last updated: 2020-01-08

I submitted an Apache Spark application to an Amazon EMR cluster. The application fails with a "slave lost" stage failure that looks like this:

Most recent failure: Lost task 1209.0 in stage 4.0 (TID 31219, ip-xxx-xxx-xx-xxx.compute.internal, executor 115): ExecutorLostFailure (executor 115 exited caused by one of the running tasks) Reason: Slave lost

Short Description

A "slave lost" error indicates that a Spark task failed because a node in the cluster was terminated or was unavailable. This problem is commonly caused by one of the following:

  • High disk utilization
  • Using Spot Instances for cluster nodes
  • Aggressive Amazon Elastic Compute Cloud (Amazon EC2) Auto Scaling policies

Resolution

High disk utilization

In Hadoop, NodeManager periodically checks the Amazon Elastic Block Store (Amazon EBS) volumes that are attached to the cluster's nodes. If disk utilization on a node that has one volume attached exceeds the value of the YARN property yarn.nodemanager.disk-health-checker.max-disk-utilization-per-disk-percentage (default: 90%), the node is considered unhealthy. When a node is unhealthy, all containers running on that node are killed, and ResourceManager doesn't schedule new containers on that node. For more information, see NodeManager in the Hadoop documentation.

If multiple Spark executors are killed because of unhealthy nodes, the application fails with a "slave lost" error.
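
To see how close a node is to that threshold, you can approximate the same per-disk check yourself. The following is a minimal Python sketch to run on a core or task node; it assumes the YARN local and log directories are in their usual EMR locations (/mnt/yarn and /var/log/hadoop-yarn/containers) and uses the default 90% threshold.

# Minimal sketch: approximate the NodeManager disk health check by hand.
# The directories and the 90% threshold are assumptions based on the EMR
# defaults; adjust them to match your cluster's configuration.
import shutil

YARN_DIRS = ["/mnt/yarn", "/var/log/hadoop-yarn/containers"]  # local-dirs and log-dirs
THRESHOLD_PERCENT = 90.0  # yarn.nodemanager.disk-health-checker.max-disk-utilization-per-disk-percentage

for path in YARN_DIRS:
    usage = shutil.disk_usage(path)  # reports the file system that contains the path
    used_percent = 100.0 * usage.used / usage.total
    status = "BAD" if used_percent > THRESHOLD_PERCENT else "OK"
    print(f"{path}: {used_percent:.1f}% used ({status})")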

Review the NodeManager logs or the instance controller logs to confirm that a node is unhealthy:

  • The location of the NodeManager logs is defined in the YARN_LOG_DIR variable in yarn-env.sh.
  • The instance controller logs are stored at /emr/instance-controller/log/instance-controller.log on the master node. The instance controller logs provide an aggregated view of all the nodes of the cluster.

If a node is unhealthy, the logs show an entry that looks like this:

2019-10-04 11:09:37,163 INFO Poller: InstanceJointStatusMap contains 40 entries (R:40):
  i-006baxxxxxx  1829s R   1817s ig-3B ip-xxx-xx-xx-xxx I:    7s Y:U    11s c: 0 am:    0 H:R  0.0%Yarn unhealthy Reason : 1/1 local-dirs are bad: /mnt/yarn; 1/1 log-dirs are bad: /var/log/hadoop-yarn/containers
  i-00979xxxxxx  1828s R   1817s ig-3B ip-xxx-xx-xx-xxx I:    7s Y:R     9s c: 3 am: 2048 H:R  0.0%
  i-016c4xxxxxx  1834s R   1817s ig-3B ip-xxx-xx-xx-xxx I:   13s Y:R    14s c: 3 am: 2048 H:R  0.0%
  i-01be3xxxxxx  1832s R   1817s ig-3B ip-xxx-xx-xx-xxx I:   10s Y:U    12s c: 0 am:    0 H:R  0.0%Yarn unhealthy Reason : 1/1 local-dirs are bad: /mnt/yarn; 1/1 log-dirs are bad: /var/log/hadoop-yarn/containers
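
On a large cluster, the instance controller log contains many entries. The following minimal Python sketch, run on the master node, prints only the instances that YARN has flagged as unhealthy; it assumes the default log location shown earlier in this article.

# Minimal sketch: scan the instance controller log for nodes that YARN has
# marked unhealthy and print the instance ID and the reported reason.
import re

LOG_PATH = "/emr/instance-controller/log/instance-controller.log"

with open(LOG_PATH) as log:
    for line in log:
        match = re.search(r"(i-\w+).*Yarn unhealthy Reason\s*:\s*(.*)", line)
        if match:
            print(f"{match.group(1)}: {match.group(2).strip()}")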

To resolve this problem, increase the size of the EBS volumes that are attached to the core and task nodes. Or, delete unused data from HDFS.
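
If you grow the volumes, one option is the Amazon EC2 ModifyVolume API. The following minimal boto3 sketch assumes you have already identified the EBS volume that is attached to the affected core or task node; the volume ID, Region, and new size are placeholders. After the modification completes, you still need to extend the partition and file system on the instance.

# Minimal sketch: request a larger size for an existing EBS volume.
# The volume ID, Region, and size below are placeholders.
import boto3

ec2 = boto3.client("ec2", region_name="us-east-1")

ec2.modify_volume(
    VolumeId="vol-0123456789abcdef0",  # placeholder volume ID
    Size=200,                          # new size in GiB
)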

Spot Instances

If you're using Amazon EC2 Spot Instances for EMR cluster nodes and one of those instances terminates, you might get a "slave lost" error. Spot Instances might be terminated for the following reasons:

  • The Spot Instance price is greater than your maximum price.
  • There aren't enough unused EC2 instances to meet the demand for Spot Instances.

For more information, see Reasons for Interruption.

To resolve this problem, consider switching to On-Demand Instances.
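
To confirm whether the cluster's core and task nodes are currently running on Spot capacity, the following minimal boto3 sketch can help for a cluster that uses uniform instance groups; the cluster ID and Region are placeholders.

# Minimal sketch: list core and task instances and report their purchasing
# option (ON_DEMAND or SPOT). The cluster ID and Region are placeholders.
import boto3

emr = boto3.client("emr", region_name="us-east-1")

response = emr.list_instances(
    ClusterId="j-XXXXXXXXXXXXX",
    InstanceGroupTypes=["CORE", "TASK"],
)

for instance in response["Instances"]:
    print(instance["Ec2InstanceId"], instance.get("Market", "UNKNOWN"))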

Amazon EC2 Auto Scaling policies

When a scaling policy performs many scale-in and scale-out events in sequence, a new node might get the same IP address that a previous node used. If a Spark application is running during a scale-in event, the decommissioned node is added to the Spark blacklist to prevent an executor from launching on that node. If another scale-out event occurs and the new node gets the same IP address as the previously decommissioned node, YARN considers the new node valid and attempts to schedule executors on it. However, because the node is still on the Spark blacklist, attempts to launch executors on that node will fail. When the maximum number of failures is reached, the Spark application fails with a "slave lost" error.

To resolve this problem, remove the node from the Spark blacklist sooner by decreasing the Spark and YARN timeout properties, as shown in the following examples:

Add the following property to /etc/spark/conf/spark-defaults.conf. This property reduces the amount of time that a node in the decommissioning state stays on the blacklist. The default is one hour. For more information, see Configuring Node Decommissioning Behavior.

spark.blacklist.decommissioning.timeout 600s

Modify the following YARN property in /etc/hadoop/conf/yarn-site.xml. This property specifies the amount of time to wait for running containers and applications to complete before transitioning a decommissioning node into the decommissioned state. The default is 3600 seconds.

yarn.resourcemanager.nodemanager-graceful-decommission-timeout-secs 600
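
If you prefer to set both values when you create a cluster instead of editing files on each node, the same two properties can be expressed as an Amazon EMR configuration object. The following minimal Python sketch shows the structure; it can be passed in the Configurations parameter of a boto3 run_job_flow call or supplied as equivalent JSON in the EMR console.

# Minimal sketch: the spark-defaults and yarn-site classifications that apply
# the two properties above at cluster creation time.
CONFIGURATIONS = [
    {
        "Classification": "spark-defaults",
        "Properties": {"spark.blacklist.decommissioning.timeout": "600s"},
    },
    {
        "Classification": "yarn-site",
        "Properties": {
            "yarn.resourcemanager.nodemanager-graceful-decommission-timeout-secs": "600"
        },
    },
]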