Amazon Web Services ブログ

AWS DMS と Amazon Kinesis Data Firehose を利用した Aurora PostgreSQL データベースへのストリームデータのロード

AWS Database Migration Service (AWS DMS) を利用することで、様々なデータソースから商用データベースやオープンソースデータベースへとデータを移行できます。このサービスでは、Oracle Database から Oracle Database への移行といった同一のDBMS製品間での移行をサポートしています。また、Oracle Database から Amazon Aurora, Microsoft SQL Server から MySQL へといった異なるプラットフォーム間での移行もサポートしています。さらに、ストリーミングデータを Amazon S3 から Amazon Aurora, PostgreSQL, MySQL, MariaDB, Oracle, SQL Server を含む様々な移行先へ配信することが可能です。

Amazon Kinesis Data Firehose は、AWS へストリーミングデータをロードする上で、最も簡単な方法です。ストリーミングデータのキャプチャ、変換を行い、Amazon Kinesis Data Analytics, Amazon S3, Amazon Redshift, Amazon Elasticsearch Service へロードできます。Firehose を利用することで、すでに利用しているビジネスインテリジェンスツールやダッシュボードを使い、ニアリアルタイム分析が可能となります。Firehose はお客様が送信するデータのスループットに合わせて自動的にスケールするフルマネージドサービスで、継続した運用管理を必要としません。Firehose は、ロード前にデータをまとめ、圧縮し、暗号化することで、ロード先のストレージで必要な容量を最小化したり、セキュリティを向上させたりすることができます。

AWS DMS と Kinesis Data Firehose を利用することで、ストリーミングデータをサポートされたデータベースに格納し、分析やレポーティングのために利用できます。

本ブログでは、ストリーミングデータを Firehose から Amazon Aurora with PostgreSQL Compatibility へロードするパイプラインを作成する方法をご紹介します。そうすることで、既存のBIツールやダッシュボードを利用して、可視化を実現できます。

パイプラインのハイレベルなダイアグラムは下記の通りです。



サービス概要
Kinesis Data Firehose (S3 as a target)
Amazon Kinesis Data Firehose を利用することで、事前に決めたバッファサイズとバッファ間隔でストリームデータを Amazon S3 バケットにロードすることができます。バッファサイズはメガバイト単位で、バッファ間隔は秒単位で指定します。Kinesis Data Firehose へは、様々なソースからデータを送ることができます。例えば、Kinesis Data Stream, Kinesis エージェント、SDK を介して Kinesis Data Firehose API を呼び出して送信できます。Amazon CloudWatch Logs, CloudWatch Events, AWS IoT をデータソースとして利用することも可能です。

Kinesis Data Firehose では、 AWS Lambda 関数を呼び出して、ソースデータの変換を行い、配信先へ変換されたデータを送信できます。

Amazon S3 へのデータ配信では、お客様のバッファリング設定に基づき、Kinesis Data Firehose が送信されてくる複数のレコードを連結します。そして、Amazon S3 へ S3 オブジェクトとして配信します。

Kinesis Data Firehose は、Amazon S3 へオブジェクトを書き込む前に、YYYY/MM/DD/HH というフォーマットで UTC タイムスタンプをプレフィックスとして付与します。 このプレフィックスは、バケット内に論理的な階層を作成します。プレフィックスで使用するスラッシュ(/)ごとに階層ができます。Kinesis Data Firehose の配信ストリームを作成する際に、時間形式プレフィックスの前に任意のプレフィックスを追加することでこの階層を変更することができます。例えば、”myApp/”を追加することで、myApp/YYYY/MM/DD/HH というプレフィックスを利用できます。もしくは、”myapp” と追加することで、myApp YYYY/MM/DD/HH というプレフィックスを利用できます。このプレフィックス配下の S3 オブジェクト名は、DeliveryStreamName-DeliveryStreamVersion-YYYY-MM-DD-HH-MM-SS-RandomString という形式に従います。

AWS DMS (S3 as a source)
AWS DMS はソースとなる S3 バケットからデータを読み取り、ターゲットデータベースにロードできます。これを行うには、データファイルを保持する S3 バケットに対するアクセス権を与える必要があります。S3 バケットには、データとデータを格納するデータベーステーブル間のマッピングを示す JSON ファイルが含まれます。

ソースデータファイルは、CSV形式でなければいけません。次の命名規則を使用してファイルに名前を付けます。この規約では、schemaNameはソースのスキーマ名であり、tableNameはそのスキーマ内のテーブル名です。

