亚马逊AWS官方博客

在 Amazon SageMaker AI 上使用 TorchRec 构建大规模推荐模型—模型训练篇

背景

随着深度学习技术的迅速发展,推荐系统已经从早期的协同过滤和矩阵分解方法,演进为以深度神经网络为核心的复杂架构。这些现代推荐系统在处理大规模稀疏特征时,面临着严峻的技术挑战——模型需要同时管理数以亿计的用户和物品特征,如何在分布式环境中高效地存储、训练和优化这些模型,成为推荐系统实践中的关键问题。现有的框架如 TensorFlow Recommenders 虽然为推荐系统提供了灵活的解决方案,但在处理超大规模稀疏特征和分布式训练方面仍存在局限。与之相比,PyTorch 中提供的 TorchRec 专门针对大规模推荐系统设计,通过将大规模稀疏特征 Embedding 分片存放在 GPU 上,并结合 GPU 的并行处理能力实现了更高效的解决方案。

本文将介绍如何在 Amazon SageMaker AI 平台上使用 TorchRec 构建和训练大规模推荐模型,并介绍不同配置下的模型训练性能对比。后续我们将发布第二篇文章,分享 TorchRec 模型推理相关的实践经验。

TorchRec 介绍

TorchRec 是 Meta 开源的 PyTorch 扩展库,提供了大规模推荐系统所需的常用稀疏计算和并行处理的基础组件。它的核心优势在于能够高效处理推荐系统中常见的超大规模稀疏特征,特别是将大型嵌入表分片到多个 GPU 上进行并行训练,而不是像传统推荐系统那样从参数服务器(PS)上拉取/同步参数,可以更加高效使用 GPU 的算力资源,目前已在多个大型科技公司的线上推荐系统中得到应用,例如 Meta 在 Instagram 业务上训练并上线了一个 1250 亿参数的模型。其典型的场景包括:视频推荐、书籍推荐、广告推荐、社交推荐等,在百亿规模模型参数时可以做到即插即用,不需要二次开发的成本即可成倍提升训练性能。

其主要特性如下:支持数据并行及模型并行的原语及混合使用,轻松实现大规模多设备/多节点模型训练;支持多种 Embedding 分片策略,包括 data-parallel、table-wise、row-wise、column-wise 等,可通过分片规划器自动根据带宽、显存、模型大小等决定分配策略,也支持手动调整分片方式;通过优化后的训练流水线提高训练效率;支持量化训练及推理等。

TorchRec 训练过程

同其它深度学习的训练过程类似,在 TorchRec 中,其训练可以拆解为预处理、前向计算、反向传播、参数更新几个部分。在当前的实现中,这几个阶段,除了预处理外,其它都需要顺序的执行。其中:

预处理阶段:系统对输入特征数据进行预处理和分发,将数据从数据并行格式转换为模型并行格式(如按表、行或列进行分片)。此过程支持流水线操作,与后续阶段部分重叠执行,以提高效率。

前向计算阶段:模型先进行 Embedding 查找,分布式环境下通过 all-to-all 通信将查询索引分发到各个 GPU,拉取对应的 Embedding 向量。随后,将 Embedding 与其他输入结合,通过底层和顶层的 MLP(多层感知机)进行特征计算。

反向传播阶段:先计算 MLP 的梯度,然后进行 Embedding 的反向计算,包括梯度的转换和累积。通过 all-reduce 操作在各个 GPU 间同步梯度信息。

参数更新阶段:最后更新模型参数,包括 Embedding 表的更新和 MLP 参数的优化。

Amazon SageMaker AI 介绍

