機械学習モデルを構築、トレーニング、デプロイ、およびモニタリングする

Amazon SageMaker を使用します

Amazon SageMaker Studio は、機械学習用の最初の完全統合開発環境 (IDE) であり、ML 開発のすべてのステップを実行するための単一のウェブベースのビジュアルインターフェイスを提供します。

このチュートリアルでは、Amazon SageMaker Studio を使用して、XGBoost モデルを構築、トレーニング、デプロイ、およびモニタリングします。機能エンジニアリングやモデルトレーニングから ML モデルのバッチおよびライブデプロイまで、機械学習 (ML) ワークフロー全体をカバーします。

このチュートリアルでは、以下の方法を学びます。

  1. Amazon SageMaker Studio コントロールパネルを設定する
  2. Amazon SageMaker Studio ノートブックを使用して公開データセットをダウンロードし、Amazon S3 にアップロードする
  3. Amazon SageMaker Experiment を作成して、トレーニングおよび処理のジョブを追跡および管理する
  4. Amazon SageMaker Processing ジョブを実行して、生データから機能を生成する
  5. 組み込みの XGBoost アルゴリズムを使用してモデルをトレーニングする
  6. Amazon SageMaker バッチ変換を使用して、テストデータセットでモデルのパフォーマンスをテストする
  7. モデルをエンドポイントとしてデプロイし、モニタリングジョブを設定して、本番環境でモデルエンドポイントのデータドリフトをモニタリングします。
  8. 結果を視覚化し、SageMaker Model Monitor を使用してモデルをモニタリングし、トレーニングデータセットとデプロイされたモデルの違いを判断します。

モデルは、顧客層、支払い履歴、および請求履歴に関する情報を含む UCI Credit Card Default データセットでトレーニングされます。

このチュートリアルの内容
時間 1 時間                                      
コスト 10 USD 未満
ユースケース 機械学習
製品 Amazon SageMaker
対象者 デベロッパー、データサイエンティスト
レベル 中級コース
最終更新日 2020 年 8 月 17 日

ステップ 1:AWS アカウントを今すぐ無料で作成

本チュートリアルで作成し、使用されたリソースは AWS 無料利用枠の対象となります。このワークショップの料金は 10 USD 未満です。

AWS アカウントをお持ちですか? サインイン

ステップ 2.Amazon SageMaker Studio コントロールパネルを作成する

以下のステップを実行して Amazon SageMaker Studio にオンボーディングし、Amazon SageMaker Studio コントロールパネルを設定します。

注: 詳細については、Amazon SageMaker ドキュメントの Amazon SageMaker Studio の開始方法を参照してください。


a.Amazon SageMaker コンソールにサインインします。 

注: コンソールの右上隅で、SageMaker Studio が利用可能な AWS リージョンを選択してください。利用可能なリージョンの一覧については、Amazon SageMaker Studio にオンボードするを参照してください。


b. Amazon SageMaker のナビゲーションペインで [ Amazon SageMaker Studio] を選択します。
 
注意: 初めて Amazon SageMaker Studio を使用する場合には、 Studio のオンボーディングプロセスを完了する必要があります。 オンボードする際には、認証方法として AWS Single Sign-On (AWS SSO) または AWS Identity and Access Management (IAM) のどちらを使用するかを選択できます。IAM 認証を使用する場合には、クイックスタートまたは標準のセットアップの手順のいずれかを選択できます。どのオプションを選択すべきかわからない場合には、 Amazon SageMaker Studio にオンボードするを参照し、IT 管理者にサポートを依頼してください。説明を簡潔にするため、このチュートリアルでは クイックスタートの手順を使用します。

c.[ 今すぐ始める] ボックスで [ クイックスタート] を選択し、ユーザー名を指定します。

d.[ 実行ロール] で [ IAM ロールを作成する] を選択します。表示されるダイアログボックスで [ 任意の S3 バケット] を選択し、[ ロールの作成] を選択します。

Amazon SageMaker で必要なアクセス許可を持つロールが作成され、作成されたロールがインスタンスに割り当てられます。 


e.[ 送信] をクリックします。

ステップ 3.データセットをダウンロードする

Amazon SageMaker Studio ノートブックは、トレーニングスクリプトの作成とテストに必要なすべてが含まれているワンクリックの Jupyter ノートブックです。SageMaker Studio には、実験の追跡と視覚化も含まれているため、機械学習ワークフロー全体を 1 か所で簡単に管理できます。

次のステップを実行して SageMaker ノートブックを作成し、データセットをダウンロードしてから、データセットを Amazon S3 にアップロードします。

注: 詳細については、Amazon SageMaker ドキュメントの Amazon SageMaker Studio ノートブックの使用を参照してください。


a.[ Amazon SageMaker Studio コントロールパネル] で [ Studio を開く] を選択してください。

b.[ JupyterLab] の [ ファイル] メニューで、[ 新規]、[ ノートブック] の順に選択します。[ カーネルを選択] ボックスで [ Python 3 (データサイエンス)] を選択します。
 

c.まず、 Amazon SageMaker Python SDK のバージョンを確認します。以下のコードブロックをコピーしてコードセルに貼り付け、[ Run] (実行) を選択します。印刷メッセージに「 以前の SageMaker バージョンをインストールしています。カーネルを再起動してください」と表示された場合、[ Kernel] (カーネル) タブを選択して、[ Restart Kernel] (カーネルの再起動) を選択してください。その後、残りのステップに進むことができます。
 
注: コードの実行中、角かっこの間に * が表示されます。数秒後、コードの実行が完了すると、* が番号に置き換えられます。
import boto3
import sagemaker
from sagemaker import get_execution_role
import sys

if int(sagemaker.__version__.split('.')[0]) == 2:
    !{sys.executable} -m pip install sagemaker==1.72.0
    print("Installing previous SageMaker Version. Please restart the kernel")
else:
    print("Version is good")

