機械学習を使って魚の画像判定アプリを作ってみた !

2021-11-02
日常で楽しむクラウドテクノロジー

Author : 奥村 美生 (監修 : 岡田 信夫)

こんにちは。AWS ソリューションアーキテクト インターン生の奥村美生です。インターン生として 3 ヶ月間、AWS でお世話になりました。

私は水族館巡りの全国制覇を夢に持つほど水族館が好きです。あるときこんな経験をしました。「この魚、写真を撮ったけど何て魚なんだろう。人が多くてパネルを探すのも大変。鱗が赤いから、インターネットで「魚 鱗 赤 名前」なんて調べてみたけど、検索結果が多すぎてわからない・・・。」誰でも簡単に始められて、魚の名前を教えてくれる。そんな便利なアプリを作ってみたいと思ったことが制作のきっかけです。

今回インターンで制作した、魚の画像を送ったら機械学習を使って判定してくれるアプリ「Aqua passha (アクア・パッシャ)」をご紹介いたします。イタリアの郷土料理「Acqua pazza (アクアパッツァ)」と、カメラのシャッター音「パシャッ」をかけまして、「Aqua passha (アクアパッシャ)」と名付けました。「Aqua passha」は、LINE と連携しています。例えば水族館で、名前や生息地などの情報が知りたい魚を撮影し LINE 上で画像を送信すると、機械学習で魚を判定し、魚の情報が LINE に送られてきます。

まずは、出来上がったアプリをご覧ください。


1. アーキテクチャ

アーキテクチャは以下の図のようになります。

img_ml-fish-search-app_01

フロントエンドについては、LINE を使用しており、LINE Messaging API を介して AWS 上のシステムと連係します。LINE Messaging API はユーザーからのメッセージを受けると、Webhook で指定した Amazon API Gateway へリクエストを送信します。API Gateway と繋がっている AWS Lambda で、署名の検証を行います。署名の検証とは、「意図した送信元 (LINE Platform) から送られてきたかどうか」を確認することを言います。そして、AWS Step Functions を実行します。ここで一旦同期処理が終了します。

そこから順番に、LINE に送られてきた画像を Amazon S3 に保存し、その画像を Amazon Rekognition のモデルで推論します。推論結果で出た魚の名前をキーとし、Amazon DynamoDB から魚の情報を取得します。それらを、次の Lambda に渡し、LINE にメッセージを送ります。ちなみに、モデルに使用した魚の画像は、全て京都水族館に実際に行って撮影して用意しました。

リポジトリは GitLab を使用しており、AWS CodeCommit にミラーリングします。ここから、AWS CodePipeline によって、CI/CD パイプラインを構築しています。


2. AWS SAM のインストール

「SAM (Serverless Application Model)」は、サーバーレスアプリケーション構築用のオープンソースフレームワークです。リソースごとにわずか数行で、任意のアプリケーションを定義して YAML を使用してモデリングできます。デプロイ中、SAM が SAM 構文を AWS CloudFormation 構文に変換および拡張することで、サーバーレスアプリケーションの構築を高速化することができます。」SAMについて、詳しくは こちら を参照してください。

今回は、

  • チャット Bot はリクエストを受けて返すだけなので、常時サーバーを動かしておく必要がない。
  • チャット Bot なので、多少の遅延もユーザー体験的には問題にならない。
  • API Gateway と Lambda を繋ぐ作業や、IAM ロールの権限を付けたりすることが、sam init というコマンドを実行するだけでテンプレートから開発を進めることができる。
  • コンパクトで宣言型の JSON/YAML テンプレートでインフラストラクチャを定義できる。
  • AWS CodePipeline で実行する CI/CD のパイプライン処理を sam pipeline init –bootstrap というコマンドを実行するだけで簡単に作ることができる。

といった、サーバーレスのメリットが活かしやすいシステムであるという理由から、SAM を使用することにしました。SAM のデベロッパーガイドに、SAM の開始方法が記載されています。こちらの「チュートリアル:Hello World アプリケーションのデプロイ」を見ながら、SAM を使用してサンプルの Hello World アプリケーションをダウンロード、構築、およびデプロイします。


3. LINE Depvelopers のセットアップ

このステップで行う処理はこちらです。

今回作成したアプリは、ユーザーに簡単に初めていただけるように、(アプリをダウンロードする手間が省けるために) LINE アプリと連携させました。

