Amazon Web Services ブログ

Amazon EMR と Apache Livy を使用して、同時データオーケストレーションパイプラインを構築する

多くのお客様が、Amazon EMR と Apache Spark を使用してスケーラブルなビッグデータパイプラインを構築しています。大規模な生産パイプラインの場合、一般的ユースケースは、さまざまなソースから発生する複雑なデータを読み取ることです。このデータは、機械学習パイプライン、分析ダッシュボード、ビジネスレポートなどの下流アプリケーションに役立つように変換する必要があります。このようなパイプラインでは、Amazon EMR で Spark ジョブを並行して実行することがたびたび必要になります。この記事では、EMR バージョン 5.9.0 以降で利用可能な Apache Livy を使用して、複数の Spark ジョブを EMR クラスター上で平行して送信する方法を中心に取り扱います。

Apache Livy は、REST インターフェイスを通じて、Spark クラスターとのやりとりを容易にするサービスです。Apache Livy を使用すると、大きな jar ファイルを管理、デプロイする代わりに、REST API コールでシンプルな Scala コードまたは Python コードを送信できます。これは、EMR ステップ API を使用してシリアルで実行するのではなく、複数の Spark ジョブをパラレルで実行することでデータパイプラインを容易に拡張できるためです。お客様は、ワークフローの一環として一時的なクラスターを継続して利用し、結果としてコストを削減できます。

このブログ記事の目的に合わせ、今回は Apache Airflow を使用してデータパイプラインをオーケストレーションします。Airflow は ETL タスクの管理に役立つオープンソースのタスクスケジューラです。ワークフローを 1 か所からスケジューリングして管理できるため、Apache Airflow はお客様に人気があります。Airflow の Configuration as Code アプローチを使用すると、ワークフロー、ETL タスク、依存関係の生成を簡単に自動化できます。そのため、お客様はデータパイプラインの構築やデバッグから、ビジネス上の問題に焦点を合わせることができます。

高レベルのアーキテクチャ

次の図は、デプロイされるアーキテクチャの構成を示す詳細なテクニカル図です。

このワークフローの作成に必要な AWS サービスの起動には、AWS CloudFormation スクリプトを使用します。CloudFormation は、クラウド環境に必要なすべてのインフラストラクチャとリソースを、シンプルな JSON または YAML テンプレートに記述およびプロビジョニングできる力強いサービスです。この場合、テンプレートに次のものが含まれます。

Airflow サーバーは LocalExecutor を使用します (タスクはサブプロセスとして実行されます) 。これにより、タスクをローカルで並列化するのに役立ちます。実稼働ワークロードでは、複数のワーカーノードを持つクラスター上で CeleryExecutor を使用してスケールアウトすることを検討する必要があります。

デモンストレーション用に、Movielens データセットを使用して CSV ファイルを Parquet 形式に変換し、Amazon S3 に保存します。このデータセットは一般的なオープンソースのデータセットで、データサイエンスアルゴリズムの探索に使用されます。それぞれのデータセットファイルは、単一のヘッダー行を持つコンマ区切りのファイルです。次のテーブルでは、データセット内のそれぞれのファイルを説明します。

データセット 説明
movies.tsv 視聴されている動画のタイトルと、ジャンルのリストがあります。
ratings.csv 1 ~ 5 のスケールを使用して、ユーザーが動画をどのように評価したかを示します。ファイルには動画視聴のタイムスタンプも含まれています。
tags.csv それぞれの動画のユーザー生成のタグを示します。タグは、動画に関するユーザー生成のメタデータです。タグは、単語または短いフレーズになっています。ファイルにはタグのタイムスタンプも含まれています。
links.csv IMDBMovieDB に使用される動画にリンクする識別子が含まれています。
genome-scores.csv それぞれの動画のタグの関連性を示します。
genome-tags.csv genome-scores.csv ファイルのそれぞれのタグの説明を提供します。

パイプラインの構築

ステップ 0: 前提条件

お客様のマシンに Bash 有効の AWS CLI がインストールされていることを確認してください。

ステップ 1: Amazon EC2 キーペアを作成する

