Amazon Web Services ブログ

Amazon SageMaker のバッチ変換を使用して予測結果を入力データに関連付ける

大規模なデータセットに対して予測を実行するときは、実行前にいくつかの入力属性を削除することをお勧めします。これは、これらの属性が信号を伝達しない、または機械学習 (ML) モデルのトレーニングに使用するデータセットの一部ではなかったという理由からです。ジョブの完了後、予測結果をすべてまたは一部の入力データに分析用としてマッピングすることにも役立ちます。

たとえば、ID 属性を持つデータセットを考えてみましょう。一般に、観測 ID は特定の ML 問題に対する信号を伝送しないランダムに生成した番号か連続番号です。このため通常、観測 ID はトレーニングデータ属性の一部ではありません。しかし、バッチ予測を行う場合は、出力に観測 ID と予測結果の両方を 1 つのレコードとして含めることもできます。

バッチ変換機能は Amazon SageMakerで、Amazon S3 に格納されているデータセットに対して予測を実行します。以前はバッチ変換ジョブの作成前に入力データをフィルター処理し、ジョブの完了後に予測結果を目的の入力フィールドに結合する必要がありました。Amazon SageMaker のバッチ変換を使えば、予測を実行する前に属性を除外できるようになりました。CSV、テキスト、JSON 形式のデータを使用すると、予測結果を部分的または全体の入力データ属性と結合することもできます。このため、追加の前処理や後処理が不要となり、ML プロセス全体が高速化します。

この投稿では、この新しい機能を使用して Amazon SageMaker のバッチ変換ジョブの入力データをフィルター処理し、予測結果を入力データセットの属性と結合する方法を説明します。

背景

Amazon SageMaker は ML のワークフロー全体を対象にした完全マネージド型サービスです。このサービスは、データにラベルを付けてデータを準備し、アルゴリズムを選択して、モデルのトレーニングを行い、デプロイ用にモデルを調整および最適化を行い、予測を行い、実行します。

Amazon SageMaker はバッチ変換ジョブの開始時に、リソースのプロビジョニングを管理します。ジョブが完了するとリソースを解放するので、ジョブの実行中に使用したリソースに対してのみ支払うことになります。ジョブが完了すると、Amazon SageMaker は指定された S3 バケットに予測結果を保存します。

バッチ変換の例

UCI の乳がん検出のためのパブリックデータセットを使って、特定の腫瘍が悪性 (1) または良性 (0) である可能性があるかどうかを検出するバイナリ分類モデルをトレーニングします。このデータセットには各腫瘍の ID 属性が付属しています。これらはトレーニングと予測の際に除外されます。ただし、バッチ変換ジョブからの各腫瘍の悪性腫瘍について予測した確率を使用して、ID 属性を最終的なアウトプットに戻し、記録します。

手引きとなる Jupyter ノートブックもダウンロードできます。この記事の以下の各セクションはノートブックのセクションに対応していますので、読みながら各ステップのコードを実行してください。

設定

まず、Pandas や NumPy などの ML 用の一般的な Python ライブラリを、後でトレーニングとバッチ変換ジョブの実行に使用する Amazon SageMaker および Boto3 ライブラリとともにインポートします。

S3 バケットを設定し、トレーニングデータ、検証データ、バッチ変換ジョブ実行の対象となるデータセットもアップロードします。Amazon SageMaker はバッチ変換ジョブの出力とモデルアーティファクトをこのバケットに格納します。フォルダー構造を利用して、入力データセットをモデルアーティファクトとジョブ出力から分離します。

import os
import boto3
import sagemaker
import pandas as pd
import numpy as np

role = sagemaker.get_execution_role()
sess = sagemaker.Session()

bucket=sess.default_bucket()
prefix = 'sagemaker/breast-cancer-prediction-xgboost' # place to upload training files within the bucket

データ準備

