Amazon Web Services ブログ

Amazon EMR の伸縮性と回復力を高めるための Spark の機能強化

お客様は Amazon EMR の伸縮性を利用して、ワークフローが完了したとき、またはより軽いジョブを実行するときにクラスターをスケールしてコストを節約しています。これは、低コストの Amazon EC2 スポットインスタンスでクラスターを起動する場合も同様です。

Amazon EMR の Automatic Scaling 機能により、お客様はクラスターの使用状況やその他のジョブ関連のメトリックスに基づいて、クラスターを動的にスケーリングできます。これらの機能は、リソースを効率的に使用するのに役立ちますが、実行中のジョブの途中で EC2 インスタンスがシャットダウンすることもあります。これにより、計算とデータが失われる可能性があり、それがジョブの安定性に影響を与えたり、再計算による重複作業を招いたりする可能性があります。

実行中のジョブに影響を与えずにノードを適切にシャットダウンするために、Amazon EMR は Apache Hadoop の廃止メカニズムを使用しています。このメカニズムは Amazon EMR チームが開発し、コミュニティに還元したものです。これはほとんどの Hadoop ワークロードではうまく機能しますが、Apache Spark ではそれほどうまくいきません。Spark は現在、ノードの損失に対処する際、さまざまな問題点に直面しています。これにより、失われたタスクやデータをリカバリおよび再計算しようとしてジョブが停滞したり、場合によってはジョブがクラッシュしたりすることもあります。Spark の未解決問題のいくつかについての詳細は、以下のリンクを参照してください。

これらの問題のいくつかを回避し、お客様が Spark を使用して Amazon EMR の伸縮性機能を最大限に活用できるようにするために、Amazon EMR では、オープンソースの Spark をカスタマイズしてノードの損失に対する回復力を高めることができます。再計算は最小限に抑えられ、ジョブはノードのエラーおよび EC2 インスタンスの終了からより迅速にリカバリできます。これらの改善点は、Amazon EMR リリースバージョン 5.9.0 以降で反映されています。

このブログ記事では、問題に対処するためにオープンソースの Spark でノードの損失に対処する方法と、Amazon EMR の改善にまつわる問題点の概要について説明します。

Spark でノードの損失に対処する方法

アクティブな Spark ジョブの実行中にノードが停止すると、次のような危険性があります。

  • ノード上でアクティブに実行されているタスクが完了できず、別のノード上で実行しなければならない。
  • ノード上のキャッシュされた RDD (回復力のある分散データセット) が失われる可能性がある。これはパフォーマンスに影響を与えますが、障害を引き起こしたり、アプリケーションの安定性に影響を与えたりすることはありません。
  • メモリ内のシャッフル出力ファイルやノード上のディスクに書き込まれたファイルが失われる。Amazon EMR はデフォルトで外部シャッフルサービスを有効にしているため、シャッフル出力がディスクに書き込まれます。シャッフルファイルを失うと、別のアクティブノードで再計算されるまでアプリケーションが停止する可能性があります。将来のタスクはそれらに依存する可能性があるためです。シャッフル操作の詳細については、「シャッフル操作」を参照してください。

ノードの損失からリカバリするために、Spark は次のことが行えます。

  • アクティブに実行されているタスクが失われた場合は、別のノードでそれらをスケジュールする必要があります。さらに、予定外の残りのタスクの計算を再開する必要があります。
  • 失われたノードで計算したシャッフル出力は、シャッフルブロックを生成したタスクを再実行することによって再計算しなければなりません。

