Amazon Web Services ブログ

RAPIDSをAmazon SageMaker Processingで実行する

こんにちは、アマゾン ウェブ サービス ジャパンの藤田です。Amazon SageMakerは、すべての開発者が機械学習モデルを迅速に構築、トレーニング、デプロイできるようにするための完全マネージド型サービスです。Amazon SageMakerは機械学習モデルの開発を容易にするため、機械学習の各プロセスから負荷の大きな部分を取り除きます。
今回は、開発者が機械学習の前処理、後処理、モデル評価といったワークフローをAmazon SageMaker上で簡単に行うためのPython SDKであるAmazon SageMaker ProcessingとデータサイエンスのワークフローをGPUで高速化するフレームワーク、RAPIDSを使用し、サーバレスに前処理、特徴量作成を行う方法を紹介します。

Amazon SageMakerで機械学習の開発を行う利点

機械学習の開発を1つのEC2インスタンスで行う場合、開発に使用するマシンリソース(インスタンスタイプ)は多くの場合、ワークフローで使用するマシンリソースのピークに設定することになるかと思います。それは多くの場合、機械学習モデルの学習フェーズであったり、特徴量作成の一部分ではないかと思います。また、前処理・特徴量作成や学習のフェーズでGPUを利用したい場合は、GPUインスタンスを選択する必要もでてきます。
一方で、開発全体で見るとこれらには多くのマシンリソースの無駄が生じてしまいます。具体的には、環境構築や(時に膨大な時間を費やす)コーディングの時間をはじめ、マシンリソースがフル活用されていない全ての時間があてはまります。下図は一例として、1つのインスタンスで開発を進めた場合と Amazon SageMakerで適切なリソースを選択しながら開発を進めた場合のリソースの使用量を表しています。この面積の差がマシンリソースの効率の差であり、開発全体にかかるコストの差にもつながります。

Amazon SageMakerは、コーディングを行うための開発用ノートブックインスタンスと、前処理・特徴量作成・後処理用のインスタンス(今回紹介するAmazon SageMaker Processing)、学習用のインスタンス、推論用のインスタンスを使い分けることでマシンリソースの効率的な活用を実現します。また、Amazon SageMaker Processing、学習用インスタンス、推論用インスタンス(バッチ変換の場合)は、Jobの実行が完了すると自動的に停止、削除されるため、かかるコストはJobの実行から停止までの間のみとなります。
実際のケースでは、新たなプロジェクトを進めていくにあたって、あらかじめ必要になるマシンリソースが読めないケースも多いと思います。その場合でもAmazon SageMakerでインスタンスタイプを柔軟に調整していくことで必要な時に必要な分だけのリソースを使用して開発を進められます。

Amazon SageMaker Processingとは?

Amazon SageMaker Processingは機械学習の前処理、後処理、モデル評価といったワークフローをAmazon SageMaker上で簡単に行うためのPython SDKです。実行環境(コンテナ)を用意し、実行したい処理を記述し、実行するインスタンスタイプを指定することでJobを実行することができます。コンテナと処理を記述したスクリプトを実行し、処理が終わるとインスタンスが停止するというシンプルな機能から、上述した機械学習関連のデータ処理にとどまらず、学習や推論にもお使い頂くような場合もあります。

SDKには主に下記の4つのクラスが用意されています(詳しいドキュメントはこちら)。

  • sagemaker.processing.Processor
  • sagemaker.processing.ScriptProcessor
  • sagemaker.sklearn.processing.SKLearnProcessor
  • sagemaker.processing.PySparkProcessor

SKLearnProcessorとPySparkProcessorはそれぞれ、あらかじめ用意されたscikit-learnとPySparkの実行環境(コンテナ)を使用するクラスで、frameworkのバージョンやインスタンスタイプ、インスタンスの台数を指定してオブジェクトを作成し、実行したい処理が含まれるpythonスクリプトをrunメソッドの引数に与えることで実行できます。

