Automatizzazione dei flussi di lavoro di machine learning

TUTORIAL

Panoramica

Con questo tutorial scoprirai come creare ed automatizzare i flussi di lavoro di machine learning (ML) end-to-end utilizzando Pipeline Amazon SageMaker, Amazon SageMaker Model Registry e Amazon SageMaker Clarify.

SageMaker Pipelines è il primo servizio di integrazione continua/distribuzione continua (CI/CD) dedicato per il ML. Con SageMaker Pipelines potrai automatizzare le diverse fasi del flusso di lavoro di ML, inclusi il caricamento e la trasformazione dei dati, l'addestramento, l'ottimizzazione, la valutazione e l'implementazione. SageMaker Model Registry consente di tenere traccia delle versioni dei modelli, dei relativi metadati come il raggruppamento dei casi d'uso e le baseline dei parametri delle prestazioni dei modelli in un repository centrale da cui poter facilmente scegliere il modello più adatto da implementare in base ai requisiti aziendali. SageMaker Clarify fornisce una maggiore visibilità sui dati e i modelli di addestramento in modo da poter identificare e limitare gli errori e spiegare le previsioni.

In questo tutorial, implementerai una pipeline SageMaker per creare, addestrare e distribuire un modello di classificazione binaria XGBoost che prevede la probabilità che una richiesta di indennizzo di un'assicurazione auto sia fraudolenta. Utilizzerai un set di dati sui sinistri di assicurazione auto generato artificialmente. Gli input non elaborati sono due tabelle di dati assicurativi: una tabella dei sinistri e una tabella dei clienti. La tabella dei sinistri contiene una colonna denominata frode che indica se un incidente è fraudolento o meno. La pipeline elaborerà i dati non elaborati, creerà set di dati di addestramento, convalida e test e costruirà e valuterà un modello di classificazione binaria. Utilizzerà quindi SageMaker Clarify per testare la distorsione e la spiegabilità del modello e, infine, distribuirà il modello per l'inferenza.

Obiettivi

Con questa guida, imparerai a:

  • Creare ed eseguire una pipeline SageMaker per automatizzare il ciclo di vita del ML end-to-end
  • Generare previsioni utilizzando il modello distribuito

Prerequisiti

Prima di iniziare questa guida, avrai bisogno di:

 Esperienza AWS

Intermedio

 Tempo per il completamento

120 minuti

 Costo richiesto per il completamento

Per una stima dei costi di questo tutorial, consulta Prezzi di SageMaker.

 Requisiti

Devi aver effettuato l'accesso con un account AWS.

 Servizi utilizzati

Amazon SageMaker Studio, Pipeline Amazon SageMaker, Amazon SageMaker Clarify, Amazon SageMaker Model Registry

 Ultimo aggiornamento

24 giugno 2022

Implementazione

Fase 1: Configurazione del dominio di Amazon SageMaker Studio

Un account AWS può avere solo un dominio SageMaker Studio per regione. Se hai già un dominio SageMaker Studio nella regione Stati Uniti orientali (Virginia settentrionale), consulta la Guida alla configurazione di SageMaker Studio per i flussi di lavoro di ML per collegare le policy AWS IAM richieste all'account SageMaker Studio, quindi salta la fase 1 e passa direttamente alla fase 2. 

Se invece non disponi di un dominio SageMaker Studio, continua con la fase 1 per eseguire un modello AWS CloudFormation che crea un dominio SageMaker Studio e aggiunge le autorizzazioni necessarie per il resto di questo tutorial.

Scegli il link dello stack AWS CloudFormation. Questo link apre la console AWS CloudFormation e crea il dominio SageMaker e un utente denominato studio-user. Inoltre aggiunge le autorizzazioni richieste al tuo account SageMaker Studio. Nella console CloudFormation, conferma che Stati Uniti orientali (Virginia settentrionale) sia la regione visualizzata nell'angolo in alto a destra. Il nome dello stack deve essere CFN-SM-IM-Lambda-catalog e non può essere modificato. Questo stack richiede circa 10 minuti per creare tutte le risorse.

Assume inoltre che sia presente un VPC pubblico configurato nell'account. Se non hai un VPC pubblico, consulta VPC con una singola sottorete pubblica per scoprire come creare un VPC pubblico. 

Seleziona I acknowledge that AWS CloudFormation might create IAM resources (Sono consapevole che AWS CloudFormation può creare le risorse IAM), quindi scegli Create stack (Crea stack).

Nel riquadro CloudFormation, scegli Stack. La creazione dello stack richiede circa 10 minuti. Una volta creato lo stack, il suo stato passerà da CREATE_IN_PROGRESS a CREATE_COMPLETE

Fase 2: Configurazione di un notebook SageMaker Studio e parametrizzazione della pipeline

In questa fase puoi avviare un nuovo SageMaker Studio e configurare le variabili SageMaker richieste per interagire con Amazon Simple Storage Service (Amazon S3).

Digita SageMaker Studio nella barra di ricerca della console AWS, quindi scegli SageMaker Studio. Scegli US East (N. Virginia) (Stati Uniti orientali [Virginia settentrionale]) dall'elenco a discesa Region (Regione) nell'angolo in alto a destra della console.

In Launch app (Avvia app), seleziona Studio per aprire SageMaker Studio utilizzando il profilo studio-user.

Sulla barra di navigazione di SageMaker Studio, scegli FileNew (Nuovo), Notebook

Nella finestra di dialogo Set up notebook environment (Configura ambiente notebook), in Image (Immagine), seleziona Data Science (Data science). Il kernel Python 3 viene selezionato automaticamente. Scegli Select (Seleziona). 

Il kernel nell'angolo in alto a destra del notebook dovrebbe visualizzare Python 3 (Data Science) (Python 3 [data science]).

Per importare le librerie richieste, copia e incolla il seguente codice in una cella del notebook, quindi esegui la cella.

import pandas as pd
import json
import boto3
import pathlib
import io
import sagemaker


from sagemaker.deserializers import CSVDeserializer
from sagemaker.serializers import CSVSerializer

from sagemaker.xgboost.estimator import XGBoost
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import (
    ProcessingInput, 
    ProcessingOutput, 
    ScriptProcessor
)
from sagemaker.inputs import TrainingInput

from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.steps import (
    ProcessingStep, 
    TrainingStep, 
    CreateModelStep
)
from sagemaker.workflow.check_job_config import CheckJobConfig
from sagemaker.workflow.parameters import (
    ParameterInteger, 
    ParameterFloat, 
    ParameterString, 
    ParameterBoolean
)
from sagemaker.workflow.clarify_check_step import (
    ModelBiasCheckConfig, 
    ClarifyCheckStep, 
    ModelExplainabilityCheckConfig
)
from sagemaker.workflow.step_collections import RegisterModel
from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo
from sagemaker.workflow.properties import PropertyFile
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.functions import JsonGet

from sagemaker.workflow.lambda_step import (
    LambdaStep,
    LambdaOutput,
    LambdaOutputTypeEnum,
)
from sagemaker.lambda_helper import Lambda

from sagemaker.model_metrics import (
    MetricsSource, 
    ModelMetrics, 
    FileSource
)
from sagemaker.drift_check_baselines import DriftCheckBaselines

from sagemaker.image_uris import retrieve

