Amazon Web Services ブログ
Amazon Timestream の UNLOAD 機能の紹介:時系列データをエクスポートして洞察を得る方法
Amazon Timestream はスケーラブルなサーバレスの時系列データベースサービスで、1 日に何兆ものイベントを簡単に保存、分析する事が出来ます。様々な業界のお客様が Timestream を採用しており、リアルタイムの洞察を導き出したり、重要なビジネスアプリケーションを監視したり、Web サイトやアプリケーション全体に渡る何百万ものリアルタイムイベントの分析に利用しています。また Timestream はワークロードに応じて自動的にスケールする為、インフラストラクチャを管理する作業無でソリューションを容易に構築できます。
多くの Timestream をご利用中のお客様は、時系列データをデータレイクに追加したり、予測用の機械学習モデルのトレーニングに利用したり、他の AWS サービスやサードパーティのサービスを使ってデータの強化を行う等、時系列データから追加の価値を引き出そうと考えていましたが、個別に実装する必要があり、時間のかかる作業となっていました。これらのニーズに対応する為、Timestream で UNLOAD 機能が利用可能になりました。この機能を使う事で、安全でコスト効率よく、AI/ML パイプラインを構築したり、ETL プロセスを簡素化することが出来ます。
本投稿では UNLOAD を利用して Timestream から Amazon Simple Storage Service (Amazon S3) に時系列データをエクスポートする方法を紹介します。
気象の傾向を予測しようとする研究者、患者の健康状態を監視する医療従事者、生産を監督するエンジニア、業務の最適化を行うサプライチェーンのマネージャ、売上を追跡する e コマースのマネージャのいずれであっても UNLOAD 機能を利用して追加の価値を時系列データから引き出す事が出来ます。
UNLOAD が役立つユースケースの例を以下に示します。
- ヘルスケア分析:医療機関は一定期間に渡って患者の健康指標を監視し、大量の時系列データを生成します。Timestream を使う事で、患者の健康状態をリアルタイムに追跡、監視する事ができます。時系列データをデータレイクに取り込み、データの強化を行った上で更に分析を進めて結果を予測し、患者に対するケアを改善する事ができます。
- サプライチェーン分析:サプライチェーンのアナリストは Timestream を利用して在庫レベル、納期、遅延等の指標を追跡し、サプライチェーン全体を最適化する事ができます。
- eコマース分析:eコマースの管理者は Timestream を使ってトラフィックのソース情報、クリックスルー率、販売数量等の e コマースストアと Web サイトの指標を追跡できます。
UNLOAD 機能の概要
Timestream の UNLOAD 機能を使うと安全かつコスト効率の高い方法でクエリ結果を Amazon S3 にエクスポートする事ができます。出力フォーマットは Apache Parquet または CSV 形式が選択可能で、Amazon Athena、Amazon EMR、Amazon SageMaker 等の他サービスを使って時系列データを柔軟に分析する事ができます。また、UNLOAD の際に出力データを Amazon S3 管理キー、または AWS KMS 管理キーを使用して暗号化し圧縮する事が可能で、不正なデータアクセスを防止しつつ、ストレージコストの削減につなげる事ができます。更に、エクスポートされたデータを一つ以上のキーを指定して出力データをパーティション分割し、ダウンストリームのサービスがスキャンするデータ量を絞り込む事が出来る為、処理時間とコストを削減する事が出来ます。
UNLOAD の構文は次の通りです。
UNLOAD はマネージメントコンソール、AWS CLI、AWS SDK から実行可能です。
ソリューションの概要
本投稿では、Timestream から Amazon S3 にデータを抽出して洞察を得る手順とベストプラクティスについて説明します。これには以下の手順が含まれます。
- サンプルデータを Timestream に取り込む
- データ分析の実施
- UNLOAD を使ってクエリ結果セットを Amazon S3 に出力
- AWS Glue カタログテーブルの作成
- Athena を使って追加のビジネス視点での洞察を得る
次の図は本ソリューションのアーキテクチャを示します。
お使いの AWS アカウントで以下のソリューションを再現する場合、AWS リソースのコストが発生する点はご注意下さい。
Timestream を使用して e コマース Web サイトからメトリクスを追跡するユースケースを実証していきます。ここでは、製品が販売される度に、製品 ID、販売数量、顧客を Web サイトに誘導したチャネル (ソーシャルメディアや検索サイト等)、取引のタイムスタンプ、その他関連する詳細を含む販売情報が Timestream に書き込まれます。Timestream に取り込むサンプルデータは Faker を使って作成しており、デモ用にクリーンアップされています。
データには channel, ip_address, session_id, user_id,event, user_group, current_time, query, product_id, product と quantity が含まれており、商品が購入されると、product_id とquantity が記録されます。データを Timestream に取り込む際には以下のデータモデルを採用しました。
- ディメンジョン – channel,ip_address,session_id,user_id,event,user_groupを使います。ディメンジョンの詳細についてはこちらを参照して下さい。
- タイムスタンプ –  current_timeを使います。尚、サンプルの時間が古い可能性がある点は注意して下さい。本投稿で提供されているサンプルコードは取り込み中に最新のタイムスタンプに変更します。
- マルチメジャー –   query,product_id,product,quantityを使います。マルチメジャーの詳細についてはこちらをご確認下さい。
前提条件
本投稿の手順を進めるには以下の前提条件を満たす必要があります
- データベースとテーブルを作成するには、CRUD 操作を許可する権限が必要です。
- レコード挿入を行う為には、挿入操作を許可する権限が必要です。
- UNLOAD を実行するには Amazon S3 にデータを書き込む為の前提条件を満たす必要があります。
- コードを実行する前に適切な AWS アカウント認証情報を環境変数としてエクスポートします。
Timestream にデータを取り込む
本投稿のサンプルコードを使用して、データベースとテーブルを作成し、e コマース Web サイトの販売データを Timestream に取り込む事が出来ます。次の手順を実行して下さい。
1. Jupyter ノートブックや選択した統合開発環境 (IDE) をセットアップします。次のコードは説明の為に複数の部分に分割されており、Python バージョン 3.9 を使用しています。同一のコードを使う場合は、コードブロックを 1 つのプログラムに結合するか、Jupyter ノートブックを使ってサンプルに従って下さい。
2. Timestream クライアントを初期化します。
import boto3
from botocore.config import Config
session = boto3.Session()
write_client = session.client('timestream-write', config=Config(region_name="<region>", read_timeout=20, max_pool_connections = 5000, retries={'max_attempts': 10}))
query_client = session.client('timestream-query', config=Config(region_name="<region>"))3. データベースを作成します。
database_name = "timestream_sample_database"
write_client.create_database(DatabaseName=database_name)
print("Database [%s] created successfully." % database_name)データベースを作成すると、コンソール画面に表示されます。
4. テーブルを作成します。
table_name = "timestream_sample_unload_table"
retention_properties = {
'MemoryStoreRetentionPeriodInHours': 12,
'MagneticStoreRetentionPeriodInDays': 7
}
write_client.create_table(DatabaseName=database_name, TableName=table_name,
RetentionProperties=retention_properties)
print("Table [%s] successfully created." % table_name)テーブルがコンソール画面に表示されるようになりました。
5. サンプルデータをテーブルに取り込みます。
def __submit_batch(records, n):
    try:
        result = write_client.write_records(DatabaseName=database_name, TableName=table_name,
                                                 Records=records, CommonAttributes={})
        if result and result['ResponseMetadata']:
            print("Processed [%d] records. WriteRecords Status: [%s]" % (n, result['ResponseMetadata']['HTTPStatusCode']))
    except Exception as err:
        print("Error:", err)
        
