亚马逊AWS官方博客

使用 Apache Airflow、Genie 和 Amazon EMR 编排大数据工作流:第 2 部分

在 AWS 上运行大数据 ETL 工作流的大型企业的运营规模很大,它们为很多内部终端用户提供服务,并且同时运行数千个管道。再加上需要持续更新和扩展大数据平台以第一时间掌握新框架和最新版本的大数据处理框架,这便需要高效的架构和组织结构,既能简化大数据平台的管理,又能方便对大数据应用进行访问。

此博文系列的第 1 部分中,您已经学习了如何使用 Apache Airflow、Genie 和 Amazon EMR 管理大数据工作流。本博文将指导您部署 AWS CloudFormation 模板、配置 Genie 以及运行在 Apache Airflow 中创作的示例工作流。

先决条件

在此演练中,您应符合以下先决条件:

解决方案概览

此解决方案使用 AWS CloudFormation 模板创建必要的资源。

用户通过连接至堡垒主机的 SSH 隧道访问 Apache Airflow Web UI 和 Genie Web UI。

Apache Airflow 部署将 Amazon ElastiCache for Redis 用作 Celery 后端、将 Amazon EFS 用作挂载点来存储 DAG 并将 Amazon RDS PostgreSQL 用于数据库服务。

Genie 将 Apache Zookeeper 用于指挥者(leader)选举,使用 Amazon S3 存储桶存储配置(二进制文件、应用程序依赖项、集群元数据)并将 Amazon RDS PostgreSQL 用于数据库服务。Genie 将作业提交至 Amazon EMR 集群。

此博文中的架构仅供演示使用。在生产环境中,Apache Airflow 和 Genie 实例应是 Auto Scaling 组的一部分。有关更多信息,请参阅 Genie 参考指南上的部署

下图展示了解决方案的架构。

在 AWS Systems Manager Parameter Store 中创建和存储管理员密码

此解决方案使用 AWS Systems Manager Parameter Store 来存储配置脚本中使用的密码。利用 AWS Systems Manager Parameter Store,您可以创建安全字符串参数,这些参数具有纯文本参数名称和加密参数值。Parameter Store 使用 AWS KMS 来加密和解密安全字符串参数的值。

部署 AWS CloudFormation 模板之前,请执行以下 AWS CLI 命令。这些命令通过创建 AWS Systems Manager Parameter Store 参数来存储 RDS 主用户(master user)、Airflow DB 管理员和 Genie DB 管理员密码。

aws ssm put-parameter --name "/GenieStack/RDS/Settings" --type SecureString --value "ch4ng1ng-s3cr3t" --region Your-AWS-Region

aws ssm put-parameter --name "/GenieStack/RDS/AirflowSettings" --type SecureString --value "ch4ng1ng-s3cr3t" --region Your-AWS-Region

aws ssm put-parameter --name "/GenieStack/RDS/GenieSettings" --type SecureString --value "ch4ng1ng-s3cr3t" --region Your-AWS-Region

为解决方案创建 Amazon S3 存储桶并将解决方案构件上传至 S3

此解决方案使用 Amazon S3 来存储解决方案中使用的所有构件。部署 AWS CloudFormation 模板之前,请创建一个 Amazon S3 存储桶并通过此链接下载解决方案所需的构件。

解压解决方案所需的构件,然后将 airflowgenie 目录上传至您刚刚创建的 Amazon S3 存储桶。记录 Amazon S3 根路径,因为您稍后需要将它作为参数添加到 AWS CloudFormation 模板中。

例如,以下屏幕截图使用根位置 geniestackbucket

使用您创建的 Amazon S3 存储桶作为 AWS CloudFormation 参数 GenieS3BucketLocation 和 AirflowBucketLocation的值。

部署 AWS CloudFormation 堆栈

要启动整个解决方案,请选择启动堆栈

下表解释了模板所需的参数。对于表中未列出的任何参数,您均可接受缺省值。有关完整的参数列表,请参阅 AWS CloudFormation 模板。

