亚马逊AWS官方博客

推出 Amazon Managed Workflows for Apache Airflow (MWAA)

随着数据处理流水线容量和复杂性的增大,您可以将整个过程分解为一系列较小的任务来将其简化,然后将这些任务作为工作流的一部分来协调它们的执行。为此,许多开发人员和数据工程师使用 Apache Airflow,它是一个由社区创建的用于以编程方式编写、安排和监控工作流的平台。借助 Airflow,您可以将工作流作为脚本进行管理,通过用户界面 (UI) 对其进行监控,并使用一组强大的插件扩展其功能。但是,手动安装、维护和扩展 Airflow,以及处理用户的安全性、身份验证和授权需要花费大量您用于集中精力解决实际业务问题的时间。

出于这些原因,我很高兴地宣布将推出Amazon Managed Workflows for Apache Airflow (MWAA)。它是一项完全托管的服务,使您可以轻松地在 AWS 上运行开源版 Apache Airflow 并构建工作流来执行您的提取、转换、加载 (ETL) 作业和数据流水线。

Airflow 工作流使用 Amazon Athena 查询从 Amazon Simple Storage Service (S3) 等资源中检索输入,在 Amazon EMR 集群上执行转换,而且可以使用由此产生的数据在 Amazon SageMaker 上培训机器学习模型。使用 Python 编程语言将 Airflow 中的工作流编写为有向无环图 (DAG)

Airflow 的一个关键优势是它可以通过插件实现开放式扩展。借助这一优势,您可以创建与 AWS 或工作流所需的本地资源交互的任务,包括 AWS BatchAmazon CloudWatchAmazon DynamoDBAWS DataSyncAmazon ECS 和 、Amazon Elastic Kubernetes Service (EKS)Amazon Kinesis Firehose、、AWS LambdaAmazon RedshiftAmazon Simple Queue Service (SQS) 以及 Amazon Simple Notification Service (SNS)

为了提高可观察性,Airflow 指标可以作为 CloudWatch 指标发布,日志也可以发送到 CloudWatch Logs。默认情况下,Amazon MWAA 自动执行次要版本升级和修补程序,并提供指定执行这些升级的维护时段的选项。

您可以通过以下三个步骤使用 Amazon MWAA

  1. 创建环境 — 每个环境都包含 Airflow 集群,其中包括计划程序、工作线程和 Web 服务器。
  2. 将 DAG 和插件上传到 S3 — Amazon MWAA 可自动将代码加载到 Airflow 中。
  3. 在 Airflow 中运行 DAG — 从Airflow UI 或命令行界面 (CLI) 运行 DAG,并使用 CloudWatch 监控环境。

我们来看看这些步骤的实际操作!

如何使用 Amazon MWAA 创建 Airflow 环境
Amazon MWAA 控制台中,单击创建环境。为环境命名并选择要使用的 Airflow 版本

然后,选择 S3 存储桶和文件夹以加载 DAG 代码。存储桶名称必须以 airflow- 开头。

或者,也可以指定插件文件和要求文件:

  • 插件文件是一个 ZIP 文件,其中包含 DAG 所用的插件。
  • 要求文件描述了运行 DAG 的 Python 依赖关系。

对于插件和要求,可以选择要使用的 S3 对象版本。如果使用的插件或要求在环境中造成了不可恢复的错误,Amazon MWAA 会自动回滚到以前的工作版本。


单击下一步以配置高级设置,从网络开始配置。所有环境都在使用两个可用区中私有子网的 Amazon Virtual Private Cloud 中运行。Web 服务器对 Airflow UI 的访问始终受安全登录的保护,该安全登录使用 AWS Identity and Access Management (IAM)。不过,您可以选择在公共网络上访问 Web 服务器,以便通过互联网或 VPC 中的私有网络登录。为简单起见,我选择了公共网络。让 Amazon MWAA 创建一个具有正确出入站规则的新安全组。或者,还可以添加一个或多个现有安全组,来对环境的出入站流量的控制进行微调。

