¿Cómo puedo utilizar datos dinámicos después de una transformación Relationalize de AWS Glue?

4 minutos de lectura
0

Quiero usar la transformación Relationalize de AWS Glue para aplanar mis datos. ¿Qué campos puedo usar como particiones para almacenar los datos dinámicos en Amazon Simple Storage Service (Amazon S3)?

Descripción corta

La transformación Relationalize permite utilizar estructuras de datos que no sean de SQL, como matrices y estructuras, en bases de datos relacionales. La transformación Relationalize devuelve una recopilación de DynamicFrames (una DynamicFrameCollection en Python y una matriz en Scala). Se puede acceder a todos los DynamicFrames devueltos por una transformación Relationalize a través de sus nombres individuales en Python y a través de índices de matrices en Scala.

Resolución

Relacionalización de los datos

En este tutorial se utiliza el siguiente esquema:

|-- family_name: string
|-- name: string
|-- gender: string
|-- image: string
|-- images: array
|    |-- element: struct
|    |    |-- url: string

Utilice la siguiente sintaxis Relationalize para Python:

# AWS Glue Data Catalog: database and table names
db_name = "us-legislators"
tempDir = "s3://awsexamplebucket/temp_dir/"

# Create dynamic frames from the source tables
persons = glueContext.create_dynamic_frame.from_catalog(database=db_name, table_name=tbl_persons)

# Relationalize transformation
dfc = persons.relationalize("root", tempDir)
dfc.select('root_images').printSchema()
dfc.select('root_images').show()

Utilice la siguiente sintaxis Relationalize para Scala:

// AWS Glue Data Catalog: database and table names
val dbName = "us-legislators"
val tblPersons = "persons_json"

// Output Amazon S3 temp directory
val tempDir = "s3://awsexamplebucket/temp_dir"

val persons: DynamicFrame = glueContext.getCatalogSource(database = dbName, tableName = tblPersons).getDynamicFrame()
val personRelationalize = persons.relationalize(rootTableName = "root", stagingPath = tempDir)
personRelationalize(2).printSchema()
personRelationalize(2).show()

Interpretación de los datos dinámicos

Esta transformación Relationalize da lugar a dos esquemas: root y root_images.

root:

|-- family_name: string
|-- name: string
|-- gender: string
|-- image: string
|-- images: long

root_images:

|-- id: long
|-- index: int
|-- images.val.url: string
  • id: orden del elemento de la matriz (1, 2 o 3)
  • index: posición del índice para cada elemento de una matriz
  • images.val.url: valor de images.val.url en root_images

Estos son los únicos campos que se pueden usar como campos de partición para almacenar estos datos dinámicos en Amazon S3. Especificar campos de tabla root, como name, no funciona porque esos campos no existen en root_images.

Unión de los datos resultado de Relationalize para obtener los datos normalizados

El atributo id en root_images indica el orden de las matrices (1, 2 o 3) en el conjunto de datos. El atributo images en root contiene el valor del índice de la matriz. Esto significa que debe utilizar images e id para unir root y root_images. Puede ejecutar dynamicFrame.show() para comprobar el orden de las matrices y el valor del índice de la matriz.

Para unir root y root_images:

Python:

joined_root_root_images = Join.apply(dfc.select('root'), dfc.select('root_images'), 'images', 'id')

Scala:

val joined_root_root_images = personRelationalize(0).join(keys1 = Seq("images"), keys2 = Seq("id"), frame2 = personRelationalize(1))

Almacenamiento de los datos dinámicos

Para almacenar los datos dinámicos en Amazon S3 con particiones:

Python:

datasink4 = glueContext.write_dynamic_frame.from_options(frame = dfc.select('root_images'), connection_type = "s3", connection_options = {"path": outputHistoryDir,"partitionKeys":["id"]}, format = "csv",transformation_ctx = "datasink4")

Scala:

Nota: En el siguiente ejemplo, personRelationalize(2) es la tabla de datos dinámicos root_images.

glueContext.getSinkWithFormat(connectionType = "s3",
  options = JsonOptions(Map("path" -> paths, "partitionKeys" -> List("id"))),
  format = "csv", transformationContext = "").writeDynamicFrame(personRelationalize(2))

Para almacenar los datos dinámicos en Amazon S3 sin particiones:

Python:

datasink5 = glueContext.write_dynamic_frame.from_options(frame = dfc.select('root_images'), connection_type = "s3", connection_options = {"path": outputHistoryDir}, format = "csv",transformation_ctx = "datasink5"

Scala:

Nota: En el siguiente ejemplo, personRelationalize(2) es la tabla de datos dinámicos root_images.

glueContext.getSinkWithFormat(connectionType = "s3",
  options = JsonOptions(Map("path" -> paths)),
  format = "csv", transformationContext = "").writeDynamicFrame(personRelationalize(2))

Después de escribir los datos en Amazon S3, consulte los datos en Amazon Athena o utilice DynamicFrame para escribir los datos en una base de datos relacional, como Amazon Redshift.


Información relacionada

Simplificación de las consultas en JSON con anidamiento mediante la transformación Relationalize de AWS Glue

Ejemplo de código: combinación y relacionalización de datos

OFICIAL DE AWS
OFICIAL DE AWSActualizada hace 3 años