Amazon Web Services ブログ

新しいAWS IoT Analyticsの機能を使って、IoTデータをデータレイクと統合する

この記事はAWSシニアデータアーキテクトのAsim Kumar SasmalとAWSシニアプロダクトマネージャのVikas Panghalによって投稿されました。

 

AWS IoT AnalyticsはAWS IoT Analyticsを介して取り込まれたIoTデータをお客様のAWSアカウントにあるデータレイクに統合するための2つの新機能を提供します。1つ目はお客様管理のAmazon S3、2つ目はAmazon S3へのデータセットのコンテンツ配信です。

以前までは、AWS IoT AnalyticsのデータはAWS IoT Analyticsサービス内で管理されているAmazon S3バケットしか利用することができませんでした。この記事では、最近リリースされたこれら2つの機能を使用してIoTデータをデータレイクに統合するための、エンドツーエンドの柔軟性を備えた、費用対効果が高い、安全で信頼性の高い、新しいソリューションについて説明します。

概要

AWS IoT AnalyticsはリアルタイムストリーミングIoTデータをフィルター、変換、質を高める(IoTデータを拡充する)ことができるフルマネージドのIoTサービスです。データストアに保存されたデータに対して、ビルトインされたSQLクエリエンジンでの分析や、機械学習の推論のような複雑な分析を行うことができます。

このサービスは、 Amazon QuickSightと連携し、データの可視化を行うこともできます。AWS IoT Analyticsは、Amazon SageMakerがホストおよび管理するJupyter Notebookを使用した高度なデータ探索機能も提供しています。わずか数回のクリックでコンテナ化し、Jupyter Notebookの起動をスケジュールすることもできます。

お客様はよく、AWS IoT Analyticsを使って得たIoTデータを、どのようにしてAWS上のデータレイクにある、非IoTデータをニアリアルタイム(1 -5分)、かつシームレスに取り込むかを尋ねてきます。これらのデータはERP、財務、サードパーティの気象データなどの参照データが考えられます。

そのゴールは以下の通りです。

  • 社内のすべてのデータは、オープンで柔軟かつ一貫性のあるデータストアとして利用できるS3に保存します。
  • セルフサービス分析のためにユーザーへのアクセスを民主化します。
  • 高速ストリーミングデータの取り込み および処理にあたり、AWS IoT Analyticsは他社との差別化につながらない作業を排除します。

(原文内になる「Undifferentiated Heavy Lifting」とは、本来他社と差別化できる価値を生み出す部分に集中したいのに、付加価値を生まない作業をしなければならない作業のことを指します。)

ソリューション

お客様管理のAmazon S3機能では、お客様のAWSアカウントで、S3バケットにAWS IoT Analyticsのチャネルとデータストアを作成することができます。Amazon S3へのデータセットのコンテンツ配信機能では、お客様のAWSアカウントでAWS IoT Analyticsのデータセットのコンテンツ結果(AWS IoT Analyticsデータのマテリアライズドビュー)をS3バケットに送信することができます。
補足:AWS IoT Analyticsデータのマテリアライズドビュー = AWS IoT Analyticsのクエリ結果

その結果、AWS IoT Analyticsのデータセット配信結果のスキーマを含むAWS Glue Data catalogのテーブルを自動作成し、かつAmazon Athenaでクエリを実行できるようになりました。データセットコンテンツの結果はS3バケットに保存されるため、独自のS3権限を適用し、ガバナンスポリシーに従って管理できます。

次のアーキテクチャ図は、このブログ投稿で構築する高レベルのエンドツーエンドソリューションを示しています。

ウォークスルー

この段階的なウォークスルーは次のセクションで構成されています。

  1. 前提条件
  2. AWS IoT Analyticsのチャネル、パイプライン、データストアのセットアップ
  3. サンプルデータをAWS IoT Analyticsチャネルに取り込む
  4. S3に配信するためのデータセットを作成する
  5. IoTデータをAWSのデータレイクに保存されている他のデータと統合する

前提条件

