Amazon Web Services ブログ

AWS上でApache Flinkを使用してリアルタイムストリーム処理パイプラインを構築する

今日のビジネス環境では、多様なデータソースが着実に増加していく中で、データが継続的に生成されています。したがって、このデータを継続的にキャプチャ、格納、および処理して、大量の生データストリームを実用的な洞察に素早く繋げることは、組織にとって大きな競争上のメリットになっています。

Apache Flinkは、このようなストリーム処理パイプラインの基礎を形成するのに適したオープンソースプロジェクトです。ストリーミングデータの継続的な分析に合わせたユニークな機能を提供しています。しかし、Flinkを基にしたパイプラインの構築と維持には、物理​​的なリソースと運用上の努力に加え、かなりの専門知識が必要になることがよくあります。

この記事では、Amazon EMRAmazon KinesisAmazon Elasticsearch Serviceを使用してApache Flinkを基にした、一貫性のあるスケーラブルで信頼性の高いストリーム処理パイプラインの参照アーキテクチャの概要を説明します。 AWSLabs GitHubリポジトリは、実際に参照アーキテクチャを深く理解するために必要なアーティファクトを提供します。リソースには、サンプルデータをAmazon Kinesisストリームに取り込むプロデューサアプリケーションと、リアルタイムでデータを分析し、その結果をAmazon ESに可視化するためのFlinkプログラムが含まれています。

地理空間のタクシーデータをリアルタイムで分析する

タクシーの運行の最適化に関するシナリオを考えてみましょう。ニューヨーク市で現在運営されているタクシーの船団から継続的に情報を入手します。このデータを使用して、収集したデータをリアルタイムで分析し、データに基づいた決定を行うことで、オペレーションを最適化する必要があります。

たとえば、現在タクシー需要が高い地域を特定し、空いているタクシーをそこに向けることができます。また、現在の交通状況を追跡して、近くの空港への乗り物など、顧客におおよその旅行時間を伝えることもできます。もちろん、あなたの意思決定は、現在の需要と交通状況を厳密に反映した情報に基づいて決定する必要があります。入ってくるデータは、絶えずタイムリーに分析される必要があります。関連するKPIと派生した洞察は、リアルタイムダッシュボードにアクセス可能でなければなりません。

この記事では、ニューヨーク市で収集された歴史的なタクシートリップのデータセットをAmazon Kinesis Streamsに再生して、一連のトリップイベントをエミュレートしています。このデータセットは、New York City Taxi & Limousine Commissionのウェブサイトから入手できます。それには、地理的位置と個々のタクシートリップから収集された運賃に関する情報が含まれています。

より現実的なシナリオでは、AWS IoTを利用して、タクシーにインストールされているテレメトリユニットからデータを収集し、そのデータをAmazon Kinesisストリームに取り込むことができるでしょう。

信頼性の高いスケーラブルなストリーム処理パイプラインのアーキテクチャ

パイプラインはタクシーの船団を操作し最適化するための中心的なツールとして機能するため、単一ノードの障害に対して耐性があるアーキテクチャを構築することが重要です。パイプラインは着信イベントの変化率に適応する必要があります。したがって、イベントの処理、実際の処理、収集した洞察の視覚化を別々のコンポーネントに分ける必要があります。インフラストラクチャのこれらのコンポーネントを疎結合化し、マネージドサービスを使用することで、障害発生時のパイプラインの堅牢性を高めることができます。インフラストラクチャのさまざまな部分を個別に拡張し、パイプライン全体を構築および操作するために必要な作業を削減することもできます。

タクシーから送信されたイベントの取り込みと蓄積を、望ましい洞察を導くクエリの計算から切り離すことによって、インフラストラクチャの堅牢性を大幅に高めることができます。

イベントは、再生可能な順序付けされたログを保持し、複数のアベイラビリティゾーンにイベントを重複して格納する、Amazon Kinesis Streamsを使用して最初は永続化されます。その後、イベントはストリームから読み込まれ、Apache Flinkによって処理されます。 Flinkは内部状態をスナップショットとして継続的に処理するので、スナップショットから内部状態を復元し、ストリームから再処理する必要があるイベントを再生することで、オペレータまたはノード全体の障害を回復できます。

イベントを格納するためのログ集約のもう1つの利点は、複数のアプリケーションによってデータを消費することです。 Flinkアプリケーションのさまざまなバージョンをベンチマークとテストのために並べて実行することが可能です。または、Amazon Kinesis Firehoseを使用してストリームからAmazon S3へのデータを長期間のアーカイブに保存した後、Amazon Athenaを使用して履歴分析を深掘りすることができます。

