The Internet of Things on AWS – Official Blog
Introducing new MQTTv5 features for AWS IoT Core to help build flexible architecture patterns
Introduction
We are excited to announce that AWS IoT Core now supports MQTTv5 features that help enhance communications of large-scale device deployments and innovate device messaging patterns. Customers who already have MQTTv3.1.1 deployments can make use of the new MQTTv5 features as AWS IoT Core provides seamless integration between both versions and supports heterogeneous deployments during migrating. In this blog post, we summarize some of MQTTv5 features with bite-sized implementation examples in real-world IoT scenarios to show how you can design more flexible and efficient IoT architecture patterns. We also show how MQTTv5 brings new possibilities for your existing device fleet running AWS IoT Core.
After a successful run with MQTTv3.1.1, OASIS (Organization for the Advancement of Structured Information Standards) improved the specifications with a key goal of enhancing for scalability and large-scale systems. These enhancements resulted in the release of MQTT version 5 (MQTTv5) as the new standard in March 2019. Refer to the MQTT 5 supported features documentation page for details.
Prerequisites
- AWS account
- A development environment, or computer with AWS CLI and Python 3 installed.
Getting Started
- Access example scripts: Example script files provided in this blog post are stored in aws-samples/aws-iot-mqttv5-examples git repository. Navigate to the git repository link to download to your development environment.
- Download an MQTT client library: In this blog post, we use open-source Eclipse Paho™ MQTT Python Client library.
- Install pip by running the following command, if not installed:
python -m ensurepip --upgrade
- Install paho-mqtt library by running the following command:
pip install paho-mqtt
You can also check the paho-mqtt source code repository and other installation options of the client.
- Install pip by running the following command, if not installed:
- Obtain AWS IoT Core device data endpoint: AWS IoT Core device data endpoint is your AWS account’s region-specific AWS IoT Core endpoint that your devices connect to.
- Navigate to the AWS IoT Core console.
- On the left navigation menu, choose Settings.
- Under Device data endpoint, copy Endpoint.
On later sections of this blog, you’ll be asked to provide this endpoint as a parameter for the example scripts.
- Create an AWS IoT thing, obtain and place certificates:
- Create an AWS IoT thing and download device certificates.
- Follow instructions in Create AWS IoT resources page to use Amazon Root certificate authority (CA) signed client certificates.
- If you use AWS IoT Core with choosing your root or intermediate certificate authorities (CA), follow instructions in Create your own client certificates page.
- Example scripts on this blog post expect certificates in “certificates” folder by default, but you can also override this with
--certificates-path
parameter.- Rename the certificate files as following:
- Rename the certificate file as “client-cert.pem”
- Rename the downloaded root certificate file as “AmazonRootCA1.pem”
- Rename the downloaded private key as “private-key.pem”
- Create an AWS IoT thing and download device certificates.
Now, you’re ready to start experimenting with new features of MQTTv5 that AWS IoT Core now supports.
1. More transparent messaging with the Request/Response pattern
The Request/Response messaging pattern is a method to track responses to client requests in an asynchronous way. It’s a mechanism implemented in MQTTv5 to allow the publisher to specify a topic for the response to be sent for a particular message. Therefore, when the subscriber receives the request, it also receives the topic to send the response. It also supports the correlation data field that allows tracking of packets, e.g. request or device identification parameters.
For example, a smart home application with a connected door lock can benefit from the request/response pattern. Suppose a user is interacting with the door lock via a mobile app that sends MQTT messages to open/close the lock. Any messages exchanged between the app and the door lock must be acknowledged and be traceable whether the packets were delivered. Also, the door lock command needs to pass with the context, e.g. requester user identity.
To experiment with this feature, check the ./aws-iot-mqttv5-examples/01_request_response_example.py
script file in the git repository you downloaded on the Getting Started step. Run the following command by specifying your device data endpoint you obtained on the Getting Started step with --endpoint
parameter, replacing <AWS-IoT-Device-Data-Endpoint>
with, for example: abcd123456z-ats.iot.region.amazonaws.com
$ python3 01_request_response_example.py --endpoint <AWS-IoT-Device-Data-Endpoint>
DEBUG:__main__:Sending CONNECT (u0, p0, wr0, wq0, wf0, c1, k60) client_id=b'TestThing01' properties=None
DEBUG:__main__:Received SUBACK: 4, "request_id": "eb1bd30a-c7e6-42a4-9e00-d5baee89f65c"}'], ... (4 bytes)
DEBUG:root:Received a message on topic: 'home07/main_door/lock', payload: 'LOCK'
DEBUG:root:Main door LOCK request with parameters: 'b'{"user_profile_id": 4, "request_id": "eb1bd30a-c7e6-42a4-9e00-d5baee89f65c"}''
DEBUG:__main__:Sending PUBLISH (d0, q0, r0, m3), 'b'home07/main_door/status'', properties=[CorrelationData : b'{"user_profile_id": 4, "request_id": "eb1bd30a-c7e6-42a4-9e00-d5baee89f65c"}'], ... (25 bytes)
DEBUG:__main__:Received PUBLISH (d0, q0, r0, m0), 'home07/main_door/status', properties=[CorrelationData : b'{"user_profile_id": 4, "request_id": "eb1bd30a-c7e6-42a4-9e00-d5baee89f65c"}'], ... (25 bytes)
DEBUG:root:Received a message on topic: 'home07/main_door/status', payload: 'USER_IS_NOT_AUTHENTICATED'
DEBUG:root:Main door status: 'USER_IS_NOT_AUTHENTICATED'' with parameters: 'b'{"user_profile_id": 4, "request_id": "eb1bd30a-c7e6-42a4-9e00-d5baee89f65c"}''
Figure: Request/Response messaging pattern for door lock with mobile application
- The mobile app’s MQTT client subscribes to the response topic. Then, a lock request package is published to
home07/main_door/lock
topic with expected response topic ashome07/main_door/status
and a correlation data object contains the requesteruser_profile_id
andrequest_id
. - When the door lock receives the lock request on
home07/main_door/lock
, it processes the MQTT packet, including the response topic and correlation data. - The door lock makes the decision and responds by publishing to the topic with passing the correlation data.
- The subscriber function receives the response on
home07/main_door/status
, and logs that the decision with the correlation data. Further actions can be taken by the requester using theuser_profile_id
andrequest_id
.
2. More flexible device messaging with the user properties feature
The user properties feature allows connected devices or subscriber applications to pass custom information by appending custom key-value pairs to MQTT packets including publish and connect. The feature provides similar functionality with HTTP headers and can be used as long as a total of 8KB size is not exceeded in the header.
For example, you can use the user properties feature for a multi-vendor sensor deployment use-case. Assume a case with multiple sensors from different vendors deployed in an industrial or a smart home application. In these cases, the individual sensors could send their data using various encodings, which are specified in user properties. Depending on the user property value, subscribers of the messages can take specific actions to process them.
To experiment with this feature, check the ./aws-iot-mqttv5-examples/02_user_properties_example.py
script file in the git repository you downloaded on the Getting Started step. Run the following command by specifying your device data endpoint you obtained on the Getting Started step with --endpoint
parameter, replacing <AWS-IoT-Device-Data-Endpoint>
with, for example: abcd123456z-ats.iot.region.amazonaws.com
$ python3 02_user_properties_example.py --endpoint <AWS-IoT-Device-Data-Endpoint>
DEBUG:__main__:Sending CONNECT (u0, p0, wr0, wq0, wf0, c1, k60) client_id=b'TestThing02' properties=NonerandX-rev8.2')]], ... (8 bytes)
DEBUG:__main__:Sending PUBLISH (d0, q0, r0, m4), 'b'sensors/gateway01/sensor03'', properties=None, ... (4 bytes)
DEBUG:__main__:Received PUBLISH (d0, q0, r0, m0), 'sensors/gateway01/sensor01', properties=[UserProperty : [('Content-Type', 'text/plain'), ('Hardware-Revision', 'brandX-rev1.17c')]], ... (4 bytes)
DEBUG:root:Received a message on topic: 'sensors/gateway01/sensor01'
DEBUG:root:Message has user properties: [('Content-Type', 'text/plain'), ('Hardware-Revision', 'brandX-rev1.17c')]
DEBUG:root:Received message with Content-Type: 'text/plain'
DEBUG:root:Plain text payload: '23.4'
DEBUG:__main__:Received PUBLISH (d0, q0, r0, m0), 'sensors/gateway01/sensor02', properties=[UserProperty : [('Content-Type', 'base64'), ('Hardware-Manufacturer', 'brandX-rev8.2')]], ... (8 bytes)
DEBUG:root:Received a message on topic: 'sensors/gateway01/sensor02'
DEBUG:root:Message has user properties: [('Content-Type', 'base64'), ('Hardware-Manufacturer', 'brandX-rev8.2')]
DEBUG:root:Received message with Content-Type: 'base64'
DEBUG:root:Raw payload: 'MjMuNw==', Decoded base64 payload: '23.7'
DEBUG:__main__:Received PUBLISH (d0, q0, r0, m0), 'sensors/gateway01/sensor03', properties=[], ... (4 bytes)
DEBUG:root:Received a message on topic: 'sensors/gateway01/sensor03'
DEBUG:root:No User Property specified, raw payload: '24.4'
This example script shows three sensors for different brands, publishing to their topics using different data encodings. The subscriber processes a raw sensor value and a base64 encoded sensor value by evaluating their Content-Type
user property values.
Processing MQTT packets with user properties on AWS IoT Core topic rules
AWS IoT Core’s topic rules feature allows configuring/setting up rules to forward and ingest MQTT messages from AWS IoT Core to various AWS services. You can define processing logic using AWS IoT rule SQL statements. This allows data transformation across multiple vendors to a standardized and vendor-agnostic form on the AWS IoT topic rule by implementing corresponding processing to each data schema, and forwarding it to any AWS service.
SELECT CASE get_user_property("Content-Type")
WHEN "base64" THEN decode(decode(encode(*, 'base64'), 'base64'), 'base64')
ELSE decode(encode(*, 'base64'), 'base64') END as sensor_value,
FROM sensors/#'
The AWS IoT Core topic rules feature provides the get_user_property()
function that allows accessing user property values of the MQTT packets in rule definitions. The rule SQL provided above applies base64 decoding operation if it’s base64-encoded. Check the Creating an AWS IoT Rule documentation page to create a topic rule. Also, check the documentation page for AWS IoT SQL Reference and Working with binary payloads.
3. More efficient use of device bandwidth with the topic aliases feature
Cellular IoT devices and sensors use mobile networks to communicate with their back-end services. These devices are mostly designed to operate on the lowest possible bandwidth because of their metered data services. Assuming cellular connected sensor devices are designed to operate on farmlands, they would be expected to operate with low data communication and long battery life. Also, larger data packets often lead to more power consumption. Considering these sensors publish only a few bytes of sensor values, long MQTT topics become an overhead for device messaging.
The topic aliases feature allows MQTT clients to assign numeric aliases to topics and then refer to the alias when publishing further messages. This allows reduction in the transmitted MQTT packet size by referencing the topic with a single number instead of the topic itself.
Example sensor value: 23.2
Example MQTT topic (83 bytes): sensors/field/field001/equipments/a804e598-ee90-4f89-9cde-458f8fe9b980/temperature
To experiment with this feature, check the ./aws-iot-mqttv5-examples/03_topic_alias_example.py
script file in the git repository you downloaded on the Getting Started step. Run the following command by specifying your device data endpoint you obtained on the Getting Started step with --endpoint
parameter, replacing <AWS-IoT-Device-Data-Endpoint>
with, for example: abcd123456z-ats.iot.region.amazonaws.com
$ python3 03_topic_alias_example.py --endpoint <AWS-IoT-Device-Data-Endpoint>
DEBUG:__main__:Sending CONNECT (u0, p0, wr0, wq0, wf0, c1, k60) client_id=b'TestThing03' properties=None
DEBUG:__main__:Sending PUBLISH (d0, q0, r0, m1), 'b'sensors/field/field001/equipments/a804e598-ee90-4f89-9cde-458f8fe9b980/temperature'', properties=[TopicAlias : 1], ... (4 bytes)
DEBUG:__main__:Sending PUBLISH (d0, q0, r0, m2), 'b''', properties=[TopicAlias : 1], ... (4 bytes)
DEBUG:__main__:Sending PUBLISH (d0, q0, r0, m3), 'b''', properties=[TopicAlias : 1], ... (4 bytes)
The script publishes the first temperature value to a topic with setting the topic alias as “1”, which is valid until the end of the current connection. For the next publish operation, only the topic alias is referenced without specifying the actual topic. All messages will be received to the same topic on broker. Refer to AWS IoT Core message broker and protocol limits and quotas documentation for limits.
4. Better control of device behavior using message expiry, session expiry, and clean start features
MQTTv5 has a set of session and message expiration parameters to allow better control of device behavior. With the new session and message expiration parameters, the broker provides and mandates better session controls instead of depending on the client’s implementation.
- Session expiry feature allows you to define fixed intervals, after which the broker removes the session information for a particular client.
- Message expiry feature defines a set interval that the broker uses to store published messages for any matching subscribers that are not currently connected. The session expiry interval overrides the message expiry when used together. Also, the message expiry interval overrides any AWS IoT Core message retention intervals. Check AWS IoT Core message broker and protocol limits and quotas page for limits.
- A clean start is a flag that can be set in tandem with the session expiry interval. Setting this flag in the packet indicates the session should start without using an existing session.
A connected car is a good example for a device with irregular connectivity patterns and requires resilience when the connection is recovered. A connected car use-case with a mobile app to interact with the car’s systems such as the air conditioning and the door lock can showcase these features. It could be a case of using a remote command to unlock/lock the doors remotely for a delivery service or for a car sharing. These remote commands issued by the mobile app need to be processed within a specific time window. You can specify a message expiry interval that says if the car does not receive the command within a short interval, i.e. within 10 seconds of sending, then the message must expire. You can specify a second type of message for less time-critical remote commands, such as controlling the air conditioning systems. In that case, you can set the remote command for turning on the AC with 2 minutes of message expiry.
To experiment with this feature, we use one publisher script which will behave as the mobile app client that sends remote commands, and one subscriber script which will behave as the connected car client to perform actions. Check the ./aws-iot-mqttv5-examples/04_message_session_expiry_clean_start_publisher_example.py
and ./aws-iot-mqttv5-examples/04_message_session_expiry_clean_start_subscriber_example.py
scripts. During the experiment, we will run these two scripts in different intervals to demonstrate online and offline states of the connected car. Run the following commands by specifying your device data endpoint you obtained on the Getting Started step with --endpoint
parameter, replacing <AWS-IoT-Device-Data-Endpoint>
with, for example: abcd123456z-ats.iot.region.amazonaws.com
First, run the subscriber with 300 seconds of session expiry interval. This will create a session instance with the subscription in the AWS IoT Core MQTT broker and allow queuing messages for 300 seconds when the device goes offline.
$ python3 04_message_session_expiry_clean_start_subscriber_example.py --endpoint <AWS-IoT-Device-Data-Endpoint> --session-expiry-interval 300
DEBUG:__main__:Sending CONNECT (u0, p0, wr0, wq0, wf0, c0, k60) client_id=b'TestThing04-Sub' properties=[SessionExpiryInterval : 300]
DEBUG:__main__:Received CONNACK (0, Success) properties=[SessionExpiryInterval : 0, ServerKeepAlive : 60, ReceiveMaximum : 100, TopicAliasMaximum : 8, MaximumQoS : 1, RetainAvailable : 1, MaximumPacketSize : 149504, WildcardSubscriptionAvailable : 1, SubscriptionIdentifierAvailable : 0, SharedSubscriptionAvailable : 1]
DEBUG:__main__:Sending SUBSCRIBE (d0, m1) [(b'vehicle/#', {QoS=1, noLocal=False, retainAsPublished=False, retainHandling=0})]
DEBUG:__main__:Received SUBACK
Now, stop the client. When stopped, 300 seconds of the session expiry clock will start ticking. So, our connected car is now offline and it will be able to receive messages if it goes back online in 300 seconds, before the message expiry intervals have passed. Now run the publisher to publish two remote commands while the connected car is offline:
$ python3 04_message_session_expiry_clean_start_publisher_example.py --endpoint <AWS-IoT-Device-Data-Endpoint>
After seeing Received PUBACK
logs for two messages, run the subscriber script.
$ python3 04_message_session_expiry_clean_start_subscriber_example.py --endpoint <AWS-IoT-Device-Data-Endpoint> --session-expiry-interval 300
DEBUG:__main__:Sending CONNECT (u0, p0, wr0, wq0, wf0, c0, k60) client_id=b'TestThing04-Sub' properties=[SessionExpiryInterval : 300]
DEBUG:__main__:Received CONNACK (1, Success) properties=[SessionExpiryInterval : 0, ServerKeepAlive : 60, ReceiveMaximum : 100, TopicAliasMaximum : 8, MaximumQoS : 1, RetainAvailable : 1, MaximumPacketSize : 149504, WildcardSubscriptionAvailable : 1, SubscriptionIdentifierAvailable : 0, SharedSubscriptionAvailable : 1]
DEBUG:__main__:Sending SUBSCRIBE (d0, m1) [(b'vehicle/#', {QoS=1, noLocal=False, retainAsPublished=False, retainHandling=0})]
DEBUG:__main__:Received PUBLISH (d0, q1, r0, m1), 'vehicle/air_conditioner/set', properties=[MessageExpiryInterval : 116], ... (8 bytes)
DEBUG:root:Received a message on topic: 'vehicle/air_conditioner/set', payload: 'PRE_HEAT'
DEBUG:__main__:Sending PUBACK (Mid: 1)
DEBUG:__main__:Received PUBLISH (d0, q1, r0, m2), 'vehicle/driver_door/lock', properties=[MessageExpiryInterval : 6], ... (6 bytes)
DEBUG:root:Received a message on topic: 'vehicle/driver_door/lock', payload: 'UNLOCK'
DEBUG:__main__:Sending PUBACK (Mid: 2)
DEBUG:__main__:Received SUBACK
As seen on the script log outputs, two remote commands were published while the connected car was offline and were received when it went back online. Notice that the vehicle/driver_door/lock
message has 6 seconds remaining, and vehicle/air_conditioner/set
message has 116 seconds remaining. So, the connected car went back online within this period to receive both remote commands before expiration.
Now, stop both scripts and run the same publish and subscribe experiment again. For this case, wait 15 seconds after the publish before subscribing to the messages. You will notice that only the vehicle/air_conditioner/set
message is received as expected. While the connected car was offline, the vehicle/driver_door/lock
message expired.
As a last experiment on this feature set, run the subscriber with 10 seconds of session expiry. In that case, the connected car’s session in the AWS IoT Core MQTT broker will be removed along with queued messages. Even if the message expiry intervals allow queuing messages, they won’t be received by the connected car since the session is removed after 10 seconds.
Run the subscriber with 10 seconds of session expiry and stop it after seeing Received SUBACK
log. Then, run the publisher to send remote commands and wait 15 seconds. Then, run the subscriber again:
$ python3 04_message_session_expiry_clean_start_subscriber_example.py --endpoint <AWS-IoT-Device-Data-Endpoint> --session-expiry-interval 10
DEBUG:__main__:Sending CONNECT (u0, p0, wr0, wq0, wf0, c0, k60) client_id=b'TestThing04-Sub' properties=[SessionExpiryInterval : 10]
DEBUG:__main__:Received CONNACK (0, Success) properties=[SessionExpiryInterval : 0, ServerKeepAlive : 60, ReceiveMaximum : 100, TopicAliasMaximum : 8, MaximumQoS : 1, RetainAvailable : 1, MaximumPacketSize : 149504, WildcardSubscriptionAvailable : 1, SubscriptionIdentifierAvailable : 0, SharedSubscriptionAvailable : 1]
DEBUG:__main__:Sending SUBSCRIBE (d0, m1) [(b'vehicle/#', {QoS=1, noLocal=False, retainAsPublished=False, retainHandling=0})]
DEBUG:__main__:Received SUBACK
As seen on log outputs, nothing has been received by the connected car since the session is already removed.
5. Enhanced device connectivity flow using reason codes and server disconnect features
Reason codes allow a sender to determine the type of error (if any) in the transaction between the publisher and the subscriber. View the full list in OASIS specs for MQTT Version 5.0.
The server disconnect feature is a response from the server with the reason code as to why the connection was closed. This feature is helpful when analyzing the reason why the disconnect/reject happened, which you can use for various debugging purposes.
An example use case could be an edge sensor gateway that integrates with various services running in the cloud. When MQTT clients are disconnected, they are often configured to attempt reconnects automatically. With MQTTv3.1.1, misconfigurations between the gateway’s subscription topics and the IoT device policy were leading to connect/disconnect loops when the device attempted to perform unauthorized MQTT actions in the absence of the reason code for the disconnect. With MQTTv5, the device knows why it was disconnected, and it won’t try to subscribe to that topic when the reason from the server for disconnect is specified as authentication. The device can report the issue and try taking appropriate remedial actions using the reason code.
To experiment with this feature, check the ./aws-iot-mqttv5-examples/05_reason_codes_example.py
script file in the git repository you downloaded on the Getting Started step. Run the following command by specifying your device data endpoint you obtained on the Getting Started step with --endpoint
parameter, replacing <AWS-IoT-Device-Data-Endpoint>
with, for example: abcd123456z-ats.iot.region.amazonaws.com
$ python3 05_reason_codes_example.py --endpoint <AWS-IoT-Device-Data-Endpoint>
DEBUG:__main__:Sending CONNECT (u0, p0, wr0, wq0, wf0, c1, k60) client_id=b'TestThing05' properties=None
DEBUG:__main__:Received CONNACK (0, Success) properties=[SessionExpiryInterval : 0, ServerKeepAlive : 60, ReceiveMaximum : 100, TopicAliasMaximum : 8, MaximumQoS : 1, RetainAvailable : 1, MaximumPacketSize : 149504, WildcardSubscriptionAvailable : 1, SubscriptionIdentifierAvailable : 0, SharedSubscriptionAvailable : 1]
DEBUG:root:Connected {'session present': 0}
DEBUG:__main__:Sending PUBLISH (d0, q1, r0, m1), 'b'sensors/field/field001/equipments/9e6282ff-c8f0-49cd-b3a0-fa17ad6b84a7/temperature'', properties=None, ... (4 bytes)
DEBUG:__main__:Sending PUBLISH (d0, q1, r0, m2), 'b'sensors/field/field001/equipments/46be210d-8a83-4e92-a3fe-4f989704d21e/temperature'', properties=[TopicAlias : 14], ... (4 bytes)
DEBUG:__main__:Received DISCONNECT Topic alias invalid [ReasonString : DISCONNECT:Topic alias is out of range.:e3392cff-a031-4887-5b87-59eae249b6c4]
DEBUG:root:Received Disconnect with reason: Topic alias invalid
DEBUG:root:The disconnect is caused by the topic alias. Logging the issue for further analysis and exiting.
When the script starts, it first publishes a message without a topic alias successfully. Then, the script publishes the second message with a topic alias set to 14. Since the current limit of topic aliases is 8; the broker rejects the packet with the reason for the disconnect as well. The client receives the reason code 148 which is “Topic Alias invalid” for DISCONNECT
packet, as specified in OASIS specs for MQTT Version 5.0. After the reason code 148, the client stops gracefully.
Conclusion
AWS IoT Core provides a more comprehensive palette of IoT messaging features with newly announced MQTTv5 features. These features help customers to build adaptive IoT architectures and ensure a more bandwidth efficient, cost efficient, and reliable deployment. In this post, you’ve learned how those features work to solve business challenges on various IoT use-cases. To learn more and get started with AWS IoT Core MQTTv5 features, visit the documentation page. Also, visit repost.aws Internet of Things channel to discuss the new features with the AWS IoT community and share ideas.
About the authors
Emir Ayar is a Tech Lead Solutions Architect on the AWS Prototyping team. He specializes in helping customers build IoT, ML at the Edge, and Industry 4.0 solutions and implement architectural best practices. He lives in Luxembourg and enjoys playing synthesizers. |
Ashok Rao is an IoT Specialist Solutions Architect at AWS. He has a mix of expertise both in IoT hardware such as MCUs, Edge gateways as well as Cloud technologies. He has helped various customers design and deploy IoT solutions from concept to production across multiple domains. He is based in the UK and enjoys photography and tinkering with smart home projects. |