亚马逊AWS官方博客

利用 Amazon IoT 生命周期事件和 LWT 管理设备状态

随着物联网和云计算技术的发展,越来越多的设备接入到物联网云平台中。Amazon IoT Core云平台为客户提供数十亿个IoT设备的安全连接和管理,也为客户提供数万亿条消息路由到Amazon云服务中。对于IoT设备的管理,设备状态的管理是重要的一项,一般来说我们可以根据Amazon IoT Core提供的设备影子来实时查看设备状态信息。但对于IoT设备的异常断开连接,我们需要一些额外的机制来获知设备状态信息,另外,如果有些客户因为某些原因并没有使用设备影子,那如何来查看设备状态并进行管理呢?

在这篇博文中,我们将向您详细介绍如何帮助客户解决上面这些挑战,介绍在Amazong IoT如何利用生命周期事件和Last Will and Testament (LWT)遗嘱消息来查看和管理设备状态。本博文的内容主要包括下面三个部分:

  • Amazon IoT 生命周期事件来查看正常的设备连接和断开连接信息
  • 通过LWT消息设置来看看异常设备断开信息并进行处理
  • IoT 规则操作,以及操作实现和示例展示

Amazon IoT 生命周期事件和LWT消息

Amazion IoT Device Shadow 服务在 AWS IoT 事物对象中添加设备影子,无论设备是否连接到 AWS IoT,设备影子都可以向应用程序和其他服务提供设备的状态。但有些客户由于某些原因并没用使用设备影子服务,下面将主要讲述如何利用Amazon IoT 生命周期事件来进行设备状态的管理,同时引入LWT来实现对设备异常断开的状态管理。另外我们也可以通过查看LWT消息来重新发布设备影子更新消息来更新设备影子的设备状态,利用LWT消息也能更好的解决设备影子在设备异常断开不能及时更新状态到设备影子的情况。

Amazon IoT 生命周期事件

Amazon IoT 可以在 MQTT 主题下发布生命周期事件,如连接/断开连接事件,订阅/取消订阅事件等。在设备正常连接或断开时,Amazon IoT 将消息发布到以下 MQTT 主题:

  • $aws/events/presence/connected/<clientId> — 连接到消息服务端的客户端。
  • $aws/events/presence/disconnected/<clientId> — 与消息服务端断开连接的客户端。

发布的消息内容包括了一系列 JSON 元素,比如clientId,clientInitiatedDisconnect,disconnectReason,eventType,ipAddress,principalIdentifier,sessionIdentifier,timestamp,versionNumber。在这篇博文中,我们需要介绍如何利用这些连接/断开连接事件信息,具体定义请详细参照:https://docs.aws.amazon.com/zh_cn/iot/latest/developerguide/life-cycle-events.html

通过在Amazon IoT 规则引擎中创建IoT 规则操作,对于“$aws/events/presence/+/+”的发布主题进行规则操作,比如用Lambda进行处理,能及时获知客户设备端正常连接或者断开连接的状态信息。

LWT信息配置

由于 IoT设备经常用于包含不可靠网络的场景,在某些场景的某些 MQTT 客户端偶尔会不正常地断开连接。比如连接丢失、电池耗尽或许多其他原因,都可能会发生不正常的断开连接。上面讲到客户设备端正常断开连接,可以通过Amazon IoT平台本身发送的MQTT主题消息来管理设备状态。对于这些客户设备端异常断开连接的情况,我们可以通过Last Will and Testament (LWT)功能来响应异常断开连接的方法。

当 MQTT服务端检测到客户端非正常地断开连接的时候,就会向LWT消息主题里面发布一条消息。LWT消息相关的设置是在建立连接的时候,在 MQTT连接数据包里面的可变头与有效载荷中指定的。在Amazon IoT Device SDK中,LWT消息需要在客户设备端配置MQTT连接时进行相关设置。在这里,我们使用AWS IoT Device SDK v2 for Python示例代码https://github.com/aws/aws-iot-device-sdk-python-v2/blob/main/samples/pubsub.py,对于LWT消息的配置需要添加下面代码(前五行以及倒数第二行),在异常断开连接时Amazon IoT平台将发送如“lwt/iotdemo/<clientId>”主题的JSON格式消息” {“state”:{“reported”:{“connected”:”false”}}}”,请注意在代码中mqtt.Will()接口中的lwt_message需要是byte类型。

lwt_message = {"state":{"reported":{"connected":"false"}}}
lwt_message_json = json.dumps(lwt_message)
lwt_message_byte = lwt_message_json.encode('utf-8')
lwt_topic = "lwt/iotdemo/" + args.client_id
lwt = mqtt.Will(topic=lwt_topic, qos=1, payload=lwt_message_byte, retain=False)

