Amazon Web Services ブログ

Amazon CloudWatch エージェントで Apache Flink のメトリクスを収集する

Apache Flink は分散ストリーム処理エンジンです。Amazon EMR では Flink を YARN アプリケーションとして実行することができます。Flink のメトリクスはFlink ウェブインターフェースで可視化することができますが、メトリクスの値に応じた対処をするにはどうしたらよいでしょうか?このブログ記事では、CloudWatch エージェントを使用して Flink のメトリクスを Amazon CloudWatch に収集する方法を紹介します。これによってFlink メトリクスのモニタリングやダッシュボードへの追加、アラームのトリガー、さらにはアラームを契機とするイベントドリブンな処理も実行できるようになります。

ソリューションの概要

Figure 1: Solution workflow
図 1: ソリューションのワークフロー

Flink のメトリクスはメトリックレポーターによって外部のメトリックシステムへ送信することができます。また、CloudWatch エージェントはローカルインターフェイスを介してメトリックシステムのメトリクスをカスタムメトリクスとして収集することができます。この記事で紹介するソリューションでは、Flink と CloudWatch エージェントの双方でサポートされているメトリックシステムである StatsD を使用します。

このソリューションでは、 Amazon Simple Storage Service (Amazon S3) バケットに格納された Amazon EMR ブートストラップスクリプトを使用します。このスクリプトは、各ノードで CloudWatch エージェントをダウンロード、インストール、および設定します。エージェントは、クラスター全体での配布を必要とするローカル設定ファイルを使用するのではなく、一元管理された AWS Systems Manager パラメータストアから設定を取り出します。

EMR の最新バージョンでは Flink StatsD メトリックレポーターライブラリをクラスター上でビルドするか、またはダウンロードし、ノード間で共有する必要があります。このソリューションでは、EMR クラスターステップでパブリックリポジトリからライブラリをダウンロードし、共有パスを介して各ノードに配布する方法をとっています。

できあがったEMRクラスターの各ノードには CloudWatch エージェントがインストールされ、Systems Manager パラメータストアで管理されている設定が反映されます。Flink メトリクスは、Flink StatsD メトリックスレポーターを介して各ノードの CloudWatch エージェントの StatsD インターフェイスに送信されます。CloudWatch エージェントは、これらのメトリクスを収集し、CloudWatchのカスタムメトリクス名前空間にパブリッシュします。ユーザーは、リモート接続に Systems Manager セッションマネージャーを使用して、Flink ジョブの開始や停止など、クラスターノード上でコマンドを実行することができます。

図 1 はこのソリューションのワークフローを示しています。ユーザーがAWS Systems Manager で EMR クラスターに接続してジョブを開始すると、EMR クラスターは Flink ジョブを実行します。このジョブは StatsD レポーターを通じて CloudWatch エージェントのローカル StatsD インターフェイスにメトリクスを送信します。CloudWatch エージェントは、Systems Manager パラメータストアから設定を取り出します。CloudWatch エージェントは Amazon CloudWatch にメトリクスをパブリッシュし、ユーザーが参照できるようにします。

構築手順

この手順では、ほとんどの作業をマネジメントコンソールで実行していますが、AWS CLI または AWS CloudFormation テンプレートを使用することもできます。

以下の作業を実施します。

  1. EMRにアタッチするIAMインスタンスプロファイルロールを作成します。
  2. CloudWatch エージェントの設定をSystems Manager パラメータストアに登録します。
  3. ブートストラップスクリプトを作成し、S3 バケットにアップロードします。
  4. 以下を含むEMR クラスターを作成します。
    • Apache Flink アプリケーション
    • StatsD ライブラリで Flink メトリクスをレポートするための設定
    • Flink StatsD メトリックレポーターライブラリをダウンロードしてインストールするステップ
    • Flink セッションを開始するステップ
  5. Systems Manager セッションマネージャを使用して EMR クラスターに接続し、Flink ジョブを開始します。
  6. CloudWatch コンソールで Flink メトリクスをモニタリングします。

作業 1〜4 は、こちらの CloudFormation テンプレートに含まれています。
Launch Stack をクリックして、 CloudFormation スタックを起動し、テンプレートをデプロイします。
Launch Stack をクリックする前に前提条件を満たしていることを確認してください。
launch-stack-button

