AWS Machine Learning Blog

Using streaming ingestion with Amazon SageMaker Feature Store to make ML-backed decisions in near-real time

August 30, 2023: Amazon Kinesis Data Analytics has been renamed to Amazon Managed Service for Apache Flink. Read the announcement in the AWS News Blog and learn more.

Businesses are increasingly using machine learning (ML) to make near-real time decisions, such as placing an ad, assigning a driver, recommending a product, or even dynamically pricing products and services. ML models make predictions given a set of input data known as features, and data scientists easily spend more than 60% of their time designing and building these features. Furthermore, highly accurate predictions depend on timely access to feature values that change quickly over time, adding even more complexity to the job of building a highly available and accurate solution. For example, a model for a ride sharing app can choose the best price for a ride from the airport, but only if it knows the number of ride requests received in the past 10 minutes and the number of passengers projected to land in the next 10 minutes. A routing model in a call center app can pick the best available agent for an incoming call, but it is only effective if it knows the customer’s latest web session clicks.

Although the business value of near-real time ML predictions is enormous, the architecture required to deliver them reliably, securely, and with good performance is complicated. Solutions need high-throughput updates and low-latency retrieval of the most recent feature values in milliseconds, something most data scientists are not prepared to deliver. As a result, some enterprises have spent millions of dollars inventing their own proprietary infrastructure for feature management. Other firms have limited their ML applications to simpler patterns like batch scoring until ML vendors provide more comprehensive off-the-shelf solutions for online feature stores.

To address these challenges, Amazon SageMaker Feature Store provides a fully managed central repository for ML features, making it easy to securely store and retrieve features, without having to build and maintain your own infrastructure. Amazon SageMaker Feature Store lets you define groups of features, use batch ingestion and streaming ingestion, retrieve the latest feature values with single-digit millisecond latency for highly accurate online predictions, and extract point-in-time correct datasets for training. Instead of building and maintaining these infrastructure capabilities, you get a fully managed service that scales as your data grows, enables sharing features across teams, and lets your data scientists focus on building great ML models aimed at game-changing business use cases. Teams can now deliver robust features once, and reuse them many times in a variety of models that may be built by different teams.

This post walks through a complete example of how you can couple streaming feature engineering with Amazon SageMaker Feature Store to make ML-backed decisions in near-real time. We show a credit card fraud detection use case that updates aggregate features from a live stream of transactions and uses low-latency feature retrievals to help detect fraudulent transactions. Try it out for yourself by visiting our code repo.

AWS Region Link
us-east-1 (N. Virginia)
us-east-2 (Ohio)
us-west-1 (N. California)
eu-west-1 (Dublin)
ap-northeast-1 (Tokyo)

Credit card fraud use case

Stolen credit card numbers can be bought in bulk on the dark web from previous leaks or hacks of organizations that store this sensitive data. Fraudsters buy these card lists and attempt to make as many transactions as possible with the stolen numbers until the card is blocked. These fraud attacks typically happen in a short time frame, and this can be easily spotted in historical transactions because the velocity of transactions during the attack differs significantly from the cardholder’s usual spending pattern.

The following table shows a sequence of transactions from one credit card where the cardholder first has a genuine spending pattern and then experiences a fraud attack starting on November 4th.

cc_num   trans_time        amount   fraud_label
…1248    Nov-01 14:50:01   10.15    0
…1248    Nov-02 12:14:31   32.45    0
…1248    Nov-02 16:23:12   3.12     0
…1248    Nov-04 02:12:10   1.01     1
…1248    Nov-04 02:13:34   22.55    1
…1248    Nov-04 02:14:05   90.55    1
…1248    Nov-04 02:15:10   60.75    1
…1248    Nov-04 13:30:55   12.75    0

For this post, we train an ML model to spot this kind of behavior by engineering features that describe an individual card’s spending pattern, such as the number of transactions or the average transaction amount from that card in a certain time window. This model protects cardholders from fraud at the point of sale by detecting and blocking suspicious transactions before the payment can complete. The model makes predictions in a low-latency, real-time context and relies on receiving up-to-the-minute feature calculations, so it can respond to an ongoing fraud attack. In a real-world scenario, features related to cardholder spending patterns would only form part of the model’s feature set, and we can include information about the merchant, the cardholder, the device used to make the payment, and any other data that may be relevant to detecting fraud.

Because our use case relies on profiling an individual card’s spending patterns, it’s crucial that we can identify credit cards in a transaction stream. Most publicly available fraud detection datasets don’t provide this information, so we use the Python Faker library to generate a set of transactions covering a 5-month period. This dataset contains 5.4 million transactions spread across 10,000 unique (and fake) credit card numbers and is intentionally imbalanced to match the reality of credit card fraud (only 0.25% of the transactions are fraudulent). We vary the number of transactions per day per card, as well as the transaction amounts. See our code repo for more detail.

Overview of the solution

We want our fraud detection model to classify credit card transactions by noticing a burst of recent transactions that differs significantly from the cardholder’s usual spending pattern. Sounds simple enough, but how do we build it?

The following diagram shows our overall solution architecture. We feel that this same pattern will work well for a variety of streaming aggregation use cases. At a high level, the pattern involves the following five pieces. We dive into more detail on these in subsequent sections:

  1. Feature store – We use Amazon SageMaker Feature Store to provide a repository of features with high-throughput writes and secure low-latency reads, using feature values that are organized into multiple feature groups.
  2. Batch ingestion – Batch ingestion takes labeled historical credit card transactions and creates the aggregate features and ratios needed for training the fraud detection model. We use an Amazon SageMaker Processing job and the built-in Spark container to calculate aggregate weekly counts and transaction amount averages and ingest them into the feature store for use in online inference.
  3. Model training and deployment – This aspect of our solution is straightforward. We use Amazon SageMaker to train a model using the built-in XGBoost algorithm on aggregated features created from historical transactions. The model is deployed to a SageMaker endpoint, where it handles fraud detection requests on live transactions.
  4. Streaming ingestion – An Amazon Kinesis Data Analytics application calculates aggregated features from a transaction stream, and an AWS Lambda function updates the online feature store.
  5. Streaming predictions – Lastly, we make fraud predictions on a stream of transactions, using AWS Lambda to pull aggregate features from the online feature store. We use the latest feature data to calculate transaction ratios and then call the fraud detection endpoint.

Feature store

ML models rely on well-engineered features coming from a variety of data sources, with transformations as simple as calculations, or as complicated as a multi-step pipeline that takes hours of compute time and complex coding. Amazon SageMaker Feature Store enables the reuse of these features across teams and models which improves data scientist productivity, speeds time to market, and ensures consistency of model input.

Each feature inside SageMaker Feature Store is organized into a logical grouping called a feature group. You decide which feature groups you need for your models. Each one can have dozens, hundreds, or even thousands of features. Feature groups are managed and scaled independently, but they’re all available for search and discovery across teams of data scientists responsible for many independent ML models and use cases.

ML models often require features from multiple feature groups. A key aspect of a feature group is how often its feature values need to be updated or materialized for downstream training or inference. You refresh some features hourly, nightly, or weekly, and a subset of features must be streamed to the feature store in near-real time. Streaming all feature updates would lead to unnecessary complexity, and could even lower the quality of data distributions by not giving you the chance to remove outliers.

In our use case, we create a feature group called cc-agg-batch-fg for aggregated credit card features updated in batch, and one called cc-agg-fg for streaming features. The batch feature group is updated nightly, and provides aggregate features looking back over a one-week time window. Recalculating one-week aggregations on streaming transactions does not offer meaningful signals, and would be a waste of resources.

Conversely, our cc-agg-fg feature group must be updated in a streaming fashion, because it offers the latest transaction counts and average transaction amounts looking back over a 10-minute time window. Without streaming aggregation, we could not spot the typical fraud attack pattern of a rapid sequence of purchases.

By isolating features that are recalculated nightly, we can improve ingestion throughput for our streaming features. Separation lets us optimize the ingestion for each group independently. When designing for your use cases, keep in mind that models requiring features from a large number of feature groups may want to make multiple retrievals from the feature store in parallel to avoid adding excessive latency to a real-time prediction workflow.

The feature groups for our use case are seen in the following diagram.

Each feature group must have one feature used as a record identifier (for this post, the credit card number). The record identifier acts as a primary key for the feature group, enabling fast lookups as well as joins across feature groups. An event time feature is also required, which enables the feature store to track the history of feature values over time. This becomes important when looking back at the state of features at a specific point in time.

In each feature group, we track the number of transactions per unique credit card and its average transaction amount. The only difference between our two groups is the time window used for aggregation. We use a 10-minute window for streaming aggregation, and a 1-week window for batch aggregation.

With Amazon SageMaker Feature Store, you have the flexibility to create feature groups that are offline only, online only, or both online and offline. An online store provides high-throughput writes and low-latency retrievals of feature values, ideal for online inference. An offline store is provided using Amazon S3, giving firms a highly scalable repository, with a full history of feature values, partitioned by feature group. The offline store is ideal for training and batch scoring use cases.

When you enable a feature group to provide both online and offline stores, SageMaker automatically synchronizes feature values to an offline store, continuously appending the latest values to give you a full history of values over time. Another benefit of feature groups that are both online and offline is to help avoid the problem of training and inference skew. SageMaker lets you feed both training and inference with the same transformed feature values, ensuring consistency to drive more accurate predictions. The focus in our post is to demonstrate online feature streaming, so we implemented online-only feature groups.
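
As a rough illustration of the two feature groups described above, the following sketch builds the request parameters for the SageMaker CreateFeatureGroup API. The helper name feature_group_request is hypothetical, and the feature types are assumptions: we guess that the card number is stored as a string and that the event time is a Unix timestamp stored as a fractional value. The actual definitions in the code repo may differ.

```python
def feature_group_request(name, record_id, event_time, fractional=(), integral=()):
    """Build keyword arguments for CreateFeatureGroup (hypothetical helper)."""
    definitions = [
        # Assumption: the record identifier (card number) is stored as a string.
        {"FeatureName": record_id, "FeatureType": "String"},
        # Assumption: event time is Unix epoch seconds, stored as Fractional.
        {"FeatureName": event_time, "FeatureType": "Fractional"},
    ]
    definitions += [{"FeatureName": f, "FeatureType": "Fractional"} for f in fractional]
    definitions += [{"FeatureName": f, "FeatureType": "Integral"} for f in integral]
    return {
        "FeatureGroupName": name,
        "RecordIdentifierFeatureName": record_id,
        "EventTimeFeatureName": event_time,
        "FeatureDefinitions": definitions,
        # Online-only: no OfflineStoreConfig is supplied.
        "OnlineStoreConfig": {"EnableOnlineStore": True},
    }


# 10-minute aggregates, updated by the streaming pipeline
streaming_fg = feature_group_request(
    "cc-agg-fg", "cc_num", "evt_time",
    fractional=["avg_amt_last_10m"], integral=["num_trans_last_10m"])

# 1-week aggregates, updated by the nightly batch job
batch_fg = feature_group_request(
    "cc-agg-batch-fg", "cc_num", "trans_time",
    fractional=["avg_amt_last_1w"], integral=["num_trans_last_1w"])

# To create a group for real (requires AWS credentials):
#   import boto3
#   boto3.client("sagemaker").create_feature_group(**streaming_fg)
```

Keeping the request as a plain dictionary makes it easy to review the schema before anything is created in your account.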

Batch ingestion

To materialize our batch features, we create a feature pipeline that is run as an Amazon SageMaker Processing job that is executed nightly. The job has two responsibilities: producing the dataset for training our model, and populating the batch feature group with the most up-to-date values for aggregate 1-week features, as shown in the following diagram:

Each historical transaction used in the training set is enriched with aggregated features for the specific credit card involved in the transaction. We look back over two separate sliding time windows: 1 week back, and the preceding 10 minutes. The actual features used to train the model include the following ratios of these aggregated values:

  • amt_ratio1 = avg_amt_last_10m / avg_amt_last_1w
  • amt_ratio2 = transaction_amount / avg_amt_last_1w
  • count_ratio = num_trans_last_10m / num_trans_last_1w

For example, the third ratio is the transaction count from the prior 10 minutes divided by the transaction count from the last week. Our ML model can learn patterns of normal activity versus fraudulent activity from these ratios, rather than relying on raw counts and transaction amounts. Spending patterns on different cards vary greatly, so normalized ratios provide a better signal to the model than the aggregated amounts themselves.
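
The three ratios can be sketched in a few lines of Python. This is a minimal illustration, not the code from the repo: the function names and the sample values are made up, and returning 0.0 when a card has no weekly history is our own assumption about how to handle a zero denominator.

```python
def safe_ratio(numerator, denominator):
    """Divide, returning 0.0 when the denominator is zero (an assumed policy)."""
    return numerator / denominator if denominator else 0.0


def transaction_ratios(amount, streaming, batch):
    """Combine 10-minute and 1-week aggregates into the model's ratio features."""
    return {
        "amt_ratio1": safe_ratio(streaming["avg_amt_last_10m"], batch["avg_amt_last_1w"]),
        "amt_ratio2": safe_ratio(amount, batch["avg_amt_last_1w"]),
        "count_ratio": safe_ratio(streaming["num_trans_last_10m"], batch["num_trans_last_1w"]),
    }


# Hypothetical card: 8 transactions averaging $40 last week,
# then 4 transactions averaging $200 in the last 10 minutes.
ratios = transaction_ratios(
    amount=500.00,
    streaming={"num_trans_last_10m": 4, "avg_amt_last_10m": 200.00},
    batch={"num_trans_last_1w": 8, "avg_amt_last_1w": 40.00},
)
print(ratios)  # {'amt_ratio1': 5.0, 'amt_ratio2': 12.5, 'count_ratio': 0.5}
```

In this made-up case, half of a typical week's transaction count arrives within 10 minutes, at five times the usual average amount, which is the kind of burst the ratios are meant to expose.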

You may be wondering why our batch job is computing features with a 10-minute lookback. Isn’t that only relevant for online inference? We need the 10-minute window on historical transactions to create an accurate training dataset. This is critical for ensuring consistency with the 10-minute streaming window that will be used in near-real time to support online inference.

The resulting training dataset from the processing job can be saved directly as a CSV for model training, or it can be bulk ingested into an offline feature group that can be used for other models and by other data science teams to address a wide variety of other use cases. For example, we can create and populate a feature group called cc-transactions-fg. Our training job can then pull a specific training dataset based on the needs for our specific model, selecting specific date ranges and a subset of features of interest. This approach enables multiple teams to reuse feature groups and maintain fewer feature pipelines, leading to significant cost savings and productivity improvements over time. This example notebook demonstrates the pattern of using SageMaker Feature Store as a central repository that data scientists can extract training datasets from.

In addition to creating a training dataset, we use the PutRecord API to put the 1-week feature aggregations into the online feature store nightly. The following code demonstrates putting a record into an online feature group given specific feature values, including a record identifier and an event time:

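import time
import boto3

# Runtime client for reading and writing Feature Store records
feature_store_client = boto3.client(
    service_name='sagemaker-featurestore-runtime')

# Each feature is a name/value pair; values are always passed as strings
record = [{'FeatureName': 'cc_num',
           'ValueAsString': str(cc_num)},
          {'FeatureName': 'avg_amt_last_1w',
           'ValueAsString': str(avg_amt_last_1w)},
          {'FeatureName': 'num_trans_last_1w',
           'ValueAsString': str(num_trans_last_1w)}]
# Event time (Unix epoch seconds) lets the feature store track value history
event_time_feature = {
    'FeatureName': 'trans_time',
    'ValueAsString': str(int(round(time.time())))}
record.append(event_time_feature)
response = feature_store_client.put_record(
    FeatureGroupName='cc-agg-batch-fg', Record=record)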

ML engineers often build a separate version of feature engineering code for online features based on the original code written by data scientists for model training. This can deliver the desired performance, but is an extra development step and introduces more chance for training and inference skew. In our use case, we show how using SQL for aggregations can enable a data scientist to provide the same code for both batch and streaming.

Streaming ingestion

Amazon SageMaker Feature Store delivers single-digit millisecond retrieval of pre-calculated features, and it can also play an effective role in solutions requiring streaming ingestion. Our use case demonstrates both. Weekly lookback is handled as a pre-calculated feature group, materialized nightly as shown earlier. Now let’s dive into how we calculate features aggregated on the fly over a 10-minute window and ingest them into the feature store for later online inference.

You can perform streaming ingestion by tapping into an Apache Kafka topic or an Amazon Kinesis Data Stream, applying feature transformation and aggregation, and pushing the result to the feature store. For teams comfortable with Java, Apache Flink is a popular framework for streaming aggregation. However, for data scientists with limited Java skills, SQL is a much more accessible option.

In our use case, we listen to a Kinesis data stream of credit card transactions, and use a simple Kinesis Data Analytics SQL application to create aggregate features. An AWS Lambda function ingests those features into the feature store for subsequent use at inference time. Establishing the SQL app is straightforward. You choose a source stream, define a SQL query, and identify a destination (for our use case, a Lambda function).

To produce aggregate counts and average amounts looking back over a 10-minute window, we use the following SQL query on the input stream:

SELECT STREAM "cc_num",
               COUNT(*) OVER LAST_10_MINUTES,
               AVG("amount") OVER LAST_10_MINUTES
FROM transactions WINDOW LAST_10_MINUTES AS (PARTITION BY "cc_num" RANGE INTERVAL '10' MINUTE PRECEDING)

The following table shows a sample sequence of transactions along with the resulting 10-minute aggregates:

cc_num   amount   datetime          num_trans_last_10m   avg_amt_last_10m
…1248    50.00    Nov-01,22:01:00   1                    50.00
…9843    99.50    Nov-01,22:02:30   1                    99.50
…7403    100.00   Nov-01,22:03:48   1                    100.00
…1248    200.00   Nov-01,22:03:59   2                    125.00
…0732    26.99    Nov-01,22:04:15   1                    26.99
…1248    50.00    Nov-01,22:04:28   3                    100.00
…1248    500.00   Nov-01,22:05:05   4                    200.00

In this example, notice that the final row has a count of four transactions in the last 10 minutes from the credit card ending with 1248, and a corresponding average transaction amount of $200.00. The SQL query is consistent with the one used to drive creation of our training dataset, helping to avoid training and inference skew.
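
To make the windowing semantics concrete, here is a plain-Python sketch of the same per-card trailing aggregation. It is only an illustration of what the SQL computes, not part of the solution: the function names are ours, the input is assumed to arrive in time order, and the year in the timestamps is arbitrary because the sample data does not include one.

```python
from collections import defaultdict, deque
from datetime import datetime, timedelta


def sliding_aggregates(transactions, window=timedelta(minutes=10)):
    """Yield (cc_num, count, average amount) over a trailing per-card window.

    Assumes transactions are (cc_num, amount, timestamp) tuples in time order.
    """
    history = defaultdict(deque)  # cc_num -> recent (timestamp, amount) pairs
    for cc_num, amount, ts in transactions:
        recent = history[cc_num]
        recent.append((ts, amount))
        # Drop transactions that fell out of the trailing window.
        while recent[0][0] < ts - window:
            recent.popleft()
        amounts = [a for _, a in recent]
        yield cc_num, len(amounts), sum(amounts) / len(amounts)


def at(hms):
    """Parse a time of day on Nov-01 (the year is an arbitrary assumption)."""
    return datetime.strptime("2020-11-01 " + hms, "%Y-%m-%d %H:%M:%S")


rows = [
    ("…1248", 50.00, at("22:01:00")),
    ("…9843", 99.50, at("22:02:30")),
    ("…7403", 100.00, at("22:03:48")),
    ("…1248", 200.00, at("22:03:59")),
    ("…0732", 26.99, at("22:04:15")),
    ("…1248", 50.00, at("22:04:28")),
    ("…1248", 500.00, at("22:05:05")),
]
results = list(sliding_aggregates(rows))
print(results[-1])  # ('…1248', 4, 200.0)
```

The same logic, applied to historical transactions in batch, is what keeps the training features aligned with the streaming features.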

As transactions stream into the SQL app, the app sends the aggregate results to our Lambda function, as shown in the following diagram. The Lambda function takes these features and populates the cc-agg-fg feature group.

Updating feature values in the feature store from Lambda is done using a simple call to the PutRecord API. The following is the core piece of Python code for storing the aggregate features:

record = [{'FeatureName': 'cc_num', 
           'ValueAsString': str(cc_num)},
          {'FeatureName':'avg_amt_last_10m', 
           'ValueAsString': str(avg_amt_last_10m)},
          {'FeatureName':'num_trans_last_10m', 
           'ValueAsString': str(num_trans_last_10m)},
          {'FeatureName': 'evt_time', 
           'ValueAsString': str(int(round(time.time())))}]
featurestore_runtime.put_record(FeatureGroupName='cc-agg-fg', 
                                Record=record)

We prepare the record as a list of named value pairs, including the current time as the event time. The SageMaker Feature Store API ensures that this new record follows the schema that we identified when we created the feature group. If a record for this primary key already existed, it is now overwritten in the online store.
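
For context, the surrounding Lambda function might look roughly like the sketch below. Several things here are assumptions rather than the repo's code: that each delivered record is base64-encoded JSON, that the JSON keys match the feature names (the SQL output columns would need to be named accordingly), and the helper names handle_batch, to_feature_record, and RecordingClient. The response shape, a recordId with a result of Ok or DeliveryFailed, follows the contract we understand Kinesis Data Analytics to expect from a Lambda destination.

```python
import base64
import json
import time


def to_feature_record(payload, event_time):
    """Convert one aggregate row into the PutRecord name/value format."""
    return [
        {"FeatureName": "cc_num", "ValueAsString": str(payload["cc_num"])},
        {"FeatureName": "avg_amt_last_10m", "ValueAsString": str(payload["avg_amt_last_10m"])},
        {"FeatureName": "num_trans_last_10m", "ValueAsString": str(payload["num_trans_last_10m"])},
        {"FeatureName": "evt_time", "ValueAsString": str(event_time)},
    ]


def handle_batch(event, featurestore_runtime, feature_group_name="cc-agg-fg"):
    """Write each delivered aggregate to the feature store and report the outcome."""
    results = []
    for item in event["records"]:
        try:
            payload = json.loads(base64.b64decode(item["data"]))
            record = to_feature_record(payload, int(round(time.time())))
            featurestore_runtime.put_record(
                FeatureGroupName=feature_group_name, Record=record)
            results.append({"recordId": item["recordId"], "result": "Ok"})
        except Exception:
            # Malformed input or a failed write is reported back as a delivery failure.
            results.append({"recordId": item["recordId"], "result": "DeliveryFailed"})
    return {"records": results}


class RecordingClient:
    """Stand-in for the boto3 client so the sketch runs without AWS access."""

    def __init__(self):
        self.calls = []

    def put_record(self, FeatureGroupName, Record):
        self.calls.append((FeatureGroupName, Record))


# Local dry run with one valid and one malformed record (hypothetical values).
sample = {"cc_num": "1234", "num_trans_last_10m": 4, "avg_amt_last_10m": 200.0}
event = {"records": [
    {"recordId": "r-1", "data": base64.b64encode(json.dumps(sample).encode()).decode()},
    {"recordId": "r-2", "data": base64.b64encode(b"not json").decode()},
]}
client = RecordingClient()
response = handle_batch(event, client)
print(response)
```

In a deployed function, a lambda_handler(event, context) entry point would pass boto3.client('sagemaker-featurestore-runtime') in place of the stand-in client.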

Streaming predictions

Now that we have streaming ingestion keeping the feature store up to date with the latest feature values, let’s look at how we make fraud predictions.

We create a second Lambda function that uses a Kinesis data stream as a trigger. For each new transaction event, we retrieve batch and streaming features from SageMaker Feature Store, calculate ratios, and invoke the SageMaker model endpoint to make the prediction as shown in the following diagram.

We use the following code to retrieve feature values on demand from the feature store before calling the SageMaker model endpoint:

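import boto3

# Runtime client used for low-latency reads from the online store
featurestore_runtime = boto3.client(
    service_name='sagemaker-featurestore-runtime')
# Look up the latest record for one card in the given feature group
response = featurestore_runtime.get_record(
    FeatureGroupName=feature_group_name,
    RecordIdentifierValueAsString=record_identifier_value)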
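
Because each prediction needs records from two feature groups, the lookups can be issued in parallel. The following is a minimal sketch of that idea using a thread pool. The helper names and the stub client are hypothetical, the sample values are made up, and we assume the response carries a Record list of FeatureName/ValueAsString pairs, matching the format used when writing.

```python
from concurrent.futures import ThreadPoolExecutor


def record_to_dict(response):
    """Flatten a GetRecord response into {feature_name: value_as_string}."""
    return {f["FeatureName"]: f["ValueAsString"] for f in response.get("Record", [])}


def fetch_features(client, cc_num, feature_groups=("cc-agg-fg", "cc-agg-batch-fg")):
    """Fetch one card's record from each feature group concurrently."""
    def fetch(group):
        response = client.get_record(
            FeatureGroupName=group,
            RecordIdentifierValueAsString=str(cc_num))
        return group, record_to_dict(response)

    with ThreadPoolExecutor(max_workers=len(feature_groups)) as pool:
        return dict(pool.map(fetch, feature_groups))


class StubClient:
    """Stand-in for the boto3 client so the sketch runs without AWS access."""

    DATA = {
        "cc-agg-fg": {"cc_num": "1234", "num_trans_last_10m": "4", "avg_amt_last_10m": "200.0"},
        "cc-agg-batch-fg": {"cc_num": "1234", "num_trans_last_1w": "8", "avg_amt_last_1w": "40.0"},
    }

    def get_record(self, FeatureGroupName, RecordIdentifierValueAsString):
        values = self.DATA[FeatureGroupName]
        return {"Record": [{"FeatureName": k, "ValueAsString": v} for k, v in values.items()]}


features_by_group = fetch_features(StubClient(), "1234")
print(features_by_group["cc-agg-fg"]["num_trans_last_10m"])  # 4
```

With only two feature groups the saving is small, but the same pattern keeps latency roughly flat as the number of groups grows.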

Finally, with the model input feature vector assembled, we call the model endpoint to predict if a specific credit card transaction is fraudulent:

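import json
import boto3

sagemaker_runtime = boto3.client(service_name='runtime.sagemaker')
# The endpoint expects one CSV row; features must be a list of strings
request_body = ','.join(features)
response = sagemaker_runtime.invoke_endpoint(
    EndpointName=ENDPOINT_NAME,
    ContentType='text/csv',
    Body=request_body)
# The response body holds the predicted fraud probability
probability = json.loads(response['Body'].read().decode('utf-8'))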

In the example above, the model came back with a probability of 98% that the specific transaction was fraudulent, and it was able to leverage near real-time aggregated input features, based on the most recent 10 minutes of transactions on that credit card.
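
Turning the returned probability into a decision is a matter of applying a cutoff. The sketch below assumes the endpoint returns a single number in the response body and uses a threshold of 0.5. Both the threshold and the helper name are our own assumptions, and in practice the cutoff would be tuned to the cost of false positives and false negatives.

```python
import json

FRAUD_THRESHOLD = 0.5  # assumed cutoff, not taken from the original solution


def label_prediction(body_bytes, threshold=FRAUD_THRESHOLD):
    """Parse the endpoint response body and map the probability to a label."""
    probability = float(json.loads(body_bytes.decode("utf-8")))
    label = "FRAUD" if probability >= threshold else "NOT FRAUD"
    return probability, label


print(label_prediction(b"0.98"))  # (0.98, 'FRAUD')
print(label_prediction(b"0.02"))  # (0.02, 'NOT FRAUD')
```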

Seeing it work end to end

To demonstrate the full end-to-end workflow of our solution, we simply send credit card transactions into our Kinesis data stream. Our automated streaming feature aggregation takes over from there, maintaining a near real time view of transaction counts and amounts in SageMaker Feature Store, with a sliding 10-minute lookback window. These features are combined with the 1-week aggregate features that were already ingested to the feature store in batch, letting us make fraud predictions on each transaction.

We send a single transaction from three different credit cards. We then simulate a fraud attack on a fourth credit card by sending many back-to-back transactions in seconds. The output from our Lambda function is shown below. As expected, the first three one-off transactions are predicted as NOT FRAUD. Of the 10 fraudulent transactions, the first is predicted as NOT FRAUD, and the rest are all correctly identified as FRAUD. Notice how the aggregate features are kept current, helping drive more accurate predictions.

Conclusion

We have shown how Amazon SageMaker Feature Store can play a key role in the solution architecture for critical operational workflows that need streaming aggregation and low latency inference. With an enterprise-ready feature store in place, you can use both batch ingestion and streaming ingestion to feed feature groups, and access feature values on demand to perform online predictions for significant business value. ML features can now be shared at scale across many teams of data scientists and thousands of ML models, improving data consistency, model accuracy, and data scientist productivity. Amazon SageMaker Feature Store is available now, and you can try out this entire example. Let us know what you think.


About the Authors

Paul Hargis is an AI/ML Specialist Solutions Architect at Amazon Web Services (AWS). Prior to this role, he was lead architect for Amazon Exports and Expansions, helping amazon.com improve the experience for international shoppers. Paul likes to help customers expand their machine learning initiatives to solve real-world problems. He is married and has one daughter, who runs on the cross country and track teams in high school.

Megan Leoni is an AI/ML Specialist Solutions Architect for AWS helping customers across Europe, Middle East, and Africa design and implement ML Solutions. Prior to joining AWS, Megan worked as a data scientist building and deploying real time fraud detection models.

Mark Roy is a Principal Machine Learning Architect for AWS, helping AWS customers design and build AI/ML solutions. Mark’s work covers a wide range of ML use cases, with a primary interest in computer vision, deep learning, and scaling ML across the enterprise. He has helped companies in many industries, including Insurance, Financial Services, Media and Entertainment, Healthcare, Utilities, and Manufacturing. Mark holds 6 AWS certifications, including the ML Specialty Certification. Prior to joining AWS, Mark was an architect, developer, and technology leader for 25+ years, including 19 years in financial services.

Arunprasath Shankar is an Artificial Intelligence and Machine Learning (AI/ML) Specialist Solutions Architect with AWS, helping global customers scale their AI solutions effectively and efficiently in the cloud. In his spare time, Arun enjoys watching sci-fi movies and listening to classical music.