Amazon Web Services ブログ

Amazon MWAA Serverless の紹介

本記事は、2025 年 11 月 17 日に公開された Introducing Amazon MWAA Serverless を翻訳したものです。翻訳はクラウドサポートエンジニアの山本が担当しました。

本日、AWS は Amazon Managed Workflows for Apache Airflow (MWAA) Serverless の提供を発表しました。これは MWAA の新しいデプロイメントオプションで、Apache Airflow 環境の運用オーバーヘッドを排除しながら、サーバーレススケーリングによってコスト最適化を実現します。この新しいサービスは、データエンジニアと DevOps チームがワークフローのオーケストレーションで直面する主な課題、つまり運用スケーラビリティ、コスト最適化、アクセス管理を解決します。

MWAA Serverless では、プロビジョニングされた容量を監視するのではなく、ワークフローロジックに集中できます。Airflow ワークフローをスケジュール実行またはオンデマンド実行で送信でき、実際に各タスクが実行されたコンピュート時間分のみ支払います。サービスが自動的にすべてのインフラストラクチャをスケーリングするため、負荷に関係なくワークフローを効率的に実行できます。

運用の簡素化に加えて、MWAA Serverless は AWS Identity and Access Management (IAM) による細粒度のアクセス制御を実現する新しいセキュリティモデルを導入しました。各ワークフローに独自の IAM 権限を設定でき、お客様の VPC 上で各タスクを実行できるため、個別の Airflow 環境を作成することなく、正確なセキュリティ制御を実装できます。このアプローチにより、セキュリティ管理のオーバーヘッドを大幅に削減しつつ、セキュリティ体制を強化できます。

この記事では、MWAA Serverless を使用してスケーラブルなワークフロー自動化ソリューションを構築およびデプロイする方法を紹介します。ワークフローの作成とデプロイ、Amazon CloudWatch を使用した監視の設定、既存の Apache Airflow DAGs (Directed Acyclic Graphs) をサーバーレス形式に変換する実践的な例を紹介します。また、サーバーレスワークフローを管理するためのベストプラクティスを探り、監視とログ記録の実装方法を紹介します。

MWAA Serverless の仕組み

MWAA Serverless は、ワークフロー定義を処理し、サービス管理の Airflow 環境で効率的に実行し、ワークフローの需要に基づいてリソースを自動的にスケーリングします。MWAA Serverless は、Amazon Elastic Container Service (Amazon ECS) エグゼキュータを使用して、各個別のタスクを独自の ECS Fargate コンテナ上で実行します。これは、お客様の VPC またはサービス管理の VPC のいずれかで行われます。それらのコンテナは、Airflow 3 Task API を使用して、割り当てられた Airflow クラスターと通信します。


図 1: Amazon MWAA のアーキテクチャ

MWAA Serverless は、タスクの分離によってセキュリティを強化するため、人気のあるオープンソース DAG Factory 形式に基づく宣言型の YAML 構成ファイルを使用します。これらのワークフロー定義を作成するには、2 つの選択肢があります。

この宣言的アプローチには 2 つの主な利点があります。まず、MWAA Serverless がワークフロー定義を YAML から読み取るため、ワークフローコードを実行せずにタスクのスケジューリングを決定できます。次に、MWAA Serverless はタスクが実行されるときにのみ実行権限を付与できるため、ワークフロー全体に広範な権限を必要としません。その結果、タスクの権限範囲が正確に限定され、時間制限される、より安全な環境を実現できます。

MWAA Serverless のサービス検討事項

MWAA Serverless には、サーバーレスとプロビジョニングされた MWAA デプロイメントを選択する際に考慮すべき、以下の制限があります:

  • オペレーターサポート
    • MWAA Serverless は、Amazon Provider Package のオペレーターのみをサポートしています。
    • カスタムコードやスクリプトを実行するには、以下のような AWS サービスを使用する必要があります。
  • ユーザーインターフェース
    • MWAA Serverless は Airflow UI を使用せずに動作します。
    • ワークフローの監視と管理には、Amazon CloudWatchAWS CloudTrail との統合を提供しています。

MWAA Serverless の利用

MWAA Serverless を使用するには、以下の前提条件とステップを完了してください。

前提条件

