Amazon Web Services ブログ

Amazon DynamoDB からのデータストリームを AWS Lambda と Amazon Kinesis Firehose を活用して Amazon Aurora に格納する

Aravind Kodandaramaiah は AWS パートナープログラムのパートナーソリューションアーキテクトです。

はじめに

AWS ワークロードを実行するお客様は Amazon DynamoDBAmazon Aurora の両方を使用していることがよくあります。Amazon DynamoDB は、どのような規模でも、一貫した、数ミリ秒台にレイテンシーを抑える必要のあるアプリケーションに適した、高速で柔軟性の高い NoSQL データベースサービスです。データモデルの柔軟性が高く、パフォーマンスが信頼できるため、モバイル、ウェブ、ゲーム、広告、IoT、他の多くのアプリケーションに最適です。

Amazon Aurora は、MySQL と互換性のあるリレーショナルデータベースエンジンで、オープンソースデータベースのコスト効率性と簡素性を備えた、高性能の商用データベースの可用性とスピードをあわせもったエンジンです。Amazon Aurora は、MySQL よりも最大 5 倍のパフォーマンスを発揮するだけでなく、商用データベースのセキュリティ、可用性、および信頼性を 10 分の 1 のコストで実現します。

DynamoDB と Aurora を連携させるために、カスタムウェブ解析エンジンを構築して、毎秒数百万のウェブクリックが DynamoDB に登録されるようにしたとします。Amazon DynamoDB はこの規模で動作し、データを高速に取り込むことができます。また、このクリックストリームデータを Amazon Aurora などのリレーショナルデータベース管理システム (RDBMS) にレプリケートする必要があるとします。さらに、ストアドプロシージャまたは関数内で SQL の機能を使用して、このデータに対してスライスアンドダイスや、さまざまな方法でのプロジェクションを行ったり、他のトランザクション目的で使用したりするとします。

DynamoDB から Aurora に効率的にデータをレプリケートするには、信頼性の高いスケーラブルなデータレプリケーション (ETL) プロセスを構築する必要があります。この記事では、AWS Lambda Amazon Kinesis Firehose によるサーバーレスアーキテクチャを使用して、このようなプロセスを構築する方法について説明します。

ソリューションの概要

以下の図に示しているのは、このソリューションのアーキテクチャです。このアーキテクチャの背後には、次のような動機があります。

  1. サーバーレス – インフラストラクチャ管理を AWS にオフロードすることで、メンテナンスゼロのインフラストラクチャを実現します。ソリューションのセキュリティ管理を簡素化します。これは、キーやパスワードを使用する必要がないためです。また、コストを最適化します。さらに、DynamoDB Streams のシャードイテレーターに基づいた Lambda 関数の同時実行により、スケーリングを自動化します。
  2. エラー発生時に再試行可能 – データ移動プロセスには高い信頼性が必要であるため、プロセスは各ステップでエラーを処理し、エラーが発生したステップを再試行できる必要があります。このアーキテクチャではそれが可能です。
  3. 同時データベース接続の最適化 – 間隔またはバッファサイズに基づいてレコードをバッファすることで、Amazon Aurora への同時接続の数を減らすことができます。この方法は、接続タイムアウトを回避するのに役立ちます。
  4. 懸念部分の分離 – AWS Lambda を使用すると、データレプリケーションプロセスの各懸念部分を分離できます。たとえば、抽出フェーズを DynamoDB ストリームの処理として、変換フェーズを Firehose-Lambda 変換として、ロードフェーズを Aurora への一括挿入として分離できます。

