Amazon Web Services ブログ

AWS Step Functions の Distributed Map と再実行機能を使用した効率的な ETL パイプラインの構築

AWS Step Functions は、完全マネージドのビジュアルワークフローサービスで、AWS GlueAmazon EMRAmazon Redshift などのさまざまな抽出・変換・読み込み (Extract, Transform, Load; ETL) テクノロジーを含む複雑なデータ処理パイプラインを構築できます。個々のデータパイプラインタスクを繋ぎ、ペイロード、リトライ、エラー処理を最小限のコードで構成することで、ワークフローを視覚的に構築できます。

Step Functions は、データパイプライン内のタスクが一時的なエラーで失敗した場合、自動リトライとエラー処理をサポートしていますが、アクセス許可の誤り、無効なデータ、パイプライン実行中のビジネスロジックの失敗など、回復不可能な失敗が発生する可能性があります。これにより、ステップで発生した問題を特定し、問題を修正してワークフローを再起動する必要があります。従来は、失敗したステップを再実行するには、ワークフロー全体を初めからやり直す必要がありました。これにより、特に複雑な長時間実行の ETL パイプラインの場合、ワークフローの完了が遅れていました。パイプラインに最初から実行するための状態遷移回数が増加するため、Map, Parallel ステートを使用する多数のステップがある場合、コストも増加します。

Step Functions では、失敗、中止、タイムアウトしたステートからワークフローを再実行できるようになりました。これにより、ワークフローをより速く、低コストで完了させ、ビジネス価値の提供にさらに時間を費やすことができます。後続の問題が解決した後、失敗したステートに提供された同じ入力を使用して、失敗したワークフロー実行を再実行することで、取り扱われなかった失敗からより早く回復できるようになりました。

この投稿では、Step Functions の Distributed Map ステートを使用して、Amazon Relational Database Service (Amazon RDS) のテーブルからデータをエクスポートする ETL パイプラインジョブをご紹介します。その後、障害をシミュレートし、新しい失敗したステートから再実行する機能を使用して、障害が発生したタスクを障害発生地点から再起動する方法をデモンストレーションします。

ソリューションの概要

データパイプラインに含まれる一般的な機能の 1 つに、複数のデータソースからデータを抽出し、データレイクにエクスポートしたり、別のデータベースにデータを同期したりすることがあります。 Step Functions の Distributed Map ステートを使用することで、このようなエクスポートや同期のジョブを最大 10,000 の並列度で実行できます。Distributed Map は、Amazon Simple Storage Service (Amazon S3) から数何百万のオブジェクト、または単一の S3 オブジェクトから数百万レコードを読み込み、それらのレコードを後続のステップに配信することができます。 Step Functions は、Distributed Map 内のステップを、最大 10,000 の並列度で子ワークフローとして実行します。 10,000 の並列度は、AWS Glue のような他の多くの AWS サービスがサポートする並列度を大きく上回っています。AWS Glue は、ジョブごとに最大 1,000 のジョブ実行という制限 (ソフトリミット) があります。

このサンプルデータパイプラインは、Amazon DynamoDB から商品カタログデータ、Amazon RDS for PostgreSQL データベースから顧客注文データをソースとしています。データはその後前処理され、変換され、さらなる処理のために Amazon S3 にアップロードされます。データパイプラインは、RDS データベースの Data Catalog を作成するために、AWS Glue クローラーで開始されます。AWS Glue クローラーの起動が非同期であるため、パイプラインにはクローラーの完了をチェックする待ちループがあります。AWS Glue クローラーの完了後、パイプラインは DynamoDB テーブルと RDS テーブルからデータを抽出します。これら 2 つのステップは独立しているため、並列ステップとして実行されます。1 つは、AWS Lambda 関数を使用して、DynamoDB から S3 バケットにデータを出力、変換、ロードするものです。もう 1 つは、AWS Glue ジョブ同期インテグレーション を使用した Distributed Map で、RDS テーブルから S3 バケットに同様の処理を行います。AWS Identity and Access Management (IAM) アクセス許可が、Step Functions から AWS Glue ジョブを呼び出すために必要であることに注意してください。詳細は、Step Functions から AWS Glue ジョブを呼び出すための IAM ポリシー を参照してください。

次の図は、Step Functions のワークフローを示しています。

RDS データベースには、顧客と注文データに関連する複数のテーブルがあります。Amazon S3 は、すべてのテーブルのメタデータを .csv ファイルとしてホストしています。このパイプラインは、Step Functions の Distributed Map を使用して、Amazon S3 からテーブルメタデータを読み取り、すべてのアイテムを反復処理し、後続の AWS Glue ジョブを並列で呼び出してデータをエクスポートします。次のコードを参照してください:

