Amazon Web Services ブログ

AWS Step Functions と Apache Livy を使用して Apache Spark アプリケーションをオーケストレーションする

Apache Spark の採用は過去数年で大幅に増加しており、Spark ベースのアプリケーションパイプラインを実行することは今や新しい標準とも言えます。ETL (抽出、変換、ロード) パイプラインにある Spark ジョブの要件はそれぞれ異なります。ジョブ間の依存関係を処理し、実行中の順序を維持し、複数のジョブを並行して実行する必要があります。ほとんどの場合、Apache OozieApache Airflow、さらには Cron などのワークフロースケジューラツールを使用して、これらの要件を満たすことができます。

Apache Oozie は、Hadoop ベースのジョブに広く使われているワークフロースケジューラシステムです。しかし、UI 機能が制限されている、他サービスとの統合がしにくい、XML に大きく依存しているなど、一部のユーザーには適していない恐れがあります。一方、Apache Airflow には、強力な UI とモニタリング機能、いくつかの AWS やサードパーティのサービスとの統合など、たくさんの優れた機能が搭載されています。ただし、Airflow は Airflow サーバーのプロビジョニングと管理を必要とします。Cron ユーティリティは強力なジョブスケジューラです。けれども、仕事の細部に関する可視性があまりなく、Cron ジョブを使用してワークフローを作成することは難しいかもしれません。

特定の順序でいくつかの Spark ジョブを実行したいが、それらのジョブをオーケストレーションしたり、別のアプリケーションを管理するのに時間を取りたくないといった、単純なユースケースがあるとしましょう。 今日は、AWS Step Functions を使ったサーバーレスの方法で、このユースケースを取扱います。Amazon EMR 上で、AWS Step Functions でワークフロー全体を作成し、Apache Livy を通じて、Spark とのやり取りを実行します。

この記事では、AWS Step Functions と Apache Livy を使用して、サーバーレスの Spark ベース ETL パイプラインをオーケストレーションする手順の一覧を順に見ていきます。

データを入力する

この投稿のソースデータについては、ニューヨーク市タクシーリムジン委員会 (TLC) の走行記録データを使用します。データの説明については、この「detailed dictionary of the taxi data」を参照してください。この例では、主に Spark ジョブの次の 3 つの列で作業します。

Column name Column description
RateCodeID 走行終了時に有効なレートコードを表します (標準レートでは 1、JFK 空港では 2、ニューアーク空港では 3 など)。
FareAmount メーターで計算された時間距離の運賃を表します。
TripDistance タクシーメーターが報告する経過距離をマイルで表します。

走行データは、最初の行をヘッダーとして、コンマ区切り値 (CSV) 形式で格納されます。Spark の実行時間を短縮するために、大きな入力データを 2 万行にまで削減しました。デプロイフェーズでは、入力ファイル tripdata.csv は Amazon S3 の <<your-bucket>>/emr-step-functions/input/ フォルダに格納されます。

次の図は、走行データのサンプルを示しています。

ソリューションの概要

次のいくつかのセクションでは、このソリューションの Spark ジョブでの作成方法、Apache Livy を使用した Spark とのやり取り方法、AWS Step Functions を使用したこれら Spark アプリケーションとのオーケストレーション方法について説明します。

このソリューションには、高次レベルで次のステップが含まれています。

  1. 入力ファイルのパスを渡すことによって AWS Step Function ステートマシンをトリガーします。
  2. ステートマシンの最初の段階では、AWS Lambda をトリガーします。
  3. Lambda 関数は Apache Livy を使用して、Amazon EMR 上で動作する Apache Spark とやり取りし、Spark ジョブを送信します。
  4. ステートマシンが Spark ジョブステータスを確認するまで、数秒待ちます。
  5. ジョブステータスに基づいて、ステートマシンは成功状態または失敗状態に移行します。
  6. 後続の Spark ジョブは、同じアプローチで送信されます。
  7. ステートマシンがジョブが終了するまで、数秒待ちます。
  8. ジョブの完了後、ステートマシンは最終ステータスで更新されます。

