Amazon Web Services ブログ

Amazon Timestream のよくあるクエリパターンとその効率的な SQL 記述方法

Amazon Timestream は時系列データを取り扱う為のサーバーレスの目的別データベースサービスであり、その使用量に応じて課金が行われます。時系列データに対して、SQL を利用してクエリを実行しますが、同じ結果を得る為に複数の SQL 記述方法があり、そのパフォーマンスやコスト、複雑さはそれぞれ異なる為、正しいクエリを特定する事はとても重要です。また、組み込みの時系列関数を利用する事でクエリの複雑さを軽減する事ができます。

この投稿では、時系列のワークロードで一般的なクエリパターンと、それに対する Timestream の SQL クエリの効果的な記述方法を取り上げます。初めに本投稿で利用する IoT のデータセットを確認し、次に、Timestream でのフラット/時系列データモデルについて確認します。フラット/時系列データモデルの差を理解する事で、この投稿で利用されるクエリの複雑さを軽減する Timestream の組み込みの時系列関数をより深く理解する事が出来ます。最後に、7 つの一般的なアクセスパターンについて取り上げます。尚、それぞれのクエリパターンは独立している為、興味のあるところだけ確認頂ければ十分です。

サンプルデータ

この投稿で紹介される例では、各クエリがどのように実行されるか理解頂けるよう、全て少ないデータ量で実行されていますが、実際にはその数千倍の規模のデータでもテストをしており、実用に耐えうるクエリであるとお考え下さい。また、データセットは IoT ワークロードに関連するものであり、実運用で見られる様々なシナリオ (メジャーの欠落や無効なもの) が含まれています。

次のセクションのスクリプトを実行すると、データセットを生成して、Timestream にデータをロードする事ができます。データセットは <current_year>-01-01 09:00:00 で始まり、14 個のデバイスからのデータを含みます。各デバイスには複数のセンサーがあり、午前 10 時まで約 10 分間隔でデータを測定します。各測定値は、SensorReadingsDeviceMetadata という 2 つのテーブルに分割された 複数のメジャーで構成されています。SensorReadings テーブルは temperaturehumidityair_quality_no2 というメジャーがあり、DeviceMetadata テーブルは battery_levelstateuc_temperature というメジャーがあります。また、それぞれのメジャーは各テーブル内の 1 つの行として格納されています。

本投稿では、全てのクエリで WHERE 句で時刻でフィルターをかけています。また、その他のメジャー名やディメンジョン名を条件に加える事で、データを絞り込んでスキャンするデータ量を削減し、より良いパフォーマンスを得たり、コスト削減につなげることができます。クエリのベストプラクティスの詳細についてはこちらを確認して下さい。

前提

操作を始める前に、Amazon Timestream への読み書きが可能で、AWS CloudShell にアクセス出来る AWS Account を用意しましょう。

サンプルデータの Timestream へのデータ投入

本投稿で利用するサンプルデータを生成する為に、次のステップを実施して下さい。

  1. Blog という名前の Timestream のデータベースを作成し、SensorReadingsDeviceMetadata テーブルを作成しましょう。メモリでの保持期間は 1 年で設定します。
  2. CloudShell を使って、python スクリプトを実行しましょう (python3 script_name.py で実行可能です)

フラットモデルと時系列モデル

Timestream ではフラットモデルと時系列モデルの 2 つのデータモデルをサポートしています。フラットモデルは表形式のフォーマットとなっており、それぞれの測定値は1つの独立した行として表され、時刻、メジャー名、メジャー値、及びディメンジョン値がカラムとして設定されます。フラットモデルでは標準SQL (標準 SQL 関数、演算子、集約、フィルタリング等) を利用し、組み込みの時系列関数は使用しません。SensorReadings テーブルのサンプルデータは、temperaturehumidityair_quality_no2、そして request_number をマルチメジャーとして Timestream に格納しており (ディメンジョンは device_idcity) 、これらのフィールドには全て SQL 構文で1つの行としてアクセス出来ます。例えば、次の SELECT 文を使う事で、気温と湿度の測定値のリストと対応するタイムスタンプ、リクエストナンバーを取得する事が出来ます。

SELECT time, device_id, city, temperature, humidity, request_number
FROM "Blog"."SensorReadings"
WHERE time BETWEEN ago(365d) AND now()
	AND city = 'Zion'
	AND device_id IN ('real1', 'real2')
	AND measure_name = 'multi'
	AND temperature < 999
ORDER BY device_id
	,time ASC

この SQL を実行すると、以下のフラットモデルで表されるデータが得られます。

time device_id city temperature humidity request_number
2021-01-01 09:01:00.000000000 real1 Zion 102 13.2 30493
2021-01-01 09:13:00.000000000 real1 Zion 103 13.1 30494
2021-01-01 09:31:00.000000000 real1 Zion 107 13.1 30496
2021-01-01 09:44:00.000000000 real1 Zion 106 13.0 30497
2021-01-01 09:58:00.000000000 real1 Zion 105 12.9 30499
2021-01-01 09:00:00.000000000 real2 Zion 104 14.2 43231
2021-01-01 09:12:00.000000000 real2 Zion 105 14.1 43232
2021-01-01 09:22:00.000000000 real2 Zion 109 14.1 43233
2021-01-01 09:33:00.000000000 real2 Zion 109 14.1 43234

一方、時系列モデルでは、一連のディメンジョンに対する時刻とメジャー値のペアを順序付けたシーケンスとして表されます。このデータ形式は Timestream の組み込みの時系列関数を利用する場合に必要であり、欠損値がある場合のギャップを埋める補間、変化率を見つける導関数、また、2 つの時系列の相関等を導出するケース等で利用されます。

フラットモデルから時系列モデルに変換する際には、組み込みの CREATE_TIME_SERIES 関数を利用します。この関数は、timevalue の 2 つの引数を持ち、タイムスタンプと対応する値のシーケンスを時系列データとして返します。このデータタイプは組み込みの時系列関数の入力として利用されます。以下、CREATE_TIME_SERIES 関数の利用例となります。

