Amazon Web Services ブログ

Amazon SageMaker のトレーニングと推論の間でデータ処理コードの一貫性を確保する

このブログ記事では、推論パイプラインを紹介します。これは、推論リクエストごとに実行される一連の手順を指定できる、Amazon SageMaker の新機能です。この機能を使用すると、同じコードの 2 つの別のコピーを保持する必要なしで、推論中のトレーニングで適用されたデータ処理手順を再利用できます。これにより、予測が正確になり、開発のオーバーヘッドを削減できます。ここでの例では、Apache Spark MLlib で変換器を使用してトレーニングと推論の入力データを前処理し、Amazon SageMaker の XGBoost アルゴリズムを使用して自動車の状態を予測する機械学習モデルをトレーニングします。

概要

データサイエンティストや開発者は、機械学習 (ML) モデルをトレーニングする前に、データのクリーニングと準備に多くの時間を費やしています。これは、現実のデータを直接使用することができないためです。値が欠落していたり、情報が重複していたり、標準化する必要がある同じ情報の複数のバリエーションがあったりするからです。さらに多くの場合、機械学習アルゴリズムで使用できるために、データをある形式から別の形式に変換する必要があります。たとえば、XGBoost アルゴリズムは数値データしか受け入れないため、入力データが文字列またはカテゴリ形式の場合は、使用する前に数値形式に変換する必要があります。他には、複数の入力の特徴を単一の特徴に組み合わせることで、より正確な機械学習モデルとなります。たとえば、気温と湿度を組み合わせて飛行遅延を予測すると、より正確なモデルが作成されます。

機械学習モデルを本稼働にデプロイして新しいデータを予測する場合 (推論と呼ばれるプロセス)、トレーニングで使用されたのと同じデータ処理手順がそれぞれの推論リクエストにも適用されるようにする必要があります。そうしないと、誤った予測結果となる可能性があります。今までは、トレーニングと推論に使用するために同じデータ処理手順の 2 つのコピーを維持し、それらが常に同期していることを確認する必要がありました。また、データ処理手順を、機械学習モデルへのリクエストを行うアプリケーションコードと組み合わせるか、推論ロジックに組み込む必要がありました。その結果、開発のオーバーヘッドと複雑さが必要以上に高くなり、迅速に繰り返す能力が制限されていました。

現在は、Amazon SageMaker に推論パイプラインを作成することで、推論中のトレーニングと同じデータ処理手順を再利用できます。推論パイプラインを使用すると、最大 5 つのデータ処理および推論の手順を指定できます。これらの手順は、全ての予測リクエストに対して実行されます。トレーニングのデータ処理手順を再利用できるので、データ処理コードのコピーを 1 つだけ管理し、クライアントアプリケーションや推論ロジックを更新することなくデータ処理手順を個別に更新することができます。

Amazon SageMaker は、推論パイプラインの作成方法に柔軟性をもたらします。データ処理手順では、Scikit-Learn および Apache SparkMLlib で利用可能な組み込みのデータ変換器を使用して、一般的なユースケースのためにデータをある形式から別の形式に処理および変換するか、カスタムの変換器を作成することができます。推論では、Amazon SageMaker で利用可能な組み込みの機械学習アルゴリズムとフレームワークを使用することもできますし、カスタムのトレーニングモデルを使用することもできます。リアルタイム推論とバッチ推論で同じ推論パイプラインを使用できます。推論パイプラインのすべての手順が同じインスタンスで実行されるため、レイテンシーによる影響は最小限になります。

この例では、AWS Glue を使用するデータ処理に Apache Spark MLLib を使用し、推論中にデータ処理コードを再利用します。UCI の Machine Learning RepositoryCar Evaluation データセットを使用します。目標は、unaccaccgoodvgoodの値の中から、特定の車の容認可能性を予測することです。根本的には分類問題であり、Amazon SageMaker の組み込みの XGBoost アルゴリズムを使って、機械学習モデルをトレーニングします。ただし、このデータセットには 6 つのカテゴリの特徴に関する文字列、buyingmaintdoorspersonslug_bootsafety しか含まれておらず、XGBoost は数値形式のデータしか処理できません。そのため、SparkML StringIndexer とそれに続いて OneHotEncoder を使用して入力データを前処理し、数値形式に変換します。また、IndexToString を使用して、予測結果に後処理手順を適用し、推論の出力を自動車の予測状態に対応する元のラベルに変換します。

前処理スクリプトと後処理スクリプトを 1 回作成し、AWS Glue を使用してトレーニングデータを処理するためにそれらのスクリプトを適用します。 次に、機械学習パイプライン用の一般的なシリアライゼーション形式および実行エンジンである MLeap を使用して、AWS Glue によって生成されたこれらのアーティファクトをシリアル化し、Amazon S3 にキャプチャします。これは、Amazon SageMaker が提供する SparkML Serving コンテナを使用して、リアルタイムリクエストの推論中に前処理手順を再利用できるようにするためです。最後に、前処理、推論、後処理の各手順を推論パイプラインにデプロイし、リアルタイム推論リクエストごとにこれらの手順を実行します。

