使用 AWS Trainium 和 Amazon SageMaker 训练机器学习模型

教程

概述

在本教程中,您将学习如何通过 AWS Trainium 实例使用 Amazon SageMaker 训练一个机器学习 (ML) 模型。由 AWS Trainium 加速器提供支持的 Amazon EC2 Trn1 实例专为高性能的深度学习 (DL) 训练构建,与同类 Amazon EC2 实例相比,Amazon EC2 Trn1 实例最多可节省 50% 的训练成本。Amazon SageMaker 是一项全托管服务,可以为每个开发人员和数据科学家提供大规模构建、训练和部署机器学习模型的能力。

在本教程中,您将使用 Amazon SageMaker Studio 这个提供全托管 Jupyter 笔记本界面的机器学习集成开发环境 (IDE),通过 AWS Trainium 构建和运行训练作业。我们将使用 IMDB 数据构建一个基于 BERT 的情感分析模型。BERT 是一个在大型英文数据语料库上进行自监督预训练的 Transformer 模型。该模型的主要目的是对使用整个语句(可能做了掩码处理)做决策的任务进行微调,例如序列分类、令牌分类和问答。该数据集包含 IMDB 的评论,这些评论可能是正面评论,也可能是负面评论。这次练习的目标是构建一个模型,用于预测给定的评论是正面评论还是负面评论。

要完成的目标

在本指南中,您将:

  • 使用 AWS Trainium (Trn1) 实例上的 Amazon SageMaker Training 训练一个用于文本分类的 BERT 模型

Prerequisites

开始本指南之前,您需要先满足以下条件:

  • AWS 账户:如果您还没有 AWS 账户,请遵循设置 AWS 环境指南中的说明获取快速概览。

 AWS 使用经验

新手

 完成时间

15 分钟

 所需费用

请参考 Amazon SageMaker 定价来估算本教程所需成本。

 前提条件

您必须登录 AWS 账户。

  使用的服务

Amazon SageMaker Training

 上次更新时间

2023 年 5 月 2 日

步骤 1:创建 AWS 账户

AWS 账户在每个 AWS 区域中只能拥有一个 SageMaker Studio 域。如果您已经拥有一个美国东部(弗吉尼亚州北部)区域的 SageMaker Studio 域,请按照 SageMaker Studio 设置指南将所需的 AWS IAM 策略附加到您的 SageMaker Studio 账户,然后跳过步骤 1,直接执行步骤 2 以设置 SageMaker Studio 笔记本。

如果没有现有的 SageMaker Studio 域,请继续执行步骤 1,运行 AWS CloudFormation 模板,创建 SageMaker Studio 域并添加本教程后续步骤所需的权限。

选择 AWS CloudFormation 堆栈链接。您将通过此链接打开 AWS CloudFormation 控制台,并创建 SageMaker Studio 域和名为 studio-user 的用户。您还将为您的 SageMaker Studio 账户添加所需的权限。在 CloudFormation 控制台上,确认右上角显示的区域是美国东部(弗吉尼亚州北部)。堆栈名称应为 CFN-SM-IM-Lambda-catalog,不应更改。系统需要 10 分钟左右来创建此堆栈的所有资源。

此堆栈假定您已在账户中设置了公共 VPC。如果没有公共 VPC,请参阅使用单个公有子网的 VPC 了解如何创建公共 VPC。

勾选“I acknowledge that AWS CloudFormation might create IAM resources”(我了解 AWS CloudFormation 可能会创建 IAM 资源),然后点击 Create stack(创建堆栈)。

在 CloudFormation 窗格中,选择 Stacks(堆栈)。创建堆栈后,堆栈的状态应从 CREATE_IN_PROGRESS 更改为 CREATE_COMPLETE。

步骤 2:设置 SageMaker Studio 笔记本

在此步骤中,您将启动一个新的 SageMaker Studio 笔记本、安装必要的开源库并设置与其他服务(包括 Amazon Simple Storage Service 即 Amazon S3)交互所需的 SageMaker 变量。

在控制台搜索栏中输入 SageMaker Studio,然后选择 SageMaker Studio。