"States": {
            "Map": {
              "Type": "Map",
              "ItemProcessor": {
                "ProcessorConfig": {
                  "Mode": "DISTRIBUTED",
                  "ExecutionType": "STANDARD"
                },
                "StartAt": "Export data for a table",
                "States": {
                  "Export data for a table": {
                    "Type": "Task",
                    "Resource": "arn:aws:states:::glue:startJobRun.sync",
                    "Parameters": {
                      "JobName": "ExportTableData",
                      "Arguments": {
                        "--dbtable.$": "$.tables"
                      }
                    },
                    "End": true
                  }
                }
              },
              "Label": "Map",
              "ItemReader": {
                "Resource": "arn:aws:states:::s3:getObject",
                "ReaderConfig": {
                  "InputType": "CSV",
                  "CSVHeaderLocation": "FIRST_ROW"
                },
                "Parameters": {
                  "Bucket": "123456789012-stepfunction-redrive",
                  "Key": "tables.csv"
                }
              },
              "ResultPath": null,
              "End": true
            }
          }

前提条件

ソリューションをデプロイするには、次の前提条件が必要です:

CloudFormation テンプレートの起動

AWS CloudFormation を使用してソリューションリソースをデプロイするには、次のステップを完了してください:

  1. 以下を選択して CloudFormation スタックを起動してください。
  2. スタック名を入力してください。
  3. 機能と変換の下のすべてのチェックボックスを選択してください。
  4. スタックの作成を選択してください。

CloudFormation テンプレートは、次のものを含む多くのリソースを作成します:

  • データパイプラインとなる Step Functions ワークフロー
  • エクスポートされたデータと Amazon RDS のテーブルのメタデータを格納する S3 バケット
  • DynamoDB の製品カタログテーブル
  • 事前にテーブルがロードされた RDS for PostgreSQL データベースインスタンス
  • RDS テーブルをクロールして AWS Glue Data Catalog を作成する AWS Glue クローラー
  • RDS テーブルから S3 バケットにデータをエクスポートするパラメータ化された AWS Glue ジョブ
  • DynamoDB から S3 バケットにデータをエクスポートする Lambda 関数

障害のシミュレート

ソリューションをテストするには、次のステップを完了してください:

  1. Step Functions コンソールで、ナビゲーションペインのステートマシンを選択します。
  2. ETL_Process という名前のワークフローを選択します。
  3. デフォルトの入力でワークフローを実行します。

数秒で、ワークフローは Distributed Map ステートで失敗します。

マップ実行のエラーは、マップ実行と子ワークフローの Step Functions ワークフロー実行イベントにアクセスすることで調査できます。この例では、例外が AWS Glue の Glue.ConcurrentRunsExceededException によるものであることがわかります。このエラーは、AWS Glue ジョブの同時実行要求数が、設定された数を超えていることを示しています。Distributed Map は Amazon S3 からテーブルメタデータを読み取り、.csv ファイルの行数と同じ数の AWS Glue ジョブを起動しますが、AWS Glue ジョブは作成時に同時実行数が 3 に設定されています。これが子ワークフローの失敗を引き起こし、失敗が Distributed Map ステート、次いで Parallel ステートに伝播した結果です。Parallel ステートのもう一方のステップで DynamoDB テーブルを取得する処理は正常に完了しました。Parallel ステートのいずれかのステップが失敗した場合、連鎖的な失敗が見られるように、ステート全体が失敗します。

Distributed Map による障害への対処

デフォルトでは、ステートがエラーを報告すると、Step Functions はワークフローの失敗を引き起こします。 Distributed Map ステートでこの失敗を処理する方法は複数あります:

Retry": [
                      {
                        "ErrorEquals": [
                          "Glue.ConcurrentRunsExceededException "
                        ],
                        "BackoffRate": 20,
                        "IntervalSeconds": 10,
                        "MaxAttempts": 3,
                        "Comment": "Exception",
                        "JitterStrategy": "FULL"
                      }
                    ]