以下に示しているのは、このソリューションのしくみです。

  1. DynamoDB Streams がデータソースです。DynamoDB Streams を使用すると、DynamoDB テーブル内の項目が変更されたときに、その変更を取得できます。AWS Lambda は、新しいストリームレコードを検出すると、Lambda 関数を同期的に呼び出します。
  2. Lambda 関数は、DynamoDB テーブルに新たに追加された項目をバッファし、これらの項目のバッチを Amazon Kinesis Firehose に送ります。
  3. Firehose は、受け取ったデータを Lambda 関数により変換して、Amazon S3 に配信します。Firehose に対してデータ変換を有効にしていると、Firehose は受け取ったデータをバッファし、バッファしたデータのバッチごとに、指定された Lambda 関数を非同期的に呼び出します。変換されたデータは Lambda から Firehose に返されてバッファされます。
  4. Firehose は変換されたすべてのレコードを S3 バケットに配信します。
  5. Firehose は変換されないすべてのレコードも S3 バケットに配信します。ステップ 4 と 5 は同時に実行されます。Amazon SNS トピックをこの S3 バケットに登録して、以降の通知、修復、再処理に使用できます (通知に関する詳細はこのブログ記事では取り上げません)。
  6. Firehose が変換されたデータを S3 に正常に配信するたびに、S3 はイベントを発行することで Lambda 関数を呼び出します。この Lambda 関数は VPC 内で実行されます。
  7. Lambda 関数は Aurora データベースに接続し、SQL 式を実行して、S3 から直接テキストファイルにデータをインポートします。
  8. Aurora (VPC プライベートサブネット内で実行) は、S3 VPC エンドポイントを使用して S3 からデータをインポートします。

ソリューションの実装とデプロイ
次に、このソリューションを機能させるために必要な手順について説明します。以下の手順では、AWS CloudFormation スタックを起動して一連の AWS CLI コマンドを実行することで、VPC 環境を作成する必要があります。

AWS サービスを使用してこれらの手順を実行している間、AWS サービスの料金が適用されることがあります。

ステップ 1: ソリューションのソースコードをダウンロードする
このブログ記事で概説したソリューションでは、多くの Lambda 関数を使用し、また、多くの AWS Identity and Access Management (IAM) ポリシーおよびロールを作成します。このソリューションのソースコードは以下の場所からダウンロードします。

git clone https://github.com/awslabs/dynamoDB-data-replication-to-aurora.git

このリポジトリには、以下のフォルダ構造があります。このブログ記事の後続の手順を実行するために、lambda_iam フォルダに移動します。

ステップ 2: Firehose 配信用の S3 バケットを作成する
Amazon Kinesis Firehose を使用すると、Amazon S3 にリアルタイムのストリーミングデータを配信できます。そのためには、まず S3 バケットを作成します。次に、レコードの処理に失敗した場合に備えて、変換された最終のレコードとデータバックアップを保存するフォルダを作成します。

aws s3api create-bucket --bucket bucket_name

aws s3api put-object \
--bucket bucket_name \
--key processed/

aws s3api put-object \
--bucket bucket_name
--key tranformation_failed_data_backup/

 

ステップ 3: IAM ポリシー、S3 イベント通知、Firehose-S3 配信設定ファイルを変更する
次に、以下のファイルで、プレースホルダー AWS_REGION、

AWS_ACCOUNT_NUMBER、BUCKET_NAME をそれぞれ、お客様の AWS リージョン ID、AWS アカウント番号、ステップ 2 で作成した S3 バケットの名前に置き換えます。

 

·         aurora-s3-access-Policy.json

·         DynamoDb-Stream-lambda-AccessPolicy.json

·         firehose_delivery_AccessPolicy.json

·         lambda-s3-notification-config.json

·         s3-destination-configuration.json

·         firehose_delivery_trust_policy.json

ステップ 4: CloudFormation を使用して Aurora クラスターを設定する
次に、[Launch Stack] ボタンをクリックして AWS CloudFormation スタックを起動します。CloudFormation テンプレートは、VPC を作成し、その VPC のパブリックおよびプライベートサブネットを設定します。また、このテンプレートは、プライベートサブネット内で Amazon Aurora データベースクラスターを起動し、パブリックサブネット内でパブリック IP を割り当てた踏み台ホストも起動します。


