使用 Amazon SageMaker Studio

构建、训练、部署和监控机器学习模型

Amazon SageMaker Studio 是第一款用于机器学习的完全集成开发环境 (IDE),它可提供基于 Web 的单一可视界面以执行 ML 开发的所有步骤。

在本教程中,您将使用 Amazon SageMaker Studio 构建、训练、部署和监控 XGBoost 模型。您将了解从功能工程和模型训练到 ML 模型的批量和实时部署的完整机器学习 (ML) 工作流程。

在本教程中,您将学习如何完成以下各项:

  1. 设置 Amazon SageMaker Studio 控制面板
  2. 使用 Amazon SageMaker Studio 笔记本下载公共数据集并将其上传到 Amazon S3
  3. 创建 Amazon SageMaker 实验以跟踪和管理训练和处理作业
  4. 运行 Amazon SageMaker Processing 作业以从原始数据生成特性
  5. 使用内置 XGBoost 算法训练模型
  6. 使用 Amazon SageMaker 批量转换在测试数据集上测试模型性能
  7. 部署模型作为终端节点,并设置监控作业以监控模型终端节点在生产中有无数据漂移。
  8. 使用 SageMaker 模型监控器显示结果并监控模型,以确定培训数据集与部署的模型之间的任何差异。

该模型将使用包含关于客户基本资料、支付历史和记账历史的信息的 UCI Credit Card Default 数据集进行训练。

关于本教程
时间 1 小时                                      
费用 低于 10 USD
使用案例 Machine Learning
产品 Amazon SageMaker
受众 开发人员、数据科学家
级别 中级
上次更新日期 2020 年 8 月 17 日

步骤 1.创建 AWS 账户

本教程中创建和使用的资源符合 AWS 免费套餐条件。本研讨会的费用不到 10 USD。

已拥有账户? 登录

第 2 步:创建 Amazon SageMaker Studio 控制面板

完成以下步骤,以载入 Amazon SageMaker Studio 并设置 Amazon SageMaker Studio 控制面板。

注:有关更多信息,请参阅 Amazon SageMaker 文档中的开始使用 Amazon SageMaker Studio


a.登录到 Amazon SageMaker 控制台。 

注:在右上角,确保选择已推出 SageMaker Studio 的 AWS 区域。有关区域清单,请参阅载入 Amazon SageMaker Studio


b.在 Amazon SageMaker 导航窗格,选择 Amazon SageMaker Studio
 
注:如果您是第一次使用 Amazon SageMaker Studio,请一定要完成 Studio 载入流程。 载入时,您可以选择使用 AWS Single Sign-On (AWS SSO) 或 AWS Identity and Access Management (IAM) 进行身份验证。当您使用 IAM 身份验证时,您可以选择快速启动或标准设置流程。如果您不确定选择哪种方式,请参阅 载入 Amazon SageMaker Studio,并咨询您的 IT 管理员寻求帮助。为简单起见,本教程采用了 快速启动流程。

c.在 开始使用对话框中,选择 快速启动并指定用户名。

d.有关 执行角色,请选择 创建一个 IAM 角色。在出现的对话框中,选择 任何 S3 存储桶,然后选择 创建角色

Amazon SageMaker 将创建一个具有所需权限的角色,并将其分配给您的实例。 


e.单击 提交

第 3 步:下载数据集

Amazon SageMaker Studio 笔记本是一键式 Jupyter 笔记本,包含构建和测试训练脚本需要的所有元素。SageMaker Studio 还包括实验跟踪和可视化,以便于从一个位置轻松地管理整个机器学习工作流程。

完成以下步骤,以创建 SageMaker 笔记本,下载数据集,然后将数据集上传到 Amazon S3。

注:有关更多信息,请参阅 Amazon SageMaker 文档中的使用 Amazon SageMaker Studio 笔记本


a.在 Amazon SageMaker Studio 控制面板中,选择 打开 Studio

b.在 文件菜单的 JupyterLab 中,选择 新建,然后选择 笔记本。在 选择内核对话框中,选择 Python 3(数据科学)
 

c.首先,验证您的 Amazon SageMaker Python SDK 的版本。将以下代码块复制并粘贴到代码单元格中,然后选择 Run(运行)。如果打印消息显示 安装先前的 SageMaker 版本。请重新启动内核,选择 Kernel(内核)选项卡,然后选择 Restart Kernel(重新启动内核)。然后,您可以继续完成剩余的步骤。
 
注:运行代码时,方括号之间会出现 *。几秒钟后,代码执行完成,* 会由一个数字替换。
import boto3
import sagemaker
from sagemaker import get_execution_role
import sys

if int(sagemaker.__version__.split('.')[0]) == 2:
    !{sys.executable} -m pip install sagemaker==1.72.0
    print("Installing previous SageMaker Version. Please restart the kernel")
else:
    print("Version is good")

role = get_execution_role()
sess = sagemaker.Session()
region = boto3.session.Session().region_name
print("Region = {}".format(region))
sm = boto3.Session().client('sagemaker')
接下来,导入库。将以下代码复制并粘贴到代码单元格中,然后选择 Run(运行)。
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import os
from time import sleep, gmtime, strftime
import json
import time
最后,导入实验。将以下代码复制并粘贴到代码单元格中,然后选择 Run(运行)。
!pip install sagemaker-experiments 
from sagemaker.analytics import ExperimentAnalytics
from smexperiments.experiment import Experiment
from smexperiments.trial import Trial
from smexperiments.trial_component import TrialComponent
from smexperiments.tracker import Tracker