Copia e incolla il seguente blocco di codice in una cella ed esegui per configurare gli oggetti client SageMaker e S3 tramite SageMaker e gli SDK AWS. Questi oggetti sono necessari per consentire a SageMaker di eseguire varie operazioni come l'implementazione e il richiamo di endpoint e interagire con Amazon S3 e AWS Lambda. Il codice imposta anche le posizioni del bucket S3 in cui vengono archiviati i set di dati non elaborati ed elaborati e gli artefatti del modello. Nota che i bucket di lettura e scrittura sono separati. Il bucket di lettura è il bucket S3 pubblico denominato sagemaker-sample-files e contiene i set di dati non elaborati. Il bucket di scrittura è il bucket S3 predefinito associato al tuo account denominato sagemaker-<regione>-<id-account> e sarà utilizzato successivamente in questo tutorial per memorizzare i set di dati elaborati e gli artefatti.

# Instantiate AWS services session and client objects
sess = sagemaker.Session()
write_bucket = sess.default_bucket()
write_prefix = "fraud-detect-demo"

region = sess.boto_region_name
s3_client = boto3.client("s3", region_name=region)
sm_client = boto3.client("sagemaker", region_name=region)
sm_runtime_client = boto3.client("sagemaker-runtime")

# Fetch SageMaker execution role
sagemaker_role = sagemaker.get_execution_role()


# S3 locations used for parameterizing the notebook run
read_bucket = "sagemaker-sample-files"
read_prefix = "datasets/tabular/synthetic_automobile_claims" 

# S3 location where raw data to be fetched from
raw_data_key = f"s3://{read_bucket}/{read_prefix}"

# S3 location where processed data to be uploaded
processed_data_key = f"{write_prefix}/processed"

# S3 location where train data to be uploaded
train_data_key = f"{write_prefix}/train"

# S3 location where validation data to be uploaded
validation_data_key = f"{write_prefix}/validation"

# S3 location where test data to be uploaded
test_data_key = f"{write_prefix}/test"


# Full S3 paths
claims_data_uri = f"{raw_data_key}/claims.csv"
customers_data_uri = f"{raw_data_key}/customers.csv"
output_data_uri = f"s3://{write_bucket}/{write_prefix}/"
scripts_uri = f"s3://{write_bucket}/{write_prefix}/scripts"
estimator_output_uri = f"s3://{write_bucket}/{write_prefix}/training_jobs"
processing_output_uri = f"s3://{write_bucket}/{write_prefix}/processing_jobs"
model_eval_output_uri = f"s3://{write_bucket}/{write_prefix}/model_eval"
clarify_bias_config_output_uri = f"s3://{write_bucket}/{write_prefix}/model_monitor/bias_config"
clarify_explainability_config_output_uri = f"s3://{write_bucket}/{write_prefix}/model_monitor/explainability_config"
bias_report_output_uri = f"s3://{write_bucket}/{write_prefix}/clarify_output/pipeline/bias"
explainability_report_output_uri = f"s3://{write_bucket}/{write_prefix}/clarify_output/pipeline/explainability"

# Retrieve training image
training_image = retrieve(framework="xgboost", region=region, version="1.3-1")

Copia e incolla il seguente codice per impostare i nomi per i vari componenti della pipeline SageMaker, come il modello e l'endpoint, e specificare i numeri e i tipi di istanze di addestramento e inferenza. Questi valori verranno utilizzati per parametrizzare la tua pipeline.

# Set names of pipeline objects
pipeline_name = "FraudDetectXGBPipeline"
pipeline_model_name = "fraud-detect-xgb-pipeline"
model_package_group_name = "fraud-detect-xgb-model-group"
base_job_name_prefix = "fraud-detect"
endpoint_config_name = f"{pipeline_model_name}-endpoint-config"
endpoint_name = f"{pipeline_model_name}-endpoint"

# Set data parameters
target_col = "fraud"

# Set instance types and counts
process_instance_type = "ml.c5.xlarge"
train_instance_count = 1
train_instance_type = "ml.m4.xlarge"
predictor_instance_count = 1
predictor_instance_type = "ml.m4.xlarge"
clarify_instance_count = 1
clarify_instance_type = "ml.m4.xlarge"

SageMaker Pipelines supporta la parametrizzazione, che consente di specificare i parametri di input al runtime senza modificare il codice della pipeline. Puoi utilizzare i moduli disponibili nel modulo sagemaker.workflow.parameters, come ParameterInteger, ParameterFloat, ParameterString e ParameterBoolean, per specificare i parametri della pipeline di diversi tipi di dati. Copia, incolla ed esegui il seguente codice per impostare più parametri di input, incluse le configurazioni di SageMaker Clarify.

# Set up pipeline input parameters

# Set processing instance type
process_instance_type_param = ParameterString(
    name="ProcessingInstanceType",
    default_value=process_instance_type,
)

# Set training instance type
train_instance_type_param = ParameterString(
    name="TrainingInstanceType",
    default_value=train_instance_type,
)

# Set training instance count
train_instance_count_param = ParameterInteger(
    name="TrainingInstanceCount",
    default_value=train_instance_count
)

# Set deployment instance type
deploy_instance_type_param = ParameterString(
    name="DeployInstanceType",
    default_value=predictor_instance_type,
)

# Set deployment instance count
deploy_instance_count_param = ParameterInteger(
    name="DeployInstanceCount",
    default_value=predictor_instance_count
)

# Set Clarify check instance type
clarify_instance_type_param = ParameterString(
    name="ClarifyInstanceType",
    default_value=clarify_instance_type,
)

# Set model bias check params
skip_check_model_bias_param = ParameterBoolean(
    name="SkipModelBiasCheck", 
    default_value=False
)

register_new_baseline_model_bias_param = ParameterBoolean(
    name="RegisterNewModelBiasBaseline",
    default_value=False
)

supplied_baseline_constraints_model_bias_param = ParameterString(
    name="ModelBiasSuppliedBaselineConstraints", 
    default_value=""
)

# Set model explainability check params
skip_check_model_explainability_param = ParameterBoolean(
    name="SkipModelExplainabilityCheck", 
    default_value=False
)

register_new_baseline_model_explainability_param = ParameterBoolean(
    name="RegisterNewModelExplainabilityBaseline",
    default_value=False
)

supplied_baseline_constraints_model_explainability_param = ParameterString(
    name="ModelExplainabilitySuppliedBaselineConstraints", 
    default_value=""
)

# Set model approval param
model_approval_status_param = ParameterString(
    name="ModelApprovalStatus", default_value="Approved"
)

Fase 3: Costruzione dei componenti della pipeline

Una pipeline è una sequenza di operazioni che possono essere costruiti individualmente e quindi messi insieme per formare un flusso di lavoro di ML. Il diagramma seguente mostra le fasi di livello superiore di una pipeline.

