Category: Analytics*


AWS Glue や Amazon Athena を用いたサーバーレスな Machine Learning 環境

属性をもとにデータセットを分割しなければならなかったことはありませんか?K-means はデータ分割を行うために最も頻繁に使用されている Machine Learning アルゴリズムの 1 つです。このアルゴリズムは異なるグループ (クラスター) にデータを分けることで機能します。各サンプルにはクラスターが指定されます。これは同じクラスターに指定されたサンプルが、他のクラスターにあるサンプルに比べて互いが類似しているようにするためです。

今回のブログでは AWS Glue を使用して、Amazon S3 にあるタクシー乗車のデータセットを例に K-means を適用し、そのデータをタクシーの座標に基づき 100 通りのクラスターに分割する方法を説明します。次に Amazon Athena を使用し、乗車数そして各クラスターのおおよその地域をクエリします。最後に Amazon Athena を使用して、最も乗車数の多かった 4 つの地域の座標を割り出します。AWS Glue と Amazon Athena では、ユーザーによるプロビジョンやサーバー管理を必要とせずに、こうしたタスクを実行することができます。

ソリューションの概要

今回は、過去のブログで使用したニューヨーク市のタクシーに関するデータセットを使用します: 「AWS Glue、Amazon Athena、Amazon QuickSight を使用して様々なプロバイダからのデータを調和、クエリ、視覚化する (Harmonize, Query, and Visualize Data from Various Providers using AWS Glue, Amazon Athena, and Amazon QuickSight)」2016 年 1 月のタクシー乗車に関する緑色のタイプから構成されたテーブルを使用します。

座標に基づいたデータセットを分割するために、Spark Machine Learning K-means クラスタリングライブラリを使用した AWS Glue のジョブスクリプトをお見せします。このスクリプトは緑のタクシーデータを読み込み、各行に指定されるクラスターを示す列を追加することで、このジョブを実行します。スクリプトは parquet 形式を使用してテーブルを Amazon S3 バケット (保存先ファイル) に保存します。バケットをクエリするには Amazon Athena を使用できます。

乗客を拾うすべての登録済みの場所に対し、100 種類のグループ (クラスター) にあるタクシー乗車のデータセットを分配させる場合の問題について見てみましょう ( pickup_longitude および pickup_latitude の列)。この問題を解決するため、AWS Glue スクリプトは出力テーブルを読み込み、Spark Machine Learning ライブラリを使用してクラスター数を 100 に設定した K-means を実装します。結果は parquet 形式を使用して Amazon S3 バケットに保存されるので、Amazon Athena を使用してクエリすることができます。

演習

AWS Glue ジョブの実行

以下の手順に従ってください。

  1. AWS マネジメントコンソールで AWS コンソールにアクセスします。テーブルを書き込むために AWS Glue クローラの新しいデータベースを作成します。
  2. 次にポイントする新しいクローラを作成します。
    s3://serverless-analytics/glue-blog  -- set to run on-demand

  3. クローラを実行します。

    クローラが次の属性を含む緑色のテーブルを分類しているか確認します。
  4. スクリプトファイル MLkmeans.py を S3 バケットいずれかにアップロードします。
  5. 新しい AWS Glue ジョブを追加し、ジョブの名前とロールを選びます。次に [An existing script that you provide] からジョブを実行するオプションを選択し、アップロードしたスクリプトの S3 パスを選んでから一時ファイルの S3 パスを選択します。[Next] を 2 回に渡り選択して完了します。
  6. スクリプトを編集します。
    • ジョブを選びオプションを選択して編集します。
    • [destination] の変数を希望する結果の保存先に変更します (次のイメージでは 17 列目)。
    • [namespace] と [tablename] を先に実行したクローラによる緑色のテーブルのデータベースと名前に変更します (次のイメージでは 18 列目と 19 列目)。
  7. AWS Glue ジョブを実行します。
  8. 送信先のパスで parquet 形式のファイルが作成されているか確認します。
  9. 送信先のパスにポイントするクローラを新たに作成します。
  10. 送信先のパスでクローラを実行し、新たに変換したデータセットにポイントする AWS Glue のデータカタログで新しいテーブルを作成します。

Athena を使用して結果をクエリする方法

AWS Glue が抽出した parquet 形式データセットの分析、変換、ジョブの読み込み (ETL) をクローラが完了したら、次の列を含むデータカタログにテーブルが表示されます。

予測用の列は k-means のアルゴリズムにより追加されたもので、各行に指定されたクラスター ID を表す整数が含まれています。

Amazon Athena にある次のクエリを使用して、算出済みのクラスタすべてをリストにした例を見てみましょう。

SELECT  count(*) as count, 
     (max(pickup_latitude) - min(pickup_latitude))*(max(pickup_longitude) - min(pickup_longitude)) 
as approximate_cluster_area , prediction  
FROM RESULTDATABASE.RESULTTABLENAME group by prediction  order by prediction 

結果のテーブル名とデータベースを RESULTDATABASE.RESULTTABLENAME に置き換えると、クエリは次のようになります。

この結果は地理的な各地域で客を拾ったタクシーの数を列にある数字で表しています。また、 approximate_cluster_area の列で示されている各エリアも対象にしています。

アクティビティが最も多い 10 件のクラスターをリストにした別の例も見てみましょう。そのセンターの座標も算出します。

 SELECT count(*) AS count,
         avg(pickup_latitude) AS latitute,
         avg(pickup_longitude) AS longitude,
         prediction
FROM RESULTDATABASE.RESULTTABLENAME
GROUP BY  prediction
ORDER BY  count DESC limit 10

RESULTDATABASE.RESULTTABLENAME を結果のテーブル名とデータベースに変えると、クエリは次のようになります。

乗車数が最も多かったクラスター 10 件が結果として表示されます。Amazon Quicksight 地理空間の視覚化を可能にする機能でこの座標を使うと次のように表示されます。

まとめ

このブログでは、サーバーを起動したり管理する必要がなく監視されていない Machine Learning アルゴリズムを使用して、AWS Glue や Amazon Athena を使う方法を説明しました。この例ではタクシーの座標に基づき、タクシーの乗車数のデータセットを 100 通りのグループに分類しました。クエリデータを使用して、各グループのエリアや乗車数を計算することができます。

このブログで説明したソリューションは、いくつかの編集を加えるだけで別のデータセットを適用することもできるので、独自のユースケースに使用することもできます。


その他の参考資料

PMML ベースのアプリケーション構築や AWS で予測を生成する方法をご覧ください。


今回のブログの投稿者について

Luis Caro は AWS プロフェッショナルサービスのビッグデータコンサルタントです。お客様にビッグデータプロジェクトに関するガイドや技術的なサポートの提供を担当、AWS を使用して顧客が独自のソリューション価値を改善するためのサポートに努めています。

Amazon Kinesis を用いた Databaseの継続的な変更

Emmanuel Espina は、アマゾン ウェブ サービスのソフトウェア開発エンジニアです。

このブログ記事では、Amazon Kinesis を使用して変更をストリーミングすることによって、中央リレーショナルデータベース を他のシステムと統合する方法について説明します。

次の図は、分散システムにおける一般的なアーキテクチャ設計を示しています。これには、「」と呼ばれる中央ストレージと、この中央ストレージを消費するいくつかの派生「衛星」システムが含まれます。

この設計アーキテクチャを使用して、リレーショナルデータベースを中央データストアとして使用し、このシステムのトランザクション機能を利用してデータの整合性を維持することができます。このコンテキストにおける派生システムは、この変化の事実の単一ソースを観察し、それらの変更を変換し、フィルタリングし、最終的にはその内部インデックスを更新する全文検索システムとすることができます。もう 1 つの例は、OLAP クエリに適した列形式ストレージです。一般に、中央リレーショナルシステムの個々の行を変更する際にアクションを取る必要のあるシステムは、派生データストアに適した候補となります。

これらの種類のアーキテクチャの単純な実装では、変更された行を検索するために派生システムが定期的にクエリを発行し、本質的に SELECT ベースのクエリで中央データベースをポーリングします。

このアーキテクチャのより優れた実装となるのが、非同期の更新ストリームを使用するアーキテクチャです。データベースには通常、行のすべての変更が格納されるトランザクションログがあるため、この変更のストリームが外部オブザーバシステムに公開されている場合、これらのシステムにこれらのストリームを添付して行の変更を処理およびフィルタリングできます。ここでは、中央データベースとして MySQL、メッセージバスとして Amazon Kinesis を使用して、このスキーマの基本的な実装をご紹介します。

通常、MYSQL バイナリログは、マスター上のすべての変更を読み取ってローカルに適用する読取りレプリカに公開されます。この記事では、変更をローカルデータベースに適用するのではなく、Amazon Kinesis ストリームに変更を公開する、一般化されたリードレプリカを作成します。

このメソッドの重要な点の 1 つは、コンシューマーが SQL クエリを受け取らないことです。SQL クエリは公開される可能性もありますが、一般的なオブザーバーは、SQL 互換のデータレプリカを維持しない限り、SQL にはあまり関心がありません。代わりに、変更されたエンティティ (行) を 1 つずつ受け取ります。このアプローチの利点は、コンシューマーが SQL を理解する必要はなく、事実の単一ソースは誰が変更を消費するのかを知る必要はないということにあります。これは、さまざまなチームが、必要なデータ形式で調整することなく作業できることを意味します。さらに都合がいいことに、Amazon Kinesis クライアントはが特定の時点から読む機能を備えているため、各コンシューマーは独自のペースでメッセージを処理します。これが、メッセージバスがシステムを統合するための結合されていない方法の 1 つとなる理由です。

この記事で使用されている例では、行フェッチャーは中央データベースに接続する通常の Python プロセスであり、リードレプリカをシミュレートします。

データベースは、Amazon RDS または MySQL の任意のインストールのいずれかになります。RDS の場合、フェッチャープロセスは RDS インスタンスホストにカスタムソフトウェアをインストールすることができないため、別のホスト (たとえば EC2) にインストールする必要があります。外部インストールの場合、フェッチャープロセスはデータベースと同じホストにインストールできます。

マスター MySQL インスタンスの準備

MySQL マスター (真実の単一のソース) は、それが通常のレプリケーションのマスターであるかのように設定する必要があります。個々の変更された行を受け取るには、Binlog を有効にして ROW 形式で作業する必要があります。(そうしないと、SQL クエリだけになってしまいます。)詳細については、MySQL サイトの「バイナリログ」を参照してください。

バイナリログを有効にするには、my.cnf 設定ファイルに次の 2 行を追加します。

log_bin=<path to binlog>
binlog_format=ROW

すべての接続 (たとえば、init_connect または JDBC などのデータベース API を使用) のグローバルまたはセッションレベルで、トランザクション分離レベルをコミット済み読み取りに設定することによって、行ベースのロギングを取得できます。

RDS (MySql 5.6+) を使用している場合は、簡単です。定期的なバックアップを有効にして (バックアップが有効になっていなければバイナリログは無効になります)、パラメータグループ変数 binlog_format を ROW に更新することによって、必要な設定を作成できます。(これは、パラメータグループの RDS ダッシュボードから行うことができます。)

アクセス権限の追加

RDS で作成されたデフォルトのユーザーを使用している場合は、既にこれらのアクセス許可がある可能性があります。そうでない場合は、REPLICATION SLAVE 権限を持つユーザーを作成する必要があります。詳細については、「レプリケーション用のユーザーの作成」を参照してください。

mysql> CREATE USER 'repl'@'%.mydomain.com' IDENTIFIED BY 'slavepass';
mysql> GRANT REPLICATION SLAVE ON *.* TO 'repl'@'%.mydomain.com';

Amazon Kinesis ストリームの作成

Amazon Kinesis ストリームと boto3 クライアントの認証情報が必要です。クライアントの認証情報の詳細については、「Boto 3 ドキュメント」を参照してください。

Amazon Kinesis コンソールを開き、[ストリームの作成] を選択します。

使用するストリームの名前とシャード数を入力します。この例では、1 つのシャードがあります。

数分後、ストリームで行の変更を受け入れる準備が整います。

CLI ユーザーに権限を割り当てる

AWS Identity and Access Management (IAM) を使用して、このストリームにアクセスする CLI ユーザに権限を与えることができます。

この例では、そのユーザーは KinesisRDSIntegration です。ユーザーを作成したり、既存のユーザーを使用することはできますが、Amazon Kinesis ストリームへの書き込み権限を追加する必要があります。

ストリームに固有のポリシーを作成できます。この例では、Amazon Kinesis に完全にアクセスできるスタンダードポリシーを使用しています。

マスターに接続して変更を公開する

Python パブリッシャーが必要とするライブラリをインストールするには、次のコマンドを実行します。

pip install mysql-replication boto3

詳細な手順については、次を参照してください。

https://github.com/noplay/python-mysql-replication

https://boto3.readthedocs.io/en/latest/guide/quickstart.html

ここに、マジックを実行する Python スクリプトがあります。<HOST>、<PORT>、<USER>、<PASSWORD>、および <STREAM_NAME> の各変数を設定値に置き換えることを忘れないでください。

Python

 

import json

import boto3



from pymysqlreplication import BinLogStreamReader

from pymysqlreplication.row_event import (

  DeleteRowsEvent,

  UpdateRowsEvent,

  WriteRowsEvent,

)



