亚马逊AWS官方博客

使用 Amazon SageMaker Processing 与 AWS Step Functions 构建机器学习工作流

Original URL: https://aws.amazon.com/cn/blogs/machine-learning/building-machine-learning-workflows-with-amazon-sagemaker-processing-jobs-and-aws-step-functions/

机器学习(ML)工作流负责编排并自动执行机器学习任务序列,包括数据收集,机器学习模型的训练、测试与评估,外加模型部署。AWS Step Functions能够在端到端工作流中编排并自动执行与 Amazon SageMaker相关的各项机器学习任务。AWS Step Functions数据科学软件开发工具包( AWS Step Functions Data Science Software Development Kit,简称SDK)是一套开源库,使您得以轻松创建包含数据预处理、模型训练和部署的工作流。您可以使用Python创建机器学习工作流,而无需分别设置及整合各项AWS服务。

Amazon SageMaker 是一项全托管服务,能够为每位开发人员及数据科学家提供以规模化方式构建、训练以及部署机器学习模型的能力。

在 re:Invent 2019大会上,我们宣布正式发布Amazon SageMaker Processing。这是Amazon SageMaker中的一项全新功能,可帮助用户轻松在全托管基础设施之上运行预处理、后处理以及模型评估等工作负载。

今天,我们很高兴宣布Step Functions服务已经与Amazon SageMaker Processing正式集成。这一集成将帮助数据科学家们使用Step Functions与Step Functions Data Science SDK将Amazon SageMaker Processing轻松整合至机器学习工作流当中。

AWS Step Functions Data Science SDK的优势

AWS Step Functions Data Science SDK将帮助数据科学家在无需处理硬件置备或软件部署等DevOps任务的前提下,轻松构建起机器学习工作流。Step Functions Data Science SDK已经与Amazon SageMaker实现内置集成,用以编排机器学习工作流中的各个环节,包括模型训练、超参数调优以及模型部署等。SDK将帮助您在本地开发并测试机器学习工作流,并以高度一致的方式将工作流轻松部署在AWS之上以供测试或生产。

该SDK还具备以下优势:

  • 易用性——您可以使用Python构建及编排机器学习工作流。该SDK还允许用户为团队内的其他成员创建可复用的工作流模板。Step Functions将帮助您快速向工作流中引入错误处理、重试逻辑、并行步骤以及分支。您也可以使用其他多种AWS服务构建起复杂的机器学习工作流,包括与Amazon DynamoDBAmazon SNSAmazon SQSAmazon EMRAWS LambdaAWS GlueAWS Batch以及 Amazon Elastic Container Service (Amazon ECS)等实现原生集成。关于更多详细信息,请参阅AWS Step Functions Data Science SDK
  • 敏捷性——Step Functions将帮助您建立起无服务器工作流,全程无需设置任何基础设施。您可以在几分钟内快速构建起新的工作流。此外,Step Functions还能够轻松扩展以匹配您的用例需求。
  • 成本——使用Step Functions,您只需要为每一次状态转换付费。换言之,只有状态转换才会产生成本,其他空闲时间不会带来任何费用。在您的运行次数达到一定规模之后,Step Functions将带来巨大的成本效益。另外,SDK与其他AWS服务(包括Amazon SageMaker)的原生集成能力,还将帮助大家进一步减少状态转换次数。在后文当中,我们将具体介绍其中一种原生集成场景。

Amazon SageMaker ProcessingStep现可作为AWS Step Functions Data Science SDK的组成部分进行使用。通过此项服务集成,您可以摆脱其他steps,包括用于创建、轮询以及检查Amazon SageMaker Processing作业状态的AWS Lambda steps。您还可以使用最新发布的ProcessingStep创建处理作业。

在此之前,要想将Amazon SageMaker Processing集成至Step Functions工作流当中,大家需要编写AWS Lambda函数以调用Amazon SageMaker Processing API。该函数使用低级AWS SDK以构造请求参数,调用Lambda Amazon SageMaker Processing job APIs (create_processing_job()describe_processing_job()list_processing_jobs()或者 stop_processing_job()),而后读取返回的响应对象。此外,机器学习工程师还需要嵌入其他逻辑,借此通过其他工作流steps定期执行较频繁的轮询以创建处理作业并检查该作业的实际状态(包括Wait状态、Choice状态以及Task状态)。下图所示,为此前启动Step Functions工作流的具体方式:

“ProcessingStep”发布之前,AWS Step Functions工作流的启动方式

这种方法需要频繁轮询处理作业的状态,而且需要在整个工作流当中增加其他steps,这会导致复杂性持续提升。而随着所检查的状态发生转换,这种轮询机制还会产生额外的成本。

新的Amazon SageMaker Processing step

使用新的 ProcessingStep,我们能够在无需向工作流中写入额外steps的前提下,同步获取响应结果。