Amazon SageMaker AI 作为一个端到端机器学习平台,提供了丰富的功能模块,包括数据预处理、模型训练/调优、模型部署/监控、MLOps 等。在高参数量推荐模型训练的场景下,Amazon SageMaker 具有明显的优势:

  • 提供托管的 Training Job 支持。TorchRec 1.0 版本需要 PyTorch 2.5 的训练环境,SageMaker 内置的 Deep Learning Container(DLC)已经提供了对应的版本,用户只需要提供自己的训练代码及依赖文件(requirements.txt)即可快速启动训练作业,无需定制训练环境及管理底层的基础设施。
  • 内置的分布式模型训练支持。SageMaker Training Job 支持通过 API 参数控制训练需要的实例类型及数量,并提供相关的环境变量帮助脚本识别当前的实例信息以初始化集群。
  • 内置的 Elastic Fabric Adapter(EFA)支持。在多机分布式训练任务中,TorchRec 会产生大量的跨机通信,如果实例的跨机带宽无法满足要求则会导致训练效率的下降,甚至出现多机训练效率低于单机的情况。Amazon EFA 通过专门的软硬件优化,实现了非常高效的跨机通信效率。同时,对部分高端 GPU 实例(例如 p4d, p4de, p5) 提供 GPUDirect RDMA 功能支持,进一步提高通信效率。更多 EFA 信息请参考官方文档
  • 多种数据源支持。SageMaker 支持从多个数据源读取训练数据,例如:S3、FSx for Lustre、EFS、local NVMe SSD 等。不同的数据读取方式效率及成本均不同,需要综合衡量选择。比较常用的选项包括:当数据量不大的时候可以从 S3 直接下载数据(File Mode 或者 Fast File Mode)、如果训练数据量较大,需要大量时间下载或者无法在单机上存放完整副本,则可以使用 S3 + FSx for Lustre 的模式,在 S3 上存储完整的数据,并通过 FSx for Lustre 提供高效的数据访问。

通过 SageMaker AI 提交训练作业

我们只需要通过简单的 SageMaker API 即可实现训练作业的提交及执行,相关计算资源会自动被拉起并做好显卡、网卡等驱动初始化,数据自动下载等任务。

这里我们展示了在 SageMaker Notebook 环境上提交作业的方式,示例代码如下:

import os
import sagemaker
import boto3
from sagemaker.pytorch import PyTorch

sagemaker_session = sagemaker.Session()
role = sagemaker.get_execution_role()

environment = {
    'DATA_S3_PATH': 's3://bucket/path/',  # 训练数据路径,可以通过 s5cmd 提高数据下载效率
    'FI_PROVIDER': "efa",  # 启用 EFA 网卡
    'NCCL_PROTO': 'simple',
    'FI_EFA_USE_DEVICE_RDMA': "1",
    'NCCL_LAUNCH_MODE': "PARALLEL",
    'NCCL_NET_SHARED_COMMS': "0"
}

estimator = PyTorch(
    entry_point="start.py",
    framework_version="2.5.1",  # 使用 PyTorch 2.5.1
    py_version="py311",
    source_dir="./code",  # 代码目录,会被自动打包上传到训练容器中
    hyperparameters=hyperparameters,
    role=role,
    instance_count=2,  # 期望使用的训练实例数量
    instance_type="ml.p4d.24xlarge",
    keep_alive_period_in_seconds=600,  # 启用 SageMaker warmpool
    environment=environment,  # 设置环境变量
    base_job_name="torchrec-1d-bs8k",
    sagemaker_session=sagemaker_session,
    debugger_hook_config=False,
    disable_output_compression=True,
    enable_remote_debug=True,  # 启用远程调试
)

estimator.fit(wait=False)  # 提交训练作业,wait表示不阻塞当前会话
Python

其中:

  • entry_point 代表训练代码的入口函数,可以是shell脚本、python脚本或者其它可执行文件
  • source_dir 代表代码目录,目录中的代码会被自动打包上传到训练容器
  • framework_versionpy_version 用来选择训练容器环境,这里代表使用 Python 3.11 及 PyTorch 2.5.1。
  • environment 表示需要传递给容器的环境变量,例如 FI_PROVIDER: efa 表示启用 EFA 网卡作为跨机通信设备,提供 GPUDirect RMDA 功能,提高通信效率。

模型训练实践经验

在实际运行分布式模型训练作业时,我们还需要考虑数据格式、数据读取效率、适配模型结构、调整模型训练参数、选择合适的实例类型等。