def main():

  kinesis = boto3.client("kinesis")



  stream = BinLogStreamReader(

    connection_settings= {

      "host": "<HOST>",

      "port": <PORT>,

      "user": "<USER>",

      "passwd": "<PASSWORD>"},

    server_id=100,

    blocking=True,

    resume_stream=True,

    only_events=[DeleteRowsEvent, WriteRowsEvent, UpdateRowsEvent])



  for binlogevent in stream:

    for row in binlogevent.rows:

      event = {"schema": binlogevent.schema,

      "table": binlogevent.table,

      "type": type(binlogevent).__name__,

      "row": row

      }



      kinesis.put_record(StreamName="<STREAM_NAME>", Data=json.dumps(event), PartitionKey="default")

      print json.dumps(event)



if __name__ == "__main__":

   main()

このスクリプトは、変更された各行を JSON 形式でシリアル化された Amazon Kinesis レコードとして公開します。

メッセージを使用する

これで、変更されたレコードを使用する準備が整いました。あらゆるコンシューマーコードが動作します。この記事のコードを使用すると、次の形式でメッセージが表示されます。

{"table": "Users", "row": {"values": {"Name": "Foo User", "idUsers": 123}}, "type": "WriteRowsEvent", "schema": "kinesistest"}
{"table": "Users", "row": {"values": {"Name": "Bar user", "idUsers": 124}}, "type": "WriteRowsEvent", "schema": "kinesistest"}
{"table": "Users", "row": {"before_values": {"Name": "Foo User", "idUsers": 123}, "after_values": {"Name": "Bar User", "idUsers": 123}}, "type": "UpdateRowsEvent", "schema": "kinesistest"}

まとめ

このブログ記事では、擬似リードレプリカと Amazon Kinesis を使用して、変更ストリームをデータベースのレコードに公開する方法を説明しました。多くのデータ指向の企業が、これに類似したアーキテクチャを使用しています。この記事で提供されている例は、実際の本番環境には対応していませんが、この統合スタイルを試して、エンタープライズアーキテクチャの拡張機能を改善するために使用できます。最も複雑な部分は、おそらく Amazon Kinesis の背後で既に解決されているものでしょう。接着剤の役割を果たすものを提供する必要があります。

その他のリソース

すべてのソフトウェアエンジニアがリアルタイムデータの統合抽象化について知っておくべきこと

Databus に搭載されているすべてのもの: LinkedIn のスケーラブルで一貫性のある変更データキャプチャプラットフォーム

 

Amazon Kinesis Video Streams – コンピュータビジョン・アプリケーションのためのサーバーレスな動画データの収集と保存

携帯電話、防犯カメラ、ベビーモニター、ドローン、WEBカメラ、車載カメラ、さらには人工衛星まで、これらすべては高輝度で高品質の動画ストリームを生成できます。 住宅、オフィス、工場、都市、街路、高速道路は現在、膨大な数のカメラを備えています。これらのカメラは、洪水などの自然災害時に被害状況の調査を可能にし、公共の安全性を高め、子供が安全かつ健康であることを知らせ、無限に繰り返す「失敗」動画のための一瞬を補足し(個人的な趣味の話です)、身元の判定に役立つデータを集め、交通の問題を解決するなど、様々な場面で活用されます。

この動画データの洪水を扱うことは、言い表せないほど難しいことです。 入力ストリームには、個別に、または何百万という単位でデータが到着します。 ストリームには価値あるリアルタイムなデータが含まれており、遅延したり、一時停止したり、より適切なタイミングで処理するためにデータを脇に置いておいたりすることはできません。生のデータを取得すると、他の課題が発生します。動画データの保存、暗号化、索引作成などが頭に浮かびますね。価値を引き出すこと、つまりコンテンツに深く潜って、そこにあることを理解し、行動を起こすことは、次の大きなステップです。

新しい Amazon Kinesis Video Streams

2017年11月29日、リアルタイムストリーミングサービスであるAmazon Kinesisファミリーの新しいメンバーとして、Amazon Kinesis Video Streamsをご紹介します。 これによって、独自のインフラストラクチャを構築して動かすことなく、何百万ものカメラデバイスからストリーミング動画(または時系列にエンコードされたデータ)を取り込むことができます。 Amazon Kinesis Video Streamsは、入力ストリームを受け入れ、永続的かつ暗号化された形式で保存し、時間に基づいたインデックスを作成し、コンピュータビジョン・アプリケーションの作成を可能にします。 あなたはAmazon Recognition VideoやMXNetTensorFlow、OpenCV、または独自のカスタムコード、つまりクールな新しいロボットや、分析、あなたが考え出すコンシューマー・アプリケーションを支えるあらゆるコードを使用して、入力ストリームを処理することができます。
(more…)

Amazon Elasticsearch Service へのアクセスコントロールの設定

Dr. Jon Handler (@_searchgeek) は、検索技術を専門とする AWS ソリューションアーキテクトです。

Amazon Elasticsearch Service (Amazon ES) ドメインを保護することで、権限のないユーザーがデータにアクセスしたり変更したりすることを防ぐことができます。ほとんどのお客様は、IP アドレスベースまたは ID ベースのアクセスポリシーのセキュリティを望んでいますが、利便性からオープンアクセスを選択しています。オープンアクセスのドメインは、インターネット上の任意の当事者から Amazon ES ドメインへのデータの作成、表示、変更、および削除の要求を受け入れるため、このオプションはほとんどのお客様にとって適切なものではありません。

以前のブログ記事「Amazon Elasticsearch Service ドメインのアクセスをコントロールする方法」では、アクセスコントロールについて詳細に説明しました。このブログ記事では、ドメインの IAM ポリシーを開始する簡単な方法をいくつか紹介します。AWS Identity and Access Management (IAM) ポリシーを設定するにはいくつかの作業が必要ですが、この「予防的対策」により後で大量の作業が発生するのを防ぐことができます。

Amazon Elasticsearch Service における主要なアクセスコントロールの概念

Amazon ES が作成するドメインには、Elasticsearch クラスタ内のノードと複数の AWS サービスのリソースが含まれます。Amazon ESがドメインを作成すると、ドメインはサービス制御された VPC でインスタンスを起動します。これらのインスタンスの前面には Elastic Load Balancing (ELB) があり、ロードバランサーのエンドポイントはRoute 53 を通じて発行されます。ドメインへのリクエストは、ドメインの EC2 インスタンスにルーティングする ELB ロードバランサをパススルーします。リクエストがどこに行くかに関わらず、インスタンスは IAM にコンタクトしてリクエストが承認されているかどうかを判断します。承認されていないリクエストはブロックされ、破棄されます。

IAM ポリシーがどのように適用され、解決されるかを理解するための鍵は、次の点にあります。

  • ポリシーの場所: IAM ポリシーは、ドメインまたは個々のユーザーまたはロールに付加できます。ポリシーがドメインに付加されている場合は、リソースベースのポリシーと呼ばれます。ユーザーまたはロールに関連付けられている場合は、ユーザーベースのポリシーと呼ばれます。
  • ポリシーの解決: IAM は、リクエストが承認されているかどうかを判断するために、リクエストに適用されるすべてのユーザーベースおよびリソースベースのポリシーを収集します。詳細については、ブログ記事「Amazon Elasticsearch Service ドメインのアクセスコントロールを設定する方法」を参照してください。

リソースベースのポリシー、ユーザーベースのポリシー、またはそれらの組み合わせを作成するかどうかにかかわらず、IAM は特定のリクエストに対してすべてのポリシーを尊重します。

Amazon ES コンソールのウィザードを使用してドメインを作成すると、Amazon Elasticsearch Service はさまざまな種類のアクセスに対していくつかのテンプレート IAM ポリシーを提供します。

  • [1 つ以上の  AWS アカウントまたは IAM ユーザーにアクセスを許可、または拒否する] を選択した場合: ドメインにアクセスする必要がある IAM ユーザーまたはロールを指定します。ドメインへのすべてのリクエストは、AWS 署名バージョン 4 署名で署名する必要があります。リクエストがドメインに到達すると、署名検証とアクセスコントロールのためにリクエストが IAM に転送されます。
  • [Allow access to the domain from specific IP(s) (特定の IP からのアドメインへのアクセスを許可する)] を選択した場合: IP または CIDR ブロックを指定します。その IP アドレス範囲からの匿名 (署名なし) リクエストは許可されます。
  • [Deny access to the domain (ドメインへのアクセスを拒否する)] を選択した場合: 署名付きまたは署名なしにかかわらず、リクエストは許可されません。
  • [Deny access to the domain (ドメインへのアクセスを拒否する)] を選択した場合: 署名付きまたは署名なしにかかわらず、リクエストは許可されません。このテンプレートを選択すると、Amazon ES からポップアップ警告が表示されます。

シンプルな IP アドレスベースのセットアップ
Amazon Elasticsearch Service を使い始めたばかりのときは、データをすばやく読み込んだり、いくつかのクエリを実行したり (コマンドラインから、または Kibana を使用して)、コマンドラインからより詳細な検査と監視を行いたいものです。オープンアクセスポリシーは、ネイティブの Elasticsearch クライアント、curl、ウェブブラウザなどのツールがクラスタとやりとりすることを可能にするため、最も早く始める方法となります。

その性質上、オープンアクセスは安全ではありません。オープンアクセスポリシーはお勧めできません。IP アドレスベースのアクセスコントロールを使用すると、ドメインを保護することができます。不正な訪問者やポートスキャナは、次のようなメッセージで拒否されます。

{“Message”: “User: anonymous is not authorized to perform: es:ESHttpGet on resource:<domain ARN>”}

開発を EC2 インスタンスから行う場合は、VPC を設定してパブリック IP アドレスまたは Elastic IP アドレス (EIP) を割り当てることができます。

この簡単なセットアップで、[Allow access to the domain from specific IPs] オプションを選択すると、次のようなポリシーを生成できます。

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "",
      "Effect": "Allow",
      "Principal": {
        "AWS": "*"
      },
      "Action": "es:*",
      "Condition": {
        "IpAddress": {
          "aws:SourceIp": [
            "111.222.333.444/0"
          ]
        }
      },
      "Resource": "arn:aws:es:us-west-2:123456789012:domain/mydomain/*"
    }

 

IPアドレス、アカウント ID、ドメイン名(赤で表示) を必ず自分の値に置き換えてください。

この設定は、開発およびテストのために必要な基本アクティビティをサポートします。curl のようなツールを使ってコマンドをクラスタに直接送ることができます。その EC2 インスタンスで Kibana を実行できます。IP アドレスベースのアクセスポリシーはフルアクセスを許可します。署名されていても匿名であっても、異なる IP アドレスからのドメインへのその他のすべてのリクエストは、IAM によって拒否されます。

Kinesis Firehose、Amazon CloudWatch ログ、または AWS IoT で使用するためのセットアップ
Amazon Elasticsearch Service を開始するには、別の AWS サービスからデータを送信するのが簡単です。Kinesis Firehose ストリームを作成するか、CloudWatch ログデータを Amazon ES にストリーミングするには、これらのサービスがドメインに書き込むことを許可するロールを作成する必要があります。IAM のポリシー解決により、他のサービスからのアクセスが許可され、ドメインにデータを書き込むことができるようになります。IP アドレスベースのポリシーによって、コマンドと Kibana のための EC2 インスタンスへのアクセスが許可されます。他のすべてのリクエストへのアクセスは拒否されます。

AWS IoT の安全なアクセスを設定する方法については、IP アドレスベースのポリシーを使用する方法について説明しているブログ記事「Analyze Device-Generated Data with AWS IoT and Amazon Elasticsearch Service (AWS IoT および Amazon Elasticsearch Service によるデバイス生成データの分析)」を参照してください。

IP アドレスがわからない場合のセットアップ
多くの場合、リクエスト元の静的 IP アドレスはわかりません。データセンター内のノードのセット、モバイルアプリケーションのサポート、または自動スケーリングされた一連のウェブサーバーまたは Logstash インスタンスからのリクエストの送信で、Kibana を実行することができます。

このような場合、VPC の既知の IPアドレスでリバースプロキシを使用して、Kibana から Amazon Elasticsearch Service サービスドメインにリクエストを転送し、ユーザーベースの認証で署名バージョン 4 の署名を使用してアプリケーションサーバーからリクエストを送信できます。

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "",
      "Effect": "Allow",
      "Principal": {
        "AWS": "arn:aws:iam::123456789012:role/webserver "
      },
      "Action": ["es:ESHttpGet"],
      "Resource": "arn:aws:es:us-west-2:123456789012:domain/mydomain/*"
    },
    {
      "Sid": "",
      "Effect": "Allow",
      "Principal": {
        "AWS": "*"
      },
      "Action": "es:*",
      "Condition": {
        "IpAddress": {
          "aws:SourceIp": [
            "111.222.333.444/0",   <-- プロキシの IP アドレス
            "112.223.334.445/0"    <-- インスタンスの IP アドレス
          ]
        }
      },
      "Resource": "arn:aws:es:us-west-2:123456789012:domain/mydomain/*"
    }

  ]
}

 

プロキシサーバーの IP アドレスを IP アドレスベースのポリシーに置き換えて、プロキシからのロクエストを許可します。ポリシーに別の IP アドレスを追加して、コマンドラインと Kibana のためのアクセスを開くこともできます。

