Amazon Web Services ブログ

JSONSerDe によるマッピングを使って,入れ子の JSON から Amazon Athena のテーブルを作成する

by AWS Japan Staff | on | in General |

多くのシステムでは、イベント情報を記録するのに Java Script Object Notation (JSON) を使っています。JSON は効率的かつ柔軟ではありますが、JSON から情報を取り出すのは面倒です。

この投稿では、ログデリバリー手段としての Amazon Kinesis Firehose、ログ保存先としての Amazon S3、データの加工整形やデータベースへの挿入なしに ログに対して JSONSerDe を使って SQL クエリを投げる手段としての Amazon Athena を、緊密に連携させます。これらの処理は、完全にサーバーレスで行われます。コンピューティングリソースを準備する必要はありません。

Amazon SES を使えば、サービス間の全メッセージに対する詳細なログが入手でき、SES イベント発行によって、それを Firehose でも利用することができます。しかし、トレンドやコンプライアンスのデータに関する詳細ログのパースには、多額のインフラ投資や開発期間が必要となります。Athena は保存されているデータに対して、そのままのフォーマットで、コードを書いたりアーキテクチャ設計をしたりすることなく直接クエリできることにより、こうしたデータ探索に非常に適しています。その上、Athena では多くの標準 SQL クエリとシンタックスが利用可能です。

ウォークスルー: データセットの作成

まず、以下のような SES 送信イベントのデータセットをみてみましょう。

{
	"eventType": "Send",
	"mail": {
		"timestamp": "2017-01-18T18:08:44.830Z",
		"source": "youraddress@example.com",
		"sourceArn": "arn:aws:ses:us-west-2:111222333:identity/youraddress@example.com",
		"sendingAccountId": "111222333",
		"messageId": "01010159b2c4471e-fc6e26e2-af14-4f28-b814-69e488740023-000000",
		"destination": ["success@simulator.amazonses.com"],
		"headersTruncated": false,
		"headers": [{
				"name": "From",
				"value": "youraddress@example.com"
			}, {
				"name": "To",
				"value": "success@simulator.amazonses.com"
			}, {
				"name": "Subject",
				"value": "Bounced Like a Bad Check"
			}, {
				"name": "MIME-Version",
				"value": "1.0"
			}, {
				"name": "Content-Type",
				"value": "text/plain; charset=UTF-8"
			}, {
				"name": "Content-Transfer-Encoding",
				"value": "7bit"
			}
		],
		"commonHeaders": {
			"from": ["youraddress@example.com"],
			"to": ["success@simulator.amazonses.com"],
			"messageId": "01010159b2c4471e-fc6e26e2-af14-4f28-b814-69e488740023-000000",
			"subject": "Test"
		},
		"tags": {
			"ses:configuration-set": ["Firehose"],
			"ses:source-ip": ["54.55.55.55"],
			"ses:from-domain": ["amazon.com"],
			"ses:caller-identity": ["root"]
		}
	},
	"send": {}
}

 

このデータセットには、 SES の送受信に関する多くの価値ある情報が含まれています。インサイトを得るためにパース処理が必要な、同じ形式のデータセットが山ほどあります。まずはデータを取得しましょう。

1. SES のコンソールまたは CLI から、Firehose のデリバリーストリームを S3 に対しニアリアルタイムで送信、保存する configuration set を作成します。

NestedJson_1

2. SES を使ってテストメールを送ってみます。送信の際に、作成した configuration set を使用するよう指定します。

SES コンソールから指定を行う場合、More options を選択してください。Configuration Set を含む、いくつかのフィールドが表示されます。

NestedJson_2

また SES の検証手順や AWS CLI を使って、メールボックスシミュレータのアドレスにメッセージを送信できます。

$ aws ses send-email --to bounce@simulator.amazonses.com --from youraddress@example.com --subject "Bounced Like a Bad Check" --text "This should bounce" --configuration-set-name Firehose

3. ログが蓄積される S3 バケットを指定します。

NestedJson_3

ウォークスルー: Athena でクエリ

Amazon Athena は、Amazon S3 に置かれたデータに対し標準 SQL を使って簡単に分析を行える、インタラクティブなクエリサービスです。Athena を使うためにはサーバを用意する必要はなく、したがってインフラを管理する必要もありません。実行したクエリに対してのみ、料金が発生します。CSV,、JSON、ORC、そして Parquet といったさまざまな標準的なデータフォーマットに対応しています。

Hive メタストアと互換性のある DDL ステートメントを用いて、データスキーマを定義することによって、Athena がデータを処理できるようになります。Athena は分散 SQL エンジンの Presto を使用して、クエリを実行します。また Apache Hive の DDL シンタックスによってテーブルやパーティションの作成、削除、変更を行います。Athena は schema-on-read と呼ばれるアプローチを採用しており、それによりクエリを実行するタイミングで初めてスキーマを付与することになります。クエリの結果を得るために、ログに含まれる各フィールドを、対応するカラムにマッピングすることになります。

