如何使用 Python 将 MQTT 消息从我的设备发布到 AWS IoT Core?

上次更新时间:2020 年 6 月 16 日

我无法在 AWS IoT Core 和我的设备或 MQTT 客户端之间发送或接收 MQTT(MQ 遥测传输)消息。如何将 MQTT 消息发布到 AWS IoT Core?

简短描述

按照以下说明操作,确认您的 AWS IoT 事物已正确配置,并且其证书已正确连接。要测试您的设置,您可以使用 AWS IoT MQTT 客户端,以及本文中提供的 Python 代码示例。

解决方法

设置一个目录来测试 MQTT 发布

1.    在您的开发环境中创建一个工作目录(例如,iot-test-publish)。

2.    在您的新工作目录中为证书创建一个子目录(例如,certificates)。

3.    在命令行中,将目录更改为您的新工作目录。

安装适用于 Python 的 pip 和 AWS IoT 开发工具包

1.    如果您尚未安装适用于 Python 3 打包的 pip,请进行安装。有关更多信息,请参阅 Python Packaging Authority (PyPA) 网站上的安装

2.    通过从命令行运行以下命令,安装适用于 Python v2 的 AWS IoT 开发工具包:

pip install awsiotsdk

-或者-

如果您希望通过运行以下命令,请安装适用于 Python 的 AWS IoT 设备开发工具包(前一个开发工具包版本):

pip install AWSIoTPythonSDK

有关更多信息,请参阅 GitHub 上的适用于 Python v2 的 AWS IoT 开发工具包适用于 Python 的 AWS IoT 设备开发工具包

注意:虽然建议使用这些开发工具包连接到 AWS IoT Core,但并不是必需的。您还可以使用任何兼容的第三方 MQTT 客户端进行连接。

创建 AWS IoT Core 策略

1.    打开 AWS IoT Core 控制台

2.    在左侧导航窗格中,选择安全

3.    在安全下,选择策略

4.    如果您现有任何 AWS IoT Core 策略,请选择创建以创建新策略。
-或者-
您尚不具有任何策略页面上,选择创建策略

5.    在创建策略页面上,为您的策略输入名称。例如,admin

6.    在添加语句下,执行以下操作:
对于操作,输入 iot:*
注意:允许所有 AWS IoT 操作 (iot:*) 对于进行测试非常有用。但是,最佳实践是提高生产设置的安全性。有关更多的安全策略示例,请参阅 AWS IoT 策略示例
对于资源 ARN,输入 *
对于效果,选中允许复选框。

7.    选择创建

有关更多信息,请参阅创建 AWS IoT Core 策略AWS IoT Core 策略

创建 AWS IoT 事物

注意:您无需创建事物即可连接到 AWS IoT。但是,事物允许您使用更多安全控件和其他 AWS IoT 功能,例如队列索引作业设备影子

1.    在 AWS IoT Core 控制台中的左侧导航窗格中,选择管理

2.    如果您现有任何事物,请选择创建以创建新事物。
-或者-
您尚不具有任何事物页面上,选择注册事物

3.    在创建 AWS IoT 事物页面上,选择创建单一事物

4.    在将您的设备添加到事物注册表页面上,执行以下操作:
为事物输入名称。例如,Test-Thing
(可选)在为此事物添加类型下,选择或创建事物类型
(可选)在将此事物添加到组下,选择或创建组。有关组的更多信息,请参阅静态事物组动态事物组
(可选)在设置可搜索事物属性(可选)下,将属性添加为键/值对。
选择下一步

5.    在为您的事物添加证书页面上,选择创建证书。您会看到一些通知,确认您的事物和已为事物创建证书。

6.    在创建的证书页面上,执行以下操作:
要连接设备,您需要下载以下内容下,为证书、公钥和私钥选择下载
将每个下载的文件保存到您之前创建的证书子目录下。
您还需要下载 AWS IoT 的根 CA 下,选择下载服务器身份验证页面将打开到用于服务器身份验证的 CA 证书

7.    在 Amazon Trust Services 终端节点(首选) 下,选择 Amazon 根 CA 1。证书将在浏览器中打开。

8.    复制证书(从 -----BEGIN CERTIFICATE----------END CERTIFICATE----- 的所有内容)并将其粘贴到文本编辑器中。

9.    将证书以名为 root.pem 的 .pem 文件形式另存到证书子目录中。

10.    在 AWS IoT Core 控制台中的创建的证书页面上,选择激活。该按钮将变为停用

11.    选择附加策略

12.    在为您的事物添加策略页面上,执行以下操作:
选择您之前创建的 AWS IoT Core 策略。例如,admin
选择注册事物

有关更多信息,请参阅以下页面:

复制 AWS IoT Core 终端节点 URL

1.    在 AWS IoT Core 控制台的左侧导航窗格中,选择设置

2.    在设置页面上,在自定义终端节点下,复制终端节点。此 AWS IoT Core 自定义终端节点 URL 是您的 AWS 账户和区域所专用的。

创建 Python 程序文件

将以下 Python 代码示例之一另存为名为 publish.py 的 Python 程序文件。如果您之前安装了适用于 Python v2 的 AWS IoT 开发工具包,请使用以下示例代码:

注意:customEndpointUrl 替换为您的 AWS IoT Core 自定义终端节点 URL。将 certificates 替换为您的证书子目录名称。将 a1b23cd45e-certificate.pem.crt 替换为您的客户端 .crt 的名称。将 a1b23cd45e-private.pem.key 替换为您的私有密钥的名称。

# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: MIT-0

