Amazon Web Services ブログ

AWS Machine Learning と IoT サービスを使用して、芝生のモニタリングと雑草検出ソリューションを構築する

住宅の新規購入者は、芝生のお手入れを効果的に管理するにはどうしたらよいかと頭を抱えることがよくあります。あなたが農家だとして、何平方メートルもの土地にこれをしなければならない場面を想像してみてください。農家として通常直面する課題には、いつ (水を撒くのに適切な時期はいつか)、どこで (水を撒いたり肥料を与えたりするエリアはどこにするか)、 どうやって (雑草の処理方法) 管理するかが含まれます。アメリカ雑草科学協会 (WSSA) が実施した調査研究では、手入れしていない雑草がトウモロコシと大豆の作物にもたらす損失は、年間合計 430 億 USD に上るという結果が出ています。詳細については、WSSA ウェブサイトの「WSSA Calculates Billions in Potential Economic Losses from Uncontrolled Weeds」を参照してください。

この問題を解決するために、コンピュータビジョンと機械学習 (ML) の分野の最新のテクノロジーを活用できたらどうでしょうか?

これは解決するのが難しい問題であり、多くの企業がソリューションを考え出そうと取り組んでいます。この記事では、AWS Starter Kit をどのように使用開始し、ソリューションを構築するかについて説明します。ソリューションには次の 2 つのコンポーネントがあります。

  • 画像分類と AWS DeepLens を使用した雑草検出
  • AWS IoT を使用して、芝生の状態 (土壌の水量、肥沃度レベル、太陽光) をほぼリアルタイムでモニタリング

前提条件

このソリューションを実装するには、次の前提条件が必要です。

  • AWS DeepLens
  • Raspberry Pi
  • 土壌水分センサー (この記事では、Xiaomi の FlowerCare センサーを 4 つ使用しています)

画像分類を使用した雑草の検出

除草剤の耐性がますます一般的になるにつれて、雑草防除の重要性が農業分野で増してきています。農場の生産性を高めるには、雑草を検出し、予防措置を早期に講じることが重要です。これが AWS DeepLens で可能になりました。AWS DeepLens は、世界で初めての開発者向け深層学習対応ビデオカメラです。AWS DeepLens では、完全にプログラム可能なビデオカメラ、チュートリアル、コード、それに深層学習スキルを補うことができるように設計された事前トレーニング済みモデルを数分で使用開始できます。また、AWS DeepLens により、深層学習モデルに基づいてコンピュータビジョンアプリケーションを開発するための最新の人工知能 (AI) ツールとテクノロジーを学習し探索することもできます。

AWS DeepLens を使用すると、画像に写ったものが雑草か草かをリアルタイムで分類できます。AWS DeepLens をマイクロコントローラーに接続して、スプレーをトリガーして雑草を除草することもできます。

この記事では、Amazon SageMaker の画像分類アルゴリズムを使って雑草を検出する方法を説明します。Amazon SageMaker は、フルマネージド ML サービスです。Amazon SageMaker を使用すると、ML モデルをすばやく簡単に構築してトレーニングすることができます。また、用意したモデルを実稼動も可能なホステッド環境に直接デプロイすることもできます。Amazon SageMaker は統合された Jupyter ノートブックを提供し、調査や分析のためにデータソースに簡単にアクセスできるため、サーバーを管理する必要はありません。この記事では、Amazon SageMaker のフルマネージドオンデマンドトレーニングインフラストラクチャでモデルをトレーニングする方法を説明します。さらに、トレーニング済みモデルを AWS DeepLens にデプロイする方法も示します。

次のアーキテクチャ図は、雑草検出ソリューションの概要を示しています。

このソリューションは、Amazon SageMaker の画像分類アルゴリズムを転移学習モードで使用します。これにより、事前トレーニング済みモデル (ImageNet データでトレーニング済み) を微調整して、新しいデータセットを分類できます。転移学習を使用すると、モデルを一からトレーニングする必要があった場合と比べて、大幅に少ないデータでディープネットワークをトレーニングできます。転移学習では、基本的に、モデルが以前のタスクで学習した知識を現在のタスクへ伝達します。ポイントは、2 つのタスクがばらばらではなく、モデルがその広範なトレーニングを通じて学んだネットワークパラメータを使用できるという点にあります。その際、自分でトレーニングする必要はありません。

アーキテクチャには次の手順が含まれます。

  1. 雑草と草で構成される画像のデータセットをローカルコンピュータにダウンロードします。ローカルコンピュータの 2 つの異なるフォルダに画像を整理します。
  2. RecordIO ツール im2rec を使用して、2 つの .lst ファイルを作成します。1 つのファイルは、データセットのトレーニングパート用です (80%)。もう 1 つはテスト用です (20%)。詳細については、MXNet ウェブサイトの「Create a Dataset Using RecordIO」を参照してください。
  3. .lst ファイルから両方の .rec ファイルを生成し、両方の .rec ファイル (トレーニングおよび検証イメージ) を Amazon S3 にコピーします
  4. 転移学習モードで Amazon SageMaker 画像分類アルゴリズムを使用してモデルをトレーニングします。
  5. モデルをトレーニングした後、トレーニングジョブはモデルアーティファクトを S3 バケットにアップロードします。または、トレーニングされたモデルを AWS DeepLens にデプロイして、エッジでリアルタイム推論を行います。