このソリューションで使用される Spark アプリケーションを見てみましょう。

Spark ジョブ

この例では、spark-taxi.jar と名付けた Spark jar ファイルを作成しました。このファイルには、2 つの異なる Spark アプリケーションがあります。

  1. MilesPerRateCode – Amazon EMR クラスターで実行する最初のジョブ。このジョブは、入力ソースから乗車データを読み取り、各レートコードの合計乗車距離を計算します。このジョブの出力は 2 つの列で構成され、出力パスに Apache Parquet 形式で格納されます。

予想される出力列は、次の通りです。

  • rate_code – 乗車のレートコードを表します。
  • total_distance – そのレートコードの合計乗車距離を表します (例えば、sum (trip_distance))。
  1. RateCodeStatus – 最初のジョブが正常に終了した場合に限り、EMR クラスター上で実行される 2 番目のジョブ。このジョブは、2 つの異なる入力セットに依存します。
  • csv – 最初の Spark ジョブで使用された同じ乗車データ。
  • miles-per-rate – 最初のジョブの結果。

このジョブは、まず tripdata.csv ファイルを読み取り、fare_amountrate_codeで集計します。この時点以降、rate_code で集計された 2 つの異なるデータセットがあります。最後に、ジョブは rate_code フィールドを使用して、2 つのデータセットを結合し、rate code ステータス全体を単一の CSV ファイルに出力します。

出力列は、次の通りです。

  • rate_code_id – レートコードの種類を表します。
  • total_distanc – 最初の Spark ジョブから派生し、総乗車距離を表します。
  • total_fare_amount – 2 番目の Spark アプリケーションで生成される新しいフィールドで、レートコードタイプで合計料金を表します。

この場合、その出力を生成するのに、2 つの異なる Spark ジョブを実行する必要はありません。この方法でジョブを設定するのは、2 つのジョブ間の依存関係を作成し、AWS Step Functions 内で使用するためだけです。

どちらの Spark アプリケーションも、rootPath という 1 つの入力引数を取ります。これは入力と出力のデータとともに Spark ジョブが格納されている Amazon S3 の場所です。次が、最終的な出力のサンプルです。

次のセクションでは、Apache Livy を使用して Amazon EMR 上で実行している Spark アプリケーションとやり取りする方法について解説します。

Apache Livy を使用して Apache Spark とやり取りする

Apache Livy は、EMR クラスタ上で実行している Spark とやり取りするための REST インターフェイスを備えています。Apache Livy は、Amazon EMR のリリースバージョン 5.9.0 以降に含まれています。この記事では、Livy を使って Spark のジョブを送信し、ジョブステータスを取得します。Apache Livy をインストールした状態で Amazon EMR を起動すると、EMR マスターノードは Livy のエンドポイントになり、デフォルトでポート 8998 でのリッスンを開始します。Livy は Spark とやり取りするための API を備えています。

Apache Livy を使って Amazon EMR 上で実行されている Spark とどのようにやり取りできるのか、いくつか例を見てみましょう。

アクティブな実行中のジョブを一覧表示するには、EMR マスターノードから次のコマンドを実行します。

curl localhost:8998/sessions

同じインスタンスをリモートインスタンスから実行する場合は、次のように localhost を EMR ホスト名に変更するだけです (ポート 8998 はセキュリティグループを介してそのリモートインスタンスに対してオープンである必要があります)。

curl http://ec2-xx-xx-xx-xx.compute-1.amazonaws.com:8998/sessions

Apache Livy が Amazon S3 のアプリケーション jar を指し示すように Spark ジョブを送信するには、次のようにします。

curl -X POST --data '{"file": "s3://<<bucket-location>/spark.jar", "className": "com.example.SparkApp "]}' -H "Content-Type: application/json" http://ec2-xx-xx-xx-xx.compute-1.amazonaws.com:8998/batches

