Amazon Web Services ブログ

Amazon SageMaker Model Monitor を活用したデータドリフト検知の解説

はじめに

AWS では、機械学習モデルの開発、学習、推論に最適な AWS サービスを提供しています。これらを活用することで、お客様は、データに基づく適切なビジネス判断、高品質な顧客サービスの提供、コンプライアンスの遵守等の実現が可能となります。ただし、機械学習モデルが扱うデータには細心の注意が必要です。機械学習モデルの学習データと推論時に入力されるデータに、許容範囲を超えた乖離が発生すると、モデルは期待した精度で推論結果を返すことができなくなります。昨今、 COVID-19 や企業のデジタルトランスフォーメーション (DX) の影響等で、一般消費者の生活スタイルやソーシャルネットワークでの言動、企業における従業員のワークスタイルや財務状況に至るまで、様々な指標に、予測を超えた変化が起きている可能性があります。これらは、それまでに学習された機械学習モデルの精度に影響を与える可能性があります。このように、推論時の入力データや出力結果が、学習時の許容範囲を超えて変化することをデータドリフトと呼びます。( 注釈 : データドリフトまたはデータシフトの文脈では、共変量シフト (covariate shift) やコンセプトドリフト (concept drift) などの概念、用語が使われることもありますが、本記事では、データドリフトという用語を、上記の定義に基づいて使用します。) このデータドリフトを継続的に監視する仕組みが、精度の高い推論を提供し続けるために重要です。

AWS は、このようなデータドリフト検知を含め、機械学習を組み込んだアーキテクチャにおけるベストプラクティスを、Machine Learning Lens というホワイトペーパーで公開しています (日本語版) 。また、 Amazon SageMaker を活用しているお客様は、 Amazon SageMaker Model Monitor ( 以下、 Model Monitor ) を活用して、すぐにデータドリフトの検知を開始することが可能です。本記事では、 AWS を利用した機械学習システムの検討や実装に関わる皆様を対象に、 Model Monitor のアウトプットをどのように解釈しデータドリフトの検知に利用できるのか、関連する論文や技術を、できるだけ具体的、かつ簡潔に解説したいと思います。

機械学習のビジネス活用においては、モデルのベースとなるアルゴリズムの正しい理解が重要です。それと同じように、データドリフトの検知に活用されている様々なアルゴリズムや技術についても理解することは、誤ったビジネス活用を未然に防ぐために必要不可欠です。AWS では、クラウドならではのスピード感で、すぐに Model Monitor を活用したデータドリフト検知を開始することが可能です。私たちプロフェッショナルサービスのコンサルタントは日頃、AWSを活用した機械学習モデルの開発、学習、推論のためのアーキテクチャの最適化についてガイドしています。コンサルタントにはデータサイエンティストや Machine Learning ( 以下、ML ) エンジニアなどがおり、関連論文に関する知識や現場で得たノウハウを蓄えています。本記事により、みなさまのご理解の一助となれば幸いです。

なお、データドリフトに関する記事はシリーズ化を予定しており、本記事はその第一弾で、初級編となります。続編では、より踏み込んだ、数理的、実践的な内容をご紹介する予定ですのでご期待ください。

ベストプラクティスとメトリクス

先ほど紹介した Machine Learning Lens というホワイトペーパーでは、データドリフトについて「一般的な設計の原則 (19 ページ) 」の中で、次のように解説しています。

経時的にデータドリフトを管理するには、モデルが本稼働状態になった後で推論の精度を継続的に測定します。通常、MLで使用されるデータは複数のソースから取得され、そのデータの形状と意味は、アップストリームのシステムやプロセスの変更に伴い変化する可能性があります。適切なアクションを実行できるように、こうした変化を検出するメカニズムを確保します。

この記述の通り、データドリフトの管理には、推論の精度を継続的に測定する必要があり、複数のアップストリームのデータソースから、その精度の低下につながったデータの変化を迅速に推定することが重要です。そして、データを使った再学習を行う際の指標は、「運用の優秀性 (49 ページ) 」において、次のように記載されています。

MLOPS 06:新しいデータや更新データでML モデルを再トレーニングするタイミングはどのように判断しますか? MLワークロードは、初期段階では価値が高い予測値を提供しますが、その同じモデルによる予測の精度は時間とともに低下します。これは「ドリフト」という現象によることが多く、時間の経過による正解データの変化など、さまざまな要因が影響した結果である可能性があります。

(略)

追加データを検討するためのベストプラクティスは、次のとおりです。

– モデルのパフォーマンスと精度を示すメトリクスを定義します。
– それらのメトリクスを定期的に取得するメカニズムを構築して分析を行い、メトリクスのしきい値に基づいたアラートを出します。たとえば、あるモデル予測までさかのぼってダウンストリームの結果の確認、データ取得およびトラッキングを行い、エラー率などのメトリクスを経時的に計算するシステムが必要になることがあります。
– モデルの再トレーニングが適切かどうかを評価します。追加の正解データが利用可能または取得可能かどうか、追加データをラベル付けする必要があるのかどうかを判断します。新しいデータを使用してトレーニングを定期実行する、新しいデータを再トレーニングのトリガーとする、メトリクスのしきい値を使用して再トレーニングを評価するなど、既知のワークロードの特性に基づく再トレーニングの初期戦略を立てます。この戦略では、変化量、再トレーニングコストおよび本番モデル更新の潜在的な価値のトレードオフを評価する必要があります。定義された戦略に基づいて、再トレーニングの自動化を設定します。」

