ドローンの映像を AWS で簡単に分析して役立てよう !

2022-07-04
日常で楽しむクラウドテクノロジー

中西 貴大

こんにちは、ソリューション アーキテクトの中西です。
近年、ドローンの利用には大きく注目が集まっており、農業、点検、測量、物流など実社会での活用が始まっています。一方で、空撮などの趣味目的を想定した「トイドローン」と呼ばれる安価なドローンが多く販売されています。

その一つに Ryze Technology 社が販売するトイドローン TelloTello EDU があります。これらのドローンにはカメラが搭載されており、公式モバイルアプリを使ってドローンからの映像を確認しながら操作することができますが、それだけではありません。

実はこの Tello や Tello EDU には SDK が用意されており、プログラムからドローンを操作したり、映像を取得したりできるんです ! 安価なトイドローンではありますが、「プログラマブルな飛ぶカメラ」という立派な特徴を活かして、実社会でも役立てることはできないでしょうか ? 

本記事ではトイドローン Tello EDU を題材として、楽しいカメラ付きデバイスを AWS と組み合わせて便利な仕組みを簡単に作る一例をお見せします !

このクラウドレシピ (ハンズオン記事) を無料でお試しいただけます »

毎月提供されるクラウドレシピのアップデート情報とともに、クレジットコードを受け取ることができます。 


1. 作るもの

本記事では、トイドローン Tello EDU と Amazon Rekognition Streaming Video Events (後述) を組み合わせて、「人がいるべきでない場所」をドローンにより警備・監視するソリューションを作ります。上図の番号順に、処理は次のようになっています。

  1. ドローンのカメラ映像を AWS にストリーミングします。ドローンを自動飛行させます。
  2. Amazon Rekognition Streaming Video Events (後述) によりビデオストリームから人物を検出します。
  3. 人物を検出したタイミングでユーザーにメール通知します。
  4. ユーザーはメールのリンクから、人物が写り込んだ画像を確認できます (本記事のスコープ外ですが、必要なら映像を巻き戻して確認することもできます)。

暗視カメラを搭載したドローンを利用すれば、夜間に広大な農地をパトロールし、窃盗犯や害獣を見つけて通知するといったこともできるかもしれませんね。今回のように AWS が提供する AI サービスの機能とユースケースがマッチすれば、ローコードあるいはノーコードで、簡単にアプリケーションを賢くすることができます。


2. 準備

筆者は下記の環境でシステムを構築しました。

  • Tello EDU
  • Wi-Fi (2.4 GHz 推奨) 接続環境
  • PC (Ubuntu 20.04 LTS, Python 3.8.10, AWS CLI 2.7.2)

準備として、PC 側のプログラムから AWS リソースにアクセスするために使用する IAM ユーザーを作成します。今回は Amazon Kinesis Video Streams と Amazon Rekognition へのアクセスのみが必要ですので、アクセス権限をこれらのサービスに絞った IAM ユーザーを作成します。

注) 本記事では手順の簡略化のため、システム構築中のリソース作成に必要なアクセス権限と、システム構築後のデバイスの認証に必要なクレデンシャルを、ひとつの IAM ユーザーにより実現しています。しかし実際の IoT ユースケースではデバイスごとに証明書を組み込み、AWS IoT Core 認証情報プロバイダーによって発行された一時的で制限されたクレデンシャルを使用するケースが多いです。

マネジメントコンソールから AWS IAM コンソールを開き、 IAM ユーザーを追加します。

クリックすると拡大します

ユーザー名」にはわかりやすい名前 (tello-user など) をつけて、「アクセスキー - プログラムによるアクセス」を選択して次の画面に進みます。

クリックすると拡大します

既存のポリシーを直接アタッチ」を選択し、以下の 2 つのポリシーを追加します。

  • AmazonKinesisFullAccess
  • AmazonRekognitionFullAccess

注) 特に本番環境では、さらに権限を絞り、必要なアクションとリソースのみ許可することが推奨されます。

次の画面で必要であればタグを付け、さらに次の画面で「ユーザーの作成」をクリックします。