このユースケースでは、次のリソースがあることを確認してください。

  1. AWS IoT Analyticsが利用可能なAWSリージョンとAWSアカウントを用意してください。 このユースケースでは、米国東部(バージニア北部)リージョンを使用します。 AWS IoT Analyticsが利用可能であれば、別のリージョンを選択することが可能です。(補足:別のリージョンを選択する場合は、以降の手順でリージョンを入力するところを、ご自身が選択しているリージョンに読み替えて実施してください。)
  2. AdministratorAccessポリシー(本番環境では、ポリシーを制限することを推奨します)。
  3. ご自身のアカウントでインストール、使えるように設定されたAWS CLI。

必要な前提条件を満たすAWS CloudFormationテンプレートをここから取得します。 テンプレートには、次の8つのパラメーターとそのデフォルト値が必要です。 aks部分はお客様独自のプレフィックスで置き換えてください。 これにより、AWSアカウント全体で一意のバケット名や、AWSアカウント全体のAWSグローバルリソースとの競合が回避することができます。

  • Parameter: DataLakeGoldIoTADatasetS3Bucket; Default Value: aks-datalake-gold-iota-dataset
  • Parameter: DataLakeRawIoTAChannelS3Bucket; Default Value: aks-datalake-raw-iota-channel
  • Parameter: DataLakeSilverIoTADatastoreS3Bucket; Default Value: aks-datalake-silver-iota-datastore
  • Parameter: DataLakeGoldS3Bucket; Default Value: aks-datalake-gold
  • Parameter: IoTAGlueDB; Default Value: aks_datalake_iota_glue_db
  • Parameter: IoTAGlueTable; Default Value: aks_device_telemetry_stats
  • Parameter: DataLakeGoldGlueDB; Default Value: aks_datalake_gold_glue_db
  • Parameter: DataLakeGoldDeviceSpecsGlueTable; Default Value:aks_device_specs

補足:Default Valueの”aks“の部分をお客様独自の名前に置き換えて実行してください。後続手順においても”aks“が使われておりますが、お客様独自の名前に置き換えて手順を実行してください。

  • AWS CloudFormationテンプレートは、次のリソースを作成します。
    データレイクとなる4つのS3バケット。デフォルト暗号化はAES-256(SSE-S3)です。

    • aks-datalake-raw-iota-channel
    • aks-datalake-silver-iota-datastore
    • aks-datalake-gold-iota-dataset
    • aks-datalake-gold
  • AWS IoT Analyticsに必要なアクセス許可を付与するaks-datalake-silver-iota-datastoreバケットポリシー
  • AWS IoT、AWS Glue、AWS IoT Analytics、それぞれと信頼関係があり、必要なIAMアクセス許可を付与するiota_cmsb_roleという名前のIAMロール。このロールにより、AWS IoT CoreルールエンジンはAWS IoT AnalyticsのチャネルにMQTTメッセージを発行でき、リアルタイムストリーミングデータの処理、およびデータセット作成することができます
    • AWSマネジメントコンソールからAWS IoT Analyticsリソース(チャンネル、パイプライン、データストア、データセット)を作成すると、コンソールはIAMロールを作成して、必要なアクセス許可を付与します。ただし、実際の運用環境では、セキュリティ標準ごとに最小限の権限で、IAMロールを事前にセットアップすることを推奨します。
  • AWS IoT Analyticsデータセットの作成に使用するaks_datalake_iota_glue_dbという名前のAWS Glueデータベース。 Athenaがデータコンテンツを照会するために、S3に書き込まれたデータセットコンテンツの必要なスキーマ/メタデータを保存します。
  • Athenaがデータセットコンテンツをクエリするために、aks_datalake_iota_glue_db内に存在するaks_device_telemetry_statsという名前のAWS Glueデータカタログテーブル
  • Athenaクエリで使用するaks_datalake_gold_glue_dbという名前のAWS Glueデータベース。 データレイク上のデバイス仕様書のデータと、S3に書き込まれたAWS IoT Analyticsのデータセットを統合します。
  • S3(既存のデータレイク)上のデバイス仕様ファイルのスキーマ/メタデータがあるaks_device_specsという名前のAWS Glueデータカタログテーブル

AWS CloudFormationテンプレートによって作成されたiota_cmsb_roleという名前のIAMロールのAmazonリソースネーム(ARN)をコピーして保存します。

次のようなAWS CLIを使用して、ここからdevice_specs.csvという名前のデバイス仕様情報の入ったCSVファイルをaks-datalake-goldバケットにコピーします。