JSON
  • 場合によっては、ビジネスが障害を許容できることがあります。これは、数百万のアイテムを処理していて、データセットにデータ品質の問題があることを予測している場合に特に当てはまります。デフォルトでは、Map ステートのマップ処理が失敗すると、他のすべてのマップ処理が中止されます。Distributed Map を使用すると、障害のしきい値として、失敗したアイテムの最大数または割合を指定できます。障害が許容レベル内であれば、Distributed Map は失敗しません。
  • Distributed Map ステートを使用すると、子ワークフローの並列実行数を制御できます。並列実行数を AWS Glue ジョブの並列実行数にマッピングできます。この並列実行数は、ワークフロー実行レベルでのみ適用されることに注意してください。ワークフローの異なる実行を跨いでは適用されません。
  • エラーの根本原因を修正した後、失敗したステートからステートを再実行できます。

失敗したステートの再試行

このサンプルソリューションの問題の根本的な原因は、AWS Glue ジョブの同時実行性にあります。 失敗したステートを再実行するために、次のステップを完了してください:

  1. AWS Glue コンソールで、ExportsTableData という名前のジョブに移動します。
  2. ジョブの詳細 タブの 詳細プロパティ で、最大同時実行数 を 5 に更新します。

再実行機能のローンチに伴い、過去 14 日間で正常に完了しなかった 標準ワークフロー の実行を再起動できるようになりました。 これには、失敗、中止、タイムアウトした実行が含まれます。 失敗したワークフローは、最後に正常に完了しなかったステートと同じ入力を使用して、失敗したステップからのみ再起動できます。 最初のワークフロー実行とは異なるステートマシン定義を使用して、失敗したワークフローを再起動することはできません。 失敗したステートが正常に再起動されると、Step Functions はすべての後続のタスクを自動的に実行します。Distributed Map の再起動がどのように機能するかの詳細については、マップ実行の再起動 を参照してください。

マップ内のステップが子ワークフローとして実行されるため、ワークフローの IAM 実行ロールには、Distributed Map のステートを再起動するためにマップ実行を再実行する権限が必要です。

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "states:RedriveExecution"
      ],
      "Resource": "arn:aws:states:us-east-2:123456789012:execution:myStateMachine/myMapRunLabel:*"
    }
  ]
}
JSON

プログラムで失敗したステップからワークフローを再実行するには、AWS Command Line Interface (AWS CLI) や AWS SDK を使用するか、視覚的な操作体験を提供する Step Functions コンソールを使用できます。

  1. Step Functions コンソールで、再実行したい失敗したワークフローに移動します。
  2. 詳細 タブで、失敗から再実行 を選択します。

パイプラインは、AWS Glue ジョブを実行するのに十分な同時実行数があるため、正常に実行されます。

ワークフローをプログラムで失敗したポイントから再実行するには、新しい Redrive Execution API アクションを呼び出します。同じワークフローが最後の失敗したステートから開始され、最後に成功しなかったステートと同じ入力が使用されます。ワークフロー定義と、再実行対象のステートとステートへの入力は、変更できません。 子ワークフローの異なるタイプについて、次の点に注意してください:

  • Express 子ワークフローの再実行 – Distributed Map 内の Express ワークフローである失敗した子ワークフローの場合、再実行機能により、子ワークフローの先頭からシームレスに再起動できます。 これにより、マップ全体を再起動することなく、個々のマップに固有の問題を解決できます。
  • 標準子ワークフローの再実行 – Distributed Map 内の標準ワークフローである失敗した子ワークフローの場合、再実行機能はスタンドアロンワークフローと同じように機能します。 各マップ内の失敗したステートを、すでに正常に実行された不要なステップをスキップして、失敗したポイントから再起動できます。

Step Functions のステータス変更通知Amazon EventBridge と組み合わせることで、失敗時にメールを送信するなどの失敗通知ができます。

クリーンアップ

リソースをクリーンアップするには、AWS CloudFormation コンソールから CloudFormation スタックを削除してください。

まとめ

この投稿では、Step Functions の再実行機能を使用して、失敗したステップを失敗したポイントから再起動することにより、Distributed Map 内の失敗したステップを再実行する方法を示しました。 Distributed Map ステートを使用すると、サーバーレスアプリケーション内で大規模なパラレルワークロードを調整するワークフローを記述できます。 Step Functions は、最大並列度 10,000 で Distributed Map 内のステップを子ワークフローとして実行します。これは、多くの AWS サービスでサポートされている同時実行数を大きく上回っています。

Step Functionsの Distributed Map の詳細については、Step Functions – Distributed Map を参照してください。ワークフローの再実行の詳細については、Redriving executions を参照してください。


本記事は、Sriharsh Adari、Joe Morotti、Uma Ramadoss による “Build efficient ETL pipelines with AWS Step Functions distributed map and redrive feature” を翻訳したものです。 翻訳はソリューションアーキテクトの高橋が担当しました。