import csv
import time
with open("Downloads/sample_unload.csv", 'r') as csvfile:
            csvreader = csv.reader(csvfile)
            records = []
            current_time = str(int(round(time.time() * 1000)))
            header_row = []
            # extracting csv file content
            row_counter = 0
            for i, row in enumerate(csvreader, 1):
                if(len(row) == 0 ):
                    continue;
                # skip csv header
                if(i == 1 ):
                    header_row = row
                    continue
                row_counter = row_counter + 1
                record = {
                    'Dimensions': [
                        {'Name': header_row[0], 'Value': row[0]},
                        {'Name': header_row[1], 'Value': row[1]},
                        {'Name': header_row[2], 'Value': row[2]},
                        {'Name': header_row[3], 'Value': row[3]},
                        {'Name': header_row[4], 'Value': row[4]},
                        {'Name': header_row[5], 'Value': row[5]},
                    ],
                    'Time': str(int(current_time) - (i * 50)) # Modifying time in the sample to current time
                }
                measure_values = []
                if (row[7] != ""):
                    measure_values.append( {
                        "Name": header_row[7],
                        "Value": row[7],
                        "Type": 'VARCHAR',
                    })
                if (row[8] != ""):
                    measure_values.append( {
                        "Name": header_row[8],
                        "Value": row[8],
                        "Type": 'VARCHAR',
                    })
                if (row[9] != ""):
                    measure_values.append( {
                        "Name": header_row[9],
                        "Value": row[9],
                        "Type": 'VARCHAR',
                    })
                if (row[10] != ""):
                    measure_values.append( {
                        "Name": header_row[10],
                        "Value": row[10],
                        "Type": 'DOUBLE',
                    })
                record.update(
                    {
                        'MeasureName': "metrics",
                        'MeasureValueType': "MULTI",
                        'MeasureValues': measure_values
                    }
                )
                records.append(record)
                if len(records) == 100:
                    __submit_batch(records, row_counter)
                    records = []
            if records:
                __submit_batch(records, row_counter)
            print("Ingested %d records" % row_counter) データの取り込み後、Timestream のクエリエディタを利用してテーブルの内容を確認する事が出来ます。