ステップ 5: Aurora DB クラスターを設定する
CloudFormation スタックが完成したら、S3 バケット内のテキストファイルから DB クラスターにデータをロードするように、Aurora クラスターを変更する必要があります。以下に示しているのは、そのための手順です。

Amazon Aurora が Amazon S3 にアクセスできるようにします。そのためには、IAM ロールを作成し、先ほど作成した信頼およびアクセスポリシーをそのロールにアタッチします。

auroraS3Arn=$(aws iam create-role \
    --role-name aurora_s3_access_role \
    --assume-role-policy-document file://aurora-s3-Trust-Policy.json \
    --query 'Role.Arn' \
    --output text)

aws iam put-role-policy \
    --role-name aurora_s3_access_role \
    --policy-name aurora-s3-access-Policy \
    --policy-document file://aurora-s3-access-Policy.json

 

その IAM ロールを Aurora DB クラスターに関連付けます。そのためには、新しい DB クラスターパラメータグループを作成し、その DB クラスターに関連付けます。

aws rds add-role-to-db-cluster \
    --db-cluster-identifier Output AuroraClusterId from CloudFormation Stack \
    --role-arn $auroraS3Arn

aws rds create-db-cluster-parameter-group \
    --db-cluster-parameter-group-name webAnayticsclusterParamGroup \
    --db-parameter-group-family aurora5.6 \
    --description 'Aurora cluster parameter group - Allow access to Amazon S3'

aws rds modify-db-cluster-parameter-group \
    --db-cluster-parameter-group-name webAnayticsclusterParamGroup \
    --parameters "ParameterName=aws_default_s3_role,ParameterValue= $auroraS3Arn,ApplyMethod=pending-reboot"

aws rds modify-db-cluster \
	--db-cluster-identifier Output AuroraClusterId from CloudFormation Stack \
	--db-cluster-parameter-group-name webAnayticsclusterParamGroup

 

プライマリ DB インスタンスを再起動します。

aws rds reboot-db-instance \
--db-instance-identifier Output PrimaryInstanceId from CloudFormationF Stack 

ステップ 6: DynamoDB ストリームと、そのストリームを処理する Lambda 関数を設定する

1.    ストリームを有効にして新しい DynamoDB テーブルを作成します。以降の手順では、AWS Lambda 関数をストリームに関連付けることで、トリガーを作成します。

aws dynamodb create-table \
    --table-name web_analytics \
    --attribute-definitions AttributeName=page_id,AttributeType=S AttributeName=activity_dt,AttributeType=S \
    --key-schema AttributeName=page_id,KeyType=HASH AttributeName=activity_dt,KeyType=RANGE \
    --provisioned-throughput ReadCapacityUnits=50,WriteCapacityUnits=50 \
    --stream-specification StreamEnabled=true,StreamViewType=NEW_IMAGE

2.    Lambda 実行ロールを作成します。

DdbStreamLambdaRole=$(aws iam create-role \
    --role-name DdbStreamLambdaRole \
    --assume-role-policy-document file://DynamoDB-Stream-lambda-Trust-Policy.json \
    --query 'Role.Arn' \
    --output text)


aws iam put-role-policy \
    --role-name DdbStreamLambdaRole \
    --policy-name DdbStreamProcessingAccessPolicy \
    --policy-document file://DynamoDb-Stream-lambda-AccessPolicy.json

3.  DynamoDB ストリームを処理する Lambda 関数を作成します。

aws lambda create-function \
    --function-name WebAnalyticsDdbStreamFunction \
    --zip-file fileb://ddbStreamProcessor.zip \
    --role $DdbStreamLambdaRole \
    --handler ddbStreamProcessor.handler \
    --timeout 300 \
    --runtime nodejs4.3

 