Notebook インスタンスにパブリックデータセットをダウンロードし、予備分析用サンプルを解析します。この例のデータセットは小さいですが (569 個の観測値と 32 個の列を含みます)、Amazon SageMaker のバッチ変換機能はペタバイト級のデータを持ような大規模なデータセットにも使用できます。

data = pd.read_csv('https://archive.ics.uci.edu/ml/machine-learning-databases/breast-cancer-wisconsin/wdbc.data', header = None)

# specify columns extracted from wbdc.names
data.columns = ["id","diagnosis","radius_mean","texture_mean","perimeter_mean","area_mean","smoothness_mean",
                "compactness_mean","concavity_mean","concave points_mean","symmetry_mean","fractal_dimension_mean",
                "radius_se","texture_se","perimeter_se","area_se","smoothness_se","compactness_se","concavity_se",
                "concave points_se","symmetry_se","fractal_dimension_se","radius_worst","texture_worst",
                "perimeter_worst","area_worst","smoothness_worst","compactness_worst","concavity_worst",
                "concave points_worst","symmetry_worst","fractal_dimension_worst"] 

次のテーブルでは、データセットの最初の列が腫瘍 ID、2 番目の列が診断 (M は悪性の場合、B は良性の場合) です。教師あり学習のコンテキストでは、これが目標、あるいは予測する値になります。次の属性は、予測子とも呼ばれる機能です。

id 診断 radius_mean texture_mean perimeter_mean concave points_worst symmetry_worst fractal_dimension_worst
288 8913049 B 11.26 19.96 73.72 0.09314 0.2955 0.07009
375 901303 B 16.17 16.07 106.3 0.1251 0.3153 0.0896
467 9113514 B 9.668 18.1 61.06 0.025 0.3057 0.07875
203 87880 M 13.81 23.75 91.56 0.2013 0.4432 0.1086
148 86973702 B 14.44 15.18 93.97 0.1599 0.2691 0.07683
118 864877 M 15.78 22.91 105.7 0.2034 0.3274 0.1252
224 8813129 B 13.27 17.02 84.55 0.09678 0.2506 0.07623
364 9010877 B 13.4 16.95 85.48 0.06987 0.2741 0.07582

最小限のデータ準備を行った後、データを 3 つのセットに分けます。

  • 元のデータの 80% で構成されるトレーニングセット。
  • モデルの適切な評価を実行するためのアルゴリズムの検証セット。
  • 今は確保しておき、後で新しい入出力結合機能を使ってバッチ変換ジョブを実行するために使用するバッチセット。

モデルをトレーニングして検証するには、radius_meantexture_meanperimeter_mean などのすべての機能を保持しておきます。腫瘍が悪性かどうかを判断するのには関係がないため、id 属性は削除します。

本番でトレーニング済みモデルがある場合は、通常それに対して予測を実行します。そのための 1 つの方法は、Amazon SageMaker ホスティングサービスを利用してリアルタイムでの予測モデルをデプロイすることです。

ただし今回はリアルタイムの予測は必要ありません。代わりに、Amazon S3 の .csv ファイルとして腫瘍のバックログがあります。これは、腫瘍 ID で識別する腫瘍のリストで構成されています。バッチ変換ジョブを使用して、各腫瘍について悪性である可能性を予測します。ここで腫瘍のバックログを作成するには、id 属性を使い、diagnostic 属性を使用せずにバッチセットを作成します。これが、バッチ変換ジョブで予測しようとしているものになります。

次のコード例は、3 つのデータセット間のデータ構成を示しています。

# replace the M/B diagnosis with a 1/0 boolean value
data['diagnosis']=data['diagnosis'].apply(lambda x: ((x =="M"))+0) 

# data split in three sets, training, validation and batch inference
rand_split = np.random.rand(len(data))
train_list = rand_split < 0.8
val_list = (rand_split >= 0.8) & (rand_split < 0.9)
batch_list = rand_split >= 0.9

