測距センサーと鉄道おもちゃを使って 2 次元地図を作る簡易スキャナを作ってみた

2023-04-03
日常で楽しむクラウドテクノロジー

大井 友三

こんにちは。ソリューションアーキテクトの大井です。

最近時々お客様から、デジタルツインを作る際に必要となる 3D モデルの生成や物体の設備の配置状況をモデル化するために必要な 2 次元地図生成についてご相談を受けることが増えてきました。

スマートフォンでも測距センサーが内蔵され始め、現実のものから 3D モデルを生成する事自体はハードルが下がってきました。一方で自分の周囲を撮影して地図を作成するなどの場合に使える市販の 3D スキャナはまだまだ高価なため、簡素なデバイスを組み合わせて地図を作るためのスキャナを作れないか・・・と思い立ちました。

このクラウドレシピ (ハンズオン記事) を無料でお試しいただけます »

毎月提供されるクラウドレシピのアップデート情報とともに、クレジットコードを受け取ることができます。 


1. 今回作るもの

一般的にものの形をスキャンするためには「LiDAR」(ライダー : Laser Imaging Detection And Ranging) と呼ばれるレーザーや赤外線などの光の反射を利用した測距センサー (距離を測るためのセンサー) を用います。身近なものではスマートフォンやお掃除ロボット、車の近接センサー、ゴルフの距離計・・・など様々な場面で用いられています。

具体的な仕組みは後段の中で一緒にお伝えしていければと思いますので、一旦本題に移りたいと思います。

さて、物体の距離までを測るには測距センサーと数値を取得するデバイスがあれば事足りますが、物体の形をとるとなると下記の要素が必要になります。

  1. 距離を測る
  2. データを取得する
  3. 自転もしくは対象物の周りを周回しながら連続で距離を測る
  4. 取得したデータを可視化のために座標に変換する
  5. 変換した座標を元に可視化する

1. については測距センサーを利用しますが、精度も価格もピンキリです。今回はなるべく簡素な形で実現したいと考え、“TF mini-S” というセンサーデバイスを利用することにしました。測定可能な距離は 0.1 m ~ 12 m、測定単位が 1 cm なので、対象物が大きなものであれば形状を捉えるのが簡単そうです。

2. については M5Stack の製品の一つである “M5StickC Plus” というデバイスを利用しました。このデバイスであればバッテリーも内蔵しているため、長時間の稼働でなければどこにでも持ち運ぶことが出来ます。

問題は 3. です。物体の周りを周回しながら形をスキャンしようと思うと、センサーが空間上のどこにいるのかを距離計測しながら別で把握する必要があります。

一つの方法として加速度センサーや角速度センサー (ジャイロセンサー) などが内蔵された IMU と呼ばれるデバイスを利用することもできます。

M5StickC Plus にはこれらのセンサーも内蔵されているため、まずはこれを使ってみたいと思います。

・・・・が、そうは問屋がおろしませんでした。考えてみると、上で記載した通り物体の周りをまわるということは、センサー自身が空間を認識して自分がその中でどこにいるのかというのを常時計算し続ける必要があります。M5StickC Plus に内蔵されている IMU からの値を計算することで実現できそうな気もしますが、あまりに計算が複雑になりすぎるため、もう少し手軽にセンサーの位置を把握する方法が無いか模索していました。

そんなとき、私の子供が遊んでいた鉄道のおもちゃに目が留まりました。

「この鉄道のおもちゃなら線路の上しか走れないし一定の速度で走ってくれそうなので、これにセンサーを搭載して走らせれば位置情報は推測にして代替できるかもしれない。」

子供の車両を親の趣味で拝借するのはさすがに気が引けたので、これ用に個別に入手することにしました。用意したのは電気機関車と M5StickC Plus がすっぽりと収まる貨車です。

そして軌道は、子供のレールを借りて外径約 50 cm の円形軌道を作成しました。

これで先ほどの貨車にセンサーを搭載することで正確な円を描きながら計測ができそうです。


2. 測距センサーから距離データの読み取り