SELECT device_id
	,CREATE_TIME_SERIES(time, temperature) AS temp_series
	,CREATE_TIME_SERIES(time, humidity) AS humidity_series
FROM "Blog"."SensorReadings"
WHERE time BETWEEN ago(365d) AND now() 
	AND city = 'Zion'
	AND device_id IN ('real1', 'real2')
	AND measure_name = 'multi'
	AND temperature < 999
GROUP BY device_id
ORDER BY device_id

このクエリを実行すると、以下の結果が得られます。

device_id temp_series humidity_series
real1 [
{ time: 2021-01-01 09:01:00.000000000, value: 102 },
{ time: 2021-01-01 09:13:00.000000000, value: 103 },
{ time: 2021-01-01 09:31:00.000000000, value: 107 },
{ time: 2021-01-01 09:44:00.000000000, value: 106 },
{ time: 2021-01-01 09:58:00.000000000, value: 105 }
]
[
{ time: 2021-01-01 09:01:00.000000000, value: 13.2 },
{ time: 2021-01-01 09:13:00.000000000, value: 13.1 },
{ time: 2021-01-01 09:31:00.000000000, value: 13.1 },
{ time: 2021-01-01 09:44:00.000000000, value: 13.0 },
{ time: 2021-01-01 09:58:00.000000000, value: 12.9 }
]
real2 [
{ time: 2021-01-01 09:00:00.000000000, value: 104 },
{ time: 2021-01-01 09:12:00.000000000, value: 105 },
{ time: 2021-01-01 09:22:00.000000000, value: 109 },
{ time: 2021-01-01 09:33:00.000000000, value: 109 }
]
[
{ time: 2021-01-01 09:00:00.000000000, value: 14.2 },
{ time: 2021-01-01 09:12:00.000000000, value: 14.1 },
{ time: 2021-01-01 09:22:00.000000000, value: 14.1 },
{ time: 2021-01-01 09:33:00.000000000, value: 14.1 }
]

UNNEST

標準 SQL の集約や WINDOW 関数を利用して他のテーブルやデータセットと結合させたり、集約を行いたい場合には、データを時系列モデルからフラットモデルに戻す必要があります。その場合は、UNNEST 関数を用いると、CREATE_TIME_SERIES 関数と逆の変換が可能で、入力として時系列データを与えると、時刻とメジャー値の 2 つのカラムを持ったテーブルを返します。また、UNNEST はテーブル、時刻、メジャーのカラム名に対して UNNEST (timeseries) AS <alias_name> (time_alias, value_alias) のように、別名を設定する事が可能です。

WITH device_temp_series
AS (
	SELECT CREATE_TIME_SERIES(time, temperature) AS temp_series
		,device_id
	FROM "Blog"."SensorReadings"
	WHERE time BETWEEN ago(365d) AND now() 
		AND city = 'Zion'
		AND device_id IN ('real1', 'real2')
		AND measure_name = 'multi'
		AND temperature < 999
	GROUP BY device_id
	ORDER BY device_id
	)
SELECT device_id
	,ts.time
	,ts.value
FROM device_temp_series
CROSS JOIN UNNEST(temp_series) AS ts
ORDER BY device_id
	,ts.time ASC

この SQL を実行すると、以下の結果が得られます。

device_id time value
real1 2021-01-01 09:01:00.000000000 102
real1 2021-01-01 09:13:00.000000000 103
real1 2021-01-01 09:31:00.000000000 107
real1 2021-01-01 09:44:00.000000000 106
real1 2021-01-01 09:58:00.000000000 105
real2 2021-01-01 09:00:00.000000000 104
real2 2021-01-01 09:12:00.000000000 105
real2 2021-01-01 09:22:00.000000000 109
real2 2021-01-01 09:33:00.000000000 109

CREATE_TIME_SERIES、及び、UNNEST 関数の実例については、以下の補間の説明を確認して下さい。

よくあるクエリーパターン

このセクションでは、以下の 7 つのアクセスパターンを取り上げ、その詳細を解説します。全てのクエリは独立している為、全てを見る必要は無く、皆さんの興味のあるパターンのみ確認すれば十分です。

1.最後の値
2.補間
3.複数のイベントの相互関係 (N-event correlation)
4.特定の期間毎に集計する (running aggregates)
5.導関数と変化率
6.高いカーディナリティの N 個の時系列の取得
7.ヒストグラムの作成

1. 最後の値

時系列のワークロードにおいて、最新のレコードやメジャー値を探す事をまずは考えてみましょう。例えば、センサーの測定値の最新の値、状態、またはアセットの位置等を確認する場合が考えられます。

この例では、現時点で動作していないデバイスを見つけようと思います。利用するデータセットには、それぞれのデバイスの状態が state というメジャー値に格納されており (メジャー名は multi) 、クエリでこのカラムを参照しますが、2つの記述方法を比較してみたいと思います。1 つは共通テーブル式 (CTE) を利用する方法で、もう 1 つは max_by() 関数を利用する方法です。まず、CTE を使って最新の値を取る事を考えてみます。

WITH latest_ts
AS (
	SELECT device_id
		,max(time) AS latest_timestamp
	FROM "Blog"."DeviceMetadata"
	WHERE measure_name = 'multi'
		AND time > ago(365d)
	GROUP BY device_id
		,measure_name
	)
SELECT devmd.device_id, lts.latest_timestamp, devmd.state
FROM "Blog"."DeviceMetadata" devmd
JOIN latest_ts AS lts ON lts.device_id = devmd.device_id
	AND lts.latest_timestamp = devmd.time
WHERE devmd.measure_name = 'multi'
	AND devmd.state <> 'IN_SERVICE'
	AND devmd.time > ago(365d)

