Amazon Web Services ブログ

Amazon SageMaker 推論パイプラインと Scikit-learn を使用して予測を行う前に入力データを前処理する

Amazon SageMaker を使用すると、開発者やデータサイエンティストは大規模な機械学習 (ML) モデルを構築、トレーニング、調整、デプロイすることができます。目に見えないデータのリアルタイム予測またはバッチ予測のためにトレーニング済み ML モデルをデプロイできます。推論と呼ばれるプロセスです。ただし、ほとんどの場合、未加工の入力データは前処理する必要があり、予測で直接使用することはできません。これは、ほとんどの ML モデルが事前に定義された形式のデータを想定しているため、ML モデルでデータを処理するには、まず未加工データをクリーンアップして形式を設定する必要があるためです。

このブログ記事では、入力データの前処理に Amazon SageMaker の組み込みの Scikit-learn ライブラリを使用し、次に予測に Amazon SageMaker の組み込みの Linear Learner (線形回帰) アルゴリズムを使用する方法を説明します。Amazon SageMaker の推論パイプライン機能を使用して、ライブラリとアルゴリズムの両方を同じエンドポイントにデプロイするので、未加工の入力データを直接 Amazon SageMaker に渡すことができます。また、ML のワークフローをモジュール化し、トレーニングと推論の間で前処理コードを再利用して開発のオーバーヘッドやエラーを削減する方法も示します。

ここでの例 (GitHub でも公開されています) では、UCI 機械学習リポジトリからの abalone (アワビ) データセットを使用します。このデータセットには、性別、長さ、直径、高さ、殻の重さ、身の重さ、全体重、内臓の重さ、年齢など、アワビ (貝類の一種) に関するさまざまなデータが含まれています。  アワビの年齢を測定するのは時間がかかる作業であるため、アワビの年齢を予測するモデルを構築することで、物理的測定のみに基づいてアワビの年齢を推定することができ、アワビの年齢を手動で測定する必要がなくなります。

これを実現するために、まず Amazon SageMaker 組み込みの Scikit-learn ライブラリを使って簡単な前処理を行います。未加工のアワビデータに、SimpleImputerStandardScalerOneHotEncoder の変換器を使用します。これらは Scikit-learn の前処理ライブラリに含まれる一般的に使用されるデータ変換器であり、データを ML モデルに必要な形式に処理します。  次に、処理したデータを使用して、Amazon SageMaker の Linear Learner アルゴリズムをトレーニングし、アワビの年齢を予測します。  最後に、Amazon SageMaker の推論パイプラインを使用して、データ処理とモデル予測の手順を組み合わせたパイプラインを作成します。このパイプラインを使用して、最初に前処理され、次に特定のアワビについて予測を行うために使用される単一のエンドポイントに未加工の入力データを渡すことができます。

ステップ 1: SageMaker ノートブックインスタンスを起動して、演習コードを設定する

SageMaker のランディングページの左のペインで [Notebook instances] を選択し、[Create notebook Instance] を選択します。

ノートブックインスタンスに名前を付け、必ず Amazon S3 へのアクセス許可を持つ AWS Identity and Access Management (IAM) ロールを選択します。  このプロジェクトでは Amazon S3 バケットにデータをアップロードする必要があるため、アクセスできるバケットがあることを確認します。  Amazon S3 バケットがない場合は、このガイドに従って作成してください。

他のすべてのフィールドをデフォルトのままにして、[Create notebook Instance] を選択してノートブックインスタンスを起動します (ノートブックの起動には数分かかります)。  ステータスが「InService」になったら、ノートブックインスタンスの準備は完了です。  [Open Jupyter] をクリックして、ノートブック環境を開きます。

ノートブック環境が開いたら、[New ] を選択してから、右上隅の [ conda_python3] を選択して新しい Python ノートブックを作成します。  以降のステップは、ノートブックに完全に含まれています。

ステップ 2: Amazon SageMaker ロールを設定して、データをダウンロードする