以下は、ノードが失われたときに Spark がリカバリするイベントのシーケンスです。

  • Spark は、ノード上でアクティブに実行中のタスクを失敗したと見なし、それらを別のアクティブノードで再実行します。
  • ノードに将来のタスクに必要なシャッフル出力ファイルがある場合、他のアクティブノード上のターゲットエグゼキューターは、失敗したノードから欠けているシャッフルブロックを取得しようとしている間に FetchFailedException を受け取ります。
  • FetchFailedException が発生すると、ターゲットエグゼキューターは spark.shuffle.io.maxRetries および spark.shuffle.io.retryWait 設定値が決定した時間に失敗したノードからブロックを取得することを再試行しようとします。すべての再試行が尽きた後、失敗はドライバーに伝播されます。
  • ドライバーは FetchFailedException を受け取ると、障害が発生した現在実行中のシャッフルフェーズを「失敗」とマークし、その実行を停止します。また、シャッフルブロックを取得できなかったノードまたはエグゼキューター上のシャッフル出力を「使用不可/喪失」とマークし、再計算できるようにします。これにより、前の Map フェーズがそれらの欠けているシャッフルブロックの再計算を再試行します。
  • 欠けているシャッフル出力が計算された後、失敗したシャッフルフェーズの再試行がトリガーされ、停止したところからジョブを再開します。次に、失敗したかまだスケジュールされていないタスクを実行します。

Spark がノードの損失に対処する際の問題

Spark のリカバリプロセスは、あらゆるクラウド環境で発生する可能性があるランダムエグゼキューターやノードの障害からリカバリするのに役立ちます。ただし、ノードがすでに失敗し、シャッフルブロックをフェッチしようとしている間に Spark が FetchFailedException を取得した後でのみリカバリプロセスが開始されます。これはこのセクションで説明されている問題のいくつかを引き起こします。

Amazon EMR は、手動でのサイズ変更、EC2 によるスポットインスタンスの終了、または自動スケーリングイベントによって、いつどのノードが停止しているかを認識しているため、リカバリを早期に開始できます。Spark はこれらのノードについてすぐに Spark に通知することができるので、Spark はノードの損失を適切に処理して早期にリカバリを開始するためにプロアクティブなアクションを実行できます。けれども Spark には、YARN の廃止処理など、ノードが停止していることを通知するためのメカニズムは現在ありません。したがって、早期のリカバリを行うための適切な措置をすぐに講じることはできません。その結果、Spark のリカバリに関する以下のいくつかの問題点があります。

  • 次の図に示すように、ノードは Map フェーズの途中で停止します。

このシナリオでは、シャッフルフェーズは不必要にスケジュールされており、アプリケーションは失われたシャッフルを再計算する前に FetchFailedException を待つ必要があります。これにはかなり時間がかかります。代わりに、シャッフルフェーズに進む前でも、失われたシャッフルをすべて Map フェーズですぐに再計算できればさらに良いでしょう。

  • 次の図に示すように、ノードはシャッフルフェーズの途中で停止してします。

FetchFailedException に依存してフェッチを再試行するのではなく、すぐにノードの損失について Spark に通知する方法がある場合は、復旧時間を節約できます。

  • Spark ドライバーは、最初の FetchFailedException を受け取ったときに再計算を開始します。失われたノード上のシャッフルファイルは存在しないと見なされます。ただし、前の Map フェーズの最初の再試行で複数のノードが同時に停止した場合、Spark ドライバーは FetchFailedException を受け取った最初のノードのシャッフル出力のみを再計算します。最初の取得の失敗を受信してから再試行を開始するまでの短い間に、ドライバーが他の失敗したノードから取得の失敗を受信する可能性があります。その結果、同じ再試行で複数の失われたノードに対してシャッフルを再計算することはできますが、保証はありません。ほとんどの場合、ノードが同時に停止しても、失われたシャッフル出力をすべて再計算するため、Spark はマップとシャッフルフェーズを複数回再試行する必要があります。これにより、ジョブがかなり長い間ブロックされやすくなってしまいます。理想的には、Spark は、ほぼ同時に失われたすべてのノードでシャッフル出力を 1 回だけ再試行することです。
  • 停止しようとしているノードに到達できる限り、Spark はそのノードでさらにタスクをスケジュールし続ける可能性があります。これにより、より多くのシャッフル出力が計算され、最終的には再計算する必要があるかもしれません。理想的には、これらのタスクを正常なノードにリダイレクトして再計算を防ぐことで、復旧時間を短縮することです。
  • Spark にはフェーズで連続して失敗できる試行回数に制限があり、これを超えるとジョブが中止されます。これは spark.stage.maxConsecutiveAttempts で設定することができます。ノードに障害が発生して FetchFailedException が発生すると、Spark は実行中のシャッフルステージを「失敗」とマークし、欠けているシャッフル出力を計算した後に再試行をトリガーします。シャッフルフェーズ中に頻繁にノードをスケーリングすると、フェーズの失敗がしきい値に達し、ジョブが中止され易くなります。手動スケールイン、自動スケーリングイベントや EC2 によって引き起こされるスポットインスタンスの終了などの正当な理由でフェーズが失敗した場合、失敗をそのフェーズの spark.stage.maxConsecutiveAttempts にカウントしないように Spark に指示する方法があれば理想です。