前述のポリシー例では、ウェブサーバーのロールに許可されているアクションを HTTP GET 呼び出しに制限しています。作成する IAM ポリシーでは、さまざまなコマンドと HTTP メソッドを許可しているため、さまざまなアクターのアクセスコントロールを設定できます。詳細については、ブログ記事「Amazon Elasticsearch Service ドメインのアクセスをコントロールする方法」を参照してください。

プロキシを使用してリクエストの署名を簡素化する
プロキシを作成すると、プロキシの IP アドレスをアイデンティティのソースとして使用して、ドメインへのアクセスをコントロールできます。プロキシから許可されたリクエストのみを発信することによって、アクセスをコントロールします。署名バージョン 4 のリクエスト署名を使用して、リクエストの背後にあるアイデンティティ情報を提供することもできます。Amazon ES は IAM を使用してこれらのリクエストを認証し、許可または拒否します。

リクエストの署名を実装するには、コードを記述する必要があります。これは、コマンドラインまたは Kibana のアクセスのための開発の追加ハードルになります。リクエストを受け取り、署名バージョン 4で署名し、AWS に転送する小さなアプリケーションを提供するオープンソースプロジェクトがあります。

そのような署名プロキシがここにあります: https://github.com/abutaha/aws-es-proxy。この小さなアプリケーションはポート 9200 でリッスンし、署名されたリクエストを Amazon Elasticsearch Service に転送します。

注意: この署名プロキシは、AWS ではなくサードパーティによって開発されたものです。これは開発やテストには適していますが、本番稼働用ワークロードには適していません。AWS は外部コンテンツの機能または適合性について責任を負いません。この署名プロキシを使用すると、1 つ以上の AWS アカウントまたは IAM ユーザーにアクセスを許可、または拒否する テンプレートを使用して、ドメインのポリシーを次のように設定できます。

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "",
      "Effect": "Allow",
      "Principal": {
        "AWS": "arn:aws:iam::123456789012:user/susan"
      },
      "Action": "es:*",
      "Resource": "arn:aws:es:us-west-2:123456789012:domain/mydomain/*"
    }
  ]
}

 

ユーザー ARN と赤のドメイン ARN を生成されたポリシーからのものに置き換えます。プロキシを実行し、127.0.0.1:9200 でリッスンします。次に、curl を使用して Elasticsearch API 呼び出しを http://127.0.0.1:9200/ に送信すると、それらが使用するドメインに転送されます。開発マシン上で Kibana をローカルで実行する場合は、kibana.yml のhttp://127.0.0.1:9200/ を参照してください。

CloudTrail を使用してアクセスポリシーの変更を監視する
Amazon CloudTrail は、さまざまなサービスと対話するときに AWS に送信されるリクエストのログを提供します。Amazon Elasticsearch Service は、ドメインの作成、ドメイン設定の更新、タグの追加など、すべての管理アクションに対して CloudTrail イベントを送信します。CreateElasticsearchDomain および UpdateElasticsearchDomainConfig API 呼び出しの CloudTrail イベントを監視して、組織内のユーザーがドメインを作成または変更するときにアクセスポリシーを検証することをお勧めします。これらのログを使用して、すべてのアクセスポリシーを確認し、前述のプラクティスに準拠していることを確認できます。

まとめ
これで、テストと開発の際にニーズに合ったアクセスポリシーを簡単に設定できることを示すことができたと考えています。ご質問、コメント、ご提案があれば、コメントに投稿してください。

Amazon QuickSight の更新 – 地理空間の可視化、プライベートVPCアクセス、その他

AWSでは記念日を敢えて祝うことはあまりしません。100近いサービスによって、週に何度もアップデートを展開するのが当たり前になっています。(まるで週に何度もケーキを食べて、 シャンパンを飲んでいるようなものです。)それは楽しそうに聞こえますが、我々はむしろ、お客様に耳を傾け、イノベーションを起こすことに多くの時間を費やしています。とは言うものの、 Amazon QuickSight は一般提供開始から1年が経ちましたので、簡単にアップデートを紹介したいと思います!

QuickSight の事例

本日、数万のお客様(スタートアップからエンタープライズまで、交通や法律、鉱業、医療などの様々な業界)がお客様のビジネスデータの分析とレポートのためにQuickSightを利用されています。

幾つか例を上げましょう。


Gemini は負傷した労働者を弁護するカリフォルニア弁護士に法的根拠の調達サービスを提供しています。彼らは、カスタムレポートの作成や一度限りのクエリの実行から、ドリルダウンとフィルタリングを使用した動的なQuickSightダッシュボードの作成と共有までを行っています。QuickSightは、販売パイプラインの追跡、注文のスループットの測定、注文処理パイプラインでのボトルネックの特定に使用されています。

Jivochat はウェブサイト訪問者とウェブサイトの所有者とを繋ぐ、リアルタイムメッセージングプラットフォームを提供しています。QuickSightを使用して、彼らはインタラクティブなダッシュボードを作成・共有しながら、元となるデータセットへのアクセスも提供しています。これにより、静的なスプレッドシートを共有するにとどまらず、誰もが同じデータを見ていることを保証し、現時点でのデータに基づいてタイムリーな決定を下すことを後押ししています。

Transfix は、小売業、食品・飲料、製造業およびその他の業種のFortune 500に名を連ねるリテールの荷送主に、荷物にマッチする配送業者を選択でき、ロジスティクスの可視性を高める、オンライン貨物市場です。QuickSightはBIエンジニアと非技術系ビジネスユーザーの両方に分析環境を提供しています。彼らはQuickSightを通じて、輸送ルート、運送業者効率性、プロセス自動化などのビジネスの鍵となる事柄や運営指標を吟味しています。

振り返り / 先読み
QuickSightに対するフィードバックはとても役に立っています。お客様は、自社のBIインフラを設定または実行することなく、従業員がQuickSightを使用してデータに接続し、分析を実行し、データに基づいた高速な決定を下すことができていると教えてくれます。我々は頂いたフィードバックをすべて歓迎し、それを使用してロードマップを推進し、1年で40を超える新機能を導入してきました。以下はその要約です:

今後のことを考えると、お客様に興味深い傾向が見られます。データの分析方法やレポート方法を詳しく見ていくうちに、サーバーレスのアプローチがいくつかの目に見えるメリットをもたらすことに気づき始めるのです。彼らは、Amazon Simple Storage Service (S3) をデータレイクとして使用し、QuickSight と Amazon Athena のコンビネーションによってクエリを実行することで、静的なインフラストラクチャ無しに迅速で柔軟な分析環境を手にしています。また、QuickSightのダッシュボード機能を活用し、ビジネスの結果や運用メトリクスを監視し、数百人のユーザーと彼らの洞察を共有しています。もしこのようなアプローチに興味がある場合は、Building a Serverless Analytics Solution for Cleaner Cities のブログポストや、 Severless Big Data Analytics using Amazon Athena and Amazon QuickSight  のスライドを御覧ください。

新しい機能の追加と拡張
我々は、QuickSightが今後もお客様のニーズを満たすことを確認するために、お客様の声を聞き、それを学ぶことにベストを尽くしています。そして以下の7つの大きな機能をアナウンスできることを幸福に思います:

地理空間の可視化 – 位置情報データセットを地理空間に可視化することが可能になります。

プライベートVPCアクセス – VPC内、またはオンプレミスデータに対し、パブリックなエンドポイント無しにセキュアに接続できる新しい機能のプレビューに参加することができます。

フラットテーブルのサポート – ピボットテーブルに加えて、表形式レポート用のフラットテーブルを使用することができます。詳しくは Using Tabular Reports を参照ください。

SPICE上のデータに対して計算フィールドを適用する – SPICE上のデータに対して計算フィールドを適用することができます。詳しくは 分析への計算フィールドの追加 を参照ください。

ワイドテーブルのサポート – テーブルあたり1000カラムまで使用することができます。

「その他」をまとめて表示 – カーディナリティの高いロングテールデータを、「その他」としてまとめて表示することができます。詳しくは Amazon QuickSight でビジュアルタイプを使用する を参照ください。

HIPAA コンプライアンス – QuickSightでHIPAAコンプライアンス準拠のワークロードに対応できます。

地理空間の可視化
お待たせしました!地理的な識別子(国、都市、州または郵便番号)を含むデータから、数回のクリックで美しいビジュアルを作成できるようになりました。QuickSightはそれらのデータをマップ上の位置情報に変換しますし、緯度/経度にも対応しています。この機能を使用して、州ごとの売上を視覚化したり、店舗を配送先にマップしたりすることができます。視覚化のサンプルは次のとおりです。

詳しくは、Using Geospatial Charts (Maps) , と Adding Geospatial Data を参照ください。

プライベートVPCアクセスのプレビュー
もしあなたがAWS上(もしかすると Amazon RedshiftAmazon Relational Database Service (RDS)  、または EC2上かもしれません)または、パブリック接続の無いオンプレミス上のTeradataやSQL Serverにデータを保持している場合、この機能はあなたのためにあります。QuickSightのプライベートVPCアクセスは、ENIを使用してセキュアに、プライベートにVPC内のデータソースにアクセスします。AWS Direct Connect を使用してセキュアに、オンプレミス上のリソースにプライベートリンクを貼ることもできます。以下のような形です。

プレビューに参加する準備ができている場合、本日からサインアップ可能です。 

Jeff;

(翻訳:SA八木、元の記事はこちらです)

Amazon DynamoDB からのデータストリームを AWS Lambda と Amazon Kinesis Firehose を活用して Amazon Aurora に格納する

Aravind Kodandaramaiah は AWS パートナープログラムのパートナーソリューションアーキテクトです。

はじめに

AWS ワークロードを実行するお客様は Amazon DynamoDBAmazon Aurora の両方を使用していることがよくあります。Amazon DynamoDB は、どのような規模でも、一貫した、数ミリ秒台にレイテンシーを抑える必要のあるアプリケーションに適した、高速で柔軟性の高い NoSQL データベースサービスです。データモデルの柔軟性が高く、パフォーマンスが信頼できるため、モバイル、ウェブ、ゲーム、広告、IoT、他の多くのアプリケーションに最適です。

Amazon Aurora は、MySQL と互換性のあるリレーショナルデータベースエンジンで、オープンソースデータベースのコスト効率性と簡素性を備えた、高性能の商用データベースの可用性とスピードをあわせもったエンジンです。Amazon Aurora は、MySQL よりも最大 5 倍のパフォーマンスを発揮するだけでなく、商用データベースのセキュリティ、可用性、および信頼性を 10 分の 1 のコストで実現します。

DynamoDB と Aurora を連携させるために、カスタムウェブ解析エンジンを構築して、毎秒数百万のウェブクリックが DynamoDB に登録されるようにしたとします。Amazon DynamoDB はこの規模で動作し、データを高速に取り込むことができます。また、このクリックストリームデータを Amazon Aurora などのリレーショナルデータベース管理システム (RDBMS) にレプリケートする必要があるとします。さらに、ストアドプロシージャまたは関数内で SQL の機能を使用して、このデータに対してスライスアンドダイスや、さまざまな方法でのプロジェクションを行ったり、他のトランザクション目的で使用したりするとします。

DynamoDB から Aurora に効率的にデータをレプリケートするには、信頼性の高いスケーラブルなデータレプリケーション (ETL) プロセスを構築する必要があります。この記事では、AWS Lambda Amazon Kinesis Firehose によるサーバーレスアーキテクチャを使用して、このようなプロセスを構築する方法について説明します。

ソリューションの概要

以下の図に示しているのは、このソリューションのアーキテクチャです。このアーキテクチャの背後には、次のような動機があります。

  1. サーバーレス – インフラストラクチャ管理を AWS にオフロードすることで、メンテナンスゼロのインフラストラクチャを実現します。ソリューションのセキュリティ管理を簡素化します。これは、キーやパスワードを使用する必要がないためです。また、コストを最適化します。さらに、DynamoDB Streams のシャードイテレーターに基づいた Lambda 関数の同時実行により、スケーリングを自動化します。
  2. エラー発生時に再試行可能 – データ移動プロセスには高い信頼性が必要であるため、プロセスは各ステップでエラーを処理し、エラーが発生したステップを再試行できる必要があります。このアーキテクチャではそれが可能です。
  3. 同時データベース接続の最適化 – 間隔またはバッファサイズに基づいてレコードをバッファすることで、Amazon Aurora への同時接続の数を減らすことができます。この方法は、接続タイムアウトを回避するのに役立ちます。
  4. 懸念部分の分離 – AWS Lambda を使用すると、データレプリケーションプロセスの各懸念部分を分離できます。たとえば、抽出フェーズを DynamoDB ストリームの処理として、変換フェーズを Firehose-Lambda 変換として、ロードフェーズを Aurora への一括挿入として分離できます。

