亚马逊AWS官方博客

在 AWS Glue 的 Python Shell 作业中部署 AWS Data Wrangler 进行 ETL 数据处理

AWS Glue是一项完全托管的数据ETL(提取、转换和加载) 服务,可以自动生成用于执行数据ETL过程的代码,自执行与数据发现、数据清理和数据移动等许多无差别的、繁重的数据处理任务,因此您可以将更多时间用在数据分析上。AWS Glue 属于无服务器服务,因此无需配置和管理计算资源。您可以在AWS Glue 完全托管的 Apache Spark 或 Python Shell 环境中运行各项ETL 作业。

AWS Glue 可以自动发现存储在 Amazon S3 数据湖、Amazon Redshift 数据仓库以及在 AWS 上运行的各种数据库中的结构化和半结构化数据。它通过 Glue 数据目录提供统一的数据视图,该数据目录可用于 ETL 以及使用 Amazon AthenaAmazon EMRAmazon Redshift Spectrum 等服务进行查询和报告。

AWS Glue的基本架构如下图所示:

 

此外,AWS Glue 已经分别在2019年12月和2020年4月,在由西云数据运营的 AWS 中国(宁夏)区域和由光环新网运营的 AWS 中国(北京)区域正式推出。

AWS Glue Python Shell作业

在 AWS Glue 中,除了通过Apache Spark作业完成大型分布式数据处理任务之外,您也可以使用 Python 脚本来运行中小型任务,这些任务通常是 ETL工作流的一部分。您可以使用 Python Shell 作业向 Amazon Redshift、Amazon Athena 或 Amazon EMR 等服务提交 SQL 查询,或者运行机器学习和科学分析等。

AWS Glue 中的 Python Shell 作业提供了兼容Python 2.7和Python 3.6的支持,并且还预装了许多与AWS和数据分析相关的常用库,例如Boto3、NumPy、SciPy 和 Pandas 等。同时,Python Shell 作业的资源也是通过Glue的 DPU(数据处理单元,1个 DPU 提供的处理能力由 4 个 vCPU 和 16GB 内存组成)来衡量的,针对一个作业您可以设置使用1个或者 0.0625 个 DPU(即 1/16 个 DPU)运行,默认值为 0.0625。

 

AWS Data Wrangler

AWS Data Wrangler是什么呢?在数据分析领域,Pandas肯定是经常使用到的Python分析库,您可以认为AWS Data Wrangler就是Pandas on AWS。

它是由AWS专业服务团队针对Pandas库在AWS云上进行的增强和扩展,并对其进行了开源。使用AWS Data Wrangler,您可以非常方便地通过DataFrames和AWS数据相关服务(Amazon Redshift,AWS Glue,Amazon Athena,Amazon EMR,Amazon QuickSight等)进行连接。

AWS Data Wrangler构建在其他开源项目(如Pandas,Apache ArrowBoto3SQLAlchemyPsycopg2PyMySQL)之上,提供了抽象函数来执行通常的ETL任务,例如从数据湖、数据仓库和数据库加载/卸载数据。关于AWS Data Wrangler可以支持的详细信息,请查看文档中的功能列表

 

通过以上内容的介绍,可以看出AWS Data Wrangler和AWS Glue会有很好的集成,以完成在AWS平台上更便捷的数据分析。

 

Glue Python Shell作业中使用AWS Data Wrangler

接下来,我们通过在AWS Glue 中的Python Shell作业内借助AWS Data Wrangler库将CSV文件转换为分区的Parquet文件,来演示AWS Data Wrangler在Python Shell作业内的使用方式,用户如果有自定的数据处理逻辑可以在此基础之上进行修改和调整。之后,我们将对应的元数据存储到AWS Glue数据目录中,并使用Athena查询数据以创建数据分析。

在下面的示例中,我们将使用NOAA(美国海洋和大气管理局)提供的全球历史气候网络数据(NOAA Global Historical Climatology Network Daily,GHCN-D),全球历史气候网络数据是从美国海洋和大气管理局的全球陆地地区每日观测数据集,它包含了来自世界各地陆基站点的测量数据。它是由众多来源的气候记录合成的,有些数据的历史已经超过了175年。数据格式为CSV格式。更多信息可以参考Registry of Open Data on AWS中的介绍