4.  その Lambda 関数を DynamoDB ストリームに関連付けることで、トリガーを作成します。

tableStreamArn=$(aws dynamodb describe-table --table-name web_analytics --query 'Table.LatestStreamArn' --output text)

aws lambda create-event-source-mapping \
    --function-name WebAnalyticsDdbStreamFunction \
    --event-source-arn $tableStreamArn \
    --batch-size 100 \
    --starting-position LATEST

ステップ 7: Firehose のデータ変換 Lambda 関数を作成して設定する

Lambda 実行ロールを作成します。

transRole=$(aws iam create-role \
            --role-name firehose_delivery_lambda_transformation_role \
            --assume-role-policy-document  file://firehose_lambda_transformation_trust_policy.json \
            --query 'Role.Arn' --output text)

aws iam put-role-policy \
        --role-name firehose_delivery_lambda_transformation_role \
        --policy-name firehose_lambda_transformation_AccessPolicy \
        --policy-document file://firehose_lambda_transformation_AccessPolicy.json

2.  データ変換 Lambda 関数を作成します。

aws lambda create-function \
    --function-name firehoseDeliveryTransformationFunction \
    --zip-file fileb://firehose_delivery_transformation.zip \
    --role $transRole \
    --handler firehose_delivery_transformation.handler \
    --timeout 300 \
    --runtime nodejs4.3 \
    --memory-size 1024

この関数は、受け取ったストリームのレコードを JSON スキーマに対して検証します。スキーマに一致したら、受け取った JSON レコードを解析し、カンマ区切り値 (CSV) 形式に変換します。

'use strict';

var jsonSchema = require('jsonschema');

var schema = { "$schema": "http://json-schema.org/draft-04/schema#", "type": "object", "properties": { "Hits": { "type": "integer" }, "device": { "type": "object", "properties": { "make": { "type": "string" }, "platform": { "type": "object", "properties": { "name": { "type": "string" }, "version": { "type": "string" } }, "required": ["name", "version"] }, "location": { "type": "object", "properties": { "latitude": { "type": "string" }, "longitude": { "type": "string" }, "country": { "type": "string" } }, "required": ["latitude", "longitude", "country"] } }, "required": ["make", "platform", "location"] }, "session": { "type": "object", "properties": { "session_id": { "type": "string" }, "start_timestamp": { "type": "string" }, "stop_timestamp": { "type": "string" } }, "required": ["session_id", "start_timestamp", "stop_timestamp"] } }, "required": ["Hits", "device", "session"] };

exports.handler = (event, context, callback) => {
    let success = 0; // Number of valid entries found
    let failure = 0; // Number of invalid entries found

    const output = event.records.map((record) => {

        const entry = (new Buffer(record.data, 'base64')).toString('utf8');

        var rec = JSON.parse(entry);
        console.log('Decoded payload:', entry);
        var milliseconds = new Date().getTime();

        var payl = JSON.parse(rec.payload.S);

        var jsonValidator = new jsonSchema.Validator();
        var validationResult = jsonValidator.validate(payl, schema);
        console.log('Json Schema Validation result = ' + validationResult.errors);

        if (validationResult.errors.length === 0) {

            const result = `${milliseconds},${rec.page_id.S},${payl.Hits},
				${payl.session.start_timestamp},
				${payl.session.stop_timestamp},${payl.device.location.country}` + "\n";

            const payload = (new Buffer(result, 'utf8')).toString('base64');
            console.log(payload);
            success++;

            return {
                recordId: record.recordId,
                result: 'Ok',
                data: payload,
            };
        }
        else {
            failure++;
            return {
                recordId: record.recordId,
                result: 'ProcessingFailed',
                data: record.data,
            }
        }

    });

    console.log(`Processing completed.  Successful records ${success}, Failed records ${failure}.`);
    callback(null, { records: output });

};

ステップ 8: Firehose 配信ストリームを作成し、S3 にデータを配信するように設定する

