亚马逊AWS官方博客

使用AWS Sagemaker部署的终端节点进行推荐预测的常用场景

使用AWS Sagemaker系列文章:

第一篇:使用AWS Sagemaker训练因子分解机模型并应用于推荐系统

第二篇:使用AWS Sagemaker部署的终端节点进行推荐预测的常用场景(本博文)

————
上次我们初步介绍了使用 SageMaker 快速训练和部署 Factorization Machines 模型,接下来我们利用Endpoint进行预测模拟的实际用例。

利用Endpoint进行预测模拟的实际用例

考虑几个常见的实际用例在生产环境中的推荐系统

  • 从参与训练的电影库(26004部)中推荐10部给已存在的用户;
  • 从参与训练的电影库(26004部)中推荐10部给新用户;
  • 应用内容筛选并推荐10部电影给已存在的用户;

1. 从参与训练的电影库(26004部)中推荐10部给已存在的用户

首先我们随机选择一位用户,本例中该用户ID是100020528。观察该用户在训练集中点评过的电影,共计4部。

In [56]:

#select 100020528 as a random user
for k in df[df['UserId']=='100020528'].index:
     print(df_movie_all_db[df_movie_all_db['MovieID']==str(df.at[k,'MovieID'])].Title.values[0], "         ", df.at[k,'Rate'])

苏醒 6

推动情人的床 6

录影片段 4

雷德怒潮 6

 

为这一用户生成预测数据的稀疏矩阵。

In [7]:

#revise the loadDataset() to accept the userID as an input.
def createInferenceForUser(UserId, lines, columns):
    user_number = 53902
    X = scipy.sparse.lil_matrix((lines, columns)).astype('float32')
    line = 0
    for line in range(0, lines):
        X[line, u_Dict[UserId]] = 1
        X[line, user_number+line] = 1
        line=line+1

    print(X.shape)
    return X

 

In [8]:

user_inference = createInferenceForUser('100020528', movie_number, user_number+movie_number)
print(user_inference.shape)

(26004, 79906)

(26004, 79906)

 

生成的是26004✖️79906的矩阵。

 

随后调用Endpoint对预测数据进行预测,并返回预测结果。由于预测数据较大,我们分段进行,每一个请求传递10行预测数据。

In [10]:

#invoke endpoint
client = boto3.client('sagemaker-runtime')

In [20]:

#Make inference
import json
inf_start = 0
step = 10
response = []
while inf_start <= user_inference.shape[0]+1:
    raw_response = client.invoke_endpoint(
                EndpointName = 'factorization-machines-2018-12-18-05-47-26-108',
                Body = fm_serializer(user_inference[inf_start:inf_start+step].toarray()),
                ContentType='application/json')
    result = json.loads(raw_response['Body'].read())
    for counter, p in enumerate(result['predictions']):
        p['MovieID'] = df_m_Dict[df_m_Dict.movie_index == (inf_start + int(counter))].MovieID.values[0]
        response.append(p)
    inf_start = inf_start+step
    print(inf_start)
print(len(response))

26004

对预测结果进行整理后,显示如下:

In [127]:

df_response_like = df_response[df_response.predicted_label == 1.0]
final = df_response_like.sort_values('score', ascending=False).head(10)
for final_index in final.index:
    print(df_movie_all_db[df_movie_all_db['MovieID']==str(final.at[final_index,'MovieID'])].Title.values[0],'\t',df_movie_all_db[df_movie_all_db['MovieID']==str(final.at[final_index,'MovieID'])].Rate.values[0],'\t',final.at[final_index,'score'])

 

逃离循环 6.8 0.8900507092475891

燕尾蝶 8.6 0.8846622705459595

盲证 7.4 0.8812546133995056

头文字D 新OVA 启程之绿 7.5 0.8807412981987

狐妖小红娘剧场版:月红篇 9.2 0.8801539540290833

追随你脚步 7.5 0.8792228698730469

黑色闪电 7.4 0.879056453704834

蓝色情人节 7.8 0.8788111805915833

骗中骗2 7.5 0.878635048866272

放牛班的春天 9.2 0.8783532977104187

 

2. 从参与训练的电影库(26004部)中推荐10部给新用户

如果是一个全新的用户,并不存在于我们训练数据集中的用户列表里。我们可以将稀疏矩阵中所有表示用户的列置为0来生成预测数据。

In [130]:

def createNewUserInference(lines, columns):
    user_number = 53902
    X = scipy.sparse.lil_matrix((lines, columns)).astype('float32')
    line = 0
    for line in range(0, lines):
        X[line, user_number+line] = 1
        line=line+1
    print(X.shape)
    return X

 