始める前に、次の要件が満たされていることを確認してください:

  • アクセスと権限
    • AWS アカウント
    • バージョン 2.31.38 以降の AWS Command Line Interface (AWS CLI) をインストール済み
    • IAMロールとポリシーを作成・変更するための適切な権限が必要です。これには以下の IAM 権限が含まれます:
      • airflow-serverless:CreateWorkflow
      • airflow-serverless:DeleteWorkflow
      • airflow-serverless:GetTaskInstance
      • airflow-serverless:GetWorkflowRun
      • airflow-serverless:ListTaskInstances
      • airflow-serverless:ListWorkflowRuns
      • airflow-serverless:ListWorkflows
      • airflow-serverless:StartWorkflowRun
      • airflow-serverless:UpdateWorkflow
      • iam:CreateRole
      • iam:DeleteRole
      • iam:DeleteRolePolicy
      • iam:GetRole
      • iam:PutRolePolicy
      • iam:UpdateAssumeRolePolicy
      • logs:CreateLogGroup
      • logs:CreateLogStream
      • logs:PutLogEvents
      • airflow:GetEnvironment
      • airflow:ListEnvironments
      • s3:DeleteObject
      • s3:GetObject
      • s3:ListBucket
      • s3:PutObject
      • s3:Sync
    • インターネット接続可能な Amazon Virtual Private Cloud (VPC) へのアクセス
  • 必要な AWS サービス – MWAA Serverless に加えて、以下の AWS サービスへのアクセス権が必要です:
    • 既存の Airflow 環境にアクセスするための Amazon MWAA
    • ログを表示するための Amazon CloudWatch
    • DAG と YAML ファイル管理のための Amazon S3
    • 権限制御のための AWS IAM
  • 開発環境
    • Python 3.12 以降がインストール済み
    • ワークフロー定義を保存するための Amazon Simple Storage Service (S3) バケット
    • YAML ファイル編集用のテキストエディタまたは IDE
  • その他の要件
    • Apache Airflow の概念に関する基本的な知識
    • YAML 構文の理解
    • AWS CLI コマンドの知識

注意: この記事を通して、お客様自身の値に置き換える必要があるサンプルの値を使用しています:

  • amzn-s3-demo-bucket をお客様の S3 バケット名に置き換えてください
  • 111122223333 をお客様の AWS アカウント番号に置き換えてください
  • us-east-2 をお客様が利用する AWS リージョンに置き換えてください。MWAA Serverless は複数の AWS リージョンで利用可能です。現在の提供状況については、リージョン別の AWS サービス一覧を確認してください。

サーバーレスワークフローの初期作成

まず、S3 オブジェクトのリストを取得し、そのリストを同じバケットのファイルに書き込む簡単なワークフローを定義しましょう。simple_s3_test.yaml という新しいファイルを次の内容で作成してください:

simples3test:
  dag_id: simples3test 
  schedule: 0 0 * * *
  tasks:
    list_objects:
      operator: airflow.providers.amazon.aws.operators.s3.S3ListOperator 
      bucket: 'amzn-s3-demo-bucket'
      prefix: ''
      retries: 0 
    create_object_list:
      operator: airflow.providers.amazon.aws.operators.s3.S3CreateObjectOperator 
      data: '{{ ti.xcom_pull(task_ids="list_objects", key="return_value") }}'
      s3_bucket: 'amzn-s3-demo-bucket'
      s3_key: 'filelist.txt'
      dependencies: [list_objects]

このワークフローを実行するには、上記のバケットを一覧表示し、書き込む権限を持つ実行ロールを作成する必要があります。このロールは、MWAA Serverless から引き受けられる必要もあります。以下の AWS CLI コマンドで、このロールとそれに関連するポリシーを作成します。