In questo tutorial, la pipeline viene creata con le seguenti operazioni:

  1. Fase di elaborazione dati: esegue un processo di elaborazione SageMaker utilizzando i dati non elaborati di input in S3 e restituisce l'addestramento, la convalida e le suddivisioni di test in S3.
  2. Fase di addestramento: addestra un modello XGBoost tramite processi di addestramento SageMaker con dati di addestramento e convalida in S3 come input, quindi archivia l'artefatto del modello addestrato in S3.
  3. Fase di valutazione: valuta il modello sul set di dati di test eseguendo un processo di elaborazione SageMaker tramite i dati di test e l'artefatto del modello in S3 come input, quindi archivia il report di valutazione delle prestazioni del modello di output in S3.
  4. Fase condizionale: confronta le prestazioni del modello sul set di dati di test con il valore soglia. Esegue una fase predefinita di SageMaker Pipelines utilizzando il report di valutazione delle prestazioni del modello in S3 come input, quindi archivia l'elenco di output delle fasi della pipeline che verranno eseguiti se le prestazioni del modello sono accettabili.
  5. Fase di creazione del modello: esegue una fase predefinita di SageMaker Pipelines utilizzando l'artefatto del modello in S3 come input e archivia il modello SageMaker di output in S3.
  6. Fase di controllo errori: controlla gli errori del modello tramite SageMaker Clarify con i dati di addestramento e l'artefatto del modello in S3 come input e archivia il report degli errori del modello e i parametri di base in S3.
  7. Fase di spiegabilità del modello: esegue SageMaker Clarify con i dati di addestramento e l'artefatto del modello in S3 come input, quindi archivia il report di spiegabilità del modello e i parametri di base in S3.
  8. Fase di registrazione: esegue un passaggio predefinito di SageMaker Pipelines utilizzando i parametri di base di modello, errori e spiegabilità come input per registrare il modello in SageMaker Model Registry.
  9. Fase di implementazione: esegue un passaggio predefinito di SageMaker Pipelines utilizzando una funzione del gestore AWS Lambda, il modello e la configurazione dell'endpoint come input per implementare il modello in un endpoint di inferenza in tempo reale di SageMaker.

SageMaker Pipelines fornisce numerosi tipi di passaggi predefiniti, come ad esempio le operazioni per l'elaborazione dei dati, l'addestramento e l'ottimizzazione del modello e la trasformazione batch. Per ulteriori informazioni, consulta Fasi della pipeline nella guida per gli sviluppatori di Amazon SageMaker. Nella procedura seguente viene configurata e definita ogni singola fase della pipeline e poi viene definita la pipeline stessa combinando le fasi con i parametri di input.

Fase di elaborazione dei dati: in questa fase viene preparato uno script Python per acquisire i dati non elaborati, viene eseguita l'elaborazione come l'imputazione dei valori mancanti e l'ingegneria delle funzionalità e viene completato l'addestramento, la convalida e le suddivisioni dei test da utilizzare per la creazione di modelli. Copia, incolla ed esegui il codice riportato per creare lo script di elaborazione.

%%writefile preprocessing.py

import argparse
import pathlib
import boto3
import os
import pandas as pd
import logging
from sklearn.model_selection import train_test_split

logger = logging.getLogger()
logger.setLevel(logging.INFO)
logger.addHandler(logging.StreamHandler())


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--train-ratio", type=float, default=0.8)
    parser.add_argument("--validation-ratio", type=float, default=0.1)
    parser.add_argument("--test-ratio", type=float, default=0.1)
    args, _ = parser.parse_known_args()
    logger.info("Received arguments {}".format(args))
    
    # Set local path prefix in the processing container
    local_dir = "/opt/ml/processing"    
    
    input_data_path_claims = os.path.join("/opt/ml/processing/claims", "claims.csv")
    input_data_path_customers = os.path.join("/opt/ml/processing/customers", "customers.csv")
    
    logger.info("Reading claims data from {}".format(input_data_path_claims))
    df_claims = pd.read_csv(input_data_path_claims)
    
    logger.info("Reading customers data from {}".format(input_data_path_customers))
    df_customers = pd.read_csv(input_data_path_customers)
    
    logger.debug("Formatting column names.")
    # Format column names
    df_claims = df_claims.rename({c : c.lower().strip().replace(' ', '_') for c in df_claims.columns}, axis = 1)
    df_customers = df_customers.rename({c : c.lower().strip().replace(' ', '_') for c in df_customers.columns}, axis = 1)
    
    logger.debug("Joining datasets.")
    # Join datasets
    df_data = df_claims.merge(df_customers, on = 'policy_id', how = 'left')

    # Drop selected columns not required for model building
    df_data = df_data.drop(['customer_zip'], axis = 1)
    
    # Select Ordinal columns
    ordinal_cols = ["police_report_available", "policy_liability", "customer_education"]

    # Select categorical columns and filling with na
    cat_cols_all = list(df_data.select_dtypes('object').columns)
    cat_cols = [c for c in cat_cols_all if c not in ordinal_cols]
    df_data[cat_cols] = df_data[cat_cols].fillna('na')
    
    logger.debug("One-hot encoding categorical columns.")
    # One-hot encoding categorical columns
    df_data = pd.get_dummies(df_data, columns = cat_cols)
    
    logger.debug("Encoding ordinal columns.")
    # Ordinal encoding
    mapping = {
               "Yes": "1",
               "No": "0" 
              }
    df_data['police_report_available'] = df_data['police_report_available'].map(mapping)
    df_data['police_report_available'] = df_data['police_report_available'].astype(float)

    mapping = {
               "15/30": "0",
               "25/50": "1", 
               "30/60": "2",
               "100/200": "3"
              }
    
    df_data['policy_liability'] = df_data['policy_liability'].map(mapping)
    df_data['policy_liability'] = df_data['policy_liability'].astype(float)

    mapping = {
               "Below High School": "0",
               "High School": "1", 
               "Associate": "2",
               "Bachelor": "3",
               "Advanced Degree": "4"
              }
    
    df_data['customer_education'] = df_data['customer_education'].map(mapping)
    df_data['customer_education'] = df_data['customer_education'].astype(float)
    
    df_processed = df_data.copy()
    df_processed.columns = [c.lower() for c in df_data.columns]
    df_processed = df_processed.drop(["policy_id", "customer_gender_unkown"], axis=1)
    
    # Split into train, validation, and test sets
    train_ratio = args.train_ratio
    val_ratio = args.validation_ratio
    test_ratio = args.test_ratio
    
    logger.debug("Splitting data into train, validation, and test sets")
    
    y = df_processed['fraud']
    X = df_processed.drop(['fraud'], axis = 1)
    X_train_val, X_test, y_train_val, y_test = train_test_split(X, y, test_size=test_ratio, random_state=42)
    X_train, X_val, y_train, y_val = train_test_split(X_train_val, y_train_val, test_size=val_ratio, random_state=42)

    train_df = pd.concat([y_train, X_train], axis = 1)
    val_df = pd.concat([y_val, X_val], axis = 1)
    test_df = pd.concat([y_test, X_test], axis = 1)
    dataset_df = pd.concat([y, X], axis = 1)
    
    logger.info("Train data shape after preprocessing: {}".format(train_df.shape))
    logger.info("Validation data shape after preprocessing: {}".format(val_df.shape))
    logger.info("Test data shape after preprocessing: {}".format(test_df.shape))
    
    # Save processed datasets to the local paths in the processing container.
    # SageMaker will upload the contents of these paths to S3 bucket
    logger.debug("Writing processed datasets to container local path.")
    train_output_path = os.path.join(f"{local_dir}/train", "train.csv")
    validation_output_path = os.path.join(f"{local_dir}/val", "validation.csv")
    test_output_path = os.path.join(f"{local_dir}/test", "test.csv")
    full_processed_output_path = os.path.join(f"{local_dir}/full", "dataset.csv")

    logger.info("Saving train data to {}".format(train_output_path))
    train_df.to_csv(train_output_path, index=False)
    
    logger.info("Saving validation data to {}".format(validation_output_path))
    val_df.to_csv(validation_output_path, index=False)

    logger.info("Saving test data to {}".format(test_output_path))
    test_df.to_csv(test_output_path, index=False)
    
    logger.info("Saving full processed data to {}".format(full_processed_output_path))
    dataset_df.to_csv(full_processed_output_path, index=False)