/schemaName/tableName/LOAD001.csv
/schemaName/tableName/LOAD002.csv
/schemaName/tableName/LOAD003.csv
…

データファイルに加えて、外部のテーブル定義についても提供しなければいけません。外部テーブルの定義は AWS DMS が S3 からのデータをどのように解釈すれば良いかを示す JSON 形式で記述します。

AWS DMS がデータのフルロードを実行後、オプションとしてデータの変更をターゲットエンドポイントへレプリケートできます。これを行うには、change data capture ファイル (CDC ファイル) を S3 バケットへアップロードします。AWS DMS は、これらの CDC ファイルを読み取り、ターゲットエンドポイントに変更を適用します。

CDC ファイルは下記のように命名されます。

CDC00001.csv
CDC00002.csv
CDC00003.csv
…

AWS DMS がファイルを検出できる場所を示すよう、 cdcPath パラメーターをソースエンドポイント作成時にDMSコンソールで指定します。もし、cdcPath を changedata と設定した場合、AWS DMS は下記のパスにある CDC ファイルを読み取ります。

s3://mybucket/changedata

CDC ファイル内のレコードは、下記の形式に従います:

  • Operation – 実行したい変更操作: INSERT, UPDATE, DELETE (これらのキーワードは、大文字小文字を意識する必要ありません)
  • Table name – ソーステーブル名
  • Schema name – ソーススキーマ名
  • Data –変更されるデータを表す 1つ以上のカラム
    下記は、stock という名前のスキーマ内の ticker というテーブルのCDCファイルの例です。
INSERT,ticker,stock,"RFV","FINANCIAL",-0.37,25.4
INSERT,ticker,stock,"VVS","ENERGY",-0.18,2.24
INSERT,ticker,stock,"NFS","ENERGY",-1.27,199.93
INSERT,ticker,stock,"WMT","RETAIL",-2.38,133.51
INSERT,ticker,stock,"BNM","TECHNOLOGY",-2.29,126.94

 

ソリューション概要
このソリューションのゴールは、Kinesis Data Firehose ストリームから Database Migration Service で利用する S3 バケットに書かれたデータを CDC を利用してデータベースに書き込むことです。

このソリューションは下記のコンポーネントを含みます:

  1. Kinesis Data Firehose – S3 へデータをストリームします。
  2. Lambda Function – JSON データを CSV へ変換し、DMS の要件にあうようにフォーマットします。
  3. Lambda Function –DMS が利用できるようにデータを適切なキーにコピーします。
  4. Aurora PostgreSQL Database – データを格納するターゲットデータベース。
  5. Database Migration Service –S3 からデータを読み取り、Aurora with PostgreSQL compatibility にロードします。

データベーステーブルの作成
まず、Aurora PostgreSQL データベースに接続し、下記の SQL を実行します。

Create Schema Stock;

Create Table Stock.Ticker
 (
    ticker_symbol varchar(20) not null,
    sector varchar(20) not null,
     change float not null,
     price float not null
 );

S3 アクセスに利用するIAM ロールの作成
次に、DMS がデータを読み取る特定の S3 バケットへのアクセスを行うための IAM ロールを作成します。私の場合、DMS から読み取れるように bucket/changedata, bucket/stock/ticker キーに対する読み取り権限を付与しています。Amazon リソースネーム (ARN)について注意してください。

また、S3 へ書き込みを行う Lambda 関数のための IAM ロールを作成します。私の場合、所有するバケット内の changedata キーに対して書き込みを行う権限を持つポリシーを作成しています。

Kinesis ストリームの設定

Firehose console
次に、Kinesis Data Firehose コンソールを開き、デリバリストリームを作成し、データ送信の方法として “Direct PUT or other sources” を選択します。

Firehose transform
次のページでは、record transformation を有効化(“Enabled“)し、 [Create New] をクリックします。

Lambda blueprints
Lambda blueprint(設計図) では、General Firehose Processing を選択します。AWS Lambda では、複数の設計図を提供し、ログファイルを変換するために利用できます。(※ 訳者注: Kinesis Data Firehose からblue print を選択して Lambda 関数の設定を行う方法でうまくいかない場合は、あらかじめ Lambda 関数を blueprint なしで作成/設定の上、Kinesis Data Firehose ストリームを作成してみてください。)