aws s3 cp device_specs.csv s3://aks-datalake-gold/aks_datalake_gold_glue_db/aks_device_specs/ --region 'us-east-1'
upload: ./device_specs.csv to s3://aks-datalake-gold/aks_datalake_gold_glue_db/aks_device_specs/device_specs.csv

AWS IoT Analyticsチャネル、パイプライン、データストアのセットアップ

ストレージタイプとしてお客様管理のAmazon S3バケットを使用して、aks_datalake_raw_iota_channelという名前のAWS IoT Analyticsのチャネルを作成します。 aksプレフィックスを独自のプレフィックスに変更し、IAMロールを先ほどメモしたARNに置き換えます。

  • CLI
aws iotanalytics create-channel --cli-input-json file://mychannel.json --region 'us-east-1'
  • CLI結果

{
"channelName": "aks_datalake_raw_iota_channel",
"channelArn": "arn:aws:iotanalytics:us-east-1:xxxxxxxx:channel/aks_datalake_raw_iota_channel"
}

ファイルmychannel.jsonには次のコードが含まれています。

  • JSONファイルの中身
{
    "channelName": "aks_datalake_raw_iota_channel",
    "channelStorage": {
        "customerManagedS3": {
            "bucket": "aks-datalake-raw-iota-channel",
            "keyPrefix": "myhome_raspberrypi/",
            "roleArn": "arn:aws:iam::xxxxxxxx:role/iota_cmsb_role"
        }
    }
}

補足:CLI実行環境にて、mychannel.jsonを作成します。上記、mychannel.jsonの内の“channelName”、“bucket”のaksをCloudFormation作成時に指定したPrefixに変更します。“roleArn”はAWSマネジメントコンソールのサービス一覧の“IAM”を選択し、“Roles”を選択、“iota_cmsb_role“を選択することで、Role ARNを確認することができます。mychannel.json作成後、”aws iotanalytics“から始まるAWS CLIを実行します。後続手順のmydatastore.json、mypipeline.json、mydataset.jsonも同様です。

前に作成したチャネルにメッセージを送信するAWS IoT Analyticsルールを作成します。 IAMロールARNを先ほどメモしたものに置き換え、aksプレフィックスを独自のプレフィックスに置き換えます。

aws iot create-topic-rule --rule-name aks_iota_datalake_raw_iota_channel --topic-rule-payload file://rule.json --region 'us-east-1'
{
    "sql": "SELECT * FROM 'iota/topic/myhome_raspberrypi'",
    "ruleDisabled": false,
    "awsIotSqlVersion": "2016-03-23",
    "actions": [
        {"iotAnalytics":
            {
                "channelName": "aks_datalake_raw_iota_channel",
                "roleArn": "arn:aws:iam::xxxxxxxxxxxx:role/iota_cmsb_role"
            }
        }
    ]
}

ストレージタイプとしてお客様管理のAmazon S3バケットを使用して、aks_datalake_silver_iota_datastoreという名前のAWS IoT Analyticsデータストアを作成します。 IAMロールARNを先ほどメモしたものに置き換え、aksプレフィックスを独自のプレフィックスに置き換えます。

  • CLI
aws iotanalytics create-datastore --cli-input-json file://mydatastore.json --region 'us-east-1'
  • CLI結果

{
"datastoreName": "aks_datalake_silver_iota_datastore",
"datastoreArn": "arn:aws:iotanalytics:us-east-1:xxxxxxxxxxxx:datastore/aks_datalake_silver_iota_datastore"
}

ファイルmydatastore.jsonには次のコードが含まれています。

  • JSONファイルの中身

{
"datastoreName": "aks_datalake_silver_iota_datastore",
"datastoreStorage": {
"customerManagedS3": {
"bucket": "aks-datalake-silver-iota-datastore",
"keyPrefix": "myhome_raspberrypi/",
"roleArn": "arn:aws:iam::xxxxxxxxxxxx:role/iota_cmsb_role"
}
}
}

aks_datalake_raw_iota_channelをパイプラインソースとして、aks_datalake_silver_iota_datastoreをパイプライン出力として、aks_datalake_iota_pipelineという名前のAWS IoT Analyticsパイプラインを作成します。 aksプレフィックスを独自のプレフィックスに置き換えます。

  • CLI