mqtt_connection = mqtt_connection_builder.mtls_from_path(
    endpoint=args.endpoint,
    port=args.port,
    cert_filepath=args.cert,
    pri_key_filepath=args.key,
    client_bootstrap=client_bootstrap,
    ca_filepath=args.root_ca,
    on_connection_interrupted=on_connection_interrupted,
    on_connection_resumed=on_connection_resumed,
    client_id=args.client_id,
    clean_session=False,
    keep_alive_secs=30,
    will=lwt,
    http_proxy_options=proxy_options)

IoT 规则操作和设备端到Amazon IoT云端的总体展示

Amazon IoT 提供规则引擎用于数据处理和路由,通过定义规则操作可以使设备能够与 Amazon云服务进行交互。这些规则是基于 MQTT 主题流来进行规则的分析和执行,在本博文中使用下面规则来完成对设备数据的保存,对设备连接/断开连接的处理,以及在异常断开连接情况下对设备影子的即时状态更新。

  • 将设备数据保存到 Amazon DynamoDB中
  • 调用 Lambda 函数来提取对设备连接或者断开连接(包括正常断开和异常断开)信息的处理
  • 对于设备异常断开连接情况下,重新发布主题消息来更新设备影子的状态

数据路由至DynamoDB

Amazon IoT规则操作中的DynamoDB (dynamoDB) 操作可将所有或部分 MQTT 消息写入 Amazon DynamoDB 表。请注意此规则将非 JSON 数据作为二进制数据写入到 DynamoDB 中,DynamoDB 控制台以 Base64 编码文本格式显示数据。

  • 创建一个DynamoDB表供设备数据存储之用
  • 创建一个规则操作将收取到的相关MQTT主题的数据消息保存至DynamoDB中

下面是AWS CLI的命令来执行上面两个操作:

  • 创建DynamoDB表iotdemoDataTable,索引为timestamp和deviceId:
aws dynamodb create-table --table-name iotdemoDataTable --attribute-definitions AttributeName=timestamp,AttributeType=S AttributeName=deviceId,AttributeType=S --key-schema AttributeName=timestamp,KeyType=HASH AttributeName=deviceId,KeyType=RANGE --provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=5
  • 创建IoT规则操作,将数据路由至DynamoDB中,将保存timestamp时间戳和主题里面的第三项deviceId到DynamoDB表中,另外iotdemoRoleDDB需要有dynamodb:PutItem权限。

aws iot create-topic-rule --rule-name iotdemoDataToDDB --topic-rule-payload file://iotdemoDataToDDB.json

其中iotdemoDataToDDB.json内容如下:

{
  "sql": "SELECT * FROM 'dt/iotdemo/+'",
  "ruleDisabled": false,
  "awsIotSqlVersion": "2016-03-23",
  "actions": [{
      "dynamoDB": {
          "tableName": "iotdemoDataTable",
          "roleArn": "arn:aws-cn:iam::000123456789:role/service-role/iotdemoRoleDDB",
          "hashKeyField": "timestamp",
          "hashKeyValue": "${timestamp()}",
          "rangeKeyField": "deviceId",
          "rangeKeyValue": "${topic(3)}",
          "payloadField": "payload"
      }
  }]
}

数据路由至Lambda处理

Amazon IoT规则操作中的Lambda (lambda) 操作调用 AWS Lambda 函数,传入 MQTT 消息。 AWS IoT 异步调用 Lambda 函数,Lambda函数将处理从IoT Core中路由过来的数据然后对数据进行处理,如果客户另外有一套设备状态管理系统,可以将这些设备连接或断开连接信息通过接口调用发送到该第三方系统中做进一步的处理。

  • 创建一个Lambda函数,用于对IoT Core路由过来的设备连接或断开连接信息进行处理
  • 创建一个规则操作将收取到的相关MQTT主题的数据消息路由至Lambda函数进行操作处理

下面是AWS CLI的命令来执行上面两个操作:

  • 创建Lambda函数,并配置IoT平台操作lambda:InvokeFunction权限:
aws lambda create-function --function-name iotdemoProcessState --runtime python3.7 --zip-file fileb://iotdemoProcessState.zip --handler lambda_function.lambda_handler --role arn:aws-cn:iam::000123456789:role/iotdemoLambda
aws lambda add-permission --function-name iotdemoProcessState --action lambda:InvokeFunction --statement-id iotdemo --principal iot.amazonaws.com
  • 创建IoT规则操作,将数据路由至Lambda中供Lambda来处理

aws iot create-topic-rule --rule-name iotdemoProcessEvents --topic-rule-payload file://iotdemoProcessEvents.json

其中iotdemoProcessEvents.json内容如下:

{
  "sql": "SELECT * FROM '$aws/events/presence/+/+'",
  "ruleDisabled": false,
  "awsIotSqlVersion": "2016-03-23",
  "actions": [{
      "lambda": {
          "functionArn": "arn:aws-cn:lambda:cn-north-1:000123456789:function:iotdemoProcessState"
      }
  }]
}

设备异常断开连接信息重新主题发布和设备影子状态更新

Amazon IoT规则操作中的重新发布 (republish) 操作可将 MQTT 消息重新发布到其它 MQTT 主题。

下面是AWS CLI的命令来创建IoT规则操作:

  • 创建IoT规则操作,将数据路由至Republish中进行新消息主题的重新发布,比如从原先LWT消息主题“’lwt/iotdemo/<clientId>”发布到设备影子更新主题“$aws/things/<thingName>/shadow/update”,这里clientId和thingName将在设备端设置为相同。另外请注意在iotdemoRoleRepublish中需要配置有”iot:Publish”权限。
  • 另外,这里为了MQTT主题路由定义的一致性,也添加了一个Lambda操作,用于对设备异常断开消息的Lambda处理,功能与上面章节的Lambda规则操作相同。

aws iot create-topic-rule --rule-name iotdemoProcessLWT --topic-rule-payload file://iotdemoProcessLWT.json

其中iotdemoProcessLWT.json内容如下:

{
  "sql": "SELECT * FROM 'lwt/iotdemo/+'",
  "ruleDisabled": false,
  "awsIotSqlVersion": "2016-03-23",
  "actions": [{
      "lambda": {
          "functionArn": "arn:aws-cn:lambda:cn-north-1:000123456789:function:iotdemoProcessState"
      }
  },
  {
    "republish": {
          "topic": "$$aws/things/${topic(3)}/shadow/update",
          "roleArn": "arn:aws-cn:iam::000123456789:role/service-role/iotdemoRoleRepublish"
      }
  }]
}

设备端到Amazon IoT云端的总体展示

在这里,我们使用AWS IoT Device SDK v2 for Python示例代码https://github.com/aws/aws-iot-device-sdk-python-v2/blob/main/samples/pubsub.py,LWT消息的配置上面已经讲到。下面另外再修改两处如下:

  • clientId 和topic的重新定义,主要是将clientId与thingName物品名字对应起来,便于对设备影子的状态更新操作。
parser.add_argument('--client-id', default="device001", help="Client ID for MQTT connection.")
parser.add_argument('--topic', default="dt/iotdemo/device001", help="Topic to subscribe to, and publish messages to.")
  • message 的重新定义,主要是将原先非JSON格式的message改为JSON格式,便于在DynamoDB表中查看。
message_iotdemo = {"message": ""}
message_iotdemo["message"] = message
message_json = json.dumps(message_iotdemo)

从总体流程来讲,首先在Amazon IoT Core中创建一个thing物品,创建设备证书和私钥并下载保存,关联IoT策略,查看得到IoT Endpoint URL信息。然后执行下面命令即可发送设备数据,设备正常连接和断开连接会在程序执行时正常发送,LWT 的消息发送可以通过杀掉改进程的方法来模拟实现。

python3 pubsub.py --endpoint abcd123efg456.ats.iot.cn-north-1.amazonaws.com.cn --cert ./certs/device001-certificate.pem.crt --key ./certs/device001-private.pem.key --root-ca ./certs/AmazonRootCA1.pem

最后在DynamoDB中可以查看到相关设备数据的保存,在CloudWatch的Lambda的log中可以看到相关连接/断开连接/LWT的消息记录,在设备影子的JSON文件中可以看到”connected”在设备非正常断开时变为了“false”,也就是说通过LWT消息以及MQTT主题的重新发布更新了设备影子的状态信息。

总结

通过本博文,成功的解决了设备状态管理的两个问题。第一,如果有些客户因为某些原因并没有使用设备影子,我们可以根据Amazon IoT 的生命周期事件信息来对查看设备状态并对之进行管理。第二,对于IoT设备的异常断开连接,我们利用了LWT遗嘱消息来获知设备异常断开的状态信息,同时也可以通过重新发布设备影子更新的消息主题来更新设备影子的状态信息。另外在本博文中,通过实际的IoT规则操作定义和Amazon IoT Device SDK的示例代码的修改,实现了从设备端到云端的总体操作展示,便于大家更好的来了解如何使用Amazon IoT Device SDK和Amazon IoT云平台。

本篇作者

张守武

AWS专业服务团队物联网顾问。负责基于AWS IoT的解决方案咨询和项目交付,同时负责物联网行业解决方案的开发和推广,在物联网、网络、视频等领域有着广泛的设计和实践经验。