以下に示しているのは、このソリューションのしくみです。

  1. DynamoDB Streams がデータソースです。DynamoDB Streams を使用すると、DynamoDB テーブル内の項目が変更されたときに、その変更を取得できます。AWS Lambda は、新しいストリームレコードを検出すると、Lambda 関数を同期的に呼び出します。
  2. Lambda 関数は、DynamoDB テーブルに新たに追加された項目をバッファし、これらの項目のバッチを Amazon Kinesis Firehose に送ります。
  3. Firehose は、受け取ったデータを Lambda 関数により変換して、Amazon S3 に配信します。Firehose に対してデータ変換を有効にしていると、Firehose は受け取ったデータをバッファし、バッファしたデータのバッチごとに、指定された Lambda 関数を非同期的に呼び出します。変換されたデータは Lambda から Firehose に返されてバッファされます。
  4. Firehose は変換されたすべてのレコードを S3 バケットに配信します。
  5. Firehose は変換されないすべてのレコードも S3 バケットに配信します。ステップ 4 と 5 は同時に実行されます。Amazon SNS トピックをこの S3 バケットに登録して、以降の通知、修復、再処理に使用できます (通知に関する詳細はこのブログ記事では取り上げません)。
  6. Firehose が変換されたデータを S3 に正常に配信するたびに、S3 はイベントを発行することで Lambda 関数を呼び出します。この Lambda 関数は VPC 内で実行されます。
  7. Lambda 関数は Aurora データベースに接続し、SQL 式を実行して、S3 から直接テキストファイルにデータをインポートします。
  8. Aurora (VPC プライベートサブネット内で実行) は、S3 VPC エンドポイントを使用して S3 からデータをインポートします。

ソリューションの実装とデプロイ
次に、このソリューションを機能させるために必要な手順について説明します。以下の手順では、AWS CloudFormation スタックを起動して一連の AWS CLI コマンドを実行することで、VPC 環境を作成する必要があります。

AWS サービスを使用してこれらの手順を実行している間、AWS サービスの料金が適用されることがあります。

ステップ 1: ソリューションのソースコードをダウンロードする
このブログ記事で概説したソリューションでは、多くの Lambda 関数を使用し、また、多くの AWS Identity and Access Management (IAM) ポリシーおよびロールを作成します。このソリューションのソースコードは以下の場所からダウンロードします。

git clone https://github.com/awslabs/dynamoDB-data-replication-to-aurora.git

このリポジトリには、以下のフォルダ構造があります。このブログ記事の後続の手順を実行するために、lambda_iam フォルダに移動します。

ステップ 2: Firehose 配信用の S3 バケットを作成する
Amazon Kinesis Firehose を使用すると、Amazon S3 にリアルタイムのストリーミングデータを配信できます。そのためには、まず S3 バケットを作成します。次に、レコードの処理に失敗した場合に備えて、変換された最終のレコードとデータバックアップを保存するフォルダを作成します。

aws s3api create-bucket --bucket bucket_name

aws s3api put-object \
--bucket bucket_name \
--key processed/

aws s3api put-object \
--bucket bucket_name
--key tranformation_failed_data_backup/

 

ステップ 3: IAM ポリシー、S3 イベント通知、Firehose-S3 配信設定ファイルを変更する
次に、以下のファイルで、プレースホルダー AWS_REGION、

AWS_ACCOUNT_NUMBER、BUCKET_NAME をそれぞれ、お客様の AWS リージョン ID、AWS アカウント番号、ステップ 2 で作成した S3 バケットの名前に置き換えます。

 

·         aurora-s3-access-Policy.json

·         DynamoDb-Stream-lambda-AccessPolicy.json

·         firehose_delivery_AccessPolicy.json

·         lambda-s3-notification-config.json

·         s3-destination-configuration.json

·         firehose_delivery_trust_policy.json

ステップ 4: CloudFormation を使用して Aurora クラスターを設定する
次に、[Launch Stack] ボタンをクリックして AWS CloudFormation スタックを起動します。CloudFormation テンプレートは、VPC を作成し、その VPC のパブリックおよびプライベートサブネットを設定します。また、このテンプレートは、プライベートサブネット内で Amazon Aurora データベースクラスターを起動し、パブリックサブネット内でパブリック IP を割り当てた踏み台ホストも起動します。


ステップ 5: Aurora DB クラスターを設定する
CloudFormation スタックが完成したら、S3 バケット内のテキストファイルから DB クラスターにデータをロードするように、Aurora クラスターを変更する必要があります。以下に示しているのは、そのための手順です。

Amazon Aurora が Amazon S3 にアクセスできるようにします。そのためには、IAM ロールを作成し、先ほど作成した信頼およびアクセスポリシーをそのロールにアタッチします。

auroraS3Arn=$(aws iam create-role \
    --role-name aurora_s3_access_role \
    --assume-role-policy-document file://aurora-s3-Trust-Policy.json \
    --query 'Role.Arn' \
    --output text)

aws iam put-role-policy \
    --role-name aurora_s3_access_role \
    --policy-name aurora-s3-access-Policy \
    --policy-document file://aurora-s3-access-Policy.json

 

その IAM ロールを Aurora DB クラスターに関連付けます。そのためには、新しい DB クラスターパラメータグループを作成し、その DB クラスターに関連付けます。

aws rds add-role-to-db-cluster \
    --db-cluster-identifier Output AuroraClusterId from CloudFormation Stack \
    --role-arn $auroraS3Arn

aws rds create-db-cluster-parameter-group \
    --db-cluster-parameter-group-name webAnayticsclusterParamGroup \
    --db-parameter-group-family aurora5.6 \
    --description 'Aurora cluster parameter group - Allow access to Amazon S3'

aws rds modify-db-cluster-parameter-group \
    --db-cluster-parameter-group-name webAnayticsclusterParamGroup \
    --parameters "ParameterName=aws_default_s3_role,ParameterValue= $auroraS3Arn,ApplyMethod=pending-reboot"

aws rds modify-db-cluster \
	--db-cluster-identifier Output AuroraClusterId from CloudFormation Stack \
	--db-cluster-parameter-group-name webAnayticsclusterParamGroup

 

プライマリ DB インスタンスを再起動します。

aws rds reboot-db-instance \
--db-instance-identifier Output PrimaryInstanceId from CloudFormationF Stack 

ステップ 6: DynamoDB ストリームと、そのストリームを処理する Lambda 関数を設定する

1.    ストリームを有効にして新しい DynamoDB テーブルを作成します。以降の手順では、AWS Lambda 関数をストリームに関連付けることで、トリガーを作成します。

aws dynamodb create-table \
    --table-name web_analytics \
    --attribute-definitions AttributeName=page_id,AttributeType=S AttributeName=activity_dt,AttributeType=S \
    --key-schema AttributeName=page_id,KeyType=HASH AttributeName=activity_dt,KeyType=RANGE \
    --provisioned-throughput ReadCapacityUnits=50,WriteCapacityUnits=50 \
    --stream-specification StreamEnabled=true,StreamViewType=NEW_IMAGE

2.    Lambda 実行ロールを作成します。

DdbStreamLambdaRole=$(aws iam create-role \
    --role-name DdbStreamLambdaRole \
    --assume-role-policy-document file://DynamoDB-Stream-lambda-Trust-Policy.json \
    --query 'Role.Arn' \
    --output text)


aws iam put-role-policy \
    --role-name DdbStreamLambdaRole \
    --policy-name DdbStreamProcessingAccessPolicy \
    --policy-document file://DynamoDb-Stream-lambda-AccessPolicy.json

3.  DynamoDB ストリームを処理する Lambda 関数を作成します。

aws lambda create-function \
    --function-name WebAnalyticsDdbStreamFunction \
    --zip-file fileb://ddbStreamProcessor.zip \
    --role $DdbStreamLambdaRole \
    --handler ddbStreamProcessor.handler \
    --timeout 300 \
    --runtime nodejs4.3

 

4.  その Lambda 関数を DynamoDB ストリームに関連付けることで、トリガーを作成します。

tableStreamArn=$(aws dynamodb describe-table --table-name web_analytics --query 'Table.LatestStreamArn' --output text)

aws lambda create-event-source-mapping \
    --function-name WebAnalyticsDdbStreamFunction \
    --event-source-arn $tableStreamArn \
    --batch-size 100 \
    --starting-position LATEST

ステップ 7: Firehose のデータ変換 Lambda 関数を作成して設定する

Lambda 実行ロールを作成します。

transRole=$(aws iam create-role \
            --role-name firehose_delivery_lambda_transformation_role \
            --assume-role-policy-document  file://firehose_lambda_transformation_trust_policy.json \
            --query 'Role.Arn' --output text)

aws iam put-role-policy \
        --role-name firehose_delivery_lambda_transformation_role \
        --policy-name firehose_lambda_transformation_AccessPolicy \
        --policy-document file://firehose_lambda_transformation_AccessPolicy.json

2.  データ変換 Lambda 関数を作成します。

aws lambda create-function \
    --function-name firehoseDeliveryTransformationFunction \
    --zip-file fileb://firehose_delivery_transformation.zip \
    --role $transRole \
    --handler firehose_delivery_transformation.handler \
    --timeout 300 \
    --runtime nodejs4.3 \
    --memory-size 1024

この関数は、受け取ったストリームのレコードを JSON スキーマに対して検証します。スキーマに一致したら、受け取った JSON レコードを解析し、カンマ区切り値 (CSV) 形式に変換します。

'use strict';

var jsonSchema = require('jsonschema');

var schema = { "$schema": "http://json-schema.org/draft-04/schema#", "type": "object", "properties": { "Hits": { "type": "integer" }, "device": { "type": "object", "properties": { "make": { "type": "string" }, "platform": { "type": "object", "properties": { "name": { "type": "string" }, "version": { "type": "string" } }, "required": ["name", "version"] }, "location": { "type": "object", "properties": { "latitude": { "type": "string" }, "longitude": { "type": "string" }, "country": { "type": "string" } }, "required": ["latitude", "longitude", "country"] } }, "required": ["make", "platform", "location"] }, "session": { "type": "object", "properties": { "session_id": { "type": "string" }, "start_timestamp": { "type": "string" }, "stop_timestamp": { "type": "string" } }, "required": ["session_id", "start_timestamp", "stop_timestamp"] } }, "required": ["Hits", "device", "session"] };

exports.handler = (event, context, callback) => {
    let success = 0; // Number of valid entries found
    let failure = 0; // Number of invalid entries found

    const output = event.records.map((record) => {

        const entry = (new Buffer(record.data, 'base64')).toString('utf8');

        var rec = JSON.parse(entry);
        console.log('Decoded payload:', entry);
        var milliseconds = new Date().getTime();

        var payl = JSON.parse(rec.payload.S);

        var jsonValidator = new jsonSchema.Validator();
        var validationResult = jsonValidator.validate(payl, schema);
        console.log('Json Schema Validation result = ' + validationResult.errors);

        if (validationResult.errors.length === 0) {

            const result = `${milliseconds},${rec.page_id.S},${payl.Hits},
				${payl.session.start_timestamp},
				${payl.session.stop_timestamp},${payl.device.location.country}` + "\n";

            const payload = (new Buffer(result, 'utf8')).toString('base64');
            console.log(payload);
            success++;

            return {
                recordId: record.recordId,
                result: 'Ok',
                data: payload,
            };
        }
        else {
            failure++;
            return {
                recordId: record.recordId,
                result: 'ProcessingFailed',
                data: record.data,
            }
        }

    });

    console.log(`Processing completed.  Successful records ${success}, Failed records ${failure}.`);
    callback(null, { records: output });

};

ステップ 8: Firehose 配信ストリームを作成し、S3 にデータを配信するように設定する

Amazon S3 ターゲットを使用するとき、Firehose は S3 バケットにデータを配信します。配信ストリームを作成するには、IAM ロールが必要です。Firehose はその IAM ロールを引き受けることで、指定したバケットとキーにアクセスする権限を取得します。Firehose はまた、その IAM ロールを使用することで、Amazon CloudWatch ロググループにアクセスし、データ変換 Lambda 関数を呼び出す権限を取得します。

1.    S3 バケット、キー、CloudWatch ロググループ、データ変換 Lambda 関数にアクセスする権限を付与する IAM ロールを作成します。

aws iam create-role \
    --role-name firehose_delivery_role \
    --assume-role-policy-document  file://firehose_delivery_trust_policy.json

aws iam put-role-policy \
    --role-name firehose_delivery_role \
    --policy-name firehose_delivery_AccessPolicy \
    --policy-document file://firehose_delivery_AccessPolicy.json

 

2.  S3 ターゲット設定を指定して、Firehose 配信ストリームを作成します。

aws firehose create-delivery-stream \
    --delivery-stream-name webAnalytics \
    --extended-s3-destination-configuration='CONTENTS OF s3-destination-configuration.json file' 

3.  AWS マネジメントコンソールにサインインし、Firehose コンソールに移動します。webAnalytics という名前の配信ストリームを選択します。[Details] タブで、[Edit] を選択します。[Data transformation][Enabled] を、[IAM role] [firehose_delivery_role] を選択します。[Lambda function] で、[firehoseDeliveryTransformationFunction] を選択します。次に、[Save] を選択してこの設定を保存します。

ステップ 8: Lambda 関数を作成して、VPC リソースにアクセスするように設定する
S3 バケットから Amazon Aurora にデータをインポートするには、VPC 内のリソースにアクセスするように Lambda 関数を設定します。

1.    Lambda 関数の IAM 実行ロールを作成します。

auroraLambdaRole=$(aws iam create-role \
                --role-name lambda_aurora_role \
                --assume-role-policy-document file://lambda-aurora-Trust-Policy.json \
                --query 'Role.Arn' --output text)