これら記述から、データドリフトの検知とその対応のために、計測するメトリクスをあらかじめ定義し、そのメトリクスを継続的にモニタリングした上で、もし閾値を超えた場合はモデルの再学習を行うことがベストプラクティスであると読み取れます。

では、モニタリングされ、再学習のトリガーに利用される「メトリクス」にはどのようなものが考えられるでしょうか。これを理解するためには、機械学習モデルの学習データや、推論時の入力、出力データの特性を簡潔に表現する指標や、それらデータの差異を表現する指標を理解する必要があります。前者には、平均値や中央値や標準偏差、スケッチ(後述)、データの複雑性などが知られています。そして後者には、前述の指標の比較に加えて、確率分布間の距離推定の利用がされています。このようなデータ特性を表現するための指標には、これまでにいくつもの手法が提案されてきました。ここでは、 Amazon で開発し、使用されているオープンソースソフトウェアである Deequ を具体的な実装例として紹介します。 Deequ はデータ品質を単体テストするための宣言的な API を提供するモジュールであり、活用することでデータセットのデータ品質メトリクスを計算したり、データが満たすべき制約に対するチェックをしたりすることができます。2018年に論文が公開されたのち、2019 年にも後続の論文が発表されています。また、現在 (2020/11/05 時点) は最新バージョン 1.0.5 が公開されており、継続して発展を遂げています。

Deequ はSpark アプリケーションとして開発された Open Source Software で、 Amazon EMR などの Apache Spark システムで稼働させることができます。 Model Monitor は、そのコンテナ環境をマネージドサービスとして、 SageMaker SDK を介して利用することもできます。

統計情報、スケッチ、距離

Deequ は、統計情報やスケッチなどのメトリクスを出力したり、データの分布距離を評価したりすることが可能です。前述の Deequ に関する論文や公開されているソースコード、および SageMaker の開発者ガイドを参照し、それぞれで利用可能なメトリクスを次のように整理しました。

表1: Deequ や Amazon SageMaker Model Monitor で利用可能なメトリクスの例

メトリクス 概要
基本的な統計情報 完全性 (NULL 値以外の割合) 、品質制約に準拠する割合、固有値 (Uniqueness) の割合、個別値 (Distinctness) の割合、2つの列間での相関係数、最大値、最小値、平均値、合計
スケッチ (分位スケッチ) データから一部をサンプリングし、要約する手法もしくはその単位のこと。様々な用途に用いることができ、分位スケッチは、分位を得るために使われる。 KLL Sketch など。
分布距離 (またはダイバージェンス) 分布距離により、2つの確率分布の類似性を正数値で比較することができる。確率分布の類似性を図る手法として、ダイバージェンスも利用される。 Kolmogorov-Smirnov (KS) ダイバージェンスなど。

注釈:ブログ執筆時点 (2020/11 月末時点) の公開情報に基づいています。

基本的な統計情報を、学習データに対して、事前に取得しそれをベースラインとして取得しておけば、推論時の入力データと比較することができます。これにより、データの入力源となるセンサーの故障等によって、一部の特徴量が不完全 (NULL) となるような事象を検知することができます。また、データ品質制約によって、特定のセンサーの交換等により、データの出力に仕様変更が生じた場合にそれを検知できる可能性があります。

スケッチは、際限なく生成されるストリーミングデータの特性を要約するための少量のデータ集合もしくは、その集合を生成する処理を指しています。 XGBoost で重み付き分位点スケッチが使われていることで、ご存知の方もいるかと思います。この場合、スパース性を考慮した分位による分割方法を得るために、スケッチが使われています。分位やヒストグラムだけでなく、固有値の計測、最頻値の計測、サンプリングなど、それぞれに最適なアルゴリズムが研究されています。スケッチの特徴として、「ストリーミングデータに対し、ある一時点のデータを 2 回以上取得する必要がない ( ワンタッチ ) ため、リアルタイムアプリケーションに適している」、「データサイズに対して計算量はリニアに増大しない」、「分散処理したとしても加法性があるためにスケーラブルである」、「 ( 一部の制限を除いて ) データ非依存であり順序や分散、値がどのようなものであっても影響を受けない」などが挙げられます。また、重要な特徴として、スケッチのアルゴリズムは、誤差の範囲を含む理論的な根拠が数学的に証明されていることです。これにより、任意のクエリーにおけるエラーを推定することができます。これらのことは、 Apache Foundations のプロジェクトである Apache DataSketches のウェブサイトで紹介されています。推論時の入力データは、際限なく送られることが仮定され、出力結果とともにストリームデータと言えますので、これらの要約を得るのにスケッチは有効です。

ここでは、 Deequ や Model Monitor で利用可能な KLL Sketch をベースに解説を行います。 KLL Sketch は、 2016 年に発表された論文の著者 (Zohar Karnin 氏, Kevin Lang 氏, Edo Liberty 氏 ) の頭文字から、そう呼ばれている分位スケッチの一種です。 KLL Sketch を利用することでストリーミングデータの一部をサンプリングし、全体のデータを要約します。

KLL Sketch を理解する上では、まず Compactor を理解することが必要です。Compactor は、ストリーミングデータを一時的に格納、ソートした上で、偶数だけ、奇数だけといったようにデータの半数だけを抽出するという「コンパクト処理」を行うものです。どのくらいのデータを格納するかは、パラメーター (k) で設定します。k=2,048 であれば、 Compactor のサイズは 2,048 ということになります。コンパクト処理をされた後、 (半分になった分だけ) 重み付けを行います。これを事前に設定された回数だけ繰り返します。これらコンパクト処理や重み付けにより発生した誤差は、回数分だけ積み上がっていきますが、ある範囲で想定できます。また、 Compactor の処理ごとに k の値をどの程度減らしていくかを、 c というパラメータで指定します。 c=0.64 であれば、約2/3ずつ減衰させるということです。 KLL Sketch などのアルゴリズムについて、本シリーズの次回以降で、解説する予定です。