次はセンサーからデータを読み取る必要があります。

TF mini-S のデータシート を見ると、インターフェースは UART となっています。

M5StickC Plus のデータシートによると、UART で接続するには Grove のポートが利用できそうです。

まずは、距離データがをきちんと読み取れるかテストしてみます。

テスト用にピンを付けたケーブルとブレッドボードを用意し、配線が間違えていてもつなぎ変えが楽にできるようにしました。

TFmini-S のデータシートを参照しながら UIFlow でプログラムを組んでいきます

TFmini-S は 9 bytes の長さのデータで構成されており、はじめの 2 bytes はヘッダ、3 bytes 目と 4 bytes 目が距離データが格納されています。それらを踏まえてこちらのようにプログラムを組んでいきます。

画像が小さいので少しわかりづらいですが、このようにデータが取れていることがわかります。

さて、配線も問題なさそうなのでケーブルをつなぎ合わせます。

接続するための端子が共に特殊なため、製品に付属しているケーブルを改造して結線することにしました。

センサー用のケーブルの結線が終わったら M5StickC Plus に取り付け、センサーと一緒に貨車に取り付けます。

周囲を横目で見ながら計測するため、貨車の側面にセンサーを取り付けます。


3. AWS サービスの設定

このデモでは、下記のように構成していきます。

はじめに、測距センサーを取り付けた M5StickC Plus から測定したデータを AWS IoT にトピック名「m5stick/scanner」で送ります。AWS IoT 側ではこのトピックあてに送られたデータは高頻度になるため、一旦 Amazon Kinesis Data Firehose を経由して Amazon S3 に格納するようにします。

M5StickC Plus は内蔵メモリが多くはないため、測距センサー (TFmini-S) の測定頻度 (1000 Hz) で測定された値をすべて内蔵メモリに格納しようとすると、メモリ不足のエラーが発生しやすくなります。そこで、一定回数ごとに Publish するように構成していきます。

S3 に格納されたデータは GPU が搭載された Amazon EC2 G4dn インスタンス の Windows Server に 3D 編集ツールである Blender を導入し、Blender から S3 のデータを読み込んで可視化するようにしていきます。

描画には Blender を活用することにします。Python を利用したプログラミングでの描画ができるので、S3 からのデータ取得から描画までをコードで記述していきます。また、蛇足ですがこのコード開発のために Amazon SageMaker Studio を利用してデータの可視化を行いながら開発をしていくことにしました。

3-1. AWS IoT の設定

M5StickC Plus からのデータを受けるために AWS IoT Core の設定を行っていきます。

まずは IoT Core のポリシーを作成します。

  1. マネジメントコンソールにて AWS IoT Core の画面を開き、「管理」→「セキュリティ」→「ポリシー」を開きます。
  2. ポリシー名 に scanner-demo-policy、ポリシードキュメントに下記の設定を行い、「作成」をクリックします。アカウント ID やモノの名前の部分は適宜差し替えてください。
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": "iot:Connect",
      "Resource": "arn:aws:iot:ap-northeast-1:accountid:thing/scanner-demo"
    },
    {
      "Effect": "Allow",
      "Action": "iot:Publish",
      "Resource": "arn:aws:iot:ap-northeast-1:accountid:thing/scanner-demo"
    }
  ]
}

クリックすると拡大します

続いてモノの作成を行います。「管理」→「すべてのデバイス」→「モノ」をクリックし、「一つのものを作成」 を選択して「次へ」をクリックします。

クリックすると拡大します

モノのプロパティの「モノの名前」に scanner-demo と入力し、他はデフォルトのまま 「次へ」をクリックします。

クリックすると拡大します

新しい証明書を自動生成 (推奨)を選択し、「次へ」をクリックします。

クリックすると拡大します

先に作成したポリシーを選択し、「モノの作成」をクリックします。

クリックすると拡大します

証明書とキーをダウンロード にてデバイス証明書、パブリックキーファイル、プライベートキーファイル、ルート CA 証明書をそれぞれダウンロードします。