Quindi, copia, incolla ed esegui il blocco di codice riportato per avviare l'istanza del processore e la fase di SageMaker Pipelines per eseguire lo script di elaborazione. Poiché lo script di elaborazione è scritto in Pandas, viene utilizzato SKLearnProcessor. La funzione ProcessingStep di SageMaker Pipelines utilizza i seguenti argomenti: il processore, le posizioni S3 di input per i set di dati non elaborati e le posizioni S3 di output per salvare i set di dati elaborati. Argomenti aggiuntivi come addestramento, convalida e rapporti di divisione dei test vengono forniti tramite l'argomento job_arguments.

from sagemaker.workflow.pipeline_context import PipelineSession
# Upload processing script to S3
s3_client.upload_file(
    Filename="preprocessing.py", Bucket=write_bucket, Key=f"{write_prefix}/scripts/preprocessing.py"
)

# Define the SKLearnProcessor configuration
sklearn_processor = SKLearnProcessor(
    framework_version="0.23-1",
    role=sagemaker_role,
    instance_count=1,
    instance_type=process_instance_type,
    base_job_name=f"{base_job_name_prefix}-processing",
)

# Define pipeline processing step
process_step = ProcessingStep(
    name="DataProcessing",
    processor=sklearn_processor,
    inputs=[
        ProcessingInput(source=claims_data_uri, destination="/opt/ml/processing/claims"),
        ProcessingInput(source=customers_data_uri, destination="/opt/ml/processing/customers")
    ],
    outputs=[
        ProcessingOutput(destination=f"{processing_output_uri}/train_data", output_name="train_data", source="/opt/ml/processing/train"),
        ProcessingOutput(destination=f"{processing_output_uri}/validation_data", output_name="validation_data", source="/opt/ml/processing/val"),
        ProcessingOutput(destination=f"{processing_output_uri}/test_data", output_name="test_data", source="/opt/ml/processing/test"),
        ProcessingOutput(destination=f"{processing_output_uri}/processed_data", output_name="processed_data", source="/opt/ml/processing/full")
    ],
    job_arguments=[
        "--train-ratio", "0.8", 
        "--validation-ratio", "0.1",
        "--test-ratio", "0.1"
    ],
    code=f"s3://{write_bucket}/{write_prefix}/scripts/preprocessing.py"
)

Copia, incolla ed esegui il blocco di codice riportato per preparare lo script di addestramento. Questo script incapsula la logica di addestramento per il classificatore binario XGBoost. Gli iperparametri usati nell'addestramento del modello vengono forniti più avanti nel tutorial attraverso la definizione della fase di addestramento.

%%writefile xgboost_train.py

import argparse
import os
import joblib
import json
import pandas as pd
import xgboost as xgb
from sklearn.metrics import roc_auc_score

if __name__ == "__main__":
    parser = argparse.ArgumentParser()

    # Hyperparameters and algorithm parameters are described here
    parser.add_argument("--num_round", type=int, default=100)
    parser.add_argument("--max_depth", type=int, default=3)
    parser.add_argument("--eta", type=float, default=0.2)
    parser.add_argument("--subsample", type=float, default=0.9)
    parser.add_argument("--colsample_bytree", type=float, default=0.8)
    parser.add_argument("--objective", type=str, default="binary:logistic")
    parser.add_argument("--eval_metric", type=str, default="auc")
    parser.add_argument("--nfold", type=int, default=3)
    parser.add_argument("--early_stopping_rounds", type=int, default=3)
    

    # SageMaker specific arguments. Defaults are set in the environment variables
    # Set location of input training data
    parser.add_argument("--train_data_dir", type=str, default=os.environ.get("SM_CHANNEL_TRAIN"))
    # Set location of input validation data
    parser.add_argument("--validation_data_dir", type=str, default=os.environ.get("SM_CHANNEL_VALIDATION"))
    # Set location where trained model will be stored. Default set by SageMaker, /opt/ml/model
    parser.add_argument("--model_dir", type=str, default=os.environ.get("SM_MODEL_DIR"))
    # Set location where model artifacts will be stored. Default set by SageMaker, /opt/ml/output/data
    parser.add_argument("--output_data_dir", type=str, default=os.environ.get("SM_OUTPUT_DATA_DIR"))
    
    args = parser.parse_args()

    data_train = pd.read_csv(f"{args.train_data_dir}/train.csv")
    train = data_train.drop("fraud", axis=1)
    label_train = pd.DataFrame(data_train["fraud"])
    dtrain = xgb.DMatrix(train, label=label_train)
    
    
    data_validation = pd.read_csv(f"{args.validation_data_dir}/validation.csv")
    validation = data_validation.drop("fraud", axis=1)
    label_validation = pd.DataFrame(data_validation["fraud"])
    dvalidation = xgb.DMatrix(validation, label=label_validation)
    
    # Choose XGBoost model hyperparameters
    params = {"max_depth": args.max_depth,
              "eta": args.eta,
              "objective": args.objective,
              "subsample" : args.subsample,
              "colsample_bytree":args.colsample_bytree
             }
    
    num_boost_round = args.num_round
    nfold = args.nfold
    early_stopping_rounds = args.early_stopping_rounds
    
    # Cross-validate train XGBoost model
    cv_results = xgb.cv(
        params=params,
        dtrain=dtrain,
        num_boost_round=num_boost_round,
        nfold=nfold,
        early_stopping_rounds=early_stopping_rounds,
        metrics=["auc"],
        seed=42,
    )
    
    model = xgb.train(params=params, dtrain=dtrain, num_boost_round=len(cv_results))
    
    train_pred = model.predict(dtrain)
    validation_pred = model.predict(dvalidation)
    
    train_auc = roc_auc_score(label_train, train_pred)
    validation_auc = roc_auc_score(label_validation, validation_pred)
    
    print(f"[0]#011train-auc:{train_auc:.2f}")
    print(f"[0]#011validation-auc:{validation_auc:.2f}")

    metrics_data = {"hyperparameters" : params,
                    "binary_classification_metrics": {"validation:auc": {"value": validation_auc},
                                                      "train:auc": {"value": train_auc}
                                                     }
                   }
              
    # Save the evaluation metrics to the location specified by output_data_dir
    metrics_location = args.output_data_dir + "/metrics.json"
    
    # Save the trained model to the location specified by model_dir
    model_location = args.model_dir + "/xgboost-model"

    with open(metrics_location, "w") as f:
        json.dump(metrics_data, f)

    with open(model_location, "wb") as f:
        joblib.dump(model, f)

Configura l'addestramento del modello utilizzando uno stimatore XGBoost SageMaker e la funzione TrainingStep di SageMaker Pipelines.

# Set XGBoost model hyperparameters 
hyperparams = {  
"eval_metric" : "auc",
"objective": "binary:logistic",
"num_round": "5",
"max_depth":"5",
"subsample":"0.75",
"colsample_bytree":"0.75",
"eta":"0.5"
}