在 TorchRec 上适配模型结构

要使用 TorchRec 加速大规模 Embedding训练,需要遵循以下三个核心步骤:配置 Embedding 参数、使用 DistributedModelParallel 进行分片、构建训练 Pipeline。接下来,我们提供了一个简单的示例:其中使用用户ID和商品ID两个特征表,并基于 TorchRec 内置的 DLRM_DCN 模型来构建训练 pipeline。

  1. 配置 Embedding 参数

Pytorch 通过 torch.nn.Embedding 和 torch.nn.EmbeddingBag 来表示嵌入。EmbeddingBag 是 Embedding 的池化版本,TorchRec 通过创建 Embedding Collection 来扩展这些模块。

这里,我们首先需要使用 EmbeddingBagConfigEmbeddingBagCollection 来配置 Embedding 的基本参数:

Python
from torchrec import EmbeddingBagConfig, EmbeddingBagCollection

# 假设我们有两个特征表:用户ID和商品ID
EMBEDDING_DIM = 16  # embedding维度
USER_NUM = 100000   # 示例用户数量
ITEM_NUM = 50000    # 示例商品数量

# 构建embedding配置字典
eb_configs = {}

# 用户ID的embedding配置
eb_configs["user_embedding"] = EmbeddingBagConfig(
    name="user_embedding",
    embedding_dim=EMBEDDING_DIM,
    num_embeddings=USER_NUM,
    feature_names=["user_id"]
)

# 商品ID的embedding配置
eb_configs["item_embedding"] = EmbeddingBagConfig(
    name="item_embedding",
    embedding_dim=EMBEDDING_DIM,
    num_embeddings=ITEM_NUM,
    feature_names=["item_id"]
)

# 定义 EmbeddingBagCollection
ebc = EmbeddingBagCollection(
    tables=eb_configs
    device=torch.device('meta'),
)
Python
  1. 使用 DistributedModelParallel 进行分片

PyTorch 提供了 DistributedModelParallel API 来帮助用户进行模型分片(Sharding),默认情况下:

  • 只对 embedding 部分进行模型并行(model parallel)
  • 对于 dense 层部分采用数据并行(data parallel)
  • 提供接口支持自定义并行策略
    Python
    # 定义模型参数
    DENSE_IN_FEATURES = 100  # 密集特征输入维度
    DENSE_ARCH_LAYERS = [20, 64]  # 密集层架构
    OVER_ARCH_LAYERS = [5, 1]  # 顶层架构
    DCN_NUM_LAYERS = 2  # DCN层数
    DCN_LOW_RANK_DIM = 8  # DCN低秩维度
    
    # 定义模型结构
    train_model = DLRM_DCN(
        embedding_bag_collection=ebc,
        dense_in_features=DENSE_IN_FEATURES,
        dense_arch_layer_sizes=DENSE_ARCH_LAYERS,
        over_arch_layer_sizes=OVER_ARCH_LAYERS,
        dcn_num_layers=DCN_NUM_LAYERS,
        dcn_low_rank_dim=DCN_LOW_RANK_DIM,
    )
    
    model = DistributedModelParallel(
        module=train_model,
        device=device,
        plan=plan
    )
    
    Python
  1. 构建训练 Pipeline

使用 TrainPipelineSparseDist 来构建训练 pipeline,这个组件负责在训练过程中处理数据流和计算流程。

train_pipeline = TrainPipelineSparseDist(
    model,
    optimizer,
    device
)
Python

自定义数据加载器

在推荐系统中,数据通常具有稀疏特征和高维特征的特点,TorchRec 针对这类数据设计了 JaggedTensor 和 KeyedJaggedTensor 这两类数据结构。原始数据一般都是规则的列表,由于 TorchRec 训练时对数据的读写速度要求比较高,如果训练时才将这些列表转换为 KeyedJaggedTensor 会极大地降低训练速度,因此,有必要先将原始数据处理成方便读取和转换为 KeyedJaggedTensor 的格式。

