亚马逊AWS官方博客

在 Amazon SageMaker 管道模式下使用 Horovod 实现多 GPU 分布式训练

Original URL: https://aws.amazon.com/cn/blogs/machine-learning/multi-gpu-and-distributed-training-using-horovod-in-amazon-sagemaker-pipe-mode/

 

当前,我们可以使用多种技术通过少量数据训练出深度学习模型,具体包括针对图像分类任务的迁移学习、少样本学习甚至是一次性学习等,也可以基于预训练的BERT或GPT2模型对语言模型进行微调。但是,在部分应用用例中我们仍然需要引入大量训练数据。例如,如果当前图像与ImageNet数据集内的图像完全不同,或者当前语言语料库只针对特定领域、而非通用类型,那么单凭迁移学习将很难带来理想的模型性能。作为深度学习研究人员,您可能需要从零开始尝试新的思路或方法。在这种情况下,我们必须使用大型数据集训练出大型深度学习模型;在找不到最佳训练方法的情况下,整个过程可能需要几天、几周甚至是几个月。

在本文中,我们将一同了解如何在Amazon SageMaker的单一实例之上运行多GPU训练,并讨论如何在Amazon SageMaker上实现高效多GPU与多节点分布式训练。

Horovod基础知识

在使用大量数据进行模型训练时,最好是将训练作业分配给多个GPU(单一实例或者多个实例)。深度学习框架提供内置方法以支持多GPU训练或分布式训练。但除此之外,还有另外一种实现方法,即直接使用分布式深度学习框架(例如Horovod)。Horovod是Uber公司打造的分布式深度学习开源框架,能够与TensorFlow、Keras、PyTorch以及Apache MXNet等一线热门深度学习工具包协同使用。Horovod使用all-reduce算法取代以往的参数服务器方法进行快速分布式训练,其中还提供多种优化方法以进一步加快分布式训练的执行速度。关于更多详细信息,请参阅遇见Horovod:面向TensorFlow的Uber开源分布式深度学习框架

为Horovod准备数据

在使用Horovod执行训练作业时,Horovod会为其集群当中的每个GPU上的工作节点启动独立的进程(每个GPU对应一个工作节点)。例如,如果您使用一个包含4 GPU的训练实例(一台Amazon SageMaker ml.p3.8xlarge或 Amazon Elastic Compute Cloud (Amazon EC2) p3.8xlarge实例)运行Horovod训练作业,则将对应启动4个工作进程。数据集本体已经出于数据并行性的需求而被拆分为多个分片,所有这4个工作节点都将分别读取自己的数据集分片。如果有40000个训练样本,则每个工作节点将获得10000个互不重复的训练样本。如果您使用Horovod进行分布式训练甚至是多GPU训练,则应事先做好数据分片准备,并指引工作节点从文件系统中读取各个分片。(某些深度学习框架可以自动执行此操作,例如PyTorch的DataParallel与DistributedDataParallel)

下图所示,为进行分片存储的两种可行架构。

您可以通过多种不同方式为Amazon SageMaker训练作业提供数据集。一种最典型的方法就是将所有数据集存储在 Amazon Simple Storage Service (Amazon S3)存储桶内,并在需要时进行访问。大家当然可以使用共享文件系统(例如 Amazon FSx for Lustre 或 Amazon Elastic File System ,简称Amazon EFS)实现数据存储,但通过Amazon SageMaker内置的两种输入模式(文件模式与管道模式)直接从Amazon S3中检索数据能够避免系统产生额外的服务成本。

在文件模式下,当Amazon SageMaker启动训练作业后,数据集将从指定的S3存储桶被传送至训练实例当中,并将其放置在某个特定目录之内。但如果您使用的数据集极为庞大,那么将对象从存储桶复制至训练实例的存储上往往需要耗费很长时间,而且直到数据传输完成,您的训练作业才会真正开始。这会在某些情况下拖慢机器学习(ML)的执行流程,甚至影响到创新或研究工作的项目进度。