现在,我们来配置环境类。每个环境都包括一个计划程序、一个 Web 服务器和一个工作线程。工作线程会根据工作负载自动扩展和缩小。我们根据 DAG 的数量为您建议要使用哪个环境类,但您可以监控环境中的负载并随时修改其对应的类。

静态数据始终启用了加密,虽然我可以选择由 AWS Key Management Service (KMS) 托管的自定义密钥,但相反,此处我保留 AWS 代表我持有及管理的默认密钥。

将环境性能发布到 CloudWatch 指标以便对环境进行监控。默认情况下此功能已启用,但可以在启动后禁用 CloudWatch 指标。对于日志,可以指定日志级别以及哪些 Airflow 组件应将其日志发送到 CloudWatch Logs。此处我保留默认设置,即仅发送任务日志并使用 INFO 日志级别。

Airflow 配置选项的默认设置可以修改,例如 default_task_retriesworker_concurrency。此处我没有修改这些值。

最后但最重要的是,配置权限,环境会使用这些权限访问 DAG、写日志和运行访问其他 AWS 资源的 DAG。选择创建新角色,然后单击创建环境。几分钟后,新的 Airflow 环境就可以使用了。

使用 Airflow UI
Amazon MWAA 控制台中,找到刚刚创建的新环境,然后单击打开 Airflow UI。此时会显示一个新的浏览器窗口,使用 AWS IAM 通过安全登录进行身份验证。

然后在 movie_list_dag.py 文件中找到放在 S3 上 DAG。DAG 将会下载 MovieLens 数据集,使用 Amazon Athena 处理 S3 上的文件,将结果加载到 Redshift 集群并创建表(如缺失)。

以下是 DAG 的完整源代码:

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators import HttpSensor, S3KeySensor
from airflow.contrib.operators.aws_athena_operator import AWSAthenaOperator
from airflow.utils.dates import days_ago
from datetime import datetime, timedelta
from io import StringIO
from io import BytesIO
from time import sleep
import csv
import requests
import json
import boto3
import zipfile
import io
s3_bucket_name = 'my-bucket'
s3_key='files/'
redshift_cluster='my-redshift-cluster'
redshift_db='dev'
redshift_dbuser='awsuser'
redshift_table_name='movie_demo'
test_http='https://grouplens.org/datasets/movielens/latest/'
download_http='http://files.grouplens.org/datasets/movielens/ml-latest-small.zip'
athena_db='demo_athena_db'
athena_results='athena-results/'
create_athena_movie_table_query="""
CREATE EXTERNAL TABLE IF NOT EXISTS Demo_Athena_DB.ML_Latest_Small_Movies (
  `movieId` int,
  `title` string,
  `genres` string 
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
WITH SERDEPROPERTIES (
  'serialization.format' = ',',
  'field.delim' = ','
) LOCATION 's3://my-bucket/files/ml-latest-small/movies.csv/ml-latest-small/'
TBLPROPERTIES (
  'has_encrypted_data'='false',
  'skip.header.line.count'='1'
); 
"""
create_athena_ratings_table_query="""
CREATE EXTERNAL TABLE IF NOT EXISTS Demo_Athena_DB.ML_Latest_Small_Ratings (
  `userId` int,
  `movieId` int,
  `rating` int,
  `timestamp` bigint 
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
WITH SERDEPROPERTIES (
  'serialization.format' = ',',
  'field.delim' = ','
) LOCATION 's3://my-bucket/files/ml-latest-small/ratings.csv/ml-latest-small/'
TBLPROPERTIES (
  'has_encrypted_data'='false',
  'skip.header.line.count'='1'
); 
"""
create_athena_tags_table_query="""
CREATE EXTERNAL TABLE IF NOT EXISTS Demo_Athena_DB.ML_Latest_Small_Tags (
  `userId` int,
  `movieId` int,
  `tag` int,
  `timestamp` bigint 
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
WITH SERDEPROPERTIES (
  'serialization.format' = ',',
  'field.delim' = ','
) LOCATION 's3://my-bucket/files/ml-latest-small/tags.csv/ml-latest-small/'
TBLPROPERTIES (
  'has_encrypted_data'='false',
  'skip.header.line.count'='1'
); 
"""
join_tables_athena_query="""
SELECT REPLACE ( m.title , '"' , '' ) as title, r.rating
FROM demo_athena_db.ML_Latest_Small_Movies m
INNER JOIN (SELECT rating, movieId FROM demo_athena_db.ML_Latest_Small_Ratings WHERE rating > 4) r on m.movieId = r.movieId
"""
def download_zip():
    s3c = boto3.client('s3')
    indata = requests.get(download_http)
    n=0
    with zipfile.ZipFile(io.BytesIO(indata.content)) as z:       
        zList=z.namelist()
        print(zList)
        for i in zList: 
            print(i) 
            zfiledata = BytesIO(z.read(i))
            n += 1
            s3c.put_object(Bucket=s3_bucket_name, Key=s3_key+i+'/'+i, Body=zfiledata)
