AWS Machine Learning Blog

Build machine learning-ready datasets from the Amazon SageMaker offline Feature Store using the Amazon SageMaker Python SDK

Amazon SageMaker Feature Store is a purpose-built service to store and retrieve feature data for use by machine learning (ML) models. Feature Store provides an online store capable of low-latency, high-throughput reads and writes, and an offline store that provides bulk access to all historical record data. Feature Store handles the synchronization of data between the online and offline stores.

Because model development is an iterative process, customers will frequently query the offline store and build various datasets for model training. Currently, there are several ways to access features in the offline store, including running SQL queries with Amazon Athena or using Spark SQL in Apache Spark. However, these patterns require writing ad hoc (and sometimes complex) SQL statements, which isn’t always suitable for the data scientist persona.

Feature Store recently extended the SageMaker Python SDK to make it easier to create datasets from the offline store. With this release, you can use a new set of methods in the SDK to create datasets without writing SQL queries. These new methods support common operations such as time travel, filtering duplicate records, and joining multiple feature groups while ensuring point-in-time accuracy.

In this post, we demonstrate how to use the SageMaker Python SDK to build ML-ready datasets without writing any SQL statements.

Solution overview

To demonstrate the new functionality, we work with two datasets: leads and web marketing metrics. These datasets can be used to build a model that predicts if a lead will convert into a sale given marketing activities and metrics captured for that lead.

The leads data contains information on prospective customers, who are identified using Lead_ProspectID. The features for a lead (for example, LeadSource) can be updated over time, which results in a new record for that lead. Lead_EventTime represents the time at which each record was created. The following screenshot shows an example of this data.

The web marketing metrics data tracks the engagement metrics for a lead, where each lead is identified using Web_ProspectID. Web_EventTime represents the time at which the record was created. Unlike the leads feature group, there is only one record per lead in this feature group. The following screenshot shows an example of this data.

We walk through the key parts of the sagemaker-feature-store-offline-sdk.ipynb notebook, which demonstrates the following steps:

  1. Create a dataset from a feature group.
  2. Join multiple feature groups.
  3. Create a point-in-time join between a feature group and a dataset based on a set of events at specific timestamps.
  4. Retrieve feature history within a specific time range.
  5. Retrieve features as of a specific timestamp.

Prerequisites

You need the following prerequisites:

git clone https://github.com/aws-samples/amazon-sagemaker-feature-store-offline-queries.git

We assume a feature group for the leads data has been created using the existing FeatureGroup.create method, and can be referenced using the variable base_fg. For more information on feature groups, refer to Create Feature Groups.

Create a dataset from a feature group

To create a dataset using the SageMaker SDK, we use the new FeatureStore class, which contains the create_dataset method. This method accepts a base feature group that may be joined with other feature groups or DataFrames. We start by providing the leads feature group as the base and an Amazon Simple Storage Service (Amazon S3) path to store the dataset:

from sagemaker.feature_store.feature_store import FeatureStore
feature_store = FeatureStore(sagemaker_session=feature_store_session)
ds1_builder = feature_store.create_dataset(
    base=base_fg,
    output_path=f"s3://{s3_bucket_name}/dataset_query_results",
)

The create_dataset method returns a DatasetBuilder object, which can be used to generate a dataset from one or multiple feature groups (which we demonstrate in the next section). To create a simple dataset consisting of only the leads features, we invoke the to_csv_file method. This runs a query in Athena to retrieve the features from the offline store, and saves the results to the specified S3 path.

csv, query = ds1_builder.to_csv_file()
# Show S3 location of CSV file
print(f'CSV file: {csv}')

Join multiple feature groups

With the SageMaker SDK, you can easily join multiple feature groups to build a dataset. You can also perform join operations between an existing Pandas DataFrame and one or more feature groups. The base feature group is an important concept for joins: it is the feature group to which other feature groups or the Pandas DataFrame are joined.

When creating the dataset with the create_dataset method, we chain the with_feature_group method, which performs an inner join between the base feature group and another feature group, using the record identifier of the joined feature group and the target feature name in the base feature group. In our example, the base feature group is the leads feature group, and the target feature group is the web marketing feature group. The with_feature_group method accepts the following arguments:

  • feature_group – This is the feature group we are joining with. In our code sample, the target feature group is created by using the web marketing dataset.
  • target_feature_name_in_base – The name of the feature in the base feature group that we’re using as a key in the join. We use Lead_ProspectID as the record identifier for the base feature group.
  • included_feature_names – This is the list of feature names from the feature group being joined (in our example, the web marketing feature group). We use this field to specify the features that we want to include in the dataset.

The following code shows an example of creating a dataset by joining the base feature group with the target feature group:

join_builder = feature_store.create_dataset(
    base=base_fg,
    output_path=f"s3://{s3_bucket_name}/dataset_query_results",
).with_feature_group(
    feature_group=target_fg,
    target_feature_name_in_base="Lead_ProspectID",
    included_feature_names=[
        "Web_ProspectID",
        "LastCampaignActivity",
        "PageViewsPerVisit",
        "TotalTimeOnWebsite",
        "TotalWebVisits",
        "AttendedMarketingEvent",
        "OrganicSearch",
        "ViewedAdvertisement",
    ],
)
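To make the join semantics concrete, the following is a minimal plain-Python sketch of an inner join on a record identifier that keeps only a chosen list of features from the joined side. It is not the SDK’s implementation (the SDK runs an Athena query); the sample rows and the inner_join helper are hypothetical and exist only for illustration.

```python
# Illustrative only: a plain-Python stand-in for an inner join on a record identifier.
# The sample rows and the helper name `inner_join` are hypothetical, not part of the SageMaker SDK.

leads = [
    {"Lead_ProspectID": "a1", "LeadSource": "Organic Search"},
    {"Lead_ProspectID": "b2", "LeadSource": "Referral"},
    {"Lead_ProspectID": "c3", "LeadSource": "Direct Traffic"},  # no web metrics for this lead
]
web_metrics = [
    {"Web_ProspectID": "a1", "TotalWebVisits": 4, "PageViewsPerVisit": 2.5},
    {"Web_ProspectID": "b2", "TotalWebVisits": 1, "PageViewsPerVisit": 1.0},
]


def inner_join(base_rows, target_rows, base_key, target_key, included_feature_names):
    """Keep base rows that have a match in the target, adding only the listed target features."""
    # Assumes one target row per identifier, as in the web marketing data described in this post.
    target_by_id = {row[target_key]: row for row in target_rows}
    joined = []
    for base_row in base_rows:
        match = target_by_id.get(base_row[base_key])
        if match is None:
            continue  # inner join: base rows without a match are dropped
        selected = {name: match[name] for name in included_feature_names}
        joined.append({**base_row, **selected})
    return joined


dataset = inner_join(
    leads,
    web_metrics,
    base_key="Lead_ProspectID",
    target_key="Web_ProspectID",
    included_feature_names=["Web_ProspectID", "TotalWebVisits"],
)
for row in dataset:
    print(row)
```

In this sketch, lead c3 is dropped because it has no matching web record, and PageViewsPerVisit is omitted because it is not in the included feature list.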

You can extend the join to include multiple feature groups by appending another with_feature_group call to the preceding code example and defining the required arguments for the new feature group. You can also perform join operations with an existing DataFrame by setting the base to your existing Pandas DataFrame and joining it with the feature groups of interest. The following code sample shows how to create a dataset with an existing Pandas DataFrame and an existing feature group:

ds2_builder = feature_store.create_dataset(
    base=new_records_df2,  # Pandas DataFrame
    event_time_identifier_feature_name="Lead_EventTime",
    record_identifier_feature_name="Lead_ProspectID",
    output_path=f"s3://{s3_bucket_name}/dataset_query_results",
).with_feature_group(base_fg, "Lead_ProspectID", ["LeadSource"])

For more examples on these various configurations, refer to Create a Dataset from your Feature Groups.

Create a point-in-time join

One of the most powerful capabilities of this enhancement is the ability to perform point-in-time joins simply, without the need to write complex SQL code. When building ML models, data scientists need to avoid data leakage or target leakage, that is, accidentally using data during model training that wouldn’t be available at the time of prediction. For instance, if we’re trying to predict credit card fraud, we should exclude transactions that arrive after the fraudulent charge we’re trying to predict. Otherwise, the trained model could learn from this post-fraud information and generalize less well.

Retrieving point-in-time accurate feature data requires you to supply an entity DataFrame that provides a set of record IDs (or primary keys) and corresponding event times that serve as the cutoff time for each event. This retrieval mechanism is sometimes referred to as row-level time travel, because it allows a different time constraint to be applied for each row key. To perform point-in-time joins with the SageMaker SDK, we use the DatasetBuilder class and provide the entity DataFrame as the base argument to the create_dataset method.

In the following code, we create a simple entity DataFrame with two records. We set the event times, used to indicate the cutoff time, near the middle of the time series data (mid-January 2023):

import pandas as pd
# Create Events (entity table) DataFrame to pass timestamps for the point-in-time join
events = [['2023-01-20T00:00:00Z', record_id1],
          ['2023-01-15T00:00:00Z', record_id2]]
df_events = pd.DataFrame(events, columns=['Event_Time', 'Lead_ProspectID'])

When we use the point_in_time_accurate_join functionality with the create_dataset call, the internal query excludes all records with timestamps later than the cutoff times supplied, returning the latest feature values that would have been available at the time of the event:

# Create a DatasetBuilder using the point_in_time_accurate_join method
pit_builder = (
    feature_store.create_dataset(
        base=df_events,
        event_time_identifier_feature_name='Event_Time',
        record_identifier_feature_name='Lead_ProspectID',
        output_path=f"s3://{s3_bucket_name}/{s3_prefix}/dataset_query_results",
    )
    .with_feature_group(base_fg, "Lead_ProspectID")
    .point_in_time_accurate_join()
    .with_number_of_recent_records_by_record_identifier(1)
)

Notice that there are only two records in the DataFrame returned by the point-in-time join. This is because we only submitted two record IDs in the entity DataFrame, one for each Lead_ProspectID we want to retrieve. The point-in-time criterion specifies that a record’s event time (stored in the Lead_EventTime field) must be earlier than the cutoff time.

Additionally, we instruct the query to retrieve only the latest record that meets this criterion because we have applied the with_number_of_recent_records_by_record_identifier method. When used in conjunction with the point_in_time_accurate_join method, this allows the caller to specify how many records to return from those that meet the point-in-time join criterion.
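The following plain-Python sketch illustrates the row-level logic described above: for each entity row, discard feature records later than that row’s cutoff, then keep the most recent N. It is an illustration under stated assumptions, not the query the SDK generates. The sample history, the parse_ts and point_in_time_latest helpers, and the handling of a record whose event time exactly equals the cutoff are all assumptions.

```python
from datetime import datetime, timezone


def parse_ts(value):
    """Parse an ISO 8601 timestamp with a trailing 'Z' into an aware UTC datetime."""
    return datetime.strptime(value, "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=timezone.utc)


# Hypothetical feature history: several records per lead, each with its own event time.
history = [
    {"Lead_ProspectID": "a1", "Lead_EventTime": "2023-01-05T00:00:00Z", "LeadSource": "Referral"},
    {"Lead_ProspectID": "a1", "Lead_EventTime": "2023-01-18T00:00:00Z", "LeadSource": "Organic Search"},
    {"Lead_ProspectID": "a1", "Lead_EventTime": "2023-01-25T00:00:00Z", "LeadSource": "Direct Traffic"},
    {"Lead_ProspectID": "b2", "Lead_EventTime": "2023-01-10T00:00:00Z", "LeadSource": "Referral"},
    {"Lead_ProspectID": "b2", "Lead_EventTime": "2023-01-16T00:00:00Z", "LeadSource": "Chat"},
]

# Entity rows: one cutoff time per record identifier.
events = [
    {"Event_Time": "2023-01-20T00:00:00Z", "Lead_ProspectID": "a1"},
    {"Event_Time": "2023-01-15T00:00:00Z", "Lead_ProspectID": "b2"},
]


def point_in_time_latest(entity_rows, feature_rows, n_recent=1):
    """For each entity row, return the n_recent newest feature records not later than its cutoff."""
    results = []
    for entity in entity_rows:
        cutoff = parse_ts(entity["Event_Time"])
        eligible = [
            row for row in feature_rows
            if row["Lead_ProspectID"] == entity["Lead_ProspectID"]
            # Assumption: a record exactly at the cutoff is kept; the sample data has no such tie.
            and parse_ts(row["Lead_EventTime"]) <= cutoff
        ]
        eligible.sort(key=lambda row: parse_ts(row["Lead_EventTime"]), reverse=True)
        for row in eligible[:n_recent]:
            results.append({**entity, **row})
    return results


pit_rows = point_in_time_latest(events, history, n_recent=1)
for row in pit_rows:
    print(row["Lead_ProspectID"], row["Lead_EventTime"], row["LeadSource"])
```

Each lead gets its own cutoff, so the record for a1 dated January 25 and the record for b2 dated January 16 are both excluded, even though the January 16 record falls before the cutoff used for a1.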

Compare point-in-time join results with Athena query results

To verify the output returned by the SageMaker SDK point_in_time_accurate_join function, we compare it to the result of an Athena query. First, we create a standard Athena query using a SELECT statement tied to the specific table created by the Feature Store runtime. This table name can be found by referencing the table_name field after instantiating the athena_query from the FeatureGroup API:

SELECT * FROM "sagemaker_featurestore"."off_sdk_fg_lead_1682348629" 
WHERE "off_sdk_fg_lead_1682348629"."Lead_ProspectID" = '5e84c78f-6438-4d91-aa96-b492f7e91029'

The Athena query doesn’t contain any point-in-time join semantics, so it returns all records that match the specified record_id (Lead_ProspectID).

Next, we use the Pandas library to sort the Athena results by event times for easy comparison. The records with timestamps later than the event times specified in the entity DataFrame (for example, 2023-01-15T00:00:00Z) submitted to the point_in_time_accurate_join don’t show up in the point-in-time results. Because we additionally specified that we only want a single record from the preceding create_dataset code, we only get the latest record prior to the cutoff time. By comparing the SageMaker SDK results with the Athena query results, we see that the point-in-time join function returned the proper records.

Therefore, we have confidence that we can use the SageMaker SDK to perform row-level time travel and avoid target leakage. Furthermore, this capability works across multiple feature groups that may be refreshed on completely different timelines.

Retrieve feature history within a specific time range

You can also restrict a dataset to a time window when joining feature groups. The time window is defined using with_event_time_range, which accepts two inputs, starting_timestamp and ending_timestamp, and returns a dataset builder object. In our code sample, we set the retrieval time window to 1 full day, from 2022-07-01 00:00:00 until 2022-07-02 00:00:00.

The following code shows how to create a dataset with the specified event time window while joining the base feature group with the target feature group:

from datetime import datetime, timezone
# Set up the event time window, in seconds of Unix epoch time
# Start at 07/01/2022 and set the time window to one day
start_ts = 1656633600
time_window = 86400
# Using hard-coded timestamps from the dataset, then adding the time window
# tz=timezone.utc keeps the printed window independent of the local time zone
datetime_start = datetime.fromtimestamp(start_ts, tz=timezone.utc)
datetime_end = datetime.fromtimestamp(start_ts + time_window, tz=timezone.utc)
print(f'Setting retrieval time window: {datetime_start} until {datetime_end}')
time_window_builder = (
    feature_store.create_dataset(
        base=base_fg,
        output_path=f"s3://{s3_bucket_name}/dataset_query_results",
    )
    .with_feature_group(
        feature_group=target_fg,
        target_feature_name_in_base="Lead_ProspectID",
        included_feature_names=[
            "Web_ProspectID", "LastCampaignActivity", "PageViewsPerVisit",
            "TotalTimeOnWebsite", "TotalWebVisits", "AttendedMarketingEvent",
            "OrganicSearch", "ViewedAdvertisement",
        ],
    )
    .with_event_time_range(
        starting_timestamp=datetime_start,
        ending_timestamp=datetime_end,
    )
)

We also confirm the difference in dataset size by exporting the dataset created using with_event_time_range to a Pandas DataFrame with the to_dataframe() method and displaying the data. Notice how the result set has only a fraction of the original 10,020 records, because it only retrieves records whose event_time is within the 1-day time period.
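As a rough illustration of why the windowed result is smaller, the following sketch applies the same 1-day window to a handful of hypothetical records in plain Python. The sample rows and the in_window helper are assumptions, as is the treatment of both window boundaries as inclusive; the SDK applies its filter inside the generated Athena query.

```python
from datetime import datetime, timedelta, timezone

# Same window as in the post: one day starting at epoch second 1656633600.
start_ts = 1656633600
window_start = datetime.fromtimestamp(start_ts, tz=timezone.utc)
window_end = window_start + timedelta(days=1)

# Hypothetical records with event times expressed as Unix epoch seconds.
records = [
    {"Lead_ProspectID": "a1", "Lead_EventTime": start_ts - 3600},   # one hour before the window
    {"Lead_ProspectID": "b2", "Lead_EventTime": start_ts + 60},     # inside the window
    {"Lead_ProspectID": "c3", "Lead_EventTime": start_ts + 43200},  # inside the window
    {"Lead_ProspectID": "d4", "Lead_EventTime": start_ts + 90000},  # after the window ends
]


def in_window(record):
    """Return True when the record's event time falls within the window."""
    event_time = datetime.fromtimestamp(record["Lead_EventTime"], tz=timezone.utc)
    # Assumption: both ends of the window are treated as inclusive.
    return window_start <= event_time <= window_end


selected = [record for record in records if in_window(record)]
print(f"Window: {window_start.isoformat()} until {window_end.isoformat()}")
print(f"Kept {len(selected)} of {len(records)} records")
```

Only the two records whose event times fall inside the window remain, which mirrors the reduction we see in the exported DataFrame.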

Retrieve features as of a specific timestamp

The DatasetBuilder as_of method retrieves features from a dataset that meet a timestamp-based constraint, which the caller provides as an argument to the function. This mechanism is useful for scenarios such as rerunning experiments on previously collected data, backtesting time series models, or building a dataset from a previous state of the offline store for data auditing purposes. This functionality is sometimes referred to as time travel because it essentially rolls back the data store to an earlier date and time. This time constraint is also referred to as the cutoff timestamp.

In our sample code, we first create the cutoff timestamp by reading the write_time value for the last record written to the Feature Store, the one written with put_record. Then we provide this cutoff timestamp to the DatasetBuilder as an argument to the as_of method:

# Create dataset using as-of timestamp
print(f'using cut-off time: {asof_cutoff_datetime}')
as_of_builder = (
    feature_store.create_dataset(
        base=base_fg,
        output_path=f"s3://{s3_bucket_name}/{s3_prefix}/dataset_query_results",
    )
    .with_feature_group(
        feature_group=target_fg,
        target_feature_name_in_base='Lead_ProspectID',
        included_feature_names=['Web_ProspectID', 'Web_EventTime', 'TotalWebVisits'],
    )
    .as_of(asof_cutoff_datetime)
)

It’s important to note that the as_of method applies the time constraint to the internal write_time field, which is automatically generated by Feature Store. The write_time field represents the actual timestamp when the record is written to the data store. This is different from other methods like point_in_time_accurate_join and with_event_time_range, which use the client-provided event_time field as a comparator.
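The distinction matters most for records that are written late, such as backfilled data. The following plain-Python sketch uses hypothetical records to show how a cutoff applied to write_time can select a different set of rows than the same cutoff applied to the event time. The rows, the utc helper, and the inclusive comparison are assumptions for illustration only.

```python
from datetime import datetime, timezone


def utc(year, month, day, hour=0):
    """Build a timezone-aware UTC datetime (hypothetical helper for this sketch)."""
    return datetime(year, month, day, hour, tzinfo=timezone.utc)


# Hypothetical records: Web_EventTime is supplied by the client,
# write_time is when the record actually landed in the store.
records = [
    {"Web_ProspectID": "a1", "Web_EventTime": utc(2023, 1, 10), "write_time": utc(2023, 1, 10, 1)},
    # Backfilled record: the event happened before the cutoff, but it was written after it.
    {"Web_ProspectID": "b2", "Web_EventTime": utc(2023, 1, 12), "write_time": utc(2023, 2, 1)},
    {"Web_ProspectID": "c3", "Web_EventTime": utc(2023, 1, 20), "write_time": utc(2023, 1, 20, 1)},
]

cutoff = utc(2023, 1, 15)

# Time travel on write_time: what the store contained at the cutoff.
# Assumption: a record written exactly at the cutoff would be included.
as_of_ids = [r["Web_ProspectID"] for r in records if r["write_time"] <= cutoff]

# Filtering on the event time instead: what had happened by the cutoff.
event_time_ids = [r["Web_ProspectID"] for r in records if r["Web_EventTime"] <= cutoff]

print("write_time cutoff:", as_of_ids)
print("event time cutoff:", event_time_ids)
```

The backfilled record b2 appears only under the event time filter, because it had not yet been written to the store at the cutoff.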

Clean up

Be sure to delete all the resources created as part of this example to avoid incurring ongoing charges. This includes the feature groups and the S3 bucket containing the offline store data.

SageMaker Python SDK experience vs. writing SQL

The new methods in the SageMaker Python SDK allow you to create datasets quickly and move on to the training step sooner in the ML lifecycle. To show the time and effort that can be saved, let’s examine a use case where we need to join two feature groups while retrieving the features within a specified time frame. The following figure compares the Python queries on the offline Feature Store with the SQL used behind a Python query to create the dataset.

As you can see, the same operation of joining two feature groups requires you to create a long, complex SQL query, whereas it can be accomplished using just the with_feature_group and with_event_time_range methods in the SageMaker Python SDK.

Conclusion

The new offline store methods in the SageMaker Python SDK allow you to query your offline features without having to write complex SQL statements. This provides a seamless experience for customers who are accustomed to writing Python code during model development. For more information about feature groups, refer to Create a Dataset From Your Feature Groups and Feature Store APIs: Feature Group.

The full example in this post can be found in the GitHub repository. Give it a try and let us know your feedback in the comments.


About the Authors

Paul Hargis has focused his efforts on machine learning at several companies, including AWS, Amazon, and Hortonworks. He enjoys building technology solutions and teaching people how to leverage them. Paul likes to help customers expand their machine learning initiatives to solve real-world problems. Prior to his role at AWS, he was lead architect for Amazon Exports and Expansions, helping amazon.com improve the experience for international shoppers.

Mecit Gungor is an AI/ML Specialist Solution Architect at AWS helping customers design and build AI/ML solutions at scale. He covers a wide range of AI/ML use cases for Telecommunication customers and currently focuses on Generative AI, LLMs, and training and inference optimization. He can often be found hiking in the wilderness or playing board games with his friends in his free time.

Tony Chen is a Machine Learning Solutions Architect at AWS, helping customers design scalable and robust machine learning capabilities in the cloud. As a former data scientist and data engineer, he leverages his experience to help tackle some of the most challenging problems organizations face with operationalizing machine learning.

Sovik Kumar Nath is an AI/ML solution architect with AWS. He has extensive experience in end-to-end designs and solutions for machine learning; business analytics within financial, operational, and marketing analytics; healthcare; supply chain; and IoT. Outside work, Sovik enjoys traveling and watching movies.