このクエリでは、まず過去 365 日でそれぞれのデバイスの状態に関連した最新のタイムスタンプを CTE を利用して latest_ts として取得しています。デバイスは全部で 14 個あり、過去 365 日間で全デバイスで最新の状態が取れると考えられる為、CTE は以下の通り 14 行を結果として返します。

device_id latest_timestamp
smarty9 2021-01-01 10:00:00.000000000
coldy2 2021-01-01 10:20:00.000000000
smarty7 2021-01-01 10:00:00.000000000
coldy1 2021-01-01 10:00:00.000000000
smarty2 2021-01-01 10:00:00.000000000
smarty0 2021-01-01 10:00:00.000000000
smarty6 2021-01-01 10:00:00.000000000
smarty4 2021-01-01 10:00:00.000000000
smarty8 2021-01-01 10:00:00.000000000
smarty5 2021-01-01 10:00:00.000000000
real2 2021-01-01 10:00:00.000000000
smarty3 2021-01-01 10:00:00.000000000
smarty1 2021-01-01 10:00:00.000000000
real1 2021-01-01 09:58:00.000000000

この時点で、各デバイスの最新状態のタイムスタンプが取得出来ました。次にメインのテーブルと CVE で定義した latest_ts とを device_id、タイムスタンプで結合し、実際のステータスを確認します。ここでは、 最新の状態で IN_SERVICE となっていないデバイスのみ取るようにフィルターをかけています。用意したデータセットからは、この検索の結果、2 つのデバイスの情報が結果として取得できました。もしも単純に全てのデバイスの最新の状態が取りたい場合は、WHERE 句を省略して 14 行 (全てのデバイス) を取得する事も出来ます。

device_id latest_timestamp state
coldy2 2021-01-01 10:20:00.000000000 AUTO_SHUTDOWN
real2 2021-01-01 10:00:00.000000000 REPLACE_BATTERY

Timestream のコンソール画面で結果タブを参照すると、このクエリは 8.17 KB をスキャンした事が分かります。
さて、次にもう 1 つのやり方として、CTE を使わず max_by() 関数を使う方法を見てみましょう。

SELECT device_id
	,max_by(time, time) AS latest_timestamp
	,max_by(state, time) AS state
FROM "Blog"."DeviceMetadata"
WHERE time BETWEEN ago(365d) AND now()
	AND measure_name = 'multi'
GROUP BY device_id
HAVING max_by(state, time) <> 'IN_SERVICE'

このクエリでは前述の CTE を使ったクエリと同じ結果を得る事ができます。ここでは、max_by(x , y) 関数を使っており、y が最大値の場合の、関連する項目である x の値をとる事ができます。この例では、時刻が最大となる場合のメジャー値を取得しています。また、最新のステータスが IN_SERVICE でないデバイスのみを取得する為、HAVING 句を利用してフィルターをかけています。

この2つのクエリを比較すると、2 番目のクエリの方がより単純です。また、スキャンしたバイト数についても2番目のクエリの方が小さい為、コストについても 2 番目のクエリが安くなりえる事が分かります (但し、Timestream においては、10 MB 以下のスキャン量については全て 10 MB に切り上げて計算される事に注意して下さい)

尚、max_by() 関数は、例えば max_by(temperature , time , 5) のように記述すると、5 個の最新の temperature の値を取る事も出来ます。

2. 補間

IoT のユースケースでは、デバイスからのデータが一部欠落していたり、データ取得の間隔がずれたり、現実的では無い値が取れたりする事がよくあります。以下の例で real1, real2 というデバイスの気温センサーの測定値を見ていきましょう。

SELECT time
	,device_id
	,temperature AS temp
FROM "Blog"."SensorReadings"
WHERE time BETWEEN ago(365d) AND now()
	AND city = 'Zion'
	AND device_id IN ('real1', 'real2')
	AND measure_name = 'multi'
	AND temperature IS NOT NULL
ORDER BY device_id
	,time ASC

この例では、センサー real1 の 9:20 AM 近辺のデータが無く、タイムスタンプの間隔についても偏りがあります。また、センサー real2 では 9:44 AM のデータが非現実的な値となっています。

time device_id temp
2021-01-01 09:01:00.000000000 real1 102
2021-01-01 09:13:00.000000000 real1 103
2021-01-01 09:31:00.000000000 real1 107
2021-01-01 09:44:00.000000000 real1 106
2021-01-01 09:58:00.000000000 real1 105
2021-01-01 09:00:00.000000000 real2 104
2021-01-01 09:12:00.000000000 real2 105
2021-01-01 09:22:00.000000000 real2 109
2021-01-01 09:33:00.000000000 real2 109
2021-01-01 09:44:00.000000000 real2 999

非現実的な値は temperature < 999 という条件を入れて取り除いてみましょう。

SELECT time
	,device_id
	,temperature AS temp
FROM "Blog"."SensorReadings"
WHERE time BETWEEN ago(365d) AND now()
	AND city = 'Zion'
	AND device_id IN ('real1', 'real2')
	AND measure_name = 'multi'
	AND temperature IS NOT NULL
	AND temperature < 999
ORDER BY device_id
	,time ASC

欠落しているデータを補って、10 分間隔のデータポイントの見栄えの良いグラフを得るにはどうすれば良いのでしょう? その為には、INTERPOLATE_LINEAR 関数を使い、欠落しているデータを線形補間するようにします。

SELECT device_id
	,INTERPOLATE_LINEAR(
		CREATE_TIME_SERIES(time, temperature), 
		SEQUENCE(min(time), max(time), 10m)
	) AS interpolated_temp
FROM "Blog"."SensorReadings"
WHERE time BETWEEN ago(365d) AND now()
	AND city = 'Zion'
	AND device_id IN ('real1', 'real2')
	AND measure_name = 'multi'
	AND temperature IS NOT NULL
	AND temperature < 999
GROUP BY device_id
ORDER BY device_id