def clean_up_csv_fn(**kwargs):    
    ti = kwargs['task_instance']
    queryId = ti.xcom_pull(key='return_value', task_ids='join_athena_tables' )
    print(queryId)
    athenaKey=athena_results+"join_athena_tables/"+queryId+".csv"
    print(athenaKey)
    cleanKey=athena_results+"join_athena_tables/"+queryId+"_clean.csv"
    s3c = boto3.client('s3')
    obj = s3c.get_object(Bucket=s3_bucket_name, Key=athenaKey)
    infileStr=obj['Body'].read().decode('utf-8')
    outfileStr=infileStr.replace('"e"', '') 
    outfile = StringIO(outfileStr)
    s3c.put_object(Bucket=s3_bucket_name, Key=cleanKey, Body=outfile.getvalue())
def s3_to_redshift(**kwargs):    
    ti = kwargs['task_instance']
    queryId = ti.xcom_pull(key='return_value', task_ids='join_athena_tables' )
    print(queryId)
    athenaKey='s3://'+s3_bucket_name+"/"+athena_results+"join_athena_tables/"+queryId+"_clean.csv"
    print(athenaKey)
    sqlQuery="copy "+redshift_table_name+" from '"+athenaKey+"' iam_role 'arn:aws:iam::163919838948:role/myRedshiftRole' CSV IGNOREHEADER 1;"
    print(sqlQuery)
    rsd = boto3.client('redshift-data')
    resp = rsd.execute_statement(
        ClusterIdentifier=redshift_cluster,
        Database=redshift_db,
        DbUser=redshift_dbuser,
        Sql=sqlQuery
    )
    print(resp)
    return "OK"
def create_redshift_table():
    rsd = boto3.client('redshift-data')
    resp = rsd.execute_statement(
        ClusterIdentifier=redshift_cluster,
        Database=redshift_db,
        DbUser=redshift_dbuser,
        Sql="CREATE TABLE IF NOT EXISTS "+redshift_table_name+" (title	character varying, rating	int);"
    )
    print(resp)
    return "OK"
