如何在 AWS Glue 关系转型后使用转换后的数据?

上次更新时间:2020 年 3 月 17 日

我想要使用 AWS Glue 关系转型来平展我的数据。我可以将哪些字段用作分区来将转换后的数据存储在 Amazon Simple Storage Service (Amazon S3) 中?

简短描述

通过关系转型,可以在关系数据库中使用 NoSQL 数据结构,如数组和结构。关系转型将返回一系列 DynamicFrames(Python 中的 DynamicFrameCollection 和 Scala 中的数组)。关系转型返回的所有 DynamicFrames 均可以通过其在 Python 中的各个名称和 Scala 中的数组索引来访问。

解决方法

对数据进行关系化

本教程使用以下架构:

|-- family_name: string
|-- name: string
|-- gender: string
|-- image: string
|-- images: array
|    |-- element: struct
|    |    |-- url: string

对 Python 使用以下关系化语法:

# AWS Glue Data Catalog: database and table names
db_name = "us-legislators"
tempDir = "s3://awsexamplebucket/temp_dir/"

# Create dynamic frames from the source tables
persons = glueContext.create_dynamic_frame.from_catalog(database=db_name, table_name=tbl_persons)

# Relationalize transformation
dfc = persons.relationalize("root", tempDir)
dfc.select('root_images').printSchema()
dfc.select('root_images').show()

对 Scala 使用以下关系化语法:

// AWS Glue Data Catalog: database and table names
val dbName = "us-legislators"
val tblPersons = "persons_json"

// Output Amazon S3 temp directory
val tempDir = "s3://awsexamplebucket/temp_dir"

val persons: DynamicFrame = glueContext.getCatalogSource(database = dbName, tableName = tblPersons).getDynamicFrame()
val personRelationalize = persons.relationalize(rootTableName = "root", stagingPath = tempDir)
personRelationalize(2).printSchema()
personRelationalize(2).show()

解读转换后的数据

此关系转型产生两个架构:root root_images

root

|-- family_name: string
|-- name: string
|-- links: long
|-- gender: string
|-- image: string
|-- images: long

root_images

|-- id: long
|-- index: int
|-- images.val.url: string
  • id:数组元素的顺序(1、2 或 3)
  • index:数组中每个元素的索引位置
  • images.val.urlroot_images 中的 images.val.url

它们是唯一可用作分区字段以将此转换后的数据存储在 Amazon S3 中的字段。由于这些字段不存在于 root_images 中,指定 root 表字段(如 name)不起作用。

加入关系化数据以获取标准化数据

root_images 中的 id 属性指的是数组(1、2 或 3)在数据集中的顺序。root 中的 images 属性将保留数组索引的值。这表示,您必须使用 imagesid 来加入 rootroot_images。您可以运行 dynamicFrame.show() 来验证数组的顺序和数组索引的值。

要加入 rootroot_images

Python:

joined_root_root_images = Join.apply(dfc.select('root'), dfc.select('root_images'), 'images', 'id')

Scala:

val joined_root_root_images = personRelationalize(0).join(keys1 = Seq("images"), keys2 = Seq("id"), frame2 = personRelationalize(1))

存储转换后的数据

要将转换后的数据存储在具有分区的 Amazon S3:

Python:

datasink4 = glueContext.write_dynamic_frame.from_options(frame = dfc.select('root_images'), connection_type = "s3", connection_options = {"path": outputHistoryDir,"partitionKeys":["id"]}, format = "csv",transformation_ctx = "datasink4")

Scala:

注意:在下面的示例中,personRelationalize(2)root_images 转换后的数据表。

glueContext.getSinkWithFormat(connectionType = "s3",
  options = JsonOptions(Map("path" -> paths, "partitionKeys" -> List("id"))),
  format = "csv", transformationContext = "").writeDynamicFrame(personRelationalize(2))

要将转换后的数据存储在不具有分区的 Amazon S3:

Python:

datasink5 = glueContext.write_dynamic_frame.from_options(frame = dfc.select('root_images'), connection_type = "s3", connection_options = {"path": outputHistoryDir}, format = "csv",transformation_ctx = "datasink5"

Scala:

注意:在下面的示例中,personRelationalize(2)root_images 转换后的数据表。

glueContext.getSinkWithFormat(connectionType = "s3",
  options = JsonOptions(Map("path" -> paths)),
  format = "csv", transformationContext = "").writeDynamicFrame(personRelationalize(2))

当您将数据写入 Amazon S3 后,在 Amazon Athena 中查询数据或使用 DynamicFrame 将数据写入关系数据库中,如 Amazon Redshift。


这篇文章对您有帮助吗?

我们可以改进什么?


需要更多帮助?