Amazon Web Services ブログ

初めての Amazon Timestream 入門

はじめに

みなさんこんにちは。AWS Japan ソリューションアーキテクトの高野 翔史です。

このブログでは、時系列データ専用のデータベースである Amazon Timestream について、実際に時系列データを挿入したり、検索をしながらご紹介します。

Amazon Timestream とは?

Amazon Timestream とは、高速かつスケーラブルなサーバレス時系列データベースサービスです。2022/7 より東京リージョンで利用ができるようになりました!

時系列データとは、時間経過とともに変化する事象を測定するために、タイムスタンプとともに測定情報や属性情報を記録したデータをシーケンシャルに並べたデータのまとまりのことです。昨今この時系列データが下記のような様々なユースケースで必要になっています。

  • IoT センサーからの時系列データを収集して、分析・可視化して、予防保全に役立てる。
  • 各時刻におけるサーバ機器の CPU・メモリ使用率や IOPS といった運用メトリクスをリアルタイムに分析・可視化して、正常性のモニタリングを行う。
  • Web システムのクリックストリームデータを収集して、購入経路やショッピングカートの放棄率等のデータを可視化し、商品の宣伝方法等に役立てる。

時系列データをリレーショナルデータベース (RDBMS) に格納すると下記のような課題が発生することがあります。

  • 一般的な RDBMS はスケールアップでの拡張になるため、大量のデバイスから数秒の間隔で同時に書き込みが来ると、性能的な限界が来ることがある。
  • 事前にスキーマ定義する必要があり、元データの出力内容が変更される度に、追従してスキーマ変更をしなければならなくなる。
  • 時系列データが一部欠損しているためデータ補間をしたい等のユースケースで必要な関数が存在せず、アプリケーションで独自に実装が必要になる。

上記を解決するために時系列データ専用のデータベースが存在していますが、本番に必要な規模や可用性、拡張性を十分に満たせなかったり、インフラの管理が非常に手間になることがあります。
Amazon Timestream は、時系列データを扱うマネージドデータベースサービスで下記特徴があり、上記のような課題を解決することが可能です。

  • サーバレスサービスでインフラの管理が不要であり、時系列データの挿入と保存と検索の処理がそれぞれ個別に自動でスケールするように設計されている。
  • 事前にスキーマの定義が不要で、柔軟にデータを取り込める。
  • データの欠損時にデータを補間するための関数等、様々な時系列処理に対応する専用の関数が用意されている。

このブログでは、上記特徴がある Amazon Timestream の構成要素を紹介しながら、実際にデータの挿入や検索をやってみます。
それでは、早速 Amazon Timestream の構成要素を説明していきます。

Amazon Timestream の構成要素

Amazon Timestream の構成要素の全体像は以下の通りです。

Components_of_Timestream

それぞれの構成要素の説明を以下にまとめます。

構成要素名 構成要素説明
データベース (Database) テーブル (Table) を保持するコンテナ
テーブル (Table) タイムシリーズ (Time-Series) を保持するコンテナ
タイムシリーズ (Time-Series) ある属性値で説明できる、時系列に並んだレコードのまとまり
ディメンション (Dimension) 測定値を識別するための属性情報セット
メジャー (Measure) 測定値 (名前 (measure_name) と値 (measure_value) のセット)
レコード (Record) 単一の時系列のデータポイント

一番大きな単位としてデータベース (Database) があり、その中に複数のテーブル (Table) を持つことができ、そのテーブルの中に複数のタイムシリーズ (Time-Series) が入っています。タイムシリーズ (Time-Series) は時系列に並んだレコード (Record) のまとまりとなっていて、その中にディメンション(Dimension)と呼ばれる測定値を識別するための属性情報とメジャー (Measure) と呼ばれる測定値でテーブル内で一意に決まるものに対して、タイムスタンプごとにレコード (Record) と呼ばれる単一のデータポイントが記録されています。

各構成要素の細かい制約はこちらを参照してください。では、Amazon Timestream の基本操作のイメージを掴んでいただくために、Timestream のデータベースとテーブルを作成し、データを挿入後、検索してみます。

今回作成する構成

Amazon Timestream に実際にテーブルを作って、データを挿入したり検索してみます。今回は、AWS Cloud9 から Amazon Timestream に対してデータを挿入する下記構成を用意します。リージョンは東京リージョンで作成していきますので、別のリージョンで作成しないようにお願いします。また、以降の記載は、利用するクライアント環境が Amazon Timestream のデータベース環境と同一アカウントかつ同一リージョンにあることを前提にしています。

Overall_configuration_diagram_of_Timestream

前準備