另外,大家也可以通过管道模式直接访问存储在Amazon S3中的数据集。管道模式在训练实例与S3存储桶之间创建直接输入管道,并允许训练进程直接访问对象,这就消除了在训练开始之前将所有对象复制至训练实例中的工作。要对给定Amazon S3 URI中的数据集以管道模式加以访问,请在创建Amazon SageMaker Estimator时将输入模式设置为 Pipe ,具体参见以下代码:

from sagemaker.tensorflow import TensorFlow

tf_estimator = TensorFlow(entry_point='train.py',
                          role='SageMakerRole',
                          train_instance_type='ml.p3.2xlarge',
                          train_instance_count=2,
                          framework_version='2.1.0',
                          py_version='py3',
                          input_mode='Pipe')

在管道模式下,训练数据将作为FIFO流的形式进行交付。TensorFlow扩展的dataset类极大降低了访问流数据集的难度。关于管道模式与TensorFlow的更多详细信息,请参阅在Amazon SageMaker上使用高速管道模式加快模型训练,以及Amazon SageMaker TensorFlow扩展 GitHub repo。

配合Horovod使用管道模式

当您配合Horovod使用管道模式执行单机多卡或者多机多卡的分布式训练时,有一点需要特别注意。下图所示为这类场景的基本架构。

管道模式将数据从Amazon S3流式的传送到训练实例当中的Unix命名管道/FIFOs当中。一个FIFO文件仅支持一对写入/读取程序,且每轮训练周期内我们只能为一条通道创建一个FIFO文件。通常,人们会为训练数据集定义一条通道,并为验证或测试数据集定义另一条单独的通道,而后将这些输入通道作为Amazon SageMaker Estimator中 fit() 函数的参数传递至训练作业。详见以下代码:

from sagemaker.session import s3_input

input_channel = {'train': s3_input('s3://your-bucket-name/train-dataset/')}

tf_estimator.fit(inputs=input_channel)     

这种方式在Horovod多GPU训练场景下又会造成怎样的影响?简而言之,使用Horovod在多GPU训练作业中启动的各个进程,会相互争用单一FIFO,导致多个进程无法同时访问这些FIFO。而且由于同一时间内只有单一工作进程能够访问FIFO,且在完成训练作业之前不会释放句柄,就导致所有其他工作进程无法从该FIFO中读取数据,最终令训练作业陷入死锁式的无限循环。如果您看到类似于以下形式的重复提示消息,则表明您遇到了这样的问题:

[1,0]<stderr>:Stalled ranks:
[1,0]<stderr>:0: [training/Adam/DistributedAdam_Allreduce/HorovodAllreduce_training_Adam_gradients_AddN_11_0, training/Adam/DistributedAdam_Allreduce/HorovodAllreduce_training_Adam_gradients_AddN_12_0, training/Adam/DistributedAdam_Allreduce/HorovodAllreduce_training_Adam_gradients_AddN_14_0, training/Adam/DistributedAdam_Allreduce/HorovodAllreduce_training_Adam_gradients_AddN_15_0, training/Adam/DistributedAdam_Allreduce/HorovodAllreduce_training_Adam_gradients_AddN_18_0, training/Adam/DistributedAdam_Allreduce/HorovodAllreduce_training_Adam_gradients_AddN_19_0 ...]
[1,0]<stderr>:2: [training/Adam/DistributedAdam_Allreduce/HorovodAllreduce_training_Adam_gradients_AddN_11_0, training/Adam/DistributedAdam_Allreduce/HorovodAllreduce_training_Adam_gradients_AddN_12_0, training/Adam/DistributedAdam_Allreduce/HorovodAllreduce_training_Adam_gradients_AddN_14_0, training/Adam/DistributedAdam_Allreduce/HorovodAllreduce_training_Adam_gradients_AddN_15_0, training/Adam/DistributedAdam_Allreduce/HorovodAllreduce_training_Adam_gradients_AddN_18_0, training/Adam/DistributedAdam_Allreduce/HorovodAllreduce_training_Adam_gradients_AddN_19_0 ...]
[1,0]<stderr>:3: [training/Adam/DistributedAdam_Allreduce/HorovodAllreduce_training_Adam_gradients_AddN_11_0, training/Adam/DistributedAdam_Allreduce/HorovodAllreduce_training_Adam_gradients_AddN_12_0, training/Adam/DistributedAdam_Allreduce/HorovodAllreduce_training_Adam_gradients_AddN_14_0, training/Adam/DistributedAdam_Allreduce/HorovodAllreduce_training_Adam_gradients_AddN_15_0, training/Adam/DistributedAdam_Allreduce/HorovodAllreduce_training_Adam_gradients_AddN_18_0, training/Adam/DistributedAdam_Allreduce/HorovodAllreduce_training_Adam_gradients_AddN_19_0 ...]

