使用 Amazon EMR 和 Apache Spark 创建 ETL 管道
![Olawale Olaleye Olawale Olaleye](https://d1.awsstatic.com/xuefezha-jennie/%e7%ba%a7%e5%88%ab_level%201.01c35fea97656a6beac50b0c8ae81e8afc1eedef.png)
![](https://d1.awsstatic.com/xuefezha-jennie/Group%20281.67a8494bd80a4bd979e37efcb490ada486dd72ae.png)
![](https://d1.awsstatic.com/xuefezha-jennie/cost-icon.84b1b7bbeb5d58956ebf11a05ed8152992f762ba.png)
![](https://d1.awsstatic.com/xuefezha-jennie/Group%20287.a27381901d308706720071b52d42054d154eab4c.png)
![](https://d1.awsstatic.com/guoheng/product.2d7b328b4c088795e2ac7b9c03e03d54eb5ea73f.png)
对于为任何组织构建强大的数据工程管道而言,提取、转换、加载(ETL,有时称为摄取、转换、导出)都至关重要。从本质上讲,如果使用适当的工具和服务设计和构建 ETL 管道,它将为任何组织在批处理和实时处理方面带来较高的价值。但是,设计和构建这样一个管道非常耗时,而且相关大数据领域中的工具和框架数量如此之多,还需要不同的技能组合。所幸的是,如果您使用的是 EMR 和 Spark,这项任务就会非常容易。
在许多组织中,批量 ETL 都是一种常见的使用场景。本教程将提供一个起点,可以帮助您使用 Amazon EMR (Amazon Elastic MapReduce) 和 Apache Spark 在亚马逊云科技中构建更复杂的数据管道。下面介绍了具体操作步骤。
我们将使用 PySpark 与 Spark 集群进行交互。借助 PySpark,可以使用 Python API 编写 Spark 应用程序。
学习目标
在本指南中,您将:
- 创建并设置 Amazon EMR 集群
- 在 EMR 上提交 PySpark 作业
- 集成 Amazon EMR 与 Amazon S3
让我们开始吧!
使用场景和问题陈述
在本教程中,假设您有一个供应商,该供应商会在每个月末提供增量销售数据。该文件以 CSV 文件的形式存储到 S3,然后需要对其进行处理,使其可供数据分析师进行查询和分析。
架构
为了实现此数据管道,我们将使用 EMR 集群,并使用 Spark 作为分布式处理引擎。我们还将使用 S3 存储:
- RAW 数据(即输入和未处理的数据)
- CLEANSED 数据(即输出和经过处理的数据)
我们需要构建这样一个数据管道:从 S3 存储桶获取这个新的销售文件,使用 Amazon EMR 进行所需的转换处理,并将清理和转换后的数据保存到目标 S3 存储桶,以便稍后使用 Amazon Athena 进行查询。
![](https://d1.awsstatic.com/guoheng/create-an-etl-pipeline-apache-spark/Architecture.76842c343f5db885214813b8dc37eee55a08be5a.png)
执行步骤
若要实现数据处理管道,我们需要创建一个 EMR 集群来运行 ETL 作业,还需要创建一个 S3 存储桶来存储原始数据和经过处理的数据。下面,我们开始执行集群任务吧。
步骤 1:创建 EMR 集群
创建 EMR 集群之前,我们需要创建一个 Key Pair,稍后需要使用它访问 EMR 集群的主节点。那么,我们马上创建一个吧。
1. 登录您的亚马逊云科技账户,前往 EC2 控制台,然后点击左侧菜单栏中的 Key Pairs(密钥对)。接着点击 Create Key Pair(创建密钥对)。
![](https://d1.awsstatic.com/guoheng/create-an-etl-pipeline-apache-spark/key_pair.5f4ed7d16dd4a11541c187624deb15b4147803bc.png)
2. 为密钥对命名 (mykey-emr),然后点击 Create Key Pair(创建密钥对)。
![](https://d1.awsstatic.com/guoheng/create-an-etl-pipeline-apache-spark/key_pair_2.1431103e2da23e81e250b13be0652d1cd8b1e88e.png)
3. 现在我们继续创建 Amazon EMR cluster。为此,在控制台上前往 Amazon EMR,然后点击 Create cluster(创建集群)以创建一个 EMR 集群。
![](https://d1.awsstatic.com/guoheng/create-an-etl-pipeline-apache-spark/emr_1-new.1804b770f8aac2d2dbc938c23ffeee54767daf18.png)
4. 为 EMR 集群命名,这里将 Cluster name(集群名称)设置为 MyDemoEMRCluster 并进行如下选择:
- 在 Software configuration(软件配置)部分,选择 EMR 的最新版本
- 在 Application bundle(应用程序捆绑包)部分,选择 Spark
- 在 Security and access(安全与访问)部分,选择正确的 EC2 密钥对(在上一步中创建的密钥对)
保留所有其他选项的默认设置,然后点击 Create cluster(创建集群)。此操作将创建包含三个实例的集群。
![](https://d1.awsstatic.com/guoheng/create-an-etl-pipeline-apache-spark/emr_2-new.9aa1239ff74407175e4611c7b5bdb9b4b9907345.png)
5. 集群创建需要一些时间,几分钟后,您将看到集群已启动并运行,状态为 Waiting(这意味着集群现已准备就绪,正等待执行任何 ETL 作业)。
![](https://d1.awsstatic.com/guoheng/create-an-etl-pipeline-apache-spark/emr_3-new.9ea20abf15ee92e130fa5921248b1991d8a248bf.png)
步骤 2:创建 Amazon S3 存储桶
现在,我们创建一个 Amazon S3 存储桶,并在其中创建两个子文件夹,用于存储 RAW 和 CLEANSED 数据。
1. 前往 Amazon S3 控制台,然后点击 Create bucket(创建存储桶)。
![](https://d1.awsstatic.com/guoheng/create-an-etl-pipeline-apache-spark/s3_1-new.beb14ffdcbb7a662503c9c7d11b5eb7e8847e26e.png)
2. 创建一个存储桶(如 etl-batch-emr-demo)。
![](https://d1.awsstatic.com/guoheng/create-an-etl-pipeline-apache-spark/s3_2-new.09a21ff5556cd488461a4b20e8f623712d9d77d1.png)
3. 创建存储桶后,创建两个子文件夹,分别命名为:
- cleaned_data
- raw_data
![](https://d1.awsstatic.com/guoheng/create-an-etl-pipeline-apache-spark/s3_3.a9cbe433ef94ad2c2888d9775dc3efd9fa96c87b.png)
4. 将销售数据集 CSV 文件上传到存储桶中的文件夹 raw_data 下。
![](https://d1.awsstatic.com/guoheng/create-an-etl-pipeline-apache-spark/upload_csv-new.192d058da7e0eb4362bcf3773a530ca555ac0167.png)
步骤 3:提交 PySpark 作业
我们现已将数据集上传到 S3,接下来可以从 EMR 集群提交 PySpark 作业了。
登录亚马逊云科技管理控制台,然后进入 Amazon EMR 控制台。
在左侧导航窗格的 EMR on EC2(EC2 上的 EMR)下,选择 Clusters(集群),然后选择要在其中检索公共 DNS 名称的 myDemoEMRCluster 集群。
在集群详细信息页面的 Summary(摘要)部分,记下 Primary node public DNS(主节点公共 DNS)显示的值。
![](https://d1.awsstatic.com/guoheng/create-an-etl-pipeline-apache-spark/emr_4-new.ae220ed74534799a7f6da3c3f885235c683b1703.png)
4. 从终端通过 SSH 访问 EMR 集群的主节点
ssh -i "mykey-emr.pem" root@ec2-18-219-203-79.us-east-2.compute.amazonaws.com
5. 复制 PySpark 代码 etl-job.py 并将其保存到主目录下的 Primary Node,然后进行以下更改并保存文件:
- S3_INPUT_DATA = 's3://<YOUR_BUCKET_LOCATION_OF_RAW_DATA>'
- S3_OUTPUT_DATA = 's3://<YOUR_BUCKET_LOCATION_OF_CLEANED_DATA>'
6. 提交 PySpark job,并等待作业完成。
sudo spark-submit etl-job.py
7. 作业完成后,查看 S3 存储桶中的文件夹 cleaned_data,您将看到经过转换和处理的新数据,新数据以 Parquet 格式进行存储。
![](https://d1.awsstatic.com/guoheng/create-an-etl-pipeline-apache-spark/s3_cleaned_data.eb8e81b19d9f0d5f5f1ad0d576bf6da7bf184c68.png)
步骤 4:使用 Amazon Athena 验证输出
cleansed 数据现以 Parquet 格式存储在 Amazon S3 中,但为了让数据分析师或数据科学家更容易使用这类数据,最好能支持数据库表的形式,以便通过 SQL 查询数据。
若要实现该集成,可以执行以下两个步骤:
- 我们需要运行 Glue 爬网程序,基于 S3 数据创建一个 Amazon Glue Data Catalog 表。
- 完成后,我们可以在 Amazon Athena 中运行查询,验证输出。
步骤 5:创建 Amazon Glue Data Catalog
1. 前往 Amazon Glue 控制台,在左侧导航窗格中选择 Crawlers(爬网程序),然后点击 Create crawler(创建爬网程序)。
![](https://d1.awsstatic.com/guoheng/create-an-etl-pipeline-apache-spark/glue_ui.e0ef5ab168f12ec41a8a1d404a9a63069fbc569c.png)
2. 为 Glue 爬网程序设置名称 (my-crawler-1)。
![](https://d1.awsstatic.com/guoheng/create-an-etl-pipeline-apache-spark/glue_crawler_1.89e07dae175fac5746fbada5e3746573eb52085a.png)
3. 对于数据源,添加包含清洗和处理后数据的 S3 存储桶 (s3://etl-batch-emr-demo/cleaned_data)。
![](https://d1.awsstatic.com/guoheng/create-an-etl-pipeline-apache-spark/glue_crawler_2.ac62c312af7604f47f667e057a6d2c4ccdbc343a.png)
4. 创建 IAM 角色 (AWSGlueServiceRole-default) 并添加该角色。您可以创建一个角色并附加以下策略(有关更多详细信息,可以参阅此文档):
AWSGlueServiceRole 亚马逊云科技托管策略,该策略授予 Data Catalog 所需的权限
内联策略,该策略授予数据源(位于 S3_INPUT_DATA)权限
![](https://d1.awsstatic.com/guoheng/create-an-etl-pipeline-apache-spark/glue_crawler_3.7ba5c775326dbe31b76da0a93ea8152c29efc022.png)
5. 点击 Add database(添加数据库)创建一个数据库,然后从下拉菜单中选择该数据库 (my_demo_db)。
![](https://d1.awsstatic.com/guoheng/create-an-etl-pipeline-apache-spark/glue_crawler_4.c899215538353ad33f436b5b8adc0c180a924b30.png)
6. 查看并验证所有详细信息,然后点击 Create crawler(创建爬网程序)。
![](https://d1.awsstatic.com/guoheng/create-an-etl-pipeline-apache-spark/glue_crawler_5.f29ba0b1be65280a967624986b6aa621999cf512.png)
7. 创建爬网程序后,选择该爬网程序,然后点击 Run(运行)。
![](https://d1.awsstatic.com/guoheng/create-an-etl-pipeline-apache-spark/glue_db_delete.cb466af0ca27bd9434a1790054da387ef7352373.png)
8. 爬网程序完成运行后,您将看到 detected tables。
![](https://d1.awsstatic.com/guoheng/create-an-etl-pipeline-apache-spark/glue_run_complete.86591bdab4f5607996a28c6b757e0d28fd2cbe3a.png)
我们现已创建了 Glue Data Catalog 表,接下来可以前往 Amazon Athena,使用 SQL 查询数据。
到目前为止,我们已经从 Amazon S3 中提取了数据,然后通过 Glue ETL (pySpark) 作业将数据转换为 Parquet 格式,从而转换了数据。最后,我们将使用 Amazon Athena 对清理后的数据进行分析。
步骤 6:使用 Amazon Athena 标准 SQL 查询输出数据
1. 打开 Athena 查询编辑器。可以将 Data source(数据源)保留为默认的 AwsDataCatalog,并选择 my_demo_db 作为 Database(数据库),如以下截图所示,然后运行以下查询。
SELECT * FROM "my_demo_db"."cleaned_data" limit 10;
![](https://d1.awsstatic.com/guoheng/create-an-etl-pipeline-apache-spark/athena_q1.60adfa517718b68bfd8bf827e6bf760f6ce04718.png)
2. 现在,可以执行其他 SQL 查询来分析数据。例如,如果我们想知道每个 region per segment wise 的 forecast_monthly_revenue,可以运行以下查询:
SELECT
region,
segment,
SUM(forecasted_monthly_revenue) as forecast_monthly_revenue
FROM "my_demo_db"."cleaned_data"
GROUP BY segment, region;
![](https://d1.awsstatic.com/guoheng/create-an-etl-pipeline-apache-spark/athena_q2.ac5253c6cdc95c5d1d64f34210ffc56903855413.png)
清理资源
现在您已经完成了本实操教程,为避免产生额外的费用,接下来可以删除以下所有资源:
- 删除 EMR 集群
![](https://d1.awsstatic.com/guoheng/create-an-etl-pipeline-apache-spark/emr_terminate.eb6a0402742d33b12d0eeb397ef4c069e268e74d.png)
- 删除 Amazon S3 存储桶
aws s3 rb s3://<YOUR_BUCKET_LOCATION> --force
- 删除 Glue 数据库
![](https://d1.awsstatic.com/guoheng/create-an-etl-pipeline-apache-spark/glue_db_delete.cb466af0ca27bd9434a1790054da387ef7352373.png)
总结
恭喜您!您已完成使用 Amazon EMR 和 Apache Spark 创建 ETL 管道的教程。
在本教程中,我们学习了如何构建 ETL 管道,此类管道可以应用于各种批处理使用场景,如电子商务销售数据分析。我们还学习了如何从 S3 中提取数据,然后使用简单的 Glue ETL (pySpark) 作业按需转换数据。最后,通过 Amazon Athena 使用 SQL 对数据进行了分析。
![Olawale Olaleye Olawale Olaleye](https://d1.awsstatic.com/xuefezha-jennie/mulu.34646cf00357268db464d54cd1f040ef56c42cbf.png)
![Olawale Olaleye Olawale Olaleye](https://d1.awsstatic.com/xuefezha-jennie/Vector.29f84c2dd1cdfa19249067b529933ca22b72eae5.png)