# Set XGBoost estimator
xgb_estimator = XGBoost(
entry_point="xgboost_train.py", 
output_path=estimator_output_uri,
code_location=estimator_output_uri,
hyperparameters=hyperparams,
role=sagemaker_role,
# Fetch instance type and count from pipeline parameters
instance_count=train_instance_count,
instance_type=train_instance_type,
framework_version="1.3-1"
)

# Access the location where the preceding processing step saved train and validation datasets
# Pipeline step properties can give access to outputs which can be used in succeeding steps
s3_input_train = TrainingInput(
s3_data=process_step.properties.ProcessingOutputConfig.Outputs["train_data"].S3Output.S3Uri, 
content_type="csv", 
s3_data_type="S3Prefix"
)
s3_input_validation = TrainingInput(
s3_data=process_step.properties.ProcessingOutputConfig.Outputs["validation_data"].S3Output.S3Uri,
content_type="csv",
s3_data_type="S3Prefix"
)

# Set pipeline training step
train_step = TrainingStep(
name="XGBModelTraining",
estimator=xgb_estimator,
inputs={
"train":s3_input_train, # Train channel 
"validation": s3_input_validation # Validation channel
}
)

Copia, incolla ed esegui il blocco di codice riportato che verrà utilizzato per creare un modello SageMaker utilizzando la funzione CreateModelStep di SageMaker Pipelines. Questa fase utilizza l'output della fase di addestramento per creare un pacchetto del modello per l'implementazione. Tieni presente che il valore per l'argomento del tipo di istanza viene passato utilizzando il parametro SageMaker Pipelines definito in precedenza nel tutorial.

# Create a SageMaker model
model = sagemaker.model.Model(
    image_uri=training_image,
    model_data=train_step.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=sess,
    role=sagemaker_role
)

# Specify model deployment instance type
inputs = sagemaker.inputs.CreateModelInput(instance_type=deploy_instance_type_param)

create_model_step = CreateModelStep(name="FraudDetModel", model=model, inputs=inputs)

In un flusso di lavoro di ML, è importante valutare un modello addestrato alla ricerca di potenziali distorsioni e comprendere in che modo le varie funzionalità nei dati di input influiscono sulla previsione del modello. SageMaker Pipelines fornisce una funzione ClarifyCheckStep che può essere utilizzata per eseguire tre tipi di controlli: controllo degli errori nei dati (pre-addestramento), controllo degli errori nel modello (post-addestramento) e controllo di spiegabilità del modello. Per ridurre il tempo di esecuzione, in questo tutorial, saranno implementati solo i controlli degli errori e di spiegabilità. Copia, incolla ed esegui il blocco di codice riportato per impostare SageMaker Clarify per il controllo degli errori nel modello. Tieni presente che questo passaggio raccoglie le risorse, come i dati di addestramento e il modello SageMaker creato nei passaggi precedenti, tramite l'attributo properties. Quando viene eseguita la pipeline, questa fase non inizia fino al termine dell'esecuzione dei passaggi che forniscono gli input. Per maggiori dettagli, consulta Dipendenza dei dati tra i passaggi nella guida per gli sviluppatori di Amazon SageMaker. Per gestire i costi e il tempo di esecuzione del tutorial, la funzione ModelBiasCheckConfig viene configurata per calcolare solo un parametro di errore, DPPL. Per ulteriori informazioni sui parametri di errore disponibili in SageMaker Clarify, consulta Misurazione dei dati post-addestramento e degli errori nel modello nella guida per gli sviluppatori di Amazon SageMaker.

# Set up common configuration parameters to be used across multiple steps
check_job_config = CheckJobConfig(
    role=sagemaker_role,
    instance_count=1,
    instance_type=clarify_instance_type,
    volume_size_in_gb=30,
    sagemaker_session=sess,
)

# Set up configuration of data to be used for model bias check
model_bias_data_config = sagemaker.clarify.DataConfig(
    # Fetch S3 location where processing step saved train data
    s3_data_input_path=process_step.properties.ProcessingOutputConfig.Outputs["train_data"].S3Output.S3Uri,
    s3_output_path=bias_report_output_uri,
    label=target_col,
    dataset_type="text/csv",
    s3_analysis_config_output_path=clarify_bias_config_output_uri
)

# Set up details of the trained model to be checked for bias
model_config = sagemaker.clarify.ModelConfig(
    # Pull model name from model creation step
    model_name=create_model_step.properties.ModelName,
    instance_count=train_instance_count,
    instance_type=train_instance_type
)

# Set up column and categories that are to be checked for bias
model_bias_config = sagemaker.clarify.BiasConfig(
    label_values_or_threshold=[0],
    facet_name="customer_gender_female",
    facet_values_or_threshold=[1]
)

# Set up model predictions configuration to get binary labels from probabilities
model_predictions_config = sagemaker.clarify.ModelPredictedLabelConfig(probability_threshold=0.5)

model_bias_check_config = ModelBiasCheckConfig(
    data_config=model_bias_data_config,
    data_bias_config=model_bias_config,
    model_config=model_config,
    model_predicted_label_config=model_predictions_config,
    methods=["DPPL"]
)

# Set up pipeline model bias check step
model_bias_check_step = ClarifyCheckStep(
    name="ModelBiasCheck",
    clarify_check_config=model_bias_check_config,
    check_job_config=check_job_config,
    skip_check=skip_check_model_bias_param,
    register_new_baseline=register_new_baseline_model_bias_param,
    supplied_baseline_constraints=supplied_baseline_constraints_model_bias_param
)

Copia, incolla ed esegui il blocco di codice riportato per impostare i controlli di spiegabilità del modello. Questa fase fornisce informazioni come l'importanza delle caratteristiche (come le funzioni di input influenzano le previsioni del modello).

# Set configuration of data to be used for model explainability check
model_explainability_data_config = sagemaker.clarify.DataConfig(
    # Fetch S3 location where processing step saved train data
    s3_data_input_path=process_step.properties.ProcessingOutputConfig.Outputs["train_data"].S3Output.S3Uri,
    s3_output_path=explainability_report_output_uri,
    label=target_col,
    dataset_type="text/csv",
    s3_analysis_config_output_path=clarify_explainability_config_output_uri 
)

# Set SHAP configuration for Clarify to compute global and local SHAP values for feature importance
shap_config = sagemaker.clarify.SHAPConfig(
    seed=42, 
    num_samples=100,
    agg_method="mean_abs",
    save_local_shap_values=True
)

model_explainability_config = ModelExplainabilityCheckConfig(
    data_config=model_explainability_data_config,
    model_config=model_config,
    explainability_config=shap_config
)

# Set pipeline model explainability check step
model_explainability_step = ClarifyCheckStep(
    name="ModelExplainabilityCheck",
    clarify_check_config=model_explainability_config,
    check_job_config=check_job_config,
    skip_check=skip_check_model_explainability_param,
    register_new_baseline=register_new_baseline_model_explainability_param,
    supplied_baseline_constraints=supplied_baseline_constraints_model_explainability_param
)

Nei sistemi di produzione, non tutti i modelli addestrati vengono implementati. Di solito, vengono implementati solo i modelli con prestazioni migliori della soglia per un determinato parametro di valutazione. In questa fase, viene creato uno script Python che valuta il modello su un set di test utilizzando il parametro Receiver Operating Characteristic Area Under the Curve (ROC-AUC). Le prestazioni del modello rispetto a questo parametro vengono utilizzate in una fase successiva per determinare se il modello deve essere registrato e implementato. Copia, incolla ed esegui il codice riportato per creare uno script di valutazione che acquisisca un set di dati di test e generi il parametro AUC.