from sagemaker.sklearn.processing import SKLearnProcessor

sklearn_processor = SKLearnProcessor(
    framework_version="0.20.0",
    role="[Your SageMaker-compatible IAM role]",
    instance_type="ml.m5.xlarge",
    instance_count=1,
)

from sagemaker.processing import ProcessingInput, ProcessingOutput

sklearn_processor.run(
    code="preprocessing.py",
    inputs=[
        ProcessingInput(source="s3://your-bucket/path/to/your/data", destination="/opt/ml/processing/input"),
    ],
    outputs=[
        ProcessingOutput(output_name="train_data", source="/opt/ml/processing/train"),
        ProcessingOutput(output_name="test_data", source="/opt/ml/processing/test"),
    ],
    arguments=["--train-test-split-ratio", "0.2"],
)

ProcessorとScriptProcessorは独自の実行環境(コンテナ)で処理を行いたい時に使用します。

  • Processorクラスはrunメソッドにcode引数(pythonスクリプト)をとらないクラスで、コンテナの作成と実行したい処理をDocker image内に記述して利用します(記述の例はこちら)。
  • ScriptProcessorクラスはコンテナの作成と処理コードを分けて実行するクラスで、あらかじめコンテナを用意し、SKLearnProcessorとPySparkProcessorと同じようにrunメソッドにpythonスクリプトを受け渡し、処理を実行します。

今回は、ScriptProcessorクラスを使用して、RAPIDSを実行できる環境を用意し、前処理・特徴量作成を実行したいと思います。この記事で使用したノートブックはこちらにあります。

  • このノートブックを実行する開発用ノートブックインスタンスにはml.t2.large(vCPU2, メモリ8GB)以上のインスタンスタイプを使用してください。ml.t2.medium(vCPU2, メモリ4GB)の場合、コンテナのビルドでメモリエラーが発生します。

使用するデータ

今回使用するデータはCensus-Income KDDデータセットです。これは1994年と1995年に米国で実施された国勢調査のサンプリングデータです。データにはデモグラ情報や雇用に関する情報が含まれ、レコード数は199,523件となります。機械学習の文脈では決して大きなデータではありませんが、今回はこのデータを活用してRAPIDSを使った前処理や特徴量の作成を行いたいと思います。
このデータでの問題設定として「income(収入)」をターゲット変数として設定し、行単位で格納された国勢調査の回答者の収入が50,000ドルを上回るか否かを分類するタスクを実行したいと思います。

実行環境の準備

コンソールからAmazon SageMakerのノートブックインスタンスを起動したら、まずはコンテナを作成します。作成するコンテナはminicondaにRAPIDSのバージョン0.16が含まれたものになります。

FROM continuumio/miniconda3:4.8.2

RUN conda install -c rapidsai -c nvidia -c conda-forge -c defaults rapids=0.16 python=3.7 cudatoolkit=10.1 boto3

ENV PYTHONUNBUFFERED=TRUE
ENTRYPOINT ["python3"]

次に作成したコンテナをAWSのコンテナレジストリであるAmazon ECRにプッシュします。

account_id = boto3.client('sts').get_caller_identity().get('Account')
region = boto3.session.Session().region_name

ecr_repository = 'sagemaker-rapids-example'
tag = ':latest'
uri_suffix = 'amazonaws.com'
if region in ['cn-north-1', 'cn-northwest-1']:
    uri_suffix = 'amazonaws.com.cn'
rapids_repository_uri = '{}.dkr.ecr.{}.{}/{}'.format(account_id, region, uri_suffix, ecr_repository + tag)

# Create ECR repository and push docker image
!$(aws ecr get-login --region $region --registry-ids $account_id --no-include-email)
!aws ecr create-repository --repository-name $ecr_repository
!docker tag {ecr_repository + tag} $rapids_repository_uri
!docker push $rapids_repository_uri