以下示例的操作环境为AWS 中国(北京)区域。

 

创建S3存储桶

首先,在AWS 中国(北京)区域创建一个用户存储数据的S3存储桶,例如awsdatawrangler-bucket。

 

下载数据并上传到S3存储桶中

 首先,将数据下载到本地,可以参考以下命令。

mkdir noaa-ghcn-pds

cd noaa-ghcn-pds

for i in `seq 0 9`;
do
wget https://noaa-ghcn-pds.s3.amazonaws.com/csv/188$i.csv
done

然后,将这些数据都上传到S3存储桶中,例如s3://awsdatawrangler-bucket/csv/中,可以在控制台上操作或者参考以下命令。

aws s3 sync . s3://awsdatawrangler-bucket/csv/ --profile bjs

 

创建Glue数据库

接着,在AWS Glue中创建一个新的数据库,例如awsdatawrangler-db。

 

 

创建Glue作业IAM角色

示例中Glue作业在运行时需要访问S3中的数据,所以这里需要作业对应的角色具备Glue服务的权限和S3存储桶的访问权限。

切换到IAM服务页面,然后选择 Create role(创建角色)。对于角色类型,选择 AWS Service,找到并选择 Glue,然后选择 Next: Permissions(下一步:权限)。这里为了简化,选择了AWSGlueServiceRole、AmazonS3FullAccess以及AmazonAthenaFullAccess,在生产环境中请根据实际情况对数据访问设置精细化的权限管理,更多信息可以参考为 AWS Glue 设置 IAM 权限

构建AWS Data Wranglerwhl安装包

尽管AWS Glue提供了很多内置的库,对于自定义的库需要额外进行加载,可以将一个或多个 Python 库打包为一个 .egg 或 .whl 文件,然后在Python Shell作业中进行指定。

针对AWS Data Wrangler,您可以直接在这里下载对应的whl文件,但是如果直接使用下载后的文件在Glue中使用,作业运行时会有awscli包不兼容的报错,如下所示。

ERROR: awscli 1.16.242 has requirement botocore==1.12.232, but you'll have botocore 1.19.4 which is incompatible.
ERROR: awscli 1.16.242 has requirement s3transfer<0.3.0,>=0.2.0, but you'll have s3transfer 0.3.3 which is incompatible.

您可以在源码中修改requirements.txt文件,然后再构建whl文件。首先,通过git克隆AWS Data Wrangler项目(示例中使用的是1.9.6版本)。

git clone https://github.com/awslabs/aws-data-wrangler.git

cd aws-data-wrangler

然后编辑requirements.txt文件,并添加以下内容。

awscli&gt;=1.18.0,&lt;2.0.0

然后执行以下命令构建whl文件,确保是在Python 3的环境中执行。

python3 setup.py bdist_wheel

将whl文件上传到S3桶当中,可以在控制台上操作或者参考以下命令。

aws s3 cp dist/awswrangler-1.9.6-py3-none-any.whl s3://awsdatawrangler-bucket/whl/ --profile bjs

创建Glue Python Shell作业

接下来就可以创建Python Shell作业了,在AWS Glue服务页面上创建作业。

在作业基础属性配置中,填写对应的信息。名称例如awsdatawrangler,然后IAM角色选择刚刚创建的角色GlueJobRole,类型选择Python Shell,Python版本这里为Python 3(Glue Version 1.0)。