为此,我们设计了一套数据处理和加载的方法,首先将原始列表转换为二进制文件,在数据加载时,通过定位每个 batch 在二进制文件中的位置按需来读取数据和生成 KeyedJaggedTensor,而不需要将所有数据都加载到内存里。此外,通过引入 length 和 cum_length,我们设计的方法也支持变长特征。所设计的数据处理方法和数据加载器的核心代码分别如下,完整代码请参考文末的代码库链接。

  1. 数据处理核心代码

我们将分类特征、连续特征、标签分别存成不同的二进制文件,对于分类特征,我们还存储 length 和 cum_length 以方便在数据读取时定位每一个 batch 所对应的特征位置。

def process_and_save(df, output_dir):        
    cat_value_columns = []
    cat_length_columns = []
    cont_value_columns = []
    label_columns = []

    for column in df.columns:
        if 'hashed' in column:
            cat_value_columns.append(column)
        elif 'length' in column:
            cat_length_columns.append(column)
        elif column in DEFAULT_INT_NAMES:
            cont_value_columns.append(column)
        elif column in DEFAULT_LABEL_NAMES:
            label_columns.append(column)
            
    cat_values = df[cat_value_columns].to_numpy()
    flattened_cat_values = cat_values.transpose(1, 0).reshape(-1).astype(np.int64)
    cat_values_bytes = flattened_cat_values.tobytes()
    cat_value_save_path = os.path.join(output_dir, 'cat_value.bin')
    with open(cat_value_save_path, 'wb') as f:
        f.write(cat_values_bytes)

    cat_lengths = df[cat_length_columns].to_numpy()
    flattened_cat_lengths = cat_lengths.transpose(1, 0).reshape(-1).astype(np.int32)
    cat_lengths_bytes = flattened_cat_lengths.tobytes()
    cat_lengths_save_path = os.path.join(output_dir, 'cat_length.bin')
    with open(cat_lengths_save_path, 'wb') as f:
        f.write(cat_lengths_bytes)

    flattened_cat_cum_lengths = np.cumsum(flattened_cat_lengths.astype(np.int64))
    cat_cum_lengths_bytes = flattened_cat_cum_lengths.tobytes()
    cat_cum_length_save_path = os.path.join(output_dir, 'cat_cum_length.bin')
    with open(cat_cum_length_save_path, 'wb') as f:
        f.write(cat_cum_lengths_bytes)

    cont_values = df[cont_value_columns].to_numpy()
    flattened_cont_values = cont_values.reshape(-1).astype(np.float32)
    cont_values_bytes = flattened_cont_values.tobytes()
    cont_value_save_path = os.path.join(output_dir, 'numerical.bin')
    with open(cont_value_save_path, 'wb') as f:
        f.write(cont_values_bytes)

    labels = df[label_columns].to_numpy().reshape(-1).astype(np.int32)
    labels_bytes = labels.tobytes()
    labels_save_path = os.path.join(output_dir, 'label.bin')
    with open(labels_save_path, 'wb') as f:
        f.write(labels_bytes)

    return df
Python
  1. 数据加载器核心代码

首先,分别获取分类特征、连续特征、标签的二进制文件标识符;然后,基于 batch idx 去读取对应的数据,由于标签和连续特征都是固定长度,它们可以直接定位每个 batch 的位置,分类特征可以基于 label、length、cum_len 来定位每个 batch 的位置。

