Amazon Web Services ブログ
Amazon EMR および IoT センサーネットワークで Apache Flink の複合イベント処理を使用したリアルタイムの山火事警告
山火事は、1 年のうち、気候が暑く乾燥している温暖な月に頻繁に発生します。オーストラリアおよび米国といった国々は、人々の生活と土地に甚大な傷跡を残す山火事によって大きな影響を受けています。長年の間、山火事の予測は様々な研究プロジェクトの研究テーマとされてきました。これらのプロジェクト多くが、複雑な機械学習アルゴリズムを使用します。これらのアルゴリズムは、特定の地理的地域におけるリアルタイムの火の延焼から山火事を予測することを学びます。
このブログ記事では、IoT からリアルタイムで受信する温度イベントを通じて潜在的な山火事のパターンを検知し、E メールでアラートを送信するために、Apache Flink の複合イベント処理 (CEP) によって提供されるイベント処理パラダイムを使用します。監視対象地域のリアルタイムでのヒートマップ可視化も、モニタリング目的のために統合されています。
この記事では、以下の AWS のサービスを使用します。
- Amazon EC2 インスタンス: IoT シミュレーター
- AWS IoT Core: IoT メッセージゲートウェイ
- Amazon Kinesis Data Streams: 耐久性のあるメッセージキュー
- Apache Flink をインストールした Amazon EMR: ストリーミングデータ処理エンジン
- Amazon SNS: アラート生成
- Amazon Elasticsearch Service: アラートストレージおよび可視化プラットフォーム
- AWS CloudFormation: 開始から終了までのスタックの作成とデプロイメント
山火事のリアルタイム予測アラートシステムの概要
山火事の検知と警告のための大規模なワイヤレスセンサーネットワークの開発とデプロイメントは複雑なタスクです。この記事のシナリオでは、センサーが長寿命バッテリー駆動であること、および LoRaWAN などのテクノロジーを使用するマルチホップワイヤレスメッシュネットワーク経由でデプロイされることを前提としています。また、IoT センサーが監視対象地域内に戦略的に配置され、直射日光が当たらない場所にあることも前提としています。この配置は、過剰な加熱を避け、取り付けられた場所で温度の測定値を継続的に記録し、生成します。センサーは、互いに通信して個々の温度測定値を送受信し、それぞれの環境のステータスを把握します。図 1 にあるように、デバイスによって記録される主なパラメーターには、摂氏での温度、タイムスタンプ、ノード ID、および「infectedBy」が含まれます。
図 1.IoT センサーデバイスによって異なる時点で測定された温度を含むセンサーイベントの一覧
山火事となり得る火事が広がり始めると、その進路内に配置された IoT センサーがそれ以降の温度上昇を検知できます。センサーはその後、近隣のセンサーとその情報を共有します。この現象は、感染症が方向経路に沿ってネットワーク全体に広がることに似ています。これは通常、ネットワーク科学で Susceptible-Infected (SI) 感染症モデルと呼ばれるものです。
図 2 にあるように、所定のノードによって送信された「infectedBy」パラメーターは、そのノードが近隣の IoT デバイスに「感染」した (つまり、山火事がこの経路で広がっている) ことを示し、「ノード ID」がパラメーター値としてリストされます。ここでは、ネットワーク内のノードが近隣のノードのひとつに感染すると、別のノードに再感染することはないと仮定します。従って、「infectedBy」パラメーターの値はそのまま変わりません。
図 2.周囲の地理的地域の温度をモニタリングする IoT センサーネットワークの大まかな概要
このシナリオの必要上、山火事が経時的に広がる様子を表す図 2 には、11 ノードの IoT センサーネットワークが示されています。IoT センサーの配置は、各ノードが IoT センサーデバイスである無向グラフネットワークとして可視化することができます。2 つの隣接するノードの間のリンクは、マルチホップワイヤレスアドホックネットワーク内におけるワイヤレス接続を示しています。図 2 は、以下の詳細を示します。
- 時間 t1 では、ノードのすべてが温度データを生成しています。しかし、50℃ に設定してある山火事アラートしきい値以上の温度を報告しているノードはありません。
- 時間 t2 では、ノード 1 が 50℃ を報告しており、これは事前設定されたしきい値以上のものです。実際には、ノード 1 の監視下にある地域において最近発生した小規模の山火事であった可能性があります。
- 時間が t3 に移行すると、ネットワーク内でノード 2 が監視する地域に火事が急速に広がっているのがわかります。このため、ノード 2 はノード 1 に感染したと言うことができます。これはノード 2 によって生成された「infectedBy」パラメーターに反映されており、ここにある値 1 は、火事が徐々に広がっている様子を表しています。
- 次に、時間 t4 では、火事がさらにノード 3 まで広がり、それに続いてノード 4、そして時間と共にノード 5 に広がります。
この類推は、IoT デバイスのネットワークにまたがる山火事の全体的な拡大をイメージするために役立ちます。このブログ記事では、Apache Flink からの CEP 機能を使って、測定された温度が 50℃以上で、感染レベルがワイヤレスに接続された IoT デバイス 5 台のうち 4 台となっているイベントパターンを検知します。このパターンが Amazon EMR でリアルタイムのイベントストリーム処理によって検知されると、SNS アラート E メールがサブスクライブされた E メールアドレスに送信されます。E メールには、それまでに広がった火事の経路が記載されています。
アーキテクチャの概要
このセクションでは、リアルタイムのイベント処理パイプラインを開始から終了まで構築します。これは、インフラストラクチャ層の IoT デバイスによって生成された温度測定値を簡素化し、潜在的な山火事のモニタリングのための予測アラートと可視化システムを構築します。図 3 にある大まかなアーキテクチャ図は、このシステムの構築に必要となるコンポーネントを示しています。
図 3.リアルタイムの山火事アラートと可視化システムの大まかなブロック図
この図は、IoT センサーイベント (つまり、測定された温度) が IoT ゲートウェイ送り込まれていることを示しています。ゲートウェイには、レコードをストリーム処理システムに転送するためのインターネット接続があります。ゲートウェイはすべての IoT イベントのためのランディングゾーンであり、耐久性のあるストリーミングストレージにレコードを取り込みます。IoT ゲートウェイは、ユーザーがインフラストラクチャを管理する必要なく、10 億台を超えるデバイスをサポートするために自動でスケールする必要があります。
次に、イベントを耐久性のあるストレージに保存します。温度イベントは、IoT センサーデバイスなどの信頼性のないソースから送られてくるためです。結果的に、レコードが喪失された場合、イベントは再現できません。ストリーミングストレージは、大量のイベントのイングレスとエグレスをサポートする必要もあります。イベントは次に、受信イベントをパターンに一致させ、その後必要に応じてアラートをサブスクライバーに送信するストリーム処理エンジンに取り込まれます。Raw イベントも可視化システムに取り込まれます。このシステムは、IoT ノードによって監視されている地域の温度のリアルタイムでのヒートマップを表示します。
AWS でのリアルタイム山火事アラートと可視化システムの構築
図 4.AWS のサービスを使用したリアルタイム IoT ストリーム処理パイプラインのアーキテクチャ
このセクションでは、図 4 にあるとおり、AWS のサービスをいくつか使用したイベント処理システムのコンポーネントレベルのアーキテクチャについて説明します。IoT センサーデータは、送られてくる温度レコードを受け取る AWS IoT サービス に送信されます。AWS IoT サービスはその後、ストリーミングパイプラインのゲートウェイとして機能します。AWS IoT サービスには、ルールエンジンが装備されています。これは、受信イベントを別の AWS のサービスを宛先として転送できるように、受信イベント用のアクションを設定するために使用できます。
今回は、宛先として Amazon Kinesis Data Streams サービスが選択されており、保持期間を 1 日とする信頼性のあるストリームストレージ基礎システムとして機能します。以下の図 5 は、このブログ記事で使用されるそのようなルールのひとつとアクション設定を示しています。イベントは次に、Amazon EMR クラスターで実行されている Apache Flink 処理エンジンに取り込まれます。Apache Flink は、Amazon Kinesis Data Streams シャードからレコードを取り込み、レコードを事前定義されたパターンに一致させ、潜在的な山火事の可能性を検知します。
図 5.受信する温度イベントのための AWS IoT のルールとアクション
Apache Flink のストリーム処理エンジンとしての使用
このブログでは、ストリーム処理エンジンとして Apache Flink を選択しました。これは、Apache Flink が高スループットと共にリアルタイムイベントの低レイテンシー処理を提供するからです。さらに重要なのは、Apache Flink が Amazon Kinesis Data Streams のためのイベント時間セマンティクスでのストリーム処理とウィンドウィングをサポートする点です。これは、イベントが前後して受信され、信頼性のないワイヤレスネットワーク通信が原因で遅延する可能性もあるという場合に重要な機能です。Apache Flink で使用できるもうひとつの優れた機能は、複合イベント処理 (CEP) ライブラリです。これは、一定期間にわたって受信イベントのストリーム内のパターンを検知することを可能にします。では、これらの機能と、これらを特定のユースケースで使用する方法について詳しく検証していきましょう。
IoT イベント、イベント時間処理、およびウォーターマークの特徴
ほとんどの IoT ユースケースは、大量のイベントを時間と共に継続的に生成する多数のセンサーデバイスを扱ったものです。イベントは通常、レコード内にタイムスタンプがあり、レコードが生成された時間を示しています。ただし、コンシューマーに関しては、イベントが前後して到着したり、処理中の遅延を伴って到着したりする可能性があります。ストリーム処理アプリケーションでは、コンシューマーがアウトオブオーダーイベントに対処できる必要があります。
Apache Flink は、イベント時間のウィンドウィングを用いてこれを行います。ウィンドウは、処理時間ウィンドウに従ってスライドするのではなく、イベント時間によってスライドします。これは、関連性が高いイベント時間に基づいてアラート生成判断を行うために役立ちます。処理ウィンドウがイベント時間に基づいている場合、イベント時間を繰り上げるタイミングを知っておく必要があります。これによって、いつウィンドウを閉じて、次の処理タスクをトリガーできるのかがわかります。これは、Flink の「ウォーターマーク」を使って行われます。ウォーターマークを生成するメカニズムには様々なものがあります。今回は、特定の単位時間で処理時間から遅れを取っているというウォーターマークを生成する TimeLagWatermarkGenerator を使用しました。これは、レコードの一部が特定の遅延後に Flink コンシューマーに到着することを前提とします。また、Flink を処理エンジンとして選択したことにも言及しておくべきでしょう。Flink は、私たちが提供するパターンに基づいてイベントを一致させる Flink CEP 機能を提供します。この機能は現在、Structured Spark Streaming および Amazon Kinesis Data Analytics では使用できません。
Apache Flink の複合イベント処理
Amazon Kinesis Data ストリームから Amazon EMR クラスターで実行される Apache Flink アプリケーションにレコードが取り込まれると、それらがパターンに一致する必要があります。パターンは、まず 50℃ に到達し、次に同じしきい値温度に到達して、パターンの最初のイベントに対応する最初の IoT センサーに感染した別のイベント (別の IoT センサーからのもの) が後に続くレコードをフィルターします。
例えば、受信イベントのうち、ノード 1 の IoT センサーから温度が 50℃ 以上のイベントを取得します。これが CEP パターンの最初のイベントです。次に、この最初のイベントに続くイベントを探します。例えば、温度が 50℃ のしきい値に達しており、その「infectedBy」フィールドがノード 1 を示す「1」になっているノード 2 からのイベントです。この状態が、ノード 3、ノード 4、および ノード 5 といった 3 台のノードに反復して繰り返される場合、ノード 1 から開始された潜在的な山火事の 4 レベルのネットワーク経路 (N1 -> N2 -> N3 -> N -> N5) の完全なパターンが検知されたと考えられます。
今回の実装では、火事が接続された 5 台のノードに広がったときにアラートを送信することにしました。しかし実際には、何台のノードに火が広がってからアラートを送信するかについては、注意深く選択する必要があります。この特定のパターンの論理図を図 6 に示します。最後に、潜在的な山火事に応答して、アラート E メールが SNS に発行され、SNS がサービスのサブスクライバーに E メールを配信します。以下の 図 7 は、SNS アラート E メールのサンプルです。
図 6.山火事予測のための論理パターン図
図 7.潜在的な山火事とその移動経路を通知するための Amazon SNS E メールアラートのサンプル
時間と共に広がる潜在的な山火事のリアルタイムの可視化
受信する IoT イベントレコード (Amazon Kinesis Data Stream から取得したフィルタリングされていない raw イベント) は、耐久性のあるストレージと Kibana Web UI での可視化のために Amazon Elasticsearch Service クラスターにプッシュされます。(Kibana の詳細については、What Is Kibana を参照してください。) 図 8 にあるように、山火事の進行を継続的に監視するための、リアルタイムヒートマップによる可視化とダッシュボードの作成が行われます。図 8 からわかるように、山火事はノード 1 からノード 2、ノード 3 へと広がり、次にノード 4 と ノード 5 に広がっています。この可視化は、IoT センサーノードの地理的位置を記録することによってさらに拡張できます。記録後、ヒートマップを監視対象の地理的地域に表示します。
図 8.Amazon Elasticsearch Services からの山火事ヒートマップ可視化のサンプル
セットアップとソースコード
下記の URL は、必要な AWS コンポーネントのすべてをセットアップし、IoT シミュレーターと Apache Flink CEP を実行するための手順を詳しく説明しています。今回は、AWS CloudFormation テンプレートを提供します。これは、図 4 にあるように、その他全ての関連コンポーネントと共に EC2 インスタンスに IoT シミュレーターをセットアップすることによって開始から終了までのアーキテクチャを作成した後、スタックを自動的に実行します。スタックの作成が完了すると、ユーザーは Kibana Web UI にアクセスして、リアルタイムの山火事ダッシュボードを観察できます。ユーザーは、潜在的な山火事の SNS アラート E メールに対する E メールサブスクリプションを確認して、アラート E メールを受け取ることもできます。
AWS CloudFormation スタックを起動するには、以下の Launch Stack ボタンをクリックして AWS CloudFormation コンソールを開きます。
テンプレート用に Amazon S3 URL を指定してから、次のセクションで説明されている詳細を指定する次のステップに進みます。すべてのパラメーターを入力したら、次へ進んでスタックを作成できます。使用するパラメーターは以下の通りです。
1.SNS subscription email: 火事アラート通知が送信される有効な E メールアドレスにする必要があります。CloudFormation スタックの作成が開始されたら、サブスクリプションの確認のために、入力した E メールアカウントに E メールが届きます。SNS 通知を受け取るには、Confirm subscription ボタンをクリックします。
注意: エンドツーエンドの考察が終わったら、SNS サブスクリプションを Amazon SNS コンソールから安全に削除することができます。
2.Public subnet ID for EMR cluster: ドロップダウンメニューからパブリックサブネットを選択します。これは、EMR クラスターが作成されたサブネットです。EMR クラスターには、Kinesis ストリームと Elasticsearch ドメインにアクセスするためのインターネット接続が必要です。このため、パブリックサブネットは enableDnsHostnames および enableDnsSupport オプションが true に設定されている、VPC 内でオプション パブリック IPv4 アドレスの自動割り当てを有効にする にチェックが入ったものになります。
3. S3 path location for EMR cluster logs: EMR がクラスターログを格納する S3 バケットです。
図 9.AWS リソーススタックを作成するための Amazon CloudFormation コンソール。
4.Public subnet ID for EC2 instance: IoT シミュレーターを実行するための EC2 インスタンスが作成されるサブネットです。CloudFormation スタックが表示されたら、IoT イベントを AWS IoT ゲートウェイに取り込むために、IoT シミュレーターが自動的に実行されます。このサブネットの選択は、図 9 にある EMR クラスター用に選択されたサブネット用に設定されているものと同じガイドラインに従います。
5.Security group ID for EC2 instance: IoT シミュレーターを実行する EC2 インスタンスにアタッチされたセキュリティグループです。オプションとして、このセキュリティグループに SSH ポート 22 のための新しいルールを追加できます。これによって、ワークステーションから EC2 インスタンスで実行される IoT シミュレーターにアクセスし、Kibana Web UI (この記事で後ほど説明) へのアクセスに同じパブリック IP アドレスを使用することが可能になります。
6. Key pair ID for EC2 instance: EC2 インスタンスと EMR クラスターに関連付けられるキーペアです。これは、さらなる調査のためにインスタンスにログインすることを可能にします。
図 10.AWS リソーススタックを作成するための Amazon CloudFormation コンソール。
7.Domain name for Amazon Elasticsearch domain: CloudFormation テンプレートによって作成される Elasticsearch ドメインの名前です。
8.Public IP address to access Kibana from local machine: 可視化のための Kibana ダッシュボードにアクセスするときのアクセス元となるローカルマシンの IP アドレスです。簡潔化のために、CloudFormation スタックの実行元であるワークステーションのパブリック IP アドレスを入力することができます。この IP アドレスは、リアルタイムの Kibana ダッシュボードを表示するための Amazon Elasticsearch ドメインへのアクセスを解放します。また、Kibana URL へのアクセスもこの IP アドレスからのみ行う必要があります。お使いの IP アドレスが変更される場合は、新しい IP アドレスで AWS Elasticsearch クラスターのポリシーを変更します。詳細については、Amazon Elasticsearch Service 開発者ガイドの「IP ベースのポリシー」を参照してください。
スタックの作成が完了したら、CloudFormation スタックの出力セクションに、このアーキテクチャ内の必要なリソースにアクセスするための web URL がリストされます。Kibana Web URL を使って、[Management] セクションで「weather-sensor-data」という名前のインデックスパターンを作成し、[Dashboard] を選択して IoT ネットワークの対象となっている山火事のリアルタイムの延焼の可視化を表示します。
図 11.インデックスパターンを作成するための Amazon Elasticsearch ドメイン Kibana Web UI。
このアーキテクチャの実装について更に詳しく調べることに興味があるユーザーのために、GitHub にソースコードと詳細な README が提供されています。
https://github.com/aws-samples/realtime-bushfire-alert-with-apache-flink-cep
一般的な問題のトラブルシューティング
1.Kibana Web UI にアクセスできない
Kibana Web UI へのアクセス試行中に「User: anonymous is not authorized to perform: es:ESHttpGet」というメッセージが表示される場合、CloudFormation スタック作成時に指定されたパブリック IP アドレスが正しくない、または変更されている可能性があります。パブリック IP アドレスは、http://checkip.amazonaws.com で再確認できます。確認後、AWS マネジメントコンソールの Elasticsearch に移動し、以下の例にあるように、セキュリティアクセスポリシーを変更して IP アドレスのみを変更します。
図 12.Amazon Elasticsearch ドメインアクセスポリシーでのパブリック IP アドレスの変更。
2. Elasticsearch にレコードが取り込まれていない
この問題は、EMR クラスターからのレコードの取り込みが失敗するときに発生する場合があります。トラブルシューティングを行うには、AWS マネジメントコンソールの IAM に移動し、「EMR_EC2_Default_Role」という名前の IAM ロールを検索して、それに「AmazonElasticMapReduceforEC2Role」というデフォルト AWS 管理ポリシーがアタッチされていることを確認してください。
図 13.IAM におけるデフォルト AWS 管理ポリシーが「EMR_EC2_Default_Role」にアタッチされていることの確認。
3.SNS アラート E メール通知が届かない
完全な可視化を数分観察しても潜在的な山火事の SNS E メールアラートが届かない場合は、受信ボックスをチェックして、CloudFormation スタックの作成当初に SNS サブスクリプションを確認したかどうかを確かめてください。さらに、正しい E メールアドレスを入力したことを確認して、スタックを最初から作成しなおします。
まとめ
このブログ記事では、様々な AWS のサービスを使用して、リアルタイムの IoT ストリーム処理、可視化、およびアラート生成パイプラインを構築する方法について説明しました。ネットワーク内で受信イベントからパターンを検知するために、Apache Flink が提供する複合イベント処理機能を活用しました。GitHub リポジトリには、このポストで説明した例を実行するために必要なリソースが含まれています。これには、素早く開始するために役立つ追加の情報があります。異なるパターンを持つより多くのレコードを取り込み、Kibana ダッシュボードで山火事の延焼経路パターンを可視化するために、IoT シミュレーターコードを詳しく検討し、異なるネットワーク構成でテストすることをお勧めします。
その他の参考資料
この記事が役に立つと思われる場合は、AWS上でApache Flinkを使用してリアルタイムストリーム処理パイプラインを構築する、Derive Insights from IoT in Minutes using AWS IoT、Amazon Kinesis Firehose, Amazon Athena, and Amazon QuickSight, and Integrating IoT Events into Your Analytic Platform も併せてお読みください。
著者について
Santoshkumar Kulkarni は、AWS Big Data and Analytics Services のクラウドサポートエンジニアです。 Santoshkumar は、アーキテクチャおよびエンジニアリングに関するサポートとガイダンスを提供するために AWS のお客様と密接に連携しており、分散型テクノロジーとストリーミングシステムに情熱を傾けています。そのかたわらで、シドニーの美しい海辺での家族との時間を楽しんでいます。
Joarder Kamal 博士は、AWS Big Data and Analytics Services のクラウドサポートエンジニアです。 分散型通信、リアルタイムデータ、および集団的知性を組み合わせたシステムを構築して自動化することが好きです。余暇の間は、旅行記を読む、鉛筆画を描く、そして妻と一緒にオーストラリアを旅することを楽しんでいます。