Amazon Web Services ブログ

Amazon SageMaker ワークフローによるスケーラブルなエンドツーエンド ETL パイプラインのオーケストレーション

本記事は 2026 年 2 月 9 日 に公開された「Orchestrate end-to-end scalable ETL pipeline with Amazon SageMaker workflows」を翻訳したものです。

Amazon SageMaker Unified Studio は、データエンジニアとデータサイエンティストがエンドツーエンドのデータおよび機械学習 (ML) ワークフローで協働できるワークスペースです。SageMaker Unified Studio は、Amazon Managed Workflows for Apache Airflow (Amazon MWAA) との統合により、複数の AWS サービスにまたがる複雑なデータワークフローをオーケストレーションできます。プロジェクトオーナーは共有環境を作成し、チームメンバーと協力してワークフローを開発・デプロイでき、パイプライン実行も一元的に把握できます。統合されたアプローチでデータパイプラインを一貫して効率的に実行でき、プロセス全体を把握できます。チームはデータおよび ML プロジェクトでスムーズに協働できます。

本記事では、SageMaker Unified Studio ワークフローを使って、コードベースのアプローチで ETL (抽出・変換・ロード) パイプラインを構築・管理する方法を紹介します。Amazon EMRAWS GlueAmazon Redshift、Amazon MWAA などの AWS サービスを使い、単一の統合インターフェースでデータ準備からオーケストレーションまで、データ処理の各工程を扱う方法を示します。

ユースケース例: E コマースプラットフォームの顧客行動分析

ある E コマース (EC) 企業が顧客の取引データを分析し、顧客サマリーレポートを作成したいとします。データは複数のソースから取得されます。

  • CSV ファイルに保存された顧客プロファイルデータ
  • JSON 形式の取引履歴
  • 半構造化ログファイル形式の Web サイトクリックストリームデータ

同社は以下を実施したいと考えています。

  • これらのソースからデータを抽出する。
  • データをクレンジング・変換する。
  • 品質チェックを実施する。
  • 処理済みデータをデータウェアハウスにロードする。
  • パイプラインを毎日実行するようにスケジュールする。

ソリューション概要

以下の図は、本記事で実装するアーキテクチャを示しています。

This architecture diagram illustrates a comprehensive, end-to-end data processing pipeline built on AWS services, orchestrated through Amazon SageMaker Unified Studio. The pipeline demonstrates best practices for data ingestion, transformation, quality validation, advanced processing, and analytics.

ワークフローは以下の手順で構成されます。

  1. Amazon Simple Storage Service (Amazon S3) バケットを作成し、顧客データ、取引履歴、クリックストリームログ用に整理されたフォルダ構造でデータリポジトリを構築し、SageMaker Unified Studio と連携するためのアクセスポリシーを設定します。
  2. AWS Glue ジョブを使って S3 バケットからデータを抽出します。
  3. AWS Glue と Amazon EMR Serverless でデータをクレンジング・変換します。
  4. AWS Glue Data Quality でデータ品質の検証を実施します。
  5. 処理済みデータを Amazon Redshift Serverless にロードします。
  6. Identity Center ベースのドメインで SageMaker Unified Studio を使ってワークフロー環境を作成・管理します。

注: Amazon SageMaker Unified Studio は 2 つのドメイン設定モデルをサポートします。IAM Identity Center (IdC) ベースドメインIAM ロールベースドメインです。IAM ベースドメインはロールドリブンのアクセス管理とビジュアルワークフローを実現しますが、本記事は特に Identity Center ベースドメインにフォーカスします。このドメインでは、ユーザーは IdC 経由で認証し、プロジェクトはプロジェクトロールと ID ベースの認可を使ってデータとリソースにアクセスします。

前提条件

開始前に、以下のリソースを用意してください。

Amazon SageMaker Unified Studio ドメインの設定

本ソリューションでは、us-east-1 AWS リージョンに SageMaker Unified Studio ドメインが必要です。SageMaker Unified Studio は複数のリージョンで利用可能ですが、本記事では一貫性のため us-east-1 を使用します。サポートされるリージョンの一覧は Amazon SageMaker Unified Studio がサポートされるリージョン を参照してください。

ドメインを設定する手順は以下のとおりです。

  1. AWS マネジメントコンソールにサインインし、Amazon SageMaker に移動して、左のナビゲーションペインから Domains セクションを開きます。
  2. SageMaker コンソールで Create domain を選択し、Quick setup を選択します。
  3. 「No VPC has been specifically set up for use with Amazon SageMaker Unified Studio」というメッセージが表示された場合は Create VPC を選択します。AWS CloudFormation スタックにリダイレクトされます。すべての設定をデフォルト値のまま、Create stack を選択します。
  4. Quick setup settingsName にドメイン名 (たとえば etl-ecommerce-blog-demo) を入力します。選択された設定を確認します。
  5. Continue を選択して次に進みます。
  6. Create IAM Identity Center user ページで、SSO ユーザー (IAM Identity Center のアカウント) を作成するか、既存の SSO ユーザーを選択して Amazon SageMaker Unified Studio にログインします。ここで選択した SSO は、Amazon SageMaker Unified Studio で管理者として使われます。
  7. Create domain を選択します。

詳細な手順は SageMaker ドメインの作成Amazon SageMaker Unified Studio でのデータのオンボーディング を参照してください。

# Amazon SageMaker Domain Details Interface This screenshot shows the Amazon SageMaker domain details page for "etl-ecommerce-blog-demo

ドメインを作成すると、「Your domain has been created! You can now log in to Amazon SageMaker Unified Studio」というポップアップが表示されます。いったんこのポップアップは閉じて構いません。

