AWS Machine Learning Blog

Schedule an Amazon SageMaker Data Wrangler flow to process new data periodically using AWS Lambda functions

Data scientists can spend up to 80% of their time preparing data for machine learning (ML) projects. This preparation process is largely undifferentiated and tedious work, and can involve multiple programming APIs and custom libraries. Announced at AWS re:Invent 2020, Amazon SageMaker Data Wrangler reduces the time it takes to aggregate and prepare data for ML from weeks to minutes. With Data Wrangler, you can simplify the process of data preparation and feature engineering. You can complete each step of the data preparation workflow, including data selection, cleansing, exploration, and visualization, from a single visual interface. For more information about how to prepare your datasets for ML training, inference, or other use cases, see Introducing Amazon SageMaker Data Wrangler, a Visual Interface to Prepare Data for Machine Learning.

Data Wrangler natively connects to data sources such as Amazon Simple Storage Service (Amazon S3), Amazon Athena, Amazon Redshift, and Snowflake. Data Wrangler also integrates with multiple SageMaker features, such as SageMaker Clarify, Feature Store, and Pipelines. The Data Wrangler UI is part of SageMaker Studio, which is the primary IDE for SageMaker. You can explore your data within the Data Wrangler UI and create a data flow (.flow file) that defines an exportable series of ML data preparation steps, ready for integration with training and inference ML workflows. After you create a preprocessing flow based on your sample data, you need a way to transfer the pipeline logic into your production workflow to handle incoming data periodically.

This post demonstrates how to schedule your data preparation to run automatically using AWS Lambda and an existing Data Wrangler .flow file. Lambda is a serverless compute service that lets you run your code with the right execution power and zero administration.

The following steps are required to schedule your Data Wrangler flow to run regularly:

  1. Export your Data Wrangler .flow file as a SageMaker processing script.
  2. Create a Lambda function from the processing script.
  3. Schedule the Lambda function defined in the previous step to run using Amazon EventBridge.
  4. Optionally, you can parameterize the Data Wrangler .flow file based on the modularity required (which we demonstrate in this post).

This post assumes you have an existing .flow file from your processing step using Data Wrangler.

Export your Data Wrangler .flow file

We use an existing .flow file that was generated from two data sources (Amazon S3 and Athena) for demonstration purposes. You can use any existing .flow file to follow along with this post.

  1. Choose (right-click) the .flow file.
  2. On the Open With menu, choose Flow.
  3. On the Export tab, choose Save to S3.

Data Wrangler exports all the processing steps into a notebook. The notebook can run the processing job and save the output to an Amazon S3 location. To run this processing job on a regular basis, for example daily, monthly, or quarterly, we need to set up a Lambda function and schedule it at the required frequency. Lambda lets you run code without provisioning or managing servers.

  1. The first step to prepare the Lambda function is to export the notebook into an executable Python file.
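
Jupyter and SageMaker Studio can export a notebook as an executable script (for example with jupyter nbconvert --to script). If you prefer to script the export, the following is a minimal sketch that uses only the Python standard library. It assumes the notebook is a standard .ipynb JSON file; the notebook_to_script() helper and the file names in the usage note are hypothetical and not part of the Data Wrangler export.

```python
import json
from pathlib import Path


def notebook_to_script(notebook_path: str, script_path: str) -> int:
    """Copy the code cells of a Jupyter notebook into a plain .py file.

    Returns the number of code cells written.
    """
    notebook = json.loads(Path(notebook_path).read_text(encoding="utf-8"))
    chunks = []
    for cell in notebook.get("cells", []):
        if cell.get("cell_type") != "code":
            continue  # skip markdown and raw cells
        source = cell.get("source", "")
        # The notebook format stores source as one string or as a list of lines
        code = source if isinstance(source, str) else "".join(source)
        # Assumption: lines starting with % or ! are IPython magics or shell
        # escapes, which are not valid Python, so they are dropped
        lines = [ln for ln in code.splitlines()
                 if not ln.lstrip().startswith(("%", "!"))]
        if any(ln.strip() for ln in lines):
            chunks.append("\n".join(lines))
    Path(script_path).write_text("\n\n".join(chunks) + "\n", encoding="utf-8")
    return len(chunks)
```

For example, notebook_to_script("cms.ipynb", "lambda_function.py") would write the code cells of a notebook named cms.ipynb to lambda_function.py.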

After we have the Python script, we can modify the script to create a Lambda function.

  1. Open the .py script, remove the comments that were automatically generated from the notebook, and rename the script lambda_function.py.
  2. Move all the import statements to the beginning of the script and add a handler function, lambda_handler().

This is the main Python function for the Lambda function that is used to schedule the .flow file.

See the following code:

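# Imports used by the exported script, moved to the top of lambda_function.py
import json
import time
import uuid

import boto3
import sagemaker
from sagemaker.dataset_definition.inputs import AthenaDatasetDefinition, DatasetDefinition
from sagemaker.network import NetworkConfig
from sagemaker.processing import ProcessingInput, ProcessingOutput, Processor


def lambda_handler(event, context):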
    """ main handler function for Lambda """

    job_info = run_flow()

    api_response = {
        'statusCode': 200,
        'event': event,
        'job_info': job_info
    }

    return api_response

  1. Wrap the auto-generated code within a function, run_flow().

The lambda_handler() function that we defined in the previous step calls this function.

  1. Modify the line where iam_role is defined. Find the SageMaker execution role and set iam_role to the ARN of that role.
  2. Add one more line at the end of the run_flow() function:
return job_result

See the following code:

def run_flow():

    data_sources = []

    data_sources.append(ProcessingInput(
        input_name="Inpatient_Claim",
        dataset_definition=DatasetDefinition(
            local_path="/opt/ml/processing/Inpatient_Claim",
            data_distribution_type="FullyReplicated",
            # You can override below to point to other database or use different queries
            athena_dataset_definition=AthenaDatasetDefinition(
                catalog="AwsDataCatalog",
                database="cms",
                query_string="select * from inpatient_claim where clm_from_dt between 20080100 and 20080200",
                output_s3_uri="s3://sagemaker-us-east-1-123456789/athena/Inpatient_Claim/",
                output_format="PARQUET"
            )
        )
    ))

    data_sources.append(ProcessingInput(
        source="s3://sagemaker-us-east-1-123456789/DW/DE1_0_2008_Beneficiary_Summary_File_Sample_20.csv",  # You can override this to point to other dataset on S3
        destination="/opt/ml/processing/2008_Beneficiary_Summary",
        input_name="2008_Beneficiary_Summary",
        s3_data_type="S3Prefix",
        s3_input_mode="File",
        s3_data_distribution_type="FullyReplicated"
    ))

    # SageMaker session
    sess = sagemaker.Session()
    print(f"Data Wrangler sagemaker session: {sess}")
    
    # You can configure this with your own bucket name, e.g.
    # bucket = <my-own-storage-bucket>
    bucket = sess.default_bucket()
    print(f"Data Wrangler export storage bucket: {bucket}")

    # unique flow export ID
    flow_export_id = f"{time.strftime('%Y-%m-%d-%H-%M-%S', time.gmtime())}-{str(uuid.uuid4())[:8]}"
    flow_export_name = f"flow-{flow_export_id}"

    # Output name is auto-generated from the select node's ID + output name from the flow file.
    output_name = "8b392709-d2c4-4b8e-bdda-e75b2d14f35e.default"

    s3_output_prefix = f"export-{flow_export_name}/output"
    s3_output_path = f"s3://{bucket}/{s3_output_prefix}"
    print(f"Flow S3 export result path: {s3_output_path}")

    s3_processing_output = ProcessingOutput(
        output_name=output_name,
        source="/opt/ml/processing/output",
        destination=s3_output_path,
        s3_upload_mode="EndOfJob"
    )

    # name of the flow file which should exist in the current notebook working directory
    flow_file_name = "cms.flow"

    with open(flow_file_name) as f:
        flow = json.load(f)

    # Upload flow to S3
    s3_client = boto3.client("s3")
    s3_client.upload_file(flow_file_name, bucket, f"data_wrangler_flows/{flow_export_name}.flow")

    flow_s3_uri = f"s3://{bucket}/data_wrangler_flows/{flow_export_name}.flow"

    print(f"Data Wrangler flow {flow_file_name} uploaded to {flow_s3_uri}")

    # Input - Flow: cms.flow
    flow_input = ProcessingInput(
        source=flow_s3_uri,
        destination="/opt/ml/processing/flow",
        input_name="flow",
        s3_data_type="S3Prefix",
        s3_input_mode="File",
        s3_data_distribution_type="FullyReplicated"
    )

    print("ProcessingInput defined")

    # IAM role for executing the processing job.
    iam_role = 'arn:aws:iam::123456789:role/service-role/AmazonSageMaker-ExecutionRole-20191002T141534'  # sagemaker.get_execution_role()

    # Unique processing job name. Please give a unique name every time you re-execute processing jobs
    processing_job_name = f"data-wrangler-flow-processing-{flow_export_id}"

    # Data Wrangler Container URL.
    container_uri = "663277389841.dkr.ecr.us-east-1.amazonaws.com/sagemaker-data-wrangler-container:1.x"

    # Processing Job Instance count and instance type.
    instance_count = 2
    instance_type = "ml.m5.4xlarge"

    # Size in GB of the EBS volume to use for storing data during processing
    volume_size_in_gb = 30

    # Content type for each output. Data Wrangler supports CSV as default and Parquet.
    output_content_type = "CSV"

    # Network Isolation mode; default is off
    enable_network_isolation = False

    # Output configuration used as processing job container arguments 
    output_config = {
        output_name: {
            "content_type": output_content_type
        }
    }

    processor = Processor(
        role=iam_role,
        image_uri=container_uri,
        instance_count=instance_count,
        instance_type=instance_type,
        volume_size_in_gb=volume_size_in_gb,  # use the EBS volume size defined above
        network_config=NetworkConfig(enable_network_isolation=enable_network_isolation),
        sagemaker_session=sess
    )
    print(f"Processor defined")

    # Start Job

    processor.run(
        inputs=[flow_input] + data_sources, 
        outputs=[s3_processing_output],
        arguments=[f"--output-config '{json.dumps(output_config)}'"],
        wait=False,
        logs=False,
        job_name=processing_job_name
    )
    s3_job_results_path = f"s3://{bucket}/{s3_output_prefix}/{processing_job_name}"
    print(f"Job results are saved to S3 path: {s3_job_results_path}")

    job_result = sess.wait_for_processing_job(processing_job_name)
    print(job_result)
    return job_result
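
The handler places job_info in its response, and Lambda serializes that response as JSON. The job description returned by wait_for_processing_job() can contain values, such as timestamps, that Python's json module does not serialize by default. The following is a minimal sketch, not part of the exported script, of one way to reduce the description to a few JSON-safe fields before returning it. The helper names to_json_safe() and summarize_job() are hypothetical, and the selected keys are an assumption about which fields are of interest.

```python
import json
from datetime import datetime, timezone


def to_json_safe(obj):
    """Round-trip through JSON, turning unsupported values (such as datetime) into strings."""
    return json.loads(json.dumps(obj, default=str))


def summarize_job(job_result: dict) -> dict:
    """Keep a few fields of a processing job description, in JSON-safe form."""
    keys = ("ProcessingJobName", "ProcessingJobStatus",
            "CreationTime", "ProcessingEndTime")
    return to_json_safe({k: job_result[k] for k in keys if k in job_result})


if __name__ == "__main__":
    # Made-up sample shaped like a processing job description
    sample = {
        "ProcessingJobName": "data-wrangler-flow-processing-demo",
        "ProcessingJobStatus": "Completed",
        "CreationTime": datetime(2021, 1, 1, tzinfo=timezone.utc),
    }
    print(summarize_job(sample))
```

With such a helper, lambda_handler() could return summarize_job(job_info) instead of the raw dictionary. Also keep in mind that a Lambda invocation can run for at most 15 minutes, so waiting for the job inside the handler only suits processing jobs that finish within that limit.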

Create a Lambda function

Now that we have the required Python script, we can create a Lambda function. For instructions on creating a function using the Python runtime, see Building Lambda functions with Python. For more information about getting started with Lambda, see Run a Serverless “Hello, World!” with AWS Lambda.

A Lambda runtime dependency can be any package, module, or other assembly dependency that isn’t included with the Lambda runtime environment for your function’s code. If your code depends only on standard Python libraries such as math or logging, or on Boto3, you don’t need to include those libraries in your .zip file. However, some ML-related packages, for example pandas, NumPy, and SageMaker, need to be packaged within the Lambda .zip file.

  1. Run the following commands to include the SageMaker package within the Lambda .zip file:
# Install the Amazon SageMaker modules in the specific folder
$ pip install sagemaker --target sagemaker-installation
# Remove tests and cache (to reduce size)
$ find ./sagemaker-installation -type d -name "tests" -exec rm -rfv {} +
$ find ./sagemaker-installation -type d -name "__pycache__" -exec rm -rfv {} +

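# Create a .zip file to be used by the AWS Lambda function, with the installed
# packages at the root of the archive so that the Lambda runtime can import them
$ (cd sagemaker-installation && zip -r ../lambda-deployment.zip .)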

# add flow file and lambda_function.py
$ zip -g lambda-deployment.zip lambda_function.py cms.flow
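
The Lambda Python runtime imports modules relative to the root of the deployment package, so it can be worth confirming the archive layout before you upload it. The following is a small sketch of such a check using the standard zipfile module. The missing_from_package() helper is hypothetical, and the default list of required entries is an assumption based on the file names used in this post.

```python
import zipfile


def missing_from_package(
    zip_path: str,
    required=("lambda_function.py", "cms.flow", "sagemaker/__init__.py"),
) -> list:
    """Return the required entries that are absent from the root of the .zip file."""
    with zipfile.ZipFile(zip_path) as archive:
        names = set(archive.namelist())
    return [entry for entry in required if entry not in names]
```

An empty list from missing_from_package("lambda-deployment.zip") suggests that the handler, the flow file, and the SageMaker package sit where the runtime expects them. An entry such as sagemaker/__init__.py in the result suggests that the package was zipped inside a subfolder.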

Now you can use this .zip file to create a Lambda function.

  1. On the Lambda console, create a function with the option Author from scratch.
  2. Upload your .zip file.

You can fulfill the requirement for NumPy within the Lambda runtime environment via Lambda layers.

  1. Add the base Python 3.8 SciPy 1.x layer provided by AWS, which you can find on the Lambda function console.
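
The console steps above can also be scripted with Boto3. The following sketch only assembles the arguments for the Lambda create_function API, so that they can be reviewed before any call is made. The build_function_config() helper is hypothetical. The role ARN and layer ARNs are placeholders that you must supply (the role here is the Lambda execution role, which needs permission to start SageMaker processing jobs), and the timeout and memory values are assumptions rather than recommendations from this post.

```python
from pathlib import Path


def build_function_config(zip_path, role_arn, layer_arns=(),
                          function_name="Schedule_Flow"):
    """Assemble keyword arguments for the Lambda create_function API."""
    return {
        "FunctionName": function_name,
        "Runtime": "python3.8",                       # runtime used in this post
        "Handler": "lambda_function.lambda_handler",  # file name and handler name
        "Role": role_arn,                             # Lambda execution role ARN
        "Code": {"ZipFile": Path(zip_path).read_bytes()},
        "Timeout": 900,      # assumption: the maximum, because the handler waits for the job
        "MemorySize": 512,   # assumption: headroom for importing the SageMaker SDK
        "Layers": list(layer_arns),
    }
```

You could then pass the result to the Lambda client, for example boto3.client("lambda").create_function(**config). If the .zip file is too large to upload directly, the Code argument also accepts the S3Bucket and S3Key keys to reference a package stored in Amazon S3.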

Schedule your Lambda function using EventBridge

After you create and test your Lambda function, it’s time to schedule your function using EventBridge. EventBridge is a serverless event bus service that makes it easy to connect your applications with data from a variety of sources. For this post, we schedule data to be processed every hour and saved to a specific S3 bucket.

  1. On the Amazon EventBridge console, on the Rules page, choose Create rule.
  2. For Name, enter a name.
  3. For Description, enter an optional description.
  4. For Define pattern, select Schedule.
  5. Set the fixed rate to every 1 hour.
  6. For Target, choose Lambda function.
  7. For Function, choose your Lambda function (Schedule_Flow in this example).
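
The same schedule can be created programmatically. The following is a minimal sketch that uses the Boto3 EventBridge calls put_rule and put_targets and the Lambda call add_permission. The schedule_function() helper, the rule name, and the target ID are hypothetical, and the clients are passed in as arguments so that you can create them for your own Region and credentials.

```python
def schedule_function(events_client, lambda_client, function_arn,
                      rule_name="run-data-wrangler-flow",
                      schedule="rate(1 hour)"):
    """Create an EventBridge schedule rule that invokes a Lambda function."""
    # 1. Create (or update) the schedule rule
    rule_arn = events_client.put_rule(
        Name=rule_name,
        ScheduleExpression=schedule,
        State="ENABLED",
    )["RuleArn"]

    # 2. Allow EventBridge to invoke the function for this rule.
    #    Note: this call fails if the same StatementId already exists.
    lambda_client.add_permission(
        FunctionName=function_arn,
        StatementId=f"{rule_name}-invoke",
        Action="lambda:InvokeFunction",
        Principal="events.amazonaws.com",
        SourceArn=rule_arn,
    )

    # 3. Register the function as the target of the rule
    events_client.put_targets(
        Rule=rule_name,
        Targets=[{"Id": "1", "Arn": function_arn}],
    )
    return rule_arn
```

A call might look like schedule_function(boto3.client("events"), boto3.client("lambda"), function_arn), where function_arn is the ARN of your Lambda function.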

Alternatively, we can deploy the Lambda function and schedule using EventBridge with a serverless template file (see the following code). For an example on deploying a serverless template, see Amazon EventBridge Scheduled to AWS Lambda. The required template file and an example lambda_function.py are available on the GitHub repo.

AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: Create a Lambda function to run the processing job and schedule from a cron job in EventBridge

Resources:
  ScheduledFunction:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: 
      Handler: lambda_function.lambda_handler
      Runtime: python3.8
      MemorySize: 128
            
      Events:
        ScheduledFunction:
          Type: Schedule
          Properties:
            Schedule: rate(1 hour)

Additional parameterization of the .flow file

A script exported directly from the .flow file always runs against the same data source and the same query. However, you can parameterize the exported script to handle changes in a data source, for example, to point to a different S3 bucket or file, or to change the query to pull incremental data from other data sources. In our preceding example code, the following query pulls data from an Athena table:

query_string = "select * from inpatient_claim where clm_from_dt between 20080100 and 20080200"

We can easily generate this query string with an additional step using the generate_query() function.
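
The following is a minimal sketch of what such a function could look like. It is an illustration under stated assumptions: that clm_from_dt holds integer dates in YYYYMMDD form (as the bounds in the preceding query suggest), and that each run should cover the calendar month before the run date. The month_bounds() helper and the today parameter are hypothetical.

```python
from datetime import date
from typing import Optional, Tuple


def month_bounds(year: int, month: int) -> Tuple[int, int]:
    """Return integer bounds (YYYYMM00 of this month, YYYYMM00 of the next month)."""
    start = year * 10000 + month * 100
    next_year, next_month = (year + 1, 1) if month == 12 else (year, month + 1)
    end = next_year * 10000 + next_month * 100
    return start, end


def generate_query(today: Optional[date] = None) -> str:
    """Build the Athena query for the calendar month before `today`."""
    today = today or date.today()
    # Step back one month, rolling over to December of the previous year in January
    if today.month == 1:
        year, month = today.year - 1, 12
    else:
        year, month = today.year, today.month - 1
    start, end = month_bounds(year, month)
    return (
        "select * from inpatient_claim "
        f"where clm_from_dt between {start} and {end}"
    )
```

For example, generate_query(date(2008, 2, 15)) reproduces the query string shown earlier for January 2008.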

We can call this generate_query() function within the run_flow() function to create a new query during runtime.

We can also specify the output S3 bucket instead of using the default one, and parameterize the other auto-generated output paths if needed.
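
One way to do this is to read the bucket and prefix from Lambda environment variables and fall back to the default bucket when they are not set. The following sketch assumes two hypothetical environment variables, OUTPUT_BUCKET and OUTPUT_PREFIX, and a hypothetical resolve_output_path() helper. It keeps the export-flow-<id>/output layout of the exported script.

```python
import os


def resolve_output_path(flow_export_id: str, default_bucket: str, env=None) -> str:
    """Build the S3 output path, letting environment variables override the defaults."""
    env = os.environ if env is None else env
    # Hypothetical variable names; an unset or empty value means "use the default"
    bucket = env.get("OUTPUT_BUCKET") or default_bucket
    prefix = (env.get("OUTPUT_PREFIX") or "").strip("/")
    parts = [p for p in (prefix, f"export-flow-{flow_export_id}", "output") if p]
    return f"s3://{bucket}/" + "/".join(parts)
```

Inside run_flow(), the result could replace s3_output_path, for example s3_output_path = resolve_output_path(flow_export_id, sess.default_bucket()).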

Clean up

When you’re finished, we recommend deleting all the AWS resources created in this demo to avoid additional recurring costs.

  1. Delete the EventBridge rule.
  2. Delete the Lambda function.
  3. Delete the .flow files and output files in the S3 bucket.
  4. Shut down Data Wrangler.
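
The first two cleanup steps can also be scripted. The following is a minimal sketch that uses the Boto3 EventBridge calls list_targets_by_rule, remove_targets, and delete_rule and the Lambda call delete_function. The delete_schedule() helper is hypothetical, and the rule and function names are whatever you chose when you created the resources. Targets are removed first because a rule cannot be deleted while it still has targets.

```python
def delete_schedule(events_client, lambda_client, rule_name, function_name):
    """Remove the EventBridge rule (and its targets) and delete the Lambda function."""
    # Look up the targets attached to the rule so that they can be detached
    targets = events_client.list_targets_by_rule(Rule=rule_name).get("Targets", [])
    target_ids = [target["Id"] for target in targets]
    if target_ids:
        events_client.remove_targets(Rule=rule_name, Ids=target_ids)
    events_client.delete_rule(Name=rule_name)
    lambda_client.delete_function(FunctionName=function_name)
    return target_ids
```

The S3 objects and the Data Wrangler application are not touched by this sketch and still need to be cleaned up separately.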

Conclusion

In this post, we demonstrated how to automate a Data Wrangler flow to run on a schedule using a Lambda function with EventBridge. Additionally, we showed how to parameterize the script exported from the .flow file for different data sources, or to change the query, using custom functions within the Lambda function script. Most importantly, we showed how to automate an entire ML preprocessing workflow with only about 10 additional lines of code.

To get started with Data Wrangler, see Introducing Amazon SageMaker Data Wrangler, a Visual Interface to Prepare Data for Machine Learning. You can also explore advanced topics like cross-account access for Data Wrangler or connecting to Snowflake as a data source. For the latest information on Data Wrangler, see the product page.

Data Wrangler makes it easy to prepare, process, and explore data without much experience with scripting languages like Python or Spark. With native connections to Feature Store, Processing, and Pipelines, end-to-end ML model development and deployment becomes more user-friendly and intuitive.

Disclaimer

For our ML use case, we used public data from cms.gov to create our demo .flow file. It illustrates a scenario in which real data with a similar data model arrives at regular intervals and requires a scheduled preprocessing job.

Citation

Centers for Medicare & Medicaid Services. (2018). 2019 Health Insurance Exchange Public Use Files (Medicare Claims Synthetic Public Use Files) [Data file and code book]. Retrieved from https://www.cms.gov/Research-Statistics-Data-and-Systems/Downloadable-Public-Use-Files/SynPUFs/DE_Syn_PUF

“Medicare Claims Synthetic Public Use Files” is made available under the Open Database License. Any rights in individual contents of the database are licensed under the Database Contents License.


About the Authors

Jayeeta Ghosh is a Data Scientist within ML ProServe, who works on AI/ML projects for AWS customers and helps solve customer business problems across industries using deep learning and cloud expertise.

Chenyang (Peter) Liu is a Senior Software Engineer on the Amazon SageMaker Data Wrangler team. He is passionate about low-code machine learning systems with state-of-the-art techniques. In his spare time, he is a foodie and enjoys road trips.