Amazon Web Services ブログ

Amazon Kinesis Video Streams および Amazon SageMaker を使用したリアルタイムでのライブビデオの分析

Amazon SageMaker の Amazon Kinesis Video Streams Inference Template (KIT) の発売を発表できることを嬉しく思っています。この機能により、顧客はほんの数分で Amazon SageMaker エンドポイントに Kinesis Video streams をアタッチすることが可能です。これにより、他のライブラリを使用したり、カスタムソフトウェアを作成してサービスを統合することなく、リアルタイムの推論が可能になります。KIT は、Docker コンテナとしてパッケージ化された Kinesis Video Client Library ソフトウェアと、すべての必要な AWS リソースのデプロイを自動化する AWS CloudFormation テンプレートとで構成されています。 Amazon Kinesis Video Streams を使用すると、アナリティクス、機械学習 (ML) 、再生、その他の処理のために、接続したデバイスからオーディオ、ビデオ、関連メタデータを AWS に確実にストリームすることができます。Amazon SageMaker は、開発者やデータサイエンティストが ML モデルを迅速かつ容易に構築、トレーニング、および展開するための管理プラットフォームです。

ホームセキュリティカメラ、エンタープライズ IP カメラ、トラフィックカメラ、AWS DeepLens 、携帯電話などのソースからオーディオおよびビデオフィードを Kinesis Video Streams へと取り込みます。スマートホームのざまな業界からスマートシティへ、インテリジェント製造から小売業に渡るさまざまな業界のデベロッパーとデータサイエンティストが、AWS クラウドでこれらの動画フィードを分析するために、独自の 機械学習アルゴリズムを導入したいと考えています。これらの顧客が Kinesis Video Streams を Amazon SageMaker エンドポイントに確実に接続することで、操作上のオーバーヘッドを最小限に抑えた ML 駆動型ビデオ分析パイプラインをスケーラブルでリアルタイムに構築できます。

このブログ記事では、新しい機能を紹介し、Kinesis Video Streams クライアントライブラリと CloudFormation テンプレートとの両方の機能について説明します。また、Kines を使用して Kinesis Video Streams をAmazon SageMaker に統合するための作業手順の例をステップバイステップでご紹介します。

Kinesis Video Streams および Machine-Learning 駆動型分析

Amazon Kinesis Video Streams が re:Invent 2017 で公開されました。起動時には、Amazon Kinesis Video Streams が Amazon Rekognition Video とすでに統合されており、顔メタデータのプライベートデータベースを使用して、リアルタイムで簡単に顔認識を実行することができます。以前のブログ記事では、Amazon Kinesis Video Streams や Amazon Rekognition Video でハイエンドの消費者体験を実現するために顔認識を使用する方法について詳しく説明しています。

Kinesis Video Streams を使って顧客がさまざまなビデオフィードを取り込むことで、そのユースケース、トレーニングデータセット、実行される推論のタイプも多様化します。例えば、主要なホームセキュリティプロバイダは、 Kinesis Video Streams を使って自宅のセキュリティカメラからオーディオとビデオを取り込むことを望んでいます。その後、Amazon SageMaker で動作する独自のカスタム ML モデルを添付して、ペットや被写体の検出や分析を行い、さらに豊かなユーザーエクスペリエンスを構築したいと考えています。インストア (実店舗) のインテリジェンスプロバイダは、 Amazon SageMaker を使って、店内に設置されたカメラからビデオをストリーミングして、来店者数の計測モデル (custom person-counting model) をトレーニングしたいと考えています。これにより、店舗運営側に知らせるために店内の買い物客数を推定し、リアルタイム推論を行うことが可能になります。 

KIT を使って Amazon SageMaker へ統合する Kinesis Video Streams

Amazon SageMaker の KIT を構成する 2 つのコンポーネントについて説明します。

