Amazon Web Services ブログ

新着アイテムを SageMaker Feature Store へ登録するニアリアルタイムパイプラインの構築

ブログやニュース記事において情報の鮮度は大切です。ユーザーに記事をレコメンドする場合、いち早く新着記事をレコメンドに含めることで PV 数の獲得やユーザー体験向上につながります。近年では Factorization Machines などの機械学習を利用したレコメンド手法が用いられることが増えています。機械学習を利用した手法で投稿されたばかりの記事などの新着アイテムを即座にユーザのレコメンドアイテムに含めるためには、投稿されたアイテムの特徴量をニアリアルタイムで抽出し、保存しておく必要があります。一般的にユーザーのリクエストからレコメンド結果を返す際には、レスポンスの速度を上げるために、アイテムの特徴量データを Amazon ElastiCache for Redis や Amazon DynamoDB などの Key-Value ストアに格納します。一方でモデルの学習を行う際は、大規模なデータを扱いやすいデータウェアハウスやデータレイクなどの場所に特徴量を格納する方法が適しています。これらの用途の異なるデータストア間で整合性を保ったまま逐次同期を取るシステムを構築することは大変です。新着アイテムに対して学習時とリアルタイム推論時にそれぞれ個別に前処理や特徴量生成の計算を行うことも可能ですが、計算基盤が異なることで学習時と推論時の結果が異なってしまう恐れがあります。
以上のような問題の解決のため Amazon SageMaker Feature Store を利用することができます。SageMaker Feature Store は特徴量を管理するための Feature Store を構築するマネージドサービスであり、リアルタイム推論に適したオンラインストアと学習時など大規模なデータを扱うことに適したオフラインストアを簡単に構築できます。また、自動的にオンラインストアとオフラインストアの間でデータの同期を行うことも可能です。Feature Store を導入することで、特徴量生成までの前処理基盤と学習推論基盤を疎結合とすることができるためデータの品質を一定に保つことができます。また、Feature Store が複数のプログラミング言語に対応したインターフェースを提供することにより、開発者は各基盤での開発言語を自由に選択できるようになります。

本記事では新着アイテムを含めたユーザーへのアイテムレコメンド基盤のアーキテクチャデザインをご紹介します。またその中で、新着アイテムの特徴量抽出を含む前処理計算と Feature Store へのデータインプットを行う、ニアリアルタイムパイプラインのアーキテクチャを Amazon Kinesis Data Streams、AWS Lambda、Amazon SageMaker Feature Store を利用して実装します。

アーキテクチャデザイン

下図に新着アイテムをユーザーのレコメンドにニアリアルタイムで含めるための含めるまでの処理の流れを示します。

  1. Publisher がアイテムを投稿するとアイテムの前処理と特徴量抽出が行われ、特徴量をオンラインストアに格納するパイプラインが実行されます。
  2. オンラインストアに格納された特徴量は自動的にオフラインストアへ同期され、継続的にレコメンドモデルを学習する際の学習データとして利用されます。オフラインストアのデータを使って学習したモデルをデプロイして推論エンドポイントを公開します。
  3. 大抵の場合、新着アイテムがレコメンドに反映されるためにはモデルの学習が完了するまでのラグがあるため、デプロイされるまでの間も新着アイテムを表示できるよう設計します。公開したレコメンド用の API が、クライアントからのリクエストを受け取るとオンラインストアから学習時にモデルに含まれていない新着アイテムのリスト、推論エンドポイントからのユーザの行動履歴に基づいた推論結果の2つをまとめてレコメンドアイテムとしてレスポンスします。新着アイテムに関しては、リクエストに対してクライアントのコンテキスト情報に基づいた新着アイテムのリストを返すことを想定しています。例えばユーザーが閲覧しているアイテムのカテゴリーやジャンルなどです。アイテム ID 内に特定の値や文字列としてこれらのメタデータを付与する方法も考えられますが、今回はジャンルごとにリストを作成します。このブログでは扱いませんが、新着アイテムと事前に推論済みの結果をまとめる方法として例えば Ranking Model のアルゴリズムを使う方法などが考えられます。

このブログでは上記 1 の処理に該当する、Publisher がアイテムを投稿し、オンラインストアにデータの格納するまでのニアリアルタイムパイプライン部分の実装方法を紹介します。

  1. まず Publisher がアイテムを投稿するイベントに応じて Kinesis Data Streams へ raw データのレコードを登録します。
  2. その後、ストリームにレコードが追加されたことを検知した Lambda によってアイテムデータの前処理と特徴量抽出を行いデータを整形します。
  3. 最後に Lambda から整形の終わった特徴量を SageMaker Feature Store のオンラインストアへ格納します。

