AWS Machine Learning Blog

New features for Amazon SageMaker Pipelines and the Amazon SageMaker SDK

Amazon SageMaker Pipelines allows data scientists and machine learning (ML) engineers to automate training workflows, which helps you create a repeatable process to orchestrate model development steps for rapid experimentation and model retraining. You can automate the entire model build workflow, including data preparation, feature engineering, model training, model tuning, and model validation, and catalog it in the model registry. You can configure pipelines to run automatically at regular intervals or when certain events are triggered, or you can run them manually as needed.

In this post, we highlight some of the enhancements to the Amazon SageMaker SDK and introduce new features of Amazon SageMaker Pipelines that make it easier for ML practitioners to build and train ML models.

Pipelines continues to improve its developer experience, and with these recent releases, you can now use the service in a more customized way:

  • 2.99.0, 2.101.1, 2.102.0, 2.104.0 – Updated documentation on PipelineVariable usage for the estimator, processor, tuner, transformer, and model base classes, Amazon models, and framework models. Additional changes are coming in newer versions of the SDK to support all subclasses of estimators and processors.
  • 2.90.0 – Availability of ModelStep for integrated model resource creation and registration tasks.
  • 2.88.2 – Availability of PipelineSession for managed interaction with SageMaker entities and resources.
  • 2.88.2 – Subclass compatibility for workflow pipeline job steps so you can build job abstractions and configure and run processing, training, transform, and tuning jobs as you would without a pipeline.
  • 2.76.0 – Availability of FailStep to conditionally stop a pipeline with a failure status.

In this post, we walk you through a workflow using a sample dataset with a focus on model building and deployment to demonstrate how to implement Pipelines’s new features. By the end, you should have enough information to successfully use these newer features and simplify your ML workloads.

Features overview

Pipelines offers the following new features:

  • Pipeline variable annotation – Certain method parameters accept multiple input types, including PipelineVariable. Additional documentation has been added to clarify where PipelineVariable is supported, both in the latest stable version of the SageMaker SDK documentation and in the init signatures of the functions. For example, in the following TensorFlow estimator, the init signature now shows that model_dir and image_uri support PipelineVariable, whereas the other parameters do not. For more information, refer to TensorFlow Estimator.
    • Before:
      TensorFlow(
          py_version=None,
          framework_version=None,
          model_dir=None,
          image_uri=None,
          distribution=None,
          **kwargs,
      )
    • After:
      TensorFlow(
          py_version: Union[str, NoneType] = None,
          framework_version: Union[str, NoneType] = None,
          model_dir: Union[str, sagemaker.workflow.entities.PipelineVariable, NoneType] = None,
          image_uri: Union[str, sagemaker.workflow.entities.PipelineVariable, NoneType] = None,
          distribution: Union[Dict[str, str], NoneType] = None,
          compiler_config: Union[sagemaker.tensorflow.training_compiler.config.TrainingCompilerConfig, NoneType] = None,
          **kwargs,
      )
  • Pipeline session – PipelineSession is a new concept introduced to bring unity across the SageMaker SDK. It provides lazy initialization of pipeline resources: the run calls are captured but not run until the pipeline is created and started. The PipelineSession context inherits from SageMakerSession and implements convenient methods for you to interact with other SageMaker entities and resources, such as training jobs, endpoints, and input datasets stored in Amazon Simple Storage Service (Amazon S3).
  • Subclass compatibility with workflow pipeline job steps – You can now build job abstractions and configure and run processing, training, transform, and tuning jobs as you would without a pipeline.
    • For example, creating a processing step with SKLearnProcessor previously required the following:
          sklearn_processor = SKLearnProcessor(
              framework_version=framework_version,
              instance_type=processing_instance_type,
              instance_count=processing_instance_count,
              sagemaker_session=sagemaker_session, #sagemaker_session would be passed as an argument
              role=role,
          )
          step_process = ProcessingStep(
              name="{pipeline-name}-process",
              processor=sklearn_processor,
              inputs=[
                ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),  
              ],
              outputs=[
                  ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
                  ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"),
                  ProcessingOutput(output_name="test", source="/opt/ml/processing/test")
              ],
              code=f"code/preprocess.py",
          )
    • As the preceding code shows, ProcessingStep had to repeat essentially the same configuration that .run takes, just without initiating the API call to start the job. With subclass compatibility now enabled for workflow pipeline job steps, you pass the output of .run to the new step_args argument, so you can build a job abstraction and configure it exactly as you would outside of Pipelines. You also pass in pipeline_session, which is a PipelineSession object, instead of sagemaker_session so that the run calls are captured but not started until the pipeline is created and run. See the following code:
      sklearn_processor = SKLearnProcessor(
          framework_version=framework_version,
          instance_type=processing_instance_type,
          instance_count=processing_instance_count,
          sagemaker_session=pipeline_session,#pipeline_session would be passed in as argument
          role=role,
      )
      
      processor_args = sklearn_processor.run(
          inputs=[
            ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),  
          ],
          outputs=[
              ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
              ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"),
              ProcessingOutput(output_name="test", source="/opt/ml/processing/test")
          ],
          code=f"code/preprocess.py",
      )
      step_process = ProcessingStep(name="{pipeline-name}-process", step_args=processor_args)
  • Model step (a streamlined approach to model creation and registration steps) – Pipelines offers two step types to integrate with SageMaker models: CreateModelStep and RegisterModel. You can now achieve both using the single ModelStep type. Note that a PipelineSession is required to achieve this. This brings consistency between the pipeline steps and the SDK.
    • Before:
      step_register = RegisterModel(
              name="ChurnRegisterModel",
              estimator=xgb_custom_estimator,
              model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
              content_types=["text/csv"],
              response_types=["text/csv"],
              inference_instances=["ml.t2.medium", "ml.m5.large"],
              transform_instances=["ml.m5.large"],
              model_package_group_name=model_package_group_name,
              approval_status=model_approval_status,
              model_metrics=model_metrics,
      )
    • After:
      register_args = model.register(
          content_types=["text/csv"],
          response_types=["text/csv"],
          inference_instances=["ml.t2.medium", "ml.m5.xlarge"],
          transform_instances=["ml.m5.xlarge"],
          model_package_group_name=model_package_group_name,
          approval_status=model_approval_status,
          model_metrics=model_metrics,
      )
      step_register = ModelStep(name="ChurnRegisterModel", step_args=register_args)
  • Fail step (conditional stop of the pipeline run) – FailStep allows a pipeline to be stopped with a failure status if a condition is met, such as when the model score is below a certain threshold.

Solution overview

In this solution, your entry point is the Amazon SageMaker Studio integrated development environment (IDE) for rapid experimentation. Studio offers an environment to manage the end-to-end Pipelines experience. With Studio, you can bypass the AWS Management Console for your entire workflow management. For more information on managing Pipelines from within Studio, refer to View, Track, and Execute SageMaker Pipelines in SageMaker Studio.

The following diagram illustrates the high-level architecture of the ML workflow with the different steps to train and generate inferences using the new features.

The pipeline includes the following steps:

  1. Preprocess the data to build the required features, and split it into train, validation, and test datasets.
  2. Create a training job with the SageMaker XGBoost framework.
  3. Evaluate the trained model using the test dataset.
  4. Check if the AUC score is above a predefined threshold.
    • If the AUC score is less than or equal to the threshold, stop the pipeline run and mark it as failed.
    • If the AUC score is greater than the threshold, create a SageMaker model and register it in the SageMaker model registry.
  5. Apply batch transform on the given dataset using the model created in the previous step.

Prerequisites

To follow along with this post, you need an AWS account with a Studio domain.

Pipelines is integrated directly with SageMaker entities and resources, so you don’t need to interact with any other AWS services. You also don’t need to manage any resources, because Pipelines is a fully managed service that creates and manages resources for you. For more information on the various SageMaker components, which are available both as standalone Python APIs and as integrated components of Studio, see the SageMaker product page.

Before getting started, install SageMaker SDK version >= 2.104.0 and xlrd >=1.0.0 within the Studio notebook using the following code snippet:

import sys
!{sys.executable} -m pip install "sagemaker>=2.104.0"
!{sys.executable} -m pip install "xlrd>=1.0.0"

import sagemaker
print(sagemaker.__version__)

ML workflow

For this post, you use the following components:

  • Data preparation
    • SageMaker Processing – SageMaker Processing is a fully managed service allowing you to run custom data transformations and feature engineering for ML workloads.
  • Model building
  • Model training and evaluation
    • One-click training – The SageMaker distributed training feature. SageMaker provides distributed training libraries for data parallelism and model parallelism. The libraries are optimized for the SageMaker training environment, help adapt your distributed training jobs to SageMaker, and improve training speed and throughput.
    • SageMaker Experiments – Experiments is a capability of SageMaker that lets you organize, track, compare, and evaluate your ML iterations.
    • SageMaker batch transform – Batch transform, or offline scoring, is a managed capability of SageMaker that lets you run predictions on large datasets using your trained ML models.
  • Workflow orchestration

A SageMaker pipeline is a series of interconnected steps defined by a JSON pipeline definition. It encodes a pipeline using a directed acyclic graph (DAG). The DAG gives information on the requirements for and relationships between each step of the pipeline, and its structure is determined by the data dependencies between steps. These dependencies are created when the properties of a step’s output are passed as the input to another step.
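
To make the dependency mechanism concrete, the following is a minimal fragment for illustration only (step_process is the processing step object defined later in this walkthrough); referencing that step’s output properties as the training input is what creates the edge in the DAG.

from sagemaker.inputs import TrainingInput

# Passing one step's output properties as another step's input creates the DAG edge.
# step_process is the processing step defined later in this post.
train_input = TrainingInput(
    s3_data=step_process.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri,
    content_type="text/csv",
)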

The following diagram illustrates the different steps in the SageMaker pipeline (for a churn prediction use case) where the connections between the steps are inferred by SageMaker based on the inputs and outputs defined by the step definitions.

The next sections walk through creating each step of the pipeline and running the entire pipeline once created.

Project structure

Let’s start with the project structure:

  • /sm-pipelines-end-to-end-example – The project name
    • /data – The datasets
    • /pipelines – The code files for pipeline components
      • /customerchurn
        • preprocess.py
        • evaluate.py
    • sagemaker-pipelines-project.ipynb – A notebook walking through the modeling workflow using Pipelines’s new features

Download the dataset

To follow along with this post, download the sample dataset and save it under the data folder within the project home directory. In Studio, this directory is backed by Amazon Elastic File System (Amazon EFS).

Build the pipeline components

Now you’re ready to build the pipeline components.

Add import statements and declare parameters and constants

Create a Studio notebook called sagemaker-pipelines-project.ipynb within the project home directory. Enter the following code block in a cell, and run the cell to set up SageMaker and S3 client objects, create PipelineSession, and set up the S3 bucket location using the default bucket that comes with a SageMaker session:

import boto3
import pandas as pd
import sagemaker
from sagemaker.workflow.pipeline_context import PipelineSession
 
s3_client = boto3.resource('s3')
pipeline_name = f"ChurnModelPipeline"
sagemaker_session = sagemaker.session.Session()
region = sagemaker_session.boto_region_name
role = sagemaker.get_execution_role()
pipeline_session = PipelineSession()
default_bucket = sagemaker_session.default_bucket()
model_package_group_name = f"ChurnModelPackageGroup"

Pipelines supports parameterization, which allows you to specify input parameters at runtime without changing your pipeline code. You can use the classes available in the sagemaker.workflow.parameters module, such as ParameterInteger, ParameterFloat, and ParameterString, to define pipeline parameters of various data types. Run the following code to set up multiple input parameters:

from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
    ParameterFloat,
)
auc_score_threshold = ParameterFloat(
    name="AucScoreThreshold",
    default_value=0.75
)
base_job_prefix = "churn-example"
model_package_group_name = "churn-job-model-packages"
batch_data = ParameterString(
    name="BatchData",
    default_value="s3://{}/data/batch/batch.csv".format(default_bucket),
)

processing_instance_count = ParameterInteger(
    name="ProcessingInstanceCount",
    default_value=1
)
processing_instance_type = ParameterString(
    name="ProcessingInstanceType",
    default_value="ml.m5.xlarge"
)
training_instance_type = ParameterString(
    name="TrainingInstanceType",
    default_value="ml.m5.xlarge"
)
input_data = ParameterString(
    name="InputData",
    default_value="s3://{}/data/storedata_total.csv".format(default_bucket),
)

model_approval_status = ParameterString(
    name="ModelApprovalStatus", default_value="PendingManualApproval"
)
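
As a usage note, these parameters can be overridden at runtime without changing the pipeline code. The following is a minimal sketch (the override values are placeholders, and it assumes the pipeline object assembled at the end of this post has already been created):

# Hypothetical example: override the default parameter values for a single run
execution = pipeline.start(
    parameters={
        "ProcessingInstanceType": "ml.m5.2xlarge",
        "TrainingInstanceType": "ml.m5.2xlarge",
        "ModelApprovalStatus": "Approved",
    }
)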

Generate a batch dataset

Generate the batch dataset, which you use later in the batch transform step:

def preprocess_batch_data(file_path):
    df = pd.read_csv(file_path)
    ## Convert to datetime columns
    df["firstorder"]=pd.to_datetime(df["firstorder"],errors='coerce')
    df["lastorder"] = pd.to_datetime(df["lastorder"],errors='coerce')
    ## Drop Rows with null values
    df = df.dropna()
    ## Create Column which gives the days between the last order and the first order
    df["first_last_days_diff"] = (df['lastorder']-df['firstorder']).dt.days
    ## Create Column which gives the days between when the customer record was created and the first order
    df['created'] = pd.to_datetime(df['created'])
    df['created_first_days_diff']=(df['created']-df['firstorder']).dt.days
    ## Drop Columns
    df.drop(['custid','created','firstorder','lastorder'],axis=1,inplace=True)
    ## Apply one hot encoding on favday and city columns
    df = pd.get_dummies(df,prefix=['favday','city'],columns=['favday','city'])
    return df
    
# convert the store_data file into CSV format
store_data = pd.read_excel("data/storedata_total.xlsx")
store_data.to_csv("data/storedata_total.csv")
 
# preprocess the batch data and save it into the data folder
# (use a separate variable name so the batch_data pipeline parameter defined earlier isn't overwritten)
batch_df = preprocess_batch_data("data/storedata_total.csv")
batch_df.pop("retained")
batch_sample = batch_df.sample(frac=0.2)
batch_sample.to_csv("data/batch.csv", header=False, index=False)

Upload data to an S3 bucket

Upload the datasets to Amazon S3:

s3_client.Bucket(default_bucket).upload_file("data/batch.csv","data/batch/batch.csv")
s3_client.Bucket(default_bucket).upload_file("data/storedata_total.csv","data/storedata_total.csv")

Define a processing script and processing step

In this step, you prepare a Python script to do feature engineering, one hot encoding, and curate the training, validation, and test splits to be used for model building. Run the following code to build your processing script:

%%writefile pipelines/customerchurn/preprocess.py

import os
import tempfile
import numpy as np
import pandas as pd
import datetime as dt
if __name__ == "__main__":
    base_dir = "/opt/ml/processing"
    #Read Data
    df = pd.read_csv(
        f"{base_dir}/input/storedata_total.csv"
    )
    # convert created column to datetime
    df["created"] = pd.to_datetime(df["created"])
    #Convert firstorder and lastorder to datetime datatype
    df["firstorder"] = pd.to_datetime(df["firstorder"],errors='coerce')
    df["lastorder"] = pd.to_datetime(df["lastorder"],errors='coerce')
    #Drop Rows with Null Values
    df = df.dropna()
    #Create column which gives the days between the last order and the first order
    df['first_last_days_diff'] = (df['lastorder'] - df['firstorder']).dt.days
    #Create column which gives the days between the customer record was created and the first order
    df['created_first_days_diff'] = (df['created'] - df['firstorder']).dt.days
    #Drop columns
    df.drop(['custid', 'created','firstorder','lastorder'], axis=1, inplace=True)
    #Apply one hot encoding on favday and city columns
    df = pd.get_dummies(df, prefix=['favday', 'city'], columns=['favday', 'city'])
    # Split into train, validation and test datasets
    y = df.pop("retained")
    X_pre = df
    y_pre = y.to_numpy().reshape(len(y), 1)
    X = np.concatenate((y_pre, X_pre), axis=1)
    np.random.shuffle(X)
    # Split in Train, Test and Validation Datasets
    train, validation, test = np.split(X, [int(.7*len(X)), int(.85*len(X))])
    train_rows = np.shape(train)[0]
    validation_rows = np.shape(validation)[0]
    test_rows = np.shape(test)[0]
    train = pd.DataFrame(train)
    test = pd.DataFrame(test)
    validation = pd.DataFrame(validation)
    # Convert the label column to integer
    train[0] = train[0].astype(int)
    test[0] = test[0].astype(int)
    validation[0] = validation[0].astype(int)
    # Save the Dataframes as csv files
    train.to_csv(f"{base_dir}/train/train.csv", header=False, index=False)
    validation.to_csv(f"{base_dir}/validation/validation.csv", header=False, index=False)
    test.to_csv(f"{base_dir}/test/test.csv", header=False, index=False)

Next, run the following code block to instantiate the processor and the Pipelines step to run the processing script. Because the processing script is written in Pandas, you use a SKLearnProcessor. The Pipelines ProcessingStep function takes the following arguments: the processor, the input S3 locations for raw datasets, and the output S3 locations to save processed datasets.

# Upload processing script to S3
s3_client.Bucket(default_bucket).upload_file("pipelines/customerchurn/preprocess.py","input/code/preprocess.py")

# Define Processing Step for Feature Engineering
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep

framework_version = "1.0-1"sklearn_processor = SKLearnProcessor(
    framework_version=framework_version,
    instance_type="ml.m5.xlarge",
    instance_count=processing_instance_count,
    base_job_name="sklearn-churn-process",
    role=role,
    sagemaker_session=pipeline_session,
)
processor_args = sklearn_processor.run(
    inputs=[
      ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),  
    ],
    outputs=[
        ProcessingOutput(output_name="train", source="/opt/ml/processing/train",\
                         destination=f"s3://{default_bucket}/output/train" ),
        ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation",\
                        destination=f"s3://{default_bucket}/output/validation"),
        ProcessingOutput(output_name="test", source="/opt/ml/processing/test",\
                        destination=f"s3://{default_bucket}/output/test")
    ],
    code=f"s3://{default_bucket}/input/code/preprocess.py",
)
step_process = ProcessingStep(name="ChurnModelProcess", step_args=processor_args)

Define a training step

Set up model training using a SageMaker XGBoost estimator and the Pipelines TrainingStep function:

from sagemaker.estimator import Estimator
from sagemaker.inputs import TrainingInput

model_path = f"s3://{default_bucket}/output"
image_uri = sagemaker.image_uris.retrieve(
    framework="xgboost",
    region=region,
    version="1.0-1",
    py_version="py3",
    instance_type="ml.m5.xlarge",
)
xgb_train = Estimator(
    image_uri=image_uri,
    instance_type=training_instance_type,
    instance_count=1,
    output_path=model_path,
    role=role,
    sagemaker_session=pipeline_session,
)
xgb_train.set_hyperparameters(
    objective="reg:linear",
    num_round=50,
    max_depth=5,
    eta=0.2,
    gamma=4,
    min_child_weight=6,
    subsample=0.7,
)

train_args = xgb_train.fit(
    inputs={
            "train": TrainingInput(
                s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                    "train"
                ].S3Output.S3Uri,
                content_type="text/csv",
            ),
            "validation": TrainingInput(
                s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                    "validation"
                ].S3Output.S3Uri,
                content_type="text/csv",
            ),
        },
)
from sagemaker.workflow.steps import TrainingStep
step_train = TrainingStep(
    name="ChurnModelTrain",
    step_args=train_args,
)

Define the evaluation script and model evaluation step

Run the following code block to create the script that evaluates the model once it’s trained. The script computes the AUC score on the test dataset and writes it to an evaluation report, which the condition step later checks against the specified threshold.

%%writefile pipelines/customerchurn/evaluate.py

import json
import pathlib
import pickle
import tarfile
import joblib
import numpy as np
import pandas as pd
import xgboost
import datetime as dt
from sklearn.metrics import roc_curve,auc
if __name__ == "__main__":   
    #Read Model Tar File
    model_path = f"/opt/ml/processing/model/model.tar.gz"
    with tarfile.open(model_path) as tar:
        tar.extractall(path=".")
    model = pickle.load(open("xgboost-model", "rb"))
    #Read Test Data using which we evaluate the model
    test_path = "/opt/ml/processing/test/test.csv"
    df = pd.read_csv(test_path, header=None)
    y_test = df.iloc[:, 0].to_numpy()
    df.drop(df.columns[0], axis=1, inplace=True)
    X_test = xgboost.DMatrix(df.values)
    #Run Predictions
    predictions = model.predict(X_test)
    #Evaluate Predictions
    fpr, tpr, thresholds = roc_curve(y_test, predictions)
    auc_score = auc(fpr, tpr)
    report_dict = {
        "classification_metrics": {
            "auc_score": {
                "value": auc_score,
            },
        },
    }
    #Save Evaluation Report
    output_dir = "/opt/ml/processing/evaluation"
    pathlib.Path(output_dir).mkdir(parents=True, exist_ok=True)
    evaluation_path = f"{output_dir}/evaluation.json"
    with open(evaluation_path, "w") as f:
        f.write(json.dumps(report_dict))

Next, run the following code block to instantiate the processor and the Pipelines step to run the evaluation script. Because the evaluation script uses the XGBoost package, you use a ScriptProcessor along with the XGBoost image. The Pipelines ProcessingStep function takes the following arguments: the processor, the input S3 locations for the model artifacts and test dataset, and the output S3 location for the evaluation report.

#Upload the evaluation script to S3
s3_client.Bucket(default_bucket).upload_file("pipelines/customerchurn/evaluate.py","input/code/evaluate.py")
from sagemaker.processing import ScriptProcessor
# define model evaluation step to evaluate the trained model
script_eval = ScriptProcessor(
    image_uri=image_uri,
    command=["python3"],
    instance_type=processing_instance_type,
    instance_count=1,
    base_job_name="script-churn-eval",
    role=role,
    sagemaker_session=pipeline_session,
)
eval_args = script_eval.run(
    inputs=[
        ProcessingInput(
            source=step_train.properties.ModelArtifacts.S3ModelArtifacts,
            destination="/opt/ml/processing/model",
        ),
        ProcessingInput(
            source=step_process.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri,
            destination="/opt/ml/processing/test",
        ),
    ],
    outputs=[
            ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation",\
                             destination=f"s3://{default_bucket}/output/evaluation"),
        ],
    code=f"s3://{default_bucket}/input/code/evaluate.py",
)
from sagemaker.workflow.properties import PropertyFile
evaluation_report = PropertyFile(
    name="ChurnEvaluationReport", output_name="evaluation", path="evaluation.json"
)
step_eval = ProcessingStep(
    name="ChurnEvalModel",
    step_args=eval_args,
    property_files=[evaluation_report],
)

Define a create model step

Run the following code block to create a SageMaker model using the Pipelines model step. This step uses the model artifacts produced by the training step to package the model for deployment.

from sagemaker import Model
from sagemaker.workflow.model_step import ModelStep
# step to create model
model = Model(
    image_uri=image_uri,        
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=pipeline_session,
    role=role,
)
step_create_model = ModelStep(
    name="ChurnCreateModel",
    step_args=model.create(instance_type="ml.m5.large", accelerator_type="ml.eia1.medium"),
)

Define a batch transform step

Run the following code block to run batch transformation using the trained model with the batch input created in the first step:

from sagemaker.transformer import Transformer
from sagemaker.inputs import TransformInput
from sagemaker.workflow.steps import TransformStep

transformer = Transformer(
    model_name=step_create_model.properties.ModelName,
    instance_type="ml.m5.xlarge",
    instance_count=1,
    output_path=f"s3://{default_bucket}/ChurnTransform",
    sagemaker_session=pipeline_session
)
                                 
step_transform = TransformStep(
    name="ChurnTransform", 
    step_args=transformer.transform(
                    data=batch_data,
                    content_type="text/csv"
                 )
)

Define a register model step

The following code registers the model within the SageMaker model registry using the Pipelines model step:

model = Model(
    image_uri=image_uri,
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=pipeline_session,
    role=role,
)
from sagemaker.model_metrics import MetricsSource, ModelMetrics

model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri="{}/evaluation.json".format(
            step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
        ),
        content_type="application/json",
    )
)
register_args = model.register(
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=["ml.t2.medium", "ml.m5.xlarge"],
    transform_instances=["ml.m5.xlarge"],
    model_package_group_name=model_package_group_name,
    approval_status=model_approval_status,
    model_metrics=model_metrics,
)
step_register = ModelStep(name="ChurnRegisterModel", step_args=register_args)

Define a fail step to stop the pipeline

The following code defines the Pipelines fail step to stop the pipeline run with an error message if the AUC score doesn’t meet the defined threshold:

from sagemaker.workflow.fail_step import FailStep
from sagemaker.workflow.functions import Join
step_fail = FailStep(
    name="ChurnAUCScoreFail",
    error_message=Join(on=" ", values=["Execution failed due to AUC Score <=", auc_score_threshold]),
)

Define a condition step to check AUC score

The following code defines a condition step that checks the AUC score. If the score exceeds the threshold, the pipeline creates a SageMaker model, registers it in the model registry, and runs a batch transformation; otherwise, it stops the pipeline run in a failed state:

from sagemaker.workflow.conditions import ConditionGreaterThan
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.functions import JsonGet
cond_gt = ConditionGreaterThan(
    left=JsonGet(
        step_name=step_eval.name,
        property_file=evaluation_report,
        json_path="classification_metrics.auc_score.value",
    ),
    right=auc_score_threshold,
)
step_cond = ConditionStep(
    name="CheckAUCScoreChurnEvaluation",
    conditions=[cond_gt],
    if_steps=[step_register, step_create_model, step_transform],
    else_steps=[step_fail],
)

Build and run the pipeline

After defining all of the component steps, you can assemble them into a Pipelines object. You don’t need to specify the run order, because Pipelines automatically infers it from the data dependencies between the steps.

import json
from sagemaker.workflow.pipeline import Pipeline

pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        processing_instance_count,
        processing_instance_type,
        training_instance_type,
        model_approval_status,
        input_data,
        batch_data,
        auc_score_threshold,
    ],
    steps=[step_process, step_train, step_eval, step_cond],
) 
definition = json.loads(pipeline.definition())
print(definition)

Run the following code in a cell in your notebook. If the pipeline already exists, the code updates it; if it doesn’t, the code creates a new one. The pipeline execution then starts.

# Create a new pipeline or update an existing one
pipeline.upsert(role_arn=role)
# Start a pipeline execution
execution = pipeline.start()
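
Optionally, you can inspect the run from the notebook using the execution object returned by pipeline.start(); the following is a short sketch using standard SageMaker SDK calls:

# Describe the status of the pipeline execution
execution.describe()
# Block until the execution finishes (raises an error if the run fails)
execution.wait()
# List each step in the execution and its status
execution.list_steps()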

Conclusion

In this post, we introduced some of the new features now available with Pipelines, and used them along with other built-in SageMaker features and the XGBoost algorithm to develop, iterate, and deploy a model for churn prediction. The solution can be extended with additional data sources to implement your own ML workflow. For more details on the steps available in the Pipelines workflow, refer to Amazon SageMaker Model Building Pipeline and SageMaker Workflows. The AWS SageMaker Examples GitHub repo has more examples of various use cases using Pipelines.


About the Authors

Jerry Peng is a software development engineer with AWS SageMaker. He focuses on building end-to-end, large-scale MLOps systems from training to model monitoring in production. He is also passionate about bringing the concept of MLOps to a broader audience.

Dewen Qi is a Software Development Engineer in AWS. She currently focuses on developing and improving SageMaker Pipelines. Outside of work, she enjoys practicing cello.

Gayatri Ghanakota is a Sr. Machine Learning Engineer with AWS Professional Services. She is passionate about developing, deploying, and explaining AI/ML solutions across various domains. Prior to this role, she led multiple initiatives as a data scientist and ML engineer with top global firms in the financial and retail space. She holds a master’s degree in Computer Science with a specialization in Data Science from the University of Colorado Boulder.

Rupinder Grewal is a Sr. AI/ML Specialist Solutions Architect with AWS. He currently focuses on model serving and MLOps on SageMaker. Prior to this role, he worked as a Machine Learning Engineer building and hosting models. Outside of work, he enjoys playing tennis and biking on mountain trails.

Ray Li is a Sr. Data Scientist with AWS Professional Services. He specializes in building and operationalizing AI/ML solutions for customers of varying sizes, from startups to enterprise organizations. Outside of work, Ray enjoys fitness and traveling.