data_train = data[train_list].drop(['id'],axis=1)
data_val = data[val_list].drop(['id'],axis=1)
data_batch = data[batch_list].drop(['diagnosis'],axis=1)

data_train = data[train_list].drop(['id'],axis=1)
data_val = data[val_list].drop(['id'],axis=1)
data_batch = data[batch_list].drop(['diagnosis'],axis=1)

最後に、これら 3 つのデータセットを S3 にアップロードします。

train_file = 'train_data.csv'
data_train.to_csv(train_file,index=False,header=False)
sess.upload_data(train_file, key_prefix='{}/train'.format(prefix))

validation_file = 'validation_data.csv'
data_val.to_csv(validation_file,index=False,header=False)
sess.upload_data(validation_file, key_prefix='{}/validation'.format(prefix))

batch_file = 'batch_data.csv'
data_batch.to_csv(batch_file,index=False,header=False)
sess.upload_data(batch_file, key_prefix='{}/batch'.format(prefix))  

トレーニングジョブ

Amazon SageMaker XGBoost 組み込みアルゴリズムを使用して、トレーニングデータセットと検証データセットに基づいたバイナリ分類用にモデルをすばやくトレーニングします。トレーニングの目的を binary:logisticに設定し、次のコード例に示すように、観測値が陽性クラス (この例では悪性) に属する確率を出力するよう XGBoost をトレーニングします。

%%time
from time import gmtime, strftime
from sagemaker.amazon.amazon_estimator import get_image_uri

job_name = 'xgb-' + strftime("%Y-%m-%d-%H-%M-%S", gmtime())
output_location = 's3://{}/{}/output/{}'.format(bucket, prefix, job_name)
image = get_image_uri(boto3.Session().region_name, 'xgboost')

sm_estimator = sagemaker.estimator.Estimator(image,
                                             role,
                                             train_instance_count=1,
                                             train_instance_type='ml.m5.4xlarge',
                                             train_volume_size=50,
                                             input_mode='File',
                                             output_path=output_location,
                                             sagemaker_session=sess)

sm_estimator.set_hyperparameters(objective="binary:logistic",
                                 max_depth=5,
                                 eta=0.2,
                                 gamma=4,
                                 min_child_weight=6,
                                 subsample=0.8,
                                 silent=0,
                                 num_round=100)

train_data = sagemaker.session.s3_input('s3://{}/{}/train'.format(bucket, prefix), distribution='FullyReplicated',
                                        content_type='text/csv', s3_data_type='S3Prefix')
validation_data = sagemaker.session.s3_input('s3://{}/{}/validation'.format(bucket, prefix), distribution='FullyReplicated',
                                             content_type='text/csv', s3_data_type='S3Prefix')
data_channels = {'train': train_data, 'validation': validation_data}

# Start training by calling the fit method in the estimator
sm_estimator.fit(inputs=data_channels, logs=True)

バッチ変換

Python SDK を使ってバッチ変換ジョブを開始し、バッチデータセットに対して推論を実行して、推論結果を S3 に保存します。

バッチデータセットは最初の列に id 属性を含んでいます。先に説明したように、この属性はトレーニングには使用しなかったので、新しい入力フィルター機能を使う必要があります。入出力結合機能が公開される前だと、バッチ変換ジョブの出力は次のような確率のリストになるはずでした。

0.0226082857698

0.987275004387

0.836603999138

0.00795079022646

0.0182465240359

0.995905399323

0.0129367504269

0.961541593075

0.988895177841

これらの確率を対応する入力の腫瘍にマッピングするには、何らかの事後推論ロジックが必要になったでしょう。バッチ交換フィルターはこの作業を簡素化します。

入力を結合ソースとして指定します。次に、入力フィルター全体を必要としないことを示す出力フィルターを指定します (これには ID の後に 30 個の機能が続きます)。代わりに、腫瘍 ID と悪性であるという可能性を提示したいとします。また、最初の列 (id) と最後の (推論結果) 列のみを表示するとします。