role = get_execution_role()
sess = sagemaker.Session()
region = boto3.session.Session().region_name
print("Region = {}".format(region))
sm = boto3.Session().client('sagemaker')
次に、ライブラリをインポートします。以下のコードをコピーしてコードセルに貼り付け、[ Run] (実行) を選択します。
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import os
from time import sleep, gmtime, strftime
import json
import time
最後に、実験をインポートします。以下のコードをコピーしてコードセルに貼り付け、[ Run] (実行) を選択します。
!pip install sagemaker-experiments 
from sagemaker.analytics import ExperimentAnalytics
from smexperiments.experiment import Experiment
from smexperiments.trial import Trial
from smexperiments.trial_component import TrialComponent
from smexperiments.tracker import Tracker

d.プロジェクトの Amazon S3 バケットとフォルダを定義します。以下のコードをコピーしてコードセルに貼り付け、[ Run] (実行) を選択します。
rawbucket= sess.default_bucket() # Alternatively you can use our custom bucket here. 

prefix = 'sagemaker-modelmonitor' # use this prefix to store all files pertaining to this workshop.

dataprefix = prefix + '/data'
traindataprefix = prefix + '/train_data'
testdataprefix = prefix + '/test_data'
testdatanolabelprefix = prefix + '/test_data_no_label'
trainheaderprefix = prefix + '/train_headers'

e.データセットをダウンロードし、pandas ライブラリを使用してインポートします。以下のコードをコピーして新しいコードセルに貼り付け、[Run] (実行) を選択します。

! wget https://archive.ics.uci.edu/ml/machine-learning-databases/00350/default%20of%20credit%20card%20clients.xls
data = pd.read_excel('default of credit card clients.xls', header=1)
data = data.drop(columns = ['ID'])
data.head()

f.最後の列の名前を Label に変更し、ラベル列を個別に抽出します。Amazon SageMaker の組み込み XGBoost アルゴリズムの場合、ラベル列はデータフレームの最初の列である必要があります。この変更を行うには、以下のコードをコピーして新しいコードセルに貼り付け、[Run] (実行) を選択します。

data.rename(columns={"default payment next month": "Label"}, inplace=True)
lbl = data.Label
data = pd.concat([lbl, data.drop(columns=['Label'])], axis = 1)
data.head()

g.CSV データセットを Amazon S3 バケットにアップロードします。以下のコードをコピーして新しいコードセルに貼り付け、[Run] (実行) を選択します。

if not os.path.exists('rawdata/rawdata.csv'):
    !mkdir rawdata
    data.to_csv('rawdata/rawdata.csv', index=None)
else:
    pass
# Upload the raw dataset
raw_data_location = sess.upload_data('rawdata', bucket=rawbucket, key_prefix=dataprefix)
print(raw_data_location)

完了しました。 以下の例のように、コードの出力に S3 バケットの URI が表示されます。

s3://sagemaker-us-east-2-ACCOUNT_NUMBER/sagemaker-modelmonitor/data

ステップ 4: Amazon SageMaker Processing を使用してデータを処理する

このステップでは、Amazon SageMaker Processing を使用して、列のスケーリングやデータセットのトレーニングデータとテストデータへの分割など、データセットを前処理します。Amazon SageMaker Processing を使用すると、完全に管理されたインフラストラクチャで前処理、後処理、およびモデル評価のワークロードを実行できます。

Amazon SageMaker Processing を使用してデータを処理し、機能を生成するには、次のステップを実行します。

注: Amazon SageMaker Processing は、ノートブックとは別のコンピューティングインスタンスで実行されます。これは、処理ジョブの進行中に、ノートブックでコードの実験と実行を継続できることを意味します。これにより、処理ジョブの期間中に稼働しているインスタンスのコストに対して追加料金が発生します。処理ジョブが完了すると、インスタンスは SageMaker によって自動的に終了します。料金の詳細については、Amazon SageMaker の料金をご覧ください。

注: 詳細については、Amazon SageMaker ドキュメントのデータの処理とモデルの評価を参照してください。


a.scikit-learn processing コンテナをインポートします。以下のコードをコピーして新しいコードセルに貼り付け、[Run] (実行) を選択します。

注: Amazon SageMaker は、scikit-learn のマネージドコンテナを提供します。詳細については、scikit-learn を使用したデータの処理とモデルの評価を参照してください。

from sagemaker.sklearn.processing import SKLearnProcessor
sklearn_processor = SKLearnProcessor(framework_version='0.20.0',
                                     role=role,
                                     instance_type='ml.c4.xlarge',
                                     instance_count=1)

b.次の前処理スクリプトをコピーして新しいセルに貼り付け、[Run] (実行) を選択します。

%%writefile preprocessing.py

import argparse
import os
import warnings

import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, MinMaxScaler
from sklearn.exceptions import DataConversionWarning
from sklearn.compose import make_column_transformer

warnings.filterwarnings(action='ignore', category=DataConversionWarning)