d.定义用于该项目的 Amazon S3 存储桶和文件夹。将以下代码复制并粘贴到代码单元格中,然后选择 Run(运行)。
rawbucket= sess.default_bucket() # Alternatively you can use our custom bucket here. 

prefix = 'sagemaker-modelmonitor' # use this prefix to store all files pertaining to this workshop.

dataprefix = prefix + '/data'
traindataprefix = prefix + '/train_data'
testdataprefix = prefix + '/test_data'
testdatanolabelprefix = prefix + '/test_data_no_label'
trainheaderprefix = prefix + '/train_headers'

e.下载数据集,并使用 pandas 库将其导入。将以下代码复制并粘贴到新的代码单元格中,然后选择 Run(运行)。

! wget https://archive.ics.uci.edu/ml/machine-learning-databases/00350/default%20of%20credit%20card%20clients.xls
data = pd.read_excel('default of credit card clients.xls', header=1)
data = data.drop(columns = ['ID'])
data.head()

f.将最后一列重命名为 Label,并单独提取标签列。对于 Amazon SageMaker 内置 XGBoost 算法,标签列必须是 dataframe 中的第一列。要进行此更改,将以下代码复制并粘贴到新的代码单元格中,然后选择 Run(运行)。

data.rename(columns={"default payment next month": "Label"}, inplace=True)
lbl = data.Label
data = pd.concat([lbl, data.drop(columns=['Label'])], axis = 1)
data.head()

g.将 CSV 数据集上传到 Amazon S3 存储桶。将以下代码复制并粘贴到新的代码单元格中,然后选择 Run(运行)。

if not os.path.exists('rawdata/rawdata.csv'):
    !mkdir rawdata
    data.to_csv('rawdata/rawdata.csv', index=None)
else:
    pass
# Upload the raw dataset
raw_data_location = sess.upload_data('rawdata', bucket=rawbucket, key_prefix=dataprefix)
print(raw_data_location)

完成了! 代码输出显示 S3 存储桶 URI,如下例所示:

s3://sagemaker-us-east-2-ACCOUNT_NUMBER/sagemaker-modelmonitor/data

第 4 步:使用 Amazon SageMaker Processing 处理数据

在这一步中,您使用 Amazon SageMaker Processing 预处理数据集,包括扩展列以及将数据集拆分成训练和测试数据。通过 Amazon SageMaker Processing,您可以在完全托管的基础设施上运行预处理、后处理和模型评估工作负载。

完成以下步骤以使用 Amazon SageMaker Processing 处理数据和生成功能。

注:Amazon SageMaker Processing 在来自您的笔记本的独立计算实例上运行。这意味着在处理作业运行期间,您可以继续实验并在笔记本中运行代码。这将会产生额外的收费,用于在处理作业期间保持正常运行的实例的成本。当处理作业完成后,SageMaker 就会自动终止该实例。有关定价详情,请参阅 Amazon SageMaker 定价

注:有关更多信息,请参阅 Amazon SageMaker 文档中的处理数据和评估模型


a.导入 scikit-learn 处理容器。将以下代码复制并粘贴到新的代码单元格中,然后选择 Run(运行)。

注:Amazon SageMaker 可为 scikit-learn 提供托管容器。有关更多信息,请参阅使用 scikit-learn 处理数据和评估模型

from sagemaker.sklearn.processing import SKLearnProcessor
sklearn_processor = SKLearnProcessor(framework_version='0.20.0',
                                     role=role,
                                     instance_type='ml.c4.xlarge',
                                     instance_count=1)

b.将以下预处理脚本复制并粘贴到新单元格中,然后选择 Run(运行)。

%%writefile preprocessing.py

import argparse
import os
import warnings

import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, MinMaxScaler
from sklearn.exceptions import DataConversionWarning
from sklearn.compose import make_column_transformer

warnings.filterwarnings(action='ignore', category=DataConversionWarning)