UI (ユーザーインターフェース) の開発、メンテナンスコスト削減にもなります。そのため、まずは LINE Developers のコンソールから、チャネルを作成します。

img_ml-fish-search-app_02

私が作成したチャネルだと、こちらの通りです。チャネルの作り方は こちら を参照してください。

img_ml-fish-search-app_03

4. Amazon API Gatewayと AWS Lambda で、LINE にメッセージが送れるかどうか確認

まずは、ここで一気に作り始めてしまうのではなく、API Gateway, Lambda, LINE が繋がる一連の流れを作成します。SAM の雛形をダウンロードしたことで、API Gateway と Lambda はもう繋がっている状態からのスタートです。

まずは、API Gateway のエンドポイントを LINE の Webhook に設定することで、LINE Messaging API と API Gateway を繋げることができます。

img_ml-fish-search-app_04

クリックすると拡大します


5. CI/CD パイプラインのセットアップ

次に、CI/CD パイプラインのセットアップを実行します。今回のアプリ作成では、「GitLab」というリポジトリを使用しました。GitLab においてミラーリングの設定をし、GitLab → AWS CodeCommit にリポジトリをミラーリングします。CodeCommit にリポジトリを作成できたら、次は SAM パイプラインを構築します。

sam pipeline init –bootstrap というコマンドを実行します。利用する CI/CD 基盤は、AWS CodePipeline です。実行を終えると、パイプライン構築用のテンプレートや BuildSpec ファイルなどが作られているのでリモートブランチへプッシュします。sam pipline init の使い方は、こちら をご参照ください。

AWS CloudFormation から、先ほど自動生成された codepipeline.yaml をアップロードします。CloudFormation スタックが作成されたら、CodePipeline でパイプラインが進行されていることを確認してください。

img_ml-fish-search-app_05

クリックすると拡大します


6. Amazon S3 のバケットを作成

このステップで行う処理はこちらです。

img_ml-fish-search-app_06

template.yaml に以下の情報を追記し、S3 のバケットを作成します。

template.yaml

S3Bucket:
        Type: "AWS::S3::Bucket"
        Properties:
            BucketName: <<YOUR_BUCKET_NAME>>#バケット名を指定してください。

template.yaml の書き方のテンプレートについては、こちら をご参照してください。
LambdaにS3へのアクセスを許可するポリシーを付与することも忘れないようにしましょう。ポリシーについては、こちら をご参照ください。

template.yaml

            Policies:
                - S3WritePolicy:
                    BucketName: !Ref S3Bucket

7. S3に Lambdaで画像を保存できるようにする

このステップで行う処理はこちらです。

img_ml-fish-search-app_07

クリックすると拡大します

次に、LINE で送られてきた画像を S3 に保存する Lambda を作成します。Lambda のコードは以下の通りです。

import os
import logging
import tempfile
import boto3
from linebot import LineBotApi, WebhookHandler
from linebot.models import TextSendMessage

s3 = boto3.resource('s3')
# 環境変数を取得
channel_access_token = os.getenv('CHANNEL_ACCESS_TOKEN')
backet_name = os.getenv('BUCKET_NAME')
line_bot_api = LineBotApi(channel_access_token)

logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)

def lambda_handler(event, context):
    logger.info('image event')
    imageid = event["imageid"]
    replytoken = event["replytoken"]
    user_id = event["user_id"]
    image = line_bot_api.get_message_content(imageid)

    with tempfile.TemporaryFile() as tmp:
        for chunk in image.iter_content():
            tmp.write(chunk)
        tmp.seek(0)

    bucket = s3.Bucket(backet_name)
    bucket.put_object(
            Body=tmp,
            Key=f'{imageid}.png'
        )
    line_bot_api.reply_message(
            replytoken,
            TextSendMessage(text='アップロードに成功しました')
        )
    return {
        "bucketname": backet_name,
        "imagename": str(imageid) + ".png",
        "user_id": user_id
    }

8. AWS Step Functions のセットアップ

Step Functions のステートマシンを作成します。template.yaml に以下のようなコードを追記して作成できます。ポリシーを付与することも忘れないようにしましょう。

template.yaml