Livy を通じて Spark submit は、0 から始まるセッション ID を返します。そのセッション ID を使用すると、次のようにジョブステータスを取得できます。

curl http://ec2-xx-xx-xx-xx.compute-1.amazonaws.com:8998/batches/0 | python -m json.tool

Spark submit を使用すると、Spark ジョブと Spark 構成設定の複数の引数を渡すことができます。次のように、S3 パスを args パラメータに渡すことで、Livy を使用して同じことを実行できます。

curl -X POST --data '{"file": "s3://<<bucket-location>>/spark.jar", "className": "com.example.SparkApp", “args”: [“s3://bucket-path”]}' -H "Content-Type: application/json" http://ec2-xx-xx-xx-xx.compute-1.amazonaws.com:8998/batches

すべての Apache Livy REST 呼び出しは、次の図に示すように、JSON として応答を返します。

その JSON の応答をきれいに印刷したい場合は、次のように Python の JSON ツールでコマンドをつなげることができます。

curl http://ec2-xx-xx-xx-xx.compute-1.amazonaws.com:8998/batches/3 | python -m json.tool

Livy API の詳細なリストについては、「Apache Livy REST API」ページを参照してください。この投稿では、GET /batches and POST /batches を使用しています。

次のセクションでは、AWS Step Functions を用いてステートマシンを作成し、Spark アプリケーションをオーケストレーションします。

AWS Step Functions を使用した Spark ジョブワークフローの作成

AWS Step Functions は、各手順を自動的にトリガーおよび追跡し、エラーが発生した場合に再試行します。したがって、アプリケーションは毎回、順番どおりかつ予想どおりに実行されます。AWS Step Functions を使用して Spark ジョブのワークフローを作成するには、最初に、Lambda ステートマシンを作成し、さまざまなタイプのステートを使用してワークフロー全体を作成します。

まず、Task 状態を使用します – 単一の作業単位を実行するAWS Step Functions の簡単な状態を使用します。Wait 状態を使用して、ステートマシンの継続実行に指定時間遅延をかけることもできます。後で Choice 状態を使って、ステートマシンに分岐ロジックを追加します。

下記では、ステートマシンでさまざまな状態を使用して、Spark ETL パイプラインを作成する方法について、簡単に概要を説明します。

  • Task 状態 – Lambda 関数を呼び出します。最初のタスク状態は Amazon EMR で Spark ジョブを送信し、次のタスク状態は以前の Spark ジョブステータスを取得するために使用します。
  • Wait 状態 – ジョブの実行が完了するまで、ステートマシンを一時停止します。
  • Choice 状態 – 各 Spark ジョブの実行は、失敗、エラー、または成功状態を返します。したがって、ステートマシンでは、Choice 状態を使用して、前のステップでの成功および失敗に基づき、次のアクションまたはステップを特定するルールを作成します。

次に示すものは Task 状態の 1 つ、MilesPerRateCode で、単に Spark ジョブを送信するだけのものです。

"MilesPerRate Job": {
      "Type": "Task",
      "Resource":"arn:aws:lambda:us-east-1:xxxxxx:function:blog-miles-per-rate-job-submit-function",
      "ResultPath": "$.jobId",
      "Next": "Wait for MilesPerRate job to complete"
 }

この Task 状態の設定が、実行する Lambda 関数を特定します。Lambda 関数の中で、Livy の POST API を使った Spark ジョブを Livy を経由して送信します。ResultPath を用いて、実行中のタスクの結果をどこに置くべきかをステートマシンに伝えます。前のセクションで説明したように、Spark submit はセッション ID を返します。これは $.jobld で取得され、後の状態で使用されます。

次のコードセクションには Lambda関数 が示されており、MilesPerRateCode ジョブを送信するために使用します。Python リクエストライブラリを使用して、Amazon EMR でホストされている Livy エンドポイントに対して POST を実行し、必要なパラメータをペイロード経由で JSON 形式で渡します。次にレスポンスを解析し、レスポンスから ID を取得し、応答を返します。Next フィールドは、ステートマシンに次に進むべき状態を知らせます。