分布距離(または、ダイバージェンス)は、 2 つの確率分布間の類似性を評価するための指標に使われます。同様の目的では、距離の公理を満たすものだけでなく、ダイバージェンスが利用されることもあります。Generative adversarial network( 通称GAN) をご存知の方にとっては、 Wasserstein GAN( 通称 WGAN) で利用されている Wasserstein 距離に馴染みがあると思います。Deequ では、 Kolmogorov–Smirnov ダイバージェンスが利用可能です。2つの対象データにおける特徴量の累積分布関数の差により、その類似性を評価するものです。Chebyshev 距離を指定することも可能です。

Model Monitor を使ったデータドリフト検知

これまでに、基本的な統計情報、分位スケッチ、分布距離についての簡単なご紹介をしました。Model Monitor を活用することで、これらを可視化することができます。ここでは、これらを元に、データドリフトが発生した場合、どのように、それらを把握できるかを見ていきたいと思います。今回利用するデータは、 New York City Taxi のトリップレコードデータです。パブリックに利用可能なデータセットを容易に検索できる “Registry of Open Data on AWS” に登録されています。このデータをデータドリフト検出の題材として利用したいと思います。NYC Taxi のトリップデータは、 1 レコードが 1 回のピックアップ ( 乗車 ) からドロップオフ ( 降車 ) までを表しており、タクシーの乗車時刻、降車時刻、乗車区域、降車区域、乗客数や料金などの情報で成り立っています。これらが、毎月のデータとして CSV で提供されています。

データの前処理

まず、データの前処理と確認を行いましょう。まず、 AWS 環境でのデータ分析に役立つ aws-data-wrangler を Jupyter Notebook 環境に導入します。AWS SageMaker Notebook インスタンスを利用する場合は、事前に作成しておきます。


!pip install awswrangler

import awswrangler as wr
import os
import pandas as pd

pd.set_option('display.max_columns', 100)
pd.set_option('display.max_rows', 2000)

S3 バケットから CSV ファイルを読み込むための Python 関数を定義します。NYC Taxi にはマンハッタン島で乗客をピックアップ可能な Yellow ライセンスと、マンハッタン島以外でそれが可能な Green ライセンスがあり、それぞれのデータは別のファイルとして提供されています。Yellow ライセンスの乗車データは1ヶ月あたり 600 – 700MB 程とデータサイズが比較的大きいことから、今回は 1/10 程度の Green ライセンスの乗車データを対象にします。マンハッタン島の乗車データを分析してみたい方は、 “cab_color” を “yellow” に設定してチャレンジしてみてください。( データ量や処理時間の増加に注意してください。 )


def read_and_parse_csv(cab_color, year, month):
    # Set next month as string
    if int(month) < 12:
        limit_year = year
        limit_month = str(int(month) + 1).zfill(2)
    else:
        limit_year = str(int(year) + 1).zfill(4)
        limit_month = '01'
    df = wr.s3.read_csv(f's3://nyc-tlc/trip data/{cab_color}_tripdata_{year}-{month}.csv')
    # Convert pickup/drop timestamp string to timestamp data type
    if cab_color == 'yellow':
        df['pickup_datetime'] = pd.to_datetime(df['tpep_pickup_datetime'])
        df['dropoff_datetime'] = pd.to_datetime(df['tpep_dropoff_datetime'])
    elif cab_color == 'green':
        df['pickup_datetime'] = pd.to_datetime(df['lpep_pickup_datetime'])
        df['dropoff_datetime'] = pd.to_datetime(df['lpep_dropoff_datetime'])
    else:
        raise Exception('Unexpected cab_color type')
    df['trip_duration_sec'] = df.apply(lambda x: (x.dropoff_datetime - x.pickup_datetime).total_seconds(), axis=1)
    df['pickup_ymdh'] = df.pickup_datetime.dt.strftime('%Y-%m-%d %H:00:00')
    df['date'] = df.pickup_datetime.dt.strftime('%Y-%m-%d')
    # Drop abnormal records based on trip duration (sec) and pickup timestamp
    df = df[(df.trip_duration_sec >= 10) & (df.trip_duration_sec <= 14400)]
    df = df[(df.pickup_ymdh >= f'{year}-{month}-01 00:00:00') & (df.pickup_ymdh < f'{limit_year}-{limit_month}-01 00:00:00')].copy()
    return df
def generate_trip_summary(df_records):
    groupby_target = ['date', 'pickup_ymdh', 'PULocationID']
    calc_target = ['trip_duration_sec', 'fare_amount', 'mta_tax', 'tip_amount', 'tolls_amount', 'improvement_surcharge', 'total_amount']
    g = df_records.groupby(by=groupby_target)
    df_count = g.VendorID.count().rename('count')
    df_mean = g[calc_target].mean().rename(columns={x:f'{x}_mean' for x in calc_target})
    df_std = g[calc_target].std().rename(columns={x:f'{x}_std' for x in calc_target})
    df_summary = pd.concat([df_count, df_mean, df_std], axis=1).reset_index()
    return df_summary
def save_s3_by_date(df_summary, cab_color, bucket, prefix):
    ymd_list = sorted(list(set(df_summary.date.tolist())))
    for ymd in ymd_list:
        file_ymd = ymd.replace('-', '')
        df_save = df_summary[df_summary.date == ymd]
        wr.s3.to_csv(df_save, f's3://{bucket}/{prefix}/nyctaxi_tripdata_{cab_color}_{file_ymd}.csv', index=False)