参数
配置构件的位置 GenieS3BucketLocation 具有 Genie 构件和 Genie 安装脚本的 S3 存储桶。例如,geniestackbucket
AirflowBucketLocation 具有 Airflow 构件的 S3 存储桶。例如,geniestackbucket
网络 SSHLocation 至 Genie、Apache Zookeeper 和 Apache Airflow EC2 实例的 SSH IP 地址范围。
安全性 BastionKeyName 用于启用对堡垒主机实例的 SSH 访问权限的现有 EC2 密钥对。
AirflowKeyName 用于启用对 Apache Airflow 实例的 SSH 访问权限的现有 EC2 密钥对。
ZKKeyName 用于启用对 Apache Zookeeper 实例的 SSH 访问权限的现有 EC2 密钥对的名称。
GenieKeyName 用于启用对 Genie 的 SSH 访问权限的现有 EC2 密钥对。
EMRKeyName 用于启用对 Amazon EMR 集群的 SSH 访问权限的现有 Amazon EC2 密钥对。
日志记录 emrLogUri 用于存储 Amazon EMR 集群日志的 S3 位置。例如:s3://replace-with-your-bucket-name/emrlogs/

部署后的步骤

要访问 Apache Airflow 和 Genie Web 界面,请设置 SSH 并为您的浏览器配置 SOCKS 代理。请执行以下步骤:

  1. 在 AWS CloudFormation 控制台上,选择您创建的堆栈。
  2. 选择输出
  3. 找到堡垒主机实例的公开 DNS。以下屏幕截图显示的是此博文使用的实例。
  4. 使用动态端口转发设置至主节点的 SSH 隧道
    使用堡垒主机实例的公开 DNS 并将用户 hadoop 替换为用户 ec2-user,而不是使用文档中作为参考的引用的集群的主机公开 DNS 名称和用户名 hadoop。
  1. 配置代理设置,以查看主节点上托管的网站
    您无需修改文档中的任何参考步骤。

此流程将配置 SOCKS 代理管理工具,通过该工具,您可以根据文本模式自动筛选 URL 并将代理设置限制为与 Amazon EC2 实例的公开 DNS 名称格式匹配的域。

访问 Apache Airflow 和 Genie 的 Web UI

要访问 Apache Airflow 和 Genie 的 Web UI,请执行以下步骤:

  1. 在 CloudFormation 控制台上,选择您创建的堆栈。
  2. 选择输出
  3. 找到 Apache Airflow 和 Genie Web UI 的 URL。以下屏幕截图显示的是此博文使用的 URL。
  1. 在 Web 浏览器中打开两个标签页。您将使用这两个标签页来访问 Apache Airflow UI 和 Genie UI。
  2. 对于您先前配置的 Foxy Proxy,请单击添加到浏览器右上角的“Foxy Proxy”图标,并选择根据预定义的目视和优先级使用代理。以下屏幕截图显示代理选项。
  1. 在相应的标签页上输入 Apache Airflow Web UI 和 Genie Web UI 的 URL。

现在,您已准备就绪,可以运行此解决方案中的工作流。

准备应用程序资源

作为平台管理工程师,第一步是准备平台支持的大数据应用程序的二进制文件和配置。在本博文中,Amazon EMR 集群使用版本 5.26.0。Amazon EMR 版本 5.26.0 已安装 Hadoop 2.8.5 和 Spark 2.4.3,因此,这些即为您希望在大数据平台中支持的应用程序。如果决定使用其他 EMR 版本,请准备适合于这些版本的二进制文件和配置。如果您想要使用其他 EMR 版本,那么,以下部分将指导您完成二进制文件准备步骤。

要准备 Genie 应用程序资源,请创建一个 YAML 文件,该文件具有在创建应用程序资源的请求中发送至 Genie 的字段。

该文件定义与应用程序相关的元数据信息,如应用程序名称、类型、版本、标签、在设置脚本 S3 上的位置以及应用程序二进制文件的位置。有关更多信息,请参阅 Genie REST API 指南中的创建应用程序

应用程序资源的标签结构

此博文使用以下应用程序资源标签:

  • type – 应用程序类型,如 Spark、Hadoop、Hive、Sqoop 或 Presto。
  • version – 应用程序版本,如 Hadoop 2.8.5。