CloudFormation ウィザードでは、次のパラメータを指定してください。

  • BootstrapScriptPath: CloudWatch エージェントのインストールスクリプトが置かれている S3 パス。デフォルト値は、Amazon がホストするインストールスクリプトのS3パスとなっています。(必須)
  • EC2KeyPairName: セッションマネージャーを使わず、クラスターノードにSSH接続するための EC2 キーペア。(任意)
  • EMRReleaseLabel: 使用する EMR リリースラベル。デフォルト値は emr-5.32.0 です。(必須)
  • InstanceCountCore: コアインスタンスグループ内のインスタンスの数。デフォルト値は 2 です。(必須)
  • Subnet:クラスターを起動するサブネット。(必須)
  • InstanceType: すべてのインスタンスグループのインスタンスのタイプ。デフォルト値は m4.xlarge です。(必須)

この CloudFormation テンプレートを使用する場合は、スタックが正常に作成された後に「作業 5: クラスターに接続して Flink ジョブを実行する」に進んでください。

前提条件

この構築手順は以下が用意されていることを前提としています。

  • AWS アカウント。
  • ブートストラップスクリプトを格納するための S3 バケット。
  • EMR クラスターが起動されるVPCおよび、サブネット。
  • AWS サービスとリソースに対する Amazon EMR のデフォルト IAM サービスロール。AWS CLIで aws emr create-default-roles を実行することで作成するがことできます。
  • オプションの EC2 キーペア。Systems Manager セッションマネージャーではなく SSH でクラスターに接続する場合に必要となります。

作業 1: IAM インスタンスプロファイルロールを作成する

EMRロールについてはデフォルトの EMR サービスロールを使用します。インスタンスプロファイルロールについてはデフォルトでアタッチされているIAMポリシー以外のポリシーをアタッチする必要があるため、カスタムインスタンスプロファイルロールを作成して利用することを推奨します。デフォルトのインスタンスプロファイルロールがアタッチされた既存の EMR クラスターや今後、作成されるEMRクラスターにそれらのポリシーが意図せず適用されることを避けるためです。

詳細については、『IAM ユーザーガイド』の「IAM ロールの作成」および『Amazon EMR 管理ガイド』の「Amazon EMRのサービスロールを設定する」を参照してください。

以下の手順でIAM インスタンスプロファイルロールを作成します。

  1. IAM コンソールのナビゲーションペインで、「ロール」 を選択し、[ロールの作成] を選択します。
  2. 「信頼されたエンティティの種類を選択」で、[AWSサービス] を選択します。[ユースケースの選択]では、[EC2]を選択し、[次へ] を選択します。
  3. AmazonElasticMapReduceforEC2Role を検索してアタッチします。
  4. AmazonSSMManagedInstanceCore を検索してアタッチします。
  5. CloudWatchAgentServerPolicy を検索してアタッチします。
  6. [次のステップ:タグ] を選択し、続けて [次のステップ:確認] を選択します。
  7. [ロール名]を入力し、[ロールの作成] を選択します。

作業 2: Systems Manager パラメータを作成する

ここでは、CloudWatch メトリクスのカスタム名前空間 (FlinkSystem) を設定し、ホストの 8125 番ポートで StatsD インターフェイスを設定します。詳細については、『Amazon CloudWatch ユーザーガイド』の「カスタムメトリクスを発行する」、「StatsD を使用したカスタムメトリクスの取得」、「CloudWatch エージェント設定ファイルを手動で作成または編集する」および『AWS Systems Manager ユーザーガイド』の「Systems Manager パラメータストア」を参照してください。

以下の手順でSystems Manager パラメータを作成します。

  1. Systems Manager コンソールのナビゲーションペインで、[パラメータストア] を選択し、[パラメータの作成] を選択する。
  2. [名前] に”AmazonCloudWatch-config.json”と入力する。
  3. [値] に次のJSONを入力する。
{
    "agent": {
        "logfile": "/opt/aws/amazon-cloudwatch-agent/logs/amazon-cloudwatch-agent.log",
        "debug": false,
        "run_as_user": "cwagent"
    },
    "metrics": {
        "namespace": "FlinkSystem",
        "metrics_collected": {
            "statsd": {
                "service_address": ":8125",
                "metrics_collection_interval": 60,
                "metrics_aggregation_interval": 300
            }
        },
        "aggregation_dimensions": [["InstanceId"],[]]
    }
}
  1. [パラメータを作成] を選択する。

作業 3: ブートストラップスクリプトを作成してS3 バケットにアップロードする