class VarParametricDataset(Dataset):  # Var stands for variable length
    def __init__(
        self,
        binary_file_path: str,
        batch_size: int,
        drop_last_batch,
        **kwargs,
    ):
        self._batch_size = batch_size

        # Load categorical
        self._num_categorical_features = len(DEFAULT_CAT_NAMES)
        self._label_file = None
        self._categorical_feature_file = None
        self._categorical_length_file = None
        self._categorical_cum_length_file = None
        
        self._length_bytes_per_batch = np.dtype(np.int32).itemsize * batch_size
        self._cum_length_bytes_per_batch = np.dtype(np.int64).itemsize * batch_size

        self._feature_bytes_per_item = np.dtype(np.int64).itemsize

        path_to_open = os.path.join(binary_file_path, f"cat_value.bin")
        self._categorical_feature_file = os.open(path_to_open, os.O_RDONLY)

        path_to_open = os.path.join(binary_file_path, f"cat_length.bin")
        self._categorical_length_file = os.open(path_to_open, os.O_RDONLY)

        path_to_open = os.path.join(binary_file_path, f"cat_cum_length.bin")
        self._categorical_cum_length_file = os.open(path_to_open, os.O_RDONLY)

        # Load numerical
        bytes_per_feature = {}
        for name in DEFAULT_INT_NAMES:
            bytes_per_feature[name] = np.dtype(np.float32).itemsize

        self._numerical_bytes_per_batch = (
            bytes_per_feature[DEFAULT_INT_NAMES[0]]
            * len(DEFAULT_INT_NAMES)
            * batch_size
        )

        path_to_open = os.path.join(binary_file_path, "numerical.bin")
        self._numerical_features_file = os.open(path_to_open, os.O_RDONLY)
        batch_num_float = (
            os.fstat(self._numerical_features_file).st_size
            / self._numerical_bytes_per_batch
        )

        # Load labels
        self._label_bytes_per_batch = np.dtype(np.int32).itemsize * len(DEFAULT_LABEL_NAMES) * batch_size
        path_to_open = os.path.join(binary_file_path, "label.bin")
        self._label_file = os.open(path_to_open, os.O_RDONLY)

        batch_num_float = (
            os.fstat(self._label_file).st_size / self._label_bytes_per_batch
        )

        # number of bytes per feature for cum_length and length
        self._cum_length_bytes_per_feature = os.fstat(self._categorical_cum_length_file).st_size // self._num_categorical_features
        self._length_bytes_per_feature = os.fstat(self._categorical_length_file).st_size // self._num_categorical_features

        # number of batches means the ALL data for all ranks
        number_of_batches = (
            math.ceil(batch_num_float)
            if not drop_last_batch
            else math.floor(batch_num_float)
        )
        # for this data_loader, we should divide the num_batch by world_size
        self._num_entries = number_of_batches

        self._executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)

    def __len__(self):
        return self._num_entries

    def __getitem__(self, idx: int):
        """Numerical features are returned in the order they appear in the channel spec section
        For performance reasons, this is required to be the order they are saved in, as specified
        by the relevant chunk in source spec.
        Categorical features are returned in the order they appear in the channel spec section
        """

        if idx >= self._num_entries:
            raise IndexError()

        return self._get_item(idx)

    def _get_item(
        self, idx: int
    ) -> Tuple[torch.Tensor, Optional[torch.Tensor], Optional[torch.Tensor]]:
        click = self._get_label(idx)
        numerical_features = self._get_numerical_features(idx)
        categorical_features, lengths = self._get_categorical_features(idx)

        return categorical_features, lengths, numerical_features, click

    def _get_label(self, idx: int) -> torch.Tensor:
        raw_label_data = os.pread(
            self._label_file,
            self._label_bytes_per_batch,
            idx * self._label_bytes_per_batch,
        )
        array = np.frombuffer(raw_label_data, dtype=np.int32)
        all_labels = torch.tensor(array, dtype=torch.int32).view(-1, len(DEFAULT_LABEL_NAMES))
        train_label_ids = [id for id, label in enumerate(DEFAULT_LABEL_NAMES) if label in TRAIN_LABEL_NAMES]
        train_labels = all_labels[:, train_label_ids]
        return train_labels

    def _get_numerical_features(self, idx: int) -> Optional[torch.Tensor]:
        if self._numerical_features_file is None:
            return None

        raw_numerical_data = os.pread(
            self._numerical_features_file,
            self._numerical_bytes_per_batch,
            idx * self._numerical_bytes_per_batch,
        )
        array = np.frombuffer(raw_numerical_data, dtype=np.float32)
        return (
            torch.from_numpy(array).to(torch.float32).view(-1, len(DEFAULT_INT_NAMES))
        )

    def _get_categorical_features(self, idx: int) -> Optional[torch.Tensor]:
        if self._categorical_feature_file is None:
            return None

        categorical_features = []
        lengths = []
        
        for feature_id in range(self._num_categorical_features):
            cum_length_data = os.pread(
                self._categorical_cum_length_file,
                self._cum_length_bytes_per_batch,
                feature_id * self._cum_length_bytes_per_feature + idx * self._cum_length_bytes_per_batch,
            )
            cum_length = np.frombuffer(cum_length_data, dtype=np.int64)

            length_data = os.pread(
                self._categorical_length_file,
                self._length_bytes_per_batch,
                feature_id * self._length_bytes_per_feature + idx * self._length_bytes_per_batch,
            )
            length = np.frombuffer(length_data, dtype=np.int32)

            feature_offset = (cum_length[0]-length[0]) * self._feature_bytes_per_item
            feature_bytes_batch = (cum_length[-1]-(cum_length[0]-length[0])) * self._feature_bytes_per_item

            raw_cat_data = os.pread(self._categorical_feature_file, feature_bytes_batch, feature_offset)
            feature = np.frombuffer(raw_cat_data, dtype=np.int64)

            length = torch.tensor(length, dtype=torch.int32).view(-1)
            lengths.append(length)

            feature = torch.tensor(feature, dtype=torch.int64).view(-1)
            categorical_features.append(feature)
        
        lengths = torch.cat(lengths)
        categorical_features = torch.cat(categorical_features)

        return categorical_features, lengths