if __name__=='__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('--train-test-split-ratio', type=float, default=0.3)
    parser.add_argument('--random-split', type=int, default=0)
    args, _ = parser.parse_known_args()
    
    print('Received arguments {}'.format(args))

    input_data_path = os.path.join('/opt/ml/processing/input', 'rawdata.csv')
    
    print('Reading input data from {}'.format(input_data_path))
    df = pd.read_csv(input_data_path)
    df.sample(frac=1)
    
    COLS = df.columns
    newcolorder = ['PAY_AMT1','BILL_AMT1'] + list(COLS[1:])[:11] + list(COLS[1:])[12:17] + list(COLS[1:])[18:]
    
    split_ratio = args.train_test_split_ratio
    random_state=args.random_split
    
    X_train, X_test, y_train, y_test = train_test_split(df.drop('Label', axis=1), df['Label'], 
                                                        test_size=split_ratio, random_state=random_state)
    
    preprocess = make_column_transformer(
        (['PAY_AMT1'], StandardScaler()),
        (['BILL_AMT1'], MinMaxScaler()),
    remainder='passthrough')
    
    print('Running preprocessing and feature engineering transformations')
    train_features = pd.DataFrame(preprocess.fit_transform(X_train), columns = newcolorder)
    test_features = pd.DataFrame(preprocess.transform(X_test), columns = newcolorder)
    
    # concat to ensure Label column is the first column in dataframe
    train_full = pd.concat([pd.DataFrame(y_train.values, columns=['Label']), train_features], axis=1)
    test_full = pd.concat([pd.DataFrame(y_test.values, columns=['Label']), test_features], axis=1)
    
    print('Train data shape after preprocessing: {}'.format(train_features.shape))
    print('Test data shape after preprocessing: {}'.format(test_features.shape))
    
    train_features_headers_output_path = os.path.join('/opt/ml/processing/train_headers', 'train_data_with_headers.csv')
    
    train_features_output_path = os.path.join('/opt/ml/processing/train', 'train_data.csv')
    
    test_features_output_path = os.path.join('/opt/ml/processing/test', 'test_data.csv')
    
    print('Saving training features to {}'.format(train_features_output_path))
    train_full.to_csv(train_features_output_path, header=False, index=False)
    print("Complete")
    
    print("Save training data with headers to {}".format(train_features_headers_output_path))
    train_full.to_csv(train_features_headers_output_path, index=False)
                 
    print('Saving test features to {}'.format(test_features_output_path))
    test_full.to_csv(test_features_output_path, header=False, index=False)
    print("Complete")

c.使用以下代码将预处理代码复制到 Amazon S3 存储桶,然后选择 Run(运行)。

# Copy the preprocessing code over to the s3 bucket
codeprefix = prefix + '/code'
codeupload = sess.upload_data('preprocessing.py', bucket=rawbucket, key_prefix=codeprefix)
print(codeupload)

d.指定当 SageMaker Processing 作业完成后您要将训练和测试数据存储在哪里。Amazon SageMaker Processing 自动将数据存储在指定的位置。

train_data_location = rawbucket + '/' + traindataprefix
test_data_location = rawbucket+'/'+testdataprefix
print("Training data location = {}".format(train_data_location))
print("Test data location = {}".format(test_data_location))


e.复制并粘贴以下代码以启动处理作业。此代码通过调用 sklearn_processor.run 并提取关于该处理作业的一些可选元数据(例如,存储训练和测试输出的位置)来启动作业。

from sagemaker.processing import ProcessingInput, ProcessingOutput

sklearn_processor.run(code=codeupload,
                      inputs=[ProcessingInput(
                        source=raw_data_location,
                        destination='/opt/ml/processing/input')],
                      outputs=[ProcessingOutput(output_name='train_data',
                                                source='/opt/ml/processing/train',
                               destination='s3://' + train_data_location),
                               ProcessingOutput(output_name='test_data',
                                                source='/opt/ml/processing/test',
                                               destination="s3://"+test_data_location),
                               ProcessingOutput(output_name='train_data_headers',
                                                source='/opt/ml/processing/train_headers',
                                               destination="s3://" + rawbucket + '/' + prefix + '/train_headers')],
                      arguments=['--train-test-split-ratio', '0.2']
                     )

preprocessing_job_description = sklearn_processor.jobs[-1].describe()

output_config = preprocessing_job_description['ProcessingOutputConfig']
for output in output_config['Outputs']:
    if output['OutputName'] == 'train_data':
        preprocessed_training_data = output['S3Output']['S3Uri']
    if output['OutputName'] == 'test_data':
        preprocessed_test_data = output['S3Output']['S3Uri']


记下在输出中提供给处理器的代码、训练和测试数据的位置。另外,记下提供给处理脚本的参数。 

第 5 步:创建 Amazon SageMaker 实验

现在,您已经在 Amazon S3 中下载并暂存了数据集,您可以开始创建 Amazon SageMaker 实验。实验是与同一机器学习项目相关的处理和训练作业的集合。Amazon SageMaker Experiments 自动为您管理和跟踪您的训练运行。

完成以下步骤以创建新的实验。

注:有关更多信息,请参阅 Amazon SageMaker 文档中的实验


a.复制并粘贴以下代码,以创建名为 Build-train-deploy- 的实验。

# Create a SageMaker Experiment
cc_experiment = Experiment.create(
    experiment_name=f"Build-train-deploy-{int(time.time())}", 
    description="Predict credit card default from payments data", 
    sagemaker_boto_client=sm)
print(cc_experiment)

每个训练作业记录为一个试用。每个试用都是端对端训练作业的一个迭代。除了训练作业之外,它还可跟踪预处理和后处理作业以及数据集和其他元数据。单个实验可包括多个试用,因此您可以轻松地在 Amazon SageMaker Studio 实验窗格内跟踪多个迭代随时间的变化。


b.复制并粘贴以下代码,以跟踪实验下的预处理作业以及训练管道中的步骤。

# Start Tracking parameters used in the Pre-processing pipeline.
with Tracker.create(display_name="Preprocessing", sagemaker_boto_client=sm) as tracker:
    tracker.log_parameters({
        "train_test_split_ratio": 0.2,
        "random_state":0
    })
    # we can log the s3 uri to the dataset we just uploaded
    tracker.log_input(name="ccdefault-raw-dataset", media_type="s3/uri", value=raw_data_location)
    tracker.log_input(name="ccdefault-train-dataset", media_type="s3/uri", value=train_data_location)
    tracker.log_input(name="ccdefault-test-dataset", media_type="s3/uri", value=test_data_location)