詳細については、『Amazon S3 ユーザーガイド』の「オブジェクトのアップロード」および『Amazon CloudWatch ユーザーガイド』の「サーバーで CloudWatch エージェントをインストールして実行する」を参照してください。

以下の手順でブートストラップスクリプトを作成してS3 バケットにアップロードします。

  1. 次の内容で bootstrap_cloudwatch_agent.sh という名前のファイルをローカルに作成する。
#!/bin/bash

echo -e 'Installing CloudWatch Agent... \n'
sudo rpm -Uvh --force https://s3.amazonaws.com/amazoncloudwatch-agent/amazon_linux/amd64/latest/amazon-cloudwatch-agent.rpm

echo -e 'Starting CloudWatch Agent... \n'
sudo amazon-cloudwatch-agent-ctl -a fetch-config -m ec2 -c ssm:AmazonCloudWatch-Config.json -s
  1. Amazon S3 コンソールで、任意のS3 バケットを選択する。
  2. [オブジェクト] タブで、[アップロード] を選択する。
  3. [ファイルを追加] を選択し、ブートストラップスクリプトを選択する。
  4. [アップロード] を選択し、[閉じる] を選択する。

作業 4: EMR クラスターを作成する

EMR の詳細については、『Amazon EMR 管理ガイド』の「追加のソフトウェアをインストールするためのブートストラップアクションの作成」および「AWS CLI とコンソールを使用したステップの操作」を参照してください。Flink の詳細については、『Amazon EMR リリースガイド』の「Flink クラスターの作成」および「Flink の設定」を参照してください。

次のプロパティをクラスターに設定します。

Classification Property Value Reason
hadoop-env JAVA_HOME /usr/lib/jvm/java-11-amazon-corretto.x86_64 Set the Java version
flink-conf jobmanager.web.address 0.0.0.0 Configure web servers
flink-conf rest.address 0.0.0.0
flink-conf jobmanager.heap.size 1024m Set memory allocation
flink-conf taskmanager.memory.process.size 1728m
flink-conf metrics.reporter.stsd.port 8125 Set up the StatsD reporter
flink-conf metrics.reporter.stsd.host localhost
flink-conf metrics.reporters stsd
flink-conf metrics.reporter.stsd.class org.apache.flink.metrics.statsd.StatsDReporter
flink-conf metrics.scope.jm jobmanager Simplify system scope metric names
flink-conf metrics.scope.jm.job jobmanager.<job_id>
flink-conf metrics.scope.tm taskmanager.<tm_id>
flink-conf metrics.scope.tm.job taskmanager.<tm_id>.<job_id>
flink-conf metrics.scope.task taskmanager.<tm_id>.<job_name>.<task_name>.<subtask_index>
flink-conf metrics.scope.operator taskmanager.<tm_id>.<job_name>.<operator_name>.<subtask_index>

デフォルトではメトリック名のプレフィックスに内部ホスト名(例: ip-10-0-0-34-0_ap-northeast-1_compute_internal)が付与されますが、可読性の観点から内部ホスト名を除いたメトリック名にオーバーライドすることを推奨します。

内部ホスト名を除いたメトリック名にオーバーライドしたとしても、CloudWatch メトリックスのディメンションでホストが識別できるため、ホストでフィルタリングすることができます。詳細については、『Amazon CloudWatch ユーザーガイド』の「ディメンション」および Flink ドキュメントの「System metrics」を参照してください。

CloudWatch エージェントをインストールおよび設定するブートストラップアクションと Maven リポジトリからFlink StatsD レポーターをダウンロードするためのクラスターステップを追加する必要があります。

最後に、次のようなコマンドを使用して Flink セッションを開始するクラスターステップが必要になります。
flink-yarn-session -d -n 2

ここでは、yarn-session.sh のラッパーである flink-yarn-session コマンドを使用して、2 つのタスクマネージャとデタッチモードのFlink セッションを開始します。オプションはクラスター構成に応じて適切な値に変更してください。

詳細については、『Amazon EMR リリースガイド』の「Amazon EMR での Flink ジョブの操作」を参照してください。

以下の手順でクラスターを作成します。

  1. Amazon EMR コンソールで、[クラスターを作成] を選択し、[詳細オプションに移動する] を選択する。
  2. [ソフトウェア設定] のリリースで “emr-5.32.0” を選択します。
  3. Flink 1.11.2 アプリケーションをインストールします。

以下の手順でクラスターのプロパティを設定します。

  1. [ソフトウェア設定の編集] で、[設定の入力] を選択し、次のJSONを入力する。