フォルトのファイル名は M5StickC Plus で受け入れられるファイル名の長さを超えているため、デバイス証明書は certificate.pem.crt、プライベートキーは priavte.pem.key などの短い名称に変更します。

*キーはここでダウンロードし忘れると再度発行になるため、忘れないようにしてください。

クリックすると拡大します

3-2. Amazon S3 バケットの作成

データの受け入れ先となる S3 バケットを作成します。

マネジメントコンソールで S3 の画面を開き、「バケットの作成」をクリックします。

バケット名を入力し、他はデフォルトのままで「バケットを作成」をクリックします。

クリックすると拡大します

3-3. Amazon Kinesis Data Firehose の設定

Kinesis Data Stream のデータを S3 に流すために Kinesis Data Firehose を利用します。

マネジメントコンソールで Kinesis の画面開きます。

ソースと送信先を選択」にて ソース に「Direct PUT」、送信先に「Amazon S3」をそれぞれ選択します。

クリックすると拡大します

送信先の設定で先に作成した S3 バケットを選択します。

また、動的パーティショニングマルチレコードのディスアグリゲーションJSON のインライン解析もそれぞれ「有効」にします

クリックすると拡大します

計測ごとに自動的に格納フォルダーを分けることで識別しやすくするため、動的パーティショニングを利用します。

レコード内の UUID をキーとしてプレフィックスを付けて格納するようにします。

エラーは error/ の文字列をプレフィックスとして出力するようにします。

クリックすると拡大します

また、1 回の計測時間は短いため、バッファ間隔も既定の 300 秒から 60 秒に変更します。その後「配信ストリームを作成」をクリックします。

3tene から引用
(クリックすると拡大します)


4. M5StickC Plus の設定

このデモでは、センサーが移動するため、計測の開始と停止をリモートでできるように M5StickC Plus の Remote+ を利用して、スマートフォンからWeb画面を経由してリモートアクセスができるように構成していきます。

https://flow.m5stack.com にアクセスします。

M5StickC Plus 起動時に表示される API Key を入力し、Device から M5StickC Plus (最上段の右から 2 番目) を選択して「OK」をクリックします。

クリックすると拡大します

Blockly の画面から「Remote+」を選択し、「Remote qrcode show in x ......」をドラッグして Setup 直下に結合します。

Remote Preview が表示されるので、BTN の部品を 2 つスマートフォンの画面上に配置して、それぞれ startstop と名前を付けます。

クリックすると拡大します

AWS」を選択し、Init things name などの名称がある部品を Remote の下に結合し、同ブロックの書類アイコンをクリックして、AWS IoT のモノを作成した際にダウンロードしたデバイス証明書とプライベートキーをアップロードします。

その後、keyFile にはプライベート証明書、certFile にはデバイス証明書をそれぞれプルダウンから割り当てます。

クリックすると拡大します

Python の画面に切り替えると、Blockly で入力した内容が python で記述されていることがわかります。

クリックすると拡大します

本番用のコードに差し替えます。

from m5stack import *
from m5ui import *
from uiflow import *
from IoTcloud.AWS import AWS
import imu
import utime
import json
import ntptime
import time

setScreenColor(0x111111)
lcd.setRotation(2)
remoteInit()

iot_topic = None
recv = None
checksum = None
i = None
distance = None
data = None
status = False
nData = None
dt = None
st = None
starttime = None
endtime = None
records = None
record = None
nSkip = None

title = M5Title(title="Scanner", x=3, fgcolor=0xFFFFFF, bgcolor=0x0000FF)
l_dist = M5TextBox(15, 80, "Distance", lcd.FONT_DejaVu18, 0xFFFFFF, rotate=0)
messages = M5TextBox(0, 20, "Messages", lcd.FONT_Default, 0xFFFFFF, rotate=0)

lcd.qrcode('https://flow.m5stack.com/remote?id=123456789012345678', 0, 100, 130)