aws iam create-role \ 
--role-name mwaa-serverless-access-role \ 
--assume-role-policy-document '{
    "Version": "2012-10-17",
    "Statement": [ 
      {
        "Effect": "Allow",
        "Principal": {
          "Service": [ 
            "airflow-serverless.amazonaws.com"
          ] 
        },
        "Action": "sts:AssumeRole"
      },
      {
        "Sid": "AllowAirflowServerlessAssumeRole",
        "Effect": "Allow",
        "Principal": {
          "Service": "airflow-serverless.amazonaws.com"
        },
        "Action": "sts:AssumeRole",
        "Condition": {
          "StringEquals": {
            "aws:SourceAccount": "${aws:PrincipalAccount}"
          },
          "ArnLike": {
            "aws:SourceArn": "arn:aws:*:*:${aws:PrincipalAccount}:workflow/*"
          }
        }
      }
    ] 
  }'

 aws iam put-role-policy \ 
  --role-name mwaa-serverless-access-role \ 
  --policy-name mwaa-serverless-policy   \ 
  --policy-document '{
    "Version": "2012-10-17",
    "Statement": [ 
        {
            "Sid": "CloudWatchLogsAccess",
            "Effect": "Allow",
            "Action": [ 
                "logs:CreateLogGroup",
                "logs:CreateLogStream",
                "logs:PutLogEvents"
             ],
            "Resource": "*"
        },
        {
            "Sid": "S3DataAccess",
            "Effect": "Allow",
            "Action": [ 
                "s3:ListBucket",
                "s3:GetObject",
                "s3:PutObject"
             ],
            "Resource": [ 
                "arn:aws:s3:::amzn-s3-demo-bucket",
                "arn:aws:s3:::amzn-s3-demo-bucket/*"
             ] 
        }
     ] 
}'

その後、YAML DAG を同じ S3 バケットにコピーし、上記の AWS CLI コマンドの実行結果に含まれる ARN に基づいてワークフローを作成します。

aws s3 cp "simple_s3_test.yaml" \ 
s3://amzn-s3-demo-bucket/yaml/simple_s3_test.yaml 

aws mwaa-serverless create-workflow \ 
--name simple_s3_test \ 
--definition-s3-location '{ "Bucket": "amzn-s3-demo-bucket", "ObjectKey": "yaml/simple_s3_test.yaml" }' \ 
--role-arn arn:aws:iam::111122223333:role/mwaa-serverless-access-role \ 
--region us-east-2

最後のワークフローを作成する AWS CLI コマンドの出力は WorkflowARN の値を返し、この値を使用してワークフローを実行します:

aws mwaa-serverless start-workflow-run \ 
--workflow-arn arn:aws:airflow-serverless:us-east-2:111122223333:workflow/simple_s3_test-abc1234def \ 
--region us-east-2

出力には RunId の値が返され、その値を使用して実行したばかりのワークフローの実行状況を確認できます。

aws mwaa-serverless get-workflow-run \ 
--workflow-arn arn:aws:airflow-serverless:us-east-2:111122223333:workflow/simple_s3_test-abc1234def \ 
--run-id ABC123456789def \ 
--region us-east-2

YAML を変更する必要がある場合は、S3 にコピーし直して update-workflow コマンドを実行できます。

aws s3 cp "simple_s3_test.yaml" \ 
s3://amzn-s3-demo-bucket/yaml/simple_s3_test.yaml 

 aws mwaa-serverless update-workflow \ 
--workflow-arn arn:aws:airflow-serverless:us-east-2:111122223333:workflow/simple_s3_test-abc1234def \ 
--definition-s3-location '{ "Bucket": "amzn-s3-demo-bucket", "ObjectKey": "yaml/simple_s3_test.yaml" }' \ 
--role-arn arn:aws:iam::111122223333:role/mwaa-serverless-access-role \ 
--region us-east-2

Python DAGs を YAML 形式へ変換

AWS は、オープンソースの Airflow DAG プロセッサを使用して Python DAG を YAML DAG ファクトリ形式にシリアル化する変換ツールを公開しています。インストールするには、次のコマンドを実行します。

pip3 install python-to-yaml-dag-converter-mwaa-serverless
dag-converter convert source_dag.py --output output_yaml_folder

例えば、次の DAG を作成し、create_s3_objects.py という名前を付けます:

from datetime import datetime 
from airflow import DAG 
from airflow.models.param import Param 
from airflow.providers.amazon.aws.operators.s3 import S3CreateObjectOperator 

default_args = {
   'start_date': datetime(2024, 1, 1),
   'retries': 0,
}

dag = DAG(
   'create_s3_objects',
   default_args=default_args,
   description='Create multiple S3 objects in a loop',
   schedule=None
)

