亚马逊AWS官方博客

构建基于 Amazon SageMaker FastFileMode 的 LLM 流式训练

摘要

本文展示了如何在 Amazon SageMaker 训练任务的按需临时集群上,利用 SageMaker FastFile Mode(FFM)对存储于 Amazon S3 中的训练数据进行读取,针对 transformers Trainer API 及 train loop 的两种形式分别提供了流式读取训练数据的示例,简化大语言模型的算力基础设施构建。文末提供了性能 benchmark,作为参考。可以看出在大语言模型训练场景,该模式对整体耗时影响极其微弱同时较大简化训练基础设施构建及训练前的等待耗时。

背景

大模型训练对数据的要求巨大。对于中大型的企业,往往会搭建模型训练的平台,在高效地支持多业务线的模型训练需求的同时,避免了按业务线或按团队来置备算力集群的巨大开销。但其带来了挑战是,其算力需要从一个集中式的资源池按需拉取(或多个任务/团队通过队列向同一资源池提交任务)。虽然这可以带来更好的资源利用率,但各任务/团队的私有数据无法持久化在训练机器之上,因此一般会采用底层的共享存储。如 SageMaker 作为 ML 全栈平台,可以直接作为企业的机器学习开发平台,不同业务线或技术团队可以按需地提交训练任务到 SageMaker 集群,同时选择 S3 作为存储层,在训练资源完成分配之后将训练数据拷贝到集群的训练实例上,并且在训练过程中或结束之后将 checkpoint 拷贝到 S3 进行持久化存储。此外在模型开发或调试阶段, 数据量往往在几百 MB 到几个 GB 左右,从 S3 上直接读取会有更高的灵活性,如跨账号访问等需求。

Practice

SageMaker 为训练提供了多种数据读写能力,如 File Mode,Pipe Mode,Fast File Mode (FFM) [1]。其中,SageMaker FFM 在训练任务实例上,以 POSIX 文件系统的形式暴露了一个本地路径,直接映射到预设的 S3 路径。当在训练任务对数据进行消费时,数据会以 FUSE 的形式加载进入管道,训练时可以直接以本地文件路径进行数据读取。

在此基础上,训练代码中的数据集构建部分需要进行流式读取的改造,来避免在训练开始之前对全量数据的加载而导致的全量数据从 S3 到训练实例间拷贝的发生,充分利用 FFM 所提供了按需拉取的优势。

1. 原生 Stream Dataset + Train Loop

基本的基于流式数据加载的模型训练形式,可以参考 HuggingFace [3]。其主要逻辑如下,即通过 load_dataset() 方法,指定 streaming 参数。

from datasets import load_dataset
dataset = load_dataset(..., streaming=True)

完成对于数据加载形式的定义之后,利用 DataLoader 在每次 loop 中进行当前 batch 的加载,完成正/反向计算过程后,进行下一个 batch 的加载和计算。

dataloader = DataLoader(dataset, collate_fn=DataCollator(tokenizer))
for epoch in range(N):
    for i, batch in enumerate(tqdm(dataloader)):
        batch = {k: v.to(device) for k, v in batch.items()}
        outputs = model(**batch)
        ...
        loss.backward()
        optimizer.step()

2. 基于 transformers Trainer API 的流式训练

目前 transformers 库的 Trainer API 由于接口的便捷性,成为了较多大模型训练尤其是 SFT 阶段的常用方案。以上流式 load_dataset 缺乏对于 transformers 库中 Trainer API 进行直接支持的范例。为便于理解及使用,这里以基于 llama-based Stanford_Alpaca [5]项目进行适配为例[7],展示如何用 Stream dataset + Trainer API,来充分利用 SageMaker FFM(Fast File Mode)为训练任务的按需临时集群所提供的 S3 虚拟挂载能力,来简化大模型的调优及训练过程。

首先在 SageMaker Estimator 启动器中,用参数 input_mode='FastFile' 来启动 Fast File Mode 虚拟挂载,避免训练启动前的批量数据拷贝,并在 estimator.fit() 中指定训练数据所在的 S3 路径。

estimator = Estimator(image_uri=...
                      input_mode='FastFile',
                      ...)
                      
estimator.fit({'training': 's3://training-data-s3-path'})