if __name__=='__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('--train-test-split-ratio', type=float, default=0.3)
    parser.add_argument('--random-split', type=int, default=0)
    args, _ = parser.parse_known_args()
    
    print('Received arguments {}'.format(args))

    input_data_path = os.path.join('/opt/ml/processing/input', 'rawdata.csv')
    
    print('Reading input data from {}'.format(input_data_path))
    df = pd.read_csv(input_data_path)
    df.sample(frac=1)
    
    COLS = df.columns
    newcolorder = ['PAY_AMT1','BILL_AMT1'] + list(COLS[1:])[:11] + list(COLS[1:])[12:17] + list(COLS[1:])[18:]
    
    split_ratio = args.train_test_split_ratio
    random_state=args.random_split
    
    X_train, X_test, y_train, y_test = train_test_split(df.drop('Label', axis=1), df['Label'], 
                                                        test_size=split_ratio, random_state=random_state)
    
    preprocess = make_column_transformer(
        (['PAY_AMT1'], StandardScaler()),
        (['BILL_AMT1'], MinMaxScaler()),
    remainder='passthrough')
    
    print('Running preprocessing and feature engineering transformations')
    train_features = pd.DataFrame(preprocess.fit_transform(X_train), columns = newcolorder)
    test_features = pd.DataFrame(preprocess.transform(X_test), columns = newcolorder)
    
    # concat to ensure Label column is the first column in dataframe
    train_full = pd.concat([pd.DataFrame(y_train.values, columns=['Label']), train_features], axis=1)
    test_full = pd.concat([pd.DataFrame(y_test.values, columns=['Label']), test_features], axis=1)
    
    print('Train data shape after preprocessing: {}'.format(train_features.shape))
    print('Test data shape after preprocessing: {}'.format(test_features.shape))
    
    train_features_headers_output_path = os.path.join('/opt/ml/processing/train_headers', 'train_data_with_headers.csv')
    
    train_features_output_path = os.path.join('/opt/ml/processing/train', 'train_data.csv')
    
    test_features_output_path = os.path.join('/opt/ml/processing/test', 'test_data.csv')
    
    print('Saving training features to {}'.format(train_features_output_path))
    train_full.to_csv(train_features_output_path, header=False, index=False)
    print("Complete")
    
    print("Save training data with headers to {}".format(train_features_headers_output_path))
    train_full.to_csv(train_features_headers_output_path, index=False)
                 
    print('Saving test features to {}'.format(test_features_output_path))
    test_full.to_csv(test_features_output_path, header=False, index=False)
    print("Complete")

c.次のコードを使用して前処理コードを Amazon S3 バケットにコピーし、[Run] (実行) を選択します。

# Copy the preprocessing code over to the s3 bucket
codeprefix = prefix + '/code'
codeupload = sess.upload_data('preprocessing.py', bucket=rawbucket, key_prefix=codeprefix)
print(codeupload)

d.SageMaker Processing ジョブの完了後にトレーニングデータとテストデータを保存する場所を指定します。Amazon SageMaker Processing は、指定された場所にデータを自動的に保存します。

train_data_location = rawbucket + '/' + traindataprefix
test_data_location = rawbucket+'/'+testdataprefix
print("Training data location = {}".format(train_data_location))
print("Test data location = {}".format(test_data_location))


e.次のコードをコピーして貼り付け、Processing ジョブを開始します。このコードは、sklearn_processor.run を呼び出すことでジョブを開始し、トレーニングおよびテストの出力が保存された場所など、処理ジョブに関するいくつかのオプションのメタデータを抽出します。

from sagemaker.processing import ProcessingInput, ProcessingOutput

sklearn_processor.run(code=codeupload,
                      inputs=[ProcessingInput(
                        source=raw_data_location,
                        destination='/opt/ml/processing/input')],
                      outputs=[ProcessingOutput(output_name='train_data',
                                                source='/opt/ml/processing/train',
                               destination='s3://' + train_data_location),
                               ProcessingOutput(output_name='test_data',
                                                source='/opt/ml/processing/test',
                                               destination="s3://"+test_data_location),
                               ProcessingOutput(output_name='train_data_headers',
                                                source='/opt/ml/processing/train_headers',
                                               destination="s3://" + rawbucket + '/' + prefix + '/train_headers')],
                      arguments=['--train-test-split-ratio', '0.2']
                     )

preprocessing_job_description = sklearn_processor.jobs[-1].describe()

output_config = preprocessing_job_description['ProcessingOutputConfig']
for output in output_config['Outputs']:
    if output['OutputName'] == 'train_data':
        preprocessed_training_data = output['S3Output']['S3Uri']
    if output['OutputName'] == 'test_data':
        preprocessed_test_data = output['S3Output']['S3Uri']


プロセッサに提供される出力のコード、トレーニング、およびテストデータの場所に留意してください。また、処理スクリプトに提供されている引数にも留意してください。 

ステップ 5: Amazon SageMaker Experiment を作成する

データセットをダウンロードし、Amazon S3 にステージングしました。次に、Amazon SageMaker Experiment を作成できます。実験は、同じ機械学習プロジェクトに関連した処理ジョブとトレーニングジョブの集合です。Amazon SageMaker Experiments は、トレーニングの実行を自動的に管理および追跡します。

新しい実験を作成するには、以下のステップを実行します。

注: 詳細については、Amazon SageMaker ドキュメントの Experiments を参照してください。


a.次のコードをコピーして貼り付け、Build-train-deploy- という名前の実験を作成します。

# Create a SageMaker Experiment
cc_experiment = Experiment.create(
    experiment_name=f"Build-train-deploy-{int(time.time())}", 
    description="Predict credit card default from payments data", 
    sagemaker_boto_client=sm)
print(cc_experiment)

すべてのトレーニングジョブはトライアルとしてログに記録されます。各トライアルは、エンドツーエンドのトレーニングジョブの反復です。トレーニングジョブに加えて、前処理ジョブと後処理ジョブ、およびデータセットやその他のメタデータを追跡することもできます。1 つの実験に複数のトライアルを含めることができるため、Amazon SageMaker Studio Experiments ペイン内で複数の反復を経時的に追跡することが容易になります。


b.次のコードをコピーして貼り付け、Experiments での前処理ジョブとトレーニングパイプラインのステップを追跡します。

# Start Tracking parameters used in the Pre-processing pipeline.
with Tracker.create(display_name="Preprocessing", sagemaker_boto_client=sm) as tracker:
    tracker.log_parameters({
        "train_test_split_ratio": 0.2,
        "random_state":0
    })
    # we can log the s3 uri to the dataset we just uploaded
    tracker.log_input(name="ccdefault-raw-dataset", media_type="s3/uri", value=raw_data_location)
    tracker.log_input(name="ccdefault-train-dataset", media_type="s3/uri", value=train_data_location)
    tracker.log_input(name="ccdefault-test-dataset", media_type="s3/uri", value=test_data_location)