[
  {
    "classification": "hadoop-env",
    "properties": {},
    "configurations": [
      {
        "classification": "export",
        "properties": {
          "JAVA_HOME": "/usr/lib/jvm/java-11-amazon-corretto.x86_64"
        },
        "configurations": []
      }
    ]
  },
  {
    "classification": "flink-conf",
    "properties": {
      "jobmanager.web.address": "0.0.0.0",
      "rest.address": "0.0.0.0",
      "jobmanager.heap.size": "1024m",
      "taskmanager.memory.process.size": "1728m",
      "metrics.reporter.stsd.port": "8125",
      "metrics.reporter.stsd.host": "localhost",
      "metrics.reporters": "stsd",
      "metrics.reporter.stsd.class": "org.apache.flink.metrics.statsd.StatsDReporter",
      "metrics.scope.jm": "jobmanager",
      "metrics.scope.jm.job": "jobmanager.",
      "metrics.scope.tm": "taskmanager.",
      "metrics.scope.tm.job": "taskmanager..",
      "metrics.scope.task": "taskmanager....",
      "metrics.scope.operator": "taskmanager...."
    },
    "configurations": []
  }
]

以下の手順でメトリクスレポーターライブラリをインストールするためのステップを追加します。

  1. [ステップの追加(オプション)] で、[ステップタイプ] の値を “カスタムJAR” に変更し、[ステップの追加] を選択する。
  2. [名前] にAddLibrariesと入力します。
  3. [JAR の場所] に command-runner.jar と入力します。
  4. [引数] には、以下を入力します。
  1. [追加] を選択します。

以下の手順でFlinkセッションを開始するためのステップを追加します。

  1. [ステップの追加 (オプション)] で、[ステップの追加] を選択する。
  2. [名前] に“FlinkStart”と入力します。
  3. [JAR の場所] に “command-runner.jar” と入力します。
  4. [引数] には、以下を入力します。
sudo
bash
-c
"flink-yarn-session -d -n 2"
  1. [追加] を選択します。

以下の手順でハードウェア構成を設定します。

  1. クラスターを起動するネットワークと EC2 サブネットを指定する。
  2. 各Node typeのインスタンスタイプとインスタンス数を指定する。
  3. [次へ] を選択する。

以下の手順でクラスター全般設定を設定します。

  1. [クラスター名] にDemo Flink Metricsと入力します。
  2. [タグ] の[キー]に DemoFlinkMetrics と入力します。
  3. [ブートストラップアクションを追加する]で、”カスタムアクション”を選択し、[設定と追加] を選択します。
  4. [スクリプトの場所] に、アップロードしたブートストラップスクリプトの S3 パスを入力します。
  5. [次へ] を選択します。

以下の手順でセキュリティオプションを設定してクラスターを起動します。

  1. Systems Manager セッションマネージャーではなく SSH でクラスターに接続する場合は、EC2 キーペアを指定します。
  2. [アクセス権限] で、”カスタム” を選択します。
  3. EC2 インスタンスプロファイルに、作業1で作成したカスタム IAM ロールを使用します。
  4. [クラスターを作成] を選択します。

作業 5: クラスターに接続して Flink ジョブを実行する

クラスターに接続してコマンドを実行するための方法はいくつかあります。クラスターに EC2 キーペアを設定している場合は、SSH で接続できます。また、クラスターの IAM インスタンスプロファイルロールに AmazonSSMManagedInstanceCore ポリシーがアタッチされている場合は、AWS CLI セッションマネージャープラグイン、またはSystems Manager コンソールを使用して接続できます。詳細については、『AWS Systems Manager ユーザーガイド』の「AWS Systems Manager セッションマネージャー」を参照してください。

以下の手順でマスターノードのインスタンス ID を特定します。

Amazon EMR コンソールのナビゲーションペインで、[クラスター] を選択します。Demo Flink Metrics を選択します。[ハードウェア] タブで、[ノードタイプ & 名前] が [MASTER Management Group] であるインスタンスグループの ID を選択します。
EC2 インスタンス ID(i-00123456789abcdef など)をメモします。

以下の手順でクラスターに接続して Flink ジョブを実行します。

  1. Systems Manager コンソールのナビゲーションペインで、[セッションマネージャー] を選択します。
  2. [セッション] タブで、[セッションの開始] を選択します。
  3. インスタンス ID がマスターノードの EC2 インスタンス ID と一致するインスタンスを選択します。
  4. [セッションを開始する] を選択します。
  5. セッションマネージャーのリモートシェルで、次のコマンドを実行します。