まず、Amazon Timestream 以外の部分は、できる限りAWSで提供している Infrastructure as Code サービスであるAWS CloudFormation で作成してしまいます (AWS Cloudformation の詳細についてはこちら) 。下記コードをコピーして、YML ファイル形式で保存し、CloudFormation に読み込ませ、AWS Cloud9 をデプロイするネットワーク環境を作成してください。作成する Amazon Virtual Private Cloud (Amazon VPC) やサブネットの CIDR は必要に応じて変更してください。本ブログでは、できる限りシンプルに構成するために、この YML ファイルでパブリックサブネットを作成して、その後の操作で、パブリックサブネット上に AWS Cloud9 をデプロイします。AWS Cloud9 の通信はアウトバウンドのみ許可しており、インバウンドは許可していませんが、プライベートネットワークからアクセスしたい場合は、別途こちらを参考にネットワーク環境を構成してください。

AWSTemplateFormatVersion: '2010-09-09'
Description: CloudFormation Template for Environment for ingesting data to Timestream

Parameters:
#VPC CIDR BLOCK
  VPCCIDR:
    Description: "Set VPC CIDR Block"
    Type: "String"
    Default: "10.0.0.0/16"

#Public Subnet CIDR BLOCK
  PUBLICSUBNETCIDR:
    Description: "Set Public Subnet CIDR Block"
    Type: "String"
    Default: "10.0.0.0/24"

Resources:
#VPC
  VPC:
    Type: AWS::EC2::VPC
    Properties:
      CidrBlock: !Ref VPCCIDR
      InstanceTenancy: default
      EnableDnsSupport: 'true'
      EnableDnsHostnames: 'true'
      Tags:
        - Key: Name
          Value: !Sub ${AWS::StackName}-VPC

#VPC Subnet
  PublicSubnet:
    Type: AWS::EC2::Subnet
    Properties:
      CidrBlock: !Ref PUBLICSUBNETCIDR
      AvailabilityZone:
        Fn::Select:
          - 0
          - Fn::GetAZs: !Ref 'AWS::Region'
      VpcId: !Ref VPC
      MapPublicIpOnLaunch: true
      Tags:
        - Key: Name
          Value: !Sub ${AWS::StackName}-PublicSubnet

#Internet Gateway
  ManageIGW:
    Type: AWS::EC2::InternetGateway
    Properties:
      Tags:
        - Key: Name
          Value: !Sub ${AWS::StackName}-IGW

  IGWAttachment:
    Type: AWS::EC2::VPCGatewayAttachment
    Properties:
      VpcId: !Ref VPC
      InternetGatewayId: !Ref ManageIGW

#RouteTable
  PublicRouteTable:
    Type: AWS::EC2::RouteTable
    Properties:
      VpcId: !Ref VPC
      Tags:
        - Key: Name
          Value: !Sub ${AWS::StackName}-PublicRouteTable

  PublicRoute:
    Type: AWS::EC2::Route
    Properties:
      RouteTableId: !Ref PublicRouteTable
      DestinationCidrBlock: 0.0.0.0/0
      GatewayId: !Ref ManageIGW

  PublicRouteAssociation:
    Type: AWS::EC2::SubnetRouteTableAssociation
    Properties:
      RouteTableId: !Ref PublicRouteTable
      SubnetId: !Ref PublicSubnet

Amazon Timestream のデータベース・テーブルを作成してみよう

まず、Timestream を利用するために、データベースとテーブルを作成してみます。作成には以下方法があります。本ブログでは、AWS マネージメントコンソールから GUI 操作で作成してみます。

  • AWS マネージメントコンソールからの操作
  • AWS CLI や各種プログラミング言語で提供されている AWS SDK の利用
  • AWS CloudFormation 等の Infrastructure as Code サービスの利用

データベースの作成

最初に Timestream のデータベースを作成します。

  1. AWS マネージメントコンソールの Timestream の操作画面から [データベースの作成] をクリックします。
  2. Creation_Timestream_Datebase

  3. データベース設定画面で、[名前] を SampleDB と入力し、その他はデフォルト値のまま [データベースを作成] をクリックします。
  4. Creation_Timestream_Database_screenshot

  5. データベースの管理画面に SampleDB が表示されれば、無事データベースの作成が完了になります。

Timestream_Database_created

テーブルの作成

次に作成したデータベース内にテーブルを作成していきます。

  1. AWS マネージメントコンソールの Timestream 操作画面の左側で [テーブル] を選択し、[テーブルを作成] をクリックします。
  2. Creation_Timestream_Table

  3. テーブルの作成画面で、[データベース名] を先ほど作成した SampleDB を選択し、[テーブル名] を SampleTable と入力し、その他はデフォルト値のまま [テーブルを作成] をクリックします。
  4. Creation_Timestream_Table_screenshot

  5. テーブルの管理画面に SampleTable が表示されれば、無事テーブルの作成が完了になります。
  6. Timestream_Table_created