c.実験の詳細を表示します。[Experiments] (実験) ペインで、Build-train-deploy- という名前の実験を右クリックし、[Open in trial components list] (トライアルコンポーネントリストで開く) を選択します。


d.以下のコードをコピーして貼り付け、[Run] (実行) を選択します。次に、コードを詳しく見てみましょう。

XGBoost 分類子をトレーニングするには、最初に Amazon SageMaker によって管理されている XGBoost コンテナをインポートします。次に、[Trial] (トライアル) で実行されたトレーニングをログに記録して、SageMaker Experiments が [Trial] (トライアル) 名でトレーニングを追跡できるようにします。前処理ジョブはパイプラインの一部であるため、同じトライアル名で含まれています。次に、SageMaker Estimator オブジェクトを作成します。このオブジェクトは、選択した基盤となるインスタンスタイプを自動的にプロビジョニングし、処理ジョブで指定された出力場所からトレーニングデータをコピーし、モデルをトレーニングして、モデルアーティファクトを出力します。

from sagemaker.amazon.amazon_estimator import get_image_uri
container = get_image_uri(boto3.Session().region_name, 'xgboost', '1.0-1')
s3_input_train = sagemaker.s3_input(s3_data='s3://' + train_data_location, content_type='csv')
preprocessing_trial_component = tracker.trial_component

trial_name = f"cc-default-training-job-{int(time.time())}"
cc_trial = Trial.create(
        trial_name=trial_name, 
            experiment_name=cc_experiment.experiment_name,
        sagemaker_boto_client=sm
    )

cc_trial.add_trial_component(preprocessing_trial_component)
cc_training_job_name = "cc-training-job-{}".format(int(time.time()))

xgb = sagemaker.estimator.Estimator(container,
                                    role, 
                                    train_instance_count=1, 
                                    train_instance_type='ml.m4.xlarge',
                                    train_max_run=86400,
                                    output_path='s3://{}/{}/models'.format(rawbucket, prefix),
                                    sagemaker_session=sess) # set to true for distributed training

xgb.set_hyperparameters(max_depth=5,
                        eta=0.2,
                        gamma=4,
                        min_child_weight=6,
                        subsample=0.8,
                        verbosity=0,
                        objective='binary:logistic',
                        num_round=100)

xgb.fit(inputs = {'train':s3_input_train},
       job_name=cc_training_job_name,
        experiment_config={
            "TrialName": cc_trial.trial_name, #log training job in Trials for lineage
            "TrialComponentDisplayName": "Training",
        },
        wait=True,
    )
time.sleep(2)

トレーニングジョブは、完了するまでに約 70 秒かかります。以下のような出力が表示されます。

Completed - Training job completed

e.左側のツールバーで、[Experiment] (実験) を選択します。Build-train-deploy- 実験を右クリックし、[Open in trial components list] (トライアルコンポーネントリストで開く) を選択します。Amazon SageMaker Experiments は、失敗したトレーニング実行を含むすべての実行をキャプチャします。


f.完了した [Training jobs] (トレーニングジョブ) の 1 つを右クリックし、[Open in Trial Details] (トライアルの詳細で開く) を選択して、トレーニングジョブに関連付けられているメタデータを調べます。 

注: 最新の結果を表示するには、ページを更新する必要がある場合があります。 

ステップ 6: オフライン推論用のモデルをデプロイする

前処理ステップで、いくつかのテストデータを生成しました。このステップでは、トレーニングされたモデルからオフラインまたはバッチ推論を生成して、見えないテストデータでのモデルのパフォーマンスを評価します。

オフライン推論用のモデルをデプロイするには、次のステップを実行します。

注: 詳細については、Amazon SageMaker ドキュメントのバッチ変換を参照してください。


a.以下のコードをコピーして貼り付け、[Run] (実行) を選択します。

このステップでは、テストデータセットを Amazon S3 の場所からローカルフォルダーにコピーします。 

test_data_path = 's3://' + test_data_location + '/test_data.csv'
! aws s3 cp $test_data_path .

b.以下のコードをコピーして貼り付け、[Run] (実行) を選択します。

test_full = pd.read_csv('test_data.csv', names = [str(x) for x in range(len(data.columns))])
test_full.head()

c.以下のコードをコピーして貼り付け、[Run] (実行) を選択します。このステップでは、ラベル列を抽出します。

label = test_full['0'] 

d.次のコードをコピーして貼り付け、[Run] (実行) を選択してバッチ変換ジョブを作成します。次に、コードを詳しく見てみましょう。

トレーニングジョブと同様に、SageMaker はすべての基盤となるリソースをプロビジョニングし、トレーニングされたモデルアーティファクトをコピーし、Batch エンドポイントをローカルに設定し、データをコピーし、データに対して推論を実行して、出力を Amazon S3 にプッシュします。input_filter を設定することにより、バッチ変換に、ラベル列であるテストデータの最初の列を無視するように通知していることに留意してください。

%%time

sm_transformer = xgb.transformer(1, 'ml.m5.xlarge', accept = 'text/csv')

# start a transform job
sm_transformer.transform(test_data_path, split_type='Line', input_filter='$[1:]', content_type='text/csv')
sm_transformer.wait()

バッチ変換ジョブが完了するまでに約 4 分かかります。その後、モデルの結果を評価できます。


e.次のコードをコピーして実行し、モデルのメトリクスを評価します。次に、コードを詳しく見てみましょう。

最初に、Amazon S3 バケットから拡張子が .out のファイルに含まれているバッチ変換ジョブの出力をプルする関数を定義します。次に、予測されたラベルをデータフレームに抽出し、このデータフレームに実際のラベルを追加します。

import json
import io
from urllib.parse import urlparse

def get_csv_output_from_s3(s3uri, file_name):
    parsed_url = urlparse(s3uri)
    bucket_name = parsed_url.netloc
    prefix = parsed_url.path[1:]
    s3 = boto3.resource('s3')
    obj = s3.Object(bucket_name, '{}/{}'.format(prefix, file_name))
    return obj.get()["Body"].read().decode('utf-8')