作业选项选择“您将编写新脚本”,然后配置脚本文件名(例如awsdatawrangler)和存储脚本所在的 S3 路径(例如s3://awsdatawrangler-bucket/script/)。

在“安全配置、脚本库和作业参数(可选)”中,将刚刚上传的AWS Data Wrangler的whl文件填写到Python 库路径中,并修改“最大容量”为1。其他参数保持不变。

点击下一步,然后跳过“连接”部分直接点击“保存作业并编辑脚本”。在作业编辑窗口,复制以下代码,请根据在创建资源过程中的实际命名情况对变量进行修改。点击“保存”。

import awswrangler as wr

# Define the column keys
col_names = ["id", "dt", "element", "value", "m_flag", "q_flag", "s_flag", "obs_time"]

# Read csv files from S3
df = wr.s3.read_csv(
    path="s3://awsdatawrangler-bucket/csv/188",
    names=col_names,
    parse_dates=["dt", "obs_time"]
    )

# Add a new column 'year' to df for partition
df["year"] = df["dt"].dt.year

# Transform csv files to parquet and write to S3
wr.s3.to_parquet(
    df=df,
    path="s3://awsdatawrangler-bucket/parquet/",
    dataset=True,
    database="awsdatawrangler-db",
    table="awsdatawrangler_parquet",
    partition_cols=["year"]
)

# Edit the SQL query for Athena
sql = """
SELECT
    dt,
    (value / 10.0) AS temperature  -- Converting tenths of degrees C to regular degrees C
FROM "awsdatawrangler_parquet"
WHERE year BETWEEN 1887 AND 1889  -- Only last 3 years (PARTITION filter)
AND substr(id, 1, 2)='US'  -- Only U.S. stations
AND element='TMAX'  -- Only Maximum temperature elements
AND q_flag is NULL  -- Only HIGH quality measurement
"""

# Query data from S3 parquet
df_res = wr.athena.read_sql_query(sql, database="awsdatawrangler-db")

# Print result samples
print(df_res[0:9])

运行Glue Python Shell作业

点击“运行作业”,保留默认参数,然后确认运行。

注:如果在作业运行过程中报如下错误,说明您在账户中已经开启了通过Lake Formation对数据湖进行精细化管理的功能,因此需要在Lake Formation中对Glue作业的IAM角色进行适当的授权,详情请参考Lake Formation示例中的数据授权操作

AccessDeniedException: An error occurred (AccessDeniedException) when calling the GetTable operation: Insufficient Lake Formation permission(s) on awsdatawrangler-parquet

查看Glue Python Shell作业执行结果

作业执行完成后,可以在作业的历史记录中进行查看,显示的信息包括重试尝试、运行状态、日志、错误日志、开始时间、结束时间、执行时间等。

 

 

此外,因为在代码中使用了print函数打印了结果的部分内容,这些内容也可以到CloudWatch日志中进行查看,如下所示。

 

小结

本文首先介绍了AWS Glue以及该服务的功能和使用场景,然后介绍了AWS Glue 中的Python Shell作业,可以基于Python完成一些基础的ETL操作。接下来,我们又介绍了Pandas on AWS – AWS Data Wrangler这款在AWS上进行数据分析的利器,并通过一个示例场景(CSV转换Parquet)来介绍了如何在Python Shell作业引入AWS Data Wrangler来简化在AWS平台上的无服务器化的ETL任务。

除此之外,AWS Data Wrangler还提供了丰富的参考Jupyter Notebook示例,展示了对Amazon Redshift、Amazon Athena、Amazon EMR和Amazon QuickSight等服务的支持。如果AWS Data Wrangler对数据分析业务有帮助或者您对此感兴趣,可以关注Github上该项目的发展。

 

参考资料:

https://aws.amazon.com/about-aws/whats-new/2019/01/introducing-python-shell-jobs-in-aws-glue/

https://aws.amazon.com/blogs/big-data/optimize-python-etl-by-extending-pandas-with-aws-data-wrangler/

https://aws.amazon.com/cn/blogs/china/an-open-source-tool-for-building-a-data-lake-aws-data-wrangler/

https://aws-data-wrangler.readthedocs.io/en/stable/index.html

本篇作者

史天

史天,AWS解决方案架构师。拥有丰富的云计算、大数据和机器学习经验,目前致力于数据科学、机器学习、无服务器等领域的研究和实践。译有《机器学习即服务》《基于Kubernetes的DevOps实践》《Prometheus监控实战》等。