実装

SageMaker Feature Store での Feature Group の作成

セットアップ

はじめに今回利用する Feature Group を作成します。今回は SageMaker Studio 上の Notebook にて、AWS SDK を利用したPython コードベースで作成していきます。SageMaker Studio のセットアップについてはこちらをご参照ください。
また、今回は SageMaker Studio から Domain に付与された実行ロールを使用して SageMaker Feature Store の API にアクセスします。実行ロールには以下の IAM ポリシーが必要です。

SageMaker に必要な IAM ロールについてはこちらをご参照ください。
また、今回は後半で Kinesis Data Streams に対してレコードを投入します。SageMaker Studio の Notebook 上から Kinesis Data Streams にアクセスする場合には以下の IAM ポリシーも追加でアタッチする必要があります。

Feature Store の使用を開始するには、SageMaker、boto3、および Feature Store のセッションからそれぞれインスタンスを作成します。

import sagemaker
import boto3
import json
import pandas as pd
import time
role = sagemaker.get_execution_role()
sagemaker_session = sagemaker.Session()
region = sagemaker_session.boto_region_name
default_bucket = sagemaker_session.default_bucket()

boto_session = boto3.Session(region_name=region)
sagemaker_client = boto_session.client(service_name='sagemaker', region_name=region)
featurestore_runtime = boto_session.client(service_name='sagemaker-featurestore-runtime', region_name=region)

特徴量の設計

ここから Feature Group を作成していきます。SageMaker Feature Store では各特徴量は Feature Group という単位で管理されます。Feature Group 内には Identifier と EventTime のカラムが必須です。これらを指定して特徴量セットのレコードを取り出します。また、Feature Group 内で定義できる特徴量のデータ型は Integral、Fractional、Stringの3種類です。
注意すべき点はオンラインストアが Identifier ごとに1つの最新のレコードしか保存しないため EventTime による過去のレコードの取得を行えない点と、オンラインストアから EventTtime を指定してレコードを取得できない点です。同様に現状SageMaker Feature Store では Identifier 以外でのレコードの絞り込みができないので、今回は Genre を Identifier とした最新のアイテムをリストとして保存するための Feature Group を別途作成する設計をします。

以下の手順では異なる方法で2つの Feature Group を作成します。1つ目はデプロイされているモデルに含まれていない新着アイテムをカテゴリー(Genre)ごとに管理する Feature Group である latest-items-feature-group、2つ目はアイテム全体の Feature Group である items-feature-group です。latest-items-feature-group はIdentifier を GENRE とし、10個の新着アイテムの ITEM-ID を含んだリストを String 形式で保存します。items-feature-group は Identifier を ITEM_ID とし、TITLE と GENRES をメタデータとして保存します。
以下の手順では、初めに latest-items-feature-group を boto3 クライアント を利用した AWS SDK を利用して新規に作成します。次に items-feature-group を AWS SageMaker Python SDK を利用して CSV を元に Feature Group を作成したあと、データのインポートを行います。今回はこちらのデータセットを利用します。

AWS SDKを利用したFeature Groupの作成とレコードの確認

はじめに new-items の Feature Group を作成します。上記の MovieLens の ml-latest-small の movie データセットには全部で 20 個のカテゴリがあります。Feature Group を AWS SDK を利用して作成した後、Animation のためのレコードを投入します。
初めに Feature Groups 内のスキーマを定義する Feature Definition をリストとして作成します。

feature_definition = [
    {'FeatureName': 'GENRE', 'FeatureType': 'String'}, 
    {'FeatureName': 'LATEST_ITEM_LIST', 'FeatureType': 'String'}, 
    {'FeatureName': 'TIMESTAMP', 'FeatureType': 'Fractional'}
]

次に Feature Group で利用する変数を設定します。record_identifier と event_time は Feature Group に必須なパラメータです。

feature_group_name = 'latest-items-feature-group'# feature group name
record_identifier_name = 'GENRE' # from feature_def, this time is LATEST_ITEM.
event_time_name = 'TIMESTAMP' # from feature_def, this time is TIMESTAMP.
feature_description = "feature group for new items of Animation genre"