INTERPOLATE_LINEAR 関数は、時系列 (第 1 引数) と、タイムスタンプの配列 (第 2 引数) を入力とする事で、時系列のデータ欠損をタイムスタンプの配列に沿って補完していきます。この関数は、時系列のオブジェクトを結果として返す為、上述のクエリについても以下のように device_id と時系列データを持つ複数行を返します。

device_id interpolated_temp
real1 [
{ time: 2021-01-01 09:01:00.000000000, value: 102 },
{ time: 2021-01-01 09:11:00.000000000, value: 102 },

]
real2 [
{ time: 2021-01-01 09:00:00.000000000, value: 104 },
{ time: 2021-01-01 09:10:00.000000000, value: 104 },

]

尚、デバイス real2 の時系列データは表データとすると以下のようにも表せます。

time value
2021-01-01 09:00:00.000000000 104
2021-01-01 09:10:00.000000000 104
2021-01-01 09:20:00.000000000 108
2021-01-01 09:30:00.000000000 109

尚、SEQUENCE 関数では結果配列のサイズ制限 (10,000 個まで) がありますので、その制限を超過しないようにする為に、集約した (この例で言うと平均) 気温で補完する必要があります。

WITH data
AS (
	SELECT BIN(time, 15s) as t
		,device_id
		,avg(temperature) as avg_temp
	FROM "Blog"."SensorReadings"
	WHERE time BETWEEN ago(365d) AND now()
		AND city = 'Zion'
		AND device_id IN ('real1', 'real2')
		AND measure_name = 'multi'
		AND temperature IS NOT NULL
		AND temperature < 999
	GROUP BY BIN(time, 15s), device_id
	)
SELECT device_id
	,INTERPOLATE_LINEAR(
		CREATE_TIME_SERIES(t, avg_temp), 
		SEQUENCE(min(t), max(t), 10m)
	) AS interpolated_temp
FROM data
GROUP BY device_id
ORDER BY device_id

もしも、元々の型でデータを返したい場合は、以下のように UNNEST 関数を使い、結果をフラットなテーブルに変換して、補間値を丸めます。

WITH data
AS (
	SELECT BIN(time, 15s) as t
		,device_id
		,avg(temperature) as avg_temp
	FROM "Blog"."SensorReadings"
	WHERE time BETWEEN ago(365d) AND now()
		AND city = 'Zion'
		AND device_id IN ('real1', 'real2')
		AND measure_name = 'multi'
		AND temperature IS NOT NULL
		AND temperature < 999
	GROUP BY BIN(time, 15s), device_id
	)
	,interpolated_data
AS (
	SELECT device_id
		,INTERPOLATE_LINEAR(
			CREATE_TIME_SERIES(t, avg_temp), 
			SEQUENCE(min(t), max(t), 10m)
		) AS interpolated_temp
	FROM data
	GROUP BY device_id
	)
SELECT device_id
	,time
	,round(value, 1) as interpolated_rounded_temp
FROM interpolated_data
CROSS JOIN UNNEST(interpolated_temp)

この結果は欠損値を補完しつつ、全てのデータは 10 分間隔で表示されています。

device_id time interpolated_rounded_temp
real1 2021-01-01 09:01:00.000000000 102.0
real1 2021-01-01 09:11:00.000000000 102.8
real1 2021-01-01 09:21:00.000000000 104.8
real1 2021-01-01 09:31:00.000000000 107.0
real1 2021-01-01 09:41:00.000000000 106.2
real1 2021-01-01 09:51:00.000000000 105.5
real2 2021-01-01 09:00:00.000000000 104.0
real2 2021-01-01 09:10:00.000000000 104.8
real2 2021-01-01 09:20:00.000000000 108.2
real2 2021-01-01 09:30:00.000000000 109.0

3. 複数のイベントの相互関係 (N-event correlation)

この例では、オーバーヒートしたデバイスがどの位の時間で冷却され通常の値に戻るかを測定していきます。冷却にかかる時間は OVERHEATING から IN_SERVICE の状態に戻るまでの時間で定義するものとします。coldy2 のセンサーに注目してみましょう。

SELECT time
	,device_id
	,state
FROM "Blog"."DeviceMetadata"
WHERE time BETWEEN ago(365d) AND now()
	AND device_id = 'coldy2'
	AND measure_name = 'multi'
ORDER BY time ASC

上記 SQL で以下の結果を得る事が出来ます。

time device_id state
2021-01-01 09:00:00.000000000 coldy2 OVERHEATING
2021-01-01 09:10:00.000000000 coldy2 IN_SERVICE
2021-01-01 09:20:00.000000000 coldy2 OVERHEATING
2021-01-01 09:30:00.000000000 coldy2 IN_SERVICE
2021-01-01 09:40:00.000000000 coldy2 OVERHEATING
2021-01-01 09:50:00.000000000 coldy2 OVERHEATING
2021-01-01 10:00:00.000000000 coldy2 IN_SERVICE
2021-01-01 10:10:00.000000000 coldy2 OVERHEATING
2021-01-01 10:20:00.000000000 coldy2 AUTO_SHUTDOWN

9:40 と 9:50 でデバイスは 2 度 OVERHEATING の状態となっている事がわかります。状態の変化に注目する必要がある為、連続した同じ値をまずは省いてみましょう。この為に、現在の行から前の指定されたオフセットの行を返す LAG ウィンドウ関数を使います。

SELECT BIN(time, 10m) AS binnedTime
	,device_id
	,MIN(state) AS state
	,LAG(MIN(state), 1) OVER (
		PARTITION BY device_id ORDER BY BIN(time, 10m)
		) AS prevState
FROM "Blog"."DeviceMetadata"
WHERE time BETWEEN ago(365d) AND now()
	AND device_id = 'coldy2'
	AND measure_name = 'multi'
GROUP BY device_id
	,BIN(time, 10m)

以下の結果が得られました。

