亚马逊AWS官方博客

构建数据湖的一款开源利器 – AWS Data Wrangler

背景

最近几年,“数据湖”的概念突然甚嚣尘上,俨然成为继“大数据”后数据技术的最热的题材。抛开附在概念之上浮华的包装,所谓的数据湖无非是为了解决传统的数据技术存在的“信息孤岛”(information siloing)这一类问题。相较于数据仓库、数据集市等传统的方案,取而代之的“数据湖”提出了以任意规模存储所有结构化和非结构化数据的解决思路。这也意味着,我们在存储数据时无需先对数据进行结构化处理,而是以原始的方式保存全量的数据,并在此之上运行不同类型的分析,例如数据可视化、大数据批量处理、实时分析以及机器学习等操作,以帮助我们做出更好的业务决策。

挑战

正如上述所说,“数据湖”存储来自业务应用的关系数据,以及来自诸如移动应用、IoT 设备和社交媒体等的非关系数据。当捕获数据时,我们不会定义数据结构或 Schema。这意味着直到我们进行分析处理时,才需要按照特定的Schema来读取/写入数据,通常我们称其为读取型Schema。这就引出了一个新的概念 -“数据整理”(Data Wrangling)。

数据整理,是指将数据从一种“原始”数据形式转换和映射到另一种特定格式的过程,目的是使其更适合和更有价值的各种后续处理,例如:数据分析。这个步骤包括对于数据的调整,可视化,聚合,训练统计模型以及许多其他潜在用途的处理。通常遵循一些常规的步骤,这些步骤首先从数据源(例如:S3的存储桶)中提取原始格式的数据,使用算法(例如:排序等)“压缩”原始数据或将数据解析为预定义的数据结构,最后 将结果内容存放到数据接收器中,以供存储和将来使用。

看到这里,有人也许有这样的疑问:“数据整理” 与以往提到的“ETL” 有何不同?

数据整理 vs ETL

首先,数据整理和ETL都是为了解决了同一个问题:如何将来自不同来源的各种格式的数据转换为用于分析和报告的公共结构。顾名思义,ETL通过从各种来源提取数据,将其转换为预定义的格式,然后将其加载到数据湖或数据仓库中,以便业务智能或报表生成器应用程序访问。与之相对的数据整理则不是为IT专业人员设计的,而是为最终用户设计的,例如:管理人员、业务分析师等,他们正在寻找特定问题的答案。其核心理念是,尽可能接近数据。

为了理解这两个概念的差异,我们可以通过这张表来做以区分 –

数据整理 ETL
用户 业务人员:管理人员、分析师等 IT专业人员
数据结构 各种复杂数据 结构化
用例 探索性 数据报表

 

对于数据湖而言,其主要挑战是存储原始数据而不监督内容。对于使数据可用的数据湖,它需要有定义的机制来编目和保护数据。没有这些元素,就无法找到或信任数据,从而导致出现“数据沼泽”(Data Swamp)。 满足更广泛受众的需求就需要数据湖具有管理、语义一致性和访问控制。事实上,这也就是“数据整理”对于数据湖的价值所在。缺乏有效的“数据整理”,数据湖则空有规模庞大的数据而缺乏真正的业务价值。同时,数据湖在处理数据时的“读取型Schema”的特点又决定了其不同与解决ETL的思路,无法使用简单的工具完成数据的整理。这就引出了我们今天的重点,一款开源的“数据整理”工具 – AWS Data Wrangler。

AWS Data Wrangler 是什么?

AWS Data Wrangler 是一款开源的 Python 程序包,其特色就在于将 Pandas<https://github.com/pandas-dev/pandas> 库的功能扩展到连接 DataFrame 和 AWS 数据相关的一系列服,例如:Amazon Redshift、AWS Glue、Amazon Athena、Amazon EMR、Amazon QuickSight 等。它建立在其他开源项目(例如 Pandas、Apache Arrow、Boto3、s3fs、SQLAlchemy、Psycopg2 和 PyMySQL)之上,并提供抽象功能来执行常规的 ETL 任务,例如从数据湖、数据仓库和数据库中加载/卸载数据。这里提到的开源项目/框架的简单介绍如下 –

  • Pandas – 是一个快速、强大、灵活且易于使用的开源数据分析和操作工具,构建在Python编程语言之上
  • Apache Arrow – 是一个用于内存数据的跨语言开发平台。它为平面和层次数据指定了一种标准的独立于语言的柱状内存格式
  • Boto3 –是适用于Python的AWS SDK,提供了易于使用的面向对象的API,以及对AWS服务的低级访问
  • s3fs – 允许Linux和macOS通过FUSE挂载S3存储桶
  • SQLAlchemy – 是Python SQL工具箱和对象关系映射器,它为应用程序开发人员提供了SQL的全部功能和灵活性
  • Psycopg2– 是用于Python编程语言的PostgreSQL数据库适配器
  • PyMySQL – 是一个基于PEP 249的纯Python 的MySQL客户端库

