Amazon Web Services ブログ

新着情報 – Step Functions を使用して Amazon EMR ワークロードをオーケストレートする

AWS Step Functions を使用すると、アプリケーションにサーバーレスワークフローのオートメーションを追加できます。ワークフローのステップは、 AWS Lambda 関数、Amazon Elastic Compute Cloud (EC2)、またはオンプレミスなど、どこでも実行可能です。ワークフローの構築を簡略化するために、Step Functions は AWS の複数のサービス (Amazon ECSAWS FargateAmazon DynamoDBAmazon Simple Notification Service (SNS)Amazon Simple Queue Service (SQS)AWS BatchAWS GlueAmazon SageMaker、および (ネストされたワークフローを実行するために) Step Functions 同士) と直接統合されています。

本日より、Step FunctionsAmazon EMR に接続されました。これにより、最小限のコードでデータ処理および分析ワークフローを作成して、時間の節約、クラスターの使用の最適化を実現することができます。たとえば、機械学習用のデータ処理パイプラインの構築は時間がかかり、困難です。この新しい統合により、並列実行や前のステップにおける結果からの依存関係といったワークフロー機能のオーケストレーションや、データ処理ジョブを実行する際の障害および例外の処理が簡単になります。

具体的には、Step Functions ステートマシンで以下のことができるようになりました。

  • EMR クラスター作成終了 (クラスターの終了保護の変更も可能)。これを行うと、既存の EMR クラスターをワークフローで再利用したり、ワークフローの実行中にオンデマンドで作成したりできます。
  • クラスターの EMR ステップ追加キャンセル。 EMR ステップは、クラスターにインストールされたソフトウェア (Apache、SparkHive、または Presto といったツールなど) による処理のためにデータを操作する手順を含んだ作業単位です。
  • EMR クラスターインスタンスのフリートグループサイズ変更すると、ワークフローの各ステップの要件に応じてプログラムでスケーリングを管理できるようになります。たとえば、コンピューティングを多用するステップを追加する前にインスタンスグループのサイズを大きくし、完了後に小さくすることもできます。

クラスターを作成もしくは終了するか、または EMR ステップをクラスターに追加する際、同期統合を利用すると、対応するアクティビティが EMR クラスターで完了したときにのみワークフローの次のステップに移動することができます。

EMR クラスターの設定や状態の読み取りは、Step Functions サービス統合には含まれていません。必要な場合は、Lambda 関数をタスクとして使用して、EMR List* および Describe* API にアクセスすることができます。

EMRStep Functions でワークフローを構築する
Step Functions コンソールで新しいステートマシンを作成します。視覚的にレンダリングされるため理解しやすいコンソールとなっています。

ステートマシンを作成するには、Amazon States Language (ASL) を使用して以下の定義を使用します。