aws iotanalytics create-pipeline --cli-input-json file://mypipeline.json --region 'us-east-1'
  • CLI結果

{
"pipelineName": "aks_datalake_iota_pipeline",
"pipelineArn": "arn:aws:iotanalytics:us-east-1:xxxxxxxxxxxx:pipeline/aks_datalake_iota_pipeline"
}

ファイルmypipeline.jsonには次のコードが含まれています。

  • JSONファイルの中身

{
"pipelineName": "aks_datalake_iota_pipeline",
"pipelineActivities": [
{
"channel": {
"name": "mychannelactivity",
"channelName": "aks_datalake_raw_iota_channel",
"next": "mystoreactivity"
}
},
{
"datastore": {
"name": "mystoreactivity",
"datastoreName": "aks_datalake_silver_iota_datastore"
}
}
]
}

サンプルデータをAWS IoT Analyticsチャネルに取り込む

下記、Unixシェルスクリプトは、次のコードでサンプルMQTTメッセージを生成します(お客様環境に合わせて、mqtttopicregion、およびprofileパラメーターを置き換えます)。 AWSデモ環境用に利用可能なAWS CLIが用意されている場合、ラップトップまたはAmazon EC2からシェルスクリプトを実行します。 このユースケースでは、1000メッセージ(iterations = 1000)を送信しています。

#!/bin/bash

mqtttopic='iota/topic/myhome_raspberrypi'
iterations=1000
wait=5
region='us-east-1'
profile='default'

for (( i = 1; i <= $iterations; i++)) {

  CURRENT_TS=`date +%s`
  DEVICE="P0"$((1 + $RANDOM % 5))
  FLOW=$(( 60 + $RANDOM % 40 ))
  TEMP=$(( 15 + $RANDOM % 20 ))
  HUMIDITY=$(( 50 + $RANDOM % 40 ))
  VIBRATION=$(( 100 + $RANDOM % 40 ))

  # 3% chance of throwing an anomalous temperature reading
  if [ $(($RANDOM % 100)) -gt 97 ]
  then
    echo "Temperature out of range"
    TEMP=$(($TEMP*6))
  fi

  echo "Publishing message $i/$ITERATIONS to IoT topic $mqtttopic:"
  echo "current_ts: $CURRENT_TS"
  echo "deviceid: $DEVICE"
  echo "flow: $FLOW"
  echo "temp: $TEMP"
  echo "humidity: $HUMIDITY"
  echo "vibration: $VIBRATION"

  aws iot-data publish --topic "$mqtttopic" --payload "{\"deviceid\":\"$DEVICE\",\"current_ts\":$CURRENT_TS,\"flow\":$FLOW,\"temp\":$TEMP,\"humidity\":$HUMIDITY,\"vibration\":$VIBRATION}" --profile "$profile" --region "$region"

  sleep $wait
}

Unixシェルスクリプトが開始されると、次の出力が表示されます。

Publishing message 1/ to IoT topic iota/topic/myhome_raspberrypi:
current_ts: 1559504319
deviceid: P03
flow: 92
temp: 29
humidity: 81
vibration: 127
Publishing message 2/ to IoT topic iota/topic/myhome_raspberrypi:
current_ts: 1559504324
deviceid: P01
flow: 67
temp: 21
humidity: 87
vibration: 134
……

サンプルスクリプトは、AWS IoT Analyticsチャネル– aks_datalake_raw_iota_channelにデータを取り込みます。 AWS IoT Analyticsは取り込み時に、チャネルとデータストアの両方で1日あたりのデータを分割し、クエリパフォーマンスを支援します。 チャンネルとデータストアのそれぞれのS3バケットに移動します。 次のスクリーンショットのようなデータが表示されることを確認します。

次のグラフに示すように、AWS IoT Analyticsコンソールで、チャンネルaks_datalake_raw_iota_channelを選択してIncomingMessagesを確認します。

データセットコンテンツをS3に配信するデータセットを作成する

ストリーミングデータは、パイプラインを使用してAWS IoT Analyticsチャネルとデータストアに取り込まれています。 ストレージタイプとしてお客様管理のAmazon S3バケットを使用して、aks_datalake_gold_iota_datasetという名前のAWS IoT Analyticsデータセットを作成します。