在以下的训练代码中,我们可以对镜像中本地路径/opt/ml/input/training(其中/training 对应 estimator.fit() 中传入的 key 名称,见上例)进行 POSIX Read。在训练启动及数据 load 之前,由于训练数据此时尚未存在于训练实例的存储中,此时可以先获取该文件路径的 metadata。

localpath = pathlib.Path('/opt/ml/input/training')
fileslist = list(localpath.rglob("*"))

SageMaker FFM 为传入的 S3 channel 的 metadata 构建了一个 read-only FUSE (file system in userspace),同时提供每秒 5500 个请求的数据下载能力 [1]。

以中文 wiki 数据集为例,数据文件格式为.jsonlist ,单条数据示例如下:

{
    "id": "5985687", 
    "url": "https://zh.wikipedia.org/wiki?curid=5985687", 
    "title": "可唤醒I/O", 
    "text": "可唤醒I/O\n\n可唤醒I/O(Alertable I/O)是一种重叠I/O,发起I/O请求的线程在可唤醒状态下(alertable state)执行I/O请求的完成例程。也即完成例程作为回调函数(callback function),被这个线程异步过程调用。\n\n线程只有在执行下述API函数之一,并设置适当的参数标记时,才阻塞于可唤醒状态:\n"
}

接下来,使用 datasetsload_dataset() 将前序步骤中获取的 metadata 即文件列表以 Stream 的形式传入。

datafile = {'train': fileslist}
train_data = load_dataset('json', data_files=datafile, split='train', streaming=True)

在对数据集进行 tokenization 时,我们需要提供对应的处理逻辑。这里定义了新的 tokenize_wiki_batch 方法来为 map 接口提供 stream mini-batch 的处理方法。区别于全量加载或逐行加载,入参 list_data_dict 的 k-v 嵌套关系发生了变化,因此调整后处理方式如下。

def tokenize_wiki_batch(
        tokenizer: transformers.PreTrainedTokenizer,
        list_data_dict: Dict[str, Any],
):

    txttemplate = "{title}\n\t\t{text}\n\n"
    titlelist = list_data_dict['title']
    textlist = list_data_dict['text']
    batchlen = len(list_data_dict['text'])

    sources = [f"{tokenizer.bos_token}"] * batchlen
    targets = list(map(lambda x: txttemplate.format_map({'title':x[0],'text':x[1]}), 
                                        list(zip(titlelist, textlist)))
                                        )

    data_dict = preprocess(sources, targets, tokenizer)
    
    input_ids = data_dict["input_ids"]
    labels = data_dict["labels"]

    return {"input_ids": input_ids, "labels": labels}

train_data = train_data.shuffle(buffer_size=10000).map(
    partial(
        tokenize_wiki_batch,
        tokenizer
    )
)

进一步向上追溯至 preprocess() 方法,由于是数据不再是以全量的形式进行整体处理,因此需要将其中的_tokenize_fn() 中涉及到 padding 的部分,调整为模型的最大窗口长度 max_length,而无法如全量处理中仅需要 pad 到当前数据集中的最大长度。

def preprocess(sources, targets, tokenizer):
    ...
    call _tokenize_fn()
    ...

def _tokenize_fn(strings: Sequence[str], tokenizer: transformers.PreTrainedTokenizer) -> Dict:
    tokenized_list = [
        tokenizer(
            text,
            return_tensors="pt",
            # padding="longest",  ## 全量
            padding='max_length', ## Stream mini-batch
            max_length=tokenizer.model_max_length,
            truncation=True,
        )
        for text in strings
    ]

由于数据是流式按需加载,因此在训练启动之前 Trainer API 无法得知数据在何时消耗完。为了避免数据不足导致报错,需要首先根据所预备的数据集大小来计算出训练一个轮次所需的 steps(向下取整),或直接取用一个小于 max avaiable steps 的步数。同时需要据此设置本次训练的 max_step 训练参数,否则会引起报错。最终经过以上调整,可以使用该流式 dataset 直接传入 Trainer API 进行训练。

trainer = Trainer(model=model, tokenizer=tokenizer, args=training_args, dataset, data_collator)
trainer.train()

3. 基于 Train Loop 的文件粒度流式加载

除基于 SageMaker FFM + transformers Trainer API 的组合之外,自定义 Train loop 仍然具备较大的灵活性等优点。因此本方案也针对 train loop 的使用进行了补充。不同于 section 1 中的 load_dataset(streaming=True) 的形式,这里直接采用前文提到的方式获取 FUSE 下的文件列表,并进行 Loop 读取。以开源项目 PandaLLM [6]为例,在 Train Loop 中有:

