Amazon Web Services ブログ

リアルタイム予測のために Amazon SageMaker を使用して Amazon DynamoDB でデータを分析する



世界で数多くの企業が、Amazon DynamoDB を使って、ユーザー対話履歴データを保存およびクエリしています。DynamoDB は、1 桁台のミリ秒の安定したレイテンシーを必要とするアプリケーションで使用されている、高速の NoSQL データベースです。

たいていは、顧客は Amazon S3 に格納されているテーブルのコピーを分析することで、DynamoDB 内の貴重なデータを詳細な情報に変換する必要があります。これにより、低いレイテンシーのクリティカルパスから分析クエリが分離します。このデータは、顧客の過去の行動を理解し、将来の行動を予測し、下流のビジネス価値を生み出す主要な情報源となり得るのです。スケーラビリティと可用性が高いという理由で、顧客は DynamoDB の使用へ切り替えるということがよくあります。立ち上げが上手くいった後、多くの顧客が DynamoDB のデータを使用して、今後の行動を予測したり、個別の推奨事項を提供したりしたいと考えます。

DynamoDB は、レイテンシーの低い読み書きに適していますが、DynamoDB データベース内のすべてのデータをスキャンし、モデルをトレーニングするのは現実的ではありません。この記事では、AWS Data Pipeline によって Amazon S3 にコピーされた DynamoDB テーブルデータを使って、顧客の行動を予測する方法を解説します。さらにこのデータを使用して、Amazon SageMaker で顧客に個別の推奨事項を提供する方法も説明します。Amazon Athena を使用して、データに対してアドホッククエリを実行することもできます。DynamoDB は最近、オンデマンドバックアップをリリースし、パフォーマンスに影響を与えずに完全なテーブルバックアップを作成しました。しかし、この記事には適していないので、代わりに AWS Data Pipeline が管理バックアップを作成し、他のサービスからアクセスできるようにする方法を紹介します。

これを行うため、Amazon Data Pipeline で DynamoDB バックアップファイル形式を読み取る方法について説明します。Amazon S3 のオブジェクトを Amazon SageMaker が読み取れる CSV 形式に変換する方法についてもお話しします。加えて、Amazon Data Pipeline を使用して、定期的なエクスポートと変換をスケジュールする方法も説明します。この記事で使用するサンプルデータは、「Bank Marketing Data Set of UCI」のものです。

ここで説明するソリューションは、以下のような利点があります。

  • 重要な本番リクエストの DynamoDB 読み取り容量単位 (RCU) を保存して、DynamoDB テーブルの本番トラフィックから分析クエリを分離する
  • モデルを自動更新して、リアルタイムでの予測を得る
  • エクスポート後に DynamoDB RCU と競合しないようパフォーマンスを最適化し、既に使用しているデータを使用してコストを最適化する
  • あらゆるスキルレベルの開発者が、Amazon SageMaker をより簡単に使えるようにする

この記事のすべてのコードとデータは、こちら .zip ファイルにあります。

ソリューションのアーキテクチャ

次の図は、ソリューションの全体的なアーキテクチャを示しています。

アーキテクチャーにおけるデータ処理の手順は、次のとおりです。

  1. Amazon Data Pipeline は、DynamoDB テーブルの全コンテンツを JSON として Amazon S3 に定期的にコピーする
  2. エクスポートされた JSON ファイルは、Amazon SageMaker のデータソースとして使用するカンマ区切り値 (CSV) 形式に変換されます。
  3. Amazon SageMaker はモデルアーティファクトとエンドポイントを更新します。
  4. 変換した CSV は、Amazon Athena を使ってアドホッククエリに使用できます。
  5. Data Pipeline はこのフローを制御し、顧客要件によって定義されたスケジュールに基づいてサイクルを繰り返します。

自動更新モデルの構築

このセクションでは、Data Pipeline でエクスポートされた DynamoDB データを読み込み、定期的に更新したモデルを使用してリアルタイム予測のための自動ワークフローを構築する方法について、詳しく見ていきます。

サンプルスクリプトとデータのダウンロード

始める前に、次の手順を実行します。

  1. この .zip ファイルにあるサンプルスクリプトをダウンロードします。
  2. src.zip ファイルを解凍します。
  3. automation_script.sh ファイルを見つけ、自分の環境に合わせて編集します。例えば、's3://<your bucket>/<datasource path>/' を、Amazon ML のデータソースへの自分の S3 パスに置き換える必要があります。このスクリプトでは、角カッコ (< と >) で囲まれたテキストを自身のパスに置き換える必要があります。
  4. Apache Hive の ADD jar コマンドが参照できるように、json-serde-1.3.6-SNAPSHOT-jar-with-dependencies.jar ファイルを S3 パスにアップロードします。