プロジェクトの作成

ビジネスユースケースに対応する協働ワークスペースとなるプロジェクトを作成します。以下の手順を実施します。

  1. Open Unified Studio を選択し、Sign in with SSO オプションで SSO 資格情報を使ってサインインします。
  2. Create project を選択します。
  3. プロジェクト名を入力 (たとえば ETL-Pipeline-Demo) し、All capabilities プロジェクトプロファイルで作成します。
  4. Continue を選択します。
  5. 設定パラメータはデフォルト値のまま、Continue を選択します。
  6. Create project を選択します。

プロジェクトの作成には数分かかる場合があります。作成が完了すると、データアクセスと処理用の環境が設定されます。

S3 バケットと SageMaker Unified Studio の統合

SageMaker Unified Studio で外部データを処理するため、S3 バケットとの統合を設定します。S3 バケットのセットアップ、権限設定、プロジェクトとの統合手順を説明します。

S3 バケットの作成・設定

バケットを作成する手順は以下のとおりです。

  1. 新しいブラウザタブで AWS マネジメントコンソールを開き、S3 を検索します。
  2. Amazon S3 コンソールで Create Bucket を選択します。
  3. ecommerce-raw-layer-bucket-demo-<Account-ID>-us-east-1 という名前でバケットを作成します。詳細な手順は ストレージ用の汎用 Amazon S3 バケットの作成を参照してください。
  4. バケット内に以下のフォルダ構造を作成します。詳細な手順は フォルダの作成 を参照してください。
    • raw/customers/
    • raw/transactions/
    • raw/clickstream/
    • processed/
    • analytics/

サンプルデータのアップロード

顧客行動、取引履歴、Web サイトのやり取りをまとめて分析する典型的なビジネスシナリオを表すサンプル EC データをアップロードします。

raw/customers/customers.csv ファイルには、登録情報を含む顧客プロファイル情報が入っています。分析用の顧客ディメンションを確立するため最初に処理される構造化データです。

customer_id,name,email,registration_date
1,John Doe,john.doe@example.com,2022-01-15
2,Jane Smith,jane.smith@example.com,2022-02-20
3,Robert Johnson,robert.j@example.com,2022-01-30
4,Emily Brown,emily.b@example.com,2022-03-05
5,Michael Wilson,michael.w@example.com,2022-02-10

raw/transactions/transactions.json ファイルには、ネストされた製品配列を持つ購買取引が含まれます。半構造化データをフラット化し、顧客データと結合して購買パターンと顧客生涯価値の分析に使います。

[
{"transaction_id": "t1001", "customer_id": 1, "amount": 125.99, "date": "2023-01-10", "items": ["product1", "product2"]},
{"transaction_id": "t1002", "customer_id": 2, "amount": 89.50, "date": "2023-01-12", "items": ["product3"]},
{"transaction_id": "t1003", "customer_id": 1, "amount": 45.25, "date": "2023-01-15", "items": ["product2"]},
{"transaction_id": "t1004", "customer_id": 3, "amount": 210.75, "date": "2023-01-18", "items": ["product1", "product4", "product5"]},
{"transaction_id": "t1005", "customer_id": 4, "amount": 55.00, "date": "2023-01-20", "items": ["product3", "product6"]}
]

raw/clickstream/clickstream.csv ファイルは、ユーザーの Web サイト操作と行動パターンを記録します。カスタマージャーニーとコンバージョンファネルの分析に使う時系列データです。

timestamp,customer_id,page,action
2023-01-10T10:15:23,1,homepage,view
2023-01-10T10:16:45,1,product_page,view
2023-01-10T10:18:12,1,product_page,add_to_cart
2023-01-10T10:20:30,1,checkout,view
2023-01-10T10:22:15,1,checkout,purchase
2023-01-12T14:30:10,2,homepage,view
2023-01-12T14:32:20,2,product_page,view
2023-01-12T14:35:45,2,product_page,add_to_cart
2023-01-12T14:40:12,2,checkout,view
2023-01-12T14:42:30,2,checkout,purchase

raw

Amazon S3 にファイルをアップロードする詳細な手順は、オブジェクトのアップロード を参照してください。

CORS ポリシーの設定