実行したい処理を記述

コンテナの作成が完了したら、コンテナ上で実行したいスクリプトを作成します。今回は、RAPIDSのcuDFとcuMLを使用して、データの読み込みとカテゴリ変数に対するLabelEncoding, TargetEncodingを行いたいと思います。スクリプトの名前はpreprocess.pyとしています。

%%writefile preprocess.py
from __future__ import print_function, unicode_literals

import boto3
import os
import sys
import time

import cudf
from cuml.preprocessing.LabelEncoder import LabelEncoder
from cuml.preprocessing.TargetEncoder import TargetEncoder

import warnings
warnings.filterwarnings("ignore")


if __name__ == "__main__":
    
    # Get processor script arguments
    args_iter = iter(sys.argv[1:])
    script_args = dict(zip(args_iter, args_iter))
    
    TARGET_COL = script_args['TARGET_COL']
    TE_COLS = [x.strip() for x in script_args['TE_COLS'].split(',')]
    SMOOTH = float(script_args['SMOOTH'])
    SPLIT = script_args['SPLIT']
    FOLDS = int(script_args['FOLDS'])
    
    # Read train, validation data
    train = cudf.read_csv('/opt/ml/processing/input/train/train.csv')
    valid = cudf.read_csv('/opt/ml/processing/input/valid/validation.csv')
    
    start = time.time(); print('Creating Feature...')
    
    # Define categorical columns
    catcols = [x for x in train.columns if x not in [TARGET_COL] and train[x].dtype == 'object']
    
    # Label encoding
    for col in catcols:
        train[col] = train[col].fillna('None')
        valid[col] = valid[col].fillna('None')
        lbl = LabelEncoder()
        lbl.fit(cudf.concat([train[col], valid[col]]))
        train[col] = lbl.transform(train[col])
        valid[col] = lbl.transform(valid[col])
    
    # Target encoding
    for col in TE_COLS:
        out_col = f'{col}_TE'
        encoder = TargetEncoder(n_folds=FOLDS, smooth=SMOOTH, split_method=SPLIT)
        encoder.fit(train[col], train[TARGET_COL])
        train[out_col] = encoder.transform(train[col])
        valid[out_col] = encoder.transform(valid[col])
        
    print('Took %.1f seconds'%(time.time()-start))
        
    # Create local output directories
    try:
        os.makedirs('/opt/ml/processing/output/train')
        os.makedirs('/opt/ml/processing/output/valid')
    except:
        pass
    
    # Save data locally
    train.to_csv('/opt/ml/processing/output/train/train.csv', index=False)
    valid.to_csv('/opt/ml/processing/output/valid/validation.csv', index=False)
    
    print('Finished running processing job')

SageMaker Processingの実行

このコードを実行するには、ScriptProcessorクラスのimage_uriの引数に、先ほど作成したRAPIDSが実行できるコンテナリポジトリのURIを渡し、インスタンスタイプにGPU(今回はml.p3.2xlarge)を指定し、任意の名前のオブジェクトを作成します。

  • ml.p3.2xlargeはNVIDIA Tesla V100が1枚搭載されているインスタンスですが、RAPIDSはマルチGPUにも対応しているため、例えば、NVIDIA Tesla V100が4枚搭載されたp3.8xlargeを使用してpreprocess.pyの中の処理をマルチGPUが実行できるように書き換えることでさらに大規模データへの対応や高速化が実現できます。
from sagemaker.processing import ProcessingInput, ProcessingOutput, ScriptProcessor

rapids_processor = ScriptProcessor(
    role=role, 
    image_uri=rapids_repository_uri,
    command=["python3"],
    instance_count=1, 
    instance_type="ml.p3.2xlarge", # use GPU Instance
    volume_size_in_gb=30, 
    volume_kms_key=None, 
    output_kms_key=None, 
    max_runtime_in_seconds=86400, # the default value is 24 hours(60*60*24)
    base_job_name="rapids-preprocessor",
    sagemaker_session=None, 
    env=None, 
    tags=None, 
    network_config=None)