aws iam put-role-policy \
    --role-name lambda_aurora_role \
    --policy-name lambda-aurora-AccessPolicy \
    --policy-document file://lambda-aurora-AccessPolicy.json

2.  プライベートサブネットやセキュリティグループなどの VPC 設定を指定する Lambda 関数を作成します。CLI の実行中に渡される環境変数 AuroraEndpoint、dbUser (データベースユーザー)、dbPassword (データベースパスワード) に正しい値を設定していることを確認します。これらの値については、CloudFormation スタック出力を参照してください。

aws lambda create-function \
    --function-name AuroraDataManagementFunction \
    --zip-file fileb://AuroraDataMgr.zip \
    --role $auroraLambdaRole \
    --handler dbMgr.handler \
    --timeout 300 \
    --runtime python2.7 \
    --vpc-config SubnetIds='Output PrivateSubnets from CloudFormation stack',SecurityGroupIds='Output DefaultSecurityGroupId from CloudFormation stack' \
    --memory-size 1024 \    
    --environment='    
                    {
                        "Variables": {
                            "AuroraEndpoint": "Output AuroraEndpoint from CloudFormation stack",
                            "dbUser": "Database User Name",
                            "dbPassword": "Database Password"
                        }
                    }'

 

Lambda 関数は Aurora データベースに接続します。この関数は、LOAD DATA FROM S3 SQL コマンドを実行して、S3 バケット内のテキストファイルから Aurora DB クラスターにデータをロードします。

import logging import pymysql import boto3 import sys import os
# rds settings
db_name = "Demo"
dbUser = os.environ['dbUser']
dbPassword = os.environ['dbPassword']
AuroraEndpoint = os.environ['AuroraEndpoint']

logger = logging.getLogger()
logger.setLevel(logging.INFO)

s3_client = boto3.client('s3')
try:
    conn = pymysql.connect(AuroraEndpoint, user=dbUser, passwd=dbPassword, db=db_name, connect_timeout=5)
except:
    logger.error("ERROR: Unexpected error: Could not connect to Aurora instance.")
    sys.exit()

logger.info("SUCCESS: Connection to RDS Aurora instance succeeded")

def handler(event, context):

    for record in event['Records']:
        bucket = record['s3']['bucket']['name']
        key = record['s3']['object']['key']
        s3location = 's3://' + bucket + '/' + key
        logger.info(s3location)

        sql = "LOAD DATA FROM S3 '" + s3location + "' INTO TABLE Demo.WebAnalytics FIELDS TERMINATED BY ',' " \
              "LINES TERMINATED BY '\\n' (timestarted, page_id, hits, start_time, end_time, country_code);"

        logger.info(sql)

        with conn.cursor() as cur:
            cur.execute(sql)
            conn.commit()
            logger.info('Data loaded from S3 into Aurora')

ステップ 9: S3 イベント通知を設定する
最後に、S3 がイベントを発行することで、前のステップで作成した Lambda 関数を呼び出すように、その関数を設定します。このプロセスの最初の手順は、Lambda 関数を呼び出すアクセス権限を S3 に付与することです。

1.    Lambda 関数を呼び出すアクセス権限を S3 に付与します。

aws lambda add-permission \
    --function-name AuroraDataManagementFunction \
    --action “lambda:InvokeFunction” \
    --statement-id Demo1234 \
    --principal s3.amazonaws.com \
    --source-arn ‘ARN of S3 Bucket created in Step 2’ \
    --source-account ‘AWS Account Id

2.    S3 バケット通知を設定します。

aws s3api put-bucket-notification-configuration \
    --bucket 'Name of S3 Bucket created in step 2' \
    --notification-configuration=' CONTENTS OF lambda-s3-notification-config.json ' 

 

ステップ 10: ソリューションをテストする
最後のステップは、ソリューションをテストすることです。

  1. このブログ記事のソースコードの TestHarness フォルダには、テストハーネスがあります。このテストハーネスは DynamoDB テーブルにデータを読み込みます。まず、TestHarness フォルダに移動し、コマンドノード loadDataToDDb.js を実行します。
  2. Secure Shell (SSH) を使用して、踏み台ホストに接続します。SSH を使用した接続の詳細については、EC2 のドキュメントを参照してください。
  3. 踏み台ホストのブートストラッププロセス中に MySQL クライアントがインストールされたため、以下のコマンドを使用して Aurora データベースに接続できます。パラメータ値を適切に変更していることを確認します。
/usr/bin/mysql -h DatabaseEndpoint -u DatabaseUsername --password=DatabasePassword 

4.    コネクションが成功したら、以下のコマンドを実行します。

 mysql> select count(*) from Demo.WebAnalytics;

このコマンドを実行した後、テーブルにレコードが読み込まれています。

テーブルにレコードが読み込まれていない場合、Firehose はそれらのレコードを S3 に送信する前にバッファしている可能性があります。これを回避するには、1 分くらい後に同じ SQL コードを再試行してください。この間隔は、現在設定されている S3 バッファ間隔の値に基づきます。このコードを再試行した後、Amazon Aurora にレコードが挿入されています。

 

まとめ

DynamoDB Streams と、Amazon Kinesis Firehose のデータ変換関数を使用すると、DynamoDB から Amazon Aurora などのデータソースにデータをレプリケートする強力でスケーラブルな方法を得られます。このブログ記事では、DynamoDB から Aurora へのデータのレプリケートを取り上げましたが、同じ一般的なアーキテクチャパターンを使用すれば、他のストリーミングデータを変換して、Amazon Aurora に取り込むことができます。

さらに、以下の関連するブログ記事を参照してください。

 

AWS Glue と Amazon S3 を使用してデータレイクの基礎を構築する

データレイクは、大量の様々なデータを扱うという課題に対処するため、データを分析および保存するための方法としてますます一般的になっています。データレイクを使うと、組織は全ての構造化データおよび非構造化データを1つの中央リポジトリに格納できます。データはそのまま保存できるため、あらかじめ定義されたスキーマに変換する必要はありません。

多くの組織は AWS をデータレイクとして使う価値を理解しています。例えば Amazon S3 は高い耐久性があり、コンピューティングとストレージの分離をしながら、オープンデータフォーマットをサポートする費用対効果の高いオブジェクトの開始ができ、全てのAWS 分析サービスと連携します。Amazon S3 はデータレイクの基礎を提供しますが、他のサービスを追加してビジネスニーズに合わせることができます。AWS のデータレイク構築の詳細については What is a Data Lake? を参照してください。

データレイクを使う主な課題は、データの検索とスキーマやデータフォーマットの理解であるため、Amazonは AWS Glue をリリースしました。AWS Glue は Amazon S3 データレイクからデータ構造と形式を発見することで、迅速にビジネスの洞察を導き出すために要する時間と労力を大幅に削減します。AWS Glue は Amazon S3 上のデータを自動的にクロールし、データフォーマットを特定し、他の AWS 分析サービスで使用するためのスキーマを提案します。

この記事では、AWS Glue を使って Amazon S3 上のデータをクロールする方法と他のAWSサービスで使用できるメタデータストアを構築するプロセスを説明します。

AWS Glue の特徴

AWS Glue はフルマネージドのデータカタログとETL(抽出、変換、ロード)サービスで、データの発見、変換、およびジョブスケジューリングなどの困難で時間のかかる作業を簡素化し自動化します。AWS Glue はデータソースをクロールし、CSV, Apache Parquet, JSON などの一般的なデータフォーマットとデータタイプ用に事前作成された Classifire を使用してデータカタログを構築します。

AWS Glue はモダンなデータアークテクチャーのコンポーネントである S3, Amazon RDSAmazon AthenaAmazon RedshiftAmazon Redshift Spectrum と統合されているため、データの移動と管理をシームレスに調整します。

AWS Glue データカタログは Apache Hive メタストアと互換性があり、Hive, Presto, Apache Spark, Apache Pig などの一般的なツールをサポートしています。 Amazon Athena, Amazon EMR, Amazon Redshift Spectrum とも直接統合されています。

さらに AWS Glue データカタログには、使いやすさとデータ管理のための以下の機能があります。

  • 検索でデータを発見する
  • 分類されたファイルの識別と解析
  • スキーマの変更をバージョン管理

詳細は AWS Glue 製品の詳細 を参照してください。

Amazon S3 データレイク

AWS Glue は S3 データレイクの必須コンポーネントで、モダンなデータ分析にデータカタログとデータ変換サービスを提供します。

 

 

上の図では様々な分析ユースケースに対してデータがステージングされています。最初にデータは生の形式で不変のコピーとして取り込まれます。次にデータは変換され各ユースケースに対してより価値あるものになります。この例では、生の CSV ファイルは Amazon Athena がパフォーマンスを向上しコストを削減するために使用する Apache Parquet に変換されています。

データはさらなる洞察を得るために他のデータセットとブレンドすることができます。AWS Glue クローラは、ジョブトリガーまたは事前定義されたスケジュールに基いてデータベースの各ステージごとにテーブルを作成します。この例では、S3 に新しいファイルが追加されるたびに AWS Lambda 関数を使って ETL プロセスを実行しています。このテーブルは、Amazon Athena, Amazon Redshift Spectrum, および Amazon EMR が標準 SQL または Apache Hive を使用して任意の段階でデータを照会するために使用できます。この構成は、さまざまなデータからビジネス価値を迅速かつ容易に導き出すためのアジャイルビジネスインテリジェンスを提供する一般的な設計パターンです。

チュートリアル

このチュートリアルでは、データベースを定義し、Amazon S3 バケットのデータを探索するクローラを設定し、テーブルを作成し、CSV ファイルを Parquet に変換し、Parquet データのテーブルを作成し、Amazon Athena でデータを照会します。

データを発見する

AWS マネージメントコンソールにサインインし AWS Glue コンソールを開きます。AWS Glue は分析のセクションにあります。対応リージョンは米国東部(バージニア北部)、米国東部(オハイオ)、米国西部(オレゴン)で今後も対応リージョンは頻繁に追加していきます。

データの発見の第一歩はデータベースを追加することです。データベースはテーブルの集まりです。

  1. コンソールで Add database を選択し、Database name nycitytaxi と入力し Create をクリックします。
  2. 左側のメニューの Tables をクリックし、テーブルは列の名前、データ型の定義、およびその他のデータセットに関するメタデータで構成されています。
  3. データベース nycitytaxi にテーブルを追加します。手動またはクローラを使ってテーブルを追加できます。 クローラは、データストアに接続し、順位付けされた Classifier を使用してデータのスキーマを決定するプログラムです。 AWS Glue はCSV, JSON, Avro などの一般的なファイルタイプの Classifier を提供します。grok パターンを使用してカスタム Classifier を作成することもできます。
  4. クローラを追加します。Data store に S3 、Include path に s3://aws-bigdata-blog/artifacts/glue-data-lake/data/ を選びます。この S3 バケットには2017年1月のグリーンタクシーの全ての乗車データがあります。
  5. Next をクリックします。
  6. IAM role でドロップダウンから AWSGlueServiceRoleDefault を選択します。
  7. Frequency   Run on demand を選択します。クローラはオンデマンド実行やスケジュール実行が可能です。
  8. Database は nycitytaxi を選んでください。スキーマの変更をAWS Glueがどのように処理するのかを理解することが重要で、適切な処理方法を選択することが可能です。この例ではいかなる更新でも(データカタログ上)のテーブルが更新されます。 スキーマ変更の詳細については  Cataloging Tables with a Crawler を参照ください。
  9. 手順を確認して、Finish をクリックし、クローラは実行準備ができているので Run it now を選択します。
  10. クローラが終了すると、テーブルが1つ追加されます。
  11. 左側のメニューの Tables をクリックし data を選択します。この画面ではテーブルのスキーマ、プロパティ、およびその他の重要な情報を説明します。

データを CSV 形式から Parquet 形式に変換する

これでデータを CSV から Parquet に変換するジョブを設定して実行することができます。Parquet は Amazon Athena や Amazon Redshift Spectrum などの AWS 分析サービスに適したカラムナフォーマットです。

  1. 左側のメニューの ETL の下の Jobs をクリックし、Add job をクリックします。
  2. Name に nytaxi-csv-parquet を入力します
  3. IAM role AWSGlueServiceRoleDefault を選択します
  4. This job runs  A proposed script generated by AWS Glue を選択します。
  5. スクリプトを保存する任意の S3 パスを入力します。
  6. 一時ディレクトリ用のS3パスを入力します。
  7. Next をクリックします。
  8. データソースとして data を選択します。
  9. Create tables in your data target にチェックを入れる。
  10. Formatで Parquet を選択します。
  11. 結果を保存する新しい場所(既存オブジェクトがない新しいプレフィックス場所)を選択します。
  12. スキーマのマッピングを確認し、Finish をクリックします。
  13. ジョブを表示します。この画面では完全なジョブの表示を提供し、ジョブを編集、保存、実行することができます。AWS Glue がこのスクリプトを作成しましたが、必要に応じて自分で作成することもできます。
  14. Save をクリックし、Run job をクリックします。

Parquet テーブルとクローラを追加する