S3RekognitionDBStateMachine:
        Type: AWS::Serverless::StateMachine
        Properties:
            DefinitionUri: statemachine / sfn.asl.json
            DefinitionSubstitutions:
                LineS3Function: !GetAtt LineS3Function.Arn
                RekognitionFunction: !GetAtt RekognitionFunction.Arn
                LineReplyFunction: !GetAtt LineReplyFunction.Arn
                FailFunction: !GetAtt FailFunction.Arn
            Policies:  
                - LambdaInvokePolicy:
                    FunctionName: !Ref LineS3Function
                - LambdaInvokePolicy:
                    FunctionName: !Ref RekognitionFunction
                - LambdaInvokePolicy:
                    FunctionName: !Ref LineReplyFunction
                - LambdaInvokePolicy:
                    FunctionName: !Ref FailFunction

sfn.asl.json というファイルを作成し、Step Functions で Lambda を使ったワークフローを定義します。

sfn.asl.json

{
    "StartAt": "LINEImageS3",
    "States": {
        "LINEImageS3": {
            "Type": "Task",
            "Resource": "${LineS3Function}",
            "Next": "RekognitionDB",
        },
        "RekognitionDB": {
            "Type": "Task",
            "Resource": "${RekognitionFunction}",
            "Next": "LineReply",
        },
        "LineReply": {
            "Type": "Task",
            "Resource": "${LineReplyFunction}",
            "End": true
        }
    }
}

このワークフローを可視化すると、このようなイメージです。

img_ml-fish-search-app_08

9. Step Functions 実行用の Lambda を作成

このステップで行う処理はこちらです。

img_ml-fish-search-app_09

Step Functions を実行するための Lambda を作成します。API Gateway から直接 Step Functions を呼び出すこともできますが、そうすると Step Functions が終わるまでの処理が同期的な処理になってしまうことにご注意ください。同期的な処理だと、一連の処理が終わるまで Webhook のリクエスト側が待っていることになります。

そこで今回は Webhook のリクエストに対しての応答を同期的に済ませておいて、後の処理は非同期処理で行うという処理にします。

コードは以下の通りです。

step.py

import json
import os
from types import BuiltinMethodType
import datetime
import base64
import hashlib
import hmac
import boto3
from linebot import LineBotApi, WebhookHandler
from linebot.models import TextSendMessage

client = boto3.client('stepfunctions')

# 環境変数を取得
channel_secret = os.getenv('CHANNEL_SECRET')
channel_access_token = os.getenv('CHANNEL_ACCESS_TOKEN')
statemachine_arn = os.getenv('STATEMACHINE_ARN')
line_bot_api = LineBotApi(channel_access_token)

def lambda_handler(event, context):
    # 署名の検証(後述)
    body = event["body"]
    signature = event["headers"]["x-line-signature"]
    hash = hmac.new(channel_secret.encode('utf-8'),
                    body.encode('utf-8'), hashlib.sha256).digest()
    if signature != base64.b64encode(hash).decode():
        return {
            "statusCode": 401,
            "body": 'Unauthorized'
        }

    now = datetime.datetime.now().strftime("%m-%d-%Y-%H-%M-%S")
    jsonbody = json.loads(body)
    imageid = jsonbody["events"][0]["message"]["id"]
    replytoken = jsonbody["events"][0]["replyToken"]
    user_id = jsonbody["events"][0]["source"]["userId"]

    type = jsonbody["events"][0]["message"]["type"]
    if type == "image":
        client.start_execution(
            stateMachineArn=statemachine_arn,
            name="job-" + str(now),
            input=json.dumps({
                "imageid": imageid,
                "replytoken": replytoken,
                "user_id": user_id
            })
        )
    # きたメッセージが画像以外の場合、ステートマシンは実行せず、「判定したい魚の写真を送ってください」と返信して終了。
    else:
        line_bot_api.reply_message(replytoken, TextSendMessage(
            text='判定したい魚の写真を送ってください'))
    return {'body': 'ok'}

10. Amazon Rekognition でモデルを作成する

Amazon Rekognition を使用して、モデルを作成します。実際に京都水族館に行き、魚の写真をたくさん撮影しました。その魚の画像をもとに、カスタムラベルを使って作成しました。カスタムラベルについては こちら を参照してください。

また、Rekognition のモデル作成の手順については、こちら を参照してください。

img_ml-fish-search-app_10

11. 画像をモデルで推論する Lambda のコードを書く

このステップで行う処理はこちらです。

img_ml-fish-search-app_11

画像をモデルで推論する Lambda のコードはこちらです。