{
  "StartAt": "Should_Create_Cluster",
  "States": {
    "Should_Create_Cluster": {
      "Type": "Choice",
      "Choices": [
        {
          "Variable": "$.CreateCluster",
          "BooleanEquals": true,
          "Next": "Create_A_Cluster"
        },
        {
          "Variable": "$.CreateCluster",
          "BooleanEquals": false,
          "Next": "Enable_Termination_Protection"
        }
      ],
      "Default": "Create_A_Cluster"
    },
    "Create_A_Cluster": {
      "Type": "Task",
      "Resource": "arn:aws:states:::elasticmapreduce:createCluster.sync",
      "Parameters": {
        "Name": "WorkflowCluster",
        "VisibleToAllUsers": true,
        "ReleaseLabel": "emr-5.28.0",
        "Applications": [{ "Name": "Hive" }],
        "ServiceRole": "EMR_DefaultRole",
        "JobFlowRole": "EMR_EC2_DefaultRole",
        "LogUri": "s3://aws-logs-123412341234-eu-west-1/elasticmapreduce/",
        "Instances": {
          "KeepJobFlowAliveWhenNoSteps": true,
          "InstanceFleets": [
            {
              "InstanceFleetType": "MASTER",
              "TargetOnDemandCapacity": 1,
              "InstanceTypeConfigs": [
                {
                  "InstanceType": "m4.xlarge"
                }
              ]
            },
            {
              "InstanceFleetType": "CORE",
              "TargetOnDemandCapacity": 1,
              "InstanceTypeConfigs": [
                {
                  "InstanceType": "m4.xlarge"
                }
              ]
            }
          ]
        }
      },
      "ResultPath": "$.CreateClusterResult",
      "Next": "Merge_Results"
    },
    "Merge_Results": {
      "Type": "Pass",
      "Parameters": {
        "CreateCluster.$": "$.CreateCluster",
        "TerminateCluster.$": "$.TerminateCluster",
        "ClusterId.$": "$.CreateClusterResult.ClusterId"
      },
      "Next": "Enable_Termination_Protection"
    },
    "Enable_Termination_Protection": {
      "Type": "Task",
      "Resource": "arn:aws:states:::elasticmapreduce:setClusterTerminationProtection",
      "Parameters": {
        "ClusterId.$": "$.ClusterId",
        "TerminationProtected": true
      },
      "ResultPath": null,
      "Next": "Add_Steps_Parallel"
    },
    "Add_Steps_Parallel": {
      "Type": "Parallel",
      "Branches": [
        {
          "StartAt": "Step_One",
          "States": {
            "Step_One": {
              "Type": "Task",
              "Resource": "arn:aws:states:::elasticmapreduce:addStep.sync",
              "Parameters": {
                "ClusterId.$": "$.ClusterId",
                "Step": {
                  "Name": "The first step",
                  "ActionOnFailure": "CONTINUE",
                  "HadoopJarStep": {
                    "Jar": "command-runner.jar",
                    "Args": [
                      "hive-script",
                      "--run-hive-script",
                      "--args",
                      "-f",
                      "s3://eu-west-1.elasticmapreduce.samples/cloudfront/code/Hive_CloudFront.q",
                      "-d",
                      "INPUT=s3://eu-west-1.elasticmapreduce.samples",
                      "-d",
                      "OUTPUT=s3://MY-BUCKET/MyHiveQueryResults/"
                    ]
                  }
                }
              },
              "End": true
            }
          }
        },
        {
          "StartAt": "Wait_10_Seconds",
          "States": {
            "Wait_10_Seconds": {
              "Type": "Wait",
              "Seconds": 10,
              "Next": "Step_Two (async)"
            },
            "Step_Two (async)": {
              "Type": "Task",
              "Resource": "arn:aws:states:::elasticmapreduce:addStep",
              "Parameters": {
                "ClusterId.$": "$.ClusterId",
                "Step": {
                  "Name": "The second step",
                  "ActionOnFailure": "CONTINUE",
                  "HadoopJarStep": {
                    "Jar": "command-runner.jar",
                    "Args": [
                      "hive-script",
                      "--run-hive-script",
                      "--args",
                      "-f",
                      "s3://eu-west-1.elasticmapreduce.samples/cloudfront/code/Hive_CloudFront.q",
                      "-d",
                      "INPUT=s3://eu-west-1.elasticmapreduce.samples",
                      "-d",
                      "OUTPUT=s3://MY-BUCKET/MyHiveQueryResults/"
                    ]
                  }
                }
              },
              "ResultPath": "$.AddStepsResult",
              "Next": "Wait_Another_10_Seconds"
            },
            "Wait_Another_10_Seconds": {
              "Type": "Wait",
              "Seconds": 10,
              "Next": "Cancel_Step_Two"
            },
            "Cancel_Step_Two": {
              "Type": "Task",
              "Resource": "arn:aws:states:::elasticmapreduce:cancelStep",
              "Parameters": {
                "ClusterId.$": "$.ClusterId",
                "StepId.$": "$.AddStepsResult.StepId"
              },
              "End": true
            }
          }
        }
      ],
      "ResultPath": null,
      "Next": "Step_Three"
    },
    "Step_Three": {
      "Type": "Task",
      "Resource": "arn:aws:states:::elasticmapreduce:addStep.sync",
      "Parameters": {
        "ClusterId.$": "$.ClusterId",
        "Step": {
          "Name": "The third step",
          "ActionOnFailure": "CONTINUE",
          "HadoopJarStep": {
            "Jar": "command-runner.jar",
            "Args": [
              "hive-script",
              "--run-hive-script",
              "--args",
              "-f",
              "s3://eu-west-1.elasticmapreduce.samples/cloudfront/code/Hive_CloudFront.q",
              "-d",
              "INPUT=s3://eu-west-1.elasticmapreduce.samples",
              "-d",
              "OUTPUT=s3://MY-BUCKET/MyHiveQueryResults/"
            ]
          }
        }
      },
      "ResultPath": null,
      "Next": "Disable_Termination_Protection"
    },
    "Disable_Termination_Protection": {
      "Type": "Task",
      "Resource": "arn:aws:states:::elasticmapreduce:setClusterTerminationProtection",
      "Parameters": {
        "ClusterId.$": "$.ClusterId",
        "TerminationProtected": false
      },
      "ResultPath": null,
      "Next": "Should_Terminate_Cluster"
    },
    "Should_Terminate_Cluster": {
      "Type": "Choice",
      "Choices": [
        {
          "Variable": "$.TerminateCluster",
          "BooleanEquals": true,
          "Next": "Terminate_Cluster"
        },
        {
          "Variable": "$.TerminateCluster",
          "BooleanEquals": false,
          "Next": "Wrapping_Up"
        }
      ],
      "Default": "Wrapping_Up"
    },
    "Terminate_Cluster": {
      "Type": "Task",
      "Resource": "arn:aws:states:::elasticmapreduce:terminateCluster.sync",
      "Parameters": {
        "ClusterId.$": "$.ClusterId"
      },
      "Next": "Wrapping_Up"
    },
    "Wrapping_Up": {
      "Type": "Pass",
      "End": true
    }
  }
}