c.查看实验的详细信息:在 Experiments(实验)窗格中,右键单击名为 Build-train-deploy- 的实验并选择 Open in trial components list(在试用组件列表中打开)。


d.复制并粘贴以下代码,然后选择 Run(运行)。然后,仔细了解一下代码:

要训练 XGBoost 分类器,您应先导入 Amazon SageMaker 维护的 XGBoost 容器。然后,您将训练运行记录在 Trial(试用)下,以使 SageMaker Experiments 能够在 Trial(试用)名称下跟踪它。预处理作业包括在相同试用名称下,因为它是该管道的一部分。接下来,创建一个 SageMaker Estimator 对象,它可自动预置您选择的底层实例类型,将训练数据从处理作业复制到指定的输出位置,训练模型,并输出模型构件。

from sagemaker.amazon.amazon_estimator import get_image_uri
container = get_image_uri(boto3.Session().region_name, 'xgboost', '1.0-1')
s3_input_train = sagemaker.s3_input(s3_data='s3://' + train_data_location, content_type='csv')
preprocessing_trial_component = tracker.trial_component

trial_name = f"cc-default-training-job-{int(time.time())}"
cc_trial = Trial.create(
        trial_name=trial_name, 
            experiment_name=cc_experiment.experiment_name,
        sagemaker_boto_client=sm
    )

cc_trial.add_trial_component(preprocessing_trial_component)
cc_training_job_name = "cc-training-job-{}".format(int(time.time()))

xgb = sagemaker.estimator.Estimator(container,
                                    role, 
                                    train_instance_count=1, 
                                    train_instance_type='ml.m4.xlarge',
                                    train_max_run=86400,
                                    output_path='s3://{}/{}/models'.format(rawbucket, prefix),
                                    sagemaker_session=sess) # set to true for distributed training

xgb.set_hyperparameters(max_depth=5,
                        eta=0.2,
                        gamma=4,
                        min_child_weight=6,
                        subsample=0.8,
                        verbosity=0,
                        objective='binary:logistic',
                        num_round=100)

xgb.fit(inputs = {'train':s3_input_train},
       job_name=cc_training_job_name,
        experiment_config={
            "TrialName": cc_trial.trial_name, #log training job in Trials for lineage
            "TrialComponentDisplayName": "Training",
        },
        wait=True,
    )
time.sleep(2)

完成训练作业将需要大约 70 秒。您应该能看到以下输出。

Completed - Training job completed

e.在左侧工具栏中,选择 Experiment(实验)。右键单击 Build-train-deploy- 实验,并选择 Open in trial components list(在试用组件列表中打开)。Amazon SageMaker Experiments 可捕获所有运行,包括任何失败的训练运行。


f.右键单击已完成的 Training jobs(训练作业)之一,并选择 Open in Trial Details(在试用详情中打开)以探索与训练作业相关的元数据。 

注:您可能需要刷新页面以查看最新结果。 

第 6 步:部署用于脱机推理的模型

在预处理步骤中,您已生成了一些测试数据。在这一步中,您从经过训练的模型生成脱机或批量推理,以在不可见测试数据上评估模型性能。

完成以下步骤以部署用于脱机推理的模型。

注:有关更多信息,请参阅 Amazon SageMaker 文档中的批量转换


a.复制并粘贴以下代码,然后选择 Run(运行)。

这一步可将测试数据集从 Amazon S3 位置复制到本地文件夹中。 

test_data_path = 's3://' + test_data_location + '/test_data.csv'
! aws s3 cp $test_data_path .

b.复制并粘贴以下代码,然后选择 Run(运行)。

test_full = pd.read_csv('test_data.csv', names = [str(x) for x in range(len(data.columns))])
test_full.head()

c.复制并粘贴以下代码,然后选择 Run(运行)。这一步提取标签列。

label = test_full['0'] 

d.复制并粘贴以下代码,然后选择 Run(运行)以创建批量转换作业。然后,仔细了解一下代码:

与培训作业相似,SageMaker 预置所有基础资源、复制经过培训的模型构件、在本地设置批量终端节点、复制数据,然后对这些数据运行推理并将输出推送到 Amazon S3。请注意,通过设置 input_filter,您可让批量转换知道忽略测试数据中的第一列,即标签列。

%%time

sm_transformer = xgb.transformer(1, 'ml.m5.xlarge', accept = 'text/csv')

# start a transform job
sm_transformer.transform(test_data_path, split_type='Line', input_filter='$[1:]', content_type='text/csv')
sm_transformer.wait()

批量转换作业将需要大约 4 分钟时间来完成,然后您就可以评估模型结果。


e.复制并运行以下代码,以评估模型指标。然后,仔细了解一下代码:

首先,您定义一个函数,让它从 Amazon S3 存储桶提取包含在扩展名为 .out 的文件中的批量转换作业的输出。然后,您将预测的标签提取到 dataframe 中,并加 true 标签附加到此 dataframe。

import json
import io
from urllib.parse import urlparse

def get_csv_output_from_s3(s3uri, file_name):
    parsed_url = urlparse(s3uri)
    bucket_name = parsed_url.netloc
    prefix = parsed_url.path[1:]
    s3 = boto3.resource('s3')
    obj = s3.Object(bucket_name, '{}/{}'.format(prefix, file_name))
    return obj.get()["Body"].read().decode('utf-8')