Basic information
続く Create Function ページでは、 convertTickerDataToCSVと関数名を入力します。Create a new role from template(s) を選択し、TickerDataRole とロール名を入力します。 S3 Object read-only permission を policy template の一覧から選択します。

Code inline
[Create function] をクリックすると関数が作成されます。そして、 Runtime  を Python 2.7 に設定します。Handler に lambda_function.lambda_handler と入力し、下記のコードを貼り付け、[Save] をクリックします。

from __future__ import print_function

import boto3
import base64
import json
import re

print('Loading function')

batch = boto3.client('batch')


def lambda_handler(event, context):
    output = []
    succeeded_record_cnt = 0
    failed_record_cnt = 0
    for record in event['records']:
        print(record['recordId'])
        payload = base64.b64decode(record['data'])
        #{"ticker_symbol":"QXZ", "sector":"HEALTHCARE", "change":-0.05, "price":84.51}
        p = re.compile(r"^\{\"(\w+)\":(\"\w+\"),\"(\w+)\":(\"\w+\"),\"(\w+)\":(.{0,1}\d*.{0,1}\d*),\"(\w+)\":(\d*.{0,1}\d*)\}")
        m = p.match(payload)
        fixed_payload = "INSERT,ticker,stock"
    
        if m:
            succeeded_record_cnt += 1
            output_payload = fixed_payload + "," + m.group(2) + ',' + m.group(4) + ',' + m.group(6) + ',' + m.group(8) + '\n'
            print(output_payload)
            output_record = {
                'recordId': record['recordId'],
                'result': 'Ok',
                'data': base64.b64encode(output_payload)
             }
        else:
            print(payload)
            output_payload = ",,,,\n"
            print('Parsing failed')
            failed_record_cnt += 1
            output_record = {
                'recordId': record['recordId'],
                'result': 'ProcessingFailed',
                'data': base64.b64encode(output_payload)
            }
         
        output.append(output_record)
     
    print (output)
    
    print('Processing completed. Successful records {}, Failed records {}.'.format(succeeded_record_cnt, failed_record_cnt))
    return {'records': output}

 

Basic settings

Basic settings セクションで、タイムアウトの時間を 5 分に変更します。

Test event
右上の [Test] ボタンをクリックし、Event name に “TickerTestData” と入力し、[Create]をクリックします。

Transform record
Kinesis Data Firehose の Create Delivery Stream ウィンドウに戻ります。Lambda function のリフレッシュを行い、 convertTickerDataToCSV を選択し、[Next]をクリックします。

Destination
“Destination” として、 [S3 Bucket]を選択します。

Destination config
ウィンドウの下部において、S3 バケットを選択します。変換されたレコードのための Prefix を入力し、元レコードのバックアップのための S3 バケットと Prefix を設定します。

 

Configure settings
Configure Settings セクションでは、Buffer size と Buffer interval を 5 MB, 300 seconds のままにしておきます。
IAM role
IAM Role セクションで、[Create new or Choose] をクリックします。

IAM role details
表示されたタブにて、[Create a new IAM Role] をクリックし、ロール名として firehose_delivery_to_S3_role と入力し、[Allow]をクリックします。

[Next]をクリックし、配信ストリームの詳細を確認し、[Create Delivery Stream] をクリックします。

S3 オブジェクトをコピーする Lambda 関数の作成
ファイルはKinesis Data FirehoseからDMSがCDCに使用するものとは異なるキー構造で配信されます。その場合、正しいキーにデータをコピーする Lambda 関数を記述する必要があります。

そのために Lambda コンソールを開き、[Create Function] をクリックします。次のスクリーンで、[Author from Scratch]を選択します。

Author from scratch
Name として、 ProvideDataToDMSと入力し、Role として”Choose an existing role” を選択します。Existing Role では、最初のセクションで作成した“Creating an IAM role for S3 access”を持つロールを選択し、[Create Function]をクリックします。(※ 訳者注: Runtime は Python 2.7 を選択してください)

Function code
次のスクリーンでは、Runtime として Python 2.7 を選択し、Handler が lambda_function.lambda_handler であることを確認します。コードウィンドウには、下記のコードを貼り付けてください。

from __future__ import print_function

import json
import urllib
import boto3
import random

print('Loading function')

s3 = boto3.client('s3')