Amazon S3 ターゲットを使用するとき、Firehose は S3 バケットにデータを配信します。配信ストリームを作成するには、IAM ロールが必要です。Firehose はその IAM ロールを引き受けることで、指定したバケットとキーにアクセスする権限を取得します。Firehose はまた、その IAM ロールを使用することで、Amazon CloudWatch ロググループにアクセスし、データ変換 Lambda 関数を呼び出す権限を取得します。

1.    S3 バケット、キー、CloudWatch ロググループ、データ変換 Lambda 関数にアクセスする権限を付与する IAM ロールを作成します。

aws iam create-role \
    --role-name firehose_delivery_role \
    --assume-role-policy-document  file://firehose_delivery_trust_policy.json

aws iam put-role-policy \
    --role-name firehose_delivery_role \
    --policy-name firehose_delivery_AccessPolicy \
    --policy-document file://firehose_delivery_AccessPolicy.json

 

2.  S3 ターゲット設定を指定して、Firehose 配信ストリームを作成します。

aws firehose create-delivery-stream \
    --delivery-stream-name webAnalytics \
    --extended-s3-destination-configuration='CONTENTS OF s3-destination-configuration.json file' 

3.  AWS マネジメントコンソールにサインインし、Firehose コンソールに移動します。webAnalytics という名前の配信ストリームを選択します。[Details] タブで、[Edit] を選択します。[Data transformation][Enabled] を、[IAM role] [firehose_delivery_role] を選択します。[Lambda function] で、[firehoseDeliveryTransformationFunction] を選択します。次に、[Save] を選択してこの設定を保存します。

ステップ 8: Lambda 関数を作成して、VPC リソースにアクセスするように設定する
S3 バケットから Amazon Aurora にデータをインポートするには、VPC 内のリソースにアクセスするように Lambda 関数を設定します。

1.    Lambda 関数の IAM 実行ロールを作成します。

auroraLambdaRole=$(aws iam create-role \
                --role-name lambda_aurora_role \
                --assume-role-policy-document file://lambda-aurora-Trust-Policy.json \
                --query 'Role.Arn' --output text)

aws iam put-role-policy \
    --role-name lambda_aurora_role \
    --policy-name lambda-aurora-AccessPolicy \
    --policy-document file://lambda-aurora-AccessPolicy.json

2.  プライベートサブネットやセキュリティグループなどの VPC 設定を指定する Lambda 関数を作成します。CLI の実行中に渡される環境変数 AuroraEndpoint、dbUser (データベースユーザー)、dbPassword (データベースパスワード) に正しい値を設定していることを確認します。これらの値については、CloudFormation スタック出力を参照してください。

aws lambda create-function \
    --function-name AuroraDataManagementFunction \
    --zip-file fileb://AuroraDataMgr.zip \
    --role $auroraLambdaRole \
    --handler dbMgr.handler \
    --timeout 300 \
    --runtime python2.7 \
    --vpc-config SubnetIds='Output PrivateSubnets from CloudFormation stack',SecurityGroupIds='Output DefaultSecurityGroupId from CloudFormation stack' \
    --memory-size 1024 \    
    --environment='    
                    {
                        "Variables": {
                            "AuroraEndpoint": "Output AuroraEndpoint from CloudFormation stack",
                            "dbUser": "Database User Name",
                            "dbPassword": "Database Password"
                        }
                    }'

 

Lambda 関数は Aurora データベースに接続します。この関数は、LOAD DATA FROM S3 SQL コマンドを実行して、S3 バケット内のテキストファイルから Aurora DB クラスターにデータをロードします。

import logging import pymysql import boto3 import sys import os
# rds settings
db_name = "Demo"
dbUser = os.environ['dbUser']
dbPassword = os.environ['dbPassword']
AuroraEndpoint = os.environ['AuroraEndpoint']

logger = logging.getLogger()
logger.setLevel(logging.INFO)