binnedTime device_id state prevState
2021-01-01 09:00:00.000000000 coldy2 OVERHEATING
2021-01-01 09:10:00.000000000 coldy2 IN_SERVICE OVERHEATING
2021-01-01 09:20:00.000000000 coldy2 OVERHEATING IN_SERVICE
2021-01-01 09:30:00.000000000 coldy2 IN_SERVICE OVERHEATING
2021-01-01 09:40:00.000000000 coldy2 OVERHEATING IN_SERVICE
2021-01-01 09:50:00.000000000 coldy2 OVERHEATING OVERHEATING
2021-01-01 10:00:00.000000000 coldy2 IN_SERVICE OVERHEATING
2021-01-01 10:10:00.000000000 coldy2 OVERHEATING IN_SERVICE
2021-01-01 10:20:00.000000000 coldy2 AUTO_SHUTDOWN OVERHEATING

この結果では同じ行にその時刻の状態と、1 つ前の状態が表示されていますが、以下のようにフィルタを追加する事で重複した行を削除する事が出来そうです。

WITH binnedData
AS (
	SELECT BIN(time, 10m) AS binnedTime
		,device_id
		,MIN(state) AS state
		,LAG(MIN(state), 1) OVER (
			PARTITION BY device_id ORDER BY BIN(time, 10m)
			) AS prevState
	FROM "Blog"."DeviceMetadata"
	WHERE time BETWEEN ago(365d) AND now()
		AND device_id = 'coldy2'
		AND measure_name = 'multi'
	GROUP BY device_id
		,BIN(time, 10m)
)
SELECT *
FROM binnedData
WHERE (
		state != prevState
		OR prevState IS NULL
		)
ORDER BY binnedTime

以下の結果を得る事が出来ました。

binnedTime device_id state prevState
2021-01-01 09:00:00.000000000 coldy2 OVERHEATING
2021-01-01 09:10:00.000000000 coldy2 IN_SERVICE OVERHEATING
2021-01-01 09:20:00.000000000 coldy2 OVERHEATING IN_SERVICE
2021-01-01 09:30:00.000000000 coldy2 IN_SERVICE OVERHEATING
2021-01-01 09:40:00.000000000 coldy2 OVERHEATING IN_SERVICE
2021-01-01 10:00:00.000000000 coldy2 IN_SERVICE OVERHEATING
2021-01-01 10:10:00.000000000 coldy2 OVERHEATING IN_SERVICE
2021-01-01 10:20:00.000000000 coldy2 AUTO_SHUTDOWN OVERHEATING

ステータスが変化するまでの時間を測定する為には、1 つ前の状態の時刻が必要となります。ここで再度、binnedTime 列に対して LAG 関数を使います。

WITH binnedData
AS (
	SELECT BIN(time, 10m) AS binnedTime
		,device_id
		,MIN(state) AS state
		,LAG(MIN(state), 1) OVER (
			PARTITION BY device_id ORDER BY BIN(time, 10m)
			) AS prevState
	FROM "Blog"."DeviceMetadata"
	WHERE time BETWEEN ago(365d) AND now()
		AND device_id = 'coldy2'
		AND measure_name = 'multi'
	GROUP BY device_id
		,BIN(time, 10m)
	)
	,cleaned_data
AS (
	SELECT binnedTime
		,device_id
		,state
		,prevState
	FROM binnedData
	WHERE (
			state != prevState
			OR prevState IS NULL
			)
	ORDER BY binnedTime
	)
SELECT binnedTime
	,device_id
	,state
	,prevState
	,LAG(binnedTime, 1) OVER (
		PARTITION BY device_id ORDER BY binnedTime
		) AS prevBinnedTime
FROM cleaned_data

結果として、1 つ前の状態とその時刻が同じ行で表示されました。

binnedTime device_id state prevState prevBinnedTime
2021-01-01 09:00:00.000000000 coldy2 OVERHEATING
2021-01-01 09:10:00.000000000 coldy2 IN_SERVICE OVERHEATING 2021-01-01 09:00:00.000000000
2021-01-01 09:20:00.000000000 coldy2 OVERHEATING IN_SERVICE 2021-01-01 09:10:00.000000000
2021-01-01 09:30:00.000000000 coldy2 IN_SERVICE OVERHEATING 2021-01-01 09:20:00.000000000
2021-01-01 09:40:00.000000000 coldy2 OVERHEATING IN_SERVICE 2021-01-01 09:30:00.000000000
2021-01-01 10:00:00.000000000 coldy2 IN_SERVICE OVERHEATING 2021-01-01 09:40:00.000000000
2021-01-01 10:10:00.000000000 coldy2 OVERHEATING IN_SERVICE 2021-01-01 10:00:00.000000000
2021-01-01 10:20:00.000000000 coldy2 AUTO_SHUTDOWN OVERHEATING 2021-01-01 10:10:00.000000000

このデータを使って、クーリングオフの時間 (OVERHEATINGの状態からIN_SERVICE の状態に移るまでの時間) を計算する事ができます。

WITH binnedData
AS (
	SELECT BIN(time, 10m) AS binnedTime
		,device_id
		,MIN(state) AS state
		,LAG(MIN(state), 1) OVER (
			PARTITION BY device_id ORDER BY BIN(time, 10m)
			) AS prevState
	FROM "Blog"."DeviceMetadata"
	WHERE time BETWEEN ago(365d) AND now()
		AND device_id = 'coldy2'
		AND measure_name = 'multi'
	GROUP BY device_id
		,BIN(time, 10m)
	)
	,cleaned_data
AS (
	SELECT binnedTime
		,device_id
		,state
		,prevState
		,
		--windowing function executes after where, so we can use it in this query
		LAG(binnedTime, 1) OVER (
			PARTITION BY device_id ORDER BY binnedTime
			) AS prevBinnedTime
	FROM binnedData
	WHERE (
			state != prevState
			OR prevState IS NULL
			)
	ORDER BY binnedTime
	)