SageMaker Unified Studio ドメインポータルからのアクセスを許可するため、バケットの Cross-Origin Resource Sharing (CORS) 設定を更新します。

  1. バケットの Permissions タブで、Cross-origin resource sharing (CORS)Edit を選択します。

    permission
  2. 以下の CORS ポリシーを入力し、domainUrl を SageMaker Unified Studio ドメイン URL (たとえば https://<domain-id>.sagemaker.us-east-1.on.aws) に置き換えます。URL は SageMaker Unified Studio コンソールのドメイン詳細ページの上部にあります。
    [
        {
            "AllowedHeaders": [
                "*"
            ],
            "AllowedMethods": [
                "PUT",
                "GET",
                "POST",
                "DELETE",
                "HEAD"
            ],
            "AllowedOrigins": [
                "domainUrl"
            ],
            "ExposeHeaders": [
                "x-amz-version-id"
            ]
        }
    ]

詳細は Amazon S3 データの追加とプロジェクトロールを使用したアクセス を参照してください。

SageMaker プロジェクトロールへの Amazon S3 アクセス付与

SageMaker Unified Studio が外部 Amazon S3 のロケーションにアクセスできるようにするには、対応する AWS Identity and Access Management (IAM) プロジェクトロールに必要な権限を追加する必要があります。以下の手順を実施します。

  1. IAM コンソールで、ナビゲーションペインの Roles を選択します。
  2. プロジェクトロール Amazon Resource Name (ARN) の最後のセグメントでプロジェクトロールを検索します。この情報は、SageMaker Unified Studio の Project overview ページにあります (たとえば datazone_usr_role_1a2b3c45de6789_abcd1efghij2kl)。

    project detail
  3. プロジェクトロールを選択して、ロールの詳細ページを開きます。
  4. Permissions タブで Add permissions を選択し、Create inline policy を選択します。
  5. JSON エディタを使って、Amazon S3 ロケーションへのアクセスをプロジェクトロールに付与するポリシーを作成します。
  6. 以下の JSON ポリシーで、プレースホルダー値を実際の環境の値に置き換えます。
    • <BUCKET_PREFIX> を S3 バケット名のプレフィックス (たとえば ecommerce-raw-layer) に置き換え
    • <AWS_REGION> を AWS Glue Data Quality ルールセットを作成する AWS リージョン (たとえば us-east-1) に置き換え
    • <AWS_ACCOUNT_ID> を AWS アカウント ID に置き換え
  7. 更新した JSON ポリシーを JSON エディタにペーストします。
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "ETLBucketListAccess",
                "Effect": "Allow",
                "Action": [
                    "s3:ListBucket",
                    "s3:GetBucketLocation"
                ],
                "Resource": "arn:aws:s3:::<BUCKET_PREFIX>-*"
            },
            {
                "Sid": "ETLObjectAccess",
                "Effect": "Allow",
                "Action": [
                    "s3:GetObject",
                    "s3:PutObject",
                    "s3:DeleteObject"
                ],
                "Resource": "arn:aws:s3:::<BUCKET_PREFIX>-*/*"
            },
            {
                "Sid": "GlueDataQualityPublish",
                "Effect": "Allow",
                "Action": [
                    "glue:PublishDataQuality"
                ],
     "Resource":"arn:aws:glue:<AWS_REGION>:<AWS_ACCOUNT_ID>:dataQualityRuleset/*"
            }
        ]
    }
  8. Next を選択します。
  9. ポリシー名 (たとえば etl-rawlayer-access) を入力し、Create policy を選択します。
  10. 再度 Add permissions を選択し、Create inline policy を選択します。
  11. JSON エディタで、S3 Access Grants を管理する 2 つ目のポリシーを作成します。<BUCKET_PREFIX> を S3 バケット名のプレフィックス (たとえば ecommerce-raw-layer) に置き換えて、以下の JSON ポリシーをペーストします。
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "S3AGLocationManagement",
                "Effect": "Allow",
                "Action": [
                    "s3:CreateAccessGrantsLocation",
                    "s3:DeleteAccessGrantsLocation",
                    "s3:GetAccessGrantsLocation"
                ],
                "Resource": [
                    "arn:aws:s3:*:*:access-grants/default/*"
                ],
                "Condition": {
                    "StringLike": {
                        "s3:accessGrantsLocationScope": "s3://<BUCKET_PREFIX>-*/*"
                    }
                }
            },
            {
                "Sid": "S3AGPermissionManagement",
                "Effect": "Allow",
                "Action": [
                    "s3:CreateAccessGrant",
                    "s3:DeleteAccessGrant"
                ],
                "Resource": [
                    "arn:aws:s3:*:*:access-grants/default/location/*",
                    "arn:aws:s3:*:*:access-grants/default/grant/*"
                ],
                "Condition": {
                    "StringLike": {
                        "s3:accessGrantScope": "s3://<BUCKET_PREFIX>-*/*"
                    }
                }
            }
        ]
    }
  12. Next を選択します。
  13. ポリシー名 (たとえば s3-access-grants-policy) を入力し、Create policy を選択します。

create policy

S3 Access Grants の詳細は Amazon S3 データの追加 を参照してください。

プロジェクトへの S3 バケットの追加