今回は全体の傾向をつかむために、 2019 年 1 月から 2020 年 6 月までのデータを取得した上で、 1 時間ごとの乗車回数や平均の乗車時間、料金などにサマリーし、自身の S3 Bucket に保存します。 なお、下のコードの bucket 変数を、ご自身で保有する S3 Bucket 名に変更してから実行するよう注意してください。 ( S3 Bucket は、事前に作成してください。)


year_month = [('2019', '01'), ('2019', '02'), ('2019', '03'), ('2019', '04'),
              ('2019', '05'), ('2019', '06'), ('2019', '07'), ('2019', '08'),
              ('2019', '09'), ('2019', '10'), ('2019', '11'), ('2019', '12'),
              ('2020', '01'), ('2020', '02'), ('2020', '03'), ('2020', '04'),
              ('2020', '05'), ('2020', '06')]
cab_color = 'green'
bucket = 'my-model-monitor-bucket-name'
prefix = 'data/nyctaxi/daily'

for year, month in year_month:
    df_records = read_and_parse_csv(cab_color, year, month)
    df_summary = generate_trip_summary(df_records)
    save_s3_by_date(df_summary, cab_color, bucket, prefix)

ここからは、先ほど指定した S3 Bucket に格納された CSV データを読み込んで処理していきましょう。この記事の執筆時点では、 2019 年 1 月 1 日から約 1 年半分のデータが取得できています。読み込み処理には数分かかります。


import boto3
	
s3 = boto3.client('s3')
response = s3.list_objects(Bucket=bucket, Prefix=prefix)
nyctaxi_summary_files = sorted([x['Key'] for x in response['Contents']])
nyctaxi_summary_files = [x for x in nyctaxi_summary_files if ('green' in x) and ('2019' in x or '2020' in x)]
nyctaxi_summary_files

df_data = pd.DataFrame()
for file in nyctaxi_summary_files:
    df_read = wr.s3.read_csv(f's3://{bucket}/{file}')
    df_data = pd.concat([df_data, df_read])

print('Number of records:', df_data.shape[0])

データの概観をつかむために、pandas の DataFrame に変換されたデータをチャートにプロットします。


df_data.groupby(by='pickup_ymdh')['count'].sum().plot(figsize=(16,4), ylabel='Number of pickup')

plot
このチャートは、「 1 時間あたりの green タクシーピックアップ数」の推移を表しています。2019 年 1 月から漸減傾向ながらもピーク時間帯には 1000 を超える程度のピックアップ数で推移していました。しかし、 2020 年の 2 月末から 4 月にかけて顕著に減少していることが読み取れます。COVID-19 の影響が、 New York City Taxi のピックアップ数に影響を及ぼした可能性が示唆されます。

予測モデルの開発

ここから、 NYC Taxi のトリップ数の簡易的な予測モデルを開発します。ここでは XGBoost を使用し、自己回帰に近いモデルを作成することにします。今回は、「 2019 年 1 月から 12 月までのデータを学習したモデルを開発し、 2020 年 1 月からモデルの運用を開始した」という状況であったと仮定します。 つまり、モデルの運用から約 3 ヶ月間は、学習時のデータと同様の傾向を示す予測精度の高い状態を維持していたものの、 4 ヶ月後にデータドリフトに直面することになります。Model Monitor による監視をしていたら、どのような指標の変化を検出できるのか、を見ていきましょう。


def same_hour_mean(data):
    current_row = data.tail(1)
    current_hour = current_row.index.strftime('%H').values[0]
    except_for_current = data.head(data.shape[0]-1)

    return except_for_current[except_for_current.index.strftime('%H') == current_hour].mean()

def same_hour_min(data):
    current_row = data.tail(1)
    current_hour = current_row.index.strftime('%H').values[0]
    except_for_current = data.head(data.shape[0]-1)

    return except_for_current[except_for_current.index.strftime('%H') == current_hour].min()

def same_hour_max(data):
    current_row = data.tail(1)
    current_hour = current_row.index.strftime('%H').values[0]
    except_for_current = data.head(data.shape[0]-1)

    return except_for_current[except_for_current.index.strftime('%H') == current_hour].max()

# Aggregate all locations based on "YYYY-MM-DD HH"
g = df_data.groupby(by='pickup_ymdh')
df_features = pd.DataFrame()
df_features['y'] = g['count'].sum()

# Set DateTimeIndex to DataFrame
df_features = df_features.reset_index()
df_features['pickup_ymdh'] = pd.to_datetime(df_features.pickup_ymdh)
df_features = df_features.set_index('pickup_ymdh')

# Set time periods to express taxi pickup trend
df_features['time_hh'] = df_features.index.strftime('%H')
df_features['time_flag_early_morning'] = df_features.time_hh.apply(lambda x: 1 if x in ['00', '01', '02', '03', '04', '05', '06', '07'] else 0)
df_features['time_flag_daytime'] = df_features.time_hh.apply(lambda x: 1 if x in ['08', '09', '10', '11', '12', '13', '14', '15', '16'] else 0)
df_features['time_flag_evening'] = df_features.time_hh.apply(lambda x: 1 if x in ['17', '18', '19', '20', '21'] else 0)
df_features['time_flag_midnight'] = df_features.time_hh.apply(lambda x: 1 if x in ['22', '23'] else 0)
df_features['weekday'] = df_features.index.strftime('%w')

