使用Amazon SageMaker 训练因子分解机模型并应用于推荐系统。
1. 单击此处之后,AWS 管理控制台将在新窗口中打开,因此您可以使本分步指南保持打开状态。开始在搜索栏中键入 SageMaker,并选择 Amazon SageMaker 以打开服务控制台。
1. 从 Amazon SageMaker 控制面板中选择笔记本实例。
2. 在创建笔记本实例页面上,在笔记本实例名称字段中输入名称。本教程使用 RecommendationSystemTest作为实例名称,如有需要,您可以选择其他名称。
在本教程中,您可以保留默认的笔记本实例类型:ml.t2.medium。
要使笔记本实例能够访问并安全地将数据上传到 Amazon S3,必须指定 IAM 角色。在 IAM 角色字段中,选择创建新角色,让 Amazon SageMaker 创建具有所需权限的角色,并将其分配给您的实例。或者,您也可以在您的账户中选择现有 IAM 角色来达到此目的。
3.在创建 IAM 角色框中,选择任意 S3 存储桶。 这让您的 Amazon SageMaker 实例可以访问您账户中的所有 S3 存储桶。在本教程的稍后部分,您将创建一个新的 S3 存储桶。但是,如果您有一个想要使用的存储桶,请选择特定 S3 存储桶并指定相应存储桶的名称。
选择创建角色。
4.在笔记本实例页面上,您应该会看到新的 RecommendationSystemTest笔记本实例处于待处理状态。
您的笔记本实例应在两分钟内从待处理转变为服务中状态。
1.在笔记本实例页面上,当状态转变为服务中后,选择RecommendationSystemTest并使用操作下拉菜单将其打开,或选择服务中状态旁边的打开 Jupyter。
2.打开 Jupyter 后,首先登陆到Kaggle下载 The Movie Dataset数据集并上传到Jupter中
3.打开 Jupyter 后,从文件选项卡中选择新建,然后选择 conda_python3。
4. 将以下代码复制到笔记本中的代码单元格中,为了demo的录制和运算的效率,我们对原始数据集进行采样,最后选择了10w条数据。
# 导入原始数据集 import pandas as pd df_all = pd.read_csv("ratings.csv") print(df_all.shape) print(df_all['userId'].nunique()) print(df_all['movieId'].nunique()) # 随机采样 df = df_all.sample(n=100000) df.drop(['timestamp'],axis=1,inplace=True) df.to_csv("ratings_100k.csv",index=False) user_number = df['userId'].nunique() movie_number = df['movieId'].nunique() print(df.shape) print(user_number, movie_number)
5. 接下来进行训练集和测试集的划分,在这里我们采取4:1的比例,调用sklearn的train_test_split进行划分。
# 划分训练集和测试集 from sklearn.model_selection import train_test_split df_train, df_test = train_test_split(df, test_size=0.2, random_state=42) df_train.to_csv('100k_train.csv', index=False) df_test.to_csv('100k_test.csv', index=False) print(df_train.shape, df_test.shape) print(df_train.head(5)) print(df_test.head(5))
6. 数据集中的用户ID和电影ID均为随机字符串,为了方便我们后续建立有序矩阵以及模型训练后预测结果的数据对应,我们首先为用户和电影分别建立由0开始的index序列,并使其与ID字符串对应。
# 分别建立用户和电影字典 import csv def createIDtoIndexDict(filename, user_number, movie_number): u_Dict = {} m_Dict = {} i = 0 m = 0 with open(filename, 'r') as f: sample = csv.reader(f, delimiter=',') for userId, movieId, rating in sample: if userId == 'userId': continue else: if userId not in u_Dict.keys(): u_Dict[userId] = i i = i+1 if movieId not in m_Dict.keys(): m_Dict[movieId] = m m = m+1 return u_Dict, m_Dict u_Dict, m_Dict = createIDtoIndexDict('ratings_100k.csv', user_number, movie_number) # print(u_Dict, m_Dict)
7. 因子分解机的训练是针对稀疏矩阵的,因此我们要将数据集中电影、评分、用户的序列转为稀疏矩阵,并根据用户评分的结果生成标签向量。我们使用Python Scipy模块中的lil_matrix来构建。生成的矩阵应当是每一个用户ID作为单独一列、每一部电影在所有用户列之后也作为单独一列,针对原数据集中每行的数据,在对应的用户列和电影列置1。标签向量以用户评分为基准,我们设定用户评分大于等于6的为“喜爱”,并在对应的标签向量位置置1,反之为“不喜爱”,标签向量相应位置置0。
训练集和测试集均进行同样的操作。
# 建立稀疏矩阵(Sparse Matrix)和标签向量(Label Vector) import scipy import numpy as np def loadDataset(filename, lines, columns): X = scipy.sparse.lil_matrix((lines, columns)).astype('float32') Y = [] line = 0 with open (filename, 'r') as f: sample = csv.reader(f, delimiter=',') for userId, movieId, rating in sample: if userId == 'userId': continue else: X[line, u_Dict[userId]] = 1 X[line, user_number + m_Dict[movieId]] = 1 if rating == 'rating' or int(float(rating)) < 4: Y.append(0) else: Y.append(1) line=line+1 Y=np.array(Y).astype('float32') return X, Y columns = user_number + movie_number X_train, Y_train = loadDataset('100k_train.csv', df_train.shape[0], columns) X_test, Y_test = loadDataset('100k_test.csv', df_test.shape[0], columns) print(X_train.shape, X_test.shape) print(Y_train.shape, Y_test.shape)
8. 稀疏矩阵中绝大多数元素均为0,如果直接保存稀疏矩阵,会占用大量的存储空间,因此我们将其转为protobuf格式的数据,并保存到S3。
# 转换稀疏矩阵为protobuf格式,并保存到S3 import io,boto3 import sagemaker.amazon.common as smac bucket = 'sagemaker-demo-movie-recommendation' prefix = 'sagemaker/movie-recommendation' train_key = 'train.protobuf' train_prefix = '{}/{}'.format(prefix, 'train') test_key = 'test.protobuf' test_prefix = '{}/{}'.format(prefix, 'test') output_prefix = 's3://{}/{}/output'.format(bucket, prefix) def writeDatasetToProtobuf(X, Y, bucket, prefix, key): buf = io.BytesIO() smac.write_spmatrix_to_sparse_tensor(buf, X, Y) buf.seek(0) print(buf) obj = '{}/{}'.format(prefix, key) boto3.resource('s3').Bucket(bucket).Object(obj).upload_fileobj(buf) print('Wrote dataset: {}/{}'.format(bucket,obj)) return 's3://{}/{}'.format(bucket,obj) train_data = writeDatasetToProtobuf(X_train, Y_train, bucket, train_prefix, train_key) test_data = writeDatasetToProtobuf(X_test, Y_test, bucket, test_prefix, test_key) print(train_data) print(test_data) print('Output: {}'.format(output_prefix))
9. 转到S3的控制台中查看,可以看到在存储桶下train目录中存了转换格式后的训练集;在test目录中存了转换格式后的测试集。
1. FM是Amazon SageMaker自带的算法之一,因此通过Amazon SageMaker训练模型非常容易。首先我们需要引入SageMaker的SDK,并建立Amazon SageMaker的session、定义位于该Region的因子分解机算法Container以及获取SageMaker的运行角色。随后我们定义FM训练需要的一些参数。首先是环境参数,包括之前定义好的Container、角色、输出位置和session、还包括训练使用的EC2实例,本例中采用“ml.c4.xlarge”来训练。
之后,我们需要定义FM算法的超参(Hyperparameters)。在本例中特征列为用户数与电影数的总和、预测方式为二分类(即结果为判断“喜爱”或是“不喜爱”)、最小批量为1000、epoch时期为50次。其中num_factors即为在算法介绍中提到的潜藏特征K的数量,根据Amazon SageMaker官方文档的说明,建议在2-1000之间,通常64为最优值,因此,我们也设为64。
最后为模型提供训练集和测试集在S3中的位置,训练就开始了。
# FM模型训练 from sagemaker import get_execution_role from sagemaker.amazon.amazon_estimator import get_image_uri import sagemaker sess = sagemaker.Session() role = get_execution_role() container = get_image_uri(boto3.Session().region_name, 'factorization-machines') fm = sagemaker.estimator.Estimator(container, role, train_instance_count=1, train_instance_type='ml.c4.xlarge', output_path=output_prefix, sagemaker_session=sess) fm.set_hyperparameters(feature_dim=68035, predictor_type='binary_classifier', mini_batch_size=1000, num_factors=64, epochs=50) fm.fit({'train': train_data, 'test':test_data})
2. 点击Amazon SageMaker控制台的训练任务,可以看到我们刚刚开始的训练,训练完成后这里也会显示Completed状态。
1. 最后就是模型部署,Endpoint可以理解为模型基于http访问的API接口,有了Endpoint就可以进行预测服务了。
# 部署模型 fm_predictor = fm.deploy(initial_instance_count=1, instance_type='ml.t2.medium')
2. 转到Amazon SageMaker控制台的终端节点部分可以看到我们不熟的终端节点。
3. 等待几分钟部署完毕之后就可以点进终端节点查看详细信息
1. 最后我们进行一个简单的预测,从参与训练的电影库中推荐10部电影给某个已存在的用户。
首先生成该用于的稀疏矩阵。
# 预测:从参与训练的电影库(26004部)中推荐10部给已存在的某个用户; # 生成该用户的稀疏矩阵 def createInferenceForUser(userId, lines, columns): X = scipy.sparse.lil_matrix((lines, columns)).astype('float32') line = 0 for line in range(0, lines): X[line, u_Dict[userId]] = 1 X[line, user_number + line] = 1 line=line+1 print(X.shape) return X # 随机选择一个用户ID为10965 user_inference = createInferenceForUser('10965', movie_number, user_number + movie_number) print(user_inference.shape)
2.然后调用部署好的终端节点进行预测,对预测的结果按照score进行排序选择排名前10的电影。
import json from sagemaker.predictor import json_deserializer #invoke endpoint client = boto3.client('sagemaker-runtime') def fm_serializer(data): js = {'instances': []} for row in data: js['instances'].append({'features': row.tolist()}) return json.dumps(js) # 把电影dict转为dataframe df_m_Dict = pd.DataFrame.from_dict(m_Dict, orient='index',columns=['movie_index']) df_m_Dict = df_m_Dict.reset_index().rename(columns = {'index':'movieId'}) inf_start = 0 step = 10 response = [] while inf_start <= user_inference.shape[0]+1: raw_response = client.invoke_endpoint( EndpointName = 'factorization-machines-2020-04-13-07-19-57-191', Body = fm_serializer(user_inference[inf_start:inf_start+step].toarray()), ContentType='application/json') result = json.loads(raw_response['Body'].read()) for counter, p in enumerate(result['predictions']): p['movieId'] = df_m_Dict[df_m_Dict.movie_index == (inf_start + int(counter))].movieId.values[0] response.append(p) inf_start = inf_start + step print(len(response)) # 将response list转为dataframe df_response = pd.DataFrame(response, columns=['score', 'predicted_label', 'movieId']) df_response_like = df_response[df_response.predicted_label == 1.0] # 按照score选择前十个电影推荐 final = df_response_like.sort_values('score', ascending=False).head(10) print(final)
3. 可以看到运行的结果就是给该用户推荐的前10部电影,至此,一个简单的电影推荐预测就完成了。
在此步骤中,您将终止与 Amazon SageMaker 相关的资源。
重要说明:终止当前未在使用的资源可降低成本,是最佳实践。不终止资源将产生费用。
要删除 Amazon SageMaker 终端节点和 S3 存储桶中的对象,请复制、粘贴并运行以下代码:
sagemaker.Session().delete_endpoint(fm_predictor.endpoint) bucket_to_delete = boto3.resource('s3').Bucket(bucket_name) bucket_to_delete.objects.all().delete()