次の図は、これから実行する手順をまとめたものです。

次の図は、リアルタイム推論のためのエンドポイントに推論パイプラインがどのようにデプロイされるかを示しています。同じ推論パイプラインは、バッチリクエストを処理するためのバッチ変換ジョブでも使用できます。

ノートブックインスタンスの起動、およびノートブックのダウンロード

この例では、AWS エコシステム内の 2 つの補完的なワークフローを示します: 1 つ目は AWS マネジメントコンソールを使用し、2 つ目は Amazon SageMaker ノートブックインスタンスで Boto3 と Jupyterノートブックを使用します。どちらのワークフローも、Jupyter ノートブック内で開始され、セットアップのスピードアップに役立ちます。これにより、必要なファイルをアカウントの Amazon S3 バケットに配置し、必要な AWS Identity and Access Management (IAM) ロールを設定して、Amazon SageMaker と AWS Glue がデータへの必要なアクセス権を持つようにします。また、推論パイプラインのデプロイに高レベルの Python SDK を使用して、このを参照することもできます。SparkML の代わりに Scikit-Learn を使用したい場合は、このを参考にすることができます。

[Services]、[Machine Learning] の下の [Amazon SageMaker] の順に選択して、コンソールで Amazon SageMaker に移動することから始めます。この機能は Amazon SageMaker を使えるどのリージョンでも使用できますが、この例では、右上の [Region] が Oregon に設定されていることを確認してください。Amazon S3 バケットと、使用しているサービスの両方が同じリージョンにあることを確認する必要があります。Amazon SageMaker コンソールの [Notebook] で、Notebook instances を選択します。ここで、[Create notebook instance] を選択します。

新しいノートブックインスタンスに名前を付ける必要があります。「processing example」という名前を付けます。他のほとんどの設定と同様に、この例ではデフォルトのインスタンスサイズで十分です。ただし、Amazon SageMaker の機能を実行するには、引き続き IAM ロールを作成する必要があります。[IAM role] で、[Create a new role] を選択します。

新しい IAM ロールを作成するときは、[S3 buckets you specify] に対して [None] を指定することができます。これは、この例では名前の一部として名前 sagemaker を使用して S3 バケットを作成し、デフォルトのロールがこのバケットにアクセスできるようにするためです。[Create role] を選択します。

ノートブックインスタンスの設定は、以下のようになります。

[Create notebook instance] を選択します。

数分後、ノートブックインスタンスの準備が整います。ステータスが InService に設定されたら、Open Jupyter のリンクを選択します。

ノートブックがロードされたら、SageMaker の例のラベルが付いているタブを開き、[Advanced Functionality] ヘッダーを選択します。inference_pipeline_sparkml_xgboost_car_evaluation という名前のフォルダを選択し、.ipynb ノートブックの横にある [Use] オプションを選択します。これによってノートブックのコピーが作成され、Jupyter ノートブックインターフェイスで開きます。

ファイルとロールの準備

ノートブックまたはコンソールのどちらで、この例に従うかにかかわらず、初期設定がいくつかあります。これは、ノートブックで行う方がより便利です。AWS 環境が適切に設定されたら、ノートブックまたはコンソールのどちらでも自由に行うことができます。

まず、アカウント内で S3 バケットを設定し、必要なファイルをこのバケットにアップロードする必要があります。バケットを設定するには、「Setup S3 bucket」というラベルが付いた最初のコードブロックを実行します。コードセルが選択されている間にセルを実行するには、Shift キーと Return キーを同時に押すか、Jupyter ノートブックの上部にある [Run] ボタンをクリックします。

ここで作成された S3 バケット名をメモします。コンソールで先へ進むつもりなら、後でこの名前が必要になります。

ここで、未加工データと AWS Glue 処理スクリプトを Amazon S3 にアップロードする必要があります。これは、ノートブックの「Upload files to S3」というラベルが付いたコードブロックを実行することで実現できます。最初のコードブロックはノートブックインスタンスにファイルをダウンロードし、2 番目のコードブロックは S3 の関連するバケットにファイルをアップロードします。

これで、S3 バケットはここでの例のために設定されています。

AWS Glue での Apache Spark を使用した前処理

ダウンロードしたデータを見ると、すべてのフィールドが文字列形式のカテゴリデータであり、XGBoost はネイティブでは処理できないことが分かります。Amazon SageMaker の XGBoost を利用するには、データをホットエンコードされた 1 列の並びに前処理する必要があります。Apache Spark が、必要な前処理パイプライン機能を提供します。