バッチ変換フィルターは JSONPath 式を使用して、必要とする入力データまたは出力データを選択し抽出します。バッチ変換でサポートしている JSONPath 演算子については、こちらのリンクをご参照ください。

JSONPath は JSON データを扱うために開発されました。CSV データで JSONPath を使用するために、CSV 行をゼロから始まるインデックスを持つ JSON 配列と見なします。たとえば、「8810158、B、13.110、22.54、87.02」の行に「$[0,1]」を適用すると、その行の 1 列目と 2 列目の「8810158、B」が返されます。

この例では、入力フィルターは「$[1:]」です。推論を処理する前に列 0 (id) を除外し、列 1 から最後の列までをすべて保持しています

(すべての機能または予測子)。出力フィルターは「$[0,-1]」です。 出力を表示するときは、0 列目 (id) と最後の (-1) 列 (inference_result) のみを保持します。これは、特定の腫瘍が悪性である確率です。

id 以外の入力列を元に戻すことも検討できます。グラウンドトゥルースでの診断を受けている現在の例では、予測の横にある出力ファイルに id 以外の入力列を戻すことができます。こうすれば、並べて比較し、予測を評価することができます。

%%time

sm_transformer = sm_estimator.transformer(1, 'ml.m4.xlarge', assemble_with = 'Line', accept = 'text/csv')

# start a transform job
input_location = 's3://{}/{}/batch/{}'.format(bucket, prefix, batch_file) # use input data with ID column
sm_transformer.transform(input_location, split_type='Line', content_type='text/csv', input_filter='$[1:]', join_source='Input', output_filter='$[0,-1]')
sm_transformer.wait()

結果

出力場所で S3 の CSV 出力を読み込みます。

import json
import io
from urllib.parse import urlparse

def get_csv_output_from_s3(s3uri, file_name):
    parsed_url = urlparse(s3uri)
    bucket_name = parsed_url.netloc
    prefix = parsed_url.path[1:]
    s3 = boto3.resource('s3')
    obj = s3.Object(bucket_name, '{}/{}'.format(prefix, file_name))
    return obj.get()["Body"].read().decode('utf-8')

output = get_csv_output_from_s3(sm_transformer.output_path, '{}.out'.format(batch_file))
output_df = pd.read_csv(io.StringIO(output), sep=",", header=None)
output_df.sample(10)    

これで、腫瘍 ID が識別した腫瘍のリストと対応する悪性である確率を示すはずです。結ファイルは次のようになります。

844359,0.990931391716 

84458202,0.968179702759 

8510824,0.006071804557 

855138,0.780932843685 

857155,0.0154032697901 

857343,0.0171982143074 

861598,0.540158748627 

86208,0.992102086544 

862261,0.00940885581076 

862989,0.00758415739983 

864292,0.006071804557 

864685,0.0332484431565 

....

結論

この記事では、Amazon SageMaker のバッチ変換機能を使用して、バッチ変換ジョブに入力と出力フィルターを提供する方法を説明しました。これによって、入力データと出力データをそれぞれ前処理または後処理する必要がなくなります。加えて、対応する入力データに予測結果を関連付けることで、入力データの属性すべてまたは一部を柔軟に保持できます。この機能の詳細については、「Amazon SageMaker 開発者ガイド」をご参照ください。


著者について

Ro Mullier はAWS のシニアソリューションアーキテクトです。お客様が AWS でさまざまなアプリケーション、特に機械学習ワークロードを実行する際のサポートを行っています。余暇には、家族や友人と過ごしたり、サッカーをしたり、機械学習に関するコンテストに参加しています。

 

 

 

Han Wang は AWS AI のソフトウェア開発エンジニアです。極めてスケーラブルで並列分散処理による機械学習プラットフォームの開発に取り組んでいます。休日には、映画を観たり、ハイキングをしたり、「ゼルダの伝説: ブレスオブザワイルド」をプレイするのが好きです。