for hour in [1, 2, 3, 4, 5, 24, 48, 72, 96, 168]:
    df_features[f'history_{hour}h_b'] = df_features.y.shift(hour)

# Calc mean, minimum and maximum values for each hour
df_features['same_hour_mean'] = df_features.y.rolling(window=24*7*8).apply(same_hour_mean)
df_features['same_hour_min'] = df_features.y.rolling(window=24*7*8).apply(same_hour_min)
df_features['same_hour_max'] = df_features.y.rolling(window=24*7*8).apply(same_hour_max)

# Convert hour feature from str to int
df_features['time_hh'] = df_features.time_hh.astype(dtype=int)

# Calc delta from previous hour to current hour
df_features['y_diff'] = df_features.y - df_features.history_1h_b
df_features['diff_ratio'] = df_features.y_diff / (df_features.history_1h_b + 0.00001)
df_features['y_diff_mean'] = df_features.y_diff.rolling(window=24*7*8).apply(same_hour_mean)
df_features['diff_ratio_mean'] = df_features.diff_ratio.rolling(window=24*7*8).apply(same_hour_mean)
df_features = df_features.drop(columns=['y_diff', 'diff_ratio'])

まず、モデルの開発に利用できるのは 2019 年中のデータなので、 2019 年 1 月から 8 月までのデータを学習用、同年 9 月と 10 月のデータを検証用、同年 11 月と 12 月のデータをテスト用に使用することにします。Amazon SageMaker が提供する XGBoost アルゴリズムは S3 からデータを取得して動作するので、分離したデータを S3 に保管します。

その後にモデルのトレーニングと、作成したモデルの推論エンドポイントへのデプロイを行います。


%%time
import boto3
import time
from time import gmtime, strftime
import sagemaker
from sagemaker.amazon.amazon_estimator import get_image_uri

# Split data to train, validation and test, and save the data in S3
df_train = df_features['2019-02-01':'2019-08-31']
df_validation = df_features['2019-09-01':'2019-10-31']
df_test = df_features['2019-11-01':'2019-12-31']
model_data_prefix = 'data/nyctaxi/model_training'
train_data_path = f's3://{bucket}/{model_data_prefix}/data.train'
validation_data_path = f's3://{bucket}/{model_data_prefix}/data.validation'

wr.s3.to_csv(df_train, train_data_path, index=False, header=False)
wr.s3.to_csv(df_validation, validation_data_path, index=False, header=False)

# Set environments for model training
role = sagemaker.get_execution_role()
region = boto3.Session().region_name
container = get_image_uri(region, 'xgboost', '1.0-1')

job_name = 'xgboost-regression-nyctaxi-' + strftime("%Y-%m-%d-%H-%M-%S", gmtime())
model_prefix = 'model/xgboost-regression-nyctaxi'
print("Training job", job_name)

# Define model training parameters
create_training_params = \
{
    "AlgorithmSpecification": {
        "TrainingImage": container,
        "TrainingInputMode": "File"
    },
    "RoleArn": role,
    "OutputDataConfig": {
        "S3OutputPath": f's3://{bucket}/{model_prefix}'
    },
    "ResourceConfig": {
        "InstanceCount": 1,
        "InstanceType": "ml.m5.large",
        "VolumeSizeInGB": 5
    },
    "TrainingJobName": job_name,
    "HyperParameters": {
        "max_depth":"20",
        "eta":"0.2",
        "gamma":"0",
        "min_child_weight":"1",
        "subsample":"1",
        "silent":"0",
        "objective":"reg:squarederror",
        "num_round":"200"
    },
    "StoppingCondition": {
        "MaxRuntimeInSeconds": 3600
    },
    "InputDataConfig": [
        {
            "ChannelName": "train",
            "DataSource": {
                "S3DataSource": {
                    "S3DataType": "S3Prefix",
                    "S3Uri": train_data_path,
                    "S3DataDistributionType": "FullyReplicated"
                }
            },
            "ContentType": "csv",
            "CompressionType": "None"
        },
        {
            "ChannelName": "validation",
            "DataSource": {
                "S3DataSource": {
                    "S3DataType": "S3Prefix",
                    "S3Uri": validation_data_path,
                    "S3DataDistributionType": "FullyReplicated"
                }
            },
            "ContentType": "csv",
            "CompressionType": "None"
        }
    ]
}

# Start model training
client = boto3.client('sagemaker', region_name=region)
client.create_training_job(**create_training_params)

# Wait for model training completion
status = client.describe_training_job(TrainingJobName=job_name)['TrainingJobStatus']
print(status)
while status !='Completed' and status!='Failed':
    time.sleep(60)
    status = client.describe_training_job(TrainingJobName=job_name)['TrainingJobStatus']
    print(status)

# Create model
model_name=job_name + '-model'
info = client.describe_training_job(TrainingJobName=job_name)
model_data = info['ModelArtifacts']['S3ModelArtifacts']
primary_container = {
    'Image': container,
    'ModelDataUrl': model_data
}

create_model_response = client.create_model(
    ModelName = model_name,
    ExecutionRoleArn = role,
    PrimaryContainer = primary_container)

print(create_model_response['ModelArn'])
print('Model creation completed.')
print('')

# Deploy model to endpoint
endpoint_config_name = 'demo-xgboostendpointconfig-' + strftime("%Y-%m-%d-%H-%M-%S", gmtime())
print('Endpoint deployment started.')

create_endpoint_config_response = client.create_endpoint_config(
    EndpointConfigName = endpoint_config_name,
    ProductionVariants=[{
        'InstanceType':'ml.t2.medium',
        'InitialVariantWeight':1,
        'InitialInstanceCount':1,
        'ModelName':model_name,
        'VariantName':'AllTraffic'}])