Kinesis Video Streams クライアントライブラリを使用して、分散した一連のワーカーに対してメディアのスケーラブルな、少なくとも 1 回の処理を可能にし、 Amazon SageMaker エンドポイントの信頼できる起動を管理し、その後の処理のために Kinesis data stream に推論結果を公開することが可能です。具体的には、処理が必要な Kinesis Video streams ライブラリで決定し、Streams に接続し、定期的に更新してストリームを処理を含めるまたは除外します。ソフトウェアは、任意の時点で Kinesis Video streams の処理を担当するコンシューマーを動作させるワーカーをインスタンス化します。また、すべてのコンシューマーのリースを維持します。各コンシューマーは、さまざまなストリームを処理するために各ワーカーを連携させて動作します。また、リースストリームベース単位でチェックポイントを管理することにより、メディアフラグメントの信頼性の高い少なくとも 1 回の処理を保証します。

このソフトウェアは、リアルタイムでの Kinesis Video Streams GetMedia API 操作を使ってストリームからメディアフラグメントを取り出し、メディアフラグメントを解析して H264 チャンクを抽出し、復号化が必要なフレームをサンプリングし、そして I フレームを復号化して、Amazon SageMaker エンドポイントを起動する前に、JPEG / PNG形式などの画像形式に変換します。 Amazon SageMaker がホストするモデルが推論を返すと、KIT はその結果をキャプチャして Kinesis data stream に公開します。顧客は、AWS Lambda など好きなサービスを使ってその結果を使うことが可能できます。最後に、ライブラリでは Amazon CloudWatch にさまざまなメトリックを公開し、顧客によるダッシュボードの構築、実稼働環境に展開する際の閾値の監視および警告を可能にいます。

AWS CloudFormation テンプレートは、Kinesis Video Streams からメディアを読み込み、ML ベースの分析用の Amazon SageMaker エンドポイントを呼び出すために、顧客自身のアカウントに関連するすべての AWSインフラストラクチャの展開を自動化します。 これにより、統合された機能を構築、運用、拡張する時間を節約します。

CloudFormation テンプレートは、Docker コンテナでホストされているライブラリソフトウェアを実行する AWS Fargate 計算エンジンを使用してまず最初に Amazon Elastic Container Services (ECS) クラスタを作成します。

Fargate Tasks と Amazon Kinesis Data Streams で動作するワーカー間でチェックポイントや関連状態を管理する Amazon DynamoDB テーブルも起動し、Amazon SageMaker から生成された推論出力をキャプチャします。  このテンプレートは、インフラストラクチャ全体を監視するために必要な AWS Identity and Access Management (IAM) ポリシーと Amazon CloudWatch リソースも作成します。 Amazon SageMaker の KIT は、画像データを受け取るあらゆる Amazon SageMaker エンドポイントと互換性を備えています。特定のユースケースに合わせて、顧客は必要に応じたテンプレートの変更が可能です。

KIT のセットアップ方法

前提条件

KIT デプロイのためのステップバイステップの手順

  • CloudFormation によって Web サイトをデプロイする
  • CloudFormation は、反復可能なインフラストラクチャリソースのデプロイのためのインフラストラクチャとしてのコードテンプレートの作成を容易にする強力なツールです。
    1. AWS アカウントにログインしていない場合は、ログインする。既にログインが完了している場合は、次のURLを使用して手順 2 へ進む。 https://xxxxxxxxxxxx.signin.aws.amazon.com/console replacing the Xs with your account number.
    2. AWS Services の検索バーで CloudFormation を選択する。
    3. ここからターゲット領域の CloudFormation テンプレートを選択する。
    4. スタックに名前を付け、パラメータを入力して、[Next] を選択する。
      • AppName – すべてのリソースを作成するために使用される一意のアプリケーション名
      • DockerImageRepository – Kinesis Video Streams と SageMaker ドライバーの Docker イメージ
      • EndPointAcceptContentType – image/jPEG または image/png イメージ形式は、現在 SageMaker エンドポイントを呼び出すためにサポートされる
      • LambdaFunctionKey – カスタム Lambda 関数用のAmazon S3 バケットの場所
      • LambdaFunctionKey – カスタム Lambda 関数コード zip ファイル用の Amazon S3 オブジェクトキー
      • SageMaker Endpoint – カスタム Machine Learning モデルをホストする Amazon SageMaker エンドポイント
      • StreamNames – ストリーム名を指定する文字列の CSV リスト
      • TagFilters – タグフィルタの JSON 文字列
    5. [Options] ページのパラメータをデフォルトのままにして、[Next] を選択する。
    6. Review Acknowledge の設定情報と IAM ロールチェックボックスの作成を確認し、[Creat] を選択する。