from botocore.vendored import requests
import json

def lambda_handler(event, context):
  headers = { "content-type": "application/json" }
  url = 'http://xxxxxx.compute-1.amazonaws.com:8998/batches'
  payload = {
    'file' : 's3://<<s3-location>>/emr-step-functions/spark-taxi.jar',
    'className' : 'com.example.MilesPerRateCode',
    'args' : [event.get('rootPath')]
  }
  res = requests.post(url, data = json.dumps(payload), headers = headers, verify = False)
  json_data = json.loads(res.text)
  return json_data.get('id')

MilesPerRate ジョブの場合と同様に、別の状態は RateCodeStatus ジョブを送信しますが、以前のジョブがすべて正常に完了した場合にのみ実行されます。

ここでは、ステートマシンの Task 状態が Spark のジョブステータスを確認しています。

"Query RateCodeStatus job status": {
   "Type": "Task",
   "Resource": "arn:aws:lambda:us-east-1:xxxxx:function:blog-spark-job-status-function",
   "Next": "RateCodeStatus job complete?",
   "ResultPath": "$.jobStatus"
}

他の状態と同様に、前の Task は Lambda 関数を実行し、結果 (jobStatus で表されます) を取得して次の状態に渡します。次は、既定のセッション ID に基づいて Spark のジョブスステータスをチェックする Lambda 関数です。

from botocore.vendored import requests
import json

def lambda_handler(event, context):
    jobid = event.get('jobId')
    url     = 'http://xxxx.compute-1.amazonaws.com:8998/batches/' + str(jobid)
    res = requests.get(url)
    json_data = json.loads(res.text)
    return json_data.get('state')

Choice 状態では、Spark のジョブステータス値をチェックし、それを所定の状態ステータスと比較し、その結果に基づいて状態を移行させます。例えば、ステータスが成功した場合は次の状態に移り (RateCodeJobStatusジョブ)、それが失敗した場合は MilesPerRate job failed の状態に移行します。

"MilesPerRate job complete?": {
   "Type": "Choice",
   "Choices": [{
       "Variable": "$.jobStatus",
       "StringEquals": "success",
       "Next": "RateCodeStatus Job"
   },{
       "Variable": "$.jobStatus",
       "StringEquals": "dead",
       "Next": "MilesPerRate job failed"
   }],
   "Default": "Wait for MilesPerRate job to complete"
}

AWS CloudFormation を使用したウォークスルー

このソリューション全体を設定するには、いくつかの AWS リソースを作成する必要があります。簡単に作成するため、AWS CloudFormation テンプレートを用意しました。このテンプレートが必要な AWS リソースすべてを作成し、AWS Step Functions で Spark ベースの ETL パイプラインを作成するのに必要なすべてのリソースを設定してくれます。

この CloudFormation テンプレートでは、開始時に次の 4 つのパラメータを渡す必要があります。

パラメータ 説明
ClusterSubnetID Amazon EMR クラスターをデプロイし、Lambda がこのサブネットと通信するように設定されているサブネット。
KeyName Amazon EMR クラスターにアクセスするための既存の EC2 キーペアの名前。
VPCID EMR クラスターをデプロイし、Lambda がこの VPC と通信するように設定されている仮想プライベートクラウド (VPC) の ID。
S3RootPath すべての必須ファイル (入力ファイル、Spark ジョブなど) が保存され、結果のデータが書き込まれる Amazon S3 パス。

重要 : これらのテンプレートは、Apache Livy を使用して AWS Step Functions 上で Spark ベースの ETL パイプラインを作成する方法を示すためだけのものです。修正なしでは、本番運用に利用できません。このソリューションを米国東部 (バージニア北部) リージョンの外で試してみるなら、s3://aws-data-analytics-blog/emr-step-functions から必要なファイルをダウンロードし、自分のリージョンのバケットを編集するファイルをアップロードし、必要に応じてスクリプトを編集してから実行してください。