下一部分将介绍如何在 YAML 文件中定义应用程序资源的标签。您可以添加任意数量的标签,以便与 Genie 资源关联。此外,除了平台管理员定义的标签外,Genie 还会维持其自己的标签,您可以在文件的字段 ID 和字段名称中看到它们。

准备 Hadoop 2.8.5 应用程序资源

此博文演示的是自动创建 YAML 文件。以下代码显示作为结果的文件详细信息:

id: hadoop-2.8.5
name: hadoop
user: hadoop
status: ACTIVE
description: Hadoop 2.8.5 Application
setupFile: s3://Your_Bucket_Name/genie/applications/hadoop-2.8.5/setup.sh
configs: []
version: 2.8.5
type: hadoop
tags:
  - type:hadoop
  - version:2.8.5
dependencies:
  - s3://Your_Bucket_Name/genie/applications/hadoop-2.8.5/hadoop-2.8.5.tar.gz

该文件还可以直接从以下位置获取:s3://Your_Bucket_Name/genie/applications/hadoop-2.8.5/hadoop-2.8.5.yml

注意:如果手动完成此过程,而不是使用提供的自动选项,则以下步骤仅供参考。

setupFile 和依赖项标签中引用的 S3 对象在 S3 存储桶中可用。您可以参考如下所示的属性 setupFile 和依赖项使用的构件准备步骤:

  1. 下载 hadoop-2.8.5.tar.gz,地址:https://www.apache.org/dist/hadoop/core/hadoop-2.8.5/
  2. hadoop-2.8.5.tar.gz 上传至 s3://Your_Bucket_Name/genie/applications/hadoop-2.8.5/

准备 Spark 2.4.3 应用程序资源

此博文演示的是自动创建 YAML 文件。以下代码显示作为结果的文件详细信息:

id: spark-2.4.3
name: spark
user: hadoop
status: ACTIVE
description: Spark 2.4.3 Application
setupFile: s3://Your_Bucket_Name/genie/applications/spark-2.4.3/setup.sh
configs: []
version: 2.4.3
type: spark
tags:
  - type:spark
  - version:2.4.3
  - version:2.4
dependencies:
  - s3://Your_Bucket_Name/genie/applications/spark-2.4.3/spark-2.4.3-bin-hadoop2.7.tgz

该文件还可以直接从以下位置获取:s3://Your_Bucket_Name/genie/applications/spark-2.4.3/spark-2.4.3.yml

注意:如果手动完成此过程,而不是使用提供的自动选项,则以下步骤仅供参考。

setupFile 和依赖项中的对象在 S3 存储桶中可用。您可以参考如下所示的属性 setupFile 和依赖项使用的构件准备步骤:

  1. 下载 spark-2.4.3-bin-hadoop2.7.tgz,地址:https://archive.apache.org/dist/spark/spark-2.4.3/
  2. spark-2.4.3-bin-hadoop2.7.tgz 上传至 s3://Your_Bucket_Name/genie/applications/spark-2.4.3/

由于 spark-2.4.3-bin-hadoop2.7.tgz 使用的是 Hadoop 2.7 而不是 Hadoop 2.8.3,因此,您需要从运行 Hadoop 2.7 的 EMR 集群(版本 5.11.3)中提取 Hadoop 2.7 的 EMRFS 库。它已位于您的 S3 存储桶中。以下所示的 EMRFS 库提取步骤可供您参考:

  1. 部署 EMR 集群版本 5.11.3。
  2. 运行以下命令:
aws s3 cp /usr/share/aws/emr/emrfs/lib/emrfs-hadoop-assembly-2.20.0.jar s3://Your_Bucket_Name/genie/applications/spark-2.4.3/hadoop-2.7/aws/emr/emrfs/lib/

准备命令资源

作为平台管理管理师,第二步是准备平台支持的 Genie 命令。

本博文中的工作流使用 Apache Spark。此部分介绍准备 Apache Spark 类型的命令资源的步骤。

要准备 Genie 命令资源,请创建一个 YAML 文件,该文件具有在创建命令资源的请求中发送至 Genie 的字段。

该文件定义与命令相关的元数据信息,如命令名称、类型、版本、标签、在设置脚本 S3 上的位置以及在执行命令过程中使用的参数。有关更多信息,请参阅 Genie REST API 指南中的创建命令