ジョブ終了したら、クローラを使って Parquet データ用の新しいテーブルを追加します。

  1. Crawler namenytaxiparquet を入力します。
  2. Data store で S3 を選択します。
  3. Include path で ETL で選択したS3パスを入力します。
  4. IAM roleAWSGlueServiceRoleDefault を選択します。
  5. FrequencyRun on demand を選択します。
  6. Databasenycitytaxi を選択します。

 

クローラが終了した後、nycitytaxi データベースには、CSVデータ用テーブルと変換された Parquet データ用テーブルの2つのテーブルがあります。

Amazon Athena でデータ分析する

Amazon Athena は、標準 SQL を使って Amazon S3 のデータを簡単に分析できるインタラクティブなクエリサービスです。Athena は CSVデータを照会することができますが Parquet のファイルフォーマットにするとデータクエリの時間を大幅に削減できます。詳細については Analyzing Data in Amazon S3 using Amazon Athena を参照してください。

Amazon Athena で AWS Glue を使用するには、Athena データカタログを AWS Glue Data Catalog にアップグレードする必要があります。Athena データカタログのアップグレード詳細については、この step-by-step guide を参照してください。

  1. マネージメントコンソール で Athena を開きます。 クエリエディタのデータベース nycitytaxi に2つテーブルが表示されています。

標準SQLを使ってクエリできます。

  1. nytaxigreenparquet を選択します。
  2. Select * From “nycitytaxi”.”data” limit 10; を入力します。
  3. Run Query をクリックします。

 

まとめ

この記事は AWS Glue と Amazon S3 を使ってデータレイクの基礎を構築することがどれほど簡単かを示しています。 AWS Glue を使って S3のデータをクロールし、Apache Hive と互換性のあるメタストアを構築することで、AWS のアナリティクスサービスと一般的な Hadoop エコシステムでこのメタデータを使うことができます。この組み合わせは強力で使いやすいため、より早くビジネスの洞察を得ることができます。

追加情報

詳細については次のブログを参照してください。

 

翻訳は上原が担当しました。(原文はこちら)

Amazon Kinesis Firehose, Amazon Athena, Amazon QuickSightを用いたVPCフローログの分析

多くの業務や運用において、頻繁に更新される大規模なデータを分析することが求められるようになっています。例えばログ分析においては、振る舞いのパターンを認識したり、アプリケーションのフロー分析をしたり、障害調査をしたりするために大量のログの可視化が必要とされます。

VPCフローログAmazon VPCサービス内のVPCに属するネットワークインターフェースを行き来するIPトラフィック情報をキャプチャします。このログはVPC内部に潜む脅威やリスクを認識したり、ネットワークのトラフィック・パターンを調査するのに役立ちます。フローログはAmazon CloudWatchログに格納されます。いったんフローログを作成すれば、Amazon CloudWatchログを用いて見たり取り出したりすることができるようになります。

フローログは様々な業務を助けてくれます。例えば、セキュリティグループのルールを過度に厳しくしすぎたことによって特定のトラフィックがインスタンスに届かない事象の原因調査などです。また、フローログを、インスタンスへのトラフィックをモニタリングするためのセキュリティツールとして使うこともできます。

この記事はAmazon Kinesis FirehoseAWS LambdaAmazon S3Amazon Athena、そしてAmazon QuickSightを用いてフローログを収集し、格納し、クエリを実行して可視化するサーバーレス・アーキテクチャを構成する手順を示します。構成する中で、Athenaにおいてクエリにかかるコストや応答時間を低減させるための圧縮やパーティショニング手法に関するベストプラクティスを学ぶこともできることでしょう。

ソリューションのサマリ

本記事は、3つのパートに分かれています。

  • Athenaによる分析のためにVPCフローログをS3へ格納。このセクションではまずフローログをLambdaとFirehoseを用いてS3に格納する方法と、格納されたデータにクエリを発行するためAthena上のテーブルを作成する方法を説明します。
  • QuickSightを用いてログを可視化。ここではQuickSightとQuickSightのAthenaコネクタを用いて分析し、その結果をダッシュボードを通じて共有する方法を説明します。
  • クエリのパフォーマンス向上とコスト削減を目的とした、Athenaにおけるデータのパーティション化。このセクションではLambda関数を用いてS3に格納されたAthena用のデータを自動的にパーティション化する方法を示します。この関数はFirehoseストリームに限らず、他の手段でS3上に年/月/日/時間のプリフィックスで格納されている場合でも使用できます。

パーティショニングはAthenaにおいてクエリのパフォーマンス向上とコスト削減を実現するための3つの戦略のうちの1つです。他の2つの戦略としては、1つはデータの圧縮、そしてもう1つはApache Parquetなどの列指向フォーマットへの変換があります。本記事では自動的にデータを圧縮する方法には触れますが、列指向フォーマットへの変換については触れません。本ケースのように列指向フォーマットへの変換を行わない場合でも、圧縮やパーティショニングは常に価値のある方法です。さらに大きなスケールでのソリューションのためには、Parquetへの変換も検討して下さい。

VPCフローログを分析するためのサーバレスアーキテクチャ

以下の図はそれぞれのサービスがどのように連携するかを示しています。

VPC_Flowlogs_Ian_Ben

VPCにフローログを作成すると、ログデータはCloudWatchログのロググループとして発行されます。CloudWatchログのサブスクリプションを利用することにより、S3に書き込むためにFirehoseを用いたLambda関数に対して、リアルタイムにログデータイベントを送り込むことが可能になります。

 

いったんS3にログデータが格納され始めれば、Athenaを利用してSQLクエリをアドホックに投入することができます。ダッシュボードを構築したり、画面からインタラクティブにデータを分析したりすることを好む場合には、Athenaに加えQuickSightによるリッチな可視化を簡単に構成できます。

Athenaの分析を目的としたS3へのVPCフローログの送信

この章では、Athenaによるクエリを可能とするためにフローログデータをS3に送信する方法を説明します。この例ではus-east-1リージョンを使用していますが、AthenaとFirehoseが利用できるのであればどのリージョンでも可能です。

Firehoseデリバリーストリームの作成

既存もしくは新しいS3バケットを格納先とするFirehoseデリバリーストリームを作成するためには、この手順を参考にして下さい。ほとんどの設定はデフォルトで問題ありませんが、格納先のS3バケットへの書き込み権限を持つIAMロールを選択し、GZIP圧縮を指定して下さい。デリバリーストリームの名前は‘VPCFlowLogsDefaultToS3’とします。

VPCフローログの作成

まず、この手順に従ってデフォルトVPCのVPCフローログを有効にしましょう。(訳注:デフォルトVPC以外の任意のVPCで構いません。)

Firehoseに書き込むLambda用のIAMロールの作成

Firehoseに書き込むLambda関数を作成する前に、Firehoseにバッチ書き込みを許可するLambda用のIAMロールを作成する必要があります。次のように定義されるインラインアクセスポリシーを組み込んだ‘lambda_kinesis_exec_role’という名前のLambda用ロールを作成して下さい。

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "logs:CreateLogGroup",
                "logs:CreateLogStream",
                "logs:PutLogEvents"
            ],
            "Resource": "arn:aws:logs:*:*:*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "firehose:PutRecordBatch"
            ],
            "Resource": [
                "arn:aws:firehose:*:*:deliverystream/VPCFlowLogsDefaultToS3"
            ]
        }
    ]
}

 

CloudWatchログからFirehoseに配信するLambda関数の作成

ログイベントをCloudWatchから先に作成したFirehoseデリバリーストリームである‘VPCFlowLogsDefaultToS3’に配信するためのLamdba関数を作成するには、次の手順を実施します。

  1. Lambdaのコンソールから一から作成を選択して新しいLambda関数を作成します。
  2. 基本的な情報ページでは関数に‘VPCFlowLogsToFirehose’と名付けます。さらにロールには先の手順で作成した‘lambda_kinesis_exec_role’を指定します。
  3. 関数コードにおいて、Python 2.7ランタイムを選択し、このGitHub上のコードをコードパネルにコピーします。環境変数にはDELIVERY_STREAM_NAMEという名前の変数を追加します。この変数の値には本手順の最初に作成したデリバリーストリームの名前(‘VPCFlowLogsDefaultToS3’)を設定します。また、タイムアウトは1分にします。

o_vpc-flow_2

Lambda関数に対するCloudWatchサブスクリプションの作成

CloudWatchログにて、次の手順を実施します。

  1. VPCフローログのロググループを選択します(フローログを作成したばかりであればロググループが表示されるまで数分待つ必要がある場合があります)。
  2. アクションからLambdaサービスへのストリーミングの開始を選択します。

  1. 先の手順で作成したLambda関数 ‘VPCFlowLogsToFirehose’ を選択します。
  2. ログの形式は Amazpn VPC フローログ を選択します。

Athenaでのテーブル作成

Amazon Athenaは、特に他のインフラストラクチャをプロビジョニングしたり管理することなく、標準的なSQLによりS3に格納されたデータを問い合わせることを可能にします。AthenaはCSV、JSON、ParquetやORCなど様々な標準的なデータ形式を扱え、問合せ前にそれらのデータを変換する必要はありません。スキーマを定義し、そしてAWSマネジメントコンソールのクエリエディタやAthena JDBCドライバを使用したプログラムからクエリを実行するだけです。

AthenaはHiveメタストアと互換性のあるデータカタログにデータベースとテーブル定義を格納します。本記事では、フローログを1つのテーブル定義として作成します。テーブルに対するDDLについてはこのセクションの後半で述べます。DDL実行前に、次の点に注意しておいて下さい。

    • DDL上の<bucket_and_prefix>はFirehoseからフローログの書き込み先として実際に作成したS3バケットの名前に書き換えます(プリフィックスも含めるのを忘れずに)。
    • CREATE TABLE定義にEXTERNALというキーワードが含まれています。もしこのキーワードを除くと、Athenaはエラーを返します。EXTERNALキーワードはS3に格納されている元データに影響を与えずに、データカタログへテーブルメタデータを格納されるようにします。もし外部テーブルを削除したとしても、カタログからはメタデータが削除されますが、S3上のデータは維持されます。
    • vpc_glow_logsテーブルのカラムはフローログレコード上のフィールドにマッピングされます。フローログレコードはスペース区切りの文字列を含みます。各行のフィールドを解析するために、Athenaはどのようにデータを扱うべきかを示すカスタムライブラリとしてシリアライズ-デシリアライズクラス、またはSerDeを用います。
  • ここでは、スペース区切りのフローログレコードを解析するためにSerDe用の正規表現を指定します。この正規表現は”input.regex”というSerDeのプロパティによって提供されます。

Athenaのクエリエディタに以下のDDLを入力し、Run Queryをクリックして下さい。

