亚马逊AWS官方博客

在 Amazon SageMaker 上训练 TensorFlow 变长特征模型

需求&背景

在推荐系统和广告系统领域,处理变长特征序列一直是一个重要且具有挑战性的问题。特别是在 DSP(Demand-Side Platform)广告系统中,精排序(Re-ranking)模型需要同时处理用户的历史行为序列、广告的历史展示序列等多种变长特征,这些特征的处理质量直接影响模型的效果。

目前业界在使用 TensorFlow 处理变长特征时面临以下几个主要痛点:

  • 数据表示的局限性
    • 稀疏特征的存储效率低下
    • 序列长度不一致导致的计算资源浪费
  • 工程实现的复杂性
    • TensorFlow 针对变长序列特征的支持比较复杂
    • 自定义算子开发成本高
    • 实现比较复杂,公开的 Internet 网络上找不到可以直接参考的代码
    • 分布式训练场景下海量的特征数据的预处理需要优化

基于以上技术挑战,本文提出并实践了在 Amazon SageMaker 上实现 TensorFlow 变长特征处理及模型训练的解决方案,包括:

  • 数据处理
    • 特征维度规模达到数百量级的多维特征处理
    • 将变长序列的特征处理为定长序列的特征,或者仍然保持为变长序列的特征
    • pyspark 分布式集群特征处理
  • 模型训练
    • 实现基于 padding 和序列长度截断的定长序列的训练
    • 实现保持原始长度的变长序列训练方案
    • CPU + parameterserver Strategy 的分布式训练

解决方案及对比

以下我们详细介绍在 TensorFlow 2.x 框架下基于 padding 和截断的定长序列特征和保持变长序列特征的两种训练方案,这两种方案各有特色,分别适用于不同的应用场景,旨在建立一个高效、可扩展的变长特征处理框架,为推荐和广告系统的模型训练提供更好的支持。

方案 1. tf.feature_column + tf.keras API(变长序列特征截断且 padding 为定长序列特征)

  • 核心特点:序列长度截断 + padding + 每个特征使用单独的 embedding table。
  • 数据处理:通过 Sagemaker processing job 或 EMR Spark 集群将 ORC 格式转换为 TFRecord 格式,并进行序列截断和填充。
  • 优势:该方案实现相对简单;且有很多客户也是使用的类似的方案。

方案 2. tf.keras.layers processing + tf.keras API(保持变长特征序列)

  • 核心特点:变长序列处理 + 特征共享 Embedding 表。
  • 数据处理:通过 SageMaker processing job 或 EMR Spark 集群将 ORC 格式转换为 TFRecord 格式。
  • 优势:不需要额外的特征处理的工作,与现有推荐系统上下游更好地集成。

方案对比分析

为了帮助工程师更直观地理解两种方案的差异,我们从多个维度进行了详细对比:

对比维度 方案 1 方案 2
1 实现复杂度 低,开发周期短 高,需要更多自主开发
2 参考资料 丰富,示例代码多 较少,参考案例缺乏
3 生产验证 充分,经验丰富 需要更多验证
4 序列处理 定长,需要截断和填充 变长,无需特殊处理
5 信息完整性 可能损失信息 信息保留完整
6 数据存储 填充导致数据膨胀 存储效率高
7 Embedding 特性 特征独立 Embedding 支持 Embedding 共享
8 资源消耗 数据处理开销大 处理开销小

在实际的项目中,可以根据业务场景和不同的考虑因素,如序列长度特征数量,计算资源约束,开发周期要求,模型效果等综合考虑。比如如果对模型效果要求较高,且计算资源充足,推荐采用方案 2;如果开发周期紧张,且能够接受序列截断带来的有限模型效果影响,可以考虑方案 1。

以下章节我们将分别详细介绍两种方案落地实施的具体技术实现方法。

实施落地的挑战及对应解决方法

在将理论方案转化为实际可落地的解决方案时,我们遇到了许多个技术挑战。这些挑战主要集中在特征数据处理、TensorFlow 变长特征模型训练,分布式处理等方面,本章将详细介绍这些挑战及其解决方案。

在介绍详细方案实施前,我们先介绍下特征数据结构和模型的网络结构。

训练模型示例

