Comment utiliser des données pivotées après une transformation de relationalisation AWS Glue ?

Date de la dernière mise à jour : 17/03/2020

Je souhaite utiliser la transformation de relationalisation AWS Glue pour aplatir mes données. Quels champs peut-on utiliser comme partitions pour stocker les données pivotées dans Amazon Simple Storage Service (Amazon S3) ?

Brève description

La relationalisation de transformation permet d'utiliser des structures de données NoSQL, telles que des tableaux et des structures, dans les bases de données relationnelles. La transformation de relationalisation renvoie un ensemble de DynamicFrames (DynamicFrameCollection dans le langage Python et un tableau dans le langage Scala). Toutes les DynamicFrames retournés par une transformation de relationalisation sont accessibles via leurs noms individuels dans le langage Python et par le biais d'index de tableau dans le langage Scala.

Solution

Relationaliser les données

Ce didacticiel utilise le schéma suivant :

|-- family_name: string
|-- name: string
|-- gender: string
|-- image: string
|-- images: array
|    |-- element: struct
|    |    |-- url: string

Utilisez la syntaxe de relationalisation suivante pour Python :

# AWS Glue Data Catalog: database and table names
db_name = "us-legislators"
tempDir = "s3://awsexamplebucket/temp_dir/"

# Create dynamic frames from the source tables
persons = glueContext.create_dynamic_frame.from_catalog(database=db_name, table_name=tbl_persons)

# Relationalize transformation
dfc = persons.relationalize("root", tempDir)
dfc.select('root_images').printSchema()
dfc.select('root_images').show()

Utilisez la syntaxe de relationalisation suivante pour Scala :

// AWS Glue Data Catalog: database and table names
val dbName = "us-legislators"
val tblPersons = "persons_json"

// Output Amazon S3 temp directory
val tempDir = "s3://awsexamplebucket/temp_dir"

val persons: DynamicFrame = glueContext.getCatalogSource(database = dbName, tableName = tblPersons).getDynamicFrame()
val personRelationalize = persons.relationalize(rootTableName = "root", stagingPath = tempDir)
personRelationalize(2).printSchema()
personRelationalize(2).show()

Interpréter les données pivotées

Cette transformation de relationalisation produit deux schémas : root et root_images.

root :

|-- family_name: string
|-- name: string
|-- links: long
|-- gender: string
|-- image: string
|-- images: long

root_images :

|-- id: long
|-- index: int
|-- images.val.url: string
  • id : ordre de l'élément de tableau (1, 2 ou 3)
  • index : position d'index de chaque élément dans un tableau
  • images.val.url : valeur pour images.val.url dans root_images

Il s'agit des seuls champs qui peuvent être utilisés comme champs de partition pour stocker ces données pivotées dans Amazon S3. La spécification de champs de table root tels que name, ne fonctionne pas, car ces champs n'existent pas dans root_images.

Joindre les données relationalisées pour obtenir les données normalisées

L'attribut id dans root_images correspond à l'ordre des tableaux (1, 2 ou 3) dans l'ensemble de données. L'attribut images dans root contient la valeur de l'index du tableau. Cela signifie que vous devez utiliser images et ID pour joindre root et root_images. Vous pouvez exécuter dynamicFrame.show() pour vérifier l'ordre des tableaux et la valeur de l'index du tableau.

Pour joindre root et root_images:

Python :

joined_root_root_images = Join.apply(dfc.select('root'), dfc.select('root_images'), 'images', 'id')

Scala :

val joined_root_root_images = personRelationalize(0).join(keys1 = Seq("images"), keys2 = Seq("id"), frame2 = personRelationalize(1))

Stocker les données pivotées

Pour stocker les données pivotées dans Amazon S3 avec des partitions :

Python :

datasink4 = glueContext.write_dynamic_frame.from_options(frame = dfc.select('root_images'), connection_type = "s3", connection_options = {"path": outputHistoryDir,"partitionKeys":["id"]}, format = "csv",transformation_ctx = "datasink4")

Scala :

Remarque : dans l'exemple suivant, personRelationalize(2) est la table de données pivotée root_images.

glueContext.getSinkWithFormat(connectionType = "s3",
  options = JsonOptions(Map("path" -> paths, "partitionKeys" -> List("id"))),
  format = "csv", transformationContext = "").writeDynamicFrame(personRelationalize(2))

Pour stocker les données pivotées dans Amazon S3 sans partitions :

Python :

datasink5 = glueContext.write_dynamic_frame.from_options(frame = dfc.select('root_images'), connection_type = "s3", connection_options = {"path": outputHistoryDir}, format = "csv",transformation_ctx = "datasink5"

Scala :

Remarque : dans l'exemple suivant, personRelationalize(2) est la table de données pivotée root_images.

glueContext.getSinkWithFormat(connectionType = "s3",
  options = JsonOptions(Map("path" -> paths)),
  format = "csv", transformationContext = "").writeDynamicFrame(personRelationalize(2))

Une fois que vous avez écrit les données dans Amazon S3, interrogez les données dans Amazon Athena ou utilisez un DynamicFrame pour écrire les données dans une base de données relationnelle, telle qu'Amazon Redshift.