Python

模型训练机型信息

在本次实践中,我们对比分析了如下几款机型,使用到的 GPU 卡包括24GB 显存的A10G、48GB 显存的 L40s、40GB 显存的 A100、80G 显存的 H100。这几款机型在 GPU 算力、网络带宽、总显存上均有较大差异,此外,除 A100、H100 机型外,其余均未提供 NVSwitch 及 GPUDirect RMDA 支持,对单机多卡、多机多卡分布式支持效果均比较一般,从后续的实验数据中也可以观察到类似情况。

实例类型 vCPU 实例内存 (GiB) GPU 配置 GPU 内存 (GB) 网络带宽(Gbps GPUDirect RDMA GPU 对等(NVSwitch) 实例存储 (GB) EBS 带宽 (Gbps)
g5.48xlarge 192 768 8 A10G 192 100 ENA 和 EFA 2 个 3800 NVMe SSD 19
g6e.48xlarge 192 1536 8 L40s 384 400 ENA 和 EFA 7600 NVMe SSD 60
p4d.24xlarge 96 1152 8 A100

320

HBM2

400 ENA 和 EFA 600 GB/s 8 个 1000 NVMe SSD 19
p5.48xlarge 192 2048 8 H100 640
HBM3
3200 ENA 和 EFA 900 GB/s 8 个 3.84 NVMe SSD 80

实验数据

在这里,我们对比分析了不同实例类型、不同存储类型、单机/多机分布式配置下的训练性能,并分析了不同机器间带宽对训练性能的影响。数据量说明:在实验中,1 小时数据量约为 1 千 4 百万条记录,占用 300GB 存储;1 天数据量约为 3.5 亿条记录,占用 5TB 存储。实验数据如下:

单机性能对比

这里我们对比了不同 GPU 配置下的性能表现,在测试中,我们均采用的是机器自带的本地 NVMe SSD 硬盘,提供了较快的读取速度:

实例及数量 文件系统 数据量 batch(每卡) workers 预取因子 总耗时(分钟)
g5.48x * 1 NVMe 1天 8192 8 8 168
g6e.48x * 1 NVMe 1天 8192 8 8 90
p4d.24x * 1 NVMe 1天 8192 8 8 12.6
p4d.24x * 1 NVMe 1天 8192 4 8 12.5
p5.48x * 1 NVMe 1天 8192 4 4 11.9

可以看到在单机情况下训练速度 p5 ≈ p4 >> g6e > g5 机型,分析如下:

  • g5 中的显卡型号为 24GB 显存的 A10g,在单卡算力上较其它机型差异较大,因此耗时也是最久。
  • 对比 g6e 中的 48GB 显存 L40s 显卡与 p4d 中的 40GB 显存 A100 显卡,虽然 L40s 单卡性能参数(例如 FP32、FP16)均超越了 A100,但由于其缺少高速 NVLink 链路,因而在需要密集卡间通信的分布式多卡场景下速度上大打折扣,更适合单机的训练及推理任务。
  • 虽然 p5 中的 80GB H100 显卡算力比 A100 更强,显存也更高,且都支持 900 Gbps 节点内卡间带宽,但速度上却并未有太多提升,我们分析由于 TorchRec 的流水线并行度还有待提升,导致训练效率在单机上无法进一步提升太多,但多机性能 p5 有更好表现。

不同存储方案性能对比

为了量化分析本地 NVMe SSD 设备、FSx for Lustre 两种存储的性能对训练时间长度的影响,我们做了如下实验:

  • 通过使用单台 p4d.24xlarge 机器,分别从不同的数据源读取 1 小时的训练数据(约 3 亿条数据, 300GB 存储)
  • 分析 FSx for Lustre 中不同条带(Strip)配置对训练时间的影响,设置两组实验,条带数分别为 10、20,分别对应 12TB 及 24TB 的预申请存储空间。
实例及数量 文件系统 数据量 batch(每卡) workers 预取因子 总耗时(分钟) 备注
p4d.24x * 1 NVMe 1小时 8192 8 8 0.73
p4d.24x * 1 FSx 1小时 8192 8 8 2.5 Strip = 10
p4d.24x * 1 FSx 1小时 8192 8 8 1.9 Strip = 20
p4d.24x * 2 FSx 1小时 8192 8 8 1.08 Strip = 20

在单机模式下,我们可以看到无疑本地 NVMe SSD 可以提供更快的数据读取效率,但随着条带数量的增加,FSx for Lustre 读取性能也会随之提高,同时,在多机下训练速度也有更为明显的提升。

此外,我们还需要考虑到:1)总的训练时间也包括数据下载时间,但 FSx for Lustre 提供了共享存储,可以省去数据预先下载的时间。2)单机的存储空间有限,如果需要运行更大数据规模的训练任务,单机则无法存放完整的数据,需要使用 FSx for Lustre 这种共享存储或者修改 DataLoader 的数据加载逻辑。3)如果原始数据存放在 S3 上,我们建议使用 s5cmd 来加速文件从 S3 的下载过程,实测 5TB 数据需要约 20 分钟完成下载到本地 NVMe SSD。