ソリューションの拡張

ユースケースに応じて、Lambda 関数を更新し他の AWS サービスと統合することでこのソリューションはを拡張することができます。

この例では、Kinesis Video のフラグメントを取得し、検出データとともに Amazon S3 バケットに格納します。

  1. Amazon S3 バケットの作成
  2. 正しいバケット名と Kinesis Video Stream ARN に置き換えて、AWS Lambda Execution ロールに以下の追加権限を追加する。これらの追加アクセス権限により、AWS Lambda は Kinesis Video Stream からフラグメントを取得し、S3 バケットに書き込むことが可能です。
    {
        "Effect": "Allow",
        "Action": [
            "s3:PutObject",
        ],
        "Resource": [
            "arn:aws:s3:::<<YOUR BUCKET>>/*",
        ]
    },
    {
        "Effect": "Allow",
        "Action": [
            "kinesisvideo:GetMediaForFragmentList",
            "kinesisvideo:GetDataEndpoint",
        ],
        "Resource": [
            "<< YOUR KINESIS VIDEO STREAM ARNs>>",
        ]
    }
    
  3. <<YOUR BUCKET>> を次のコードに置き換え、 Lambda 関数コードを置き換える。
    from __future__ import print_function
    import base64
    import json
    import boto3
    import os
    import datetime
    import time
    from botocore.exceptions import ClientError
    
    bucket='<<YOUR BUCKET>>'
    
    #Lambda function is written based on output from an Amazon SageMaker example: 
    #https://github.com/awslabs/amazon-sagemaker-examples/blob/master/introduction_to_amazon_algorithms/object_detection_pascalvoc_coco/object_detection_image_json_format.ipynb
    object_categories = ['person', 'bicycle', 'car',  'motorbike', 'aeroplane', 'bus', 'train', 'truck', 'boat',
                         'traffic light', 'fire hydrant', 'stop sign', 'parking meter', 'bench', 'bird', 'cat', 'dog',
                         'horse', 'sheep', 'cow', 'elephant', 'bear', 'zebra', 'giraffe', 'backpack', 'umbrella', 'handbag',
                         'tie', 'suitcase', 'frisbee', 'skis', 'snowboard', 'sports ball', 'kite', 'baseball bat',
                         'baseball glove', 'skateboard', 'surfboard', 'tennis racket', 'bottle', 'wine glass', 'cup',
                         'fork', 'knife', 'spoon', 'bowl', 'banana', 'apple', 'sandwich', 'orange', 'broccoli', 'carrot',
                         'hot dog', 'pizza', 'donut', 'cake', 'chair', 'sofa', 'pottedplant', 'bed', 'diningtable',
                         'toilet', 'tvmonitor', 'laptop', 'mouse', 'remote', 'keyboard', 'cell phone', 'microwave', 'oven',
                         'toaster', 'sink', 'refrigerator', 'book', 'clock', 'vase', 'scissors', 'teddy bear', 'hair drier',
                         'toothbrush']
    
    def lambda_handler(event, context):
      for record in event['Records']:
        payload = base64.b64decode(record['kinesis']['data'])
        #Get Json format of Kinesis Data Stream Output
        result = json.loads(payload)
        #Get FragmentMetaData
        fragment = result['fragmentMetaData']
        
        # Extract Fragment ID and Timestamp
        frag_id = fragment[17:-1].split(",")[0].split("=")[1]
        srv_ts = datetime.datetime.fromtimestamp(float(fragment[17:-1].split(",")[1].split("=")[1])/1000)
        srv_ts1 = srv_ts.strftime("%A, %d %B %Y %H:%M:%S")
        
        #Get FrameMetaData
        frame = result['frameMetaData']
        #Get StreamName
        streamName = result['streamName']
       
        #Get SageMaker response in Json format
        sageMakerOutput = json.loads(base64.b64decode(result['sageMakerOutput']))
        #Print 5 detected object with highest probability
        for i in range(5):
          print("detected object: " + object_categories[int(sageMakerOutput['prediction'][i][0])] + ", with probability: " + str(sageMakerOutput['prediction'][i][1]))
        
        detections={}
        detections['StreamName']=streamName
        detections['fragmentMetaData']=fragment
        detections['frameMetaData']=frame
        detections['sageMakerOutput']=sageMakerOutput
    
        #Get KVS fragment and write .webm file and detection details to S3
        s3 = boto3.client('s3')
        kv = boto3.client('kinesisvideo')
        get_ep = kv.get_data_endpoint(StreamName=streamName, APIName='GET_MEDIA_FOR_FRAGMENT_LIST')
        kvam_ep = get_ep['DataEndpoint']
        kvam = boto3.client('kinesis-video-archived-media', endpoint_url=kvam_ep)
        getmedia = kvam.get_media_for_fragment_list(
                                StreamName=streamName,
                                Fragments=[frag_id])
        base_key=streamName+"_"+time.strftime("%Y%m%d-%H%M%S")
        webm_key=base_key+'.webm'
        text_key=base_key+'.txt'
        s3.put_object(Bucket=bucket, Key=webm_key, Body=getmedia['Payload'].read())
        s3.put_object(Bucket=bucket, Key=text_key, Body=json.dumps(detections))
        print("Detection details and fragment stored in the S3 bucket "+bucket+" with object names : "+webm_key+" & "+text_key)
      return 'Successfully processed {} records.'.format(len(event['Records']))
    

