Amazon Web Services ブログ

Amazon Athena を使用して高度な分析を行い、Amazon DynamoDB データの視覚化を行う

Amazon DynamoDB サービスでは、1 秒あたり数十億件のアイテムと数百万回のリクエストの中から膨大な分析値を取得することができます。ただし、その分析値を取得するには、データをエクスポートする必要があります。DynamoDB テーブルから分析プラットフォームにデータをコピーすることで、情報を豊富に抽出することができます。これを行うには、優れたアーキテクチャのビッグデータパイプラインが、トランザクションプロセスを分析から切り離すのに大変便利です。このブログ記事では、DynamoDB テーブルから Amazon S3 にデータを移行するビッグデータパイプラインを構築する方法を説明します。これは、完全管理型の Presto クエリサービスである Amazon Athena を使用して、高度な分析が実行でき、Amazon QuickSight を用いて、視覚化およびアドホック分析を構築することも可能です。

デカップリングしたビッグデータアプリケーションにはたいてい、ストレージとコンピューティングを分離する共通のパイプラインがあり、そのため、新しい処理技術が生まれた際にはそれを活用することができます。デカップリングにより、データの耐久性に影響を与えることなく、複数の分析エンジン用の計算リソースを柔軟にプロビジョニングできるようになります。また、パイプラインを設計して、ストレージと処理の段階を繰り返し、下流のアプリケーションがすばやく使用できる形式でデータを整形することも可能です。

ビッグデータパイプラインの設計には、3 つの大きな特性が作用しています。

  • パイプライン全体の遅延 – データから正しい情報を得るまでにどれくらいの時間を要するでしょうか? 数千分の 1 秒、数分、あるいは数日?
  • データのスループット – どれくらいのデータを取り込んで処理する必要がありますか? 数ギガバイト、数テラバイト、あるいは数ペタバイト?
  • コスト – アプリケーションのための目標予算はいくらですか? AWS の最も費用対効果の高いオプションが、普通、適切な選択と言えるでしょう。

ビッグデータパイプラインを設計する際に考慮すべきことは他にも、データ構造、アクセスパターン、データの温度、可用性と耐久性、そしてサービスが完全に管理されているかどうかなどがあります。これらの特性に基づいてジョブに適切なツールを使用することは、優れたアーキテクチャを持つビッグデータパイプラインにとって重要です。

階層化されているビッグデータパイプライン

階層化されたビッグデータパイプラインを解説する前に、このソリューションで利用する主なサービスと機能を見てみましょう。

パイプラインでの DynamoDB の機能

DynamoDB で用いる主要なコンポーネントは、テーブル、項目、および属性です。テーブルは項目の集合であり、各項目は属性の集合です。DynamoDB はプライマリキーを使って、テーブルの各項目を識別します。セカンダリインデックスを使用すると、クエリの柔軟性が向上します。詳細については、「Amazon DynamoDB の仕組み」を参照してください。これは「DynamoDB 開発者ガイド」の中にあります。

DynamoDB TTL (Time To Live) を使用すれば、ストレージコストを削減する手段として、すでに関連性がなくなった項目を自動的に削除することができます。このブログ記事では、テーブルの TTL を有効にし、ttl 属性を使用して削除のタイムスタンプを設定します。TTL の詳細については、「Time To Live: 仕組み」を参照してください。

次に、DynamoDB ストリームを使用して、項目レベルの変更の時間順のシーケンスを取得します。これで、Amazon S3 にコピーできます。DynamoDB ストリームの詳細については、「DynamoDB ストリーム を使用したテーブルアクティビティのキャプチャ」を参照してください。

パイプラインでの AWS Lambda の機能

AWS Lambda は、サーバーをプロビジョニングあるいは管理を行うことなく、コードを実行できるコンピューティングサービスです。Lambda は必要な時のみコードを実行し、毎秒何千ものリクエストにまで自動的に拡張します。Lambda は、高可用性を保持し、サーバーとオペレーティングシステムの保守、およびパッチ適用を行います。Lambda では、コードが実行されている時に消費したコンピューティングの時間分だけを支払います。