このソリューションでは、banking.csv を DynamoDB テーブルにインポートする必要があります。

DynamoDB テーブルをエクスポートする

DynamoDB テーブルを Amazon S3 にエクスポートするには、Data Pipeline コンソールを開き、Export DynamoDB table to S3 テンプレートを選択します。このテンプレートでは、Data Pipeline は Amazon EMR クラスターを作成し、EMRActivity アクティビティでエクスポートを実行します。ビジネス要件に応じて、バックアップの適切な間隔を設定します。

1 つのコアノード (m3.xlarge) は EMR クラスターのデフォルト容量を備えているので、この記事のソリューションに適しているはずです。TableBackupActivity アクティビティで有効にする前に、クラスターのサイズを変更するオプションを残して、Data Pipeline がクラスタをテーブルサイズに合わせて拡張できるようにします。この EMR クラスターでは、CSV 形式に変換してモデルを更新するプロセスが発生します。

DynamoDB からデータをエクスポートする方法の詳細については、Amazon Data Pipeline ドキュメントの「AWS Data Pipeline を使用して DynamoDB データをエクスポートおよびインポートする」を参照してください。

既存のパイプラインにスクリプトを追加する

DynamoDB テーブルをエクスポートしてから、次の手順に従って、EMRActivity に追加の EMR ステップを追加します。

  1. Data Pipeline コンソールを開き、スクリプトを追加したいパイプラインの ID を選択します。
  2. [Actions] では、[Edit] を選択します。
  3. 下記のように、編集コンソールで、[Activities] のカテゴリを選択し、前のセクションでダウンロードしたカスタムスクリプトを使って EMR ステップを追加します。

データアップロードステップの後、次のコマンドを新しいステップに貼り付けます。

s3://#{myDDBRegion}.elasticmapreduce/libs/script-runner/script-runner.jar,s3://<your bucket name>/automation_script.sh,#{output.directoryPath},#{myDDBRegion}

要素 #{output.directoryPath} は、データパイプラインが DynamoDB データを JSON としてエクスポートする S3 パスを参照します。パスは引数としてスクリプトに渡す必要があります。

bash スクリプトには、データフォーマットの変換、および Amazon SageMaker モデルの更新という 2 つの目的があります。後のセクションでは、自動化スクリプトのコンテンツについて説明します。

自動化スクリプト : Hive を使用して JSON データを CSV に変換する

Apache Hive を使って、データを新しい形式に変換します。外部テーブルを作成しデータを変換する Hive QL スクリプトは、Data Pipleine 定義に追加したカスタムスクリプトの中に含まれています。

Hive スクリプトを実行するときは、-e オプションを使用してください。さらに、'org.openx.data.jsonserde.JsonSerDe' 行形式で Hive テーブルを定義し、JSON 形式を解析して読み込みます。SQL は Hive EXTERNAL テーブルを作成し、Data Pipeline によって渡された S3 パスの DynamoDB バックアップデータを読み込みます。

注 : テーブルを削除する場合、バックアップデータが S3 から誤って削除されないように、「EXTERNAL」キーワードでテーブルを作成する必要があります。

変換のための完全な自動化スクリプトは、以下のとおりです。ハイライト表示された部分に、自分のバケット名とデータソースパスを追加します。

#!/bin/bash
hive -e "
ADD jar s3://<your bucket name>/json-serde-1.3.6-SNAPSHOT-jar-with-dependencies.jar ; 
DROP TABLE IF EXISTS blog_backup_data ;
CREATE EXTERNAL TABLE blog_backup_data (
 customer_id map<string,string>,
 age map<string,string>, job map<string,string>,
 marital map<string,string>,education map<string,string>,
 default map<string,string>, housing map<string,string>,
 loan map<string,string>, contact map<string,string>,
 month map<string,string>, day_of_week map<string,string>,
 duration map<string,string>, campaign map<string,string>,
 pdays map<string,string>, previous map<string,string>,
 poutcome map<string,string>, emp_var_rate map<string,string>,
 cons_price_idx map<string,string>, cons_conf_idx map<string,string>,
 euribor3m map<string,string>, nr_employed map<string,string>,
 y map<string,string> ) 
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe' 
LOCATION '$1/';