In [123]:

def callEndpointInference(query):
    import json
    inf_start = 0
    step = 10
    response = []
    while inf_start <= query.shape[0]:
        raw_response = client.invoke_endpoint(
                    EndpointName = 'factorization-machines-2018-12-18-05-47-26-108',
                    Body = fm_serializer(query[inf_start:min(inf_start+step, query.shape[0]+1)].toarray()),
                    ContentType='application/json')
        result = json.loads(raw_response['Body'].read())
        for counter, p in enumerate(result['predictions']):
            p['MovieID'] = df_m_Dict[df_m_Dict.movie_index == (inf_start + int(counter))].MovieID.values[0]
            response.append(p)
        inf_start = inf_start+step
        print(inf_start)
    return response

 

In [125]:

def showResult(response):
    df_response = pd.DataFrame(response)
    df_response_like = df_response[df_response.predicted_label == 1.0]
    final = df_response_like.sort_values('score', ascending=False).head(10)
    for final_index in final.index:
        print(df_movie_all_db[df_movie_all_db['MovieID']==str(final.at[final_index,'MovieID'])].Title.values[0],'\t',df_movie_all_db[df_movie_all_db['MovieID']==str(final.at[final_index,'MovieID'])].Rate.values[0],'\t',final.at[final_index,'score'])

 

针对新用户生成的预测数据依然是一个26004✖️79906的矩阵。

In [131]:

new_inference = createNewUserInference(movie_number, user_number+movie_number)

(26004, 79906)

 

In [132]:

%%time
new_response = callEndpointInference(new_inference)

 

CPU times: user 5min 18s, sys: 25.9 s, total: 5min 44s

Wall time: 16min

 

预测结果如下,可以看到,与之前已存在用户的预测结果有很大不同。

In [133]:

showResult(new_response)

 

青涩恋爱 7.9 0.7878496646881104

死亡实验 8.0 0.7846202850341797

时空线索 7.7 0.7836878299713135

侠僧探案传奇之王陵之谜 7.0 0.7817953824996948

宝拉X 7.1 0.7813129425048828

没有秘密 6.5 0.7806361317634583

雨中曲 9.0 0.7800688743591309

卡里加里博士的小屋 8.7 0.7798536419868469

玩命警车 6.3 0.7796841263771057

玩具总动员3 8.8 0.7790806293487549

 

3. 应用内容筛选并推荐10部电影给已存在的用户

回到已存在用户的推荐预测部分,之前我们的预测耗时较长,约需要17分钟左右才能完成一次查询,这在实际应用中是不现实的。主要原因之一是我们采用对电影库进行全量预测,其实这降低了性能表现和相关性。如果采用内容筛选的办法,提前限定预测影片的范围,缩小预测数据规模,可以提高性能表现和相关性。

让我们看一下100020528用户在全量数据中的历史表现。这位用户点评了135部电影,对应的类型如下:

In [88]:

df_movie_preference = df_movie_all_db[df_movie_all_db['MovieID'].isin(movie_rated_by_user)].copy()
df_movie_preference.groupby('Tags').count()

 

Out[88]:

Link Rate Title Country RateCollected MovieID
Tags
剧情 / 动作 / 恐怖 1 1 1 1 0 1
剧情 / 动作 / 犯罪 2 2 2 2 0 2
剧情 / 历史 / 奇幻 1 1 1 1 0 1
剧情 / 喜剧 / 奇幻 1 1 1 1 0 1
剧情 / 喜剧 / 悬疑 / 惊悚 / 恐怖 1 1 1 1 0 1
剧情 / 喜剧 / 爱情 1 1 1 1 0 1
剧情 / 喜剧 / 爱情 / 惊悚 / 恐怖 1 1 1 1 0 1
剧情 / 奇幻 1 1 1 1 0 1
剧情 / 恐怖 3 3 3 3 0 3
剧情 / 恐怖 / 历史 1 1 1 1 0 1
剧情 / 恐怖 / 短片 1 1 1 1 0 1
剧情 / 悬疑 3 3 3 3 0 3
剧情 / 悬疑 / 恐怖 1 1 1 1 0 1
剧情 / 悬疑 / 惊悚 2 2 2 2 0 2
剧情 / 悬疑 / 惊悚 / 恐怖 1 1 1 1 0 1
剧情 / 悬疑 / 惊悚 / 犯罪 1 1 1 1 0 1
剧情 / 情色 1 1 1 1 0 1
剧情 / 惊悚 9 9 9 9 0 9
剧情 / 惊悚 / 历史 1 1 1 1 0 1
剧情 / 惊悚 / 恐怖 6 6 6 6 0 6
剧情 / 惊悚 / 犯罪 4 4 4 4 0 4
剧情 / 歌舞 / 奇幻 1 1 1 1 0 1
剧情 / 爱情 3 3 3 3 0 3
剧情 / 爱情 / 冒险 1 1 1 1 0 1
剧情 / 爱情 / 同性 1 1 1 1 0 1
剧情 / 爱情 / 悬疑 / 恐怖 / 奇幻 1 1 1 1 0 1
剧情 / 犯罪 4 4 4 4 0 4
剧情 / 科幻 / 恐怖 2 2 2 2 0 2
剧情 / 科幻 / 惊悚 2 2 2 2 0 2
剧情 / 科幻 / 惊悚 / 犯罪 1 1 1 1 0 1
喜剧 / 动作 / 科幻 1 1 1 1 0 1
喜剧 / 动作 / 科幻 / 恐怖 1 1 1 1 0 1
喜剧 / 恐怖 2 2 2 2 0 2
喜剧 / 恐怖 / 奇幻 2 2 2 2 0 2
喜剧 / 悬疑 / 恐怖 1 1 1 1 0 1
喜剧 / 惊悚 2 2 2 2 0 2
喜剧 / 惊悚 / 恐怖 1 1 1 1 0 1
喜剧 / 惊悚 / 恐怖 / 犯罪 1 1 1 1 0 1
喜剧 / 惊悚 / 犯罪 1 1 1 1 0 1
喜剧 / 爱情 / 恐怖 1 1 1 1 0 1
喜剧 / 科幻 / 恐怖 1 1 1 1 0 1
恐怖 15 15 15 15 0 15
恐怖 / 奇幻 1 1 1 1 0 1
恐怖 / 短片 / 犯罪 1 1 1 1 0 1
悬疑 / 恐怖 1 1 1 1 0 1
悬疑 / 惊悚 2 2 2 2 0 2
悬疑 / 惊悚 / 恐怖 6 6 6 6 0 6
悬疑 / 惊悚 / 恐怖 / 犯罪 1 1 1 1 0 1
悬疑 / 惊悚 / 犯罪 2 2 2 2 0 2
悬疑 / 短片 / 奇幻 1 1 1 1 0 1
惊悚 7 7 7 7 0 7
惊悚 / 恐怖 10 10 10 10 0 10
惊悚 / 恐怖 / 犯罪 3 3 3 3 0 3
惊悚 / 犯罪 1 1 1 1 0 1
爱情 1 1 1 1 0 1
爱情 / 恐怖 / 奇幻 1 1 1 1 0 1
爱情 / 短片 / 情色 1 1 1 1 0 1
科幻 1 1 1 1 0 1
科幻 / 奇幻 1 1 1 1 0 1
科幻 / 恐怖 1 1 1 1 0 1

65 rows × 6 columns

可以看到这位用户偏爱含有“恐怖”和“惊悚”标签的电影,加起来几乎是其点评过电影的50%,所以我们可以对电影库进行一下筛选,看有多少电影是包含这两个标签的。

In [93]:

df_movie_all_db[df_movie_all_db.Tags.isin(['惊悚','恐怖'])].count()

 

Out[93]:

Link 1625

Rate 1625

Title 1625

Tags 1625

Country 1625

RateCollected 0

MovieID 1625

dtype: int64

In [111]:

#Check whether all 1625 movies is in m_Dict, and clean the movies that are not in m_Dict
for m_h in horror_and_terrified_movie:
    if m_h in m_Dict.keys():
        continue
    else:
        print(m_h)
        horror_and_terrified_movie.remove(m_h)
print(len(horror_and_terrified_movie))

1124

观测结果,在我们参与训练的电影库(26004部)中有1124部是符合这两个类型的。我们进一步把100020528这位用户点评过的电影从这1124部中移除出去。

In [137]:

m_rated = []
for el_index in df_movie_preference.index:
    m_rated.append(df_movie_preference.at[el_index, 'MovieID'])

 

In [141]:

for m_h in horror_and_terrified_movie:
    if m_h in m_rated:
        print(m_h)
        horror_and_terrified_movie.remove(m_h)
print(len(horror_and_terrified_movie))

1104

 

最后我们获得了共计1104部电影的清单。针对这位用户,以及这1104部电影的清单重新生成预测数据矩阵。预测数据矩阵为1104✖️79906阶。