Lambda は、オブジェクトが Amazon S3 バケットに配置されるといったイベントに応じて、コードを呼び出す機能があります。今回の場合、DynamoDB テーブルに対する更新に呼応して、Lambda がコードを呼び出します。この記事では、DynamoDB ストリームがトリガーした Lambda 関数を作成し、Amazon Kinesis Data Firehose を使用して、Amazon S3 に項目をコピーします。

例えば、データセット全体に対し分析を行いたい場合には、Amazon S3 にすべての項目をコピーできます。TTL で削除された項目だけをコピーすることもできます。TTL によって削除される項目のレコードには、ユーザーにが削除した項目と区別するために、追加のメタデータ属性が含まれています。TTL 削除のための userIdentity フィールド (次の例に示す) は、DynamoDB が TTL の一部として削除を実行したことを表します。

"userIdentity": {
                "Type": "Service",
                "PrincipalId": "dynamodb.amazonaws.com"
            }

TTL データのみを移行するには、eventNameREMOVE で、かつ userIdentityPrincipalID (dynamodb.amazonaws.comと同じ) を含むレコードのみをコピーする Lambda 関数を変更してください。

Lambda をパイプラインの一部として使用すると、データを Amazon S3 に格納する前に、ライトトランスフォーメーション (light transformation) を実行する機会がさらに増えます。このソリューションでの Lambda 関数は、ネストされた JSON データの一部を平坦化します。これにより、今後他のデータセットとの結合が簡単になります。

パイプラインでの Amazon Athena の機能

Athenaは、標準 SQL を使用して Amazon S3 で直接データを分析できるようにするインタラクティブなクエリサービスです。Athena はサーバーレスであるため、インフラストラクチャの設定や管理は不要であり、また、実行したクエリにのみ課金されます。Athena は自動的にクエリを拡張、つまり並列実行するため、大規模なデータセットや複雑なクエリがあっても、高速で結果が出ます。Athena の設定の詳細については、「セットアップ」 (「Athena ユーザーガイド」の中にある) を参照してください。

DynamoDB のデータは Amazon S3 に JSON 形式で格納されます。通常、データを SQL クエリに適した形式に変換するには、抽出、変換、およびロード (ETL) プロセスが必要です。一方で、Athena はスキーマオンリードと呼ばれるアプローチを使用しています。これは、クエリ実行時にスキーマをデータに投影することができるものです。これにより、データのロードや ETL が不要になります。この記事でのソリューションは、JSON シリアライザ/デシリアライザ (SerDe) を使用します。これで、生の JSON レコードを解析し、Hive データ定義言語を用いて外部テーブルを作成することができます。スキーマが作成されたら、データのクエリを開始できます。

すべてをまとめる

説明したすべての事項について、エンドツーエンドのパイプラインを見てみましょう。次の図は、ソリューションの仕組みを示しています。

  1. アプリケーションは、データを DynamoDB テーブルに書き込みます。このブログ記事の目的に沿って、サンプルのデータセットを使用して、DynamoDB テーブルにデータを取り込むためのサンプル Python 関数を用意します。
  2. DynamoDB TTL テーブルの設定では、テーブル内のタイムスタンプ属性に基づいて、項目が期限切れになったり、削除されたりします。
  3. DynamoDB ストリームは、項目レベルの変更の時間順のシーケンスを取得します。
  4. Lambda 関数は、DynamoDB ストリームをリッスンし、項目を Kinesis Data Firehose 配信ストリームに書き込みます。提供された機能は、すべての新しい項目を Kinesis Data Firehose 配信ストリームに配置します。関数を変更し、userIdentity フィールドの追加のメタデータ属性に基づいて、TTL が削除した項目のレコードのみを入れることができます。
  5. Kinesis Data Firehose がデータを Amazon S3 に送信します。
  6. データが Amazon S3 に配置されたら、Athena を使って外部テーブルを作成し、パーティションを設定してデータのクエリを開始します。
  7. Amazon QuickSight を設定して、データを視覚化し、Athena または Amazon S3 のデータのアドホッククエリを直接実行することもできます。
  8. アプリケーションは DynamoDB からホットデータを直接クエリでき、Athena API または Amazon QuickSight を視覚化して分析データにアクセスすることもできます。