この ELT パイプラインを構築するには、SSH を使用して EC2 インスタンスに接続します。これには、CloudFormation スタックを起動している AWS リージョンの Amazon EC2 キーペアにアクセスする必要があります。お客様のリージョンで既存のキーペアがある場合には、先へ進みこのエクササイズにそのキーペアを使用してください。ない場合には、AWS マネジメントコンソールを開き、EC2 コンソールに移動してキーペアを作成してください。EC2 コンソールの左のナビゲーションペインで [キーペア] を選択します。

[キーペアの作成]を選択し、airflow_key_pair と入力します(表示されているとおりに入力してください) 。次に、[作成] を選択します。airflow_key_pair.pem というファイルがダウンロードされます。このファイルは、安全でプライベートな場所に保管してください。このファイルにアクセスできないと、SSH を使用して EC2 インスタンスに接続することができなくなります。

ステップ 2: CloudFormation スクリプトを実行する

CloudFormation スクリプトを実行する準備ができました!

注意: CloudFormation スクリプトでは DBSecurityGroup を使用しますが、これはすべてのリージョンでサポートされているわけではありません。

次のページでは S3 バケット名と、前のステップで作成したキーペア (airflow_key_pair) を選択します。CloudFormation が新しい S3 バケットを作成するため、このS3 バケットは存在しないはずです。簡素化するために、他のパラメーターのデフォルト値が選択されています。

環境に合わせてこれらのパラメーターを入力したら、[次へ]を選択します。最後に、次のページのすべての設定を確認します。[I acknowledge that AWS CloudFormation might create IAM resources] とマークされたボックスを選択し (スクリプトが IAM リソースを作成するために必要)、[作成] を選択します。これで、このパイプラインに必要なすべてのリソースが作成されますが、実行には少し時間がかかります。スタックの進行状況を確認するには、作成したスタックを選択し、[イベント] セクションまたはパネルを選択してください。

CloudFormation テンプレートの完了には数分かかります。

ステップ 3: Airflow EC2 インスタンスで Airflow スケジューラを開始する

これらの変更を行うには、SSH を使用して CloudFormation スクリプトで作成された EC2 インスタンスに接続します。ローカルマシンに SSH クライアントがあれば、コマンドラインから実行できます。先ほどダウンロードした airflow_key_pair.pem ファイルを含むディレクトリに移動し、次のコマンドを挿入して、your-public-ip および your-region を EC2 インスタンスから取得した関連値に置き換えます。EC2 インスタンスのパブリック DNS 名は、[出力] タブで確認できます

SSH コマンドの後にプロンプトが表示されたら、[はい] と入力します。

chmod 400 airflow_key_pair.pem
ssh -i "airflow_key_pair.pem" ec2-user@ec2-your-public-ip.your-region.compute.amazonaws.com

詳細またはヘルプについては、SSH を使用して Linux インスタンスに接続するを参照してください。

ここで、root ユーザーとしていくつかのコマンドを実行する必要があります。

# sudo as the root user
sudo su
# Navigate to the airflow directory which was created by the cloudformation template – Look at the user-data section.
cd ~/airflow
source ~/.bash_profile

‘/root/airflow/’ ディレクトリは、次の画像のようになります。

ここで、Airflow スケジューラを開始する必要があります。Airflow スケジューラは、すべてのタスクと有向非循環グラフ (DAG) をモニタリングし、依存関係が満たされているタスクのインスタンスをトリガーします。バックグラウンドでは、フォルダに含まれる可能性のあるすべての DAG オブジェクトをモニタリングして同期しています。定期的に (約 1 分ごと) アクティブなタスクを検査し、トリガーできるかを確認します。

Airflow スケジューラは、Airflow 稼働環境で永続的なサービスとして実行されるように設計されています。開始するには、Airflow スケジューラを実行します。airflow.cfg で指定されている設定を使用します。

スケジューラーを開始するには、端末で次のコマンドを実行します。

airflow scheduler

スケジューラが実行されると、画面は次のようになります。

ステップ 4: Airflow ウェブサーバーで transform_movielens DAG を確認する

