使用 Amazon SageMaker Studio
构建、训练、部署和监控机器学习模型
Amazon SageMaker Studio 是第一款用于机器学习的完全集成开发环境 (IDE),它可提供基于 Web 的单一可视界面以执行 ML 开发的所有步骤。
在本教程中,您将使用 Amazon SageMaker Studio 构建、训练、部署和监控 XGBoost 模型。您将了解从功能工程和模型训练到 ML 模型的批量和实时部署的完整机器学习 (ML) 工作流程。
在本教程中,您将学习如何完成以下各项:
- 设置 Amazon SageMaker Studio 控制面板
- 使用 Amazon SageMaker Studio 笔记本下载公共数据集并将其上传到 Amazon S3
- 创建 Amazon SageMaker 实验以跟踪和管理训练和处理作业
- 运行 Amazon SageMaker Processing 作业以从原始数据生成特性
- 使用内置 XGBoost 算法训练模型
- 使用 Amazon SageMaker 批量转换在测试数据集上测试模型性能
- 部署模型作为终端节点,并设置监控作业以监控模型终端节点在生产中有无数据漂移。
- 使用 SageMaker 模型监控器显示结果并监控模型,以确定培训数据集与部署的模型之间的任何差异。
该模型将使用包含关于客户基本资料、支付历史和记账历史的信息的 UCI Credit Card Default 数据集进行训练。
关于本教程 | |
---|---|
时间 | 1 小时 |
费用 | 低于 10 USD |
使用案例 | Machine Learning |
产品 | Amazon SageMaker |
受众 | 开发人员、数据科学家 |
级别 | 中级 |
上次更新日期 | 2020 年 8 月 17 日 |
第 2 步:创建 Amazon SageMaker Studio 控制面板
完成以下步骤,以载入 Amazon SageMaker Studio 并设置 Amazon SageMaker Studio 控制面板。
注:有关更多信息,请参阅 Amazon SageMaker 文档中的开始使用 Amazon SageMaker Studio。
a.登录到 Amazon SageMaker 控制台。
注:在右上角,确保选择已推出 SageMaker Studio 的 AWS 区域。有关区域清单,请参阅载入 Amazon SageMaker Studio。



Amazon SageMaker 将创建一个具有所需权限的角色,并将其分配给您的实例。


第 3 步:下载数据集
Amazon SageMaker Studio 笔记本是一键式 Jupyter 笔记本,包含构建和测试训练脚本需要的所有元素。SageMaker Studio 还包括实验跟踪和可视化,以便于从一个位置轻松地管理整个机器学习工作流程。
完成以下步骤,以创建 SageMaker 笔记本,下载数据集,然后将数据集上传到 Amazon S3。
注:有关更多信息,请参阅 Amazon SageMaker 文档中的使用 Amazon SageMaker Studio 笔记本。


import boto3
import sagemaker
from sagemaker import get_execution_role
import sys
if int(sagemaker.__version__.split('.')[0]) == 2:
!{sys.executable} -m pip install sagemaker==1.72.0
print("Installing previous SageMaker Version. Please restart the kernel")
else:
print("Version is good")
role = get_execution_role()
sess = sagemaker.Session()
region = boto3.session.Session().region_name
print("Region = {}".format(region))
sm = boto3.Session().client('sagemaker')
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import os
from time import sleep, gmtime, strftime
import json
import time
!pip install sagemaker-experiments
from sagemaker.analytics import ExperimentAnalytics
from smexperiments.experiment import Experiment
from smexperiments.trial import Trial
from smexperiments.trial_component import TrialComponent
from smexperiments.tracker import Tracker

rawbucket= sess.default_bucket() # Alternatively you can use our custom bucket here.
prefix = 'sagemaker-modelmonitor' # use this prefix to store all files pertaining to this workshop.
dataprefix = prefix + '/data'
traindataprefix = prefix + '/train_data'
testdataprefix = prefix + '/test_data'
testdatanolabelprefix = prefix + '/test_data_no_label'
trainheaderprefix = prefix + '/train_headers'

e.下载数据集,并使用 pandas 库将其导入。将以下代码复制并粘贴到新的代码单元格中,然后选择 Run(运行)。
! wget https://archive.ics.uci.edu/ml/machine-learning-databases/00350/default%20of%20credit%20card%20clients.xls
data = pd.read_excel('default of credit card clients.xls', header=1)
data = data.drop(columns = ['ID'])
data.head()

f.将最后一列重命名为 Label,并单独提取标签列。对于 Amazon SageMaker 内置 XGBoost 算法,标签列必须是 dataframe 中的第一列。要进行此更改,将以下代码复制并粘贴到新的代码单元格中,然后选择 Run(运行)。
data.rename(columns={"default payment next month": "Label"}, inplace=True)
lbl = data.Label
data = pd.concat([lbl, data.drop(columns=['Label'])], axis = 1)
data.head()