ソリューションをデプロイする

提供された AWS CloudFormation テンプレートは、DynamoDB テーブル、DynamoDB ストリーム、Amazon S3 バケット、Kinesis Data Firehose 配信ストリーム、および Lambda 関数をデプロイします。次のセクションの「ソリューションの検証とデータのクエリ」に従って、Athena と Amazon QuickSight を手動で設定する必要があります。

ソリューションをデプロイするには

  1. Lambda 関数コード ddb-to-firehose.py をダウンロードし、選んだ Amazon S3 バケットにアップロードしてください。Amazon S3 バケットがソリューションをデプロイする予定の同じ AWS リージョンにあることを確認してください。バケット名を書き留めます。
  2. CloudFormation テンプレート を GitHub からデスクトップにダウンロードしてください。
  3. CloudFormation コンソールに移動し、[CreateStack] を選択します。
  4. Amazon S3 にテンプレートをアップロードする」を選択し、先ほどダウンロードした ddbathenablog_cf.yaml テンプレートをブラウジングします。[Next] を選択します。
  5. スタック名と DynamoDB テーブル名 (Movies など) を指定します。ステップ 1 で Lambda 関数コードをアップロードしたバケット名も指定します。[Next] を選択します。
  6. [Next] を選択する ([Option] の中にある)
  7. 最後のページで、[I acknowledge that AWS CloudFormation might create IAM resources] チェックボックスをオンにし、[Create] を選択します。
  8. AWS CloudFormation スタック (AWS リソースのコレクション) が完全にデプロイされ、次のセクションに移動する前にステータスが CREATE_COMPLETE と表示されるまで、数分間待ちます。

ソリューションの検証とデータのクエリ

ソリューションを検証し、DynamoDB テーブルを迅速に追加するために、この記事にリンクされているデータファイルと Python 関数をダウンロードしてください。Python スクリプトが、Internet Movie Database (IMDb) から数千の映画に関する情報を含むサンプルデータファイルを読み込みます。各映画のファイルには、年、タイトル、info という JSON マップがあります。次のコード例は、データがどのようなものかを示しています。

{
    "year" : 2013,
    "title" : "Turn It Down, Or Else!",
    "info" : {
        "directors" : [
            "Alice Smith",
            "Bob Jones"
        ],
        "release_date" : "2013-01-18T00:00:00Z",
        "rating" : 6.2,
        "genres" : [
            "Comedy",
            "Drama"
        ],
        "image_url" : "http://ia.media-imdb.com/images/N/O9ERWAU7FS797AJ7LU8HN09AMUP908RLlo5JF90EWR7LJKQ7@@._V1_SX400_.jpg",
        "plot" : "A rock band plays their music at high volumes, annoying the neighbors.",
        "rank" : 11,
        "running_time_secs" : 5215,
        "actors" : [
            "David Matthewman",
            "Ann Thomas",
            "Jonathan G. Neff"
       ]
    }
}