Airflow ウェブサーバーは、ポート 8080 で実行する必要があります。Airflow を表示するには、ブラウザを開き <EC2-public-dns-name>:8080 と入力します。パブリック EC2 DNS 名は、ステップ3 で確認したものと同じです。

Airflow ダッシュボードで DAG の一覧を確認できるはずです。サンプルの DAG は、実験を行うときのために残してあります。しかしここでは、このブログの目的に沿って transform_movielens DAG に焦点を当てます。DAG の名前の横にある ON ボタンを切り替えます。

次の例は、ダッシュボードがどのようなものかを示します。

[transform_movielens DAG] を選択し、[グラフ表示] を選択して次の画像を表示します。

この画像は、全体的なデータパイプラインを示します。現在の設定では、それぞれの CSV ファイルを MovieLens データセットから Parquet 形式に変換する 6 つの変換タスクがあります。Parquet は、ビッグデータアプリケーションで使用される一般的なカラムナストレージのデータ形式です。ワークフローが完了すると、DAG は EMR クラスターのスピンアップと終了の処理も行います。

DAG コードは、[コード] ボタンを選択しても表示されます。

ステップ 5: Airflow DAG を実行する

DAG を実行するには、Airflow ダッシュボードに戻り、transform_movielens DAGの [DAG トリガー] ボタンを選択します。

Airflow DAG を実行すると、最初のタスクがrun_job_flow boto3 API を呼び出して EMR クラスターを作成します。2 番目のタスクは、EMR クラスターが新しいタスクを受け入れる準備ができるまで待機します。クラスターの準備ができると、ポート 8998 で実行される Apache Livy を使用して、変換タスクが並行して開始されます。現在の Airflow DAG の同時実行性は 3 に設定されており、3 つのタスクが並行して実行されます。EMR クラスターを圧倒せずにエアフロー内でより多くのタスク (複数のスパークセッション) を並行して実行するために、同時実行性を抑えることができます。

Apache Livy は、EMR クラスターで Scala コードをどのように並行して実行するか?

EMR クラスターの準備ができると、変換タスクは Airflow スケジューラによってトリガーされます。それぞれの変換タスクは、Livy をトリガーして新しい対話型 Spark セッションを作成します。それぞれの POST リクエストは、Spark インタプリタ-を使用して新しい Spark コンテキストを呼び出します。このリモート Spark インタプリターは、コードスニペットを受信して実行し、その結果を返します。

変換タスクの 1 つを例として使用し、ステップの詳細を理解しましょう。

# Converts each of the movielens datafile to parquet
def transform_movies_to_parquet(**kwargs):
    # ti is the Task Instance
    ti = kwargs['ti']
    cluster_id = ti.xcom_pull(task_ids='create_cluster')
    cluster_dns = emr.get_cluster_dns(cluster_id)
    headers = emr.create_spark_session(cluster_dns, 'spark')
    session_url = emr.wait_for_idle_session(cluster_dns, headers)
    statement_response = emr.submit_statement(session_url,   '/root/airflow/dags/transform/movies.scala')
    emr.track_statement_progress(cluster_dns, statement_response.headers)
    emr.kill_spark_session(session_url)

このコードの最初の 3 行は、EMR クラスターの詳細を調べるのに役立ちます。これは、EMR クラスターで Apache Livy を使用して対話型 Spark セッションを作成するために使用されます。

Spark セッションの作成

Apache Livy は、それぞれの変換タスクに対話型スパークセッションを作成します。コードは以下に示します。SparkSession は、基盤となる Spark の機能とやり取りするための単一のエントリポイントを提供します。これにより、DataFrame および Dataset API を使用して Spark をプログラミングすることができます。Spark セッションは POST / セッション API を呼び出すことで作成されます。

注意: ドライバーメモリ、実行メモリ、ドライバーコアおよび実行コアの数のような異なるパラメーターを API コールの一部として変更することもできます。

