このモジュールでは、AWS Lambda を使用して、作成済みの Amazon Kinesis ストリーム wildrydes からのデータを処理します。Lambda 関数を作成および設定してストリームからデータを読み取り、取得したレコードを Amazon DynamoDB に書き込みます。

モジュールの所要時間: 25 分

使用するサービス:
• Amazon DynamoDB
• AWS IAM
• AWS Lambda
• Amazon Kinesis Data Streams

serverless-real-time-data-processing-mod-3
  • ステップ 1:Amazon DynamoDB テーブルを作成する

    Amazon DynamoDB コンソールを使用して、新しい DynamoDB テーブルを作成します。UnicornSensorData という名前のテーブルに、Name という名前で文字列タイプのパーティションキーと、StatusTime という名前で文字列タイプのソートキーを追加します。その他の設定にはすべてデフォルトを使用します。

    テーブルを作成したら、Amazon リソースネーム (ARN) を書き留めておきます。これは次のセクションで使用します。


    a.AWS マネジメントコンソールの [サービス] で、「データベース」の下にある [DynamoDB] を選択します。

    b.[テーブルの作成] をクリックします。

    c.[テーブル名] に「UnicornSensorData」と入力します。

    d.[パーティションキー] に「 Name 」と入力し、キータイプに [文字列] を選択します。

    e.[ソートキーの追加] チェックボックスをオンにします。[ソートキー] に「StatusTime」と入力し、キータイプに [文字列] を選択します。

    f.[デフォルト設定の使用] ボックスをオンのままにし、[作成] を 選択 します。

    g.新規テーブルのプロパティの [テーブルの詳細] までスクロールし、Amazon リソースネーム (ARN) を書き留めておきます。このリソースネームは次のセクションで使用します。

    3_stream-processing-dynamodb-create

    (クリックして拡大)

    3_stream-processing-dynamodb-create
  • ステップ 2:Lambda 関数の IAM ロールを作成する

    IAM コンソールを使用して新しいロールを作成します。ロールに「WildRydesStreamProcessorRole」という名前を付け、ロールタイプに [Lambda] を選択します。このロールに「AWSLambdaKinesisExecutionRole」という管理ポリシーを添付して、関数に対して Amazon Kinesis Streams からのデータ読み込みと、Amazon CloudWatch Logs へのログ記録についてのアクセス許可を付与できるようにします。前のセクションで作成した DynamoDB テーブルへの DynamoDB BatchWriteItem アクセスを許可するポリシーを作成し、新しく作成したロールに添付します。


    a.AWS コンソールの [Servies] をクリックし、「セキュリティ、アイデンティティ、コンプライアンス」セクションにある [IAM] を選択します。

    b.左側のナビゲーションから [ポリシー] を選択し、[ポリシーの作成] をクリックします。

    c.ビジュアルエディタを使用して、前のセクションで作成した DynamoDB テーブルへのアクセス権を Lambda 関数に与える IAM ポリシーを作成します。最初に [サービスの選択] をクリックし、[サービスの検索] 検索ボックスに「DynamoDB」と入力し、表示された [DynamoDB] をクリックします。

    d.[アクション] を選択して [フィルタアクション] 検索ボックスに「BatchWriteItem」と入力し、表示された [BatchWriteItem] チェックボックスをオンにします。

    e.[リソース] を選択します。[table] の [ARN の追加] をクリックして、[Region]、[Account]、[Table name] を指定し、前のセクションで作成した DynamoDB テーブルの ARN を構成します。

    f.[Region] には、前のセクションで DynamoDB テーブルを作成した AWS リージョンを入力します (例: us-east-1)。

    g.[Account] には、自分の AWS アカウント ID を 12 桁の番号で入力します (例: 123456789012)。AWS マネジメントコンソールで AWS アカウント ID 番号を確認するには、ナビゲーションバーの右上にある [サポート] を選択し、[サポートセンター] をクリックします。 現在サインインしている ID が、右上隅のサポートメニューの下に表示されます。

    h.[テーブル名] に「UnicornSensorData」と入力します。

    [table の ARN の指定] フィールドの ARN は「arn:aws:dynamodb:us-east-1:513094544575:table/UnicornSensorData」のようになっているはずです。

    i.[追加] をクリックします。

    j.[Review policy] をクリックします。

    k.[名前] フィールドに「WildRydesDynamoDBWritePolicy」と入力します。

    l.[Create policy] をクリックします。

    m.左側のナビゲーションから [ロール] を選択し、[ロールの作成] をクリックします。

    n.[AWS サービス] セクションで、ロールタイプに [Lambda] を選択します。

    o.[次のステップ: アクセス権限] をクリックします。

    p. [フィルタ] テキストボックスに「AWSLambdaKinesisExecutionRole」と入力し、そのロールの隣にあるチェックボックスをオンにします。

    q.[フィルタ] テキストボックスに「WildRydesDynamoDBWritePolicy」と入力し、そのロールの隣にあるチェックボックスをオンにします。

    r.[Next: Tags] 、続いて [Next: Review] をクリックします。

    s.[ロール名] に「WildRydesStreamProcessorRole」と入力します。

    t.[ロールの作成] をクリックします。

  • ステップ 3:ストリームを処理する Lambda 関数を作成する

    wildrydes ストリームで新しいレコードが利用可能になるとトリガされる、「WildRydesStreamProcessor」という名前の Lambda 関数を作成します。関数のコードには、あらかじめ用意された index.js を使用します。キーが「TABLE_NAME」、値が「UnicornSensorData」の環境変数を作成します。前のセクションで作成したロール「WildRydesStreamProcessor」を使用するように関数を設定します。


    a.AWS マネジメントコンソールの [サービス] で、[コンピューティング] の下にある [Lambda] を選択します。

    b.[関数の作成] をクリックします。

    c.[名前] フィールドに「WildRydesStreamProcessor」と入力します。

    d.[Node.js 10.x] を [Runtime] で選択します。

    e.[既存のロール] のドロップダウンの一覧から [WildRydesStreamProcessorRole] を選択します。

    f.[関数の作成] をクリックします。

    g.[関数コード] セクションまで下にスクロールします。

    h.以下の JavaScript コードをコピーして、コードエディタに貼り付けます。

    'use strict';
    
    const AWS = require('aws-sdk');
    const dynamoDB = new AWS.DynamoDB.DocumentClient();
    const tableName = process.env.TABLE_NAME;
    
    exports.handler = function(event, context, callback) {
      const requestItems = buildRequestItems(event.Records);
      const requests = buildRequests(requestItems);
    
      Promise.all(requests)
        .then(() => callback(null, `Delivered ${event.Records.length} records`))
        .catch(callback);
    };
    
    function buildRequestItems(records) {
      return records.map((record) => {
        const json = Buffer.from(record.kinesis.data, 'base64').toString('ascii');
        const item = JSON.parse(json);
    
        return {
          PutRequest: {
            Item: item,
          },
        };
      });
    }
    
    function buildRequests(requestItems) {
      const requests = [];
    
      while (requestItems.length > 0) {
        const request = batchWrite(requestItems.splice(0, 25));
    
        requests.push(request);
      }
    
      return requests;
    }
    
    function batchWrite(requestItems, attempt = 0) {
      const params = {
        RequestItems: {
          [tableName]: requestItems,
        },
      };
    
      let delay = 0;
    
      if (attempt > 0) {
        delay = 50 * Math.pow(2, attempt);
      }
    
      return new Promise(function(resolve, reject) {
        setTimeout(function() {
          dynamoDB.batchWrite(params).promise()
            .then(function(data) {
              if (data.UnprocessedItems.hasOwnProperty(tableName)) {
                return batchWrite(data.UnprocessedItems[tableName], attempt + 1);
              }
            })
            .then(resolve)
            .catch(reject);
        }, delay);
      });
    }
    3_stream-processing-lambda-basic-information

    (クリックして拡大)

    3_stream-processing-lambda-basic-information

    i.[環境変数] セクションで、環境変数のキーに「TABLE_NAME」、に「UnicornSensorData」を入力します。

    j.[Basic settings] セクションで、[Timeout] を 1 分に設定します

    k.上方にスクロールし、[Add Trigger]、[Kinesis] の順にクリックします。

    l.[Trigger configuration] セクションで、[wildrydes-summary] ([Kinesis Stream] にある) を選択します。

    m.[バッチサイズ] は 100 のままにし、[開始位置] を「最新」に設定します。

    n.[トリガーの有効化] チェックボックスをオンにします。

    o.[追加] をクリックします。

    p. [保存] をクリックします。

    3_stream-processing-lambda-environment-variables

    (クリックして拡大)

    3_stream-processing-lambda-environment-variables
    3_stream-processing-lambda-basic-settings

    (クリックして拡大)

    3_stream-processing-lambda-basic-settings
    3_stream-processing-trigger-designer

    (クリックして拡大)

    3_stream-processing-trigger-designer
  • ステップ 4:Lambda 関数を監視する

    トリガーにより Lambda 関数が正しく実行されているか検証します。関数によって発行されたメトリクスを表示し、Lambda 関数からの出力を確認します。


    a.プロデューサーを実行して、ユニコーン名によるストリームへのセンサーデータの出力を開始します。

    ./producer -name Rocinante

    b.WildRydesStreamProcessor Lambda 関数に戻ります。[Monitoring] タブをクリックし、関数のモニタリングに使用できるメトリクスを確認します。[View Logs in CloudWatch] をクリックし、関数のログ出力を確認します。

  • ステップ 5:DynamoDB テーブルに対するクエリを発行する

    AWS マネジメントコンソールを使用して、DynamoDB テーブルに対して特定のユニコーンのデータを問い合わせます。プロデューサーを使用して、識別可能なユニコーンの名前が付けられたデータを作成し、そのレコードが存在していることを確認します。


    a.[サービス] の「データベース」セクションにある [DynamoDB] を選択します。

    b.左側のナビゲーションで [テーブル] を選択します。

    c.[UnicornSensorData] を選択します。

    d.[Items] タブをクリックします。ここには、プロデューサーを実行している各ユニコーンに対して、毎分ごとの各データポイントが表示されます。

    3_stream-processing-dynamodb-items
    3_stream-processing-dynamodb-items
  • まとめとヒント


    🔑 Lambda 関数をサブスクライブして、Kinesis ストリームからレコードのバッチを自動的に読み取り、ストリームでレコードを検出した場合にそれを処理できます。

    🔧 このモジュールではユニコーンのサマリデータの Kinesis ストリームからデータを読み取り、各行を DynamoDB に保存する Lambda 関数を作成しました。

次のモジュールでは、Amazon Kinesis Data Firehose を作成して、最初のモジュールで作成した Amazon Kinesis ストリームから Amazon Simple Storage Service (Amazon S3) にデータをまとめて配信します。その後、Amazon Athena を使用して、配信した raw データに対するクエリを実行します。