rekog.py

import os
import boto3

# 環境変数を取得
project_arn = os.getenv('PROJECT_ARN')
table_name = os.getenv('TABLE_NAME')

rekognition = boto3.client('rekognition')
dynamodb = boto3.client('dynamodb')

def lambda_handler(event, context):
    response = rekognition.detect_custom_labels(
        ProjectVersionArn=project_arn,
        Image={
            'S3Object': {
                'Bucket': event["bucketname"],
                'Name': event["imagename"],
            }
        },
    )
    user_id = event["user_id"]

    if not response["CustomLabels"]:
        message = "この写真ではうまく判定できませんでした・・・。"

    else:
        resultname = response['CustomLabels'][0]['Name']
        resultpercent = response['CustomLabels'][0]['Confidence']
        user_id = event["user_id"]

        response = dynamodb.get_item(
            TableName=table_name,
            Key={
                "FishName": {"S": resultname}
            }
        )
        item = response["Item"]
        per = round(resultpercent, 2)
        env = item["Env"]["S"]
        bool = item["bool"]["S"]

        message = '''**判定結果**

名前 : {0}

信頼度 : {1}%

生息地 : {2}

人への影響 : {3}'''.format(resultname, str(per), env, bool)

    return {
        "user_id": user_id,
        "message": message
    }

12. Amazon DynamoDB のテーブルを作成

DynamoDB のテーブルを作成します。template.yaml に以下を記載します。

template.yaml

FishDynamoDB:
        Type: AWS::Serverless::SimpleTable
        Properties:
            PrimaryKey:
                Name: FishName
                Type: String
            TableName: Fish-Aquapassha

template.yaml の書き方のテンプレートについては、こちら をご参照ください。

Lambda に DynamoDB へのアクセスを許可するポリシーを付与することも忘れないようにしましょう。ポリシーについては、こちら をご参照ください。

template.yaml

                - DynamoDBReadPolicy:
                    TableName: !Ref FishDynamoDB

テーブルが作成されたら、AWS マネジメントコンソールにログインし、項目を追加します。

img_ml-fish-search-app_12
img_ml-fish-search-app_13

13. DynamoDB からデータを取得する Lambda を書く

このステップで行う処理はこちらです。

11. 画像をモデルで推論する Lambda のコードを書く のコード rekog.py をご参照ください。

img_ml-fish-search-app_14

14. LINE に判定結果を送信する Lambda を書く

このステップで行う処理はこちらです。

img_ml-fish-search-app_15

今までの Lambda 関数から message を受け取って、LINE にプッシュメッセージを送る Lambda 関数を作成します。

今までの Lambda 関数では、event の中の message という変数にメッセージを格納しており、この最後の Lambda 関数に渡しています。この Lambda 関数では、message を受け取り、それを LINE に送信します。

app.py

import os
from linebot import LineBotApi, WebhookHandler
from linebot.models import TextSendMessage

# 環境変数を取得
channel_access_token = os.getenv('CHANNEL_ACCESS_TOKEN')
line_bot_api = LineBotApi(channel_access_token)

def lambda_handler(event, context):
    line_bot_api.push_message(event["user_id"], TextSendMessage(
        text=event["message"]))

15. LINE の署名の検証を行う

このステップで行う処理はこちらです。

img_ml-fish-search-app_16

クリックすると拡大します

LINE の署名の検証を行います。署名の検証とは、「意図した送信元 (LINE Platform) から送られてきたかどうか」を確認する作業です。

もし不正な人物が API Gateway のエンドポイント URL を入手しアクセスしようとしても、利用できなくする仕組みです。セキュリティ強化のために行います。

9. Step Functions 実行用のLambda を作成 のコード step.py の署名の検証の部分は以下の通りです。

step.py

def lambda_handler(event, context):
    body = event["body"]
    signature = event["headers"]["x-line-signature"]
    hash = hmac.new(channel_secret.encode('utf-8'),
                    body.encode('utf-8'), hashlib.sha256).digest()
    if signature != base64.b64encode(hash).decode():
        return {
            "statusCode": 401,
            "body": 'Unauthorized'
        }

