Amazon Web Services ブログ
Amazon Timestream のよくあるクエリパターンとその効率的な SQL 記述方法
Amazon Timestream は時系列データを取り扱う為のサーバーレスの目的別データベースサービスであり、その使用量に応じて課金が行われます。時系列データに対して、SQL を利用してクエリを実行しますが、同じ結果を得る為に複数の SQL 記述方法があり、そのパフォーマンスやコスト、複雑さはそれぞれ異なる為、正しいクエリを特定する事はとても重要です。また、組み込みの時系列関数を利用する事でクエリの複雑さを軽減する事ができます。
この投稿では、時系列のワークロードで一般的なクエリパターンと、それに対する Timestream の SQL クエリの効果的な記述方法を取り上げます。初めに本投稿で利用する IoT のデータセットを確認し、次に、Timestream でのフラット/時系列データモデルについて確認します。フラット/時系列データモデルの差を理解する事で、この投稿で利用されるクエリの複雑さを軽減する Timestream の組み込みの時系列関数をより深く理解する事が出来ます。最後に、7 つの一般的なアクセスパターンについて取り上げます。尚、それぞれのクエリパターンは独立している為、興味のあるところだけ確認頂ければ十分です。
サンプルデータ
この投稿で紹介される例では、各クエリがどのように実行されるか理解頂けるよう、全て少ないデータ量で実行されていますが、実際にはその数千倍の規模のデータでもテストをしており、実用に耐えうるクエリであるとお考え下さい。また、データセットは IoT ワークロードに関連するものであり、実運用で見られる様々なシナリオ (メジャーの欠落や無効なもの) が含まれています。
次のセクションのスクリプトを実行すると、データセットを生成して、Timestream にデータをロードする事ができます。データセットは <current_year>-01-01 09:00:00
で始まり、14 個のデバイスからのデータを含みます。各デバイスには複数のセンサーがあり、午前 10 時まで約 10 分間隔でデータを測定します。各測定値は、SensorReadings
、DeviceMetadata
という 2 つのテーブルに分割された 複数のメジャーで構成されています。SensorReadings
テーブルは temperature
、humidity
、air_quality_no2
というメジャーがあり、DeviceMetadata
テーブルは battery_level
、state
、uc_temperature
というメジャーがあります。また、それぞれのメジャーは各テーブル内の 1 つの行として格納されています。
本投稿では、全てのクエリで WHERE 句で時刻でフィルターをかけています。また、その他のメジャー名やディメンジョン名を条件に加える事で、データを絞り込んでスキャンするデータ量を削減し、より良いパフォーマンスを得たり、コスト削減につなげることができます。クエリのベストプラクティスの詳細についてはこちらを確認して下さい。
前提
操作を始める前に、Amazon Timestream への読み書きが可能で、AWS CloudShell にアクセス出来る AWS Account を用意しましょう。
サンプルデータの Timestream へのデータ投入
本投稿で利用するサンプルデータを生成する為に、次のステップを実施して下さい。
Blog
という名前の Timestream のデータベースを作成し、SensorReadings
、DeviceMetadata
テーブルを作成しましょう。メモリでの保持期間は 1 年で設定します。- CloudShell を使って、python スクリプトを実行しましょう (
python3 script_name.py
で実行可能です)
フラットモデルと時系列モデル
Timestream ではフラットモデルと時系列モデルの 2 つのデータモデルをサポートしています。フラットモデルは表形式のフォーマットとなっており、それぞれの測定値は1つの独立した行として表され、時刻、メジャー名、メジャー値、及びディメンジョン値がカラムとして設定されます。フラットモデルでは標準SQL (標準 SQL 関数、演算子、集約、フィルタリング等) を利用し、組み込みの時系列関数は使用しません。SensorReadings
テーブルのサンプルデータは、temperature
、humidity
、air_quality_no2
、そして request_number
をマルチメジャーとして Timestream に格納しており (ディメンジョンは device_id
と city
) 、これらのフィールドには全て SQL 構文で1つの行としてアクセス出来ます。例えば、次の SELECT 文を使う事で、気温と湿度の測定値のリストと対応するタイムスタンプ、リクエストナンバーを取得する事が出来ます。
SELECT time, device_id, city, temperature, humidity, request_number
FROM "Blog"."SensorReadings"
WHERE time BETWEEN ago(365d) AND now()
AND city = 'Zion'
AND device_id IN ('real1', 'real2')
AND measure_name = 'multi'
AND temperature < 999
ORDER BY device_id
,time ASC
この SQL を実行すると、以下のフラットモデルで表されるデータが得られます。
time | device_id | city | temperature | humidity | request_number |
2021-01-01 09:01:00.000000000 | real1 | Zion | 102 | 13.2 | 30493 |
2021-01-01 09:13:00.000000000 | real1 | Zion | 103 | 13.1 | 30494 |
2021-01-01 09:31:00.000000000 | real1 | Zion | 107 | 13.1 | 30496 |
2021-01-01 09:44:00.000000000 | real1 | Zion | 106 | 13.0 | 30497 |
2021-01-01 09:58:00.000000000 | real1 | Zion | 105 | 12.9 | 30499 |
2021-01-01 09:00:00.000000000 | real2 | Zion | 104 | 14.2 | 43231 |
2021-01-01 09:12:00.000000000 | real2 | Zion | 105 | 14.1 | 43232 |
2021-01-01 09:22:00.000000000 | real2 | Zion | 109 | 14.1 | 43233 |
2021-01-01 09:33:00.000000000 | real2 | Zion | 109 | 14.1 | 43234 |
一方、時系列モデルでは、一連のディメンジョンに対する時刻とメジャー値のペアを順序付けたシーケンスとして表されます。このデータ形式は Timestream の組み込みの時系列関数を利用する場合に必要であり、欠損値がある場合のギャップを埋める補間、変化率を見つける導関数、また、2 つの時系列の相関等を導出するケース等で利用されます。
フラットモデルから時系列モデルに変換する際には、組み込みの CREATE_TIME_SERIES
関数を利用します。この関数は、time
と value
の 2 つの引数を持ち、タイムスタンプと対応する値のシーケンスを時系列データとして返します。このデータタイプは組み込みの時系列関数の入力として利用されます。以下、CREATE_TIME_SERIES
関数の利用例となります。
SELECT device_id
,CREATE_TIME_SERIES(time, temperature) AS temp_series
,CREATE_TIME_SERIES(time, humidity) AS humidity_series
FROM "Blog"."SensorReadings"
WHERE time BETWEEN ago(365d) AND now()
AND city = 'Zion'
AND device_id IN ('real1', 'real2')
AND measure_name = 'multi'
AND temperature < 999
GROUP BY device_id
ORDER BY device_id
このクエリを実行すると、以下の結果が得られます。
device_id | temp_series | humidity_series |
real1 | [ { time: 2021-01-01 09:01:00.000000000, value: 102 }, { time: 2021-01-01 09:13:00.000000000, value: 103 }, { time: 2021-01-01 09:31:00.000000000, value: 107 }, { time: 2021-01-01 09:44:00.000000000, value: 106 }, { time: 2021-01-01 09:58:00.000000000, value: 105 } ] |
[ { time: 2021-01-01 09:01:00.000000000, value: 13.2 }, { time: 2021-01-01 09:13:00.000000000, value: 13.1 }, { time: 2021-01-01 09:31:00.000000000, value: 13.1 }, { time: 2021-01-01 09:44:00.000000000, value: 13.0 }, { time: 2021-01-01 09:58:00.000000000, value: 12.9 } ] |
real2 | [ { time: 2021-01-01 09:00:00.000000000, value: 104 }, { time: 2021-01-01 09:12:00.000000000, value: 105 }, { time: 2021-01-01 09:22:00.000000000, value: 109 }, { time: 2021-01-01 09:33:00.000000000, value: 109 } ] |
[ { time: 2021-01-01 09:00:00.000000000, value: 14.2 }, { time: 2021-01-01 09:12:00.000000000, value: 14.1 }, { time: 2021-01-01 09:22:00.000000000, value: 14.1 }, { time: 2021-01-01 09:33:00.000000000, value: 14.1 } ] |
UNNEST
標準 SQL の集約や WINDOW 関数を利用して他のテーブルやデータセットと結合させたり、集約を行いたい場合には、データを時系列モデルからフラットモデルに戻す必要があります。その場合は、UNNEST
関数を用いると、CREATE_TIME_SERIES
関数と逆の変換が可能で、入力として時系列データを与えると、時刻とメジャー値の 2 つのカラムを持ったテーブルを返します。また、UNNEST はテーブル、時刻、メジャーのカラム名に対して UNNEST (timeseries) AS <alias_name> (time_alias, value_alias)
のように、別名を設定する事が可能です。
WITH device_temp_series
AS (
SELECT CREATE_TIME_SERIES(time, temperature) AS temp_series
,device_id
FROM "Blog"."SensorReadings"
WHERE time BETWEEN ago(365d) AND now()
AND city = 'Zion'
AND device_id IN ('real1', 'real2')
AND measure_name = 'multi'
AND temperature < 999
GROUP BY device_id
ORDER BY device_id
)
SELECT device_id
,ts.time
,ts.value
FROM device_temp_series
CROSS JOIN UNNEST(temp_series) AS ts
ORDER BY device_id
,ts.time ASC
この SQL を実行すると、以下の結果が得られます。
device_id | time | value |
real1 | 2021-01-01 09:01:00.000000000 | 102 |
real1 | 2021-01-01 09:13:00.000000000 | 103 |
real1 | 2021-01-01 09:31:00.000000000 | 107 |
real1 | 2021-01-01 09:44:00.000000000 | 106 |
real1 | 2021-01-01 09:58:00.000000000 | 105 |
real2 | 2021-01-01 09:00:00.000000000 | 104 |
real2 | 2021-01-01 09:12:00.000000000 | 105 |
real2 | 2021-01-01 09:22:00.000000000 | 109 |
real2 | 2021-01-01 09:33:00.000000000 | 109 |
CREATE_TIME_SERIES
、及び、UNNEST
関数の実例については、以下の補間の説明を確認して下さい。
よくあるクエリーパターン
このセクションでは、以下の 7 つのアクセスパターンを取り上げ、その詳細を解説します。全てのクエリは独立している為、全てを見る必要は無く、皆さんの興味のあるパターンのみ確認すれば十分です。
1.最後の値
2.補間
3.複数のイベントの相互関係 (N-event correlation)
4.特定の期間毎に集計する (running aggregates)
5.導関数と変化率
6.高いカーディナリティの N 個の時系列の取得
7.ヒストグラムの作成
1. 最後の値
時系列のワークロードにおいて、最新のレコードやメジャー値を探す事をまずは考えてみましょう。例えば、センサーの測定値の最新の値、状態、またはアセットの位置等を確認する場合が考えられます。
この例では、現時点で動作していないデバイスを見つけようと思います。利用するデータセットには、それぞれのデバイスの状態が state
というメジャー値に格納されており (メジャー名は multi
) 、クエリでこのカラムを参照しますが、2つの記述方法を比較してみたいと思います。1 つは共通テーブル式 (CTE) を利用する方法で、もう 1 つは max_by()
関数を利用する方法です。まず、CTE を使って最新の値を取る事を考えてみます。
WITH latest_ts
AS (
SELECT device_id
,max(time) AS latest_timestamp
FROM "Blog"."DeviceMetadata"
WHERE measure_name = 'multi'
AND time > ago(365d)
GROUP BY device_id
,measure_name
)
SELECT devmd.device_id, lts.latest_timestamp, devmd.state
FROM "Blog"."DeviceMetadata" devmd
JOIN latest_ts AS lts ON lts.device_id = devmd.device_id
AND lts.latest_timestamp = devmd.time
WHERE devmd.measure_name = 'multi'
AND devmd.state <> 'IN_SERVICE'
AND devmd.time > ago(365d)
このクエリでは、まず過去 365 日でそれぞれのデバイスの状態に関連した最新のタイムスタンプを CTE を利用して latest_ts
として取得しています。デバイスは全部で 14 個あり、過去 365 日間で全デバイスで最新の状態が取れると考えられる為、CTE は以下の通り 14 行を結果として返します。
device_id | latest_timestamp |
smarty9 | 2021-01-01 10:00:00.000000000 |
coldy2 | 2021-01-01 10:20:00.000000000 |
smarty7 | 2021-01-01 10:00:00.000000000 |
coldy1 | 2021-01-01 10:00:00.000000000 |
smarty2 | 2021-01-01 10:00:00.000000000 |
smarty0 | 2021-01-01 10:00:00.000000000 |
smarty6 | 2021-01-01 10:00:00.000000000 |
smarty4 | 2021-01-01 10:00:00.000000000 |
smarty8 | 2021-01-01 10:00:00.000000000 |
smarty5 | 2021-01-01 10:00:00.000000000 |
real2 | 2021-01-01 10:00:00.000000000 |
smarty3 | 2021-01-01 10:00:00.000000000 |
smarty1 | 2021-01-01 10:00:00.000000000 |
real1 | 2021-01-01 09:58:00.000000000 |
この時点で、各デバイスの最新状態のタイムスタンプが取得出来ました。次にメインのテーブルと CVE で定義した latest_ts
とを device_id
、タイムスタンプで結合し、実際のステータスを確認します。ここでは、 最新の状態で IN_SERVICE
となっていないデバイスのみ取るようにフィルターをかけています。用意したデータセットからは、この検索の結果、2 つのデバイスの情報が結果として取得できました。もしも単純に全てのデバイスの最新の状態が取りたい場合は、WHERE 句を省略して 14 行 (全てのデバイス) を取得する事も出来ます。
device_id | latest_timestamp | state |
coldy2 | 2021-01-01 10:20:00.000000000 | AUTO_SHUTDOWN |
real2 | 2021-01-01 10:00:00.000000000 | REPLACE_BATTERY |
Timestream のコンソール画面で結果タブを参照すると、このクエリは 8.17 KB をスキャンした事が分かります。
さて、次にもう 1 つのやり方として、CTE を使わず max_by()
関数を使う方法を見てみましょう。
SELECT device_id
,max_by(time, time) AS latest_timestamp
,max_by(state, time) AS state
FROM "Blog"."DeviceMetadata"
WHERE time BETWEEN ago(365d) AND now()
AND measure_name = 'multi'
GROUP BY device_id
HAVING max_by(state, time) <> 'IN_SERVICE'
このクエリでは前述の CTE を使ったクエリと同じ結果を得る事ができます。ここでは、max_by(x , y)
関数を使っており、y
が最大値の場合の、関連する項目である x
の値をとる事ができます。この例では、時刻が最大となる場合のメジャー値を取得しています。また、最新のステータスが IN_SERVICE
でないデバイスのみを取得する為、HAVING
句を利用してフィルターをかけています。
この2つのクエリを比較すると、2 番目のクエリの方がより単純です。また、スキャンしたバイト数についても2番目のクエリの方が小さい為、コストについても 2 番目のクエリが安くなりえる事が分かります (但し、Timestream においては、10 MB 以下のスキャン量については全て 10 MB に切り上げて計算される事に注意して下さい)
尚、max_by()
関数は、例えば max_by(temperature , time , 5)
のように記述すると、5 個の最新の temperature の値を取る事も出来ます。
2. 補間
IoT のユースケースでは、デバイスからのデータが一部欠落していたり、データ取得の間隔がずれたり、現実的では無い値が取れたりする事がよくあります。以下の例で real1
, real2
というデバイスの気温センサーの測定値を見ていきましょう。
SELECT time
,device_id
,temperature AS temp
FROM "Blog"."SensorReadings"
WHERE time BETWEEN ago(365d) AND now()
AND city = 'Zion'
AND device_id IN ('real1', 'real2')
AND measure_name = 'multi'
AND temperature IS NOT NULL
ORDER BY device_id
,time ASC
この例では、センサー real1
の 9:20 AM 近辺のデータが無く、タイムスタンプの間隔についても偏りがあります。また、センサー real2
では 9:44 AM のデータが非現実的な値となっています。
time | device_id | temp |
2021-01-01 09:01:00.000000000 | real1 | 102 |
2021-01-01 09:13:00.000000000 | real1 | 103 |
2021-01-01 09:31:00.000000000 | real1 | 107 |
2021-01-01 09:44:00.000000000 | real1 | 106 |
2021-01-01 09:58:00.000000000 | real1 | 105 |
2021-01-01 09:00:00.000000000 | real2 | 104 |
2021-01-01 09:12:00.000000000 | real2 | 105 |
2021-01-01 09:22:00.000000000 | real2 | 109 |
2021-01-01 09:33:00.000000000 | real2 | 109 |
2021-01-01 09:44:00.000000000 | real2 | 999 |
非現実的な値は temperature < 999
という条件を入れて取り除いてみましょう。
SELECT time
,device_id
,temperature AS temp
FROM "Blog"."SensorReadings"
WHERE time BETWEEN ago(365d) AND now()
AND city = 'Zion'
AND device_id IN ('real1', 'real2')
AND measure_name = 'multi'
AND temperature IS NOT NULL
AND temperature < 999
ORDER BY device_id
,time ASC
欠落しているデータを補って、10 分間隔のデータポイントの見栄えの良いグラフを得るにはどうすれば良いのでしょう? その為には、INTERPOLATE_LINEAR 関数を使い、欠落しているデータを線形補間するようにします。
SELECT device_id
,INTERPOLATE_LINEAR(
CREATE_TIME_SERIES(time, temperature),
SEQUENCE(min(time), max(time), 10m)
) AS interpolated_temp
FROM "Blog"."SensorReadings"
WHERE time BETWEEN ago(365d) AND now()
AND city = 'Zion'
AND device_id IN ('real1', 'real2')
AND measure_name = 'multi'
AND temperature IS NOT NULL
AND temperature < 999
GROUP BY device_id
ORDER BY device_id
INTERPOLATE_LINEAR
関数は、時系列 (第 1 引数) と、タイムスタンプの配列 (第 2 引数) を入力とする事で、時系列のデータ欠損をタイムスタンプの配列に沿って補完していきます。この関数は、時系列のオブジェクトを結果として返す為、上述のクエリについても以下のように device_id
と時系列データを持つ複数行を返します。
device_id | interpolated_temp |
real1 | [ { time: 2021-01-01 09:01:00.000000000, value: 102 }, { time: 2021-01-01 09:11:00.000000000, value: 102 }, … ] |
real2 | [ { time: 2021-01-01 09:00:00.000000000, value: 104 }, { time: 2021-01-01 09:10:00.000000000, value: 104 }, … ] |
尚、デバイス real2
の時系列データは表データとすると以下のようにも表せます。
time | value |
2021-01-01 09:00:00.000000000 | 104 |
2021-01-01 09:10:00.000000000 | 104 |
2021-01-01 09:20:00.000000000 | 108 |
2021-01-01 09:30:00.000000000 | 109 |
尚、SEQUENCE
関数では結果配列のサイズ制限 (10,000 個まで) がありますので、その制限を超過しないようにする為に、集約した (この例で言うと平均) 気温で補完する必要があります。
WITH data
AS (
SELECT BIN(time, 15s) as t
,device_id
,avg(temperature) as avg_temp
FROM "Blog"."SensorReadings"
WHERE time BETWEEN ago(365d) AND now()
AND city = 'Zion'
AND device_id IN ('real1', 'real2')
AND measure_name = 'multi'
AND temperature IS NOT NULL
AND temperature < 999
GROUP BY BIN(time, 15s), device_id
)
SELECT device_id
,INTERPOLATE_LINEAR(
CREATE_TIME_SERIES(t, avg_temp),
SEQUENCE(min(t), max(t), 10m)
) AS interpolated_temp
FROM data
GROUP BY device_id
ORDER BY device_id
もしも、元々の型でデータを返したい場合は、以下のように UNNEST
関数を使い、結果をフラットなテーブルに変換して、補間値を丸めます。
WITH data
AS (
SELECT BIN(time, 15s) as t
,device_id
,avg(temperature) as avg_temp
FROM "Blog"."SensorReadings"
WHERE time BETWEEN ago(365d) AND now()
AND city = 'Zion'
AND device_id IN ('real1', 'real2')
AND measure_name = 'multi'
AND temperature IS NOT NULL
AND temperature < 999
GROUP BY BIN(time, 15s), device_id
)
,interpolated_data
AS (
SELECT device_id
,INTERPOLATE_LINEAR(
CREATE_TIME_SERIES(t, avg_temp),
SEQUENCE(min(t), max(t), 10m)
) AS interpolated_temp
FROM data
GROUP BY device_id
)
SELECT device_id
,time
,round(value, 1) as interpolated_rounded_temp
FROM interpolated_data
CROSS JOIN UNNEST(interpolated_temp)
この結果は欠損値を補完しつつ、全てのデータは 10 分間隔で表示されています。
device_id | time | interpolated_rounded_temp |
real1 | 2021-01-01 09:01:00.000000000 | 102.0 |
real1 | 2021-01-01 09:11:00.000000000 | 102.8 |
real1 | 2021-01-01 09:21:00.000000000 | 104.8 |
real1 | 2021-01-01 09:31:00.000000000 | 107.0 |
real1 | 2021-01-01 09:41:00.000000000 | 106.2 |
real1 | 2021-01-01 09:51:00.000000000 | 105.5 |
real2 | 2021-01-01 09:00:00.000000000 | 104.0 |
real2 | 2021-01-01 09:10:00.000000000 | 104.8 |
real2 | 2021-01-01 09:20:00.000000000 | 108.2 |
real2 | 2021-01-01 09:30:00.000000000 | 109.0 |
3. 複数のイベントの相互関係 (N-event correlation)
この例では、オーバーヒートしたデバイスがどの位の時間で冷却され通常の値に戻るかを測定していきます。冷却にかかる時間は OVERHEATING
から IN_SERVICE
の状態に戻るまでの時間で定義するものとします。coldy2
のセンサーに注目してみましょう。
SELECT time
,device_id
,state
FROM "Blog"."DeviceMetadata"
WHERE time BETWEEN ago(365d) AND now()
AND device_id = 'coldy2'
AND measure_name = 'multi'
ORDER BY time ASC
上記 SQL で以下の結果を得る事が出来ます。
time | device_id | state |
2021-01-01 09:00:00.000000000 | coldy2 | OVERHEATING |
2021-01-01 09:10:00.000000000 | coldy2 | IN_SERVICE |
2021-01-01 09:20:00.000000000 | coldy2 | OVERHEATING |
2021-01-01 09:30:00.000000000 | coldy2 | IN_SERVICE |
2021-01-01 09:40:00.000000000 | coldy2 | OVERHEATING |
2021-01-01 09:50:00.000000000 | coldy2 | OVERHEATING |
2021-01-01 10:00:00.000000000 | coldy2 | IN_SERVICE |
2021-01-01 10:10:00.000000000 | coldy2 | OVERHEATING |
2021-01-01 10:20:00.000000000 | coldy2 | AUTO_SHUTDOWN |
9:40 と 9:50 でデバイスは 2 度 OVERHEATING
の状態となっている事がわかります。状態の変化に注目する必要がある為、連続した同じ値をまずは省いてみましょう。この為に、現在の行から前の指定されたオフセットの行を返す LAG ウィンドウ関数を使います。
SELECT BIN(time, 10m) AS binnedTime
,device_id
,MIN(state) AS state
,LAG(MIN(state), 1) OVER (
PARTITION BY device_id ORDER BY BIN(time, 10m)
) AS prevState
FROM "Blog"."DeviceMetadata"
WHERE time BETWEEN ago(365d) AND now()
AND device_id = 'coldy2'
AND measure_name = 'multi'
GROUP BY device_id
,BIN(time, 10m)
以下の結果が得られました。
binnedTime | device_id | state | prevState |
2021-01-01 09:00:00.000000000 | coldy2 | OVERHEATING | – |
2021-01-01 09:10:00.000000000 | coldy2 | IN_SERVICE | OVERHEATING |
2021-01-01 09:20:00.000000000 | coldy2 | OVERHEATING | IN_SERVICE |
2021-01-01 09:30:00.000000000 | coldy2 | IN_SERVICE | OVERHEATING |
2021-01-01 09:40:00.000000000 | coldy2 | OVERHEATING | IN_SERVICE |
2021-01-01 09:50:00.000000000 | coldy2 | OVERHEATING | OVERHEATING |
2021-01-01 10:00:00.000000000 | coldy2 | IN_SERVICE | OVERHEATING |
2021-01-01 10:10:00.000000000 | coldy2 | OVERHEATING | IN_SERVICE |
2021-01-01 10:20:00.000000000 | coldy2 | AUTO_SHUTDOWN | OVERHEATING |
この結果では同じ行にその時刻の状態と、1 つ前の状態が表示されていますが、以下のようにフィルタを追加する事で重複した行を削除する事が出来そうです。
WITH binnedData
AS (
SELECT BIN(time, 10m) AS binnedTime
,device_id
,MIN(state) AS state
,LAG(MIN(state), 1) OVER (
PARTITION BY device_id ORDER BY BIN(time, 10m)
) AS prevState
FROM "Blog"."DeviceMetadata"
WHERE time BETWEEN ago(365d) AND now()
AND device_id = 'coldy2'
AND measure_name = 'multi'
GROUP BY device_id
,BIN(time, 10m)
)
SELECT *
FROM binnedData
WHERE (
state != prevState
OR prevState IS NULL
)
ORDER BY binnedTime
以下の結果を得る事が出来ました。
binnedTime | device_id | state | prevState |
2021-01-01 09:00:00.000000000 | coldy2 | OVERHEATING | – |
2021-01-01 09:10:00.000000000 | coldy2 | IN_SERVICE | OVERHEATING |
2021-01-01 09:20:00.000000000 | coldy2 | OVERHEATING | IN_SERVICE |
2021-01-01 09:30:00.000000000 | coldy2 | IN_SERVICE | OVERHEATING |
2021-01-01 09:40:00.000000000 | coldy2 | OVERHEATING | IN_SERVICE |
2021-01-01 10:00:00.000000000 | coldy2 | IN_SERVICE | OVERHEATING |
2021-01-01 10:10:00.000000000 | coldy2 | OVERHEATING | IN_SERVICE |
2021-01-01 10:20:00.000000000 | coldy2 | AUTO_SHUTDOWN | OVERHEATING |
ステータスが変化するまでの時間を測定する為には、1 つ前の状態の時刻が必要となります。ここで再度、binnedTime
列に対して LAG
関数を使います。
WITH binnedData
AS (
SELECT BIN(time, 10m) AS binnedTime
,device_id
,MIN(state) AS state
,LAG(MIN(state), 1) OVER (
PARTITION BY device_id ORDER BY BIN(time, 10m)
) AS prevState
FROM "Blog"."DeviceMetadata"
WHERE time BETWEEN ago(365d) AND now()
AND device_id = 'coldy2'
AND measure_name = 'multi'
GROUP BY device_id
,BIN(time, 10m)
)
,cleaned_data
AS (
SELECT binnedTime
,device_id
,state
,prevState
FROM binnedData
WHERE (
state != prevState
OR prevState IS NULL
)
ORDER BY binnedTime
)
SELECT binnedTime
,device_id
,state
,prevState
,LAG(binnedTime, 1) OVER (
PARTITION BY device_id ORDER BY binnedTime
) AS prevBinnedTime
FROM cleaned_data
結果として、1 つ前の状態とその時刻が同じ行で表示されました。
binnedTime | device_id | state | prevState | prevBinnedTime |
2021-01-01 09:00:00.000000000 | coldy2 | OVERHEATING | – | – |
2021-01-01 09:10:00.000000000 | coldy2 | IN_SERVICE | OVERHEATING | 2021-01-01 09:00:00.000000000 |
2021-01-01 09:20:00.000000000 | coldy2 | OVERHEATING | IN_SERVICE | 2021-01-01 09:10:00.000000000 |
2021-01-01 09:30:00.000000000 | coldy2 | IN_SERVICE | OVERHEATING | 2021-01-01 09:20:00.000000000 |
2021-01-01 09:40:00.000000000 | coldy2 | OVERHEATING | IN_SERVICE | 2021-01-01 09:30:00.000000000 |
2021-01-01 10:00:00.000000000 | coldy2 | IN_SERVICE | OVERHEATING | 2021-01-01 09:40:00.000000000 |
2021-01-01 10:10:00.000000000 | coldy2 | OVERHEATING | IN_SERVICE | 2021-01-01 10:00:00.000000000 |
2021-01-01 10:20:00.000000000 | coldy2 | AUTO_SHUTDOWN | OVERHEATING | 2021-01-01 10:10:00.000000000 |
このデータを使って、クーリングオフの時間 (OVERHEATING
の状態からIN_SERVICE
の状態に移るまでの時間) を計算する事ができます。
WITH binnedData
AS (
SELECT BIN(time, 10m) AS binnedTime
,device_id
,MIN(state) AS state
,LAG(MIN(state), 1) OVER (
PARTITION BY device_id ORDER BY BIN(time, 10m)
) AS prevState
FROM "Blog"."DeviceMetadata"
WHERE time BETWEEN ago(365d) AND now()
AND device_id = 'coldy2'
AND measure_name = 'multi'
GROUP BY device_id
,BIN(time, 10m)
)
,cleaned_data
AS (
SELECT binnedTime
,device_id
,state
,prevState
,
--windowing function executes after where, so we can use it in this query
LAG(binnedTime, 1) OVER (
PARTITION BY device_id ORDER BY binnedTime
) AS prevBinnedTime
FROM binnedData
WHERE (
state != prevState
OR prevState IS NULL
)
ORDER BY binnedTime
)
SELECT binnedTime
,binnedTime - prevBinnedTime AS recovery_time
,device_id
FROM cleaned_data
WHERE state = 'IN_SERVICE'
AND prevState = 'OVERHEATING'
無事、以下の結果を得る事が出来ました。
binnedTime | recovery_time | device_id |
2021-01-01 09:10:00.000000000 | 0 00:10:00.000000000 | coldy2 |
2021-01-01 09:30:00.000000000 | 0 00:10:00.000000000 | coldy2 |
2021-01-01 10:00:00.000000000 | 0 00:20:00.000000000 | coldy2 |
この手法は様々なイベントに対して応用可能です。例えば、以下のクエリでは、IN_SERVICE
から OVERHEATING
となり、その後、AUTO_SHUTDOWN
となったパターンを検索する事ができます。
WITH binnedData
AS (
SELECT BIN(time, 10m) AS binnedTime
,device_id
,MIN(state) AS state
,LAG(MIN(state), 1) OVER (
PARTITION BY device_id ORDER BY BIN(time, 10m)
) AS prevState
FROM "Blog"."DeviceMetadata"
WHERE time BETWEEN ago(365d) AND now()
AND device_id = 'coldy2'
AND measure_name = 'multi'
GROUP BY device_id
,BIN(time, 10m)
)
,cleaned_data
AS (
SELECT binnedTime
,device_id
,state
,prevState
,
--windowing function executes after where, so we can use it in this query
LAG(binnedTime, 1) OVER (
PARTITION BY device_id ORDER BY binnedTime
) AS prevBinnedTime
,LAG(state, 2) OVER (
PARTITION BY device_id ORDER BY binnedTime
) AS prevPrevState
FROM binnedData
WHERE (
state != prevState
OR prevState IS NULL
)
ORDER BY binnedTime
)
SELECT binnedTime
,device_id
,state
,prevState
,prevPrevState
FROM cleaned_data
WHERE prevPrevState = 'IN_SERVICE'
AND prevState = 'OVERHEATING'
AND state = 'AUTO_SHUTDOWN'
以下の結果を得る事が出来ました。
binnedTime | device_id | state | prevState | prevPrevState |
2021-01-01 10:20:00.000000000 | coldy2 | AUTO_SHUTDOWN | OVERHEATING | IN_SERVICE |
4. 特定の期間毎に集計する (running aggregates)
この例では、デバイスの状態をよりよくモニターして理解する為に、積算 (移動総計) を使います。30 分間でデバイスが何度 IN_SERVICE
で無い状態であったかを把握する為、以下のようなクエリを実行してみます。
WITH binned_time
AS (
SELECT device_id
,bin(time, 30m) AS time_bucket
,count(*) AS event_count_in_the_bucket
FROM "Blog"."DeviceMetadata"
WHERE time BETWEEN ago(365d) AND now()
AND measure_name = 'multi'
AND state IS NOT NULL
AND state <> 'IN_SERVICE'
GROUP BY device_id
,bin(time, 30m)
)
SELECT device_id
,time_bucket
,event_count_in_the_bucket
,sum(event_count_in_the_bucket) OVER (
PARTITION BY device_id ORDER BY time_bucket range BETWEEN unbounded preceding AND CURRENT row
) AS cumulative_event_count
FROM binned_time
ここでは共通テーブル式 (CTE) を使って binned_time
というテーブルを作っていますが、フィルター条件として、state
というメジャー値が IN_SERVICE
ではないものにしています。また、bin(time , 30m)
、count()
を使いつつ、device_id
と 30 分間隔でグループ化する事で、30 分毎の条件のイベントの積算値を算出しています。
CTE で指定したテーブルを得たら、次は SUM()
や、OVER()
のウィンドウ関数を使い、イベントの積算を算出します。ウィンドウ関数では PARTITION BY
、ORDER BY
句を含む OVER
句が必要となります。PARTITION BY
句はパーティション毎に計算をリセットし、また ORDER BY
句により計算結果を算出する際のレコード順を指定出来ます。
この例では、デバイス毎の 30 分間隔の積算値を知りたいので、PARTITION BY
句として device_id
を指定し、ORDER BY
句としては、bin(time , 30m)
を指定しています。また RANGE
句を指定する事で、ウィンドウ関数が合計値 (sum) を算出する範囲をその前の全イベントから現在の行までと指定しています (尚、全てのデータポイントで、積算は過去のデータ範囲のイベントの合計と、現在のイベントの件数の合算値となります)
このクエリの結果は以下を返します。
device_id | time_bucket | event_count_in_the_bucket | cumulative_event_count |
coldy2 | 2021-01-01 17:00:00.000000000 | 2 | 2 |
coldy2 | 2021-01-01 17:30:00.000000000 | 2 | 4 |
coldy2 | 2021-01-01 18:00:00.000000000 | 2 | 6 |
coldy1 | 2021-01-01 17:00:00.000000000 | 1 | 1 |
coldy1 | 2021-01-01 17:30:00.000000000 | 2 | 3 |
real2 | 2021-01-01 17:30:00.000000000 | 2 | 2 |
real2 | 2021-01-01 18:00:00.000000000 | 1 | 3 |
尚、積算を使う場合には、単純な値の変化を見るより、変化率を確認する方が簡単な場合があります。変化率を使うと、積算や平均のアノマリや急激な増加、減少を識別する事ができます。この事は次のセクションで議論する事にします。
5. 導関数と変化率
この例では、10 分間隔での気温 (単位は華氏) の変化を見ていきましょう。センサーがオフラインとなっている場合でもメジャー値の欠損を考慮し、10 分間隔での温度変化を取得出来るようなクエリを考えていきます。まずはデータを見てみましょう。
SELECT time
,device_id
,temperature
FROM "Blog"."SensorReadings"
WHERE time BETWEEN ago(365d) AND now()
AND device_id = 'real1'
AND measure_name = 'multi'
AND temperature IS NOT NULL
ORDER BY time ASC
以下の結果には、9:20 AM 前後で欠損値がある事が分かります。
time | device_id | temperature |
2021-01-01 09:01:00.000000000 | real1 | 102 |
2021-01-01 09:13:00.000000000 | real1 | 103 |
2021-01-01 09:31:00.000000000 | real1 | 107 |
2021-01-01 09:44:00.000000000 | real1 | 106 |
2021-01-01 09:58:00.000000000 | real1 | 105 |
メジャー間の増分変化を計算するには、先の例で紹介した LAG 関数を使用する事もできますが、組み込みの DERIVATIVE_LINEAR 関数を使う事で、よりシンプルに変化率を計算できます。
DERIVATIVE_LINEAR
関数は時系列データとその間隔を引数として取り、指定された間隔で各ポイントの導関数を返します。導関数は微積分で使われる数学的な手法であり、値の変化率を計算するものです。以下の SQL はデバイス real1
の rateOfTempChange
列として時系列データを返します。
WITH binned_view
AS (
SELECT BIN(time, 10m) AS binnedTime
,MIN(temperature) AS minTempPerBucket
,device_id
FROM "Blog"."SensorReadings"
WHERE time BETWEEN ago(365d) AND now()
AND device_id = 'real1'
AND measure_name = 'multi'
AND temperature IS NOT NULL
GROUP BY device_id
,BIN(time, 10m)
)
SELECT device_id
,derivative_linear(CREATE_TIME_SERIES(binnedTime, minTempPerBucket), 10m) AS rateOfTempChange
FROM binned_view
GROUP BY device_id
次の結果を得る事ができました。
device_id | rateOfTempChange |
real1 | [ { time: 2021-01-01 09:10:00.000000000, value: 1.0 }, { time: 2021-01-01 09:30:00.000000000, value: 2.0 }, … ] |
デバイス real1
の時系列は以下の表のように表す事ができます。
time | value |
2021-01-01 09:10:00.000000000 | 1.0 |
2021-01-01 09:30:00.000000000 | 2.0 |
2021-01-01 09:40:00.000000000 | -1.0 |
2021-01-01 09:50:00.000000000 | -1.0 |
9:30 AM の変化率は 2 度となっていますが、20 分の間で華氏 103 度から 107 度に変化している事から、10 分間隔では 2 度変化している事になり、計算結果が正しい事が分かります。もしも、元のデータ型として扱いたいなら、UNNIEST
関数を使って時系列データをフラットな表にします。
WITH binned_view
AS (
SELECT BIN(time, 10m) AS binnedTime
,MIN(temperature) AS minTempPerBucket
,device_id
FROM "Blog"."SensorReadings"
WHERE time BETWEEN ago(365d) AND now()
AND device_id = 'real1'
AND measure_name = 'multi'
AND temperature IS NOT NULL
GROUP BY device_id
,BIN(time, 10m)
)
,interpolated
AS (
SELECT device_id
,derivative_linear(CREATE_TIME_SERIES(binnedTime, minTempPerBucket), 10m) AS rateOfTempChange
FROM binned_view
GROUP BY device_id
)
SELECT device_id
,time
,value
FROM interpolated
CROSS JOIN UNNEST(rateOfTempChange)
以下の結果を得る事ができました。
device_id | time | value |
real1 | 2021-01-01 09:10:00.000000000 | 1.0 |
real1 | 2021-01-01 09:30:00.000000000 | 2.0 |
real1 | 2021-01-01 09:40:00.000000000 | -1.0 |
real1 | 2021-01-01 09:50:00.000000000 | -1.0 |
6. 高いカーディナリティの N 個の時系列の取得
時間の経過に伴う都市の平均気温のグラフを表示したい場合を考えましょう。以下の SQL を使うと、10 分間隔でデバイスに記録される気温から平均気温を表示する事が出来ます。
SELECT BIN(time, 10m) AS binnedTime
,avg(temperature) AS avg_temp
,city
FROM "Blog"."SensorReadings"
WHERE time BETWEEN ago(365d) AND now()
AND measure_name = 'multi'
AND temperature IS NOT NULL
GROUP BY BIN(time, 10m)
,city
ORDER BY city
,BIN(time, 10m) ASC LIMIT 10
このクエリで以下の結果が得られます。
binnedTime | avg_temp | city |
2021-01-01 09:00:00.000000000 | 70.3 | Basin City |
2021-01-01 09:10:00.000000000 | 69.8 | Basin City |
2021-01-01 09:20:00.000000000 | 67.0 | Basin City |
2021-01-01 09:30:00.000000000 | 69.5 | Basin City |
2021-01-01 09:40:00.000000000 | 69.6 | Basin City |
2021-01-01 09:50:00.000000000 | 68.5 | Basin City |
2021-01-01 10:00:00.000000000 | 69.1 | Basin City |
2021-01-01 09:00:00.000000000 | 22.5 | Coruscant |
2021-01-01 09:10:00.000000000 | 23.5 | Coruscant |
2021-01-01 09:20:00.000000000 | 24.0 | Coruscant |
このデータセットには 3 つの都市しか含まれていませんが、実際のデータでは 10,000 以上の都市が含まれる事もあるでしょう。その場合、前述のクエリはタイムスタンプ毎に 10,000 以上のデータポイントを返す事になります。1 つのグラフにそのような膨大な数が含まれると非常に読みづらく現実的ではありません。もしも、特定の期間の平均気温で並び替えられた上位 N 個の最も熱い都市のグラフにのみ関心がある場合は、どのようにすれば良いでしょうか?このデータセットには 3 つの都市がある為、以下の SQL を実行して上位 2 つの最も熱い都市に限定してみましょう。
SELECT city
,avg(temperature) AS avg_temp
FROM "Blog"."SensorReadings"
WHERE time BETWEEN ago(365d) AND now()
AND measure_name = 'multi'
AND temperature IS NOT NULL
GROUP BY city
ORDER BY avg(temperature) DESC
,city LIMIT 2
以下の結果を得る事が出来ました。
city | avg_temp |
Zion | 194.9 |
Basin City | 69.11428571428571 |
上位 2 つの熱い都市が得られたので、副問い合わせを利用してこれら 2 つの都市の平均気温のみを取得するように、最初のクエリに制限をかけてみます。
SELECT BIN(time, 10m) AS binnedTime
,avg(temperature) AS avg_temp
,city
FROM "Blog"."SensorReadings"
WHERE time BETWEEN ago(365d) AND now()
AND measure_name = 'multi'
AND temperature IS NOT NULL
AND city IN (
SELECT city
FROM "Blog"."SensorReadings"
WHERE time BETWEEN ago(365d) AND now()
AND measure_name = 'multi'
AND temperature IS NOT NULL
GROUP BY city
ORDER BY avg(temperature) DESC
,city LIMIT 2
)
GROUP BY BIN(time, 10m)
,city
ORDER BY city
,BIN(time, 10m) ASC
結果として、最も熱い都市の平均気温が 10 分間隔でまとめて表示されます。3 番目の都市の気温は表示されません。
binnedTime | avg_temp | city |
2021-01-01 09:00:00.000000000 | 70.3 | Basin City |
2021-01-01 09:10:00.000000000 | 69.8 | Basin City |
2021-01-01 09:20:00.000000000 | 67.0 | Basin City |
2021-01-01 09:30:00.000000000 | 69.5 | Basin City |
2021-01-01 09:40:00.000000000 | 69.6 | Basin City |
2021-01-01 09:50:00.000000000 | 68.5 | Basin City |
2021-01-01 10:00:00.000000000 | 69.1 | Basin City |
2021-01-01 09:00:00.000000000 | 103.0 | Zion |
2021-01-01 09:10:00.000000000 | 104.0 | Zion |
2021-01-01 09:20:00.000000000 | 109.0 | Zion |
2021-01-01 09:30:00.000000000 | 108.0 | Zion |
2021-01-01 09:40:00.000000000 | 552.5 | Zion |
2021-01-01 09:50:00.000000000 | 105.0 | Zion |
7. ヒストグラムの作成
別の一般的なクエリパターンとして、メジャーやディメンジョンの値をバケット化し、そのバケット内でデータ分析を行うような場合が考えられます。ここでは、各都市のデバイスに記録される気温の分布を理解し、ヒストグラムを作成してみましょう。まずは、全ての都市で 5 度単位で測定値をまとめて (10~15、15~20、20~25等) 、測定値の数を数えてみます。
WITH buckets
AS (
SELECT time
,city
,device_id
,temperature
,width_bucket(temperature, 0, 110, 22) bucket_id
FROM "Blog"."SensorReadings"
WHERE time BETWEEN ago(365d) AND now()
AND measure_name = 'multi'
AND temperature BETWEEN 0 AND 110
)
SELECT city
,bucket_id AS temp_bucket_id
,cast((bucket_id - 1) * 5 AS VARCHAR) || '-' || cast(bucket_id * 5 AS VARCHAR) AS temp_bucket
,count(*) AS temperature_readings
,round(avg(temperature), 2) AS avg_temperature
,round(approx_percentile(temperature, 0.5), 2) AS p50_temperature
,round(approx_percentile(temperature, 0.9), 2) AS p90_temperature
FROM buckets
GROUP BY city
,bucket_id
,cast((bucket_id - 1) * 5 AS VARCHAR) || '-' || cast(bucket_id * 5 AS VARCHAR)
ORDER BY city
,bucket_id
ここでは WIDTH_BUCKET
関数を利用しますが、利用方法としては 2 種類あります。1 つ目は WIDTH_BUCKET(x , bound1 , bound2 , n)
と指定するものです。これは、bound1
と bound2
の境界内で n 個の等間隔のヒストグラムでバケットを作成した際の x が所属するバケット番号を返します。別の利用法としては WIDTH_BUCKET(x , bins)
とするもので、配列 bins
で指定された特定のまとまりで x
が所属するバケット番号を返します。ここでは、5 度毎のまとまりでヒストグラムを作りたい為、最初の利用法を使ってみましょう。
初めに共通テーブル式を使って buckets
というテーブルを定義し、各気温測定値のバケットの番号を取得します。次に、buckets
に対してクエリを実行し、各都市の気温測定値の個数、平均気温、50 パーセンタイル、90 パーセンタイル等を取得します。また、見やすいようにバケット番号のテキストラベルも付与します。
city | temp_bucket_id | temp_bucket | temperature_readings | avg_temperature | p50_temperature | p90_temperature |
Basin City | 14 | 65-70 | 38 | 66.76 | 67 | 69 |
Basin City | 15 | 70-75 | 31 | 71.81 | 71 | 74 |
Basin City | 16 | 75-80 | 1 | 75.0 | 75 | 75 |
Coruscant | 5 | 20-25 | 14 | 22.93 | 23 | 24 |
Coruscant | 6 | 25-30 | 2 | 25.0 | 25 | 25 |
Zion | 21 | 100-105 | 3 | 103.0 | 103 | 104 |
Zion | 22 | 105-110 | 6 | 106.83 | 107 | 109 |
結果を見ると、各都市の気温がどのように分布しているかや、その範囲が分かります ( Basin City は 65~80、Coruscant は 20~30、Zion は 100~110 )
クリーンアップ
今後、利用料が発生しないように、作成した Timestream テーブル (SensorReading
、DeviceMetadata
) については忘れずに削除しておきましょう。
まとめ
Amazon Timestream は使用量に応じて課金が生じるサーバーレスの時系列専用のデータベースであり、パフォーマンス、コストに優れたクエリを実行する事ができます。また、時系列データに最適化されており、組み込みの時系列関数を利用して、クエリの複雑さを軽減する事が可能です。この投稿では、Timestream のワークロードでいくつかの一般的なアクセスパターンを取り上げ、効果的にクエリを実装する方法を学びました。各クエリを順に解説し、クエリの各部分がどのように動作してデータを処理するのかについても説明しました。また、Timestream で利用可能な組み込みの時系列関数についてもいくつか取り上げました。これらの関数はクエリのコスト、パフォーマンスを改善するのに有用な為、是非、積極的にご活用下さい。
翻訳はテクニカルアカウントマネージャーの西原が担当しました。原文は こちら です。