Amazon Web Services ブログ
AWS IoT Core の新しい共有サブスクリプションの利用を開始する
はじめに
AWS IoT Core の新しい機能の共有サブスクリプションは、サブスクライブする複数の MQTT セッションまたはコンシューマーにロードバランシング機能をもたらします。共有サブスクリプションを利用しない場合はすべてのサブスクライバーにすべてのパブリッシュされたメッセージを送信するのに対し、共有サブスクリプションを利用した場合はランダムに選ばれた 1 つのサブスクライバーにのみ、パブリッシュされたメッセージが送信されます。このブログでは、AWS IoT Core で MQTT の共有サブスクリプションを開始する方法を紹介します。この機能を使うための手順を詳しく説明し、共有サブスクリプションの負荷分散の仕組みについてデモを行います。
Non-shared Subscriptions Flow | Shared Subscriptions Flow |
---|---|
共有サブスクリプションは、コネクテッド自動車のソリューション、産業・製造業、コネクテッドホームなどの分野で使用されています。何百万台ものデバイスが共通のトピックにメッセージを発信するような環境では、非常に有利に働きます。受信したデータを処理する必要がある消費者向けアプリケーションもあるでしょう。例えば、前処理を行った後にデータレイクに保存したり、予知保全のための機械学習パイプライン、ロケーションベースのアプリケーションなどです。これらの低速のアプリケーションサーバーをグループ化して、高速の受信データを処理することができます。各グループは、共有サブスクリプション上の共有トピックから読み出すことができます。これにより、グループ内のアプリケーションサーバー間で処理負荷を分散させることができます。共有サブスクリプション機能がない場合、MQTT トピックにパブリッシュされたメッセージは、そのトピックをサブスクライブしているすべてのクライアントに受信されます。共有サブスクリプション機能を使用すると、グループのクライアントのうち 1 つだけがこのトピック上のメッセージを受信します。要約すると、負荷分散された共有サブスクリプショントピックによる利点は以下の通りです。
- ヘルス状態を考慮して負荷分散される高可用なコンシューマー
- パブリッシュされたメッセージの負荷変動に応じたコンシューマーの水平スケーリング
- 時間のかかる処理によりコンシューマーの動作が遅くなることへの対処が可能、など
概要
チュートリアルにおけるサンプルの状況とメッセージフローを以下に示します。共通のトピック cars/data にデータをパブリッシュしている車が N 台あります。このトピックには 5 つのサブスクライバーが存在します。Subscriber 1A と 1B は Consumer Group1 に属し、Subscriber 2A と 2B は Consumer Group2 に属しています。Subscriber 3は独立したコンシューマーです。
MQTT5 規格で定義された共有サブスクリプションは、MQTT3 および MQTT5 クライアントの両方で有効です。MQTT5 固有の機能は使用せず、専用の予約済みトピック空間である $share を使用します。新規および既存の MQTT3 クライアントは、共有サブスクリプションにパブリッシュまたはサブスクライブできます。
共有サブスクリプションを使用するには、クライアントは以下のように共有サブスクリプションのトピックフィルターをサブスクライブします。
$share/{ShareName}/{TopicFilter}
サブスクライブするクライアントごとに、SUBSCRIBE パケットで異なる QoS レベルを要求することが許されています。サーバーが選択したサブスクライバーへの QoS 1 メッセージを処理中に、確認メッセージ (すなわち PubAck) を受信する前に接続が切れた場合、サーバーは共有グループ内の次のサブスクライバーにメッセージを再送します。共有グループのサブスクライバーが切断された場合、メッセージはグループ内の別のサブスクライバーに送信され、メッセージの配信が担保されます。
前提条件
このブログの手順に従うには、AWS アカウント、AWS IoT Core がサポートされているリージョン、AWS IoT Rules、AWS Lambda Functions、AWS Identity and Access Management(IAM)ロールとポリシー、AWS CloudShell へのアクセス権が必要です。また、Linux の bash コマンドの基本を熟知していることを前提としています。
手順
さて、この機能の概要について見てきましたが、次はその実装のための手順を説明します。これから行うセットアップにかかる時間は、30 分以内です。
このチュートリアルでは、Car1 と Car2 をパブリッシャーとして、Subscriber 2A, 2B が属する Consumer Group2 と、独立した Consumer3 をサブスクライバーとして作成します(上のアーキテクチャ図を参照)。共有サブスクリプション機能では、メッセージが Consumer Group2 にパブリッシュされると、サブスクライバーのうちの 1 人(この例では 2A か 2B )だけがランダムにメッセージを受け取ります。また、Consumer3 はすべてのメッセージを取得します。
ステップ 1: AWS CLI または AWS Console を使用して、Car1, Car2, Subscriber 2A, Subscriber 2B および Consumer3 に対応する IoT モノを作成します。仮想デバイスの作成方法に慣れていない場合は、このドキュメントに記載されているステップバイステップのデバイス作成方法を参照してください。
出力:
参考までに、以下のようなフォルダ構成になっています。
ステップ 2: 仮想デバイスを作成した後、mosquitto MQTT クライアント(本ブログでは Eclipse Mosquitto )を使用して、メッセージのパブリッシュとサブスクライブを行うことにします。クライアントのダウンロードとインストールは、このドキュメントの手順を参照してください。このユースケースを検証するために、5 つのターミナルを使用します。パブリッシャーである Car1 と Car2 は cars/data トピックにパブリッシュし、Subscriber 2A と 2B は $share/group2/cars/data トピックにサブスクライブします。ターミナルを開き、それぞれのターミナルでこれらのコマンドを実行してください。Consumer3 はどのグループにも属さず、トピック cars/data をサブスクライブしています。
IoT エンドポイントの環境変数を設定する
macOS, Ubuntu, Red Hat などの *.nix ベースのオペレーティングシステムを使用している場合は、以下のコマンドを実行してください。コマンドを実行する前に、jq ライブラリがインストールされていることを確認してください。
endpoint=`aws iot describe-endpoint --endpoint-type iot:Data-ATS | jq -r '.endpointAddress'`
Windows をお使いの場合は、以下のコマンドで環境変数を設定してください。
$endpoint = aws iot describe-endpoint --endpoint-type iot:Data-ATS | jq -r '.endpointAddress'
サブスクライバーを設定する
ターミナル 1: Subscriber 2A の証明書があるフォルダに移動して、以下のコマンドを実行します。
mosquitto_sub --cafile AmazonRootCA1.pem \
--cert subscriber2a.certificate.pem \
--key subscriber2a.private.key -h $endpoint -p 8883 \
-q 0 -t "\$share/group2/cars/data" -i subscriber2a-sub \
--tls-version tlsv1.2 -d -V mqttv5
出力:
ターミナル 2: Subscriber 2B の証明書があるフォルダに移動して、以下のコマンドを実行します。
mosquitto_sub --cafile AmazonRootCA1.pem \
--cert subscriber2b.certificate.pem \
--key subscriber2b.private.key -h $endpoint -p 8883 \
-q 0 -t "\$share/group2/cars/data" -i subscriber2b-sub \
--tls-version tlsv1.2 -d -V mqttv5
出力:
ターミナル 3: Consumer3 の証明書があるフォルダに移動して、以下のコマンドを実行します。
mosquitto_sub --cafile AmazonRootCA1.pem \
--cert consumer3.certificate.pem \
--key consumer3.private.key -h $endpoint -p 8883 \
-q 0 -t "cars/data" -i consumer3-sub \
--tls-version tlsv1.2 -d -V mqttv5
出力:
パブリッシャーの設定
新しいターミナルを開き、以下のコマンドを実行して cars/data トピックにデータをパブリッシュします。ここでは、車の速度情報を .json ファイルでパブリッシュすることにします。以下の内容をコピーして、cars1 フォルダの messages.json ファイルに保存してください。
{"ID": "car1", "speed": "75"}
{"ID": "car1", "speed": "77"}
{"ID": "car1", "speed": "79"}
以下のコマンドを実行し、トピックにデータをパブリッシュします。
cat messages.json | mosquitto_pub --cafile AmazonRootCA1.pem \
--cert car1.certificate.pem \
--key car1.private.key -h $endpoint -p 8883 \
-q 0 -t cars/data -i car1 \
--tls-version tlsv1.2 -d -V mqttv5 -D publish topic-alias 2 -l
Car2 フォルダに移動し再度上記手順を実施し、サブスクライバーを観察します。
出力:
以下の出力は、Consumer3 がすべてのメッセージを受信するのに対し、Subscriber 2A または 2B のどちらか一方だけが各メッセージを受信することを示しています。
後片付け
料金が継続して発生することを避けるために、このブログで作成されたリソースを削除してください。リソースを削除するには、以下の手順に従ってください。
ステップ 1: IoT モノに紐付いた証明書を削除する。
AWS コンソールで AWS IoT Core に移動し、左ペインで セキュリティ → 証明書 を選択し、全ての証明書を選択してから アクション → 削除 をクリックして、Car1, Car2, Subscriber 2A, 2B, Consumer3 に紐付いた全ての証明書を削除します。
ステップ 2: 作成した IoT モノを削除する。
AWS IoT Core のサービスページで、管理 → すべてのデバイス → モノ を選択します。このチュートリアルで作成された全てのモノを選択し、削除を選択します。
まとめ
この記事では、AWS IoT Core の新しい共有サブスクリプション機能を使い始める方法、機能を使用する前に取るべき重要なステップ、グループを作成してトピックサブスクライバーを負荷分散する方法を学びました。AWS IoT Core での共有サブスクリプションの使用についてのより詳細な情報については、開発者ガイドをご覧ください。AWS がサポートする MQTT5 機能の開始と詳細については、技術ドキュメントを参照してください。
この記事は Aditi Gupta, Harish Rajagopalan によって書かれた How to get started with the new shared subscriptions in AWS IoT Core の日本語訳です。この記事はソリューションアーキテクトの中西貴大が翻訳しました。
筆者について