使用 Amazon SageMaker 构建语义内容推荐系统

下载和准备数据集

在本模块中,您需要下载数据集、预处理数据集、将数据集分为训练数据集和验证数据集,然后将数据集暂存在 Amazon S3 存储桶中。

您的机器学习模型将使用 20newsgroups 数据集进行训练,该数据集包含 20,000 个新闻组帖子,涵盖 20 个主题。20newsgroups 数据集由卡内基梅隆大学计算机科学学院整理,可从 scikit-learn 公开获取。

 时长

20 分钟

步骤 1:获取数据集

为了准备数据、训练机器学习模型并部署模型,您首先需要在 Jupyter Notebook 环境中导入一些库,并定义一些环境变量。将以下代码复制粘贴到 Jupyter Notebook 实例的代码单元格中,然后点击 Run(运行)。

注意:请务必将 BUCKET 名称 sagemaker-xx 替换为您自己的 S3 存储桶名称。

!pip install sagemaker==1.9.0
import numpy as np
import os
import matplotlib.pyplot as plt
import sagemaker
import seaborn as sns
from sklearn.datasets import fetch_20newsgroups
import pandas as pd
from sklearn.datasets.twenty_newsgroups import strip_newsgroup_header, strip_newsgroup_quoting, strip_newsgroup_footer
newsgroups_train = fetch_20newsgroups(subset='train')['data']
newsgroups_test = fetch_20newsgroups(subset = 'test')['data']
NUM_TOPICS = 30
NUM_NEIGHBORS = 10
BUCKET = 'sagemaker-rjd'
PREFIX = '20newsgroups'
train_labels = fetch_20newsgroups(subset='train')['target']
categories = fetch_20newsgroups(subset='train')['target_names']

在代码运行过程中,方括号之间会出现一个星号 (*)。几秒钟后代码执行完毕,星号 (*) 将被替换为数字 1。

scikit-learn 数据集自然分为训练集和测试集。对测试集完成一些预处理后,将其放置一旁,仅供测试使用。将训练集划分为训练集和验证集,以便在训练过程中测试模型性能。

步骤 2:预处理数据

在自然语言处理中,在训练任何机器学习模型之前,首要任务之一就是将原始文本数据预处理为机器可读的数值。这个过程通常需要一系列步骤。

首先,使用 scikit-learn 提供的 API 去除数据集中的所有页眉、页脚和引号。

在 Jupyter Notebook 中,将以下代码复制粘贴到代码单元格中,然后点击 Run(运行)。

for i in range(len(newsgroups_train)):
    newsgroups_train[i] = strip_newsgroup_header(newsgroups_train[i])
    newsgroups_train[i] = strip_newsgroup_quoting(newsgroups_train[i])
    newsgroups_train[i] = strip_newsgroup_footer(newsgroups_train[i])

现在,让我们看一下其中一个训练示例。将以下代码复制粘贴到 Jupyter Notebook 中。

newsgroups_train[1]

如您所见,数据仅仅是纯文本段落。为了让机器能够读取这些数据,您需要为句子中的每个单词分配一个 Token,将数据“Token 化”为数字格式。您可以通过先计算最常见的 Token,然后只保留前 2000 个,将 Token 总数限制在 2000 个以内。设置这个限制是因为出现频率较低的词语对主题模型的影响会逐渐减小,最终可以忽略不计。接下来,对于每个文档,使用词袋 (BoW) 模型将其转换为向量,该向量跟踪每个 Token 在训练示例中出现的次数。

在本例中,使用 nltk 包中的词形还原器 WordNetLemmatizer,然后用 scikit-learn 中的 CountVectorizer 执行 Token 计数。WordNetLemmatizer 使用名词作为词性 (POS),将词语词形还原为词元。词形还原旨在返回实际词语,而词干提取(另一种预处理方法)通常返回非字典词或词根,它们在机器学习中的作用通常不大。

在列表推导式中,实现了一个简单的规则:只考虑长度超过 2 个字符、以字母开头并匹配 token_pattern 的词语。

在 Jupyter Notebook 中,复制粘贴以下代码,然后点击 Run(运行)。

!pip install nltk
import nltk
nltk.download('punkt')
nltk.download('wordnet')
from nltk import word_tokenize          
from nltk.stem import WordNetLemmatizer 
import re
token_pattern = re.compile(r"(?u)\b\w\w+\b")
class LemmaTokenizer(object):
    def __init__(self):
        self.wnl = WordNetLemmatizer()
    def __call__(self, doc):
        return [self.wnl.lemmatize(t) for t in word_tokenize(doc) if len(t) >= 2 and re.match("[a-z].*",t) 
                and re.match(token_pattern, t)]

定义了 Token 处理器之后,现在可以在将 vocab_size 限制为 2000 的同时执行 Token 计数。

在 Jupyter Notebook 中,复制粘贴以下代码,然后点击 Run(运行)。

import time
import numpy as np
from sklearn.feature_extraction.text import CountVectorizer
vocab_size = 2000
print('Tokenizing and counting, this may take a few minutes...')
start_time = time.time()
vectorizer = CountVectorizer(input='content', analyzer='word', stop_words='english',
                             tokenizer=LemmaTokenizer(), max_features=vocab_size, max_df=0.95, min_df=2)
vectors = vectorizer.fit_transform(newsgroups_train)
vocab_list = vectorizer.get_feature_names()
print('vocab size:', len(vocab_list))

# random shuffle
idx = np.arange(vectors.shape[0])
newidx = np.random.permutation(idx) # this will be the labels fed into the KNN model for training
# Need to store these permutations:

vectors = vectors[newidx]

print('Done. Time elapsed: {:.2f}s'.format(time.time() - start_time))