まず、トレーニングデータとモデル出力を保存するために Amazon S3 バケットを設定する必要があります。  ENTER BUCKET NAME HERE プレースホルダーを、ステップ 1 のバケットの名前に置き換えます。

# S3 プレフィックス
s3_bucket = '< ENTER BUCKET NAME HERE >'
prefix = 'Scikit-LinearLearner-pipeline-abalone-example'

ここで、Amazon SageMaker が AWS の他の部分と通信できるように、Amazon SageMaker の実行ロールを設定する必要があります。

import sagemaker
from sagemaker import get_execution_role

sagemaker_session = sagemaker.Session()

# このノートブックインスタンスで使用される SageMaker 互換ロールを取得します。
role = get_execution_role()

最後に、abalone データセットを Amazon SageMaker ノートブックインスタンスにダウンロードします。  これが、この例で使用されているデータセットです:

!wget --directory-prefix=./abalone_data https://s3-us-west-2.amazonaws.com/sparkml-mleap/data/abalone/abalone.csv

ステップ 3:  Amazon SageMaker の入力データをアップロードする

Amazon SageMaker が後で呼び出すことができるように、データセットを Amazon S3 にアップロードする必要があります。

WORK_DIRECTORY = 'abalone_data'

train_input = sagemaker_session.upload_data(
    path='{}/{}'.format(WORK_DIRECTORY, 'abalone.csv'),
    bucket=s3_bucket,
    key_prefix='{}/{}'.format(prefix, 'train'))

ステップ 4: 前処理スクリプトを作成する

このステップで説明されているコードは SageMaker インスタンスに既に存在しているので、セクションのコードを実行する必要はありません – 次のステップで既存のスクリプトを呼び出すだけです。  ただし、時間を取って、コードを読んでパイプラインがどのように処理されるかを確認することをお勧めします。

これで、トレーニング済みの Linear Learner モデルに送信される前にデータを前処理するコンテナを作成する準備が整いました。  このコンテナは sklearn_abalone_featurizer.py のスクリプトを実行します。このスクリプトは、トレーニングと予測の両方のために Amazon SageMaker によってインポートされます。引数を解析し、Amazon S3 から未加工の abalone データセットを読み取り、次に数値フィーチャに対して SimpleImputerStandardScaler を、カテゴリカルフィーチャに対して SimpleImputerOneHotEncoder を実行するメインメソッドをエントリポイントとしてトレーニングが実行されます。トレーニングの最後に、このスクリプトは、推論中に使用できるように、フィットした ColumnTransformer を Amazon S3 にシリアル化します。

from __future__ import print_function

import time
import sys
from io import StringIO
import os
import shutil

import argparse
import csv
import json
import numpy as np
import pandas as pd

from sklearn.compose import ColumnTransformer
from sklearn.externals import joblib
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import Binarizer, StandardScaler, OneHotEncoder

from sagemaker_containers.beta.framework import (
    content_types, encoders, env, modules, transformer, worker)

# ヘッダなしの CSV ファイルが得られるので、ここで列名を指定します。
feature_columns_names = [
    'sex', # M、F、および I (幼児)
    'length', # 貝殻の最長の長さ
    'diameter', # 長さに対して垂直方向
    'height', # 身が貝殻に入った重さ
    'whole_weight', # アワビ全体の重さ
    'shucked_weight', # 身の重さ
    'viscera_weight', # 内蔵の重さ (血を抜いた後)
    'shell_weight'] # 乾燥後

label_column = 'rings'

feature_columns_dtype = {
    'sex': str,
    'length': np.float64,
    'diameter': np.float64,
    'height': np.float64,
    'whole_weight': np.float64,
    'shucked_weight': np.float64,
    'viscera_weight': np.float64,
    'shell_weight': np.float64}

label_column_dtype = {'rings': np.float64} # +1.5 gives the age in years

def merge_two_dicts(x, y):
    z = x.copy()   # x のキーと値から始める
    z.update(y)    # y のキーと値で z を修正し、「None」を返す
    return z