ソリューションを検証し、DynamoDB テーブルを作成するには、

  1. LoadMovieData.py スクリプトと moviedata.zip スクリプトをダウンロードしてください。moviedata.jsonファイルを、moviedata.zip アーカイブから抽出します。
  2. LoadMovieData.py スクリプトを編集し、region_name および dynamodb の値を更新します。Table を使用して、AWS リージョンとテーブル名を一致させます。
    dynamodb = boto3.resource('dynamodb', region_name='us-east-1')
    table = dynamodb.Table('movies')
  3. コマンドウィンドウ を開き、次の Python スクリプトを実行してデータをアップロードします。
    PATH\LoadMovieData.py PATH\moviedata.json

    レコードが DynamoDB テーブルに読み込まれると、次の出力が表示されます。

    10 rows inserted
    20 rows inserted
    30 rows inserted
    …
    4600 rows inserted
  4. DynamoDB コンソールを開き、[Table] を選択し、次に、[Movies] テーブルを、そして [Items] を選択します。次のスクリーンショットに示すように、最初の 100 項目を表示する必要があります。テーブルの最初の 100 項目のスクリーンショット
  5. Amazon S3 コンソールを開き、このソリューションで作成したバケットを開きます。バケット名は、AWS CloudFormation コンソールでスタック名を、そして [Resources] を選択することで確認できます。Amazon S3 のデータは、次のスクリーンショットのように表示されます。Amazon S3 のデータのスクリーンショット
    Amazon S3 の各オブジェクトには、Data Firehose バッファ設定に基づく動画レコードがいくつか含まれています。
  6. Athena コンソールを開きます。お好みのデータベースを選択し、LOCATION を動画データを含む S3 バケットに置き換えて、次の Hive DDL ステートメントを貼り付けます。Athena を初めて使用する場合は、「ご利用開始にあたって」 (「Athena ユーザーガイド」の中にあります) を参照して、まずサンプルデータベースを作成してください。
    CREATE EXTERNAL TABLE `movies`(
      `year` int,
      `createtime` int,
      `title` string,
      `expiretime` int,
      `info` string,
      `actor1` string,
      `actor2` string,
      `director1` string,
      `director2` string,
      `genre1` string,
      `genre2` string,
      `rating` double)
    ROW FORMAT SERDE 
      'org.openx.data.jsonserde.JsonSerDe' 
    LOCATION
      's3://your-bucket-name/path/to/data/'

    Athena コンソールからデータをクエリすることができます。例えば、次のクエリを実行して genre1 の平均評価を取得します。

    SELECT genre1, avg(rating) as avg_rating FROM "sampledb"."movies" group by genre1 order by avg_rating desc
  7. Amazon QuickSight コンソールを開きます。Amazon QuickSight を初めて使用する場合は、「Amazon QuickSight でのデータ分析の開始方法」 (「Amazon QuickSight ユーザーガイド」の中にあります) を参照してください。 [Manage Data] を選択し、次に [New data set] を選択します。「Create a Data Set」ページで Athena を選択し、お好みのデータソース名 (movies など) を入力し、[Create data source] を選択します。この例が次のスクリーンショットにあります。新しい Athena データソースを作成する
  8. ここの画像に表示されている「Choose your table」ページで、データベースとmovies テーブルを選択し、[Select] を選択します。「Choose your table」ページのスクリーンショット
  9. Finish data set creation」ページで [Direct query your data] を選択し、[Visualize] を選択します。データを視覚化するための [Finish data set creation] ページのスクリーンショット
  10. これで、簡単な視覚化を作成できます。Y 軸には genre1actor1、X 軸には rating (平均) を入力します。次のスクリーンショットはこれを示しています。データ視覚化のスクリーンショット
  11. 平均評価が genre1 で表示できるようになりました。actor1 で評価を表示するには、[Drama] を選択し、次に [Drill down to actor1] を選択しますactor1 統計にドリルダウンするスクリーンショット
    結果は、actor1 ビューによる平均評価を示します。これは genre1 内にあります。データドリルダウンの結果のスクリーンショット

結論

この記事のサンプル解析と視覚化ソリューションは、DynamoDB テーブルのデータに含まれる正しい情報を活用する方法を示しています。この記事では、階層化したビッグデータパイプラインの構築を紹介し、これで、DynamoDB テーブルから Amazon S3 バケットにデータをすばやく移行することができるようになりました。次に、Athena で高度な分析を行い、Amazon QuickSight で視覚化します。このソリューションは、あらゆる DynamoDB データを Amazon S3 にコピーできる柔軟性を備えています。DynamoDB TTL を使用して、期限切れのレコードのみを移行することもできます。この種類のデカップリングしたソリューションでは、レイテンシー、スループット、およびコストという重要な考慮事項に基づいて、パイプラインの各手順において最適な分析サービスを利用できます。


著者について

Roger Dahlstrom はアマゾン ウェブ サービスのソリューションアーキテクトです。 AWS のお客様と協力して、データベースプロジェクトに関する指導と技術支援を行い、AWS をしている場合にソリューションの価値の向上を目指しています。

 

 

 

 

Ilya Epshteyn はアマゾン ウェブ サービスのプリンシパルソリューションアーキテクトです。