LINE Developers の仕様では、「リクエストが LINE プラットフォームから送られたことを確認するために、ボットサーバーでリクエストヘッダーの x-line-signature に含まれる署名を検証します。」と記載されています。署名の検証の方法は以下の通りです。

  1. チャネルシークレットを秘密鍵として、HMAC-SHA256 アルゴリズムを使用してリクエストボディのダイジェスト値を取得します。
  2. ダイジェスト値を Base64 エンコードした値と、リクエストヘッダーの x-line-signature に含まれる署名が一致することを確認します。

詳細については、LINE Developers の Messaging API リファレンス を参照してください。

当初は Lambda オーソライザーで署名の検証をしようとしていました。しかし、signature が body の値に基づいて作成される仕様のため、オーソライザーの関数ではリクエストボディを参照できず、Step Functions を呼び出す Lambda 関数で署名の検証を実施しました。


16. Step Functions のエラー処理を行う

このステップで行う処理はこちらです。

img_ml-fish-search-app_17

S3 に画像を保存する Lambda、あるいは画像をモデルで推論する Lambda でエラーが起こった場合の処理を行う Lambda を作成します。

今回は、FailFunction という Lambda 関数を作成し、Step Functions に繋げます。こちらのようなイメージです。

img_ml-fish-search-app_18

このワークフローを作成するには、sfn.asl.json にエラーをキャッチするコードを追記します。

 sfn.asl.json

{
    "StartAt": "LINEImageS3",
    "States": {
        "LINEImageS3": {
            "Type": "Task",
            "Resource": "${LineS3Function}",
            "Next": "RekognitionDB",
            "Catch": [
              {
                  "ErrorEquals": [
                      "States.ALL"
                  ],
                  "Next": "FailFunction",
                  "ResultPath": null
              }
            ]
        },
        "FailFunction": {
            "Type": "Task",
            "Resource": "${FailFunction}",
            "Next": "LineReply"
        },
        "RekognitionDB": {
            "Type": "Task",
            "Resource": "${RekognitionFunction}",
            "Next": "LineReply",
            "Catch": [
                {
                    "ErrorEquals": [
                        "States.ALL"
                    ],
                    "Next": "FailFunction",
                    "ResultPath": null
                }
            ]
        },
        "LineReply": {
            "Type": "Task",
            "Resource": "${LineReplyFunction}",
            "End": true
        }
    }
}

FailFunction のコードは以下の通りです。

error.py

def lambda_handler(event, context):
    return{
        "user_id": event["user_id"],
        "message": "エラーが起きました。"
    }

LineReplyFunction に、「エラーが起きました。」というメッセージを渡しています。

エラーが出た時、それぞれの Step Functions のワークフローは以下のようになります。

img_ml-fish-search-app_19
img_ml-fish-search-app_20

まとめ

機械学習での画像判定をサーバーレスアプリケーションで構築しました。その中で、同期処理 / 非同期処理をどうするべきか、どのようにセキュリティを高めるのかについて悩みました。

3 ヶ月間のインターンで、一から一つのアプリを作成することができ、このことは、私にとって大きな自信となりました。アプリ作成では、わからないことが本当に多く、AWS の色々な方々に質問させていただきました。本当にありがとうございました。アプリ制作は私にとって初めての経験で、一生の宝物です。今後もこの経験をもとに、ステップアップにチャレンジしたいです。

みなさまが AWS を使った機械学習アプリ開発をお試しいただくのに、少しでも参考になれば幸いです。


builders.flash メールメンバーへ登録することで
AWS のベストプラクティスを毎月無料でお試しいただけます

筆者プロフィール

photo_okumura-mimi

奥村 美生
アマゾン ウェブ サービス ジャパン合同会社
ソリューションアーキテクト 長期インターン

水族館巡りの全国制覇を夢に持つほど水族館が好きです。

監修者プロフィール

photo_okuda-nobuo

岡田 信夫
アマゾン ウェブ サービス ジャパン合同会社
ソリューションアーキテクト

研究開発やカレンダーサービス開発のエンジニアを経て 2020 年にアマゾン ウェブ サービス ジャパンへ入社。
Kotlin, Python, サーバーレス, 釣りとお酒が好きです。

AWS のベストプラクティスを毎月無料でお試しいただけます

さらに最新記事・デベロッパー向けイベントを検索

下記の項目で絞り込む
絞り込みを解除 ≫
1

AWS を無料でお試しいただけます

AWS 無料利用枠の詳細はこちら ≫
5 ステップでアカウント作成できます
無料サインアップ ≫
ご不明な点がおありですか?
日本担当チームへ相談する