g.将 CSV 数据集上传到 Amazon S3 存储桶。将以下代码复制并粘贴到新的代码单元格中,然后选择 Run(运行)。
if not os.path.exists('rawdata/rawdata.csv'):
!mkdir rawdata
data.to_csv('rawdata/rawdata.csv', index=None)
else:
pass
# Upload the raw dataset
raw_data_location = sess.upload_data('rawdata', bucket=rawbucket, key_prefix=dataprefix)
print(raw_data_location)
完成了! 代码输出显示 S3 存储桶 URI,如下例所示:
s3://sagemaker-us-east-2-ACCOUNT_NUMBER/sagemaker-modelmonitor/data

第 4 步:使用 Amazon SageMaker Processing 处理数据
在这一步中,您使用 Amazon SageMaker Processing 预处理数据集,包括扩展列以及将数据集拆分成训练和测试数据。通过 Amazon SageMaker Processing,您可以在完全托管的基础设施上运行预处理、后处理和模型评估工作负载。
完成以下步骤以使用 Amazon SageMaker Processing 处理数据和生成功能。
注:Amazon SageMaker Processing 在来自您的笔记本的独立计算实例上运行。这意味着在处理作业运行期间,您可以继续实验并在笔记本中运行代码。这将会产生额外的收费,用于在处理作业期间保持正常运行的实例的成本。当处理作业完成后,SageMaker 就会自动终止该实例。有关定价详情,请参阅 Amazon SageMaker 定价。
注:有关更多信息,请参阅 Amazon SageMaker 文档中的处理数据和评估模型。
a.导入 scikit-learn 处理容器。将以下代码复制并粘贴到新的代码单元格中,然后选择 Run(运行)。
注:Amazon SageMaker 可为 scikit-learn 提供托管容器。有关更多信息,请参阅使用 scikit-learn 处理数据和评估模型。
from sagemaker.sklearn.processing import SKLearnProcessor
sklearn_processor = SKLearnProcessor(framework_version='0.20.0',
role=role,
instance_type='ml.c4.xlarge',
instance_count=1)
b.将以下预处理脚本复制并粘贴到新单元格中,然后选择 Run(运行)。
%%writefile preprocessing.py
import argparse
import os
import warnings
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, MinMaxScaler
from sklearn.exceptions import DataConversionWarning
from sklearn.compose import make_column_transformer
warnings.filterwarnings(action='ignore', category=DataConversionWarning)
if __name__=='__main__':
parser = argparse.ArgumentParser()
parser.add_argument('--train-test-split-ratio', type=float, default=0.3)
parser.add_argument('--random-split', type=int, default=0)
args, _ = parser.parse_known_args()
print('Received arguments {}'.format(args))
input_data_path = os.path.join('/opt/ml/processing/input', 'rawdata.csv')
print('Reading input data from {}'.format(input_data_path))
df = pd.read_csv(input_data_path)
df.sample(frac=1)
COLS = df.columns
newcolorder = ['PAY_AMT1','BILL_AMT1'] + list(COLS[1:])[:11] + list(COLS[1:])[12:17] + list(COLS[1:])[18:]
split_ratio = args.train_test_split_ratio
random_state=args.random_split
X_train, X_test, y_train, y_test = train_test_split(df.drop('Label', axis=1), df['Label'],
test_size=split_ratio, random_state=random_state)
preprocess = make_column_transformer(
(['PAY_AMT1'], StandardScaler()),
(['BILL_AMT1'], MinMaxScaler()),
remainder='passthrough')
print('Running preprocessing and feature engineering transformations')
train_features = pd.DataFrame(preprocess.fit_transform(X_train), columns = newcolorder)
test_features = pd.DataFrame(preprocess.transform(X_test), columns = newcolorder)
# concat to ensure Label column is the first column in dataframe
train_full = pd.concat([pd.DataFrame(y_train.values, columns=['Label']), train_features], axis=1)
test_full = pd.concat([pd.DataFrame(y_test.values, columns=['Label']), test_features], axis=1)
print('Train data shape after preprocessing: {}'.format(train_features.shape))
print('Test data shape after preprocessing: {}'.format(test_features.shape))
train_features_headers_output_path = os.path.join('/opt/ml/processing/train_headers', 'train_data_with_headers.csv')
train_features_output_path = os.path.join('/opt/ml/processing/train', 'train_data.csv')
test_features_output_path = os.path.join('/opt/ml/processing/test', 'test_data.csv')
print('Saving training features to {}'.format(train_features_output_path))
train_full.to_csv(train_features_output_path, header=False, index=False)
print("Complete")
print("Save training data with headers to {}".format(train_features_headers_output_path))
train_full.to_csv(train_features_headers_output_path, index=False)
print('Saving test features to {}'.format(test_features_output_path))
test_full.to_csv(test_features_output_path, header=False, index=False)
print("Complete")
d.指定当 SageMaker Processing 作业完成后您要将训练和测试数据存储在哪里。Amazon SageMaker Processing 自动将数据存储在指定的位置。
train_data_location = rawbucket + '/' + traindataprefix
test_data_location = rawbucket+'/'+testdataprefix
print("Training data location = {}".format(train_data_location))
print("Test data location = {}".format(test_data_location))
e.复制并粘贴以下代码以启动处理作业。此代码通过调用 sklearn_processor.run 并提取关于该处理作业的一些可选元数据(例如,存储训练和测试输出的位置)来启动作业。
from sagemaker.processing import ProcessingInput, ProcessingOutput
sklearn_processor.run(code=codeupload,
inputs=[ProcessingInput(
source=raw_data_location,
destination='/opt/ml/processing/input')],
outputs=[ProcessingOutput(output_name='train_data',
source='/opt/ml/processing/train',
destination='s3://' + train_data_location),
ProcessingOutput(output_name='test_data',
source='/opt/ml/processing/test',
destination="s3://"+test_data_location),
ProcessingOutput(output_name='train_data_headers',
source='/opt/ml/processing/train_headers',
destination="s3://" + rawbucket + '/' + prefix + '/train_headers')],
arguments=['--train-test-split-ratio', '0.2']
)
preprocessing_job_description = sklearn_processor.jobs[-1].describe()
output_config = preprocessing_job_description['ProcessingOutputConfig']
for output in output_config['Outputs']:
if output['OutputName'] == 'train_data':
preprocessed_training_data = output['S3Output']['S3Uri']
if output['OutputName'] == 'test_data':
preprocessed_test_data = output['S3Output']['S3Uri']
记下在输出中提供给处理器的代码、训练和测试数据的位置。另外,记下提供给处理脚本的参数。
第 5 步:创建 Amazon SageMaker 实验
现在,您已经在 Amazon S3 中下载并暂存了数据集,您可以开始创建 Amazon SageMaker 实验。实验是与同一机器学习项目相关的处理和训练作业的集合。Amazon SageMaker Experiments 自动为您管理和跟踪您的训练运行。
完成以下步骤以创建新的实验。
注:有关更多信息,请参阅 Amazon SageMaker 文档中的实验。
a.复制并粘贴以下代码,以创建名为 Build-train-deploy- 的实验。
# Create a SageMaker Experiment
cc_experiment = Experiment.create(
experiment_name=f"Build-train-deploy-{int(time.time())}",
description="Predict credit card default from payments data",
sagemaker_boto_client=sm)
print(cc_experiment)
每个训练作业记录为一个试用。每个试用都是端对端训练作业的一个迭代。除了训练作业之外,它还可跟踪预处理和后处理作业以及数据集和其他元数据。单个实验可包括多个试用,因此您可以轻松地在 Amazon SageMaker Studio 实验窗格内跟踪多个迭代随时间的变化。