endpoint_name = 'demo-xgboostendpoint-' + strftime("%Y-%m-%d-%H-%M-%S", gmtime())
print('Endpoint name:', endpoint_name)
create_endpoint_response = client.create_endpoint(
    EndpointName=endpoint_name,
    EndpointConfigName=endpoint_config_name)

# Check endpoint creation status
resp = client.describe_endpoint(EndpointName=endpoint_name)
status = resp['EndpointStatus']
while status=='Creating':
    print("Status: " + status)
    time.sleep(60)
    resp = client.describe_endpoint(EndpointName=endpoint_name)
    status = resp['EndpointStatus']

print('Endpoint deployment completed.')
print('Endpoint arn:', resp['EndpointArn'])

デプロイされた推論 Endpointを利用して、モデルトレーニング時に使っていないテストデータ期間の NYC Taxi ピックアップ数が予測できるかを確かめてみます。ここでは、推論 Endpointを呼び出すために3つの関数を定義します。テストデータ期間の DataFrame を与えると、関数内でcsvに変換して推論endpointが呼び出されます。推論Endpointからの応答も関数内で解釈されてリスト形式で結果が返却されます。


import numpy as np
import json
import math

def list_to_csv(value_list):
    csv = ''
    for i, value in enumerate(value_list):
        if i == (len(value_list) - 1):
            csv += str(value)
        elif i != 0:
            csv += str(value)+","  
        else:
            pass
    return csv

def call_model_endpoint(endpoint_name, payload):
    runtime_client = boto3.client('runtime.sagemaker', region_name=region)
    response = runtime_client.invoke_endpoint(EndpointName=endpoint_name,
                                       ContentType='text/csv',
                                       Body=payload)
    result = response['Body'].read().decode("utf-8")
    result = result.split(',')
    result = [math.ceil(float(i)) for i in result]
    return result[0]

def exec_prediction(df):
    all_pred = []
    for i, row in df.iterrows():
        payload = list_to_csv(row[1:])
        prediction = call_model_endpoint(endpoint_name, payload)
        all_pred.append(prediction)

    return all_pred

df_test['pred'] = exec_prediction(df_test)
df_test[:168].plot(y=['y', 'pred'], figsize=(12,4))

推論の結果と正解データを1週間分プロットすると、下のようなグラフが描画されます。全体として、ある程度は正解データに追随する推論結果が出ているので、このモデルが利用するデータのドリフトを検出するステップに進みましょう。

prediction1

推論 Endpoint のモニタリング

次に、Model Monitorの設定を次の手順で行います。

1. 推論Endpointに入力されたデータのリアルタイムキャプチャー設定
2. 学習データからベースライン作成
3. Model Monitorのスケジュール設定

1. 推論 Endpoint に入力されたデータのリアルタイムキャプチャー設定

順に見てきましょう。まず、前のステップで作成済みの推論 Endpoint の設定を更新します。
そのために Model Monitor がキャプチャーしたデータを保管する S3 上のパスを設定します。ここでは、モデルのトレーニングに利用したバケットをそのまま使います。


s3_capture_upload_path = f's3://{bucket}/model_monitor/endpoint-data-capture'

次に、 Model Monitor の Data capture 設定を作成して推論 Endpoint の設定を更新します。


from sagemaker.model_monitor import DataCaptureConfig
from sagemaker import Predictor
from sagemaker import session
import boto3
sm_session = session.Session(boto3.Session())

# Change parameters as you would like - adjust sampling percentage,
#  chose to capture request or response or both.
#  Learn more from our documentation
data_capture_config = DataCaptureConfig(
                        enable_capture = True,
                        sampling_percentage=50,
                        destination_s3_uri=s3_capture_upload_path,
                        kms_key_id=None,
                        capture_options=["REQUEST", "RESPONSE"],
                        csv_content_types=["text/csv"],
                        json_content_types=["application/json"])

# Now it is time to apply the new configuration and wait for it to be applied
predictor = Predictor(endpoint_name=endpoint_name)
predictor.update_data_capture_config(data_capture_config=data_capture_config)
sm_session.wait_for_endpoint(endpoint=endpoint_name)

2. 学習データからベースライン作成

次に、学習データを元にしたベースラインを作成します。Model Monitor は、このベースラインをインプットとして、データドリフトの分析を可能とします。
入力となるデータや、出力先のパス定義を設定します。


baseline_data_path = f's3://{bucket}/model_monitor/baseline_input/features-2019.csv'
df_data2019 = df_features[(df_features.index >= '2019-01-01') & (df_features.index < '2020-01-01')].iloc[:, 1:]
wr.s3.to_csv(df_data2019, baseline_data_path, index=False)

baseline_results_uri = f's3://{bucket}/model_monitor/baseline/'

print('Baseline data uri: {}'.format(baseline_data_path))
print('Baseline results uri: {}'.format(baseline_results_uri))

ベースラインの作成処理をトリガーします。ここで指定しているように、ベースラインの算出は別のインスタンスを起動して行うので、データボリュームに応じてインスタンスタイプを変更することが可能です。


from sagemaker.model_monitor import DefaultModelMonitor
from sagemaker.model_monitor.dataset_format import DatasetFormat
from sagemaker import get_execution_role

role = get_execution_role()

my_default_monitor = DefaultModelMonitor(
    role=role,
    instance_count=1,
    instance_type='ml.m5.xlarge',
    volume_size_in_gb=20,
    max_runtime_in_seconds=3600,
)