INSERT OVERWRITE DIRECTORY 's3://<your bucket name>/<datasource path>/' 
SELECT concat( customer_id['s'],',',
 age['n'],',', job['s'],',',
 marital['s'],',', education['s'],',', default['s'],',',
 housing['s'],',', loan['s'],',', contact['s'],',',
 month['s'],',', day_of_week['s'],',', duration['n'],',',
 campaign['n'],',',pdays['n'],',',previous['n'],',',
 poutcome['s'],',', emp_var_rate['n'],',', cons_price_idx['n'],',',
 cons_conf_idx['n'],',', euribor3m['n'],',', nr_employed['n'],',', y['n'] ) 
FROM blog_backup_data
WHERE customer_id['s'] > 0 ; 

外部テーブルを作成したら、データを読み込む必要があります。次に、INSERT OVERWRITE DIRECTORY ~ SELECT コマンドを使用して、Amazon SageMaker のデータソースとして指定した S3 パスに CSV データを書き込みます。

要件に応じて、このステップで SELECT 節の列を削除または処理して、データ分析を最適化することができます。例えば、間違った列を維持すると、トレーニング中にモデルが「過学習」する可能性があるため、予測値との相関が目標値と異なる列がいくつか削除される可能性があります。この投稿では、customer_id の列が削除されています。過学習で、予測が悪くなる可能性があります。過学習の詳細については、Amazon ML ドキュメントの中にある「モデルフィット : アンダーフィットとオーバーフィット」を参照してください。

自動化スクリプト : Amazon SageMaker モデルを更新する

CSV データを置き換えて使用する準備が整ったら、Amazon S3 の更新データセットを使用して Amazon SageMaker 用の新しいモデルアーティファクトを作成します。 モデルアーティファクトを更新するには、新しいトレーニングジョブを作成する必要があります。  トレーニングジョブは、AWS SDK (例えば、Amazon SageMaker boto3)、あるいは Amazon SageMaker Python SDK コマンド (「pip install sagemaker」でインストール可能)、またはこの記事で説明している Amazon SageMaker 用 AWS CLI を使って実行します。

さらに、モデルがアプリケーションによってリアルタイムで呼び出されるため、サービスに影響を与えずに既存のモデルを円滑に更新する方法を検討します。これを行うには、新しいエンドポイント構成を最初に作成し、作成したばかりのエンドポイント構成で現在のエンドポイントを更新する必要があります。

#!/bin/bash
## Define variable 
REGION=$2
DTTIME=`date +%Y-%m-%d-%H-%M-%S`
ROLE="<your AmazonSageMaker-ExecutionRole>" 


# Select containers image based on region. 
case "$REGION" in
"us-west-2" )
    IMAGE="174872318107.dkr.ecr.us-west-2.amazonaws.com/linear-learner:latest"
    ;;
"us-east-1" )
    IMAGE="382416733822.dkr.ecr.us-east-1.amazonaws.com/linear-learner:latest" 
    ;;
"us-east-2" )
    IMAGE="404615174143.dkr.ecr.us-east-2.amazonaws.com/linear-learner:latest" 
    ;;
"eu-west-1" )
    IMAGE="438346466558.dkr.ecr.eu-west-1.amazonaws.com/linear-learner:latest" 
    ;;
 *)
    echo "Invalid Region Name"
    exit 1 ;  
esac

# Start training job and creating model artifact 
TRAINING_JOB_NAME=TRAIN-${DTTIME} 
S3OUTPUT="s3://<your bucket name>/model/" 
INSTANCETYPE="ml.m4.xlarge"
INSTANCECOUNT=1
VOLUMESIZE=5 
aws sagemaker create-training-job --training-job-name ${TRAINING_JOB_NAME} --region ${REGION}  --algorithm-specification TrainingImage=${IMAGE},TrainingInputMode=File --role-arn ${ROLE}  --input-data-config '[{ "ChannelName": "train", "DataSource": { "S3DataSource": { "S3DataType": "S3Prefix", "S3Uri": "s3://<your bucket name>/<datasource path>/", "S3DataDistributionType": "FullyReplicated" } }, "ContentType": "text/csv", "CompressionType": "None" , "RecordWrapperType": "None"  }]'  --output-data-config S3OutputPath=${S3OUTPUT} --resource-config  InstanceType=${INSTANCETYPE},InstanceCount=${INSTANCECOUNT},VolumeSizeInGB=${VOLUMESIZE} --stopping-condition MaxRuntimeInSeconds=120 --hyper-parameters feature_dim=20,predictor_type=binary_classifier  