output = get_csv_output_from_s3(sm_transformer.output_path, 'test_data.csv.out')
output_df = pd.read_csv(io.StringIO(output), sep=",", header=None)
output_df.head(8)
output_df['Predicted']=np.round(output_df.values)
output_df['Label'] = label
from sklearn.metrics import confusion_matrix, accuracy_score
confusion_matrix = pd.crosstab(output_df['Predicted'], output_df['Label'], rownames=['Actual'], colnames=['Predicted'], margins = True)
confusion_matrix

您应看到与图中相似的输出,它显示了预测的 True 和 False 值与实际值。 


f.使用以下代码提取基准模型精度和模型精度。

注:有用的基准精度模型可能是非默认案例的一小部分。始终预测用户不使用默认值的模型将可达到该精度。

print("Baseline Accuracy = {}".format(1- np.unique(data['Label'], return_counts=True)[1][1]/(len(data['Label']))))
print("Accuracy Score = {}".format(accuracy_score(label, output_df['Predicted'])))

结果显示,简单模型可能已超过了基准精度。为了改善结果,您可以调整超参数。您可以在 SageMaker 上使用超参数优化 (HPO) 以进行自动模型调优。要了解更多信息,请参阅超参数调优的工作原理。 

注:虽然未包含在本教程中,您仍可以选择包含批量转换作为您的试用中的组成部分。当您调用 .transform 函数时,只需像您在培训作业中那样传递到 experiment_config 中即可。Amazon SageMaker 自动关联批量转换作为试用组件。

第 7 步:部署模型作为终端节点并设置数据捕获

在这一步中,您可以部署模型作为 RESTful HTTPS 终端节点,以充当实时推理。Amazon SageMaker 自动处理模型托管以及为您创建终端节点的工作。

完成以下步骤,以部署模型作为终端节点并设置数据捕获。

注:有关更多信息,请参阅 Amazon SageMaker 文档中的部署用于推理的模型


a.复制并粘贴以下代码,然后选择 Run(运行)。

from sagemaker.model_monitor import DataCaptureConfig
from sagemaker import RealTimePredictor
from sagemaker.predictor import csv_serializer

sm_client = boto3.client('sagemaker')

latest_training_job = sm_client.list_training_jobs(MaxResults=1,
                                                SortBy='CreationTime',
                                                SortOrder='Descending')

training_job_name=TrainingJobName=latest_training_job['TrainingJobSummaries'][0]['TrainingJobName']

training_job_description = sm_client.describe_training_job(TrainingJobName=training_job_name)

model_data = training_job_description['ModelArtifacts']['S3ModelArtifacts']
container_uri = training_job_description['AlgorithmSpecification']['TrainingImage']

# create a model.
def create_model(role, model_name, container_uri, model_data):
    return sm_client.create_model(
        ModelName=model_name,
        PrimaryContainer={
        'Image': container_uri,
        'ModelDataUrl': model_data,
        },
        ExecutionRoleArn=role)
    

try:
    model = create_model(role, training_job_name, container_uri, model_data)
except Exception as e:
        sm_client.delete_model(ModelName=training_job_name)
        model = create_model(role, training_job_name, container_uri, model_data)
        

print('Model created: '+model['ModelArn'])


b.要指定数据配置设置,复制并粘贴以下代码,然后选择 Run(运行)。

此代码告诉 SageMaker 捕获终端节点收到的 100% 的推理负载,捕获输入和输出,并将输入内容类型注释为 csv。