先日、AWS IoT Analyticsは、SQLデータセットの更新間隔の高速化をサポートする新しい拡張機能をリリースしました。これにより、SQLデータセットを1分間隔で更新できるようになりました。 この例では、次のコードに示すように、5分ごとにスケジュールを更新します。 IAMロールARNを先ほどメモしたものに置き換え、aksプレフィックスを独自のプレフィックスに置き換えます。

  • CLI
aws iotanalytics create-dataset --cli-input-json file://mydataset.json --region 'us-east-1'
  • CLI結果

{
"datasetName": "aks_datalake_gold_iota_dataset",
"datasetArn": "arn:aws:iotanalytics:us-east-1:xxxxxxxxxxxx:dataset/aks_datalake_gold_iota_dataset"
}

ファイルmydataset.jsonには次のコードが含まれています。

  • JSONファイルの中身

{
"datasetName": "aks_datalake_gold_iota_dataset",
"actions": [
{
"actionName": "myaction",
"queryAction": {
"sqlQuery": "SELECT current_timestamp dtts, deviceid, avg(temp) avg_temp, avg(flow) avg_flow, avg(humidity) avg_humidity, avg(vibration) avg_vibration FROM aks_datalake_silver_iota_datastore where from_unixtime(cast(current_ts as double)) > current_timestamp - interval '5' minute group by deviceid"
}
}
],
"contentDeliveryRules": [
{
"destination": {
"s3DestinationConfiguration": {
"bucket": "aks-datalake-gold-iota-dataset",
"key": "aks_datalake_iota_glue_db/aks_device_telemetry_stats/!{iotanalytics:scheduleTime}_!{iotanalytics:versionId}.csv",
"glueConfiguration": {
"tableName": "aks_device_telemetry_stats",
"databaseName": "aks_datalake_iota_glue_db"
},
"roleArn": "arn:aws:iam::xxxxxxxxxxxx:role/iota_cmsb_role"
}
}
}
],
"triggers": [
{
"schedule": {
"expression": "cron(0/5 * * * ? *)"
}
}
]
}

データセット作成後、データセットがスケジュールどおりに実行されるまで5分間待つか、AWS CLIから次のように手動で1回実行します。

aws iotanalytics create-dataset-content --dataset-name "aks_datalake_gold_iota_dataset" --region 'us-east-1'
{
 "versionId": "02f6f531-ee49-4a8e-a43d-bf808e00a26b"
}

次のコマンドを実行して、コンテンツが作成されるのを待ちます。 データセットコンテンツをS3で使用可能にするには、状態が「SUCCEEDED」と表示されている必要があります。

