In this module, you’ll use AWS Lambda to process data from the wildrydes-summary Amazon Kinesis stream created earlier. You’ll create and configure a Lambda function that reads from the stream and writes records to an Amazon DynamoDB table as they arrive.

Time to complete module: 25 minutes

Services used:
• Amazon DynamoDB
• AWS IAM
• AWS Lambda
• Amazon Kinesis Data Streams

  • Step 1. Create an Amazon DynamoDB table

    Use the Amazon DynamoDB console to create a new DynamoDB table. Call your table UnicornSensorData and give it a Partition key called Name of type String and a Sort key called StatusTime of type String. Use the defaults for all other settings.

    After you’ve created the table, note the Amazon Resource Name (ARN) for use in the next section.


    a. Go to the AWS Management Console, choose Services then select DynamoDB under Database.

    b. Click Create table.

    c. Enter UnicornSensorData for the Table name.

    d. Enter Name for the Partition key and select String for the key type.

    e. Tick the Add sort key checkbox. Enter StatusTime for the Sort key and select String for the key type.

    f. Leave the Use default settings box checked and choose Create.

    g. Scroll to the Table details section of your new table's properties and note the Amazon Resource Name (ARN). You will use this in the next step.
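    If you prefer to script this step, the same table can be described as the parameters of a DynamoDB CreateTable request. The sketch below only builds and inspects that parameter object, so it runs without AWS credentials; you could pass it to `new AWS.DynamoDB().createTable(params)` in the v2 AWS SDK for JavaScript. The `PAY_PER_REQUEST` billing mode and the `primaryKeyOf` helper are assumptions for illustration, not necessarily what the console's default settings select.

```javascript
'use strict';

// Build CreateTable parameters for the UnicornSensorData table.
// Assumption: on-demand billing; the console defaults may differ.
function buildCreateTableParams(tableName) {
  return {
    TableName: tableName,
    // Name is the partition (HASH) key; StatusTime is the sort (RANGE) key.
    KeySchema: [
      { AttributeName: 'Name', KeyType: 'HASH' },
      { AttributeName: 'StatusTime', KeyType: 'RANGE' },
    ],
    // Only key attributes are declared up front; 'S' means String.
    AttributeDefinitions: [
      { AttributeName: 'Name', AttributeType: 'S' },
      { AttributeName: 'StatusTime', AttributeType: 'S' },
    ],
    BillingMode: 'PAY_PER_REQUEST',
  };
}

// Hypothetical helper: an item's primary key is the (Name, StatusTime) pair,
// so two items overwrite each other only if both values match.
function primaryKeyOf(item) {
  return JSON.stringify([item.Name, item.StatusTime]);
}

const params = buildCreateTableParams('UnicornSensorData');
console.log(params.KeySchema.map((k) => `${k.AttributeName}:${k.KeyType}`).join(', '));
// prints: Name:HASH, StatusTime:RANGE

const a = { Name: 'Rocinante', StatusTime: 't1' };
const b = { Name: 'Rocinante', StatusTime: 't2' };
console.log(primaryKeyOf(a) === primaryKeyOf(b)); // false: same unicorn, different sort key
```

    The composite key is what lets one unicorn accumulate many items, one per distinct StatusTime value.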

  • Step 2. Create an IAM role for your Lambda function

    Use the IAM console to create a new role. Name it WildRydesStreamProcessorRole and select Lambda for the role type. Attach the managed policy called AWSLambdaKinesisExecutionRole to this role to grant your function permission to read from Amazon Kinesis streams and to write logs to Amazon CloudWatch Logs. Create a policy that allows DynamoDB BatchWriteItem access to the DynamoDB table created in the last section and attach it to the new role.


    a. From the AWS Console, click on Services and then select IAM in the Security, Identity & Compliance section.

    b. Select Policies from the left navigation and then click Create policy.

    c. Using the Visual editor, we're going to create an IAM policy to allow our Lambda function access to the DynamoDB table created in the last section. To begin, select Service, begin typing DynamoDB in Find a service, and click DynamoDB.

    d. Select Action, begin typing BatchWriteItem in Filter actions, and tick the BatchWriteItem checkbox.

    e. Select Resources, select Add ARN next to table, and construct the ARN of the DynamoDB table you created in the previous section by specifying the Region, Account, and Table Name.

    f. In Region, enter the AWS Region in which you created the DynamoDB table in the previous section, e.g.: us-east-1.

    g. In Account, enter your AWS account ID, which is a twelve-digit number, e.g.: 123456789012. To find your AWS account ID in the AWS Management Console, click Support in the navigation bar in the upper-right, and then click Support Center. Your account ID appears in the upper-right corner below the Support menu.

    h. In Table Name, enter UnicornSensorData.

    You should see your ARN in the Specify ARN for table field, and it should look similar to: arn:aws:dynamodb:us-east-1:513094544575:table/UnicornSensorData

    i. Select Add.

    j. Select Review policy.

    k. Enter WildRydesDynamoDBWritePolicy in the Name field.

    l. Select Create policy.

    m. Select Roles from the left navigation and then select Create role.

    n. Select Lambda for the role type from the AWS service section.

    o. Select Next: Permissions.

    p. Begin typing AWSLambdaKinesisExecutionRole in the Filter text box and check the box next to that policy.

    q. Begin typing WildRydesDynamoDBWritePolicy in the Filter text box and check the box next to that policy.

    r. Click Next: Tags and then Next: Review.

    s. Enter WildRydesStreamProcessorRole for the Role name.

    t. Click Create role.
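    The policy built with the visual editor is an ordinary JSON policy document, and choosing Lambda as the role type gives the role a trust policy that lets the Lambda service assume it. The sketch below reconstructs both documents so you can compare them with the JSON views in the IAM console. The Region and account ID are placeholder values, and `buildTableArn` and `buildWritePolicy` are hypothetical helper names.

```javascript
'use strict';

// Hypothetical helper: assemble a DynamoDB table ARN from its parts,
// as done by hand in steps e-h.
function buildTableArn(region, accountId, tableName) {
  if (!/^\d{12}$/.test(accountId)) {
    throw new Error(`Account ID must be twelve digits, got "${accountId}"`);
  }
  return `arn:aws:dynamodb:${region}:${accountId}:table/${tableName}`;
}

// Hypothetical helper: a permissions policy equivalent to
// WildRydesDynamoDBWritePolicy (BatchWriteItem on one table only).
function buildWritePolicy(tableArn) {
  return {
    Version: '2012-10-17',
    Statement: [
      {
        Effect: 'Allow',
        Action: 'dynamodb:BatchWriteItem',
        Resource: tableArn,
      },
    ],
  };
}

// Trust policy for a Lambda role: lets the Lambda service assume
// WildRydesStreamProcessorRole on behalf of the function.
const lambdaTrustPolicy = {
  Version: '2012-10-17',
  Statement: [
    {
      Effect: 'Allow',
      Principal: { Service: 'lambda.amazonaws.com' },
      Action: 'sts:AssumeRole',
    },
  ],
};

// Placeholder Region and account ID; substitute your own.
const arn = buildTableArn('us-east-1', '123456789012', 'UnicornSensorData');
console.log(arn); // arn:aws:dynamodb:us-east-1:123456789012:table/UnicornSensorData
console.log(JSON.stringify(buildWritePolicy(arn), null, 2));
console.log(JSON.stringify(lambdaTrustPolicy, null, 2));
```

    Scoping `Resource` to the single table ARN keeps the function from writing to any other table in the account.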

  • Step 3. Create a Lambda function to process the stream

    Create a Lambda function called WildRydesStreamProcessor that will be triggered whenever a new record is available in the wildrydes-summary stream. Use the provided index.js implementation for your function code. Create an environment variable with the key TABLE_NAME and the value UnicornSensorData. Configure the function to use the WildRydesStreamProcessorRole role created in the previous section.


    a. Go to the AWS Management Console, choose Services, then select Lambda under Compute.

    b. Select Create a function.

    c. Enter WildRydesStreamProcessor in the Name field.

    d. Select Node.js 10.x from Runtime.

    e. Select WildRydesStreamProcessorRole from the Existing role dropdown.

    f. Select Create function.

    g. Scroll down to the Function code section.

    h. Copy and paste the JavaScript code below into the code editor.

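    'use strict';
    
    // The AWS SDK for JavaScript (v2) is included in the Node.js 10.x runtime.
    const AWS = require('aws-sdk');
    const dynamoDB = new AWS.DynamoDB.DocumentClient();
    // Table name comes from the TABLE_NAME environment variable (step i).
    const tableName = process.env.TABLE_NAME;
    
    // Entry point: Lambda invokes this with a batch of Kinesis records.
    exports.handler = function(event, context, callback) {
      const requestItems = buildRequestItems(event.Records);
      const requests = buildRequests(requestItems);
    
      // Report success once every batch write (including retries) finishes;
      // any failure is passed back to Lambda as an error.
      Promise.all(requests)
        .then(() => callback(null, `Delivered ${event.Records.length} records`))
        .catch(callback);
    };
    
    // Decode each base64-encoded Kinesis payload (JSON) into a PutRequest.
    function buildRequestItems(records) {
      return records.map((record) => {
        // Decode as UTF-8 so non-ASCII characters in the JSON survive.
        const json = Buffer.from(record.kinesis.data, 'base64').toString('utf8');
        const item = JSON.parse(json);
    
        return {
          PutRequest: {
            Item: item,
          },
        };
      });
    }
    
    // BatchWriteItem accepts at most 25 items per call, so split the
    // items into groups of 25 and start one write per group.
    function buildRequests(requestItems) {
      const requests = [];
    
      while (requestItems.length > 0) {
        const request = batchWrite(requestItems.splice(0, 25));
    
        requests.push(request);
      }
    
      return requests;
    }
    
    // Write one group of items. Items that DynamoDB returns as unprocessed
    // (for example, when throttled) are retried with exponential backoff
    // (100 ms, 200 ms, 400 ms, ...) until they are written or the function
    // reaches its timeout.
    function batchWrite(requestItems, attempt = 0) {
      const params = {
        RequestItems: {
          [tableName]: requestItems,
        },
      };
    
      // The first attempt (attempt 0) runs without delay.
      let delay = 0;
    
      if (attempt > 0) {
        delay = 50 * Math.pow(2, attempt);
      }
    
      return new Promise(function(resolve, reject) {
        setTimeout(function() {
          dynamoDB.batchWrite(params).promise()
            .then(function(data) {
              // Retry only the items DynamoDB did not write.
              if (data.UnprocessedItems.hasOwnProperty(tableName)) {
                return batchWrite(data.UnprocessedItems[tableName], attempt + 1);
              }
            })
            .then(resolve)
            .catch(reject);
        }, delay);
      });
    }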

    i. In the Environment variables section, enter an environment variable with Key TABLE_NAME and Value UnicornSensorData.

    j. In the Basic settings section, set the Timeout to 1 minute.

    k. Scroll up, click Add Trigger, and select Kinesis.

    l. In the Trigger configuration section, select wildrydes-summary from Kinesis Stream.

    m. Leave Batch size set to 100 and Starting position set to Latest.

    n. Select Enable trigger.

    o. Select Add.

    p. Select Save.
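    Steps i through n correspond to two Lambda API operations: UpdateFunctionConfiguration for the environment variable and timeout, and CreateEventSourceMapping for the Kinesis trigger. The sketch below only assembles the request parameters for those operations so it runs without credentials. The stream ARN uses a placeholder Region and account ID, and `buildFunctionConfig` and `buildKinesisTrigger` are hypothetical helper names.

```javascript
'use strict';

// Hypothetical helper: UpdateFunctionConfiguration parameters matching
// steps i-j (TABLE_NAME environment variable, 1-minute timeout).
function buildFunctionConfig(functionName, tableName) {
  return {
    FunctionName: functionName,
    Timeout: 60, // seconds
    Environment: { Variables: { TABLE_NAME: tableName } },
  };
}

// Hypothetical helper: CreateEventSourceMapping parameters matching
// steps k-n (Kinesis trigger, batch size 100, start at Latest, enabled).
function buildKinesisTrigger(functionName, streamArn) {
  return {
    FunctionName: functionName,
    EventSourceArn: streamArn,
    BatchSize: 100,
    StartingPosition: 'LATEST',
    Enabled: true,
  };
}

// Placeholder Region and account ID; substitute your own.
const streamArn = 'arn:aws:kinesis:us-east-1:123456789012:stream/wildrydes-summary';
const config = buildFunctionConfig('WildRydesStreamProcessor', 'UnicornSensorData');
const trigger = buildKinesisTrigger('WildRydesStreamProcessor', streamArn);
console.log(JSON.stringify({ config, trigger }, null, 2));
```

    Starting at `LATEST` means the function only sees records written after the trigger is created; older records already in the stream are skipped.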

  • Step 4. Monitor the Lambda function

    Verify that the trigger is properly executing the Lambda function. View the metrics emitted by the function and inspect the output from the Lambda function.


    a. Run the producer with a unicorn name to start emitting sensor data to the stream.

    ./producer -name Rocinante

    b. Return to the WildRydesStreamProcessor Lambda function. Select the Monitoring tab and explore the metrics available to monitor the function. Select View Logs in CloudWatch to explore the function's log output.
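    Alongside the CloudWatch metrics, the decoding and batching logic of the function can be checked locally. The sketch below builds a hypothetical Kinesis event by base64-encoding JSON payloads, mirroring how Lambda delivers Kinesis data in `record.kinesis.data`, then repeats the handler's decode and 25-item grouping without calling DynamoDB. It also lists the retry delays produced by the `50 * 2^attempt` backoff in `batchWrite`. The helper names and the `StatusTime` values are made up for illustration.

```javascript
'use strict';

// Hypothetical helper: wrap plain objects in the shape of a Kinesis event,
// base64-encoding each payload as Lambda delivers it.
function makeKinesisEvent(payloads) {
  return {
    Records: payloads.map((payload) => ({
      kinesis: { data: Buffer.from(JSON.stringify(payload), 'utf8').toString('base64') },
    })),
  };
}

// Same decoding as buildRequestItems in the function code.
function decodeRecords(records) {
  return records.map((record) => {
    const json = Buffer.from(record.kinesis.data, 'base64').toString('utf8');
    return { PutRequest: { Item: JSON.parse(json) } };
  });
}

// Group into arrays of at most `size` items (25 is the BatchWriteItem limit).
// Unlike buildRequests, this slices instead of splicing, so the input is unchanged.
function chunk(items, size = 25) {
  const chunks = [];
  for (let i = 0; i < items.length; i += size) {
    chunks.push(items.slice(i, i + size));
  }
  return chunks;
}

// Delay in ms before a given attempt; attempt 0 is the first try.
function retryDelay(attempt) {
  return attempt > 0 ? 50 * Math.pow(2, attempt) : 0;
}

// 100 records, matching the trigger's batch size.
const payloads = Array.from({ length: 100 }, (_, i) => ({
  Name: 'Rocinante',
  StatusTime: `minute-${i}`, // made-up sort key values
}));
const batches = chunk(decodeRecords(makeKinesisEvent(payloads).Records));
console.log(batches.map((batch) => batch.length)); // [ 25, 25, 25, 25 ]
console.log([1, 2, 3, 4].map(retryDelay)); // [ 100, 200, 400, 800 ]
```

    A full batch of 100 records therefore turns into four parallel BatchWriteItem calls, each of which retries independently.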

  • Step 5. Query the DynamoDB table

    Using the AWS Management Console, query the DynamoDB table for a specific unicorn's data. Use the producer to generate data under a distinct unicorn name and verify that those records are persisted.


    a. Select Services then select DynamoDB in the Database section.

    b. Select Tables from the left-hand navigation.

    c. Select UnicornSensorData.

    d. Select the Items tab. Here you should see each per-minute data point for each unicorn for which you're running a producer.
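    The same lookup can be expressed as a DynamoDB Query on the partition key. Because `Name` is a DynamoDB reserved word, a key condition must refer to it through an expression attribute name placeholder instead of writing it directly. The sketch below builds parameters in the shape accepted by the v2 SDK's `DocumentClient.query`; the helper name and the `#n`/`:name` placeholders are illustrative choices, and the call itself is omitted so the snippet runs without AWS access.

```javascript
'use strict';

// Hypothetical helper: DocumentClient query parameters that fetch every
// item for one unicorn, sorted by StatusTime.
function buildUnicornQuery(tableName, unicornName) {
  return {
    TableName: tableName,
    // "Name" is reserved in DynamoDB expressions, so alias it as #n.
    KeyConditionExpression: '#n = :name',
    ExpressionAttributeNames: { '#n': 'Name' },
    ExpressionAttributeValues: { ':name': unicornName },
    ScanIndexForward: true, // ascending string order of the sort key StatusTime
  };
}

const queryParams = buildUnicornQuery('UnicornSensorData', 'Rocinante');
console.log(JSON.stringify(queryParams));
// With the v2 SDK: new AWS.DynamoDB.DocumentClient().query(queryParams).promise()
```

    A Query reads only the items under one partition key, whereas a Scan would read the whole table and filter afterward.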

  • Recap & Tips


    🔑 You can subscribe a Lambda function to a Kinesis stream so that it automatically reads batches of records and processes them as they arrive on the stream.

    🔧 In this module, you’ve created a Lambda function that reads from the Kinesis stream of summary unicorn data and saves each row to DynamoDB.

In the next module, you’ll create an Amazon Kinesis Data Firehose delivery stream to deliver data from the Amazon Kinesis stream created in the first module to Amazon Simple Storage Service (Amazon S3) in batches. You’ll then use Amazon Athena to run queries against the raw data in place.