さらに、エンドポイントを特に便利にするために、このスクリプトでポストプロセッサも生成します。これにより、ラベルインデックスを元のラベルに戻すことができます。これらのプロセッサのアーティファクトはすべて、後で Amazon SageMaker で使用するために S3 に保存されます。

この例では、pre-processor.py スクリプトをダウンロードします。時間をかけて、Spark パイプラインの処理方法を検討することをお勧めします。それでは、Spark パイプラインを定義してフィットさせるコードの関連部分を見てみましょう。

    # ターゲットラベル
    catIndexer = StringIndexer(inputCol="cat", outputCol="label")
    
    labelIndexModel = catIndexer.fit(train)
    train = labelIndexModel.transform(train)
    
    converter = IndexToString(inputCol="label", outputCol="cat")

    # インデックスラベル、メタデータをラベル列に追加。
    # インデックスにすべてのラベルを含めるためにデータセット全体にフィットさせる。
    buyingIndexer = StringIndexer(inputCol="buying", outputCol="indexedBuying")
    maintIndexer = StringIndexer(inputCol="maint", outputCol="indexedMaint")
    doorsIndexer = StringIndexer(inputCol="doors", outputCol="indexedDoors")
    personsIndexer = StringIndexer(inputCol="persons", outputCol="indexedPersons")
    lug_bootIndexer = StringIndexer(inputCol="lug_boot", outputCol="indexedLug_boot")
    safetyIndexer = StringIndexer(inputCol="safety", outputCol="indexedSafety")
    

    # インデックス付き機能に関する 1 つのホットエンコーダ
    buyingEncoder = OneHotEncoder(inputCol="indexedBuying", outputCol="buyingVec")
    maintEncoder = OneHotEncoder(inputCol="indexedMaint", outputCol="maintVec")
    doorsEncoder = OneHotEncoder(inputCol="indexedDoors", outputCol="doorsVec")
    personsEncoder = OneHotEncoder(inputCol="indexedPersons", outputCol="personsVec")
    lug_bootEncoder = OneHotEncoder(inputCol="indexedLug_boot", outputCol="lug_bootVec")
    safetyEncoder = OneHotEncoder(inputCol="indexedSafety", outputCol="safetyVec")

    # ベクトル構造化データを作成する (label,features(vector))
    assembler = VectorAssembler(inputCols=["buyingVec", "maintVec", "doorsVec", "personsVec", "lug_bootVec", "safetyVec"], outputCol="features")

    # パイプラインのチェーンフィーチャライザー
    pipeline = Pipeline(stages=[buyingIndexer, maintIndexer, doorsIndexer, personsIndexer, lug_bootIndexer, safetyIndexer, buyingEncoder, maintEncoder, doorsEncoder, personsEncoder, lug_bootEncoder, safetyEncoder, assembler])

    # モデルをトレーニングする。 これはインデクサーも実行する。
    model = pipeline.fit(train)

このスニペットは、プリプロセッサとポストプロセッサの両方を定義しています。プリプロセッサはカテゴリラベルのすべてのトレーニング列をホットエンコードされた 1 列のベクトルに変換し、ポストプロセッサはラベルのインデックスを人間が読める文字列に戻します。

また、Spark パイプラインのアーティファクトをシリアル化して MLeap 形式で保存できるようにするコードを調べることも役に立つかもしれません。Spark フレームワークはバッチユースケースを中心に設計されているため、ここでは MLeap を使用する必要があります。MLeap は、Spark ML パイプラインをシリアル化し、リアルタイムで低レイテンシーのユースケースにデプロイするためのランタイムを提供します。Amazon SageMaker は、推論に使いやすくするために MLEAP を使用する SparkML Serving コンテナを開始しました。以下のコードを見てみましょう。

    # MLeap によるシリアル化と保存  
    SimpleSparkSerializer().serializeToBundle(model, "jar:file:/tmp/model.zip", predictions)
    
    # SageMaker は .tar.gz ファイルを想定しているが、MLeap は .zip ファイルを生成するので、解凍する。
    import zipfile
    with zipfile.ZipFile("/tmp/model.zip") as zf:
        zf.extractall("/tmp/model")

    # コンテンツを .tar.gz ファイルとして書き戻す
    import tarfile
    with tarfile.open("/tmp/model.tar.gz", "w:gz") as tar:
        tar.add("/tmp/model/bundle.json", arcname='bundle.json')
        tar.add("/tmp/model/root", arcname='root')

    s3 = boto3.resource('s3')
    file_name = args['s3_model_bucket_prefix'] + '/' + 'model.tar.gz'
    s3.Bucket(args['s3_model_bucket']).upload_file('/tmp/model.tar.gz', file_name)

    os.remove('/tmp/model.zip')
    os.remove('/tmp/model.tar.gz')
    shutil.rmtree('/tmp/model')
    
    # ポストプロセッサを保存する
    SimpleSparkSerializer().serializeToBundle(converter, "jar:file:/tmp/postprocess.zip", predictions)

    with zipfile.ZipFile("/tmp/postprocess.zip") as zf:
        zf.extractall("/tmp/postprocess")

    # コンテンツを .tar.gz ファイルとして書き戻す
    import tarfile
    with tarfile.open("/tmp/postprocess.tar.gz", "w:gz") as tar:
        tar.add("/tmp/postprocess/bundle.json", arcname='bundle.json')
        tar.add("/tmp/postprocess/root", arcname='root')

    file_name = args['s3_model_bucket_prefix'] + '/' + 'postprocess.tar.gz'
    s3.Bucket(args['s3_model_bucket']).upload_file('/tmp/postprocess.tar.gz', file_name)

    os.remove('/tmp/postprocess.zip')
    os.remove('/tmp/postprocess.tar.gz')
    shutil.rmtree('/tmp/postprocess')

