AWS Glue でリレーショナル変換後にピボットされたデータを使用する方法を教えてください。

最終更新日: 2020 年 3 月 17 日

AWS Glue のリレーショナル変換を使用してデータを平坦にしたいです。ピボットされたデータを Amazon Simple Storage Service(Amazon S3) に保存するためのパーティションとして使用できるフィールドはどれですか?

簡単な説明

リレーショナル変換により、リレーショナルデータベースで NoSQL データ構造 (配列や構造など) を使用できるようになります。リレーショナル変換は、DynamicFrames のコレクション (Python では DynamicFrameCollection、Scala では配列) を返します。リレーショナル変換によって返されるすべての DynamicFrames には、Python では個別の名前から、Scala では配列インデックスを使用してアクセスできます。

解決方法

データをリレーショナル化

このチュートリアルでは、次のスキーマを使用します。

|-- family_name: string
|-- name: string
|-- gender: string
|-- image: string
|-- images: array
|    |-- element: struct
|    |    |-- url: string

Python では、次のリレーショナル構文を使用します。

# AWS Glue Data Catalog: database and table names
db_name = "us-legislators"
tempDir = "s3://awsexamplebucket/temp_dir/"

# Create dynamic frames from the source tables
persons = glueContext.create_dynamic_frame.from_catalog(database=db_name, table_name=tbl_persons)

# Relationalize transformation
dfc = persons.relationalize("root", tempDir)
dfc.select('root_images').printSchema()
dfc.select('root_images').show()

Scala には、次のリレーショナル構文を使用します。

// AWS Glue Data Catalog: database and table names
val dbName = "us-legislators"
val tblPersons = "persons_json"

// Output Amazon S3 temp directory
val tempDir = "s3://awsexamplebucket/temp_dir"

val persons: DynamicFrame = glueContext.getCatalogSource(database = dbName, tableName = tblPersons).getDynamicFrame()
val personRelationalize = persons.relationalize(rootTableName = "root", stagingPath = tempDir)
personRelationalize(2).printSchema()
personRelationalize(2).show()

ピボットされたデータを解釈

このリレーショナル変換では、root root_images という 2 つのスキーマが生成されます。

root:

|-- family_name: string
|-- name: string
|-- links: long
|-- gender: string
|-- image: string
|-- images: long

root_images:

|-- id: long
|-- index: int
|-- images.val.url: string
  • id: 配列要素の順序 (1、2、または 3)
  • index: 配列内の各要素のインデックス位置
  • images.val.url: root_imagesimages.val.url

これらは、このピボットされたデータを Amazon S3 に保存するためのパーティションフィールドとして使用できる唯一のフィールドです。名前などの root テーブルフィールドは、root_images に存在しないため、機能しません。

正規化されたデータを取得するためにリレーショナルデータを結合する

root_imagesid 属性は、データセット内の配列 (1、2、または 3) の順序です。rootimages 属性は、配列インデックスの値を保持します。つまり、rootroot_images を結合するには、イメージID を使用する必要があります。dynamicFrame.show () を実行して、配列の順序と配列インデックスの値を確認できます。

rootroot_images を結合する方法:

Python:

joined_root_root_images = Join.apply(dfc.select('root'), dfc.select('root_images'), 'images', 'id')

Scala:

val joined_root_root_images = personRelationalize(0).join(keys1 = Seq("images"), keys2 = Seq("id"), frame2 = personRelationalize(1))

ピボットされたデータを保存

ピボットされたデータをパーティションを使用して Amazon S3 に保存する方法:

Python:

datasink4 = glueContext.write_dynamic_frame.from_options(frame = dfc.select('root_images'), connection_type = "s3", connection_options = {"path": outputHistoryDir,"partitionKeys":["id"]}, format = "csv",transformation_ctx = "datasink4")

Scala:

注意: 次の例で、personRelationalize(2)root_images のピボットされたデータテーブルです。

glueContext.getSinkWithFormat(connectionType = "s3",
  options = JsonOptions(Map("path" -> paths, "partitionKeys" -> List("id"))),
  format = "csv", transformationContext = "").writeDynamicFrame(personRelationalize(2))

ピボットされたデータをパーティションなしで Amazon S3 に保存する方法:

Python:

datasink5 = glueContext.write_dynamic_frame.from_options(frame = dfc.select('root_images'), connection_type = "s3", connection_options = {"path": outputHistoryDir}, format = "csv",transformation_ctx = "datasink5"

Scala:

注意: 次の例で、personRelationalize(2)root_images のピボットされたデータテーブルです。

glueContext.getSinkWithFormat(connectionType = "s3",
  options = JsonOptions(Map("path" -> paths)),
  format = "csv", transformationContext = "").writeDynamicFrame(personRelationalize(2))

データを Amazon S3 に書き込んだ後、Amazon Athena でデータをクエリするか、DynamicFrame を使用して Amazon Redshift などのリレーショナルデータベースにデータを書き込む操作を行います。