多机性能对比

由于 g5 的单卡性能太弱,因此,我们仅针对 g6e, p4d, p5 机型进行多机实验。实验数据如下:

实例及数量 文件系统 数据量 batch(每卡) workers 预取因子 总耗时(分钟) 备注
g6e.48x * 1 NVMe 1天 8192 8 8 90.1
g6e.48x * 2 NVMe 1天 8192 8 8 100.2
p4d.24x * 1 NVMe 1天 8192 4 8 12.5
p4d.24x * 2 NVMe 1天 8192 4 8 33 未启用 EFA
p4d.24x * 2 NVMe 1天 8192 8 8 12
p5.48x * 1 NVMe 1天 8192 4 4 11.9
p5.48x * 2 NVMe 1天 8192 4 8 4.9

可以看到:

  • g6e 在多机情况下训练速度反而有下降,这主要是受到 GPU 卡之间通信带宽不足的拖累;
  • p5 多机训练效率明显高于单机,具有显著的加速比,因为其提供了高达2 Tbps 的跨机通信带宽,从硬件层面保证了通信效率;
  • p4d 在不启用 EFA 的情况下,训练速度同样下降明显、p4d 在启用 EFA 的情况下,速度和单节点接近,分析同样是受限于机器间的通信带宽。这是因为 TorchRec 分布式训练对通信需求很高,不使用 EFA 会导致跨机器通信效率下降,我们会通过下面的机器间网络流量分析来做进一步的定量分析。