SELECT binnedTime
	,binnedTime - prevBinnedTime AS recovery_time
	,device_id
FROM cleaned_data
WHERE state = 'IN_SERVICE'
	AND prevState = 'OVERHEATING'

無事、以下の結果を得る事が出来ました。

binnedTime recovery_time device_id
2021-01-01 09:10:00.000000000 0 00:10:00.000000000 coldy2
2021-01-01 09:30:00.000000000 0 00:10:00.000000000 coldy2
2021-01-01 10:00:00.000000000 0 00:20:00.000000000 coldy2

この手法は様々なイベントに対して応用可能です。例えば、以下のクエリでは、IN_SERVICE から OVERHEATING となり、その後、AUTO_SHUTDOWN となったパターンを検索する事ができます。

WITH binnedData
AS (
	SELECT BIN(time, 10m) AS binnedTime
		,device_id
		,MIN(state) AS state
		,LAG(MIN(state), 1) OVER (
			PARTITION BY device_id ORDER BY BIN(time, 10m)
			) AS prevState
	FROM "Blog"."DeviceMetadata"
	WHERE time BETWEEN ago(365d) AND now()
		AND device_id = 'coldy2'
		AND measure_name = 'multi'
	GROUP BY device_id
		,BIN(time, 10m)
	)
	,cleaned_data
AS (
	SELECT binnedTime
		,device_id
		,state
		,prevState
		,
		--windowing function executes after where, so we can use it in this query
		LAG(binnedTime, 1) OVER (
			PARTITION BY device_id ORDER BY binnedTime
			) AS prevBinnedTime
		,LAG(state, 2) OVER (
			PARTITION BY device_id ORDER BY binnedTime
			) AS prevPrevState
	FROM binnedData
	WHERE (
			state != prevState
			OR prevState IS NULL
			)
	ORDER BY binnedTime
	)
SELECT binnedTime
	,device_id
	,state
	,prevState
	,prevPrevState
FROM cleaned_data
WHERE prevPrevState = 'IN_SERVICE'
	AND prevState = 'OVERHEATING'
	AND state = 'AUTO_SHUTDOWN'

以下の結果を得る事が出来ました。

binnedTime device_id state prevState prevPrevState
2021-01-01 10:20:00.000000000 coldy2 AUTO_SHUTDOWN OVERHEATING IN_SERVICE

4. 特定の期間毎に集計する (running aggregates)

この例では、デバイスの状態をよりよくモニターして理解する為に、積算 (移動総計) を使います。30 分間でデバイスが何度 IN_SERVICE で無い状態であったかを把握する為、以下のようなクエリを実行してみます。

WITH binned_time
AS (
	SELECT device_id
		,bin(time, 30m) AS time_bucket
		,count(*) AS event_count_in_the_bucket
	FROM "Blog"."DeviceMetadata"
	WHERE time BETWEEN ago(365d) AND now()
		AND measure_name = 'multi'
		AND state IS NOT NULL
		AND state <> 'IN_SERVICE'
	GROUP BY device_id
		,bin(time, 30m)
	)
SELECT device_id
	,time_bucket
	,event_count_in_the_bucket
	,sum(event_count_in_the_bucket) OVER (
		PARTITION BY device_id ORDER BY time_bucket range BETWEEN unbounded preceding AND CURRENT row
		) AS cumulative_event_count
FROM binned_time

ここでは共通テーブル式 (CTE) を使って binned_time というテーブルを作っていますが、フィルター条件として、state というメジャー値が IN_SERVICE ではないものにしています。また、bin(time , 30m)count() を使いつつ、device_id と 30 分間隔でグループ化する事で、30 分毎の条件のイベントの積算値を算出しています。

CTE で指定したテーブルを得たら、次は SUM() や、OVER() のウィンドウ関数を使い、イベントの積算を算出します。ウィンドウ関数では PARTITION BYORDER BY 句を含む OVER 句が必要となります。PARTITION BY 句はパーティション毎に計算をリセットし、また ORDER BY 句により計算結果を算出する際のレコード順を指定出来ます。

この例では、デバイス毎の 30 分間隔の積算値を知りたいので、PARTITION BY 句として device_id を指定し、ORDER BY 句としては、bin(time , 30m) を指定しています。また RANGE 句を指定する事で、ウィンドウ関数が合計値 (sum) を算出する範囲をその前の全イベントから現在の行までと指定しています (尚、全てのデータポイントで、積算は過去のデータ範囲のイベントの合計と、現在のイベントの件数の合算値となります)

このクエリの結果は以下を返します。

device_id time_bucket event_count_in_the_bucket cumulative_event_count
coldy2 2021-01-01 17:00:00.000000000 2 2
coldy2 2021-01-01 17:30:00.000000000 2 4
coldy2 2021-01-01 18:00:00.000000000 2 6
coldy1 2021-01-01 17:00:00.000000000 1 1
coldy1 2021-01-01 17:30:00.000000000 2 3
real2 2021-01-01 17:30:00.000000000 2 2
real2 2021-01-01 18:00:00.000000000 1 3

尚、積算を使う場合には、単純な値の変化を見るより、変化率を確認する方が簡単な場合があります。変化率を使うと、積算や平均のアノマリや急激な増加、減少を識別する事ができます。この事は次のセクションで議論する事にします。

5. 導関数と変化率

この例では、10 分間隔での気温 (単位は華氏) の変化を見ていきましょう。センサーがオフラインとなっている場合でもメジャー値の欠損を考慮し、10 分間隔での温度変化を取得出来るようなクエリを考えていきます。まずはデータを見てみましょう。

SELECT time
	,device_id
	,temperature
FROM "Blog"."SensorReadings"
WHERE time BETWEEN ago(365d) AND now()
	AND device_id = 'real1'
	AND measure_name = 'multi'
	AND temperature IS NOT NULL