クリックすると拡大します

IAM ユーザーの作成が完了しました。アクセスキー ID とシークレットアクセスキーが発行されましたので、これを安全な場所にメモしておきます。

注) これらのクレデンシャルを使ってプログラムから AWS リソースへアクセスしますが、これらを絶対にプログラムコードに埋め込まないでください

クリックすると拡大します

PC のほうの作業に移ります。いま作成した IAM ユーザーの認証情報と、使用する AWS リージョンを環境変数に設定します。本記事の AWS リソースはオレゴン (us-west-2) リージョンで作成します。下記を実行します。

export AWS_ACCESS_KEY_ID="メモした「アクセスキーID」"
export AWS_SECRET_ACCESS_KEY="メモした「シークレットアクセスキー」"
export AWS_DEFAULT_REGION="us-west-2"

3. Tello EDU と仲良くなる

3-1. Tello EDU の動作モード

Tello EDU には、デフォルト (親機モード) と Station mode (子機モード)の2種類の動作モードがあります。

デフォルト (親機モード) では Tello EDU が Wi-Fi アクセスポイントとなり、PC やモバイル端末が Tello EDU のネットワーク配下となります。この状態では PC からインターネットにアクセスできず (別途工夫すれば可能ですが)、そのままでは AWS サービスと連携することができません。 そのためまずは Tello EDU を Station mode (子機モード)に設定します。これは Tello EDU をご自宅の Wi-Fi ルータに接続させて使用できるモードです (複数の Tello EDU を編隊飛行させることもできます !)。

それでは Station mode (子機モード)に設定する方法を知るためにも、Tello EDU で使える制御コマンドを見てみましょう。

3-2. Tello EDU の制御コマンド

Tello EDU で使用できるのは Tello SDK 2.0 であり、ドキュメントは こちら です。これを読んでいくと、Tello EDU には下表のような制御コマンドがあることがわかります。

Tello EDU の中で UDP サーバーが立っているので、これらの制御コマンドをTello EDU の 8889 番ポート宛に送ることで Tello EDU を制御できます。送ったコマンドに対するレスポンス (ok / error など) を受信するには、PC 側でも UDP サーバーを立てて 8889 番ポートで待ち受けていれば大丈夫です。

本記事で使用するTello EDU 制御コマンド

コマンド 説明 レスポンス
command Enter SDK mode.

(まず最初に送る必要があるコマンドです。このコマンドに対してokというレスポンスが返ってきたのを確かめてから、他のコマンドを送っていくことになります)
ok / error
takeoff Auto takeoff.
land Auto land.
forward {x} Fly forward for {x} cm.
{x} = 20-500
up {x} Ascend to {x} cm.
{x} = 20-500
cw {x} Rotate {x} degrees clockwise.
{x} = 1-360
streamon Enable video stream.
streamoff Disable video stream.
ap {ssid} {pass} Set the Tello to station mode, and connect to a new access point with the access point’s ssid and password.
{ssid} = updated Wi-Fi ssid
{pass} = updated Wi-FI password

(Tello EDUをStation mode (子機モード)に設定します。)

初回のみの作業となりますが、今回は前述の通り Tello EDU を Statio mode (子機モード)に設定しますので、例えば下記のような Python スクリプトを実行して ap {ssid} {pass} コマンドを Tello EDU に送ります。

import socket

SSID = 'Wi-Fi SSID'
PASS = 'Wi-Fi パスワード'

tello_addr = ('192.168.10.1', 8889)

sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.bind(('', 8889))

# Send command: 'command'
sock.sendto('command'.encode('utf-8'), tello_addr)
response, addr = sock.recvfrom(100)
print(f'{addr}: {response}' % (addr, response))

# Send command: 'ap {ssid} {pass}'
sock.sendto(f'ap {SSID} {PASS}'.encode('utf-8'), tello_addr)
response, addr = sock.recvfrom(100)
print(f'{addr}: {response}' % (addr, response))

sock.close()