from awscrt import io, mqtt, auth, http
from awsiot import mqtt_connection_builder
import time as t
import json

# Define ENDPOINT, CLIENT_ID, PATH_TO_CERT, PATH_TO_KEY, PATH_TO_ROOT, MESSAGE, TOPIC, and RANGE
ENDPOINT = "customEndpointUrl"
CLIENT_ID = "testDevice"
PATH_TO_CERT = "certificates/a1b23cd45e-certificate.pem.crt"
PATH_TO_KEY = "certificates/a1b23cd45e-private.pem.key"
PATH_TO_ROOT = "certificates/root.pem"
MESSAGE = "Hello World"
TOPIC = "test/testing"
RANGE = 20

# Spin up resources
event_loop_group = io.EventLoopGroup(1)
host_resolver = io.DefaultHostResolver(event_loop_group)
client_bootstrap = io.ClientBootstrap(event_loop_group, host_resolver)
mqtt_connection = mqtt_connection_builder.mtls_from_path(
            endpoint=ENDPOINT,
            cert_filepath=PATH_TO_CERT,
            pri_key_filepath=PATH_TO_KEY,
            client_bootstrap=client_bootstrap,
            ca_filepath=PATH_TO_ROOT,
            client_id=CLIENT_ID,
            clean_session=False,
            keep_alive_secs=6
            )
print("Connecting to {} with client ID '{}'...".format(
        ENDPOINT, CLIENT_ID))
# Make the connect() call
connect_future = mqtt_connection.connect()
# Future.result() waits until a result is available
connect_future.result()
print("Connected!")
# Publish message to server desired number of times.
print('Begin Publish')
for i in range (RANGE):
    data = "{} [{}]".format(MESSAGE, i+1)
    message = {"message" : data}
    mqtt_connection.publish(topic=TOPIC, payload=json.dumps(message), qos=mqtt.QoS.AT_LEAST_ONCE)
    print("Published: '" + json.dumps(message) + "' to the topic: " + "'test/testing'")
    t.sleep(0.1)
print('Publish End')
disconnect_future = mqtt_connection.disconnect()
disconnect_future.result()

如果您安装了适用于 Python 的 AWS IoT 设备开发工具包(前一个开发工具包版本),请使用以下示例代码:

注意:customEndpointUrl 替换为您的 AWS IoT Core 自定义终端节点 URL。将 certificates 替换为您的证书子目录名称。 将 a1b23cd45e-certificate.pem.crt 替换为您的客户端 .crt 的名称。将 a1b23cd45e-private.pem.key 替换为您的私有密钥的名称。

# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: MIT-0

import time as t
import json
import AWSIoTPythonSDK.MQTTLib as AWSIoTPyMQTT

# Define ENDPOINT, CLIENT_ID, PATH_TO_CERT, PATH_TO_KEY, PATH_TO_ROOT, MESSAGE, TOPIC, and RANGE
ENDPOINT = "customEndpointUrl"
CLIENT_ID = "testDevice"
PATH_TO_CERT = "certificates/a1b23cd45e-certificate.pem.crt"
PATH_TO_KEY = "certificates/a1b23cd45e-private.pem.key"
PATH_TO_ROOT = "certificates/root.pem"
MESSAGE = "Hello World"
TOPIC = "test/testing"
RANGE = 20

myAWSIoTMQTTClient = AWSIoTPyMQTT.AWSIoTMQTTClient(CLIENT_ID)
myAWSIoTMQTTClient.configureEndpoint(ENDPOINT, 8883)
myAWSIoTMQTTClient.configureCredentials(PATH_TO_ROOT, PATH_TO_KEY, PATH_TO_CERT)

myAWSIoTMQTTClient.connect()
print('Begin Publish')
for i in range (RANGE):
    data = "{} [{}]".format(MESSAGE, i+1)
    message = {"message" : data}
    myAWSIoTMQTTClient.publish(TOPIC, json.dumps(message), 1) 
    print("Published: '" + json.dumps(message) + "' to the topic: " + "'test/testing'")
    t.sleep(0.1)
print('Publish End')
myAWSIoTMQTTClient.disconnect()

测试您的设置

1.    在 AWS IoT Core 控制台中的左侧导航窗格中,选择测试

2.    在 MQTT 客户端页面上,对于订阅主题,输入 test/testing

3.    选择订阅主题。一个名为 test/testing 的测试主题已准备就绪,可以测试消息发布。有关更多信息,请参阅查看 AWS IoT MQTT 客户端的设备 MQTT 消息

4.    从命令行运行以下命令:

python3 publish.py

Python 程序会将 20 个测试消息发布到您在 AWS IoT Core 控制台中创建的主题 test/testing。在控制台中查看该主题,以查看发布的消息。

提示:您还可以测试其他开发工具包功能,例如,使用所包含的 pubsub 示例,通过 WebSockets 进行订阅和连接。有关更多信息,请参阅 GitHub 上的 pubsub(适用于 Python v2 的 AWS IoT 开发工具包)或 BasicPubSub(适用于 Python 的 AWS IoT 设备开发工具包)。

(可选)启用 AWS IoT 将日志记录到 Amazon CloudWatch

您可以监控事件日志中发布到 AWS IoT Core 的 MQTT 消息。有关设置说明,请参阅配置 AWS IoT 日志记录使用 CloudWatch Logs 监控 AWS IoT


AWS IoT Core 入门

注册设备

常见问题(MQTT 消息协议网站)

这篇文章对您有帮助吗?

我们可以改进什么?


需要更多帮助?