CREATE EXTERNAL TABLE IF NOT EXISTS vpc_flow_logs (
Version INT,
Account STRING,
InterfaceId STRING,
SourceAddress STRING,
DestinationAddress STRING,
SourcePort INT,
DestinationPort INT,
Protocol INT,
Packets INT,
Bytes INT,
StartTime INT,
EndTime INT,
Action STRING,
LogStatus STRING
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.RegexSerDe'
WITH SERDEPROPERTIES (
    "input.regex" = "^([^ ]+)\\s+([0-9]+)\\s+([^ ]+)\\s+([^ ]+)\\s+([^ ]+)\\s+([^ ]+)\\s+([^ ]+)\\s+([^ ]+)\\s+([^ ]+)\\s+([^ ]+)\\s+([0-9]+)\\s+([0-9]+)\\s+([^ ]+)\\s+([^ ]+)$") 
LOCATION 's3://<bucket_and_prefix>/';

 

Athenaでのデータ問合せ

テーブルを作成した後は、テーブル名の隣にある小さなアイコンからPreview tableを実行し、レコードセットのサンプルを確認しましょう(訳注:VPCフローログをクリーニングせずに投入しているため、サンプリングされる箇所によってはnullのレコードが表示される場合もあります)。

o_vpc-flow_5

フローログを調査するために様々なクエリを実行することができます。ここでは例として、拒否されたトラフィックの中からソースIPアドレスのトップ25を取得してみます。

SELECT sourceaddress, count(*) cnt
FROM vpc_flow_logs
WHERE action = 'REJECT'
GROUP BY sourceaddress
ORDER BY cnt desc
LIMIT 25;

o_vpc-flow_6

QuickSightによるログの可視化

QuickSightはわずか数度のクリックでAthena上のテーブルを可視化することを可能にします。AWSアカウントによってQuickSight上にサインアップすることができ、1GBのSPICEキャパシティ無料利用枠を持つユーザアカウント1つを取得できます。

QuickSightをAthenaに接続させる前に、ここで説明されている手順を参考に、QuickSightに対するAthena及び関連するS3バケットへのアクセス権限の付与を確認します。

QuickSightにログインし、Manage dataNew data setと選択します。データソースとしてAthenaを選びます。

o_vpc-flow_7

データソースに“AthenaDataSource”と名前を付けます。defaultスキーマ→vpc_flow_logsテーブルと選択します。

o_vpc-flow_8

Edit/Preview dataを選択します。starttimeとendtimeのデータは数値形式よりも日付形式が適しています。これらの2つのフィールドはフローログのキャプチャウィンドウとシステムに到達した時刻をUnix時間で表現しています。

o_vpc-flow_9

ここでSave and visualizeを選択します。

異なるstart timeのキャプチャウィンドウとその時の送信バイト数を見てみましょう。StartTimeとBytesをフィールドリストから選択することでそれが可能です。覚えておいて頂きたいのは、QuickSightはこれを自動的に通信量のタイムチャートとして可視化することです。異なる日付/時間単位への変更はとても簡単です。

vpc-flow_10

これは、1日の中の通信で大きなスパイクがあった例です。この例はプロットされている他の日と比べてこの日に大量の通信が発生していることを教えてくれます。

o_vpc-flow_11

データに含まれるポート、IPアドレスその他のファセットを通じて通信許可/拒否に関するリッチな分析を行うことも簡単です。作られた分析結果をダッシュボードとして同じ組織の他のQuickSightユーザに共有することもできます。

o_vpc-flow_12

クエリのパフォーマンス向上とコスト削減のためのデータのパーティション化

ここまで述べてきたソリューションでは、S3に対して頻繁にGZIP圧縮されたフローログを配置します。Firehoseはこれらのファイルをデリバリーストリームの作成時に指定したバケット内に /year/month/day/hour というキーで格納します。Athenaでvpc_flow_logsテーブルを作成した時に使用していた外部テーブル定義には、この時系列キースペース内にある全てのファイルが含まれています。

Athenaはスキャンされたデータの量に応じて、クエリ毎に課金されます。ここまでのソリューションでは、クエリを発行する毎にS3に配置されたすべてのファイルがスキャンされます。VPCフローログの数が増加するに従い、データのスキャン量も増加し、クエリの応答時間やコストの両方に影響を与えることになります。

データの圧縮、パーティショニング、そして列指向フォーマットへの変換によって、クエリのコストを削減し、パフォーマンスを向上させることが可能です。Firehoseにて、既に圧縮された状態でS3へデータを配置するように設定されています。ここではパーティショニングに着目します。(Apache Parquetのような列指向フォーマットへの変換については本記事の対象外とします)

テーブルをパーティショニングすることで、クエリ発行時のデータスキャン量を制限することができます。多くのテーブルに置いて、特に発行するクエリの大半に時間ベースでの範囲制限が含まれる場合には、時間別での分割が有効です。AthenaはHiveのパーティショニング形式を使用します。これにより、パーティショニングの体系が直接反映されたkey-valueペアを含む名前付けをされたフォルダにパーティションが分割されます(詳しくはAthenaのドキュメントを参照)。

Firehoseにより作成されたこのフォルダ構造(例:s3://my-vpc-flow-logs/2017/01/14/09/)は、Hiveパーティショニング形式(例:s3://my-vpc-flow-logs/dt=2017-01-14-09-00/)とは異なります。しかし、ALTER TABLE ADD PARTITION文を用いると、手動でパーティションを追加してそのパーティションをデリバリーストリームによって作成されたキー空間の一部分に割り当てることができます。

ここでは、S3で新しいファイルを受信した際にALTER TABLE ADD PARTITION文を実行するためにLambda関数とAthena JDBCドライバを使い、自動的にFirehoseのデリバリーストリームに対する新しいパーティションを作成する方法を示します。

Athenaにてクエリを実行するLamba用のIAMロールの作成

Lambda関数を作成する前に、Lambdaに対してAthena上でクエリを実行することを許可するためのIAMロールを作成する必要があります。ユーザーガイドも参考に、次のように定義されるインラインアクセスポリシーを組み込んだ ‘lambda_athena_exec_role’ という名前のロールを作成して下さい。

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "logs:CreateLogGroup",
                "logs:CreateLogStream",
                "logs:PutLogEvents"
            ],
            "Resource": "arn:aws:logs:*:*:*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "athena:RunQuery",
                "athena:GetQueryExecution",
                "athena:GetQueryExecutions",
                "athena:GetQueryResults"
            ],
            "Resource": [
                "*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "s3:GetBucketLocation",
                "s3:GetObject",
                "s3:ListBucket",
                "s3:ListBucketMultipartUploads",
                "s3:ListMultipartUploadParts",
                "s3:PutObject"
            ],
            "Resource": [
                "arn:aws:s3:::aws-athena-query-results-*"
            ]
        }
    ]
}

パーティション化されたvpc_flow_logテーブルの作成

前の手順でAthena上に定義したvpc_flow_log外部テーブルはパーティション化されていません。‘IngestDateTime’という名前のパーティションを付したテーブルを作成するために、元のテーブルを削除して、次に示すDDLにてテーブルを再作成します。

DROP TABLE IF EXISTS vpc_flow_logs;

CREATE EXTERNAL TABLE IF NOT EXISTS vpc_flow_logs (
Version INT,
Account STRING,
InterfaceId STRING,
SourceAddress STRING,
DestinationAddress STRING,
SourcePort INT,
DestinationPort INT,
Protocol INT,
Packets INT,
Bytes INT,
StartTime INT,
EndTime INT,
Action STRING,
LogStatus STRING
)
PARTITIONED BY (IngestDateTime STRING)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.RegexSerDe'
WITH SERDEPROPERTIES (
    "input.regex" = "^([^ ]+)\\s+([0-9]+)\\s+([^ ]+)\\s+([^ ]+)\\s+([^ ]+)\\s+([^ ]+)\\s+([^ ]+)\\s+([^ ]+)\\s+([^ ]+)\\s+([^ ]+)\\s+([0-9]+)\\s+([0-9]+)\\s+([^ ]+)\\s+([^ ]+)$") 
LOCATION 's3://<bucket_and_prefix>/';

Athenaのパーティションを生成するためのラムダ関数の作成

次の手順でLamda関数を作成します。

  1. GitHub上にあるLambda Javaのプロジェクトをクローンします。
  2. READMEファイルの手順に従って、ソースをコンパイルし、出力された.jarファイルをS3のバケットにコピーします。
  3. Lambdaコンソールから関数の作成を開始し、一から作成を選択します。
  4. 基本的な情報ページでは、名前に‘CreateAthenaPartitions’、既存のロールとして’lambda_athena_exec_role’を選択し、関数の作成をクリックします。
  5. 関数コードページに遷移しますランタイムからJava 8を選択します。
  6. コードエントリタイプは、Amazon S3からのファイルのアップロードを選択します。S3リンクのURLには、先ほどアップロードした.jarファイルへのURLを入力します。
  7. ハンドラには’com.amazonaws.services.lambda.CreateAthenaPartitionsBasedOnS3Event::handleRequest’と入力します。
  8. このLambda関数のためには、いくつかの環境変数を設定する必要があります。
  • PARTITION_TYPE: [Month, Day, Hour]のうちどれかの値を設定します。この環境変数はオプションです。もし設定しなかった場合、Lambda関数はデフォルトとして日次でパーティションを作成します。例:’Hour’
  • TABLE_NAME: <データベース名>.<テーブル名>という形式で指定します。必須です。例:‘default.vpc_flow_logs’.
  • S3_STAGING_DIR: クエリの出力が書き込まれるAmazon S3のロケーション。(Lambda関数はDDL文だけを実行していますが、Athenaはその結果もS3に対して書き込みます。前の手順で作成したIAMポリシーでは、クエリ出力バケット名は‘aws-athena-query-results-’で開始されると仮定しています)
  • ATHENA_REGION: Athenaが動作するリージョンです。例:’us-east-1′

o_vpc-flow_13

  1. 最後に、タイムアウトを1分に設定して保存します。

S3上の新しいオブジェクトをLambda関数に通知するための設定

VPCフローログデータを含むS3バケットのプロパティページにおいて、Eventsペインを開き、通知の追加を押して新たな通知を作成します。

  • 名前: FlowLogDataReceived
  • イベント: ObjectCreated(All)
  • 送信先: Lambda function
  • Lambdaドロップボックスから‘CreateAthenaPartitions’を選択

これで、FirehoseによりS3バケットにファイルが配置される度に、‘CreateAthenaPartitions’ 関数がトリガーされます。この関数は新たに受信したオブジェクトのキーを解析します。キーの year/month/day/hour という部分に基づいて、関数作成の際に環境変数で指定したPARTITION_TYPEの設定(Month/Day/Hour)に従い、そのファイルがどのパーティションに属するかを判断します。次に、このパーティションが既に存在するかどうかをAthenaに問い合わせます。存在しなければ新たにパーティションを作成し、S3のキー空間の関連部分にマッピングします。

このロジックを詳しく見てみましょう。時間単位のパーティションを作成するために ‘CreateAthenaPartitions’ 関数を作成し、Firehoseによりちょうどフローログデータのファイルが s3://my-vpc-flow-logs/2017/01/14/07/xxxx.gz に配信されたと仮定します。

新しいファイルのS3キーを見て、Lambda関数はそれが’2017-01-14-07’という時間単位のパーティションに属すると推測します。Athenaを確認すると、そのようなパーティションはまだ存在していないため、次のDDL文を実行します。

ALTER TABLE default.vpc_flow_logs ADD PARTITION (IngestDateTime='2017-01-14-07') LOCATION 's3://my-vpc-flow-logs/2017/01/14/07/';

もしLambda関数が日次のパーティションとして作成されていた場合には、新しいパーティションは‘s3://my-vpc-flow-logs/2017/01/14/’ とマッピングされます。月次であれば ‘s3://my-vpc-flow-logs/2017/01/’ となります。
パーティションは、各ログがS3に取り込まれた日時を表しています。これは、各パーティションに含まれる個々のレコードのStartTimeやEndTimeの値よりも後の時間であることに注意して下さい。

Athenaを用いたパーティション化されたデータへの問合せ

これで、問合せはパーティションを利用できるようになりました。

SELECT sourceaddress, count(*) cnt
FROM vpc_flow_logs
WHERE ingestdatetime > '2017-01-15-00'
AND action = 'REJECT'
GROUP BY sourceaddress
ORDER BY cnt desc
LIMIT 25;

過去3時間以内に取り込まれたデータを問い合わせるには、次のような問合せを実行します(ここでは時間単位のパーティショニングを行っていると仮定します)。

SELECT sourceaddress, count(*) cnt
FROM vpc_flow_logs
WHERE date_parse(ingestdatetime, '%Y-%m-%d-%H') >= 
      date_trunc('hour', current_timestamp - interval '2' hour)
AND action = 'REJECT'
GROUP BY sourceaddress
ORDER BY cnt desc
LIMIT 25;

次の2つのスクリーンショットが示しているのは、パーティションを利用することでスキャンするデータ量を減らせるということです。そうすることで、クエリのコストと応答時間が削減できます。1つめのスクリーンショットはパーティションを無視したクエリの結果です。
vpc-flow_1o_6
2つめのスクリーンショットは、WHERE句でパーティションの使用を指示したクエリの結果です。
o_vpc-flow_17
このように、パーティションを用いることで2つめのクエリは半分の時間で済み、データのスキャン量は最初のクエリの1/10以下にできたことがわかります。

結論

以前は、ログ分析というのは特定のクエリのためだけに大規模なデータを準備しなければならなかったり、大規模なストレージやマシンリソースを準備したり運用したりする必要がありました。Amazon AthenaとAmazon QuickSightによって、ログデータの発行から格納、分析、そして視覚化まで、より柔軟性高く実現することができるようになっています。クエリを実行してデータを視覚化するのに必要なインフラストラクチャに焦点を当てるのではなく、ログの調査に集中できるようになるのです。

著者について

ben_snively_90
Ben Snively – a Public Sector Specialist Solutions Architect. 彼は政府や非営利団体、教育組織の顧客におけるビッグデータや分析プロジェクトについて、AWSを用いたソリューションの構築を通じた支援に従事しています。また、彼は自宅にIoTセンサーを設置してその分析も行っています。

 

Ian_pic_1_resized
Ian Robinson – a Specialist Solutions Architect for Data and Analytics. 彼はヨーロッパ・中東・アジア地域において顧客が持つデータを結びつけることによる価値の創出のためにAWSを利用することを支援しています。また彼は現在、1960年代のダーレク(Dalek)の複製を修復する活動をしています。
(翻訳はSA五十嵐が担当しました。原文はこちら)

関連文書

Analyze Security, Compliance, and Operational Activity Using AWS CloudTrail and Amazon Athena

Amazon Elasticsearch Service が VPC をサポート

本日より、NAT インスタンスやインターネットゲートウェイを必要とせずに Amazon VPC から Amazon Elasticsearch Service ドメインに接続できるようになりました。Amazon ES の VPC サポートは設定も簡単で信頼性が高く、セキュリティをさらに強化することができます。VPC サポートでは、その他のサービスと Amazon ES 間のトラフィックはパブリックインターネットから分離されており AWS ネットワーク内で維持されます。既存の VPC セキュリティグループを使用してネットワークアクセスを管理できます。また、AWS Identity and Access Management (IAM) ポリシーを使って保護機能を強化することもできます。Amazon ES ドメインの VPC サポートは追加費用なしにご利用いただけます。

ご利用開始にあたって

VPC での Amazon Elasticsearch Service ドメインの作成は簡単です。クラスター作成に使ういつもの手順を行い [VPC access] を選択します。

これだけです。その他の手順はありません。これで VPC からドメインにアクセスできるようになりました。

主要事項