以下のセクションでは、ソリューションの実装について詳しく説明します。

データセットを理解する

Kaggle ウェブサイトの Open Sprayer 画像データセットには、広葉樹のドックの写真 (雑草)と広葉樹のドックのない土地の写真 (草) が含まれています。データセットには、1,306 枚の雑草の画像と 5,391 枚の草の画像が含まれており、通常のサイズは約 256 x 256 ピクセルです。データセットは、推奨されるトレーニングと検証の分割を可能にします。次の画像は、雑草と草の画像のコレクションを示しています。

画像データセットを用意する

この記事では、パイプモードを使用した画像分類に、RecordIO ファイル形式をどう使用するかを説明します。パイプモードでは、トレーニングジョブは Amazon S3 から直接データをストリーミングします。パイプモードを使用すると、トレーニングインスタンスの Amazon EBS ボリュームのサイズを縮小できます。詳細については、「Amazon SageMaker アルゴリズムでパイプ入力モードを使用する」を参照してください。画像分類アルゴリズムは、ファイルモードまたはパイプ入力モードで使用できます。大規模なデータセットにはパイプモードが推奨されますが、ファイルモードは、メモリに収まる小さなファイルやアルゴリズムに多数のエポックがある場合にも役立ちます。

MXNet には、データセットの RecordIO ファイルを作成するための im2rec というツールが用意されています。このツールを使用するには、一連の画像を説明するリストファイルを用意します。im2rec の詳細については、im2rec GitHub リポジトリを参照してください。

画像データセットを用意するには、ローカルの Python インタープリターを使用するか、Amazon SageMaker の Jupyter ノートブックを使い、次の手順を実行します。これらの手順は、画像データセットを保存したパスから実行します。

  1. im2rec.py を使用してリスト化ファイルを生成します。次のコードを参照してください。

    python3 im2rec.py --list --recursive --train-ratio .75 --test-ratio .25 </path/to/folder>
  2. 次のコードを入力して、im2rec ユーティリティを使用して RecordIO ファイルを作成します。
    python3 im2rec.py --num-thread 2 --resize 50 --quality 80 </path/to/folder>

    im2rec は次のパラメータを取ります。

    • list – im2rec はルートフォルダをたどってイメージリストを作成し、<prefix>.lst に出力します。
    • recursive – 再帰的にサブディレクトリを一通り実行し、各フォルダの画像に一意のラベルを割り当てます。
    • train-ratio – トレーニングに使用する画像の比率。
    • test-ratio – テストに使用する画像の比率。
    • num-thread – エンコードに使用するスレッドの数。この値が大きいほど、処理が速くなります。
    • resize – 画像の短い方のエッジのサイズを新しいサイズに変更します。元の画像はデフォルトでパックされます。
    • quality – エンコーディングの JPEG 品質は 1~100。またはエンコード用の PNG 圧縮は 1~9 (デフォルト: 95)。

詳細については、「MXNet によりシンプルに: Image RecordIO と im2rec およびデータのロード」を参照してください。

コードの実装

ソリューションの各ステップを示すために、この記事では GitHub リポジトリの Jupyter ノートブックを使用しています。

S3 バケットのセットアップ

S3 バケットを作成して、モデルアーティファクトに加えて、トレーニングファイルと検証ファイルを RecordIO 形式でアップロードします。SageMaker Python SDKdefault_bucket() セッションパラメータを使用します。S3 バケットを手動で作成する手順については、「バケットの作成」をご覧ください。

bucket = sagemaker.Session().default_bucket()

RecordIO ファイルは、画像分類アルゴリズムへの入力データとして機能します。次のように、トレーニングデータは train というサブディレクトリ内にあり、検証データは validation というサブディレクトリ内にある必要があります。

def upload_to_s3(channel, file):
    s3 = boto3.resource('s3')
    data = open (file, "rb")
    key = channel + '/' + file
    s3.Bucket(bucket).put_object(Key=key, Body=data)

s3_train_key = "image-classification-transfer-learning/train"
s3_validation_key = "image-classification-transfer-learning/validation"
s3_train = 's3://{}/{}/'.format(bucket, s3_train_key)
s3_validation = 's3://{}/{}/'.format(bucket, s3_validation_key)

upload_to_s3(s3_train_key, 'Data_train.rec')
upload_to_s3(s3_validation_key, 'Data_test.rec')