if __name__ == '__main__':

    parser = argparse.ArgumentParser()

    # Sagemaker 固有の引数。環境変数にはデフォルト値が設定されています。
    parser.add_argument('--output-data-dir', type=str, default=os.environ['SM_OUTPUT_DATA_DIR'])
    parser.add_argument('--model-dir', type=str, default=os.environ['SM_MODEL_DIR'])
    parser.add_argument('--train', type=str, default=os.environ['SM_CHANNEL_TRAIN'])

    args = parser.parse_args()

    # 一連のファイルを 1 つの pandas データフレームに読み込む
    input_files = [ os.path.join(args.train, file) for file in os.listdir(args.train) ]
    if len(input_files) == 0:
        raise ValueError(('There are no files in {}.\n' +
                          'This usually indicates that the channel ({}) was incorrectly specified,\n' +
                          'the data specification in S3 was incorrectly specified or the role specified\n' +
                          'does not have permission to access the data.').format(args.train, "train"))

    raw_data = [ pd.read_csv(
        file,
        header=None,
        names=feature_columns_names + [label_column],
        dtype=merge_two_dicts(feature_columns_dtype, label_column_dtype)) for file in input_files ]
    concat_data = pd.concat(raw_data)

    # 以下のフィーチャを使って分類子をトレーニングします。
    # 数値フィーチャ:
    # - length:  貝殻の最長の長さ
    # - diameter: 長さに対して垂直方向の直径
    # - height:  身が貝殻に入った重さ
    # - whole_weight: アワビ全体の重さ
    # - shucked_weight: 身の重さ
    # - viscera_weight: 内蔵の重さ (血を抜いた後)
    # - shell_weight: 乾燥後の重さ
    # カテゴリカルフィーチャ:
    # - sex: カテゴリは文字列 {'M', 'F', 'I'} としてエンコードされます。ここで 'I' は幼児です。
    numeric_features = list(feature_columns_names)
    numeric_features.remove('sex')
    numeric_transformer = Pipeline(steps=[
        ('imputer', SimpleImputer(strategy='median')),
        ('scaler', StandardScaler())])

    categorical_features = ['sex']
    categorical_transformer = Pipeline(steps=[
        ('imputer', SimpleImputer(strategy='constant', fill_value='missing')),
        ('onehot', OneHotEncoder(handle_unknown='ignore'))])

    preprocessor = ColumnTransformer(
        transformers=[
            ('num', numeric_transformer, numeric_features),
            ('cat', categorical_transformer, categorical_features)],
        remainder="drop")

    preprocessor.fit(concat_data)

    joblib.dump(preprocessor, os.path.join(args.model_dir, "model.joblib"))

スクリプトにある次のメソッドは推論中に使用されます。input_fn および output_fn メソッドは、データペイロードを解析してレスポンスの形式を再設定するために Amazon SageMaker によって使用されます。この例では、入力メソッドは content-type として「text / csv」しか受け入れませんが、他の入力形式を受け入れるように簡単に変更することができます。また、input_fn 関数は、渡された csv の長さもチェックして、ラベルを含むトレーニングデータまたは予測データを前処理するかどうかを決定します。デフォルトでは推論パイプラインはコンテナ間で JSON を想定しているため、出力メソッドは JSON 形式で返しますが、他の出力形式を追加するように変更することができます。

def input_fn(input_data, content_type):
    """入力データペイロードを解析する

    現在は、入力として csv だけを取ります。ラベル付きデータとラベルなしデータの両方を処理する必要があるため、
    提供された列の数を調べることによって
    ラベル列が存在するかどうかを最初に判断します。
    """
    if content_type == 'text/csv':
        # 未加工の入力データを CSV として読み込みます。
        df = pd.read_csv(StringIO(input_data),
                         header=None)

        if len(df.columns) == len(feature_columns_names) + 1:
            # これはラベル付きの例で、リングラベルを含みます。
            df.columns = feature_columns_names + [label_column]
        elif len(df.columns) == len(feature_columns_names):
            # これは、ラベルなしの例です。
            df.columns = feature_columns_names

        return df
    else:
        raise ValueError("{} not supported by script!".format(content_type))