プロジェクトロールに Amazon S3 リソースへのアクセス用ポリシーを追加したら、以下の手順で S3 バケットを SageMaker Unified Studio プロジェクトに統合します。

  1. SageMaker Unified Studio で、Your projects から作成したプロジェクトを開きます。

    your projects
  2. ナビゲーションペインで Data を選択します。
  3. Add を選び、続いて Add S3 location を選択します。

    add s3
  4. S3 ロケーションを設定します。
    1. Name に分かりやすい名前 (たとえば E-commerce_Raw_Data) を入力します。
    2. S3 URI にバケット URI (たとえば s3://ecommerce-raw-layer-bucket-demo-<Account-ID>-us-east-1/) を入力します。
    3. AWS Region にリージョン (本例では us-east-1) を入力します。
    4. Access role ARN は空のままにします。
    5. Add S3 Location を選択します。
  5. 統合が完了するまで待ちます。
  6. プロジェクトのデータカタログに S3 ロケーションが表示されることを確認します (Project overview ページの Data タブで Buckets ペインを開き、バケットとフォルダを確認します)。

add

S3 バケットが SageMaker Unified Studio に接続され、データを分析に使える状態になります。

ジョブスクリプト用ノートブックの作成

データ処理ジョブを作成する前に、データの生成と処理を行うスクリプトを開発するためのノートブックをセットアップします。以下の手順を実施します。

  1. SageMaker Unified Studio の上部メニューで Build の下の JupyterLab を選択します。
  2. Configure Space を選択し、インスタンスタイプ ml.t3.xlarge を選びます。JupyterLab インスタンスに少なくとも 4 vCPU と 4 GiB メモリが確保されます。
  3. Configure and Start Space または Save and Restart を選択して環境を起動します。
  4. インスタンスの準備完了まで少し待ちます。
  5. FileNewNotebook を選び、新しいノートブックを作成します。
  6. Kernel を Python 3、Connection type を PySpark、ComputeProject.spark.compatibility に設定します。

    jupyter
  7. ノートブックに以下のスクリプトを入力します。AWS Glue ジョブで後から使います。このスクリプトは、S3 データレイクの 3 つのソースから生データを処理し、日付を標準化してデータ型を変換し、最適なストレージとクエリのためにクレンジング済みデータを Parquet 形式で保存します。
  8. スクリプト内の <Bucket-Name>実際の S3 バケット名に置き換えます。
    import sys
    from awsglue.transforms import *
    from pyspark.context import SparkContext
    from awsglue.context import GlueContext
    from awsglue.job import Job
    from awsglue.utils import getResolvedOptions
    from pyspark.sql import functions as F
    args = getResolvedOptions(sys.argv, ['JOB_NAME'])
    sc = SparkContext.getOrCreate()
    glueContext = GlueContext(sc)
    spark = glueContext.spark_session
    job = Job(glueContext)
    job.init(args['JOB_NAME'], args)
    # Customers
    customer_df = (
        spark.read
        .option("header", "true")
        .csv("s3://<Bucket-Name>/raw/customers/")
        .withColumn("registration_date", F.to_date("registration_date"))
        .withColumn("processed_at", F.current_timestamp())
    )
    customer_df.write.mode("overwrite").parquet(
        "s3://<Bucket-Name>/processed/customers/"
    )
    # Transactions
    transaction_df = (
        spark.read
        .json("s3://<Bucket-Name>/raw/transactions/")
        .withColumn("date", F.to_date("date"))
        .withColumn("customer_id", F.col("customer_id").cast("int"))
        .withColumn("processed_at", F.current_timestamp())
    )
    transaction_df.write.mode("overwrite").parquet(
        "s3://<Bucket-Name>/processed/transactions/"
    )
    # Clickstream
    clickstream_df = (
        spark.read
        .option("header", "true")
        .csv("s3://<Bucket-Name>/raw/clickstream/")
        .withColumn("customer_id", F.col("customer_id").cast("int"))
        .withColumn("timestamp", F.to_timestamp("timestamp"))
        .withColumn("processed_at", F.current_timestamp())
    )
    clickstream_df.write.mode("overwrite").parquet(
        "s3://<Bucket-Name>/processed/clickstream/"
    )
    print("Data processing completed successfully")
    job.commit()

    Amazon S3 の raw レイヤーにある顧客、取引、クリックストリームのデータを処理し、processed レイヤーに Parquet ファイルとして保存します。

  9. FileSave Notebook As を選び、shared/etl_initial_processing_job.ipynb として保存します。

    jupyter2

AWS Glue Data Quality 用ノートブックの作成

初期データ処理スクリプトを作成したら、次のステップとして AWS Glue でデータ品質チェックを実施するノートブックをセットアップします。データ品質チェックにより、後続処理の前にデータの整合性と完全性を検証できます。以下の手順を実施します。

  1. FileNewNotebook を選び、新しいノートブックを作成します。
  2. Kernel を Python 3、Connection type を PySpark、ComputeProject.spark.compatibility に設定します。

    select-kernel
  3. この新しいノートブックに、AWS Glue の EvaluateDataQuality メソッドを使ったデータ品質チェックスクリプトを追加します。スクリプト内の <Bucket-Name>実際の S3 バケット名に置き換えます。
    from datetime import datetime
    from pyspark.context import SparkContext
    from awsglue.context import GlueContext
    from awsglue.job import Job
    from awsgluedq.transforms import EvaluateDataQuality
    from awsglue.transforms import SelectFromCollection
    
    # ---------------- Glue setup ----------------
    sc = SparkContext.getOrCreate()
    glueContext = GlueContext(sc)
    job = Job(glueContext)
    job.init("GlueDQJob", {})
    
    # ---------------- Constants ----------------
    RUN_DATE = datetime.utcnow().strftime("%Y-%m-%d")
    year, month, day = RUN_DATE.split("-")
    OUTPUT_PATH = "s3://<Bucket-Name>/data-quality-results"
    
    # ---------------- Tables and Rules ----------------
    tables = {
        "customers": ["s3://<Bucket-Name>/processed/customers/",
                      ["IsComplete \"customer_id\"", "IsUnique \"customer_id\"", "IsComplete \"email\""]],
        "transactions": ["s3://<Bucket-Name>/processed/transactions/",
                         ["IsComplete \"transaction_id\"", "IsUnique \"transaction_id\""]],
        "clickstream": ["s3://<Bucket-Name>/processed/clickstream/",
                        ["IsComplete \"customer_id\"", "IsComplete \"action\""]]
    }
    
    # ---------------- Process Each Table ----------------
    for table, (path, rules) in tables.items():
        df = glueContext.create_dynamic_frame.from_options("s3", {"paths":[path]}, "parquet")
        results = EvaluateDataQuality().process_rows(
            frame=df,
            ruleset=f"Rules = [{', '.join(rules)}]",
            publishing_options={"dataQualityEvaluationContext": table}
        )
        rows = SelectFromCollection.apply(results, key="rowLevelOutcomes", transformation_ctx="rows").toDF()
        rows = rows.drop("DataQualityRulesPass", "DataQualityRulesFail", "DataQualityRulesSkip")
    
        # Write passed/failed rows
        for status, colval in [("pass","Passed"), ("fail","Failed")]:
            tmp = rows.filter(rows.DataQualityEvaluationResult.contains(colval))
            if tmp.count() > 0:
                tmp.write.mode("append").parquet(
            f"{OUTPUT_PATH}/{table}/status=dq_{status}/Year={year}/Month={month}/Date={day}"
                )
    print("Data Quality checks completed and written to S3")
    job.commit()
  4. FileSave Notebook As を選び、shared/etl_data_quality_job.ipynb として保存します。

AWS Glue ジョブの作成とテスト

SageMaker Unified Studio のジョブにより、AWS Glue を使ったスケーラブルで柔軟な ETL パイプラインを実現できます。効率的でガバナンスの効いたデータ変換に向けて、データ処理ジョブの作成とテスト手順を説明します。

初期データ処理ジョブの作成

ETL パイプラインの最初の処理ジョブとして、生の顧客、取引、クリックストリームデータを変換し、クレンジング済みの出力を Parquet 形式で Amazon S3 に書き込みます。ジョブを作成する手順は以下のとおりです。

  1. SageMaker Unified Studio でプロジェクトを開きます。
  2. 上部メニューで Build を選択し、Data Analysis & Integration の下の Data processing jobs を選択します。

    navbar on smus
  3. Create job from notebooks を選択します。
  4. Choose project files の下の Browse files を選択します。
  5. etl_initial_processing_job.ipynb (先ほど JupyterLab で保存したノートブック) を見つけて選択し、Select、続いて Next を選択します。

    select the notebook
  6. ジョブ設定を構成します。
    1. Name に名前 (たとえば job-1) を入力します。
    2. Description に説明 (たとえば Initial ETL job for customer data processing) を入力します。
    3. IAM Role でプロジェクトロール (デフォルト) を選択します。
    4. TypeSpark を選択します。
    5. AWS Glue version はバージョン 5.0 を使用します。
    6. LanguagePython を選択します。
    7. Worker type は G.1X を使用します。
    8. Number of Instances は 10 に設定します。
    9. Number of retries は 0 に設定します。
    10. Job timeout は 480 に設定します。
    11. Compute connectionproject.spark.compatibility を選択します。
    12. Advanced settingsContinuous logging をオンにします。

    advanced setting

  7. 残りの設定はデフォルトのまま、Submit を選択します。

ジョブが作成されると、job-1 が正常に作成されたことを示す確認メッセージが表示されます。

AWS Glue Data Quality ジョブの作成

変換済みデータセットに対して AWS Glue Data Quality によるデータ品質チェックを実行します。ルールセットで主要フィールドの完全性と一意性を検証します。ジョブを作成する手順は以下のとおりです。

  1. SageMaker Unified Studio でプロジェクトを開きます。
  2. 上部メニューで Build を選び、Data Analysis & Integration の下の Data processing jobs を選択します。
  3. Create jobCode-based jobCreate job from files を選択します。
  4. Choose project files の下の Browse files を選択します。
  5. etl_glue_data_quality.ipynb を見つけて選択し、Select、続いて Next を選択します。
  6. ジョブ設定を構成します。
  7. Name に名前 (たとえば job-2) を入力します。
  8. Description に説明 (たとえば Data quality checks using AWS Glue Data Quality) を入力します。
  9. IAM Role でプロジェクトロールを選択します。
  10. TypeSpark を選択します。
  11. AWS Glue version はバージョン 5.0 を使用します。
  12. LanguagePython を選択します。
  13. Worker type は G.1X を使用します。
  14. Number of Instances は 10 に設定します。
  15. Number of retries は 0 に設定します。
  16. Job timeout は 480 に設定します。
  17. Compute connectionproject.spark.compatibility を選択します。
  18. Advanced settingsContinuous logging をオンにします。
  19. 残りの設定はデフォルトのまま、Submit を選択します。

ジョブが作成されると、job-2 が正常に作成されたことを示す確認メッセージが表示されます。

AWS Glue ジョブのテスト

両方のジョブをテストして、正常に実行されることを確認します。

  1. SageMaker Unified Studio でプロジェクトを開きます。
  2. 上部メニューで Build を選び、Data Analysis & Integration の下の Data processing jobs を選択します。
  3. job-1 を選び、Run job を選択します。
  4. ジョブの実行状況を監視し、正常に完了することを確認します。
  5. 同様に、job-2 を選び、Run job を選択します。
  6. ジョブの実行状況を監視し、正常に完了することを確認します。

EMR Serverless コンピュートの追加

ETL パイプラインでは、大規模データセットに対する計算負荷の高い変換と集計に EMR Serverless を使います。ワークロードに応じてリソースが自動スケールし、運用がシンプルなまま高いパフォーマンスを発揮します。EMR Serverless を SageMaker Unified Studio と統合することで、サーバーレス環境で Jupyter ノートブックから Spark ジョブを対話的に実行できます。

SageMaker Studio 内で EMR Serverless コンピュートを設定し、分散データ処理ジョブの実行に使う手順を説明します。

SageMaker Unified Studio での EMR Serverless の設定

プロジェクトで EMR Serverless を使って処理を行うには、以下の手順を実施します。

  1. Project Overview のナビゲーションペインで Compute を選択します。
  2. Data processing タブで Add computeCreate new compute resources を選択します。
  3. EMR Serverless を選び、Next を選択します。

  4. EMR Serverless の設定を行います。
  5. Compute name に名前 (たとえば etl-emr-serverless) を入力します。
  6. Description に説明 (たとえば EMR Serverless for advanced data processing) を入力します。
  7. Release labelemr-7.8.0 を選択します。
  8. Permission modeCompatibility を選択します。
  9. Add Compute を選択してセットアップを完了します。

設定が完了すると、EMR Serverless コンピュートはデプロイステータス Active で一覧に表示されます。

emr serverless

EMR Serverless でのノートブックの作成と実行

EMR Serverless コンピュートを作成したら、Jupyter ノートブックで PySpark ベースのデータ変換ジョブを実行し、大規模データ変換を行えます。クレンジング済みの顧客、取引、クリックストリームデータセットを Amazon S3 から読み込み、集計とスコアリングを行い、最終的な分析出力を Parquet と CSV の両形式で Amazon S3 に書き戻します。EMR Serverless 処理用ノートブックを作成する手順は以下のとおりです。

  1. 上部メニューで Build の下の JupyterLab を選択します。
  2. FileNewNotebook を選択します。
  3. Kernel を Python 3、Connection type を PySpark、Computeemr-s.etl-emr-serverless に設定します。

    compute
  4. EMR Serverless でデータ変換ジョブを実行するため、以下の PySpark スクリプトを入力します。S3 バケット名を指定します。
    from pyspark.sql import SparkSession
    from pyspark.sql import functions as F
    
    spark = SparkSession.builder.appName("CustomerAnalytics").getOrCreate()
    
    customers = spark.read.parquet("s3://<bucket-name>/processed/customers/")
    transactions = spark.read.parquet("s3://<bucket-name>/processed/transactions/")
    clickstream = spark.read.parquet("s3://<bucket-name>/processed/clickstream/")
    
    customer_spending = transactions.groupBy("customer_id").agg(
        F.count("transaction_id").alias("total_transactions"),
        F.sum("amount").alias("total_spent"),
        F.avg("amount").alias("avg_transaction_value"),
        F.datediff(F.current_date(), F.max("date")).alias("days_since_last_purchase")
    )
    
    customer_engagement = clickstream.groupBy("customer_id").agg(
        F.count("*").alias("total_clicks"),
        F.countDistinct("page").alias("unique_pages_visited"),
        F.count(F.when(F.col("action") == "purchase", 1)).alias("purchase_actions"),
        F.count(F.when(F.col("action") == "add_to_cart", 1)).alias("add_to_cart_actions")
    )
    
    customer_analytics = customers.join(customer_spending, on="customer_id", how="left").join(
        customer_engagement, on="customer_id", how="left")
    
    customer_analytics = customer_analytics.na.fill(0, [
        "total_transactions", "total_spent", "total_clicks", 
        "unique_pages_visited", "purchase_actions", "add_to_cart_actions"
    ])
    
    customer_analytics = customer_analytics.withColumn(
        "customer_value_score",
        (F.col("total_spent") * 0.5) + (F.col("total_transactions") * 0.3) + (F.col("purchase_actions") * 0.2)
    )
    
    customer_analytics.write.mode("overwrite").parquet("s3://<bucket-name>/analytics/customer_analytics/")
    
    customer_summary = customer_analytics.select(
        "customer_id", "name", "email", "registration_date", 
        "total_transactions", "total_spent", "avg_transaction_value",
        "days_since_last_purchase", "total_clicks", "purchase_actions",
        "customer_value_score"
    )
    
    customer_summary.write.mode("overwrite").option("header", "true").csv("s3://<bucket-name>/analytics/customer_summary/")
    
    print("EMR processing completed successfully")
  5. FileSave Notebook As を選び、shared/emr_data_transformation_job.ipynb として保存します。
  6. Run Cell を選択してスクリプトを実行します。
  7. スクリプトの実行状況を監視し、正常に完了することを確認します。
  8. Spark ジョブの実行状況を監視し、エラーなく完了することを確認します。

emr run

Redshift Serverless コンピュートの追加

Redshift Serverless により、インフラを管理せずにデータウェアハウスワークロードを実行・スケールできます。Amazon S3 からデータをクエリしたり、中央ウェアハウスに統合したりする分析ユースケースに適しています。本ステップでは、パイプラインの前段階で生成された処理済みの顧客分析データをロード・クエリするため、Redshift Serverless をプロジェクトに追加します。Redshift Serverless の詳細は Amazon Redshift Serverless を参照してください。

SageMaker Unified Studio での Redshift Serverless コンピュートのセットアップ

Redshift Serverless コンピュートをセットアップする手順は以下のとおりです。

  1. SageMaker Unified Studio のプロジェクトワークスペース (ETL-Pipeline-Demo) で Compute タブを選択します。
  2. SQL analytics タブで Add compute、続いて Create new compute resources を選択し、コンピュート環境の設定を開始します。
  3. Amazon Redshift Serverless を選択します。
  4. 以下を設定します。
    1. Compute name に名前 (たとえば ecommerce_data_warehouse) を入力します。
    2. Description に説明 (たとえば Redshift Serverless for data warehouse) を入力します。
    3. Workgroup name に名前 (たとえば redshift-serverless-workgroup) を入力します。
    4. Maximum capacity は 512 RPU に設定します。
    5. Database namedev を入力します。
  5. Add Compute を選択して Redshift Serverless リソースを作成します。

    Redshift

コンピュートを作成したら、Amazon Redshift 接続をテストできます。

  1. Data warehouse タブで、redshift.ecommerce_data_warehouse が表示されていることを確認します。

    compute-redshift
  2. コンピュート redshift.ecommerce_data_warehouse を選択します。
  3. Permissions タブで IAM ロール ARN をコピーします。次のステップで Redshift の COPY コマンドに使います。

    iam-role

クエリブックの作成・実行による Amazon Redshift へのデータロード

本ステップでは、処理済みの顧客サマリーデータを Amazon S3 から Redshift テーブルにロードする SQL スクリプトを作成します。顧客セグメンテーション、生涯価値の算出、マーケティングキャンペーン向けの分析が可能になります。以下の手順を実施します。

  1. Build メニューの Data Analysis & Integration の下で Query editor を選択します。
  2. クエリブックに以下の SQL を入力し、public スキーマに customer_summary テーブルを作成します。
    -- Create customer_summary table in public schema
    CREATE TABLE IF NOT EXISTS public.customer_summary (
        customer_id INT PRIMARY KEY,
        name VARCHAR(100),
        email VARCHAR(100),
        registration_date DATE,
        total_transactions INT,
        total_spent DECIMAL(10, 2),
        avg_transaction_value DECIMAL(10, 2),
        days_since_last_purchase INT,
        total_clicks INT,
        purchase_actions INT,
        customer_value_score DECIMAL(10, 2)
    );
  3. Add SQL を選択して新しい SQL スクリプトを追加します。
  4. クエリブックに以下の SQL を入力します。
    TRUNCATE TABLE customer_summary;

    注: COPY コマンドを実行する前に、既存のレコードを削除し、S3 からの最新の集計データを重複なくクリーンに再ロードするため、customer_summary テーブルを TRUNCATE します。

  5. Add SQL を選択して新しい SQL スクリプトを追加します。
  6. 以下の SQL を入力し、S3 バケットから Redshift Serverless にデータをロードします。S3 バケット名と Amazon Redshift 用の IAM ロール ARN を指定します。
    -- Load data from S3 (replace with your bucket name and IAM role)
    COPY public.customer_summary FROM 's3://<bucket-name>/analytics/customer_summary/'
    IAM_ROLE 'arn:aws:iam::<Account-ID>:role/<your-redshift-role>'
    FORMAT AS CSV
    IGNOREHEADER 1
    REGION 'us-east-1';
  7. Query Editor で以下を設定します。
    1. Connection: redshift.ecommerce_data_warehouse
    2. Database: dev
    3. Schema: public

    query

  8. Choose を選択して接続設定を適用します。
  9. 各セルで Run Cell を選択し、public スキーマに customer_summary テーブルを作成してから、Amazon S3 からデータをロードします。
  10. ActionsSave を選び、クエリブック名を final_data_product として Save changes を選択します。

これで、クエリブックを使った Redshift データプロダクトの作成と実行は完了です。

ワークフロー環境の作成と管理

共有ワークフロー環境の作成と、SageMaker Unified Studio 内の Apache Airflow を使って顧客データパイプラインを自動化するコードベースのワークフローの定義方法を説明します。共有環境でプロジェクトメンバー間の協働とワークフローの一元管理を実現できます。

ワークフロー環境の作成

ワークフロー環境はプロジェクトオーナーが作成する必要があります。作成後、プロジェクトメンバーはワークフローを同期して利用できます。ワークフロー環境の更新・削除はプロジェクトオーナーのみが行えます。ワークフロー環境を作成する手順は以下のとおりです。

  1. プロジェクトの Compute を選択します。
  2. Workflow environments タブで Create を選択します。
  3. 設定パラメータを確認し、Create workflow environment を選択します。
  4. 環境のプロビジョニングが完了するまで待ちます。プロビジョニングには約 20 分かかります。

workflow

コードベースのワークフローの作成

ワークフロー環境の準備ができたら、Airflow を使ってコードベースの ETL パイプラインを定義します。AWS Glue、EMR Serverless、Redshift Serverless などのサービスにまたがる日次処理タスクを自動化するパイプラインを定義します。

  1. Build メニューの Orchestration の下で Workflows を選択します。
  2. Create new workflow を選び、Create workflow in code editor を選択します。
  3. Configure Space を選択し、インスタンスタイプ ml.t3.xlarge を選びます。JupyterLab インスタンスに少なくとも 4 vCPU と 4 GiB メモリが確保されます。
  4. Configure and Restart Space を選択して環境を起動します。

sample_dag

以下のスクリプトは、複数のアクションを自動化する日次スケジュールの ETL ワークフローを定義します。

  • AWS Glue による初期データ変換
  • AWS Glue (EvaluateDataQuality) によるデータ品質検証
  • Jupyter ノートブックを使った EMR Serverless での高度なデータ処理
  • クエリブックから Redshift Serverless への変換結果のロード
  1. デフォルトの DAG テンプレートを以下の定義に置き換えます。ジョブ名と入力パスは、プロジェクトで実際に使っているものに合わせてください。
    from datetime import datetime
    from airflow import DAG
    from airflow.decorators import dag
    from airflow.utils.dates import days_ago
    from airflow.providers.amazon.aws.operators.glue import GlueJobOperator
    from workflows.airflow.providers.amazon.aws.operators.sagemaker_workflows import NotebookOperator
    from sagemaker_studio import Project
    # Get SageMaker Studio project IAM role
    project = Project()
    default_args = {
        'owner': 'data_engineer',
        'depends_on_past': False,
        'email_on_failure': True,
        'email_on_retry': False,
        'retries': 1
    }
    @dag(
        dag_id='customer_etl_pipeline',
        default_args=default_args,
        schedule_interval='@daily',
        start_date=days_ago(1),
        is_paused_upon_creation=False,
        tags=['etl', 'customer-analytics'],
        catchup=False
    )
    def customer_etl_pipeline():
        # Step 1: Initial data transformation using Glue
        initial_transformation = GlueJobOperator(
            task_id='initial_transformation',
            job_name='job-1',
            iam_role_arn=project.iam_role,
        )
        # Step 2: Data quality checks using Glue DQ
        data_quality_check = GlueJobOperator(
            task_id='data_quality_check',
            job_name='job-6',
            iam_role_arn=project.iam_role,
        )
        # Step 3: EMR Serverless notebook processing
        emr_processing = NotebookOperator(
            task_id='emr_processing',
            input_config={
                "input_path": "emr_data_transformation_job.ipynb",
                "input_params": {}
            },
            output_config={"output_formats": ['NOTEBOOK']},
            poll_interval=10,
        )
        # Step 4: Load to Redshift notebook
        redshift_load = NotebookOperator(
            task_id='redshift_load',
            input_config={
                "input_path": "final_data_product.sqlnb",
                "input_params": {}
            },
            output_config={"output_formats": ['NOTEBOOK']},
            poll_interval=10,
        )
        # Task dependencies
        initial_transformation >> data_quality_check >> emr_processing >> redshift_load
    # Instantiate DAG
    customer_etl_dag = customer_etl_pipeline()
  2. FileSave python file を選び、ファイル名を shared/workflows/dags/customer_etl_pipeline.py として Save を選択します。

ワークフローのデプロイと実行

ワークフローを実行する手順は以下のとおりです。

  1. Build メニューで Workflows を選択します。
  2. ワークフロー customer_etl_pipeline を選び、Run を選択します。

scheduled

ワークフローを実行すると、Amazon SageMaker Unified Studio のアーティファクトをオーケストレーションするタスクがまとめて実行されます。Workflows ページに移動し、ワークフロー一覧テーブルでワークフロー名を選択すると、ワークフローの複数回の実行履歴を確認できます。

ワークフロー環境でワークフローを他のプロジェクトメンバーと共有するには、Amazon SageMaker Unified Studio ワークフロー環境で他のプロジェクトメンバーとコードワークフローを共有する を参照してください。

ワークフローの監視とトラブルシューティング

SageMaker Unified Studio にデプロイした Airflow ワークフローを信頼性の高い ETL 運用として維持するには、監視が欠かせません。統合された Amazon MWAA 環境は、使い慣れた Airflow Web インターフェースを通じてデータパイプラインの状態を把握でき、AWS の監視機能で強化されています。Amazon MWAA と SageMaker Unified Studio の統合により、DAG 実行のリアルタイム追跡、タスクの詳細ログ、パフォーマンス指標を確認でき、パイプラインの問題を素早く特定・解決できます。ワークフローを監視する手順は以下のとおりです。

  1. Build メニューで Workflows を選択します。
  2. ワークフロー customer_etl_pipeline を選択します。
  3. View runs を選び、すべての実行を確認します。
  4. 特定の実行を選択して、タスクの詳細な状態を確認します。

workflows-run

タスクごとに、状態 (SucceededFailedRunning)、開始・終了時刻、実行時間、ログと出力を確認できます。ワークフローは Airflow UI でも確認できます。ワークフロー環境からアクセスでき、DAG グラフの表示、タスク実行のリアルタイム監視、詳細ログへのアクセス、状態の確認が可能です。

  1. Workflows に移動し、customer_etl_pipeline という名前のワークフローを選択します。
  2. Actions メニューで Open in Airflow UI を選択します。

airflow-ui-smus

ワークフローが正常に完了したら、クエリエディタでデータプロダクトをクエリできます。

  • Build メニューの Data Analysis & Integration の下で Query editor を選択します。
  • select * from "dev"."public"."customer_summary" を実行します。

query-editor

customer_summary テーブルの内容を確認します。総取引数、総利用額、平均取引額、クリック数、顧客価値スコアなどの集計された顧客指標が含まれます。ETL とデータ品質パイプラインがデータを正しくロード・変換したかを検証できます。

クリーンアップ

不要な料金を避けるため、以下の手順を実施してください。

  1. ワークフロー環境を削除します。
  2. 不要になった場合は、プロジェクトを削除します。
  3. プロジェクトを削除したあと、ドメインを削除します。

まとめ

本記事では、SageMaker Unified Studio ワークフローを使ってエンドツーエンドの ETL パイプラインを構築する方法を紹介しました。Amazon S3 の CORS 設定や IAM 権限など基礎的な AWS インフラのセットアップから、高度なデータ処理ワークフローの実装まで、開発ライフサイクル全体を通して解説しました。本ソリューションでは、初期データ変換と品質チェックに AWS Glue、高度な処理に EMR Serverless、データウェアハウジングに Redshift Serverless を組み合わせ、すべてを Airflow DAG でオーケストレーションしています。このアプローチは、必要なツールを集約する統一インターフェース、Python ベースのワークフローの柔軟性、AWS サービスのシームレスな統合、Git バージョン管理による協働開発、サーバーレス計算によるコスト効率の高いスケーリング、包括的な監視ツールなど、複数のメリットをもたらします。効率的で保守しやすいデータパイプラインを実現します。

SageMaker Unified Studio ワークフローを使うことで、エンタープライズレベルの信頼性とスケーラビリティを保ちつつ、データパイプライン開発を加速できます。SageMaker Unified Studio とその機能の詳細は Amazon SageMaker Unified Studio のドキュメントを参照してください。

著者について

Shubham Kumar

Shubham Kumar

Shubham は、AWS のアソシエイトデリバリーコンサルタントで、ビッグデータ、データレイク、データガバナンス、検索・監視アーキテクチャを専門としています。仕事以外の時間は、旅行や家族との時間、フィクション小説の執筆を楽しんでいます。

Shubham Purwar

Shubham Purwar

Shubham は、AWS の Cloud Engineer (ETL) で、AWS Glue を専門としています。仕事以外の時間は、映画鑑賞や家族との時間を楽しんでいます。

Nitin Kumar

Nitin Kumar

Nitin は、AWS のアナリティクススペシャリストソリューションアーキテクトです。仕事以外の時間は、家族との時間や世界各地への旅行を楽しんでいます。


この記事は Kiro が翻訳を担当し、Solutions Architect の Woosuk Choi がレビューしました。