#---------------
# 初期化
#---------------
def values():
  global recv, distance, data, status, records, record, nSkip, nData, st, dt
  recv = None
  distance = 0
  data = {}
  status = False
  records = []
  record = {}
  nSkip = 30
  nData = 0
  dt = 0

#------------------------------
# AWS IoT Core へ Publish する
#------------------------------
def pubData():
  global iot_topic, st, records
  
  data = {
    'uuid': st,
    'records': records
  }
  
  aws.publish(str(iot_topic),json.dumps(data))
  
  # 送り終わったreccordsは空にする
  records = []

#---------------
# 計測開始
#---------------
def button_start_callback():
  global status, iot_topic, st, starttime, records
  
  # Set UUID for record  
  st = '' # uuid
  records = []
  try:
    now = time.localtime()
    st = '{}{:0=2}{:0=2}{:0=2}{:0=2}{:0=2}'.format(now[0],now[1],now[2],now[3],now[4],now[5])
  except Exception as e:
    messages.setText(str(e.args[0]))
  
  # Start to scan
  starttime = utime.ticks_ms()
  l_dist.setText('Scanning...')
  status = True

#---------------
# 計測終了
#---------------
def button_stop_callback():
  global status, iot_topic, nData, starttime
  status = False
  
  try:
    pubData()
    messages.setText(str(nData) + ' records were published.\nDuration: ' + str(utime.ticks_ms() - starttime))
    l_dist.setText('Finished')
  except Exception as e:
    messages.setText(str(e))
  
  records = []
  data = {}
  nData = 0
  


try:
  # AWS IoT 用初期設定
  messages.setText('Initializing for AWS...')
  aws = AWS(things_name='Scanner', host='xxxxxxxxxxxxxx-ats.iot.ap-northeast-1.amazonaws.com', port=8883, keepalive=1200, cert_file_path="/flash/res/certificate.pem.crt", private_key_path="/flash/res/private.pem.key")
  iot_topic = 'm5stickc/scanner' # IoT Topicを定義
  aws.start()
  messages.setText('Initializing for AWS...done!')
  
  # NTP サーバーとの時刻同期
  messages.setText('Get current time using NTP...')
  ntp = ntptime.client(host='jp.pool.ntp.org', timezone=9)
  rtc.setTime((ntp.year()), (ntp.month()), (ntp.day()), (ntp.hour()), (ntp.minute()), (ntp.second()))
  messages.setText('Get current time using NTP...done!')
  
  # 測距センサーの初期化
  messages.setText('Initializing scanning device...')
  uart1 = machine.UART(1, tx=32, rx=33)
  uart1.init(115200, bits=8, parity=None, stop=1)  
  messages.setText('Initializing scanning device...done!')

  # 各種変数の初期化
  values()
  
  # 計測
  while True:
    try:
      # Start ボタンが押されると計測開始
      if status == True:
        
        # 測距センサーからのデータ取得
        if uart1.any():
          recv = uart1.read(9)
          if len(recv) == 9:
            if recv[0] == 0x59 and recv[1] == 0x59:
              checksum = 0
              for i in range(0, 8, 1):
                checksum = checksum + recv[int(i)]
              checksum = checksum % 256
              if checksum == recv[8]:
                distance = recv[2] + recv[3] * 256
                
                # 計測時の Uptime と距離を JSON 形式で纏める
                record = {
                  'time': utime.ticks_ms(),
                  'distance': distance
                }
                
                # 配列に格納
                records.append(record)
                
                # nSkip 回に 1 回まとめて AWS IoT に Publish する
                if nData % nSkip == 0:
                  pubData()
                
                nData = nData + 1
    
    except Exception as e:
      messages.setText("Internal Message: " + str(e))

except Exception as e:
  messages.setText(str(e))

Run」をクリックし、M5Stick へのデプロイを行います。

デプロイ後に LCD に表示される QR コードへアクセスします。

QR コードで読み取った URL にアクセスすると下記のサイトが表示されます。

Start / Stop をそれぞれ押下して、計測が開始・停止されることを確認します。

クリックすると拡大します


