Amazon Web Services ブログ
Apache Airflow、Genie、および Amazon EMR でビッグデータワークフローのオーケストレーションを行う: パート 2
AWS 上でビッグデータの ETL ワークフローを実行している大企業は、多数の内部エンドユーザーにサービスを提供できるようなスケールで運用しており、何千もの同時パイプラインを実行しています。これは、新しいフレームワークと、ビッグデータ処理フレームワークの最新のリリースに後れを取らないためにビッグデータプラットフォームを更新し拡張する継続的なニーズだけでなく、ビッグデータプラットフォームの管理の簡素化、そしてビッグデータアプリケーションへの容易なアクセスの促進の両方を可能にする効率的なアーキテクチャと組織構造が必要となります。
この記事シリーズのパート 1 では、Apache Airflow、Genie、および Amazon EMR を使用してビッグデータワークフローを管理する方法を学びました。
今回の記事では、AWS CloudFormation テンプレートのデプロイメント、Genie の設定、および Apache Airflow で作成されたワークフロー例の実行について説明していきます。
前提条件
このウォークスルーには、以下の前提条件が必要です。
ソリューションの概要
このソリューションは、AWS CloudFormation テンプレートを使用して必要なリソースを作成します。
ユーザーは、踏み台ホストへの SSH トンネルを経由して Apache Airflow Web UI と Genie Web UI にアクセスします。
Apache Airflow デプロイメントは、Celery バックエンドとして Amazon ElastiCache for Redis、DAG を保存するためのマウントポイントとして Amazon EFS、およびデータベースサービスに Amazon RDS for PostgreSQL を使用します。
Genie は、リーダー選出に Apache Zookeeper、設定 (バイナリ、アプリケーションの依存関係、クラスターメタデータ) の保存に Amazon S3 バケット、およびデータベースサービスに Amazon RDS for PostgreSQL を使用します。Genie はジョブを Amazon EMR クラスターに送信します。
この記事のアーキテクチャはデモ用です。本番環境では、Apache Airflow と Genie のインスタンスを Auto Scaling グループの一部にする必要があります。詳細については、Genie Reference Guide にある 「Netflix Deployment」 を参照してください。
以下の図は、このソリューションのアーキテクチャを示しています。
AWS Systems Manager パラメータストアでの管理者パスワードの作成と保存
このソリューションは、設定スクリプトで使用されるパスワードの保存に AWS Systems Manager パラメータストアを使用します。AWS Systems Manager パラメータストアでは、プレーンテキストのパラメータ名と暗号化されたパラメータ値を持つパラメータである secure string パラメータを作成することができます。パラメータストアは、AWS KMS を使用して secure string パラメータのパラメータ値を暗号化し、復号化します。
AWS CloudFormation テンプレートをデプロイする前に、以下の AWS CLI コマンドを実行します。これらのコマンドは、RDS マスターユーザー、Airflow DB 管理者、および Genie DB 管理者のパスワードを保存するための AWS Systems Manager パラメータストアのパラメータを作成します。
ソリューション用の Amazon S3 バケットの作成と S3 へのソリューションアーティファクトのアップロード
このソリューションは、Amazon S3 を使ってソリューションで使用されるすべてのアーティファクトを保存します。AWS CloudFormation テンプレートをデプロイする前に、Amazon S3 バケットを作成し、このリンクからソリューションに必要なアーティファクトをダウンロードします。
ソリューションに必要なアーティファクトを解凍し、先ほど作成した Amazon S3 バケットに airflow
と genie
ディレクトリをアップロードします。Amazon S3 ルートパスは、後ほどパラメータとして AWS CloudFormation テンプレートに追加するため、記録しておくようにしてください。
例えば、以下のスクリーンショットでは、geniestackbucket
というルートロケーションが使用されています。
AWS CloudFormation パラメータである GenieS3BucketLocation
と AirflowBucketLocation
には、作成した Amazon S3 バケットの値を使用します。
AWS CloudFormation スタックのデプロイメント
ソリューション全体を起動するには、[Launch Stack] を選択します。
以下の表では、テンプレートが必要とするパラメータが説明されています。表にないパラメータには、デフォルトの値を受け入れることができます。パラメータの完全なリストについては、AWS CloudFormation テンプレートを参照してください。
パラメータ | 値 | |
設定アーティファクトの場所 | GenieS3BucketLocation |
Genie アーティファクトと Genie のインストールスクリプトがある S3 バケット (例: geniestackbucket )。 |
AirflowBucketLocation |
Airflow アーティファクトがある S3 バケット (例: geniestackbucket )。 |
|
ネットワーキング | SSHLocation |
Genie、Apache Zookeeper、および Apache Airflow EC2 インスタンスへの SSH 接続のための IP アドレス範囲。 |
セキュリティ | BastionKeyName |
踏み台ホストインスタンスへの SSH アクセスを有効化するための既存の EC2 キーペア。 |
AirflowKeyName |
Apache Airflow インスタンスへの SSH アクセスを有効化するための既存の EC2 キーペア。 | |
ZKKeyName |
Apache Zookeeper インスタンスへの SSH アクセスを有効化するための既存の EC2 キーペア。 | |
GenieKeyName |
Netflix インスタンスによる Genie への SSH アクセスを有効化するための既存の EC2 キーペア。 | |
EMRKeyName |
Amazon EMR クラスターへの SSH アクセスを有効化するための既存の Amazon EC2 キーペア。 | |
ロギング | emrLogUri |
Amazon EMR クラスターログを保存するための S3 の場所 (例: s3://replace-with-your-bucket-name/emrlogs/ ) |
デプロイメント後のステップ
Apache Airflow および Genie の Web インターフェイスにアク次のステップを完了してください。
- AWS CloudFormation コンソールで作成したスタックを選択します。
- [出力] を選択します。
- 踏み台ホストインスタンスのパブリック DNS を見つけます。以下のスクリーンショットには、この記事で使用するインスタンスが表示されています。
- ダイナミックポートフォワーディングを使用してマスターノードへの SSH トンネルをセットアップします。
クラスターのマスターパブリック DNS 名とユーザー名hadoop
(ウォークスルーが参照するもの) を使用する代わりに、踏み台ホストインスタンスのパブリック DNS を使用して、ユーザーhadoop
をユーザーec2-user
に置き替えます。
- マスターノードでホストされるウェブサイトを表示するようにプロキシを設定します。
ウォークスルー内のどのステップも変更する必要はありません。
このプロセスは、テキストパターンに基づいて URL を自動的にフィルタリングし、プロキシ設定を Amazon EC2 インスタンスのパブリック DNS 名の形式と一致するドメインに限定することを可能にする SOCKS プロキシ管理ツールを設定します。
Apache Airflow と Genie の Web UI へのアクセス
Apache Airflow と Genie の Web UI にアクセスするには、以下のステップを完了します。
- CloudFormation コンソールで、作成したスタックを選択します。
- [出力] を選択します。
- Apache Airflow と Genie Web UI の URL を見つけます。以下のスクリーンショットには、この記事で使用される URL が表示されています。
- ウェブブラウザでタブを 2 つ開きます。これらのタブは、Apache Airflow UI と Genie UI 用に使用します。
- 前に設定した Foxy Proxy については、ブラウザの右上セクションに追加された Foxy Proxy アイコンをクリックし、[定義済みパターンと優先度に基づいたプロキシを使用] を選択します。以下のスクリーンショットは、そのプロキシオプションを示しています。
- Apache Airflow Web UI と Genie Web UI の URL をそれぞれのタブに入力します。
これで、ワークフローをこのソリューションで実行する準備が整いました。
アプリケーションリソースの準備
プラットフォーム管理者エンジニアとしての最初のステップは、プラットフォームがサポートするビッグデータアプリケーションのバイナリと設定を準備することです。この記事で、Amazon EMR クラスターはリリース 5.26.0 を使用します。Amazon EMR リリース 5.26.0 には Hadoop 2.8.5 および Spark 2.4.3 がインストールされていることから、これらが ビッグデータプラットフォームでサポートしたいアプリケーションです。異なる EMR リリースを使用する場合は、それらのバージョン用のバイナリと設定を準備してください。以下のセクションでは、異なる EMR リリースバージョンを使用したい場合にバイナリを準備するステップを説明しています。
Genie アプリケーションリソースを準備するには、アプリケーションリソースを作成するためのリクエストで Genie に送信されるフィールドを使って YAML ファイルを作成します。
このファイルは、アプリケーションの名前、タイプ、バージョン、タグ、セットアップスクリプトの S3 上の場所、およびアプリケーションバイナリの場所など、アプリケーションに関するメタデータ情報を定義します。詳細については、Genie REST API Guide の「Create an Application」を参照してください。
アプリケーションリソースのためのタグ構造
この記事では、アプリケーションリソースに以下のタグを使用します。
- type – Spark、Hadoop、Hive、Sqoop、または Presto などのアプリケーションタイプ。
- version – Hadoop の 2.8.5 といったアプリケーションのバージョン。
次のセクションでは、アプリケーションリソース用の YAML ファイルでタグがどのように定義されるかを説明します。任意の数のタグを追加して、Genie リソースと関連付けることができます。Genie は、プラットフォーム管理者が定義するものに加えて独自のタグも維持します。これらは、ファイルのフィールド ID とフィールド名で確認できます。
Hadoop 2.8.5 アプリケーションリソースの準備
この記事では、YAML ファイルが自動作成されます。以下のコードは、その結果であるファイルの詳細を示しています。
このファイルは、s3://Your_Bucket_Name/genie/applications/hadoop-2.8.5/hadoop-2.8.5.yml
でも直接利用できます。
注意: 以下のステップは、提供されている自動オプションを使用せずに、手動で完了する場合に参照するためのものです。
setupFile
によって参照される S3 オブジェクトと、依存関係ラベルはお使いの S3 バケットにあります。参考までに、プロパティ setupFile
と依存関係で使用されるアーティファクトを準備するステップは以下のようになります。
-
https://www.apache.org/dist/hadoop/core/hadoop-2.8.5/
からhadoop-2.8.5.tar.gz
をダウンロードします。 -
hadoop-2.8.5.tar.gz
をs3://Your_Bucket_Name/genie/applications/hadoop-2.8.5/
にアップロードします。
Spark 2.4.3 アプリケーションリソースの準備
この記事では、YAML ファイルが自動作成されます。以下のコードは、その結果であるファイルの詳細を示しています。
このファイルは、s3://Your_Bucket_Name/genie/applications/spark-2.4.3/spark-2.4.3.yml
で直接使用できます。
注意: 以下のステップは、提供されている自動オプションを使用せずに、手動で完了する場合に参照するためのものです。
setupFile
のオブジェクトと依存関係は、お使いの S3 バケットにあります。参考までに、プロパティ setupFile
と依存関係で使用されるアーティファクトを準備するステップは以下のようになります。
-
Https://archive.apache.org/dist/spark/spark-2.4.3/
からspark-2.4.3-bin-hadoop2.7.tgz
をダウンロードします。 -
spark-2.4.3-bin-hadoop2.7.tgz
をs3://Your_Bucket_Name/genie/applications/spark-2.4.3/
にアップロードします。
spark-2.4.3-bin-hadoop2.7.tgz
は Hadoop 2.8.3 ではなく Hadoop 2.7 を使用するため、Hadoop 2.7 を実行する EMR クラスター (リリース 5.11.3) から Hadoop 2.7 のための EMRFS ライブラリを抽出する必要があります。これはすでに、お使いの S3 バケットで利用できるようになっています。ちなみに、EMRFS ライブラリを抽出するステップは以下のとおりです。
- リリース 5.11.3 の EMR クラスターをデプロイします。
- 次のコマンドを実行します。
コマンドリソースの準備
プラットフォーム管理者エンジニアとしての次のステップは、プラットフォームがサポートする Genie コマンドの準備です。
この記事では、ワークフローが Apache Spark を使用します。このセクションでは、Apache Spark タイプのコマンドリソースを準備するためのステップを説明します。
Genie コマンドリソースを準備するには、コマンドリソースを作成するためのリクエストで Genie に送信されるフィールドを使って YAML ファイルを作成します。
このファイルは、コマンドの名前、タイプ、バージョン、タグ、セットアップスクリプトの S3 上の場所、およびコマンド実行中に使用するパラメータなど、コマンドに関するメタデータ情報を定義します。詳細については、Genie REST API Guide の「Create a Command」を参照してください。
コマンドリソースのためのタグ構造
この記事では、コマンドリソースに以下のタグ構造を使用します。
- type – spark-submit などのコマンドタイプ。
- version – Spark の 2.4.3 といったコマンドのバージョン。
次のセクションでは、コマンドリソース用の YAML ファイルでタグがどのように定義されるかを説明します。Genie は、プラットフォーム管理者が定義するものに加えて独自のタグも維持します。これらは、ファイルのフィールド ID とフィールド名で確認できます。
spark-submit コマンドリソースの準備
この記事では、YAML ファイルが自動作成されます。以下のコードは、その結果であるファイルの詳細を示しています。
このファイルは、s3://Your_Bucket_Name/genie/commands/spark-2.4.3_spark-submit/spark-2.4.3_spark-submit.yml
でも利用できます。
setupFile
にあるオブジェクトは、お使いの S3 バケットにあります。
クラスターリソースの準備
この記事では、クラスターリソースを準備するステップも自動化しました。これは、前に説明したものと似たプロセスに従いますが、クラスターリソースに適用されます。
Amazon EMR クラスターの起動中、カスタムスクリプトがクラスターに関するメタデータ詳細を使って YAML ファイルを作成し、そのファイルを S3 にアップロードします。詳細については、Genie REST API Guide の「Create a Cluster」を参照してください。
このスクリプトは、すべての Amazon EMR ライブラリの抽出と、それらの S3 へのアップロードも実行します。次のセクションでは、クラスターを Genie に登録するプロセスについて説明します。
このスクリプトは、s3://Your_Bucket_Name/genie/scripts/genie_register_cluster.sh
にあります。
クラスターリソースのためのタグ構造
この記事では、クラスターリソースに以下のタグ構造を使用します。
- cluster.release – Amazon EMR リリース名 (例: emr-5.26.0)。
- cluster.id – Amazon EMR クラスター ID (例:
j-xxxxxxxx
)。 - cluster.name – Amazon EMR クラスター名。
- cluster.role – このクラスターに関連付けられたロール。この記事でのロールは batch です。他の利用可能なロールには、ad hoc または Presto などがあります。
クラスターリソースに新しいタグを追加する、または s3://Your_Bucket_Name/genie/scripts/genie_register_cluster.sh
を編集して既存のタグの値を変更することができます。
また、アプリケーションライフサイクル環境や必要なカスタム jar を特定するタグなど、タグのその他の組み合わせも使用できます。
Genie は、プラットフォーム管理者が定義するものに加えて独自のタグも維持します。これらは、ファイルのフィールド ID とフィールド名で確認できます。複数のクラスターが同じタグを共有する場合、Genie はデフォルトで、同じタグに関連付けれられているクラスター全体にジョブをランダムに分散します。詳細については、Genie Reference Guide の「Cluster Load Balancing」を参照してください。
Genie へのリソースの登録
これまでのセクションで説明したすべての設定アクティビティはすでに準備されていたものですが、
これからのセクションでは、Genie にリソースを登録する方法をご紹介します。このセクションでは、設定コマンドを実行するために、SSH 経由で踏み台に接続します。
アプリケーションリソースの登録
前のセクションで準備したアプリケーションリソースを登録するには、踏み台ホストに SSH 接続し、以下のコマンドを実行します。
リソース情報を確認するには、Genie Web UI に移動して、[Applications] タブを選択します。以下のスクリーンショットを参照してください。このスクリーンショットには 2 つのアプリケーションリソースが表示されており、ひとつは Apache Spark (バージョン 2.4.3) 用、もうひとつは Apache Hadoop (バージョン 2.8.5) 用です。
コマンドの登録とアプリケーションへのコマンドの関連付け
次のステップは Genie コマンドリソースの特定アプリケーションへの登録です。この記事では、spark-submit
が Apache Hadoop と Apache Spark に依存するため、spark-submit
コマンドを両方のアプリケーションに関連付けます。
genie_register_command_resources_and_associate_applications.py
ファイルでは、アプリケーションに定義する順序が重要です。Apache Spark は Apache Hadoop に依存するため、ファイルは最初に Apache Hadoop を参照し、その後 Apache Spark を参照します。以下のコードを参照してください。
コマンドリソースを登録して、それらを前のステップで登録したアプリケーションリソースに関連付けるには、踏み台ホストに SSH 接続し、以下のコマンドを実行します。
登録したコマンドと、それにリンクされたアプリケーションを確認するには、Genie Web UI に移動して [Commnads] タブを選択します。
以下のスクリーンショットには、コマンドの詳細と、コマンドがリンクされているアプリケーションが表示されています。
Amazon EMR クラスターの登録
前述したとおり、このソリューションでデプロイされた Amazon EMR クラスターは、クラスターが Amazon EMR ステップを通じて起動されるときにクラスターを登録します。Amazon EMR クラスターが使用するスクリプトには、s3://Your_Bucket_Name/genie/scripts/genie_register_cluster.sh
でアクセスできます。このスクリプトは、クラスターが終了されたときの Genie からのクラスターの登録解除も自動化します。
Genie Web UI で、[Clusters] タブを選択します。このページでは、現在のクラスターリソースが表示されます。登録ステップ中にクラスター S3 の場所にアップロードされた設定ファイルの場所も見つけることができます。
以下のスクリーンショットには、クラスターの詳細と、設定ファイルの場所 (yarn-site.xml、core-site.xml、mapred-site.xml) が表示されています。
クラスターへのコマンドのリンク
これで、すべてのアプリケーション、コマンド、クラスターが登録され、コマンドが依存するアプリケーションにコマンドが関連付けられました。最後のステップは、コマンドを実行するために設定されている特定の Amazon EMR クラスターにコマンドをリンクすることです。
次のステップを完了してください。
- 踏み台ホストに SSH 接続します。
- 任意のテキストエディタで
/tmp/genie_assets/scripts/genie_link_commands_to_clusters.py
を開きます。 - コード内で
# Change cluster_name below
という行を探します。
clusters = [{'cluster_name' : 'j-xxxxxxxx', 'commands' :
['spark-2.4.3_spark-submit']}]
- ファイル内の
j-xxxxxxxx
をcluster_name
に置き換えます。
クラスターの名前を確認するには、Genie Web UI に移動して、[Clusters] を選択します。 - コマンドを特定の Amazon EMR クラスターにリンクするには、以下のコマンドを実行します。
これで、コマンドがクラスターにリンクされました。
Genie Web UI で [Commands] タブを選択します。このページには、現在のコマンドリソースが表示されます。[Spark-2.4.3_spark_submit
] を選択して、コマンドに関連付けられたクラスターを確認します。
以下のスクリーンショットには、コマンドの詳細と、コマンドがリンクされているクラスターが表示されています。
すべてのリソースで Genie を設定したところで、ジョブリクエストを受けることが可能になりました。
Apache Airflow ワークフローの実行
ワークフローのコードとデータセットの詳しい説明は、この記事の対象範囲外です。このセクションでは、Apache Airflow がこの記事で提供される GenieOperator を通じて Genie にジョブを送信する方法を詳しく説明します。
Apache Airflow のための GenieOperator
GenieOperator は、データエンジニアがタグの組み合わせを定義して、コマンドと、タスクが実行されるべきクラスターを特定できるようにします。
以下のコード例で、クラスタータグは「emr.cluster.role:batch
」、コマンドタグは「type:spark-submit
」と「version:2.4.3
」です。
プロパティ command_arguments
は spark-submit
コマンドへの引数を定義し、dependencies
は Apache Spark アプリケーション (PySpark) のためのコードの場所を定義します。
GenieOperator のためのコードは次の場所にあります: s3://Your_Bucket_Name/airflow/plugins/genie_plugin.py
。
DAG への引数のひとつは Genie 接続 ID (genie_conn_id
) です。この接続は、Apache Airflow インスタンスの自動セットアップ中に作成されました。この接続と、その他の既存接続を確認するには、以下のステップを完了してください。
- Apache Airflow Web UI で、[Admin] を選択します。
- [Connections] を選択します。
以下のスクリーンショットには、接続の詳細が表示されています。
DAG にある Airflow の variable s3_location_genie_demo
参照は、イントールプロセス中に設定されました。設定されたすべての Apache Airflow 変数を確認するには、以下のステップを完了してください。
- Apache Airflow Web UI で、[Admin] を選択します。
- [Variables] を選択します。
以下のスクリーンショットは、[Variables] ページを示しています。
ワークフローのトリガー
これで、movie_lens_transfomer_to_parquet
DAG の実行をトリガーできるようになりました。次のステップを完了してください。
- Apache Airflow Web UI で、[DAGs] を選択します。
- DAG の横にある [Off] を [On] に変更します。
以下のスクリーンショットは、[DAGs] ページを示しています。
この DAG 例のために、この記事では movielens データセットの小さなサブセットを使用します。このデータセットは人気が高いオープンソースデータセットで、データサイエンスアルゴリズムの探索に使用できます。各データセットファイルは、単一のヘッダ行を持つコンマ区切り値 (CSV) ファイルです。すべてのファイルは s3://Your_Bucket_Name/airflow/demo/input/csv
下にある ソリューション S3 バケットにあります。
movie_lens_transfomer_to_parquet
は、入力ファイルを CSV から Parquet に変換するための Spark ジョブをトリガーするシンプルなワークフローです。
以下のスクリーンショットには、この DAG が図示されています。
この DAG 例では、transform_to_parquet_movies
が完了した後で、4 つのタスクを並行して実行できる可能性がありますが、以下のコード例にあるように、DAG の同時実行性が 3 に設定されているため、同時に実行できるのは 3 つのタスクだけです。
Genie ジョブ UI へのアクセス
Apache Airflow 用の GenieOperator が Genie にジョブを送信しました。ジョブの詳細を確認するには、Genie Web UI で [Jobs] タブを選択します。送信されたジョブ、それらの引数、それらが実行されているクラスター、およびジョブステータスなどの詳細を確認できます。
以下のスクリーンショットは、[Jobs] ページを示しています。
これで、新しい Amazon EMR クラスターをプロビジョニングする、それを Genie タグ「emr.cluster.role
」のための新しい値 (例: 「production
」など) で登録する、クラスターをコマンドリソースにリンクする、そして DAG の一部のタスクによって使用される GenieOperator でタグの組み合わせを更新することで、このアーキテクチャを使って実験できるようになりました。
クリーンアップ
今後料金が発生しないようにするため、この記事で作成されたリソースと S3 バケットを削除してください。
まとめ
この記事では、Genie、Apache Airflow、および Amazon EMR のためのデモ環境をセットアップする AWS CloudFormation テンプレートのデプロイ方法について説明しました。また、Genie を設定して、Apache Airflow 向けの GenieOperator を使用する方法のデモも行いました。
著者について
Francisco Oliveira は、AWS のシニアビッグデータソリューションアーキテクトです。オープンソーステクノロジーと AWS を使用したビッグデータソリューションの構築に力を注いでいます。余暇には、新しいスポーツに挑戦したり、旅行や国立公園に出かけたりします。
Jelez Raditchkov は AWS のプラクティスマネージャです。
Prasad Alle は、AWS プロフェッショナルサービスのシニアビッグデータコンサルタントです。AWS のエンタープライズおよび戦略的顧客のために、スケーラブルで信頼性の高いビッグデータ、機械学習、人工知能、IoT ソリューションをリードし、構築するために尽力しています。Prasad の関心は、高度なエッジコンピューティング、Machine learning at Edge などのさまざまなテクノロジーに及びます。余暇には、家族との時間を楽しんでいます。