次に、作成したオブジェクトでrunメソッドを実行します。

  • codeには先ほど作成したpreprocess.pyを指定します。
  • inputsのsourceには入力ファイルがあるS3のPATH、destinationにはコンテナ上でそのファイルを置く場所を指定します。
  • outputsのsourceには出力したいファイルのコンテナ上の場所、destinationには出力したいファイルを置くS3のPATHを指定します。
  • argumentsには、任意でスクリプト(preprocess.py)内に渡す引数を設定できます。今回はTargetEncodingに使用するターゲット変数の名前やTargetEncodingする変数名、ハイパーパラメータなどを設定していますが、これらはpreprocess.py内に直接記述することも可能です。

また、inputs、outputsは必ずしも設定する必要はなく、preprocess.py内でインターネット上からファイルを取得して、何かしらの処理をして出力することもできますし、入力ファイルから機械学習モデルを学習し、ひとまず精度を確認したり、学習用インスタンスで実行するためのコードを開発するといったことにも使えます(このnotebookでは前処理・特徴量作成のあとにXGBoostモデルの学習も行っています)。

rapids_processor.run(
    code="preprocess.py",
    inputs=[
        ProcessingInput(source=input_train, destination='/opt/ml/processing/input/train'), 
        ProcessingInput(source=input_validation, destination='/opt/ml/processing/input/valid')
    ], 
    outputs=[
        ProcessingOutput(source='/opt/ml/processing/output/train', destination=output_s3_path),
        ProcessingOutput(source='/opt/ml/processing/output/valid', destination=output_s3_path)
    ],
    arguments=[
        'TARGET_COL', 'income',
        'TE_COLS', 'class of worker, education, major industry code',
        'SMOOTH', '0.001',
        'SPLIT', 'interleaved',
        'FOLDS', '5'
    ],
    wait=True,
    logs=True,
    job_name=None,
    experiment_config=None,
    kms_key=None
)

このJobが完了するとoutputsのdestinationに与えたoutput_s3_pathに前処理、特徴量作成されたファイルがtrain.csvとvalidation.csvの名前で出力されているはずです。

Jobの詳細はSageMakerのコンソール画面からも確認できます。この処理には約186秒かかっており、この時間分ml.p3.2xlargeの料金が発生します。

最後に

Amazon SageMakerでは、コーディングを行うための開発用ノートブックインスタンスと、Amazon SageMaker Processing、学習用インスタンス、推論用インスタンスを使い分けることでマシンリソースの効率的な活用を実現できます。
今回はml.t2.largeインスタンス(vCPU2, メモリ8GB)で開発を行いながら、前処理・特徴量作成の部分にだけGPUインスタンスを使用して、RAPIDSライブラリによる前処理・特徴量作成を行いました。

前述した通り、Amazon SageMaker Processingは前処理・特徴量作成に限らず、機械学習モデルの構築にも使用できます。
学習用インスタンスでは、スポットインスタンスの利用実験管理HPOなどの機械学習の学習フェーズでの開発効率を上げる機能が充実していますが、データの入出力やモデル出力の部分については学習用インスタンスで使用するためのスクリプト書き換えが必要になります。
Amazon SageMaker Processingにはこれらの機能はありませんが、特別な書き換え作業なく学習を実行できるため、開発初期の試行錯誤のフェーズに適しています。ある程度、機械学習モデルの見通しが立ってきた段階で学習用インスタンスへ移行していくことで、より効率的なリソースの活用が実現できるでしょう。

 

著者について

藤田 充矩 (Atsunori Fujita) は AWS の機械学習 プロトタイプ ソリューション アーキテクトで、自然言語処理や時系列分析などを得意にしています。趣味はkaggleです。