データ分析の実施
Timestream では取り込まれたデータに対してリアルタイム分析を実行できます。例えば、1 日の製品毎の販売数、過去 1 週間にソーシャルメディア広告から店舗に辿り着いた顧客数、売上の傾向、過去 1 時間の購入パターン等を確認する事が出来ます。
過去 24 時間に製品毎に販売されたユニット数を確認するには、以下のクエリを実行します。
SELECT sum(quantity) as number_of_units_sold, product 
FROM "timestream_sample_database"."timestream_sample_unload_table" 
WHERE time between ago(1d) and now() GROUP BY productデータを Amazon S3 にエクスポートする
UNLOAD 機能を使えば追加の分析の為の時系列データを Amazon S3 にエクスポートできます。この例では、Web サイトにアクセスしたチャネルに基づいて顧客の分析を行います。partition_by 句を使ってデータを分割し、チャネル固有のデータをフォルダにエクスポートできます。この例では Parquet 形式を使用してデータをエクスポートします。
UNLOAD(SELECT user_id, ip_address, event, session_id, measure_name, time, query, quantity, product_id, channel 
       FROM "timestream_sample_database"."timestream_sample_unload_table"  WHERE time BETWEEN ago(2d) AND now()) 
       TO 's3://<your_bucket_name>/partition_by_channel' WITH (format = 'PARQUET', partitioned_by = ARRAY['channel'])partitioned_by 句で指定された列は SELECT ステートメントの最後の列と同じである必要があります。また、これらは SELECT ステートメントに出現するのと同じ順で ARRAY 値に入力する必要があります。
