自动预配 Apache Airflow 环境



约 25 美元

注册 / 登录 亚马逊云科技账户
- 本教程假设您具备 Apache Airflow 的应用知识
- 您需要确保有足够的容量来部署新的 VPC - 默认情况下,您可以在一个区域中部署 5 个 VPC。如果您已达到限制,您需要提高限制,或清理一个现有 VPC
- 已安装并配置 Amazon CDK(我使用的是 2.60.0 build 2d40d77)
- 能够访问支持适用于 Apache Airflow 的托管工作流的亚马逊云科技区域
- 已安装 Git 和 jq
- 代码是在 Linux 机器上开发的,并在 Mac 上经过测试,能正常运行。尽管我还没有进行测试,但它应该能在安装了适用于 Linux 的 Windows 子系统 (WSL) 的 Windows 计算机上运行。如果您在操作过程中遇到任何问题,您可以尝试在 Amazon Cloud9 IDE 环境中启动并执行代码。

本教程中使用的示例代码来自 GitHub

世界各地的企业都努力用数据驱动的决策为客户提供价值。数据分析是其成功的关键,但构建可扩展、可重用的数据管道并不容易。Apache Airflow 是一种开源编排工具,许多用户利用它自动化和简化数据管道的构建。随着使用量和采用率的扩大,管理 Airflow、向用户提供 Airflow 可能会非常繁琐。好消息是,我们可以大量借鉴现代开发技术(特别是 DevOps)来减少创建和扩展这些数据管道所需的时间。
学习内容
在本教程中,您将学习如何应用 DevOps 技术,轻松管理 Apache Airflow 环境。我们先介绍一些常见挑战,再介绍使用基础设施即代码 (IaC) 的自动化,展示如何通过自动化来解决这些问题。学习内容:
- 扩展 Apache Airflow 时的常见挑战和解决办法
- 如何使用 Amazon CDK 自动预配 Apache Airflow 基础设施
- 如何自动部署工作流和支持资源
与许多开源技术一样,您可以通过多种方式配置和部署 Apache Airflow。有人认为应该自行管理 Apache Airflow。也有人认为,使用适用于 Apache Airflow 的托管工作流 (MWAA) 等托管服务有利于降低运行 Apache Airflow 的复杂度和操作负担,也有助于更多构建者入门。
您应该花点时间来了解您在扩展 Apache Airflow 的过程中可能面临的挑战。那么有哪些挑战呢?
- Apache Airflow 这项技术用起来相当复杂,它有许多可配置项。您是否愿意或有能力承担这项管理工作?
- Apache Airflow 社区的创新是持续不断的,您的数据工程师希望快速利用最新更新。您能以多快的速度发布更新和更改来满足他们的需求?
- 您如何保证开发人员体验最佳,尽可能减少将工作流部署到生产环境时出现的问题?
- 考虑到安全性应从一开始就纳入考虑,您会如何划分职责,并确保开发人员需要访问的敏感信息的数量降至最低?
- 将工作流部署到生产环境可能会导致 Apache Airflow 故障,如何最大程度地避免这种情况?
- 新的 Python 库不断发布,新的数据工具也在不断变化。您要如何在 Apache Airflow 环境中使用它们?
首先要决定是使用托管的还是自行管理的 Apache Airflow 环境。一般而言,这个决定受特定业务或使用场景的多种因素影响。这些因素包括:
- 您是否需要提高访问级别,即对 Apache Airflow 的配置进行更高级别的控制
- 您是否需要使用 Apache Airflow 的最新版本或功能
- 您是否需要运行资源需求较高而托管服务恰能满足这种需求的工作流(例如需要大量算力)
总拥有成本:在评估托管与自行管理时,需要对比考虑托管服务的成本与您自己进行同样操作的总成本。两方面都要考虑其真正的总成本。经常有人只比较实际的算力和存储资源,却不考虑手动实现的时候多做的那些操作。
如果您对这些问题的答案是肯定的,那么您使用托管服务时可能不会满意。
教程指南
本教程将介绍如何自动预配托管和自行管理的 Apache Airflow 环境,然后介绍一些有助于改善开发人员体验、更轻松地将工作流投入生产的产品和服务选项。
首先将介绍如何使用适用于 Apache Airflow 的 Amazon 托管工作流 (MWAA) 实现托管 Apache Airflow 环境的自动化。我们将了解如何使用 Amazon Cloud Development Kit (Amazon CDK) 自动预配基础设施。然后,教程将展示如何构建自动部署工作流代码的管道。最后是一个端到端示例,示例使用 GitOps 方法通过 Git 存储库管理基础设施和工作流。
如果您希望通过自行管理的 Apache Airflow 实现上述目标,请关注下一个教程。下一个教程将探讨可用的产品和服务选项,然后详细介绍并构建 GitOps 方法来运行自行管理的 Apache Airflow 环境。
自动化 Apache Airflow 环境的托管工作流 (MWAA)
概览
MWAA 是一项全托管服务,允许您部署 Apache Airflow 的上游版本。本节展示如何使用基础设施即代码 (IaC) 部署 MWAA 环境。我们将使用 Amazon CDK 作为首选的基础设施即代码工具。最终结果将是您在亚马逊云科技上完成构建如下所示的 Apache Airflow 环境:

在部署 MWAA 环境时,了解所需的关键组件会很有帮助,这有助于确定需要自动化的内容。部署 MWAA 时,必须:
- 创建部署 MWAA 资源的 VPC(请参阅上面的架构图)
- 确保为 Airflow DAGs 文件夹定义的 S3 存储桶名称唯一
- 确定是否要将 Airflow 连接和变量与 Amazon Secrets Manager 集成
- 创建 MWAA 环境
CDK 堆栈
我们将使用 Amazon CDK 来自动部署和配置 MWAA 环境。由于 Apache Airflow 是面向 Python 开发人员的工具,因此我们将在 Python 中开发此“堆栈”(CDK 术语,指构建亚马逊云科技资源的应用程序)。
该代码位于支持存储库中。
查看代码时,您会注意到堆栈中有许多文件和资源:
├── app.py
├── cdk.json
├── mwaa_cdk
│ ├── mwaa_cdk_dev_env.py
│ └── mwaa_cdk_vpc.py
└── requirements.txt
堆栈包含许多关键元素,我们将详细探讨这些元素。我们希望可以根据不同的要求创建可重用的代码,因此我们将定义用于实现重用的配置参数,这样就可以使用代码创建多个环境。
app.py 文件是 CDK 应用的入口点,它定义将要部署的内容。您将看到我们定义了要部署的亚马逊云科技环境和区域,以及一些特定于 MWAA 的参数:
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: MIT-0
#!/usr/bin/env python3
import aws_cdk as cdk
from mwaa_cdk.mwaa_cdk_vpc import MwaaCdkStackVPC
from mwaa_cdk.mwaa_cdk_dev_env import MwaaCdkStackDevEnv
env_EU=cdk.Environment(region="{AWSREGION}", account="{YOURAWSACCNT")
mwaa_props = {'dagss3location': 'mwaa-094459-devops-demo','mwaa_env' : 'mwaa-devops-demo', 'mwaa_secrets_var':'airflow/variables', 'mwaa_secrets_conn':'airflow/connections'}
app = cdk.App()
mwaa_devopswld_vpc = MwaaCdkStackVPC(
scope=app,
id="mwaa-devops-vpc",
env=env_EU,
mwaa_props=mwaa_props
)
mwaa_devopswld_env_dev = MwaaCdkStackDevEnv(
scope=app,
id="mwaa-devops-dev-environment",
vpc=mwaa_devopswld_vpc.vpc,
env=env_EU,
mwaa_props=mwaa_props
)
app.synth()
在此堆栈中定义的参数是:
- dagss3location - 这是 MWAA 用于存储 Airflow DAGs 的 Amazon S3 存储桶。您需要确保存储桶名称唯一,否则堆栈将运行失败
- mwaa_env - MWAA 环境的名称(将显示在亚马逊云科技控制台和所有 CLI 交互中)
接下来的两个参数将在本教程的下一章节出现,现在不必操心。
- mwaa_secrets_var - 这是用于与 Amazon Secrets Manager for Airflow Variables 集成的前缀
- mwaa_secrets_conn - 这个也是前缀,和前一个类似,但它用于 Airflow Connections。
有两个用于创建资源的堆栈。MwaaCdkStackVPC 用于创建部署 MWAA 的 VPC 资源。MwaaCdkStackDevEnv 用于创建 MWAA 环境。MwaaCdkStackDevEnv 依赖于 VPC 资源,因此我们将首先部署此堆栈。我们探究一下代码:
from aws_cdk import (
aws_ec2 as ec2,
Stack,
CfnOutput
)
from constructs import Construct
class MwaaCdkStackVPC(Stack):
def __init__(self, scope: Construct, id: str, mwaa_props, **kwargs) ->None:
super().__init__(scope, id, **kwargs)
# Create VPC network
self.vpc = ec2.Vpc(
self,
id="MWAA-DevOpsDemo-ApacheAirflow-VPC",
ip_addresses=ec2.IpAddresses.cidr("10.192.0.0/16"),
max_azs=2,
nat_gateways=1,
subnet_configuration=[
ec2.SubnetConfiguration(
name="public", cidr_mask=24,
reserved=False, subnet_type=ec2.SubnetType.PUBLIC),
ec2.SubnetConfiguration(
name="private", cidr_mask=24,
reserved=False, subnet_type=ec2.SubnetType.PRIVATE_WITH_EGRESS)
],
enable_dns_hostnames=True,
enable_dns_support=True
)
CfnOutput(
self,
id="VPCId",
value=self.vpc.vpc_id,
description="VPC ID",
export_name=f"{self.region}:{self.account}:{self.stack_name}:vpc-id"
)
运行以下命令,部署 VPC:
cdk deploy mwaa-devops-vpc
它很快就会开始部署。完成部署需要 5-10 分钟。
✨ Synthesis time: 8.48s
mwaa-devops-vpc: building assets...
[0%] start: Building 79937fb364dbc1582cf6fdc5a8512d7b7e651a833c551d330d09d80a1bdd7820:704533066374-eu-west-2
[100%] success: Built 79937fb364dbc1582cf6fdc5a8512d7b7e651a833c551d330d09d80a1bdd7820:704533066374-eu-west-2
mwaa-devops-vpc: assets built
mwaa-devops-vpc: deploying... [1/1]
[0%] start: Publishing 79937fb364dbc1582cf6fdc5a8512d7b7e651a833c551d330d09d80a1bdd7820:704533066374-eu-west-2
[100%] success: Published 79937fb364dbc1582cf6fdc5a8512d7b7e651a833c551d330d09d80a1bdd7820:704533066374-eu-west-2
mwaa-devops-vpc: creating CloudFormation changeset...
[██▌·······················································] (1/23)
12:55:43 PM | CREATE_IN_PROGRESS | AWS::CloudFormation::Stack | mwaa-devops-vpc
12:55:48 PM | CREATE_IN_PROGRESS | AWS::EC2::EIP | MWAA-DevOpsDemo-Ap.../publicSubnet1/EIP
12:55:48 PM | CREATE_IN_PROGRESS | AWS::EC2::InternetGateway | MWAA-DevOpsDemo-ApacheAirflow-VPC/IGW
12:55:49 PM | CREATE_IN_PROGRESS | AWS::EC2::VPC | MWAA-DevOpsDemo-ApacheAirflow-VPC
✅ mwaa-devops-vpc
✨ Deployment time: 203.17s
Outputs:
mwaa-devops-vpc.ExportsOutputRefMWAADevOpsDemoApacheAirflowVPC6BCB1144F03B298E = vpc-0fe5d94126e565558
mwaa-devops-vpc.ExportsOutputRefMWAADevOpsDemoApacheAirflowVPCprivateSubnet1Subnet098AE99142C9E55B = subnet-0de1bb9b17a9be64d
mwaa-devops-vpc.ExportsOutputRefMWAADevOpsDemoApacheAirflowVPCprivateSubnet2Subnet49E71BD9F7E4C9D0 = subnet-0faae51f83750269e
mwaa-devops-vpc.VPCId = vpc-0fe5d94126e565558
Stack ARN:
arn:aws:cloudformation:eu-west-2:704533066374:stack/mwaa-devops-vpc/85e737d0-bdb0-11ed-929b-060b1defad6c
✨ Total time: 211.65s
现在可以查看 MwaaCdkStackDevEnv 堆栈,该堆栈将 MWAA 环境创建到刚刚创建的 VPC 中。该代码附有详尽的文档,帮助您了解其工作原理并按需进行自定义。注意我们使用 f"{mwaa_props['dagss3location'] 引入了在 app.py 中定义的参数,因此如果您想添加其他配置参数,可以按需调整和更改此代码。
文件的开头创建了用于存储工作流的 S3 存储桶(Airflow DAGs 文件夹),并为其添加 tag(标签)。
注意:此代码创建一个 S3 存储桶,其名称是配置中的参数后加一个 -dev,因此使用示例代码创建的 S3 存储桶的名称是 mwaa-094459-devops-demo-dev。
# Create MWAA S3 Bucket and upload local dags
s3_tags = {
'env': f"{mwaa_props['mwaa_env']}-dev",
'service': 'MWAA Apache AirFlow'
}
dags_bucket = s3.Bucket(
self,
"mwaa-dags",
bucket_name=f"{mwaa_props['dagss3location'].lower()}-dev",
versioned=True,
block_public_access=s3.BlockPublicAccess.BLOCK_ALL
)
for tag in s3_tags:
Tags.of(dags_bucket).add(tag, s3_tags[tag])
dags_bucket_arn = dags_bucket.bucket_arn
文件的下一部分创建 MWAA 运行所需的各种 IAM 策略。该操作使用我们定义的参数,并仅提供所需的最低权限。因代码篇幅较长,此处不再赘述,您可以到代码存储库查看。
文件的下一部分也与安全相关,它为各种 MWAA 服务配置安全组,方便它们相互通信。
文件的下一部分定义了在创建 MWAA 环境时要使用的日志记录。日志关系到成本,因此请考虑适合您的特定使用场景的日志记录级别。您可以通过查看此处的文档来了解有关可以使用的不同日志记录级别的更多信息。
# **OPTIONAL** Configure specific MWAA settings - you can externalise these if you want
logging_configuration = mwaa.CfnEnvironment.LoggingConfigurationProperty(
dag_processing_logs=mwaa.CfnEnvironment.ModuleLoggingConfigurationProperty(
enabled=True,
log_level="INFO"
),
task_logs=mwaa.CfnEnvironment.ModuleLoggingConfigurationProperty(
enabled=True,
log_level="INFO"
),
worker_logs=mwaa.CfnEnvironment.ModuleLoggingConfigurationProperty(
enabled=True,
log_level="INFO"
),
scheduler_logs=mwaa.CfnEnvironment.ModuleLoggingConfigurationProperty(
enabled=True,
log_level="INFO"
),
webserver_logs=mwaa.CfnEnvironment.ModuleLoggingConfigurationProperty(
enabled=True,
log_level="INFO"
)
)
文件的下一部分中,我们可以自定义一些 Apache Airflow 配置选项。选项的文档都在 MWAA 网站上,但其中一部分我放在这里,方便您入门并按需定制。
options = {
'core.load_default_connections': False,
'core.load_examples': False,
'webserver.dag_default_view': 'tree',
'webserver.dag_orientation': 'TB'
}
tags = {
'env': f"{mwaa_props['mwaa_env']}-dev",
'service': 'MWAA Apache AirFlow'
}
文件的下一部分创建 KMS 密钥和支持 IAM 策略,以确保 MWAA 加密所有内容。
文件的最后一部分使用前面几节创建的所有对象来创建 MWAA 环境。您可以按需定制。
# Create MWAA environment using all the info above
managed_airflow = mwaa.CfnEnvironment(
scope=self,
id='airflow-test-environment-dev',
name=f"{mwaa_props['mwaa_env']}-dev",
airflow_configuration_options={'core.default_timezone': 'utc'},
airflow_version='2.4.3',
dag_s3_path="dags",
environment_class='mw1.small',
execution_role_arn=mwaa_service_role.role_arn,
kms_key=key.key_arn,
logging_configuration=logging_configuration,
max_workers=5,
network_configuration=network_configuration,
#plugins_s3_object_version=None,
#plugins_s3_path=None,
#requirements_s3_object_version=None,
#requirements_s3_path=None,
source_bucket_arn=dags_bucket_arn,
webserver_access_mode='PUBLIC_ONLY',
#weekly_maintenance_window_start=None
)
managed_airflow.add_override('Properties.AirflowConfigurationOptions', options)
managed_airflow.add_override('Properties.Tags', tags)
在继续操作之前,您应确保您选择的 S3 存储桶名称唯一且尚未创建。如果 CDK 部署失败,问题通常就在这里。
现在使用以下命令部署 MWAA:
cdk deploy mwaa-devops-dev-environment
这一次,系统将提示您继续。这是因为您是在创建 IAM 策略和其他安全配置,CDK 希望您确保在继续操作之前查看这些配置。检查后,回答 Y 即可开始部署
(NOTE: There may be security-related changes not in this list. See https://github.com/aws/aws-cdk/issues/1299)
Do you wish to deploy these changes (y/n)? y
mwaa-devops-dev-environment: deploying... [2/2]
[0%] start: Publishing 3474e40ce2d4289e489135153d5803eeddfaf5690820166aa763fe36af91ff54:704533066374-eu-west-2
[0%] start: Publishing 2bc265c5e0569aeb24a6349c15bd54e76e845892376515e036627ab0cc70bb64:704533066374-eu-west-2
[0%] start: Publishing 91ab667f7c88c3b87cf958b7ef4158ef85fb9ba8bd198e5e0e901bb7f904d560:704533066374-eu-west-2
[0%] start: Publishing 928e66c0f69701e1edd93d9283845506b7ca627455684b4d91a8a96f13e187d0:704533066374-eu-west-2
[25%] success: Published 2bc265c5e0569aeb24a6349c15bd54e76e845892376515e036627ab0cc70bb64:704533066374-eu-west-2
[50%] success: Published 91ab667f7c88c3b87cf958b7ef4158ef85fb9ba8bd198e5e0e901bb7f904d560:704533066374-eu-west-2
[75%] success: Published 928e66c0f69701e1edd93d9283845506b7ca627455684b4d91a8a96f13e187d0:704533066374-eu-west-2
[100%] success: Published 3474e40ce2d4289e489135153d5803eeddfaf5690820166aa763fe36af91ff54:704533066374-eu-west-2
mwaa-devops-dev-environment: creating CloudFormation changeset...
[████▏·····················································] (1/14)
1:13:16 PM | CREATE_IN_PROGRESS | AWS::CloudFormation::Stack | mwaa-devops-dev-environment
这将需要 25-30 分钟,您可以休息一下。完成后,您应该会看到类似如下的输出:
✅ mwaa-devops-dev-environment
✨ Deployment time: 1659.28s
Outputs:
mwaa-devops-dev-environment.MWAASecurityGroupdev = sg-0f5553f9c1f37d0fe
Stack ARN:
arn:aws:cloudformation:eu-west-2:704533066374:stack/mwaa-devops-dev-environment/88d05bf0-bdda-11ed-b72d-0227f9640b62
✨ Total time: 1670.12s
检查环境
恭喜,您已经完成了 MWAA 环境部署的自动化。但这只是个开始,还有别的步骤要自动化,接下来介绍这些步骤。在此之前,先检查安装情况,确保一切正常。
转到 MWAA 控制台,您应该可以看到列出的新环境,以及指向基于 Web 的 Apache Airflow UI 的链接。

也可以在命令行中用以下命令获取这个链接:
aws mwaa get-environment --name {name of the environment created} --region={region} | jq -r '.Environment | .WebserverUrl'
该命令输出指向 Apache Airflow UI 的链接:
89ba6225-846e-43e6-8abc-53f43d8ccdc1.c2.eu-west-2.airflow.amazonaws.com
将其输入浏览器就可以看到 Apache Airflow UI。

回顾 MWAA 的托管工作流自动化和后续步骤
本教程的第一章节探讨了如何使用 Amazon CDK 创建可配置的堆栈,该堆栈允许我们通过 Apache Airflow 托管服务的托管工作流 (MWAA) 部署 Apache Airflow 环境。在下一节中,将在此基础上研究怎样自动化 Apache Airflow 的另一个重要部分:Connection(连接)和 Variable(变量)。
自动化连接和变量
Apache Airflow 允许您将数据存储在其元存储中,您在编写工作流时可以依赖这些数据。这样,您就可以参数化代码并创建更多可重用的工作流。Airflow 提供两种方式帮助您实现这一点:通过变量 (Variable) 和连接 (Connection) 来存储信息。变量 (Variable) 是可以在 Airflow 代码中引用的键值对。连接 (Connection) 用于操作符 (Operator),旨在抽离连接和认证细节,这使得您可以区别系统管理员与安全人员所掌握的信息(诸如密码等连接到各种服务所需的敏感信息,这是不应该公开的)和开发人员需要了解的信息(即连接 ID)。无论是变量还是连接,在 Airflow 元存储中都是经过加密的。
阅读更多 您可以查看这篇详细的博文,更深入地了解这个主题。
Apache Airflow UI 提供了一种存储变量和连接详细信息的方法,但您希望最好能直接用预配基础设施的方式来预配这些变量和连接详细信息。我们可以将 MWAA 集成至 Amazon Secrets Manager,这意味着我们可以用管理 MWAA 的同一套工具来统一管理所有的变量以及连接信息。具体方式为首先定义一个前缀,并使用该前缀将变量和连接存储在 Amazon Secret Manager 中,最后用定义的前缀将 Amazon Secrets Manager 集成到 Airflow 中。这样一来,在 Airflow 查找变量和连接信息时,就会对 Amazon Secrets Manager 进行查询操作。
集成 Amazon Secret Manager
首先必须启用集成。我们需要设置两个 Airflow 配置设置。调整原来的 CDK 代码,添加以下内容。注意,我们用的是在 app.py 中定义的配置参数,以便轻松设置所需前缀。我们不想在代码中固定连接和变量的前缀,因此在 app.py 文件中定义了一些额外的配置参数,这些参数将使用 airflow/variables 和 airflow/connections 作为 MWAA 中的集成点:
'secrets.backend' : 'airflow.providers.amazon.aws.secrets.secrets_manager.SecretsManagerBackend',
'secrets.backend_kwargs' : { "connections_prefix" : f"{mwaa_props['mwaa_secrets_conn']}" , "variables_prefix" : f"{mwaa_props['mwaa_secrets_var']}" } ,
代码现在应如下所示:
options = {
'core.load_default_connections': False,
'core.load_examples': False,
'webserver.dag_default_view': 'tree',
'webserver.dag_orientation': 'TB',
'secrets.backend' : 'airflow.providers.amazon.aws.secrets.secrets_manager.SecretsManagerBackend',
'secrets.backend_kwargs' : { "connections_prefix" : f"{mwaa_props['mwaa_secrets_conn']}" , "variables_prefix" : f"{mwaa_props['mwaa_secrets_var']}" } ,
}
具体原理要定义 MWAA 可以使用的变量或连接,请用您定义的前缀,在 Amazon Secrets Manager 中创建这些变量或连接。上例中,二者分别设置为 airflow/variables 和 airflow/connections。如果创建一个名为 airflow/variable/foo 的新密钥,那么在 Airflow 工作流中,就可以在 Airflow 代码中使用 Variable.get 函数,以 foo 为名称来引用这个变量。
深入了解 阅读 John Jackson 的博文,其中更详细地介绍了此功能 -> 将您的 Apache Airflow 连接和变量移至 Amazon Secrets Manager
如果我们更新和重新部署 CDK 应用程序,一旦 MWAA 完成更新,集成将会尝试访问 Amazon Secrets 以获取此信息。但该操作会失败,因为 MWAA 环境访问 Amazon Secrets Manager 中密钥的功能尚未启用,因此我们需要修改 CDK 应用程序,添加一些其他权限:
mwaa_secrets_policy_document = iam.Policy(self, "MWAASecrets",
statements=[
iam.PolicyStatement(
actions=[
"secretsmanager:GetResourcePolicy",
"secretsmanager:GetSecretValue",
"secretsmanager:DescribeSecret",
"secretsmanager:ListSecretVersionIds",
"secretsmanager:ListSecrets"
],
effect=iam.Effect.ALLOW,
resources=[
f"arn:aws:secretsmanager:{self.region}:{self.account}:secret:{mwaa_props['mwaa_secrets_var']}*",
f"arn:aws:secretsmanager:{self.region}:{self.account}:secret:{mwaa_props['mwaa_secrets_conn']}*",
],
),
]
)
mwaa_service_role.attach_inline_policy(mwaa_secrets_policy_document)
运行以下命令,更新环境:
cdk deploy mwaa-devops-dev-environment
在系统提示检查安全更改后,CDK 将进行更改,MWAA 将进行更新。这将需要 20-25 分钟,您可以再休息一会。完成后,您应该看到类似以下内容:
mwaa-devops-dev-environment: creating CloudFormation changeset...
✅ mwaa-devops-dev-environment
✨ Deployment time: 873.73s
Outputs:
mwaa-devops-dev-environment.MWAASecurityGroupdev = sg-0f5553f9c1f37d0fe
Stack ARN:
arn:aws:cloudformation:eu-west-2:704533066374:stack/mwaa-devops-dev-environment/88d05bf0-bdda-11ed-b72d-0227f9640b62
✨ Total time: 883.92s
测试变量
在 Amazon Secrets Manager 中创建一些变量和连接,然后创建一个示例工作流来查看显示的值,即可测试变量是否正常工作。
我们先创建一个新密钥。注意要将密钥存储在部署 MWAA 环境所在的亚马逊云科技区域中。我们可以使用 Amazon CLI 在命令行中执行此操作。
aws secretsmanager create-secret --name airflow/variables/buildon --description "Build on AWS message" --secret-string "rocks!" --region={your region}"
提示:如果您想在部署 MWAA 环境时提供一组标准变量或连接,更新 CDK 应用程序并使用 Amazon Secrets 构造即可添加这些变量或连接。但是,您一定要了解,采取这样的操作会使这些数值公开,因此不要分享您重要的“秘密”。最好不要在预配环境时部署和配置密钥,防止其以明文可见的形式存储。
现在可以创建一个工作流来测试是否可以看到此值。
from datetime import datetime
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.models import Variable
message = Variable.get("buildon", default_var="could not find variable on secret manager")
def print_hello():
print(message)
return 'Hello Wolrd'
dag = DAG('hello_world_schedule', description='Hello world example', schedule_interval='0 12 * * *', start_date=datetime(2017, 3, 20), catchup=False)
dummy_operator = DummyOperator(task_id='dummy_task', retries = 3, dag=dag)
hello_operator = PythonOperator(task_id='hello_task', python_callable=print_hello, dag=dag)
dummy_operator >> hello_operator
注意,我们使用了标准的 Airflow 变量处理方式(from airflow.models import Variable)然后在工作流中创建了一个新变量来获取在 Amazon Secrets Manager 中定义的变量(/airflow/variables/buildon),但我们仅将其简单引用为 buildon。为了应对上述操作失败的情况,我们还设置了一个默认值,这在排查相关问题时会很有帮助。
message = Variable.get("buildon", default_var="could not find variable on secret manager")
将此工作流复制到 MWAA DAGs 文件夹 S3 存储桶来部署此工作流,几分钟后,您可以启用并触发此工作流。日志输出结果应如下所示:
[2023-03-09, 08:43:26 UTC] {{taskinstance.py:1383}} INFO - Executing <Task(PythonOperator): hello_task> on 2023-03-09 08:43:18.798898+00:00
[2023-03-09, 08:43:26 UTC] {{standard_task_runner.py:55}} INFO - Started process 1378 to run task
[2023-03-09, 08:43:26 UTC] {{standard_task_runner.py:82}} INFO - Running: ['airflow', 'tasks', 'run', 'hello_world_schedule', 'hello_task', 'manual__2023-03-09T08:43:18.798898+00:00', '--job-id', '5', '--raw', '--subdir', 'DAGS_FOLDER/sample-cdk-dag.py', '--cfg-path', '/tmp/tmpxpdth0nh']
[2023-03-09, 08:43:26 UTC] {{standard_task_runner.py:83}} INFO - Job 5: Subtask hello_task
[2023-03-09, 08:43:27 UTC] {{task_command.py:376}} INFO - Running <TaskInstance: hello_world_schedule.hello_task manual__2023-03-09T08:43:18.798898+00:00 [running]> on host ip-10-192-2-6.eu-west-2.compute.internal
[2023-03-09, 08:43:27 UTC] {{taskinstance.py:1590}} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=hello_world_schedule
AIRFLOW_CTX_TASK_ID=hello_task
AIRFLOW_CTX_EXECUTION_DATE=2023-03-09T08:43:18.798898+00:00
AIRFLOW_CTX_TRY_NUMBER=1
AIRFLOW_CTX_DAG_RUN_ID=manual__2023-03-09T08:43:18.798898+00:00
[2023-03-09, 08:43:27 UTC] {{logging_mixin.py:137}} INFO - rocks!
[2023-03-09, 08:43:27 UTC] {{python.py:177}} INFO - Done. Returned value was: Hello Wolrd
[2023-03-09, 08:43:27 UTC] {{taskinstance.py:1401}} INFO - Marking task as SUCCESS. dag_id=hello_world_schedule, task_id=hello_task, execution_date=20230309T084318, start_date=20230309T084326, end_date=20230309T084327
[2023-03-09, 08:43:27 UTC] {{local_task_job.py:159}} INFO - Task exited with return code 0
连接
一个比较普遍的问题是关于如何处理存储在 Amazon Secrets Manager 中的连接信息。那么,现在让我们来看一下这个问题。假设要创建与 Amazon Redshift 集群的连接。在 Apache Airflow UI 中,我们通常进行如下配置:

我们会将其存储在 Amazon Secrets Manager 中,如下所示:
aws secretsmanager create-secret \
--name airflow/connections/redshift_default \
--description "Connect to Amazon Redshift Cluster BuildON" \
--secret-string "Postgres://awsuser:XXXXX@airflow-summit.cq7hpqttbcoc.eu-west-1.redshift.amazonaws.com:5439/mwaa" \
--region={your region}"
当您在 Apache Airflow 中将 redshift_default 引用为一个连接时,它将使用这些值。某些连接需要在 Extra(额外)字段中添加信息,那如何添加呢?如果连接需要 Extra 字段信息,就以 ?{parameter}={value}&{parameter}={value} 这样的格式来附加额外信息。把这个格式应用到上边的内容,即可创建如下所示的密钥:
aws secretsmanager create-secret \
--name airflow/connections/redshift_default \
--description "Connect to Amazon Redshift Cluster BuildON" \
--secret-string "Postgres://awsuser:XXXXX@airflow-summit.cq7hpqttbcoc.eu-west-1.redshift.amazonaws.com:5439/mwaa?param1=value1¶m2=value2" \
--region={your region}"
高级功能
亚马逊云科技集成,也即与 Amazon Secrets Manager 的集成,是由 Apache Airflow Amazon Provider 软件包提供的。此软件包定期更新,并提供所有不同种类的 Airflow Operator,使您能够将其与亚马逊云科技服务集成。如果您使用的是较新版本的 Amazon Provider 软件包(版本 7.3 或更高版本),则可以在配置 Amazon Secrets Manager 时执行一些其他操作,例如:
- 选择是同时使用变量和连接,还是仅使用其中之一;
- 此外,您还可以指定正则表达式,将(存储在 Airflow 元存储中的)原生 Airflow 变量、连接与 Amazon Secrets Manager 相结合
在以下示例中,Airflow 只会在 Amazon Secrets Manager 中查找定义为 aws-* 的连接,例如 aws-redshift,或 aws-athena。
[secrets]
backend = airflow.providers.amazon.aws.secrets.secrets_manager.SecretsManagerBackend
backend_kwargs = {
"connections_prefix": "airflow/connections",
"connections_lookup_pattern": "^aws-*",
"profile_name": "default"
}
您可以在 Apache Airflow 的 Amazon Secrets Manager 后端文档页面中查看完整详细信息
自动化连接和变量的回顾以及后续步骤
本教程的这一章节研究了如何在 Apache Airflow 中自动化变量和连接,以及它们如何帮助我们创建可重用的工作流。本教程的下一章节将讲解如何构建将工作流交付到 Apache Airflow 环境中的自动化管道。
构建工作流部署管道
我们使用 Amazon CDK 自动预配了 MWAA 环境,也已经启动并运行了 Apache Airflow。在本教程的下一章节中,我们学习如何自动将工作流部署到这些环境。在此之前,先快速回顾一下 MWAA 如何加载其工作流和支持资源。
MWAA 使用 Amazon S3 存储桶作为 DAGs 文件夹。另外,配置中还有一个指定附加库的值,这个值指向上传到 S3 存储桶的 requirements.txt 文件的特定版本。最后,如果要部署自定义 Airflow 插件,那么还需要将这些插件部署到 S3 存储桶,再更新 MWAA 配置。
我们先创建一个简单的管道,该管道从 Amazon CodeCommit 上托管的 Git 存储库中获取工作流,然后自动将其部署到 MWAA 环境中。
创建管道
接下来自动预配管道和所有支持资源。开始之前,考虑一下我们具体需要些什么。为了创建一个将工作流部署到 MWAA 环境中的自动化管道,我们:
- 需要有一个源代码存储库,开发人员在这个存储库中提交其最终工作流代码
- 在存储库中检测到新代码时,要运行一定的测试
- 如果工作流代码通过了所有测试,我们可能希望在将其推送至 MWAA 环境之前获得最终审查或批准
- 最后一步是管道将工作流传送到 MWAA DAGs 文件夹中
我们会将其分解为多个步骤,方便对照说明操作。查看代码存储库时可以看到里面有一些 CDK 代码,这些代码会用于预配支持基础设施,但我们也有源 DAGs,用于初始化 MWAA 环境。
├── app.py
├── cdk.json
├── mwaa_pipeline
│ ├── MWAAPipeline.py
├── repo
│ └── dags
│ ├── first_bash_copy_file.py
│ └── new_workflow.py
└── requirements.txt
CDK 应用程序非常简单,包含定义要使用的配置值的初始入口点文件,以及用于构建管道基础设施的代码(MWAAPipeline)。来看 app.py:
import aws_cdk as cdk
from mwaa_pipeline.MWAAPipeline import MWAAPipeline
env=cdk.Environment(region="AWSREGION", account="AWSACCOUNT")
airflow_props = {'code_repo_name': 'mwaa-dags','branch_name' : 'main', 'dags_s3_bucket_name' : 'mwaa-094459-devops-demo-dev'}
app = cdk.App()
mwaa_cicd_pipeline = MWAAPipeline(
scope=app,
id="mwaa-pipeline",
env=env,
airflow_props=airflow_props
)
app.synth()
可以看到我们定义了以下内容:
- code_repo_name 和 branch_name,用于创建 Amazon CodeCommit 存储库
- dags_s3_bucket_name 是 MWAA 环境的 DAGs 文件夹名称
我们在堆栈本身 (MWAAPipeline) 中创建 CodeCommit 存储库并配置 CodePipeline 和 CodeBuild 步骤。从这段代码中可以看到,我们首先为 DAGs 创建代码存储库。
# Setup CodeCommit repo and copy initial files
dags_bucket = f"{airflow_props['dags_s3_bucket_name']}"
dags_bucket_arn = s3.Bucket.from_bucket_name(self, "GetDAGSbucket", f"{airflow_props['dags_s3_bucket_name']}").bucket_arn
repo = codecommit.Repository(
self,
"Repository",
repository_name=f"{airflow_props['code_repo_name']}",
code=codecommit.Code.from_directory("repo",f"{airflow_props['branch_name']}")
)
cdk.CfnOutput(self, "Repository_Name", value=repo.repository_name)
我们定义了一个 CodeBuild 任务,用于将 DAGs 部署到部署 MWAA 环境时创建的 S3 DAGs 文件夹。我们为将在 CodeBuild 运行程序中创建的 S3 存储桶定义一个名为 $BUCKET_NAME 的环境变量,其目的在于重用此管道。
deploy = codebuild.Project(
self,
"DAGS_Deploy",
source=codebuild.Source.code_commit(repository=repo, branch_or_ref=f"{airflow_props['branch_name']}"),
environment=codebuild.BuildEnvironment(compute_type=codebuild.ComputeType.SMALL, privileged=True, build_image=codebuild.LinuxBuildImage.AMAZON_LINUX_2_4),
environment_variables={"BUCKET_NAME": codebuild.BuildEnvironmentVariable(value=dags_bucket)},
build_spec=codebuild.BuildSpec.from_object(
dict(
version="0.2",
phases={
"pre_build": {"commands": ["aws --version", "echo $BUCKET_NAME"]},
"build": {"commands": ["cd dags", "aws s3 sync . s3://$BUCKET_NAME/dags/ --delete"]},
},
)
),
)
注意,我们只是使用 Amazon CLI 将文件从签出存储库同步到目标 S3 存储桶。如果您按上述步骤不加更改地尝试此操作,它将失败,这是因为 CodeBuild 需要权限。添加权限也很容易。我们将访问级别范围缩小到只包含 DAGs 存储桶。
deploy.add_to_role_policy(iam.PolicyStatement(
actions=["s3:*"],
effect=iam.Effect.ALLOW,
resources=[ f"{dags_bucket_arn}", f"{dags_bucket_arn}/*" ],)
)
代码的最后一部分是管道的不同 stage(阶段)。这是一个非常简单的管道,它只有两个阶段:
source_output = codepipeline.Artifact()
deploy_output = codepipeline.Artifact("AirflowImageBuild")
pipeline = codepipeline.Pipeline(
self,
"MWAA_Pipeline"
)
source_action = codepipeline_actions.CodeCommitSourceAction(
action_name="CodeCommit",
repository=repo,
output=source_output,
branch=f"{airflow_props['branch_name']}"
)
build_action = codepipeline_actions.CodeBuildAction(
action_name="Build_action",
input=source_output,
project=deploy,
outputs=[deploy_output]
)
pipeline.add_stage(
stage_name="Source",
actions=[source_action]
)
pipeline.add_stage(
stage_name="Deployment",
actions=[build_action]
)
使用以下命令部署管道,在查看弹出的安全信息后回答 y。
cdk deploy mwaa-pipeline
几分钟后,您可以在 Amazon CodePipelines 控制台中签入,现在应该有一个新管道出现。新管道应当已经开始执行,并且很可能正在运行。完成后,应该有两个工作流 DAGs 出现在 Apache Airflow UI 中。(注意:MWAA 环境可能要花上 4-5 分钟才能读取到这两个 DAG 的存在。)

实施中间步骤
目前为止创建的工作流都很简单。每次提交时,构建管道都会自动将其同步到 MWAA S3 DAGs 文件夹。虽然这对简易开发环境来说可能已经足够,但理想状况下,您应该在构建管道中添加一些附加步骤。例如:
- 运行测试 - 先运行一些基本测试,再将文件部署到 S3 DAGs 文件夹,这样可以确保文件有效,并减少部署时出错的可能性
- 审批 - 在部署到生产环境之前实施额外的审批流程
我们可以增强 CDK 代码,添加其他步骤,轻松实现这些目标。
添加测试阶段
建议您实现将 DAG 部署到 S3 DAGs 文件夹前的测试阶段。本例将使用一个非常简化的测试,但实际场景中要考虑执行许多不同的测试,确保将 DAG 可靠地部署到 MWAA 环境中。
我们将使用 CodeCommit 存储库来存储运行测试所需的脚本、资源文件和二进制文件等资产。
我们可以使用之前创建的已有管道,添加一个新阶段,在其中执行一些测试。为此,添加一个新的构建步骤,在其中定义想要执行的操作:
test = codebuild.Project(
self,
"DAGS_Test",
source=codebuild.Source.code_commit(repository=repo, branch_or_ref=f"{airflow_props['branch_name']}"),
environment=codebuild.BuildEnvironment(compute_type=codebuild.ComputeType.SMALL, privileged=True, build_image=codebuild.LinuxBuildImage.AMAZON_LINUX_2_4),
environment_variables={"BUCKET_NAME": codebuild.BuildEnvironmentVariable(value=dags_bucket)},
build_spec=codebuild.BuildSpec.from_object(
dict(
version="0.2",
phases={
"pre_build": {"commands": ["aws --version", "echo $BUCKET_NAME"]},
"build": {"commands": ["echo 'Testing'"]},
},
)
),
)
然后添加阶段并修改现有阶段,如下所示:
source_output = codepipeline.Artifact()
test_output = codepipeline.Artifact()
deploy_output = codepipeline.Artifact("AirflowImageBuild")
pipeline = codepipeline.Pipeline(
self,
"MWAA_Pipeline"
)
source_action = codepipeline_actions.CodeCommitSourceAction(
action_name="CodeCommit",
repository=repo,
output=source_output,
branch=f"{airflow_props['branch_name']}"
)
test_action = codepipeline_actions.CodeBuildAction(
action_name="Test",
input=source_output,
project=test,
outputs=[test_output]
)
build_action = codepipeline_actions.CodeBuildAction(
action_name="Build_action",
input=source_output,
project=deploy,
outputs=[deploy_output]
)
pipeline.add_stage(
stage_name="Source",
actions=[source_action]
)
pipeline.add_stage(
stage_name="Testing",
actions=[test_action]
)
pipeline.add_stage(
stage_name="Deployment",
actions=[build_action]
)
重新部署 CDK 应用来更新管道:
cdk deploy mwaa-pipeline
几分钟后,新的测试阶段就会出现。本例中的测试仅仅是 echo(回显)一条 test 而已。但您在此步骤中要添加您的所有常用命令,并定义它们。您还可以在 Git 存储库中包含并使用其他资源(例如,测试工具的单元测试或配置文件)。
添加审批阶段
您可能还需要添加审批入口。添加以下代码即可轻松地将审批入口加进管道中:
approval = pipeline.add_stage(
stage_name="Approve")
manual_approval_action = codepipeline_actions.ManualApprovalAction(
action_name="Approve",
notification_topic=sns.Topic(self, "Topic"), # optional
notify_emails=["YOUR@EMAIL.ADDR"],
additional_information="additional info")
approval.add_action(manual_approval_action)
必须在部署阶段之前添加此步骤,因此在存储库的最终代码中,我们有:
pipeline.add_stage(
stage_name="Testing",
actions=[test_action]
)
approval = pipeline.add_stage(
stage_name="Approve")
manual_approval_action = codepipeline_actions.ManualApprovalAction(
action_name="Approve",
notification_topic=sns.Topic(self, "Topic"), # optional
notify_emails=["YOUR@EMAIL.ADDR"],
additional_information="additional info")
approval.add_action(manual_approval_action)
pipeline.add_stage(
stage_name="Deployment",
actions=[build_action]
)
使用 cdk deploy mwaa-pipeline 重新部署 CDK 应用程序时,您将收到一封电子邮件,确认您愿意收到刚刚设置的审批流程的通知(否则您将不会收到通知)。
You have chosen to subscribe to the topic:
arn:aws:sns:eu-west-2:704533066374:mwaa-pipeline-TopicBFC7AF6E-KUjxGcbAFog2
To confirm this subscription, click or visit the link below (If this was in error no action is necessary):
Confirm subscription
Please do not reply directly to this email. If you wish to remove yourself from receiving all future SNS subscription confirmation requests please send an email to sns-opt-out
更改工作流代码后,当管道开始运行时,您将收到一封电子邮件通知,要求您检查和批准更改。在执行此操作,也就是点击 Approval(批准)链接之前,DAG 不会部署。这是我使用此代码时收到的示例邮件:
Hello,
The following Approval action is waiting for your response:
--Pipeline Details--
Pipeline name: mwaa-pipeline-MWAAPipeline97839E8E-B58OAGC4ALOT
Stage name: Approve
Action name: Approve
Region: eu-west-2
--Approval Details--
Approve or reject: https://console.aws.amazon.com/codesuite/codepipeline/pipelines/mwaa-pipeline-MWAAPipeline97839E8E-B58OAGC4ALOT/view?region=eu-west-2#/Approve/Approve/approve/a049b46a-56ad-45af-971a-476975a657d7
Additional information: additional info
Deadline: This review request will expire on 2023-03-16T18:56Z
Sincerely,
Amazon Web Services
点击该链接将直接转到亚马逊云科技控制台,按需进行审核和批准。

批准后,管道将继续,部署步骤将更新 DAG。恭喜,您已完成 DAG 部署自动化。
高级自动化主题
目前为止,我们只是非常粗略地了解了怎样将 DevOps 原则应用于数据管道。如果您想更深入地了解,您可以探索一些其他主题,以进一步自动化和扩展您的 Apache Airflow 工作流。
参数和可重用工作流
创建可重用的工作流有助于扩展数据管道的使用方式。一种常见的技术是创建基于参数的通用工作流,从而提高这些工作流的重用率。有许多方法可以帮助您提高工作流的重用率。要了解更多信息,您可以查看在适用于 Apache Airflow 的 Amazon 托管工作流中使用参数和变量这篇博文。
使用私有 Python 库存储库
在构建工作流的过程中,您借助 Python 库来达成目标。公共库对于许多组织来说是一个隐患,他们希望控制这些库的加载位置。此外,开发团队也会创建内部库,这些库需要地方存储。构建者经常用私有存储库来解决这个问题。用 Amazon MWAA 和 Amazon CodeArtifact 管理 Python 依赖项这篇博文讲解了如何将 Amazon MWAA 与 Amazon CodeArtifact 集成来管理 Python 依赖项。
可观察性 - CloudWatch 控制面板和指标
针对适用于 Apache Airflow 的 Amazon 托管工作流自动化 Amazon CloudWatch 控制面板和告警这篇博文提供了一种解决方案,可自动检测与亚马逊云科技账户关联的、已部署的 Airflow 环境,为每个环境构建 CloudWatch 仪表板和一些有用的警报。
介绍 Amazon MWAA 环境的容器、数据库和队列利用率指标这篇博文深入探讨了指标。该文章能使您更好了解有关 Amazon MWAA 环境性能的指标,学会解决容量、延迟相关问题,并洞悉如何适当调整 Amazon MWAA 环境的大小。
构建工作流部署管道的回顾和后续步骤
本教程的这一章节讲解了如何构建自动将工作流从开发人员交付到 Apache Airflow 环境的管道。本教程的下一环节,我们将全面整合这些知识点,深入研究一个端到端的全自动化方案,同时覆盖了基础设施和工作流的自动化处理。
构建端到端管道
到目前为止,我们已经构建了一个自动化部署 MWAA 环境的方法,并实现了一个自动化部署工作流到 MWAA 环境的方式。现在我们将进一步,打造一个解决方案,这个方案将实现 GitOps,能够自动地根据 Git 存储库中的配置信息来预配和更新 MWAA 环境,同时部署您的工作流和所有关联资源(例如您在 requirements.txt 中定义的额外 Python 库,或者您想在工作流中使用的自定义插件)。
这就是我们要构建的解决方案。我们将有两个不同的 Git 存储库,供两组不同的开发人员使用。负责基础设施预配(包括支持包、Python 库等的部署)的 MWAA 管理员将使用其中一个 Git 存储库管理 MWAA 环境。Airflow 开发人员将在另外一个存储库中创建代码。两个组将使用 Git 进行交互以更新和进行更改。

这次依然使用 Amazon CDK 来实现自动化。首先,浏览此解决方案的文件。从展开的文件树中可以看到大致的文件结构。
└── cdk-amazon-mwaa-cicd
├── app.py
├── cdk.json
├── setup.py
├── mwaairflow
│ ├── __init__.py
│ ├── assets
│ │ ├── plugins
│ │ │ ├── __init__.py
│ │ │ ├── operators
│ │ │ │ ├── __init__.py
│ │ │ │ └── salesforce_to_s3_operator.py
│ │ │ └── salesforce_to_s3_plugin.py
│ │ ├── plugins.zip
│ │ └── requirements.txt
│ ├── mwaairflow_stack.py
│ ├── nested_stacks
│ │ ├── __init__.py
│ │ ├── environment.py
│ │ ├── project.py
│ │ ├── provisioning.py
│ │ └── vpc.py
│ └── project
│ ├── Makefile
│ ├── cookiecutter-config-file.yml
│ ├── dags
│ │ ├── __init__.py
│ │ └── example_dag.py
│ ├── poetry.lock
│ ├── pyproject.toml
│ ├── setup.cfg
│ ├── src
│ │ ├── __init__.py
│ │ ├── __main__.py
│ │ ├── example.py
│ │ └── py.typed
│ └── tests
│ └── test_example
│ └── test_hello.py
└── requirements.txt
setup.py 用于初始化 Python,并确保此堆栈的所有依赖项都可用。实例中需要以下内容:
"aws-cdk-lib==2.68.0",
"constructs>=10.0.0,<11.0.0",
"boto3"
CDK 应用程序的入口点是 app.py,其中定义亚马逊云科技账户和区域信息。还有一个名为 mwaairflow 的目录,其中包含许多关键目录:
- assets - 此目录包含您想要部署至 MWAA 环境的资源,其中包括一个 requirements.txt 文件。您可以利用该文件自定义选择要安装的 Python 库,然后打包并部署一个 plugin.zip,其中包含一些您可能会用到的自定义 Airflow 操作符的示例代码。本例中可以看到自定义的 Salesforce Operator
- nested_stacks - 此目录包含负责预配 VPC 基础设施、部署 MWAA 环境以及最终部署管道的 CDK 代码
- project - 此目录包含要在 DAGs 文件夹中部署的 Airflow 工作流。本例提供了一些有关 Python 代码检查和测试的额外代码,您可以改动这些代码,在部署自己的工作流之前运行一下
Makefile:在之前的管道中,我们定义了通过 Amazon CodeBuild Buildspec 文件部署工作流的机制。这次我们创建了 Makefile 文件,在其中创建了测试、验证、部署等许多不同的任务。这次部署 DAG 时,只需运行 make deploy $bucket_name=,并指定要使用的目标 S3 存储桶。
在前面的示例中,我们自动构建了 MWAA 环境,在 app.py 文件中定义了配置值。这次我们用另外一种方式来传入配置参数。在 Amazon CDK 中,您可以在执行 cdk deploy 命令时使用 -- context,传入键值对格式的配置值。
cdk deploy --context vpcId=vpcid --context envName=mwaademo {cdkstack}
- vpcId - 如果您已有满足 MWAA 要求的 VPC(例如,您可能希望在同一 VPC 中部署多个 MWAA 环境),则可以传入要部署到的 VPCId。例如,您将使用 --context vpcId=vpc-095deff9b68f4e65f。
- cidr - 如果要创建新的 VPC,可以使用此参数定义首选 CIDR 块(否则将使用默认值 172.31.0.0/16)。例如,您将使用 --context cidr=10.192.0.0/16。
- subnetIds - 是子网 ID 的逗号分隔列表,集群将在这些子网中部署。如果没有提供 ID,它将在同一可用区中查找私有子网。
- envName - 表示 MWAA 环境名称的字符串,如果不设置,则默认为 MwaaEnvironment。例如 --context envName=MyAirflowEnv。
- envTags - 允许您为 MWAA 资源设置 tag(标签),这里需要提供一个 json 表达式。例如,您将使用 --context envTags='{"Environment":"MyEnv","Application":"MyApp","Reason":"Airflow"}'。
- environmentClass - 允许您配置 MWAA Worker 大小(mw1.small、mw1.medium、mw1.large,默认为mw1.small)。例如 --context environmentClass=mw1.medium。
- maxWorkers - 更改 MWAA Max Workers 的数量,默认为 1。例如 --context maxWorkers=2。
- webserverAccessMode - 定义 MWAA 环境使用公共端点还是私有端点(使用 PUBLIC_ONLY 还是 PRIVATE_ONLY)。例如,您将使用 --context webserverAccessMode=PUBLIC_ONLY 模式(取值为 private 或 public)。
- secretsBackend - 配置是否要与 Amazon Secrets Manager 集成,值为 Airflow 或 SecretsManager。例如,您将使用 --context secretsBackend=SecretsManager。
我们可以查看 app.py 文件调用的 mwaairflow_stack 文件,了解 CDK 应用程序如何使用它。
# Try to get Stack params
self.subnet_ids_list = self.node.try_get_context("subnetIds") or ""
self.env_name = self.node.try_get_context("envName") or "MwaaEnvironment"
self.env_tags = self.node.try_get_context("envTags") or {}
self.env_class = self.node.try_get_context("environmentClass") or "mw1.small"
self.max_workers = self.node.try_get_context("maxWorkers") or 1
self.access_mode = (
self.node.try_get_context("webserverAccessMode") or "PUBLIC_ONLY"
)
self.secrets_backend = self.node.try_get_context("secretsBackend")
使用以下命令部署此堆栈:
cdk deploy \
--context cidr=10.192.0.0/16 \
--context envName=MWAAe2e \
--context envTags= '{"Environment":"MWAAe2e","Application":"MyApp","Reason":"Airflow"}' \
--context secretsBackend=SecretsManager \
--context webserverAccessMode=PUBLIC_ONLY MWAAirflowStack
这大约需要 25-30 分钟才能完成,您可以休息一下。完成后,您可以在控制台中看到一个新的 MWAA 环境。

转到 Amazon CodeCommit,可以看到我们有两个存储库:mwaa-provisioning 和 mwaaproject。
mwaa-provisioning
查看此存储库中的源文件可以看到,它们是最初用于部署环境的堆栈的副本。
├── app.py
├── cdk.context.json
├── cdk.json
├── mwaairflow
│ ├── __init__.py
│ ├── assets
│ │ ├── plugins
│ │ │ ├── __init__.py
│ │ │ ├── operators
│ │ │ │ ├── __init__.py
│ │ │ │ └── salesforce_to_s3_operator.py
│ │ │ └── salesforce_to_s3_plugin.py
│ │ ├── plugins.zip
│ │ └── requirements.txt
│ ├── mwaairflow_stack.py
│ ├── nested_stacks
│ │ ├── __init__.py
│ │ ├── environment.py
│ │ ├── project.py
│ │ ├── provisioning.py
│ │ └── vpc.py
│ └── project
│ ├── Makefile
│ ├── archive
│ │ ├── code.zip
│ │ └── docker
│ │ ├── Dockerfile
│ │ └── README.md
│ ├── code.zip
│ ├── dags
│ │ ├── __init__.py
│ │ └── example_dag.py
│ ├── poetry.lock
│ ├── pyproject.toml
│ ├── setup.cfg
│ ├── src
│ │ ├── __init__.py
│ │ ├── __main__.py
│ │ ├── example.py
│ │ └── py.typed
│ └── tests
│ └── test_example
│ └── test_hello.py
├── requirements.txt
└── setup.py
系统管理员如果想更新 MWAA 环境的配置(例如更改 Airflow 配置设置、更改 MWAA Worker 的大小或更改日志记录设置),只需要签出存储库、更改 mwaairflow_stack 文件中的代码,然后将更改推送回 Git 存储库。该操作将启动 Amazon CodePipeline 并触发重新配置。
若要更新 Python 库,或要让插件新版本在 Worker 上可用,操作也是一样的。只需要调整 assets 文件夹中的文件即可,当将其提交回 Git 存储库时,它将触发 MWAA 环境的重新配置。
在这两个示例中,您可能会触发 MWAA 环境重新启动,这取决于具体更改。在开始操作之前,请您务必了解这一点。
来看一个简单更改。通过更新 requirements.txt 来更新 Python 库是常见操作。我们将更新 MWAA 环境,以使用更高版本的 Amazon Provider 软件包。为此需要签出存储库,进行更改,然后提交回去。
git clone https://git-codecommit.eu-west-2.amazonaws.com/v1/repos/mwaa-provisioning
cd mwaa-provisioning
vi requirements.txt
更新 Amazon Provider 软件包,使它从:
apache-airflow==2.4.3
apache-airflow-providers-salesforce==5.1.0
apache-airflow-providers-amazon==6.0.0 # Old version
apache-airflow-providers-postgres==5.2.2
apache-airflow-providers-mongo==3.0.0
更改为:
apache-airflow==2.4.3
apache-airflow-providers-salesforce==5.1.0
apache-airflow-providers-amazon==7.1.0 # New version
apache-airflow-providers-postgres==5.2.2
apache-airflow-providers-mongo==3.0.0
然后将此更改推送到存储库:
git add .
git commit -m "update requirements.txt to update amazon provider to 7.1"
git push
可以看到管道已经启动:

完成后,进入 MWAA 环境可以看到有一个较新的文件,但较旧的文件仍处于活动状态。

更新 requirements.txt
您可能想知道为什么 MWAA 环境尚未设置最新的 requirements.txt。原因在于该操作将触发环境重启,而这也正是您在执行该操作之前需要注意的地方。您可以自动化这个操作,其方法是将以下内容添加到 CodeBuild 部署阶段的部署部分:
bucket_name=f"{bucket.bucket_name}"
mwaa_env=f"{env_name}"
latest=$(aws s3api list-object-versions --bucket $bucket_name --prefix requirements/requirements.txt --query 'Versions[?IsLatest].[VersionId]' --output text)
aws mwaa update-environment --name $mwaa_env --requirements-s3-object-version=$latest
提示:如果要单独运行此操作,只需将 bucket_name 和 mwaa_env 变量设为适合您环境的值。
这样就会使用最新版本的 requirements.txt 文件触发环境更新。
mwaaproject
从存储库中的源文件可以看到,它们中有可部署到 Airflow DAGs 文件夹(对于 MWAA 来说是一个 S3 存储桶)的文件。
├── Makefile
├── dags
│ ├── __init__.py
│ └── example_dag.py
├── poetry.lock
├── pyproject.toml
├── setup.cfg
├── src
│ ├── __init__.py
│ ├── __main__.py
│ ├── example.py
│ └── py.typed
└── tests
└── test_example
└── test_hello.py
在本例中,我们仅用 DAGs 文件夹来存储工作流。添加或更改工作流时,更改一旦被提交到 Git 存储库,就将触发 Amazon CodePipeline 运行 Makefile deploy(部署)任务,将 DAGs 文件夹复制到 MWAA 环境。您可以使用此工作流,也可以调整它来执行更复杂的工作流,例如开发可能在工作流中使用的支持 Python 资源。
在本例中,我们仅用 DAGs 文件夹来存储工作流。添加或更改工作流时,更改一旦被提交到 Git 存储库,就将触发 Amazon CodePipeline 运行 Makefile 部署任务,将 DAGs 文件夹复制到 MWAA 环境。您可以使用此工作流,也可以调整它来执行更复杂的工作流,例如开发可能在工作流中使用的支持 Python 资源。
我们可以通过一个添加新工作流的简单示例来直观理解。先在本地签出存储库,然后添加新的工作流文件 (demo.py),该文件位于源存储库中。
git clone https://git-codecommit.eu-west-2.amazonaws.com/v1/repos/mwaaproject
cd mwaaproject/dags
cp demo.py .
再将它提交回存储库:
git add .
git commit -m "new dag - demo.py"
git push
可以看到,这会触发 CodePipeline。


几分钟后可以看到部署成功,回到 Apache Airflow UI 可以看到新的工作流。

检查 CodeBuild 日志 若要了解环境和工作流管道运行过程的更多详细信息,可以查看 CodeBuild 运行程序的日志。
恭喜您,您完成了本教程。本教程协助您应用 DevOps 原则自动交付 MWAA 环境,简化工作流的部署方式。在结束之前,请务必清理环境,以防资源继续运行产生费用。
清理资源
使用 CDK 运行以下命令,删除所有按本博文指导创建的资源:要删除本教程第一章节中的资源,请执行
cdk destroy mwaa-pipeline
cdk destroy mwaa-devops-dev-environment
cdk destroy mwaa-devops-vpc
要删除本教程第二章节中的资源,即端到端堆栈,请执行
cdk destroy MWAAirflowStack
注意:由于无法删除 S3 存储桶,删除过程将失败。您应通过亚马逊云科技控制台删除这些存储桶(先使用 Empty 再使用 Delete),然后通过 CloudFormation 控制台手动删除堆栈。
大功告成!
在本教程中,我们了解了自动化 Apache Airflow 的一些挑战,以及如何应用 DevOps 原则来解决这些问题。我们了解了如何使用 Amazon 的 Managed Workflow for Apache Airflow (MWAA) 来实现这一点。在下一篇博文中,我们将介绍如何使用自行管理的 Apache Airflow 环境来实现这一点。