5. 計測の実施

今回は、図のような形状の部屋をスキャンしてみました。

クリックすると拡大します

部屋の真ん中にレールを敷設してセンサーが円の外側を向くようにして電車を走らせます。ある程度走らせてから「Start」を押し、ちょうど一周したところで「Stop」を押します。

しばらくすると S3 バケットにデータが格納されていきます。1 回の計測で 1 フォルダ作成されます。

クリックすると拡大します

フォルダ内にはこのようなファイルが 1 つだけ格納されています。

クリックすると拡大します

ファイルを開くと、下記のような形式でデータが格納されます。

{
    'uuid':xxxxxxxxxxxxxx,
    'records':[
        {
            'time':xxxxx,
            'distance':xxxx
        },
        {
            'time':xxxxx,
            'distance':xxxx
        },
        ・・・
        {
            'time':xxxxx,
            'distance':xxxx
        }
    ]
}
{
    'uuid':xxxxxxxxxxxxxx,
    'records':[
        {
            'time':xxxxx,
            'distance':xxxx
        },
        {
            'time':xxxxx,
            'distance':xxxx
        },
        ・・・
        {
            'time':xxxxx,
            'distance':xxxx
        }
    ]
}

クリックすると拡大します


6. (おまけ) 計測データを Blender で可視化するための Python コード開発

データの成型方法の検討のために Amazon SageMaker Studio 利用して Python のコードの開発をしていきたいと思います。

6-1. Amazon SageMaker Studio の準備

マネジメントコンソールから「共通セットアップ」を選択し、ドメイン名を入力して「送信」をクリックします。

クリックすると拡大します

SageMaker Domain で使用する適当なVPCを選択し、「保存して続行」をクリックします。

起動後にドメインの詳細から「起動」→「Studio」の順に起動します。

クリックすると拡大します

Blender の空間上に配置するにあたっての座標の算出方法をこのように計算していきます。

クリックすると拡大します

SageMaker Studio 上で図のように可視化しながらコード開発していきます。

一番下の図が散布図に配置した計測データです。それっぽい形にはなっていそうですね。

クリックすると拡大します


7. Blender の設定とデータの取り込み

さて、ここからBlenderと連携していくことにします。

Blender についての解説は本記事の範疇外のため、ご存じない方は是非調べてみてください。一言でいうと、3D のモデリングやレンダリングをするためのオープンソースソフトウェアです。

7-1. Blender のインストール

Blender 自体はノート PC などでも動作しますが、S3 へのアクセスなどで権限割り当てが必要になるため、EC2 に IAM ロールを割り当てて利用することにします。3D を扱うため、G4dn などの GPU を搭載した Windows の EC2 インスタンスがお勧めです。

構築した EC2 にログインし、Blender をインストールします。Blender は こちらの公式サイト からダウンロードしてください。本記事では執筆時点の最新版である 3.4.0 を利用しています。

7-2. Boto3 のインストール

Python.org の公式サイトから Windows 版の Python をダウンロードしてインストールします。

Blender を開いている場合は閉じます。

コマンドプロンプトを起動して下記のコマンドを実行します。

# Blender の python\bin フォルダ直下の python にて実行します
c:\Program Files\Blender Foundation\Blender 3.4\3.4\python\bin>python -m pip install -U boto3
Collecting boto3
  Using cached boto3-1.26.74-py3-none-any.whl (132 kB)
Collecting botocore<1.30.0,>=1.29.74
  Using cached botocore-1.29.74-py3-none-any.whl (10.4 MB)
Collecting jmespath<2.0.0,>=0.7.1
  Using cached jmespath-1.0.1-py3-none-any.whl (20 kB)
Collecting s3transfer<0.7.0,>=0.6.0
  Using cached s3transfer-0.6.0-py3-none-any.whl (79 kB)
Requirement already satisfied: urllib3<1.27,>=1.25.4 in c:\program files\blender foundation\blender 3.4\3.4\python\lib\site-packages (from botocore<1.30.0,>=1.29.74->boto3) (1.26.8)
Collecting python-dateutil<3.0.0,>=2.1
  Using cached python_dateutil-2.8.2-py2.py3-none-any.whl (247 kB)
