亚马逊AWS官方博客

云边一体的 EMQ 物联网解决方案

“物联网”(Internet of Things,缩写 IoT)一词诞生于 15 年前。近几年,在 5G、窄带物联网、云计算、大数据、人工智能、区块链和边缘计算等新一代信息技术加持下,物联网获得高速发展。目前,物联网的实际应用,已在制造业、农业、家居、交通和车联网、医疗健康等多个领域取得显著成果。

在物联网多年高速发展中,MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议)成为使用最广泛的传输协议。

基于 MQTT,如何简单高效的实现端到端、一站式解决“万物互联”的方案,也备受关注。

我们观察到,在工业物联网和车联网行业中,有诸多采用 EMQ 来实现端到端、一站式解决“万物互联”的方案,本文将为大家演示在亚马逊云平台上如何利用 EMQ 实现数据接入和展现。

云边一体的 IoT 接入解决方案

EMQ 是一个云原生 MQTT 消息服务器和流处理数据库,为企业云边端的海量物联网数据提供高可靠、高性能的实时连接、移动、处理与集成,助力构建「面向未来」的物联网平台与应用。

一、方案场景

我们就以工业物联网解决方案为例,展示整个方案的实施方式,和效果展现。

完整的物联网端到端数据解决方案

为了更便于阅读和理解,本文通过四部分,为大家讲解。

  • 组件介绍
  • 组件部署
  • 物联网场景端到端数据演示
  • 车联网场景端到端数据演示

系统架构及组件介绍

组件信息列表

组件名称 版本 功能介绍 部署方式
EMQX Enterprise 4.4.19 分布式物联网接入平台 Amazon Linux 2
Neuron 2.5.1 开源的、轻量级工业协议网关软件,支持数十种工业协议的一站式设备连接、数据接入、MQTT 协议转换 Docker
HstreamDB Cloud N/A 实时数据流的接入、存储、处理、分发等环节进行全生命周期管理的流数据库 Cloud platform
HstreamDB v0.15.0 Docker
MQTTX 1.9.3 MQTTX 跨平台 MQTT 桌面和 CLI 客户端,用于学习和验证 EMQX 功能,发布订阅消息等。 Windows Server
PeakHMI Slave Simulators 3.0.0 模拟器,用于模拟 Modbus 设备数据 Windows Server

运行环境准备

资源类型 系统类型 运行组件
EC2 Amazon Linux 2 EMQX
Neuron
HstreamDB
EC2 Windows Server PeakHMI Slave Simulators
MQTTX

二、部署实施

1. EMQX 部署

EMQX 企业版是一款「随处运行,无限连接,任意集成」云原生分布式物联网接入平台。EMQX 企业版提供一体化的分布式 MQTT 消息服务和强大的 IoT 规则引擎,为高可靠、高性能的物联网实时数据移动、处理和集成提供动力,助力企业快速构建关键业务的 IoT 平台与应用。

1.1 EMQX 可以通过以下几种方式部署使用:

  • EMQX Cloud
  • Docker
  • Amazon Elastic Kubernetes Service (Amazon EKS)
  • Amazon EC2

本文我们使用 EC2 的方式进行部署。

1.2 EMQX 企业版 for Amazon Linux 2 安装方式

版本选择链接,本文选择 4.4.19 版本

https://www.emqx.com/zh/downloads/enterprise

下载

wget https://www.emqx.com/zh/downloads/enterprise/v4.4.19/emqx-ee-4.4.19-otp24.3.4.2-1-amzn2-amd64.rpm

安装

yum install emqx-ee-4.4.19-otp24.3.4.2-1-amzn2-amd64.rpm -y

运行

sudo systemctl start emqx

1.3 访问 EMQX

在安装完成后,使用 IP+18083 端口,即可通过浏览器访问 EMQX 的控制台

http://<EC2 IP>:18083

1.3.1 初次登录用户名:admin,密码 public

1.3.2 初次登录后,需要修改密码

1.3.3 EMQX 控制台展示

2. MQTTX 部署

MQTTX 是由 EMQ 开发的一款开源跨平台 MQTT 5.0 桌面客户端,它兼容 macOS,Linux 以及 Windows 系统。

MQTTX 的用户界面 UI 采用聊天式设计,使得操作逻辑更加简明直观。它支持用户快速创建和保存多个 MQTT 连接,便于测试 MQTT/MQTTS 连接,以及 MQTT 消息的订阅和发布。