ビデオフラグメント検出の詳細を含む S3 バケット

以下のスクリーンショットは、Amazon SageMaker の KIT が検出したビデオフラグメントと対応する推論を Amazon S3 バケットに送出していることを示しています。

処理された出力を示す AWS Lambda 関数ログ

このソリューションは、さまざまなユースケースに拡張することができます。例えば、Computer Vision OpenCV ライブラリと Amazon SageMaker の予測の詳細を組み合わせることで、バウンディングボックスをビデオフレーム内の検出されたオブジェクトに追加し、リアルタイムアラートポータルへのフィードインが可能です。

KIT 管理インフラストラクチャのモニタリング

ライブラリソフトウェアは、デフォルトでさまざまな CloudWatch メトリックを提供しており、顧客は個々のストリームを処理するための進捗状況をモニタリングすることができます。これには、クラスタ内のワーカーのリソース消費量、Amazon SageMaker エンドポイントの呼び出し速度、および推論結果がどのようにして Kinesis Data Stream に公開されるかについて決定するメトリックが含まれます。CloudFormation テンプレートは、すぐに使用が可能な CloudWatch ダッシュボードを作成し、顧客は目的に合わせてさらに拡張することが可能です。デフォルトでは、ダッシュボードは、ソソフトウェアのレイテンシ、信頼性、およびスケーリング特性に特有の KIT およびカスタムメトリックを強化するような基本サービスの主要メトリックをキャプチャします。

CloudWatch ダッシュボード – KIT メトリック

結論

Amazon SageMaker の KIT を通じて、信頼性と拡張性のある手法で、リアルタイムの ML 駆動型メディアストリームの処理を単純化しました。顧客は、すべての Kinesis Video streams を Amazon SageMaker エンドポイントに接続して、最小限の操作オーバーヘッドで ML 駆動型ユースケースの強化することが可能です。この機能の詳細については、ドキュメントをご参照ください。すべての開発者がユースケースをさらなるカスタマイズを実現できるように顧客からのフィードバックに基づいて、基本となる Kinesis Video Client ライブラリソフトウェアの反復処理を期待しています。


著者について

Aditya Krishnan 氏は、Amazon Kinesis Video Streams の代表です。この役割において、大規模なインターネット対応カメラデバイスからビデオをストリーミングするのは驚くほど容易にするビジョンを実現するため、同氏は顧客、ハードウェアとソフトウェアのパートナー、驚異的なエンジニアチームと共に働くという幸運を持ち合わせています。

 

 

 

Jagadeesh Pusapadi 氏は アマゾン ウェブ サービス のソリューションアーキテクトであり、顧客との戦略的イニシアチブを担当しています。同氏は 望ましいビジネス成果達成に向けたアーキテクチャガイダンスを提供して、顧客による AWS クラウドの革新的なソリューション構築を支援します。