Amazon Kinesis Streams、Amazon EMR、およびAmazon ESは、単純なAPIコールを使用して作成および拡張できるマネージドサービスであるため、これらのサービスを使用すると、専門知識をビジネス価値の提供に集中できます。 AWSは、パイプライン全体を構築し、さらに重要なことに、運用と規模の拡大には必要ですが差別化につながらない重労働を担います。パイプラインの作成は、AWS CloudFormationを使用して完全に自動化することができ、個々のコンポーネントはAmazon CloudWatchを使用して監視し、自動的にスケーリングすることができます。障害が検出され、自動的に緩和されます。

この記事の残りの部分では、AWSで参照アーキテクチャを構築して実行することに関連する側面に焦点を当てます。 Flinkの詳細については、APIを使用してFlinkプログラムを実装する方法について説明するFlinkトレーニングセッションを参照してください。セッションで使用されるシナリオは、この記事で説明したシナリオとかなり似ています。

参照アーキテクチャの構築と実行

タクシートリップ分析アプリケーションの実際の動作を確認するには、2つのCloudFormationテンプレートを使用して参照アーキテクチャを構築して実行します。

  • 最初のテンプレートは、ストリームへのタクシートリップを取り込み、Flinkでトリップを分析するためのランタイムアーティファクトを構築します
  • 2番目のテンプレートは、アプリケーションを実行するインフラストラクチャのリソースを作成します

FlinkアプリケーションのソースコードやCloudFormationテンプレートなど、参照アーキテクチャの構築と実行に必要なリソースは、flink-stream-processing-refarch AWSLabs GitHubリポジトリから入手できます。

ランタイムアーティファクトの構築とインフラストラクチャーの作成

最初のCloudFormationテンプレートを実行して、AWS CodePipelineパイプラインを作成します。このパイプラインは、AWS CodeBuildを使用してサーバーレスでアーチファクトを構築します。 Mavenをインストールし、Flink Amazon Kinesisコネクタと他のランタイムアーティファクトを手動で構築することもできます。パイプラインのすべてのステージが正常に完了すると、CloudFormationテンプレートの出力セクションで指定されたS3バケットからアーティファクトを取得できます。

最初のテンプレートが作成され、ランタイムアーティファクトが構築されると、先に説明した参照アーキテクチャのリソースを作成する2番目のCloudFormationテンプレートを実行します。

両方のテンプレートが正常に作成されるまで待ってから、次の手順に進みます。これには最大15分かかります。新鮮なコーヒーを飲みながらCloudFormationがあなたのためにすべての仕事をしてくれるのをしばしお待ちください。

Flinkランタイムの起動とFlinkプログラムの実行

Flinkランタイムを開始し、解析を行っているFlinkプログラムを実行するには、EMRマスターノードに接続します。このコマンドとそれ以降のコマンドのパラメータは、インフラストラクチャのプロビジョニングとランタイムアーティファクトの構築に使用された2つのCloudFormationテンプレートの出力セクションから取得できます。

$ ssh -C -D 8157 «EMR master node IP»

CloudFormationテンプレートによってプロビジョニングされたEMRクラスターには、それぞれ4つのvCPUを持つ2つのc4.xlargeコア・ノードが付属しています。一般に、ノードのコア数は、タスクマネージャごとのスロット数と一致します。この記事では、2つのタスクマネージャーとタスクマネージャーあたり4つのスロットで長時間実行するFlinkクラスターを開始することは妥当です。

$ HADOOP_CONF_DIR=/etc/hadoop/conf /usr/lib/flink/bin/yarn-session.sh -n 2 -s 4 -tm 4096 -d

Flinkランタイムが起動して実行されると、タクシーストリームプロセッサプログラムをFlinkランタイムに送信し、Amazon Kinesisストリームのトリップイベントのリアルタイム分析を開始することができます。

$ aws s3 cp s3://«artifact-bucket»/artifacts/flink-taxi-stream-processor-1.0.jar .

$ flink run -p 8 flink-taxi-stream-processor-1.0.jar --region «AWS region» --stream «Kinesis stream name» --es-endpoint https://«Elasticsearch endpoint»

Flinkアプリケーションが実行されているので、ストリームから入って来るイベントを読み込み、イベントの時間に合わせて時間ウィンドウに集約し、その結果をAmazon ESに送信しています。 Flinkアプリケーションは、細々したリクエストでElasticsearchクラスターを過負荷にしないようにバッチ処理を行い、バッチ処理リクエストに署名してElasticsearchクラスターを安全に構成できるようにします。

ブラウザでプロキシをアクティブ化した場合は、SSHセッションによってマスターノードに確立された動的ポート転送を使用して、Flink Webインターフェイスを利用することができます。