sudo flink run --jobmanager yarn-cluster --detached /usr/lib/flink/examples/streaming/StateMachineExample.jar

devops-776-fig2
図 2: Flink サンプルジョブ”StateMachineExample”開始時のリモートシェル出力

作業 6: CloudWatch でメトリクスを監視する

以下の手順でCloudWatch でメトリクスをモニタリングします。

  1. CloudWatch コンソールのナビゲーションペインで、[メトリクス] > [All metrics]を選択します。
  2. [FlinkSystem] カスタム名前空間を選択します。
  3. [host, metric_type] を選択します。
  4. 必要に応じて、メトリクス名の一部であるジョブ ID で検索します。Flink ジョブが開始されてからメトリクスの表示が開始されるまで数分かかる場合があります。
  5. メトリクスを選択してグラフ化します。

次の例は、Flink の稼働時間と停止時間の CloudWatch グラフを示しています。

devops-776-fig3-1024x543
図 3: ジョブマネージャの稼働時間と停止時間の CloudWatch グラフ

次の例は、Flink チェックポイント所要時間の CloudWatch グラフを示しています。

devops-776-fig4-1024x539
図 4: Flink チェックポイント所要時間の CloudWatch グラフ

後片付け

検証が完了したらそれ以上の課金を回避するために、この構築手順で作成したリソースを削除します。EMR クラスターはクラスターがアクティブである限り課金が発生するため、検証完了後にクラスターを終了します。

CloudFormation のサンプルテンプレートを起動した場合

  1. CloudFormation コンソールのナビゲーションペインで、[スタック] を選択します。
  2. 起動したスタック (EMR-CloudWatch-Statsd-Demo) を選択し、[削除] を選択します。

リソースを手動で作成した場合

  1. Amazon EMR コンソールのナビゲーションペインで、[クラスター] を選択します。作成したクラスターを選択し、[削除] を選択します。
  2. Systems Manager コンソールのナビゲーションペインで、[パラメータストア] を選択します。AmazonCloudWatch-config.json を選択し、[削除] を選択します。
  3. IAM コンソールのナビゲーションペインで、[ロール] を選択します。作業1で作成したロールを選択し、[ロールの削除] を選択します。

まとめ

クラスターの各ノード上で稼働するCloudWatch エージェントが StatsD のローカルインターフェイスを介して CloudWatch にメトリクスをパブリッシュするように設定しました。また、Flink アプリケーションが、このローカルインターフェイスにメトリクスを送信するように設定しました。その結果、CloudWatch で Flink メトリクスを参照できるようになりました。

IAM インスタンスプロファイルロール、Systems Manager パラメータ、および EMR クラスターを作成するこのテンプレートのように CloudFormation テンプレートで、このソリューションをパッケージ化してデプロイすることができます。

CloudWatch でのメトリクスの表示がうまくいかない場合は、Flink アプリケーションログ(yarn logs -applicationId コマンドで集計されます。)と CloudWatch エージェントのログ(各ノードの/opt/aws/amazon-cloudwatch-agent/logs/amazon-cloudwatch-agent.logにあります。)を参照してトラブルシューティングしてください。

このソリューションをさらに有効活用するには、CloudWatch アラームでこれらのメトリクスに閾値を設定してアラートしたり、異常検出の一部の条件としてこれらのメトリクス使用することを検討してみてください。アラームを他のアラームとともに複合アラームを構成したり、Amazon SNS 通知の送信などのアラームアクションを設定して、AWS Lambda 関数などのイベント駆動型プロセスをトリガーすることもできます。

これらのメトリクスとアラームの使用方法が理解できたら、CloudWatch エージェント設定ファイルを手動で作成または編集したり、Flink メトリクスのドキュメントを参照して必要なメトリクスのみを収集するよう調整してみてください。また、本番ワークロードとしてEMRクラスターを稼働させる際にはAmazon EMR を保護するベストプラクティスを参照して、クラスターを適切なセキュリティ設定で保護してください。

著者について

Josh Haycraft

Josh Haycraft はアマゾンのシニアソフトウェアエンジニアで、ディベロッパーが安全にサービスを提供するためのセキュリティやガバナンスに関するツールを開発しています。プライベートでは、ボードゲーム、スーパーヒーローフィクション、国立公園の散策を楽しんでいます。

翻訳はソリューションアーキテクトの 小森谷が担当しました。原文はこちら