offline_feature_store_bucket = f"s3://{sagemaker_session.default_bucket()}/realtime-data-input"
offline_config = {"OfflineStoreConfig":{"S3StorageConfig": {
     "S3Uri": offline_feature_store_bucket
}}}

実際に Feature Group の作成します。CreateFeatureGroup API を利用します。

sagemaker_client.create_feature_group(
        FeatureGroupName = feature_group_name,
        RecordIdentifierFeatureName = record_identifier_name,
        EventTimeFeatureName = event_time_name,
        FeatureDefinitions = feature_definition,
        Description = feature_description,
        OnlineStoreConfig = {'EnableOnlineStore': True},
        RoleArn = role,
        **offline_config)

以下の方法で Feature Group の詳細を確認できます。

sagemaker_client.describe_feature_group(FeatureGroupName=feature_group_name)

次に、作成した Feature Group にデータの投入と取得を行い、Feature Store の挙動を確認します。
まずは feature store のセッションを持ったインスタンスを用いて、PutRecord API でオンラインストアへ特徴量レコードを投入します。

genre = 'Animation'
latest_item_list = [182639, 183397, 187541, 190219, 193565, 193567, 193573, 193581, 193583, 193587]
current_time_sec = round(time.time())

featurestore_runtime.put_record(
    FeatureGroupName=feature_group_name,
    Record=[
        {'FeatureName': 'GENRE', 'ValueAsString': genre}, 
        {'FeatureName': 'LATEST_ITEM_LIST', 'ValueAsString': str(latest_item_list)}, 
        {'FeatureName': 'TIMESTAMP', 'ValueAsString': str(current_time_sec)}
    ]
)

次に GetRecord API でさきほど投入したレコードを取得します。

featurestore_runtime.get_record(
    FeatureGroupName=feature_group_name, RecordIdentifierValueAsString=genre
)

以下のようなレスポンスを確認できたら成功です。

{'ResponseMetadata': {'RequestId': 'c8ea22b1-053f-416a-be1d-91359511327b',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': 'c8ea22b1-053f-416a-be1d-91359511327b',
   'content-type': 'application/json',
   'content-length': '255',
   'date': 'Fri, 18 Mar 2022 04:05:24 GMT'},
  'RetryAttempts': 0},
 'Record': [{'FeatureName': 'GENRE', 'ValueAsString': 'Animation'},
  {'FeatureName': 'LATEST_ITEM_LIST',
   'ValueAsString': '[182639, 183397, 187541, 190219, 193565, 193567, 193573, 193581, 193583, 193587]'},
  {'FeatureName': 'TIMESTAMP', 'ValueAsString': '1647576320'}]}

SageMaker Python SDK を利用した CSV からの Feature Group の作成とレコードの確認

次に MovieLens のデータセットの中から movies.csv をアイテムデータセットとして Feature Store にインポートしていきます。SageMaker Python SDK は AWS SDK をラップしている SageMaker のための開発ライブラリで、SageMaker 全般の機能を抽象化し、簡単に実装することができます。
初めに Feature Store に必要なコンポーネントの準備を行います。

from sagemaker.feature_store.feature_group import FeatureGroup
from sagemaker.feature_store.feature_definition import (
    FeatureDefinition,
    FeatureTypeEnum,
)


feature_store_session = sagemaker.Session(
    boto_session=boto_session,
    sagemaker_client=sagemaker_client,
    sagemaker_featurestore_runtime_client=featurestore_runtime
)

Pandas のデータフレームで CSV ファイルを読み込みます。データセットは movieId、title、genres の3つのカラムから構成されています。

df_items = pd.read_csv('ml-latest-small/movies.csv')

必要に応じてデータを加工します。今回はカラム名と、Feature Group に登録する際に必要な EventTime の値となる TIMESTAMP を追加しています。
データの準備が終わったら実際に Feature Group を作成していきます。必要な変数を定義した後、FeatureDefinition を作成します。先ほどは手動でスキーマを作成しましたが、今回は CSV のカラムとセルの値に基づいて動的に FeatureDefinition を生成します。

feature_group_name = 'items-feature-group'
feature_description = "feature group for all items"
record_identifier_name = 'ITEM_ID'
event_time_name = 'TIMESTAMP'