def output_fn(prediction, accept):
    """予測出力の形式を設定する

    シリアル推論のコンテナ間のデフォルトの accept / content-type は JSON です。
    また、ContentType または mimetype を accept と同じ値に設定して、
    次のコンテナがレスポンスペイロードを正しく読み取ることができるようにします。
    """
    if accept == "application/json":
        instances = []
        for row in prediction.tolist():
            instances.append({"features": row})

        json_output = {"instances": instances}

        return worker.Response(json.dumps(json_output), accept, mimetype=accept)
    elif accept == 'text/csv':
        return worker.Response(encoders.encode(prediction, accept), accept, mimetype=accept)
    else:
        raise RuntimeException("{} accept type is not supported by this script.".format(accept))

predict_fn は、input_fn によって解析された入力データと、model_fn からの逆シリアル化されたモデル (次に詳しく説明) を受け取り、ソースデータを変換します。ソースデータにラベルがある場合、スクリプトはラベルも追加します。これは、トレーニングデータの前処理の場合です。

def predict_fn(input_data, model):
    """入力データを前処理する

    デフォルトの predict_fn は .predict() を使用しているのでこれを実装しますが、
    モデルはプリプロセッサなので .transform() を使用します。

    出力は以下の順序で返されます:

        ホットエンコードまたは標準化された、残りのフィーチャ
    """
    features = model.transform(input_data)

    if label_column in input_data:
        # ラベル (最初の列として) と一連のフィーチャを返します。
        return np.insert(features, 0, input_data[label_column], axis=1)
    else:
        # 一連のフィーチャだけを返す
        return features

model_fn はシリアル化された モデルの場所を取得し、逆シリアル化されたモデルを Amazon SageMaker に返します。メソッドの定義は、トレーニングで実装されているシリアル化メソッドと密接に関連しているため、これがデフォルトを持たない唯一のメソッドであることに注意してください。この例では、Scikit-learn に含まれている joblib ライブラリを使用します。

def model_fn(model_dir):
    """フィットしたモデルを逆シリアル化する
    """
    preprocessor = joblib.load(os.path.join(model_dir, "model.joblib"))
    return preprocessor

ステップ 5: データプリプロセッサをフィットさせる

これで、ステップ 4 で定義したスクリプトを使ってプリプロセッサを作成しました。  これにより、未加工データをモデルに送信し、処理したデータを出力することができます。  これを行うために、コンストラクター引数をいくつか受け入れる SKLearn 推定子を定義します。

  • entry_point: Amazon SageMaker がトレーニングと予測のために実行する Python スクリプトへのパス (これはステップ 4 で定義したスクリプトです)。
  • role: ロール Amazon Resource Name (ARN)。
  • train_instance_type (オプション): トレーニング用の Amazon SageMaker インスタンスのタイプ。注意: Scikit-learn は GPU トレーニングをネイティブにサポートしていないため、Amazon SageMaker の Scikit-learn は現在 GPU インスタンスタイプのトレーニングをサポートしていません。
  • sagemaker_session (オプション): Amazon SageMaker でトレーニングをするために使用されるセッション。
from sagemaker.sklearn.estimator import SKLearn

script_path = '/home/ec2-user/sample-notebooks/sagemaker-python-sdk/scikit_learn_inference_pipeline/sklearn_abalone_featurizer.py'

sklearn_preprocessor = SKLearn(
    entry_point=script_path,
    role=role,
    train_instance_type="ml.c4.xlarge",
    sagemaker_session=sagemaker_session)

sklearn_preprocessor.fit({'train': train_input})

プリプロセッサが作成されるまで数分 (最大 5 分) かかります。  プリプロセッサの準備が整ったら、未加工データをプリプロセッサに送信し、処理したアワビのデータを Amazon S3 に保存し直すことができます。次のステップでこれを行います。