aws iotanalytics get-dataset-content --dataset-name "aks_datalake_gold_iota_dataset" --region 'us-east-1'
{
    "entries": [
        {
            "dataURI": "https://aws-iot-analytics-dataset-43af09f0-a8dd-418e-a13d-0a2fbc46eeee.s3.amazonaws.com/results/4e5e1fd7-0cea-46c6-ae5b-0fff951a33f4.csv?X-Amz-Security-Token=AgoGb3JpZ2luEKL%2F%2F%2F%2F%2F%2F%2F%2F%2F%2FwEaCXVzLWVhc3QtMSKAAhMtFbDcxGLCP9k3zF1jBMD3jUY5lKG5DUQPIc6CE%2F5qxkchbjfo0qVnY4qjmH04n9bCXl%2F0kY8uKBM3h%2F8p9ZWlKGmZZ01eQd2tXmmjGjUZQ3uoJfO8nMrv4lpe%2BSNOyhKhLT8sjTN7jf5hRGRDV9zWmNra1Pp1d8bMWfLiMAwe4FNy%2Bncma9xuutMuhGUuClxcxD12Ez1YanrS1qcvkLBMR3AwJAOuEDiyYOjV8GTWTDjdn%2Bz1EperZjgRd4mbCwIOTSsJu4fVkoEPuP8DJKidcSI2IE7mIit5Tpl6O0RaZ0aDy6sBVFsHHygdhpvr9LgcqgJoff4Eefez%2FgBOJRYq8gIIdxABGgwyMjE5OTE5MjIyNDciDGs74tSk4qge1k9cvCrPAqw%2Fvgbh%2FUpvbAAVOZB%2Fh%2FfW0uXyUm%2FSqAjs9kptzfI8lXxGci6etkBeFZqLuSXUSEIiy0OScGS%2Bu3sPudxQL9WgoRj0NzgReJN6Mw7IGGdBgmKouAhb1WeNq2QavtZ1jpnxGZwzKg0rC9qC7p%2Fyklx%2BGbG7xpkWynWHVnW%2FFcV2dlWcmy61aKjIo%2B5OHi1mYoP8dNRFIIofE%2BjnSwP1PX9nWwgPwOeuU6hUxduitUTDSMq30e1oAfDJp6wh43PPSGodeXCEHOKohx0bFmQ54Ua51YhiXkT7DHrRF78oxuFVuinYcIceMczpLb9lXK%2Fs1TM5g2dYFdz9Nq17vWP8XLqTVEe6eYBLcQq80ZDe4YYKgml3rFV2a4NjV1ZMXxq9C5YjZLpKTFmslewRXK7uOoLPV5UaymokNrFUF0mLff1e0r9JXTtnyJDynwCLBSt5MIGr9OcF&X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Date=20190609T141945Z&X-Amz-SignedHeaders=host&X-Amz-Expires=7200&X-Amz-Credential=ASIATHL575JD6AI7UM53%2F20190609%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Signature=66635d16dbe1631d6fea60113a27c801e8ae1a57d6d0ec58766b790b5f54f960"
        }
    ],
    "timestamp": 1560089957.29,
    "status": {
        "state": "SUCCEEDED"
    }
}

前のステップで作成したAWS IoT Analyticsデータセットから10行を表示するには:

  1. Athenaコンソールのデータベースで、aks_datalake_iota_glue_dbを選択します。
  2. 新しいクエリ1で、aks_device_telemetry_statsという名前のAWS Glueデータカタログテーブルに対して、新しいクエリを設定し、クエリの実行を選択します。

補足:実行するクエリは以下:

SELECT * FROM "aks_datalake_iota_glue_db"."aks_device_telemetry_stats" limit 10;

AWSのデータレイクに保存されている他のデータとIoTデータを統合する

この記事の冒頭で、データレイクにaks_device_specsという名前のAWS Glueカタログテーブルを設定しました。 このテーブルは、S3(既存のデータレイク)に保存されているファイルを指し、Athenaで使用したクエリに使用できます。

aks_device_specsテーブルのデータを表示するには:

  1. Athenaコンソールのデータベースで、aks_datalake_gold_glue_db(既存のデータレイク)を選択します。
  2. 新しいクエリ1で、AWS Glueデータカタログテーブルaks_device_specsに対して、新しいクエリを設定し、クエリの実行を選択します。

AWS IoT Analyticsのaks_device_telemetry_statsテーブルのIoTデータを、既存のデータレイクのaks_device_statsテーブルと統合できるようになりました。 次のAthenaクエリを使用して、過去2時間で各デバイスが最大温度および湿度のしきい値を超えた回数を確認します。

select
        a.deviceid ,
        sum(case 
                when cast(a.avg_temp as double) > b.max_temp_c 
                then 1 
                else 0 
        end) count_max_temp_crossed,
        sum(case 
                when cast(a.avg_humidity as double) > b.max_humidity 
                then 1 
                else 0 
        end) count_max_humidity_crossed
from
        "aks_datalake_iota_glue_db"."aks_device_telemetry_stats" a
inner join "aks_datalake_gold_glue_db"."aks_device_specs" b on a.deviceid = b.deviceid
where date_parse(substr(dtts,1,19),'%Y-%m-%d %H:%i:%s') > current_timestamp - interval '2' hour
group by 1
order by 1;

次の表に結果を示します。

ここで、aks_device_telemetry_statsデータの5分間の平均を使用する代わりに、AWS IoT Analyticsデータストア(aks_datalake_silver_iota_datastore)に保存されている最小粒度のデータに対して同じしきい値チェックを実行します。 このチェックにより、デバイスからの実際の測定値の浮き沈みが損なわれるのを防ぎます。