Step Functions コンソールで、このステートマシンの実行用に AWS Identity and Access Management (IAM) ロールを新たに作成します。ロールには、EMR へのアクセスに必要なアクセス許可がすべて自動で含まれます。

このステートマシンは、既存の EMR クラスターまたは新規作成したクラスターを使用できます。以下の入力を使用して、ワークフローの終わりで終了する新しいクラスターを作成できます。

{
"CreateCluster": true,
"TerminateCluster": true
}

既存のクラスターを使用するには、以下の構文を使用してクラスター ID を入力する必要があります。

{
"CreateCluster": false,
"TerminateCluster": false,
"ClusterId": "j-..."
}

仕組みを見ていきましょう。ワークフローが開始されると、Should_Create_Cluster Choice 状態が入力を調べて、Create_A_Cluster 状態に入るかどうかを決定します。そして、同期呼び出し (elasticmapreduce:createCluster.sync) を使用し、次のワークフロー状態に進む前に、新規の EMR クラスターが WAITING 状態になるのを待ちます。AWS Step Functions コンソールには、作成中のリソースが EMR コンソールへのリンク付きで表示されます。

その後、Merge_Results Pass 状態は、入力状態を新規作成されたクラスターのクラスター ID とマージして、ワークフローの次のステップに渡します。

どんなデータの処理を開始する前であっても、Enable_Termination_Protection 状態 (elasticmapreduce:setClusterTerminationProtection) を使用すれば、EMR クラスター内の EC2 インスタンスが事故やエラーによってシャットダウンされることはありません。

これで、EMR クラスターを使用する準備が整いました。このワークフローには 3 つの EMR ステップがあります。便宜上、これらのステップはすべてこの Hive チュートリアルに基づいています。各ステップで、Hive の SQL と同様のインターフェイスを使用し、CloudFront のサンプルログでクエリを実行して結果を Amazon Simple Storage Service (S3) に書き込みます。実稼働ユースケースでは、おそらく EMR ツールを組み合わせて、データの処理や分析を並行して行う (2 つ以上のステップを同時に実行する) か、依存関係を用いる (あるステップの出力を別のステップが必要とする) ことになるでしょう。同様のことを試してみましょう。

最初に、Parallel 状態内で Step_OneStep_Two を実行します。

  • Step_One では、EMR ステップをジョブとして同期的に実行しています (elasticmapreduce:addStep.sync)。つまり、ワークフローの次のステップに進む前に、EMR ステップが完了する (またはキャンセルされる) のを実行が待機するということです。オプションでタイムアウトを追加して、EMR ステップが予想される時間枠内に実行されるかを監視することもできます。
  • Step_Two では、EMR ステップを非同期的に追加しています(elasticmapreduce:addStep)。この場合、リクエストが受信されたことを EMR が応答し次第、ワークフローは次のステップに進むことになります。数秒経ってから、別の統合を試すために Step_Two をキャンセルします (elasticmapreduce:cancelStep)。この統合は、実稼働ユースケースで非常に役立つでしょう。たとえば、並行して実行されている別のステップでエラーが発生し、このステップの実行を続ける意味がなくなった場合に、EMR ステップをキャンセルできます。

これらの 2 つのステップがいずれも完了し結果が生成されたら、Step_One と同様に Step_Three をジョブとして実行します。このワークフローではクラスターを使用したので、Step_Three が完了したら Disable_Termination_Protection ステップに入ります。

入力状態に応じて、Should_Terminate_Cluster Choice 状態は Terminate_Cluster 状態 (elasticmapreduce:terminateCluster.sync) に入り、EMR クラスターが終了するのを待つか、または Wrapping_Up 状態に直接進んでクラスターを実行したままにします。

最終的に、Wrapping_Up 状態になります。実はこの最後の状態ですることはあまりないのですが、Choice 状態からワークフローを終了することはできません。

EMR コンソールでクラスターのステータスと EMR ステップのステータスを確認します。

AWS コマンドラインインターフェイス (CLI) を使用して、EMR ステップの出力として設定された S3 バケットにあるクエリの結果を確認します。

aws s3 ls s3://MY-BUCKET/MyHiveQueryResults/
...

入力に基づいて、このワークフローの実行終了時にも EMR クラスターはまだ実行中です。Create_A_Cluster ステップのリソースリンクから、EMR コンソールに移動してクラスターを終了します。このデモのとおりに実行する場合、不要であれば EMR クラスターを実行したままにしないように注意してください。

今すぐ利用可能
Step FunctionsEMR との統合は、すべてのリージョンで利用できます。この機能を使用するにあたり、通常の Step Functions および EMR の料金に追加料金が発生することはありません。

Step Functions を使用して、EMR ジョブを実行するための複雑なワークフローをすばやく構築できるようになりました。ワークフローには、並列実行、依存関係、および例外処理を含めることができます。 Step Functions を使用すれば、問題が発生した際に何が起こったかを特定できるため、重大なエラーが発生した後でも、失敗したジョブの再試行やワークフローの終了を簡単に行えます。この機能を、みなさんはどのように使用しますか。ぜひ教えてください。

Danilo