%%writefile evaluate.py

import json
import logging
import pathlib
import pickle
import tarfile

import numpy as np
import pandas as pd
import xgboost as xgb

from sklearn.metrics import roc_auc_score

logger = logging.getLogger()
logger.setLevel(logging.INFO)
logger.addHandler(logging.StreamHandler())


if __name__ == "__main__":
    model_path = "/opt/ml/processing/model/model.tar.gz"
    with tarfile.open(model_path) as tar:
        tar.extractall(path=".")

    logger.debug("Loading xgboost model.")
    # The name of the file should match how the model was saved in the training script
    model = pickle.load(open("xgboost-model", "rb"))

    logger.debug("Reading test data.")
    test_local_path = "/opt/ml/processing/test/test.csv"
    df_test = pd.read_csv(test_local_path)
    
    # Extract test set target column
    y_test = df_test.iloc[:, 0].values
   
    cols_when_train = model.feature_names
    # Extract test set feature columns
    X = df_test[cols_when_train].copy()
    X_test = xgb.DMatrix(X)

    logger.info("Generating predictions for test data.")
    pred = model.predict(X_test)
    
    # Calculate model evaluation score
    logger.debug("Calculating ROC-AUC score.")
    auc = roc_auc_score(y_test, pred)
    metric_dict = {
        "classification_metrics": {"roc_auc": {"value": auc}}
    }
    
    # Save model evaluation metrics
    output_dir = "/opt/ml/processing/evaluation"
    pathlib.Path(output_dir).mkdir(parents=True, exist_ok=True)

    logger.info("Writing evaluation report with ROC-AUC: %f", auc)
    evaluation_path = f"{output_dir}/evaluation.json"
    with open(evaluation_path, "w") as f:
        f.write(json.dumps(metric_dict))

Quindi, copia, incolla ed esegui il blocco di codice riportato per avviare l'istanza del processore e la fase di SageMaker Pipelines per eseguire lo script di valutazione. Per elaborare lo script personalizzato, utilizzi ScriptProcessor. La funzione ProcessingStep di SageMaker Pipelines accetta i seguenti argomenti: il processore, la posizione di input S3 per il set di dati di test, l'artefatto del modello e la posizione di output per archiviare i risultati della valutazione. Inoltre, viene fornito un argomento property_files. I file delle proprietà vengono utilizzati per archiviare le informazioni dall'output della fase di elaborazione, che in questo caso è un file json con il parametro delle prestazioni del modello. Come mostrato più avanti nel tutorial, questo è particolarmente utile per determinare quando deve essere eseguito un passaggio condizionale.

# Upload model evaluation script to S3
s3_client.upload_file(
    Filename="evaluate.py", Bucket=write_bucket, Key=f"{write_prefix}/scripts/evaluate.py"
)

eval_processor = ScriptProcessor(
    image_uri=training_image,
    command=["python3"],
    instance_type=predictor_instance_type,
    instance_count=predictor_instance_count,
    base_job_name=f"{base_job_name_prefix}-model-eval",
    sagemaker_session=sess,
    role=sagemaker_role,
)
evaluation_report = PropertyFile(
    name="FraudDetEvaluationReport",
    output_name="evaluation",
    path="evaluation.json",
)

# Set model evaluation step
evaluation_step = ProcessingStep(
    name="XGBModelEvaluate",
    processor=eval_processor,
    inputs=[
        ProcessingInput(
            # Fetch S3 location where train step saved model artifacts
            source=train_step.properties.ModelArtifacts.S3ModelArtifacts,
            destination="/opt/ml/processing/model",
        ),
        ProcessingInput(
            # Fetch S3 location where processing step saved test data
            source=process_step.properties.ProcessingOutputConfig.Outputs["test_data"].S3Output.S3Uri,
            destination="/opt/ml/processing/test",
        ),
    ],
    outputs=[
        ProcessingOutput(destination=f"{model_eval_output_uri}", output_name="evaluation", source="/opt/ml/processing/evaluation"),
    ],
    code=f"s3://{write_bucket}/{write_prefix}/scripts/evaluate.py",
    property_files=[evaluation_report],
)

Con SageMaker Model Registry, puoi catalogare i modelli, gestire le versioni dei modelli e implementare selettivamente i modelli alla produzione. Copia, incolla ed esegui il blocco di codice riportato per impostare la fase del registro del modello. I due parametri, model_metrics e drift_check_baselines, contengono i parametri di base calcolati in precedenza nel tutorial dalla funziona ClarifyCheckStep. Puoi anche fornire i tuoi parametri di base personalizzati. Lo scopo di questi parametri è fornire un modo per configurare le linee di base associate a un modello in modo che possano essere utilizzate nei controlli di deriva e nei processi di monitoraggio dei modelli. Ogni volta che viene eseguita una pipeline, puoi decidere di aggiornare questi parametri con le linee di base appena calcolate.

# Fetch baseline constraints to record in model registry
model_metrics = ModelMetrics(
    bias_post_training=MetricsSource(
        s3_uri=model_bias_check_step.properties.CalculatedBaselineConstraints,
        content_type="application/json"
    ),
    explainability=MetricsSource(
        s3_uri=model_explainability_step.properties.CalculatedBaselineConstraints,
        content_type="application/json"
    ),
)

# Fetch baselines to record in model registry for drift check
drift_check_baselines = DriftCheckBaselines(
    bias_post_training_constraints=MetricsSource(
        s3_uri=model_bias_check_step.properties.BaselineUsedForDriftCheckConstraints,
        content_type="application/json",
    ),
    explainability_constraints=MetricsSource(
        s3_uri=model_explainability_step.properties.BaselineUsedForDriftCheckConstraints,
        content_type="application/json",
    ),
    explainability_config_file=FileSource(
        s3_uri=model_explainability_config.monitoring_analysis_config_uri,
        content_type="application/json",
    ),
)

# Define register model step
register_step = RegisterModel(
    name="XGBRegisterModel",
    estimator=xgb_estimator,
    # Fetching S3 location where train step saved model artifacts
    model_data=train_step.properties.ModelArtifacts.S3ModelArtifacts,
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=[predictor_instance_type],
    transform_instances=[predictor_instance_type],
    model_package_group_name=model_package_group_name,
    approval_status=model_approval_status_param,
    # Registering baselines metrics that can be used for model monitoring
    model_metrics=model_metrics,
    drift_check_baselines=drift_check_baselines
)

Con Amazon SageMaker, un modello registrato può essere implementato per l'inferenza in diversi modi. In questa fase, il modello viene implementato utilizzando la funzione LambdaStep. Sebbene per le solide implementazioni di modelli che seguono le best practice CI/CD di solito si utilizzi SageMaker Projects, esistono dei casi in cui ha più senso utilizzare LambdaStep implementazioni di modelli leggeri su sviluppo, test ed endpoint interni che servono bassi volumi di traffico. La funzione LambdaStep fornisce un'integrazione nativa con AWS Lambda, in modo da poter implementare la logica personalizzata nella pipeline senza eseguire il provisioning o la gestione dei server. Nel caso di SageMaker Pipelines, LambdaStep consente di aggiungere una funzione AWS Lambda alle pipeline per supportare operazioni di calcolo arbitrarie, in particolare operazioni leggere di breve durata. Tieni presente che in una LambdaStep di SageMaker Pipelines, una funzione Lambda è limitata a 10 minuti di autonomia massima, con un timeout predefinito modificabile di 2 minuti. 