このアーカイブを解凍して、Amazon SageMaker が認識できる tar.gz ファイルに再アーカイブすることに気付くでしょう。

Amazon SageMaker で Spark パイプラインを実行するために、ノートブックインスタンスを利用します。Amazon SageMaker ノートブックで、「AWS Glue 前処理ジョブの作成および実行」というラベルが付いたセルを実行できます。これは次のようになります。

### AWS Glue 前処理ジョブの作成および実行

# AWS Glue でジョブを定義する
glue = boto3.client('glue')

try:
    glue.get_job(JobName='preprocessing-cars')
    print("Job already exists, continuing...")
except glue.exceptions.EntityNotFoundException:
    response = glue.create_job(
        Name='preprocessing-cars',
        Role=role,
        Command={
            'Name': 'glueetl',
            'ScriptLocation': 's3://{}/scripts/preprocessor.py'.format(bucket_name)
        },
        DefaultArguments={
            '--s3_input_data_location': 's3://{}/data/car.data'.format(bucket_name),
            '--s3_model_bucket_prefix': 'model',
            '--s3_model_bucket': bucket_name,
            '--s3_output_bucket': bucket_name,
            '--s3_output_bucket_prefix': 'output',
            '--extra-py-files': 's3://{}/scripts/python.zip'.format(bucket_name),
            '--extra-jars': 's3://{}/scripts/mleap_spark_assembly.jar'.format(bucket_name)
        }
    )

    print('{}\n'.format(response))

# AWS Glue でジョブを実行する
try:
    job_name='preprocessing-cars'
    response = glue.start_job_run(JobName=job_name)
    job_run_id = response['JobRunId']
    print('{}\n'.format(response))
except glue.exceptions.ConcurrentRunsExceededException:
    print("Job run already in progress, continuing...")

    
# ジョブのステータスを確認する
import time

job_run_status = glue.get_job_run(JobName=job_name,RunId=job_run_id)['JobRun']['JobRunState']
while job_run_status not in ('FAILED', 'SUCCEEDED', 'STOPPED'):
    job_run_status = glue.get_job_run(JobName=job_name,RunId=job_run_id)['JobRun']['JobRunState']
    print (job_run_status)
    time.sleep(30)

このセルは AWS Glue でジョブを定義し、そのジョブを実行し、そのジョブが完了するまでステータスを監視します。

要約すると、すべての文字列値に対して 1 つのホットエンコーディングを使用して、データをトレーニングおよび検証のセットに前処理しました。また、これらのパイプラインを後でエンドポイントで再利用できるように、プリプロセッサとポストプロセッサを MLeap 形式にシリアル化しました。次の手順は、機械学習モデルをトレーニングすることです。これには、Amazon SageMaker の組み込み XGBoost を使用します。

Amazon SageMaker XGBoost モデルのトレーニング

XGBoost が認識できる形式でデータを前処理したので、今度は簡単なトレーニングジョブを実行してデータに基づいて分類子モデルをトレーニングできます。これは、以下の設定でコンソールから行うことができます: [Job name] を「xgboost-cars」に設定します (同じジョブ名を以前に実行したことがある場合は、これに固有の文字を追加する必要があります)。ノートブックインスタンスに対して、上記で作成した IAM ロールを選択します。[Algorithm source] については、「Amazon SageMaker built-in algorithm」を選択し、[Algorithm] で「XGBoost」を選択します。

[Hyperparameters] で、 early_stopping_rounds5num_rounds10 に設定し、objectivemulti:softmaxnum_class4eval_metricmlogloss に変更します。これにより、AWS Glue で前処理されたデータを処理する分類モデルを実行するように XGBoost が設定されます。

 