2.1 MQTTX 测试工具下载

https://mqttx.app/zh/downloads

创建 EMQX 连接,输入 EMQX IP,用户名密码。然后点击 connect 即可。

2.2 订阅 topic

2.3 验证 EMQX 服务状态

选择新建订阅,输入消息,并发送到主题 topic。在对话框内,可以立即接收到主题 topic 的信息。说明 EMQX 运行正常。

3. 流数据库 HStreamDB

HStreamDB 是一款专为流式数据设计的, 针对大规模实时数据流的接入、存储、处理、分发等环节进行全生命周期管理的流数据库。 它使用标准 SQL(及其流式拓展)作为主要接口语言,以实时性作为主要特征,旨在简化数据流的运维管理以及实时应用的开发。

HStreamDB 可以通过 Docker,Amazon EKS 部署,也可以使用 HStreamDB Cloud。

3.1 基于 Docker 的 HStreamDB 部署与使用

3.1.1 创建一个 quick-start.yaml

version: "3.5"

services:
  hserver:
    image: hstreamdb/hstream:latest
    depends_on:
      - zookeeper
      - hstore
    ports:
      - "127.0.0.1:6570:6570"
    expose:
      - 6570
    networks:
      - hstream-quickstart
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
      - /tmp:/tmp
      - data_store:/data/store
    command:
      - bash
      - "-c"
      - |
        set -e
        /usr/local/script/wait-for-storage.sh hstore 6440 zookeeper 2181 600 \
        /usr/local/bin/hstream-server \
        --bind-address 0.0.0.0 --port 6570 \
        --internal-port 6571 \
        --server-id 100 \
        --seed-nodes "$$(hostname -I | awk '{print $$1}'):6571" \
        --advertised-address $$(hostname -I | awk '{print $$1}') \
        --metastore-uri zk://zookeeper:2181 \
        --store-config /data/store/logdevice.conf \
        --store-admin-host hstore --store-admin-port 6440 \
        --store-log-level warning \
        --io-tasks-path /tmp/io/tasks \
        --io-tasks-network hstream-quickstart

  hstore:
    image: hstreamdb/hstream:latest
    networks:
      - hstream-quickstart
    volumes:
      - data_store:/data/store
    command:
      - bash
      - "-c"
      - |
        set -ex
        # N.B. "enable-dscp-reflection=false" is required for linux kernel which
        # doesn't support dscp reflection, e.g. centos7.
        /usr/local/bin/ld-dev-cluster --root /data/store \
        --use-tcp --tcp-host $$(hostname -I | awk '{print $$1}') \
        --user-admin-port 6440 \
        --param enable-dscp-reflection=false \
        --no-interactive
  zookeeper:
    image: zookeeper
    expose:
      - 2181
    networks:
      - hstream-quickstart
    volumes:
      - data_zk_data:/data
      - data_zk_datalog:/datalog
networks:
  hstream-quickstart:
    name: hstream-quickstart

volumes:
  data_store:
    name: quickstart_data_store
  data_zk_data:
    name: quickstart_data_zk_data
  data_zk_datalog:
    name: quickstart_data_zk_datalog

3.1.2 在同一个文件夹中运行,即可后台启动 HstreamDB

docker-compose -f quick-start.yaml up -d

3.1.3 启动 HStreamDB 的 SQL 命令行界面

docker exec -it some-hstream-cli hstream --port 6570 sql

3.1.4 如果所有的步骤都正确运行,您将会进入到命令行界面,并且能看见一下帮助信息

3.2 测试验证

终端 1,创建一个 stream,然后执行一个持久查询

CREATE STREAM test;
SELECT * FROM test WHERE humidity > 70 EMIT CHANGES;

终端 2,插入数据

docker exec -it some-hstream-cli hstream --port 6570 sql

INSERT INTO test (temperature, humidity) VALUES (22, 80);
INSERT INTO test (temperature, humidity) VALUES (15, 20);
INSERT INTO test (temperature, humidity) VALUES (31, 76);

终端 1,输出结果如下:

> SELECT * FROM test WHERE humidity > 70 EMIT CHANGES;