output = get_csv_output_from_s3(sm_transformer.output_path, 'test_data.csv.out')
output_df = pd.read_csv(io.StringIO(output), sep=",", header=None)
output_df.head(8)
output_df['Predicted']=np.round(output_df.values)
output_df['Label'] = label
from sklearn.metrics import confusion_matrix, accuracy_score
confusion_matrix = pd.crosstab(output_df['Predicted'], output_df['Label'], rownames=['Actual'], colnames=['Predicted'], margins = True)
confusion_matrix

画像のような出力が表示されます。これは、[Actual] (実際の) 値と比較した True 値と False 値の [Predicted] (予測された) 値の総数を示しています。 


f.次のコードを使用して、ベースラインモデルの精度とモデルの精度の両方を抽出します。

注: ベースラインの精度に役立つモデルは、デフォルト以外の場合の割合である場合があります。ユーザーがデフォルトしないことを常に予測するモデルは、その精度を備えています。

print("Baseline Accuracy = {}".format(1- np.unique(data['Label'], return_counts=True)[1][1]/(len(data['Label']))))
print("Accuracy Score = {}".format(accuracy_score(label, output_df['Predicted'])))

結果は、単純なモデルがすでにベースラインの精度を上回っていることを示しています。結果を改善するために、ハイパーパラメータを調整できます。SageMaker でハイパーパラメータ最適化 (HPO) を使用して、モデルを自動調整できます。詳細については、ハイパーパラメータ調整のしくみを参照してください。 

注: このチュートリアルには含まれていませんが、トライアルの一部としてバッチ変換を含めることもできます。.transform 関数を呼び出すときは、トレーニングジョブの場合と同じように experiment_config を渡すだけです。Amazon SageMaker は、バッチ変換をトライアルコンポーネントとして自動的に関連付けます。

ステップ 7: モデルをエンドポイントとしてデプロイし、データキャプチャを設定する

このステップでは、モデルを RESTful HTTPS エンドポイントとしてデプロイして、ライブ推論を提供します。Amazon SageMaker は、モデルのホスティングとエンドポイントの作成を自動的に処理します。

モデルをエンドポイントとしてデプロイし、データキャプチャを設定するには、次のステップを実行します。

注: 詳細については、Amazon SageMaker ドキュメントの推論のためのモデルのデプロイを参照してください。


a.以下のコードをコピーして貼り付け、[Run] (実行) を選択します。

from sagemaker.model_monitor import DataCaptureConfig
from sagemaker import RealTimePredictor
from sagemaker.predictor import csv_serializer

sm_client = boto3.client('sagemaker')

latest_training_job = sm_client.list_training_jobs(MaxResults=1,
                                                SortBy='CreationTime',
                                                SortOrder='Descending')

training_job_name=TrainingJobName=latest_training_job['TrainingJobSummaries'][0]['TrainingJobName']

training_job_description = sm_client.describe_training_job(TrainingJobName=training_job_name)

model_data = training_job_description['ModelArtifacts']['S3ModelArtifacts']
container_uri = training_job_description['AlgorithmSpecification']['TrainingImage']

# create a model.
def create_model(role, model_name, container_uri, model_data):
    return sm_client.create_model(
        ModelName=model_name,
        PrimaryContainer={
        'Image': container_uri,
        'ModelDataUrl': model_data,
        },
        ExecutionRoleArn=role)
    

try:
    model = create_model(role, training_job_name, container_uri, model_data)
except Exception as e:
        sm_client.delete_model(ModelName=training_job_name)
        model = create_model(role, training_job_name, container_uri, model_data)
        

print('Model created: '+model['ModelArn'])


b.データ構成設定を指定するには、次のコードをコピーして貼り付け、[Run] (実行) を選択します。

このコードは、エンドポイントが受信した推論ペイロードを 100% キャプチャし、入力と出力の両方をキャプチャし、入力コンテンツタイプを csv として記録するように SageMaker に指示します。