UNLOAD を使った前述のクエリを実行した後、Query results タブの Export to Amazon S3 summary セクションで詳細を確認できます。
Amazon S3 の results フォルダーを確認しますと、データがチャネル名ごとにパーティション化されている事が分かります。
AWS Glue データカタログテーブルの作成
AWS Glue クローラーを作成して S3 バケット内のデータをスキャンしてスキーマを推論し、Timestream からエクスポートされたデータのメタデータテーブルを AWS データカタログに作成します。AWS Glue に必要な権限がある事を前提として、本セクションでは以下 2 つのオプションを示します。
Option 1 : チャネル毎の AWS Glue メタデータファイルを個別に作成する
チャネルごとに異なる分析をする必要があり、partitioned_by 句を使用して時系列データをチャネルごとに分離した場合には、特定のチャネルの AWS Glue データカタログを生成出来ます。この例では Social Media チャネルのデータカタログを作成します。次の手順を実行します。
1. AWS Glue コンソールのナビゲーションペインで Crawlers を選択します
2. Create crawler を選択
3. 新しい S3 のデータソースを s3://<your_bucket_name>/partition_by_channel/results/channel=Social media に追加します
これはソーシャルメディアチャネルに関連する全ての時系列データが含まれる場所です。
4. 新しい Glue データベースを作成するか、必要に応じて既存のデータベースを使いましょう
5. 通常、AWS Glue は S3 フォルダ構造からテーブル名を類推しますが、必要に応じてオプションのテーブルプレフィックスを追加できます。今回は、テーブルの接頭辞を空にした為、最終的なテーブル名は channel_social_media となります。
6. データカタログ作成の為に 1 回だけクロールするだけなので、スケジュールは On Demand のままにしておきます。
7. 他の必須項目を入力し、Create crawler を選択します。
8. クローラーが作成された後で、対象のクローラーを選択して Run crawler を実行するとエクスポートされた Timestream のデータに基づいてデータカタログが作成されます。
実行が完了すると、実行履歴が表示され、1 テーブルの変更が確認出来ます。これは 1 つのテーブルがデータカタログに追加された事を示します。
AWS Glue の Tables ページに移動すると、クローラによって自動推論されたスキーマを含む新しいテーブル channel_social_media が表示されます。 
これで、このテーブルのデータを Athena で見る事が出来ます。
SELECT * From “AwsDataCatalog”.”timestream-sample”.”channel_social_media”;Option2 : AWS Glue メタストアによって自動的に検出されたパーティションを含む結果フォルダをクロールする
このオプションのクローラの作成は以前と同じ手順で行います。唯一の違いは、選択された S3 の場所が親フォルダーの results となる点です。
今回クローラが実行されると、テーブルが変更されて 5 つのパーティションを持つ 1 つのテーブルが作成された事が分かります。
テーブルスキーマでは、チャネルがパーティションキーとして自動推論された事が分かります。
パーティション分割された AWS Glue テーブルを作成すると、テーブルを結合せずにチャネル間でクエリを実行したり、チャネル毎にクエリを実行する事が簡単になります。詳しくは、こちらを参照して下さい。
Amazon Athena で洞察を引き出す
Timestream で追跡、分析する時系列データと、Timestream の外にある非時系列データを組み合わせて Athena 等のサービスを利用して、洞察に満ちた傾向を引き出す事ができます。
この例では、公開された user_id, first_name, last_name, zip, job, age 等を含むデータセットを利用します。user_id は時系列データのディメンジョンでもありますが、user_id を利用して時系列データと非時系列データを結合し、Social media チャンネルからページにアクセスした顧客の行動に関する洞察を引き出します。
ファイルを Amazon S3 にアップロードし、このデータ用に Athena にテーブルを作成します。
CREATE EXTERNAL TABLE user_data
(
  user_id string,
  first_name string,
  last_name string,
  zip string,
  job string,
  age int
  )
 ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
 WITH SERDEPROPERTIES (
   'separatorChar' = ',',
   'quoteChar' = '\"',
   'escapeChar' = '\\'
   )
STORED AS TEXTFILE
LOCATION 's3://<your_bucket_name>/csv_user_data/'
TBLPROPERTIES (
  "skip.header.line.count"="1")また、Zipcode, ZipCodeType, City, State, LocationType, Lat, Long, Location, Decommisioned, TaxReturnsFiled, EstimatedPopulation, TotalWages を含む、無料の郵便番号データセットも利用します。このデータセットから人口統計に関する洞察を引き出します。