CloudFormation スタックを起動するには、[Launch Stack] を選択します。

このスタックを起動すると、次の AWS リソースのリストが作成されます。

論理 ID リソースタイプ 説明
StepFunctionsStateExecutionRole IAMロール ステートマシンを実行し、ステートサービスと信頼ある関係性を持った IAM ロール。
SparkETLStateMachine AWS Step Functions ステートマシン Spark ETL ワークフローのための AWS Step Functions のステートマシン。
LambdaSecurityGroup Amazon EC2 セキュリティグループ Lambda 関数が Livy API を呼び出すために使用するセキュリティグループ。

RateCodeStatusJobSubmitFunction

 

AWS Lambda 関数 RateCodeStatu ジョブを送信する Lambda 関数。
MilesPerRateJobSubmitFunction AWS Lambda 関数 MilesPerRate ジョブを送信する Lambda 関数。
SparkJobStatusFunction AWS Lambda 関数 Spark ジョブステータスを確認する Lambda 関数。
LambdaStateMachineRole IAMロール lambda と信頼関係を結んだ、すべての Lambda 関数のための IAM ロール。
EMRCluster Amazon EMR クラスター Livy が稼動し、ジョブが配置されている EMR クラスター。

AWS CloudFormation デプロイメントフェーズでは、入力と出力のための S3 パスを設定します。入力ファイルは <<s3-root-path>>/emr-step-functions/input/ path に格納されますが、spark-taxi.jar<<s3-root-path>>/emr-step-functions/ の下にコピーされます。

次のスクリーンショットは、デプロイ後に S3 パスがどのように設定されるかを表しています。この例では、S3 ルートパスの AWS アカウント s3://tm-app-demos で作成したバケットを渡しました。

CloudFormation テンプレートが正常に完了すると、次のように Spark-ETL-State-Machine が AWS Step Functions ダッシュボードに表示されます。

[Spark-ETL-State-Machine] ステートマシンを選択し、この実装を見てみましょう。AWS CloudFormation テンプレートは、ステートマシン全体を、依存関係にある Lambda 関数とともに構築しました。これで実行準備が整いました。

ダッシュボードで新しく作成したステートマシンを選択し、[New execution] を選択してステートマシンを開始します。JSON 形式で入力を渡すように求められます。この入力は、最初に MilesPerRate Job の状態になり、最終的に Lambda 関数 blog-mile-per-rate-job-submit-function を実行します。

Amazon S3 のルートパスを入力として渡す

{

“rootPath”: “s3://tm-app-demos”

}

次に、[Start Execution] を選択する

rootPath の値は、CloudFormation スタックの作成時に渡された値と同じです。これは、S3 バケットの場所または接頭辞付きのバケットにすることができますが、AWS CloudFormation で使用されるのと同じ値である必要があります。この値は、Spark jar と入力ファイルがある場所と、出力ファイルを書き込む場所を、ステートマシンに示します。ステートマシンが始動した後、各状態およびタスクは、ステートマシンでの定義に基づいて実行されます。

以下に、高次レベルでのイベントの流れを表します。

  1. 最初の Spark ジョブ MilesPerRate を実行します。
  2. Spark ジョブは、入力ファイルを <<rootPath>>/emr-step-functions/input/tripdata.csv の場所から読み取ります。ジョブが正常に終了すると、出力データを <<rootPath>>/emr-step-functions/miles-per-rate に書き出します。
  3. Spark ジョブが失敗すると、エラー状態 MilesPerRate job failed に移行し、ステートマシンは停止します。Spark ジョブが正常に終了すると、RateCodeStatus Job 状態に移行し、2 番目の Spark ジョブが実行されます。
  4. 2 番目の Spark ジョブが失敗すると、エラー状態 RateCodeStatus job failed に移行し、ステートマシンは Failed ステータスで停止します。
  5. この Spark ジョブが正常に完了すると、最終出力データを <<rootPath>>/emr-step-functions/rate-code-status/ に書き込み、RateCodeStatus job finished 状態に移行して、ステートマシンは Success ステータスで実行を終了します。

