场景简介
随着深度学习领域的不断发展,很多生成类模型的效果已经达到“准商用”的水平,因此,也有很多客户选择在这类模型上进行调优和封装并作为应用提供给终端用户,从而实现算法能力产品化。在本博客中,我们将为大家展示如何用利用Amazon SageMaker Pipeline构建包括数据预处理,模型训练,模型推理以及任务状态通知的自动化工作流。在本博客中我们将为大家展示主要部分的技术实现,您可以参考完整代码 。
StyleGAN是目前最先进的高分辨率图像合成方法之一,它生成的人脸照片一度被认为“逼真到吓人”。在2021年初,英伟达(NVIDIA)的研究人员发布并开源了升级版本:StyleGAN2。该版本重点修复了特征伪影问题,并在进一步提高生成图像质量的同时降低了训练时间和训练集大小。自面世以来,StyleGAN2受到了众多智能视觉爱好者的欢迎,其实践除了人脸生成以外还包括宠物生成,动漫生成,场景生成等等。广泛的应用场景也引起了艺术设计领域从业者的兴趣,他们希望借助此模型来丰富他们现有平台的功能,利用AI来赋能内容生成。
Anime Face Dataset是一个来自于Kaggle平台的开源数据集。里面包含63,632张卡通头像图片可用于做各种各样的深度学习实验。
工程架构参考
如下图所示,我们不妨模拟为平台客户封装AI增值服务的场景。在该场景中,平台客户的终端用户有可能是原画师或者设计师,他们将仅需要在客户的前端网页上上传标注好的图像素材。平台客户则仅需要在后台触发预设好的SageMaker Pipeline任务。SageMaker Pipeline将会自动获取S3中的训练数据并开启整个工作流,并按需回传训练的中间结果,最终的模型文件和推理结果。在推理完成之后,SageMaker Pipeline会通过Amazon SNS通知平台终端用户。
部分实验结果:
本博客的实验框架环境为Python3.8,PyTorch1.8.1+cu111。同时选用了单个ml.p3.2xlarge作为训练计算资源。
下图为完整训练后部分生成样本,仅供参考。
完整的训练将需要持续72小时左右,请参考以下超参:
hyperparameters = {"data":"/opt/ml/input/data/training/animeface.zip",
"gpus":1,
"augpipe":"bg",
"gamma":10,
"cfg":"paper256",
"mirror":1,
"snap":10,
"kimg":2000,
"metrics":"none",
"outdir":"/opt/ml/model",
}
本博客为了简化工程演示,将“kimg”设为1。
产品服务
Amazon SageMaker Pipeline
Amazon SageMaker Pipelines是一种利用Amazon SageMake直接集成构建机器学习管道的工具。利用Pipeline,您可以实现复杂工作流步骤的编排,其中不仅包括机器学习相关的特征工程,训练和部署等步骤,还包括直接集成大规模数据处理的EMR Step以及可以用来触发其他服务的Lambda Step。配置好的管道可以通过Amazon EventBridge触发,或是通过Lambda进行调用。同时,您也可以通过配置占位参数更便捷地换元并执行整个管道。如果您使用SageMaker Studio,还可以动态查看流程的进展以及相关的信息。
在本博客中,我们将介绍如何用Pipeline串联起数据处理,训练,在线和离线部署等步骤来组织整个工作流。结合占位元,该工作流就可以比较灵活的复用。比如,当我们有多个模型以同样的流程提供线上服务,我们就可以将镜像地址作为占位元实现“一键”复用而无需反复配置;又或是我们想尝试不同的资源配置来测试整个流程的性能,那么我们就可以使用相关参数作为占位元。同时,结合Amazon EventBridge 和Amazon Lambda ,我们可以实现自动触发机制。比如,每周或是每月会产生一批新的生产数据,而我们需要定时对他们进行批量推理;又或是,每当S3特定位置中上传了新文件我们需要重新进行模型评估。在启动整个管道流程后我们可以根据每个组件的Log Group在CloudWatch上查找相关日志。比如TrainingStep相关的日志可以在对应Training Job的Log Group中查看。
SageMaker Processing Job
SageMaker Processing Job是一款可分布式作业的托管数据处理功能,您可以利用他来完成例如特征工程、数据验证、模型评估和模型批量推理等任务。 您不仅可以选择使用Sagemaker预置的处理镜像,也可以选择自带镜像.
在本博客中,我们将用Processing Job来完成数据的预处理和模型训练后的批量推理任务。
Sagemaker Training Job BYOC: Bring Your Own Container (自带镜像实现模型训练)
通常我们会建议客户以自带算法脚本(Bring Your Own Scripts)的形式在SageMaker上进行训练,然而为了最大限度保证生产环境的稳定性和灵活性,在本博客中我们将介绍自带镜像在Amazon SageMaker上进行训练的方式。在这里我们选择使用SageMaker预置的基础镜像来减省环境配置的时间,更多预置镜像请参考这里 。
Amazon ECR
Amazon Elastic Container Registry (Amazon ECR) 是一款安全、可扩展且可靠的 AWS 托管容器映像注册表服务。 Amazon ECR 使用 AWS IAM 支持具有基于资源的权限的私有存储库。您可以使用首API完成推送、拉取和管理 Docker 映像、开放容器倡议 (OCI) 映像和 OCI 兼容的工件。
在本博客中我们将会使用Amazon ECR来存储我们的训练镜像。
Amazon SNS
Amazon Simple Notification Service (Amazon SNS) 是一项托管服务,可提供从发布者到订阅者(也称为生产者和消费者)的消息传递。发布者通过向主题发送消息来与订阅者异步通信,主题是逻辑访问点和通信通道。客户端可以订阅 SNS 主题并使用受支持的终端节点类型接收已发布的消息.
在本博客中Amazon SNS将在整个工作流顺利完成之后用邮件告知其主题订阅者最终推理结果在S3上的存储位置。如果您还没有创建过主题,请您参考此链接 来创建您的SNS主题并配置订阅邮箱。这样在启动长时间的训练任务时,我们就不必手动去查看任务状态。您还可以利用SNS做任务发起通知和故障通知。
技术实现
以下为本博客中Pipeline的示意图。我们可以比较清晰地看到Pipeline Steps和底层功能组件的对应关系。在代码层面,二者的语法入参也基本一致。组织SageMaker Pipeline更像是搭积木,我们分别定义每一个步骤,最终再将他们串起来。
第一步我们需要配置Pipeline的占位元:
from sagemaker.workflow.parameters import ParameterInteger, ParameterString, ParameterFloat
Process Placeholders
process_input_data_uri = 's3://{}/My-StyleGAN2-Pipeline/animeface/'.format(default_bucket)
process_input_data_path = ParameterString(name="ProcessInput",default_value=process_input_data_uri,)
process_instance_count = ParameterInteger(name="ProcessInstanceCount",default_value=1)
process_instance_type = ParameterString(name="ProcessInstancetType",default_value='ml.m5.xlarge',)
Train Placeholders
#image_uri = '357224784104.dkr.ecr.us-west-2.amazonaws.com/blogstylegan2'
image_uri = stylegan2_image
output_uri = 's3://{}/My-StyleGAN2-Pipeline/Model'.format(default_bucket)
checkpoint_uri = 's3://{}/My-StyleGAN2-Pipeline/checkpoints'.format(default_bucket)
train_instance_count = ParameterInteger(name="TrainInstanceCount",default_value=1)
train_instance_type = ParameterString(name="TrainInstancetType",default_value='ml.p3.2xlarge',)
train_checkpoint_path = ParameterString(name="TrainCheckpointPath",default_value=checkpoint_uri)
train_output_path = ParameterString(name="TrainOutputlPath",default_value=output_uri)# we write the final model to the same S3 directory as the inferencing source codes
train_image = ParameterString(name="TrainImage",default_value=image_uri,)
Inference Placeholders
source_code_uri = sagemaker_session.upload_data('stylegan2-ada-pytorch', key_prefix='My-StyleGAN2-Pipeline/Inference')
#Upload a test image
inference_img_uri = sagemaker_session.upload_data('test.png', key_prefix='My-StyleGAN2-Pipeline/InferenceImg')
inference_code_path = ParameterString(name="InferenceCodePath",default_value=source_code_uri)
inference_image_path = ParameterString(name="InferenceImgPath",default_value=inference_img_uri)
inference_instance_count = ParameterInteger(name="InferenceInstanceCount",default_value=1)
inference_instance_type = ParameterString(name="InferenceInstancetType",default_value='ml.g4dn.2xlarge',)
第二步我们需要定义数据预处理的ProcessingStep:
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep
from sagemaker.sklearn.processing import SKLearnProcessor
framework_version = "0.23-1"
# Configure Processor
sklearn_processor = SKLearnProcessor(
framework_version=framework_version,
instance_type=process_instance_type,
instance_count=process_instance_count,
role=role,
)
# Configure ProcessingStep
step_process = ProcessingStep(
name="stylegan2Process",
processor=sklearn_processor,
inputs=[
ProcessingInput(source=process_input_data, destination="/opt/ml/processing/input"),
],
outputs=[
ProcessingOutput(output_name="train", source="/opt/ml/processing/train")
],
## Processing Arguments
job_arguments=['--source', '/opt/ml/processing/input/',
'--dest','/opt/ml/processing/train/animeface.zip',
'--width', '256',
'--height','256',],
code="code_pipeline/dataset_tool.py",
)
第三步我们需要定义用作模型训练的TrainingStep。这边我们可以直接引用上一步数据处理的输出作为本步骤的输入。同时我们也可以通过depends_on来定义步骤之间的依赖关系。在配置训练任务时,我们推荐您使用checkpoints,SageMaker将会自动将/opt/ml/checkpoints路径下的文件同步到S3指定路径,使您可以及时查看训练的中间结果。在训练结束之后,SageMaker将会将/opt/ml/model下的文件作为最终模型输出,因此我们需要调整代码中模型最后的输出位置。
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep
from sagemaker.estimator import Estimator
# Configure training parameters
def json_encode_hyperparameters(hyperparameters):
return {str(k): json.dumps(v) for (k, v) in hyperparameters.items()}
params = {"data": "/opt/ml/input/data/train/animeface.zip",
"gpus": 1,
"augpipe": "bg",
"gamma": 10,
"cfg": "paper256",
"mirror": 1,
"snap": 10,
"metrics": "none",
"kimg": 1,
"outdir": "/opt/ml/checkpoints"}
hyperparameters = json_encode_hyperparameters(params)
# Configure the estimator
estimator_stylegan2 = Estimator(
role=role,
image_uri=train_image,
train_instance_count=train_instance_count,
train_instance_type=train_instance_type,
hyperparameters=hyperparameters,
disable_profiler=True,
checkpoint_s3_uri=train_checkpoint_path,
checkpoint_local_path='/opt/ml/checkpoints',
output_path= train_output_path,
)
# Configure Training Step
step_train = TrainingStep(
name="stylegan2train",
estimator = estimator_stylegan2,
inputs={
"train": TrainingInput(s3_data=step_process.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri,
content_type = 'application/x-image'),
},
depends_on = [step_process],
)
第四步我们需要定义用作批量推理的ProcessingStep。我们可以按需定义多个数据输入和输出通道。如下所示我们可以分别传入代码,模型和用作推理的样例数据。本片博客为了工程演示选择实现了原代码中映射(projector.py)这个功能,您可以根据自己的需要采样生成(generate.py)或是进行风格混合(style_mixing.py)。
#Initialize the PyTorchProcessor
pytorch_processor = PyTorchProcessor(
framework_version='1.10.2',
role=get_execution_role(),
instance_type=inference_instance_type,
instance_count=inference_instance_count,
base_job_name='stylegan2_batch_inference',
py_version = 'py38'
)
# Configure ProcessingStep
step_process_inference = ProcessingStep(
name="stylegan2Inference",
processor=pytorch_processor,
inputs=[
# input 1: source code
ProcessingInput(source=inference_code_path,destination="/opt/ml/processing/input"),
# input 2: trained model
ProcessingInput(source=step_train.properties.ModelArtifacts.S3ModelArtifacts,destination="/opt/ml/processing/model"),
# input 3: test image
ProcessingInput(source=inference_image_path,destination="/opt/ml/processing/data")
],
outputs=[
ProcessingOutput(output_name="result", source="/opt/ml/processing/output/test")
],
code="code_pipeline/inference.sh",
depends_on=[step_train]
)
最后我们需要定义负责消息分发的LambdaStep。首先我们需要定义对应的Lambda Function,该Function会在整个工作流顺利结束之后给订阅群组发邮件同时提示他们推理结果在S3上的存储位置。:
%%writefile lambda_deployer.py
"""
This Lambda function sents Email to SNS Topic end users once the inference is complete and notify them the output directory on S3.
"""
import json
import boto3
def lambda_handler(event, context):
""" """
sns_client = boto3.client("sns")
output_s3_dir = event["output_s3_dir"]
msg = 'The Inference is done! The output has been stored at: '+str(output_s3_dir)
response = sns_client.publish(
TopicArn='<Your Topic Arn Here>',
Message=msg,
Subject='StyleGAN2 Inference',
)
return {
"statusCode": 200,
"body": json.dumps("Message Sent!"),
}
然后我们需要定义预定的LambdaStep:
# Define Lambda Step
from sagemaker.lambda_helper import Lambda
from sagemaker.workflow.lambda_step import (
LambdaStep,
LambdaOutput,
LambdaOutputTypeEnum,
)
function_name = "StyleGAN2_pipeline_callback"
# Lambda helper class can be used to create the Lambda function
func = Lambda(
function_name=function_name,
execution_role_arn=lambda_role,
script="lambda_deployer.py",
handler="lambda_deployer.lambda_handler",
timeout=600,
memory_size=10240,
)
# Lambda Step Input
output_s3_dir = step_process_inference.properties.ProcessingOutputConfig.Outputs["result"].S3Output.S3Uri
# Lambda Step Output
output_param_1 = LambdaOutput(output_name="statusCode", output_type=LambdaOutputTypeEnum.String)
output_param_2 = LambdaOutput(output_name="body", output_type=LambdaOutputTypeEnum.String)
step_lambda = LambdaStep(
name="stylegan2Notification",
lambda_func=func,
inputs={
"output_s3_dir": output_s3_dir,
},
outputs=[output_param_1, output_param_2],
depends_on=[step_process_inference]
)
最后我们要将整个工作流组装并执行。这边要提醒大家,当我们的工作流逻辑非常复杂的时候,对整个工作流的代码调试会比较困难,因此建议先单独验证每一个组件再进行整体的串联。
from sagemaker.workflow.pipeline import Pipeline
pipeline_name = f"stylegan2pipeline"
pipeline = Pipeline(
name=pipeline_name,
parameters=[process_input_data_path,
process_instance_count,
process_instance_type,
train_instance_count,
train_instance_type,
train_checkpoint_path,
train_output_path,
train_image,
inference_image_path,
inference_code_path,
inference_instance_count,
inference_instance_type
],
steps=[step_process, step_train, step_process_inference, step_lambda],
)
pipeline.upsert(role_arn=role)
execution = pipeline.start()
Pipeline开始执行之后,我们可以在SageMaker Studio里面可视化看到动态的执行结果和相关日志。
还可以通过换元“一键”重启。
总结
随着诸多领域深度学习模型逐渐成熟并达到商用级别,怎么样能够用最小的运维成本来将模型产品化、平台化变成为了首要问题。在这一点上SageMaker Pipeline给出了答案:不仅可以编排复杂的工作流自动化执行,同时还省去了对底层资源的运维,这让相关应用可以快速上线。同时,对于大多数生成类模型,强壮的算力资源是训练和在线推理所必须的,然而本地自建集群往往难以灵活配置机型和数量,而SageMaker在这一点上也有非常明显的优势:可以灵活配置算力集群并实现弹性扩展,使得应用架构对业务变化的适应性更高。
本篇作者
Xueqing Li
GCR AI/ML 解决方案架构师。