[Input data configuration] については、Channel nametrain のままにし、Content type では csv を入力します。Compression typeNoneRecord wrapperNoneS3 data typeS3PrefixS3 data distribution typeFullyReplicated とします。最後に、S3 locations3://<your-bucket-name>/output/train とする必要があります。

[Add channel] を選択し、検証セットに対してこの入力を繰り返します。Channel namevalidation と設定し、Content type では csv を入力します。Compression typeNoneRecord wrapperNoneS3 data typeS3PrefixS3 data distribution typeFullyReplicated とします。最後に、S3 locations3://<your-bucket-name>/output/validation とする必要があります。

最後に、[Output data configuration] では、S3 output paths3://<your-bucket-name>/xgb と設定します。

[Create training job] を選択します。

あるいは、このプロセス全体を Jupyter ノートブックで実行することもできます。Run Amazon SageMaker XGBoost トレーニングジョブの実行というラベルが付いている、以下のセルを実行します。

### Amazon SageMaker XGBoost トレーニングジョブの実行

from sagemaker.amazon.amazon_estimator import get_image_uri

import random
import string

# 現在のリージョンの XGBoost コンテナイメージを取得する
training_image = get_image_uri(region, 'xgboost', repo_version="latest")

# 一意のトレーニングジョブ名を作成する
training_job_name = 'xgboost-cars-'+''.join(random.choice(string.ascii_lowercase + string.digits) for _ in range(8))

# Amazon SageMaker でトレーニングジョブを作成する
sagemaker = boto3.client('sagemaker')
response = sagemaker.create_training_job(
    TrainingJobName=training_job_name,
    HyperParameters={
        'early_stopping_rounds ': '5',
        'num_round': '10',
        'objective': 'multi:softmax',
        'num_class': '4',
        'eval_metric': 'mlogloss'

    },
    AlgorithmSpecification={
        'TrainingImage': training_image,
        'TrainingInputMode': 'File',
    },
    RoleArn=role,
    InputDataConfig=[
        {
            'ChannelName': 'train',
            'DataSource': {
                'S3DataSource': {
                    'S3DataType': 'S3Prefix',
                    'S3Uri': 's3://{}/output/train'.format(bucket_name),
                    'S3DataDistributionType': 'FullyReplicated'
                }
            },
            'ContentType': 'text/csv',
            'CompressionType': 'None',
            'RecordWrapperType': 'None',
            'InputMode': 'File'
        },
        {
            'ChannelName': 'validation',
            'DataSource': {
                'S3DataSource': {
                    'S3DataType': 'S3Prefix',
                    'S3Uri': 's3://{}/output/validation'.format(bucket_name),
                    'S3DataDistributionType': 'FullyReplicated'
                }
            },
            'ContentType': 'text/csv',
            'CompressionType': 'None',
            'RecordWrapperType': 'None',
            'InputMode': 'File'
        },
    ],
    OutputDataConfig={
        'S3OutputPath': 's3://{}/xgb'.format(bucket_name)
    },
    ResourceConfig={
        'InstanceType': 'ml.m4.xlarge',
        'InstanceCount': 1,
        'VolumeSizeInGB': 1
    },
    StoppingCondition={
        'MaxRuntimeInSeconds': 3600
    },)

print('{}\n'.format(response))

# 完了するまで、ステータスを監視する
job_run_status = sagemaker.describe_training_job(TrainingJobName=training_job_name)['TrainingJobStatus']
while job_run_status not in ('Failed', 'Completed', 'Stopped'):
    job_run_status = sagemaker.describe_training_job(TrainingJobName=training_job_name)['TrainingJobStatus']
    print (job_run_status)
    time.sleep(30)

これにより、Amazon SageMaker で XGBoost トレーニングジョブを実行し、その進行状況を監視します。ジョブのステータスが [Completed] になったら、次のセルに進むことができます。

これにより、先ほど作成した前処理済みデータでモデルがトレーニングされます。数分後、通常は 5 分以内に、ジョブは正常に完了し、モデルのアーティファクトが指定した S3 の場所に出力されます。これが完了すると、前処理、推論、後処理の手順で構成される推論パイプラインをデプロイすることができます。

前処理のアーティファクトを使用して Amazon SageMaker エンドポイントをデプロイする

これで一連のモデルアーティファクトができたので、Amazon SageMaker で順次実行される推論パイプラインを設定できます。まずすべてのモデルアーティファクトを指すモデルを設定し、次にハードウェアを指定するエンドポイント構成を設定し、最後にエンドポイントを立ち上げます。このエンドポイントでは、未加工データを渡します。アプリケーションコードに前処理ロジックを記述する必要はなくなります。トレーニングのために実行された同じ前処理手順を推論入力データに適用できるので、一貫性と管理の容易さが向上します。