Sono disponibili due modi per aggiungere una LambdaStep alle pipeline. Per prima cosa, puoi fornire l'ARN di una funzione Lambda esistente che è stata creata con il kit di sviluppo per il cloud AWS (AWS CDK), la console di gestione AWS o in un altro modo. In secondo luogo, l'SDK Python di SageMaker di livello superiore ha una classe di convenienza dell'helper Lambda che può essere utilizzata per creare una nuova funzione Lambda insieme all'altro codice che definisce la pipeline. In questo tutorial, verrà utilizzato il secondo metodo. Copia, incolla ed esegui il codice riportato per definire la funzione del gestore Lambda. Questo è lo script Python personalizzato che accetta gli attributi del modello, come il nome del modello, e li distribuisce a un endpoint in tempo reale.

%%writefile lambda_deployer.py

"""
Lambda function creates an endpoint configuration and deploys a model to real-time endpoint. 
Required parameters for deployment are retrieved from the event object
"""

import json
import boto3


def lambda_handler(event, context):
    sm_client = boto3.client("sagemaker")

    # Details of the model created in the Pipeline CreateModelStep
    model_name = event["model_name"]
    model_package_arn = event["model_package_arn"]
    endpoint_config_name = event["endpoint_config_name"]
    endpoint_name = event["endpoint_name"]
    role = event["role"]
    instance_type = event["instance_type"]
    instance_count = event["instance_count"]
    primary_container = {"ModelPackageName": model_package_arn}

    # Create model
    model = sm_client.create_model(
        ModelName=model_name,
        PrimaryContainer=primary_container,
        ExecutionRoleArn=role
    )

    # Create endpoint configuration
    create_endpoint_config_response = sm_client.create_endpoint_config(
        EndpointConfigName=endpoint_config_name,
        ProductionVariants=[
        {
            "VariantName": "Alltraffic",
            "ModelName": model_name,
            "InitialInstanceCount": instance_count,
            "InstanceType": instance_type,
            "InitialVariantWeight": 1
        }
        ]
    )

    # Create endpoint
    create_endpoint_response = sm_client.create_endpoint(
        EndpointName=endpoint_name, 
        EndpointConfigName=endpoint_config_name
    )

Copia, incolla ed esegui il blocco di codice riportato per creare la funzione LambdaStep. Tutti i parametri come il modello, il nome dell'endpoint e il tipo e il conteggio dell'istanza di distribuzione vengono forniti utilizzando l'argomento inputs.

# The function name must contain sagemaker
function_name = "sagemaker-fraud-det-demo-lambda-step"
# Define Lambda helper class can be used to create the Lambda function required in the Lambda step
func = Lambda(
    function_name=function_name,
    execution_role_arn=sagemaker_role,
    script="lambda_deployer.py",
    handler="lambda_deployer.lambda_handler",
    timeout=600,
    memory_size=10240,
)

# The inputs used in the lambda handler are passed through the inputs argument in the 
# LambdaStep and retrieved via the `event` object within the `lambda_handler` function

lambda_deploy_step = LambdaStep(
    name="LambdaStepRealTimeDeploy",
    lambda_func=func,
    inputs={
        "model_name": pipeline_model_name,
        "endpoint_config_name": endpoint_config_name,
        "endpoint_name": endpoint_name,
        "model_package_arn": register_step.steps[0].properties.ModelPackageArn,
        "role": sagemaker_role,
        "instance_type": deploy_instance_type_param,
        "instance_count": deploy_instance_count_param
    }
)

In questa fase, si utilizza ConditionStep per confrontare le prestazioni del modello corrente in base al parametro Area Under Curve (AUC). La pipeline eseguirà controlli di errore e spiegabilità, registrerà il modello e lo distribuirà solo se le prestazioni sono maggiori o uguali a un AUC soglia (qui scelto come 0,7). Fasi condizionali come questa aiutano nell'implementazione selettiva dei migliori modelli alla produzione. Copia, incolla ed esegui il codice riportato per definire la fase condizionale.

# Evaluate model performance on test set
cond_gte = ConditionGreaterThanOrEqualTo(
    left=JsonGet(
        step_name=evaluation_step.name,
        property_file=evaluation_report,
        json_path="classification_metrics.roc_auc.value",
    ),
    right=0.7, # Threshold to compare model performance against
)
condition_step = ConditionStep(
    name="CheckFraudDetXGBEvaluation",
    conditions=[cond_gte],
    if_steps=[create_model_step, model_bias_check_step, model_explainability_step, register_step, lambda_deploy_step], 
    else_steps=[]
)

Fase 4: Creazione ed esecuzione della pipeline

Dopo aver definito tutte le fasi del componente, puoi assemblarle in un oggetto SageMaker Pipelines. Non è necessario specificare l'ordine di esecuzione poiché SageMaker Pipelines deduce automaticamente la sequenza di esecuzione in base alle dipendenze tra le fasi.

Copia, incolla ed esegui il codice riportato per impostare la pipeline. La definizione della pipeline utilizza tutti i parametri definiti nella fase 2 e l'elenco di fasi del componente. Fasi come la creazione del modello, i controlli di errore e spiegabilità, la registrazione del modello e l'implementazione lambda non sono elencati nella definizione della pipeline perché non vengono eseguiti a meno che la fase condizionale non sia true. Se la fase condizionale è true, le fasi successive saranno eseguite nell'ordine in base agli input e agli output specificati.

# Create the Pipeline with all component steps and parameters
pipeline = Pipeline(
    name=pipeline_name,
    parameters=[process_instance_type_param, 
                train_instance_type_param, 
                train_instance_count_param, 
                deploy_instance_type_param,
                deploy_instance_count_param,
                clarify_instance_type_param,
                skip_check_model_bias_param,
                register_new_baseline_model_bias_param,
                supplied_baseline_constraints_model_bias_param,
                skip_check_model_explainability_param,
                register_new_baseline_model_explainability_param,
                supplied_baseline_constraints_model_explainability_param,
                model_approval_status_param],
    steps=[
        process_step,
        train_step,
        evaluation_step,
        condition_step
    ],
    sagemaker_session=sess
    
)

Copia, incolla ed esegui il codice riportato in una cella nel notebook. Se la pipeline esiste già, il codice la aggiorna. Se la pipeline non esiste, ne verrà creata una nuova. Ignora eventuali avvertenze dell'SDK SageMaker come "Nessun processo di addestramento completato associato a questo stimatore. Assicurati che questo stimatore venga utilizzato solo per la creazione della configurazione del flusso di lavoro."

# Create a new or update existing Pipeline
pipeline.upsert(role_arn=sagemaker_role)

# Full Pipeline description
pipeline_definition = json.loads(pipeline.describe()['PipelineDefinition'])
pipeline_definition

SageMaker codifica la pipeline in un Directed Acyclic Graph (DAG), dove ogni nodo rappresenta una fase e le connessioni tra i nodi rappresentano le dipendenze. Per ispezionare il DAG della pipeline dall'interfaccia di SageMaker Studio, seleziona la scheda SageMaker Resources (Risorse SageMaker) nel pannello di sinistra, quindi Pipelines (Pipeline) dall'elenco a discesa e poi scegli FraudDetectXGBPipeline, Graph (Grafico). Puoi vedere che le fasi della pipeline che hai creato sono rappresentati da nodi nel grafico e le connessioni tra i nodi sono state dedotte da SageMaker in base agli input e agli output forniti nelle definizioni delle fasi.