Collecting six>=1.5
  Using cached six-1.16.0-py2.py3-none-any.whl (11 kB)
Installing collected packages: six, jmespath, python-dateutil, botocore, s3transfer, boto3
Successfully installed boto3-1.26.74 botocore-1.29.74 jmespath-1.0.1 python-dateutil-2.8.2 s3transfer-0.6.0 six-1.16.0

Blender を起動し、 「Scripting」を選択し、コンソール内で import boto3 と入力して実行した際にエラーにならなければ完了です。

クリックすると拡大します

7-3. Blender での描画

スクリプト欄に下記のスクリプトを貼り付けて実行します。

import boto3
import json
import base64
import math
import bpy


client = boto3.client('s3')
bucket_name = 'scanner-demo'

directory = "データが格納されているディレクトリ/"

response = client.list_objects_v2(
    Bucket=bucket_name,
    Prefix=directory
)
keys = [obj.get('Key') for obj in response['Contents']]

records = []
latest_time = 0
json_stream = None
for key in keys:
    response = client.get_object(
        Bucket=bucket_name,
        Key=key
    )
    json_stream = response['Body'].read().decode()
    json_stream_array = []
    json_stream_array = json_stream.split('\n')
    
    for i_json_stream_array in json_stream_array:
        records.append(json.loads(i_json_stream_array))

distances = []        
for record in records:
    for data in record['records']:
        distances.append(data)

#distances
new_records = []
new_records = sorted(distances, key=lambda tmp_records: tmp_records['time'])

# 角速度の計算
duration = new_records[len(new_records) - 1]['time'] - new_records[0]['time']
omega = 360 / (duration / 1000)

print("Omega: ", omega)
print("Start Time: ", new_records[0]['time'])
print("End Time: ", new_records[len(new_records) - 1]['time'])
print("Duration: ", duration / 1000)


# Calculate Position
radius = 50 / 2
for i_records in new_records:
    degree = (i_records['time'] - new_records[0]['time']) / 1000 * omega
    target_pos_x = (radius + i_records['distance']) * math.cos(-math.radians(degree))
    target_pos_y = (radius + i_records['distance']) * math.sin(-math.radians(degree))
    bpy.ops.mesh.primitive_uv_sphere_add(location=(target_pos_x,target_pos_y,0), scale=(0.05,0.05,0.05))

実行するとこのように小さな球体が各所に描画され、粗くはありますが、なんとなく部屋の形の雰囲気がでていそうです。

クリックすると拡大します

レイアウトの画面にすると、見やすくなります。

クリックすると拡大します


8. さいごに

いかがでしたでしょうか。簡易的ではありますが、測距センサーとある程度正確な軌道さえ用意できれば比較的簡単に周囲の形状をスキャンすることが出来ることがお分かりいただけたと思います。

今回は平面での計測ですが、垂直方向への移動も加えると三次元でも計測できそうです。3 次元スキャンについては、また別な機会にお届けできればと思います。


builders.flash メールメンバーへ登録することで
AWS のベストプラクティスを毎月無料でお試しいただけます

筆者プロフィール

大井 友三
アマゾン ウェブ サービス ジャパン合同会社
ソリューションアーキテクト

主に鉄鋼・化学などの製造業のお客様を担当。
趣味のベースをスタジオで鳴らせないので、憂さ晴らしに始めた M5Stack に手を出したところ面白さにはまる。最近は自宅の家庭菜園を IoT を使って監視できるようにするなど、M5Stack や Raspberry PI でいかに遊べるかを模索中。

さらに最新記事・デベロッパー向けイベントを検索

下記の項目で絞り込む
1

AWS を無料でお試しいただけます

AWS 無料利用枠の詳細はこちら ≫
5 ステップでアカウント作成できます
無料サインアップ ≫
ご不明な点がおありですか?
日本担当チームへ相談する