亚马逊AWS官方博客

集成 Amazon Timestream for InfluxDB 时序数据库在物联网和金融行业的应用方案

Amazon Timestream for InfluxDB 时序数据库在亚马逊云科技中国北京和宁夏区域以及海外区域都已经落地。时序数据库用于存储、处理和分析时间序列数据,主要应用于 IT 资产设备监控、物联网设备信息采集分析、金融交易行情、工业制造检测和生产过程控制等场景。

本文选择物联网和金融两个场景,结合实际演示,帮助大家理解 Amazon Timestream for InfluxDB 在其中的作用。

场景 1:物联网(IoT)

这里的物联网,广义上包括各种行业的设备连接网络,例如,光伏分布式电站、风电机组设备、工厂制造设备、智能穿戴设备。这些终端设备,以传感器收集设备上的各种信息,例如温度、湿度、压力、转速、功率、故障情况,通过 MQTT 等协议,或者通过边缘 IoT 网关,把实时数据发送到云上 IoT 服务。由于设备众多,通常物联网写入量巨大,按照时间序列写入,而数据写入到数据库之后很少修改。时序数据库按照时序存储的特性,加上顺序写入的高效,以及丰富的按照时间范围聚合查询,是此类场景的极佳选择。

此方案中,以 Amazon IoT Device Simulator 模拟物联网设备发送数据到 IoT core 服务,满足规则的数据,触发 Lambda 函数服务,写入 InfluxDB 时序数据库。Grafana 可以通过面板,设置 InfluxDB 数据源,并以不同维度查询时间窗口聚合数据,以直观图形化界面展示。

IoT Device Simulator 模拟器以 Cloudformation 形式一键部署,自动创建包括前端模拟器界面,以及后端数据流转所需的 Lambda、DynamoDB、Step Function 服务,最终数据写入到 IoT Core 服务。

Cloudformation 创建成功后,在 Output 中找到界面地址,类似如下:

ConsoleURL https://a1b2c3d4.cloudfront.net

添加设备类型 Add Device Type

Device type name: PressureDevice, Topic: pressure/data

其他属性参考下图:

继续添加模拟设备 Add Simulations:

Simulation name: pressuresensor

Device Types: PressureDevice

Amount: 20

创建模拟器之后,运行 Start,即可看到模拟设备数据发送到 IoT Topic: pressure/data。

此时数据库和 Lambda 函数尚未配置,接着创建 Timestream for InfluxDB 数据库。

完成之后控制台有类似数据库 URL:

https://xxxx.us-east-1.timestream-influxdb.amazonaws.com:8086

下载配置 influx 客户端,需要创建数据库时的用户名密码,生成 token 以访问数据库:

$ influx config list
Active	Name		URL										Org
*	ping-new	https://xxxx.us-east-1.timestream-influxdb.amazonaws.com:8086	aws
PowerShell

创建 influxdb bucket:

$ influx bucket create --org aws --name testdb
PowerShell

创建 Lambda 函数 lambda_function.py,此函数当数据发送到 IoT 服务对应的 Topic 时触发,写入 Influxdb。此外也可以进一步聚合转换,以 Step Function 编排批量写入到 S3 以供分析或者机器学习训练,满足异常检测等业务需求。此演示中只包含实时写入 Influxdb 部分。

Lambda 代码如下:

https://github.com/milan9527/influxdb/blob/main/lambda_function.py

Lambda 相关设置(注意,Lambda 需要设置与 Influxdb 同样的 VPC):

Runtime: Python 3.13, Same VPC as InfluxDB, Timeout: 1 minute
Trigger: AWS IoT, Iot Rule: SELECT pressure AS pressure, viscosity as viscosity, sensordatetime as sensordatetime, deviceid as deviceid, clientid as clientid FROM 'pressure/data'
Environment variables
INFLUXDB_BUCKET	testdb
INFLUXDB_ORG	aws
INFLUXDB_TOKEN: xxxx
INFLUXDB_URL: https://xxxx.us-east-1.timestream-influxdb.amazonaws.com:8086
PowerShell

打包 Lambda 代码和相关 Python 包 influxdb_client:

pip install --target ./package influxdb_client
cd package
zip -r ../my_deployment_package.zip .
cd ..
zip my_deployment_package.zip lambda_function.py
PowerShell

上传压缩包 my_deployment_package.zip 到 Lambda:

开始运行 IoT simulator 模拟器,发送设备数据:

在 Infludb 中可以看到有数据写入。这里按时间倒序查询最新 10 条数据:

$ influx v1 shell
> use "testdb"
> select * from "pressure" order by time desc limit 10
PowerShell

接着设置 Grafana 可视化展现。

创建 Amazon Grafana 服务,选择 SSO 认证方式(IAM Identity Center),创建用户并赋予 admin 权限。

Grafana 数据源 Data sources 选择 Amazon TimeStream,允许开放访问。

SSO 登录 Grafana

创建数据源 data source ,选择 Influxdb:

Language: InfluxQL
URL: https://ping-xfyuwukmm27bga.us-east-1.timestream-influxdb.amazonaws.com:8086
Auth with Credentials:
Header: Authorization, Value: Token xxxxx
Database: testdb
User: user
Password: password
PowerShell

创建 Grafana dashboard,选择以上创建的 InfluxQL 数据源:

SELECT last("viscosity"), last("pressureValue") FROM "pressure" WHERE $timeFilter GROUP BY time(10s) fill(0)
PowerShell