Amazon Kinesisストリームへのトリップイベントの取り込み

イベントを取り込むには、タクシーストリームプロデューサーアプリケーションを使用します。これはS3にあるニューヨーク市で記録されたタクシートリップの歴史的なデータセットを、8つのシャードを持つAmazon Kinesisストリームに再生します。タクシートリップに加えて、プロデューサアプリケーションはウォーターマークイベントをストリームに取り込み、Flinkアプリケーションが、プロデューサが履歴データセットを再生した時間を、判断できるようにします。

$ ssh -C «producer instance IP»

$ aws s3 cp s3://«artifact-bucket»/artifacts/kinesis-taxi-stream-producer-1.0.jar .

$ java -jar kinesis-taxi-stream-producer-1.0.jar -speedup 1440 -stream «Kinesis stream name» -region «AWS region»

このアプリケーションは、この記事で説明したリファレンスアーキテクチャに特有のものではありません。たとえば、Apache Flinkの代わりにAmazon Kinesis Analyticsに基づいた同様のストリーム処理アーキテクチャを構築するなど、他の目的にも簡単に再利用できます。

Kibanaダッシュボードの利用

パイプライン全体が実行されているので、最終的に、Flinkアプリケーションによってリアルタイムで得られた洞察を表示するKibanaダッシュボードを利用できます。

https://«Elasticsearch end-point»/_plugin/kibana/app/kibana#/dashboard/Taxi-Trips-Dashboard

この記事の目的のために、Elasticsearchクラスターは、インフラストラクチャを作成するCloudFormationテンプレートのパラメータとして指定されたIPアドレス範囲からの接続を受け入れるように設定されています。プロダクション対応のアプリケーションでは、これが常に望ましいとは限りません。 Elasticsearchクラスターに安全に接続する方法の詳細については、AWSデータベースブログのAmazon Elasticsearch Service用アクセスコントロールの設定を参照してください。

Kibanaダッシュボードでは、左のマップがタクシーの出発点を視覚化します。より赤い矩形であるほど、その場所でより多くのタクシートリップが開始されます。右の折れ線グラフは、それぞれジョンF.ケネディ国際空港とラガーディア空港へのタクシートリップの平均時間を示しています。

この情報が与えられれば、現在需要の高い場所に積極的に空いているタクシーを送り、地方空港への旅行時間をより正確に見積もることで、タクシーの運行の最適化を図ることができます。

基盤となるインフラストラクチャを拡張することができます。たとえば、ストリームのシャード容量をスケーリングしたり、インスタンス数やElasticsearchクラスターのインスタンスタイプを変更したり、スケール変更操作中であってもパイプライン全体が機能し応答可能であることを確認します。

AWS上でApache Flinkを実行する

ここまで見てきたように、FlinkランタイムはYARNを使用してデプロイできるため、EMRはAWS上でFlinkを実行するのに適しています。しかし、Flinkアプリケーションをビルドして実行するには、AWS関連のいくつかの考慮事項があります。

  • Flink Amazon Kinesisコネクタの構築
  • Amazon Kinesisのコンシューマ設定の変更
  • Amazon Kinesisにウォーターマークを送信してイベント時間処理を有効にする
  • FlinkをAmazon ESに接続する

Flink Amazon Kinesisコネクタの構築

Flinkは、Amazon Kinesisストリーム用のコネクタを提供しています。他のFlinkアーティファクトとは対照的に、Amazon KinesisコネクタはMavenセントラルリポジトリからは利用できませんので、自分で構築する必要があります。 Maven 3.3.xは不適切な依存関係を持つ出力を生成する可能性があるので、最近のMaven 3.3.xではなくMaven 3.2.xでFlinkを構築することをお勧めします。

$ wget -qO- https://github.com/apache/flink/archive/release-1.2.0.zip | bsdtar -xf-

$ cd flink-release-1.2.0

$ mvn clean package -Pinclude-kinesis -DskipTests -Dhadoop-two.version=2.7.3 -Daws.sdk.version=1.11.113 -Daws.kinesis-kcl.version=1.7.5 -Daws.kinesis-kpl.version=0.12.3

Flink Amazon Kinesisコネクタを入手したら、それぞれの.jarファイルをローカルMavenリポジトリにインポートできます:

$ mvn install:install-file -Dfile=flink-connector-kinesis_2.10-1.2.0.jar
 -DpomFile=flink-connector-kinesis_2.10-1.2.0.pom.xml

Amazon Kinesisのコンシューマ設定の変更