items_feature_group = FeatureGroup(name=feature_group_name, sagemaker_session=feature_store_session)
feature_definitions = []
for column in df_items.columns:
    feature_type = items_feature_group._DTYPE_TO_FEATURE_DEFINITION_CLS_MAP.get(
        str(df_items[column].dtype), None
    )
    if not feature_type:
        feature_type = FeatureTypeEnum.STRING
    feature_definitions.append(
        FeatureDefinition(column, feature_type)
    )  # You can alternatively define your own schema
    
    
items_feature_group.feature_definitions = feature_definitions

変数と生成した FeatureDefinition を用いて、Create メソッドから Feature Group を作成します。

items_feature_group.create(
    description = feature_description,
    record_identifier_name = record_identifier_name,
    event_time_feature_name = event_time_name,
    role_arn = sagemaker.get_execution_role(),
    s3_uri = offline_feature_store_bucket,
    enable_online_store = True)

以下の方法で作成した Feature Group の確認を行えます。今回作成した latest-items-feature-group と items-feature-group の2つを確認できます。後ほど利用するため、表示された FeatureGroupArn の値をそれぞれメモしてください。

sagemaker_client.list_feature_groups()

次に、作成した items-feature-group に対して CSV のデータをインポートします。SageMaker Python SDK では ingest メソッドを利用することで Pandas DataFrame 形式のデータを直接 Feature Group に入力することができます。

items_feature_group.ingest(data_frame=df_items, max_workers=16, max_processes=16, wait=True)

入力が完了したら items-feature-group からデータを取得してみます。BatchGetRecord API を利用することでオンラインストアから複数レコードを取得します。Record Identifier に複数の値を指定することで複数のレコードの特徴量をまとめて取得できます。

featurestore_runtime.batch_get_record(
    Identifiers=[
        {
            'FeatureGroupName': feature_group_name,
            'RecordIdentifiersValueAsString': [
                '1', '2', '3'
            ],
            'FeatureNames': [
                'TITLE',
            ]
        },
    ]
)

以下のようなレスポンスを確認できたら成功です。

{'ResponseMetadata': {'RequestId': 'cae47135-8038-407d-a598-a741ce6cc340',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': 'cae47135-8038-407d-a598-a741ce6cc340',
   'content-type': 'application/json',
   'content-length': '505',
   'date': 'Fri, 18 Mar 2022 04:10:08 GMT'},
  'RetryAttempts': 0},
 'Records': [{'FeatureGroupName': 'items-feature-group',
   'RecordIdentifierValueAsString': '2',
   'Record': [{'FeatureName': 'TITLE', 'ValueAsString': 'Jumanji (1995)'}]},
  {'FeatureGroupName': 'items-feature-group',
   'RecordIdentifierValueAsString': '3',
   'Record': [{'FeatureName': 'TITLE',
     'ValueAsString': 'Grumpier Old Men (1995)'}]},
  {'FeatureGroupName': 'items-feature-group',
   'RecordIdentifierValueAsString': '1',
   'Record': [{'FeatureName': 'TITLE', 'ValueAsString': 'Toy Story (1995)'}]}],
 'Errors': [],
 'UnprocessedIdentifiers': []}

オフラインストアに同期された特徴量データの確認(Option)

上記の手順でオンラインストアから特徴量を取得することができました。オフラインストアの実体は Amazon Simple Storage Service(S3)であるため、S3 に Parquet 形式で保存されている特徴量を実際に確認できます。PutRecord でレコードを投入してから15分以内にオンラインストアからオフラインストアへデータが自動的に同期されます。これらの特徴量はデフォルトの設定では AWS Glue Data Catalog によって管理されているため、Amazon Athena のクエリを利用して値を取得することができます。

Kinesis Data Streams のデータストリームの作成

Kinesis Data Streams を利用してデータストリームを作成します。今回はマネジメントコンソール上から作成しますが、 AWS CLI や SDK を利用してリソースを作成することも可能です。詳しくはこちらをご参照ください。

今回はストリーム名とシャードの数を入力します。今回はオンデマンドモードを利用し、データストリーム名は以下の値とします。

  • データストリーム名:任意の名前(今回は realtime-data-input )

それ以外はデフォルトのままデータストリームを作成します。作成後ステータスが Active になるまで待ちましょう。Kinesis Data Stream の安全性を高めるためにストリームの作成後は設定からサーバー側の暗号化を有効にしてください。また、後ほど利用するため、データストリームの概要に表示された ARN の値をメモしてください。

Lambda 関数の作成