Amazon EMR でこれらの問題を解決する方法

このセクションでは、前のセクションで説明した問題を解決するために、Amazon EMR が Spark に対して行った 3 つの主な機能強化について説明します。

YARN の廃止メカニズムと統合する

Amazon EMR の Spark は、クラスターリソースの基礎となるマネージャーとして YARN を使用しています。Amazon EMR には、YARN の適切な廃止メカニズムを独自に実装しており、Decommissioning 状態のノードで新しいコンテナをスケジュールしないことで YARN ノードマネージャーを適切にシャットダウンする方法を提供しています。Amazon EMR は、ノードの使用が中止される前に、実行中のコンテナに関する既存のタスクが完了するまで、またはタイムアウトするまで待機します。この廃止メカニズムは最近オープンソースの Hadoop に還元されました。

Spark を YARN の廃止メカニズムと統合して、ノードが YARN の Decommissioning または Decommissioned 状態になると Spark ドライバーに通知されるようにしました。次の図でこれを示します。

この通知により、すべてのノードが削除される前に廃止プロセスを経るため、ドライバーは適切なアクションを実行してリカバリを早期に開始できます。

Spark のブラックリスト登録メカニズムを拡張する

YARN の廃止メカニズムは、廃止処理されているノードでそれ以上コンテナを起動しないため、Hadoop MapReduce ジョブに適しています。これにより、そのノードでさらに Hadoop MapReduce タスクがスケジュールされるのを防ぎます。ただし、Spark では各エグゼキューターに長寿命の YARN コンテナが割り当てられ、タスクを受信し続けるため、これは Spark ジョブではうまく機能しません。

新しいコンテナが起動されないようにしても、エグゼキューターがさらにノードに割り当てられることを妨ぐだけです。すでにアクティブなエグゼキュータ/コンテナは、ノードが停止するまで新しいタスクをスケジュールし続け、それらが失敗すると、再実行する必要があります。また、これらのタスクがシャッフル出力を書き込んでいた場合、それらも失われてしまいます。これにより、再計算とリカバリにかかる時間が長くなります。

これに対処するために、Amazon EMR は、Spark ドライバーが YARN 廃止シグナルを受信したときに、そのノードをブラックリストに登録するように Spark のブラックリスト登録メカニズムを拡張しています。次の図でこれを示します。

これにより、ブラックリストに登録されたノードで新しいタスクがスケジュールされるのを防ぎます。そして正常なノード上でスケジュールされるようになります。ノード上ですでに実行中のタスクが完了するとすぐに、タスクの失敗や損失のリスクなしに、ノードを安全に廃止することができます。これはまた、停止しているノードでそれ以上シャッフル出力が生成されないようにすることで、リカバリプロセスをスピードアップします。これにより、再計算されるシャッフル出力の数が減ります。ノードが Decommissioning 状態を脱して再びアクティブになると、Amazon EMR はそのノードをブラックリストから削除して、新しいタスクをスケジュールできるようにします。

このブラックリストの拡張機能は、Amazon EMR ではspark.blacklist.decommissioning.enabled プロパティが true に設定されているため、デフォルトで有効になっています。spark.blacklist.decommissioning.timeout プロパティを使用してノードがブラックリストに登録される時間を制御できます。これは yarn.resourcemanager.nodemanager-graceful-decommission-timeout-secs のデフォルト値と等しい 1 時間にデフォルトで設定されています。Amazon EMR が廃止期間全体にかけてノードをブラックリストに登録されるように、spark.blacklist.decommissioning.timeoutyarn.resourcemanager.nodemanager-graceful-decommission-timeout-secs 以上の値に設定することをお勧めします。