与其它同类型的工具相比,AWS Data Wrangler 更重要的一个特点是,它提供了连接数据库服务和AWS数据相关服务的能力。其支持的AWS数据服务有这样一些 –

  • Amazon Redshift – AWS上托管的云数据仓库服务
  • AWS Glue – AWS上完全托管的提取、转换和加载 (ETL) 服务
  • Amazon Athena – 是一种交互式查询服务,使用标准 SQL 分析 Amazon S3 中的数据
  • Amazon EMR -AWS托管的云大数据平台,可使用多种开放源代码工具处理大量数据,例如 Apache Spark、Apache Hive、Apache HBase、Apache Flink、Apache Hudi 和 Presto
  • Amazon QuickSight –是一项采用云技术的商业智能服务,创建和发布包含 ML Insights 的交互式仪表板

这个介绍应该让我们对这个项目有了一些了解。如果用一个更为简洁的方式来介绍这个项目,我更喜欢这样的表示 “Pandas on AWS”。关于这个项目更多的信息,可以访问Github 上的内容,项目地址为 https://github.com/awslabs/aws-data-wrangler。

AWS Data Wrangler 的样例代码

我们用这一小段Python代码来演示一下Data Wrangler的特点 –

1.	import awswrangler as wr  
2.	import pandas as pd  
3.	  
4.	# 定义DataFrame  
5.	df = pd.DataFrame({"id": [1, 2], "value": ["foo", "boo"]})  
6.	  
7.	# 在S3的数据湖上存储数据  
8.	wr.s3.to_parquet(  
9.	    df=df,  
10.	    path="s3://bucket/dataset/",  
11.	    dataset=True,  
12.	    database="my_db",  
13.	    table="my_table"  
14.	)  
15.	  
16.	# 直接从 Amazon S3 中检索数据  
17.	df = wr.s3.read_parquet("s3://bucket/dataset/", dataset=True)  
18.	  
19.	# #从 Amazon Athena 中检索数据  
20.	df = wr.athena.read_sql_query("SELECT * FROM my_table", database="my_db")  
21.	  
22.	# 从Glue进行Redshift连接(SQLAlchemy),并从Redshift Spectrum检索数据  
23.	engine = wr.catalog.get_engine("my-redshift-connection")  
24.	df = wr.db.read_sql_query("SELECT * FROM external_schema.my_table", con=engine)  
25.	  
26.	# 创建QuickSight数据源和数据集来映射新表  
27.	wr.quicksight.create_athena_data_source("athena-source", allowed_to_manage=["username"])  
28.	wr.quicksight.create_athena_dataset(  
29.	    name="my-dataset",  
30.	    database="my_db",  
31.	    table="my_table",  
32.	    data_source_name="athena-source",  
33.	    allowed_to_manage=["username"]  
34.	)  
35.	  
36.	# 从Glue目录获取MySQL连接(SQLAlchemy)并将数据加载到MySQL中  
37.	engine = wr.catalog.get_engine("my-mysql-connection")  
38.	wr.db.to_sql(df, engine, schema="test", name="my_table")  
39.	  
40.	# 从Glue目录获取PostgreSQL连接(SQLAlchemy)并将数据加载到PostgreSQL  
41.	engine = wr.catalog.get_engine("my-postgresql-connection")  
42.	wr.db.to_sql(df, engine, schema="test", name="my_table")  

AWS Data Wrangler 的安装与使用场景

使用AWS Data Wrangler 的方式有很多种选择。我们既可以在独立的Python程序、Jupyter Notebook 中如同我们习以为常的Pandas 那样使用它。更重要的是我们可以在AWS 的服务中直接使用它。

独立Python 应用