ORDER BY time ASC

以下の結果には、9:20 AM 前後で欠損値がある事が分かります。

time device_id temperature
2021-01-01 09:01:00.000000000 real1 102
2021-01-01 09:13:00.000000000 real1 103
2021-01-01 09:31:00.000000000 real1 107
2021-01-01 09:44:00.000000000 real1 106
2021-01-01 09:58:00.000000000 real1 105

メジャー間の増分変化を計算するには、先の例で紹介した LAG 関数を使用する事もできますが、組み込みの DERIVATIVE_LINEAR 関数を使う事で、よりシンプルに変化率を計算できます。

DERIVATIVE_LINEAR 関数は時系列データとその間隔を引数として取り、指定された間隔で各ポイントの導関数を返します。導関数は微積分で使われる数学的な手法であり、値の変化率を計算するものです。以下の SQL はデバイス real1rateOfTempChange 列として時系列データを返します。

WITH binned_view
AS (
	SELECT BIN(time, 10m) AS binnedTime
		,MIN(temperature) AS minTempPerBucket
		,device_id
	FROM "Blog"."SensorReadings"
	WHERE time BETWEEN ago(365d) AND now()
		AND device_id = 'real1'
		AND measure_name = 'multi'
		AND temperature IS NOT NULL
	GROUP BY device_id
		,BIN(time, 10m)
	)
SELECT device_id
	,derivative_linear(CREATE_TIME_SERIES(binnedTime, minTempPerBucket), 10m) AS rateOfTempChange
FROM binned_view
GROUP BY device_id

次の結果を得る事ができました。

device_id rateOfTempChange
real1 [
{ time: 2021-01-01 09:10:00.000000000, value: 1.0 },
{ time: 2021-01-01 09:30:00.000000000, value: 2.0 },

]

デバイス real1 の時系列は以下の表のように表す事ができます。

time value
2021-01-01 09:10:00.000000000 1.0
2021-01-01 09:30:00.000000000 2.0
2021-01-01 09:40:00.000000000 -1.0
2021-01-01 09:50:00.000000000 -1.0

9:30 AM の変化率は 2 度となっていますが、20 分の間で華氏 103 度から 107 度に変化している事から、10 分間隔では 2 度変化している事になり、計算結果が正しい事が分かります。もしも、元のデータ型として扱いたいなら、UNNIEST 関数を使って時系列データをフラットな表にします。

WITH binned_view
AS (
	SELECT BIN(time, 10m) AS binnedTime
		,MIN(temperature) AS minTempPerBucket
		,device_id
	FROM "Blog"."SensorReadings"
	WHERE time BETWEEN ago(365d) AND now()
		AND device_id = 'real1'
		AND measure_name = 'multi'
		AND temperature IS NOT NULL
	GROUP BY device_id
		,BIN(time, 10m)
	)
	,interpolated
AS (
	SELECT device_id
		,derivative_linear(CREATE_TIME_SERIES(binnedTime, minTempPerBucket), 10m) AS rateOfTempChange
	FROM binned_view
	GROUP BY device_id
	)
SELECT device_id
	,time
	,value
FROM interpolated
CROSS JOIN UNNEST(rateOfTempChange)

以下の結果を得る事ができました。

device_id time value
real1 2021-01-01 09:10:00.000000000 1.0
real1 2021-01-01 09:30:00.000000000 2.0
real1 2021-01-01 09:40:00.000000000 -1.0
real1 2021-01-01 09:50:00.000000000 -1.0

6. 高いカーディナリティの N 個の時系列の取得

時間の経過に伴う都市の平均気温のグラフを表示したい場合を考えましょう。以下の SQL を使うと、10 分間隔でデバイスに記録される気温から平均気温を表示する事が出来ます。

SELECT BIN(time, 10m) AS binnedTime
	,avg(temperature) AS avg_temp
	,city
FROM "Blog"."SensorReadings"
WHERE time BETWEEN ago(365d) AND now()
	AND measure_name = 'multi'
	AND temperature IS NOT NULL
GROUP BY BIN(time, 10m)
	,city
ORDER BY city
	,BIN(time, 10m) ASC LIMIT 10

このクエリで以下の結果が得られます。

binnedTime avg_temp city
2021-01-01 09:00:00.000000000 70.3 Basin City
2021-01-01 09:10:00.000000000 69.8 Basin City
2021-01-01 09:20:00.000000000 67.0 Basin City
2021-01-01 09:30:00.000000000 69.5 Basin City
2021-01-01 09:40:00.000000000 69.6 Basin City
2021-01-01 09:50:00.000000000 68.5 Basin City
2021-01-01 10:00:00.000000000 69.1 Basin City
2021-01-01 09:00:00.000000000 22.5 Coruscant
2021-01-01 09:10:00.000000000 23.5 Coruscant
2021-01-01 09:20:00.000000000 24.0 Coruscant

このデータセットには 3 つの都市しか含まれていませんが、実際のデータでは 10,000 以上の都市が含まれる事もあるでしょう。その場合、前述のクエリはタイムスタンプ毎に 10,000 以上のデータポイントを返す事になります。1 つのグラフにそのような膨大な数が含まれると非常に読みづらく現実的ではありません。もしも、特定の期間の平均気温で並び替えられた上位 N 個の最も熱い都市のグラフにのみ関心がある場合は、どのようにすれば良いでしょうか?このデータセットには 3 つの都市がある為、以下の SQL を実行して上位 2 つの最も熱い都市に限定してみましょう。

SELECT city
	,avg(temperature) AS avg_temp
FROM "Blog"."SensorReadings"
WHERE time BETWEEN ago(365d) AND now()
	AND measure_name = 'multi'
	AND temperature IS NOT NULL
GROUP BY city
ORDER BY avg(temperature) DESC
	,city LIMIT 2

以下の結果を得る事が出来ました。

city avg_temp
Zion 194.9
Basin City 69.11428571428571