もし Apache Hive にすでに慣れている場合には、Athena でのテーブル作成は非常に似たやり方で行えます。クエリエディター、ウィザード、JDBCドライバーのいずれかを使って DDL ステートメントを記述することで、テーブルを作成できます。テーブル作成の中で大事なのは SerDe (“Serializer and Deserializer” の略称) です。データフォーマットが JSON のため、Athena でもともとサポートされている org.openx.data.jsonserde.JsonSerDe を使って、データのパースを行います。その際には、Hive/Presto と JSON データセットにまつわる2つのよくある問題に対処しなければなりません。

  • 入れ子または複数階層の JSON
  • (マッピングの制御に使われる) 禁止文字

Athena のクエリエディタでは、最初の Athena テーブルを作る際には次のような DDL ステートメントを使います。LOCATION にはログが置かれている S3 のファイルパスを指定します。

CREATE EXTERNAL TABLE sesblog (
  eventType string,
  mail struct<`timestamp`:string,
              source:string,
              sourceArn:string,
              sendingAccountId:string,
              messageId:string,
              destination:string,
              headersTruncated:boolean,
              headers:array<struct<name:string,value:string>>,
              commonHeaders:struct<`from`:array<string>,to:array<string>,messageId:string,subject:string>
              > 
  )           
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
LOCATION 's3://<YOUR BUCKET HERE>/FH2017/' 

この DDL ステートメントでは、Presto のデータ型に合わせて JSON データセット内のフィールドを宣言しています。またオブジェクトのグループを扱うために、配列や構造体に似た、Hive のコレクション型を使用しています。

ウォークスルー: 入れ子の JSON