# Set number of files to create 
LOOP_COUNT = 3 
s3_bucket = 'md-workflows-mwaa-bucket'
s3_prefix = 'test-files'

# Create multiple S3 objects using loop 
last_task=None 
for i in range(1, LOOP_COUNT + 1):  
   create_object = S3CreateObjectOperator(
       task_id=f'create_object_{i}',
       s3_bucket=s3_bucket,
       s3_key=f'{s3_prefix}/{i}.txt',
       data='{{ ds_nodash }}-{{ ts_nodash | lower }}',
       replace=True,
       dag=dag 
   )
   if last_task:
       last_task >> create_object 
   last_task = create_object

python-to-yaml-dag-converter-mwaa-serverless をインストールしたら、次のコマンドを実行します。

dag-converter convert "/path_to/create_s3_objects.py" --output "/path_to/yaml/"

出力は次のようになります:

YAML validation successful, no errors found

YAML written to /path_to/yaml/create_s3_objects.yaml

実行結果の YAML は次のようになります。

create_s3_objects:
  dag_id: create_s3_objects 
  params: {}
  default_args:
    start_date: '2024-01-01'
    retries: 0 
  schedule: None 
  tasks:
    create_object_1:
      operator: airflow.providers.amazon.aws.operators.s3.S3CreateObjectOperator 
      aws_conn_id: aws_default 
      data: '{{ ds_nodash }}-{{ ts_nodash | lower }}'
      encrypt: false 
      outlets: [] 
      params: {}
      priority_weight: 1 
      replace: true 
      retries: 0 
      retry_delay: 300.0 
      retry_exponential_backoff: false 
      s3_bucket: md-workflows-mwaa-bucket 
      s3_key: test-files/1.txt 
      task_id: create_object_1 
      trigger_rule: all_success 
      wait_for_downstream: false 
      dependencies: [] 
    create_object_2:
      operator: airflow.providers.amazon.aws.operators.s3.S3CreateObjectOperator 
      aws_conn_id: aws_default 
      data: '{{ ds_nodash }}-{{ ts_nodash | lower }}'
      encrypt: false 
      outlets: [] 
      params: {}
      priority_weight: 1 
      replace: true 
      retries: 0 
      retry_delay: 300.0 
      retry_exponential_backoff: false 
      s3_bucket: md-workflows-mwaa-bucket 
      s3_key: test-files/2.txt 
      task_id: create_object_2 
      trigger_rule: all_success 
      wait_for_downstream: false 
      dependencies: [create_object_1] 
    create_object_3:
      operator: airflow.providers.amazon.aws.operators.s3.S3CreateObjectOperator 
      aws_conn_id: aws_default 
      data: '{{ ds_nodash }}-{{ ts_nodash | lower }}'
      encrypt: false 
      outlets: [] 
      params: {}
      priority_weight: 1 
      replace: true 
      retries: 0 
      retry_delay: 300.0 
      retry_exponential_backoff: false 
      s3_bucket: md-workflows-mwaa-bucket 
      s3_key: test-files/3.txt 
      task_id: create_object_3 
      trigger_rule: all_success 
      wait_for_downstream: false 
      dependencies: [create_object_2] 
  catchup: false 
  description: Create multiple S3 objects in a loop 
  max_active_runs: 16 
  max_active_tasks: 16 
  max_consecutive_failed_dag_runs: 0

DAG の解析後に YAML 変換が行われるため、DAG 内のタスクを作成する for 文が最初に実行され、結果の静的な3つのタスクリストがその依存関係とともに YAML ドキュメントに書き込まれることに注意してください。

MWAA 環境の DAG を MWAA Serverless に移行する

プロビジョニングされた MWAA 環境を活用して、ワークフローを開発およびテストした後、サーバーレスで効率的にスケールして実行できます。さらに、MWAA 環境が互換性のある MWAA Serverless オペレーターを使用している場合は、その環境のすべての DAG を一度に変換できます。最初のステップは、MWAA Serverless が MWAA Execution ロールを信頼関係を介して引き受けられるようにすることです。これは MWAA Execution ロールごとに 1 回限りの操作で、IAM コンソールで手動で実行するか、または次の AWS CLI コマンドを使用して実行できます。