なお、既に公式コードサンプルとして、親機モードで動作する Tello-Python と、子機モードの複数の Tello EDUを編隊飛行させる Multi-Tello-Formation が提供されています。

今回のケースは子機モードなので後者を利用できそうにも思えますが、このプログラムではsleepを多用して送受信タイミングの同期を図っており、それにより待ち時間や送受信タイミングのズレが発生してしまう場合があります。これを避けるため、今回はTello SDK 2.0のドキュメントに基づいて Tello EDU を扱うための「Tello クラス」を作成しました。UDPで通信するTello EDUの仕様上、パケットが到達しないこともたまに起こり得ますが、これに対応すべくタイムアウトや再送制御を組み込んでいます。

下記のコードを tello_util.py として保存しておきます。

import netaddr
import netifaces
import queue
import socket
import subprocess
import threading
import time


class Tello:
    def __init__(self, netmask='255.255.255.0', tello_cmd_port=8889):
        self.cmd_port = tello_cmd_port

        # Queue for received responses from Tello
        self.q = queue.Queue()

        # AF_INET (IPv4), SOCK_DGRAM (UDP)
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.sock.bind(('', tello_cmd_port))

        # Receiver for responses from Tello
        self.recv_thread = threading.Thread(target=self._udp_receiver)
        self.recv_thread.daemon = True
        self.recv_thread.start()

        # Search for Tello
        self.ip_addr = self._find_tello(self._get_subnets(netmask))
        print(f'Found Tello: {self.ip_addr}')
        
        
    def __del__(self):
        self.sock.close()


    def _udp_receiver(self):
        """
        Puts all data received on self.cmd_port into the queue
        """
        while True:
            try:
                response, addr = self.sock.recvfrom(1024)
                response = response.decode('utf-8')
                print(f'[UDP RECV] {addr} {response}')
                self.q.put((addr, response))
            except Exception as e:
                print(e)


    def _get_subnets(self, netmask):
        """
        Returns a list of CIDRs of subnets with given netmask
        """
        subnets = []
        ifaces = netifaces.interfaces()
        for iface in ifaces:
            my_addr = netifaces.ifaddresses(iface)[netifaces.AF_INET][0]
            if my_addr['netmask'] == netmask:
                cidr = netaddr.IPNetwork(f'{my_addr["addr"]}/{my_addr["netmask"]}')
                subnets.append(cidr)

        return subnets


    def _find_tello(self, subnets):
        """
        Returns (IP Address, Port) of found Tello.
        """
        addrs = []
        for subnet in subnets:
            # Adds addresses to list except network address and broadcast address
            addrs.extend(str(addr) for addr in subnet[1:-1])
        
        for addr in addrs:
            print(f'Trying {addr} ...')
            self.sock.sendto('command'.encode('utf-8'), (addr, self.cmd_port))

            if not self.q.empty():
                tello_addr, response = self.q.get()
                if response == 'ok':
                    # Tello found !
                    return tello_addr[0]

        raise Exception('No Tello found.')


    def send_cmd(self, cmd, timeout=3, attempts=2):
        """
        Sends command (cmd) to Tello (ip_addr).
        Returns response from Tello.
        """
        print(f'Sending command "{cmd}" ...')

        response = None
        for _ in range(attempts):
            self.sock.sendto(cmd.encode('utf-8'), (self.ip_addr, self.cmd_port))
            try:
                _, response = self.q.get(timeout=timeout)
                break
            except queue.Empty as e:
                print(f'Command "{cmd}" timeout.')

        return response


    def emergency_land(self):
        """
        Sends 'land' command to Tello as soon as a flight command fails.
        """
        self.send_cmd('land')
        self.send_cmd('streamoff')

3-3. Tello EDU のカメラ映像

Tello SDK 2.0 のドキュメントに記載されていますが、streamon コマンドを Tello EDU に送ると、PC の UDP 11111 番ポート宛に動画のストリーミングが始まります。ストリーミングの形式についてはドキュメントに記載ありませんが、公式サンプルコード Tello-Python には受信したパケットデータを H.264 decode する 実装が見られます。