Amazon Timestream テーブルにデータを挿入してみよう

次に作成したテーブルにデータを挿入してみます。データ挿入も以下方法があります。本ブログでは、AWS SDK for Python を利用して Amazon Timestream にデータを挿入してみます。

  • AWS CLI や各種プログラミング言語で提供されている AWS SDK の利用
  • AWS IoT Core の IoT ルールアクションの利用 (詳しくはこちら)

今回は、先ほど AWS CloudFormation で作成したネットワーク環境に、ブラウザベースでコードを開発できる IDE である AWS Cloud9 (詳しくはこちら) をデプロイし、Timestream に対して定期的にリソース使用率情報 (CPU/メモリ/ディスク) を挿入するコードを実行してみます。

  1. AWS マネージメントコンソールの AWS Cloud9 操作画面で、[Create environment] をクリックし、AWS Cloud9 を作成していきます。
  2. Creation_Cloud9

  3. [Name] に TimestreamDemo と入力し、[Network settings] の [VPC settings] で先ほど CloudFormation で作成した VPC 名とサブネットを選択して、その他はデフォルトで [Create] をクリックします。
  4. Creation_Cloud9_screenshot1

    Creation_Cloud9_screenshot2

  5. 作成が完了したら、AWS マネージメントコンソールの AWS Cloud9 操作画面で、作成した AWS Cloud9 が表示されるので、[Open] をクリックし、AWS Cloud9 を起動します。
  6. Cloud9_created

  7. ブラウザに AWS Cloud9 の画面が表示されます。
  8. Cloud9_operation_screenshot

AWS Cloud9 の画面が表示されたら、まず、今回実行するコードに必要なライブラリである AWS SDK for Python (Boto3) と psutil をインストールします。

  • AWS SDK for Python (Boto3) は Python で AWS の各種サービスを操作するためのライブラリになります。Timestream にデータを挿入するために使用します。
  • psutil は CPU やメモリ使用率等のリソースの情報を取得するためのライブラリになります。AWS Cloud9 のリソース使用率情報 (CPU/メモリ/ディスク) を取得するために使用します。
  1. AWS Cloud9 上の Terminal に以下のコマンドを入力します。
  2. pip install boto3
    pip install psutil

    boto3_installation

  3. AWS SDK for Python (Boto3) と psutil のインストールに成功したら、先ほど作成した Timestream のテーブルにデータを挿入するためのコードを AWS Cloud9 上に配置します。AWS Cloud9 のトップディレクトリで右クリックして [New File] をクリックして、ファイル名に [ingest_timestream.py] と入力します。
  4. Create_Python_files_on_Cloud9

    Rename_Python_filename_on_Cloud9

  5. ファイルを作成したら、AWS Cloud9 でファイルを開き、下記コードを貼り付けて、保存します。
  6. Paste_code_on_Cloud9

    コードで実行している内容は、AWS Cloud9 の CPU 使用率、メモリ使用率、ディスク使用率を取得して、指定した間隔 (今回は 5 秒に設定) で Timestream にデータ挿入しています。ディメンションはサーバを一意に識別する情報として、国名、都市名、ホスト名を設定しています。

    import time
    import boto3
    import psutil
    
    from botocore.config import Config
    
    
    #AWS region setting
    REGION = "ap-northeast-1"
    
    #Timestream database table name settings
    DATABASE_NAME = "SampleDB"
    TABLE_NAME = "SampleTable"
    
    #Dimension name settings
    COUNTRY = "Japan"
    CITY = "Tokyo"
    HOSTNAME = "DemoInstance01"
    
    #Interval between inserting data into TimeSteam(second)
    INTERVAL = 5
    
    #Functions that set dimensions
    def prepare_dimensions():
        dimensions = [
                {'Name': 'country', 'Value': COUNTRY},
                {'Name': 'city', 'Value': CITY},
                {'Name': 'hostname', 'Value': HOSTNAME}
        ]
        return dimensions
    
    #Function that writes data to timestream
    def write_records(records):
        try:
            result = write_client.write_records(DatabaseName=DATABASE_NAME,
                                                TableName=TABLE_NAME,
                                                CommonAttributes={},
                                                Records=records)
            status = result['ResponseMetadata']['HTTPStatusCode']
            print("WriteRecords HTTPStatusCode: %s" %
                  (status))
        except Exception as err:
            print("Error:", err)
    
    #main function
    if __name__ == '__main__':
    
        print("writing data to database {} table {}".format(
            DATABASE_NAME, TABLE_NAME))
    
        session = boto3.Session(region_name=REGION)
        write_client = session.client('timestream-write',
                                      config=Config(read_timeout=20,
                                      max_pool_connections=5000,
                                      retries={'max_attempts': 10}))
    
        dimensions = prepare_dimensions()
    
        records = []
    
        while True:
            current_time = str(round(time.time() * 1000))
            
            cpu_utilization = {
                'Dimensions': dimensions,
                'MeasureName': 'cpu_utilization',
                'MeasureValue': str(psutil.cpu_percent()),
                'MeasureValueType': 'DOUBLE',
                'Time': current_time
            }
            
            memory_utilization = {
                'Dimensions': dimensions,
                'MeasureName': 'memory_utilization',
                'MeasureValue': str(psutil.virtual_memory().percent),
                'MeasureValueType': 'DOUBLE',
                'Time': current_time
            }
            
            
            disk_utilization = {
                'Dimensions': dimensions,
                'MeasureName': 'disk_utilization',
                'MeasureValue': str(psutil.disk_usage('/').percent),
                'MeasureValueType': 'DOUBLE',
                'Time': current_time
            }
    
    
            records = [cpu_utilization, memory_utilization, disk_utilization]
            write_records(records)
            records = []
    
            time.sleep(INTERVAL)
  7. 作成したコードを実行するために、AWS Cloud9 の Terminal で下記コマンドを実行します。
  8. python3 ingest_timestream.py

    実行がうまくいけば、5 秒ごとに Timestream にデータが挿入されていきます。Terminal 上に [WriteRecords HTTPStatusCode: 200] が定期的に表示されれば、データ挿入が成功しています。

    Code_execution_on_Cloud9

