Amazon Web Services 한국 블로그
Amazon EMR 클러스터 탄력성에 따른 Spark 노드 손실 문제 해결 방법
AWS 고객은 Amazon EMR의 클러스터 탄력성을 활용하여 작업량에 따라 사용 인스턴스 수를 조정해서 비용을 절감할 수 있습니다. 특히, EC2 스팟 인스턴스를 사용하면, 빠르게 끝나는 작업에 대해서 80-90%의 저렴한 비용으로 작업을 할 수 있습니다.
또한, Amazon EMR의 오토 스케일링 기능을 통해 고객은 클러스터 사용이나 기타 작업 관련 지표에 따라 클러스터를 동적으로 확장 및 축소 할 수 있습니다. 다만, 이 기능을 통해 리소스를 효율적으로 사용할 수 있지만 작업 실행 중에 EC2 인스턴스가 중단될 수도 있습니다. 그 결과 계산 및 데이터가 손실될 수 있으며 이는 작업의 안정성을 저해하거나 재컴퓨팅을 통해 중복 작업을 초래할 수 있습니다.
이에 대한 해결 방법으로 실행 중인 작업에 영향을 미치지 않고 노드를 정상적으로 중단하기 위해 Amazon EMR은 Apache Hadoop의 폐기 메커니즘을 사용할 수 있습니다. Amazon EMR 팀은 이 메커니즘을 개발하여 오픈 소스로 공헌하기도 했습니다.
이 메커니즘은 대부분의 하둡 워크로드에서 잘 작동하지만 Apache Spark에서는 그렇지 않습니다. Spark는 현재 노드 손실 문제를 처리하면서도 다양한 단점을 경험하고 있습니다. 이로 인해 작업이 중단되어 손실된 작업 및 데이터를 복구하고 다시 계산해야 할 수 있으며 일부 경우에는 결국 작업이 중단될 수도 있습니다. Spark의 아직 해결되지 않은 문제에 대한 자세한 내용은 다음 링크를 참조하십시오.
Amazon EMR은 이러한 문제 중 일부를 방지하고 AWS 고객이 Spark에서 Amazon EMR의 탄력성 기능을 최대한 활용할 수 있도록 Spark에 대한 사용자 지정 기능을 제공하므로 노드 손실에 대한 복원력이 더욱 뛰어납니다. 재계산을 최소화하고 노드 장애 및 EC2 인스턴스 종료 문제 발생 시 작업을 빠르게 복구할 수 있습니다. 이러한 개선 사항은 Amazon EMR 릴리스 버전 5.9.0 이상에 포함되었습니다.
이 블로그 게시물은 Spark가 노드 손실을 처리하는 방법과 문제점 해결을 위한 Amazon EMR의 개선 사항에 대한 개요를 제공합니다.
Spark의 노드 손실 문제 살펴 보기
Spark 작업 중에 노드가 내려가면 다음과 같은 위험이 발생합니다.
- 노드에서 활발하게 실행 중인 작업을 완료하지 못하고 다른 노드에서 실행해야 할 수 있습니다.
- 노드의 캐시된 RDD(복원 가능한 분산 데이터 세트)가 손실될 수 있습니다. 이는 성능에 영향을 미치지만 장애를 유발하지 않거나 애플리케이션의 안정성에 영향을 주지 않습니다.
- 메모리의 셔플 출력 파일 또는 노드의 디스크에 쓰여진 출력 파일이 손실됩니다. Amazon EMR은 기본적으로 외부 셔플 서비스를 실행하므로 셔플 출력은 디스크에 기록됩니다. 향후 작업이 셔플 파일에 의존하므로 분실된 셔플 파일은 다른 활성 노드에서 재계산될 때까지 애플리케이션을 중단시킬 수 있습니다. 셔플 작업에 대한 자세한 내용은 셔플 작업을 참조하십시오.
노드 손실에서 작업을 복구하려면 Spark가 다음 사항을 수행할 수 있어야 합니다.
- 활발하게 실행 중인 작업이 손실되면 다른 노드에서 예약해야 합니다. 또한 예약되지 않은 나머지 작업에 대한 컴퓨팅을 다시 시작해야 합니다.
- 손실된 노드에서 계산된 셔플 출력은 해당 셔플 블록을 생성한 작업을 재실행하여 재계산되어야 합니다.
다음은 노드가 손실되었을 때 Spark가 복구하는 일련의 이벤트입니다.
- Spark는 노드에서 실행 중인 작업을 실패한 것으로 간주하고 해당 작업을 다른 활성 노드에서 다시 실행합니다.
- 노드가 향후 작업에 필요한 셔플 출력 파일을 가지고 있는 경우, 실패한 노드에서 누락된 셔플 블록을 가져오는 동안 다른 활성 노드의 대상 실행자기 FetchFailedException을 얻습니다.
- FetchFailedException이 발생하면 대상 실행기는 spark.shuffle.io.maxRetries 및 spark.shuffle.io.retryWait 구성 값에 의해 결정된 시간 동안 실패한 노드에서 블록을 가져오려고 재시도합니다. 모든 재시도가 끝나면 실패 작업이 드라이버에 전달됩니다.
- 드라이버가 FetchFailedException을 받으면 실패가 발생한 현재 실행 중인 셔플 스테이지를 실패로 표시하고 실행을 중지합니다. 또한 셔플 블록을 사용 불가능/손실이라 가져올 수 없는 노드 또는 실행기에서의 셔플 출력을 표시하여 다시 계산할 수 있도록 합니다. 이 과정은 이전 맵 스테이지를 트리거하여 누락된 셔플 블록을 다시 계산합니다.
- 누락된 셔플 출력이 계산된 후에는 실패한 셔플 스테이지의 재시도가 트리거되어 작업이 중단된 곳에서 작업을 다시 시작합니다. 그러고 나서 실패했거나 아직 예약되지 않은 작업을 실행합니다.
Spark의 노드 손실 복구 프로세스
Spark의 복구 프로세스는 모든 클라우드 환경에서 발생할 수 있는 임의 실행기 및 노드 오류를 복구하는 데 도움이 됩니다. 그러나 노드가 이미 실패하고 셔플 블록을 가져오는 동안 Spark가 FetchFailedException을 가져온 후에만 복구 프로세스가 시작됩니다. 이로 인해 이 섹션에서 설명하는 몇몇 문제가 발생합니다.
Amazon EMR은 수동 크기 조절, EC2 트리거 스팟 인스턴스 종료 또는 자동 조정 이벤트로 인해 언제 그리고 어느 노드가 내려가는지 알기 때문에 초기에 복구 작업을 시작할 수 있습니다. Amazon EMR이 이러한 노드에 대해 즉시 Spark에 알릴 수 있으므로 Spark는 노드 손실을 정상적으로 처리하고 조기에 복구를 시작할 수 있도록 사전 조치를 취할 수 있습니다. 그러나 현재 Spark에는 YARN 폐기와 같이 노드가 내려감을 알릴 수 있는 메커니즘이 없습니다. 따라서 더 신속한 복구에 도움이 되는 즉각적이고 적절한 조치를 취할 수 없습니다. 그 결과 Spark의 복구와 관련된 몇 가지 문제점이 발생합니다.
- 노드는 다음 다이어그램과 같이 맵 스테이지의 중간에 있습니다.
이 시나리오에서 셔플 스테이지는 불필요하게 예약되고 애플리케이션은 손실된 셔플을 다시 계산하기 전에 FetchFailedException을 기다려야합니다. 이 과정에는 많은 시간이 소요됩니다. 대신, 셔플 스테이지로 진행하기 전에 잃어버린 모든 셔플을 맵 스테이지에서 즉시 다시 계산할 수 있다면 더 좋을 것입니다.
- 노드는 다음 다이어그램과 같이 셔플 스테이지의 중간에 있습니다.
FetchFailedException 및 재시도 페칭에 의존하는 대신 노드 손실을 즉시 Spark에게 알려주는 방법이 있다면 복구 시간을 절약할 수 있습니다.
- Spark 드라이버는 첫 번째 FetchFailedException을 얻을 때 재계산을 시작합니다. 손실된 노드의 셔플 파일이 누락된 것으로 간주합니다. 그러나 여러 노드가 동시에 내려가는 경우 Spark 드라이버는 이전 맵 스테이지를 처음 다시 시도하는 과정에서 FetchFailedException을 수신한 첫 번째 노드의 셔플 출력만 다시 계산합니다. 첫 번째 가져오기 실패를 수신하고 재시도를 시작하는 짧은 시간에 드라이버가 다른 실패한 노드에서 가져오기 실패를 수신할 수 있습니다. 결과적으로 동일한 재시도에서 여러 개의 손실된 노드에 대한 셔플을 다시 계산할 수 있지만 보장할 수는 없습니다. 대부분의 경우 노드가 동시에 내려가더라도 Spark가 손실된 셔플 출력을 모두 재계산하려면 맵과 셔플 스테이지를 여러 번 재시도해야 합니다. 이로 인해 상당한 시간 동안 작업이 차단되는 현상이 쉽게 초래될 수 있습니다. 이상적으로 Spark는 오직 같은 시간에 손실된 모든 노드에서 셔플 출력만 재시도하여 재계산할 수 있습니다.
- 이제 막 내려가려고 하는 노드에 도달할 수만 있다면 Spark는 계속 더 많은 작업을 예약할 수 있습니다. 이로 인해 더 많은 셔플 출력이 계산되므로 결과적으로 재계산해야 할 수도 있습니다. 이상적으로는 이러한 작업을 정상적인 노드로 리디렉션하여 재계산을 방지하고 복구 시간을 향상시킬 수 있습니다.
- Spark는 작업을 중단하기 전에 스테이지에서 허용된 연속 실패 시도 횟수에 제한을 두고 있습니다. 이는 spark.stage.maxConsecutiveAttempts로 구성할 수 있습니다. 노드가 실패하고 FetchFailedException이 발생하면 Spark는 실행 중인 셔플 스테이지를 실패로 표시하고 누락된 셔플 출력을 계산한 후 재시도를 트리거합니다. 셔플 스테이지에서 노드를 자주 확장하면 스테이지 오류가 임계값에 도달하고 작업이 중단될 수 있습니다. 이론적으로 수동 조정, 자동 조정 이벤트 또는 EC2 트리거 스팟 인스턴스 종료와 같은 타당한 이유로 스테이지가 실패하면 Spark가 해당 스테이지에 대한 spark.stage.maxConsecutiveAttempts에 대해 이를 세지 않도록 지시할 수 있는 방법이 있어야 합니다.
Amazon EMR의 Spark의 노드 손실 문제 해결 방법
이 섹션에서는 Amazon EMR이 이전 섹션에서 설명한 문제를 해결하기 위해 Spark에 적용한 세 가지 주요 개선 사항에 대해 설명합니다.
1. YARN의 폐기 메커니즘과 통합
Amazon EMR의 Spark는 YARN을 클러스터 리소스의 기본 관리자로 사용합니다. Amazon EMR은 YARN에 대한 적절한 폐기 메커니즘을 구현하여 폐기 상태의 노드에 새 컨테이너를 예약하지 않고 YARN 노드 관리자를 정상적으로 중단할 수 있는 방법을 제공합니다. Amazon EMR은 노드를 폐기하기 전에 컨테이너 실행에 대한 기존 작업이 완료되거나 시간이 초과할 때까지 기다리는 방식으로 이 작업을 수행합니다. 이 폐기 메커니즘은 최근 오픈 소스 하둡에 다시 기여했습니다.
우리는 YARN의 폐기 메커니즘을 통해 Spark를 통합하여 노드가 YARN에서 폐기 중 또는 폐기된 상태를 거칠 때 Spark 드라이버에 통지합니다. 이러한 내용은 다음 다이어그램에 나옵니다.
이 알림은 드라이버가 제거되기 전에 모든 노드가 폐기 프로세스를 거치기 때문에 드라이버가 적절한 조치를 취하고 조기에 복구를 시작할 수 있도록 합니다.
2. Spark의 블랙리스트 메커니즘 확장
YARN의 폐기 메커니즘은 폐기 노드에서 더 이상 컨테이너를 시작하지 않아 하둡 MapReduce 작업에 적합합니다. 이렇게 하면 더 많은 하둡 MapReduce 작업이 해당 노드에서 예약되는 것을 방지할 수 있습니다. 그러나 Spark에서는 각 실행기가 수명이 오래된 YARN 컨테이너를 할당받고 계속 작업을 수신하기 때문에 이 방법은 Spark 작업에는 제대로 작동하지 않습니다.
새 컨테이너가 시작되지 않도록 하면 더 많은 실행기가 노드에 할당되지 않습니다. 이미 활동 중인 실행기/컨테이너는 노드가 내려갈 때까지 새로운 작업을 계속 예약하고, 결국 실패하고 재실행해야 합니다. 또한 이러한 작업이 셔플 출력을 기록하면 손실될 수도 있습니다. 이렇게 하면 재계산과 복구에 소요되는 시간이 늘어납니다.
이 문제를 해결하기 위해 Spark 드라이버가 YARN 폐기 신호를 받으면 Amazon EMR은 Spark의 블랙리스트 작성 메커니즘을 확장하여 노드를 블랙리스트에 작성합니다. 이러한 내용은 다음 다이어그램에 나옵니다.
이를 통해 새 작업이 블랙리스트에 있는 노드에 예약되는 것을 방지할 수 있습니다. 대신 새 작업은 정상적인 노드에 예약됩니다. 노드에서 이미 실행 중인 작업이 완료되는 즉시 노드는 작업 장애 또는 손실의 위험 없이 안전하게 폐기될 수 있습니다. 또한 내려가는 노드에서 더 많은 셔플 출력을 생성하지 않아 복구 프로세스의 속도를 향상시킵니다. 이는 재계산될 셔플 출력의 수를 줄입니다. 노드가 폐기 상태에서 벗어나서 다시 활성화되면 Amazon EMR은 해당 노드를 블랙리스트에서 제거하여 새 작업을 예약할 수 있도록 합니다.
이 블랙리스트 작성 확장은 기본적으로 spark.blacklist.decommissioning.enabled 속성이 참으로 설정된 Amazon EMR에서 활성화됩니다. spark.blacklist.decommissioning.timeout 속성을 사용하여 노드가 블랙리스트에 있는 시간을 제어할 수 있습니다. 이는 기본적으로 1시간으로 설정되어 있으며 yarn.resourcemanager.nodemanager-graceful-decommission-timeout-secs의 기본값과 동일합니다. Amazon EMR이 전체 폐기 기간 동안 노드를 블랙리스트에 표시하도록 spark.blacklist.decommissioning.timeout을 yarn.resourcemanager.nodemanager-graceful-decommission-timeout-secs보다 같거나 큰 값으로 설정할 것을 권장합니다.
3. 폐기된 노드에 대한 작업
노드가 폐기 중이고 새 작업이 예약되지 않으며 활성 컨테이너가 유휴 상태가 되거나 제한 시간이 만료되면 해당 노드는 폐기됩니다. Spark 드라이버가 폐기된 신호를 수신하면 가져오기 실패가 발생할 때까지 기다리지 않고 복구 프로세스를 더 빨리 시작하기 위해 다음과 같은 추가 작업을 수행할 수 있습니다.
- 폐기된 노드의 모든 셔플 출력은 등록이 해제되어 사용할 수 없는 것으로 표시됩니다. Amazon EMR은 기본적으로 spark.resourceManager.cleanupExpiredHost를 참으로 설정하여 이 기능을 실행합니다. 여기에는 다음과 같은 이점이 있습니다.
- 노드가 맵 스테이지 중간에 손실되어 폐기될 경우 Spark는 다음 단계로 진행하기 전에 복구를 시작하고 폐기된 노드에서 손실된 셔플 출력을 재계산합니다. 이는 셔플 스테이지에서 가져오기 실패를 방지합니다. Spark가 맵 스테이지 끝에서 계산되고 사용 가능한 모든 셔플 블록을 가지므로 복구 속도가 크게 빨라지기 때문입니다.
- 셔플 스테이지 중간에 노드가 손실되면 손실된 노드에서 셔플 블록을 얻으려 하는 대상 실행기는 셔플 출력을 사용할 수 없음을 바로 알게 됩니다. 그런 다음, 드라이버를 다시 시도하거나 여러 번 실패하여 드라이버를 가져오는 대신 드라이버에 실패를 전송합니다. 그러면 드라이버는 즉시 스테이지를 실패 처리하고 손실된 셔플 출력을 재계산하기 시작합니다. 이렇게 하면 손실된 노드에서 셔플 블록을 가져오는 데 소요되는 시간을 단축할 수 있습니다.
- 셔플 출력을 등록 해제할 때의 가장 중요한 이점은 클러스터가 다수의 노드로 확장될 때 구현됩니다. 모든 노드가 같은 시간에 내려가기 때문에 모두 같은 시간에 폐기되고 셔플 출력은 등록 해제됩니다. Spark는 누락된 블록을 계산하기 위한 첫 번째 재시도를 예약할 때 폐기된 노드에서 누락된 블록을 모두 발견하고 이를 단 한 번의 시도로 복구합니다. 이는 모든 노드에서 누락된 셔플을 재계산하기 위해 스테이지를 여러 번 재조정할 수 있는 오픈 소스 Spark 구현에 비해 복구 프로세스의 속도를 크게 높이고 작업이 실패하여 재계산되는 것을 방지합니다.
- 기본적으로 폐기되고 있는 노드에서 가져오기 실패로 인해 스테이지가 실패할 경우 Amazon EMR은 spark.stage.maxConsecutiveAttempts에서 설정한 스테이지에서 허용되는 최대 실패 횟수에 대해 스테이지 장애를 계산하지 않습니다. 이는 spark.stage.attempt.ignoreOnDecommissionFetchFailure가 참이 되도록 설정하여 결정됩니다. 이를 통해 수동 크기 조절, 자동 조정 이벤트 또는 EC2 트리거 스팟 인스턴스 종료와 같은 타당한 이유로 발생한 노드 오류 때문에 스테이지가 여러 번 실패한 경우에도 작업이 실패하지 않습니다.
요약
이 글에서는 Spark가 노드 손실을 처리하는 방법과 활성 Spark 작업 중에 클러스터의 크기가 조정될 때 발생할 수 있는 몇몇 문제에 대해 설명합니다. 또한 Amazon EMR이 Spark에 구축한 사용자 지정 기능과 Amazon EMR에서 Spark을 보다 탄력적으로 사용할 수 있도록 한 구성을 소개하여 Amazon EMR에서 제공하는 탄력성 기능을 최대한 활용할 수 있도록 합니다.
질문이나 제안이 있으면 의견을 남겨주시기 바랍니다.
Udit Mehrotra는 Amazon Web Services의 소프트웨어 개발 엔지니어입니다. Amazon EMR의 주요 기능을 개발하고 있으며, Apache Spark, Apache Hadoop 및 Apache Hive와 같은 오픈 소스 프로젝트에도 참여합니다.
이 글은 AWS Bigdata 블로그의 Spark enhancements for elasticity and resiliency on Amazon EMR의 한국어 번역입니다.