AWS Partner Network (APN) Blog

Migrate On-Premises Machine Learning Operations to Amazon SageMaker Pipelines for Computer Vision

By Na Yu and Caitlin Berger, Sr. Big Data Engineers – Mission Cloud
By Ryan Ries, Practice Lead – Mission Cloud Services
By Qiong Zhang, Kris Skrinak and Cristian Torres Salamanca, Sr. Partner Solutions Architects – AWS


Amazon SageMaker Pipelines is a workflow orchestration tool for building machine learning (ML) pipelines with continuous integration and continuous delivery (CI/CD) capabilities. SageMaker Pipelines helps machine learning operations (MLOps) teams automate and scale the entire ML lifecycle, including model building, training, deployment, and monitoring, for lower costs and faster time to market.

When migrating on-premises MLOps to Amazon SageMaker Pipelines, customers often find it challenging to monitor metrics in training scripts and add inference scripts for custom ML models.

There are a few examples from Amazon Web Services (AWS) showing how to use SageMaker Pipelines with training and inference scripts for AWS built-in algorithms; however, there is no comparable example for custom computer vision models. Many customers also want to learn best practices for SageMaker Pipelines drawn from production use cases.

Mission Cloud implemented an end-to-end SageMaker pipeline covering the workflow from model development to production, accelerating its customer's computer vision model production process.

In this post, you will learn how to train and deploy customized training and inference scripts with SageMaker Script Mode for Bring Your Own Model (BYOM) in SageMaker Pipelines. You’ll learn best practices for building an end-to-end MLOps workflow for a computer vision model with SageMaker Pipelines, including:

  • How to train with a customized PyTorch script, including custom metrics tracked on Amazon CloudWatch.
  • How to reduce training costs by using Amazon EC2 Spot instances and checkpointing with SageMaker Pipelines.
  • How to register (with ModelStep) and deploy models with customized inference scripts in SageMaker Pipelines.

Mission Cloud is an AWS Premier Tier Services Partner and Managed Service Provider (MSP) with the Machine Learning Consulting Competency. Also an AWS Marketplace Seller, Mission Cloud delivers a comprehensive suite of services to help businesses migrate, manage, modernize, and optimize their AWS cloud environments.

Solution Overview

Detectron2 is an open-source computer vision library that provides object detection and segmentation models. There are many existing examples of how to train and deploy Detectron2 as a PyTorch model; here, we focus on bringing an existing PyTorch training script and building an end-to-end SageMaker pipeline with BYOM.

The following diagram illustrates the architecture for our modeling pipeline, where SageMaker Pipelines is used to automate the ML steps: training, model evaluation, a metrics-based condition check, a fail step, model registration, and an AWS Lambda step. The Lambda step invokes a Lambda function that deploys an endpoint so researchers can do ad-hoc testing on the model.


Figure 1 – Reference architecture on AWS.

The diagram below depicts the workflow of the modeling pipeline generated by SageMaker Pipelines for our Detectron2 object detection model training.


Figure 2 – ML workflow generated by SageMaker Pipelines.

Sample Code for SageMaker Pipelines

This section shows the pipeline development step by step using SageMaker Pipelines with code samples.

Pipeline Parameters Setup

Pipeline parameters allow you to assign values to variables at runtime. This sample code shows the definitions for the seven pipeline parameters used in this modeling pipeline.

from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
    ParameterFloat,
    ParameterBoolean
)

train_instance_type = ParameterString(name="TrainingInstanceType", default_value="ml.p2.xlarge")

train_instance_count = ParameterInteger(name="TrainingInstanceCount", default_value=1)

train_num_epochs = ParameterInteger(name="NumEpoch", default_value = 1)

train_learning_rate = ParameterFloat(name="LearningRate", default_value=0.0001)

train_max_iter = ParameterInteger(name="MaxIter", default_value = 160)

# checkpoint path 
s3_path = ParameterString(name="s3Path", default_value="s3://sagemaker-bucket/model/")

endpoint_instance_type = ParameterString(name="EndpointInstanceType", default_value="ml.c4.large") 

Training Step

We use a PyTorch estimator to bring our own customized training script, which uses Detectron2 for object detection; the details of this training script can be found in train.py.

Define Customized Metrics Associated with the Training Script

SageMaker training jobs allow you to publish custom metrics from your training script to track the progress of training over time. These metrics can be viewed directly through SageMaker Experiments, the SageMaker console, and Amazon CloudWatch.

The sample code below shows how to define customized metrics for the training and testing average precision in the modeling pipeline notebook. This metric_definitions list is used as an input parameter of the estimator for the training job.

A metric definition includes two keys: Name, which defines the metric name, and Regex, which defines a regular expression used to detect the metric. The regex is matched against all logs the training job publishes to CloudWatch, so anything the training job prints with a logger or print statement is checked against it.

# Define the customized metrics; the metric names must match what the training script logs so they are displayed properly
metric_definitions=[
    {'Name': 'training_ap', 'Regex': 'training_ap ([0-9\\.]+)'},
    {'Name': 'testing_ap', 'Regex': 'testing_ap ([0-9\\.]+)'}
]

If a log message contains a metric matching the regex above, it will be recorded in the SageMaker Experiment console.


Figure 3 – Screenshot from SageMaker Experiments.

Correspondingly, the training script (train.py) must print log messages containing the metrics in a format consistent with the regex defined in the code sample above.

The following code needs to be added to the training script to write logs that match this regex so the metrics are recognized and recorded when the training job runs. Note that these two metric examples are not included in the training script linked above.

Here, training_ap and testing_ap have been calculated and assigned earlier in the training script; we simply print them to the logs in the expected format.

logger.info("training_ap {}".format(training_ap))
logger.info("testing_ap {}".format(testing_ap))

Enable Training with Spot Instances

Amazon SageMaker makes it easy to take advantage of the cost savings available through Amazon EC2 Spot instances. You can use Spot instances for training jobs and save up to 90% on training costs compared to on-demand instance pricing. You can see the estimated percentage of cost savings in the SageMaker training job console.

To enable training with Spot instances, we need to set use_spot_instances to True and provide values for max_run and max_wait; make sure max_wait is at least as large as max_run. The SageMaker Estimator documentation provides detailed explanations of each parameter.

The maximum value AWS allows for max_run is 28 days, although a higher limit can be requested for your account. The recommended best practice is to set max_run larger than the estimated length of the training job, with additional buffer time.

In this example, since the training job takes about 10 minutes to finish, max_run is set to be 30 minutes (1800 seconds), as shown in the example below.

train_use_spot_instance = True
if train_use_spot_instance:
    max_run = 1800
    max_wait = 1805
else:
    max_run = 86400  # default value (24*60*60 seconds)
    max_wait = None  # must be None when Spot instances are not used

See how train_use_spot_instance, max_run, and max_wait are used in the estimator definition in the section “Define the PyTorch estimator.”

Configure Dynamic Checkpoint Paths Through a Pipeline Parameter

Checkpoints are model snapshots used to save the state of machine learning models during training; checkpoint_s3_uri in the PyTorch estimator is where the checkpoint path is specified.

In our modeling pipeline, after one pipeline run is finished, a checkpoint file is generated and saved to the specified checkpoint path. If you re-execute the pipeline, for example to change the learning rate, the training job fails unless you update the checkpoint path to a new, unique value, because the job tries to resume from the saved checkpoint.

Our solution is to pass the checkpoint path as a pipeline parameter so a unique path can be supplied for each run. This is shown in the “Execute the Pipeline” section.
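
For context, SageMaker keeps the checkpoint_s3_uri location in sync with a local directory inside the training container (by default /opt/ml/checkpoints). The snippet below is a minimal sketch of how a training script can save checkpoints to and resume from that directory; it is not part of the train.py used in this pipeline, and the file name checkpoint.pth and the stand-in model are assumptions for illustration.

import os
import torch
import torch.nn as nn

# SageMaker syncs this local directory with checkpoint_s3_uri
checkpoint_dir = "/opt/ml/checkpoints"
checkpoint_path = os.path.join(checkpoint_dir, "checkpoint.pth")

model = nn.Linear(10, 2)  # stand-in for the Detectron2 model built in train.py
start_iter = 0

# Resume if a checkpoint was restored, for example after a Spot interruption
if os.path.exists(checkpoint_path):
    state = torch.load(checkpoint_path)
    model.load_state_dict(state["model_state"])
    start_iter = state["iteration"]

# ... training loop runs from start_iter ...

# Periodically persist a checkpoint so an interrupted job can resume
os.makedirs(checkpoint_dir, exist_ok=True)
torch.save({"model_state": model.state_dict(), "iteration": start_iter}, checkpoint_path)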

Define the PyTorch Estimator

The following code shows how to define a PyTorch estimator using the parameters defined above. Make sure the train.py script is in the code folder.

from sagemaker.pytorch import PyTorch
from sagemaker.workflow.steps import TrainingStep
from sagemaker import get_execution_role 
from sagemaker.workflow.pipeline_context import PipelineSession

role = get_execution_role()
pipeline_session = PipelineSession()

# Define the model estimator 
pytorch_estimator = PyTorch(
    entry_point="train.py",  
    source_dir="code",
    role=role,
    framework_version=pytorch_framework_version,  
    py_version=python_version, 
    instance_count=train_instance_count,
    instance_type=train_instance_type,  
    hyperparameters={"num_epochs": train_num_epochs,
                      "learning_rate": train_learning_rate,
                      "max_iter": train_max_iter, 
                      "s3_path": s3_path,
                     },      
    checkpoint_s3_uri=s3_path, 
    use_spot_instances = train_use_spot_instance,  
    max_run=max_run,
    max_wait=max_wait,
    metric_definitions=metric_definitions,
    disable_profiler = True,
    sagemaker_session=pipeline_session
)

# Define the training step in the Pipeline 
step_train_model = TrainingStep(
    name= "Train-CV-Model",
    estimator=pytorch_estimator,
)

Evaluation Step

This sample code shows how to evaluate models by using a processing step.

from sagemaker.pytorch.processing import PyTorchProcessor
from sagemaker.workflow.properties import PropertyFile
from sagemaker.processing import ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep

evaluation_report = PropertyFile(
    name="EvaluationReport",
    output_name="evaluation",
    path="evaluation.json",
)

step_evaluate_model = ProcessingStep(
    name="evaluate-model",
    processor=PyTorchProcessor(
        framework_version=pytorch_framework_version,
        py_version=python_version,
        instance_type="ml.t3.medium",
        instance_count=1,
        role=role,
    ),
    outputs=[
        ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation"),
    ],
    property_files=[evaluation_report],
    code="code/evaluate_step.py",
)

step_evaluate_model.add_depends_on([step_train_model])

In the ProcessingStep, we use a PyTorch processor with the same framework version as our training step. The processor determines which container is used to run the evaluation script we provide.

Next, provide an output directory using the outputs parameter. This is the local path on the processing job instance; anything the processing job writes to this path is saved as an output in Amazon Simple Storage Service (Amazon S3).

Then, set a property file with the property_files parameter. This property file will be used to pass any values or metrics between this step and the condition step. The property file must be written to the above output directory as part of the processing job.

Finally, select the script to use for your model evaluation with the code parameter. The evaluation script must contain the code below in order to create and write to the specified property file.

import json
import pathlib

evaluation_dict = {
    "training_accuracy": training_accuracy  # calculated accuracy, float
}

output_dir = "/opt/ml/processing/evaluation"
pathlib.Path(output_dir).mkdir(parents=True, exist_ok=True)

evaluation_path = f"{output_dir}/evaluation.json"
with open(evaluation_path, "w") as f:
    f.write(json.dumps(evaluation_dict))

For this step, we also need to add a dependency on the model training step using the method add_depends_on to make sure the evaluation step only runs after the training job is finished.

Register Step

To register our model to SageMaker Model Registry, we use ModelStep, a new feature in the SageMaker software development kit (SDK). ModelStep either packages and creates the model, or packages the model and registers it in SageMaker Model Registry. Packaging the model will zip the model artifacts with the code necessary for inference and define what container should be used to deploy the model.

Any class that extends the base SageMaker Model can be used with ModelStep. Here, we use PyTorchModel. We use the model artifacts resulting from the training step above and package them with our customized inference script, inference.py.

from sagemaker.pytorch import PyTorchModel
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.workflow.model_step import ModelStep

model_package_group_name = "my_model_package_group_name"  # replace with the model package group you wish to use for this pipeline

model_pytorch = PyTorchModel(
    model_data=step_train_model.properties.ModelArtifacts.S3ModelArtifacts,
    role=role,
    sagemaker_session=PipelineSession(),
    entry_point="inference.py",
    source_dir="code",
    framework_version=pytorch_framework_version,
    py_version=python_version,
)

step_model_registration = ModelStep(
   name="MyModelRegistration",
   step_args=model_pytorch.register(
              content_types=["application/json"],
              response_types=["application/json"],
              inference_instances=["ml.m5.large"],
              transform_instances=["ml.m5.large"],
              model_package_group_name= model_package_group_name,
              approval_status="Approved",
   ),
)

Note that both the SageMaker Estimator and Model classes can be used to create model objects for deployment. However, the Model object is a lower-level class that allows us to repackage the model artifacts with a customized inference script for model deployment.

In ModelStep, we register the model to the specified model package group and provide the content and response types as well as the instance types allowed for inference and batch transform. Note that content_types and response_types must be configured to match the inference script; otherwise, the deployment will fail.

We set the approval status to be “approved” so that we can later deploy our model without manual intervention.
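
Although the full inference.py is not included in this post, a customized inference script for the SageMaker PyTorch serving container implements the standard model_fn, input_fn, predict_fn, and output_fn handlers. The following is a simplified, hypothetical sketch that assumes JSON input and output to match the content_types and response_types above; the actual Detectron2 handlers are more involved, and the model.pth file name is an assumption.

import json
import os
import torch

def model_fn(model_dir):
    # Load the model artifacts unpacked from model.tar.gz (file name is an assumption)
    model = torch.load(os.path.join(model_dir, "model.pth"), map_location="cpu")
    model.eval()
    return model

def input_fn(request_body, request_content_type):
    # Matches content_types=["application/json"] in the register step
    if request_content_type != "application/json":
        raise ValueError(f"Unsupported content type: {request_content_type}")
    payload = json.loads(request_body)
    return torch.tensor(payload["instances"])

def predict_fn(input_data, model):
    with torch.no_grad():
        return model(input_data)

def output_fn(prediction, response_content_type):
    # Matches response_types=["application/json"] in the register step
    return json.dumps({"predictions": prediction.tolist()})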

Fail Step

If the evaluation metric does not pass the user-defined threshold in the condition step, the fail step below outputs a customized error message.

from sagemaker.workflow.fail_step import FailStep
from sagemaker.workflow.functions import Join
step_fail = FailStep(
    name="fail-check-cv-model",
    error_message=Join(
        on=" ", values=["Execution failed due to first validation threshold Accuracy < ", 0.90]
    )
)

Condition Step

We use a condition step to direct which step runs next: the register step if the condition returns true, or the fail step if it returns false. The condition to be checked is ConditionGreaterThanOrEqualTo.

The condition reads the training_accuracy metric from the property file produced by the evaluation step and compares it with a threshold of 0.90.

from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.functions import JsonGet

cond_gte = ConditionGreaterThanOrEqualTo(
    left=JsonGet(
        step_name=step_evaluate_model.name,
        property_file=evaluation_report,
        json_path="training_accuracy",
    ),
    right=0.90,
)

step_cond = ConditionStep(
    name="Condition-Accuracy-Greater-Than-90",
    conditions=[cond_gte],
    if_steps=[step_model_registration],     
    else_steps = [step_fail],  
)

Deploy Model Endpoint with the AWS Lambda Step

The sample code below shows how we use the Lambda step to deploy a SageMaker endpoint.

First, create a Lambda function containing the code below.

import json
import boto3
import time

sm_client = boto3.client("sagemaker")

# ARN of the IAM execution role to be used by the SageMaker model once the endpoint is deployed
role_arn = "<insert IAM role ARN>"

def lambda_handler(event, context):
    print(f"Received Event: {event}")

    model_package_arn = event['model_package_arn']
    endpoint_instance_type = event['endpoint_instance_type']

    model_description = sm_client.describe_model_package(ModelPackageName=model_package_arn)
    print("model package description: ")
    print(model_description)
    
    print("creating model from model package...")
    current_time = time.strftime("%m%d%H%M%S", time.localtime())
    model_name = model_description['ModelPackageGroupName'] + "-v" + str(model_description['ModelPackageVersion']) + "-" + current_time

    print("Model name : {}".format(model_name))
    container_list = [{'ModelPackageName': model_package_arn}]

    create_model_response = sm_client.create_model(
        ModelName=model_name,
        ExecutionRoleArn=role_arn,  
        Containers=container_list
    )

    print("Created model arn : {}".format(create_model_response["ModelArn"]))


    print("Creating endpoint config...")
    create_endpoint_config_response = sm_client.create_endpoint_config(
        EndpointConfigName=model_name,
        ProductionVariants=[
            {
                "InstanceType": endpoint_instance_type,
                "InitialVariantWeight": 1,
                "InitialInstanceCount": 1,
                "ModelName": model_name,
                "VariantName": "AllTraffic",
            }
        ],
    )
    print(f"create_endpoint_config_response: {create_endpoint_config_response}")

    print("Creating endpoint...")
    create_endpoint_response = sm_client.create_endpoint(
        EndpointName=model_name, EndpointConfigName=model_name
    )
    print(f"create_endpoint_response: {create_endpoint_response}")

    return {"statusCode": 200, "body": json.dumps("Successfully submitted endpoint creation")}

Note that the AWS Identity and Access Management (IAM) role assigned to the Lambda function should already be configured with the appropriate permissions.
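
As a rough guide, the Lambda function's execution role needs permission to create the SageMaker resources above and to pass the SageMaker execution role to the endpoint. The snippet below is a hypothetical, minimal sketch using boto3; the role name and ARN are placeholders, and you should scope the Resource entries to your own account before using anything like it.

import json
import boto3

iam = boto3.client("iam")

# Minimal inline policy for the Lambda execution role (sketch; tighten the Resource ARNs)
policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "sagemaker:DescribeModelPackage",
                "sagemaker:CreateModel",
                "sagemaker:CreateEndpointConfig",
                "sagemaker:CreateEndpoint",
            ],
            "Resource": "*",
        },
        {
            "Effect": "Allow",
            "Action": "iam:PassRole",
            "Resource": "arn:aws:iam::123456789012:role/sagemaker-execution-role",  # hypothetical role ARN
        },
    ],
}

iam.put_role_policy(
    RoleName="lambda-deploy-endpoint-role",  # hypothetical Lambda execution role name
    PolicyName="sagemaker-endpoint-deploy",
    PolicyDocument=json.dumps(policy),
)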

Next, create a Lambda step in the pipeline notebook to invoke the Lambda function. The input parameters of the Lambda step include the Amazon Resource Name (ARN) of the above Lambda function, the registered model ARN output from the model registration step, and the endpoint instance type.

from sagemaker.workflow.lambda_step import LambdaStep
from sagemaker.lambda_helper import Lambda

lambda_arn = "<your-lambda-arn>"  # replace with your Lambda function ARN

step_deploy_model_lambda = LambdaStep(
    name="deploy-model-to-endpoint",
    lambda_func=Lambda(function_arn=lambda_arn),
    inputs={
        "model_package_arn": step_model_registration.properties.ModelPackageArn,
        "endpoint_instance_type": endpoint_instance_type,
    },
)

Define and Submit the Pipeline

The following code shows how to define and submit the pipeline by using all of the defined pipeline parameters.

from sagemaker.workflow.pipeline import Pipeline

pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        train_instance_type, 
        train_instance_count,
        train_num_epochs,
        train_learning_rate,
        train_max_iter,
        endpoint_instance_type, 
        s3_path
    ],
    steps=[
        step_train_model,
        step_evaluate_model,
        step_cond, 
        step_deploy_model_lambda
    ],
)

pipeline.upsert(role_arn=role)

Execute the Pipeline

There are two ways to execute the pipeline after it has been submitted. One option is to use start() to execute the pipeline with specified input parameters, as shown below. This sample code shows how two pipeline parameters, the checkpoint path (s3Path) and the learning rate (LearningRate), can be updated when executing the pipeline.

import uuid

my_uuid = str(uuid.uuid4())[:8]
my_path = f"s3://sagemaker-us-east-2-123456789ABC/module-a/{pipeline_name}/{my_uuid}"

print(f"This checkpoint S3 path contains the checkpoint.pth and evaluation.json: {my_path}")

execution = pipeline.start(
    parameters = dict(
        s3Path = my_path,
        LearningRate = 0.0123
    )
)
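
After starting an execution programmatically, you can monitor it from the same notebook. The calls below are standard SageMaker SDK methods on the execution object; wait() blocks until the pipeline completes and raises an error if it fails.

# Inspect the overall status of the running execution
print(execution.describe()["PipelineExecutionStatus"])

# Block until the execution completes
execution.wait()

# Review the status of each step once finished
for step in execution.list_steps():
    print(step["StepName"], step["StepStatus"])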

Another option is to execute the pipeline from the SageMaker Studio console. In the SageMaker Pipelines dashboard, select the pipeline created and then select “Create execution.” This should bring up a window that allows you to fill in the input parameters.

Click “Start” to trigger a new pipeline execution. The values entered in the console override the default values set when the input parameters were defined.


Figure 4 – Screenshot to start an execution of a pipeline.

Conclusion

This post demonstrated how to build an end-to-end Amazon SageMaker pipeline with customized training and inference scripts for a computer vision model. We showed how to leverage SageMaker features, including the PyTorch Estimator, PyTorchProcessor, and PyTorchModel, to simplify each pipeline step and optimize compute cost.

The best practices highlighted in this post can help you explore the capabilities of SageMaker and simplify your MLOps pipeline building process.

Mission Cloud thanks Authentic ID for the partnership that allowed Mission to collaborate, define, and build Amazon SageMaker workflows. Authentic ID is a leading identity verifier utilizing cloud-native ML frameworks to realize its mission of “Identity made simple.”

You can also learn more about Mission Cloud in AWS Marketplace.


Mission Cloud – AWS Partner Spotlight

Mission Cloud is an AWS Premier Tier Services Partner and MSP that delivers a comprehensive suite of services to help businesses migrate, manage, modernize, and optimize their AWS cloud environments.

Contact Mission Cloud | Partner Overview | AWS Marketplace | Case Studies