{"humidity":{"$numberLong":"80"},"temperature":{"$numberLong":"22"}}
{"humidity":{"$numberLong":"76"},"temperature":{"$numberLong":"31"}}

更多操作命令,请参考:

https://docs.hstream.io/zh/platform/write-in-platform.html

3.3 基于 HStream Platform 流数据平台

HStream Platform 是以 SaaS 的方式在云上摄取、存储、处理和分发你的海量数据流。基于开源的 HStreamDB 构建。有更清晰易用的控制台访问和使用。并且当前处于免费使用阶段。

4. Neuron

Neuron 是一款开源的、轻量级工业协议网关软件,支持数十种工业协议的一站式设备连接、数据接入、MQTT 协议转换,为工业设备赋予工业 4.0 时代关键的物联网连接能力。

Neuron 提供多种安装方式,本实例采用容器化部署的方式,以便于最快开始体验 Neuron。

4.1 Neuron 部署

获取 Docker 镜像

docker pull emqx/neuron:latest  

启动 Docker 容器

docker run -d --name neuron -p 7000:7000 --privileged=true --restart=always emqx/neuron:latest

4.2 Neuron 访问

同步浏览器访问 Neuron http://neuron-ip:7000

初始用户名 admin,密码 0000

控制台页面

5. Modbus 模拟器

安装 PeakHMI Slave Simulators 软件,安装包可在 PeakHMI 官网(opens new window)中下载。

安装后,运行 Modbus TCP slave EX。设置模拟器点位数值及站点号,如下图所示:

提示:须保证 Neuron 与模拟器运行在同一局域网内。

Windows 中尽量关闭防火墙,否则可能会导致 Neuron 连接不上模拟器。

至此,我们已经完成 EMQ 相关组件的部署。

三、物联网场景端到端数据演示

本节演示工业物联网数据接入,以及数据的查询展示。

1. 传感器数据采集列表

物联网通常会采集很多信息,这里以一些常见指标来演示。会通过 Modbus 模拟器,以下面格式生成数据。

采集指标 指标缩略名
id ID
电量 Elec
转数 Speed
温度 Temp
经纬度 GPS
风速 Air-Wind
温度 Air-temp

2. Neuron 设置

2.1 添加设备

添加南向设备和采集指标

配置->南向设备->添加设备

选择 Modbus TCP

配置设备信息,填入 PeakHMI Slave Simulator IP

创建设备组

添加点位列表,即需要采集数据指标

南向设备信息

2.2 添加北向应用

配置->北向应用->添加应用

输入名称,插件选择 MQTT

应用配置

输入主题名称,EMQX 服务器地址,用户名及密码。

添加订阅

Neuron 配置完成。

Neuron 针对北向应用和南向设备,有大量的插件支持,只需点选,既可以完成适配。

2.3 北向应用的插件支持

2.4 南向应用的插件支持

3. HStream Platform 配置

在 HStream Plarform 获取 Service URL,下载 Certificates 备用

创建 stream

4. EMQX 配置

4.1 创建资源

资源类型:HStreamDB

HStreamDB 服务器:hstreams://us-east1.endpoint.hstream.io

开启SSL:true

HStreamDB 证书分别选择:root_ca.crt、client.key、client.crt

4.2 创建规则

输入规则的 SQL,为实验目的,这里选择全部数据。可根据实际业务场景,输入相应 SQL

响应动作->添加动作

响应动作,是根据规则的 SQL 查询结果,输出到消费的应用。这里选择 HStreamDB

注意:消息内容模版,针对 HStreamDB,需要输入“${payload}”

5. 演示结果查询

至此,已经完成全部配置。可以通过 EMQX,HStreamDB 进行数据流状态,及数据结果的查询。

5.1 EMQX 监控,可以全面的状态信息

5.2 也可以通过规则控制面板,查询详细的数据信息

5.3 HStream 查询

5.3.1 Streams 的详细信息

5.3.2 Hstream 查询数据记录

5.4 上面是通过控制台查询数据,我们也可以通过客户端访问 HStreams

具体连接方式,可以点击相应的编程语言类型获取。

5.5 MQTTX 客户端订阅消息

我们也可以通过 MQTTX 直接订阅 EMQX 的信息,也可以看到相应的内容。

四、车联网端到端数据传输场景演示

作为物联网的细分领域,车联网也是备受关注。本小节,会针对车联网场景,进行演示。