Amazon Timestream テーブル内のデータを検索してみよう

次に Amazon Timestream に挿入したデータを検索してみます。データの検索する方法も下記のようないくつか方法かあります。今回は、AWS コンソール上でのクエリーエディタ機能を利用してデータを検索してみます。

  • AWS CLI や各種プログラミング言語で提供されている AWS SDK の利用
  • AWS コンソール上でクエリーエディタ機能の利用
  1. AWS マネージメントコンソールの Timestream の操作画面の左側で [クエリエディタ] をクリックすると、クエリを実際に記述して、実行することができます。
  2. Timestream_Query_Editor

    早速、先ほど Timestream に投入したデータがテーブルに保管されているか検索してみましょう。

  3. クエリエディタに下記 SQL 文を貼り付けて、[実行] をクリックします。テーブル内のデータを全部取ってくる SQL 文です。
  4. SELECT *
    FROM "SampleDB"."SampleTable"
    ORDER BY time DESC
    

    Execute_Timestream_query1

  5. そうすると、検索結果が出力されます。無事指定したディメンションやメジャーに対して時刻ごとの値が返ってきており、データの挿入ができていることが分かります。
  6. Result_Timestream_query1

  7. また、実行結果の [出力] タブを見ると、検索にかかった時間やスキャンされたデータのバイト数が確認できます。これでクエリごとにどの程度時間がかかるのかといった性能や発行した SQL 文がどの程度データスキャンがかかるのかといったコストに関わる部分を確認することができます。
  8. Result1_Timestream_query1

次に、時系列データ専用のデータベースの機能である時系列関数を利用してみます。Timestream は時系列関連の処理を行う SQL 文を簡単に書くことができます。例えば今回は 5 秒単位でデータを挿入していますが、分析には 1 分おきのデータで良いといったユースケースがあるとします。その場合、bin 関数を利用して簡単にデータの平均値を取得することができます。bin 関数は Timestream 独自の時系列関数で任意の時間間隔にデータを間引くことが可能です。bin 関数の詳細はこちらをご確認下さい。

  1. 下記 SQL 文をクエリエディタに貼り付けて実行します。
  2. SELECT measure_name,
        avg(measure_value::double) AS avg_value,
        BIN(time, 1m) AS binned_time 
    FROM "SampleDB"."SampleTable" 
    WHERE measure_name = 'cpu_utilization'
    GROUP BY measure_name, BIN(time, 1m)
    ORDER BY binned_time DESC
    
  3. 結果として、1 分間隔で CPU 使用率の平均値を計算して取得することができています。
  4. Result_Timestream_query2

まとめ

Amazon Timestream とは何か?といった概要や構成要素を説明した上で、データベースの作成から、テーブル作成、データの挿入、データの検索までやってみました。Amazon Timestream は時系列データ専用のサーバレスなデータベースです。時系列データの分析処理が必要になる際は、構築・設計・運用にかかる時間を削減するために、Amazon Timestream の利用をご検討下さい!


著者について

Shoji Takano

高野 翔史

 

このブログの執筆はソリューションアーキテクトの高野翔史が担当しました。
普段は、製造業のお客様のお客様に対し技術支援をしています。