新的 ProcessingStep 会创建一项Task状态以满足处理作业的需求。大家可以参阅Step Functions Amazon SageMaker Jupyter Notebook以了解其工作原理。

在这一notebook中,我们创建了一个SKLearn Amazon SageMaker Processor 对象,详见以下代码:

sklearn_processor = SKLearnProcessor(
    framework_version="0.20.0",
    role=role,
    instance_type="ml.m5.xlarge",
    instance_count=1,
    max_runtime_in_seconds=1200,
)

 

 

接下来,我们使用此processor创建 ProcessingStep

processing_step = ProcessingStep(
    "SageMaker pre-processing step",
    processor=sklearn_processor,
    job_name=execution_input["PreprocessingJobName"],
    inputs=inputs,
    outputs=outputs,
    container_arguments=["--train-test-split-ratio", "0.2"],
    container_entrypoint=["python3", "/opt/ml/processing/input/code/preprocessing.py"],
)

 

 

此处理step使用预处理脚本preprocessor.py,并配合用于指定数据集中训练与测试部分划分比例(例如0.2)的参数。关于输入参数的更多详细信息,请参阅ProcessingStep

新的 ProcessingStep会启动一项新的处理作业,并默认等待同步完成。以此为基础,相较于以往需要编写新Lambda函数及其他steps以轮询处理作业状态的方法相比,现在您可以在单一step当中获得处理作业的状态与输出结果。ProcessingStep当中的wait_for_completion参数被设置为True,代表Task状态应等待处理作业完成后,方可执行下一step。当ProcessingStep完成之后,Step Functions会将DescribeProcessingJob 的响应结果作为该step的输出结果。Step Functions会在内部监听Amazon EventBridge Amazon SageMaker事件,借此获取关于处理作业状态变更的通知。这种方法帮助最终用户摆脱了繁琐的作业状态轮询工作。

如果您要求ProcessingStep输出结果中的任何值,以用于Step Functions工作流中的下一状态,则可在Choice状态中使用Choice Rules机制,详见以下代码:

my_choice_state.add_choice(

rule=ChoiceRule.StringEquals(variable=processing_step.output()["ProcessingJobStatus"], value="Completed")
next_step=happy_path

)

 

以上代码会检索处理作业的状态,检查其是否为Completed,并在ProcessingStep完成之后设置下一项状态。

您还可以创建Step Functions Catch block,并使用add_catch()将其添加至状态捕捉器列表当中,借此在ProcessingStep当中添加错误处理逻辑。详见以下代码:

catch_state_processing = stepfunctions.steps.states.Catch(
    error_equals=['States.TaskFailed'],
    next_step=failed_state_sagemaker_processing_failure

)
processing_step.add_catch(catch_state_processing)

 

 

以下机器学习工作流,展示了如何在运行TrainingStep之前使用 ProcessingStep对数据集进行预处理。

使用全新“ProcessingStep”之后的AWS Step Functions Workflow

您还可以创建更多其他机器学习工作流,并使用Step Functions中的动态并行功能支持多项并行任务的同时启动及运行。例如,在开始训练之前,您的工作流中可以运行多项独立任务,包括异常检测或特征选择等。您还可以通过Parallel状态启动多个处理steps以执行这些任务。

Amazon SageMaker Processing作业还受到EventBridge的支持。这项与EventBridge的集成将允许您监控处理作业的状态变更,并据此自动触发操作。例如,您可以对EventBridge规则加以配置,使其与Amazon SageMaker Processing事件相匹配。在模式匹配之后,该规则将把事件路由至目标处。您可以在Amazon CloudWatch控制台上完成这一配置。具体操作步骤如下:

  1. 在 CloudWatch控制台的Events之下,选择Rules
  2. 选择Create rule
  3. Service Name部分,选择SageMaker
  4. Event Type部分,选择SageMaker Processing Job State Change
  5. 在 Targets 部分,选择Add target以配置您的事件处理程序

总结

本文向大家简要介绍了Step Functions Data Science SDK当中的全新ProcessingStep,包括如何使用它创建Amazon SageMaker Processing作业。此外,我们还共同了解了其如何帮助用户摆脱繁忙的处理作业状态轮询、避免在工作流中引入不必要的steps,并为新的Processing step添加重试与错误处理等逻辑。本文也提供示例notebook,在展示新step的用法之外,概述了如何使用EventBridge规则根据处理作业中的事件变化执行相应操作。

关于更多详细信息以及关于这套SDK的示例notebooks,请参阅面向Amazon SageMaker的AWS Step Functions Data Science SDK

本篇作者

Dhawalkumar Patel

Dhawalkumar Patel 是 Amazon Web Services 的高级解决方案架构师。 他一直就职于从大型企业到中型初创公司等组织,致力于解决与分布式计算和人工智能有关的问题。目前,他正专注于研究机器学习和无服务器技术

Shunjia Ding

AWS公司AWS Step Functions Services开发团队软件开发工程师。