VPC をサポートするにあたり、Amazon ES は少なくても 1 つの VPC サブネットにエンドポイントを配置します。Amazon ES はクラスター内の各データノードの VPC に Elastic Network Interface (ENI) を配置します。各 ENI はサブネットの IPv4 範囲内からプライベート IP アドレスを使用し、パブリック DNS ホスト名を受け取ります。ゾーン対応を有効にすると、Amazon ES は異なるアベイラビリティーゾーンで 2 つのサブネットにエンドポイントを作成することで、より優れたデータの耐久性を提供しています。

クラスター内のノード数には IP アドレスの三倍の数を確保しておく必要があります。ゾーン対応を有効にしている場合は、その数を分割できます。Amazon ES 専用の別のサブネットを作成するのが理想的です。

注意点:

  • 現在、既存のドメインを VPC に移動することはできません (その逆も同様)。VPC サポートを利用するには、新しいドメインを作成しデータを移行してください。
  • 現時点で Amazon ES は VPC 内にあるドメインの Amazon Kinesis Firehose をサポートしていません。

詳しくは「Amazon ES ドキュメント (Amazon ES documentation)」をご覧ください。

Randall

Amazon Redshift Spectrumによるセキュリティとコンプライアンスのためのデータベース監査ログの分析

(補足:本記事は2017年6月にAWS Bigdata Blogにポストされた記事の翻訳です。一部の記載を現時点の状況に合わせて更新してあります)

クラウドサービスの採用が増加するにつれて、組織は重要なワークロードをAWSに移行しています。これらのワークロードの中には、セキュリティとコンプライアンスの要件を満たすために監査が必要な機密データを格納、処理、分析するものがあります。監査人が良くする質問は、誰がどの機密データをいつ照会したのか、いつユーザが最後に自分の資格情報を変更/更新したのか、誰が、いつシステムにログインしたかということです。

デフォルトでは、Amazon Redshiftは、ユーザーの接続情報、変更情報、アクティビティに関連するすべての情報をデータベースに記録します。ただし、ディスク領域を効率的に管理するために、ログの使用状況と使用可能なディスク容量に応じて、ログは2〜5日間のみ保持されます。より長い時間ログデータを保持するには、データベース監査ロギングを有効にします。有効にすると、Amazon Redshiftは指定したS3バケットに自動的にデータを転送します。

Amazon Redshift Spectrumにより、Amazon S3に格納されたデータにクエリすることを可能にし、さらにAmazon Reshift のテーブルと結合することも可能です。 Redshift Spectrumを使い、S3に格納されている監査データを確認し、すべてのセキュリティおよびコンプライアンス関連の質問に答えることができます。AVRO、Parquet、テキストファイル(csv、pipe delimited、tsv)、シーケンスファイル、およびRCファイル形式、ORC、Grokなどのファイルをサポートしています。 gzip、snappy、bz2などのさまざまな圧縮タイプもサポートしています。

このブログでは、S3に保存されたAmazon Redshift の監査データを照会し、セキュリティーやコンプライアンスの質問への回答を提供する方法を説明します。

作業手順

次のリソースを設定します。

  • Amazon Redshift クラスタとパラメータグループ
  • Amazon Redshift に Redshift Spectrumアクセスを提供するIAMロールとポリシー
  • Redshift Spectrum外部表

前提条件

  • AWS アカウントを作成する
  • AWS CLI にて作業ができるように設定する
  • Amazon Redshift にアクセスできる環境を用意する。(psqlやその他クライアント)
  • S3バケットを作成する

クラスタ要件

Amazon Redshift クラスタは、次の条件を満たす必要があります。

  • 監査ログファイルを格納しているS3バケットと同じリージョンにあること
  • バージョン1.0.1294以降であること
  • ログ蓄積用のS3バケットに読み込み、PUT権限を設定されていること
  • AmazonS3ReadOnlyAccessとAmazonAthenaFullAccessの少なくとも2つのポリシーを追加したIAMロールにアタッチしていること

Amazon Redshift のセットアップ

ユーザーのアクティビティーをロギングするために、新しいパラメータグループを作ります。


aws redshift create-cluster-parameter-group --parameter-group-name rs10-enable-log --parameter-group-family Redshift-1.0 --description "Enable Audit Logging" 
aws redshift modify-cluster-parameter-group --parameter-group-name rs10-enable-log --parameters '{"ParameterName":"enable_user_activity_logging","ParameterValue":"true"}'

Amazon Redshift クラスタを上記で作成したパラメータグループを使い作成します。

aws redshift create-cluster --node-type dc1.large --cluster-type single-node --cluster-parameter-group-name rs10-enable-log --master-username <Username> --master-user-password <Password> --cluster-identifier <ClusterName>

クラスターが出来るまで待ち、作成されたらロギングを有効にします。

aws redshift enable-logging --cluster-identifier <ClusterName> --bucket-name <bucketname>

※S3のバケットポリシーなどはこちらを御覧ください。
※もしくは下記のようにマネージメントコンソールからログ用のS3のバケットを新規で作成するとバケットポリーが設定された状態のバケットが作成できます。

 

Redshift Spectrumをセットアップします。

Redshift Spectrumをセットアップするために、IAM ロールとポリシー、External Database,External Tablesを作成します。

IAM ロールとポリシー

Redshift データベースからS3バケットにアクセスするためのIAMロールを作成します。
RedshiftAssumeRole.json ファイルを作成し、下記のコマンドを実行してください。

aws iam create-role --role-name RedshiftSpectrumRole --assume-role-policy-document file://RedshiftAssumeRole.json

RedshiftAssumeRole.json
{
	"Version": "2012-10-17",
	"Statement": [
		{
			"Effect": "Allow",
			"Principal": {
			"Service": "redshift.amazonaws.com"
		},
		"Action": "sts:AssumeRole"
		}
	]
}

AmazonS3ReadOnlyAccess および AmazonAthenaFullAccess の2つのポリシーをアタッチします。

aws iam attach-role-policy --policy-arn arn:aws:iam::aws:policy/AmazonAthenaFullAccess --role-name RedshiftSpectrumRole
aws iam attach-role-policy --policy-arn arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess --role-name RedshiftSpectrumRole

作成したロールをAmazon Redshift クラスタに追加します。 <ClusterName> と <accountNumber> を自身のものに置き換えます。

aws redshift modify-cluster-iam-roles --cluster-identifier <ClusterName> --add-iam-roles arn:aws:iam::<accountNumber>:role/RedshiftSpectrumRole

ここまでの操作で、Amazon Redshift クラスタから S3 にアクセスできるように、Redshift Spectrum は設定されました。

External DatabaseとSchema

Amazon Redshift データベースにログインして、外部データベースとスキーマ、外部表を作成し、S3 に格納されたデータにアクセスできるよう設定します。

例えばpsql でアクセスする場合は下記がコマンド例になります。
本手順で作成している場合は、dev という名前のデータベースが作成されています。

psql -h <ご自身の設定したものを確認して変更ください>.ap-northeast-1.redshift.amazonaws.com -p 5439 -U dbadmin -d dev -W

Amazon Redshift で作成された外部データベースは、Amazon Athena データカタログに格納することが出来、Athena からも直接アクセスできます。

※本Blog (英語版)が書かれたあとにAWS Glue がリリースしておりますので、こちらも参考にしてください。

監査ログデータを照会するには、Amazon Redshift で外部データベースとスキーマを作成します。 DDL を実行する前に、以下のアカウント番号、ロール名、リージョン名を更新してください。

CREATE external SCHEMA auditLogSchema
FROM data catalog
DATABASE 'auditLogdb'
iam_role 'arn:aws:iam::<AccountNumber>:role/<RoleName>'
REGION '<regionName>'
CREATE external DATABASE if not exists;

REGION パラメータは、Athena データカタログのリージョンを指定します。 デフォルトの値は、Amazon Redshift クラスタと同じリージョンになります。(東京リージョンは、ap-northeast-1 になります。)

外部表の作成

Redshift Spectrum は S3 上のデータに対してクエリ可能になる外部表が作成可能です。 外部表は読取り専用であり、 現在、Spectrum を使用して S3 上のデータを変更することはできません。外部表は、表名の前にスキーマ名を付けた形で指定します。

3つの異なる監査ログ・ファイルを照会するための下記3つの表を作成します。

・ユーザーDDL:ユーザーが実施した DDL(CREATEやDROPなど)を記録します。
・ユーザー接続:成功または失敗したすべてのログオン情報をログに記録します。
・ユーザーアクティビティ:ユーザーが実行したすべてのクエリを記録します。
ユーザーDDL とユーザー接続のデータファイル形式は、パイプ区切りのテキストファイルです。 どちらのファイルもgzipユーティリティを使用して圧縮されています。 ユーザーアクティビティログはフリーフローテキストです。 各クエリは新しい行で区切られます。 このログも、gzip ユーティリティを使用して圧縮されています。

次のクエリのS3 バケットの場所を自身のバケットになおして実行してください。

CREATE external TABLE auditlogschema.userlog
(
userid INTEGER,
username CHAR(50),
oldusername CHAR(50),
action CHAR(10),
usecreatedb INTEGER,
usesuper INTEGER,
usecatupd INTEGER,
valuntil TIMESTAMP,
pid INTEGER,
xid BIGINT,
recordtime VARCHAR(200)
)
row format delimited
fields terminated by '|'
stored as textfile
location 's3://<bucketName>/AWSLogs/<accountNumber>/redshift/<regionName>/YYYY/MM/DD/';

CREATE external TABLE auditlogschema.connectionlog
(
event CHAR(50) ,
recordtime VARCHAR(200) ,
remotehost CHAR(32) ,
remoteport CHAR(32) ,
pid INTEGER ,
dbname CHAR(50) ,
username CHAR(50) ,
authmethod CHAR(32) ,
duration BIGINT ,
sslversion CHAR(50) ,
sslcipher CHAR(128) ,
mtu INTEGER ,
sslcompression CHAR(64) ,
sslexpansion CHAR(64) ,
iamauthguid CHAR(36)
)
row format delimited
fields terminated by '|'
stored as textfile
location 's3://<bucketName>/AWSLogs/<accountNumber>/redshift/<regionName>/YYYY/MM/DD/';

CREATE external TABLE auditlogschema.activitylog
(
logtext VARCHAR(20000)
)
row format delimited
lines terminated by '\n'
stored as textfile
location 's3://<bucketName>/AWSLogs/<accountNumber>/redshift/<regionName>/YYYY/MM/DD/';

guest ユーザーを作成して簡単な作業をしログを作成する。

ユーザー “guest”  を作成してログインし、 “person” という表を作成します。次に、テーブルに対してクエリーを実行します。

管理ユーザーとしてログインし、新しいユーザー “guest” を作成します。

CREATE USER guest PASSWORD 'Temp1234';

ユーザー  “guest” としてログインし、以下の DDL を実行します。

CREATE TABLE person (id int, name varchar(50),address VARCHAR(500));

INSERT INTO person VALUES(1,'Sam','7 Avonworth sq, Columbia, MD');
INSERT INTO person VALUES(2,'John','10125 Main St, Edison, NJ');
INSERT INTO person VALUES(3,'Jake','33w 7th st, NY, NY');
INSERT INTO person VALUES(4,'Josh','21025 Stanford Sq, Stanford, CT');
INSERT INTO person VALUES(5,'James','909 Trafalgar sq, Elkton, MD');

guest  でログインしている間に、person テーブルでいくつかのクエリを実行して、アクティビティログを生成します。

SELECT * 
  FROM person;

SELECT * 
  FROM person 
 WHERE name='John';

次に、上記で作成した3つの外部表(それぞれのログ)毎に、1つのクエリーの具体例を説明します。

ユーザーDDL ログ

次のクエリは、ユーザー guest がその日に実行した作業が確認できます。

SELECT username
,action
,recordtime 
  FROM auditlogschema.userlog 
 WHERE action in ('create','alter','drop','rename') 
   AND username='guest';

ユーザー接続ログ

次のクエリでは、ユーザー guest がログインした remotehost名、時刻を確認できます。

SELECT event
,recordtime
,remotehost 
  FROM auditlogschema.connectionlog 
 WHERE length(username) >0 
   AND username='guest' 
ORDER BY recordtime;

ユーザーアクティビティログ

次のクエリは、誰がいつ、person表 にアクセスしたか、またその際に流したクエリを確認できます。

SELECT pu.usename
	,substring(logtext,2,strpos(logtext,'UTC')+1)UTCTime
	,substring(logtext,strpos(logtext,'LOG:')+5) query
  FROM auditlogschema.activitylog al,pg_user pu
 WHERE logtext like '%xid=%'
   AND logtext not like '%SELECT 1%'
   AND logtext not like '%SET %'
   AND logtext like '%from person%'
   AND substring(substring(logtext,strpos(logtext,'userid=')+7),1,strpos(substring(logtext,strpos(logtext,'userid=')+7),' '))=pu.usesysid;

 

まとめ

このBlogでは、Amazon Redshift に新たに追加された機能 (Redshift Spectrum) を使用して、S3に格納されている監査ログデータを照会し、セキュリティおよびコンプライアンス関連の質問に簡単に回答する方法について説明しました。

Amazon Redshift Spectrum は2017年10月20日現在、東京リージョンでも利用可能となっております。

Amazon Redshift Spectrum の詳細については、こちらをご覧ください。 新機能についての詳細は、「Get Started」をご覧ください。

翻訳:パートナーソリューションアーキテクト 相澤