在独立的Python应用场景下,AWS Data Wrangler的安装与其它Python库的安装与使用并无区别,我们可以通过pip工具直接安装。至于conda 的环境也可以用一条命令完成安装。

 

  • Pypi (pip)的安装 pip install awswrangler
  • Conda 下的安装 conda install -c conda-forge awswrangler

AWS Lambda

在AWS Lambda 下使用 AWS Data Wrangler 有一个简单的方法就是使用AWS Lambda Layer。我们可以通过这几个步骤完成安装 –

  • 访问GitHub项目中Release的部分,然后下载与所需版本相关Layer的zip文件。目前已经提供了对Python 3.6、3.7、3.8 等几个版本的支持
  • 然后转到AWS Lambda控制面板,打开Lambda Layer部分,单击创建Layer
  • 设置名称和适当的Python版本,上传新下载的zip文件,然后按create创建Layer
  • 最后,转到Lambda控制民办并选择使用新的Layer

这是一个在Lambda 中读取PARQUET 文件的例子 -

1.	import os  
2.	import json  
3.	import boto3  
4.	import logging  
5.	import urllib.parse  
6.	import awswrangler as wr  
7.	import pandas as pd  
8.	from datetime import datetime  
9.	from botocore.exceptions import ClientError  
10.	  
11.	  
12.	logger = logging.getLogger(__name__)  
13.	  
14.	  
15.	def s3_url(bucket, prefix, folder=False):  
16.	    """ Produce s3 url from 'bucket' and 'prefix' 
17.	    """  
18.	    assert bucket and prefix  
19.	  
20.	    return "s3://{bucket}/{prefix}{folder}".format(  
21.	        bucket=bucket,  
22.	        prefix=prefix,  
23.	        folder="/" if folder else "")  
24.	  
25.	  
26.	def read_parquet(s3_path, columns, source, dataset=True):  
27.	    """ Read Apache Parquet file(s) from a received S3  
28.	        prefix or list of S3 objects paths. 
29.	        Convert the last_updated column from Timestamp to Epoch time. 
30.	        Return records as list of dictionary.  
31.	    """  
32.	    assert source, "source can't be None"  
33.	  
34.	    df = wr.s3.read_parquet(path=s3_path, columns=columns, dataset=dataset)  
35.	    df[["last_updated"]] = (df[["last_updated"]] - pd.Timestamp("1970-01-01")) // pd.Timedelta('1s')  
36.	    df["source"] = source  
37.	    df["location"] = s3_path  
38.	      
39.	    return df.to_dict(orient='records')  
40.	  
41.	  
42.	def write_to_ddb(client, table_name, records):  
43.	    """ Write data to DynamoDB table. 
44.	        Returns inserted row count. 
45.	    """  
46.	  
47.	    table = client.Table(table_name)  
48.	    row_count = 0  
49.	    for row in records:  
50.	        table.put_item(Item=row)  
51.	        row_count += 1  
52.	  
53.	    return row_count  
54.	  
55.	  
56.	def lambda_handler(event, context):  
57.	  
58.	    record = event["Records"][0]  
59.	  
60.	    assert "eventSource" in record  
61.	    assert "eventName" in record  
62.	    assert record["eventSource"] == "aws:s3", "Unknown event source: %s." % record["eventSource"]  
63.	    assert record["eventName"].startswith(  
64.	        "ObjectCreated:"), "Unsupported event name: %s." % record["eventName"]  
65.	    assert "s3" in record  
66.	    assert "bucket" in record["s3"]  
67.	    assert "name" in record["s3"]["bucket"]  
68.	    assert "object" in record["s3"]  
69.	    assert "key" in record["s3"]["object"]  
70.	  
71.	    bucket = record["s3"]["bucket"]["name"]  
72.	  
73.	    # If S3 URL contains special character usch as '=', it will be quoted, like: %3D  
74.	    # This is to unquote them back to original character, or it will complain path not exist.  
75.	    key = urllib.parse.unquote(record["s3"]["object"]["key"])  
76.	  
77.	    #create s3 path  
78.	    s3_path = s3_url(bucket=bucket, prefix=key)  
79.	  
80.	    # Retrieving the data directly from Amazon S3  
81.	    cols = ["ticker_symbol", "sector", "last_updated"]  
82.	      
83.	    df = read_parquet(s3_path=s3_path,   
84.	                      columns=cols,  
85.	                      source="system",   
86.	                      dataset=True)  
87.	  
88.	    #Instantiate Dynamodb connection  
89.	    ddb = boto3.resource("dynamodb")  
90.	  
91.	    #Get the dynamodb table name  
92.	    ddb_table = os.environ["DDB_TABLE"]  
93.	  
94.	    #Write data to dynamodb  
95.	    record_count = write_to_ddb(  
96.	        client=ddb, table_name=ddb_table, records=df)  
97.	  
98.	    #return total record inserted  
99.	    return("Total records inserted:", record_count)  

