AWS Big Data Blog

Orchestrate end-to-end scalable ETL pipeline with Amazon SageMaker workflows

Amazon SageMaker Unified Studio serves as a collaborative workspace where data engineers and scientists can work together on end-to-end data and machine learning (ML) workflows. SageMaker Unified Studio specializes in orchestrating complex data workflows across multiple AWS services through its integration with Amazon Managed Workflows for Apache Airflow (Amazon MWAA). Project owners can create shared environments where team members jointly develop and deploy workflows, while maintaining oversight of pipeline execution. This unified approach makes sure data pipelines run consistently and efficiently, with clear visibility into the entire process, making it seamless for teams to collaborate on sophisticated data and ML projects.

This post explores how to build and manage a comprehensive extract, transform, and load (ETL) pipeline using SageMaker Unified Studio workflows through a code-based approach. We demonstrate how to use a single, integrated interface to handle all aspects of data processing, from preparation to orchestration, by using AWS services including Amazon EMR, AWS Glue, Amazon Redshift, and Amazon MWAA. This solution streamlines the data pipeline through a single UI.

Example use case: Customer behavior analysis for an ecommerce platform

Let’s consider a real-world scenario: An ecommerce company wants to analyze customer transaction data to create a customer summary report. They have data coming from multiple sources:

  • Customer profile data stored in CSV files
  • Transaction history in JSON format
  • Website clickstream data in semi-structured log files

The company wants to do the following:

  • Extract data from these sources
  • Clean and transform the data
  • Perform quality checks
  • Load the processed data into a data warehouse
  • Schedule this pipeline to run daily

Solution overview

The following diagram illustrates the architecture that you implement in this post.

This architecture diagram illustrates a comprehensive, end-to-end data processing pipeline built on AWS services, orchestrated through Amazon SageMaker Unified Studio. The pipeline demonstrates best practices for data ingestion, transformation, quality validation, advanced processing, and analytics.

The workflow consists of the following steps:

  1. Establish a data repository by creating an Amazon Simple Storage Service (Amazon S3) bucket with an organized folder structure for customer data, transaction history, and clickstream logs, and configure access policies for seamless integration with SageMaker Unified Studio.
  2. Extract data from the S3 bucket using AWS Glue jobs.
  3. Use AWS Glue and Amazon EMR Serverless to clean and transform the data.
  4. Implement data quality validation using AWS Glue Data Quality.
  5. Load the processed data into Amazon Redshift Serverless.
  6. Create and manage the workflow environment using SageMaker Unified Studio with Identity Center–based domains.

Note: Amazon SageMaker Unified Studio supports two domain configuration models: IAM Identity Center (IdC)–based domains and IAM role–based domains. While IAM-based domains enable role-driven access management and visual workflows, this post specifically focuses on Identity Center–based domains, where users authenticate via IdC and projects access data and resources using project roles and identity-based authorization.

Prerequisites

Before beginning, make sure you have an AWS account with permissions to create the resources used in this post, then complete the setup described in the following sections.

Configure Amazon SageMaker Unified Studio domain

This solution requires a SageMaker Unified Studio domain in the us-east-1 AWS Region. Although SageMaker Unified Studio is available in multiple Regions, this post uses us-east-1 for consistency. For a complete list of supported Regions, refer to Regions where Amazon SageMaker Unified Studio is supported.

Complete the following steps to configure your domain:

  1. Sign in to the AWS Management Console, navigate to Amazon SageMaker, and open the Domains section from the left navigation pane.
  2. On the SageMaker console, choose Create domain, then choose Quick setup.
  3. If the message “No VPC has been specifically set up for use with Amazon SageMaker Unified Studio” appears, choose Create VPC. The process redirects to an AWS CloudFormation stack. Leave all settings at their default values and choose Create stack.
  4. Under Quick setup settings, for Name, enter a domain name (for example, etl-ecommerce-blog-demo). Review the selected configurations.
  5. Choose Continue to proceed.
  6. On the Create IAM Identity Center user page, create a single sign-on (SSO) user in IAM Identity Center or select an existing SSO user to log in to Amazon SageMaker Unified Studio. The SSO user selected here serves as the administrator in SageMaker Unified Studio.
  7. Choose Create domain.

For detailed instructions, see Create a SageMaker domain and Onboarding data in Amazon SageMaker Unified Studio.


After you have created the domain, a popup appears with the message “Your domain has been created! You can now log in to Amazon SageMaker Unified Studio”. You can close the popup for now.

Create a project

In this section, we create a project to serve as a collaborative workspace for teams to work on business use cases. Complete the following steps:

  1. Choose Open Unified Studio and sign in with your SSO credentials using the Sign in with SSO option.
  2. Choose Create project.
  3. Name the project (for example, ETL-Pipeline-Demo) and create it using the All capabilities project profile.
  4. Choose Continue.
  5. Keep the default values for the configuration parameters and choose Continue.
  6. Choose Create project.

Project creation might take a few minutes. After the project is created, the environment will be configured for data access and processing.

Integrate S3 bucket with SageMaker Unified Studio

To enable external data processing within SageMaker Unified Studio, configure integration with an S3 bucket. This section walks through the steps to set up the S3 bucket, configure permissions, and integrate it with the project.

Create and configure S3 bucket

Complete the following steps to create your bucket:

  1. In a new browser tab, open the AWS Management Console and search for S3.
  2. On the Amazon S3 console, choose Create bucket.
  3. Create a bucket named ecommerce-raw-layer-bucket-demo-<Account-ID>-us-east-1. For detailed instructions, see create a general-purpose Amazon S3 bucket for storage.
  4. Create the following folder structure in the bucket. For detailed instructions, see Creating a folder:
    • raw/customers/
    • raw/transactions/
    • raw/clickstream/
    • processed/
    • analytics/

Upload sample data

In this section, we upload sample ecommerce data that represents a typical business scenario where customer behavior, transaction history, and website interactions need to be analyzed together.

The raw/customers/customers.csv file contains customer profile information, including registration details. This structured data will be processed first to establish the customer dimension for our analytics.

customer_id,name,email,registration_date
1,John Doe,john.doe@example.com,2022-01-15
2,Jane Smith,jane.smith@example.com,2022-02-20
3,Robert Johnson,robert.j@example.com,2022-01-30
4,Emily Brown,emily.b@example.com,2022-03-05
5,Michael Wilson,michael.w@example.com,2022-02-10

The raw/transactions/transactions.json file contains purchase transactions with nested product arrays. This semi-structured data will be flattened and joined with customer data to analyze purchasing patterns and customer lifetime value.

[
{"transaction_id": "t1001", "customer_id": 1, "amount": 125.99, "date": "2023-01-10", "items": ["product1", "product2"]},
{"transaction_id": "t1002", "customer_id": 2, "amount": 89.50, "date": "2023-01-12", "items": ["product3"]},
{"transaction_id": "t1003", "customer_id": 1, "amount": 45.25, "date": "2023-01-15", "items": ["product2"]},
{"transaction_id": "t1004", "customer_id": 3, "amount": 210.75, "date": "2023-01-18", "items": ["product1", "product4", "product5"]},
{"transaction_id": "t1005", "customer_id": 4, "amount": 55.00, "date": "2023-01-20", "items": ["product3", "product6"]}
]

The raw/clickstream/clickstream.csv file captures user website interactions and behavior patterns. This time-series data will be processed to understand customer journey and conversion funnel analytics.

timestamp,customer_id,page,action
2023-01-10T10:15:23,1,homepage,view
2023-01-10T10:16:45,1,product_page,view
2023-01-10T10:18:12,1,product_page,add_to_cart
2023-01-10T10:20:30,1,checkout,view
2023-01-10T10:22:15,1,checkout,purchase
2023-01-12T14:30:10,2,homepage,view
2023-01-12T14:32:20,2,product_page,view
2023-01-12T14:35:45,2,product_page,add_to_cart
2023-01-12T14:40:12,2,checkout,view
2023-01-12T14:42:30,2,checkout,purchase


For detailed instructions on uploading files to Amazon S3, refer to Uploading objects.
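If you prefer to script the upload, the following boto3 sketch shows one way to do it. It assumes you saved the three sample files locally as customers.csv, transactions.json, and clickstream.csv, and it reuses the bucket name placeholder from earlier; this is an optional alternative to the console, not a required step.

import boto3

# Replace with your bucket name
bucket = "ecommerce-raw-layer-bucket-demo-<Account-ID>-us-east-1"

s3 = boto3.client("s3")

# Map each local sample file to its key under the raw/ prefix
uploads = {
    "customers.csv": "raw/customers/customers.csv",
    "transactions.json": "raw/transactions/transactions.json",
    "clickstream.csv": "raw/clickstream/clickstream.csv",
}

for local_path, key in uploads.items():
    s3.upload_file(local_path, bucket, key)
    print(f"Uploaded {local_path} to s3://{bucket}/{key}")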

Configure CORS policy

To allow access from the SageMaker Unified Studio domain portal, update the Cross-Origin Resource Sharing (CORS) configuration of the bucket:

  1. On the bucket’s Permissions tab, choose Edit under Cross-origin resource sharing (CORS).
  2. Enter the following CORS policy and replace domainUrl with the SageMaker Unified Studio domain URL (for example, https://<domain-id>.sagemaker.us-east-1.on.aws). The URL can be found at the top of the domain details page on the SageMaker Unified Studio console.
    [
        {
            "AllowedHeaders": [
                "*"
            ],
            "AllowedMethods": [
                "PUT",
                "GET",
                "POST",
                "DELETE",
                "HEAD"
            ],
            "AllowedOrigins": [
                "domainUrl"
            ],
            "ExposeHeaders": [
                "x-amz-version-id"
            ]
        }
    ]

For detailed information, see Adding Amazon S3 data and gain access using the project role.
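You can also apply the same CORS configuration programmatically. The following is a minimal boto3 sketch of that approach; the bucket name and domain URL are placeholders that you replace with your own values.

import boto3

s3 = boto3.client("s3")

# Replace with your bucket name and SageMaker Unified Studio domain URL
bucket = "ecommerce-raw-layer-bucket-demo-<Account-ID>-us-east-1"
domain_url = "https://<domain-id>.sagemaker.us-east-1.on.aws"

cors_configuration = {
    "CORSRules": [
        {
            "AllowedHeaders": ["*"],
            "AllowedMethods": ["PUT", "GET", "POST", "DELETE", "HEAD"],
            "AllowedOrigins": [domain_url],
            "ExposeHeaders": ["x-amz-version-id"],
        }
    ]
}

# Overwrites any existing CORS configuration on the bucket
s3.put_bucket_cors(Bucket=bucket, CORSConfiguration=cors_configuration)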

Grant Amazon S3 access to SageMaker project role

To enable SageMaker Unified Studio to access the external Amazon S3 location, the corresponding AWS Identity and Access Management (IAM) project role must be updated with the required permissions. Complete the following steps:

  1. On the IAM console, choose Roles in the navigation pane.
  2. Search for the project role using the last segment of the project role Amazon Resource Name (ARN). This information is located on the Project overview page in SageMaker Unified Studio (for example, datazone_usr_role_1a2b3c45de6789_abcd1efghij2kl).
  3. Choose the project role to open the role details page.
  4. On the Permissions tab, choose Add permissions, then choose Create inline policy.
  5. Use the JSON editor to create a policy that grants the project role access to the Amazon S3 location.
  6. In the JSON policy below, replace the placeholder values with your actual environment details:
    • Replace <BUCKET_PREFIX> with the prefix of the S3 bucket name (for example, ecommerce-raw-layer)
    • Replace <AWS_REGION> with the AWS Region where your AWS Glue Data Quality rulesets are created (for example, us-east-1)
    • Replace <AWS_ACCOUNT_ID> with your AWS account ID
  7. Paste the updated JSON policy into the JSON editor.
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "ETLBucketListAccess",
                "Effect": "Allow",
                "Action": [
                    "s3:ListBucket",
                    "s3:GetBucketLocation"
                ],
                "Resource": "arn:aws:s3:::<BUCKET_PREFIX>-*"
            },
            {
                "Sid": "ETLObjectAccess",
                "Effect": "Allow",
                "Action": [
                    "s3:GetObject",
                    "s3:PutObject",
                    "s3:DeleteObject"
                ],
                "Resource": "arn:aws:s3:::<BUCKET_PREFIX>-*/*"
            },
            {
                "Sid": "GlueDataQualityPublish",
                "Effect": "Allow",
                "Action": [
                    "glue:PublishDataQuality"
                ],
     "Resource":"arn:aws:glue:<AWS_REGION>:<AWS_ACCOUNT_ID>:dataQualityRuleset/*"
            }
        ]
    }
  8. Choose Next.
  9. Enter a name for the policy (for example, etl-rawlayer-access), then choose Create policy.
  10. Choose Add permissions again, then choose Create inline policy.
  11. In the JSON editor, create a second policy to manage S3 Access Grants. Replace <BUCKET_PREFIX> with the prefix of the S3 bucket name (for example, ecommerce-raw-layer), then paste the following JSON policy:
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "S3AGLocationManagement",
                "Effect": "Allow",
                "Action": [
                    "s3:CreateAccessGrantsLocation",
                    "s3:DeleteAccessGrantsLocation",
                    "s3:GetAccessGrantsLocation"
                ],
                "Resource": [
                    "arn:aws:s3:*:*:access-grants/default/*"
                ],
                "Condition": {
                    "StringLike": {
                        "s3:accessGrantsLocationScope": "s3://<BUCKET_PREFIX>-*/*"
                    }
                }
            },
            {
                "Sid": "S3AGPermissionManagement",
                "Effect": "Allow",
                "Action": [
                    "s3:CreateAccessGrant",
                    "s3:DeleteAccessGrant"
                ],
                "Resource": [
                    "arn:aws:s3:*:*:access-grants/default/location/*",
                    "arn:aws:s3:*:*:access-grants/default/grant/*"
                ],
                "Condition": {
                    "StringLike": {
                        "s3:accessGrantScope": "s3://<BUCKET_PREFIX>-*/*"
                    }
                }
            }
        ]
    }
  12. Choose Next.
  13. Enter a name for the policy (for example, s3-access-grants-policy), then choose Create policy.


For detailed information about S3 Access Grants, see Adding Amazon S3 data.
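If you prefer to attach these inline policies with a script instead of the console, the following boto3 sketch shows the general pattern. The local file names are hypothetical, and the sketch assumes the placeholders inside the policy documents have already been replaced with your values.

import json
import boto3

iam = boto3.client("iam")

# Replace with your project role name (the last segment of the project role ARN)
role_name = "datazone_usr_role_1a2b3c45de6789_abcd1efghij2kl"

# Hypothetical local copies of the two policy documents shown above
policies = {
    "etl-rawlayer-access": "etl-rawlayer-access.json",
    "s3-access-grants-policy": "s3-access-grants-policy.json",
}

for policy_name, path in policies.items():
    with open(path) as f:
        document = json.load(f)
    iam.put_role_policy(
        RoleName=role_name,
        PolicyName=policy_name,
        PolicyDocument=json.dumps(document),
    )
    print(f"Attached inline policy {policy_name} to {role_name}")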

Add S3 bucket to project

After you add policies to the project role for access to the Amazon S3 resources, complete the following steps to integrate the S3 bucket with the SageMaker Unified Studio project:

  1. In SageMaker Unified Studio, open the project you created under Your projects.
  2. Choose Data in the navigation pane.
  3. Select Add and then Add S3 location.
  4. Configure the S3 location:
    1. For Name, enter a descriptive name (for example, E-commerce_Raw_Data).
    2. For S3 URI, enter your bucket URI (for example, s3://ecommerce-raw-layer-bucket-demo-<Account-ID>-us-east-1/).
    3. For AWS Region, enter your Region (for this example, us-east-1).
    4. Leave Access role ARN blank.
    5. Choose Add S3 location.
  5. Wait for the integration to complete.
  6. Verify the S3 location appears in your project’s data catalog (on the Project overview page, on the Data tab, locate the Buckets pane to view the buckets and folders).


This process connects your S3 bucket to SageMaker Unified Studio, making your data ready for analysis.

Create notebook for job scripts

Before you can create the data processing jobs, you must set up a notebook to develop the scripts that will generate and process your data. Complete the following steps:

  1. In SageMaker Unified Studio, on the top menu, under Build, choose JupyterLab.
  2. Choose Configure Space and choose the instance type ml.t3.xlarge. This makes sure your JupyterLab instance has at least 4 vCPUs and 4 GiB of memory.
  3. Choose Configure and Start Space or Save and Restart to launch your environment.
  4. Wait a few moments for the instance to be ready.
  5. Choose File, New, and Notebook to create a new notebook.
  6. Set Kernel as Python 3, Connection type as PySpark, and Compute as Project.spark.compatibility.
  7. In the notebook, enter the following script to use later for your AWS Glue job. This script processes raw data from three sources in the S3 data lake, standardizes dates, and converts data types before saving the cleaned data in Parquet format for optimal storage and querying.
  8. Replace <Bucket-Name> with the name of your S3 bucket in the script:
    import sys
    from awsglue.transforms import *
    from pyspark.context import SparkContext
    from awsglue.context import GlueContext
    from awsglue.job import Job
    from awsglue.utils import getResolvedOptions
    from pyspark.sql import functions as F
    args = getResolvedOptions(sys.argv, ['JOB_NAME'])
    sc = SparkContext.getOrCreate()
    glueContext = GlueContext(sc)
    spark = glueContext.spark_session
    job = Job(glueContext)
    job.init(args['JOB_NAME'], args)
    # Customers
    customer_df = (
        spark.read
        .option("header", "true")
        .csv("s3://<Bucket-Name>/raw/customers/")
        .withColumn("registration_date", F.to_date("registration_date"))
        .withColumn("processed_at", F.current_timestamp())
    )
    customer_df.write.mode("overwrite").parquet(
        "s3://<Bucket-Name>/processed/customers/"
    )
    # Transactions
    transaction_df = (
        spark.read
        .json("s3://<Bucket-Name>/raw/transactions/")
        .withColumn("date", F.to_date("date"))
        .withColumn("customer_id", F.col("customer_id").cast("int"))
        .withColumn("processed_at", F.current_timestamp())
    )
    transaction_df.write.mode("overwrite").parquet(
        "s3://<Bucket-Name>/processed/transactions/"
    )
    # Clickstream
    clickstream_df = (
        spark.read
        .option("header", "true")
        .csv("s3://<Bucket-Name>/raw/clickstream/")
        .withColumn("customer_id", F.col("customer_id").cast("int"))
        .withColumn("timestamp", F.to_timestamp("timestamp"))
        .withColumn("processed_at", F.current_timestamp())
    )
    clickstream_df.write.mode("overwrite").parquet(
        "s3://<Bucket-Name>/processed/clickstream/"
    )
    print("Data processing completed successfully")
    job.commit()

    This script processes customer, transaction, and clickstream data from the raw layer in Amazon S3 and saves it as Parquet files in the processed layer.

  9. Choose File, Save Notebook As, and save the file as shared/etl_initial_processing_job.ipynb.

Create notebook for AWS Glue Data Quality

After you create the initial data processing script, the next step is to set up a notebook to perform data quality checks using AWS Glue. These checks help validate the integrity and completeness of your data before further processing. Complete the following steps:

  1. Choose File, New, and Notebook to create a new notebook.
  2. Set Kernel as Python 3, Connection type as PySpark, and Compute as Project.spark.compatibility.
  3. In this new notebook, add the data quality check script using the AWS Glue EvaluateDataQuality method. Replace <Bucket-Name> with the name of your S3 bucket in the script (a sketch for inspecting the results appears after these steps):
    from datetime import datetime
    from pyspark.context import SparkContext
    from awsglue.context import GlueContext
    from awsglue.job import Job
    from awsgluedq.transforms import EvaluateDataQuality
    from awsglue.transforms import SelectFromCollection
    
    # ---------------- Glue setup ----------------
    sc = SparkContext.getOrCreate()
    glueContext = GlueContext(sc)
    job = Job(glueContext)
    job.init("GlueDQJob", {})
    
    # ---------------- Constants ----------------
    RUN_DATE = datetime.utcnow().strftime("%Y-%m-%d")
    year, month, day = RUN_DATE.split("-")
    OUTPUT_PATH = "s3://<Bucket-Name>/data-quality-results"
    
    # ---------------- Tables and Rules ----------------
    tables = {
        "customers": ["s3://<Bucket-Name>/processed/customers/",
                      ["IsComplete \"customer_id\"", "IsUnique \"customer_id\"", "IsComplete \"email\""]],
        "transactions": ["s3://<Bucket-Name>/processed/transactions/",
                         ["IsComplete \"transaction_id\"", "IsUnique \"transaction_id\""]],
        "clickstream": ["s3://<Bucket-Name>/processed/clickstream/",
                        ["IsComplete \"customer_id\"", "IsComplete \"action\""]]
    }
    
    # ---------------- Process Each Table ----------------
    for table, (path, rules) in tables.items():
        df = glueContext.create_dynamic_frame.from_options("s3", {"paths":[path]}, "parquet")
        results = EvaluateDataQuality().process_rows(
            frame=df,
            ruleset=f"Rules = [{', '.join(rules)}]",
            publishing_options={"dataQualityEvaluationContext": table}
        )
        rows = SelectFromCollection.apply(results, key="rowLevelOutcomes", transformation_ctx="rows").toDF()
        rows = rows.drop("DataQualityRulesPass", "DataQualityRulesFail", "DataQualityRulesSkip")
    
        # Write passed/failed rows
        for status, colval in [("pass","Passed"), ("fail","Failed")]:
            tmp = rows.filter(rows.DataQualityEvaluationResult.contains(colval))
            if tmp.count() > 0:
                tmp.write.mode("append").parquet(
            f"{OUTPUT_PATH}/{table}/status=dq_{status}/Year={year}/Month={month}/Date={day}"
                )
    print("Data Quality checks completed and written to S3")
    job.commit()
  4. Choose File, Save Notebook As, and save the file as shared/etl_data_quality_job.ipynb.
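After you create and run this job in the following sections, you can inspect the row-level outcomes it writes to Amazon S3. The following is a minimal PySpark sketch for a notebook cell that counts passed and failed rows per table; the bucket name is a placeholder, and the paths assume the OUTPUT_PATH layout used in the script above.

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Replace with your bucket name
results_path = "s3://<Bucket-Name>/data-quality-results"

for table in ["customers", "transactions", "clickstream"]:
    for status in ["dq_pass", "dq_fail"]:
        path = f"{results_path}/{table}/status={status}"
        try:
            count = spark.read.parquet(path).count()
            print(f"{table}: {status} -> {count} rows")
        except Exception:
            # Nothing was written for this status (for example, no failed rows)
            print(f"{table}: {status} -> no results found")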

Create and test AWS Glue jobs

Jobs in SageMaker Unified Studio enable scalable, flexible ETL pipelines using AWS Glue. This section walks through creating and testing data processing jobs for efficient and governed data transformation.

Create initial data processing job

This job performs the first processing job in the ETL pipeline, transforming raw customer, transaction, and clickstream data and writing the cleaned output to Amazon S3 in Parquet format. Complete the following steps to create the job:

  1. In SageMaker Unified Studio, go to your project.
  2. On the top menu, choose Build, and under Data Analysis & Integration, choose Data processing jobs.
  3. Choose Create job from notebooks.
  4. Under Choose project files, choose Browse files.
  5. Locate and select etl_initial_processing_job.ipynb (the notebook saved earlier in JupyterLab), then choose Select and Next.
  6. Configure the job settings:
    1. For Name, enter a name (for example, job-1).
    2. For Description, enter a description (for example, Initial ETL job for customer data processing).
    3. For IAM Role, choose the project role (default).
    4. For Type, choose Spark.
    5. For AWS Glue version, use version 5.0.
    6. For Language, choose Python.
    7. For Worker type, use G.1X.
    8. For Number of Instances, set to 10.
    9. For Number of retries, set to 0.
    10. For Job timeout, set to 480.
    11. For Compute connection, choose project.spark.compatibility.
    12. Under Advanced settings, turn on Continuous logging.


  7. Leave the remaining settings as default, then choose Submit.

After the job is created, a confirmation message will appear indicating that job-1 was created successfully.

Create AWS Glue Data Quality job

This job runs data quality checks on the transformed datasets using AWS Glue Data Quality. Rulesets validate completeness and uniqueness for key fields. Complete the following steps to create the job:

  1. In SageMaker Unified Studio, go to your project.
  2. On the top menu, choose Build, and under Data Analysis & Integration, choose Data processing jobs.
  3. Choose Create job, Code-based job, and Create job from files.
  4. Under Choose project files, choose Browse files.
  5. Locate and select etl_data_quality_job.ipynb (the notebook saved earlier in JupyterLab), then choose Select and Next.
  6. Configure the job settings:
    1. For Name, enter a name (for example, job-2).
    2. For Description, enter a description (for example, Data quality checks using AWS Glue Data Quality).
    3. For IAM Role, choose the project role.
    4. For Type, choose Spark.
    5. For AWS Glue version, use version 5.0.
    6. For Language, choose Python.
    7. For Worker type, use G.1X.
    8. For Number of Instances, set to 10.
    9. For Number of retries, set to 0.
    10. For Job timeout, set to 480.
    11. For Compute connection, choose project.spark.compatibility.
    12. Under Advanced settings, turn on Continuous logging.
  7. Leave the remaining settings as default, then choose Submit.

After the job is created, a confirmation message will appear indicating that job-2 was created successfully.

Test AWS Glue jobs

Test both jobs to make sure they execute successfully (a programmatic status check sketch follows these steps):

  1. In SageMaker Unified Studio, go to your project.
  2. On the top menu, choose Build, and under Data Analysis & Integration, choose Data processing jobs.
  3. Select job-1 and choose Run job.
  4. Monitor the job execution and verify it completes successfully.
  5. Similarly, select job-2 and choose Run job.
  6. Monitor the job execution and verify it completes successfully.
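You can also verify the most recent run status programmatically. This boto3 sketch assumes the jobs created in SageMaker Unified Studio appear as AWS Glue jobs named job-1 and job-2 in your account; if the underlying Glue job names differ, adjust them accordingly.

import boto3

glue = boto3.client("glue", region_name="us-east-1")

for job_name in ["job-1", "job-2"]:
    runs = glue.get_job_runs(JobName=job_name).get("JobRuns", [])
    if runs:
        # Pick the most recently started run and print its state
        latest = max(runs, key=lambda run: run["StartedOn"])
        print(f"{job_name}: {latest['JobRunState']} (started {latest['StartedOn']})")
    else:
        print(f"{job_name}: no runs found")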

Add EMR Serverless compute

In the ETL pipeline, we use EMR Serverless to perform compute-intensive transformations and aggregations on large datasets. It automatically scales resources based on workload, offering high performance with simplified operations. By integrating EMR Serverless with SageMaker Unified Studio, you can simplify the process of running Spark jobs interactively using Jupyter notebooks in a serverless environment.

This section walks through the steps to configure EMR Serverless compute within SageMaker Unified Studio and use it to run distributed data processing jobs.

Configure EMR Serverless in SageMaker Unified Studio

To use EMR Serverless for processing in the project, follow these steps:

  1. In the navigation pane on Project Overview, choose Compute.
  2. On the Data processing tab, choose Add compute and Create new compute resources.
  3. Select EMR Serverless and choose Next.
  4. Configure the EMR Serverless settings:
    1. For Compute name, enter a name (for example, etl-emr-serverless).
    2. For Description, enter a description (for example, EMR Serverless for advanced data processing).
    3. For Release label, choose emr-7.8.0.
    4. For Permission mode, choose Compatibility.
  5. Choose Add Compute to complete the setup.

After it’s configured, the EMR Serverless compute will be listed with the deployment status Active.
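If you want to confirm the application state outside the UI, you can list the EMR Serverless applications in the account with boto3. This is an optional check; the compute created in SageMaker Unified Studio is backed by an EMR Serverless application that should appear in this list once provisioning completes (an assumption based on the setup above).

import boto3

emr_serverless = boto3.client("emr-serverless", region_name="us-east-1")

# Print every EMR Serverless application in the account and its current state
for app in emr_serverless.list_applications()["applications"]:
    print(f"{app['name']}: {app['state']} (id: {app['id']})")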


Create and run notebook with EMR Serverless

After you create the EMR Serverless compute, you can run large-scale PySpark data transformations from a Jupyter notebook. This job reads the cleaned customer, transaction, and clickstream datasets from Amazon S3, performs aggregations and scoring, and writes the final analytics outputs back to Amazon S3 in both Parquet and CSV formats. Complete the following steps to create a notebook for EMR Serverless processing:

  1. On the top menu, under Build, choose JupyterLab.
  2. Choose File, New, and Notebook.
  3. Set Kernel as Python 3, Connection type as PySpark, and Compute as emr-s.etl-emr-serverless.
  4. Enter the following PySpark script to run your data transformation job on EMR Serverless. Provide the name of your S3 bucket:
    from pyspark.sql import SparkSession
    from pyspark.sql import functions as F
    
    spark = SparkSession.builder.appName("CustomerAnalytics").getOrCreate()
    
    customers = spark.read.parquet("s3://<bucket-name>/processed/customers/")
    transactions = spark.read.parquet("s3://<bucket-name>/processed/transactions/")
    clickstream = spark.read.parquet("s3://<bucket-name>/processed/clickstream/")
    
    customer_spending = transactions.groupBy("customer_id").agg(
        F.count("transaction_id").alias("total_transactions"),
        F.sum("amount").alias("total_spent"),
        F.avg("amount").alias("avg_transaction_value"),
        F.datediff(F.current_date(), F.max("date")).alias("days_since_last_purchase")
    )
    
    customer_engagement = clickstream.groupBy("customer_id").agg(
        F.count("*").alias("total_clicks"),
        F.countDistinct("page").alias("unique_pages_visited"),
        F.count(F.when(F.col("action") == "purchase", 1)).alias("purchase_actions"),
        F.count(F.when(F.col("action") == "add_to_cart", 1)).alias("add_to_cart_actions")
    )
    
    customer_analytics = customers.join(customer_spending, on="customer_id", how="left").join(
        customer_engagement, on="customer_id", how="left")
    
    customer_analytics = customer_analytics.na.fill(0, [
        "total_transactions", "total_spent", "total_clicks", 
        "unique_pages_visited", "purchase_actions", "add_to_cart_actions"
    ])
    
    customer_analytics = customer_analytics.withColumn(
        "customer_value_score",
        (F.col("total_spent") * 0.5) + (F.col("total_transactions") * 0.3) + (F.col("purchase_actions") * 0.2)
    )
    
    customer_analytics.write.mode("overwrite").parquet("s3://<bucket-name>/analytics/customer_analytics/")
    
    customer_summary = customer_analytics.select(
        "customer_id", "name", "email", "registration_date", 
        "total_transactions", "total_spent", "avg_transaction_value",
        "days_since_last_purchase", "total_clicks", "purchase_actions",
        "customer_value_score"
    )
    
    customer_summary.write.mode("overwrite").option("header", "true").csv("s3://<bucket-name>/analytics/customer_summary/")
    
    print("EMR processing completed successfully")
  5. Choose File, Save Notebook As, and save the file as shared/emr_data_transformation_job.ipynb.
  6. Choose Run Cell to run the script.
  7. Monitor the Spark job execution and verify it completes without errors. To spot-check the output, you can use the sketch that follows these steps.
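To spot-check the result, you can read the summary back in a new cell of the same notebook. This minimal sketch reuses the spark session created above; replace the bucket name placeholder as before.

# Preview the customer summary written by the job above
summary = (
    spark.read
    .option("header", "true")
    .csv("s3://<bucket-name>/analytics/customer_summary/")
)
summary.show(5, truncate=False)
print(f"Total customers in summary: {summary.count()}")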


Add Redshift Serverless compute

With Redshift Serverless, users can run and scale data warehouse workloads without managing infrastructure. It is ideal for analytics use cases where data needs to be queried from Amazon S3 or integrated into a centralized warehouse. In this step, you add Redshift Serverless to the project for loading and querying processed customer analytics data generated in earlier stages of the pipeline. For more information about Redshift Serverless, see Amazon Redshift Serverless.

Set up Redshift Serverless compute in SageMaker Unified Studio

Complete the following steps to set up Redshift Serverless compute:

  1. In SageMaker Unified Studio, choose the Compute tab within your project workspace (ETL-Pipeline-Demo).
  2. On the SQL analytics tab, choose Add compute, then choose Create new compute resources to begin configuring your compute environment.
  3. Select Amazon Redshift Serverless.
  4. Configure the following:
    1. For Compute name, enter a name (for example, ecommerce_data_warehouse).
    2. For Description, enter a description (for example, Redshift Serverless for data warehouse).
    3. For Workgroup name, enter a name (for example, redshift-serverless-workgroup).
    4. For Maximum capacity, set to 512 RPUs.
    5. For Database name, enter dev.
  5. Choose Add Compute to create the Redshift Serverless resource.

After the compute is created, you can test the Amazon Redshift connection.

  1. On the Data warehouse tab, confirm that redshift.ecommerce_data_warehouse is listed.
  2. Choose the compute: redshift.ecommerce_data_warehouse.
  3. On the Permissions tab, copy the IAM role ARN. You use this for the Redshift COPY command in the next step.

Create and execute querybook to load data into Amazon Redshift

In this step, you create a SQL script to load the processed customer summary data from Amazon S3 into a Redshift table. This enables centralized analytics for customer segmentation, lifetime value calculations, and marketing campaigns. Complete the following steps:

  1. On the Build menu, under Data Analysis & Integration, choose Query editor.
  2. Enter the following SQL into the querybook to create the customer_summary table in the public schema:
    -- Create customer_summary table in public schema
    CREATE TABLE IF NOT EXISTS public.customer_summary (
        customer_id INT PRIMARY KEY,
        name VARCHAR(100),
        email VARCHAR(100),
        registration_date DATE,
        total_transactions INT,
        total_spent DECIMAL(10, 2),
        avg_transaction_value DECIMAL(10, 2),
        days_since_last_purchase INT,
        total_clicks INT,
        purchase_actions INT,
        customer_value_score DECIMAL(10, 2)
    );
  3. Choose Add SQL to add a new SQL script.
  4. Enter the following SQL into the querybook:
    TRUNCATE TABLE customer_summary;

    Note: We truncate the customer_summary table to remove existing records and ensure a clean, duplicate-free reload of the latest aggregated data from S3 before running the COPY command.

  5. Choose Add SQL to add a new SQL script.
  6. Enter the following SQL to load the data into Redshift Serverless from your S3 bucket. Provide the name of your S3 bucket and IAM role ARN for Amazon Redshift:
    -- Load data from S3 (replace with your bucket name and IAM role)
    COPY public.customer_summary FROM 's3://<bucket-name>/analytics/customer_summary/'
    IAM_ROLE 'arn:aws:iam::<Account-ID>:role/<your-redshift-role>'
    FORMAT AS CSV
    IGNOREHEADER 1
    REGION 'us-east-1';
  7. In the Query Editor, configure the following:
    1. Connection: redshift.ecommerce_data_warehouse
    2. Database: dev
    3. Schema: public


  8. Choose Choose to apply the connection settings.
  9. Choose Run Cell for each cell to create the customer_summary table in the public schema and then load data from Amazon S3.
  10. Choose Actions, Save, name the querybook final_data_product, and choose Save changes.

This completes the creation and execution of the Redshift data product using the querybook.
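If you want to query the loaded table outside the query editor, the Amazon Redshift Data API is one option. The following boto3 sketch runs a sample aggregation against the serverless workgroup configured earlier; the workgroup name and SQL are illustrative, and because the API is asynchronous, the sketch polls for completion before fetching results.

import time
import boto3

redshift_data = boto3.client("redshift-data", region_name="us-east-1")

# Workgroup and database configured earlier in SageMaker Unified Studio
response = redshift_data.execute_statement(
    WorkgroupName="redshift-serverless-workgroup",
    Database="dev",
    Sql="SELECT customer_id, total_spent, customer_value_score "
        "FROM public.customer_summary ORDER BY customer_value_score DESC LIMIT 5;",
)
statement_id = response["Id"]

# Poll until the statement finishes, then print each returned record
while True:
    status = redshift_data.describe_statement(Id=statement_id)["Status"]
    if status in ("FINISHED", "FAILED", "ABORTED"):
        break
    time.sleep(2)

if status == "FINISHED":
    for record in redshift_data.get_statement_result(Id=statement_id)["Records"]:
        print([list(field.values())[0] for field in record])
else:
    print(f"Query did not finish: {status}")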

Create and manage the workflow environment

This section describes how to create a shared workflow environment and define a code-based workflow that automates a customer data pipeline using Apache Airflow within SageMaker Unified Studio. Shared environments facilitate collaboration among project members and centralized workflow management.

Create the workflow environment

Workflow environments must be created by project owners. After they’re created, members of the project can sync and use the workflows. Only project owners can update or delete workflow environments. Complete the following steps to create the workflow environment:

  1. Choose Compute for your project.
  2. On the Workflow environments tab, choose Create.
  3. Review the configuration parameters and choose Create workflow environment.
  4. Wait for the environment to be fully provisioned before proceeding. Provisioning takes around 20 minutes.


Create the code-based workflow

When the workflow environment is ready, define a code-based ETL pipeline using Airflow. This pipeline automates daily processing tasks across services like AWS Glue, EMR Serverless, and Redshift Serverless.

  1. On the Build menu, under Orchestration, choose Workflows.
  2. Choose Create new workflow, then choose Create workflow in code editor.
  3. Choose Configure Space and choose the instance type ml.t3.xlarge. This makes sure your JupyterLab instance has at least 4 vCPUs and 4 GiB of memory.
  4. Choose Configure and Restart Space to launch your environment.


The following script defines a daily scheduled ETL workflow that automates several actions:

  • Initial data transformation using AWS Glue
  • Data quality validation using AWS Glue (EvaluateDataQuality)
  • Advanced data processing with EMR Serverless using a Jupyter notebook
  • Loading transformed results into Redshift Serverless from a querybook
  1. Replace the default DAG template with the following definition, ensuring that job names and input paths match the actual names used in your project:
    from datetime import datetime
    from airflow import DAG
    from airflow.decorators import dag
    from airflow.utils.dates import days_ago
    from airflow.providers.amazon.aws.operators.glue import GlueJobOperator
    from workflows.airflow.providers.amazon.aws.operators.sagemaker_workflows import NotebookOperator
    from sagemaker_studio import Project
    # Get SageMaker Studio project IAM role
    project = Project()
    default_args = {
        'owner': 'data_engineer',
        'depends_on_past': False,
        'email_on_failure': True,
        'email_on_retry': False,
        'retries': 1
    }
    @dag(
        dag_id='customer_etl_pipeline',
        default_args=default_args,
        schedule_interval='@daily',
        start_date=days_ago(1),
        is_paused_upon_creation=False,
        tags=['etl', 'customer-analytics'],
        catchup=False
    )
    def customer_etl_pipeline():
        # Step 1: Initial data transformation using Glue
        initial_transformation = GlueJobOperator(
            task_id='initial_transformation',
            job_name='job-1',
            iam_role_arn=project.iam_role,
        )
        # Step 2: Data quality checks using Glue DQ
        data_quality_check = GlueJobOperator(
            task_id='data_quality_check',
            job_name='job-2',
            iam_role_arn=project.iam_role,
        )
        # Step 3: EMR Serverless notebook processing
        emr_processing = NotebookOperator(
            task_id='emr_processing',
            input_config={
                "input_path": "emr_data_transformation_job.ipynb",
                "input_params": {}
            },
            output_config={"output_formats": ['NOTEBOOK']},
            poll_interval=10,
        )
        # Step 4: Load to Redshift notebook
        redshift_load = NotebookOperator(
            task_id='redshift_load',
            input_config={
                "input_path": "final_data_product.sqlnb",
                "input_params": {}
            },
            output_config={"output_formats": ['NOTEBOOK']},
            poll_interval=10,
        )
        # Task dependencies
        initial_transformation >> data_quality_check >> emr_processing >> redshift_load
    # Instantiate DAG
    customer_etl_dag = customer_etl_pipeline()
  2. Choose File, Save python file, name the file shared/workflows/dags/customer_etl_pipeline.py, and choose Save.

Deploy and run the workflow

Complete the following steps to run the workflow:

  1. On the Build menu, choose Workflows.
  2. Choose the workflow customer_etl_pipeline and choose Run.


Running a workflow puts tasks together to orchestrate Amazon SageMaker Unified Studio artifacts. You can view multiple runs for a workflow by navigating to the Workflows page and choosing the name of a workflow from the workflows list table.

To share your workflows with other project members in a workflow environment, refer to Share a code workflow with other project members in an Amazon SageMaker Unified Studio workflow environment.

Monitor and troubleshoot the workflow

After your Airflow workflows are deployed in SageMaker Unified Studio, monitoring becomes essential for maintaining reliable ETL operations. The integrated Amazon MWAA environment provides comprehensive observability into your data pipelines through the familiar Airflow web interface, enhanced with AWS monitoring capabilities. The Amazon MWAA integration with SageMaker Unified Studio offers real-time DAG execution tracking, detailed task logs, and performance metrics to help you quickly identify and resolve pipeline issues. Complete the following steps to monitor the workflow:

  1. On the Build menu, choose Workflows.
  2. Choose the workflow customer_etl_pipeline.
  3. Choose View runs to see all executions.
  4. Choose a specific run to view detailed task status.


For each task, you can view the status (Succeeded, Failed, Running), start and end times, duration, and logs and outputs. The workflow is also visible in the Airflow UI, accessible through the workflow environment, where you can view the DAG graph, monitor task execution in real time, access detailed logs, and view the status.

  1. Go to Workflows and select the workflow named customer_etl_pipeline.
  2. From the Actions menu, choose Open in Airflow UI.


After the workflow completes successfully, you can query the data product in the query editor.

  • On the Build menu, under Data Analysis & Integration, choose Query editor.
  • Run select * from "dev"."public"."customer_summary"


Observe the contents of the customer_summary table, including aggregated customer metrics such as total transactions, total spent, average transaction value, clicks, and customer value scores. This allows verification that the ETL and data quality pipelines loaded and transformed the data correctly.

Clean up

To avoid unnecessary charges, complete the following steps:

  1. Delete a workflow environment.
  2. If you no longer need it, delete the project.
  3. After you delete the project, delete the domain.

Conclusion

This post demonstrated how to build an end-to-end ETL pipeline using SageMaker Unified Studio workflows. We explored the complete development lifecycle, from setting up fundamental AWS infrastructure—including Amazon S3 CORS configuration and IAM permissions—to implementing sophisticated data processing workflows. The solution incorporates AWS Glue for initial data transformation and quality checks, EMR Serverless for advanced processing, and Redshift Serverless for data warehousing, all orchestrated through Airflow DAGs. This approach offers several key benefits: a unified interface that consolidates necessary tools, Python-based workflow flexibility, seamless AWS service integration, collaborative development through Git version control, cost-effective scaling through serverless computing, and comprehensive monitoring tools—all working together to create an efficient and maintainable data pipeline solution.

By using SageMaker Unified Studio workflows, you can accelerate your data pipeline development while maintaining enterprise-grade reliability and scalability. For more information about SageMaker Unified Studio and its capabilities, refer to the Amazon SageMaker Unified Studio documentation.


About the authors

Shubham Kumar

Shubham Kumar

Shubham is an Associate Delivery Consultant at AWS, specializing in big data, data lakes, data governance, as well as search and observability architectures. In his free time, Shubham enjoys traveling, spending quality time with his family, and writing fictional stories.

Shubham Purwar

Shubham Purwar

Shubham is an Analytics Specialist Solution Architect at AWS. In his free time, Shubham loves to spend time with his family and travel around the world.

Nitin Kumar

Nitin Kumar

Nitin is a Cloud Engineer (ETL) at AWS, specialized in AWS Glue. In his free time, he likes to watch movies and spend time with his family.