我们使用一个简单的 DNN 神经网络来 demo 变长特征的训练模型,包括如下结构

  • SimpleChannel 层:
    • 使用了一个简单的全连接网络(一个 Dense 层)。
    • 第一个 Dense 层使用 ReLU 激活函数。
    • 返回四个值,但 s1, s2, s3 只是简单的占位符,没有实际的计算意义。
  • SimpleEmbedding 层:
    • 简单的 Embedding Dense 示例,只包含一个 Dense 层用于嵌入处理。

模型示例代码如下:

##使用了一个简单的全连接网络(两个Dense层),第一个Dense层使用ReLU激活函数。
##返回四个值,但s1, s2, s3只是简单的占位符,没有实际的计算意义。
class SimpleChannel(tf.keras.layers.Layer):
    def __init__(self, in_size, out_size):
        super().__init__()
        self.in_size = in_size
        self.fl = int(in_size / 4)
        self.out_size = out_size
        
    def build(self, input_shape):
        self.dense = tf.keras.layers.Dense(self.out_size, activation='relu')
        super().build(input_shape)
        
    def call(self, inputs, i1, i2, i3):
        # 简化处理,直接使用一个Dense层
        x = self.dense(inputs)
        
        # 为了保持输出格式一致,我们仍然返回四个值
        s1 = tf.reduce_sum(x[:, :self.out_size//3], axis=1, keepdims=True) + i1
        s2 = tf.reduce_sum(x[:, self.out_size//3:2*self.out_size//3], axis=1, keepdims=True) + i2
        s3 = tf.reduce_sum(x[:, 2*self.out_size//3:], axis=1, keepdims=True) + i3
        
        return x, s1, s2, s3

#简单的embedding dense 示例,只包含一个 Dense 层用于嵌入处理
class SimpleEmbedding(tf.keras.layers.Layer):
    def __init__(self, out_size, emb_size):
        super().__init__()
        self.out_size = out_size
        self.emb_size = emb_size
        
    def build(self, input_shape):
        self.dense = tf.keras.layers.Dense(self.out_size * self.emb_size, activation='relu')
        super().build(input_shape)

    def call(self, inputs):
        # 简化embedding处理,直接使用一个Dense层
        return self.dense(inputs)
PowerShell

特征数据结构

根据以往广告媒体类客户项目中遇到的变长特征情况,我们构造了 dummy 的特征数据,具体格式如下:

  • 一共 300+特征列,一个 label 字段列。
  • 如果某一列是单值特征,字符串通过\003 可分成两部分,第⼀部分是根据明⽂特征计算出的 64 位⽆符号整型哈希值;第二部分是哈希值对应的 weight。
  • 如果某一列是多值特征,字符串通过\001 分成多个单值特征,每个单值特征的构成与上述描述⼀致。
  • label 字段列,共有两个取值:0 代表负样本;1 代表正样本。
  • 一个 label 字段列(目标特征字段),共有两个取值:0 代表负样本 ,1 代表正样本。
  • 变长特征示例 :
    特征列名 1(单值特征):_11001
    值:12249734119335549443\0030.6931471
    特征列名 2(多值特征):_12001
    值:13237249145391275771\0030.693147\00118246459961346429377\0030.693147
    ...省略
    label 列名:label
    值:0
    PowerShell

方案 1. 变长序列特征截断且 padding 为定长序列特征

变长特征 padding 处理

该方案使用 TF 的 feature column API 在模型训练时传入特征训练数据,该 API 只能接收定长序列的特征字段类型。因此需要额外的处理和变换来将原始的变长特征序列进行变换,统一转成固定长度的特征序列。并且由于原始特征字段不定长,在转换时我们需要考虑填充到多大的固定的长度,如何填充的问题。

生产环境下通常特征数据量巨大,比如 TB 级别海量的原始 orc 压缩文件,因此我们使用一个 SageMaker pyspark 的 processing job 来实现具体的定长特征转换的工作。

SageMaker Processing Job 提供了强大的分布式处理能力,能够高效地并行处理大规模数据集特别是 TB 级别的海量特征数据。它能够自动配置和管理 Spark 集群,实现按需弹性扩展,避免了手动维护集群的复杂性。通过集成 Apache Spark 和 MLlib,可以轻松使用各种内置的特征工程和机器学习算子。同时与 SageMaker 的其他组件无缝集成,简化了从数据处理到模型部署的端到端机器学习工作流程。感兴趣的小伙伴可以参阅附录中的 SageMaker Processing Job 资料。

针对特征填充长度和填充 weigth 权重值方法的问题,我们在具体转换 job 之前,先分别运行统计特征基数和最大序列长度的 pyspark job,从而获取每个特征列所有唯一值和最大变长值的序列长度,具体如下:

spark = SparkSession.builder.appName("FeatureColumnStats").getOrCreate()
# 读取数据
df = spark.read.format("orc") \
    .option("compression", "snappy") \
    .option("recursiveFileLookup", "true") \
    .load("s3://sagemaker-us-west-2-687912291502/mv-poc/raw/")

feature_columns = [col for col in df.columns if col.startswith('_')]
# 对每个特征列lambda调用,统计最大序列长度
def process_column_lenth(dataframe, column_name):
    return dataframe.agg(
        lit(column_name).alias("column_name"),
        max(
            when(
                (col(column_name).isNull()) | (col(column_name) == ""),
                0
            ).otherwise(
                size(split(col(column_name), '\x01'))
            )
        ).alias("max_sequence_length")
    )
# 对每个特征列lambda调用,提取所有的唯一值并存入单独的列表
def process_column_unique(dataframe, column_name):
    return dataframe.select(
        lit(column_name).alias("column_name"),
        collect_set(
            when(
                (split(split(col(column_name), '\x01')[0], '\x03')[0] != '') & 
                (split(split(col(column_name), '\x01')[0], '\x03')[0].isNotNull()),
                split(split(col(column_name), '\x01')[0], '\x03')[0]
            )
        ).alias("unique_values")
    )
# 处理每个特征列的唯一值及最大序列长度
feature_dfs_lenth = [process_column_lenth(df, col_name) for col_name in feature_columns]
feature_dfs_unique = [process_column_unique(df, col_name) for col_name in feature_columns]
PowerShell

我们使用 pyspark DataFrame API 进行分布式处理,并通过 spark broadcast 广播将唯一值和最大序列长度字典数据分发到每个 pyspark worker 工作节点:

FEATURE_HASH_DICT = load_feature_hash_dict(args.feature_hash_dict)
FEATURE_LENTH_DICT = load_feature_lenth_dict(args.feature_lenth_dict)
sc = spark.sparkContext    
FEATURE_HASH_DICT_BROADCAST = sc.broadcast(FEATURE_HASH_DICT)
FEATURE_LENTH_DICT_BROADCAST = sc.broadcast(FEATURE_LENTH_DICT)

## 拆分到多个分区
df = df.repartition(args.num_partitions)
feature_columns = [col for col in df.columns if col.startswith('_')] 
df_processed = df.rdd.mapPartitions(
        lambda iterator: process_partition(iterator, feature_columns, FEATURE_HASH_DICT_BROADCAST,FEATURE_LENTH_DICT_BROADCAST,args.padding_lenth)
    ).toDF()
PowerShell

特征处理逻辑如下:

  • label 列保持不变。
  • 对 feature 每一列进行 padding 的时候,传入之前统计的所有列的唯一值和最大序列长度的字典。
  • 如果该列的值数量已经是统计出的该列唯一值长度,则不再填充,截断为唯一值长度大小。
  • 如果没有达到唯一值长度,则按该列的最大变长值进行 key 和 weight 的填充,从该特征列唯一值字典没有出现的 key 中随机选择一个。
  • 填充 weight value 值的时候,需要设置为一个极小值(比如 10e-7=0.00000001,之所以不能设置为 0,是因为 feature_column API 的要求),以便让 padding 的特征对之后特征处理没有影响。

对于结构化特征的建模,序列特征很长不一定模型效果就好,且 padding 太长会导致特征数据膨胀,因此本文示例中我们把序列特征截断到一定长度(比如 10)来做演示。

def process_feature(value, hash_set, column_lenth,padding_lenth):
    result = []

    if value:  # 如果 value 不是空字符串
        features = value.split('\x01')
        for feature in features:
            hash_value, weight = feature.split('\x03')
            hash_int = int(hash_value)
            if float(weight) == 0:
                weight = "0.00000001"
            result.append(f"{hash_value}\x03{weight}")
            if len(result) >= padding_lenth:
                break  # 如果已经有10个键,就停止处理

    # 如果结果中的键少于10个,才从hash_set中补充
    if len(result) < column_lenth and len(result) < padding_lenth:
        fill_count = 1
        while len(result) < column_lenth:
            fill_hash = f"00000000000000{fill_count}"
            result.append(f"{fill_hash}\x030.000000001")
            fill_count = fill_count+1
            if len(result) >= padding_lenth:
                break  # 达到最长个数的键就停止
    return '\x01'.join(result)
PowerShell

特征数据 padding 好之后,我们就可以使用 TensorFlow 的 FixedLenFeature 构造特征定长特征列,并使用 categorical_column_with_hash_bucket 进行特征加权,具体代码如下:

# 对每个特征列按照padding长度构造FixedLenFeature
for start, end in feature_cl:
    feature_description.update({
      'id__{}'.format(colname): tf.io.FixedLenFeature([(sequence_length['_{}'.format(colname)]], tf.string)
          for colname in range(start, end+1)
      })

    feature_description.update({
      'weighted_id__{}'.format(colname): tf.io.FixedLenFeature(sequence_length['_{}'.format(colname)]], tf.float32)
          for colname in range(start, end+1)
  })
PowerShell

根据特征的基数来选择 hash bucket size:

 for key, value in feature_hash.items():
    new_key = key
    # Get the length of the value (array)
    length = len(value)
    # Determine the value of hash_bucket_size based on the length
    if length <= TEN_MILLION:
        hash_bucket_size[new_key] = length*3
    else:
        # If it exceeds 10 million, take the fourth root of the value
        hash_bucket_size[new_key] = int(math.pow(length, 0.25))

tf.print("hash_bucket_size:")
for key, value in hash_bucket_size.items():
    tf.print(f"{key}: {value}")
PowerShell

使用 categorical_column_with_hash_bucket 和 weighted_categorical_column 对特征列的 weight 值进行 value 加权:

sparse_col = {
  colname:  tf.feature_column.categorical_column_with_hash_bucket(colname, hash_bucket_size=min(hash_bucket_size[colname.split("id_")[-1]],10), dtype=tf.string)
    for colname in feature_description.keys() if colname.startswith('id__')
}


weighted_sparse_col = {
    'weighted_{}'.format(colname) : tf.feature_column.weighted_categorical_column(col, 'weighted_{}'.format(colname))
      for colname, col in sparse_col.items()
  }
PowerShell

SageMaker TensorFlow 定长特征模型训练

准备好 TensorFlow 的定长特征相关逻辑代码后,我们使用 SageMaker 的 Training Job 来拉起 TensorFlow 的分布式训练。

SageMaker Training Job 为 TensorFlow 提供了完整的分布式训练支持,可以轻松配置多实例多 GPU 的训练环境,充分利用分布式计算资源加速模型训练。它原生支持 TensorFlow 的分布式策略,包括 ps 参数服务器、distributed strategy 等模式。且通过内置的 MLOps 功能,可以自动记录训练指标、模型版本和实验跟踪,实现完整的模型生命周期管理。Training Job 还提供了自动超参数调优、热启动训练等高级特性,并能与 SageMaker 其他服务无缝集成,简化了从训练到部署的全流程。感兴趣的小伙伴可以参阅附录中的 SageMaker 训练资料。

在 SageMaker 中拉起上面章节中的定长特征模型训练代码示例如下:

original_job_name = 'tensorflow-poc'
job_name = f"{original_job_name}-{timestamp}"
print(job_name)
environment = {'job_name': job_name,
               'feature_lenth_dict':'s3://sagemaker-us-west-2-687912291502/tf_train/features_20240710_041505/part-00000-f02ce9a7-3057-4b81-b6aa-520954364de1-c000.json',
               'feature_hash_dict':'s3://sagemaker-us-west-2-687912291502/tf_train/features_20240710_041505/part-00000-757be5db-4d45-493f-a7b9-836104157742-c000.json'
              }

from sagemaker.tensorflow import TensorFlow

wide_deep_estimator = TensorFlow(entry_point='./DNN-varlen-feature-raggedtensor.py',
                             role=role,
                             source_dir = './train/',
                             max_run=86400*3,
                             use_spot_instances=True,
                             max_wait=86400*3,
                             instance_count=20,
                             train_volume_size=400,
                             instance_type='ml.m6i.8xlarge',
                             input_mode='FastFile',
                             framework_version='2.14',
                             py_version='py310',
                             subnets=['subnet-0cd82a76a056fabab'], # Should be same vpc with FSx, best to use same subnet with FSx
                             security_group_ids=['sg-0193c82932eb9f168','sg-04c9ce51b0c7665e7','sg-0af43c5507997cdb7',], # Needed when use FSx
                             keep_alive_period_in_seconds=1800,
                             environment=environment,
                             distribution={'parameter_server': {'enabled': True}})                          
                             
inputs = {'training1': train_s3_1,'fsx': fsx_fs}
wide_deep_estimator.fit(inputs, job_name = job_name)
PowerShell

上文代码所示 SageMaker Training Job 主要需要注意的点:

  • environment=environment:指定环境变量,传递模型训练需要的其他数据,比如上面章节提及的变长特征所有唯一值 feature_lenth_dict 字典及最大序列长度 feature_lenth_dict 字典。
  • instance_count=20,instance_type=’ml.m6i.8xlarge’:指定训练机器机型及数量,从而方便的拉起多机 CPU 分布式训练集群。
  • input_mode=’FastFile’:启用 fastfile mode 进行定长特征数据的流式 ingestion,不用预先将 TB 级别训练数据加载到训练算力机磁盘,而是 batch 批量边训练边 load 加载。
  • inputs = {‘training1′: train_s3_1,’fsx’: fsx_fs}:指定 S3 路径的特征数据路径,SageMaker 会自动 ingest S3 上的训练数据到多台算力机的/opt/ml/data/trainging1/对应目录下;指定 fsx lustre 存储挂载,tensorflow 多机上的模型 checkpoint 就可以使用 FSX 共享存储目录保存。
  • distribution={‘parameter_server’: {‘enabled’: True}}):启用 tensorflow 的 PS 参数服务器,SageMaker 会自动拉起 tensorflow 的分布式训练集群,从而在训练脚本中可以方便的获取 leader/worker 等节点。

方案 2. 保持变长特征序列

变长特征转换

由于不需要对变长特征再做 padding 填充或者截断,特征处理会轻量级很多,只需要对原来的特征 weight 值进行与方案 1 同样的极小值处理和 tfrecord 格式转换,这里我们还是用一个 pyspark sagemaker processing job 进行分布式的处理,其代码示例如下:

def create_example(row, feature_columns):
    feature = {}
    for col in feature_columns:
        if '\x01' in row[col]:
            values = row[col].split('\x01')
        else:
            values = [row[col]]
        
        int64_list = []
        float_list = []
        
        for value in values:
            parts = value.split('\x03')
            if len(parts) >= 1:
                try:
                    int64_list.append(str(parts[0]))
                except ValueError:
                    print("too big to convert into int ",str(parts[0]))
                    int64_list.append(0)
            
            if len(parts) >= 2:
                try:
                    float_list.append(float(parts[1]))
                except ValueError:
                    float_list.append(0.0)

        feature[f"id_{col}"] = int64_list
        feature[f"weighted_id_{col}"] = float_list

    feature['target'] = int(row['label'])
    return feature

def process_partition(iterator, feature_columns):
    
    for row in iterator:
        processed_row = {}
        for column in feature_columns:
            processed_row[column] = process_feature(row[column],column)
        processed_row['label'] = row['label']
        yield create_example(processed_row, feature_columns)


        
def main():
    args = parse_args()    
    spark = SparkSession.builder\
            .appName("FeatureProcessing") \
            .config("spark.driver.extraJavaOptions", "-Dlog4j.rootCategory=ERROR,console -Dlog4j.logger.org.apache.spark=ERROR -Dlog4j.logger.org.apache.hadoop=ERROR") \
            .config("spark.executor.extraJavaOptions", "-Dlog4j.rootCategory=ERROR,console -Dlog4j.logger.org.apache.spark=ERROR -Dlog4j.logger.org.apache.hadoop=ERROR") \
            .getOrCreate()
    sc = spark.sparkContext
    

    df = spark.read.format("orc").option("compression", "snappy").load(args.input_data_uri)
    df = df.repartition(args.num_partitions)
    feature_columns = [col for col in df.columns if col.startswith('_')]
    df_processed = df.rdd.mapPartitions(
        lambda iterator: process_partition(iterator, feature_columns)
    ).toDF()
PowerShell

SageMaker TensorFlow 变长特征模型训练

以上方案 1 中,我们把变长特征通过截断和 padding,处理为定长的序列特征字段,该方案会导致特征数据膨胀问题,比如原来某一特征列在原始数据中只有 5 个多值 key 和 weight 权重,而如果特征基数为 10,我们需要 padding 为定长 10 的特征序列,则会膨胀一倍。对于该特征大部分记录中都没有到 10 或者基数的时候,每条记录都会数据膨胀。

在 TB 级别海量特征场景下,数据膨胀的问题会更加明显(10TB 数据会增长到 20TB),特征处理的资源开销会是一个需要考虑的问题。

因此另一个方案就是保持原有的不定长特征序列,在 TensorFlow 中对不定长特征进行统一的 embedding 编码,并进行训练,这是最理想的方式,没有单独的特征处理的 job,且与数据平台上下游衔接也更加平滑。

但是目前业界不定长特征列在 TensorFlow 中并没有官方通用的方法,本文中给出了一种使用 RaggedTensor 表示 shape 不规整的 tensor 的方法及对应的 TensorFlow 训练代码,详见附录中 sample 示例,其关键的实现具体如下:

  • 封装 DenseToRaggedLayer(tf.keras.layers.Layer) 把 Dense Tensor 转为 RaggedTensor,从而表示 shape 不规整的 Tensor
    class DenseToRaggedLayer(tf.keras.layers.Layer):
        
        def __init__(self, ignore_value, **kwargs):
            super(DenseToRaggedLayer, self).__init__(**kwargs)
            self.ignore_value = ignore_value
    
        def call(self, inputs):
            return tf.RaggedTensor.from_tensor(inputs, padding=self.ignore_value) 
    
    PowerShell

    不同长度的特征列 share 一个 embedding table

    share_embedding_sparse_config = [
      {'name': 'share_emb_1',
       'columns': {    
           colname : {}
              for colname in feature_description.keys() if colname.startswith('id_')},
       'bucket_size': 50000000, #5000w + 4K mini batch per work can work on r5.4xlarge.
       'embedding_size': 16},
    ]
    
    PowerShell
  • 使用 keras.layers.Hashing 以便支持 RaggedTensor ,并构造带有权重的 share embedding
    带有权重的 share embedding 的实现有几点需要注意的地方:
    • 在 keras 中,tf.keras.layers.Embedding 没有 combiner 选项,但可以使用 tf.keras.layers.Dense 实现相同的效果(参考 https://tensorflow.google.cn/guide/migrate/migrating_feature_columns?hl=zh-cn)。
    • 在 keras 中,tf.keras 没有直接实现 share embedding 的 layer,直接用同一个 layer 针对不同的 input 调用就间接实现了 share。
    • 在 keras 中,使用 preprocessing API 的时候,使用 CategoryEncoding API 的参数 count_weights 把权重带入,注意 count_weights 是支持变长权重的。
  • 使用 keras.layers.CategoryEncoding 以便支持 RaggedTensor
    for conf in share_embedding_sparse_config:
            shared_embedding = tf.keras.layers.Dense(conf['embedding_size'], use_bias=False)
            for feature_name, inner_conf in conf['columns'].items():
                cur_input = keras.Input(shape=(None,), name=feature_name, dtype=tf.string) #id特征
                weight_name = 'weighted_' + feature_name
                weigh_cur_input = keras.Input(shape=(None,), name=weight_name, dtype=tf.float32) #id特征的权重
                #preprocessing.Hashing支持tf.sparse tensor
                #下面的hashing的output_mode要设定为int(索引编号),否则会因为bucket size很大,占用很多的内存。
                ragged_hashed_input = tf.keras.layers.Hashing(num_bins=conf['bucket_size'], name=feature_name + '_hash', output_mode='int',)(DenseToRaggedLayer(name=feature_name + '_rag', ignore_value = '-1')(cur_input))          
                #为了使用特征的权重,这里需要把参数output_mode设置为count
                encoded_data = tf.keras.layers.CategoryEncoding(num_tokens=conf['bucket_size'], output_mode='count', sparse=True)(ragged_hashed_input, count_weights= DenseToRaggedLayer(name=weight_name + '_rag', ignore_value = 0)(weigh_cur_input))
                shared_emb_data = shared_embedding(encoded_data)
                                     
                deep_raw_inputs.append(cur_input)
                deep_raw_inputs.append(weigh_cur_input)
                deep_inputs.append(shared_emb_data)
        
        # BUILD MODEL
    
        deep = layers.Concatenate()(deep_inputs)
    
    PowerShell

    在接收变长特征训练数据时,通过 map 操作,使用 tf.io.parse_example 根据预定义的 feature_description 将数据解析成特征字典,从解析后的特征字典中提取并移除标签(target)。
    在处理成字典的时候,给没有值的 ID 特征填充一个无意义的值 ‘-1’,并且确保给被填充的 ID 特征对应的权重为 0,从而消除这些填充 ID 的影响,具体代码示例如下:

    def parse_record_batch(message):
        parsed_feature_dict = tf.io.parse_example(message, feature_description)
        label = parsed_feature_dict.pop('target')
        for feature_name, cur_tensor in parsed_feature_dict.items():
            if feature_name.startswith('id'):
                parsed_feature_dict[feature_name] = tf.sparse.to_dense(cur_tensor, '-1')
            if feature_name.startswith('weighted'):
                #给id特征对应的padding的权重设定为'0',从而把padding的id的特征值的影响去除
                parsed_feature_dict[feature_name] = tf.sparse.to_dense(cur_tensor, 0)
        return parsed_feature_dict, label
    
    PowerShell

    变长特征训练的 SageMaker Training Job 与上文定长 padding 方式的 SageMaker Training Job 一致,entry point 调整为变长训练的脚本(本文中为 DNN-varlen-feature-raggedtensor.py),同样参数中启动 TensorFlow PS 参数服务器。

    wide_deep_estimator = TensorFlow(entry_point='./DNN-varlen-feature-raggedtensor.py',
                                 role=role,
                                 source_dir = './train/',
                                 instance_count=20,
                                 instance_type='ml.m6i.8xlarge',
                                 framework_version='2.14',
                                 py_version='py310',
                                 subnets=['subnet-0cd82a76a056fabab'], # Should be same vpc with FSx, best to use same subnet with FSx
                                 security_group_ids=['sg-0193c82932eb9f168','sg-04c9ce51b0c7665e7','sg-0af43c5507997cdb7',], # Needed when use FSx
                                 keep_alive_period_in_seconds=1800,
                                 environment=environment,
                                 distribution={'parameter_server': {'enabled': True}})
    
    PowerShell

总结

本文介绍了在 Amazon SageMaker 平台上训练 TensorFlow 变长特征模型的技术方案和落地实践经验。通过本文,算法的小伙伴可以提高训练数据特征存储和处理的效率,优化计算资源利用率,确保分布式训练的扩展性。

本文中的示例代码和方法可以应用于 DSP(Demand-Side Platform)广告系统对候选广告进行精排序(Re-ranking)等相关领域和业务场景。


*前述特定亚马逊云科技生成式人工智能相关的服务仅在亚马逊云科技海外区域可用,亚马逊云科技中国仅为帮助您了解行业前沿技术和发展海外业务选择推介该服务。

附录

Amazon SageMaker 服务:https://docs.aws.amazon.com/zh_cn/sagemaker/latest/dg/machine-learning-environments.html

SageMaker Processing Job : https://docs.aws.amazon.com/sagemaker/latest/dg/processing-job-frameworks-pytorch.html

SageMaker TensorFlow 训练:https://sagemaker.readthedocs.io/en/stable/frameworks/tensorflow/using_tf.html#training-with-parameter-servers

Tensorflow Dense embedding参考:https://tensorflow.google.cn/guide/migrate/migrating_feature_columns?hl=zh-cn)

Sample 代码示例:https://github.com/aws-samples/tensorflow_varlen_distribute_training_sample_code

本篇作者

唐清原

亚马逊云科技高级解决方案架构师,负责 Data Analytic & AIML 产品服务架构设计以及解决方案。10+数据领域研发及架构设计经验,历任 IBM 咨询顾问,Oracle 高级咨询顾问,澳新银行数据部领域架构师职务。在大数据 BI,数据湖,推荐系统,MLOps 等平台项目有丰富实战经验。

梁宇辉

亚马逊云科技机器学习产品技术专家,负责基于亚马逊云科技的机器学习方案的咨询与设计,专注于机器学习的推广与应用,深度参与了很多真实客户的机器学习项目的构建以及优化。对于深度学习模型分布式训练,推荐系统和计算广告等领域具有丰富经验。