AWS Machine Learning Blog

Scale ML feature ingestion using Amazon SageMaker Feature Store

Amazon SageMaker Feature Store is a purpose-built solution for machine learning (ML) feature management. It helps data science teams reuse ML features across teams and models, serve features for model predictions at scale with low latency, and train and deploy new models more quickly and effectively.

As you learn about how to use a feature store, you may come across many examples that use very simple scenarios involving a few hundred or a few thousand rows of feature data. Although those examples help you get started, they don’t answer the question of what happens when your feature groups need to be fed with millions or hundreds of millions of rows nightly for a production use case. Will your feature store scale? Are there ways to shrink the job duration to meet production batch cycle constraints?

Feature Store can scale from initial experimentation using a few small feature groups up to thousands of feature groups and hundreds of thousands of features. Data scientists can easily create new feature groups and ingest features using a single line of code from a Pandas DataFrame or a Spark DataFrame. As you move beyond a quick proof of concept, Amazon SageMaker lets you ingest large amounts of feature data and develop scalable and repeatable feature pipelines without needing an army of ML engineers or infrastructure teams to develop scalable code or manage persistent clusters.

In this post, we provide an overview of ingestion options for Feature Store, and the benefits and use cases for each one. We also provide a GitHub repo that demonstrates several of these approaches, so you can try them out in your own AWS account. If you’re new to Feature Store, you may want to review Understanding the key capabilities of Amazon SageMaker Feature Store for additional background before diving into the rest of this post.

Ingestion use cases

Before getting into solution details, let’s review three distinct use cases for feature ingestion, each with its own requirements and runtime context:

  • Experimenting with new ML features – In this scenario, the runtime context is typically a Jupyter notebook, and the data scientist uses packages like Pandas to explore the data, implement features, and try them out in new models. Key ingestion requirements are ease of use and quick ingestion of small to medium-sized feature sets. For this use case, Feature Store provides a simple ingest method that uses Python multiprocessing to ingest features from a Pandas DataFrame with one line of code. Data scientists who prefer operating in a Spark environment can easily ingest Spark DataFrames as well.
  • Streaming feature ingestion – For some use cases like intelligent clickstream analysis, feature values are created on the fly based on near-real-time data streaming from sources like Amazon Kinesis and Apache Kafka. Performance and throughput are critical, and ingestion of the resulting features is done in parallel from multiple threads. The SageMaker PutRecord API is the core method used for streaming ingestion, and the runtime context is typically Apache Flink, an AWS Lambda function, or even Spark Streaming. For a deeper discussion and a code example of streaming ingestion, see Using streaming ingestion with Amazon SageMaker Feature Store to make ML-backed decisions in near-real time.
  • Bulk feature ingestion as part of a batch pipeline – Many customers rely primarily on batch feature pipelines, running hourly, daily, weekly, or even monthly, depending on availability of fresh source data. Ingestion requirements for batch feature pipelines focus on performance, throughput, job duration, cost, and ability to deal with large numbers of large files. Instead of Jupyter notebooks, you can use batch processing jobs on clusters of machines capable of mass parallelism. The rest of this post takes you through the details of how to handle bulk feature ingestion at scale.

No matter what the ingestion context, SageMaker performs data and schema validation at ingestion time to ensure that data quality is maintained. These validations check that the data conforms to the defined data types and that the input records contain all features. SageMaker automatically replicates feature data to the offline store within a few minutes of ingestion. The offline store enables ad hoc querying of feature value history, and flexible dataset extraction for model training and batch scoring.

Feature ingestion APIs

SageMaker provides two key APIs for feature ingestion: PutRecord and FeatureGroup.ingest.

PutRecord is the core API, allowing you to ingest a single feature record to a specific feature group. A feature record is a set of name-value pairs with feature values provided as strings. PutRecord is designed for low latency, scales up to high throughput workloads, is available from multiple languages (including Python and Java), and works well in both batch and streaming workloads.
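As a minimal sketch of the record shape described above, the following helper converts a Python dict into a list of name-value pairs and stringifies each value. The helper names to_record and put_feature_record and the sample feature names are hypothetical; the put_record call with its FeatureGroupName and Record parameters is assumed to be the one exposed by the boto3 sagemaker-featurestore-runtime client.

```python
def to_record(row):
    """Convert a dict of feature values into PutRecord's name-value pair format."""
    # Skip missing values rather than sending the literal string "None".
    return [
        {"FeatureName": name, "ValueAsString": str(value)}
        for name, value in row.items()
        if value is not None
    ]


def put_feature_record(client, feature_group_name, row):
    """Ingest one record; `client` is a sagemaker-featurestore-runtime client."""
    return client.put_record(
        FeatureGroupName=feature_group_name,
        Record=to_record(row),
    )


# Usage (assumes boto3 is installed and AWS credentials are configured;
# the feature group and feature names are placeholders):
#   import boto3
#   client = boto3.client("sagemaker-featurestore-runtime")
#   put_feature_record(client, "customers",
#                      {"customer_id": 42, "event_time": "2021-01-01T00:00:00Z"})
```

Passing the client in as an argument keeps the conversion logic easy to test without AWS access.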

To make coding easier in Python, SageMaker provides a method to ingest an entire Pandas DataFrame into a target feature group. The ingest method uses PutRecord, and lets the caller control the amount of client-side parallelism through the max_processes and max_workers parameters.

The FeatureGroup.ingest API works great for experimentation in notebooks. It is also a powerful ingredient for using the full horsepower of a given machine when performing bulk ingestion at scale on many machines.
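To illustrate what client-side parallelism means here, the following sketch splits a list of rows into chunks and sends each chunk from its own thread. It is an illustration of the general idea only, not the SDK's implementation: the names split_rows and ingest_parallel are hypothetical, put_fn stands in for whatever function writes a single record, and only threads are used, whereas the ingest method can spread work across processes as well.

```python
from concurrent.futures import ThreadPoolExecutor


def split_rows(rows, num_chunks):
    """Split a list of rows into at most num_chunks contiguous, near-equal chunks."""
    size, extra = divmod(len(rows), num_chunks)
    chunks, start = [], 0
    for i in range(num_chunks):
        # The first `extra` chunks each take one additional row.
        end = start + size + (1 if i < extra else 0)
        if end > start:
            chunks.append(rows[start:end])
        start = end
    return chunks


def ingest_parallel(rows, put_fn, max_workers=4):
    """Call put_fn on every row using up to max_workers threads; return the row count."""
    def ingest_chunk(chunk):
        for row in chunk:
            put_fn(row)
        return len(chunk)

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return sum(pool.map(ingest_chunk, split_rows(rows, max_workers)))
```

Because each call spends most of its time waiting on the network, threads alone can raise throughput considerably; adding processes helps use all the CPU cores of a larger instance.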

Options for scaling feature ingestion beyond a single machine

For lightweight ingestion tasks, like a small feature group needing 100,000 records daily, any implementation approach will probably yield good enough performance. However, when your feature groups have thousands of features, or when each ingestion job must process hundreds of millions of new rows, it’s important to pay close attention to the solution architecture. You may have strict job duration windows that dictate the maximum time available to ingest all the features for a given hour or day.

In these larger scale use cases, you need to maximize PutRecord throughput by taking advantage of a multi-instance cluster and ensuring high resource utilization to minimize job duration.
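A quick back-of-the-envelope calculation can help size the cluster for a given job window. The sketch below is illustrative only: the function names are hypothetical, and the per-instance throughput of 2,000 records per second is a placeholder assumption, not a measured or documented figure. Measure your own per-instance rate with a small trial job before relying on numbers like these.

```python
import math


def required_throughput(num_rows, window_seconds):
    """Records per second needed to ingest num_rows within the job window."""
    return num_rows / window_seconds


def instances_needed(num_rows, window_seconds, per_instance_rps):
    """Smallest instance count whose combined throughput meets the window."""
    return math.ceil(required_throughput(num_rows, window_seconds) / per_instance_rps)


# Example: 100 million rows in a 2-hour window.
rows, window = 100_000_000, 2 * 60 * 60
print(round(required_throughput(rows, window)))                # 13889 records per second
print(instances_needed(rows, window, per_instance_rps=2_000))  # 7 (assumed per-instance rate)
```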

There are many ways to implement feature ingestion pipelines, including AWS Glue, Amazon EMR, AWS Batch, and Amazon SageMaker Processing jobs, to name a few. Feature Store gives you complete flexibility to decide which works best for you, based on your own specific requirements. Some customers are establishing new feature pipelines. Others are extending existing feature pipelines, adding a feature ingestion step to make the features available in Feature Store. Many customers have existing pipelines that produce a set of output files containing the final features. SageMaker gives you the flexibility to integrate with each of these options.

Use SageMaker Processing jobs for feature ingestion

When choosing an approach for scalable feature ingestion, SageMaker Processing jobs offer a simple yet powerful option. SageMaker Processing provides a simplified, managed experience to run feature engineering workloads, and do so at scale. You simply provide a Python script, point to the input data source, and pick the number and type of machines to use. SageMaker does the rest, without requiring expensive and time-consuming management of custom ML infrastructure.

The scripts can be simple Python code. Alternatively, if your data scientists are comfortable using Spark, they can easily use the power of Spark without having to know how to create or manage Spark clusters. With SageMaker Processing, you can easily scale up (larger instance types) and out (more instances in the cluster) to meet demands for increasing scale of feature data, growing number of input files, and extent of feature transformations.

The following diagram shows how this approach works. A set of input files is on the left, each containing feature data to be ingested, or raw data to be transformed and ingested. A target feature group is on the right, with both an online store for the latest feature values and an offline store to hold feature history over time. In the center, a SageMaker Processing job automatically distributes processing across a set of machines. Within each machine instance, the job uses a configurable set of Python processes and workers to generate high throughput parallel calls to quickly and efficiently load large amounts of feature data into Feature Store. Need your nightly job to finish more quickly? Need to move from 100 MB of nightly features up to 100 GB? Simply pick more powerful instance types, or increase the number of instances. Feature Store can keep up just fine.

Let’s look at the coding involved. The following are a few key lines of code from a sample feature ingestion script:

import pandas as pd
from sagemaker.feature_store.feature_group import FeatureGroup

# assemble the dataframe from multiple input files
df = pd.concat([pd.read_csv(f) for f in files], ignore_index=True)

# optionally apply feature transformations
df = apply_transforms(df)

# ingest feature values, taking advantage of available
# instance resources
fg = FeatureGroup(name=args.feature_group_name,
                  sagemaker_session=sm_session)
fg.ingest(data_frame=df, max_processes=args.num_processes,
          max_workers=args.num_workers, wait=True)

SageMaker Processing provides a subset of input files to each of the machine instances for the job using Amazon Simple Storage Service (Amazon S3) key sharding. The script combines those inputs into a single Pandas DataFrame, applies a set of transformations, and ingests the resulting features using client-side parallel calls.
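The script excerpt relies on a parsed args object and a list of input files. One way to set these up is sketched below, assuming the job's inputs are mounted at /opt/ml/processing/input. The function names, the default values, and the --input_dir argument are hypothetical and are not taken from the sample repo.

```python
import argparse
import glob
import os


def parse_args(argv=None):
    """Parse the command-line arguments passed to the ingestion script."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--feature_group_name", type=str, required=True)
    parser.add_argument("--region_name", type=str, default=None)
    # Defaults of 1 are an assumption; tune them to the instance type.
    parser.add_argument("--num_processes", type=int, default=1)
    parser.add_argument("--num_workers", type=int, default=1)
    # Hypothetical argument; the default is the local input path of the job.
    parser.add_argument("--input_dir", type=str,
                        default="/opt/ml/processing/input")
    return parser.parse_args(argv)


def list_input_files(input_dir, pattern="*.csv"):
    """Return the files this instance received, searched recursively, in sorted order."""
    return sorted(glob.glob(os.path.join(input_dir, "**", pattern),
                            recursive=True))
```

Each instance sees only its own shard of the S3 prefix in the input directory, so the same script runs unchanged regardless of how many instances the job uses.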

Now let’s look at sample code for launching the processing job using a built-in processor object:

from sagemaker.processing import ProcessingInput
from sagemaker.sklearn.processing import SKLearnProcessor

proc = SKLearnProcessor(framework_version='0.20.0', role=role,
                        instance_type='ml.m5.xlarge',
                        instance_count=2)
proc.run(code='./feature_ingest.py',
         arguments=['--num_processes', '8',
                    '--feature_group_name', feature_group_name,
                    '--region_name', region],
         inputs=[ProcessingInput(s3_data_type='S3Prefix',
                                 source=s3_uri_prefix,
                                 s3_data_distribution_type='ShardedByS3Key',
                                 destination='/opt/ml/processing/input')])

A simple helper function can collapse this into a single line of code for launching a feature ingestion job:

ingest_features(script, s3_uri_prefix, feature_group_name, 
                instance_type, num_instances, num_processes, 
                num_workers)
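The helper itself is not shown in this post. The following sketch is one way it might be written, under stated assumptions: the body is not taken from the GitHub repo, build_job_arguments is a hypothetical companion function, and the execution role and Region are looked up from the current SageMaker session rather than passed in.

```python
def build_job_arguments(feature_group_name, num_processes, num_workers,
                        region_name=None):
    """Build the command-line arguments passed to the ingestion script."""
    arguments = [
        "--num_processes", str(num_processes),
        "--num_workers", str(num_workers),
        "--feature_group_name", feature_group_name,
    ]
    if region_name is not None:
        arguments += ["--region_name", region_name]
    return arguments


def ingest_features(script, s3_uri_prefix, feature_group_name,
                    instance_type, num_instances, num_processes,
                    num_workers):
    """Launch a SageMaker Processing job that runs the ingestion script."""
    # Imported lazily so build_job_arguments works without the SageMaker SDK.
    import sagemaker
    from sagemaker.processing import ProcessingInput
    from sagemaker.sklearn.processing import SKLearnProcessor

    # Assumption: running in a SageMaker notebook or Studio, where
    # get_execution_role() can resolve the role.
    role = sagemaker.get_execution_role()
    region = sagemaker.Session().boto_region_name

    proc = SKLearnProcessor(framework_version="0.20.0", role=role,
                            instance_type=instance_type,
                            instance_count=num_instances)
    proc.run(code=script,
             arguments=build_job_arguments(feature_group_name, num_processes,
                                           num_workers, region),
             inputs=[ProcessingInput(s3_data_type="S3Prefix",
                                     source=s3_uri_prefix,
                                     s3_data_distribution_type="ShardedByS3Key",
                                     destination="/opt/ml/processing/input")])
    return proc
```

Keeping the argument construction in its own function makes it possible to check the command line without launching a job.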

To explore the complete code example, and even try it out in your own account, see the GitHub repo.

Conclusion

Feature Store provides a purpose-built feature management solution to help organizations scale ML development across business units and data science teams. SageMaker gives you the flexibility to choose from many different options for implementing feature pipelines to ingest feature data at scale. In this post, we explained several feature ingestion scenarios, and showed how SageMaker Processing jobs provide a simple yet scalable way to deploy repeatable batch ingestion. Give it a try, and let us know what you think in the comments.


About the Authors

Mark Roy is a Principal Machine Learning Architect for AWS, helping customers design and build AI/ML solutions. Mark’s work covers a wide range of ML use cases, with a primary interest in computer vision, deep learning, and scaling ML across the enterprise. He has helped companies in many industries, including insurance, financial services, media and entertainment, healthcare, utilities, and manufacturing. Mark holds six AWS certifications, including the ML Specialty Certification. Prior to joining AWS, Mark was an architect, developer, and technology leader for over 25 years, including 19 years in financial services.

Anoop Sanka focuses on ML at AWS. He works with customers to build scalable machine learning solutions on AWS.