How do I use pivoted data after an AWS Glue relationalize transformation?

Last updated: 2019-11-15

I want to use the AWS Glue relationalize transform to flatten my data. Which fields can I use as partitions to store the pivoted data in Amazon Simple Storage Service (Amazon S3)?

Short Description

The relationalize transform flattens nested data structures, such as arrays and structs, so that the data can be used in relational databases. The relationalize transform returns a collection of DynamicFrames (a DynamicFrameCollection in Python and an array in Scala). In Python, you access each DynamicFrame in the collection by its table name; in Scala, you access it by its array index.

Resolution

Relationalize the data

This tutorial uses the following schema:

|-- family_name: string
|-- name: string
|-- gender: string
|-- image: string
|-- images: array
|    |-- element: struct
|    |    |-- url: string

Use the following relationalize syntax for Python:

# AWS Glue Data Catalog: database and table names
db_name = "us-legislators"
tbl_persons = "persons_json"

# Output Amazon S3 temp directory
tempDir = "s3://awsexamplebucket/temp_dir/"

# Create a dynamic frame from the source table
persons = glueContext.create_dynamic_frame.from_catalog(database=db_name, table_name=tbl_persons)

# Relationalize transformation
dfc = persons.relationalize("root", tempDir)
dfc.select('root_images').printSchema()
dfc.select('root_images').show()

Use the following relationalize syntax for Scala:

// AWS Glue Data Catalog: database and table names
val dbName = "us-legislators"
val tblPersons = "persons_json"

// Output Amazon S3 temp directory
val tempDir = "s3://awsexamplebucket/temp_dir"

val persons: DynamicFrame = glueContext.getCatalogSource(database = dbName, tableName = tblPersons).getDynamicFrame()
val personRelationalize = persons.relationalize(rootTableName = "root", stagingPath = tempDir)
personRelationalize(2).printSchema()
personRelationalize(2).show()

Interpret the pivoted data

This relationalize transform produces two schemas: root and root_images.

root:

|-- family_name: string
|-- name: string
|-- links: long
|-- gender: string
|-- image: string
|-- images: long

root_images:

|-- id: long
|-- index: int
|-- images.val.url: string

  • id: order of the arrays (1, 2, or 3) in the dataset; it matches the value of the images field in root
  • index: index position of each element within its array
  • images.val.url: the url value of the array element

These are the only fields that can be used as partition fields to store this pivoted data in Amazon S3. Specifying root table fields, such as name, doesn't work because those fields don't exist in root_images.

Join the relationalized data to get the normalized data

The id attribute in root_images is the order of the arrays (1, 2, or 3) in the dataset. The images attribute in root holds the value of the array index. This means that you must use the images and id fields to join root and root_images. You can run dynamicFrame.show() to verify the order of the arrays and the value of the array index.
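The join logic can be sketched in plain Python (this is not the Glue Join transform; the sample rows are invented for illustration):

```python
# Plain-Python sketch of joining root to root_images on images == id.
# The sample rows below are invented for illustration only.
root = [
    {"name": "Jane Example", "images": 1},
    {"name": "John Example", "images": 2},
]
root_images = [
    {"id": 1, "index": 0, "images.val.url": "https://example.com/jane-1.jpg"},
    {"id": 1, "index": 1, "images.val.url": "https://example.com/jane-2.jpg"},
    {"id": 2, "index": 0, "images.val.url": "https://example.com/john-1.jpg"},
]

# Equi-join: each root row matches every root_images row whose id
# equals the root row's images value, re-nesting the flattened data.
joined = [
    {**r, **ri}
    for r in root
    for ri in root_images
    if r["images"] == ri["id"]
]
for row in joined:
    print(row["name"], row["images.val.url"])
```

A root row with two array elements produces two joined rows, which is the normalized (one row per array element) view of the original nested data.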

To join root and root_images:

Python:

joined_root_root_images = Join.apply(dfc.select('root'), dfc.select('root_images'), 'images', 'id')

Scala:

val joined_root_root_images = personRelationalize(0).join(keys1 = Seq("images"), keys2 = Seq("id"), frame2 = personRelationalize(2))

Store the pivoted data

To store the pivoted data in Amazon S3 with partitions:

Python:

datasink4 = glueContext.write_dynamic_frame.from_options(
    frame = dfc.select('root_images'),
    connection_type = "s3",
    connection_options = {"path": outputHistoryDir, "partitionKeys": ["id"]},
    format = "csv",
    transformation_ctx = "datasink4")

Scala:

Note: In the following example, personRelationalize(2) is the root_images pivoted data table.

glueContext.getSinkWithFormat(connectionType = "s3",
  options = JsonOptions(Map("path" -> paths, "partitionKeys" -> List("id"))),
  format = "csv", transformationContext = "").writeDynamicFrame(personRelationalize(2))
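To see why id works as a partition key, the following sketch shows the Hive-style key layout that a partitioned write produces. The bucket and prefix are illustrative only, and the rows are invented sample data:

```python
# Sketch of the Hive-style S3 key layout produced by partitioning on "id".
# The bucket/prefix and rows below are illustrative only.
rows = [
    {"id": 1, "index": 0, "images.val.url": "https://example.com/a.jpg"},
    {"id": 1, "index": 1, "images.val.url": "https://example.com/b.jpg"},
    {"id": 2, "index": 0, "images.val.url": "https://example.com/c.jpg"},
]

base = "s3://awsexamplebucket/output_dir"
prefixes = sorted({f"{base}/id={row['id']}/" for row in rows})
for p in prefixes:
    print(p)
# Each prefix holds the CSV part files for the rows with that id;
# the partition column is encoded in the path, not in the files.
```

This is also why a root-table field such as name cannot serve as a partition key here: the rows being written come from root_images, and a partition key must be a column of the frame being written.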

To store the pivoted data in Amazon S3 without partitions:

Python:

datasink5 = glueContext.write_dynamic_frame.from_options(
    frame = dfc.select('root_images'),
    connection_type = "s3",
    connection_options = {"path": outputHistoryDir},
    format = "csv",
    transformation_ctx = "datasink5")

Scala:

Note: In the following example, personRelationalize(2) is the root_images pivoted data table.

glueContext.getSinkWithFormat(connectionType = "s3",
  options = JsonOptions(Map("path" -> paths)),
  format = "csv", transformationContext = "").writeDynamicFrame(personRelationalize(2))

After you write the data to Amazon S3, query the data in Amazon Athena or use a DynamicFrame to write the data to a relational database, such as Amazon Redshift.