s3_capture_upload_path = 's3://{}/{}/monitoring/datacapture'.format(rawbucket, prefix)
data_capture_configuration = {
    "EnableCapture": True,
    "InitialSamplingPercentage": 100,
    "DestinationS3Uri": s3_capture_upload_path,
    "CaptureOptions": [
        { "CaptureMode": "Output" },
        { "CaptureMode": "Input" }
    ],
    "CaptureContentTypeHeader": {
       "CsvContentTypes": ["text/csv"],
       "JsonContentTypes": ["application/json"]}

c.复制并粘贴以下代码,然后选择 Run(运行)。这一步创建终端节点配置,并部署该终端节点。在代码中,您可以指定实例类型以及您是否要将所有流量发送到此终端节点,等等。

def create_endpoint_config(model_config, data_capture_config): 
    return sm_client.create_endpoint_config(
                                                EndpointConfigName=model_config,
                                                ProductionVariants=[
                                                        {
                                                            'VariantName': 'AllTraffic',
                                                            'ModelName': model_config,
                                                            'InitialInstanceCount': 1,
                                                            'InstanceType': 'ml.m4.xlarge',
                                                            'InitialVariantWeight': 1.0,
                                                },
                                                    
                                                    ],
                                                DataCaptureConfig=data_capture_config
                                                )




try:
    endpoint_config = create_endpoint_config(training_job_name, data_capture_configuration)
except Exception as e:
    sm_client.delete_endpoint_config(EndpointConfigName=endpoint)
    endpoint_config = create_endpoint_config(training_job_name, data_capture_configuration)

print('Endpoint configuration created: '+ endpoint_config['EndpointConfigArn'])

d.复制并粘贴以下代码,然后选择 Run(运行)以创建终端节点。 

# Enable data capture, sampling 100% of the data for now. Next we deploy the endpoint in the correct VPC.

endpoint_name = training_job_name
def create_endpoint(endpoint_name, config_name):
    return sm_client.create_endpoint(
                                    EndpointName=endpoint_name,
                                    EndpointConfigName=training_job_name
                                )


try:
    endpoint = create_endpoint(endpoint_name, endpoint_config)
except Exception as e:
    sm_client.delete_endpoint(EndpointName=endpoint_name)
    endpoint = create_endpoint(endpoint_name, endpoint_config)

print('Endpoint created: '+ endpoint['EndpointArn'])

e.在左侧工具栏中,选择 Endpoints(终端节点)。Endpoints(终端节点)列表显示服务中的所有终端节点。

请注意,build-train-deploy 终端节点显示 Creating(正在创建)状态。要部署模型,Amazon SageMaker 必须先将模型构件和推理图像复制到实例上,并设置 HTTPS 终端节点以使用客户端应用程序或 RESTful API 进行推理。

创建了终端节点之后,状态就会变成 InService(可用)。(请注意,创建部署可能需要大约 5-10 分钟。)  

注:您可能需要单击 Refresh(刷新)以获得更新后的状态。


f.在 JupyterLab 笔记本中,复制并运行以下代码以获取测试数据集的样本。此代码将获取前 10 行。

!head -10 test_data.csv > test_sample.csv

g.运行以下代码以向此终端节点发送一些推理请求。

注:如果您指定了不同的终端节点名称,您需要将下面的 endpoint 替换为您的终端节点名称。 

from sagemaker import RealTimePredictor
from sagemaker.predictor import csv_serializer

predictor = RealTimePredictor(endpoint=endpoint_name, content_type = 'text/csv')

with open('test_sample.csv', 'r') as f:
    for row in f:
        payload = row.rstrip('\n')
        response = predictor.predict(data=payload[2:])
        sleep(0.5)
print('done!')

h.运行以下代码以验证模型监控器正在正确地获取入站数据。

在代码中,current_endpoint_capture_prefix 捕获存储您的模型监控器的目录路径。导航到 Amazon S3 存储桶,以查看是否正在捕获预测请求。请注意,此位置应与上述代码中的 s3_capture_upload_path 一致。

# Extract the captured json files.
data_capture_prefix = '{}/monitoring'.format(prefix)
s3_client = boto3.Session().client('s3')
current_endpoint_capture_prefix = '{}/datacapture/{}/AllTraffic'.format(data_capture_prefix, endpoint_name)
print(current_endpoint_capture_prefix)
result = s3_client.list_objects(Bucket=rawbucket, Prefix=current_endpoint_capture_prefix)
capture_files = [capture_file.get("Key") for capture_file in result.get('Contents')]
print("Found Capture Files:")
print("\n ".join(capture_files))


capture_files[0]

捕获的输出指示数据捕获已配置,并且正在保存入站请求。 

注:如果您最初看到 Null 响应,则当您首次初始化数据捕获时数据可能未同步加载到 Amazon S3 路径。等待大约一分钟,然后重试。


i.运行以下代码以提取 json 文件之一的内容,并查看捕获的输出。 

# View contents of the captured file.
def get_obj_body(bucket, obj_key):
    return s3_client.get_object(Bucket=rawbucket, Key=obj_key).get('Body').read().decode("utf-8")

capture_file = get_obj_body(rawbucket, capture_files[0])
print(json.dumps(json.loads(capture_file.split('\n')[5]), indent = 2, sort_keys =True))

输出指示数据捕获正在捕获输入负载和模型的输出。 

第 8 步:利用 SageMaker 模型监控器监控终端节点

在这一步中,您可以启用 SageMaker 模型监控器以监控部署的终端节点有无数据漂移。为此,您应比较发送到模型的负载和输出与基准,并确定输入数据或标签是否存在任何漂移。 

完成以下步骤以启用模型监控。

注:有关更多信息,请参阅 Amazon SageMaker 文档中的 Amazon SageMaker 模型监控器


a.运行以下代码,以在 Amazon S3 存储桶中创建一个用于存储模型监控器输出的文件夹。

此代码可创建两个文件夹:一个文件夹存储您用于训练模型的基准数据;第二个文件夹存储与该基准的任何偏离。

model_prefix = prefix + "/" + endpoint_name
baseline_prefix = model_prefix + '/baselining'
baseline_data_prefix = baseline_prefix + '/data'
baseline_results_prefix = baseline_prefix + '/results'

baseline_data_uri = 's3://{}/{}'.format(rawbucket,baseline_data_prefix)
baseline_results_uri = 's3://{}/{}'.format(rawbucket, baseline_results_prefix)
train_data_header_location = "s3://" + rawbucket + '/' + prefix + '/train_headers'
print('Baseline data uri: {}'.format(baseline_data_uri))
print('Baseline results uri: {}'.format(baseline_results_uri))
print(train_data_header_location)


b.运行以下代码,以将模型监控器的基准作业设置为捕获训练数据的统计信息。为此,模型监控器使用在 Apache Spark 上构建的 deequ 库对数据开展单元测试。 

from sagemaker.model_monitor import DefaultModelMonitor
from sagemaker.model_monitor.dataset_format import DatasetFormat

my_default_monitor = DefaultModelMonitor(
    role=role,
    instance_count=1,
    instance_type='ml.m5.xlarge',
    volume_size_in_gb=20,
    max_runtime_in_seconds=3600)

my_default_monitor.suggest_baseline(
    baseline_dataset=os.path.join(train_data_header_location, 'train_data_with_headers.csv'),
    dataset_format=DatasetFormat.csv(header=True),
    output_s3_uri=baseline_results_uri,
    wait=True
)

模型监控器设置一个独立的实例,复制训练数据,并生成一些统计信息。该服务会生成大量的 Apache Spark 日志,您可以忽略。当作业完成后,您将会看到 Spark job completed(Spark 作业已完成)输出。


c.运行以下代码以查看基准作业生成的输出。

s3_client = boto3.Session().client('s3')
result = s3_client.list_objects(Bucket=rawbucket, Prefix=baseline_results_prefix)
report_files = [report_file.get("Key") for report_file in result.get('Contents')]
print("Found Files:")
print("\n ".join(report_files))

baseline_job = my_default_monitor.latest_baselining_job
schema_df = pd.io.json.json_normalize(baseline_job.baseline_statistics().body_dict["features"])
schema_df

您将会看到两个文件:constraints.jsonstatistics.json。接下来,更深入地探究其内容:

上面的代码可将 /statistics.json 中的 json 输出转换成 pandas dataframe。注意 deequ 库如何推理列的数据类型、是否存在 Null 或缺失值,以及均值、最小值、最大值、总和、标准差之类的统计参数和输入数据流的 sketch 参数。 

同样,constraints.json 文件由训练数据集所遵守的若干限制构成,例如非负值和功能字段的数据类型。

您将会看到两个文件:constraints.jsonstatistics.json。接下来,更深入地探究其内容:

上面的代码可将 /statistics.json 中的 json 输出转换成 pandas dataframe。注意 deequ 库如何推理列的数据类型、是否存在 Null 或缺失值,以及均值、最小值、最大值、总和、标准差之类的统计参数和输入数据流的 sketch 参数。   

同样,constraints.json 文件由训练数据集所遵守的若干限制构成,例如非负值和功能字段的数据类型。

constraints_df = pd.io.json.json_normalize(baseline_job.suggested_constraints().body_dict["features"])
constraints_df

d.运行以下代码以设置终端节点监控的频率。

您可以指定每天或每小时。此代码指定每小时频率,但是您可能想为生产应用更改此设置,因为每小时频率将会产生大量数据。模型监控器将会生成包含其找到所有违规事件的报告。

reports_prefix = '{}/reports'.format(prefix)
s3_report_path = 's3://{}/{}'.format(rawbucket,reports_prefix)
print(s3_report_path)

from sagemaker.model_monitor import CronExpressionGenerator
from time import gmtime, strftime

mon_schedule_name = 'Built-train-deploy-model-monitor-schedule-' + strftime("%Y-%m-%d-%H-%M-%S", gmtime())
my_default_monitor.create_monitoring_schedule(
    monitor_schedule_name=mon_schedule_name,
    endpoint_input=predictor.endpoint,
    output_s3_uri=s3_report_path,
    statistics=my_default_monitor.baseline_statistics(),
    constraints=my_default_monitor.suggested_constraints(),
    schedule_cron_expression=CronExpressionGenerator.hourly(),
    enable_cloudwatch_metrics=True,

)

请注意,此代码启用 Amazon CloudWatch 指标,它指示模型监控器将输出发送到 CloudWatch。您可以通过此方法利用 CloudWatch 警报触发警报,以使工程师或管理员知道何时检测到数据漂移。 

第 9 步:测试 SageMaker 模型监控器性能

在这一步中,您对照一些样本数据评估模型监控器。您应修改测试负载中的多种功能的分布而不原样发送测试负载,以测试模型监控器是否能够检测到变化。

完成以下步骤以测试模型监控器性能。

注:有关更多信息,请参阅 Amazon SageMaker 文档中的 Amazon SageMaker 模型监控器


a.运行以下代码以导入测试数据,并生成一些修改后的样本数据。

COLS = data.columns
test_full = pd.read_csv('test_data.csv', names = ['Label'] +['PAY_AMT1','BILL_AMT1'] + list(COLS[1:])[:11] + list(COLS[1:])[12:17] + list(COLS[1:])[18:]
)
test_full.head()

b.运行以下代码以更改少数行。请注意此处图像中标记为红色的部分与上一步中的差异。删除标签列,并保存修改后的样本测试数据。

faketestdata = test_full
faketestdata['EDUCATION'] = -faketestdata['EDUCATION'].astype(float)
faketestdata['BILL_AMT2']= (faketestdata['BILL_AMT2']//10).astype(float)
faketestdata['AGE']= (faketestdata['AGE']-10).astype(float)

faketestdata.head()
faketestdata.drop(columns=['Label']).to_csv('test-data-input-cols.csv', index = None, header=None)

c.运行以下代码以使用此修改后的数据集反复调用终端节点。

from threading import Thread

runtime_client = boto3.client('runtime.sagemaker')

# (just repeating code from above for convenience/ able to run this section independently)
def invoke_endpoint(ep_name, file_name, runtime_client):
    with open(file_name, 'r') as f:
        for row in f:
            payload = row.rstrip('\n')
            response = runtime_client.invoke_endpoint(EndpointName=ep_name,
                                          ContentType='text/csv', 
                                          Body=payload)
            time.sleep(1)
            
def invoke_endpoint_forever():
    while True:
        invoke_endpoint(endpoint, 'test-data-input-cols.csv', runtime_client)
        
thread = Thread(target = invoke_endpoint_forever)
thread.start()
# Note that you need to stop the kernel to stop the invocations

d.运行以下代码以检查模型监控器作业的状态。

desc_schedule_result = my_default_monitor.describe_schedule()
print('Schedule status: {}'.format(desc_schedule_result['MonitoringScheduleStatus']))

您应看到输出为 Schedule status: Scheduled(计划状态:已计划)


e.运行以下代码,每 10 分钟检查一次是否生成了任何监控输出。请注意,第一个作业运行时的缓冲大约为 20 分钟。 

mon_executions = my_default_monitor.list_executions()
print("We created ahourly schedule above and it will kick off executions ON the hour (plus 0 - 20 min buffer.\nWe will have to wait till we hit the hour...")

while len(mon_executions) == 0:
    print("Waiting for the 1st execution to happen...")
    time.sleep(600)
    mon_executions = my_default_monitor.list_executions()

f.在 Amazon SageMaker Studio 的左侧工具栏中,选择 Endpoints(终端节点)。右键单击 build-train-deploy 终端节点,然后选择 Describe Endpoint(描述终端节点)。


g.选择 Monitoring job history(监控作业历史)。请注意,Monitoring status(监控状态)显示 In progress(正在进行)。

当作业完成后,Monitoring status(监控状态)显示Issue found(找到问题)(对于找到任何问题的情况)。 


h.双击问题以查看详细信息。在您以前修改过的 EDUCATIONBILL_AMT2 字段中,您可以看到模型监控器检测到很大的基线漂移。

模型监控器还在两个其他字段中检测到数据类型差异。训练数据由整数标签构成,但是 XGBoost 模型预测概率分值。因此,模型监控器会报告不一致。 


i.在您的 JupyterLab 笔记本中,运行以下单元格以查看来自模型监控器的输出。

latest_execution = mon_executions[-1] # latest execution's index is -1, second to last is -2 and so on..
time.sleep(60)
latest_execution.wait(logs=False)

print("Latest execution status: {}".format(latest_execution.describe()['ProcessingJobStatus']))
print("Latest execution result: {}".format(latest_execution.describe()['ExitMessage']))

latest_job = latest_execution.describe()
if (latest_job['ProcessingJobStatus'] != 'Completed'):
        print("====STOP==== \n No completed executions to inspect further. Please wait till an execution completes or investigate previously reported failures.")

j.运行以下代码,以查看模型监控器生成的报告。

report_uri=latest_execution.output.destination
print('Report Uri: {}'.format(report_uri))
from urllib.parse import urlparse
s3uri = urlparse(report_uri)
report_bucket = s3uri.netloc
report_key = s3uri.path.lstrip('/')
print('Report bucket: {}'.format(report_bucket))
print('Report key: {}'.format(report_key))

s3_client = boto3.Session().client('s3')
result = s3_client.list_objects(Bucket=rawbucket, Prefix=report_key)
report_files = [report_file.get("Key") for report_file in result.get('Contents')]
print("Found Report Files:")
print("\n ".join(report_files))

除了 statistics.jsonconstraints.json 之外,您还可以看到生成了名为 constraint_violations.json 的新文件。此文件的内容在 Amazon SageMaker Studio 中显示在上方(步骤 g)。


注:当您设置数据捕获时,Amazon SageMaker Studio 自动为您创建一个笔记本,它包含上述代码以运行监控作业。要访问该笔记本,右键单击终端节点,然后选择 Describe Endpoint(描述终端节点)。在 Monitoring results(监控结果)选项卡上,选择 Enable Monitoring(启用监控)。这一步自动打开一个 Jupyter 笔记本,其中包含您在上面授权的代码。

第 10 步:清除

在这一步中,您将终止在本实验中使用的资源。

重要说明:终止当前未在使用的资源可降低成本,是最佳做法。不终止资源可能会在您的账户下产生费用。


a.删除监控计划:在您的 Jupyter 笔记本中,复制并粘贴以下代码,然后选择 Run(运行)。

注:与模型监控器终端节点关联的所有监控作业都已删除后,您才能删除该终端节点。

my_default_monitor.delete_monitoring_schedule()
time.sleep(10) # actually wait for the deletion

b.删除您的终端节点:在您的 Jupyter 笔记本中,复制并粘贴以下代码,然后选择 Run(运行)。

注:确保先删除与该终端节点关联的所有监控作业。

sm.delete_endpoint(EndpointName = endpoint_name)

如果您想要清除所有训练构件(模型、预处理数据集等等),请将以下代码复制并粘贴到您的代码单元格中,然后选择 Run(运行)。

注:确保用您自己的账号替换 ACCOUNT-NUMBER。

%%sh
aws s3 rm --recursive s3://sagemaker-us-east-2-ACCOUNT_NUMBER/sagemaker-modelmonitor/data

恭喜

您已使用 Amazon SageMaker Studio 创建、训练、部署和监控机器学习模型。

本教程对您是否有帮助?

了解 Amazon SageMaker Studio

了解如何使用 Amazon SageMaker Autopilot 自动构建、训练和部署 ML 模型