詳細については、GitHub リポジトリをご覧ください。

Amazon SageMaker 組み込みアルゴリズムを使用した画像分類モデルのトレーニング

Amazon S3 で利用できる画像が用意されたので、モデルをトレーニングする準備が整いました。この記事には、いくつかのハイパーパラメータがあります。

  • クラスの数とトレーニングサンプル
  • バッチサイズ、エポック、画像サイズ、事前トレーニング済みモデル

詳細については、「画像分類ハイパーパラメータ」を参照してください。

Amazon SageMaker 画像分類アルゴリズムでは、ml.p2.xlarge などの GPU インスタンスタイプでモデルをトレーニングする必要があります。ハイパーパラメータを次の値に設定します。

  • num_layers = 18。ネットワークのレイヤー数 (深さ)。この記事では 18 を使用していますが、50 や 152 などの他の値を使用できます。
  • image_shape = 3,224,224。ネットワークの入力画像の寸法、num_channels、高さおよび幅。実際の画像サイズ以下にする必要があります。チャネル数は実際の画像と同じにする必要があります。この記事では、ImageNet データセットと同様に、3、224、224 の画像寸法を使用しています。
  • num_training_samples = 4520。入力データセットのサンプルトレーニングの数。
  • num_classes = 2。新しいデータセットの出力クラスの数。雑草と草の 2 つのオブジェクトカテゴリがあるため、この記事では 2 を使用しています。
  • mini_batch_size = 128。各ミニバッチに使用されるトレーニングサンプルの数。
  • epochs = 10。トレーニングエポックの数。
  • learning_rate = 001。トレーニングの学習率。
  • top_k = 2。トレーニング中のトップ k の精度を報告します。トップ 1 のトレーニング精度はすでに報告されている通常のトレーニング精度と同じであるため、このパラメータは 1 より大きくする必要があります。
  • use_pretrained_model = 1。転移学習に事前トレーニング済みモデルを使用するには、1 に設定します。

データセットを Amazon S3 にアップロードし、ハイパーパラメータを設定したら、Amazon SageMaker CreateTrainingJob API を使用してトレーニングを開始できます。トレーニングに RecordIO 形式を使用しているため、CreateTrainingJob リクエストの InputDataConfig パラメータの値として、トレーニングチャネルと検証チャネルの両方を指定します。トレーニングチャネルで 1 つの RecordIO (.rec) ファイルを、検証チャネルで 1 つの RecordIO ファイルを指定します。両方のチャネルのコンテンツタイプを application/x-recordio に設定します。次のコードを参照してください。

"InputDataConfig": [
        {
            "ChannelName": "train",
            "DataSource": {
                "S3DataSource": {
                    "S3DataType": "S3Prefix",
                    "S3Uri": s3_train,
                    "S3DataDistributionType": "FullyReplicated"
                }
            },
            "ContentType": "application/x-recordio",
            "CompressionType": "None"
        },
        {
            "ChannelName": "validation",
            "DataSource": {
                "S3DataSource": {
                    "S3DataType": "S3Prefix",
                    "S3Uri": s3_validation,
                    "S3DataDistributionType": "FullyReplicated"
                }
            },
            "ContentType": "application/x-recordio",
            "CompressionType": "None"
        }
    ]

トレーニングジョブが完了すると、次のメッセージが表示されます。

Training job ended with status: Completed

出力モデルは、training_params ['OutputDataConfig'] で指定された出力パスに保存されます。