def lambda_handler(event, context):
    print("Received event: " + json.dumps(event, indent=2))
    
    # Get the object from the event and show its content type
    bucket = event['Records'][0]['s3']['bucket']['name']
    key = urllib.unquote_plus(event['Records'][0]['s3']['object']['key'].encode('utf8'))
    
    targetBucket = bucket
    targetKey = "changedata/CDC" + "{:0^6}".format(str(random.randint(1,100000))) + ".csv"
    
    s3.copy_object(Bucket=targetBucket, CopySource=bucket+"/"+key, Key = targetKey)
   
    return targetKey

Designer セクションで Trigger として S3 を選択します。

Configure triggers

バケット名とプレフィックスを設定します。プレフィックスには、tickerdata/ と入力します。Event type には、”ObjectCreated(ALL)” を選択します。これにより、tickerdata キーをプレフィックスに持つ新たにオブジェクトが作成されるたびに Lambda 関数が呼び出されます。

[Add]をクリックし、[Save]をクリックします。

DMS エンドポイントのセットアップ
AWS Database Migration Service コンソールを開きます。

DMS landing page
[Replication instances] をクリックし、続くページで [Create Replication Instance] をクリックします。

Replication instance
VPC 内にレプリケーションインスタンスを作成します。また、RDS インスタンスにアクセスするためのセキュリティグループを設定します。そして、[Create]をクリックします。

[Endpoints]をクリックし、続くページで[Create Endpoint]をクリックします。

(※訳者注:画像では、CDC Path に cdcpath と入力されていますが 、changedata と入力してください。)

Create endpoint
Table structure のテキストボックスに下記のテーブル構造を貼り付けます。

{
  "TableCount": "1",
  "Tables": [
    {
      "TableName": "ticker",
      "TablePath": "stock/ticker/",
      "TableOwner": "stock",
      "TableColumns": [
        {
          "ColumnName": "ticker_symbol",
          "ColumnType": "STRING",
          "ColumnNullable": "false",
          "ColumnLength": 20,
          "ColumnIsPk": "true"
        },
        {
          "ColumnName": "sector",
          "ColumnType": "STRING",
          "ColumnNullable": "false",
          "ColumnLength": "20"
        },
        {
          "ColumnName": "change",
          "ColumnType": "REAL8",
          "ColumnNullable": "false"
        },
        {
          "ColumnName": "price",
          "ColumnNullable": "false",
          "ColumnType": "REAL8"
        }
      ],
      "TableColumnsTotal": "4"
    }
  ]
}

Advanced セクション内の Extra connection attributes に下記のように入力します。

csvRowDelimiter=\n;csvDelimiter=,;

[Create endpoint]をクリックしてエンドポイントを作成します。エンドポイントが作成されたら、ターゲットエンドポイントとして、Aurora PostgreSQL を指すエンドポイントを作成します。

次に、DMS タスクを作成します。これを行う前に、エンドポイントページにて各エンドポイントに対する接続テストが成功することを確認しておいてください。このテストにより、レプリケーションインスタンスからソース、ターゲットへの接続が確認できます。

準備ができたら、ナビゲーションペインから[Tasks]をクリックし、[Create Task]をクリックします。

Create task
Source endpointTarget endpoint に、前のセクションで作成したエンドポイントを設定します。Migration type として、Migrate existing data and replicate ongoing changes を選択します。Enable logging のチェックボックスをオンにすることを忘れないでください。

タスクが作成、開始されると、ストリームのテストを開始できます。

Test with demo data
Kinesis Data Firehose コンソールにて、サンプルデータをストリームするオプションを利用できます。作成した配信ストリームを選択し、[Test with demo data] をクリックします。[Start Sending Demo Data]をクリックすると、S3 バケット内の source_recordsキーとchangedataキー配下にファイルが表示されます。

Table stats
DMS コンソールでは、タスクページにて、データベースに挿入されるレコードの数を示すテーブル統計を表示できます。

挿入されたレコードの数を確認するには、Amazon Aurora PostgreSQLデータベースに接続し、次のクエリを実行します。

Select COUNT(1) as "NumberOfRecords" from stock.ticker;

まとめ
このブログでは、Kinesis Data Firehose から Aurora PostgreSQL データベースへデータを流し込む方法を紹介しました。ここでは例として、Aurora PostgreSQL データベースを使いましたが、DMS がターゲットとしてサポートしている他のデータベースでもご利用できます。

本ブログは江川が翻訳しました。原文はこちらです。