次のスクリーンショットは、正常に完了した Spark ETL ステートマシンを示しています。

ステートマシンダイアグラムの右側には、個々のステートの入力と出力の詳細が示されています。

2 回目にステートマシンを実行すると、S3 パスがすでに存在するために失敗します。ステートマシンは赤色に変わり、MilePerRate job failed で停止します。次の画像は、ステートマシンの実行に失敗したことを示しています。

Amazon EMR コンソールで [Application history] タブを表示して、Spark アプリケーションのステータスとログを確認することもできます。

このウォークスルーでは、AWS Step Functions と Apache Livy を使用して、Amazon EMR 上で Spark ジョブをオーケストレーションするためのサーバーレスソリューションを作成する方法についてご紹介しましたが、お役に立てたでしょうか。次のセクションでは、このソリューションをより洗練させるアイディアをいくつか紹介します。

次のステップ

この記事では、AWS Step Functions を使用して、サーバーレスで Spark ベースのジョブのオーケストレーションを作成するためのシンプルな方法を示すのが目的です。このソリューションを安定したかつ本番に適したものにするには、次のオプションを検討してください。

  • この例では、rootPath を入力として渡し、ステートマシンを手動で開始しています。代わりに、ステートマシンを自動的にトリガーすることも可能です。ファイルが S3 バケットに到着するとすぐに ETL パイプラインを実行するには、新しいファイルパスをステートマシンに渡します。CloudWatch Events は AWS Step Functions をターゲットとしてサポートしているため、S3 イベントの CloudWatch ルールを作成できます。その後、AWS Step Functions をターゲットとして設定し、新しいファイルパスをステートマシンに渡すことができます。準備完了です!
  • さらに、障害発生時にアラートメカニズムを追加することで、このソリューションを改善することも可能です。これを行うには、アラート電子メールを送信する Lambda 関数を作成し、その Lambda 関数を Fail That way に割り当てます。状態のいずれかが失敗したときに電子メールをトリガーしてユーザーに通知します。
  • 複数の Spark ジョブを並行して送信する場合は、AWS Step Functions で Parallel 状態タイプを使用します。Parallel 状態は、ステートマシンで実行の並列分岐を作成するのに使用されます。

Lambda と AWS Step Functions を使用すると、ビッグデータワークロードに対して非常に堅牢なサーバーレスオーケストレーションを作成できます。

クリーンアップ

このソリューションのテストが終了したら、AWS CloudFormation を使用して作成したすべての AWS リソースをクリーンアップするのを忘れないでください。AWS CloudFormation コンソールまたは AWS CLI を使用して、Blog-Spark-ETL-Step-Functions という名前のスタックを削除します。

まとめ

この記事では、AWS Step Functions を使用して Amazon EMR 上で実行している Spark ジョブをオーケストレーションする方法をお話ししました。Apache Livy を使って Lambda 関数から Spark にジョブを送信し、Spark ジョブのワークフローを作成し、特定のジョブの実行順序を維持し、ジョブの結果に基づいて異なる AWS イベントをトリガーしました。
さあ、やってみましょう。このソリューションを試してみてのご感想をぜひシェアしてください!

 


その他の参考資料

この記事がお役に立ったなら、「Building a Real World Evidence Platform on AWS」および「Building High-Throughput Genomics Batch Workflows on AWS: Workflow Layer (Part 4 of 4)」をぜひチェックしてみてください。


著者について

Tanzir Musabbir は、AWS の EMR スペシャリストソリューションアーキテクトです。彼はオープンソース型ビッグデータテクノロジーをいち早く採リ入れた人物です。AWS において、彼はお客様と協力しながら、Amazon EMR、Amazon Athena 、AWS Glue の分析ソリューションを実行するための構造的なガイダンスを提供しています。Tanzir はリアルマドリードの大ファンで、余暇には旅に出るのが好きです。