ストリーミングデータをニアリアルタイムに取得し分析するソリューションを試す
丸山 友輔 (監修 : 稲田 大陸)
みなさん、こんにちは。ソリューションアーキテクトの丸山です!
皆さんは AWS ソリューションライブラリをご存知でしょうか ? この中には、すぐにお客様の課題に対応できるよう導入までの手順や AWS CloudFormation のテンプレート等を含めた AWS ソリューション実装というリファレンス実装パターンが掲載されています。今回はこの中からストリーミングデータをニアリアルタイムに取得し分析できる「Streaming Data Solution for Amazon Kinesis」をご紹介します!
ソーシャルネットワーキングサービス (SNS) の声をいち早く捉えてリアルタイムに分析したいがその手段について悩んでいる方や、センサーデータなどの IoT デバイスからの情報をリアルタイムで取得して早期に異常検知する方法を知りたい方などはいらっしゃいませんか?このような場合に Amazon Kinesis が有効です。ストリーミングデータをリアルタイムに収集しつつ、不要なデータをフィルタリングすることができ、データストアとクエリサービス、BI ツールと組み合わせることで可視化を行うことができます。また、IoT デバイスからのログを分析するような場合にも、リアルタイムでログを取得することでエラーの早期発見につながります。
ご注意
本記事で紹介する AWS サービスを起動する際には、料金がかかります。builders.flash メールメンバー特典の、クラウドレシピ向けクレジットコードプレゼントの入手をお勧めします。
この記事のデモを無料でお試しいただけます »
毎月提供されるデベロッパー向けアップデート情報とともに、クレジットコードを受け取ることができます。
Amazon Kinesis とは
Amazon Kinesis は、ストリーミングデータを収集、処理、処理するためのマネージドサービスです。Amazon Kinesis は Amazon Kinesis Data Streams、Amazon Kinesis Data Firehose、Amazon Kinesis Data Analytics、Amazon Kinesis Video Streams の 4 つのサービスに分かれています。
- Amazon Kinesis Data Streams は、耐久性、伸縮性に優れており、データの紛失を防ぎながら、大規模なストリーミングデータをリアルタイムに取り込むことができます。
- Amazon Kinesis Data Firehose は、ストリーミングデータを確実にキャプチャおよび変換し、データレイク、データストア、および分析サービスに配信する、抽出、変換、ロード (ETL) サービスです。
- Amazon Kinesis Data Analytics は、ストリーミングデータをリアルタイムで分析することができるサービスです。これにより、データの傾向の変化をリアルタイムで監視することができます。
- Amazon Kinesis Video Streams は、さまざまなデバイスからAWSへ動画をストリーミングすることができるサービスです。こちらは他の3つのサービスとはストリーミングの対象が異なるため本記事での説明は省略します。
ソリューションの概要
「Streaming Data Solution for Amazon Kinesis」では、ユースケースに応じた複数のオプションが用意されています。それぞれのオプションの概要は以下の通りです。
オプション 1.
モバイル端末のような AWS 以外の環境からデータをキャプチャーし、リアルタイムでデータの処理を行います。ユースケースとして、例えばモバイルアプリケーションからの位置情報をストリーミングで収集し、ユーザーの位置情報に基づいた分析をリアルタイムで行うことが可能です。
オプション 2.
Amazon Elastic Computing Cloud (Amazon EC2) などのサービスから送信されたストリーミングデータに対し、リアルタイムで集計を行います。こちらを活用することで、サーバーやアプリケーション、IoT デバイスの状態ログをリアルタイムに監視を行うことができます。また、SNS 上の情報をリアルタイムに分析し、最新のトレンドを把握することにも活用できます。
オプション 3.
ストリーミングデータをバッファリングし、S3 に格納します。この過程のログは Amazon CloudWatch によって監視されます。大規模なストリーミングデータを機械学習などに利用するために、必要最低限な ETL とデータを収集を行いたい場合などに活用できます。
オプション 4.
ストリーミングデータをリアルタイムで集計を行い、その結果を用いた処理を断続的に行います。ユースケースとして、サーバーやアプリケーションのログをリアルタイムに収集、分析を行い、あるメトリクスが一定の閾値を超えた場合に AWS Lambda による処理や機械学習を行いたい場合などに使用することが考えられます。
これら全てのオプションについてそれぞれ CloudFormation のテンプレートが用意されており、数クリックで環境の構築が可能となっています。今回は、データの収集から分析、保存まで一気通貫で行うことが出来るオプション 2 をご紹介します。データの生成元である EC2 をみなさんがお使いのサービスに置き換えることで既存環境に簡単に導入することが出来ます。
以下は、今回デプロイするソリューションのオプション 2 のアーキテクチャ図です。
アーキテクチャ
ソリューション中で行っている処理は以下の通りです。
- Amazon EC2 インスタンスは、Amazon Kinesis Producer Library (KPL) を使用してデモデータを生成します。デモデータはランダムな株価データです。
- Amazon Kinesis Data Streams によってデモデータストリームを受信します。
- Amazon Kinesis Data Analytics は、Amazon Kinesis Data Streams から送られてくるデータを処理し、処理後のデータを Amazon S3 バケットに保存します。
- Amazon CloudWatch ダッシュボードは、アプリケーションの状態、進行状況、リソース使用率、イベントとエラーを監視します。
デプロイ方法/設定方法
本ソリューションは CloudFormation のテンプレートを用いることで簡単に構築できるようになっています。Amazon Kinesis を用いたデータストリーミング の紹介ページから「オプション 2 を使用して AWS コンソールで起動する」をクリックします。
クリックすると拡大します
今回はバージニア北部 (us-east-1) リージョンを使用します。必要なテンプレートはすでに参照されているので「次へ」をクリックします。
クリックすると拡大します
スタック名を入力する必要があるので、「Kinesis-demo-app-02」と入力します。
クリックすると拡大します
また、Amazon KPL を使用したアプリケーションを動かすための EC2 を起動する VPC とサブネットを入力する必要があるためで、任意の VPC を選択します。今回は VPC (172.31.0.0/16) とサブネット (172.31.0.0/20) を選択しました。
クリックすると拡大します
上記設定ができたら、「次へ」をクリックします。
クリックすると拡大します
詳細オプションでは何もせずに、「次へ」をクリックします。
クリックすると拡大します
レビューの機能と変換の項目では、IAM リソースが新たに作成されることを承認する必要があるので、チェックボックスにチェックを入れ、「送信」をクリックします。
クリックすると拡大します
スタックが完全に作成されるまで 5 - 10 分 ほど待ちます。その後ステータスに「CREATE_COMPLETE」と表示されていればスタックが正しく作成されています。
クリックすると拡大します
動作確認
構築したシステムを実際に動かしてみましょう。まず、「Kinesis Data Analytics」を開き、「Studio」をクリックします。
クリックすると拡大します
ここで Studio ノートブック一覧が表示されるので、「KdaStudio」 から始まる Studio ノートブックを選択し、「実行」をクリックします。
クリックすると拡大します
クリックすると拡大します
ステータスが「実行中」になれば完了です。ノートブックが起動できました。
クリックすると拡大します
次に、Kinesis Data Streams にデモデータを送るため、Amazon KPL を使用したアプリケーションを起動します。「Systems Manager」を開きます。
画面左側の一覧から「セッションマネージャー」をクリックします。
クリックすると拡大します
セッション一覧の中で、「セッションを開始する」をクリックします。
クリックすると拡大します
セッションの開始をする画面で、KplInstance という名前のターゲットインスタンスを選択し、「セッションを開始する」をクリックします。
クリックすると拡大します
すると、新しいウィンドウが開き、EC2 への接続が開始されます。ここで、Amazon KPL のデータ送信先である Kinesis Data Streams のストリーム名を取得するために、一旦マネージメントコンソールに戻ります。
次に「Kinesis Data Streams」を開き、画面左側の一覧から「データストリーム」をクリックします。
クリックすると拡大します
ここに、作成した Kinesis Data Streams の一覧が表示されるので「Kinesis-demo-app-02-KdsDataStream・・・」という名前をコピーし、メモに残しておきます。これで、Kinesis Data Streams のストリーム名を取得できました。次に、Amazon KPL を使用したアプリケーションを実際に動かします。先ほど起動した、セッションマネージャーの画面に戻ります。そこで、
sudo java -jar /tmp/aws-kpl-demo.jar <ストリーム名> us-east-1 7200
と入力し、実行します。ここでの <ストリーム名> は先程メモに残した、 Kinesis Data Streams のストリーム名です。
クリックすると拡大します
すると、画像のようになり、Amazon KPL を使用したアプリケーションが起動されます。今回の場合は KPL によって上記のような JSON 形式のデータが毎秒 100 個 Kinesis Data Streams に送られています。
{
"event_time": "2020-08-01 12:00:00.000",
"ticker": "AMZN",
"price": 50
}
次に、Kinesis Data Streams に送られるデータを Kinesis Data Analytics を用いて整形して S3 に保存します。
まず、保存先の S3 のバケット名を取得します。
次に、「S3」を開き、「kinesis-demo-app-outputbucket」から始まる S3 バケットのバケット名をメモします。
クリックすると拡大します
これで、保存先の S3 のバケット名を取得できたので、次に「Kinesis Data Analytics」を開きます。
Studio タブをクリックし、Studio ノートブック一覧を表示させます。
さらに、先程と同じく「KdaStudio」となっている Studio ノートブックを選択し、「Apache Zeppelin で開く」をクリックします。
クリックすると拡大します
すると、新たなウィンドウで Apache Zeppelin (データの取り込みや可視化などを行うことができる Web ベースのノートブック) のページが表示されます。そこで、「Create new note」をクリックします。
クリックすると拡大します
するとポップアップが表示されるので、「Note Name」に「Demo」と入力し、「Create」をクリックします。
クリックすると拡大します
すると、ノートブックが開始されます。
クリックすると拡大します
このノートに、以下のコードをコピーして貼り付けます。この時、 <STREAM-NAME> には前のステップでメモに残した、Kinesis Data Streams のストリーム名、 <AWS-REAGION> には「us-east-1」を入力します。このコードでは、Kinesis Data Stream から送られてくるデータを一時的に溜めておくテーブルを作成しています。
%flink.ssql CREATE TABLE stock_table (
ticker VARCHAR(6),
price DOUBLE,
event_time TIMESTAMP(3),
WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'kinesis',
'stream' = '<STREAM-NAME>',
'aws.region' = '<AWS-REGION>',
'scan.stream.initpos' = 'LATEST',
'format' = 'json'
);
上記コードをノートに記入したら、Shift + Enter で実行します。コードの実行に成功すると、画像のように、「Table has been created.」と表示されます。
クリックすると拡大します
同様に、下記コードも実行します。このコードでは、Kinesis Data Analytics のチェックポイントを設定しています。チェックポイントとは、アプリケーションが中断したとき、即座に回復するために使用される最新のバックアップです。
%flink.pyflink
st_env.get_config().get_configuration().set_string(
"execution.checkpointing.interval", "1min"
)
st_env.get_config().get_configuration().set_string(
"execution.checkpointing.mode", "EXACTLY_ONCE"
)
また、下記コードも実行します。<BUCKET-NAME> には先程保存した、S3 バケット名 kinesis-demo-app-outputbucketxxxxxx を入力します。このコードでは、一時的にデータを溜めておいたテーブルのデータを整形して、S3 バケットに保存しています。
%flink.ssql(type=update)
CREATE TABLE sink_table_s3 (event_time TIMESTAMP, ticker STRING, price DOUBLE, dt STRING, hr STRING)
PARTITIONED BY (ticker, dt, hr)
WITH ('connector' = 'filesystem', 'path' = 's3a://<BUCKET_NAME>/', 'format' = 'json');
INSERT INTO sink_table_s3
SELECT
event_time,
ticker,
price,
DATE_FORMAT(event_time, 'yyyy-MM-dd') as dt,
DATE_FORMAT(event_time, 'HH') as hh
FROM stock_table
WHERE price > 50;
上記コードが全て実行されると画像のようになります。
クリックすると拡大します
これで、Amazon KPL から送られるデータを確認する用意ができました。実際にデータを見てみましょう。マネージメントコンソールに戻り、S3 のバケット一覧を表示します。
その後、先程生成したデータの保存先である「kinesis-demo-app-outputbucket」から始まる S3 バケットを探して、そのバケット名をクリックします。
クリックすると拡大します
Amazon KPL から送られたデータが S3 に保存されていることが確認できます。
クリックすると拡大します
この中で「ticker=AMZN/」となっているフォルダを選択します。日付ごとのフォルダ一覧が表示されるので、今回生成されたフォルダをクリックします。
クリックすると拡大します
このページでは時間ごとのフォルダ一覧が表示されていることがわかります。任意のフォルダをクリックします。
クリックすると拡大します
フォルダ内にあるオブジェクトを見ることができます。任意のオブジェクトをクリックします。
クリックすると拡大します
もし画像のようにエラーが表示がされた場合、違う時間のフォルダを開いて、同様にオブジェクトファイルを開いてください。
クリックすると拡大します
フォルダを開くとこのようなページが表示されます。ここではオブジェクトの詳細情報が載っています。データを確認するため、「ダウンロード」をクリックしてデータをダウンロードします。
データの確認方法には、S3 Select を使う方法もありますが、今回は簡単に確認するダウンロードします。 この時エラーページが表示された場合、オブジェクトが更新されている可能性がありますので、オブジェクトページを開き直して再度ダウンロードを行なってみてください。ダウンロードしたページを開くと、画像のようになり、データが取得され S3 に保存されていることがわかりす。
クリックすると拡大します
Kinesis Data Analytics のページから、作成した Studio ノートブックを開きます。
クリックすると拡大します
Studio ノートブックの詳細ページが表示されるので、画面を下にスクロールすると、Amazon CloudWatch によってメトリクスの監視ができていることがわかります。
このように、Kinesis ではリアルタイムにデータを収集し、整形したのち、データを保存することができます。
クリックすると拡大します
クリーンアップ
最後に、作成したリソースを削除します。まず、セッションマネージャのページに戻り、Ctrl + C によって Amazon KPL を停止します。
クリックすると拡大します
次に「IAM」を開き、左側の一覧から「ロール」をクリックします。
クリックすると拡大します
ロールの検索画面に「Kinesis」と入力し、ロール名が「kinesis-demo-app-02」で始まるもののうち、「信頼されたエンティティ」の欄の AWS のサービスが、「kinesisanalytics」と「ec2」であるものを選択して「削除」をクリックします。
クリックすると拡大します
以下のように表示されるので、テキスト入力フィールドに「削除」と入力して、「削除」ボタンをクリックします。
クリックすると拡大します
次に「CloudFormation」を開き、スタック一覧から「kinesis-demo-app-02」という名前のスタックを選択して、「削除」をクリックします。
クリックすると拡大します
画面のようなポップアップが出たら、「スタックの削除」をクリックします。
クリックすると拡大します
最後に本ハンズオンによって作成した S3 バケット (「kinesis-demo-app-02」から始まるバケット) を削除したら作業完了です。お疲れ様でした!
コスト試算
Amazon Kinesis の料金は従量課金制となっており、使用したリソースの料金のみが発生します。実際に本ソリューションを構築した場合に発生するコストをまとめました。AWS 料金計算ツールを用いることで、簡単に利用料金の見積りができるので、実際に使用される際にはこちらをご活用ください。
ストリーミングデータ量を 100 個/s、データサイズを 1 KB と仮定し、1 ヶ月間使用した場合のコスト試算は下記になります。
AWS Service | 詳細 | コスト(us-east-1) |
Amazon Kinesis Producer Library | EC2 インスタンス(t3.small) 730 時間/月 |
15.18 USD |
Amazon Kinesis Data Streams | 1 シャード | 10.95 USD |
100 レコード(4 KB)/秒 | 3.68 USD | |
168 時間 データ保持 | 14.60 USD | |
Amazon Kinesis Data Analytics | 1 プロセッシングユニット | 80.30 USD |
50 GB の実行中のアプリケーション ストレージ | 165.60 USD | |
Amazon S3 | 1 GB ストレージ | 0.02 USD |
合計 | 199.73 USD |
まとめ
ストリーミングデータをニアリアルタイムに取得し分析できる「Streaming Data Solution for Amazon Kinesis」をご紹介しました。CloudFormation テンプレートを使った簡単な設定ですぐ分析環境を構築できることがご理解いただけたかと思います。データ活用の俊敏性がビジネスに大きなインパクトを及ぼす現代に対応する上で、今回ご紹介したソリューションがストリーミングデータ分析の手助けとなりましたら幸いです。
builders.flash では、他にもさまざまな AWS ソリューションの紹介記事を投稿しています。それでは、次回の記事もぜひお楽しみに!
筆者プロフィール
丸山 友輔
アマゾン ウェブ サービス ジャパン合同会社
ソリューションアーキテクト
2022 年に AWS へ入社。現在はクラウド活用の技術支援をサポートしつつ、様々な状況に最適ソリューションについて学習中です。最近は旅行にはまっており月一程度で日本国内を飛び回っています。
監修者プロフィール
稲田 大陸 (Riku Inada)
アマゾン ウェブ サービス ジャパン合同会社
ソリューションアーキテクト
2021 年 4 月に AWS Japan に入社した、筋トレが趣味なソリューションアーキテクトです。現在は製造業のお客様を中心にクラウド活用の技術支援を担当しています。好きな AWS のサービスは AWS Amplify と Amazon Location Service です。週末には美味しいお酒を求めてフラフラしています。
AWS を無料でお試しいただけます