AWS Glue Wheel

在AWS Glue中使用需要注意一点,AWS Data Wrangler仅支持Glue Python Shell,不支持Glue PySpark。安装的步骤如下 -

  • 转到GitHub的Release页面,并下载与所需版本相关的whl文件
  • 将Wheel文件上传到Amazon S3的存储桶中
  • 转到Glue Python Shell job,然后指向S3上新上传的文件

Amazon SageMaker Notebook/Jupyter Notebook

需要在Python 3 Notebook段落中运行此命令,然后确保在导入awswrangler软件包之前重新启动Jupyter的内核,使得awswrangler 生效。安装命令 -

!pip install awswrangler

Amazon SageMaker Notebook Lifecycle

打开SageMaker控制台,转到生命周期的部分,然后使用以下代码段为所有兼容的SageMaker内核配置AWS Data Wrangler –

1.	#!/bin/bash  
2.	  
3.	set -e  
4.	  
5.	# OVERVIEW  
6.	# This script installs a single pip package in all SageMaker conda environments, apart from the JupyterSystemEnv which  
7.	# is a system environment reserved for Jupyter.  
8.	# Note this may timeout if the package installations in all environments take longer than 5 mins, consider using  
9.	# "nohup" to run this as a background process in that case.  
10.	  
11.	sudo -u ec2-user -i <<'EOF'  
12.	  
13.	# PARAMETERS  
14.	PACKAGE=awswrangler  
15.	  
16.	# Note that "base" is special environment name, include it there as well.  
17.	for env in base /home/ec2-user/anaconda3/envs/*; do  
18.	    source /home/ec2-user/anaconda3/bin/activate $(basename "$env")  
19.	    if [ $env = 'JupyterSystemEnv' ]; then  
20.	        continue  
21.	    fi  
22.	    nohup pip install --upgrade "$PACKAGE" &  
23.	    source /home/ec2-user/anaconda3/bin/deactivate  
24.	done  
25.	EOF 

EMR Cluster

虽然AWS Data Wrangler并不是分布式的库,但AWS Data Wrangler仍然是大数据管道的一个好帮手。

  • 在集群配置下将Python 3配置为PySpark的默认解释器
1.	[  
2.	  {  
3.	     "Classification": "spark-env",  
4.	     "Configurations": [  
5.	       {  
6.	         "Classification": "export",  
7.	         "Properties": {  
8.	            "PYSPARK_PYTHON": "/usr/bin/python3"  
9.	          }  
10.	       }  
11.	    ]  
12.	  }  
13.	]  
  • 将bootstrap脚本保存在S3上,并在EMR集群上引用它
1.	#!/usr/bin/env bash  
2.	set -ex  
3.	  
4.	sudo pip-3.6 install awswrangler

AWS Data Wrangler这个项目自2019年出现至今,版本已经迭代发展到了1.6.1,项目在Github上获得的Star 数量达到了811。这足以证明项目的活力以及受关注的程度。当然,数据湖的建设是复杂而充满挑战的,肯定不是一两个工具能够一蹴而就的。不可否认的一点,数据湖没有简单的范式,需要大量的针对数据的定制开发的工作,而这正是AWS Data Wrangler 可以大展身手的地方。

关于数据湖以及AWS Data Wrangler 的讨论远非一篇文章能够说尽。非常希望欢迎听到你们的反馈与想法,我的邮箱地址是lianghon@amazon.com

本篇作者

费良宏

费良宏,AWS Principal Developer Advocate。在过去的20多年一直从事软件架构、程序开发以及技术推广等领域的工作。他经常在各类技术会议上发表演讲进行分享,他还是多个技术社区的热心参与者。他擅长Web领域应用、移动应用以及机器学习等的开发,也从事过多个大型软件项目的设计、开发与项目管理。目前他专注与云计算以及互联网等技术领域,致力于帮助中国的 开发者构建基于云计算的新一代的互联网应用。