车联网数据传输通常使用的是 MQTT 协议。我们使用 MQTTX 来模拟车载数据的发布,进行测试。

1. 下载 Linux 版本的 MQTTX

https://mqttx.app/zh/downloads

命令行下载

curl -LO https://www.emqx.com/en/downloads/MQTTX/v1.9.3/mqttx-cli-linux-x64 
sudo install ./mqttx-cli-linux-x64 /usr/local/bin/mqttx

2. MQTTX CLI 内置了一些常用的场景,可以通过 –scenario 参数指定

MQTTX 内置测试场景列表:

场景名称 描述
tesla EMQ tesla module,用于模拟车辆数据
IEM 模拟工业能源监控数据
smart_home 模拟生成智能家居数据
weather 模拟生成天气站数据

3. 环境准备

  • HstreamDB 中创建流
  • EMQX 中创建 Topic
  • EMQX 中创建规则,并将数据输出到 HStreamDB

4. 模拟数据发送

在 Linux 终端执行以下命令,模拟 100 辆车的数据上报

mqttx simulate -sc tesla -c 100 -t 'Topic'

终端输出

[7/11/2023] [5:25:24 PM] ›  Start simulation publishing, scenario: tesla, connections: 100, req interval: 10ms, message interval: 1000ms  success   [100/100] - Connected 
[7/11/2023] [5:25:25 PM] › Created 100 connections in 1.075s 
[7/11/2023] [5:25:40 PM] ›  Published total: 1500, message rate: 100/s

5. 数据结果查询

HStreamDB 中,查询记录,可以看到模拟的车载数据会持续写入。

单条数据值内容

{
  "car_id": "Z5HCY53YSYLP71542",
  "display_name": "Celestine's Tesla",
  "model": "X",
  "trim_badging": "officia",
  "exterior_color": "orange",
  "wheel_type": "veritatis",
  "spoiler_type": "adipisci",
  "geofence": "Vaughntown",
  "state": "asleep",
  "since": "2023-07-11T11:53:13.912Z",
  "healthy": true,
  "version": "1.6.5",
  "update_available": true,
  "update_version": "9.1.4",
  "latitude": "4.7053",
  "longitude": "-146.0241",
  "shift_state": "D",
  "power": 5124,
  "speed": 69,
  "heading": 11,
  "elevation": 3710,
  "locked": false,
  "sentry_mode": true,
  "windows_open": true,
  "doors_open": true,
  "trunk_open": true,
  "frunk_open": true,
  "is_user_present": false,
  "is_climate_on": true,
  "inside_temp": -1.8,
  "outside_temp": 16,
  "is_preconditioning": true,
  "odometer": 453885,
  "est_battery_range_km": 433.2,
  "rated_battery_range_km": 979.1,
  "ideal_battery_range_km": 904.4,
  "battery_level": 52,
  "usable_battery_level": 34,
  "plugged_in": true,
  "charge_energy_added": 27.41,
  "charge_limit_soc": 24,
  "charge_port_door_open": true,
  "charger_actual_current": 71.05,
  "charger_power": 17,
  "charger_voltage": 223,
  "charge_current_request": 24,
  "charge_current_request_max": 34,
  "scheduled_charging_start_time": "2026-01-09T05:51:00.807Z",
  "time_to_full_charge": 2.25,
  "tpms_pressure_fl": 3,
  "tpms_pressure_fr": 2.2,
  "tpms_pressure_rl": 3.3,
  "tpms_pressure_rr": 2.9,
  "timestamp": 1689095696286
}

至此,我们已经完整的展示数据是如何从终端传感器,上传到云端。通过 Neuron,EMQX,HStream 我们可以快捷的实现物联网的接入需求。

在数据接入之后,我们可以进一步把数据存放到 S3,依托亚马逊云科技的智能湖仓架构,通过 Athena,Redshift,Quicksight 等组件,挖掘更多的数据价值。

参考文档

https://neugates.io/docs/zh/latest/

https://docs.hstream.io/zh/

https://docs.emqx.com/zh/enterprise/v5.1/

https://aws.amazon.com/cn/blogs/china/simplify-the-data-collection-pipeline-with-kafka-connect/

本篇作者

许晓亮

亚马逊云解决方案架构师

王宝龙

EMQ 客户成功经理

陆云飞

亚马逊云解决方案架构师