s3_client = boto3.client('s3')
try:
    conn = pymysql.connect(AuroraEndpoint, user=dbUser, passwd=dbPassword, db=db_name, connect_timeout=5)
except:
    logger.error("ERROR: Unexpected error: Could not connect to Aurora instance.")
    sys.exit()

logger.info("SUCCESS: Connection to RDS Aurora instance succeeded")

def handler(event, context):

    for record in event['Records']:
        bucket = record['s3']['bucket']['name']
        key = record['s3']['object']['key']
        s3location = 's3://' + bucket + '/' + key
        logger.info(s3location)

        sql = "LOAD DATA FROM S3 '" + s3location + "' INTO TABLE Demo.WebAnalytics FIELDS TERMINATED BY ',' " \
              "LINES TERMINATED BY '\\n' (timestarted, page_id, hits, start_time, end_time, country_code);"

        logger.info(sql)

        with conn.cursor() as cur:
            cur.execute(sql)
            conn.commit()
            logger.info('Data loaded from S3 into Aurora')

ステップ 9: S3 イベント通知を設定する
最後に、S3 がイベントを発行することで、前のステップで作成した Lambda 関数を呼び出すように、その関数を設定します。このプロセスの最初の手順は、Lambda 関数を呼び出すアクセス権限を S3 に付与することです。

1.    Lambda 関数を呼び出すアクセス権限を S3 に付与します。

aws lambda add-permission \
    --function-name AuroraDataManagementFunction \
    --action “lambda:InvokeFunction” \
    --statement-id Demo1234 \
    --principal s3.amazonaws.com \
    --source-arn ‘ARN of S3 Bucket created in Step 2’ \
    --source-account ‘AWS Account Id

2.    S3 バケット通知を設定します。

aws s3api put-bucket-notification-configuration \
    --bucket 'Name of S3 Bucket created in step 2' \
    --notification-configuration=' CONTENTS OF lambda-s3-notification-config.json ' 

 

ステップ 10: ソリューションをテストする
最後のステップは、ソリューションをテストすることです。

  1. このブログ記事のソースコードの TestHarness フォルダには、テストハーネスがあります。このテストハーネスは DynamoDB テーブルにデータを読み込みます。まず、TestHarness フォルダに移動し、コマンドノード loadDataToDDb.js を実行します。
  2. Secure Shell (SSH) を使用して、踏み台ホストに接続します。SSH を使用した接続の詳細については、EC2 のドキュメントを参照してください。
  3. 踏み台ホストのブートストラッププロセス中に MySQL クライアントがインストールされたため、以下のコマンドを使用して Aurora データベースに接続できます。パラメータ値を適切に変更していることを確認します。
/usr/bin/mysql -h DatabaseEndpoint -u DatabaseUsername --password=DatabasePassword 

4.    コネクションが成功したら、以下のコマンドを実行します。

 mysql> select count(*) from Demo.WebAnalytics;

このコマンドを実行した後、テーブルにレコードが読み込まれています。

テーブルにレコードが読み込まれていない場合、Firehose はそれらのレコードを S3 に送信する前にバッファしている可能性があります。これを回避するには、1 分くらい後に同じ SQL コードを再試行してください。この間隔は、現在設定されている S3 バッファ間隔の値に基づきます。このコードを再試行した後、Amazon Aurora にレコードが挿入されています。

 

まとめ

DynamoDB Streams と、Amazon Kinesis Firehose のデータ変換関数を使用すると、DynamoDB から Amazon Aurora などのデータソースにデータをレプリケートする強力でスケーラブルな方法を得られます。このブログ記事では、DynamoDB から Aurora へのデータのレプリケートを取り上げましたが、同じ一般的なアーキテクチャパターンを使用すれば、他のストリーミングデータを変換して、Amazon Aurora に取り込むことができます。

さらに、以下の関連するブログ記事を参照してください。