"OutputDataConfig": {
        "S3OutputPath": 's3://{}/{}/output'.format(bucket, job_name_prefix)

モデルトレーニングの完全なコードについては、GitHub リポジトリをご覧ください。

リアルタイム推論のためのモデルのデプロイ

次に、モデルを使用して推論を実行します。この記事における推論は、画像を雑草または草として予測することを意味します。

このセクションに含まれる手順は、以下のとおりです。

  1. トレーニング出力のモデルを作成します。
  2. リアルタイム推論のためのモデルをホストします。推論エンドポイントを作成し、リアルタイム推論を実行します。これは次の手順で構成されています。
    1. エンドポイントを定義する設定を作成します。
    2. 設定を使用して推論エンドポイントを作成します。
    3. エンドポイントを使用して、一部の入力データで推論を実行します。

モデルを作成する

次の Python コードを使用して、トレーニング出力から SageMaker モデルを作成します。

model_name="deeplens-image-classification-model"
info = sage.describe_training_job(TrainingJobName=job_name)
model_data = info['ModelArtifacts']['S3ModelArtifacts']
print(model_data)

hosting_image = get_image_uri(boto3.Session().region_name, 'image-classification')

primary_container = {
    'Image': hosting_image,
    'ModelDataUrl': model_data,
}

create_model_response = sage.create_model(
    ModelName = model_name,
    ExecutionRoleArn = role,
    PrimaryContainer = primary_container)

print(create_model_response['ModelArn'])

リアルタイム推論

これで、エンドポイントを使用してモデルをホストし、リアルタイムの推論を実行できます。

エンドポイント設定を作成する

Amazon SageMaker ホスティングサービスがモデルのデプロイに使用するエンドポイント設定を作成します。次のコードを参照してください。

endpoint_config_name = job_name_prefix + '-epc-' + timestamp
endpoint_config_response = sage.create_endpoint_config(
    EndpointConfigName = endpoint_config_name,
    ProductionVariants=[{
        'InstanceType':'ml.m4.xlarge',
        'InitialInstanceCount':1,
        'ModelName':model_name,
        'VariantName':'AllTraffic'}])

エンドポイントを作成する

以前に定義した名前と設定を指定して、モデルを提供するエンドポイントを作成します。結果は、検証して本番アプリケーションに組み込むことができるエンドポイントです。これは、m4.xlarge インスタンスで完了するまでに約 9~11 分かかります。次のコードを参照してください。

endpoint_name = job_name_prefix + '-ep-' + timestamp
print('Endpoint name: {}'.format(endpoint_name))

endpoint_params = {
    'EndpointName': endpoint_name,
    'EndpointConfigName': endpoint_config_name,
}
endpoint_response = sagemaker.create_endpoint(**endpoint_params)

次のコードでエンドポイントを作成します。

response = sagemaker.describe_endpoint(EndpointName=endpoint_name)
status = response['EndpointStatus']
print('EndpointStatus = {}'.format(status))
# wait until the status has changed
sagemaker.get_waiter('endpoint_in_service').wait(EndpointName=endpoint_name)
# print the status of the endpoint
endpoint_response = sagemaker.describe_endpoint(EndpointName=endpoint_name)
status = endpoint_response['EndpointStatus']
print('Endpoint creation ended with EndpointStatus = {}'.format(status))

if status != 'InService':
    raise Exception('Endpoint creation failed.')

エンドポイントの設定とステータスは、Amazon SageMaker コンソールの [Endpoints] タブで確認できます。

これで、ランタイムオブジェクトを作成し、それによりエンドポイントを呼び出すことができます。

推論を行う

モデルが使えるかどうかを検証するには、前の操作の結果を使用してクライアントライブラリからエンドポイントを取得し、そのエンドポイントを使用してトレーニング済みモデルから分類を生成します。このコードでは、以下のテストのために雑草のサンプル画像を使用しています。次のコードを参照してください。

runtime = boto3.Session().client(service_name='runtime.sagemaker')
file_name = 'rsz_photo-1.jpg'
# test image
from IPython.display import Image
Image(file_name)

import json
import numpy as np
with open(file_name, 'rb') as f:
    payload = f.read()
    payload = bytearray(payload)
response = runtime.invoke_endpoint(EndpointName=endpoint_name,
                                   ContentType='application/x-image',
                                   Body=payload)
result = response['Body'].read()
# result will be in json format and convert it to ndarray
result = json.loads(result)
# the result will output the probabilities for all classes
# find the class with maximum probability and print the class index
index = np.argmax(result)
object_categories = ['weed','grass']
print("Result: label - " + object_categories[index] + ", probability - " + str(result[index]))

Result: label - weed, probability - 0.9998767375946045

AWS DeepLens を使用してエッジでモデルを実行する

AWS DeepLens を使用すると、深層学習をエッジで試すことができます。これにより、トレーニング済みモデルをデプロイし、Python コードを使用して興味深いアプリケーションを簡単に思いつくことができます。雑草の識別ツールとして、芝生を見下ろすワイヤーに AWS DeepLens デバイスをマウントできます。デバイスは、検出した雑草と作物のトリミングされた画像を Amazon S3 にフィードします。雑草を検出すると、携帯電話にテキストを送信することもできます。

AWS DeepLens プロジェクトは、トレーニング済みモデルと AWS Lambda 関数で構成されています。この関数は、AWS DeepLens で AWS IoT Greengrass を使用して次のタスクを実行します。

  • ビデオストリームから画像をキャプチャする
  • そのイメージを使用してデプロイされた ML モデルに対して推論を実行します
  • 結果を AWS IoT および出力ビデオストリームに供給する

AWS IoT Greengrass では、Lambda 関数をローカルで実行できるため、組み込みソフトウェアの開発の複雑さが軽減されます。詳細については、「AWS DeepLens 推論 Lambda 関数の作成と公開」を参照してください。

Amazon SageMaker によって生成されたカスタム画像分類モデルを使用する場合、AWS DeepLens 推論 Lambda 関数の手順がさらにあります。推論関数は、モデルを使用して推論を実行する前に MXNet のモデルオプティマイザを呼び出す必要があります。モデルを最適化して読み込むには、次のコードを参照してください。

model_path = '/opt/awscam/artifacts/image-classification.xml'
error, model_path = mo.optimize(model_name,224,224,aux_inputs={'--epoch':10}) model = awscam.Model(model_path, {'GPU': 1})

AWS DeepLens でモデル推論を実行する

Lambda 関数からのモデル推論は、Amazon SageMaker がホストするエンドポイントを使用してモデルを呼び出すための前のステップと非常に似ています。次の Python コードは、AWS DeepLens ビデオカメラが提供するフレームで雑草と草を検出します。

frame_resize = cv2.resize(frame, (512, 512))

# Run the images through the inference engine and parse the results using
# the parser API. Note it is possible to get the output of doInference
# and do the parsing manually, but since it is a ssd model,
# a simple API is provided.
parsed_inference_results = model.parseResult(
                                 model_type,
                                 model.doInference(frame_resize))

この画像分類モデルで AWS DeepLens が使用する完全な推論 Lambda 関数の詳細については、GitHub リポジトリを参照してください。Lambda 関数は、AWS IoT Greengrass SDK を使用して、テキストベースの出力を IoT トピックに JSON 形式で発行します。こちらに、IoT コンソールで出力を表示する方法の手順を示します。

以下の画像は、サンプルのセットアップを示しています。

農場の状態をほぼリアルタイムでモニタリング

適切な量の水と栄養分が農場の生産性を向上させるカギとなります。これを実現する方法の 1 つは、畑をリアルタイムでモニタリングすることです。これにより、水と肥料の消費コストを節約できます。リアルタイムのモニタリングから収集した情報は、いつ、どこで、土壌に水を撒いたり、肥料を与えたりするかを特定するのに役立ちます。

土壌の状態をモニタリングするために、この記事では 4 つの土壌センサーを使用し、2 フィート x 1 フィートの植物床のパッチをモニタリングしました。これらのセンサーにより、土壌の現在の水量と肥沃度、受けた光量、および周囲の温度を検出できます。Bluetooth が有効になっている Raspberry Pi を介してこの情報を収集します。実際の運用では、LoRa のようなはるかに長い範囲のプロトコルを使用する必要があります。Raspberry Pi はセンサーをポーリングし、この情報を収集して、AWS IoT Core に送信します。IoT ルールを使用してデータを収集し、それをリアルタイム分析のために Amazon Kinesis Data Analytics に転送します。これで、農場の特定の部分の水量がしきい値を下回ると、アラート通知がトリガーされるようになりました。この情報を使用して、スプリンクラーシステムの制御を自動化することもできます。

このシステムには、地域の気象条件をポーリングするための組み込みロジックもあります。この情報を使用して、スプリンクラーをオンにするかどうか、およびその時期を決定できます。

アーキテクチャ

次の図は、このワークフローのアーキテクチャを示しています。

アーキテクチャには、おおまかに次の手順が含まれています。

  • Raspberry Pi はエッジゲートウェイデバイスとして機能し、Bluetooth を介して土壌水分センサーから情報を収集します。
  • 認証後、Raspberry Pi は IoT トピックをサブスクライブし、IoT トピックへのメッセージの発行を開始します。
  • IoT ルールエンジンを使用して、データは Amazon Kinesis Data Firehose に移動します。データの宛先は 2 つあります。永続ストレージ用の Amazon S3 とリアルタイム処理用の Kinesis Data Analytics です。
  • Kinesis Data Analytics の出力は Lambda 関数で、水量がしきい値を下回ると SNS アラートをトリガーします。
  • 最後に、Amazon S3 データを使用して Amazon QuickSight を構築します

このアーキテクチャを準備するには、Raspberry Pi と土壌水分センサーが必要です。

IoT デバイスのセットアップ

Raspberry Pi をセットアップしてデータを生成するには、以下の手順を実行します。

  1. Raspberry Pi を設定して AWS IoT に接続します。詳細については、「Raspberry Pi で AWS IoT SDK を使用する」を参照してください
  2. SSH または PuTTY のコマンドプロンプトで、最新バージョンの Python AWS IoT SDK を Raspberry Pi にダウンロードします。pip3 プログラムコマンドを使用できます。次のコードを参照してください。
    pip3 install AWSIoTPythonSDK

    miflora センサーに使用される bltewrap ライブラリは Python 3.4 以降でのみ機能するため、Python 3.4 以降を使用してください。

  3. miflora センサーライブラリをダウンロードして、次のコードでセンサーと通信します。
    pip3 install miflora

    このコードは、miflora に必要な bltewrap ライブラリも自動的にダウンロードします。詳細については、GitHub リポジトリをご覧ください。

  4. farmbot.py スクリプトを GitHub からダウンロードし、次のパラメータを更新します。
    • clientID – IoT および IoT シャドウクライアント ID (ステップ 1 から)
    • configureEndpoint – AWS マネジメントコンソールからの IoT および IoT シャドウエンドポイント
    • configureCredentials – Raspberry Pi 上の IoT および IoT シャドウクレデンシャルの場所 (ステップ 1 から)
    • Sensors – センサーの MAC アドレス値を更新
  1. Raspberry Pi に SSH 接続し、Python py スクリプトを開始します。スクリプトは、前のステップで設定した土壌水分センサーをポーリングします。次のコードを参照してください。
    python3 farmbot.py

Lambda 変換を使用して Firehose 配信ストリームを作成する

この記事では、Amazon Kinesis Data Firehose 配信ストリームを使用してデータをストリーミングしています。Firehose 配信ストリームからのデータを Kinesis Data Analytics の入力データとして使用し、Amazon QuickSight ダッシュボードを構築するために Amazon S3 に保存します。配信ストリームを作成するには、次の手順を実行します。

  1. Kinesis Data Firehose コンソールで、[配信ストリームの作成] を選択します。
  2. 名前が「IoT-data-Stream」の Firehose 配信ストリームを作成します。
  3. ソースをデフォルトの Direct PUT または他のソースとして選択し、[サーバー側の暗号化] をデフォルトのままオフにします。
  4. [次へ] を選択します
  5. [AWS Lambda の変換元レコード] では、[有効] を選択します。
  6. [新規作成] を選択します。
  7. この Lambda 関数を使用して、配信ストリームから Amazon S3 に送信されたデータの各レコードの最後に改行文字を追加します。詳細については、GitHub リポジトリをご覧ください。

これは、Amazon QuickSight で視覚化データを作成するために必要です。次のコードを参照してください。

'use strict';
console.log('Loading function');

exports.handler = (event, context, callback) => {
  
    /* Process the list of records and transform them */
    /* The following must be the schema of the returning record 
      Otherwise you will get processing-failed exceptions
      {recordId: <id>, result: 'Ok/Processing/Failed', data: <base64 encoded JSON string> } 
    */ 
    const output = event.records.map((record) => ({
        /* This transformation is the "identity" transformation, the data is left intact */
        recordId: record.recordId,
        result: 'Ok',
        data: record.data+"Cg==",
    }));
    console.log(`Processing completed. Successful records ${output.length}.`);
    callback(null, { records: output });
};
  1. レコード形式の変換 では、[無効] を選択します。
  2. [次へ] を選択します。
  3. [宛先] では、[Amazon S3] を選択します。
  4. <your unique name>-kinesis」という名前の新しい S3 バケットを作成します。
  5. 他のすべてのフィールドはデフォルトのままにします。
  6. [次へ] を選択します。
  7. [S3 バッファ条件] の [バッファ間隔] に、「60 秒」を入力します。
  8. [バッファサイズ] には、「1 MB」と入力します。
  9. [IAM ロールの作成] を選択します。Kinesis Data Firehose はこのロールを使用してバケットにアクセスします。
  10. 他のすべてのフィールドはデフォルトのままにします。
  11. [次へ] を選択します。
  12. すべてのフィールドを確認し、[配信ストリームの作成] を選択します。

AWS IoT をセットアップして、データを配信ストリームに転送する

データを配信ストリームに転送するには、次の手順を実行します。

  1. AWS IoT Core コンソールで、[アクション] を選択します。
  2. [ルールの作成] を選択します。
  3. 次のフィールド値を使用して新しい AWS IoT ルールを作成します
     Name: 'IoT_to_Firehose'
     Attribute :' * '
     Rule Querry Statement : SELECT * FROM 'farmbot/#'
     Add Action : " Send messages to an Amazon Kinesis Data Firehose stream (select    IoT-Source-Stream from the Stream name dropdown)"
     Select Separator: " \n (newline) "
  4. IAM ロールについては、[ロールの新規作成] を選択します。

コンソールは、AWS IoT が Kinesis Data Firehose にアクセスするための適切なアクセス許可を持つ IAM ロールを作成します。既存のロールを使用する場合は、ドロップダウンメニューからロールを選択し、[ロールの更新] を選択します。これにより、選択した IAM ロールに必要なアクセス許可が追加されます。

分析アプリケーションの宛先を作成する

分析アプリケーションの宛先を作成するには、以下の手順を実行します。

  1. Amazon SNS コンソールでトピックを作成し、E メールまたはテキストでサブスクライブします。詳細については、「Amazon SNS の開始方法」のステップ 1 とステップ 2 を参照してください。
  2. SNS トピックの ARN をテキストファイルにコピーします。

データを処理する分析アプリケーションを作成する

分析アプリケーションを作成するには、以下の手順を実行します。

  1. Amazon Kinesis コンソールで、[データの分析] を選択します。
  2. [アプリケーションの作成] を選択します。
  3. アプリケーション名として、「Farmbot-application」と入力します
  4. ランタイムには、[SQL] を選択し、[アプリケーションの作成] をクリックします
  5. ソースとして、[ストリーミングデータの接続] を選択し、[ソースの選択] をクリックします。
  6. [Kinesis Data Firehose 配信ストリーム] を選択します。
  7. [IoT-Source-Stream] を選択します。
  8. [Lambda で前処理を記録する] はそのままにします
  9. [アクセス許可] で、コンソールに Kinesis Data Analytics で使用する IAM ロールを作成および更新させます。

Kinesis Data Analytics が引き受けることができる適切な IAM ロールをすでに持っている場合は、ドロップダウンメニューからそのロールを選択します。

  1. [スキーマの発見] を選択します。
  2. アプリケーションが結果を表示するのを待ち、[保存して続行] を選択します。

すべてを正しく設定した場合は、センサー名、パラメータ、およびタイムスタンプを含むテーブルが表示されます。次のスクリーンショットを参照してください。

  1. リアルタイム分析処理では、[SQL エディタに移動] を選択します。
  2. 次のパラメータを入力します。
  • SOURCE_SQL_STREAM_001 – 入力ストリームからの値とタイムスタンプを持つ名前とセンサーパラメータを含めます。
  • INTERMEDIATE_SQL_STREAM – Moisture 値が 25 未満のすべてのレコードを含め、より少ないパラメータにフィルタリングします (この記事では Moisture および Conductivity)。
  • DESTINATION_SQL_HHR_STREAM – 指定した列の 2 分のスライディングウィンドウで集計行に対して関数を実行します。センサーが常に 2 分間低湿度レベルを報告しているかどうかを検出します。

次のコード例を参照してください。

-- Create an output stream with seven columns, which is used to send IoT data to the destination
SELECT STREAM "Moisture","Temperature","Name","Conductivity","Light","Battery","DateTime" FROM "SOURCE_SQL_STREAM_001";


CREATE OR REPLACE STREAM "INTERMIDIATE_SQL_STREAM" (Moisture INT, Conductivity INT, Name VARCHAR(16));
-- Create pump to insert into output 
CREATE OR REPLACE PUMP "STREAM_PUMP_001" AS INSERT INTO "INTERMIDIATE_SQL_STREAM"

-- Select all columns from source stream
SELECT STREAM "Moisture","Conductivity","Name"
FROM "SOURCE_SQL_STREAM_001"
-- LIKE compares a string to a string pattern (_ matches all char, % matches substring)
-- SIMILAR TO compares string to a regex, may use ESCAPE
WHERE "Moisture" < 25;


-- ** Aggregate (COUNT, AVG, etc.) + Sliding time window **
-- Performs function on the aggregate rows over a 2 minute sliding window for a specified column.
--          .----------.   .----------.   .----------.             
--          |  SOURCE  |   |  INSERT  |   |  DESTIN. |              
-- Source-->|  STREAM  |-->| & SELECT |-->|  STREAM  |-->Destination
--          |          |   |  (PUMP)  |   |          |              
--          '----------'   '----------'   '----------'               

CREATE OR REPLACE STREAM "DESTINATION_SQL_HHR_STREAM" (Name VARCHAR(16), high_count INTEGER);
-- Create a pump which continuously selects from a source stream (SOURCE_SQL_STREAM_001)

-- performs an aggregate count that is grouped by columns ticker over a 2-minute sliding window
CREATE OR REPLACE PUMP "STREAM_PUMP_002" AS INSERT INTO "DESTINATION_SQL_HHR_STREAM"
-- COUNT|AVG|MAX|MIN|SUM|STDDEV_POP|STDDEV_SAMP|VAR_POP|VAR_SAMP)
SELECT STREAM *
FROM (
      SELECT STREAM 
             Name,
             COUNT(*) OVER THIRTY_SECOND_SLIDING_WINDOW AS high_count
      FROM "INTERMIDIATE_SQL_STREAM"
      WINDOW TWO_MINUTE_SLIDING_WINDOW AS (
        PARTITION BY Name
        RANGE INTERVAL '2' MINUTE PRECEDING)
) AS a
WHERE (high_count >3);