my_default_monitor.suggest_baseline(
    baseline_dataset=baseline_data_path,
    dataset_format=DatasetFormat.csv(header=True),
    output_s3_uri=baseline_results_uri,
    wait=True
)

このjobが完了すると、`baseline_results_uri` に指定したパスに statics.json と constraints.json が生成されます。それぞれ、以下のコードで取得して表示することが可能です。statistics.json には、データの要約統計量や分位スケッチが出力されており、データの概要を理解することができます。


import pandas as pd

baseline_job = my_default_monitor.latest_baselining_job
schema_df = pd.json_normalize(baseline_job.baseline_statistics().body_dict["features"])
schema_df.head(10)

screenshot02_baseline_stats.png
KLL スケッチによって算出された分位情報も、以下のようにして出力することができます。


kll_buckets = schema_df[schema_df.name == 'history_1h_b']['numerical_statistics.distribution.kll.buckets'].tolist()[0]
for kll_bucket in kll_buckets:
    print(kll_bucket)

{'lower_bound': 5.0, 'upper_bound': 188.6, 'count': 1538.0}
{'lower_bound': 188.6, 'upper_bound': 372.2, 'count': 937.0}
{'lower_bound': 372.2, 'upper_bound': 555.8, 'count': 1018.0}
{'lower_bound': 555.8, 'upper_bound': 739.4, 'count': 1793.0}
{'lower_bound': 739.4, 'upper_bound': 923.0, 'count': 1560.0}
{'lower_bound': 923.0, 'upper_bound': 1106.6, 'count': 939.0}
{'lower_bound': 1106.6, 'upper_bound': 1290.2, 'count': 602.0}
{'lower_bound': 1290.2, 'upper_bound': 1473.8, 'count': 286.0}
{'lower_bound': 1473.8, 'upper_bound': 1657.4, 'count': 74.0}
{'lower_bound': 1657.4, 'upper_bound': 1841.0, 'count': 12.0}

また、 constraints.json は、学習データを元に、 Model Monitor が推奨するデータ品質の項目を表しています。


constraints_df = pd.json_normalize(baseline_job.suggested_constraints().body_dict["features"])
constraints_df.head(10)

screenshot04_constraint.png

3. Model Monitor のスケジュール設定

ここまでの手順で、推論 Endpoint に対してデータキャプチャーを設定し、トレーニングデータからベースラインを作成しました。これら 2 つを元に、モニタリングのスケジュールを設定することで、データドリフトの監視ができるようになります。スケジュールは、 Model Monitor が提供する CronExpressionGenerator を利用することで簡単に設定可能です。hourly メソッドを利用すると毎時の処理をします。指定した頻度で、SageMaker のProcessing Job が起動され、推論エンドポイントへの入力データや出力された推論結果や、ベースラインをS3から読み込み、分析するコンテナが実行されます。


from sagemaker.model_monitor import CronExpressionGenerator

mon_schedule_name = 'demo-xgboost-monitoring'
s3_report_path = f's3://{bucket}/model_monitor/monitoring_report'
my_default_monitor.create_monitoring_schedule(
    monitor_schedule_name=mon_schedule_name,
    endpoint_input=predictor.endpoint,
    output_s3_uri=s3_report_path,
    statistics=my_default_monitor.baseline_statistics(),
    constraints=my_default_monitor.suggested_constraints(),
    schedule_cron_expression=CronExpressionGenerator.hourly(),
    enable_cloudwatch_metrics=True,
)

ここまでのステップが完了すると、推論 Endpoint に対して入力されたデータと推論結果がキャプチャーされ、ベースラインと比較したデータドリフトの評価が継続的に行われている状態となります。それでは、 1 月、 2 月、 3 月の推論データを順次投入して、データドリフトが顕在化するかどうかを見ていきましょう。


df_202001 = df_features[(df_features.index >= '2020-01-01') & (df_features.index < '2020-03-01')].copy()
df_202001['pred'] = exec_prediction(df_202001)

実際のモデル運用では時間経過に伴って順次推論のリクエストが投入されることになりますが、ここでは、シンプルに 2020 年 1 月から 2 月の 2 ヶ月分のデータを推論しています。推論エンドポイントはその INPUT と OUTPUT を S3 に出力し、 Model Monitor はそれを入力とするプロセスをスケジュール実行します。 1 時間毎のスケジュールの場合は、 1 時間毎に S3 パスに、 statics.json と constraint_violations.json が出力されます。最新のレポートの配置場所を確認するには、次のようにします。


mon_executions = client.list_monitoring_executions(MonitoringScheduleName=mon_schedule_name, MaxResults=1)

for execution_summary in mon_executions['MonitoringExecutionSummaries']:
    job_name = execution_summary['ProcessingJobArn'].split('/')[1]
    print("ProcessingJob: {}".format(execution_summary['ProcessingJobArn'].split('/')[1]))
    print('MonitoringExecutionStatus: {} \n'.format(execution_summary['MonitoringExecutionStatus']))

    desc_analytics_job_result=client.describe_processing_job(ProcessingJobName=job_name)
    report_uri=desc_analytics_job_result['ProcessingOutputConfig']['Outputs'][0]['S3Output']['S3Uri']
print('Report Uri: {}'.format(report_uri))

出力先は、次のように表示されます。

ProcessingJob: model-monitoring-202011150900-
MonitoringExecutionStatus: CompletedWithViolations

Report Uri: s3://<バケット名>/<パス名>

分析には、 aws-samples で公開されているコードを利用する例を紹介します。次のように必要なパッケージを導入します。