このファイルを Amazon S3 にアップロードし、Athena にデータ用のテーブルを作成します。ファイルはすべてのフィールドが引用符で囲まれていますが、単純化の為、全てのフィールドを文字列としてインポートし、後で必要に応じて適切な型にキャストすることに注意してください。
CREATE EXTERNAL TABLE zipcode_data
(
  Zipcode string,
  ZipCodeType string,
  City string,
  State string,
  LocationType string,
  Lat string,
  Long string,
  Location string,
  Decommisioned string,
  TaxReturnsFiled string,
  EstimatedPopulation string,
  TotalWages string
  )
 ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
 WITH SERDEPROPERTIES (
   'separatorChar' = ',',
   'quoteChar' = '\"',
   'escapeChar' = '\\'
   )
STORED AS TEXTFILE
LOCATION 's3://<your_bucket_name>/zipcode_data/'
TBLPROPERTIES (
  "skip.header.line.count"="1")3 つの S3 全てのデータセットを結合します。
SELECT * FROM "timestream-sample"."user_data" user_data
JOIN "timestream-sample"."channel_social_media" social_media ON social_media.user_id = user_data.user_id
JOIN "timestream-sample"."zipcode_data" zipcode ON user_data.zip = zipcode.zipcode;次のコードを使用して、チャネルソーシャルメディアの年齢層別の売上を検索します。
SELECT sum(social_media.quantity) as quantity_sold, case
   when user_data.age < 18 then 'Under 18'
   when user_data.age between 18 and 35 then '18-35'
   when user_data.age between 36 and 60 then '36-60'
   else 'Above 60'
END AS age_range FROM "timestream-sample"."user_data" user_data
JOIN "timestream-sample"."channel_social_media" social_media ON social_media.user_id = user_data.user_id
GROUP BY case
   when user_data.age < 18 then 'Under 18'
   when user_data.age between 18 and 35 then '18-35'
   when user_data.age between 36 and 60 then '36-60'
   else 'Above 60'
END次の結果が得られました。
州ごとの売上を表示するには、次のクエリを使用します。
SELECT sum(social_media.quantity) as quantity_sold, zipcode.state FROM "timestream-sample"."user_data" user_data
JOIN "timestream-sample"."channel_social_media" social_media ON social_media.user_id = user_data.user_id
JOIN "timestream-sample"."zipcode_data" zipcode ON user_data.zip = zipcode.zipcode
GROUP BY zipcode.state次の結果が得られました。
クリーンアップ
将来のコストを回避するに、この投稿用に作成したすべてのリソースを削除しましょう。
- Timestream コンソールのナビゲーション ペインで Databases を選択し、作成したデータベースを削除します。
- ナビゲーションペインで Tables を選択し、作成したテーブルを削除します。
- AWS Glue コンソールのナビゲーションペインで Crawlers を選択します。
- 作成したクローラーを選択し、Action メニューから Delete crawler を選択します。
- コンソールで Tables を選択し、この投稿用に作成された Data Catalog テーブルを削除します。
- Amazon S3 コンソールのナビゲーションペインで、Buckets を選択します。
- 本投稿用に作成されたバケットを空にして削除します。
結論
Timestream の UNLOAD ステートメントを使用すると、安全かつコスト効率の高い方法で時系列データを Amazon S3 にアンロードできます。このステートメントは、1 つ以上の列によるパーティション化や、形式、圧縮、暗号化の選択など、さまざまなオプションを指定する事ができます。時系列データをデータウェアハウスに追加する、AI/ML パイプラインを構築する、時系列データの ETL プロセスを簡素化する場合でも、UNLOAD ステートメントを使用するとプロセスがより簡単になります。
翻訳はテクニカルアカウントマネージャーの西原が担当しました。原文はこちらをご覧下さい。