您可以对S3存储桶中的数据集进行分片,且数量与用于训练作业的GPU数量相对应。如果您拥有4000个TensorFlow记录文件,且使用一台带有4 GPU的ml.p3.8xlarge实例进行模型训练,则可以为互不重复的1000个TensorFLow记录文件设定不同的前缀,如以下代码所示:

s3://your-bucket-name/train/0/
s3://your-bucket-name/train/1/
s3://your-bucket-name/train/2/
s3://your-bucket-name/train/3/

使用SharedByS3Key 作为Amazon S3数据类型分配方式进行的数据集分片方法,并不完全适用于Horovod。 这是因为在使用SharedByS3Key 时,分片只会以实例为单位、而非以工作进程为单位进行,且实例中的工作进程与GPU的数量保持一致。同样的,各个实例仍然只拥有一条输入通道。因此,大家需要将数据集的分片数量,设定为与Horovod集群内GPU数相同。

接下来,我们需要为Amazon SageMaker训练定义四条输入通道,具体参见以下代码:

from sagemaker.session import s3_input

shuffle_config = sagemaker.session.ShuffleConfig(234)

train_s3_uri_prefix = 's3://your-bucket-name/train'
input_channels = {}

for idx in range(4):
    train_s3_uri = f'{train_s3_uri_prefix}/train/{idx}/'
    train_s3_input = s3_input(train_s3_uri, shuffle_config=shuffle_config)
    input_channels[f'train_{idx}'] = train_s3_input

ShuffleConfig 将确保根据每个训练轮次,对Amazon S3前缀下各文件的使用顺序进行随机分配。关于更多详细信息,请参阅ShuffleConfig

在Amazon SageMaker Estimator上调用fit方法时,请使用以下通道定义:

tf_estimator.fit(input_channels)