最後に Kinesis Data Streams からレコードを取得し、Feature Store へ特徴量の格納を行う Lambda 関数を作成します。実際に利用する際には Lambda 内で特徴量の前処理を行うことも可能ですし、必要に応じて Lambda から AWS サービスを呼び出して利用することもできます。VPC 内に Lambda を展開することで、プライベートアクセスを通して VPC 内のリソースや Direct Connect 経由でオンプレミスに展開されている API を利用することができます。
Lambda 関数の VPC への展開方法についてはこちらをご確認ください。また、閉域ネットワーク内で SageMaker を利用する方法についてはこちらをご確認ください。

Lambda で利用する IAM ロールの作成

AWS Lambda から AWS サービスにアクセスするためには適切な権限を設定した IAM ロールを作成する必要があります。Lambda で利用する IAM ロールの作成方法の詳細はこちらをご確認ください。
今回は Lambda 関数が Kinesis Data Streams からレコードを取得し、Feature Store にデータを投入する権限を付与するために以下のインラインポリシーをコピーしてアタッチしてください。 その際に各 Resource の ARN は前の手順でコピーした値をそれぞれ用いてください。コピーの際にはインデントなど JSON の形式が崩れないように注意してください。

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Action": [
                "sagemaker:PutRecord",
                "sagemaker:GetRecord"
            ],
            "Resource": [
                "arn:aws:sagemaker:ap-northeast-1:000000000000:feature-group/latest-items-feature-group",
                "arn:aws:sagemaker:ap-northeast-1:000000000000:feature-group/items-feature-group"
            ],
            "Effect": "Allow",
            "Sid": "SageMakerFeatureStore"
        },
        {
            "Effect": "Allow",
            "Action": [
                "kinesis:DescribeStream",
                "kinesis:DescribeStreamSummary",
                "kinesis:GetRecords",
                "kinesis:GetShardIterator",
                "kinesis:ListShards",
                "kinesis:ListStreams",
                "kinesis:SubscribeToShard",
                "logs:CreateLogGroup",
                "logs:CreateLogStream",
                "logs:PutLogEvents"
            ],
            "Resource": "arn:aws:kinesis:ap-northeast-1:000000000000:stream/realtime-data-input"
        }
    ]
}

コピーが成功したらポリシーの確認を選択し、適当な名前をつけて保存します。

関数の作成

実際に Lambda 関数を作成します。コンソールからの Lambda 関数の作成の詳細はこちらからご確認ください。今回は一から作成を選択して以下のように情報を入力します。

  • 関数名:任意の名前(今回は RealtimeFeatureStoreInput )
  • ランタイム:Python 3.9
  • アーキテクチャ:x86_64
  • 実行ロール:既存のロール(既存のロールから前の手順で作成した IAM ロールを選択してください。)

以上の入力が完了したら右下の関数の作成を選択してください。これで Lambda 関数が作成されました。

関数が作成されたらコードソースに以下のコードをペーストしたあと Deploy をしてください。その後、変更が保存されたことを確認してください。

from collections import deque
import json
import base64
import sys
from datetime import datetime
import time
import ast

import boto3

# Define sagemaker session
try:
    sm_fs = boto3.Session().client(service_name='sagemaker-featurestore-runtime')
except:
    print(f'Failed while connecting to SageMaker Feature Store')
    print(f'Unexpected error: {sys.exc_info()[0]}')

print(f'boto3 version: {boto3.__version__}')

# Set feature group name
items_feature_group_name = 'items-feature-group'
latest_items_feature_group_name = 'latest-items-feature-group'


# Get past-latest-items list by genre
def get_past_items(feature_group_name, genre):
    # get latest items by the genre
    past_latest_items = sm_fs.get_record(
        FeatureGroupName=feature_group_name, RecordIdentifierValueAsString=genre
    )
    # if the genre does not exist, make a new feature record
    try:
        return ast.literal_eval(past_latest_items['Record'][1]['ValueAsString'])
    except:
        past_latest_items = [None, None, None, None,
                             None, None, None, None, None, None]
        return past_latest_items


# Feature Update Function
def update_items_feature(feature_group_name, item_id, title, genres, current_time_sec):
    record = [
        {'FeatureName': 'ITEM_ID', 'ValueAsString': str(item_id)},
        {'FeatureName': 'TITLE', 'ValueAsString': title},
        {'FeatureName': 'GENRES', 'ValueAsString': genres},
        {'FeatureName': 'TIMESTAMP', 'ValueAsString': str(current_time_sec)}
    ]
    # input features to SageMaker Feature Store
    sm_fs.put_record(FeatureGroupName=feature_group_name, Record=record)
    print("Item features input sucsess!")
    return


