AWS Machine Learning Blog

Deploying your own data processing code in an Amazon SageMaker Autopilot inference pipeline

The machine learning (ML) model-building process requires data scientists to manually prepare data features, select an appropriate algorithm, and optimize its model parameters. It involves a lot of effort and expertise. Amazon SageMaker Autopilot removes the heavy lifting required by this ML process. It inspects your dataset, generates several ML pipelines, and compares their performance to produce a leaderboard of candidate pipelines. Each candidate pipeline is a combination of data preprocessing steps, an ML algorithm, and its optimized hyperparameters. You can easily deploy any of these candidate pipelines to use for real-time prediction or batch prediction.

But what if you want to preprocess the data before invoking Amazon SageMaker Autopilot? For example, you might have a dataset with several features and need customized feature selection to remove irrelevant variables before using it to train a model in an Autopilot job. Then you need to incorporate your custom processing code into the pipeline when deploying it to a real-time endpoint or for batch processing. This post shows you how to customize an Autopilot inference pipeline with your own data processing code. The code from this post is available in the GitHub repo.

Solution overview

The solution builds a pipeline that combines custom feature selection with Autopilot models. It includes the following steps:

  1. Prepare a dataset with 100 features as the example dataset for this post and upload it to Amazon Simple Storage Service (Amazon S3).
  2. Train the feature selection model and prepare the dataset using sagemaker-scikit-learn-container to feed to Autopilot.
  3. Configure and launch the Autopilot job.
  4. Create an inference pipeline that combines feature selection with the Autopilot models.
  5. Make predictions with the inference pipeline.

The following diagram outlines the architecture of this workflow.

Preparing and uploading the dataset

First, we generate a regression dataset using sklearn.datasets.make_regression. We set the number of features to 100, of which five are informative. The features are named x_0 through x_99, and the target variable is named y:

import pandas as pd
from sklearn.datasets import make_regression

# 1,500 samples, 100 features, only 5 of which carry signal
X, y = make_regression(n_features=100, n_samples=1500, n_informative=5, random_state=0)
df_X = pd.DataFrame(X).rename(columns=lambda x: 'x_' + str(x))
df_y = pd.DataFrame(y).rename(columns=lambda x: 'y')
df = pd.concat([df_X, df_y], axis=1)

The following screenshot shows the data generated. You upload this dataset to Amazon S3 to use in later steps.

Training the feature selection model and preparing the dataset

Feature selection is the process of selecting a subset of the most relevant features on which to train an ML model. This simplification shortens training time and reduces the chance of overfitting. The sklearn.feature_selection module contains several feature selection algorithms. For this post, we use the following:

  • feature_selection.RFE – The recursive feature elimination (RFE) algorithm selects features by recursively considering smaller and smaller sets of features. First, the estimator is trained on the initial set of features and the importance of each feature is obtained. Then, the least important features are pruned from the current set of features. We use Epsilon-Support Vector Regression (sklearn.svm.SVR) as our learning estimator for RFE.
  • feature_selection.SelectKBest – The SelectKBest algorithm selects the k features that have the highest scores of a specified metric. We use F-regression (f_regression) and mutual information (mutual_info_regression) as the score functions; both measure the dependency between each feature and the target. For more information about F-regression and mutual information, see Feature Selection.

We stack three feature selection steps into one sklearn.pipeline.Pipeline: RFE, followed by two SelectKBest steps. RFE by default eliminates 50% of the total features. We then use SelectKBest with the f_regression method to select the top 30 features, and SelectKBest with the mutual_info_regression method to reduce the number of features to 10. Note that the feature selection algorithms used here are for demonstration purposes only. You can update the script to incorporate a feature selection algorithm of your choice.
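To make the feature counts concrete, the following sketch traces how many features survive each step. It is plain arithmetic rather than part of the post's script: it assumes RFE keeps half of the incoming features (its default when n_features_to_select is not set), and the function name is hypothetical.

```python
def surviving_feature_counts(n_features, k_values):
    """Return the feature count before selection and after each step.

    Assumes RFE keeps n_features // 2 features (its default) and that each
    following SelectKBest step keeps exactly k features.
    """
    counts = [n_features, n_features // 2]  # start, then after RFE
    for k in k_values:
        if k > counts[-1]:
            # This sketch simply rejects a k larger than what the step receives
            raise ValueError("k cannot exceed the number of incoming features")
        counts.append(k)
    return counts


print(surviving_feature_counts(100, k_values=[30, 10]))  # [100, 50, 30, 10]
```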

We also create a Python script for feature selection. In the following code example, we build a sklearn Pipeline object that implements the method we described:

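'''Feature selection pipeline'''
from sklearn.feature_selection import RFE, SelectKBest, f_regression, mutual_info_regression
from sklearn.pipeline import Pipeline
from sklearn.svm import SVR

# RFE keeps half of the features by default; the two SelectKBest steps keep 30, then 10
feature_selection_pipe = pipe = Pipeline([
    ('svr', RFE(SVR(kernel="linear"))),
    ('f_reg', SelectKBest(f_regression, k=30)),
    ('mut_info', SelectKBest(mutual_info_regression, k=10))
])
feature_selection_pipe.fit(X_train, y_train)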

To provide visibility on which features are selected, we use the following script to generate and save the names of selected features as a list:

    '''Save selected feature names'''
    feature_names = concat_data.columns[:-1]
    feature_names = feature_names[pipe.named_steps['svr'].get_support()]
    feature_names = feature_names[pipe.named_steps['f_reg'].get_support()]
    feature_names = feature_names[pipe.named_steps['mut_info'].get_support()]
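Each get_support() call returns a boolean mask over the features that entered that step, which is why the masks are applied one after another. The following sketch shows the idea with plain Python lists; the helper name, feature names, and mask values are made up for illustration.

```python
from itertools import compress


def apply_masks(names, masks):
    """Narrow a list of names by applying boolean masks in sequence.

    Each mask must have one entry per name that survived the previous mask,
    mirroring how each pipeline step only sees the features kept before it.
    """
    for mask in masks:
        if len(mask) != len(names):
            raise ValueError("mask length must match the current number of names")
        names = list(compress(names, mask))
    return names


# Hypothetical example: six features, two selection steps
names = ["x_0", "x_1", "x_2", "x_3", "x_4", "x_5"]
step_1 = [True, False, True, True, False, True]  # keeps x_0, x_2, x_3, x_5
step_2 = [False, True, True, False]              # keeps x_2, x_3
selected = apply_masks(names, [step_1, step_2])
print(selected)  # ['x_2', 'x_3']
```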

We use the Amazon SageMaker SKLearn Estimator with the feature selection script as its entry point. The script is very similar to a training script you might run outside of Amazon SageMaker, but it can read properties of the training environment from environment variables such as SM_MODEL_DIR, the path to the directory inside the container where model artifacts are written. The Amazon SageMaker training job uploads these artifacts to the Amazon S3 output path. After training is complete, we save the fitted pipeline and the selected column names to SM_MODEL_DIR for use during inference. See the following code:

    joblib.dump(feature_selection_pipe, os.path.join(args.model_dir, "model.joblib"))
    ...
    joblib.dump(feature_names, os.path.join(args.model_dir, "selected_feature_names.joblib"))
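The args.model_dir value used above typically comes from an argument parser whose defaults read the SageMaker environment variables. The following is a minimal sketch of that pattern, not the post's actual script: the local fallback paths are assumptions, and SM_CHANNEL_TRAIN is only set when the training job has an input channel named train.

```python
import argparse
import os


def parse_args(argv=None):
    """Read container paths from SageMaker environment variables.

    SM_MODEL_DIR is set by SageMaker inside the training container;
    SM_CHANNEL_TRAIN exists when an input channel is named "train".
    The fallback paths are assumptions for running outside SageMaker.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--model-dir", type=str,
                        default=os.environ.get("SM_MODEL_DIR", "./model"))
    parser.add_argument("--train", type=str,
                        default=os.environ.get("SM_CHANNEL_TRAIN", "./data"))
    return parser.parse_args(argv)


args = parse_args([])  # empty list: ignore the real command line in this demo
print(args.model_dir, args.train)
```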

Although we use feature selection algorithms in this post, you can customize and add additional data preprocessing code, such as code for data imputation or other forms of data cleaning, to this entry point script.

Now that our feature selection model is properly fitted, we transform the raw input data to the training dataset with selected features. To use Amazon SageMaker batch transform to directly process the raw data and store back to Amazon S3, enter the following code:

# Define a SKLearn Transformer from the trained SKLearn Estimator
transformer_output = os.path.join('s3://',bucket, prefix, 'Feature_selection_output/')
transformer = sklearn_preprocessor.transformer(
    instance_count=1, 
    instance_type='ml.m4.xlarge',
    output_path = transformer_output,
    assemble_with = 'Line',
    accept = 'text/csv')
    
transformer.transform(train_input, content_type='text/csv') 

The notebook contains an additional step that adds the selected column names as headers to the generated CSV data files.
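That header step is not reproduced in this post. As a rough sketch of what it could look like, the following function prepends a header row to CSV text using only the standard library; the function name, sample values, and column names are hypothetical.

```python
import csv
import io


def add_header(csv_text, column_names):
    """Return csv_text with a header row of column_names prepended."""
    rows = list(csv.reader(io.StringIO(csv_text)))
    if rows and len(rows[0]) != len(column_names):
        raise ValueError("number of column names must match number of columns")
    out = io.StringIO()
    writer = csv.writer(out, lineterminator="\n")
    writer.writerow(column_names)
    writer.writerows(rows)
    return out.getvalue()


# Hypothetical output with two selected features and the target column
raw = "0.5,1.2,3.4\n-0.1,0.7,2.2\n"
print(add_header(raw, ["x_3", "x_17", "y"]))
```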

Configuring and launching the Autopilot job

The output from batch transform is the new training dataset for Autopilot. The new dataset has 10 features. To use Autopilot, we simply provide our new dataset and choose the target column to be y. Autopilot automatically inspects our dataset and runs several candidates to determine the optimal combination of data preprocessing steps, ML algorithms, and hyperparameters. Before launching the Autopilot job, we define the job input configuration, output configuration, and stopping criteria:

input_data_config = [{
      'DataSource': {
        'S3DataSource': {
          'S3DataType': 'S3Prefix',
          'S3Uri': 's3://{}/{}/training_data_new'.format(bucket,prefix)
        }
      },
      'TargetAttributeName': 'y'
    }
  ]

output_data_config = {
    'S3OutputPath': 's3://{}/{}/autopilot_job_output'.format(bucket,prefix)
  }

AutoML_Job_Config = {
    'CompletionCriteria': {
            'MaxCandidates': 50,
            'MaxAutoMLJobRuntimeInSeconds': 1800
        }
  }

Then we call the create_auto_ml_job API to launch the Autopilot job:

import boto3
from time import gmtime, strftime
sm = boto3.Session().client(service_name='sagemaker', region_name=region)
timestamp_suffix = strftime('%d-%H-%M-%S', gmtime())

auto_ml_job_name = 'automl-blog' + timestamp_suffix
print('AutoMLJobName: ' + auto_ml_job_name)

sm.create_auto_ml_job(AutoMLJobName=auto_ml_job_name,
                      InputDataConfig=input_data_config,
                      OutputDataConfig=output_data_config,
                      AutoMLJobConfig = AutoML_Job_Config,
                      RoleArn=role)
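The job runs asynchronously, so the notebook has to wait for it to finish before it can use the best candidate. The following is a minimal polling sketch built on the describe_auto_ml_job API; the function name, the polling interval, and the injectable sleep parameter are assumptions of this example rather than code from the post.

```python
import time


def wait_for_auto_ml_job(client, job_name, poll_seconds=30, sleep=time.sleep):
    """Poll describe_auto_ml_job until the job reaches a terminal status.

    `client` is expected to be a boto3 SageMaker client; `poll_seconds`
    and the injectable `sleep` are conveniences of this sketch.
    """
    terminal = {"Completed", "Failed", "Stopped"}
    while True:
        description = client.describe_auto_ml_job(AutoMLJobName=job_name)
        status = description["AutoMLJobStatus"]
        print(status, "-", description.get("AutoMLJobSecondaryStatus", ""))
        if status in terminal:
            return description
        sleep(poll_seconds)


# Usage (needs AWS credentials and a launched job, so it is commented out):
# description = wait_for_auto_ml_job(sm, auto_ml_job_name)
# best_candidate = description['BestCandidate']
```

For a completed job, the BestCandidate entry of the response carries the InferenceContainers list that the inference pipeline is built from.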

Creating an inference pipeline that combines feature selection with Autopilot models

So far, we have created a model that takes raw data with 100 features and selects the 10 most relevant features. We also used Autopilot to create data processing and ML models to predict y. We now combine the feature selection model with Autopilot models to create an inference pipeline. After defining the models and assigning names, we create a PipelineModel that points to our preprocessing and prediction models. The pipeline.py file is available on GitHub. See the following code:

sklearn_image = sklearn_preprocessor.image_name
container_1_source = os.path.join("s3://", 
                                  sagemaker_session.default_bucket(), 
                                  sklearn_preprocessor.latest_training_job.job_name,
                                  "sourcedir.tar.gz"
                                 )
inference_containers = [
        {
            'Image': sklearn_image,
            'ModelDataUrl': sklearn_preprocessor.model_data,
            'Environment': {
                'SAGEMAKER_SUBMIT_DIRECTORY':container_1_source,
                'SAGEMAKER_DEFAULT_INVOCATIONS_ACCEPT': "text/csv",
                'SAGEMAKER_PROGRAM':'sklearn_feature_selection.py'
            }
        }]

inference_containers.extend(best_candidate['InferenceContainers'])

# `sagemaker` here is a boto3 SageMaker client, not the sagemaker Python SDK module
response = sagemaker.create_model(
        ModelName=pipeline_name,
        Containers=inference_containers,
        ExecutionRoleArn=role)

We then deploy the pipeline model to a single endpoint:

response = sagemaker.create_endpoint(
        EndpointName=pipeline_endpoint_name,
        EndpointConfigName=pipeline_endpoint_config_name,
    )
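The create_endpoint call refers to an endpoint configuration by name, so that configuration must exist before the endpoint is created. The following sketch shows how the request for the boto3 create_endpoint_config API might be assembled; the helper name, instance type, instance count, and resource names are placeholder assumptions, not values from the post.

```python
def build_endpoint_config(config_name, model_name,
                          instance_type="ml.m5.xlarge", instance_count=1):
    """Build keyword arguments for the boto3 create_endpoint_config call.

    The instance type and count are placeholder assumptions, not values
    taken from the post.
    """
    return {
        "EndpointConfigName": config_name,
        "ProductionVariants": [{
            "VariantName": "AllTraffic",
            "ModelName": model_name,
            "InstanceType": instance_type,
            "InitialInstanceCount": instance_count,
        }],
    }


request = build_endpoint_config("my-pipeline-config", "my-pipeline-model")
print(request)
# With a boto3 SageMaker client named `client`, the call would be:
# client.create_endpoint_config(**request)
```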

Making predictions with the inference pipeline

We can test our pipeline by sending data for prediction. The pipeline accepts raw data, transforms it using the feature selection model, and creates a prediction using the models Autopilot generated.

First, we define a payload variable that contains the data we want to send through the pipeline. We use the first five rows of the training data as our payload. Then we define a predictor using our pipeline endpoint, send the payload to the predictor, and print the model prediction:

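# RealTimePredictor and csv_serializer are SageMaker Python SDK v1 names (Predictor and CSVSerializer in v2)
from sagemaker.predictor import RealTimePredictor, csv_serializer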
from sagemaker.content_types import CONTENT_TYPE_CSV
predictor = RealTimePredictor(
    endpoint=pipeline_endpoint_name,
    serializer=csv_serializer,
    sagemaker_session=sagemaker_session,
    content_type=CONTENT_TYPE_CSV,
    accept=CONTENT_TYPE_CSV)

predictor.content_type = 'text/csv'
predictor.predict(test_data.to_csv(sep=',', header=True, index=False)).decode('utf-8')

Our Amazon SageMaker endpoint returns one prediction for each row of the data sent. See the following output:

'-102.248855591\n-165.823532104\n115.50453186\n111.306632996\n5.91651535034'
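The response is a single newline-separated string, so it usually needs to be converted to numbers before further use. A small sketch of that conversion, with a hypothetical helper name, applied to the output shown above:

```python
def parse_predictions(response_text):
    """Convert a newline-separated response into a list of floats."""
    return [float(line) for line in response_text.splitlines() if line.strip()]


response_text = '-102.248855591\n-165.823532104\n115.50453186\n111.306632996\n5.91651535034'
predictions = parse_predictions(response_text)
print(len(predictions), predictions[0])  # 5 -102.248855591
```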

Deleting the endpoint

When we are finished with the endpoint, we delete it to save cost:

sm_client = sagemaker_session.boto_session.client('sagemaker')
sm_client.delete_endpoint(EndpointName=pipeline_endpoint_name)
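Deleting the endpoint leaves the endpoint configuration and the pipeline model definition in the account. If you also want to remove those, a sketch along the following lines could be used; the function name is hypothetical, and the commented usage assumes the name variables from earlier in this post.

```python
def delete_pipeline_resources(client, endpoint_name, endpoint_config_name, model_name):
    """Delete the endpoint, then its configuration, then the pipeline model.

    `client` is expected to be a boto3 SageMaker client.
    """
    client.delete_endpoint(EndpointName=endpoint_name)
    client.delete_endpoint_config(EndpointConfigName=endpoint_config_name)
    client.delete_model(ModelName=model_name)


# Usage (needs AWS credentials, so it is commented out):
# delete_pipeline_resources(sm_client, pipeline_endpoint_name,
#                           pipeline_endpoint_config_name, pipeline_name)
```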

Conclusions

In this post, we demonstrated how to customize an Autopilot inference pipeline with your own data processing code. We first trained a feature selection model and converted our raw data using the trained feature selection model. Then we launched an Amazon SageMaker Autopilot job that automatically trained and tuned the best ML models for our regression problem. We also built an inference pipeline that combined feature selection with the Autopilot models. Lastly, we made predictions with the inference pipeline. For more information about Amazon SageMaker Autopilot, see Amazon SageMaker Autopilot.


About the Authors

Qingwei Li is a Machine Learning Specialist at Amazon Web Services. He received his Ph.D. in Operations Research after he broke his advisor’s research grant account and failed to deliver the Nobel Prize he promised. Currently he helps customers in the financial services and insurance industries build machine learning solutions on AWS. In his spare time, he likes reading and teaching.

Piali Das is a Senior Software Engineer in the AWS SageMaker Autopilot team. She previously contributed to building SageMaker Algorithms. She enjoys scientific programming in general and has developed an interest in machine learning and distributed systems.