现在,让我们仔细看看代码。

CountVectorizer API 使用了三个超参数,在训练后续模型时,它们有助于解决过拟合或欠拟合问题。

第一个超参数是 max_features,您可以将其设置为词汇表大小。如前所述,由不常用词构成的巨大词汇表会给数据引入不必要的噪音,从而导致训练出的模型性能较差。

第二个和第三个超参数是 max_df 和 min_df。min_df 参数会忽略出现在少于 min_df% 文档中的词语,而 max_df 参数则忽略出现在超过 max_df% 文档中的词语。max_df 参数确保去除停用词表未涵盖的高频词。通常来说,这是一种很好的做法,因为主题模型试图通过寻找聚集在一起形成主题的不同词组来识别主题。如果某些词语出现在所有文档中,它们会降低模型的表达能力。相反,提高 min_df 参数可以确保不包含极其罕见的词语,从而减少模型过拟合的倾向。

为了生成训练集和验证集,首先需要对 CountVectorizer API 生成的 BOW 向量进行随机排列。在随机排列的过程中,您需要跟踪原始索引和重排后的索引。在本教程后续模块中,当您使用 KNN 模型查找与未见过的文档相似的文档时,需要知道重排前与训练数据关联的原始索引。

步骤 3:将训练数据集和验证数据集暂存至 Amazon S3

完成预处理后,现在您可以创建训练数据集和验证数据集,并将其暂存到您的 S3 存储桶中。

首先,将向量转换为稀疏表示形式。

将以下代码复制粘贴到 Jupyter Notebook 中,然后点击 Run(运行)。

import scipy.sparse as sparse
vectors = sparse.csr_matrix(vectors, dtype=np.float32)
print(type(vectors), vectors.dtype)

现在,将数据集拆分为训练数据和测试数据。

  • 训练数据 (80%) 在模型训练循环中使用。您将使用基于梯度的优化方法,以迭代的方式优化模型参数。基于梯度的优化是一种利用模型损失函数的梯度来寻找模型参数值的方法,目的是使模型误差最小化。
  • 测试数据(剩余 20% 的客户)用于评估模型性能,并衡量训练后的模型对未见数据的泛化能力。

注意:只有在对整个数据集进行随机排列之后,以下数据拆分方法才有效。或者,您也可以使用 sklearn.modelselection train_test_split API 进行拆分,并将 random_state 种子设置为一个数值,以确保可重现性。

将以下代码复制到一个新的代码单元格中,然后点击 Run(运行)来拆分数据:

# Convert data into training and validation data
n_train = int(0.8 * vectors.shape[0])

# split train and test
train_vectors = vectors[:n_train, :]
val_vectors = vectors[n_train:, :]

# further split test set into validation set (val_vectors) and test  set (test_vectors)

print(train_vectors.shape,val_vectors.shape)

接下来,定义训练路径、验证路径以及模型训练后用于存储 NTM 构件的输出路径。

将以下代码复制粘贴到一个新的代码单元格中,然后点击 Run(运行)。

from sagemaker import get_execution_role

role = get_execution_role()

bucket = BUCKET
prefix = PREFIX

train_prefix = os.path.join(prefix, 'train')
val_prefix = os.path.join(prefix, 'val')
output_prefix = os.path.join(prefix, 'output')

s3_train_data = os.path.join('s3://', bucket, train_prefix)
s3_val_data = os.path.join('s3://', bucket, val_prefix)
output_path = os.path.join('s3://', bucket, output_prefix)
print('Training set location', s3_train_data)
print('Validation set location', s3_val_data)
print('Trained model will be saved at', output_path)

对于训练、验证和测试通道中的数据,NTM 支持 CSV 和 RecordIO protobuf 格式。下面的辅助函数 split_convert_upload 会将原始向量转换为 RecordIO 格式,并使用 n_parts 参数,您可以选择将数据集拆分成多个分片,用于分布式训练。

要转换向量,请将以下代码复制粘贴到一个新的代码单元格中,然后点击 Run(运行)。

def split_convert_upload(sparray, bucket, prefix, fname_template='data_part{}.pbr', n_parts=2):
    import io
    import boto3
    import sagemaker.amazon.common as smac
    
    chunk_size = sparray.shape[0]// n_parts
    for i in range(n_parts):

        # Calculate start and end indices
        start = i*chunk_size
        end = (i+1)*chunk_size
        if i+1 == n_parts:
            end = sparray.shape[0]
        
        # Convert to record protobuf
        buf = io.BytesIO()
        smac.write_spmatrix_to_sparse_tensor(array=sparray[start:end], file=buf, labels=None)
        buf.seek(0)
        
        # Upload to s3 location specified by bucket and prefix
        fname = os.path.join(prefix, fname_template.format(i))
        boto3.resource('s3').Bucket(bucket).Object(fname).upload_fileobj(buf)
        print('Uploaded data to s3://{}'.format(os.path.join(bucket, fname)))
split_convert_upload(train_vectors, bucket=bucket, prefix=train_prefix, fname_template='train_part{}.pbr', n_parts=8)
split_convert_upload(val_vectors, bucket=bucket, prefix=val_prefix, fname_template='val_part{}.pbr', n_parts=1)

大功告成!您已经准备好并暂存了数据集!

总结

在本模块中,您导入并获取了将用于内容推荐系统的数据集。然后,您通过预处理、词形还原和 Token 化来准备数据集。最后,您将数据集拆分为训练集和验证集,并将它们暂存到您的 Amazon S3 存储桶中。

在下一个模块中,您将使用 Amazon SageMaker NTM 算法训练主题模型,并将模型部署到 Amazon SageMaker 中。

训练和部署主题模型