从 SageMaker 控制台右上角的区域下拉菜单中选择 US East (N. Virginia)(美国东部(弗吉尼亚州北部))。从 Launch app(启动应用程序)下拉菜单中选择 Studio,可使用 studio-user 配置文件打开 SageMaker Studio。

打开 SageMaker Studio 用户界面。在导航栏中,依次选择 File(文件)> New(新建)> Notebook(笔记本)。  

在 Set up notebook environment(设置笔记本环境)对话框中,在 Image(镜像)下选择 Data Science 2.0。自动选择的内核是 Python 3。点击 Select(选择)。

这时笔记本右上角的内核应当显示 Python 3 (Data Science 2.0)。

步骤 3:准备数据

在此步骤中,您将使用 Amazon SageMaker Studio 笔记本预处理训练机器学习模型所需的数据,然后将数据上传到 Amazon S3。

我们将使用 Hugging Face 的 IMDB 数据集来训练模型。为了下载数据集,我们需要安装 datasets 和 transformers 库。要安装这些库的特定版本,请复制以下代码段并将其粘贴到笔记本的单元格中,然后按下 Shift+Enter 运行当前单元格。请忽略任何重启内核的警告和依赖冲突错误。

!pip install transformers==4.21.3 datasets==2.5.2

将下列导入代码添加到笔记本的单元格中 

from datasets import load_dataset
from datasets.filesystems import S3FileSystem
from tqdm.auto import tqdm

现在,我们可以使用 load_dataset 函数从 Hugging Face 数据集下载数据。我们将把数据集分为训练集和测试集。复制以下代码段并将其粘贴到笔记本的单元格中。

dataset = load_dataset("imdb",split="train",ignore_verifications=True)
dataset = dataset.train_test_split()

最后,我们将把数据集上传到 Amazon S3。  您可以指定要使用的 S3 存储桶,如果没有指定,代码将使用默认存储桶。复制以下代码并将其粘贴到笔记本的单元格中,然后执行代码。单元格输出应打印角色、存储桶和区域。  

import sagemaker
from sagemaker.pytorch import PyTorch

sess = sagemaker.Session()
# sagemaker session bucket -> used for uploading data, models and logs
# sagemaker will automatically create this bucket if it not exists
sagemaker_session_bucket=None
if sagemaker_session_bucket is None and sess is not None:
    # set to default bucket if a bucket name is not given
    sagemaker_session_bucket = sess.default_bucket()

role = sagemaker.get_execution_role()

sess = sagemaker.Session(default_bucket=sagemaker_session_bucket)

print(f"sagemaker role arn: {role}")
print(f"sagemaker bucket: {sess.default_bucket()}")
print(f"sagemaker session region: {sess.boto_region_name}")

我们现在有一个存储桶,我们将使用 Hugging Face 数据集 API 将数据集上传到 S3。复制并粘贴以下代码,将数据集上传到 S3。

s3 = S3FileSystem()  
s3_prefix = 'HFDatasets/imdb'

# save train_dataset to s3
training_input_path = f's3://{sagemaker_session_bucket}/{s3_prefix}'
dataset.save_to_disk(training_input_path,fs=s3)

步骤 4:构建训练脚本

 

借助 SageMaker,您可以在用于训练的 Python 脚本中引入自己的逻辑。通过在脚本中封装训练逻辑,您可以在使用 PyTorch 等常用机器学习框架容器的同时,纳入自定义训练例程和模型配置。在本教程中,您将准备一个训练脚本,该脚本使用 Hugging Face Transformers 库中的 BERT Transformer 模型,借助该脚本,您可以使用在上一步中上传的 IMDB 数据来训练文本分类模型。 

 

脚本模式的第一级功能是在独立的自定义 Python 脚本中定义自己的训练过程,并将其作为定义 SageMaker 估算器时的入口。复制并粘贴以下代码块,以编写封装模型训练逻辑的 Python 脚本。

%%writefile train.py

import argparse
import os
import torch
import torch_xla.core.xla_model as xm
import torch_xla.distributed.parallel_loader as pl
import torch_xla.distributed.xla_backend
from datasets import load_from_disk
from torch.optim import AdamW
from torch.utils.data import DataLoader
from torch.utils.data.distributed import DistributedSampler
from tqdm.auto import tqdm
from transformers import AutoModelForSequenceClassification, AutoTokenizer
from datasets import load_from_disk
# Initialize XLA process group for torchrun
import torch_xla.distributed.xla_backend
import random
import evaluate