ステップ 6: トレーニングデータをバッチ変換する

これでプリプロセッサの準備が整ったので、これを使用して、未加工データをトレーニング用の前処理済みデータにバッチ変換することができます。  これを行うには、変換器を作成し、Amazon S3 にある未加工データを指すようにします。

# トレーニングされた SKLearn 推定子から SKLearn 変換器を定義する
transformer = sklearn_preprocessor.transformer(
    instance_count=1,
    instance_type='ml.m4.xlarge',
    assemble_with = 'Line',
    accept = 'text/csv')
# トレーニング入力を前処理する
transformer.transform(train_input, content_type='text/csv')
print('Waiting for transform job: ' + transformer.latest_transform_job.job_name)
transformer.wait()
preprocessed_train = transformer.output_path

変換が完了すると、変換されたデータは Amazon S3 に保存されます。  前処理されたデータの場所は、preprocessed_train 変数の値を調べることで見つけることができます。

ステップ 7: Linear Learner モデルを前処理済みデータでフィットさせる

前処理コンテナを構築して未加工データを処理したので、処理したデータの送信先となるモデルコンテナを作成する必要があります。  このコンテナは処理されたデータセットを入力し、(処理された) フィーチャの値に基づいて特定のアワビの年齢を予測するためのモデルをトレーニングします。  予測モデルには、Amazon SageMaker の Linear Learnerを使用します。

最初に、Python SDK のヘルパー関数を使って Linear Learner イメージの場所を定義します。

import boto3
from sagemaker.amazon.amazon_estimator import get_image_uri
ll_image = get_image_uri(boto3.Session().region_name, 'linear-learner')

これでモデルをフィットさせることができます。  モデルをフィットさせるには、次の 4 つの主な手順があります。

  1. モデルの結果を保存する Amazon S3 の場所を定義します。
  2. Linear Learner 推定子を作成します。
  3. フィーチャの数、予測子のタイプ、ミニバッチサイズなどの推定子ハイパーパラメータを設定します。
  4. 変換したデータの場所の変数を定義し (ステップ 6 から)、それを使用して Linear Learner モデルをトレーニングします。
s3_ll_output_key_prefix = "ll_training_output"
s3_ll_output_location = 's3://{}/{}/{}/{}'.format(s3_bucket, prefix, s3_ll_output_key_prefix, 'll_model')

ll_estimator = sagemaker.estimator.Estimator(
    ll_image,
    role,
    train_instance_count=1,
    train_instance_type='ml.m4.2xlarge',
    train_volume_size = 20,
    train_max_run = 3600,
    input_mode= 'File',
    output_path=s3_ll_output_location,
    sagemaker_session=sagemaker_session)

ll_estimator.set_hyperparameters(feature_dim=10, predictor_type='regressor', mini_batch_size=32)

ll_train_data = sagemaker.session.s3_input(
    preprocessed_train,
    distribution='FullyReplicated',
    content_type='text/csv',
    s3_data_type='S3Prefix')

data_channels = {'train': ll_train_data}
ll_estimator.fit(inputs=data_channels, logs=True)

前のトレーニングジョブと同様に、推定子モデルをフィットさせるのに数分 (最大 5 分) かかります。

ステップ 8: 推論パイプラインを作成する

ステップ 5 で、入力データを受け取り、フィーチャを前処理する推論プリプロセッサを作成しました。  ここでは、このプリプロセッサとステップ 7 の Linear Learner モデルを組み合わせて、未加工データを処理し、予測のために予測モデルに送信する推論パイプラインを作成します。  パイプラインの設定は簡単です。モデルを定義して名前を割り当てた後は、前処理モデルと予測モデルを指す「PipelineMode」を作成するだけです。  次に、パイプラインモデルを単一のエンドポイントにデプロイします。

from sagemaker.model import Model
from sagemaker.pipeline import PipelineModel
import boto3
from time import gmtime, strftime

