Amazon Timestream is a fast, scalable, serverless time series database service for IoT and operational applications that makes it easy to store and analyze trillions of events per day, up to 1,000 times faster than and at as little as one-tenth the cost of relational databases. By keeping recent data in memory and moving historical data to a cost-optimized storage tier according to user-defined policies, Amazon Timestream saves you the time and cost of managing the lifecycle of time series data. Its purpose-built query engine can access and analyze both recent and historical data without requiring you to specify in the query whether the data resides in memory or in the cost-optimized tier. Amazon Timestream includes built-in time series analytic functions that help identify trends and patterns in the data in near real time. Because Timestream is serverless and automatically scales to adjust capacity and performance, there is no underlying infrastructure to manage, so you can focus on building your applications.
 
This post shows how to build real-time collection, storage, and analysis of IoT time series data (using a PM2.5 scenario as the example) with the Timestream and Kinesis Data Streams managed services together with the open-source Grafana and Flink connector. It covers the deployment architecture, environment setup, data collection, data storage, and analysis. If you have similar IoT time series storage and analysis requirements, we hope it gives you ideas that help your business grow.
 
Architecture
 
Amazon Timestream can quickly analyze time series data generated by IoT applications using built-in analytic functions such as smoothing, approximation, and interpolation. For example, a smart home device manufacturer can use Amazon Timestream to collect motion or temperature data from device sensors, interpolate to identify time ranges without motion, and alert consumers to take actions such as turning down the heat to save energy.
 
This post uses an IoT PM2.5 scenario as the example to implement real-time PM2.5 data collection, time series data storage, and real-time analysis. The architecture consists of three main parts:
 
        
- Real-time time series data collection: a Python data collection program, together with Kinesis Data Streams and the Kinesis Data Analytics for Apache Flink connector, simulates PM2.5 monitoring devices and ingests their readings into Timestream in real time (a record-shape sketch follows this list).
- Time series data storage: the Amazon Timestream time series database stores the data. By setting retention periods for the memory store and the magnetic store (the cost-optimized tier), recent data is kept in memory and historical data is moved to the cost-optimized storage tier according to user-defined policies.
- Real-time time series data analysis: Grafana (with the Timestream for Grafana plugin installed) accesses Timestream data in real time. Grafana's rich charting options, combined with Timestream's built-in time series analytic functions, make it possible to identify trends and patterns in the IoT data in near real time.
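To make the data flow concrete, the sketch below shows what a single PM2.5 reading might look like as it moves through the pipeline. The field names are assumptions chosen to line up with the dimensions and measure used by the SQL queries later in this post (city, location, measure_name 'pm2.5', measure_value); the exact schema produced by the demo generator and expected by the Flink connector may differ.

# A hypothetical PM2.5 reading as a JSON record on the Kinesis stream
# (illustrative field names, not the generator's exact schema).
kinesis_record = {
    "city": "Shenzhen",          # becomes a Timestream dimension
    "location": "huaqiao city",  # becomes a Timestream dimension
    "measure_name": "pm2.5",     # Timestream measure name
    "measure_value": 35,         # the PM2.5 reading (queried as measure_value::bigint)
    "time": 1609459200000,       # event time in epoch milliseconds
}

# Once the Flink connector writes it into Timestream, the queries below address it as:
#   dimensions: city = 'Shenzhen', location = 'huaqiao city'
#   measure:    measure_name = 'pm2.5', measure_value::bigint = 35
#   time:       the timestamp used by bin(), ago(), and CREATE_TIME_SERIES()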
The overall architecture is shown in the following diagram:
 
       
 
        
 
Environment Setup
 
1.1 Create the CloudFormation stack
 
Use your own AWS account (choose the us-east-1 Region).
 
Download the CloudFormation YAML file from GitHub:
 
       git clone https://github.com/bingbingliu18/Timestream-pm25
 
The Timestream-pm25 directory contains timestream-short-new.yaml, the template used for the CloudFormation stack below.
 
       
 
       
 
Keep the defaults for everything else and click the Create Stack button.
 
       
 
       
 
The CloudFormation stack is created successfully.
 
1.2 Connect to the newly created EC2 bastion host
 
Change the permissions on the downloaded key file, then connect over SSH:
 
       chmod 0600 [path to downloaded .pem file]
 
       ssh -i [path to downloaded .pem file] ec2-user@[bastionEndpoint]
 
Run aws configure:
 
       aws configure
 
For Default region name, enter "us-east-1"; accept the defaults for the remaining settings.
 
        
 
1.3 Connect to the EC2 bastion host and install the required software
 
Set the time zone:
 
       TZ='Asia/Shanghai'; export TZ
 
Install Python 3:
 
       sudo yum install -y python3
 
Install pip for Python 3, then the boto3 and numpy packages:
 
       sudo yum install -y python3-pip
 
       pip3 install boto3
 
       sudo pip3 install boto3
 
       pip3 install numpy
 
       sudo pip3 install numpy
 
Install git:
 
       sudo yum install -y git
 
       
 
1.4 Download the Timestream sample tools from GitHub
 
       git clone https://github.com/awslabs/amazon-timestream-tools amazon-timestream-tools
 
1.5 Install Grafana server
 
Connect to the EC2 bastion host:
 
       sudo vi /etc/yum.repos.d/grafana.repo
 
For OSS releases, copy the following content into grafana.repo:
 
        
        [grafana]
name=grafana
baseurl=https://packages.grafana.com/oss/rpm
repo_gpgcheck=1
enabled=1
gpgcheck=1
gpgkey=https://packages.grafana.com/gpg.key
sslverify=1
sslcacert=/etc/pki/tls/certs/ca-bundle.crt
 
         
Install Grafana server:
 
       sudo yum install -y grafana
 
Start Grafana server and check its status:
 
        
        sudo service grafana-server start
sudo service grafana-server status
 
         
Configure Grafana server to start automatically when the operating system boots:
 
       sudo /sbin/chkconfig --add grafana-server
 
        
 
1.6 Install the Timestream plugin
 
       sudo grafana-cli plugins install grafana-timestream-datasource
 
Restart Grafana:
 
       sudo service grafana-server restart
 
1.7 Configure the IAM role that Grafana uses to access Timestream
 
Get the IAM role name.
 
       
 
In the IAM console, select the role to modify. Role name:
 
       timestream-iot-grafanaEC2rolelabview-us-east-1
 
Modify the role's trust relationship:
 
       
 
Select the entire policy document and replace it with the following:
 
        
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "",
      "Effect": "Allow",
      "Principal": {
        "Service": "ec2.amazonaws.com"
      },
      "Action": "sts:AssumeRole"
    },
    {
      "Sid": "",
      "Effect": "Allow",
      "Principal": {
        "AWS": "[replace with the role ARN from the CloudFormation output]"
      },
      "Action": "sts:AssumeRole"
    }
  ]
}
 
         
The modified trust relationship:
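Optionally, before configuring Grafana, you can verify from the EC2 bastion host that the updated trust policy lets the instance assume this role. A minimal boto3 sketch (the role ARN below is a placeholder for the ARN of the role you just modified):

import boto3

# Attempt to assume the role that Grafana will use to query Timestream.
# Replace the ARN with the ARN of the role modified above.
sts = boto3.client("sts", region_name="us-east-1")
response = sts.assume_role(
    RoleArn="arn:aws:iam::<account-id>:role/timestream-iot-grafanaEC2rolelabview-us-east-1",
    RoleSessionName="grafana-timestream-check",
)
print("Assumed role:", response["AssumedRoleUser"]["Arn"])  # success means the trust policy works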
 
       
 
1.8 Log in to the Grafana server
 
Log in to the Grafana server for the first time:
 
        
1. Open a browser and go to http://[Grafana server public ip]:3000
2. The default Grafana server listening port is 3000. To get the EC2 public IP address, check the CloudFormation Outputs, as shown in the figure below:
 
       
 
        
 
3. On the login page, enter username: admin and password: admin (both the username and the password are admin).
 
4. Click Log In. After a successful login, you will be prompted to change the password.
 
1.9 Add a Timestream data source in Grafana
 
Add the Timestream data source:
 
       
 
1.10 Configure the Timestream data source in Grafana
 
Copy the role ARN required for the configuration from the CloudFormation Outputs tab. Default Region: us-east-1
 
       
 
       
 
IoT Data Storage
 
2.1 Create the Timestream database iot
 
       
 
       
 
2.2 Create the Timestream table pm25
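Steps 2.1 and 2.2 can be completed in the Timestream console or programmatically. Below is a minimal boto3 sketch that assumes 24 hours of memory-store retention and 7 days of magnetic-store retention; choose retention values that match your own data lifecycle policy:

import boto3

# Create the database and table used throughout this post.
# The retention values below are illustrative; adjust them to your needs.
ts_write = boto3.client("timestream-write", region_name="us-east-1")

ts_write.create_database(DatabaseName="iot")

ts_write.create_table(
    DatabaseName="iot",
    TableName="pm25",
    RetentionProperties={
        "MemoryStoreRetentionPeriodInHours": 24,   # recent data kept in memory
        "MagneticStoreRetentionPeriodInDays": 7,   # historical data in the cost-optimized tier
    },
)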
 
       
 
        
 
IoT Data Ingestion
 
3.1 Install the Flink connector for Timestream
 
Install Java 8:
 
       sudo yum install -y java-1.8.0-openjdk*
 
       java -version
 
Install the debug info package; otherwise jmap will throw an exception:
 
       sudo yum  --enablerepo='*-debug*' install -y java-1.8.0-openjdk-debuginfo
 
Install Maven:
 
        
        sudo wget https://repos.fedorapeople.org/repos/dchen/apache-maven/epel-apache-maven.repo -O /etc/yum.repos.d/epel-apache-maven.repo 
sudo sed -i s/\$releasever/6/g /etc/yum.repos.d/epel-apache-maven.repo 
sudo yum install -y apache-maven 
mvn --version 
 
         
Switch the Java version from 1.7 to 1.8:
 
       sudo update-alternatives --config java
 
       sudo update-alternatives --config javac
 
Install Apache Flink
 
The latest Apache Flink version supported by Kinesis Data Analytics is 1.8.2.
 
        
- Create a flink folder:
cd
 
       mkdir flink
 
       cd flink
 
        
- Download the Apache Flink 1.8.2 source code:
wget https://archive.apache.org/dist/flink/flink-1.8.2/flink-1.8.2-src.tgz
 
        
- Extract the Apache Flink source code:
tar -xvf flink-1.8.2-src.tgz
 
        
- Change into the Apache Flink source directory:
cd flink-1.8.2
 
        
- Compile and install Apache Flink (this build takes a while, roughly 20 minutes):
mvn clean install -Pinclude-kinesis -DskipTests
 
3.2 Create the Kinesis data stream Timestreampm25Stream
 
       aws kinesis create-stream --stream-name Timestreampm25Stream --shard-count 1
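Before starting the Flink job in the next step, you can confirm the new stream has reached the ACTIVE state. A minimal boto3 sketch using the standard Kinesis stream_exists waiter:

import boto3

# Block until Timestreampm25Stream exists and is ACTIVE.
kinesis = boto3.client("kinesis", region_name="us-east-1")
kinesis.get_waiter("stream_exists").wait(StreamName="Timestreampm25Stream")

summary = kinesis.describe_stream_summary(StreamName="Timestreampm25Stream")
print(summary["StreamDescriptionSummary"]["StreamStatus"])  # expect ACTIVE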
 
3.3 Run the Flink connector to stream data from Kinesis into Timestream:
 
        
        cd
cd amazon-timestream-tools/integrations/flink_connector
mvn clean compile
 
         
Keep the following command running for the duration of data collection:
 
        
mvn exec:java -Dexec.mainClass="com.amazonaws.services.kinesisanalytics.StreamingJob" -Dexec.args="--InputStreamName Timestreampm25Stream --Region us-east-1 --TimestreamDbName iot --TimestreamTableName pm25"
 
         
3.4 Prepare the PM2.5 demo data
 
Connect to the EC2 bastion host.
 
        
- Download the PM2.5 demo data generator:
cd
 
       mkdir pm25
 
       cd pm25
 
Download the data collection Python program from GitHub:
 
       git clone https://github.com/bingbingliu18/Timestream-pm25
 
       cd Timestream-pm25
 
        
- Run the PM2.5 demo data generator (the Python program takes two parameters: --region, default us-east-1; --stream, default Timestreampm25Stream).
Keep the following command running during data collection:
 
       python3 pm25_new_kinisis_test.py
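pm25_new_kinisis_test.py in the cloned repository is the real generator. Conceptually it keeps putting simulated readings onto the Kinesis stream, roughly like the sketch below; the record fields follow the (assumed) record-shape sketch from the Architecture section, so treat this only as an illustration of the Kinesis put_record mechanics rather than the actual script:

import json
import random
import time

import boto3

kinesis = boto3.client("kinesis", region_name="us-east-1")

# Continuously push simulated PM2.5 readings for one station onto the stream.
while True:
    reading = {
        "city": "Shenzhen",
        "location": "huaqiao city",
        "measure_name": "pm2.5",
        "measure_value": random.randint(10, 150),
        "time": int(time.time() * 1000),  # epoch milliseconds
    }
    kinesis.put_record(
        StreamName="Timestreampm25Stream",
        Data=json.dumps(reading).encode("utf-8"),
        PartitionKey=reading["location"],
    )
    time.sleep(1)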
 
        
 
IoT Data Analysis
 
4.1 Log in to the Grafana server and create a dashboard and panels
 
       
 
When creating dashboard queries, set the time zone to the local browser time zone:
 
       
 
Create a new panel:
 
       
 
Select the data source to query, and paste the SQL statement for the analysis into the new panel:
 
       
 
4.2 Create the time series analysis dashboard PM2.5 Analysis 1 (save it as PM2.5 Analysis 1)
 
4.2.1 Query the average PM2.5 of each monitoring station in Beijing
 
       New Panel
 
        
SELECT CASE WHEN location = 'fengtai_xiaotun' THEN avg_pm25 ELSE NULL END AS fengtai_xiaotun,
CASE WHEN location = 'fengtai_yungang' THEN avg_pm25 ELSE NULL END AS fengtai_yungang,
CASE WHEN location = 'daxing' THEN avg_pm25 ELSE NULL END AS daxing,
CASE WHEN location = 'wanshou' THEN avg_pm25 ELSE NULL END AS wanshou,
CASE WHEN location = 'gucheng' THEN avg_pm25 ELSE NULL END AS gucheng,
CASE WHEN location = 'tiantan' THEN avg_pm25 ELSE NULL END AS tiantan,
CASE WHEN location = 'yanshan' THEN avg_pm25 ELSE NULL END AS yanshan,
CASE WHEN location = 'miyun' THEN avg_pm25 ELSE NULL END AS miyun,
CASE WHEN location = 'changping' THEN avg_pm25 ELSE NULL END AS changping,
CASE WHEN location = 'aoti' THEN avg_pm25 ELSE NULL END AS aoti,
CASE WHEN location = 'mengtougou' THEN avg_pm25 ELSE NULL END AS mentougou,
CASE WHEN location = 'huairou' THEN avg_pm25 ELSE NULL END AS huairou,
CASE WHEN location = 'haidian' THEN avg_pm25 ELSE NULL END AS haidian,
CASE WHEN location = 'nongzhan' THEN avg_pm25 ELSE NULL END AS nongzhan,
CASE WHEN location = 'tongzhou' THEN avg_pm25 ELSE NULL END AS tongzhou,
CASE WHEN location = 'dingling' THEN avg_pm25 ELSE NULL END AS dingling,
CASE WHEN location = 'yanqing' THEN avg_pm25 ELSE NULL END AS yanqing,
CASE WHEN location = 'guanyuan' THEN avg_pm25 ELSE NULL END AS guanyuan,
CASE WHEN location = 'dongsi' THEN avg_pm25 ELSE NULL END AS dongsi,
CASE WHEN location = 'shunyi' THEN avg_pm25 ELSE NULL END AS shunyi
FROM 
(SELECT location, round(avg(measure_value::bigint),0) as avg_pm25
FROM "iot"."pm25" 
where measure_name='pm2.5' 
and city='Beijing'
and time >= ago(30s)
group by location,bin(time,30s)
order by avg_pm25 desc)
 
         
        
 
For the visualization, select Gauge:
 
       
 
       Save Panel as Beijing PM2.5 analysis
 
       Edit Panel Title:Beijing PM2.5 analysis
 
       
 
       Save Dashboard PM2.5 analysis 1:
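As a side note, the same SQL can also be run outside Grafana with the Timestream query API, which is a convenient way to sanity-check the ingested data while building panels. A minimal boto3 sketch that runs the inner aggregation of the query above:

import boto3

# Average PM2.5 per Beijing station over the last 30 seconds
# (the inner aggregation of the Grafana panel query above).
query_client = boto3.client("timestream-query", region_name="us-east-1")

sql = """
SELECT location, round(avg(measure_value::bigint), 0) AS avg_pm25
FROM "iot"."pm25"
WHERE measure_name = 'pm2.5' AND city = 'Beijing' AND time >= ago(30s)
GROUP BY location, bin(time, 30s)
ORDER BY avg_pm25 DESC
"""

for page in query_client.get_paginator("query").paginate(QueryString=sql):
    for row in page["Rows"]:
        location, avg_pm25 = (d.get("ScalarValue") for d in row["Data"])
        print(location, avg_pm25)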
 
       
 
4.2.2 Query the average PM2.5 of each monitoring station in Shanghai
 
       New Panel
 
        
        SELECT CASE WHEN location = 'songjiang' THEN avg_pm25 ELSE NULL END AS songjiang,
CASE WHEN location = 'fengxian' THEN avg_pm25 ELSE NULL END AS fengxian, 
CASE WHEN location = 'no 15 factory' THEN avg_pm25 ELSE NULL END AS No15_factory, 
CASE WHEN location = 'xujing' THEN avg_pm25 ELSE NULL END AS xujing,
 CASE WHEN location = 'pujiang' THEN avg_pm25 ELSE NULL END AS pujiang, 
 CASE WHEN location = 'putuo' THEN avg_pm25 ELSE NULL END AS putuo, 
 CASE WHEN location = 'shangshida' THEN avg_pm25 ELSE NULL END AS shangshida,
CASE WHEN location = 'jingan' THEN avg_pm25 ELSE NULL END AS jingan, 
CASE WHEN location = 'xianxia' THEN avg_pm25 ELSE NULL END AS xianxia, 
CASE WHEN location = 'hongkou' THEN avg_pm25 ELSE NULL END AS hongkou, 
CASE WHEN location = 'jiading' THEN avg_pm25 ELSE NULL END AS jiading, 
CASE WHEN location = 'zhangjiang' THEN avg_pm25 ELSE NULL END AS zhangjiang, 
CASE WHEN location = 'miaohang' THEN avg_pm25 ELSE NULL END AS miaohang, 
CASE WHEN location = 'yangpu' THEN avg_pm25 ELSE NULL END AS yangpu, 
CASE WHEN location = 'huinan' THEN avg_pm25 ELSE NULL END AS huinan, 
CASE WHEN location = 'chongming' THEN avg_pm25 ELSE NULL END AS chongming
From(
SELECT location, round(avg(measure_value::bigint),0) as avg_pm25
FROM "iot"."pm25" 
where measure_name='pm2.5' 
and city='Shanghai'
and time >= ago(30s)
group by location,bin(time,30s)
order by avg_pm25 desc)
 
         
       
 
       Save Panel as Shanghai PM2.5 analysis
 
       Edit Panel Title:Shanghai PM2.5 analysis
 
       Save Dashboard PM2.5 analysis 1
 
        
 
4.2.3 Query the average PM2.5 of each monitoring station in Guangzhou
 
       New Panel
 
        
        SELECT CASE WHEN location = 'panyu' THEN avg_pm25 ELSE NULL END AS panyu,
CASE WHEN location = 'commercial school' THEN avg_pm25 ELSE NULL END AS commercial_school, 
CASE WHEN location = 'No 5 middle school' THEN avg_pm25 ELSE NULL END AS No_5_middle_school,
CASE WHEN location = 'guangzhou monitor station' THEN avg_pm25 ELSE NULL END AS Guangzhou_monitor_station, 
CASE WHEN location = 'nansha street' THEN avg_pm25 ELSE NULL END AS Nansha_street, 
CASE WHEN location = 'No 86 middle school' THEN avg_pm25 ELSE NULL END AS No_86_middle_school, 
CASE WHEN location = 'luhu' THEN avg_pm25 ELSE NULL END AS luhu, 
CASE WHEN location = 'nansha' THEN avg_pm25 ELSE NULL END AS nansha, 
CASE WHEN location = 'tiyu west' THEN avg_pm25 ELSE NULL END AS tiyu_west, 
CASE WHEN location = 'jiulong town' THEN avg_pm25 ELSE NULL END AS jiulong_town, 
CASE WHEN location = 'huangpu' THEN avg_pm25 ELSE NULL END AS Huangpu, 
CASE WHEN location = 'baiyun' THEN avg_pm25 ELSE NULL END AS Baiyun, 
CASE WHEN location = 'maofeng mountain' THEN avg_pm25 ELSE NULL END AS Maofeng_mountain, 
CASE WHEN location = 'chong hua' THEN avg_pm25 ELSE NULL END AS Chonghua, 
CASE WHEN location = 'huadu' THEN avg_pm25 ELSE NULL END AS huadu
from(
    SELECT location, round(avg(measure_value::bigint),0) as avg_pm25
FROM "iot"."pm25" 
where measure_name='pm2.5' 
and city='Guangzhou'
and time >= ago(30s)
group by location,bin(time,30s)
order by avg_pm25 desc)
 
         
       
 
       Save Panel as Guangzhou PM2.5 analysis
 
       Edit Panel Title:Guangzhou PM2.5 analysis
 
       Save Dashboard PM2.5 analysis 1
 
        
 
4.2.4 Query the average PM2.5 of each monitoring station in Shenzhen
 
       New Panel
 
        
        SELECT CASE WHEN location = 'huaqiao city' THEN avg_pm25 ELSE NULL END AS Huaqiao_city,
 CASE WHEN location = 'xixiang' THEN avg_pm25 ELSE NULL END AS xixiang,
CASE WHEN location = 'guanlan' THEN avg_pm25 ELSE NULL END AS guanlan,
CASE WHEN location = 'longgang' THEN avg_pm25 ELSE NULL END AS Longgang,
CASE WHEN location = 'honghu' THEN avg_pm25 ELSE NULL END AS Honghu,
CASE WHEN location = 'pingshan' THEN avg_pm25 ELSE NULL END AS Pingshan,
CASE WHEN location = 'henggang' THEN avg_pm25 ELSE NULL END AS Henggang,
CASE WHEN location = 'minzhi' THEN avg_pm25 ELSE NULL END AS Minzhi,
CASE WHEN location = 'lianhua' THEN avg_pm25 ELSE NULL END AS Lianhua,
CASE WHEN location = 'yantian' THEN avg_pm25 ELSE NULL END AS Yantian,
CASE WHEN location = 'nanou' THEN avg_pm25 ELSE NULL END AS Nanou,
CASE WHEN location = 'meisha' THEN avg_pm25 ELSE NULL END AS Meisha
From(
SELECT location, round(avg(measure_value::bigint),0) as avg_pm25
FROM "iot"."pm25" 
where measure_name='pm2.5' 
and city='Shenzhen'
and time >= ago(30s)
group by location,bin(time,30s)
order by avg_pm25 desc)
 
         
       
 
       Save Panel as Shenzhen PM2.5 analysis
 
       Edit Panel Title:Shenzhen PM2.5 analysis
 
       Save Dashboard PM2.5 analysis 1
 
        
 
4.2.5 Time series analysis for Huaqiao City, Shenzhen (PM2.5 over the last 5 minutes)
 
       New Panel
 
        
        select location, CREATE_TIME_SERIES(time, measure_value::bigint) as PM25 FROM iot.pm25
where measure_name='pm2.5' 
and location='huaqiao city'
and time >= ago(5m)
GROUP BY location
 
         
For the visualization, select Lines and Points:
 
       
 
       Save Panel as Shen Zhen Huaqiao City PM2.5 analysis
 
Edit Panel Title: PM2.5 analysis for Huaqiao City, Shenzhen over the last 5 minutes
 
       Save Dashboard PM2.5 analysis 1
 
        
 
4.2.6 Find the average PM2.5 for Huaqiao City, Shenzhen over the past 2 hours at 30-second intervals (filling missing values with linear interpolation)
 
       New Panel
 
        
        WITH binned_timeseries AS (
    SELECT location, BIN(time, 30s) AS binned_timestamp, ROUND(AVG(measure_value::bigint), 2) AS avg_PM25
    FROM "iot".pm25
    WHERE measure_name = 'pm2.5'
        AND location='huaqiao city'
        AND time > ago(2h)
    GROUP BY location, BIN(time, 30s)
), interpolated_timeseries AS (
    SELECT location,
        INTERPOLATE_LINEAR(
            CREATE_TIME_SERIES(binned_timestamp, avg_PM25),
                SEQUENCE(min(binned_timestamp), max(binned_timestamp), 30s)) AS interpolated_avg_PM25
    FROM binned_timeseries
    GROUP BY location
)
SELECT time, ROUND(value, 2) AS interpolated_avg_PM25
FROM interpolated_timeseries
CROSS JOIN UNNEST(interpolated_avg_PM25)
 
         
For the visualization, select Lines:
 
       
 
       Save Panel as Shen Zhen Huaqiao City PM2.5 analysis 1
 
Edit Panel Title: Average PM2.5 for Huaqiao City, Shenzhen over the past 2 hours (missing values filled with linear interpolation)
 
       Save Dashboard PM2.5 analysis 1
 
         
 
4.2.7 Rank all cities by average PM2.5 over the past 5 minutes (linear interpolation)
 
       New Panel
 
        
        SELECT CASE WHEN city = 'Shanghai' THEN inter_avg_PM25 ELSE NULL END AS Shanghai,
CASE WHEN city = 'Beijing' THEN inter_avg_PM25 ELSE NULL END AS Beijing,
CASE WHEN city = 'Guangzhou' THEN inter_avg_PM25 ELSE NULL END AS Guangzhou,
CASE WHEN city = 'Shenzhen' THEN inter_avg_PM25 ELSE NULL END AS Shenzhen,
CASE WHEN city = 'Hangzhou' THEN inter_avg_PM25 ELSE NULL END AS Hangzhou,
CASE WHEN city = 'Nanjing' THEN inter_avg_PM25 ELSE NULL END AS Nanjing,
CASE WHEN city = 'Chengdu' THEN inter_avg_PM25 ELSE NULL END AS Chengdu,
CASE WHEN city = 'Chongqing' THEN inter_avg_PM25 ELSE NULL END AS Chongqing,
CASE WHEN city = 'Tianjin' THEN inter_avg_PM25 ELSE NULL END AS Tianjin,
CASE WHEN city = 'Shenyang' THEN inter_avg_PM25 ELSE NULL END AS Shenyang,
CASE WHEN city = 'Sanya' THEN inter_avg_PM25 ELSE NULL END AS Sanya,
CASE WHEN city = 'Lasa' THEN inter_avg_PM25 ELSE NULL END AS Lasa
from(
WITH binned_timeseries AS (
    SELECT city,location, BIN(time, 30s) AS binned_timestamp, ROUND(AVG(measure_value::bigint), 2) AS avg_PM25
    FROM "iot".pm25
    WHERE measure_name = 'pm2.5'
        AND time > ago(5m)
    GROUP BY city,location, BIN(time, 30s)
), interpolated_timeseries AS (
    SELECT city,location,
        INTERPOLATE_LINEAR(
            CREATE_TIME_SERIES(binned_timestamp, avg_PM25),
                SEQUENCE(min(binned_timestamp), max(binned_timestamp), 30s)) AS interpolated_avg_PM25
    FROM binned_timeseries
    GROUP BY city,location
), all_location_interpolated as (
SELECT city,location,time, ROUND(value, 2) AS interpolated_avg_PM25
FROM interpolated_timeseries
CROSS JOIN UNNEST(interpolated_avg_PM25))
select city,avg(interpolated_avg_PM25) AS inter_avg_PM25
from all_location_interpolated
group by city
order by avg(interpolated_avg_PM25) desc)
 
         
Select the panel visualization type:
 
       
 
       
 
       Save Panel as all city analysis 1
 
Edit Panel Title: Average PM2.5 of all cities over the past 5 minutes
 
       Save Dashboard PM2.5 analysis 1
 
        
 
4.2.8 The ten monitoring stations with the highest PM2.5 over the past 5 minutes (linear interpolation)
 
       New Panel
 
        
        WITH binned_timeseries AS (
    SELECT city,location, BIN(time, 30s) AS binned_timestamp, ROUND(AVG(measure_value::bigint), 2) AS avg_PM25
    FROM "iot".pm25
    WHERE measure_name = 'pm2.5'
        AND time > ago(5m)
    GROUP BY city,location, BIN(time, 30s)
), interpolated_timeseries AS (
    SELECT city,location,
        INTERPOLATE_LINEAR(
            CREATE_TIME_SERIES(binned_timestamp, avg_PM25),
                SEQUENCE(min(binned_timestamp), max(binned_timestamp), 30s)) 
                AS interpolated_avg_PM25
    FROM binned_timeseries
    GROUP BY city,location
), interpolated_cross_join as (
SELECT city,location,time, ROUND(value, 2) AS interpolated_avg_PM25
FROM interpolated_timeseries
CROSS JOIN UNNEST(interpolated_avg_PM25))
select city,location, avg(interpolated_avg_PM25) as avg_PM25_loc
from interpolated_cross_join
group by city,location
order by avg_PM25_loc desc
limit 10
 
         
Select Table:
 
       
 
       Save Panel as all city analysis 2
 
Edit Panel Title: The ten monitoring stations with the highest PM2.5 over the past 5 minutes (linear interpolation)
 
       Save Dashboard PM2.5 analysis 1
 
        
 
4.2.9 The ten monitoring stations with the lowest PM2.5 over the past 5 minutes (linear interpolation)
 
       New Panel
 
        
        WITH binned_timeseries AS (
    SELECT city,location, BIN(time, 30s) AS binned_timestamp, ROUND(AVG(measure_value::bigint), 2) AS avg_PM25
    FROM "iot".pm25
    WHERE measure_name = 'pm2.5'
        AND time > ago(5m)
    GROUP BY city,location, BIN(time, 30s)
), interpolated_timeseries AS (
    SELECT city,location,
        INTERPOLATE_LINEAR(
            CREATE_TIME_SERIES(binned_timestamp, avg_PM25),
                SEQUENCE(min(binned_timestamp), max(binned_timestamp), 30s)) 
                AS interpolated_avg_PM25
    FROM binned_timeseries
    GROUP BY city,location
), interpolated_cross_join as (
SELECT city,location,time, ROUND(value, 2) AS interpolated_avg_PM25
FROM interpolated_timeseries
CROSS JOIN UNNEST(interpolated_avg_PM25))
select city,location, avg(interpolated_avg_PM25) as avg_PM25_loc
from interpolated_cross_join
group by city,location
order by avg_PM25_loc asc
limit 10
 
         
Select Table:
 
       
 
       Save Panel as all city analysis 3
 
Edit Panel Title: The ten monitoring stations with the lowest PM2.5 over the past 5 minutes (linear interpolation)
 
       Save Dashboard PM2.5 analysis 1
 
        
 
Set the dashboard to refresh every 5 seconds:
 
       
 
       
 
This post focused on using the Timestream and Kinesis Data Streams managed services together with Grafana to collect, store, and analyze IoT time series data in real time, with PM2.5 as the example scenario, covering deployment architecture, environment setup, data collection, data storage, and analysis. If you have similar IoT time series storage and analysis requirements, we hope it inspires you to manage massive amounts of IoT time series data efficiently and to uncover the patterns, trends, and value hidden in that data, helping your business grow.
 
Appendix
 
Amazon Timestream Developer Guide
 
AWS Timestream development code samples
 
AWS Timestream and Grafana integration example
 
        
 
About the Author