device = "xla"
torch.distributed.init_process_group(device)
world_size = xm.xrt_world_size() 


def parse_args():
    parser = argparse.ArgumentParser(description="Finetune a transformers model on a text classification task")

    parser.add_argument(
        "--train_data", type=str, default=os.environ["SM_CHANNEL_TRAIN"])

    parser.add_argument(
        "--max_length",type=int,default=128)
  
    parser.add_argument(
        "--model_name_or_path",
        type=str,
        help="Path to pretrained model or model identifier from huggingface.co/models.",
        required=True,
    )

    parser.add_argument(
        "--per_device_train_batch_size",
        type=int,
        default=8,
        help="Batch size (per device) for the training dataloader.",
    )
    parser.add_argument(
        "--per_device_eval_batch_size",
        type=int,   
        default=8,
        help="Batch size (per device) for the evaluation dataloader.",
    )
    parser.add_argument(
        "--learning_rate",
        type=float,
        default=5e-5,
        help="Initial learning rate (after the potential warmup period) to use.",
    )
   
    parser.add_argument("--num_train_epochs", type=int, default=2, help="Total number of training epochs to perform.")
    parser.add_argument(
        "--max_train_steps",
        type=int,
        default=2000,
        help="Total number of training steps to perform. If provided, overrides num_train_epochs.",
    )


    parser.add_argument("--output_dir", type=str, default=os.environ["SM_MODEL_DIR"], help="Where to store the final model.")
    parser.add_argument("--seed", type=int, default=100, help="A seed for reproducible training.")
    args = parser.parse_args()
    

    # Sanity checks
    if args.train_data is None:
        raise ValueError("Need a training file.")

    args.local_rank = int(os.environ["LOCAL_RANK"])
    args.world_rank = int(os.environ["RANK"])
    args.world_size = int(os.environ["WORLD_SIZE"])

    print("Local rank {} , World Rank {} , World Size {}".format(args.local_rank,args.world_rank,args.world_size))

    return args

def gather(tensor, name="gather tensor"):
    return xm.mesh_reduce(name, tensor, torch.cat)