AWS IoT AnalyticsデータストアS3バケット(aks-datalake-silver-iota-datastore)には、ニアリアルタイムで最も低い粒度のテレメトリデータが既にあります。 Athenaを使用して同じデータをクエリする前に、AWS Glueクローラーを設定して、5分ごとに実行する必要があります。これは、設定可能な最低スケジュールの頻度です。 aks_datalake_silver_iota_datastoreという名前のAWS Glueカタログテーブルを、ニアリアルタイム(5分以内のデータ取り込み)でテレメトリデータで最新の状態に保ちます。

aws glue create-crawler --cli-input-json file://mycrawler.json --region 'us-east-1'

ファイルmycrawker.jsonには次のコードが含まれています。

{
    "Name": "aks_datalake_silver_iota_datastore_glue_crawler",
    "Role": "arn:aws:iam::xxxxxxxx:role/iota_cmsb_role",
    "DatabaseName": "aks_datalake_iota_glue_db",
    "Description": "",
    "Targets":
        {
            "S3Targets": [
                    {
                        "Path": "s3://aks-datalake-silver-iota-datastore/myhome_raspberrypi/datastore/aks_datalake_silver_iota_datastore"
                    }
                ]
        },
    "Schedule": "cron(0/5 * * * ? *)"
}

AWS Glueクローラーが正常に作成されたら、クローラーがスケジュールどおりに実行されるまで5分間待つか、次のコマンドを使用してAWS CLIから手動で1回実行します。

aws glue start-crawler --name "aks_datalake_silver_iota_datastore_glue_crawler" --region "us-east-1"

クローラーの実行が完了したら、AWS IoT Analyticsデータストアからの最小粒度データを使用して次のAthenaクエリを実行します。 過去2時間に各デバイスが最大温度および湿度のしきい値を超えた回数を確認します。

select
        a.deviceid ,
        sum(case 
                when cast(a.temp as double) > b.max_temp_c 
                then 1 
                else 0 
        end) count_max_temp_crossed,
        sum(case 
                when cast(a.humidity as double) > b.max_humidity 
                then 1 
                else 0 
        end) count_max_humidity_crossed
from "aks_datalake_iota_glue_db"."aks_datalake_silver_iota_datastore" a
inner join "aks_datalake_gold_glue_db"."aks_device_specs" b on a.deviceid = b.deviceid
where from_unixtime(current_ts) > current_timestamp - interval '2' hour
group by 1
order by 1;

次の表に結果を示します。

最小粒度のデータから、デバイスが最大温度および湿度のしきい値を超えた回数は、5分間の平均に比べてはるかに多いことに注目してください。 他にも多くの統合ユースケースを実行できます。 たとえば、データレイクに保存されているサードパーティの気象データと統合して、気象の変化と相関があるかどうかを確認できます。

また、Amazon QuickSightを使用して、aks_device_specs(データレイク内のデバイス仕様データ)からの最大温度と湿度のしきい値でオーバーレイされたaks_datalake_silver_iota_datastoreからの過去2時間の最低粒度データを可視化できます。 次のサンプルAthenaクエリを使用します(Amazon QuickSightのドキュメントを参照して、AthenaクエリでAmazon QuickSightデータセットを作成します)。

select 
    from_unixtime(current_ts) as timestamp ,
    a.deviceid ,
    a.temp ,
    a.humidity,
    b.min_temp_c ,
    b.max_temp_c ,
    b.min_humidity,
    b.max_humidity 
from "aks_datalake_iota_glue_db"."aks_datalake_silver_iota_datastore" a 
inner join "aks_datalake_gold_glue_db"."aks_device_specs" 
b on a.deviceid = b.deviceid where from_unixtime(current_ts) > current_timestamp - interval '2' hour

まとめ

この投稿では、2つの新しいAWS IoT Analytics機能を使用して、IoTデータをAWSのデータレイクに保存されている残りのデータと統合する方法を学びました。 お客様管理のAmazon S3を使用して、独自のプロジェクトでデータセットコンテンツをAmazon S3機能に配信してみてください。

この記事が有益であり、お客様のソリューションに役立つことを願っています。 いつものように、AWSはフィードバックを歓迎します。 以下のコメントまたは質問を送信してください。

 

原文はこちらをご覧ください。

このブログ記事はソリューションアーキテクトの倉光が翻訳しました。