上位 2 つの熱い都市が得られたので、副問い合わせを利用してこれら 2 つの都市の平均気温のみを取得するように、最初のクエリに制限をかけてみます。

SELECT BIN(time, 10m) AS binnedTime
	,avg(temperature) AS avg_temp
	,city
FROM "Blog"."SensorReadings"
WHERE time BETWEEN ago(365d) AND now()
	AND measure_name = 'multi'
	AND temperature IS NOT NULL
	AND city IN (
		SELECT city
		FROM "Blog"."SensorReadings"
		WHERE time BETWEEN ago(365d) AND now()
			AND measure_name = 'multi'
			AND temperature IS NOT NULL
		GROUP BY city
		ORDER BY avg(temperature) DESC
			,city LIMIT 2
		)
GROUP BY BIN(time, 10m)
	,city
ORDER BY city
	,BIN(time, 10m) ASC

結果として、最も熱い都市の平均気温が 10 分間隔でまとめて表示されます。3 番目の都市の気温は表示されません。

binnedTime avg_temp city
2021-01-01 09:00:00.000000000 70.3 Basin City
2021-01-01 09:10:00.000000000 69.8 Basin City
2021-01-01 09:20:00.000000000 67.0 Basin City
2021-01-01 09:30:00.000000000 69.5 Basin City
2021-01-01 09:40:00.000000000 69.6 Basin City
2021-01-01 09:50:00.000000000 68.5 Basin City
2021-01-01 10:00:00.000000000 69.1 Basin City
2021-01-01 09:00:00.000000000 103.0 Zion
2021-01-01 09:10:00.000000000 104.0 Zion
2021-01-01 09:20:00.000000000 109.0 Zion
2021-01-01 09:30:00.000000000 108.0 Zion
2021-01-01 09:40:00.000000000 552.5 Zion
2021-01-01 09:50:00.000000000 105.0 Zion

7. ヒストグラムの作成

別の一般的なクエリパターンとして、メジャーやディメンジョンの値をバケット化し、そのバケット内でデータ分析を行うような場合が考えられます。ここでは、各都市のデバイスに記録される気温の分布を理解し、ヒストグラムを作成してみましょう。まずは、全ての都市で 5 度単位で測定値をまとめて (10~15、15~20、20~25等) 、測定値の数を数えてみます。

WITH buckets
AS (
	SELECT time
		,city
		,device_id
		,temperature
		,width_bucket(temperature, 0, 110, 22) bucket_id
	FROM "Blog"."SensorReadings"
	WHERE time BETWEEN ago(365d) AND now()
		AND measure_name = 'multi'
		AND temperature BETWEEN 0 AND 110
	)
SELECT city
	,bucket_id AS temp_bucket_id
	,cast((bucket_id - 1) * 5 AS VARCHAR) || '-' || cast(bucket_id * 5 AS VARCHAR) AS temp_bucket
	,count(*) AS temperature_readings
	,round(avg(temperature), 2) AS avg_temperature
	,round(approx_percentile(temperature, 0.5), 2) AS p50_temperature
	,round(approx_percentile(temperature, 0.9), 2) AS p90_temperature
FROM buckets
GROUP BY city
	,bucket_id
	,cast((bucket_id - 1) * 5 AS VARCHAR) || '-' || cast(bucket_id * 5 AS VARCHAR)
ORDER BY city
	,bucket_id

ここでは WIDTH_BUCKET 関数を利用しますが、利用方法としては 2 種類あります。1 つ目は WIDTH_BUCKET(x , bound1 , bound2 , n)と指定するものです。これは、bound1bound2 の境界内で n 個の等間隔のヒストグラムでバケットを作成した際の x が所属するバケット番号を返します。別の利用法としては WIDTH_BUCKET(x , bins) とするもので、配列 bins で指定された特定のまとまりで x が所属するバケット番号を返します。ここでは、5 度毎のまとまりでヒストグラムを作りたい為、最初の利用法を使ってみましょう。

初めに共通テーブル式を使って buckets というテーブルを定義し、各気温測定値のバケットの番号を取得します。次に、buckets に対してクエリを実行し、各都市の気温測定値の個数、平均気温、50 パーセンタイル、90 パーセンタイル等を取得します。また、見やすいようにバケット番号のテキストラベルも付与します。

city temp_bucket_id temp_bucket temperature_readings avg_temperature p50_temperature p90_temperature
Basin City 14 65-70 38 66.76 67 69
Basin City 15 70-75 31 71.81 71 74
Basin City 16 75-80 1 75.0 75 75
Coruscant 5 20-25 14 22.93 23 24
Coruscant 6 25-30 2 25.0 25 25
Zion 21 100-105 3 103.0 103 104
Zion 22 105-110 6 106.83 107 109

結果を見ると、各都市の気温がどのように分布しているかや、その範囲が分かります ( Basin City は 65~80、Coruscant は 20~30、Zion は 100~110 )

クリーンアップ

今後、利用料が発生しないように、作成した Timestream テーブル (SensorReadingDeviceMetadata) については忘れずに削除しておきましょう。

まとめ

Amazon Timestream は使用量に応じて課金が生じるサーバーレスの時系列専用のデータベースであり、パフォーマンス、コストに優れたクエリを実行する事ができます。また、時系列データに最適化されており、組み込みの時系列関数を利用して、クエリの複雑さを軽減する事が可能です。この投稿では、Timestream のワークロードでいくつかの一般的なアクセスパターンを取り上げ、効果的にクエリを実装する方法を学びました。各クエリを順に解説し、クエリの各部分がどのように動作してデータを処理するのかについても説明しました。また、Timestream で利用可能な組み込みの時系列関数についてもいくつか取り上げました。これらの関数はクエリのコスト、パフォーマンスを改善するのに有用な為、是非、積極的にご活用下さい。

翻訳はテクニカルアカウントマネージャーの西原が担当しました。原文は こちら です。