MWAA_ENVIRONMENT_NAME="MyAirflowEnvironment"
MWAA_REGION=us-east-2 

MWAA_EXECUTION_ROLE_ARN=$(aws mwaa get-environment --region $MWAA_REGION --name $MWAA_ENVIRONMENT_NAME --query 'Environment.ExecutionRoleArn' --output text )
MWAA_EXECUTION_ROLE_NAME=$(echo $MWAA_EXECUTION_ROLE_ARN | xargs basename) 
MWAA_EXECUTION_ROLE_POLICY=$(aws iam get-role --role-name $MWAA_EXECUTION_ROLE_NAME --query 'Role.AssumeRolePolicyDocument' --output json | jq '.Statement[0].Principal.Service += ["airflow-serverless.amazonaws.com"] | .Statement[0].Principal.Service |= unique | .Statement += [{"Sid": "AllowAirflowServerlessAssumeRole", "Effect": "Allow", "Principal": {"Service": "airflow-serverless.amazonaws.com"}, "Action": "sts:AssumeRole", "Condition": {"StringEquals": {"aws:SourceAccount": "${aws:PrincipalAccount}"}, "ArnLike": {"aws:SourceArn": "arn:aws:*:*:${aws:PrincipalAccount}:workflow/*"}}}]')

aws iam update-assume-role-policy --role-name $MWAA_EXECUTION_ROLE_NAME --policy-document "$MWAA_EXECUTION_ROLE_POLICY"

正常に変換された各 DAG を順にループし、それぞれにサーバーレスワークフローを作成できます。

S3_BUCKET=$(aws mwaa get-environment --name $MWAA_ENVIRONMENT_NAME --query 'Environment.SourceBucketArn' --output text --region us-east-2 | cut -d':' -f6)