从图中可以看到,viscosity 和 pressureValue 指标在按照 10 秒分组展现出来。此外,也可以根据更多时间窗口或者聚合查询。业务场景中,可以把按时间段计算的数据,实时展示到大屏,或者对某些指标进行监控,如果异常,则触发警报。Influxdb 下游可以使用 kapacitor 进行告警或者 ETL 任务。

场景 2:金融行情数据

金融行业中,证券、期货、汇率等行情数据变化频率很高。高频交易场景下,需要更细的数据维度,TICK 数据记录了某个时间内所有变化,以实时成交价而变动。以此数据,可以实现量化交易中的回测等场景。

投资者需要了解实时的行情变化,通常以 K 线行情数据显示,在 TICK 数据以某个时间段取样,即可获取 K 线图。通常包含开盘价(Open),最高价(High),最低价(Low),收盘价(Close),并可以不同的时间维度展示,例如分钟、小时、日线、周线、月线 K 线。随着时间推移,以更新的 TICK 数据来计算 K 线价格,并以成交量等指标计算均线、KDJ、MACD 等投资指标。

此外,除了实时行情,还需要分析历史行情。数据结构和实时数据类似,也会以 K 线、均线等指标展现,只是时间维度不同,无需实时更新。交易所一般也会提供历史行情接口,以供券商等投资者使用。

无论那种行情数据,最重要的一点,是数据都带有时间,并且会随着时间而变化。数据按照时间顺序写入,不会更新之前的数据。读取数据时,会按照时间段聚合查询,并且经常会根据时间窗口而变化。例如,投资者除了查看实时行情,也会查看日 K 线,以及周 K 线 KDJ/MACD 指标,以作为交易的买卖点。

时序数据库正适合此场景。按照时间顺序海量写入交易数据的高性能,按照时间维度以及证券代码结构组合查询,以及某个时间段的聚合价格查询,正好完美契合行情数据的要求。

以下例子演示了股票行情数据在 Amazon Timestream for InfluxDB 的读写,并以 Grafana 展现 K 线图。

此演示中,模拟股票交易所交易数据,每 1ms 更新几个大公司股票的成交价和成交量,券商获取交易 TICK 数据,写入时序数据库 InfluxDB。投资者除了查看委托交易和逐笔行情,还需要 K 线图等投资指标。Grafana 本身提供了 K 线功能,可以直接从 Influxdb 获取行情数据并以 K 线展示,并自定义时间范围和间隔。

先创建 Amazon Timestream for InfluxDB,创建 bucket: stock_data

influx bucket create -org aws -name stock_data
PowerShell

在 EC2 运行 python 程序,每 1 毫秒写入行情数据。

查看 influxDB 数据,例如 AMZN 股票最近 10 条的行情数据:

influx v1 shell
> use "stock_data"
> select * from "stock_tick" where symbol = 'AMZN' order by time desc limit 10;
PowerShell

所有数据都以时间为序列,以股票代码 symbol 为 TAG 索引。

其他 Field 是行情相关指标,包括各个价格和成交量。新的数据开盘价(open)等于之前数据的收盘价(close)。

查询时,按照股票代码和时间范围即可查询出某个时间段内的所有行情数据,聚合之后即可得到业务所需结果。

在 Grafana 中创建 influxdb 数据源。创建 dashboard,选择 K 线 Candlestick。

以下查询从起始时间范围内,股票 FB 每个时间窗口内的行情数据,求平均值(mean)。实际操作中可以按照最高价(max)、最低价(min)、开盘价(first)、收盘价(last)聚合计算:

from(bucket: "stock_data")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["symbol"] == "FB")
  |> aggregateWindow(every: v.windowPeriod, fn: mean, createEmpty: false)
  |> yield(name: "mean")
PowerShell

刷新查询,即可看到以下 K 线图,因为写入频繁,如时间范围过长,数据量过大,难以直接绘制图表,这里选择了相对较短的时间间隔。实际业务中,写入数据频率按业务需求调整,查询时可以选择更长的分钟、小时、日线、周线等时间范围指标。

修改查询条件,例如,查找 AMZN 股票某个时间范围内每 20ms 的行情。

from(bucket: "stock_data")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["symbol"] == "AMZN")
  |> aggregateWindow(every: 20ms, fn: mean, createEmpty: false)
  |> yield(name: "mean")
PowerShell

Grafana 支持报警功能。例如,当 AMZN 股票收盘价大于 1800 时,发送 INFO 通知:

查看告警历史,发现有 info 信息:

实际业务中,会结合其他服务,例如 Kapacitor,把报警信息及时发送到 slack 等工具。

结论

Amazon Timestream for InfluxDB 存储按照时间序列的数据,提供极高性能的顺序写入和按照时间窗口的聚合查询能力,在物联网、风电、光伏以及金融行情等场景下,可以满足实时性和按照时间高效查询的要求;结合 Grafana 等可视化工具,能够为业务提供实时直观的输出。

参考链接

https://aws.amazon.com/blogs/iot/influxdb-and-grafana-with-aws-iot-to-visualize-time-series-data/

本篇作者

章平

亚马逊云科技数据库架构师。2014 年起就职于亚马逊云科技,先后加入技术支持和解决方案团队,致力于客户业务在云上高效落地。对于各类云计算产品和技术,特别是在数据库和大数据方面,拥有丰富的技术实践和行业解决方案经验。此前曾就职于 Sun,Oracle,Intel 等 IT 企业。