[リアルタイム分析] タブで、SQL クエリの結果を確認できます。次のスクリーンショットを参照してください。

分析アプリケーションの宛先を接続する

分析アプリケーションの宛先を接続するには、以下の手順を実行します。

  1. [宛先] では、[AWS Lambda 関数] を選択します。
  2. 新しい Lambda 関数を作成し、Lambda ブループリントを選択します。
  3. 検索バーに「SNS」と入力します。
  4. [kinesis-analytics-output-sns] を選択します。
  5. 関数名を入力します。
  6. JSON の出力形式を選択する
  7. 実行ロールについては、[基本的な Lambda 権限で新しいロールを作成する] を選択します。

Lambda 関数はアプリケーションの結果を処理し、作成した SNS トピックに結果を送信します。

  1. 次の関数コードを変更し、記録したトピック ARN を追加します。
    import base64
    import json
    import boto3
    from botocore.vendored import requests
    
    #import requests
    
    snsClient = boto3.client('sns')
    print('Loading function')
        
        for record in event['records']:
            # Kinesis data is base64 encoded so decode here
            print 'Number of records {}.'.format(len(event['records']))
            payload = json.loads(base64.b64decode(record['data']))
            #payload = base64.b64decode(record['kinesis']['data'])
            print payload
            response = snsClient.publish(
            TopicArn= 'arn:aws:sns:<region>:<account_id>:<topic_name>',
            Message='Based on current soil moisture level you need to water your lawn! ',
            Subject='Lawn maintenance reminder ',
            MessageStructure='string',
            MessageAttributes={
            'String': {
                'DataType': 'String',
                'StringValue': 'New records have been processed.'
                }
            }
        )
        return 'Successfully processed {} records.'.format(len(event['records']))
  2. [関数の作成] を選択します。
  3. Kinesis Data Analytics コンソールの [アプリケーション内ストリーム] で、[既存のアプリケーション内ストリーム] を選択します。
  4. ドロップダウンメニューから、JSON 出力形式で [DESTINATION_SQL_HHR_STREAM] を選択します。
  5. [アクセス許可] で、Kinesis Data Analytics で使用する IAM ロールの作成または更新を選択します。