まとめると「Tello EDU から H.264 エンコードされたビデオストリームが PC の UDP 11111 番ポート宛に送られてくる」仕様だとわかります。


4. ドローン映像を AWS にストリーミングする

4-1. Amazon Kinesis Video Streams とは

Amazon Kinesis Video Streams を使用すると、分析、機械学習 (ML)、再生、およびその他の処理のために、接続されたデバイスから AWS へ動画を簡単かつ安全にストリーミングできるようになります。

今回は Tello EDU からの映像を Amazon Kinesis Video Streams で受信し、その後段に AWS サービスを連携させるような使い方となります。

4-2. ビデオストリームの作成

Amazon Kinesis Video Streams のコンソールを開き、ビデオストリームを作成します。

クリックすると拡大します

ビデオストリーム名」として tello-kvstream と入力し、「ビデオストリームを作成」 をクリックします。

クリックすると拡大します

4-2. GStreamer と Amazon Kinesis Video Streams Producer SDK C++ のインストール

オープンソースのマルチメディアフレームワークである GStreamer と、Amazon Kinesis Video Streams 向けの GStreamer プラグイン kvssink を使って、ドローン映像を AWS にストリーミングしましょう。

まずは Installing GStreamer のページに従って GStreamer をインストールします。次に、 Amazon Kinesis Video Streams Producer SDK C++ をダウンロードし、README の指示に従ってビルドします。この手順には 30 分ほど待ち時間が発生しますが、これが完了すれば Tello EDU からの映像をコードを書かずに AWS にストリーミングする準備が整います !

その具体的な方法について説明します。Tello EDU からの映像の仕様について「H.264 エンコードされたビデオストリームが PC の UDP 11111番ポート宛に送られてくる」と前述しました。このビデオソースを Amazon Kinesis Video Streams にストリーミングするには、GStreamer とプラグイン kvssinkを用いて、下記のようなパイプラインを構成します。

注) このコマンドは今は実行の必要はありません (後ほど掲載するPythonスクリプトに含まれています)。

gst-launch-1.0 udpsrc uri=udp://0.0.0.0:11111 \
    ! h264parse ! video/x-h264,stream-format=avc,alignment=au \
    ! kvssink stream-name="{KVSストリーム名}" storage-size=512 \
    access-key="$AWS_ACCESS_KEY_ID" secret-key="$AWS_SECRET_ACCESS_KEY" aws-region="$AWS_DEFAULT_REGION"

5. 映像から価値を得る

5-1. Amazon Rekognition Streaming Video Events とは ?

まず Amazon Rekognition Video について説明すると、保存された動画やライブストリーム動画からモーションベースのコンテキストを抽出し、分析できるというサービスです。その新機能として、2022 年 4 月 28 日に Amazon Rekognition Streaming Video Events が発表 されました。

Amazon Rekognition Streaming Video Events は指定したオブジェクトがライブのビデオストリーミングで検出されるとすぐに、ユーザーに通知を送信します。このイベント通知機能を使用すると、例えば「裏庭にペットがいます」などタイミングよく、有効なスマートアラートをユーザーに送信できます。また、人の動きが確認されたときに車庫の照明をつけるなどホームオートメーションなどにも活用できます。

本記事では Amazon Rekognition Streaming Video Events をドローン映像に使用して、飛行中に人間を検出した場合にすぐにユーザーに通知し、検出時のカメラ画像をユーザーに共有する警備・監視ソリューションを実現します。

5-2. ビデオストリームから特定オブジェクトを検出し通知する

Amazon Rekognition Streaming Video Events を使用してビデオストリームからラベルを検出するには、分析を開始および管理する「ストリームプロセッサ」を作成 (CreateStreamProcessor) します。

Amazon Rekognition Video は、Amazon Kinesis Video Streams を使用して、ビデオストリームを受信して処理します。ストリームプロセッサを作成する際に、ストリームプロセッサに検出させたいラベルを指定します。本記事執筆時点では、PERSON (人物) PET (犬などのペット) PACKAGE (段ボール箱などの荷物) の 3 種類を選択できます。分析結果は、Amazon S3 バケットと Amazon SNS トピックに出力されます。それではこれらのリソースを作っていきます。