b.复制并粘贴以下代码,以跟踪实验下的预处理作业以及训练管道中的步骤。
# Start Tracking parameters used in the Pre-processing pipeline.
with Tracker.create(display_name="Preprocessing", sagemaker_boto_client=sm) as tracker:
tracker.log_parameters({
"train_test_split_ratio": 0.2,
"random_state":0
})
# we can log the s3 uri to the dataset we just uploaded
tracker.log_input(name="ccdefault-raw-dataset", media_type="s3/uri", value=raw_data_location)
tracker.log_input(name="ccdefault-train-dataset", media_type="s3/uri", value=train_data_location)
tracker.log_input(name="ccdefault-test-dataset", media_type="s3/uri", value=test_data_location)
c.查看实验的详细信息:在 Experiments(实验)窗格中,右键单击名为 Build-train-deploy- 的实验并选择 Open in trial components list(在试用组件列表中打开)。

d.复制并粘贴以下代码,然后选择 Run(运行)。然后,仔细了解一下代码:
要训练 XGBoost 分类器,您应先导入 Amazon SageMaker 维护的 XGBoost 容器。然后,您将训练运行记录在 Trial(试用)下,以使 SageMaker Experiments 能够在 Trial(试用)名称下跟踪它。预处理作业包括在相同试用名称下,因为它是该管道的一部分。接下来,创建一个 SageMaker Estimator 对象,它可自动预置您选择的底层实例类型,将训练数据从处理作业复制到指定的输出位置,训练模型,并输出模型构件。
from sagemaker.amazon.amazon_estimator import get_image_uri
container = get_image_uri(boto3.Session().region_name, 'xgboost', '1.0-1')
s3_input_train = sagemaker.s3_input(s3_data='s3://' + train_data_location, content_type='csv')
preprocessing_trial_component = tracker.trial_component
trial_name = f"cc-default-training-job-{int(time.time())}"
cc_trial = Trial.create(
trial_name=trial_name,
experiment_name=cc_experiment.experiment_name,
sagemaker_boto_client=sm
)
cc_trial.add_trial_component(preprocessing_trial_component)
cc_training_job_name = "cc-training-job-{}".format(int(time.time()))
xgb = sagemaker.estimator.Estimator(container,
role,
train_instance_count=1,
train_instance_type='ml.m4.xlarge',
train_max_run=86400,
output_path='s3://{}/{}/models'.format(rawbucket, prefix),
sagemaker_session=sess) # set to true for distributed training
xgb.set_hyperparameters(max_depth=5,
eta=0.2,
gamma=4,
min_child_weight=6,
subsample=0.8,
verbosity=0,
objective='binary:logistic',
num_round=100)
xgb.fit(inputs = {'train':s3_input_train},
job_name=cc_training_job_name,
experiment_config={
"TrialName": cc_trial.trial_name, #log training job in Trials for lineage
"TrialComponentDisplayName": "Training",
},
wait=True,
)
time.sleep(2)
完成训练作业将需要大约 70 秒。您应该能看到以下输出。
Completed - Training job completed