Amazon SageMaker コンソールで、[Models] を選択し、左にある [Inference] オプションを選びます。[Create model] を選択します。これでモデルの設定に移動します。[Model name] には、pipeline-xgboost と入力します。[IAM role] では、ノートブックインスタンス用に以前に作成した SageMaker 実行ロールを選択します。次のようになります。

[Container definition 1] については、[Container input options] で、Provide model artifacts and inference image location を選択します。[Location of inference image] に 246618743249.dkr.ecr.us-west-2.amazonaws.com/sagemaker-sparkml-serving:2.2 と入力します。これは、Amazon SageMaker によって提供される SparkML Serving イメージです。すべてのリージョンに提供されている SparkML イメージの完全なリストはこちらで利用可能です。[Location of model artifacts] で、s3://<your-bucket-name>/model/model.tar.gz と入力します。これらは、以前に実行した AWS Glue ジョブの実行時に作成されたプリプロセッサのアーティファクトです。

次に、環境変数によって SparkML Serving コンテナのスキーマを定義する必要があります。Key には SAGEMAKER_SPARKML_SCHEMA を入力し、Value には以下を入力します:

{"input":[{"type":"string","name":"buying"},{"type":"string","name":"maint"},{"type":"string","name":"doors"},{"type":"string","name":"persons"},{"type":"string","name":"lug_boot"},{"type":"string","name":"safety"}],"output":{"type":"double","name":"features","struct":"vector"}}

 

[Add container] を選択します。

[Container definition 2] については、[Container input options] で、Provide model artifacts and inference image location を選択します。

[Location of inference image] に 433757028032.dkr.ecr.us-west-2.amazonaws.com/xgboost:latest と入力します。これは、Amazon SageMakerが提供する XGBoost 処理コンテナです。[Location of model artifacts] で、s3://<your-bucket-name>/xgb/xgb/output/model.tar.gz と入力します。このアーカイブには、以前のトレーニングジョブから得たシリアル化された XGBoost モデルのアーティファクトが含まれています。

Container definition 2 では、環境変数は不要です。

[Add container] を選択します。

最後に、[Container definition 3] については、[Container input options] で、Provide model artifacts and inference image location を選択します。[Location of inference image] に 246618743249.dkr.ecr.us-west-2.amazonaws.com/sagemaker-sparkml-serving:2.2 と入力します。これは、Amazon SageMaker によって提供された、Container definition 1 で使用したものと同じ SparkML 処理イメージです。[Location of model artifacts] で、s3://<your-bucket-name>/model/postprocess.tar.gz と入力します。これは、XGBoost によって出力されたインデックス付きの値から元のラベルに戻すことを可能にするリバースインデクサです。

次に、環境変数を使用して SparkML Serving コンテナのスキーマを定義する必要があります。Key には SAGEMAKER_SPARKML_SCHEMA を入力し、Value には以下を入力します:

{"input": [{"type": "double", "name": "label"}], "output": {"type": "string", "name": "cat"}}

3 つのコンテナ定義がすべて整ったら、[Create model] を選択します。

これで、Amazon SageMaker コンソールの[Inference]、[Models] の下に自分のモデルが見つかります。リストから pipeline-xgboost モデルを選択して、モデルの詳細を表示します。ここで、[Create endpoint] ボタンを選択します。

[Endpoint]、[Endpoint name] の下に pipeline-xgboost と入力します。

[New endpoint configuration] の [Endpoint configuration name] に pipeline-xgboost と入力します。[Create endpoint configuration] を選択します。

最後に、下部で [Create endpoint] を選択します。

あるいは、パイプラインで SageMaker エンドポイントを作成するというラベルが付いているセルを実行することで、これらの手順のすべてをノートブックで実行することもできます。

### パイプラインで SageMaker エンドポイントを作成する
from botocore.exceptions import ClientError