# Wait until job completed 
aws sagemaker wait training-job-completed-or-stopped --training-job-name ${TRAINING_JOB_NAME}  --region ${REGION}

# Get newly created model artifact and create model
MODELARTIFACT=`aws sagemaker describe-training-job --training-job-name ${TRAINING_JOB_NAME} --region ${REGION}  --query 'ModelArtifacts.S3ModelArtifacts' --output text `
MODELNAME=MODEL-${DTTIME}
aws sagemaker create-model --region ${REGION} --model-name ${MODELNAME}  --primary-container Image=${IMAGE},ModelDataUrl=${MODELARTIFACT}  --execution-role-arn ${ROLE}

# create a new endpoint configuration 
CONFIGNAME=CONFIG-${DTTIME}
aws sagemaker  create-endpoint-config --region ${REGION} --endpoint-config-name ${CONFIGNAME}  --production-variants  VariantName=Users,ModelName=${MODELNAME},InitialInstanceCount=1,InstanceType=ml.m4.xlarge

# create or update the endpoint
STATUS=`aws sagemaker describe-endpoint --endpoint-name  ServiceEndpoint --query 'EndpointStatus' --output text --region ${REGION} `
if [[ $STATUS -ne "InService" ]] ;
then
    aws sagemaker  create-endpoint --endpoint-name  ServiceEndpoint  --endpoint-config-name ${CONFIGNAME} --region ${REGION}    
else
    aws sagemaker  update-endpoint --endpoint-name  ServiceEndpoint  --endpoint-config-name ${CONFIGNAME} --region ${REGION}
fi

アクセス許可を付与する

スクリプトを実行する前に、Amazon Data Pipeline に対して適切なアクセス許可を与える必要があります。Data Pipeline は、デフォルトで DataPipelineDefaultResourceRole のロールを使用します。DataPipelineDefaultResourceRole に次のポリシーを追加して、Data Pipeline が Amazon SageMaker モデルとデータソースをスクリプトで作成、削除、更新できるようにしました。

{
 "Version": "2012-10-17",
 "Statement": [
 {
 "Effect": "Allow",
 "Action": [
 "sagemaker:CreateTrainingJob",
 "sagemaker:DescribeTrainingJob",
 "sagemaker:CreateModel",
 "sagemaker:CreateEndpointConfig",
 "sagemaker:DescribeEndpoint",
 "sagemaker:CreateEndpoint",
 "sagemaker:UpdateEndpoint",
 "iam:PassRole"
 ],
 "Resource": "*"
 }
 ]
}

リアルタイム予測を使用する

Amazon SageMaker ホスティングサービスを使用して本番環境にモデルをデプロイすると、クライアントアプリケーションはこの API を使って、指定されたエンドポイントでホストされているモデルから推論を取得します。このアプローチは、インタラクティブ型ウェブ、モバイル、またはデスクトップ用アプリケーションに利用できます。

次に、Amazon SageMaker のエンドポイント URL に対してその名前 (「ServiceEndpoint」) をクエリし、それをリアルタイム予測に使用している単純な Python コードの例を用意しました。

===リアルタイム予測のための Python サンプル===

#!/usr/bin/env python
import boto3
import json 

client = boto3.client('sagemaker-runtime', region_name ='<your region>' )
new_customer_info = '34,10,2,4,1,2,1,1,6,3,190,1,3,4,3,-1.7,94.055,-39.8,0.715,4991.6'
response = client.invoke_endpoint(
    EndpointName='ServiceEndpoint',
    Body=new_customer_info,
    ContentType='text/csv'
)
result = json.loads(response['Body'].read().decode())
print(result)
--- output(response) ---
{u'predictions': [{u'score': 0.7528127431869507, u'predicted_label': 1.0}]}

ソリューションの要約