Kinesis Data Analytics が引き受けることができる適切な IAM ロールをすでに持っている場合は、ドロップダウンメニューからそのロールを選択します。

Amazon QuickSight に接続してデータを視覚化する

Amazon QuickSight ダッシュボードを構築するには、Kinesis Data Firehose から JSON 生データを取り込む必要があります。次の手順を実行します。

  1. コンソールで、[QuickSight] を選択します。

Amazon QuickSight を初めて使用する場合は、新しいアカウントを作成する必要があります。

  1. ログイン後、[新しい分析] を選択します。
  2. [新しいデータセット] を選択します。
  3. 利用可能なソースから、[S3] を選択します。
  4. データソース名を入力します。
  5. マニフェストファイルを選択してアップロードします。

マニフェストファイルは、S3 バケットの場所と Amazon S3 のデータ形式を示します。詳細については、「Amazon S3 マニフェストファイルでサポートされている形式」を参照してください。次のコード例を参照してください。

{ 
    "fileLocations": [                                                    
              {"URIPrefixes": ["https://s3.amazonaws.com/<YOUR_BUCKET_NAME>/data/<YEAR>/<MONTH>/<DATE>/<HOUR>/"
                ]}
     ],
     "globalUploadSettings": { 
     "format": "JSON"
    }
}