def main():

    # Retrieve args passed to the training script
    args = parse_args()

    dataset = load_from_disk(args.train_data)

    tokenizer = AutoTokenizer.from_pretrained(args.model_name_or_path)
    
    # tokenizer helper function
    def tokenize(batch):
        return tokenizer(batch['text'], max_length=args.max_length, padding='max_length', truncation=True)
    
    # load dataset
    train_dataset = dataset['train'].shuffle()
    eval_dataset = dataset['test'].shuffle()

    # tokenize dataset
    train_dataset = train_dataset.map(tokenize, batched=True)
    eval_dataset = eval_dataset.map(tokenize, batched=True)

    
    xm.rendezvous("wait_for_everyone_to_reach")

    # Log a few random samples from the training set:

    
    # set format for pytorch
    train_dataset =  train_dataset.rename_column("label", "labels")
    train_dataset.set_format('torch', columns=['input_ids', 'attention_mask', 'labels'])

    if args.world_rank == 0:
        for index in random.sample(range(len(train_dataset)), 3):
            print(f"Sample {index} of the training set: {train_dataset[index]}.")

    eval_dataset =  eval_dataset.rename_column("label", "labels")
    eval_dataset.set_format('torch', columns=['input_ids', 'attention_mask', 'labels'])

    if args.world_rank == 0:
        for index in random.sample(range(len(eval_dataset)), 3):
            print(f"Sample {index} of the training set: {eval_dataset[index]}.")

    
    # Set up distributed data loader
    train_sampler = None
    if world_size > 1: # if more than one core
        train_sampler = DistributedSampler(
            train_dataset,
            num_replicas = args.world_size,
            rank = args.world_rank,
            shuffle = True,
        )
    train_loader = DataLoader(
        train_dataset,
        batch_size = args.per_device_train_batch_size,
        sampler=train_sampler,
        shuffle=False if train_sampler else True,
    )

    if world_size > 1: # if more than one core
        eval_sampler = DistributedSampler(
            eval_dataset,
            num_replicas = args.world_size,
            rank = args.world_rank,
            shuffle = True,
        )

    eval_loader = DataLoader(
        eval_dataset,
        batch_size = args.per_device_eval_batch_size,
        sampler=eval_sampler,
        shuffle=False if eval_sampler else True,
    )


    train_device_loader = pl.MpDeviceLoader(train_loader, device)
    eval_device_loader = pl.MpDeviceLoader(eval_loader, device)
    num_training_steps = args.num_train_epochs * len(train_device_loader)
    progress_bar = tqdm(range(num_training_steps))

    model = AutoModelForSequenceClassification.from_pretrained(args.model_name_or_path)

    model.to(device)
    optimizer = AdamW(model.parameters(), lr=args.learning_rate)

    # Get the metric function
    metric = evaluate.load("accuracy")
    
    for epoch in range(args.num_train_epochs):
        model.train() 
        for batch in train_device_loader:
            batch = {k: v.to(device) for k, v, in batch.items()}
            outputs = model(**batch)
            optimizer.zero_grad()
            loss = outputs.loss
            loss.backward()
            xm.optimizer_step(optimizer) #gather gradient updates from all cores and apply them
            if args.world_rank == 0:
                progress_bar.update(1)
        if args.world_rank == 0:
            print(
            "Epoch {}, rank {}, Loss {:0.4f}".format(epoch, args.world_rank, loss.detach().to("cpu"))
            )
        # Run evaluation after each epochs
        model.eval()
        if args.world_rank == 0:
            print("Running evaluation for the model")
        for eval_batch in eval_device_loader:
            with torch.no_grad():
                batch = {k: v.to(device) for k, v, in eval_batch.items()}
                outputs = model(**batch)
            predictions = outputs.logits.argmax(dim=-1)
            xm.rendezvous("wait_for_everyone_to_reach")
            # Gather predictions and labels from all workers to compute accuracy.
            predictions = gather(predictions)
            references = gather(batch["labels"])
            metric.add_batch(
                predictions=predictions,
                references=references
            ) 
        eval_metric = metric.compute()
        if args.world_rank == 0:
            print(f"epoch {epoch} : Validation Accuracy -: {eval_metric}")
            

    # Save checkpoint for evaluation (xm.save ensures only one process save)
    if args.output_dir is not None:
        xm.save(model.state_dict(), f"{args.output_dir}/checkpoint.pt")
        if args.world_rank == 0:
            tokenizer.save_pretrained(args.output_dir)
    if args.world_rank == 0:
        print('----------End Training ---------------')
    
if __name__ == '__main__':
    main()

1. 在训练脚本中,有几个重要细节值得一提:
较小的 Trainium 实例 (trn1.2xlarge) 包含 2 个神经元内核,而 trn1.32xlarge 包含 32 个神经元内核。为了高效地进行训练,我们需要一种将训练分配到可用神经元内核的机制。我们使用 Pytorch XLA 来达成该目标。PyTorch/XLA 是一个 Python 软件包,使用 XLA 深度学习编译器连接 PyTorch 深度学习框架和 AWS Trainium 等云加速器。您只需几行 XLA 专用代码就可以构建一个新的 PyTorch 神经网络或将现有网络切换到 XLA 设备上运行。

  • 这里,Pytorch/XLA 设备取代了 GPU 设备。由于我们使用的是概率分布,因此需要将 XLA 用作设备来初始化训练,如下图所示。
    device = "xla"
    torch.distributed.init_process_group(device)
  • PyTorch/XLA MpDeviceLoader 用于数据摄取管道。Pytorch/XLA MpDeviceLoader 可以同时执行以下三个步骤:跟踪、编译和数据批量加载到设备。这有助于提高性能。我们需要用 MpDeviceDataLoader 包装 PyTorch 数据加载器,如下所示。
train_device_loader = pl.MpDeviceLoader(train_loader, "xla")
  • 使用 XLA 提供的 API 运行优化步骤,如下所示。这样就可以巩固核心之间的梯度并发出 XLA 设备步长计算。
torch_xla.core.xla_model.optimizer_step(optimizer)