e.在左侧工具栏中,选择 Experiment(实验)。右键单击 Build-train-deploy- 实验,并选择 Open in trial components list(在试用组件列表中打开)。Amazon SageMaker Experiments 可捕获所有运行,包括任何失败的训练运行。

f.右键单击已完成的 Training jobs(训练作业)之一,并选择 Open in Trial Details(在试用详情中打开)以探索与训练作业相关的元数据。
注:您可能需要刷新页面以查看最新结果。

第 6 步:部署用于脱机推理的模型
在预处理步骤中,您已生成了一些测试数据。在这一步中,您从经过训练的模型生成脱机或批量推理,以在不可见测试数据上评估模型性能。
完成以下步骤以部署用于脱机推理的模型。
注:有关更多信息,请参阅 Amazon SageMaker 文档中的批量转换。
a.复制并粘贴以下代码,然后选择 Run(运行)。
这一步可将测试数据集从 Amazon S3 位置复制到本地文件夹中。
test_data_path = 's3://' + test_data_location + '/test_data.csv'
! aws s3 cp $test_data_path .

b.复制并粘贴以下代码,然后选择 Run(运行)。
test_full = pd.read_csv('test_data.csv', names = [str(x) for x in range(len(data.columns))])
test_full.head()
c.复制并粘贴以下代码,然后选择 Run(运行)。这一步提取标签列。
label = test_full['0']
d.复制并粘贴以下代码,然后选择 Run(运行)以创建批量转换作业。然后,仔细了解一下代码:
与培训作业相似,SageMaker 预置所有基础资源、复制经过培训的模型构件、在本地设置批量终端节点、复制数据,然后对这些数据运行推理并将输出推送到 Amazon S3。请注意,通过设置 input_filter,您可让批量转换知道忽略测试数据中的第一列,即标签列。
%%time
sm_transformer = xgb.transformer(1, 'ml.m5.xlarge', accept = 'text/csv')
# start a transform job
sm_transformer.transform(test_data_path, split_type='Line', input_filter='$[1:]', content_type='text/csv')
sm_transformer.wait()
批量转换作业将需要大约 4 分钟时间来完成,然后您就可以评估模型结果。

e.复制并运行以下代码,以评估模型指标。然后,仔细了解一下代码:
首先,您定义一个函数,让它从 Amazon S3 存储桶提取包含在扩展名为 .out 的文件中的批量转换作业的输出。然后,您将预测的标签提取到 dataframe 中,并加 true 标签附加到此 dataframe。
import json
import io
from urllib.parse import urlparse
def get_csv_output_from_s3(s3uri, file_name):
parsed_url = urlparse(s3uri)
bucket_name = parsed_url.netloc
prefix = parsed_url.path[1:]
s3 = boto3.resource('s3')
obj = s3.Object(bucket_name, '{}/{}'.format(prefix, file_name))
return obj.get()["Body"].read().decode('utf-8')
output = get_csv_output_from_s3(sm_transformer.output_path, 'test_data.csv.out')
output_df = pd.read_csv(io.StringIO(output), sep=",", header=None)
output_df.head(8)
output_df['Predicted']=np.round(output_df.values)
output_df['Label'] = label
from sklearn.metrics import confusion_matrix, accuracy_score
confusion_matrix = pd.crosstab(output_df['Predicted'], output_df['Label'], rownames=['Actual'], colnames=['Predicted'], margins = True)
confusion_matrix
您应看到与图中相似的输出,它显示了预测的 True 和 False 值与实际值。

f.使用以下代码提取基准模型精度和模型精度。
注:有用的基准精度模型可能是非默认案例的一小部分。始终预测用户不使用默认值的模型将可达到该精度。
print("Baseline Accuracy = {}".format(1- np.unique(data['Label'], return_counts=True)[1][1]/(len(data['Label']))))
print("Accuracy Score = {}".format(accuracy_score(label, output_df['Predicted'])))
结果显示,简单模型可能已超过了基准精度。为了改善结果,您可以调整超参数。您可以在 SageMaker 上使用超参数优化 (HPO) 以进行自动模型调优。要了解更多信息,请参阅超参数调优的工作原理。
注:虽然未包含在本教程中,您仍可以选择包含批量转换作为您的试用中的组成部分。当您调用 .transform 函数时,只需像您在培训作业中那样传递到 experiment_config 中即可。Amazon SageMaker 自动关联批量转换作为试用组件。