このソリューションは、以下の手順を行います。

  1. Data Pipeline が、DynamoDB テーブルのデータを Amazon S3にエクスポートする。元の JSON データは、これを必要とするまれなイベントでのテーブルを回復するために保持する必要があります。次に Data Pipeline は、JSON を CSV に変換して、Amazon SageMaker がデータを読み取ることができるようにします。注釈 : CSV を変換する時は、意味のある属性のみを選択する必要があります。例えば、「campaign」属性に相関がないと判断した場合は、この属性を CSV から削除できます。
  2. 新しいデータソースを使用して、Amazon SageMaker モデルをトレーニングします。
  3. 新しい顧客がウェブサイトに来ると、この顧客が Amazon SageMaker の提供する「predictedScores」に基づいて新製品を登録する可能性を判断します。
  4. 新規ユーザーが新製品を登録する場合、アプリケーションは属性「y」を値「1」 (はいの場合) に更新する必要があります。この更新したデータは、次のモデル更新のための新しいデータソースとなります。これで予測の精度を向上させます。新しいエントリごとに、アプリケーションはより賢くなり、さらに良い予測が可能となります。

Amazon Athena を使用して、アドホッククエリを実行する

Amazon Athena は、標準 SQL を使用して Amazon S3 に格納された大量のデータを簡単に分析できる、サーバーレスのクエリサービスです。Athena は、データを調べたり、データに関する統計や情報を要約したりするのに便利です。Presto documentation の「Aggregate Functions of Presto」で説明しているように、Presto の強力な分析関数を使用することも可能です。

Data Pipeline でスケジュールされたアクティビティでは、最近の CSV データは常に Amazon S3 に配置され、Amazon Athena を用いてデータに対してアドホッククエリを実行します。これを、次の SQL ステートメントの例で示します。このプロセスの詳細な説明については、AWS News Blog の「Interactive SQL Queries for Data in Amazon S3」を参照してください。

Amazon Athena テーブルを作成し、実行する

Amazon Athena マネジメントコンソールで Amazon S3 の CSV データ用 EXTERNAL テーブルを、簡単に作成することができます。

===テーブルの作成===
CREATE EXTERNAL TABLE datasource (
 age int,
 job string,
 marital string ,
 education string,
 default string,
 housing string,
 loan string,
 contact string,
 month string,
 day_of_week string,
 duration int,
 campaign int,
 pdays int ,
 previous int ,
 poutcome string,
 emp_var_rate double,
 cons_price_idx double,
 cons_conf_idx double,
 euribor3m double,
 nr_employed double,
 y int 
)
ROW FORMAT DELIMITED 
FIELDS TERMINATED BY ',' ESCAPED BY '\\' LINES TERMINATED BY '\n' 
LOCATION 's3://<your bucket name>/<datasource path>/';

次のクエリでは、Amazon Athena を使用して、ターゲット属性と他の属性との間の相関係数を計算しています。

===サンプルクエリ===

SELECT corr(age,y) AS correlation_age_and_target,
 corr(duration,y) AS correlation_duration_and_target,
 corr(campaign,y) AS correlation_campaign_and_target,
 corr(contact,y) AS correlation_contact_and_target
FROM ( SELECT age , duration , campaign , y ,
 CASE WHEN contact = 'telephone' THEN 1 ELSE 0 END AS contact 
 FROM datasource 
 ) datasource ;

結論

この記事では、Amazon S3 のテーブルデータを使ってて、DynamoDB テーブルの読み込み容量を最適化することで、DynamoDB でデータを分析する方法を紹介しました。次に、分析したデータを新しいデータソースとして使用して、正確なリアルタイム予測のための Amazon SageMaker モデルをトレーニングしました。さらに、Amazon Athena で、Amazon S3 上のデータに対してアドホッククエリも実行しました。また、Amazon Data Pipeline を用いて、これらの手順を自動化する方法も紹介しました。

この例を、ご自分のユースケースにあてはめてみてください。この記事が、開発の促進にお役に立ちましたら幸いです。Amazon SageMaker の例とユースケースは、AWS ウェブサイト内の動画 AWS 2017: Introducing Amazon SageMaker で、いろいろ見ることができます。

 


その他の参考資料

この記事がお役に立った場合は、「Serving Real-Time Machine Learning Predictions on Amazon EMR」および「Analyzing Data in S3 using Amazon Athena」をぜひ参照してください。

 


著者について

Yong Seong Lee は AWS ビッグデータサービスのクラウドサポートエンジニアです。データやデータベースに関するあらゆるテクノロジーに関心があります。AWS では、AWS のサービスの利用に手間取っているお客様をサポートしています。「人生を楽しむ。好奇心を持つ。これ以上ないほどの経験をする。」というのが、彼のモットーです。