def update_latest_items_feature(feature_group_name, item_id, genres, current_time_sec):
    # split genres to list items
    list_genres = genres.split('|')

    for i in list_genres:
        past_latest_items = deque(get_past_items(feature_group_name, i))
        # delete oldest item in list
        past_latest_items.popleft()
        # append this item in list
        past_latest_items.append(item_id)

        latest_items_list = list(past_latest_items)
        record = [
            {'FeatureName': 'GENRE', 'ValueAsString': i},
            {'FeatureName': 'LATEST_ITEM_LIST',
                'ValueAsString': str(latest_items_list)},
            {'FeatureName': 'TIMESTAMP',
                'ValueAsString': str(current_time_sec)}
        ]
        # input features to SageMaker Feature Store
        sm_fs.put_record(FeatureGroupName=feature_group_name, Record=record)
        print(str(i) + " list feature input sucsess!")
    return


# Main Handler
def lambda_handler(event, context):
    records = event['Records']

    ret_records = []
    for rec in records:
        data = rec["kinesis"]["data"]
        agg_data_str = base64.b64decode(data)
        agg_data = json.loads(agg_data_str)

        item_id = agg_data['item_id']
        title = agg_data['title']
        genres = agg_data['genres']
        current_time_sec = agg_data['time']

        update_items_feature(items_feature_group_name, item_id, title, genres, current_time_sec)
        update_latest_items_feature(
            latest_items_feature_group_name, item_id, genres, current_time_sec)

        print(
            f' updating features ITEM_ID: {item_id}, TITLE: {title}, GENRES: {genres}')

    return

update_items_feature 関数で items-feature-group、update_latest_items_feature 関数で latest-items-feature-group にそれぞれ新着アイテムのレコードを格納します。update_latest_items_feature 関数内では genre が複数ある場合は分割し、それぞれのgenre の10個の新着アイテムをリスト形式から String に変換して格納しています。新しい genre が追加された場合には空のリストの中に新着アイテムを追加し、新しい genre のレコード作成して latest-items-feature-group に格納します。

トリガーの追加

最後にLambda関数のトリガーの設定を行います。トリガーを追加から以下のように変更してください。

  • トリガーリソース:Kinesis
  • ストリーム:Kinesis Data Stream で作成したストリーム名(今回は realtime-data-input )
  • バッチサイズ:1

Kinesis が Lambda 関数の画面上に表示されたら成功です。

動作テスト

最後に Kinesis Data Streams に新着アイテムのレコードを投入し、投入されたレコードが Feature Store に格納されているか確認を行います。
初めに Kinesis のセッションを持った boto3 インスタンスを作成します。

kinesis_client = boto3.client('kinesis')

次に Kinesis Data Streams にレコードを投入していきます。payload にレコードとなる値を格納し、Kinesis クライアント上から PutRecord API で新着アイテムのデータを投入します。Lambda 関数の挙動の確認のため、genres は Animation と Fantasy の2種類とします。今回は手動で投入しますが、本番環境上ではサーバー側から新着アイテムの情報を Kinesis Data Streams に投入します。

item_id = 193611
current_time_sec = round(time.time())

payload = {
    'item_id': item_id,
    'title': 'Dobinmushi',
    'genres': 'Animation|Fantasy',
    'time': current_time_sec,
}
data = json.dumps(payload)
response = kinesis_client.put_record(StreamName = STREAM_NAME,
                                         Data = data,
                                         PartitionKey = '1')

次に投入したデータが Feature Store 上に格納されているか確認します。これまでの手順が成功していれば、Kinesis Data Streams に投入されたレコードは Lambda 関数によって items-feature-group と latest-items-feature-group にそれぞれ格納されているはずです。
はじめに items-feature-group の特徴量を確認します。次のコードを実行すると投入したアイテムのレコードが feature group に存在することが確認できると思います。Lambda 関数のコールドスタートが原因でデータの反映が若干遅くなることがあるかもしれません。

feature_group_name = 'items-feature-group'
featurestore_runtime.get_record(
    FeatureGroupName=feature_group_name, RecordIdentifierValueAsString=str(item_id)
)

以下のようなレスポンスが確認できたら成功です。