In [143]:

filtered_user_inference = createInferenceForUserPreference('100020528', horror_and_terrified_movie, user_number+movie_number)
print(filtered_user_inference.shape)

(1104, 79906)

 

使用这一矩阵进行预测。

In [149]:

%%time
filtered_response = callEndpointInference_noid(filtered_user_inference)

CPU times: user 13.2 s, sys: 879 ms, total: 14.1 s

Wall time: 40.9 s

预测用时40.9秒,预测结果如下:

In [154]:

showResultfromDataFrame(df_filtered_counter)

杀人鳄鱼潭 7.4 0.8682911992073059

没有秘密 6.5 0.8636484742164612

热血青年 6.5 0.8625574111938477

驱魔警察 7.4 0.8622710108757019

鬼玩人 7.0 0.8613290786743164

一千灵异夜之灵魂实验 7.0 0.8611863851547241

连体 5.4 0.8586516976356506

僵尸城市 6.3 0.8585190176963806

35楼的生存游戏 6.7 0.8584233522415161

危险工作 6.5 0.8558875918388367

 

从预测结果中可以直观感受到,更贴近用户历史行为的喜好。

 

应用稀疏数据格式优化响应时间

当然40.9秒的预测时间在实际中依然过长,这是由于在本例的演示中,预测数据全部采用json dense格式传递,造成数据量大,只能分批预测和返回。实际应用中可以采用FM接受的json sparse数据格式。

我们使用最后的用户示例演示一下json sparse格式传递预测数据获得结果的过程以及对这一格式整个响应时间做个测量。

重新定义Invoke Endpoint的方法以及内部构成数据的格式如下:

In [298]:

def slicedInferenceUserPreference(userid, step, movielist):
    import json
    #total_line = 26004 #movie_number
    
    start = 0
    response = []
    line = 0
    response = []
    
    while start <= len(movielist)+1:
        js={'instances': []}
        for line in range(0, min(int(step), len(movielist)-start)):
            js['instances'].append({'data':{'features': {
                'keys':[int(u_Dict[userid]), 53902+m_Dict[movielist[line+int(start)]]],
                "shape":[79906],
                "values":[1,1]}}})
        
        #print(json.dumps(js))
        raw_response = client.invoke_endpoint(
                EndpointName = 'factorization-machines-2018-12-18-05-47-26-108',
                Body = json.dumps(js),
                ContentType='application/json')
        result = json.loads(raw_response['Body'].read())
        for counter, p in enumerate(result['predictions']):
            p['MovieID'] = movielist[counter]
            response.append(p)
        print(start+line)
        start = start+step    
    return(response)

In [299]:

%%time
test_user_preference_response = slicedInferenceUserPreference('100020528', 1000, horror_and_terrified_movie)

999

1103

CPU times: user 21.8 ms, sys: 519 µs, total: 22.3 ms

Wall time: 5.93 s

 

运行结果共计耗时5.93秒,且其中大部分是由于网络延时,已经可以考虑在实际中应用了。预测结果也与之前保持一致。

In [301]:

showResultfromDataFrame(preference_response)

杀人鳄鱼潭 7.4 0.8682911992073059

没有秘密 6.5 0.8636484742164612

热血青年 6.5 0.8625574111938477

驱魔警察 7.4 0.8622710108757019

鬼玩人 7.0 0.8613290786743164

一千灵异夜之灵魂实验 7.0 0.8611863851547241

连体 5.4 0.8586516976356506

僵尸城市 6.3 0.8585190176963806

35楼的生存游戏 6.7 0.8584233522415161

危险工作 6.5 0.8558875918388367

 

从本次的实验示例中可以感受到,SageMaker作为AWS平台上机器学习及人工智能的核心服务之一,极大地简化了我们准备数据、分析数据、构建模型、验证模型、参数调优和部署模型的整个工作流程。使用SageMaker提供的算法,更可以使拥有大量数据资产的用户以极快的速度发挥数据的价值,在应用中引入机器学习或人工智能。

使用AWS Sagemaker系列文章:

第一篇:使用AWS Sagemaker训练因子分解机模型并应用于推荐系统

第二篇:使用AWS Sagemaker部署的终端节点进行推荐预测的常用场景(本博文)

————

本篇作者

崔辰

AWS大中华区创新中心技术业务拓展经理。加入AWS之前,崔辰在中国惠普、IBM、微软以及海航科技等公司担任过售前技术顾问、市场经理和战略合作经理等职务。在10多年的科技领域工作经历中,崔辰服务过众多企业级客户。