第 7 步:部署模型作为终端节点并设置数据捕获
在这一步中,您可以部署模型作为 RESTful HTTPS 终端节点,以充当实时推理。Amazon SageMaker 自动处理模型托管以及为您创建终端节点的工作。
完成以下步骤,以部署模型作为终端节点并设置数据捕获。
注:有关更多信息,请参阅 Amazon SageMaker 文档中的部署用于推理的模型。
a.复制并粘贴以下代码,然后选择 Run(运行)。
from sagemaker.model_monitor import DataCaptureConfig
from sagemaker import RealTimePredictor
from sagemaker.predictor import csv_serializer
sm_client = boto3.client('sagemaker')
latest_training_job = sm_client.list_training_jobs(MaxResults=1,
SortBy='CreationTime',
SortOrder='Descending')
training_job_name=TrainingJobName=latest_training_job['TrainingJobSummaries'][0]['TrainingJobName']
training_job_description = sm_client.describe_training_job(TrainingJobName=training_job_name)
model_data = training_job_description['ModelArtifacts']['S3ModelArtifacts']
container_uri = training_job_description['AlgorithmSpecification']['TrainingImage']
# create a model.
def create_model(role, model_name, container_uri, model_data):
return sm_client.create_model(
ModelName=model_name,
PrimaryContainer={
'Image': container_uri,
'ModelDataUrl': model_data,
},
ExecutionRoleArn=role)
try:
model = create_model(role, training_job_name, container_uri, model_data)
except Exception as e:
sm_client.delete_model(ModelName=training_job_name)
model = create_model(role, training_job_name, container_uri, model_data)
print('Model created: '+model['ModelArn'])

