このモジュールでは、Amazon Kinesis Data Firehose の配信ストリームを作成し、最初のモジュールで作成した Amazon Kinesis ストリームから Amazon Simple Storage Service (Amazon S3) にバッチデータを配信します。その後、Amazon Athena を使用し、配信した未加工のデータにクエリを実行します。

このモジュールのアーキテクチャは、最初のモジュールで作成した Amazon Kinesis ストリームを土台に構築します。Amazon Kinesis Data Firehose を使用してデータをまとめ、Amazon S3 に配信してアーカイブします。Amazon Athena を使用し、Amazon S3 バケットに格納した未加工データに対してアドホックなクエリを実行します。

モジュールの所要時間: 15 分

使用するサービス
• Amazon Kinesis Data Firehose
• Amazon S3
• Amazon Athena

serverless-real-time-data-processing-mod-4
  • ステップ 1:Amazon S3 バケットを作成する

    コンソールまたは CLI を使用して S3 バケットを作成します。バケット名はグローバルに一意である必要があることに注意してください。例えば、「wildrydes-data-自分の名前」などを使用することをお勧めします。


    a.AWS マネジメントコンソールで [サービス] を選択し、[ストレージ] の [S3] を選択します。

    b.[+ バケットの作成] を選択します。

    c.バケットには、wildrydes-data-yourname などグローバルに一意な名前を指定します。

    d.バケットに使用するリージョンを選択します。

    e.[次へ] を 3 回クリックし、[バケットの作成] をクリックします。

  • ステップ 2:Amazon Kinesis Data Firehose 配信ストリームを作成する

    wildrydes」という名前で Amazon Kinesis Data Firehose 配信ストリームを作成し、wildrydes ストリームからデータを取得し、そのコンテンツを前のセクションで作成した S3 バケットにバッチ配信するよう設定します。


    a.AWS マネジメントコンソールで [サービス] を選択し、[分析] の [Kinesis] を選択します。

    b.[配信ストリームの作成] を選択します。

    c.[Delivery stream name (配信先のストリーム名)] に「wildrydes」と入力します。

    d.[ソース] で [Kinesis stream] を選択し、ソースストリームは [wildrydes] を選択します。

    e.[次へ] を選択します。

    f.[Record transformation (レコードの変換)] と [Record format conversion (レコードの変換形式)] を無効のままにし、[次へ] を選択します。

    g.[Desitation] で [Amazon S3 ] を選択します。

    h.[S3 バケット] で、前のセクションで作成したバケット (wildrydes-data-johndoeなど) を選択します。

    i.[次へ] を選択します。

    j.[S3 Buffer (S3 へのバッファ)] の [Buffer interval (バッファの間隔)] に「60」と入力し、S3 への配信頻度を 1 分に 1 度に設定します。

    k.ページの一番下にスクロールして、[IAM ロール] で [Create new or Choose (新しく作成または選択)] を選択します。新しいタブで [許可] を選択します。

    l.[次へ] を選択します。配信ストリームの詳細を確認し、[Create delivery stream (配信ストリームを作成)] を選択します。

  • ステップ 3:Amazon Athena テーブルを作成する

    Amazon Athena テーブルを作成し、JSON SerDe を使用して Amazon S3 の未加工データにクエリを実行します。テーブルに「wildrydes」と名前を付け、未加工データに以下の属性を含めます。

    • Name (string)
    • StatusTime (timestamp)
    • Latitude (float)
    • Longitude (float)
    • Distance (float)
    • MagicPoints (int)
    • HealthPoints (int)

    a.[サービス] を選択し、[分析] セクションで [Athena ] を選択します。

    b.表示に従って [今すぐ始める] を選択し、モーダルダイアログの右上で x をクリックして最初に表示されるチュートリアルを終了します。

    c.以下の SQL ステートメントをコピーして貼り付け、テーブルを作成します。LOCATION 句では、「YOUR_BUCKET_NAME_HERE 」のプレースホルダーを自分のバケット名に置き換えます。

    CREATE EXTERNAL TABLE IF NOT EXISTS wildrydes (
           Name string,
           StatusTime timestamp,
           Latitude float,
           Longitude float,
           Distance float,
           HealthPoints int,
           MagicPoints int
         )
         ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'
         LOCATION 's3://YOUR_BUCKET_NAME_HERE/';

    d.[クエリの実行] を選択します。

    Amazon Athena を初めて使用するユーザーの方には、Amazon S3 でのクエリ結果格納場所をセットアップする手順が、メッセージとして表示される場合があります。その場合は、既存の S3 にフォルダーを作成するか、athena-query-results-yourname のような名前で新規の S3 バケットを作成します。その上で、それらを Athena 用のクエリ結果保存場所として設定します。

    e.左側のナビゲーションに表示されているデータベース sampledb の下でリストを確認し、作成されたテーブル wildrydes が追加されていることを確認します。

  • ステップ 4.バッチ処理されたデータファイルを確認する

    AWS マネジメントコンソールを使用し、Kinesis Data Firehose の配信ターゲットとして指定した S3 バケットを開きます。Firehose によって、バッチ処理されたデータファイルがバケットに配信されていることを確認します。ファイルの 1 つをダウンロードし、テキストエディタで開いて内容を確認します。


    a.[サービス] を選択し、[ストレージ] セクションで [S3] を選択します。

    b.[Search for buckets (バケットを検索)] のテキスト入力欄に、最初のセクションで作成したバケット名を入力します。

    c.バケットを選択し、年、月、日、時間のフォルダをそれぞれ開き、バケットにファイルが配信されていることを確認します。

    d.ファイルの 1 つを選択し、[ダウンロード] を選択します。テキストエディタでファイルを開き、内容を確認します。

  • ステップ 5.データファイルにクエリを実行する

    Amazon Athena テーブルにクエリを実行し、Kinesis Data Firehose から S3 に配信されたすべてのレコードを表示します。


    a.[サービス] を選択し、[分析] セクションで [Athena ] を選択します。

    b.以下の SQL クエリをコピーして貼り付けます。

    SELECT * FROM wildrydes

    c.[クエリの実行] を選択します。

    data-lake-query-results

    (クリックして拡大)

    data-lake-query-results
  • まとめとヒント


    🔑 Amazon Kinesis Data Firehose は、Amazon S3 などの送信先にリアルタイムストリーミングデータを配信するフルマネージドサービスです。Amazon Athena では、標準 SQL を使用して未加工のデータに対してアドホックなクエリを実行できます。

    🔧 このモジュールでは、Kinesis Data Firehose の配信ストリームを作成して Kinesis ストリームから Amazon S3 バケットにデータを配信しました。Athena を使用して、S3 のデータにクエリを実行しました。