DEFAULT_ARGS = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False 
}
with DAG(
    dag_id='movie-list-dag',
    default_args=DEFAULT_ARGS,
    dagrun_timeout=timedelta(hours=2),
    start_date=days_ago(2),
    schedule_interval='*/10 * * * *',
    tags=['athena','redshift'],
) as dag:
    check_s3_for_key = S3KeySensor(
        task_id='check_s3_for_key',
        bucket_key=s3_key,
        wildcard_match=True,
        bucket_name=s3_bucket_name,
        s3_conn_id='aws_default',
        timeout=20,
        poke_interval=5,
        dag=dag
    )
    files_to_s3 = PythonOperator(
        task_id="files_to_s3",
        python_callable=download_zip
    )
    create_athena_movie_table = AWSAthenaOperator(task_id="create_athena_movie_table",query=create_athena_movie_table_query, database=athena_db, output_location='s3://'+s3_bucket_name+"/"+athena_results+'create_athena_movie_table')
    create_athena_ratings_table = AWSAthenaOperator(task_id="create_athena_ratings_table",query=create_athena_ratings_table_query, database=athena_db, output_location='s3://'+s3_bucket_name+"/"+athena_results+'create_athena_ratings_table')
    create_athena_tags_table = AWSAthenaOperator(task_id="create_athena_tags_table",query=create_athena_tags_table_query, database=athena_db, output_location='s3://'+s3_bucket_name+"/"+athena_results+'create_athena_tags_table')
    join_athena_tables = AWSAthenaOperator(task_id="join_athena_tables",query=join_tables_athena_query, database=athena_db, output_location='s3://'+s3_bucket_name+"/"+athena_results+'join_athena_tables')
    create_redshift_table_if_not_exists = PythonOperator(
        task_id="create_redshift_table_if_not_exists",
        python_callable=create_redshift_table
    )
    clean_up_csv = PythonOperator(
        task_id="clean_up_csv",
        python_callable=clean_up_csv_fn,
        provide_context=True     
    )
    transfer_to_redshift = PythonOperator(
        task_id="transfer_to_redshift",
        python_callable=s3_to_redshift,
        provide_context=True     
    )
    check_s3_for_key >> files_to_s3 >> create_athena_movie_table >> join_athena_tables >> clean_up_csv >> transfer_to_redshift
    files_to_s3 >> create_athena_ratings_table >> join_athena_tables
    files_to_s3 >> create_athena_tags_table >> join_athena_tables
    files_to_s3 >> create_redshift_table_if_not_exists >> transfer_to_redshift

在代码中,使用 PythonOperator(适用于通用 Python 代码)或 AWSAthenaOperator 之类的运算符创建不同的任务,以使用与 Amazon Athena 的集成。要了解这些任务如何连接到工作流,您可以参阅最新的几行,为方便起见,我在这里重复一遍(未缩进):

check_s3_for_key >> files_to_s3 >> create_athena_movie_table >> join_athena_tables >> clean_up_csv >> transfer_to_redshift
files_to_s3 >> create_athena_ratings_table >> join_athena_tables
files_to_s3 >> create_athena_tags_table >> join_athena_tables
files_to_s3 >> create_redshift_table_if_not_exists >> transfer_to_redshift

Airflow 代码在 Python 中重载右移 >> 运算符来创建依赖关系,这意味着应首先执行左侧的任务,然后将输出传递给右侧的任务。查看代码便可以很轻松地理解。上面四行代码中的每一行都添加了依赖关系,并且四行代码会一起受到评估,以便按正确的顺序执行任务。

在 Airflow 控制台中,可以看到 DAG 的图形视图,该图形清楚地表示了任务的执行方式:

现已推出
Amazon Managed Workflows for Apache Airflow (MWAA) 现已在美国东部(弗吉尼亚北部)、美国西部(俄勒冈)、美国东部(俄亥俄)、亚太地区(新加坡)、亚太地区(东京)、亚太地区(悉尼)、欧洲(爱尔兰)、欧洲(法兰克福)和欧洲(斯德哥尔摩)推出。您可以通过控制台、AWS 命令行界面 (CLI)AWS 开发工具包启动新的 Amazon MWAA 环境。然后,您可以使用 Airflow 的集成生态系统在 Python 中开发工作流。

Amazon MWAA 的收费将根据您使用的环境类别和工作线程收取。有关更多信息,请参阅定价页面

上游兼容性是 Amazon MWAA 的核心原则。我们对 AirFlow 平台的代码更改将发布回开源。

借助 Amazon MWAA,您可以将更多时间花在构建工程和数据科学任务的工作流中,减少管理和扩展 Airflow 平台基础设施的时间。

立即了解有关 Amazon MWAA 的更多信息,并开始使用。

Danilo