Esegui la pipeline eseguendo l'istruzione di codice seguente. I parametri di esecuzione della pipeline vengono forniti come argomenti in questa fase. Seleziona la scheda SageMaker Resources (Risorse SageMaker) nel pannello di sinistra, quindi Pipelines (Pipeline) dall'elenco a discesa e poi scegli FraudDetectXGBPipeline, Executions (Esecuzioni). Sarà elencata l'esecuzione corrente della pipeline.

 

 

# Execute Pipeline
start_response = pipeline.start(parameters=dict(
        SkipModelBiasCheck=True,
        RegisterNewModelBiasBaseline=True,
        SkipModelExplainabilityCheck=True,
        RegisterNewModelExplainabilityBaseline=True)
                               )

Per rivedere l'esecuzione della pipeline, scegli la scheda Status (Stato). Quando tutte le fasi vengono eseguite correttamente, i nodi nel grafico diventano verdi.

Nell'interfaccia di SageMaker Studio, seleziona la scheda SageMaker Resources (Risorse SageMaker) nel pannello di sinistra e scegli Model registry (Registro modello) dall'elenco a discesa. Il modello registrato è riportato in Model group name (Nome gruppo modello) nel riquadro di sinistra. Seleziona il nome del gruppo di modelli per visualizzare l'elenco delle versioni del modello. Ogni volta che esegui la pipeline, una nuova versione del modello viene aggiunta al registro se la versione del modello soddisfa la soglia condizionale per la valutazione. Scegli una versione del modello per visualizzare i dettagli, come l'endpoint del modello e il rapporto sulla spiegazione del modello.

 

 

Fase 5: Test della pipeline richiamando l'endpoint

In questo tutorial, il modello ottiene un punteggio superiore alla soglia scelta di 0,7 AUC. Pertanto, la fase condizionale registra e implementa il modello a un endpoint di inferenza in tempo reale.

Nell'interfaccia di SageMaker Studio, seleziona la scheda SageMaker Resources (Risorse SageMaker) nel pannello di sinistra, scegli Endpoints (Endpoint) quindi attendi che lo stato di fraud-detect-xgb-pipeline-endpoint diventi InService.

Una volta che lo stato dell'endpoint diventa InService, copia, incolla ed esegui il codice riportato per richiamare l'endpoint ed esegui le inferenze campione. Il codice restituisce le previsioni del modello dei primi cinque campioni nel set di dati di test.

# Fetch test data to run predictions with the endpoint
test_df = pd.read_csv(f"{processing_output_uri}/test_data/test.csv")

# Create SageMaker Predictor from the deployed endpoint
predictor = sagemaker.predictor.Predictor(endpoint_name, 
                                          sagemaker_session=sess,
                                          serializer=CSVSerializer(),
                                          deserializer=CSVDeserializer()
                                         )
# Test endpoint with payload of 5 samples
payload = test_df.drop(["fraud"], axis=1).iloc[:5]
result = predictor.predict(payload.values)
prediction_df = pd.DataFrame()
prediction_df["Prediction"] = result
prediction_df["Label"] = test_df["fraud"].iloc[:5].values
prediction_df

Fase 6: Eliminazione delle risorse

Per evitare di ricevere addebiti non desiderati, una best practice consiste nell'eliminare le risorse non utilizzate.

Copia e incolla il blocco di codice seguente per eliminare la funzione Lambda, il modello, la configurazione dell'endpoint, l'endpoint e la pipeline che hai creato in questo tutorial.

# Delete the Lambda function
func.delete()

# Delete the endpoint
sm_client.delete_endpoint(EndpointName=endpoint_name)

# Delete the EndpointConfig
sm_client.delete_endpoint_config(EndpointConfigName=endpoint_config_name)

# Delete the model
sm_client.delete_model(ModelName=pipeline_model_name)

# Delete the pipeline
sm_client.delete_pipeline(PipelineName=pipeline_name)

Per eliminare il bucket S3, completa le seguenti operazioni: 

  • Apri la console Amazon S3. Sulla barra di navigazione, scegli Buckets (Bucket), sagemaker-<regione>-<id-account>, quindi seleziona la casella di controllo accanto a fraud-detect-demo. Quindi, seleziona Delete (Elimina). 
  • Nella finestra di dialogo Delete objects (Elimina oggetti), verifica di aver selezionato l'oggetto corretto da eliminare, quindi digita permanently delete nella casella di conferma Permanently delete objects (Elimina definitivamente gli oggetti). 
  • Una volta completato e il bucket è vuoto, potrai eliminare il bucket sagemaker-<regione>-<id-account> eseguendo di nuovo la stessa procedura.

Il kernel Data science utilizzato per eseguire l'immagine del notebook in questo tutorial accumulerà costi fino a quando non lo interromperai o eseguirai i passaggi riportati di seguito per eliminare le app. Per ulteriori informazioni, consulta Shut Down Resources (Risorse di arresto) nella guida per gli sviluppatori di Amazon SageMaker.

Per eliminare le app SageMaker Studio, completa la seguente procedura: nella console SageMaker Studio, scegli studio-user, quindi elimina tutte le app riportate in Apps (App) selezionando Delete app (Elimina app). Attendi fino a che Status (Stato) diventa Deleted (Eliminato).

Se nel passaggio 1 hai utilizzato un dominio SageMaker Studio esistente, salta il resto del passaggio 6 e procedi direttamente alla sezione conclusiva. 

Se hai eseguito il modello CloudFormation nel passaggio 1 per creare un nuovo dominio SageMaker Studio, continua con i passaggi seguenti per eliminare il dominio, l'utente e le risorse create dal modello CloudFormation.  

Per aprire la console CloudFormation, immetti CloudFormation nella barra di ricerca della console AWS, quindi scegli CloudFormation dai risultati della ricerca.

Nel riquadro CloudFormation, scegli Stack. Dall'elenco degli stati, seleziona Active (Attivo). In Stack name (Nome stack), scegli CFN-SM-IM-Lambda-catalog per aprire la pagina dei dettagli dello stack.

Nella pagina dei dettagli dello stack CFN-SM-IM-Lambda-catalog, scegli Delete (Elimina) per eliminare lo stack insieme alle risorse create nella fase 1.

Conclusioni

Complimenti! Hai terminato il tutorial Automatizzazione dei flusso di lavoro di machine learning

Hai utilizzato correttamente Pipeline Amazon SageMaker per automatizzare il flusso di lavoro di ML end-to-end a partire dall'elaborazione dei dati, l'addestramento del modello, la valutazione del modello, il controllo di errori e spiegabilità, la registrazione condizionale del modello e la distribuzione. Infine, hai utilizzato l'SDK SageMaker per implementare il modello in un endpoint di inferenza in tempo reale e lo hai testato con un payload campione.

Puoi continuare il tuo percorso di machine learning con Amazon SageMaker completando le fasi successive riportate di seguito.

Implementazione di un modello di machine learning

Scopri come implementare un modello di ML per l'inferenza.
Successivo »

Addestramento di un modello di deep learning

Scopri come creare, addestrare ed ottimizzare un modello di deep learning TensorFlow.
Successivo »

Trova altri tutorial pratici

Esplora altri tutorial di machine learning per approfondire.
Successivo »