Amazon Web Services ブログ
AWS Step FunctionsとAWS Lambdaを使って複数のETLジョブの統合を行う
抽出、変換、ロード(Extract, Transform, Load, ETL)操作は、現在のエンタープライズデータレイクのバックボーンにひとまとまりとして形成されています。rawデータを役に立つデータセットへ変換し、最終的には、洞察可能な状態に変換します。ETLジョブは通常1つまたは1つ以上のデータソースからデートを読み、様々な種類の変換を適用し、結果を利用準備できているターゲットに書き込みます。ETLジョブのソースとターゲットはリレーショナルデータベースであるAmazon RDS(Amazon Relational Database) もしくはオンプレミス、データウェアハウスとしてAmazon Redshift 、オブジェクトストレージとしてAmazon Simple Storage Service(Amazon S3) のバケットなどがあります。Amazon S3は、AWSでデータレイクを構築するという状況において特に一般的です。
AWSは、ETLジョブの作成とデプロイを支援するAWS Glueを提供しています。AWS Glueは抽出・変換・ロードを行うフルマネージドなサービスであり、お客様が簡単に自分のデータとして準備、ロードできるものとなります。他のAWSサービスでもETLジョブを実装、デプロイすることも可能です。 AWS Database Migration Service(AWS DMS)、Amazon EMR(ステップAPIの利用)、さらにAmazon Athenaも含まれます。
ETLジョブワークフロー統合へのチャレンジ
多様なETLテクノロジーを含むETLワークフローをどのように統合できるでしょうか? AWS Glue、AWS DMS、Amazon EMRなどのサービスは、Amazon CloudWatch Eventsをサポートしており、ETLジョブを連動させることができます。 Amazon S3は、中心に置かれたデータレークストアでもあり、CloudWatch Eventsをサポートしています。しかし、CloudWatchイベントのみに依存するということは、ETLワークフローの視覚的表現が1つもないことを意味します。また、全体的なETLワークフローの実行ステータスを追跡し、エラー・シナリオを処理することは困難になります。
本ブログでは、AWS Step FunctionsとAWS Lambdaを使用して、任意の複雑なETLワークフローでさまざまなテクノロジを含む複数のETLジョブを編成する方法を説明します。
AWS Step Functionsは、ビジュアルワークフローを使用して、分散アプリケーションおよびマイクロサービスのコンポーネントを調整することを可能にするサービスです。個々のコンポーネントからアプリケーションを作成します。各コンポーネントは、個別の機能またはタスクを実行し、アプリケーションの拡張と変更を素早く行うことができます。
ETLワークフローの例を見てみましょう
ETLワークフローのサンプルデータセット
複数のデータセットに基づいて質問に回答する必要があるビジネスユーザーがいるとします。おそらくユーザーは、一方ではオンラインのユーザーエンゲージメトリックと、他方では発生した予測されたセールス収入とチャンスとの間の相関を調べることを望んでいます。ユーザーエンゲージメントの指標には、ウェブサイト訪問、モバイルユーザー、およびデスクトップユーザーが含まれます。
ETLワークフローの手順は以下となります。
セールスデータセットの処理(Process the Salses Dataset:PSD) セールスデータを読み込みます。予測月間収入フィールドを集計して、1日ごとにレコードをグループ化します。空白をアンダースコアに置き換えるには、フィールドの名前を変更します。圧縮されたParquetの形式でAmazon S3に中間結果を出力します。以前の出力を上書きします。
マーケティングデータセットの処理(Process the Marketing dataset:PMD) マーケティングデータセットを読み込みます。空白をアンダースコアに置き換えるには、フィールドの名前を変更します。中間結果をAmazon S3に圧縮したParquetの形式で送信します。以前の出力を上書きします。
マーケティングとセールスデータセットの結合(Join Marketing and Sales datasets :JMSD) 処理されたSalesおよびMarketingデータセットの出力を読み取ります。日付フィールドで両方のデータセットの内部結合を実行します。日付順に昇順に並べ替えます。最終結合データセットをAmazon S3に送信し、以前の出力を上書きします。
これまでの手順は、ETLワークフローとしてAWS Glueで実装でき、ETLジョブはジョブ・トリガーを使用してチェーンされています。しかし、エンドツーエンドのデータ処理ワークフローの一部であるAWS Glue以外の要件は、次のような場合があります。
- セールスとマーケティングの両方のデータセットは、最大で1週間の間隔でランダムにS3バケットにアップロードされます。 PSDジョブは、Salesデータセットファイルがアップロードされるとすぐに開始する必要があります。 マーケティングデータセットファイルがアップロードされるとすぐにPMDジョブが開始されます。 パラレルETLジョブはいつでも開始および終了できますが、すべてのパラレルETLジョブが完了した後でのみ、最後のJMSDジョブを開始できます。
- PSDおよびPMDジョブに加えて、オーケストレーションは、JMSDジョブによって集約された最終データセットに貢献する、より多くの並列ETLジョブを将来サポートする必要があります。 追加のETLジョブは、AWSデータベース移行サービス、Amazon EMR、Amazon AthenaなどのAWS以外のサービスなど、AWSサービスによって管理されます。
データエンジニアは、これらの要件を満たすために次のETLワークフローチャートを作成します。
要件を満たすためには、汎用のETLオーケストレーションソリューションが必要です。 サーバーレスソリューションはさらに優れています。
ETLオーケストレーションのアーキテクチャとイベント
AWS Step FunctionsとAWS Lambdaを使用して、ETLワークフローの調整を実施し、要件を満たす方法を見てみましょう。次の図は、ETLオーケストレーションのアーキテクチャとイベントの流れを示しています。
主なイベントの流れは、AWS Step Functionsステートマシンから始まります。このステートマシンは、統合されたETLワークフローのステップを定義します。 AWSコマンドラインインターフェイス(AWS CLI)を使用して、またはAWS Lambdaまたはその他の実行環境でさまざまなAWS SDKを使用して、スケジュールに基づいてAmazon CloudWatchを通じてステートマシンを起動できます。
ステートマシンの実行が進むにつれて、ETLジョブが呼び出されます。図に示すように、呼び出しはアカウントで作成および設定する中間のAWS Lambdaを介して間接的に行われます。このタイプの関数をETLランナーと呼びます。
図のアーキテクチャはAmazon Athena、Amazon EMR、AWS Glueを示していますが、付随するコードサンプル(aws-etl-orchestrator)には図のAWS Glue Runner Functionというラベルの付いた単一のETLランナーが含まれています。このETLランナーを使用して、AWS Glueジョブをオーケストレーションすることができます。また、パターンに従ってETLランナーを実装して、他のAWSサービスまたは非AWSツールを統合することもできます。
ETLランナーは AWS Step Functionsのアクティビティタスクにより呼び出しが行われます。AWSステップ関数のアクティビティータスクが機能する方法のため、ETLランナーは、タスクのためにAWS Step Functionsステートマシンを定期的にポーリングする必要があります。 状態機械は、タスクオブジェクトを提供することによって応答する。 タスクオブジェクトには、ETLランナーがETLジョブを実行できるようにする入力が含まれています。
ETLランナーがタスクを受け取るとすぐに、ETLランナーはそれぞれのETLジョブを開始します。 ETLランナーは、Amazon DynamoDBテーブル内のアクティブなジョブの状態を維持します。 定期的に、ETLランナーはアクティブなジョブの状態をチェックします。 アクティブなETLジョブが完了すると、ETLランナーはAWS Step Functionのステートマシンに通知します。 これにより、AWS Step FunctionsのETLワークフローを次のステップに進めることができます。
重要な質問が出るかもしれません。なぜETLランナーはStep Functionsのステートマシンから独立して実行され、タスクをポーリングするのですか? 代わりにStep FunctionsステートマシンからAWS Lambdaを直接呼び出すことはできませんか? その後、その機能を開始し、完了するまでETLジョブを監視することはできませんか?
その答えは、AWS Lambdaは、1リクエストあたり最大実行時間が300秒、つまり5分であることです。 詳細については、「AWS Lambda Limits」を参照してください。 通常、ETLジョブは完了するまでに5分以上かかります。 ETLランナー関数が直接呼び出されると、ETLジョブが完了する前にタイムアウトになる可能性があります。 したがって、長時間可動するworkerのアプローチには、アクティビティータスクが必要です。 このコードサンプルのワーカー(ETLランナー)は、CloudWatchイベントを使用してスケジュールでトリガーされるAWS Lambdaです。 CloudWatchイベントを通じてポーリングスケジュールを管理したくない場合は、ETLワークフローのステートマシンにポーリングループを実装できます。 AWS Big Dataのブログ記事をチェックしてください。例として、AWS Step FunctionsとApache Livyを使用してApache Sparkアプリケーションをオーケストレーションがあります。
最後に、セールスとマーケティングのデータセットがランダムにS3バケットに到着するのを待つという要件をどのように満たしているかについて説明します。 これらの待機は、販売データの待機とマーケティングデータの待機の2つの個別のアクティビティタスクとして実装されます。 ステートマシンは、これらのアクティビティタスクのいずれかが発生すると、実行を停止します。 CloudWatch Eventsイベントハンドラは、Amazon S3バケット上に設定されているため、SalesまたはMarketingデータセットファイルがバケットにアップロードされると、Amazon S3はAWS Lambda関数を呼び出します。 Lambda関数は、アップロードされたデータセットに対応するアクティビティタスクを終了するように待機しているステートマシンに通知する。 後続のETLジョブは、ステートマシンによって呼び出されます。
ETLオーケストレーションの設定
aws-etl-orchestratorのGitHubリポジトリは、AWSアカウントでETLオーケストレーションアーキテクチャを設定するためにカスタマイズ可能なソースコードを提供します。 次のステップでは、この記事に示すアーキテクチャを使用してETLジョブの編成を開始するために必要な作業を示します。
- AWS Step FunctionsでETLオーケストレーションワークフローをモデル化する
- ETLランナーを構築する(または既存のAWS Glue ETLランナーを使用する)
- AWS CloudFormationテンプレートをカスタマイズしてスタックを作成する
- ETLオーケストレーション ステートマシンを呼び出す
- サンプルのSalesデータとMarketingデータセットをAmazon S3にアップロードする
AWS Step FunctionsのETLオーケストレーションワークフローをモデル化します。 このブログで説明しているETLワークフローをステートマシンとしてモデル化するには、AWS Step Functionsを使用します。 Step Functionsのステートマシンは、一連の状態と、これらの状態間の遷移とで構成されています。 ステートマシンはAmazon States Languageで定義されています。これはJSONベースの表記法です。 ステートマシン定義の例については、「サンプルプロジェクト」を参照してください。
AWS Step Functionsコンソールの次のスナップショットは、ステートマシンとしてモデル化されたサンプルのETLワークフローを示しています。 このワークフローは、コードサンプルで提供しています。
このステートマシンの実行を開始すると、2つのETLジョブを並列に実行して、セールスデータ(PSD)およびプロセスマーケティングデータ(PMD)を実行します。 しかし、要件に応じて、両方のETLジョブは、それぞれのデータセットがAmazon S3にアップロードされるまで開始してはなりません。 したがって、PSDとPMDの両方の前にWaitアクティビティタスクを実装します。 データセットファイルがAmazon S3にアップロードされると、状態マシンに待機状態を終了するように通知するAWS Lambda関数がトリガされます。 PMDジョブとPSDジョブの両方が成功すると、JMSDジョブが実行されて最終的なデータセットが生成されます。
最後に、このETLワークフローを1週間に1回実行するには、CloudWatchイベントを使用して1週間に1回開始するようにステートマシンの実行を設定する必要があります。
ETLランナーを構築する(または既存のAWS Glue ETLランナーを使用する)。 コードサンプルには、AWS Glue ETL ランナーが含まれています。 簡単にするため、AWS Glueジョブのみを使用してETLワークフローを実装しました。 ただし、異なるETLテクノロジを使用してPMDまたはPSDジョブを実装することはできません。 AWS Glue ETL ランナーの技術に従ったETLランナーを構築する必要があります。
AWS CloudFormationテンプレートをカスタマイズしてスタックを作成します。 aws-etl-orchestratorリポジトリに公開されているサンプルには、3つのAWS CloudFormationテンプレートが含まれています。 AWS CloudFormationのベストプラクティスに従って、3つのテンプレートにリソースを編成しました。 3つのリソースグループは論理的に異なり、別々のライフサイクルと所有権を持つ可能性があります。 各テンプレートには、関連するAWS CloudFormationパラメータファイル(「* -params.json」ファイル)があります。 これらのファイルのパラメータはカスタマイズする必要があります。 3つのAWS CloudFormationテンプレートの詳細は次のとおりです。
- AWS Glueリソースの設定を担当するテンプレート。サンプルETLワークフローのテンプレートは、PSD、PMD、JMSDの3つのAWS Glueジョブを作成します。 これらのジョブのスクリプトは、自分が所有するAmazon S3バケットからAWS CloudFormationによって取得されます。
- AWS Step Functionsがステートマシンを定義するテンプレートが定義されています。Amazon States Languageのステートマシン定義は、ステップ関数テンプレート内のステートマシンリソースに埋め込まれています。
- ETLランナーがAWS Glue用に必要とするリソースを設定するテンプレート。AWS Glue ETLランナーは、AWS Lambdaとして実行するように作成されたPythonスクリプトです。
ETLオーケストレーション・ステート・マシンを起動します。
最後に、AWS Step Functionsで新しいステートマシンの実行を開始します。 ETLの例では、AWS CloudFormationテンプレートはMarketingAndSalesETLOrchestratorという名前のステートマシンを作成します。 AWSステップ関数コンソールまたはAWS CLIコマンドを使用して実行を開始できます。 実行を開始すると、ステートマシンはすぐにデータ待機状態になり、データセットがAmazon S3にアップロードされるのを待ちます。
サンプルのSalesデータとMarketingデータセットをAmazon S3にアップロードする
コードサンプル設定で指定したS3バケットに提供されたアップロードデータセット。 このアップロードされたデータセットは、ステートマシンに信号を送信して実行を継続します。
ステートマシンは実行を完了するまでに時間がかかることがあります。 AWS Step Functionsコンソールで進捗状況を監視できます。 実行が成功すると、次の図に示す出力が表示されます。
おめでとうございます! あなたは正常に完了に例のETLワークフローを構築しました。
失敗したETLジョブの処理
ETLワークフローのジョブが失敗した場合はどうなりますか? このような場合、ETLワークフロー開発者は、管理者に通知するだけでなく、以前のジョブの影響を完全に元に戻すジョブを補うことで、エラー処理戦略を利用できます。 失敗したETLジョブの検出と応答は、AWS Step Functionsのキャッチ・メカニズムを使用して実装できます。 詳細については、「状態マシンを使用したエラー状態の処理」を参照してください。 サンプルステートマシンでは、エラーは何もしないPassによって処理されます。
やってみましょう。 AWS GlueコンソールまたはAWS CLIを使用して実行中のサンプルETLワークフローのジョブを停止します。 ステートマシンがETLジョブのフォールバック失敗状態に移行していることがわかります。
総論
この記事では、ETLロジックをオーケストレーションされたワークフローとして実装する方法を説明しました。 私は、ETLオーケストレーションのためのサーバーレスソリューションを紹介しました。これにより、AWS Step FunctionsとAWS Lambdaを使用してETLジョブの実行を制御できます。 この記事で説明した概念とコードを使用して、任意の複雑なETL状態マシンを構築することができます。
詳細とソースコードのダウンロードについては、aws-etl-orchestrator GitHubリポジトリを参照してください。 この投稿に関するご質問は、下記のコメント欄にお送りください。
追加で推薦するブログ
このブログが役に立つと思った方には次のブログも推奨します。「Amazon S3とAmazon Glueで構築するデータレイク」、「AWS Step FunctionsとApache Livyを使用してApache Sparkアプリケーションをオーケストレーションする」
著者:Moataz Anany
元のブログはこちら。(翻訳はSA小梁川が担当しました)