localpath = pathlib.Path('/opt/ml/input/training')
fileslist = list(localpath.rglob("*"))

for i in range(len(filesdict)):
   _file = filelist[i]
   sub_train_dataset = load_and_cache_examples(_file)
   sub_train_sampler = DistributedSampler(sub_train_dataset)
   sub_train_dataloader = DataLoader(dataset=sub_train_dataset,
                                     sampler=sub_train_sampler,
                                     batch_size=...,
                                     num_workers=...,
                                     pin_memory=True,
                                     drop_last=True
                                    )

   for i, batch in enumerate(sub_train_dataloader):
      model.train() ...

dist.barrier()

其中值得注意的是,在 Dataloader 中启用 num_workerpin_memory 参数来实现多 worker 及异步数据加载,并同时减少数据拷贝开销。当 pin_memory=True 时,数据会被直接拷贝到 pinned memory 并随后拷贝至 GPU,节省了数据先拷贝到 pageable memory 的开销。

Figure1. How to Optimize Data Transfers in CUDA [4]

性能分析

测试环境如下:

机型:1 * p4d.24xlarge (8 * A100 40G)

数据:使用高速 S3 数据 duplicate 工具[8]构建性能测试用数据集

单个文件大小 文件个数 总大小
570K 183960 100GB
100M 1000 100GB

参考[1],这里同样选取两种大小不同(570KB 以及 100M)的文件形式来进行各自 100GB 数据集的构建,数据类型使用中文 LLM 训练常用的 zh_wiki 数据集(.jonsline 格式),测试 SageMaker Training Instance 上的性能。为拆解到每个环节,使用 practice 3 中 train loop 的形式以便进行消融实验。

1. Bulk Transfer

在 SageMaker Training 未开启 Fast File Mode 时,我们可以依赖 SageMaker File Mode 所提供的数据自动下载(为便于测试,性能测试中使用 aws s3 cp 命令来替代),或者 s5cmd [9]来进行训练前的数据下载。测试详情如下:

单个文件大小 文件个数 s3 cp 百 GB 耗时(Sec) s5cmd 百 GB 耗时(Sec)
570K 183960 2075 105
100M 1000 256 27

基于以上数据,假设总数据集总大小为 100G,单个文件 570K 时,s5cmd – 105s (~1GB/s),s3 cp – 2075s (~50MB/s)

单个文件 100M 的情况,s5cmd – 27s (~3.7GB/s),s3 cp – 256s (~400MB/s)。启动训练任务之后常规的文件传输类型需要进行批量传输,而该环节 SageMaker Fast File Mode 无实际的数据加载发生。

2. Load, Tokenization and Batch Distribution

在开始 loop 文件列表之后,并实际进行模型的 forward step 之前,需要进行文件读取、mini-batch load、tokenization、以及数据分发到 GPU 进程等一系列操作。对于前向过程发生之前所有必要数据 op 的汇总耗时如下,

单个文件大小 文件个数 FFM File Loop 平均耗时(Sec) 预估总耗时(Sec) Local File Loop 平均耗时(Sec) 预估总耗时(Sec)
570K 183960 0.6278 115490 0.543 99890
100M 1000 25.86 25860 25.45 25450

* 通过测试多次文件 Loop 的总耗时取得单 Loop 平均耗时,并通过百 GB 文件数量预估百 GB 数据 op 总耗时。

可以看出,叠加 Dataloader 在多 worker 下的异步加载以及 pin memory 开启的性能提升,训练过程中所有必要数据 op 的累计耗时对于本地 NVMe 读取或是 FFM 的 FUSE 读取的性能差异不显著。FUSE 读取在小文件情况下的性能折损相对更多,而在大文件下的整体性能已经基本与本地 NVMe 读取性能一致。

3. Full Train Loop

选取 llama2-7b deepspeed stage1 + gradient checkpointing + per_device_batch_size = 2 + gradient_accumulation_step=2,单个 global step 的训练部分耗时平均约 9.7s。将 100G 文件换算为约 90M 行数据(单条数据平均约 1.14KB),推测可得整个训练部分的耗时为 9.7s * 90M / (8GPU * 2device_batch * 2accumulation_step) 共 27M sec。叠加以上 Bulk Transfer+Data Operation+Training 几个部分,整体对比如下:

单个文件大小 s5cmd 批量传输(Sec) 数据 OP (Sec) 训练(Sec) 累计(Sec) DIFF
NVMe 570K 105 99890 2780000 2879995
100M 27 25454 2780000 2805481
FFM FUSE 570K 0 115490 2780000 2895490 0.54%
100M 0 25856 2780000 2805856 0.01%

Figure2. 完整训练 Loop 的耗时汇总

*增加 deepspeed stage 将进一步增大训练时长,导致数据 op 在整体的占比进一步降低。

**增加 batch size 对于单个 step 训练时长的影响一般显著大于 data loading。

从耗时差异对比整个训练过程,SageMaker FFM 及流式按需读取,对比批量传输及本地读取对整体性能的影响可以忽略。

总结

上文介绍了利用 SageMaker Fast File Mode 按需读取的数据接入形式,针对 stanford alpaca 项目对 dataset map 进行轻量改造为例使得流式数据加载适配 transformers Trainer API,以及以 file loop + train loop 训练形式为例,以文件粒度的流式加载进行大模型训练的两种方案。

  1. 对于数据集中单一文件过大无法一次性加载到内存,导致不得不使用流式数据加载的情形,提供了 Stream Dataset 与 transforms Trainer API 进行集成的改造示例。
  2. 利用 SageMaker Fast File Mode,在对训练耗时影响可忽略的前提下,避免了训练启动前的数据批量拷贝及等待过程,进一步提升了在机器学习平台/大模型训练平台上的训练及模型调试等任务的体验。

同时相关几点建议如下:

  1. 对于语言类大模型训练等比较典型的模型较大而 GPU 对一个 batch 的前向和后向计算慢于数据的网络传输和 batch 的准备的场景,或基于集群的模型开发调试等场景,使用 SageMaker FFM + 流式 batch 或文件加载的形式可以明显获得架构简化及启动等待耗时节省的优势。
  2. 同时,对于文本形式的数据集来说,建议先用低成本的算力进行数据侧的清洗及处理,如对小文件进行合并,或将 tokenization 等过程从训练 pipeline 拆解至低成本算力进行处理等。对于后续的无论是批量传输或流式按需读取加载,均可以达到更好的整体性能及性价比。
  3. 对于图像类的模型训练如 resnet,由于其图片训练集是天然的小文件无法进行合并处理,同时模型的参数量(或可训练参数量)较小,GPU 对一个 batch 的前向和后向计算快于或可比于数据的网络传输和 batch 的准备等场景,建议使用 FSx for Lustre 高速存储直接挂载来进行加速,以获得更高的训练 ROI。

参考

[1] SageMaker训练的数据源选型 – https://aws.amazon.com/cn/blogs/machine-learning/choose-the-best-data-source-for-your-amazon-sagemaker-training-job/

[2] SageMaker LLM训练最佳实践 – https://aws.amazon.com/cn/blogs/machine-learning/training-large-language-models-on-amazon-sagemaker-best-practices/

[3] Stream Dataset介绍 – https://huggingface.co/docs/datasets/stream

[4] GPU数据传输优化 – https://developer.nvidia.com/blog/how-optimize-data-transfers-cuda-cc/

[5] Stanford Alpaca – https://github.com/tatsu-lab/stanford_alpaca/blob/main/train.py

[6] 中文开源大语言模型PandaLLM – https://github.com/dandelionsllm/pandallm

[7] SageMaker LLM训练Sample Code – https://github.com/haozhx23/Alpaca-Llama2-Multinode-on-SageMaker

[8] S5cmd加速传输工具 – https://github.com/peak/s5cmd

[9] 测试数据集构造工具 – https://github.com/IAmSomeoneLikeYou/object_duplicate

本篇作者

郑昊

AWS GCR AI/ML Specialist SA。主要专注于 Language Model 的训练及推理、搜推算法及系统基于 AWS AI/ML 技术栈的相关优化及方案构建。在阿里,平安有多年算法研发经验。

王大伟

亚马逊云科技高级存储解决方案架构师,负责数据与存储架构设计,先后服务于 EMC、NetApp 等公司,具有十五年从事数据与存储相关经验,目前主要关注在人工智能与机器学习、高性能计算领域,如 EDA、自动驾驶、基因分析等领域的存储设计与优化。