JSON が 3段階で入れ子になっていることにより、mail キーの定義はとてもややこしい形になります。この例では、他のさまざまなキーが入れ子になって内部に含まれた mail という構造体を、一番上の階層で作成します。mail には messageId や destination が2つめの階層として含まれます。timestamp フィールドは、バッククォート (`) 文字に囲まれています。timestamp は Presto のデータ型として予約語になっているため、テーブル作成において混乱が生じないように、予約語と同一名のカラムを定義する際にはバックォートで囲む必要があります。3番目の階層はヘッダデータになります。ここには 名前:値 のペアが複数含まれます。<name:string, value:string> の形をしたオブジェクトの配列として、スキーマの定義ができます。なお予約語のカラムを作成するためには、commonHeaders struct の中にある `from` のようにバックスラッシュで囲む必要があります。

これでテーブルが作成され、クエリを投げることができるようになりました!

SELECT * FROM sesblog limit 10;

この出力結果では、eventTypemail という2つの最上位階層のカラムが表示されていますが、これだとクエリした結果のデータが取得できた、以上の有益な結果は得られません。分析したいデータを取得するために、入れ子表記を使用したクエリを使うことができます。

“月曜日のキャンペーンで、宛先不明で戻ってきたメッセージはどれか?”

SELECT eventtype as Event,
       mail.destination as Destination, 
       mail.messageId as MessageID,
       mail.timestamp as Timestamp
FROM sesblog
WHERE eventType = 'Bounce' and mail.timestamp like '2017-01-09%'

“あるドメインで、宛先不明で戻ってきたメッセージは何件か?”

SELECT COUNT(*) as Bounces 
FROM sesblog
WHERE eventType = 'Bounce' and mail.destination like '%amazonses.com%'

“amazonses.com ドメインに戻ってきたメッセージはどれか?”

SELECT eventtype as Event,
       mail.destination as Destination, 
       mail.messageId as MessageID 
FROM sesblog
WHERE eventType = 'Bounce' and mail.destination like '%amazonses.com%'

データセットからユースケースに応じた結果を得るために、さらに突っ込んだクエリを書くこともできます。作成したテーブルでは、JSON イベント内の tags セクションについてはスキーマを指定していません。続いてはこれをみていきます。

ウォークスルー: マッピングにおける禁止文字の取り扱い

データセットに対する DDL を最初に書く際に、真っ先にぶつかる問題として、ログフォーマットを変更するのが困難であるということと、Hive ではデータ型を定義する際にコロン (:) が大きな役割を果たしているということが挙げられます。イベント内の tag セクションについて、各フィールドをパースするために、JSONSerDe を使う必要があります。このデータによって誰がメッセージを作成したのかがわかるため、監査やセキュリティのユースケースにおいて非常に大事なものです。

Athena のクエリエディタで、以下の DDL を実行して 2 つめの Athena テーブルを作成しましょう。LOCATION として、ログが置かれている S3 バケットを指定します:

CREATE EXTERNAL TABLE sesblog2 (
  eventType string,
  mail struct<`timestamp`:string,
              source:string,
              sourceArn:string,
              sendingAccountId:string,
              messageId:string,
              destination:string,
              headersTruncated:boolean,
              headers:array<struct<name:string,value:string>>,
              commonHeaders:struct<`from`:array<string>,to:array<string>,messageId:string,subject:string>,
              tags:struct<ses_configurationset:string,ses_source_ip:string,ses_from_domain:string,ses_caller_identity:string>
              > 
  )           
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
WITH SERDEPROPERTIES (
  "mapping.ses_configurationset"="ses:configuration-set",
  "mapping.ses_source_ip"="ses:source-ip", 
  "mapping.ses_from_domain"="ses:from-domain", 
  "mapping.ses_caller_identity"="ses:caller-identity"
  )
LOCATION 's3://<YOUR BUCKET HERE>/FH2017/' 

新しいテーブルを作成する際に、SERDEPROPERTIES の記述を書き加えます。これにより、SerDe に追加のパラメタを与えることができます。カラム名の真ん中に : を含んでいるデータを取り扱うための、マッピングのプロパティを記述します。これにより、ses:configuration-setses というカラム名に configuration-set というデータ型が指定されたものとして解釈されます。先ほどの記述と違い、この場合はバッククォートで囲むことはできません。JSON SERDEPROPERTIES のマッピングセクションでは、テーブル作成の際にあらゆる不正な文字を再マッピングしてしまいます。

例えば、ses データ内の ses:configuration-set に対して、先ほどの定義をすることで、Athena に対して ses_configurationset の形でクエリを投げることができます。このマッピングは、S3 嬢のデータに対しては一切変更を加えません。これは Hive の概念としてのみ取り扱われ、既存のデータを置き換えるものではありません。プロパティ内で 4 つのフィールドをマッピングして(コロンを含んだものをすべて、サポートされているアンダースコアに置き換えています)、tag の構造体を作成する際に、この新しいマッピングされた名前を使います。

これで、そのほかの認証・監査に関するカラムにアクセスできるようになったので、クエリを投げることでさらなる疑問を解決できます。

“すべての宛先不明なメッセージを作成したのは誰か?”

SELECT eventtype as Event,
         mail.timestamp as Timestamp,
         mail.tags.ses_source_ip as SourceIP,
         mail.tags.ses_caller_identity as AuthenticatedBy,
         mail.commonHeaders."from" as FromAddress,
         mail.commonHeaders.to as ToAddress
FROM sesblog2
WHERE eventtype = 'Bounce'

特筆すべき点として、mail.commonHeaders.”from” の取り扱いがあります。from は Presto の予約語なので、予約語と解釈させないためにダブルクォーテーション (“) で囲む必要があります。

ウォークスルー: SES カスタムタギングに対するクエリ

SES では AWS の外に対するメッセージに対して、カスタムタグをセットすることができるため、この mail.tag は非常に重要な意味を持ちます。大事なメッセージに tag を付与しておき、その tag を用いて Athena で分析することができます。例えばマーケティングキャンペーンを補足するためにキャンペーンタグを使いたい場合、SES CLI から -tags オプションを使ってタグを付与することができます:

$ aws ses send-email --to success@simulator.amazonses.com --from youraddress@example.com --subject "Perfume Campaign Test" --text "Buy our Smells" --configuration-set-name Firehose --tags Name=Campaign,Value=Perfume

この結果には、カスタムタグが付与されたエントリーが含まれます。

…
		"tags": {
			"ses:configuration-set": ["Firehose"],
			"Campaign": ["Perfume"],
			"ses:source-ip": ["54.55.55.55"],
			"ses:from-domain": ["amazon.com"],
			"ses:caller-identity": ["root"],
			"ses:outgoing-ip": ["54.240.27.11"]
		}
…

キャンペーンタグを管理するための 3 つめのテーブルを作成します。

CREATE EXTERNAL TABLE sesblog3 (
  eventType string,
  mail struct<`timestamp`:string,
              source:string,
              sourceArn:string,
              sendingAccountId:string,
              messageId:string,
              destination:string,
              headersTruncated:string,
              headers:array<struct<name:string,value:string>>,
              commonHeaders:struct<`from`:array<string>,to:array<string>,messageId:string,subject:string>,
              tags:struct<ses_configurationset:string,Campaign:string,ses_source_ip:string,ses_from_domain:string,ses_caller_identity:string>
              > 
  )           
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
WITH SERDEPROPERTIES (
  "mapping.ses_configurationset"="ses:configuration-set",
  "mapping.ses_source_ip"="ses:source-ip", 
  "mapping.ses_from_domain"="ses:from-domain", 
  "mapping.ses_caller_identity"="ses:caller-identity"
  )
LOCATION 's3://<YOUR BUCKET HERE>/FH2017/' 

そして各メールに付与したカスタムタグを使ってクエリを実行することができます。

SELECT eventtype as Event,
       mail.destination as Destination, 
       mail.messageId as MessageID,
       mail.tags.Campaign as Campaign
FROM sesblog3
where mail.tags.Campaign like '%Perfume%'

NestedJson_4

ウォークスルー: プログラムによって hive-json-schema を使った DDL を作成

これらの全ての例では、SES のインタラクションタイプとして send のみをもとにして、テーブル作成を行っています。SES には他にも、delivery、complaint、bounce などのインタラクションタイプがあり、それぞれ複数の追加フィールドを持っています。これらに対応するために、すべての異なる SES eventTypes をパースし、1 つのテーブルとしてクエリできるようなマスター DDL を作成します。

適切に動作する JSONSerDe DDL を作業で記述するのは、退屈な上に間違いを起こしやすいので、ここでは AWS サポートでよく用いられている、オープンソースのツールを使用します。これを使った場合、コロンを含む SES カラムのマッピングのみ記述するだけになります。

このサンプル JSON ファイルには、すべての SES イベントタイプで取りうる全てのフィールドが含まれています。hive-json-schema を使うことで,入れ子になった JSON DDL を記述することができます。

こちらが全てのタイプの SES ログにクエリできる “マスター” DDL です:

CREATE EXTERNAL TABLE sesmaster (
  eventType string,
  complaint struct<arrivaldate:string, 
                   complainedrecipients:array<struct<emailaddress:string>>,
                   complaintfeedbacktype:string, 
                   feedbackid:string, 
                   `timestamp`:string, 
                   useragent:string>,
  bounce struct<bouncedrecipients:array<struct<action:string, diagnosticcode:string, emailaddress:string, status:string>>,
                bouncesubtype:string, 
                bouncetype:string, 
                feedbackid:string,
                reportingmta:string, 
                `timestamp`:string>,
  mail struct<`timestamp`:string,
              source:string,
              sourceArn:string,
              sendingAccountId:string,
              messageId:string,
              destination:string,
              headersTruncated:boolean,
              headers:array<struct<name:string,value:string>>,
              commonHeaders:struct<`from`:array<string>,to:array<string>,messageId:string,subject:string>,
              tags:struct<ses_configurationset:string,ses_source_ip:string,ses_outgoing_ip:string,ses_from_domain:string,ses_caller_identity:string>
              >,
  send string,
  delivery struct<processingtimemillis:int,
                  recipients:array<string>, 
                  reportingmta:string, 
                  smtpresponse:string, 
                  `timestamp`:string>
  )           
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
WITH SERDEPROPERTIES (
  "mapping.ses_configurationset"="ses:configuration-set",
  "mapping.ses_source_ip"="ses:source-ip", 
  "mapping.ses_from_domain"="ses:from-domain", 
  "mapping.ses_caller_identity"="ses:caller-identity",
  "mapping.ses_outgoing_ip"="ses:outgoing-ip"
  )
LOCATION 's3://<YOUR BUCKET HERE>/FH2017/'

結論

この投稿では、AWS サービスのログで使われている JSON 形式のデータに対して、Amazon Athena のクエリを投げる実際的なユースケースを紹介しました。ユースケースの中には、宛先不明や迷惑メールフラグを集計するようなものがありました。その他にも、キャンペーンの効果検証のレポーティングのようなものもあります。さらに、どのインスタンスやユーザーがメールを送信したかといった、監査やセキュリティに関するものもありました。さらに入れ子の JSON を扱う方法や SerDe のマッピングについて触れることで、データを加工整形することなく元のフォーマットのままでクエリを投げることができました。

AWS QuickSight のような新しい ツールを使うことで、こうしたデータからダッシュボードを作成することができます。これによりデータのレポーティングがさらに簡単になります。Athena を QuickSight のデータソースとして使う方法は、こちらのブログ記事を参照してください。

さらに、クエリのパフォーマンスを向上させるための最適化を行ったり、、必要なデータだけをスキャンすることでスキャンデータ量を削減するためにパーティションを設定したり、といった改善を実施することができます。限られた時間内にレポートを出す必要があるときには、S3 のライフサイクル設定を使用して、古くなったデータを Amaazon Glacier に移動させたり、削除したりといった対応を取ることもできます。

原文: Create Tables in Amazon Athena from Nested JSON and Mappings Using JSONSerDe(翻訳: SA 志村)