Flinkは最近、EMRクラスターに関連付けられているロールからAWS認証情報を取得するためのサポートを導入しました。 Flinkアプリケーションソースコードでこの機能を有効にするには、AWS_CREDENTIALS_PROVIDERプロパティをAUTOに設定し、PropertiesオブジェクトからAWS_ACCESS_KEY_IDおよびAWS_SECRET_ACCESS_KEYパラメータをすべて省略します。

Properties kinesisConsumerConfig = new Properties();
kinesisConsumerConfig.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "AUTO");

認証情報はインスタンスのメタデータから自動的に取得され、長期の認証情報をFlinkアプリケーションのソースコードまたはEMRクラスターに保存する必要はありません。

プロデューサアプリケーションが毎秒何千ものイベントをストリームに取り込むので、単一のGetRecords呼び出しでFlinkによってフェッチされたレコードの数を増やすことができます。この値をAmazon Kinesisでサポートされている最大値に変更します。

kinesisConsumerConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_MAX, "10000");

Amazon Kinesisにウォーターマークを送信してイベント時間処理を有効にする

Flinkは、いくつかの時間概念、特にイベント時間をサポートしています。ストリーミングアプリケーションにとって、クエリの非常に安定したセマンティクスをもたらすので、イベント時間は望ましいものです。イベントの時間は、プロデューサまたはプロデューサの近くで決定されます。ネットワークの影響によるイベントの並べ替えが、クエリ結果に与える影響はとても小さいです。

イベント時間を実現するために、Flinkは定期的な間隔でプロデューサによって送信されたウォーターマークを使用して、ソースの現在時刻をFlinkランタイムに通知します。 Amazon Kinesis Streamsと統合する場合、Flinkにウォーターマークを供給するための2つの異なる方法があります。

  • 手動でウォーターマークをストリームに追加する
  • ApproximalArrivalTimeに頼る。ApproximalArrivalTimeはストリームへの取り込み時にイベントに自動的に追加されます

Amazon Kinesisストリームで時間モデルをイベント時刻に設定するだけで、FlinkはAmazon Kinesisから提供されたApproximalArrivalTime値を自動的に使用します。

StreamExecutionEnvironment env = StreamExecutionEnviron-ment.getExecutionEnvironment(); 
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

または、ストリームの対応するイベントからウォーターマーク情報を抽出するカスタムTimestamp Assigner演算子を指定して、プロデューサによって決定された時間を使用することもできます。

DataStream<Event> kinesis = env
	.addSource(new FlinkKinesisConsumer<>(...))
	.assignTimestampsAndWatermarks(new PunctuatedAssigner())

PunctuatedAssignerに頼る場合は、Flinkがストリームの各シャードを個別に処理するため、個々のシャードにウォーターマークを取り込むことが重要です。これは、ストリームのシャードを列挙することによって実現することができます。ウォーターマークを送信するシャードのハッシュ範囲にハッシュキーを明示的に設定して、特定のシャードにウォーターマークを挿入します。

Amazon Kinesisへのタクシートリップを取り込んでいるプロデューサーは、後者のアプローチを使用しています。AWSLabs GitHubリポジトリのflink-stream-processing-refarchで実装の詳細を調べることができます。

FlinkをAmazon ESに接続する

Flinkは、Elasticsearch用のコネクタをいくつか提供しています。ただし、これらのコネクタはすべてElasticsearchのTCPトランスポートプロトコルをサポートしていますが、Amazon ESはHTTPプロトコルに依存しています。 Elasticsearch 5以降、TCPトランスポートプロトコルは推奨されていません。 HTTPプロトコルをサポートするFlinkのElasticsearchコネクタはまだ動作していますが、Jestライブラリを使用してAmazon ESに接続できるカスタムシンクを構築できます。シンクは、IAM資格情報で要求に署名できる必要があります。

Elasticsearchシンクの実装の詳細については、Flinkアプリケーションのソースコードを含むAWSLabs GitHubリポジトリのflink-taxi-stream-processorを参照してください。

まとめ

この記事では、Apache Flinkを基にした、一貫したスケーラブルで信頼性の高いストリーム処理アーキテクチャを構築する方法について説明しました。マネージドサービスを活用して、低レイテンシおよび高スループットのストリーム処理パイプラインを構築および維持するために通常必要となる専門知識と運用上の労力を削減する方法を示しています。

今日すぐにでもAmazon EMRでApache Flinkを使用し始めることができます。 AWSLabs GitHubリポジトリには、ご紹介したサンプルを実行するために必要なリソースが含まれていますし、すぐに利用を始めるための詳細情報も含まれています。

ご質問やご提案がありましたら、ぜひご意見をお寄せください。

原文:Build a Real-time Stream Processing Pipeline with Apache Flink on AWS(翻訳:半場光晴)