命令资源的标签结构

此博文使用以下命令资源标签结构:

  • type – 命令类型,如 spark-submit。
  • version – 命令版本,例如,Spark 2.4.3。

下一部分将介绍如何在 YAML 文件中定义命令资源的标签。此外,除了平台管理员定义的标签外,Genie 还会维持其自己的标签,您可以在文件的字段 ID 和字段名称中看到它们。

准备 spark-submit 命令资源

此博文演示的是自动创建 YAML 文件。以下代码显示作为结果的文件详细信息:

id: spark-2.4.3_spark-submit
name: Spark Submit 
user: hadoop 
description: Spark Submit Command 
status: ACTIVE 
setupFile: s3://Your_Bucket_Name/genie/commands/spark-2.4.3_spark-submit/setup.sh
configs: [] 
executable: ${SPARK_HOME}/bin/spark-submit --master yarn --deploy-mode client 
version: 2.4.3 
tags:
  - type:spark-submit
  - version:2.4.3
checkDelay: 5000

该文件也可从以下位置获取:s3://Your_Bucket_Name/genie/commands/spark-2.4.3_spark-submit/spark-2.4.3_spark-submit.yml

setupFile 中的对象在 S3 存储桶中可用。

准备集群资源

在此博文中也将自动执行集群资源准备步骤 ;它采用与先前所述类似但适用于集群资源的流程。

在 Amazon EMR 集群启动过程中,自定义脚本将创建一个其中包含与集群相关的元数据详细信息的 YAML 文件,并将文件上传至 S3。有关更多信息,请参阅 Genie REST API 指南中的创建集群

该脚本也会提取所有 Amazon EMR 库并将其上传至 S3。下一部分将讨论使用 Genie 注册集群的过程。

该脚本位于以下位置:s3://Your_Bucket_Name/genie/scripts/genie_register_cluster.sh

集群资源的标签结构

此博文使用以下集群资源标签结构:

  • cluster.release – The Amazon EMR 发布版本名称.例如,emr-5.26.0。
  • cluster.id – Amazon EMR 集群 ID。例如,j-xxxxxxxx
  • cluster.name – Amazon EMR 集群名称。
  • cluster.role – 与此集群关联的角色。在此博文中,角色为batch。其他角色可能是临时查询或者 Presto。

您可以为集群资源添加新标签或者更改现有标签值,方法是编辑如下文件: s3://Your_Bucket_Name/genie/scripts/genie_register_cluster.sh

此外,您还可以使用其他标签组合,例如用于标识应用程序生命周期环境或所需自定义 jar 的标签。

此外,除了平台管理员定义的标签外,Genie 还会维持其自己的标签,您可以在文件的字段 ID 和字段名称中看到它们。如果多个集群共用相同标签,默认情况下,Genie 将在与此相同标签关联的集群中随机分配作业。有关更多信息,请参阅 Genie 参考指南中的集群负载均衡

使用 Genie 注册资源

到目前为止,前面部分提到的所有配置活动均已准备完毕。

下面的部分将介绍如何使用 Genie 注册资源。在本部分,您将通过 SSH 连接至堡垒以运行配置命令。

注册应用程序资源

要注册您在前面部分准备的应用程序资源,请通过 SSH 连接至堡垒主机并运行以下命令:

python /tmp/genie_assets/scripts/genie_register_application_resources.py Replace-With-Your-Bucket-Name Your-AWS-Region http://replace-with-your-genie-server-url:8080

要查看资源信息,请导航至 Genie Web UI 并选择应用程序选项卡。请参见以下屏幕截图。该屏幕截图显示了两个应用程序资源,一个是 Apache Spark(版本 2.4.3),另一个是 Apache Hadoop(版本 2.8.5)。

注册命令并将命令与应用程序关联

下一步是通过特定应用程序注册 Genie 命令资源。在此博文中,因为 spark-submit 依赖于 Apache Hadoop 和 Apache Spark,因此,请将 spark-submit 命令与两个应用程序关联。

您在文件 genie_register_command_resources_and_associate_applications.py 中为应用程序定义的顺序非常重要。由于 Apache Spark 依赖于 Apache Hadoop,因此,文件将会先引用 Apache Hadoop,然后再引用 Apache Spark。请参阅以下代码:

commands = [{'command_name' : 'spark-2.4.3_spark-submit', 'applications' : ['hadoop-2.8.5', 'spark-2.4.3']}]

要注册命令资源并将其与上一步骤中注册的应用程序资源关联,请通过 SSH 连接至堡垒主机并运行以下命令:

python /tmp/genie_assets/scripts/genie_register_command_resources_and_associate_applications.py Replace-With-Your-Bucket-Name Your-AWS-Region http://replace-with-your-genie-server-url:8080

要查看您注册的命令及其所链接的应用程序,请导航至 Genie Web UI 并选择命令选项卡。

以下屏幕截图显示了命令的详细信息及其所链接的应用程序。

注册 Amazon EMR 集群

如前面所述,在此解决方案中部署的 Amazon EMR 集群将在集群通过 Amazon EMR 步骤启动时注册该集群。您可以通过以下位置访问 Amazon EMR 集群使用的脚本:s3://Your_Bucket_Name/genie/scripts/genie_register_cluster.sh。当集群终止时,脚本也会通过 Genie 自动取消注册集群。

在 Genie Web UI 中,选择集群选项卡。此页面向您显示当前的集群资源。此外,您还可以在注册过程中找到上传至集群 S3 位置的配置文件的位置。

以下屏幕截图显示了集群的详细信息以及配置文件(yarn-site.xml、core-site.xml、mapred-site.xml)的位置。

将命令链接至集群

现在,您已注册了所有应用程序、命令和集群,并且已将命令与其依赖的应用程序关联。最后一步是将命令链接至配置为运行它的特定 Amazon EMR 集群。

请完成以下步骤:

  1. 通过 SSH 链接至堡垒主机。
  2. 在您的首选文本编辑器中打开 /tmp/genie_assets/scripts/genie_link_commands_to_clusters.py
  3. 在代码中查找以下行:# Change cluster_name below
    clusters = [{'cluster_name' : 'j-xxxxxxxx', 'commands' :
    ['spark-2.4.3_spark-submit']}]
  1. 将文件中的 j-xxxxxxxx 替换为 cluster_name
    要查看集群的名称,请导航至 Genie Web UI 并选择集群
  2. 要将命令链接至特定 Amazon EMR 资源,请运行以下命令:
    python /tmp/genie_assets/scripts/genie_link_commands_to_clusters.py Replace-With-Your-Bucket-Name Your-AWS-Region http://replace-with-your-genie-server-url:8080

命令现已链接至集群。

在 Genie Web UI 中,选择命令选项卡。此页面向您显示了当前的命令资源。选择 spark-2.4.3_spark_submit 并查看与命令关联的集群。

以下屏幕截图显示了命令的详细信息及其所链接的集群。

您已配置完 Genie 及所有资源;它现在即可接收作业请求。

运行 Apache Airflow 工作流

本博文中未提供工作流代码和数据集的详细描述。此部分详细介绍了 Apache Airflow 如何通过本博文中提供的 GenieOperator 将作业提交至 Genie。

Apache Airflow 的 GenieOperator

通过 GenieOperator,数据工程师可以定义标签组合,以标识在其中运行任务的命令和集群。

在以下示例代码中,集群标签为“emr.cluster.role:batch”,命令标签为“type:spark-submit”和“version:2.4.3”。

spark_transform_to_parquet_movies = GenieOperator(
    task_id='transform_to_parquet_movies',
    cluster_tags=['emr.cluster.role:batch'],
    command_tags=['type:spark-submit', 'version:2.4.3'],
    command_arguments="transform_to_parquet.py s3://{}/airflow/demo/input/csv/{}  s3://{}/demo/output/parquet/{}/".format(bucket,'movies.csv',bucket,'movies'), dependencies="s3://{}/airflow/dag_artifacts/transforms/transform_to_parquet.py".format(bucket),
    description='Demo Spark Submit Job',
    job_name="Genie Demo Spark Submit Job",
    tags=['transform_to_parquet_movies'],
    xcom_vars=dict(),
    retries=3,
    dag=extraction_dag)