2. S3 中的数据将被复制到训练实例中,其路径将以 SM_CHANNEL_TRAIN 通道下的环境变量形式提供。
3. 传递给训练任务的超参数以参数形式提供。我们将使用 Argparser 在代码中读取参数,如下所示。

parser = argparse.ArgumentParser(description="Finetune a transformers model on a text classification task")

parser.add_argument(
        "--train_data", type=str, default=os.environ["SM_CHANNEL_TRAIN"])

4. 使用数据集 API 从通道路径 SM_CHANNEL_TRAIN 加载数据集。

dataset = load_from_disk(args.train_data)

5. 训练好的模型配置和权重存储在环境变量 SM_MODEL_DIR 提供的路径下。训练完成后,Amazon SageMaker 会将 SM_MODEL_DIR 路径下的文件复制到 S3 存储桶。然后,我们可以使用该模型,将其部署到我们选择的任何硬件上。我们需要确保将模型文件存储在 SM_MODEL_DIR 环境变量提供的路径下。

SageMaker 还提供了一种机制:在提供训练脚本的同时也提供 requirements.txt 文件,从而方便地安装训练所需的其他库。在本例中,我们还需要安装一些库才能使用 transformers 库。复制以下代码段并将其粘贴到笔记本的单元格中,然后执行代码以创建 requirements.txt 文件。

%%writefile requirements.txt

datasets==2.5.2
evaluate==0.3.0
transformers==4.21.0

我们成功创建了训练脚本和 requirements.txt 文件。  

步骤 5:训练机器学习模型

接下来,您将实例化一个 SageMaker 估算器。您将使用 AWS 托管的 PyTorch 估算器来运行自定义脚本。若要实例化 PyTorch 估算器,请复制并粘贴以下代码。

base_job_name = "imdb-sentiment-classification"

hyperparameters = {}

hyperparameters["model_name_or_path"] = "bert-base-uncased"
hyperparameters["seed"] = 100
hyperparameters["max_length"] = 128
hyperparameters["per_device_train_batch_size"] = 8
hyperparameters["per_device_eval_batch_size"] = 8
hyperparameters["learning_rate"] = 5e-5
hyperparameters["max_train_steps"] = 2000
hyperparameters["num_train_epochs"] = 1

接下来,您将实例化一个 SageMaker 估算器。您将使用 AWS 托管的 PyTorch 估算器来运行自定义脚本。若要实例化 PyTorch 估算器,请复制并粘贴以下代码。

pt_estimator = PyTorch(
    entry_point="train.py", 
    source_dir="./",
    role=sagemaker.get_execution_role(),
    instance_count=1,
    instance_type="ml.trn1.2xlarge",
    framework_version="1.11.0",
    py_version="py38",
    disable_profiler=True,
    base_job_name=base_job_name,
    hyperparameters=hyperparameters,
    distribution={"torch_distributed": {"enabled": True}},
)

这里有一些细节需要注意。entry_point 将指向我们在步骤 4 中创建的脚本。  还要注意有一个 source_dir 属性指向当前目录。这是必需的,因为我们需要复制 requirements.txt 文件并将其安装到训练实例中。对于实例类型,我们将其指定为 AWS Trainium,它有两种类型:trn1.2xlarge 和 trn1.32xlarge。在本练习中,我们将使用 trn1.2xlarge 实例。

我们现已创建估算器,需要使用 S3 数据路径调用 fit 方法来启动训练作业。
复制、粘贴并执行下面的单元格,启动训练作业。

pt_estimator.fit({"train": training_input_path})

这将使用 Trn1.2xlarge 实例运行训练。您可以在 Studio 笔记本中查看培训日志。

结论

恭喜您!您已完成使用 AWS Trainium 和 Amazon SageMaker 训练机器学习模型教程

在本教程中,您使用了 AWS Trainium 实例,利用 Amazon SageMaker 训练基于 BERT 的分类模型。AWS Trainium 为大规模机器学习模型训练提供了经济高效的机制,结合 Amazon SageMaker 易于构建机器学习模型的特点,您应当能够快速进行实验和扩展。

后续步骤

若要了解如何在 Service Quotas 控制台上设置 CloudWatch 报警以及如何创建配额请求模板,请查看以下资源。

开始使用亚马逊云科技免费构建

开始使用亚马逊云科技免费构建