b.要指定数据配置设置,复制并粘贴以下代码,然后选择 Run(运行)。
此代码告诉 SageMaker 捕获终端节点收到的 100% 的推理负载,捕获输入和输出,并将输入内容类型注释为 csv。
s3_capture_upload_path = 's3://{}/{}/monitoring/datacapture'.format(rawbucket, prefix)
data_capture_configuration = {
"EnableCapture": True,
"InitialSamplingPercentage": 100,
"DestinationS3Uri": s3_capture_upload_path,
"CaptureOptions": [
{ "CaptureMode": "Output" },
{ "CaptureMode": "Input" }
],
"CaptureContentTypeHeader": {
"CsvContentTypes": ["text/csv"],
"JsonContentTypes": ["application/json"]}
c.复制并粘贴以下代码,然后选择 Run(运行)。这一步创建终端节点配置,并部署该终端节点。在代码中,您可以指定实例类型以及您是否要将所有流量发送到此终端节点,等等。
def create_endpoint_config(model_config, data_capture_config):
return sm_client.create_endpoint_config(
EndpointConfigName=model_config,
ProductionVariants=[
{
'VariantName': 'AllTraffic',
'ModelName': model_config,
'InitialInstanceCount': 1,
'InstanceType': 'ml.m4.xlarge',
'InitialVariantWeight': 1.0,
},
],
DataCaptureConfig=data_capture_config
)
try:
endpoint_config = create_endpoint_config(training_job_name, data_capture_configuration)
except Exception as e:
sm_client.delete_endpoint_config(EndpointConfigName=endpoint)
endpoint_config = create_endpoint_config(training_job_name, data_capture_configuration)
print('Endpoint configuration created: '+ endpoint_config['EndpointConfigArn'])
d.复制并粘贴以下代码,然后选择 Run(运行)以创建终端节点。
# Enable data capture, sampling 100% of the data for now. Next we deploy the endpoint in the correct VPC.
endpoint_name = training_job_name
def create_endpoint(endpoint_name, config_name):
return sm_client.create_endpoint(
EndpointName=endpoint_name,
EndpointConfigName=training_job_name
)
try:
endpoint = create_endpoint(endpoint_name, endpoint_config)
except Exception as e:
sm_client.delete_endpoint(EndpointName=endpoint_name)
endpoint = create_endpoint(endpoint_name, endpoint_config)
print('Endpoint created: '+ endpoint['EndpointArn'])

e.在左侧工具栏中,选择 Endpoints(终端节点)。Endpoints(终端节点)列表显示服务中的所有终端节点。
请注意,build-train-deploy 终端节点显示 Creating(正在创建)状态。要部署模型,Amazon SageMaker 必须先将模型构件和推理图像复制到实例上,并设置 HTTPS 终端节点以使用客户端应用程序或 RESTful API 进行推理。

创建了终端节点之后,状态就会变成 InService(可用)。(请注意,创建部署可能需要大约 5-10 分钟。)
注:您可能需要单击 Refresh(刷新)以获得更新后的状态。

f.在 JupyterLab 笔记本中,复制并运行以下代码以获取测试数据集的样本。此代码将获取前 10 行。
!head -10 test_data.csv > test_sample.csv
g.运行以下代码以向此终端节点发送一些推理请求。
注:如果您指定了不同的终端节点名称,您需要将下面的 endpoint 替换为您的终端节点名称。
from sagemaker import RealTimePredictor
from sagemaker.predictor import csv_serializer
predictor = RealTimePredictor(endpoint=endpoint_name, content_type = 'text/csv')
with open('test_sample.csv', 'r') as f:
for row in f:
payload = row.rstrip('\n')
response = predictor.predict(data=payload[2:])
sleep(0.5)
print('done!')
h.运行以下代码以验证模型监控器正在正确地获取入站数据。
在代码中,current_endpoint_capture_prefix 捕获存储您的模型监控器的目录路径。导航到 Amazon S3 存储桶,以查看是否正在捕获预测请求。请注意,此位置应与上述代码中的 s3_capture_upload_path 一致。
# Extract the captured json files.
data_capture_prefix = '{}/monitoring'.format(prefix)
s3_client = boto3.Session().client('s3')
current_endpoint_capture_prefix = '{}/datacapture/{}/AllTraffic'.format(data_capture_prefix, endpoint_name)
print(current_endpoint_capture_prefix)
result = s3_client.list_objects(Bucket=rawbucket, Prefix=current_endpoint_capture_prefix)
capture_files = [capture_file.get("Key") for capture_file in result.get('Contents')]
print("Found Capture Files:")
print("\n ".join(capture_files))
capture_files[0]
捕获的输出指示数据捕获已配置,并且正在保存入站请求。
注:如果您最初看到 Null 响应,则当您首次初始化数据捕获时数据可能未同步加载到 Amazon S3 路径。等待大约一分钟,然后重试。

i.运行以下代码以提取 json 文件之一的内容,并查看捕获的输出。
# View contents of the captured file.
def get_obj_body(bucket, obj_key):
return s3_client.get_object(Bucket=rawbucket, Key=obj_key).get('Body').read().decode("utf-8")
capture_file = get_obj_body(rawbucket, capture_files[0])
print(json.dumps(json.loads(capture_file.split('\n')[5]), indent = 2, sort_keys =True))
输出指示数据捕获正在捕获输入负载和模型的输出。

第 8 步:利用 SageMaker 模型监控器监控终端节点
在这一步中,您可以启用 SageMaker 模型监控器以监控部署的终端节点有无数据漂移。为此,您应比较发送到模型的负载和输出与基准,并确定输入数据或标签是否存在任何漂移。
完成以下步骤以启用模型监控。
注:有关更多信息,请参阅 Amazon SageMaker 文档中的 Amazon SageMaker 模型监控器。
a.运行以下代码,以在 Amazon S3 存储桶中创建一个用于存储模型监控器输出的文件夹。
此代码可创建两个文件夹:一个文件夹存储您用于训练模型的基准数据;第二个文件夹存储与该基准的任何偏离。
model_prefix = prefix + "/" + endpoint_name
baseline_prefix = model_prefix + '/baselining'
baseline_data_prefix = baseline_prefix + '/data'
baseline_results_prefix = baseline_prefix + '/results'
baseline_data_uri = 's3://{}/{}'.format(rawbucket,baseline_data_prefix)
baseline_results_uri = 's3://{}/{}'.format(rawbucket, baseline_results_prefix)
train_data_header_location = "s3://" + rawbucket + '/' + prefix + '/train_headers'
print('Baseline data uri: {}'.format(baseline_data_uri))
print('Baseline results uri: {}'.format(baseline_results_uri))
print(train_data_header_location)

b.运行以下代码,以将模型监控器的基准作业设置为捕获训练数据的统计信息。为此,模型监控器使用在 Apache Spark 上构建的 deequ 库对数据开展单元测试。
from sagemaker.model_monitor import DefaultModelMonitor
from sagemaker.model_monitor.dataset_format import DatasetFormat
my_default_monitor = DefaultModelMonitor(
role=role,
instance_count=1,
instance_type='ml.m5.xlarge',
volume_size_in_gb=20,
max_runtime_in_seconds=3600)
my_default_monitor.suggest_baseline(
baseline_dataset=os.path.join(train_data_header_location, 'train_data_with_headers.csv'),
dataset_format=DatasetFormat.csv(header=True),
output_s3_uri=baseline_results_uri,
wait=True
)
模型监控器设置一个独立的实例,复制训练数据,并生成一些统计信息。该服务会生成大量的 Apache Spark 日志,您可以忽略。当作业完成后,您将会看到 Spark job completed(Spark 作业已完成)输出。

c.运行以下代码以查看基准作业生成的输出。
s3_client = boto3.Session().client('s3')
result = s3_client.list_objects(Bucket=rawbucket, Prefix=baseline_results_prefix)
report_files = [report_file.get("Key") for report_file in result.get('Contents')]
print("Found Files:")
print("\n ".join(report_files))
baseline_job = my_default_monitor.latest_baselining_job
schema_df = pd.io.json.json_normalize(baseline_job.baseline_statistics().body_dict["features"])
schema_df
您将会看到两个文件:constraints.json 和 statistics.json。接下来,更深入地探究其内容:
上面的代码可将 /statistics.json 中的 json 输出转换成 pandas dataframe。注意 deequ 库如何推理列的数据类型、是否存在 Null 或缺失值,以及均值、最小值、最大值、总和、标准差之类的统计参数和输入数据流的 sketch 参数。
同样,constraints.json 文件由训练数据集所遵守的若干限制构成,例如非负值和功能字段的数据类型。

您将会看到两个文件:constraints.json 和 statistics.json。接下来,更深入地探究其内容:
上面的代码可将 /statistics.json 中的 json 输出转换成 pandas dataframe。注意 deequ 库如何推理列的数据类型、是否存在 Null 或缺失值,以及均值、最小值、最大值、总和、标准差之类的统计参数和输入数据流的 sketch 参数。

同样,constraints.json 文件由训练数据集所遵守的若干限制构成,例如非负值和功能字段的数据类型。
constraints_df = pd.io.json.json_normalize(baseline_job.suggested_constraints().body_dict["features"])
constraints_df

d.运行以下代码以设置终端节点监控的频率。
您可以指定每天或每小时。此代码指定每小时频率,但是您可能想为生产应用更改此设置,因为每小时频率将会产生大量数据。模型监控器将会生成包含其找到所有违规事件的报告。
reports_prefix = '{}/reports'.format(prefix)
s3_report_path = 's3://{}/{}'.format(rawbucket,reports_prefix)
print(s3_report_path)
from sagemaker.model_monitor import CronExpressionGenerator
from time import gmtime, strftime
mon_schedule_name = 'Built-train-deploy-model-monitor-schedule-' + strftime("%Y-%m-%d-%H-%M-%S", gmtime())
my_default_monitor.create_monitoring_schedule(
monitor_schedule_name=mon_schedule_name,
endpoint_input=predictor.endpoint,
output_s3_uri=s3_report_path,
statistics=my_default_monitor.baseline_statistics(),
constraints=my_default_monitor.suggested_constraints(),
schedule_cron_expression=CronExpressionGenerator.hourly(),
enable_cloudwatch_metrics=True,
)
请注意,此代码启用 Amazon CloudWatch 指标,它指示模型监控器将输出发送到 CloudWatch。您可以通过此方法利用 CloudWatch 警报触发警报,以使工程师或管理员知道何时检测到数据漂移。

第 9 步:测试 SageMaker 模型监控器性能
在这一步中,您对照一些样本数据评估模型监控器。您应修改测试负载中的多种功能的分布而不原样发送测试负载,以测试模型监控器是否能够检测到变化。
完成以下步骤以测试模型监控器性能。
注:有关更多信息,请参阅 Amazon SageMaker 文档中的 Amazon SageMaker 模型监控器。
a.运行以下代码以导入测试数据,并生成一些修改后的样本数据。
COLS = data.columns
test_full = pd.read_csv('test_data.csv', names = ['Label'] +['PAY_AMT1','BILL_AMT1'] + list(COLS[1:])[:11] + list(COLS[1:])[12:17] + list(COLS[1:])[18:]
)
test_full.head()

b.运行以下代码以更改少数行。请注意此处图像中标记为红色的部分与上一步中的差异。删除标签列,并保存修改后的样本测试数据。
faketestdata = test_full
faketestdata['EDUCATION'] = -faketestdata['EDUCATION'].astype(float)
faketestdata['BILL_AMT2']= (faketestdata['BILL_AMT2']//10).astype(float)
faketestdata['AGE']= (faketestdata['AGE']-10).astype(float)
faketestdata.head()
faketestdata.drop(columns=['Label']).to_csv('test-data-input-cols.csv', index = None, header=None)

c.运行以下代码以使用此修改后的数据集反复调用终端节点。
from threading import Thread
runtime_client = boto3.client('runtime.sagemaker')
# (just repeating code from above for convenience/ able to run this section independently)
def invoke_endpoint(ep_name, file_name, runtime_client):
with open(file_name, 'r') as f:
for row in f:
payload = row.rstrip('\n')
response = runtime_client.invoke_endpoint(EndpointName=ep_name,
ContentType='text/csv',
Body=payload)
time.sleep(1)
def invoke_endpoint_forever():
while True:
invoke_endpoint(endpoint, 'test-data-input-cols.csv', runtime_client)
thread = Thread(target = invoke_endpoint_forever)
thread.start()
# Note that you need to stop the kernel to stop the invocations
d.运行以下代码以检查模型监控器作业的状态。
desc_schedule_result = my_default_monitor.describe_schedule()
print('Schedule status: {}'.format(desc_schedule_result['MonitoringScheduleStatus']))
您应看到输出为 Schedule status: Scheduled(计划状态:已计划)
e.运行以下代码,每 10 分钟检查一次是否生成了任何监控输出。请注意,第一个作业运行时的缓冲大约为 20 分钟。
mon_executions = my_default_monitor.list_executions()
print("We created ahourly schedule above and it will kick off executions ON the hour (plus 0 - 20 min buffer.\nWe will have to wait till we hit the hour...")
while len(mon_executions) == 0:
print("Waiting for the 1st execution to happen...")
time.sleep(600)
mon_executions = my_default_monitor.list_executions()

f.在 Amazon SageMaker Studio 的左侧工具栏中,选择 Endpoints(终端节点)。右键单击 build-train-deploy 终端节点,然后选择 Describe Endpoint(描述终端节点)。

g.选择 Monitoring job history(监控作业历史)。请注意,Monitoring status(监控状态)显示 In progress(正在进行)。

当作业完成后,Monitoring status(监控状态)显示Issue found(找到问题)(对于找到任何问题的情况)。

h.双击问题以查看详细信息。在您以前修改过的 EDUCATION 和 BILL_AMT2 字段中,您可以看到模型监控器检测到很大的基线漂移。
模型监控器还在两个其他字段中检测到数据类型差异。训练数据由整数标签构成,但是 XGBoost 模型预测概率分值。因此,模型监控器会报告不一致。

i.在您的 JupyterLab 笔记本中,运行以下单元格以查看来自模型监控器的输出。
latest_execution = mon_executions[-1] # latest execution's index is -1, second to last is -2 and so on..
time.sleep(60)
latest_execution.wait(logs=False)
print("Latest execution status: {}".format(latest_execution.describe()['ProcessingJobStatus']))
print("Latest execution result: {}".format(latest_execution.describe()['ExitMessage']))
latest_job = latest_execution.describe()
if (latest_job['ProcessingJobStatus'] != 'Completed'):
print("====STOP==== \n No completed executions to inspect further. Please wait till an execution completes or investigate previously reported failures.")

j.运行以下代码,以查看模型监控器生成的报告。
report_uri=latest_execution.output.destination
print('Report Uri: {}'.format(report_uri))
from urllib.parse import urlparse
s3uri = urlparse(report_uri)
report_bucket = s3uri.netloc
report_key = s3uri.path.lstrip('/')
print('Report bucket: {}'.format(report_bucket))
print('Report key: {}'.format(report_key))
s3_client = boto3.Session().client('s3')
result = s3_client.list_objects(Bucket=rawbucket, Prefix=report_key)
report_files = [report_file.get("Key") for report_file in result.get('Contents')]
print("Found Report Files:")
print("\n ".join(report_files))
除了 statistics.json 和 constraints.json 之外,您还可以看到生成了名为 constraint_violations.json 的新文件。此文件的内容在 Amazon SageMaker Studio 中显示在上方(步骤 g)。

注:当您设置数据捕获时,Amazon SageMaker Studio 自动为您创建一个笔记本,它包含上述代码以运行监控作业。要访问该笔记本,右键单击终端节点,然后选择 Describe Endpoint(描述终端节点)。在 Monitoring results(监控结果)选项卡上,选择 Enable Monitoring(启用监控)。这一步自动打开一个 Jupyter 笔记本,其中包含您在上面授权的代码。

第 10 步:清除
在这一步中,您将终止在本实验中使用的资源。
重要说明:终止当前未在使用的资源可降低成本,是最佳做法。不终止资源可能会在您的账户下产生费用。
a.删除监控计划:在您的 Jupyter 笔记本中,复制并粘贴以下代码,然后选择 Run(运行)。
注:与模型监控器终端节点关联的所有监控作业都已删除后,您才能删除该终端节点。
my_default_monitor.delete_monitoring_schedule()
time.sleep(10) # actually wait for the deletion
b.删除您的终端节点:在您的 Jupyter 笔记本中,复制并粘贴以下代码,然后选择 Run(运行)。
注:确保先删除与该终端节点关联的所有监控作业。
sm.delete_endpoint(EndpointName = endpoint_name)
如果您想要清除所有训练构件(模型、预处理数据集等等),请将以下代码复制并粘贴到您的代码单元格中,然后选择 Run(运行)。
注:确保用您自己的账号替换 ACCOUNT-NUMBER。
%%sh
aws s3 rm --recursive s3://sagemaker-us-east-2-ACCOUNT_NUMBER/sagemaker-modelmonitor/data
建议的后续步骤
了解 Amazon SageMaker Studio
了解如何使用 Amazon SageMaker Autopilot 自动构建、训练和部署 ML 模型
如果您要了解更多,请完成使用 Amazon SageMaker Autopilot 自动创建机器学习模型的 10 分钟教程。