背景&引言
众所周知,AI 算法模型开发落地有三个主要阶段:数据准备、模型训练、模型部署。目前已经有较多厂商及开源社区推出通用的AI、MLOps平台支撑模型训练与部署阶段,但主要偏重于机器学习模型开发,部署,服务层面,自 2019 年后才陆续有各厂商推出数据准备支撑阶段的产品及服务,即特征平台(如AWS Sagemaker feature Store)。
特征平台的主要能力包含:特征注册中心、离线存储&消费、在线存储&消费、离线&在线特征同步,特征版本,尤其特征版本最为重要,实现特征point-in-time cross join,避免特征穿越造成train-server skew的重要功能特性。
各个厂商在特征平台的架构和实现方式方面迥然不同,缺乏跨平台的通用的特征库方案。
Feast (Feature Store) 是一套开源特征库框架,纯 python 框架,与 Pandas dataframe 无缝集成,对ML,AI算法工程师友好,它提供了在线,离线特征库注册,特征库存储,特征数据摄取、训练数据检索、特征版本、离线-在线特征同步等功能;且具有云原生亲和力,可以构建在多个公有云平台上。
本文介绍了Feast框架的整体架构及设计思路,并step by step详细说明了Feast on AWS集成和使用,包括安装部署离线/在线特征库、使用特征库、特征库同步的方法等。对于使用Feast开源框架构建MLOps平台的用户,本文可以作为快速构建和开发指南。
Feast 整体架构
Feast的主要功能组件:
- Feast Repo&Registry:轻量级的目录级及 Split 文件数据库格式 Repository,用于特征库基础设施及元数据注册
- Feast Python SDK/CLI: 开发构建及使用特征库的主要功能组件
- Feast Apply:命令行工具执行安装部署配置的特征库到底层基础设施,并且注册特征库元数据到 Runtime 运行态
- Feast Materialize:离线-在线特征库版本同步工具
- Get Online Features:在线特征数据提取,调用对应的在线特征库基础设施 API 抽取特征数据,用于模型推断
- Get Historical Features:离线特征数据抽取,调用对应的离线特征库基础设施 API 抽取历史特征数据,用于模型训练或者特征组合
- Online Store: 在线特征库,根据不同云厂商的 nosql 数据库承载,存储特征快照版本数据
- Offline Store:离线特征库,根据不同云厂商数仓承载,存储特征历史版本数据
Feast On AWS 安装部署方案
依赖准备
- Feast on AWS 使用 Redshift 作为离线特征库,需要Redshift集群(如果采用Spectrum 外部表,还需要 Spectrum 角色及Glue Catalog 权限)
- Feast on AWS 使用DynamoDB 作为在线特征库,需要 DynamoDB 读写权限
- 可以用 Terraform 或者 CloudFormation 准备需要的 Redshift,DDB,IAM 角色等
- 以下使用 Terraform 为例安装部署 Feast 需要的Redshift,S3,IAM 角色等各种基础设施
1) 安装部署 Terraform
sudo yum install python3-devel
sudo yum install -y yum-utils
sudo yum-config-manager —add-repo https://rpm.releases.hashicorp.com/AmazonLinux/hashicorp.repo
sudo yum -y install terraform
2) 编写 Terraform 配置文件
project: feast_aws_repo
registry: data/registry.db
provider: aws
online_store:
type: dynamodb
region: ap-southeast-1
offline_store:
type: redshift
cluster_id: feast-demo2-redshift-cluster
region: ap-southeast-1
database: flinkstreamdb
user: awsuser
s3_staging_location: s3://feastdemobucket
iam_role: arn:aws:iam::**********:role/s3_spectrum_role
3) 构建基础设施
cd infra
sudo terraform init
sudo terraform plan -var="admin_password=xxxxx"
sudo terraform apply -var="admin_password=xxxxx"
4) 如果需要 Spectrum 承载离线特征库,需要在 Redshift 中建立 Spectrum 外部 schema,以便指向Glue Catalog 中的 s3 外部表
aws redshift-data execute-statement \
—region ap-southeast-1 \
—cluster-identifier feast-demo-redshift-cluster \
—db-user awsuser \
—database dev —sql "create external schema spectrum from data catalog database 'flinkstreamdb' iam_role \
'arn:aws:iam::**********:role/s3_spectrum_role' create external database if not exists;“
Feast 特征库 Repository 准备
1) 依赖安装及升级
pip3 install -U numpy==1.21
pip3 install feast[aws]
2) 初始化 repository
feast init -t xxxxx(repository_name)
AWS Region (e.g. us-west-2): ap-southeast-1
Redshift Cluster ID: feast-demo-redshift-cluster
Redshift Database Name: flinkstreamdb
Redshift User Name: awsuser
Redshift S3 Staging Location (s3://*): s3://feastdemobucket
Redshift IAM Role for S3 (arn:aws:iam::*:role/*): arn:aws:iam::xxxxxx:role/s3_spectrum_role
创建好的特征库的 schema 及骨架示例:
$ tree ./feast_aws_repo/
./feast_aws_repo/
├── data
│ └── registry.db
├── driver_repo.py
├── feature_store.yaml
- *.yam l配置指定 Feast repository 的基础环境资源(s3、Redshift、DDB 等)
- *.py 配置特征库元数据,特征v iew 及 schema 等
- db 保存基于 *.py 元数据构建后的特征组,特征库对象实例,以便运行态使用
安装部署后的feature_store.yaml示例:
project: feast_aws_repo
registry: data/registry.db
provider: aws
online_store:
type: dynamodb
region: ap-southeast-1
offline_store:
type: redshift
cluster_id: feast-demo2-redshift-cluster
region: ap-southeast-1
database: flinkstreamdb
user: awsuser
s3_staging_location: s3://feastdemobucket
iam_role: arn:aws:iam::xxxxxxx:role/s3_spectrum_role
driver_repo 的司机行程特征库元数据示例:
from datetime import timedelta
from feast import Entity, Feature, FeatureView, RedshiftSource, ValueType
driver = Entity(
name="driver_id",
join_key="driver_id",
value_type=ValueType.INT64,
)
driver_stats_source = RedshiftSource(
table="feast_driver_hourly_stats",
event_timestamp_column="event_timestamp",
created_timestamp_column="created",
)
driver_stats_fv = FeatureView(
name="driver_hourly_stats",
entities=["driver_id"],
ttl=timedelta(weeks=52),
features=[
Feature(name="conv_rate", dtype=ValueType.FLOAT),
Feature(name="acc_rate", dtype=ValueType.FLOAT),
Feature(name="avg_daily_trips", dtype=ValueType.INT64),
],
batch_source=driver_stats_source,
tags={"team": "driver_performance"},
)
部署成功后可以在Redshift看到离线特征库的 Spectuam schema 及库表,DDB中可以看到在线特征库的表
Redshift 离线特征库:
DDB在线特征库:
使用Feast SDK API进行特征库操作
连接特征库
安装部署完成后,在 python 代码中,可以方便的通过加载注册的 repository 路径,来连接到特征库及特征组
在 repository 中注册的特征组,也可以直接 import 实例化
from datetime import datetime, timedelta
import pandas as pd
from feast import FeatureStore
from driver_repo import driver, driver_stats_fv
fs = FeatureStore(repo_path="./")
>>> print(fs)
<feast.feature_store.FeatureStore object at 0x7f48d47098d0>
>>> print(driver_stats_fv)
{
"spec": {
"name": "driver_hourly_stats",
"entities": [
"driver_id"
],
"features": [
{
"name": "conv_rate",
"valueType": "FLOAT"
},
{
"name": "acc_rate",
"valueType": "FLOAT"
},
{
"name": "avg_daily_trips",
"valueType": "INT64"
}
],
"tags": {
"team": "driver_performance"
},
"ttl": "31449600s",
"batchSource": {
"type": "BATCH_REDSHIFT",
"eventTimestampColumn": "event_timestamp",
"createdTimestampColumn": "created",
"redshiftOptions": {
"table": "feast_driver_hourly_stats"
},
"dataSourceClassType": "feast.infra.offline_stores.redshift_source.RedshiftSource"
},
"online": true
},
"meta": {}
}
离线特征数据提取
通过 Feast get_historical_features API,可以抽取离线特征库数据用于离线训练或特征组合
features = ["driver_hourly_stats:conv_rate", "driver_hourly_stats:acc_rate"]
entity_df = pd.DataFrame(
{
"event_timestamp": [
pd.Timestamp(dt, unit="ms", tz="UTC").round("ms")
for dt in pd.date_range(
start=datetime.now() - timedelta(days=3),
end=datetime.now(),
periods=3,
)
],
"driver_id": [1001, 1002, 1003],
}
)
training_df = fs.get_historical_features(
features=features, entity_df=entity_df
).to_df()
如上我们抽取特征标识(entity 字段为 driver_id)为 1001,1002,1003, 时间版本为最近 3 天的离线特征库数据
>>> training_df
event_timestamp driver_id conv_rate acc_rate
0 2022-07-04 02:33:54.114 1001 0.036082 0.707744
1 2022-07-05 14:33:54.114 1002 0.522306 0.983233
2 2022-07-07 02:33:54.114 1003 0.734294 0.034062
离线特征组合
多个特征组需要联合并抽取作为模型训练时,get_historical_features 可以指定多个特征 view 的 features,基于event_timestamp 做 point-in-time 关联,从而得到同一时间版本的离线特征组合的数据
feast_features = [
"zipcode_features:city",
"zipcode_features:state",
"zipcode_features:location_type",
"zipcode_features:tax_returns_filed",
"zipcode_features:population",
"zipcode_features:total_wages",
"credit_history:credit_card_due",
"credit_history:mortgage_due",
"credit_history:student_loan_due",
"credit_history:vehicle_loan_due",
"credit_history:hard_pulls",
"credit_history:missed_payments_2y",
"credit_history:missed_payments_1y",
"credit_history:missed_payments_6m",
"credit_history:bankruptcies",
]
training_df = self.fs.get_historical_features(
entity_df=entity_df, features=feast_features
).to_df()
如上代码示例,在抽取离线特征时,关联了 credit_history 和 zipcode_features 两个离线特征库的相应特征字段,Feast 会在后台拼接Redshift Sql 关联对应的库表及 event_timestamp 等条件
离线特征数据同步在线特征库
通过Feast 提供的 materialize cli
,可以将指定时间版本的 Redshift 离线特征数据同步到 DynamoDB 的在线特征库中
materialize-incremental cli 会记录该 repository 特征库下每次同步的增量时间版本,因此每次执行会把自上次执行至今的最新数据增量同步到 DynamoDB
CURRENT_TIME=$(date -u +"%Y-%m-%dT%H:%M:%S")
feast materialize-incremental $CURRENT_TIME
Materializing 1 feature views to 2022-07-07 08:00:03+00:00 into the sqlite online
store.
driver_hourly_stats from 2022-07-06 16:25:47+00:00 to 2022-07-07 08:00:03+00:00:
100%|████████████████████████████████████████████| 5/5 [00:00<00:00, 592.05it/s]
当然也可以使用 materialize
显式指定开始时间(startdt)和截止时间(enddt), feast会将指定时间版本的离线特征库数据同步到在线特征库
feast materialize 2022-07-13T00:00:00 2022-07-19T00:00:00
Materializing 1 feature views from 2022-07-13 00:00:00+00:00 to 2022-07-19 00:00:00+00:00 into the dynamodb online store.
driver_hourly_stats:
100%|█████████████████████████████████████████████████████████████████| 5/5 [00:00<00:00, 51.18it/s]
在线特征查询
>>> online_features = fs.get_online_features(
features=features, entity_rows=[{"driver_id": 1001}, {"driver_id": 1002}],
).to_dict()
>>> print(pd.DataFrame.from_dict(online_features))
acc_rate conv_rate driver_id
0 0.179407 0.984951 1001
1 0.023422 0.069323 1002
Feast offline store on Spark 方案
上文我们看到的是Feast 依托AWS Redshift作为离线特征库存储和特征抽取的方案,虽然安装部署简介明快,上手方便,但Redshift定位是云服务数据仓库,虽然在sql兼容性、扩展性上优秀,但灵活性不足,如:
- 离线特征抽取必须要指定 event_timestamp 版本,无法直接查询最新 snapshot
- point-in-time 关联查询直接拼接partition over分组sql并下压,海量数据情况下,多历史版本的特征库time travel抽取时会膨胀数倍,存在性能瓶颈
Feast自0.19版本开始,支持Spark作为离线特征库历史数据提取,版本查询,同步在线特征库的计算框架
Spark作为高性能分布式计算引擎,在海量数据场景下性能优异,且使用Spark时,Feast FeatureView的DataSource既可以是指向Hive中的表,也可以是指向对象存储上的文件,通过Hive表可以兼容诸如Hudi、iceberg等多种数据湖架构。
同时,通过Spark 离线特征库抽取的特征数据,Feast将其封装为Spark DataFrame,从而可以方便的加载到S3分布式存储,因而也避免了Pandas DataFrame保存在本地磁盘的存储空间问题。
Feast point-in-time correct join Spark 实现
point-in-time correct join,根据源码来看,使用pySpark+SparkSQL实现,因此整体思路和Redshift类似:
- 将entity_df由DataFrame转化为Spark DataFrame,并注册成临时表
- 根据用户指定要关联的features,找到对应的FeatureView,进而找到底层的DataSource和相关的元数据
- 根据以上信息,即query_context,通过jinjia渲染一个SparkSQL,并提交给Spark集群计算
- 计算完成的结果就是实现point-in-time correct join之后的training dataset
Feast offline store on AWS EMR安装部署
AWS EMR是全托管的hadoop大数据集群,提供了良好的弹性伸缩,高可用,存算分离等特性,且通过EMRFS原生集成AWS S3云存储,用于承载Feast的Spark离线特征库具有天然的亲和力。
以下详细介绍Feast Spark离线特征库在AWS EMR的安装部署步骤及使用方法
启动AWS EMR集群
AWS EMR的启动方法本文不再赘述,感兴趣的同学可以参阅AWS EMR文档
此处选择emr 6.5版本,Spark 3.1.2
Offline store on EMR特征库配置
我们在emr主节点上可以feast init 特征库,从而直接利用AWS EMR上spark与S3的原生集成,通过emrfs读写S3数据湖上各种格式文件,不再需要hadoop s3开源lib的支持
feast init my_project后,在该特征库的yaml配置文件中,指定Feast spark的对应参数即可:
project: feast_spark_project
registry: data/registry.db
provider: local
offline_store:
type: spark
spark_conf:
spark.master: yarn
spark.ui.enabled: "true"
spark.eventLog.enabled: "true"
spark.sql.catalogImplementation: "hive"
spark.sql.parser.quotedRegexColumnNames: "true"
spark.sql.session.timeZone: "UTC"
配置完成后,通过feast apply cli同样部署到EMR spark
注:在EMR master节点上pyspark lib路径需要在环境变量中设置,以便feast找到spark的home目录及相应配置
source /etc/spark/conf/spark-env.sh
export PYTHONPATH="${SPARK_HOME}/python/:$PYTHONPATH"
export PYTHONPATH="${SPARK_HOME}/python/lib/py4j-0.10.9-src.zip:$PYTHONPATH"
Feast on Spark 离线特征库元数据
from feast.infra.offline_stores.contrib.spark_offline_store.spark_source import ( SparkSource,)
driver_hourly_stats= SparkSource(
name="driver_hourly_stats",
query="SELECT event_timestamp as ts, created_timestamp as created, conv_rate,conv_rate,conv_rate FROM emr_feature_store.driver_hourly_stats",
event_timestamp_column="ts",
created_timestamp_column="created"
)
Feast的sparkSource提供了query, table,及原始raw文件路径几种初始化方法,本文中使用query方式。
需要注意query方式中,需要指定event timestamp field特征字段以便Feast识别作为point-in-time cross join时间版本抽取及特征join的依据
Feast Spark offline store 执行
配置Spark作为Feast offline store后,通过AWS EMR上spark history UI,可以清楚的看到其get_historical_features方法,底层Feast使用SparkSQL 创建临时视图,拼接event time join的sql,并查询上文中source数据湖上hive库表等各个步骤的业务逻辑:
跟踪Spark history UI上,Spark Sql的各个query可以看到,Feast的get_historical_features方法执行时,会构造临时表entity_dataframe,即用户调用get_historical_features方法时,传入的样本列表。再构建driver_hourly_stats_base,即需要join及point-in-time查询的即样例特征时序表
== Parsed Logical Plan ==
'CreateViewStatement [driver_hourly_stats__cleaned], (
WITH driver_hourly_stats__entity_dataframe AS (
SELECT
driver_id,
entity_timestamp,
driver_hourly_stats__entity_row_unique_id
FROM entity_dataframe
GROUP BY
driver_id,
entity_timestamp,
driver_hourly_stats__entity_row_unique_id
),
driver_hourly_stats__base AS (
SELECT
subquery.*,
entity_dataframe.entity_timestamp,
entity_dataframe.driver_hourly_stats__entity_row_unique_id
FROM driver_hourly_stats__subquery AS subquery
INNER JOIN driver_hourly_stats__entity_dataframe AS entity_dataframe
ON TRUE
AND subquery.event_timestamp <= entity_dataframe.entity_timestamp
AND subquery.event_timestamp >= entity_dataframe.entity_timestamp - 86400 * interval '1' second
AND subquery.driver_id = entity_dataframe.driver_id
),
后续的subquery、dedup及cleaned子查询,会基于以上的两张基础表,进行基于特征标识字段driver_id和时序时间戳字段event_timestamp的分组排序,剔重等操作,最后join样本列表临时表entity_dataframe,整个流程与Redshift上基本一致
driver_hourly_stats__subquery AS (
SELECT
ts as event_timestamp,
created as created_timestamp,
driver_id AS driver_id,
conv_rate as conv_rate,
acc_rate as acc_rate
FROM (SELECT driver_id,event_timestamp as ts, created_timestamp as created, conv_rate,acc_rate,avg_daily_trips FROM emr_feature_store.driver_hourly_stats)
WHERE ts <= '2022-07-25T03:27:05.903000'
AND ts >= '2022-07-21T03:27:05.903000'
),
driver_hourly_stats__dedup AS (
SELECT
driver_hourly_stats__entity_row_unique_id,
event_timestamp,
MAX(created_timestamp) as created_timestamp
FROM driver_hourly_stats__base
GROUP BY driver_hourly_stats__entity_row_unique_id, event_timestamp
),
driver_hourly_stats__latest AS (
SELECT
event_timestamp,
created_timestamp,
driver_hourly_stats__entity_row_unique_id
FROM
(
SELECT *,
ROW_NUMBER() OVER(
PARTITION BY driver_hourly_stats__entity_row_unique_id
ORDER BY event_timestamp DESC,created_timestamp DESC
) AS row_number
FROM driver_hourly_stats__base
INNER JOIN driver_hourly_stats__dedup
USING (driver_hourly_stats__entity_row_unique_id, event_timestamp, created_timestamp)
)
WHERE row_number = 1
)
API结果返回可以to_df为Spark的Dataframe,从而实现remote 存储离线特征库抽取结果数据的操作,这也从另一方面解决了原有Redshift离线特征存储,特征抽取只能返回pandas Dataframe的劣势,在大数据量离线特征场景下更有优势
总结
综上所述,Feast 框架整体架构和在 AWS 的构建是非常简洁明快的,对构建MLOps平台的用户而言,其主要有价值的优势如下:
- 同时提供了离线,在线特征库,离线-在线特征库快照版本同步功能
- 轻量级,快速部署使用, 代码即配置,feast apply 即可部署到AWS
- 通过 repository 文件系统隔离特征库,方便MLOps多租户多CICD协同开发
- API 抽象程度高,贴近 AI/ML 算法工程师业务语言
对于海量离线特征数据抽取时point-in-time cross join的版本查询数据膨胀的业界难点,Feast也可以通过on EMR Spark的构建方式,优化解决其性能问题
参考资料
AWS Sagemaker Feature Store: https://docs.aws.amazon.com/zh_cn/sagemaker/latest/dg/feature-store.html
Feast官方:https://docs.feast.dev/getting-started/architecture-and-components/overview
AWS EMR集群部署:https://docs.aws.amazon.com/zh_cn/emr/latest/ManagementGuide/emr-what-is-emr.html
本篇作者