5-3. ストリームプロセッサを使用する準備

まず S3 バケットを作成します。ここには人物やオブジェクトが初めて検出された画像フレームが出力されます。S3 のバケット名は世界で一意になる必要があるので、rekognition-results-xxxxx のように末尾をユニークな文字列にします。

クリックすると拡大します

次に SNS トピックを作成します。人物やオブジェクトが初めて検出されたタイミングで通知するためです。Amazon SNS コンソールから「トピックの作成」を選択し、rekognition-notification という名前でトピックを作成します。

クリックすると拡大します

次に Amazon Rekognition 用サービスロールを作成します。Amazon Rekognition Streaming Video Events を使用するには、Amazon Kinesis Video Streams を使用してストリームを受信し、分析結果を S3 や SNS に出力できなければなりません。ですので、それらに対する適切な権限を Amazon Rekognition に付与する必要があります。

IAM コンソールを開き、「ポリシーの作成」 をします。

クリックすると拡大します

エディタの種類として JSON を選択し、下記のポリシーをエディタ画面にペーストします。このとき、ARN (3 箇所) を正しい値に書き換えてください。

次の画面に進んで (必要であればタグを付け) 、さらに次の画面に進みます。

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "kinesisvideo:GetDataEndpoint",
                "kinesisvideo:GetMedia"
            ],
            "Resource": [
                "{ビデオストリームのARN}"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "s3:PutObject"
            ],
            "Resource": [
                "{S3バケットのARN}/*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": "sns:Publish",
            "Resource": "{SNSトピックのARN}"
        }
    ]
}

クリックすると拡大します

ポリシーの名前」として rekognition-service-policy と入力し、「ポリシーの作成」 をクリックすれば完了です。

クリックすると拡大します

IAM の管理コンソールから「ロール」を選択し、「ロールを作成」をクリックします。

クリックすると拡大します

AWS サービスとして Rekognition を選択します。

クリックすると拡大します

ロールの名前として ServiceRoleForRekognition を入力して、「ロールを作成」をクリックします。

クリックすると拡大します

IAM コンソールのロール一覧画面から作成したロールを探して、そのロール名をクリックします。

クリックすると拡大します

アクセス許可を追加」から「ポリシーをアタッチ」をクリックします。

クリックすると拡大します

さきほど作成したポリシー (rekognition-service-policy) にチェックを入れて、「ポリシーをアタッチ」します。

クリックすると拡大します

5-4. ストリームプロセッサを作成

最後にストリームプロセッサを作成します。下記 JSON をコピーして、これまでに作成したリソースの ARN をコピー&ペースト (4 箇所) し、input_for_CreateStreamProcessor.json として保存します。

{
  "DataSharingPreference": { "OptIn": true },
  "Input": {
    "KinesisVideoStream": {
      "Arn": "{ビデオストリームのARN}"
    }
  },
  "Name": "tello_stream_processor",
  "Output": {
    "S3Destination": {
      "Bucket": "{S3バケットのARN}",
      "KeyPrefix": ""
    }
  },
  "NotificationChannel": {
    "SNSTopicArn": "{SNSトピックのARN}"
  },
  "RoleArn": "{Rekognition用サービスロールのARN}",
  "Settings": {
    "ConnectedHome": {
      "Labels": [
        "PERSON"
      ],
    "MinConfidence": 80
    }
  }
}

AWS CLI を使って下記を実行し、ストリームプロセッサの ARN が返ってきたら成功です。

$ aws rekognition create-stream-processor --cli-input-json file://input_for_CreateStreamProcessor.json

{
"StreamProcessorArn": "arn:aws:rekognition:us-west-2:{アカウントID}:streamprocessor/tello_stream_processor"
}

6. ユーザーに通知する