timestamp_prefix = strftime("%Y-%m-%d-%H-%M-%S", gmtime())

scikit_learn_inferencee_model = sklearn_preprocessor.create_model()
linear_learner_model = ll_estimator.create_model()

model_name = 'inference-pipeline-' + timestamp_prefix
endpoint_name = 'inference-pipeline-ep-' + timestamp_prefix
sm_model = PipelineModel(
    name=model_name,
    role=role,
    models=[
        scikit_learn_inferencee_model,
        linear_learner_model])

sm_model.deploy(initial_instance_count=1, instance_type='ml.c4.xlarge', endpoint_name=endpoint_name)

エンドポイントを作成すると、いよいよパイプラインは使用準備完了です。

ステップ 9: 推論パイプラインを使って予測する

予測のためにデータを送信することで、パイプラインをテストすることができます。  パイプラインは未加工データを受け取り、ステップ 3 と 4 で作成したプリプロセッサを使用して変換し、ステップ 7 で作成した Linear Learner モデルを使用して予測を行います。

まず、パイプラインを介して送信するデータを含む「payload」変数を定義します。  次に、パイプラインエンドポイントを使用して予測子を定義し、ペイロードを予測子に送信して、モデル予測を出力します。

from sagemaker.predictor import json_serializer, csv_serializer, json_deserializer, RealTimePredictor
from sagemaker.content_types import CONTENT_TYPE_CSV, CONTENT_TYPE_JSON

payload = 'M, 0.44, 0.365, 0.125, 0.516, 0.2155, 0.114, 0.155'
actual_rings = 10

predictor = RealTimePredictor(
    endpoint=endpoint_name,
    sagemaker_session=sagemaker_session,
    serializer=csv_serializer,
    content_type=CONTENT_TYPE_CSV,
    accept=CONTENT_TYPE_JSON)

print(predictor.predict(payload))

このモデルは、ペイロードに定義されているアワビの年齢を 9.53 歳と予測しています。未加工データをパイプラインに送信したことに注意してください。パイプラインはスコアリングのためにこのデータを線形モデルに送信する前に前処理しています。

ステップ 10: エンドポイントを削除する

終了したら、この例で使用されているエンドポイントを削除できます。

sm_client = sagemaker_session.boto_session.client('sagemaker')
sm_client.delete_endpoint(EndpointName=endpoint_name)

結論

このブログ記事では、Amazon SageMaker と組み込みの Scikit-learn ライブラリを使用して未加工データを処理する ML パイプラインを構築しました。Amazon SageMaker 組み込みの Linear Learner アルゴリズムを使用して、処理済みデータで ML モデルをトレーニングし、トレーニング済みモデルを使用して予測を行いました。  これにより、毎回中間データ処理手順を繰り返すことなく、未加工データをパイプラインに渡して Amazon S3 でモデル予測を取得することができます。

引用

Dua, D. and Karra Taniskidou, E.(2017).UCI Machine Learning Repository.Irvine, CA: University of California, School of Information and Computer Science.


著者について

Matt McKenna は、Amazon Alexa での機械学習に重点を置いているデータサイエンティストであり、現実世界の問題を解決するために統計および機械学習の方法を適用することに情熱を傾けています。余暇には、ギターの演奏、ランニング、ビール作り、ボストンのスポーツチームの応援を楽しんでいます。

 

 

 

Eric Kim は、Amazon AI ラボのアルゴリズムおよびプラットフォームグループのエンジニアです。彼は、AWS のサービスである SageMaker のサポートを手伝っており、機械学習の研究、開発、応用の経験があります。仕事以外では、彼は熱心な音楽愛好家であり、すべての犬が大好きです。

 

 

 

Urvashi Chowdhary は、Amazon SageMaker 担当のシニアプロダクトマネージャーです。お客様に寄り添い、また、機械学習をさらに使いやすいものにしていくことに情熱を傾けています。余暇には、セーリング、パドルボード、カヤックを楽しんでいます。