对于验证及测试类任务,我们只需在单一工作进程上运行(通常使用主工作进程或Rank 0工作进程)。在这里,我们不需要设置多条验证或测试通道。但如果您使用 tf.keras.model.fit() 函数进行训练,则训练会在只有一个Horovod工作进程进行验证时停止(关于更多详细信息,请参阅Horovod GitHub repo上的 issue #600)。如果需要使用 tf.keras.model.fit() 进行验证,大家还应为各验证数据集提供对应的输入通道(类似于训练输入通道)。请注意,截至2020年7月,管道模式下训练作业的输入通道总数上限为20个。具体请参见以下代码:

validation_s3_uri = 's3://your-bucket-name/validation/'

for idx in range(4):
    validation_s3_input = s3_input(validation_s3_uri)
    input_channels[f'validation_{idx}'] = validation_s3_input
    
eval_s3_uri = 's3://your-bucket-name/eval/'
eval_s3_input = s3_input(eval_s3_uri)
input_channels['eval'] = eval_s3_input

相较于直接使用S3存储桶前缀,我们在这里可以使用包含有对象键列表的普通ManifestFile。关于更多详细信息,请参阅输入数据

在训练代码中使用数据通道

在训练脚本中,我们需要强制要求各个Horovod工作进程只访问属于它自己的数据集分片,确保两个工作进程不会访问同一输入通道。在本文的用例中,我们将使用从0开始的索引定义各输入通道名称。为此,我们可以使用hvd.rank() 函数,由其为当前工作进程在集群范围之内提供唯一的排名索引,且排名同样从0开始(请参考以下代码中的第13行)。在本文示例中,我们使用Amazon SageMaker TensorFlow扩展 PipeModeDataset。对于其他深度学习框架,请在每个训练轮次中从名为 /opt/ml/input/data/[channel_name]_${epoch} 的FIFO文件中读取数据。关于更多示例,请参见 GitHub repo

 1: from sagemaker_tensorflow import PipeModeDataset
 2: 
 3: features = {'data': tf.FixedLenFeature([], tf.string),
 4:             'labels': tf.FixedLenFeature([], tf.int64)}
 5:
 6: def parse(record):
 7:     parsed = tf.parse_single_example(record, features)
 8:     return ({
 9:         'data': tf.decode_raw(parsed['data'], tf.float64)
10:    }, parsed['labels'])
11:
12: # For Horovod and Pipe mode, use the input channel allocated to this worker using rank information
13: channel_name = 'train_{}'.format(hvd.rank())
14:
15: ds = PipeModeDataset(channel=channel_name, record_format='TFRecord')
16: ds = ds.map(parse)
17: ds = ds.batch(64)
18: ds = ds.prefetch(10)

在包含一个或多个实例的Horovod集群中,排名分配方式为从0开始,至GPU数量-1结束。只要正确定义了输入通道的名称并从0开始使用索引,我们就不必分神管理各实例或名位的排列顺序。

使用Tensorboard进行监控

在对训练进程加以灵活监控方面,我们可以在每个训练轮次结束时首先将日志上传至S3存储桶,再通过任意远程计算实例调用Tensorboard。为此,我们需要创建一项回调以将本地日志推送至S3存储桶路径,此路径仅限于运行在Horovod上的主(Rank 0)计算节点。具体请参见以下代码:

class Sync2S3(tf.keras.callbacks.Callback):
    def __init__(self, logdir, s3logdir):
        super(Sync2S3, self).__init__()
        self.logdir = logdir
        self.s3logdir = s3logdir
    
    def on_epoch_end(self, batch, logs={}):
        os.system('aws s3 sync '+self.logdir+' '+self.s3logdir)

...

if hvd.rank() == 0:
    logdir = args.output_data_dir + '/' + datetime.now().strftime("%Y%m%d-%H%M%S")
    callbacks.append(TensorBoard(log_dir=logdir))
    callbacks.append(Sync2S3(logdir=logdir, s3logdir=tensorboard_logs))

通过将训练日志存储在S3存储桶内,大家可以在任意服务器上运行Tensorboard,包括EC2实例、Amazon SgaeMaker notebook实例甚至是本地计算机,并为该Tensorboard托管服务器提供访问Amazon S3日志对象的权限。为了支持从Amazon S3源处直接提取日志数据,您的Tensorboard必须为1.14.0或者更高版本。以下命令行使用的是位于us-east-1区域内S3存储桶上的日志记录:

S3_REGION=us-east-1
tensorboard --logdir s3://{bucket_name}/tensorboard_logs/

如果您是在Amazon SageMaker notebook实例上运行以上命令,则可通过 https://<SageMaker-notebook-instance-name>.notebook.<notebook-region>.sagemaker.aws/proxy/6006/完成访问。

资源清理

在完成本文中分布式训练作业之后,请清理相应资源以避免其后续产生额外费用,包括S3存储桶、FSx for Lustre以及各个Amazon SageMaker实例。

总结

在Amazon SageMaker上以管道模式使用Horovod的多GPU或分布式训练方法,能够为数据集的各个分片创建独立的训练通道并在数据通道内访问对应分片,借此实现大规模模型训练。这种方式能够缩短在实际训练开始之前将数据集传输至训练实例所占用的时间,因此特别适用于具有大规模训练数据集的Amazon SageMaker训练场景。

关于在Amazon SageMaker上运行的完整训练示例(管道模式加Horovod),请参阅GitHub repo

 

本篇作者

Muhyun Kim

Amazon机器学习解决方案实验室数据科学家。他运用机器学习与深度学习技术帮助客户解决各类业务问题,同时帮助他们提升相关技能水平。

Jiyang Kang

Amazon机器学习解决方案实验室深度学习架构师。凭借在AWS上为全球企业设计工作负载的丰富经验,他目前专门负责为客户的新型业务问题设计并实施机器学习解决方案。

>

Hussain Karimi

Amazon机器学习解决方案实验室数据科学家。他与各行各业的客户开展合作,设计并构建起能够产生实际商业价值的自动化算法模型。