Amazon Rekognition は、人物が初めて検出された瞬間のビデオフレーム (画像) を S3 バケットに出力します。同時に、SNS トピックへも詳細情報を出力します。

これらの情報から AWS Lambda 関数を使って通知メッセージを生成し、画像へのリンクと検出時刻などの情報をユーザーにメールで通知しましょう。

メール送信用の SNS トピックを作成します。

Amazon SNS コンソール から「トピックの作成」を選択し、user-notification という名前でトピックを作成します。

クリックすると拡大します

トピックの作成が完了したら「サブスクリプションを作成」 をクリックします。

クリックすると拡大します

今回はメール通知ですので、プロトコルに「E メール」を選択し、エンドポイントに自分のメールアドレスを入力し、「サブスクリプションの作成」をクリックします。

すると入力したメールアドレス宛にAWSから AWS Notification - Subscription Confirmation という件名のメールが届きますので、「Confirm subscription」のリンクからサブスクリプションを確認してください。

クリックすると拡大します

次に通知メールの件名や本文を生成するための Lambda 関数を作ります。

rekognition_notification という名前で、ランタイムとして「Python 3.9」を選択します。

クリックすると拡大します

Lambda にトリガーを追加します。SNS トピック「rekognition-notification」を選択してください。

クリックすると拡大します

Lambda の実行ロールにアクセス権限を追加する必要がありますので、「設定」タブから「 アクセス権限」を選び、ロール名をクリックして AWS IAM コンソールに飛びます。

クリックすると拡大します

このロールにインラインポリシーとして権限を追加します。「アクセス許可を追加」 から「インラインポリシーを作成」します。

編集画面で下記のポリシーを貼り付けます。2 つの ARN を正しいものに書き換えてください。

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "s3:GetObject",
                "sns:Publish"
            ],
            "Resource": [
                "{S3バケットのARN}/*",
                "{SNSトピックuser-notificationのARN}"
            ]
        }
    ]
}

クリックすると拡大します

Lambda 関数のコード編集画面に移動します。

下記スクリプトをペーストし、SNS トピックの ARN だけ正しいものに書き換えます。「Deploy」ボタンでデプロイすれば、通知部分は完成となります。

import boto3
from botocore.exceptions import ClientError
import json
from datetime import datetime

s3_client  = boto3.client('s3')
sns_client = boto3.client('sns')

def create_presigned_url(bucket_name, object_name, expiration=3600*24*7):
    # Generate a presigned URL for the S3 object
    try:
        response = s3_client.generate_presigned_url('get_object',
                                                    Params={'Bucket': bucket_name,
                                                            'Key': object_name},
                                                    ExpiresIn=expiration)
    except ClientError as e:
        print(e)
        return None

    return response