属性 command_arguments 将定义 spark-submit 命令的参数,dependencies 定义 Apache Spark 应用程序 (PySpark) 的代码位置。

您可以在以下位置找到 GenieOperator 代码:s3://Your_Bucket_Name/airflow/plugins/genie_plugin.py

其中一个 DAG 参数为 Genie 连接 ID (genie_conn_id)。此连接已在自动安装 Apache Airflow 实例过程中创建。要查看此连接和其他现有连接,请执行以下步骤:

  1. 在 Apache Airflow Web UI 中,选择管理员
  2. 选择连接

以下屏幕截图显示了连接详细信息。

DAG 中引用的 Airflow variable s3_location_genie_demo已在安装过程中设置。要查看所有配置的 Apache Airflow 变量,请执行以下步骤:

  1. 在 Apache Airflow Web UI 中,选择管理员
  2. 选择变量

以下屏幕截图显示了变量页面。

触发工作流

您现在可以触发执行 movie_lens_transfomer_to_parquet DAG。请执行以下步骤:

  1. 在 Apache Airflow Web UI 中,选择 DAG
  2. 将 DAG 旁边的关闭更改为开启

以下屏幕截图显示了 DAG 页面。

对于此示例 DAG,本博文使用的是 movielens 数据集的小子集。此数据集是一个常见的开源数据集,您可以将它用于开发数据科学算法。每个数据集文件均为具有单个标题行、以逗号分隔的值 (CSV) 文件。所有文件均已位于下面的解决方案 S3 存储桶中: s3://Your_Bucket_Name/airflow/demo/input/csv。

movie_lens_transfomer_to_parquet 是一个简单的工作流,用于触发将输入文件从 CSV 转换为 Parquet 的 Spark 作业。

以下屏幕截图显示的是 DAG 的图示。

在此示例 DAG 中,在 transform_to_parquet_movies 结束后,您可以并行执行四个任务。DAG 并发已设为 3,因此,如下面的代码示例所示,仅可同时运行三个任务。

# 初始化 DAG
# 并发量 --> 允许并发运行的任务数
extraction_dag = DAG(dag_name,
          default_args=dag_default_args,
          start_date=start_date,
          schedule_interval=schedule_interval,
          concurrency=3,
          max_active_runs=1
          )

访问 Genie 作业 UI

Apache Airflow 的 GenieOperator 已将作业提交至 Genie。要查看作业详细信息,请在 Genie Web UI 中选择作业选项卡。您可以详细信息,如提交的作业、作业参数、作业运行的集群以及作业状态。

以下屏幕截图显示了作业页面。

您现在可以通过预置新的 Amazon EMR 集群,使用新值(例如 “production”) for Genie 标签、“emr.cluster.role”)注册它,将集群链接到命令资源并在 GenieOperator 中更新 DAG 中部分任务使用的标签组合来体验此架构。

清理

为避免产生更多费用,请删除在博文中创建的资源和 S3 存储桶。

小结

本博文介绍了如何部署 AWS CloudFormation 模板,以便为 Genie、Apache Airflow 和 Amazon EMR 设置演示环境。此外,它还演示了如何配置 Genie 和使用 Apache Airflow 的 GenieOperator。

 


关于作者

Francisco Oliveira 是 AWS 的高级大数据解决方案架构师。他专注于使用开源技术和 AWS 构建大数据解决方案。在业余时间,他喜欢尝试新的运动、旅行和探索国家公园。

 

 

 

Jelez Raditchkov 在 AWS 领导 NoSQL AWS 专业服务实践。他通过在 NoSQL、Graph 和 Search 领域提供有针对性的指导,帮助客户实现所需的商业结果。他以前曾任 AWS 专业服务部的首席数据湖架构师。

要了解有关 AWS NoSQL 特定用途数据库服务的更多信息,请访问 https://aws.amazon.com/nosql/

 

 

 

 

Prasad Alle 是 AWS 专业服务部的高级大数据顾问。他在领导和构建适合于 AWS 企业和战略客户的可扩展且可靠的大数据、机器学习、人工智能和物联网解决方案上投入了大量时间。他还对各种不同技术非常感兴趣,例如高级边缘计算、在边缘站点的机器学习等。在他的空余时间,他喜欢阅读和与家人在一起