for file in /tmp/yaml/*.yaml ; do MWAA_WORKFLOW_NAME=$(basename "$file" .yaml); \ 
    aws s3 cp "$file" s3://$S3_BUCKET/yaml/$MWAA_WORKFLOW_NAME.yaml --region us-east-2 ; \ 
    aws mwaa-serverless create-workflow --name $MWAA_WORKFLOW_NAME \ 
    --definition-s3-location "{\"Bucket\": \"$S3_BUCKET\", \"ObjectKey\": \"yaml/$MWAA_WORKFLOW_NAME.yaml\"}" --role-arn $MWAA_EXECUTION_ROLE_ARN  \ 
    --region us-east-2  
    done

作成したワークフローのリストを表示するには、次のコマンドを実行します。

aws mwaa-serverless list-workflows --region us-east-2

モニタリングと可視化

MWAA サーバーレスのワークフロー実行ステータスは、GetWorkflowRun API で返されます。結果には、その特定の実行の詳細が含まれます。ワークフロー定義にエラーがある場合、次の例のように RunDetailErrorMessage フィールドに返されます。

{
  "WorkflowVersion": "7bcd36ce4d42f5cf23bfee67a0f816c6",
  "RunId": "d58cxqdClpTVjeN",
  "RunType": "SCHEDULE",
  "RunDetail": {
    "ModifiedAt": "2025-11-03T08:02:47.625851 + 00:00",
    "ErrorMessage": "expected token ',', got 'create_test_table'",
    "TaskInstances": [],
    "RunState": "FAILED"
  }
}

適切に定義されているが、タスクが失敗したワークフローは、"ErrorMessage": "Workflow execution failed" を返します:

{
  "WorkflowVersion": "0ad517eb5e33deca45a2514c0569079d",
  "RunId": "ABC123456789def",
  "RunType": "SCHEDULE",
  "RunDetail": {
    "StartedOn": "2025-11-03T13:12:09.904466 + 00:00",
    "CompletedOn": "2025-11-03T13:13:57.620605 + 00:00",
    "ModifiedAt": "2025-11-03T13:16:08.888182 + 00:00",
    "Duration": 107,
    "ErrorMessage": "Workflow execution failed",
    "TaskInstances": [ 
      "ex_5496697b-900d-4008-8d6f-5e43767d6e36_create_bucket_1"
    ],
    "RunState": "FAILED"
  },
}

MWAA Serverless タスクログは、CloudWatch ロググループ /aws/mwaa-serverless/<workflow id>/ に保存されます (ここで /<workflow id> は、ワークフローの ARN の一意のワークフロー ID と同じ文字列です)。特定のタスクのログストリームを取得するには、実行されたワークフローのタスクを一覧表示し、各タスクの情報を取得する必要があります。これらの操作を 1 つの CLI コマンドに組み合わせることができます。

aws mwaa-serverless list-task-instances \ 
  --workflow-arn arn:aws:airflow-serverless:us-east-2:111122223333:workflow/simple_s3_test-abc1234def \ 
  --run-id ABC123456789def \ 
  --region us-east-2 \ 
  --query 'TaskInstances[].TaskInstanceId' \ 
  --output text | xargs -n 1 -I {} aws mwaa-serverless get-task-instance \ 
  --workflow-arn arn:aws:airflow-serverless:us-east-2:111122223333:workflow/simple_s3_test-abc1234def \ 
  --run-id ABC123456789def \ 
  --task-instance-id {} \ 
  --region us-east-2 \ 
  --query '{Status: Status, StartedAt: StartedAt, LogStream: LogStream}'

実行結果は、次のようになります:

{
    "Status": "SUCCESS",
    "StartedAt": "2025-10-28T21:21:31.753447+00:00",
    "LogStream": "workflow_id=simple_s3_test-abc1234def/run_id=ABC123456789def/task_id=list_objects/attempt=1.log"
}
{
    "Status": "FAILED",
    "StartedAt": "2025-10-28T21:23:13.446256+00:00",
    "LogStream": "workflow_id=simple_s3_test-abc1234def/run_id=ABC123456789def/task_id=create_object_list/attempt=1.log"
}

その時点で、CloudWatch の LogStream 出力を使用してワークフローをデバッグします。 Amazon MWAA Serverless コンソールで、ワークフローを表示および管理できます:

AWS Lambda、Amazon CloudWatch、Amazon DynamoDBAmazon EventBridge を使用して詳細なメトリクスと監視ダッシュボードを作成する例については、この GitHub リポジトリの例を参照してください。

リソースのクリーンアップ

このチュートリアルで作成したすべてのリソースを削除して、継続的な課金を避けるには、次の手順に従ってください。

    1. MWAA Serverless ワークフローを削除する – すべてのワークフローを削除するには、次の AWS CLI コマンドを実行します:
      aws mwaa-serverless list-workflows --query 'Workflows[*].WorkflowArn' --output text | while read -r workflow ; do aws mwaa-serverless delete-workflow --workflow-arn $workflow done
    2. このチュートリアルで作成した IAM ロールとポリシーを削除します:
      aws iam delete-role-policy --role-name mwaa-serverless-access-role --policy-name mwaa-serverless-policy
    3. S3 バケットから YAML ワークフロー定義を削除します:
      aws s3 rm s3://amzn-s3-demo-bucket/yaml/ --recursive

これらのステップを完了したら、AWS マネジメントコンソールで、すべてのリソースが適切に削除されたことを確認してください。CloudWatch Logs はデフォルトで保持されるため、ワークフロー実行の痕跡をすべて削除したい場合は、別途削除する必要があります。

エラーが発生した場合は、必要な権限を持っているか、リソースが存在するかを確認してから削除を試みてください。一部のリソースには、特定の順序で削除する必要がある依存関係がある可能性があります。

結論

この記事では、Apache Airflow ワークフロー管理を簡素化する新しいデプロイメントオプションである Amazon MWAA Serverless について説明しました。YAML 定義を使用してワークフローを作成する方法、既存の Python DAG をサーバーレス形式に変換する方法、ワークフローを監視する方法を実演しました。 MWAA Serverless には以下のような主要な利点があります:

  • プロビジョニングのオーバーヘッドがありません
  • 従量課金モデルです
  • ワークフローの需要に基づいて自動的にスケーリングします
  • 細かい IAM 権限によってセキュリティが強化されています
  • YAML を使用してワークフロー定義が簡素化されています

MWAA Serverless の詳細は、ドキュメントを参照してください。


著者について

John は開発者、システムアーキテクト、プロダクトマネージャーとして、スタートアップと大企業の両方で25年以上のソフトウェア経験を持ち、Amazon MWAA を担当する AWS プリンシパルプロダクトマネージャーです。