def lambda_handler(event, context):
    msg = json.loads(event['Records'][0]['Sns']['Message'])
    if msg['eventNamespace']['type'] != 'LABEL_DETECTED':
        return None
        
    detected = msg['labels'][0]
    email_subject = f'ドローンが {detected["name"]} を検出しました'
    timestamp = detected['videoMapping']['kinesisVideoMapping']['producerTimestamp']
    time_detected = datetime.fromtimestamp(timestamp//1000).strftime('%Y-%m-%d %H:%M:%S')
    email_body  = f'Time:  {time_detected}\n'
    email_body += f'Label: {detected["name"]}\n'
    email_body += f'Confidence: {detected["confidence"]} %\n\n'
    
    # Get image URL
    s3_uri = detected['frameImageUri']
    bucket_name = s3_uri.partition('//')[2].partition('/')[0]
    object_key  = s3_uri.partition('//')[2].partition('/')[2]
    image_url = create_presigned_url(bucket_name, object_key)
    if image_url:
        email_body += f'Image URL: {image_url}\n'
        
    response = sns_client.publish(
        TopicArn='{SNSトピックuser-notificationのARN}',
        Message=email_body,
        Subject=email_subject
    )

    return response

7. ローカルで動かすスクリプト

いよいよ最後の手順です ! 先ほど保存した tello_util.py と同じディレクトリに、以下のスクリプトを main.py として保存します。このスクリプトは今回作成した警備・監視ソリューション全体を開始させ、コントロールします。以下のような流れになっています(番号は main 関数中のコメント (1) - (6) と対応しています)。

  1. PC が接続しているネットワーク内で Tello を探し、その IP アドレスを特定します。
  2. Tello EDU から PC へのビデオストリームを開始してから、PC で GStreamer プロセスを起動して Amazon Kinesis Video Streams へビデオストリーミングを開始します。
  3. ビデオが Amazon Kinesis Video Streams に受信され永続化されるのを待って、最新の fragment number (動画の断片である fragment に割り当てられた一意な番号) を取得します。
  4. その fragment number を開始位置としてストリームプロセッサを開始し、Amazon Rekognition Streaming Video Events によるラベル検出を開始します。
  5. Tello EDU を事前定義された飛行コマンドに従って飛行させます (飛行コマンドは main.py 冒頭の FLIGHT_CMDS で与えます)。
  6. Tello EDU の飛行が終わったら、ストリームプロセッサの終了を待ってから Tello EDU からのビデオストリーミングを終了します。
import boto3
import subprocess
import time
from tello_util import *


"""
Sequence of flight commands for Tello.
List of tuple (command, timeout [sec]).
To find available flight commands, see https://dl-cdn.ryzerobotics.com/downloads/Tello/Tello%20SDK%202.0%20User%20Guide.pdf
"""
FLIGHT_CMDS = [
   ('takeoff', 15),
   ('up 50', 10),
   ('forward 160', 20),
   ('cw 180', 20),
   ('forward 100', 20),
   ('land', 10)
]
KVS_STREAM_NAME = 'tello-kvstream'
STREAM_PROCESSOR_NAME = 'tello_stream_processor'
STREAM_PROCESSOR_MAX_DURATION = 60
GSTREAMER_CMD = 'gst-launch-1.0 udpsrc uri=udp://0.0.0.0:11111 '\
    '! h264parse ! video/x-h264,stream-format=avc,alignment=au '\
    f'! kvssink stream-name="{KVS_STREAM_NAME}" storage-size=512 '\
    'access-key="$AWS_ACCESS_KEY_ID" secret-key="$AWS_SECRET_ACCESS_KEY" aws-region="$AWS_DEFAULT_REGION"'


def list_latest_video_fragments(attempts=10):
    """
    Performs ListFragments API for Kinesis Video Streams
    """
    kvs = boto3.client('kinesisvideo')
    endpoint = kvs.get_data_endpoint(
        APIName='LIST_FRAGMENTS',
        StreamName=KVS_STREAM_NAME
    )['DataEndpoint']

    kvam = boto3.client('kinesis-video-archived-media', endpoint_url=endpoint)

    # Some attempts may be needed until media fragments are persisted in KVS
    fragments = []
    for _ in range(attempts):
        timestamp_now = time.time()
        fragments = kvam.list_fragments(
            StreamName=KVS_STREAM_NAME,
            MaxResults=10,
            FragmentSelector={
                'FragmentSelectorType': 'PRODUCER_TIMESTAMP',
                'TimestampRange': {
                    'StartTimestamp': timestamp_now-5,
                    'EndTimestamp': timestamp_now
                }
            }
        )['Fragments']

        if fragments:
            return fragments
        
        print('Failed to list fragments. Retrying ...')
        time.sleep(0.5)
    
    raise Exception('Failed to list fragments.')


def main():
    # (1) Create Tello instance and find Tello's IP address
    tello = Tello()

    # (2) Video stream on (Tello -> PC)
    response = tello.send_cmd('streamon')
    if response != 'ok':
        print(response)
        raise Exception('Failed to start video stream.')
    
    # (2) Start video stream to Kinesis Video Streams using GStreamer
    print('Start GStreamer process')
    devnull = open('/dev/null', 'w')
    gst_proc = subprocess.Popen('exec ' + GSTREAMER_CMD, shell=True, stdout=devnull, stderr=devnull)

    # (3) Get latest video fragments persisted in Kinesis Video Streams
    fragments = list_latest_video_fragments()
    print(f'Got fragments: {fragments}')
    fragment_numbers = [f['FragmentNumber'] for f in fragments]
    latest_fragment_number = max(fragment_numbers)

    # (4) Start stream processor for Rekognition Streaming Video Events
    print('Start stream processor')
    rekognition = boto3.client('rekognition')
    response = rekognition.start_stream_processor(
        Name=STREAM_PROCESSOR_NAME,
        StartSelector={
            'KVSStreamStartSelector': {
                'FragmentNumber': latest_fragment_number
            }
        },
        StopSelector={
            'MaxDurationInSeconds': STREAM_PROCESSOR_MAX_DURATION
        }
    )
    print(response)

    # (5) Start a flight !
    for cmd, timeout in FLIGHT_CMDS:
        response = tello.send_cmd(cmd, timeout=timeout)
        if not response or 'error' in response:
            gst_proc.kill()
            tello.emergency_land()
            raise Exception(f'Failed to execute command "{cmd}". Landing ...')

    # (6) Wait until stream processor stops
    while True:
        response = rekognition.describe_stream_processor(
            Name=STREAM_PROCESSOR_NAME
        )
        status = response["Status"]
        print(f'Stream processor {STREAM_PROCESSOR_NAME}: {status}')
        if status == 'STOPPED':
            break
        elif status == 'FAILED':
            print(f'Stream processor {STREAM_PROCESSOR_NAME} failed.')
            break

        time.sleep(5)

    # (6) Stop video stream to Kinesis Video Streams
    print('Stop Gstreamer process')
    gst_proc.kill()

    # (6) Video stream off (Tello -> PC)
    response = tello.send_cmd('streamoff')
    if response != 'ok':
        raise Exception('Failed to stop video stream.')


if __name__ == '__main__':
    main()

これを python main.py として実行します !


8. デモ動画

ドローンが事前定義されたコマンドによって飛行し、飛行中の映像から人物 (不審者・・・?) が検出され、メールで通知が送られてくるのを確認できました !

このまま実用化できるかはさておき、Amazon Rekognition Streaming Video Events を使用して、カメラ映像を最初から最後まで人手で監視する手間を簡単に削減しています。想定される運用として、メールのリンク先の画像に写った人物に心当たりがない場合に限り、この人物 (不審者・・・?) について詳しく知るために改めて映像を巻き戻して確認、といったフローになるかと思います。


9. おわりに

本記事ではトイドローン Tello EDU と Amazon Rekognition Streaming Video Events を使用して、飛行中の映像に人物を捉えると通知してくれる警備・監視ソリューションを構築しました。

本記事の内容だけでは実用面で様々なツッコミどころがありうるかと思いますが、今回の簡易な構成を発展させると、捜索活動や設備点検などの場でドローン映像を解析し人間の判断をアシストするようなユースケースに繋がっていきます。

本記事をお読みいただいたことで、カメラを搭載したデバイスと AWS のマネージドサービスを組み合わせて、実社会で役立つシステムや面白い仕組みが比較的簡単に構築できるイメージを湧かせていただけたら嬉しく思います。今回使用した Amazon Kinesis Video Streams と Amazon Rekognition の組み合わせで、例えば 顔の検出や認識もシンプルに実現 することもできますので、是非試してみてください。


builders.flash メールメンバーへ登録することで
AWS のベストプラクティスを毎月無料でお試しいただけます

筆者プロフィール

中西 貴大 (Takahiro Nakanishi)
アマゾン ウェブ サービス ジャパン合同会社
ソリューションアーキテクト

製造業のお客様の AWS 利用をご支援させていただいています。
古いクルマを直したりイジったりしながら乗っています。機械も含めてものづくり全般が好きです

AWS を無料でお試しいただけます

AWS 無料利用枠の詳細はこちら ≫
5 ステップでアカウント作成できます
無料サインアップ ≫
ご不明な点がおありですか?
日本担当チームへ相談する