s3_capture_upload_path = 's3://{}/{}/monitoring/datacapture'.format(rawbucket, prefix)
data_capture_configuration = {
    "EnableCapture": True,
    "InitialSamplingPercentage": 100,
    "DestinationS3Uri": s3_capture_upload_path,
    "CaptureOptions": [
        { "CaptureMode": "Output" },
        { "CaptureMode": "Input" }
    ],
    "CaptureContentTypeHeader": {
       "CsvContentTypes": ["text/csv"],
       "JsonContentTypes": ["application/json"]}

c.以下のコードをコピーして貼り付け、[Run] (実行) を選択します。このステップでは、エンドポイント設定を作成し、エンドポイントをデプロイします。コードでは、インスタンスタイプや、すべてのトラフィックをこのエンドポイントに送信するかどうかなどを指定できます。

def create_endpoint_config(model_config, data_capture_config): 
    return sm_client.create_endpoint_config(
                                                EndpointConfigName=model_config,
                                                ProductionVariants=[
                                                        {
                                                            'VariantName': 'AllTraffic',
                                                            'ModelName': model_config,
                                                            'InitialInstanceCount': 1,
                                                            'InstanceType': 'ml.m4.xlarge',
                                                            'InitialVariantWeight': 1.0,
                                                },
                                                    
                                                    ],
                                                DataCaptureConfig=data_capture_config
                                                )




try:
    endpoint_config = create_endpoint_config(training_job_name, data_capture_configuration)
except Exception as e:
    sm_client.delete_endpoint_config(EndpointConfigName=endpoint)
    endpoint_config = create_endpoint_config(training_job_name, data_capture_configuration)

print('Endpoint configuration created: '+ endpoint_config['EndpointConfigArn'])

d.次のコードをコピーして貼り付け、[Run] (実行) を選択してエンドポイントを作成します。 

# Enable data capture, sampling 100% of the data for now. Next we deploy the endpoint in the correct VPC.

endpoint_name = training_job_name
def create_endpoint(endpoint_name, config_name):
    return sm_client.create_endpoint(
                                    EndpointName=endpoint_name,
                                    EndpointConfigName=training_job_name
                                )


try:
    endpoint = create_endpoint(endpoint_name, endpoint_config)
except Exception as e:
    sm_client.delete_endpoint(EndpointName=endpoint_name)
    endpoint = create_endpoint(endpoint_name, endpoint_config)

print('Endpoint created: '+ endpoint['EndpointArn'])

e.左側のツールバーで、[Endpoints] (エンドポイント) を選択します。[Endpoints] (エンドポイント) リストには、稼働中のすべてのエンドポイントが表示されます。

build-train-deploy エンドポイントが [Creating] (作成中) のステータスを示していることに留意してください。モデルをデプロイするには、Amazon SageMaker は最初にモデルアーティファクトと推論イメージをインスタンスにコピーし、クライアントアプリケーションまたは RESTful API とインターフェイス接続するように HTTPS エンドポイントを設定する必要があります。

エンドポイントが作成されると、ステータスは [InService] (稼働中) に変わります。(エンドポイントの作成には約 5〜10 分かかる場合があります。)  

注: 更新されたステータスを取得するには、[Refresh] (更新) をクリックする必要がある場合があります。


f.JupyterLab ノートブックで、次のコードをコピーして実行し、テストデータセットのサンプルを取得します。このコードは最初の 10 行を取ります。

!head -10 test_data.csv > test_sample.csv

g.次のコードを実行して、いくつかの推論リクエストをこのエンドポイントに送信します。

注: 別のエンドポイント名を指定した場合は、以下の endpoint をエンドポイント名に置き換える必要があります。 

from sagemaker import RealTimePredictor
from sagemaker.predictor import csv_serializer

predictor = RealTimePredictor(endpoint=endpoint_name, content_type = 'text/csv')

with open('test_sample.csv', 'r') as f:
    for row in f:
        payload = row.rstrip('\n')
        response = predictor.predict(data=payload[2:])
        sleep(0.5)
print('done!')

h.次のコードを実行して、Model Monitor が受信データを正しくキャプチャしていることを確認します。

コードでは、current_endpoint_capture_prefix は、ModelMonitor 出力が格納されているディレクトリパスをキャプチャします。Amazon S3 バケットに移動して、予測リクエストがキャプチャされているかどうかを確認します。この場所は、上記のコードの s3_capture_upload_path と一致する必要があることに留意してください。

# Extract the captured json files.
data_capture_prefix = '{}/monitoring'.format(prefix)
s3_client = boto3.Session().client('s3')
current_endpoint_capture_prefix = '{}/datacapture/{}/AllTraffic'.format(data_capture_prefix, endpoint_name)
print(current_endpoint_capture_prefix)
result = s3_client.list_objects(Bucket=rawbucket, Prefix=current_endpoint_capture_prefix)
capture_files = [capture_file.get("Key") for capture_file in result.get('Contents')]
print("Found Capture Files:")
print("\n ".join(capture_files))


capture_files[0]

キャプチャされた出力は、データキャプチャが設定され、着信リクエストを保存していることを示します。 

注: 最初に Null 応答が表示された場合、データキャプチャを最初に初期化したときに、データが Amazon S3 パスに同期的にロードされていない可能性があります。約 1 分待ってから、もう一度お試しください。


i.次のコードを実行して、json ファイルの 1 つのコンテンツを抽出し、キャプチャされた出力を表示します。 

# View contents of the captured file.
def get_obj_body(bucket, obj_key):
    return s3_client.get_object(Bucket=rawbucket, Key=obj_key).get('Body').read().decode("utf-8")

capture_file = get_obj_body(rawbucket, capture_files[0])
print(json.dumps(json.loads(capture_file.split('\n')[5]), indent = 2, sort_keys =True))

出力は、データキャプチャがモデルの入力ペイロードと出力の両方をキャプチャしていることを示します。 

ステップ 8: SageMaker Model Monitor でエンドポイントをモニタリングする

このステップでは、SageMaker Model Monitor を有効にして、デプロイされたエンドポイントのデータドリフトをモニタリングします。これを行うには、モデルに送信されたペイロードと出力をベースラインと比較し、入力データまたはラベルにドリフトがあるかどうかを判断します。 

モデルのモニタリングを有効にするには、次のステップを実行します。

注: 詳細については、Amazon SageMaker ドキュメントの Amazon SageMaker Model Monitor を参照してください。


a.次のコードを実行して、Amazon S3 バケットにフォルダを作成し、Model Monitor の出力を保存します。

このコードは 2 つのフォルダを作成します。1 つのフォルダには、モデルのトレーニングに使用したベースラインデータが格納されます。2 つ目のフォルダには、そのベースラインからの違反が格納されます。

model_prefix = prefix + "/" + endpoint_name
baseline_prefix = model_prefix + '/baselining'
baseline_data_prefix = baseline_prefix + '/data'
baseline_results_prefix = baseline_prefix + '/results'

baseline_data_uri = 's3://{}/{}'.format(rawbucket,baseline_data_prefix)
baseline_results_uri = 's3://{}/{}'.format(rawbucket, baseline_results_prefix)
train_data_header_location = "s3://" + rawbucket + '/' + prefix + '/train_headers'
print('Baseline data uri: {}'.format(baseline_data_uri))
print('Baseline results uri: {}'.format(baseline_results_uri))
print(train_data_header_location)


b.次のコードを実行して、Model Monitor のベースラインジョブを設定し、トレーニングデータの統計をキャプチャします。これを行うために、Model Monitor は、Apache Spark 上に構築された deequ ライブラリを使用して、データの単体テストを実行します。 

from sagemaker.model_monitor import DefaultModelMonitor
from sagemaker.model_monitor.dataset_format import DatasetFormat

my_default_monitor = DefaultModelMonitor(
    role=role,
    instance_count=1,
    instance_type='ml.m5.xlarge',
    volume_size_in_gb=20,
    max_runtime_in_seconds=3600)

my_default_monitor.suggest_baseline(
    baseline_dataset=os.path.join(train_data_header_location, 'train_data_with_headers.csv'),
    dataset_format=DatasetFormat.csv(header=True),
    output_s3_uri=baseline_results_uri,
    wait=True
)

Model Monitor は、個別のインスタンスを設定し、トレーニングデータをコピーして、いくつかの統計を生成します。このサービスは多くの Apache Spark ログを生成しますが、無視してかまいません。ジョブが完了すると、[Spark job completed] (Spark ジョブの完了) という出力が表示されます。


c.次のコードを実行して、ベースラインジョブによって生成された出力を確認します。

s3_client = boto3.Session().client('s3')
result = s3_client.list_objects(Bucket=rawbucket, Prefix=baseline_results_prefix)
report_files = [report_file.get("Key") for report_file in result.get('Contents')]
print("Found Files:")
print("\n ".join(report_files))

baseline_job = my_default_monitor.latest_baselining_job
schema_df = pd.io.json.json_normalize(baseline_job.baseline_statistics().body_dict["features"])
schema_df

constraints.jsonstatistics.json という 2 つのファイルが表示されます。次に、その内容を詳細に確認します。

上記のコードは、/statistics.json の json 出力を pandas データフレームに変換します。deequ ライブラリが列のデータ型、Null 値または欠落値の有無、および入力データストリームの平均、最小、最大、合計、標準偏差、スケッチパラメータなどの統計パラメータをどのように推測するかに留意してください。 

同様に、constraints.json ファイルは、値の非負性や特徴フィールドのデータ型など、トレーニングデータセットが従ういくつかの制約で構成されます。

constraints.jsonstatistics.json という 2 つのファイルが表示されます。次に、その内容を詳細に確認します。

上記のコードは、/statistics.json の json 出力を pandas データフレームに変換します。deequ ライブラリが列のデータ型、Null 値または欠落値の有無、および入力データストリームの平均、最小、最大、合計、標準偏差、スケッチパラメータなどの統計パラメータをどのように推測するかに留意してください。   

同様に、constraints.json ファイルは、値の非負性や特徴フィールドのデータ型など、トレーニングデータセットが従ういくつかの制約で構成されます。

constraints_df = pd.io.json.json_normalize(baseline_job.suggested_constraints().body_dict["features"])
constraints_df

d.次のコードを実行して、エンドポイントモニタリングの頻度を設定します。

毎日または毎時を指定できます。このコードは 1 時間ごとの頻度を指定しますが、1 時間ごとの頻度は大量のデータを生成するため、本番アプリケーションではこれを変更することをお勧めします。Model Monitor は、検出したすべての違反で構成されるレポートを作成します。

reports_prefix = '{}/reports'.format(prefix)
s3_report_path = 's3://{}/{}'.format(rawbucket,reports_prefix)
print(s3_report_path)

from sagemaker.model_monitor import CronExpressionGenerator
from time import gmtime, strftime

mon_schedule_name = 'Built-train-deploy-model-monitor-schedule-' + strftime("%Y-%m-%d-%H-%M-%S", gmtime())
my_default_monitor.create_monitoring_schedule(
    monitor_schedule_name=mon_schedule_name,
    endpoint_input=predictor.endpoint,
    output_s3_uri=s3_report_path,
    statistics=my_default_monitor.baseline_statistics(),
    constraints=my_default_monitor.suggested_constraints(),
    schedule_cron_expression=CronExpressionGenerator.hourly(),
    enable_cloudwatch_metrics=True,

)

このコードは、出力を CloudWatch に送信するように Model Monitor に指示する Amazon CloudWatch メトリクスを有効にすることに留意してください。このアプローチを使用すると、CloudWatch アラームを使用してアラームをトリガーし、データドリフトが検出されたときにエンジニアまたは管理者に通知できます。 

ステップ 9: SageMaker Model Monitor のパフォーマンスをテストする

このステップでは、いくつかのサンプルデータに照らして Model Monitor を評価します。テストペイロードをそのまま送信する代わりに、テストペイロード内のいくつかの機能の分布を変更して、Model Monitor が変更を検出できることをテストします。

次のステップを実行して、Model Monitor のパフォーマンスをテストします。

注: 詳細については、Amazon SageMaker ドキュメントの Amazon SageMaker Model Monitor を参照してください。


a.次のコードを実行して、テストデータをインポートし、変更されたサンプルデータを生成します。

COLS = data.columns
test_full = pd.read_csv('test_data.csv', names = ['Label'] +['PAY_AMT1','BILL_AMT1'] + list(COLS[1:])[:11] + list(COLS[1:])[12:17] + list(COLS[1:])[18:]
)
test_full.head()

b.次のコードを実行して、いくつかの列を変更します。前のステップとの違い (この画像内で赤でマークされています) に留意してください。ラベル列を削除し、変更したサンプルテストデータを保存します。

faketestdata = test_full
faketestdata['EDUCATION'] = -faketestdata['EDUCATION'].astype(float)
faketestdata['BILL_AMT2']= (faketestdata['BILL_AMT2']//10).astype(float)
faketestdata['AGE']= (faketestdata['AGE']-10).astype(float)

faketestdata.head()
faketestdata.drop(columns=['Label']).to_csv('test-data-input-cols.csv', index = None, header=None)

c.次のコードを実行して、この変更されたデータセットでエンドポイントを繰り返し呼び出します。

from threading import Thread

runtime_client = boto3.client('runtime.sagemaker')

# (just repeating code from above for convenience/ able to run this section independently)
def invoke_endpoint(ep_name, file_name, runtime_client):
    with open(file_name, 'r') as f:
        for row in f:
            payload = row.rstrip('\n')
            response = runtime_client.invoke_endpoint(EndpointName=ep_name,
                                          ContentType='text/csv', 
                                          Body=payload)
            time.sleep(1)
            
def invoke_endpoint_forever():
    while True:
        invoke_endpoint(endpoint, 'test-data-input-cols.csv', runtime_client)
        
thread = Thread(target = invoke_endpoint_forever)
thread.start()
# Note that you need to stop the kernel to stop the invocations

d.次のコードを実行して、Model Monitor ジョブのステータスを確認します。

desc_schedule_result = my_default_monitor.describe_schedule()
print('Schedule status: {}'.format(desc_schedule_result['MonitoringScheduleStatus']))

[Schedule status: Scheduled] (スケジュールされたステータス: スケジュール済み) という出力が表示されます。


e.次のコードを実行して、モニタリング出力が生成されているかどうかを 10 分ごとに確認します。最初のジョブは約 20 分のバッファで実行される可能性があることに留意してください。 

mon_executions = my_default_monitor.list_executions()
print("We created ahourly schedule above and it will kick off executions ON the hour (plus 0 - 20 min buffer.\nWe will have to wait till we hit the hour...")

while len(mon_executions) == 0:
    print("Waiting for the 1st execution to happen...")
    time.sleep(600)
    mon_executions = my_default_monitor.list_executions()

f.Amazon SageMaker Studio の左側のツールバーで、[Endpoints] (エンドポイント) を選択します。build-train-deploy エンドポイントを右クリックし、[Describe Endpoint] (エンドポイントの説明) を選択します。


g.[Monitoring job history] (モニタリングジョブの履歴) を選択します。[Monitoring status] (モニタリングステータス) に [In progress] (進行中) と表示されていることに留意してください。

ジョブが完了すると、[Monitoring status] (モニタリングステータス) に [Issue found] (問題が見つかりました) と表示されます (問題が見つかった場合)。 


h.問題をダブルクリックして詳細を表示します。Model Monitor が、以前変更した EDUCATION フィールドと BILL_AMT2 フィールドで大きなベースラインドリフトを検出したことがわかります。

Model Monitor は、他の 2 つのフィールドのデータ型の違いもいくつか検出しました。トレーニングデータは整数ラベルで構成されていますが、XGBoost モデルは確率スコアを予測します。したがって、Model Monitor は不一致を報告しました。 


i.JupyterLab ノートブックで、次のセルを実行して、Model Monitor からの出力を確認します。

latest_execution = mon_executions[-1] # latest execution's index is -1, second to last is -2 and so on..
time.sleep(60)
latest_execution.wait(logs=False)

print("Latest execution status: {}".format(latest_execution.describe()['ProcessingJobStatus']))
print("Latest execution result: {}".format(latest_execution.describe()['ExitMessage']))

latest_job = latest_execution.describe()
if (latest_job['ProcessingJobStatus'] != 'Completed'):
        print("====STOP==== \n No completed executions to inspect further. Please wait till an execution completes or investigate previously reported failures.")

j.次のコードを実行して、Model Monitor によって生成されたレポートを表示します。

report_uri=latest_execution.output.destination
print('Report Uri: {}'.format(report_uri))
from urllib.parse import urlparse
s3uri = urlparse(report_uri)
report_bucket = s3uri.netloc
report_key = s3uri.path.lstrip('/')
print('Report bucket: {}'.format(report_bucket))
print('Report key: {}'.format(report_key))

s3_client = boto3.Session().client('s3')
result = s3_client.list_objects(Bucket=rawbucket, Prefix=report_key)
report_files = [report_file.get("Key") for report_file in result.get('Contents')]
print("Found Report Files:")
print("\n ".join(report_files))

statistics.jsonconstraints.json に加えて、constraint_violations.json という名前の新しいファイルが生成されていることがわかります。このファイルの内容は、Amazon SageMaker Studio の上部に表示されました (ステップ g)。


注: データキャプチャを設定すると、Amazon SageMaker Studio は、モニタリングジョブを実行するための上記のコードを含むノートブックを自動的に作成します。ノートブックにアクセスするには、エンドポイントを右クリックして、[Describe Endpoint] (エンドポイントの説明) を選択します。[Monitoring results] (モニタリング結果) タブで、[Enable Monitoring] (モニタリングを有効にする) を選択します。このステップにより、上記で作成したコードを含む Jupyter ノートブックが自動的に開きます。

ステップ 10.クリーンアップ

このステップでは、このラボで使用したリソースを終了させます。

重要: あまり使用されていないリソースを終了することは、コストを削減するためのベストプラクティスです。リソースを終了しないと、お使いのアカウントに料金が発生します。


a.モニタリングスケジュールを削除する: Jupyter ノートブックで以下のコードをコピーして貼り付け、[Run] (実行) を選択します。

注: エンドポイントに関連付けられているすべてのモニタリングジョブが削除されるまで、Model Monitor エンドポイントを削除することはできません。

my_default_monitor.delete_monitoring_schedule()
time.sleep(10) # actually wait for the deletion

b.エンドポイントを削除する: Jupyter ノートブックで以下のコードをコピーして貼り付け、[Run] (実行) を選択します。

注: エンドポイントに関連付けられているすべてのモニタリングジョブを最初に削除したことを確認してください。

sm.delete_endpoint(EndpointName = endpoint_name)

すべてのトレーニングアーティファクト (モデル、前処理済みのデータセットなど) を削除する場合には、以下のコードをコピーしてコードセルに貼り付け、[Run] (実行) を選択します。

注意: ACCOUNT_NUMBER をユーザーのアカウントに置き換えてください。

%%sh
aws s3 rm --recursive s3://sagemaker-us-east-2-ACCOUNT_NUMBER/sagemaker-modelmonitor/data

お疲れ様でした。

Amazon SageMaker Studio を使用して、機械学習モデルを作成、トレーニング、デプロイ、およびモニタリングしました。

このチュートリアルは役に立ちましたか?

Amazon SageMaker Studio のツアーを見る

Amazon SageMaker Autopilot を使用して ML モデルを自動的に構築、トレーニング、およびデプロイする方法を学ぶ