# Creates an interactive scala spark session.
# Python(kind=pyspark), R(kind=sparkr) and SQL(kind=sql) spark sessions can also be created by changing the value of kind.
def create_spark_session(master_dns, kind='spark'):
    # 8998 is the port on which the Livy server runs
    host = 'http://' + master_dns + ':8998'
    data = {'kind': kind}
    headers = {'Content-Type': 'application/json'}
    response = requests.post(host + '/sessions', data=json.dumps(data), headers=headers)
    logging.info(response.json())
    return response.headers

ステートメントの送信

セッションの起動が完了すると、アイドル状態に移行します。その後、変換タスクがセッションに送信されます。優れた障害耐性と同時実行性を備えるために、Scala コードが EMR クラスターの代わりに REST API コールとして Livy Server に送信されます。

# Submits the scala code as a simple JSON command to the Livy server
def submit_statement(session_url, statement_path):
    statements_url = session_url + '/statements'
    with open(statement_path, 'r') as f:
        code = f.read()
    data = {'code': code}
    response = requests.post(statements_url, data=json.dumps(data), headers={'Content-Type': 'application/json'})
    logging.info(response.json())
    return response

トラックステートメントの進行状況

ステートメントの進行状況も簡単に追跡できます。ログは Airflow ウェブサーバーに集中化されます。

# Function to help track the progress of the scala code submitted to Apache Livy
def track_statement_progress(master_dns, response_headers):
    statement_status = ''
    host = 'http://' + master_dns + ':8998'
    session_url = host + response_headers['location'].split('/statements', 1)[0]
    # Poll the status of the submitted scala code
    while statement_status != 'available':
        # If a statement takes longer than a few milliseconds to execute, Livy returns early and provides a statement URL that can be polled until it is complete:
        statement_url = host + response_headers['location']
        statement_response = requests.get(statement_url, headers={'Content-Type': 'application/json'})
        statement_status = statement_response.json()['state']
        logging.info('Statement status: ' + statement_status)

        #logging the logs
        lines = requests.get(session_url + '/log', headers={'Content-Type': 'application/json'}).json()['log']
        for line in lines:
            logging.info(line)

        if 'progress' in statement_response.json():
            logging.info('Progress: ' + str(statement_response.json()['progress']))
        time.sleep(10)
    final_statement_status = statement_response.json()['output']['status']
    if final_statement_status == 'error':
        logging.info('Statement exception: ' + statement_response.json()['output']['evalue'])
        for trace in statement_response.json()['output']['traceback']:
            logging.info(trace)
        raise ValueError('Final Statement Status: ' + final_statement_status)
    logging.info('Final Statement Status: ' + final_statement_status)

以下は、Airflow ウェブサーバーの集中ログのスナップショットです。

ジョブが正常に実行されると、Spark セッションは終了し EMR クラスターは完了します。

Amazon Athena のデータを分析する

AWS Glue にクローラーを作成すると、S3 の出力データは Amazon Athena で分析できます。Athena に自動でテーブルを作成する方法の詳細については、Build a Data Lake Foundation with AWS Glue and Amazon S3 を参照してください。

まとめ

この記事では、Apache Livy と Apache Airflow を使用して、 Amazon EMR で Spark のデータパイプラインをオーケストレーションすることについて検討しました。Spark ジョブを同時に実行する方法を示すために、シンプルな Airflow DAG を作成しました。これを変更して ETL データパイプラインのスケールや、レイテンシーの改善ができます。また、最適な EMR リソースを使用して、REST を通じ Spark ジョブを送信する複雑さをカバーするために、Livy がどのように役立つのかを見てきました。

この記事に記載されているコードの詳細については、AWS Concurrent Data Orchestration Pipeline EMR Livy を参照してください。お気軽に質問やフィードバックを残してください。

 


その他の参考資料

この記事がお役に立ちましたら、Orchestrate Apache Spark applications using AWS Step Functions and Apache Livy をぜひ確認してみてください。

 


著者について

Binal Jhaveri はアマゾン ウェブ サービスのビッグデータエンジニアです。クラウドにビッグデータの製品を構築することに、情熱を傾けています。余暇には、旅行や推理小説を楽しみ、大西洋北西の豊かな自然を探索することが大好きです。