{'ResponseMetadata': {'RequestId': '1ab47029-ccad-48d8-a705-563574e7935e',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '1ab47029-ccad-48d8-a705-563574e7935e',
   'content-type': 'application/json',
   'content-length': '234',
   'date': 'Fri, 18 Mar 2022 04:33:52 GMT'},
  'RetryAttempts': 0},
 'Record': [{'FeatureName': 'ITEM_ID', 'ValueAsString': '193611'},
  {'FeatureName': 'TITLE', 'ValueAsString': 'Dobinmushi'},
  {'FeatureName': 'GENRES', 'ValueAsString': 'Animation|Fantasy'},
  {'FeatureName': 'TIMESTAMP', 'ValueAsString': '1647577855'}]}

次に latest-items-feature-group の特徴量をを確認します。Record Identifier である genre が Animationの場合は最初に作成されたリストから古いアイテムが1つ削除され、今回投入した新しいアイテムが末尾に追加されていることが確認できます。

feature_group_name = 'latest-items-feature-group'
genre = 'Animation'
featurestore_runtime.get_record(
    FeatureGroupName=feature_group_name, RecordIdentifierValueAsString=genre
)
{'ResponseMetadata': {'RequestId': 'd43e1ff0-e7b2-4395-b7ca-82e86129ecf9',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': 'd43e1ff0-e7b2-4395-b7ca-82e86129ecf9',
   'content-type': 'application/json',
   'content-length': '255',
   'date': 'Fri, 18 Mar 2022 04:36:39 GMT'},
  'RetryAttempts': 0},
 'Record': [{'FeatureName': 'GENRE', 'ValueAsString': 'Animation'},
  {'FeatureName': 'LATEST_ITEM_LIST',
   'ValueAsString': '[183397, 187541, 190219, 193565, 193567, 193573, 193581, 193583, 193587, 193611]'},
  {'FeatureName': 'TIMESTAMP', 'ValueAsString': '1647577855'}]}

一方で genre が Fantasy の場合はレコードが新たに作成されるため、リストの末尾の最新アイテム以外は全て None となっていることが確認できます。

feature_group_name = 'latest-items-feature-group'
genre = 'Animation'
featurestore_runtime.get_record(
    FeatureGroupName=feature_group_name, RecordIdentifierValueAsString=genre
)
{'ResponseMetadata': {'RequestId': 'b6c0ed5a-f1ad-4ecc-86e6-003c9ef35e09',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': 'b6c0ed5a-f1ad-4ecc-86e6-003c9ef35e09',
   'content-type': 'application/json',
   'content-length': '235',
   'date': 'Fri, 18 Mar 2022 04:39:03 GMT'},
  'RetryAttempts': 0},
 'Record': [{'FeatureName': 'GENRE', 'ValueAsString': 'Fantasy'},
  {'FeatureName': 'LATEST_ITEM_LIST',
   'ValueAsString': '[None, None, None, None, None, None, None, None, None, 193611]'},
  {'FeatureName': 'TIMESTAMP', 'ValueAsString': '1647577855'}]}

クリーンアップ

今回作成したリソースは料金が発生します。不必要な場合は以下のリソースを削除してください。

  • 作成した Feature Group の削除
b = sagemaker_client.list_feature_groups().get('FeatureGroupSummaries')
print(len(b))
for i in b:
    c = i['FeatureGroupName']
    sagemaker_client.delete_feature_group(FeatureGroupName=str(c))
    print(c)
  • Feature Group で利用した S3 バケットの削除
  • 作成した Kinesis Data Streams のストリームの削除
  • 作成した Lambda 関数の削除

まとめ

本記事ではユーザーへのアイテムレコメンド基盤のための新着アイテムを登録するニアリアルタイムパイプラインを Kinesis Data Streams、Lambda、SageMaker Feature Storeを利用して実装しました。このパイプラインを利用することで、新着アイテムを一早くレコメンドに含めることができます。
SageMaker Feature Store のさらなる利用や、リクエストへの Feature Store のレコードを利用したレンスポンス方法の実装の詳細については以下のコンテンツを追加でご覧ください。

 


著者について

黒澤 蓮 (Ren Kurosawa) は AWS Japan のソリューションアーキテクトで、Web 業界のお客様を中心にアーキテクチャ設計や構築をサポートしています。データアナリティクスサービスや機械学習の領域を得意としています。将来の夢は宇宙でポエムを詠むことです。