此外,我们也尝试了使用不同的 Embedding 分片数量,包括双机 8 分片,双机 16 分片,测试结果表明这两种配置项对训练速度没有明显的影响。

机器间网络流量分析

为了量化评估多机之间的网络通信量,我们在 p5.48xlarge 上通过读取 EFA 设备的监控数据,绘制如下统计图:

其中,每 10s 中统计采样一个数据点,并计算此时间段内的平均速率。可以看到,流经 EFA 设备的通信带宽约为 1.3 Tbps,约为 p5.48xlarge 机器间带宽(3.2 Tbps)的 40%,但远超过 p4d.24xlarge 设备所提供的最大带宽 400 Gbps。

总结

实践经验总结:

  • 数据格式及文件大小:我们建议使用更为紧凑的二进制文件,同时使用合适的文件大小以提升数据读取效率;
  • 存储:本地 NVMe SSD 可以提供较好的数据访问速度及低延迟,但也要考虑数据下载时间及容量限制。如果原始数据存放在 S3 上,我们建议使用 s5cmd 来加速文件从 S3 的下载过程。如果本地无法存放完整数据,则需要考虑共享存储、或者定制 DataLoader。
  • 机型:在当前的软件版本下,如果追求训练速度,则可以优先选择多台 p5.48xlarge 进行训练,其它情况可以考虑使用 p4d.24xlarge。你也需要根据自己的数据量及情况选择合适的机型。

其它注意事项:

  • 分布式训练时,数据读取的顺序要保证一致性,否则可能会出现多机器与单机训练指标不一致的情况。
  • 启用 EFA 时,如果选择启用 VPC 网络,则安全组需要放行进/出两个方向的流量,源、目标为当前安全组 id 的流量;否则,不需要额外配置安全组信息。
  • 使用 FSx for Lustre 时,在拓展存储空间后,条带数量不会发生变化,需要通过 lfs migrate 命令或者复制文件来重新平衡条带。

后续优化方向:

  • 使用多台机器进行训练的时候,当前的 Embedding 分片策略会产生很大的跨机通信,需要考虑更优的 Embedding 分片及缓存策略降低跨机之间的通信量,以提高分布式训练的效率及资源利用率。
  • 我们看到 TorchRec 目前已经支持了 fp8 精度的训练及推理,后续可以验证下此精度下的训练速度及效果影响。
  • 优化不同实验中的参数项,例如更加深入地分析不同 batch size、worker、prefetch 配置下,对性能、训练效率的影响。

完整示例代码:https://github.com/Chevolier/torchrec-lab


*前述特定亚马逊云科技生成式人工智能相关的服务仅在亚马逊云科技海外区域可用,亚马逊云科技中国仅为帮助您了解行业前沿技术和发展海外业务选择推介该服务。

本篇作者

张闯

亚马逊云科技应用科学家,长期从事生成式 AI、自然语言处理、计算机视觉等领域的研究和开发工作。支持 GenAI 实验室项目,在大语言模型、多模态模型、强化学习、智能客服、内容安全等方向有丰富的算法开发以及落地实践经验。

陈斌

亚马逊云科技生成式 AI 产品技术专家,负责基于亚马逊云科技的生成式 AI 产品解决方案的咨询和设计。

王鹤男

亚马逊云科技资深应用科学家,负责生成式 AI 实验室,在生成式 AI 领域有丰富的实践经验,对于大语言模型、文生图模型、多模态模型等都有研究和应用,熟悉计算机视觉、自然语言处理、传统机器学习模型等领域,领导了首汽约车语音降噪、LiveMe 直播场景反欺诈等项目,为企业客户提供云上的人工智能和机器学习赋能。曾任汉迪推荐算法工程师,神州优车集团人工智能实验室负责人等职位。