亚马逊AWS官方博客

Amazon SageMaker 增加批量转换功能和适用于 TensorFlow 容器的管道输入模式

在几天前的纽约峰会期间,我们推出了两个新的 Amazon SageMaker 功能:一是批量转换功能,这是一种新的批量推断功能,客户可以通过它对 PB 级的数据进行非实时场景预测;二是适用于 TensorFlow 容器的管道输入模式。SageMaker 依然是我们最受欢迎的服务之一,此博客机器学习博客都对它进行了非常广泛的介绍。事实上,要赶上 SageMaker 团队快速的创新步伐是一件较为困难的事情。自上一篇有关 SageMaker 自动模型调整和超参数优化功能的博客发布以来,该团队已经推出了 4 种新的内置算法和许多的新功能。下面我们来看新推出的批量转换功能。

批量转换

批量转换功能是一种高性能、高吞吐量的数据转换和推断生成方法。它非常适合处理大批量数据、不需要亚秒级延迟或需要同时预处理和转换训练数据的场景。最大的好处?您无需编写任何额外的代码即可使用此功能。您可以使用所有现有的模型,并根据这些模型开始批量转换作业。此功能不加收任何费用,您只需为底层资源付费。

下面我们来看如何将此功能用于内置的对象检测算法。我将利用示例笔记本来训练对象检测模型。现在我将打开 SageMaker 控制台,然后打开 Batch Transform(批量转换)子控制台。

我可以在这里启动新的批量转换作业。

我可以在此为我的转换作业命名,选择我希望使用的模型,以及要使用的实例数量和类型。此外,我可以配置同时向我的推断发送的记录数量以及负载的大小。如果我未手动指定这些参数,则 SageMaker 将选择一些合理的默认值。

然后我需要指定输入位置。我可以使用清单文件或直接将所有文件加载到某个 S3 位置。由于我使用的是映像,我已经手动指定了我的输入内容类型。

最后,我将配置输出位置并启动作业!

一旦作业开始运行,我可以打开作业详细信息页面,点击链接查看 Amazon CloudWatch 中的指标和日志。

我可以看到作业正在运行,如果要看 S3 中的结果,我可以看到每个映像的预测标签。

转换结果将按每个输入文件生成一个输出 JSON 文件,其中包含检测到的对象。

这时可以非常轻松地为 AWS Glue 中的存储桶创建表,以及使用 Amazon Athena 查询结果或使用 Amazon QuickSight 将结果可视化

当然还可以通过 SageMaker API 以编程方式启动这些作业。

有关如何在您自己的容器中使用批量转换功能的更多详细信息,请参阅文档

适用于 Tensorflow 的管道输入模式

借助管道输入模式,客户可以使用高度优化的多线程后台进程,直接以流的方式将训练数据集从 Amazon Simple Storage Service (S3) 传入 Amazon SageMaker。与文件输入模式相比,这种模式极大地提高了读取吞吐量,因为后者必须首先将数据下载到本地 Amazon Elastic Block Store (EBS) 卷。这意味着您的训练作业可以更快启动,更快完成,使用的磁盘空间更少,与模型训练有关的费用也更低。此外,它还可让您训练超过 16 TB EBS 卷容量限制的数据集。

今天初,我们对管道输入模式进行了一些试验,发现 78 GB 数据集的启动时间最高减少了 87%,吞吐量是一些对比场景的两倍,最终总训练时间减少了 35%。

通过增加对适用于 TensorFlow 的管道输入模式的支持,进一步方便客户利用内置算法更快的速度优势。下面我们来看实际操作。

首先,我需要确保我的训练作业使用 sagemaker-tensorflow-扩展名。这将为我们提供新的 PipeModeDataset 类,它以通道和记录格式为输入,并返回一个 TensorFlow 数据集。我们可以将它用于 TensorFlow 估算器的 input_fn ,并从通道读取。下面的示例代码是一个简单的示例。

from sagemaker_tensorflow import PipeModeDataset

def input_fn(channel):
    # Simple example data - a labeled vector.
    features = {
        'data': tf.FixedLenFeature([], tf.string),
        'labels': tf.FixedLenFeature([], tf.int64),
    }
    
    # A function to parse record bytes to a labeled vector record
    def parse(record):
        parsed = tf.parse_single_example(record, features)
        return ({
            'data': tf.decode_raw(parsed['data'], tf.float64)
        }, parsed['labels'])

    # Construct a PipeModeDataset reading from a 'training' channel, using
    # the TF Record encoding.
    ds = PipeModeDataset(channel=channel, record_format='TFRecord')

    # The PipeModeDataset is a TensorFlow Dataset and provides standard Dataset methods
    ds = ds.repeat(20)
    ds = ds.prefetch(10)
    ds = ds.map(parse, num_parallel_calls=10)
    ds = ds.batch(64)
    
    return ds

然后,您可以按照与正常 TensorFlow 估算器相同的方式定义模型。对于估算器的创建时间,您只需确保 input_mode='Pipe' 在参数中即可。

现已推出

这两项新功能都已免费推出,我期待看到客户利用新的批量转换功能创造的好东西。我现在就可以告诉您的是,它可帮助我们处理 AWS 营销部的一些内部 ML 工作负载

同样,请在评论区或 Twitter 上发表您对这项功能的看法!

Randall