廃止されたノードに対するアクション

ノードが廃止処理され、新しいタスクがスケジュールされず、アクティブコンテナがアイドル状態になる (またはタイムアウトになる) と、ノードは廃止されます Spark ドライバーが廃止されたシグナルを受信すると、取得の失敗が発生するのを待つのではなく、リカバリプロセスを早く開始するために以下の追加のアクションを実行できます。

  • 廃止されたノード上のシャッフル出力はどれも登録されていないため、「使用不可」としてマークされます。Amazon EMR では、デフォルトで spark.resourceManager.cleanupExpiredHost が true に設定され、これを有効にしています。これには以下の利点があります。
    • ノードがマップフェーズの途中で失われて廃止されると、Spark はリカバリを開始し、廃止されたノードで失われたシャッフル出力を再計算してから、次のフェーズに進みます。Spark はすべてのシャッフルブロックが計算されてマップフェーズの最後に使用可能になるため、シャッフルステージでのフェッチ失敗を防ぎます。これによりリカバリが大幅にスピードアップします。
    • ノードがシャッフルフェーズの途中で失われた場合、失われたノードからシャッフルブロックを取得しようとしているターゲットエグゼキュータは、シャッフル出力が利用できないことをすぐに認識します。次に、再試行して複数回失敗して取得するのではなく、失敗をドライバーに送信します。さらに、ドライバーはすぐにフェーズを失敗させ、失われたシャッフル出力の再計算を開始します。これにより、失われたノードからシャッフルブロックを取得するのにかかる時間が短縮されます。
    • シャッフル出力の登録を解除することの最も重要な利点は、クラスターが多数のノードによってスケールインされるときです。すべてのノードがほぼ同時に停止するため、すべてのノードがほぼ同時に廃止され、それらのシャッフル出力が登録解除されます。Spark は、失われたブロックを計算するための最初の再試行をスケジュールするとき、廃止されたノードから失われたブロックすべてを認識し、1 回の試行のみでリカバリします。これにより、すべてのノードから不足しているシャッフルを再計算するためにフェーズが複数回再スケジュールされ、何時間も失敗して再計算されることでジョブが停滞するのを防ぐことができるため、オープンソースの Spark を実装したときよりもリカバリプロセスが大幅にスピードアップします。
  • 廃止処理されているノードからの取得が失敗したためにフェーズが失敗した場合、デフォルトでは、Amazon EMR はspark.stage.maxConsecutiveAttempts で設定されているフェーズの最大許容失敗数にフェーズの失敗をカウントしません。これは、設定 spark.stage.attempt.ignoreOnDecommissionFetchFailure が true に設定されていることによって決まります。これは、手動のサイズ変更、自動スケーリングイベントや、 EC2 によってトリガーされるスポットインスタンスの終了などの正当な理由でノード障害が発生したことでフェーズが複数回失敗した場合に、ジョブが失敗するのを防ぎます。

結論

この記事では、Spark がノードの喪失を処理する方法と、アクティブな Spark ジョブの実行中にクラスターがスケールインされた場合に発生する可能性がある問題について説明しました。また、Amazon EMR が Spark 上に構築したカスタマイズ、および Amazon EMR の Spark をより回復力のあるものにするために使用できる設定を示しました。これにより、Amazon EMR が提供する伸縮性機能を最大限に活用できます。

ご不明な点がございましたら、コメントをお寄せください。

 


著者について

Udit Mehrotra は、アマゾン ウェブ サービスのソフトウェア開発エンジニアです。EMR の最先端機能に取り組み、Apache Spark、Apache Hadoop、Apache Hive などのオープンソースプロジェクトにも携わっています。余暇はギターを弾き、旅行し、腕時計を鑑賞し、友達と付き合って過ごしています。