# イメージの場所は次の場所に公開されます: https://github.com/aws/sagemaker-sparkml-serving-container
sparkml_images = {
    'us-west-1': '746614075791.dkr.ecr.us-west-1.amazonaws.com/sagemaker-sparkml-serving:2.2',
    'us-west-2': '246618743249.dkr.ecr.us-west-2.amazonaws.com/sagemaker-sparkml-serving:2.2',
    'us-east-1': '683313688378.dkr.ecr.us-east-1.amazonaws.com/sagemaker-sparkml-serving:2.2',
    'us-east-2': '257758044811.dkr.ecr.us-east-2.amazonaws.com/sagemaker-sparkml-serving:2.2',
    'ap-northeast-1': '354813040037.dkr.ecr.ap-northeast-1.amazonaws.com/sagemaker-sparkml-serving:2.2',
    'ap-northeast-2': '366743142698.dkr.ecr.ap-northeast-2.amazonaws.com/sagemaker-sparkml-serving:2.2',
    'ap-southeast-1': '121021644041.dkr.ecr.ap-southeast-1.amazonaws.com/sagemaker-sparkml-serving:2.2',
    'ap-southeast-2': '783357654285.dkr.ecr.ap-southeast-2.amazonaws.com/sagemaker-sparkml-serving:2.2',
    'ap-south-1': '720646828776.dkr.ecr.ap-south-1.amazonaws.com/sagemaker-sparkml-serving:2.2',
    'eu-west-1': '141502667606.dkr.ecr.eu-west-1.amazonaws.com/sagemaker-sparkml-serving:2.2',
    'eu-west-2': '764974769150.dkr.ecr.eu-west-2.amazonaws.com/sagemaker-sparkml-serving:2.2',
    'eu-central-1': '492215442770.dkr.ecr.eu-central-1.amazonaws.com/sagemaker-sparkml-serving:2.2',
    'ca-central-1': '341280168497.dkr.ecr.ca-central-1.amazonaws.com/sagemaker-sparkml-serving:2.2',
    'us-gov-west-1': '414596584902.dkr.ecr.us-gov-west-1.amazonaws.com/sagemaker-sparkml-serving:2.2'
}



try:
    sparkml_image = sparkml_images[region]

    response = sagemaker.create_model(
        ModelName='pipeline-xgboost',
        Containers=[
            {
                'Image': sparkml_image,
                'ModelDataUrl': 's3://{}/model/model.tar.gz'.format(bucket_name),
                'Environment': {
                    'SAGEMAKER_SPARKML_SCHEMA': '{"input":[{"type":"string","name":"buying"},{"type":"string","name":"maint"},{"type":"string","name":"doors"},{"type":"string","name":"persons"},{"type":"string","name":"lug_boot"},{"type":"string","name":"safety"}],"output":{"type":"double","name":"features","struct":"vector"}}'
                }
            },
            {
                'Image': training_image,
                'ModelDataUrl': 's3://{}/xgb/{}/output/model.tar.gz'.format(bucket_name, training_job_name)
            },
            {
                'Image': sparkml_image,
                'ModelDataUrl': 's3://{}/model/postprocess.tar.gz'.format(bucket_name),
                'Environment': {
                    'SAGEMAKER_SPARKML_SCHEMA': '{"input": [{"type": "double", "name": "label"}], "output": {"type": "string", "name": "cat"}}'
                }

            },
        ],
        ExecutionRoleArn=role
    )

    print('{}\n'.format(response))
    
except ClientError:
    print('Model already exists, continuing...')


try:
    response = sagemaker.create_endpoint_config(
        EndpointConfigName='pipeline-xgboost',
        ProductionVariants=[
            {
                'VariantName': 'DefaultVariant',
                'ModelName': 'pipeline-xgboost',
                'InitialInstanceCount': 1,
                'InstanceType': 'ml.m4.xlarge',
            },
        ],
    )
    print('{}\n'.format(response))

except ClientError:
    print('Endpoint config already exists, continuing...')


try:
    response = sagemaker.create_endpoint(
        EndpointName='pipeline-xgboost',
        EndpointConfigName='pipeline-xgboost',
    )
    print('{}\n'.format(response))

except ClientError:
    print("Endpoint already exists, continuing...")


# 完了するまで、ステータスを監視する
endpoint_status = sagemaker.describe_endpoint(EndpointName='pipeline-xgboost')['EndpointStatus']
while endpoint_status not in ('OutOfService','InService','Failed'):
    endpoint_status = sagemaker.describe_endpoint(EndpointName='pipeline-xgboost')['EndpointStatus']
    print(endpoint_status)
    time.sleep(30)

数分後、Amazon SageMaker は 1 つのインスタンスで提供されている 3 つのコンテナすべてを使用してエンドポイントを作成します。エンドポイントがペイロードで呼び出されると、ペイロードが最後の出力に達するまで、前のコンテナの出力が後のコンテナへの入力として渡されます。

この例では、未加工の文字列カテゴリが前処理 MLeap コンテナに送信され、Spark パイプラインを通じて機能をホットエンコードします。次に、1 つのホットエンコードされたデータが XGBoost コンテナに送信され、そこでモデルがインデックスを予測します。その後、インデックスは Spark モデルのアーティファクトとともに後処理 MLeap コンテナに渡され、インデックスが元のラベル文字列に変換されて、クライアントに返されます。これらはトレーニングデータの前処理に使用したのと同じ手順であり、コードを 1 回書くだけで済みました。

エンドポイント、モニタリング、メトリクスをテストする

Amazon SageMaker エンドポイントが InService になると、AWS CLI から invoke-endpoint コマンドを呼び出してテストすることができます。たとえば、次のコマンドを使用することができます。