!wget https://raw.githubusercontent.com/awslabs/amazon-sagemaker-examples/master/sagemaker_model_monitor/visualization/utils.py
!pip install Jinja2

import os
from IPython.display import HTML, display
from sagemaker.s3 import S3Downloader
from sagemaker.model_monitor import MonitoringExecution
import utils as mu

そして、次の処理を行うと、レポートファイルを基に、データドリフトの状況を可視化することができます。


mon_executions = client.list_monitoring_executions(MonitoringScheduleName=mon_schedule_name, MaxResults=1)

for execution_summary in mon_executions['MonitoringExecutionSummaries']:
    processing_job_arn = execution_summary['ProcessingJobArn']

    execution = MonitoringExecution.from_processing_arn(sagemaker_session=session.Session(), processing_job_arn=processing_job_arn)
    exec_inputs = {inp['InputName']: inp for inp in execution.describe()['ProcessingInputs']}
    exec_results = execution.output.destination

    baseline_statistics_filepath = exec_inputs['baseline']['S3Input']['S3Uri'] if 'baseline' in exec_inputs else None
    execution_statistics_filepath = os.path.join(exec_results, 'statistics.json')
    violations_filepath = os.path.join(exec_results, 'constraint_violations.json')

    baseline_statistics = json.loads(S3Downloader.read_file(baseline_statistics_filepath)) if baseline_statistics_filepath is not None else None
    print(execution_statistics_filepath)
    execution_statistics = json.loads(S3Downloader.read_file(execution_statistics_filepath))
    violations = json.loads(S3Downloader.read_file(violations_filepath))['violations']

mu.show_violation_df(baseline_statistics=baseline_statistics, latest_statistics=execution_statistics, violations=violations)

レポートを見ると、1 、2 月時点で、ベースラインに対してデータドリフトが起こっているようです。 baseline_drift は、ベースラインと推論時のデータとのダイバージェンスを表しており、今回はKolmogorov–Smirnov 検定が自動的に選択され、実行されています。このことは、 constraints.json の distribution_constraints セクションで確認可能です。 0 (0%) が最もズレが少なく、 1 (100%) に近づくほどズレが大きくなります。たとえば、 1 週間の移動平均 (history_168h_b) に当たる特徴量における、ベースラインデータの分布と推論時の入力データの分布の間には、上限を27.25 %とするズレが生じていると解釈できます。

violation1
先ほどと同様の手順で、今度は 2020 年 3 月と 4 月のデータを利用した推論を実行し、再度 1 時間後に、レポートを確認してみましょう。次のように、ズレが大きくなっている様子がわかります。

violation2

先ほどと同様の手順で、今度は 2020 年 5 月と 7 月のデータを利用した推論を実行し、再度 1 時間後に、レポートを確認してみましょう。各特徴量のダイバージェンスは、70% 以上に拡大している様子がわかります。

violation2

Amazon SageMaker Studioを使うと、このような可視化がより簡単に行えます。Studio の画面左側のツール選択ボタンから、 ENDPOINTS を選択し、モニタリング中のエンドポイントの「 Monitoring job history 」タブを選択すると、ジョブレポートが確認可能です。ジョブレポートをクリックし、詳細を確認することもできます。

monitoring1

ジョブの詳細画面で、「 View Amazon SageMaker notebook 」をクリックすると、先ほど紹介した、 aws-samples に公開されているコードと同じものを含むサンプル NoteBook をすぐに利用することができます。

monitoring2

ここまでは、 jupyter notebook 上で、データドリフト検知とその分析をしました。データサイエンティストだけではなく、 ML ワークロード全体を運用管理するシステム管理者にとっては、 Amazon CloudWatch を利用することも効果的です。

CloudWatch を活用することでメトリクスのモニタリングを行うことができます。CloudWatch の名前空間は、デフォルトの機能を使う場合は /aws/sagemaker/Endpoints/data-metrics となります。この名前空間に、 Endpoint 名、 MonitoringSchedule 名のディメンジョン毎のメトリクスが記録されます。このようなメトリクスの変化を、 Amazon CloudWatch アラームによる検知が可能です。これにより、変化の予兆を具に把握し事前に対処することが可能になります。なお、 Model Monitor におけるモニタリングスケジュールの最小単位は 1 時間です ( 2020/11 末現在 ) 。次の画面は、実際にアラームを設定している様子を示しています。特徴量ごとのダイバージェンスに対し、しきい値を設定することができます。

monitoring3

また、 Amazon CloudWatch Dashboard でメトリクスを可視化することも有効です。次の設定画面では、ダイバージェンスを表すメトリクスをグラフで表示していますが、ダイバージェンスがより大きくなっていることが可視化されています。これにより、データドリフトが発生し始めた時期を遡って確認することが可能になります。

monitoring4

まとめ

本記事では、 Deequ や Amazon SageMaker Model Monitor を題材に データドリフトの検知に役立つメトリクスと、その解釈について解説しました。メトリクスとしては、基本的な統計情報、スケッチ、分布距離(もしくはダイバージェンス)の 3 つの主要な手法と、実例として、 NYC Taxi のトリップデータにおけるデータドリフト検知をご紹介しました。次回は、関連する論文について、数理的な観点からご紹介する予定です。またお会いしましょう。

この記事はアマゾン ウェブ サービス ジャパンの島野、大月が執筆し、畔勝が監修しました。

著者について

島野 貞純: Amazon Web Services Japan プロフェッショナルサービス本部 AI/MLコンサルタント
大月 眞史: Amazon Web Services Japan プロフェッショナルサービス本部 シニアデータサイエンティスト