Amazon QuickSight はデータをインポートして解析します。

  1. 取り込んだデータを変換してフォーマットするには、[データの編集/プレビュー] を選択します。
  2. フォーマットが完了したら、[保存してデータを視覚化] を選択します。

視覚化画面で分析を作成するには、以下の手順を実行します。

  1. [ビジュアルタイプ] では、[折れ線グラフ] を選択します。
  2. [DateTime] フィールドを Field Wells X 軸にドラッグアンドドロップします。
  3. LightValue に移動します
  4. NameColor に移動します

これにより、4 つのセンサー位置の一定期間の光量を示すダッシュボードが作成されます。

  1. [DateTime] 集計オプションを DAY から MINUTE に変更します。
  2. LightSum から Average に変更します。
  3. X 軸、Y 軸、または凡例のプロパティを変更するには、表示形式のオプションを選択します。

次のグラフは、時間の経過に伴う平均光量を示しています。同様に、水分、温度、土壌の肥沃度レベル用に 1 つ作成できます。

まとめ

この記事では、AWS DeepLens と Amazon SageMaker の組み込みの画像分類アルゴリズムを使用して、公開されているデータセットに基づいて草から雑草を見分け、Amazon Kinesis と AWS IoT を使用してリアルタイムの芝生監視システムを構築する方法を示しました。Amazon QuickSight を使用してデータを視覚化することもできます。この例を複製して、独自のユースケースに合わせて拡張することができます。


著者について

Ravi Gupta は、アマゾン ウェブ サービスのエンタープライズソリューションアーキテクトです。彼は情熱的なテクノロジー愛好家で、お客様と協力することを楽しみ、革新的なソリューションの構築を支援しています。彼の主な重点分野は、IoT、分析、機械学習です。余暇は、家族と過ごしたり、写真撮影を楽しんだりしています。

 

 

 

Shayon Sanyal は、アマゾン ウェブ サービスのグローバル金融サービス部門のシニアデータアーキテクト (データレイク) です。彼は AWS のお客様とパートナーが複雑で困難な AI/ML、IoT、データ分析の問題を解決できるようにし、データ駆動型エンタープライズのビジョンの実現を支援しています。余暇には、旅行、ランニング、ハイキングを楽しんでいます。