aws sagemaker-runtime invoke-endpoint --point-name pipeline-xgboost --content-type text/csv --body low,low,5more,more,big,high out

成功すると、次のようなメッセージが表示されます。

{
    "ContentType": "text/csv",
    "InvokedProductionVariant": "default-variant-name"
}

呼び出しの出力はファイル out に表示され、次のコマンドでそれを見ることができます。

cat out

成功した場合、以下のいずれかの値を返すはずです: unaccaccgoodvgood

あるいは、「エンドポイントを呼び出す」というラベルの付いたセルをノートブックで実行することで、これを行うことも可能です。

### エンドポイントを呼び出す
client = boto3.client('sagemaker-runtime')

sample_payload=b'low,low,5more,more,big,high'

response = client.invoke_endpoint(
    EndpointName='pipeline-xgboost',
    Body=sample_payload,
    ContentType='text/csv'
)

print('Our result for this payload is: {}'.format(response['Body'].read().decode('ascii')))

推論パイプラインのメトリクス

デプロイを構築するときに、エンドポイントを監視またはデバッグする必要があると感じる場合があります。新しい推論パイプラインによって、Amazon CloudWatch でのログの表示方法が変わります。単一のエンドポイント内で各コンテナのログとメトリクスを見ることができます。これらのログを確認するには、AWS マネジメントコンソールに戻り、[Services]、[Amazon SageMaker]、[Inference]、[Endpoints] の順に移動します。リストから pipeline-xgboost エンドポイントを見つけ、名前で選択してエンドポイントの詳細を表示します。

[Monitor] セクションを見つけると、[View logs] リンクがあります。それを選択すると、CloudWatch Logs インターフェイスに移動します。このサンプルのエンドポイントでは、各コンテナに 1 つずつ、3 セットのログストリームがあります。次のようになります。

呼び出しによってエラーが発生した場合は、関連する出力が関連するログストリームに表示されます。それぞれのコンテナの stdout に出力される内容は、すべてこの場所になります。

AWS 環境のクリーンアップ

この実験が終了したら、予期しないコストが発生しないように、必ず Amazon SageMaker エンドポイントを削除してください。これをコンソールから行うには、[Services]、[Amazon SageMaker]、[Inference]、[Endpoints] と移動します。[Endpoints] で [pipeline-xgboost] を選択します。右上で、[Delete] を選択します。これで、AWS アカウントからエンドポイントが削除されます。また、ノートブックインスタンスも必ず停止してください。

以下のような「環境のクリーンアップ」というラベルが付いているコードセルを実行すると、ノートブックインスタンスからより広範囲のクリーンアップを実行できます。

### 環境のクリーンアップ

print('Deleting SageMaker endpoint...')
result = sagemaker.delete_endpoint(
    EndpointName='pipeline-xgboost'
)
print(result)

print('Deleting SageMaker endpoint config...')
result = sagemaker.delete_endpoint_config(
    EndpointConfigName='pipeline-xgboost'
)
print(result)

print('Deleting SageMaker model...')
result = sagemaker.delete_model(
    ModelName='pipeline-xgboost'
)
print(result)

print('Deleting Glue job...')
result = glue.delete_job(
    JobName='preprocessing-cars'
)
print(result)

結論

おめでとうございます! これで、Amazon SageMaker ML ワークフローの一部として AWS Glue で Apache Spark を使用して前処理および後処理を行う方法を学びました。これで、Amazon SageMaker の各推論リクエストに対して実行される 5 つのデータ処理および推論の手順のシーケンスをデプロイすることができます。この新機能を使用すると、前処理コードを一度作成して、トレーニングと推論 (リアルタイムまたはバッチ) の両方で使用できます。これにより、ML モデルのトレーニングとデプロイとの間での一貫性が向上します。さらに、Amazon SageMaker が提供する新しい SparkML Serving コンテナを使用すると、リアルタイムデータに Spark パイプラインを利用することができます。ご自由に、このプロセスを異なるデータセットや異なるモデルに適応させてください。

引用

Dua, D. and Karra Taniskidou, E.(2017).UCI Machine Learning Repository.Irvine, CA: University of California, School of Information and Computer Science.

 


著者について

Thomas Hughes は、AWS プロフェッショナルサービスのデータサイエンティストです。彼はカリフォルニア大学サンタバーバラ校で博士号を取得し、社会科学、教育、広告の分野で問題に取り組んできました。彼は現在、機械学習をビッグデータに適用する際に発生する最も厄介な問題のいくつかを解決することに取り組んでいます。

 

 

 

Urvashi Chowdhary は、Amazon SageMaker 担当のシニアプロダクトマネージャーです。 お客様に寄り添い、また、機械学習をさらに使いやすいものにしていくことに情熱を傾けています。余暇には、セーリング、パドルボード、カヤックを楽しんでいます。