AWS Developer Blog

AWS Chalice now supports Amazon Kinesis and Amazon DynamoDB Streams

Version 1.21.0 of AWS Chalice, a framework for creating serverless applications in Python, adds support for two new event sources in AWS Lambda. You can now configure a Lambda function to be automatically invoked whenever a record is added to an Amazon Kinesis stream or whenever an Amazon DynamoDB table is updated. Chalice automatically handles configuring the event sources and adding the appropriate permissions to allow the various AWS services to invoke your Lambda function.

Let’s look at an example based on the “Notifications/Messaging” section of this blog post. We’ll create a Lambda function that examines items added to a DynamoDB table and flags any items that need more attention.

Initial Setup

First we’ll create a new virtual environment and install Chalice using pip. You should have Chalice version 1.21.0 or higher installed. We’re also using Python 3.7 for our sample application.

$ python3 -m venv /tmp/venv37
$ . /tmp/venv37/bin/activate
$ pip install chalice
$ chalice --version
chalice 1.21.0, python 3.7.3, darwin 18.7.0
$ chalice new-project ddb-stream-demo
$ cd ddb-stream-demo

We’ll also be using the AWS Command Line Interface (CLI) to help test our application. You can follow the installation guide to install the AWS CLI.

Application Code

Our sample application will monitor changes to an existing DynamoDB table that models a sample transactions table. The table schema uses InvoiceId as the partition key and TransactionId as the sort key. Each item also has a numeric Amount attribute. We want to check for any items that have an Amount of 0. For example:

InvoiceId  TransactionId  Amount  Expected
12345      Client1_12345  50      OK
12345      Client2_12345  100     OK
12345      Client4_12345  0       FLAG

For our sample application, we’ll flag an item by emitting an error log message, but you could also take other actions such as publishing to an Amazon SNS topic or sending an event to Amazon EventBridge.

We’ll update our app.py file with our new event handler. Below are the contents of our app.py file.

import os
from chalice import Chalice

app = Chalice(app_name='ddb-stream-demo')
app.debug = True


@app.on_dynamodb_record(stream_arn=os.environ['TABLE_STREAM_ARN'])
def on_table_update(event):
    for record in event:
        process_record(record)


def process_record(record):
    # We're not interested in deleted items.  DynamoDB Streams
    # reports deletions with an event name of REMOVE.
    if record.event_name == 'REMOVE':
        return
    new_item = record.new_image
    try:
        amount = int(new_item['Amount']['N'])
    except KeyError:
        # The item has no numeric Amount attribute, so there's
        # nothing to check.
        return
    if amount == 0:
        # Error processing logic here.  We could
        # send an SNS notification, emit a CloudWatch Event, etc.
        # For demo purposes, we're just logging when this happens.
        app.log.error("Amount of 0 found for record %s",
                      record.keys)

We’re creating a Lambda function, on_table_update, that will be invoked whenever new records are sent to a DynamoDB stream. We configure this using the @app.on_dynamodb_record decorator. Instead of hard-coding the stream ARN in our app.py file, we’re retrieving this value from the TABLE_STREAM_ARN environment variable. We’ll configure this variable in the next section when we deploy our application.

Every time our Lambda function is invoked, the event input parameter can contain multiple records. Each record is associated with a change to our DynamoDB table. The event input argument is iterable, which lets you extract each record in the event. The process_record function inspects a single record. First, we make sure that the record isn’t for an item deletion by checking the record.event_name attribute. We’ll then try to examine the Amount key and verify its amount is not 0. If it is 0, we’ll log an error message, which we can examine through Amazon CloudWatch Logs.
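
To see the filtering logic in isolation, the sketch below runs the same checks against hand-built records, with nothing deployed. FakeRecord and make_record are hypothetical stand-ins that mimic only the event_name, new_image, and keys attributes used above; they are not part of Chalice. The sketch assumes the event names INSERT, MODIFY, and REMOVE that DynamoDB Streams reports.

```python
from dataclasses import dataclass
from typing import Optional


# Hypothetical stand-in for the record objects passed to the handler.
# It mirrors only the three attributes the handler reads.
@dataclass
class FakeRecord:
    event_name: str
    new_image: Optional[dict]
    keys: dict


def needs_attention(record):
    """Return True when the record's new image has an Amount of 0."""
    # Deletions carry no new image, so there is nothing to inspect.
    if record.event_name == 'REMOVE' or not record.new_image:
        return False
    try:
        # Stream images use DynamoDB's typed format, e.g. {'N': '0'}.
        amount = int(record.new_image['Amount']['N'])
    except (KeyError, ValueError):
        # No numeric Amount attribute on this item.
        return False
    return amount == 0


def make_record(event_name, transaction_id, amount=None):
    """Build a FakeRecord shaped like the sample transactions table."""
    keys = {'InvoiceId': {'S': '12345'},
            'TransactionId': {'S': transaction_id}}
    if event_name == 'REMOVE':
        return FakeRecord(event_name, None, keys)
    image = dict(keys)
    if amount is not None:
        image['Amount'] = {'N': str(amount)}
    return FakeRecord(event_name, image, keys)


records = [
    make_record('INSERT', 'Client1_12345', 50),
    make_record('INSERT', 'Client4_12345', 0),
    make_record('INSERT', 'Client5_12345'),   # no Amount attribute
    make_record('REMOVE', 'Client2_12345'),
]
flagged = [r.keys['TransactionId']['S'] for r in records if needs_attention(r)]
print(flagged)
```

Only the Client4_12345 record is flagged. Returning a boolean instead of logging directly keeps the check easy to exercise in a unit test.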

Deploying and Testing our Application

We split our deployment into two steps. The first manages the resources our application needs: the Amazon DynamoDB table and its stream configuration. The second deploys our Chalice application. To manage the deployment of our DynamoDB table, we’ll use the AWS CLI and AWS CloudFormation. Create a resources.yaml file with the following contents.

AWSTemplateFormatVersion: "2010-09-09"
Description:
  Resources used by our application.
Parameters:
  ReadCapacity:
    Type: Number
    Default: 5
  WriteCapacity:
    Type: Number
    Default: 5
Resources:
  MyApplicationTable:
    Type: AWS::DynamoDB::Table
    Properties:
      KeySchema:
        - AttributeName: InvoiceId
          KeyType: HASH
        - AttributeName: TransactionId
          KeyType: RANGE
      AttributeDefinitions:
        - AttributeName: InvoiceId
          AttributeType: S
        - AttributeName: TransactionId
          AttributeType: S
      ProvisionedThroughput:
        ReadCapacityUnits: !Ref ReadCapacity
        WriteCapacityUnits: !Ref WriteCapacity
      StreamSpecification:
        StreamViewType: NEW_IMAGE
Outputs:
  TableName:
    Value: !Ref MyApplicationTable
  StreamArn:
    Value: !GetAtt MyApplicationTable.StreamArn

This template creates a DynamoDB table with an InvoiceId partition key and a TransactionId sort key. We also provide a stream specification with a stream view type of NEW_IMAGE, which includes the new DynamoDB item as part of the stream record. To deploy this template, we’ll use the AWS CLI.

$ aws cloudformation deploy --template-file resources.yaml --stack-name AppResources

Once this deployment is complete, we’ll map the DynamoDB stream ARN to an environment variable in our application. We can retrieve the stream ARN using the describe-stacks command.

$ aws cloudformation describe-stacks --stack-name AppResources \
  --output text \
  --query "Stacks[].Outputs[?OutputKey=='StreamArn'][] | [0].OutputValue"

arn:aws:dynamodb:us-west-2:1234:table/AppResources-MyApplicationTable-ABCD/stream/2020-09-30T21:05:20.854

Now we add this ARN value to our .chalice/config.json file as the TABLE_STREAM_ARN environment variable.

{
  "stages": {
    "dev": {
      "api_gateway_stage": "api",
      "environment_variables": {
        "TABLE_STREAM_ARN": "arn:aws:dynamodb:us-west-2:1234:table/AppResources-MyApplicationTable-ABCD/stream/2020-09-30T21:05:20.854"
      }
    }
  },
  "version": "2.0",
  "app_name": "ddb-stream-demo"
}
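
If you would rather not paste the ARN by hand, a small script can write it into the config file. The sketch below is one way to do that. set_stream_arn is a hypothetical helper, not a Chalice command, and it assumes the config layout shown above. The demo runs against a throwaway copy of the config in a temporary directory, not your real .chalice/config.json.

```python
import json
import os
import tempfile


def set_stream_arn(config_path, stream_arn, stage='dev'):
    """Store stream_arn as TABLE_STREAM_ARN for the given stage."""
    with open(config_path) as f:
        config = json.load(f)
    # Create any missing levels so existing settings are preserved.
    stage_config = config.setdefault('stages', {}).setdefault(stage, {})
    env = stage_config.setdefault('environment_variables', {})
    env['TABLE_STREAM_ARN'] = stream_arn
    with open(config_path, 'w') as f:
        json.dump(config, f, indent=2)
        f.write('\n')
    return config


# Demo against a temporary file that looks like a freshly generated config.
demo_dir = tempfile.mkdtemp()
demo_path = os.path.join(demo_dir, 'config.json')
with open(demo_path, 'w') as f:
    json.dump({'version': '2.0', 'app_name': 'ddb-stream-demo',
               'stages': {'dev': {'api_gateway_stage': 'api'}}}, f)

arn = ('arn:aws:dynamodb:us-west-2:1234:table/'
       'AppResources-MyApplicationTable-ABCD/stream/2020-09-30T21:05:20.854')
updated = set_stream_arn(demo_path, arn)
print(updated['stages']['dev']['environment_variables'])
```

In a real project you would pass the path to .chalice/config.json and the ARN returned by the describe-stacks command.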

We’re now ready to deploy our application using the chalice deploy command.

$ chalice deploy
Creating deployment package.
Creating IAM role: ddb-stream-demo-dev
Creating lambda function: ddb-stream-demo-dev-on_table_update
Subscribing ddb-stream-demo-dev-on_table_update to DynamoDB stream arn:aws:dynamodb:us-west-2:1234:table/AppResources-MyApplicationTable-ABCD/stream/2020-09-30T21:05:20.854
Resources deployed:
  - Lambda ARN: arn:aws:lambda:us-west-2:1234:function:ddb-stream-demo-dev-on_table_update

Our application is now deployed and ready to test. To test our application, we’ll create several items in our DynamoDB table. This will trigger our Lambda function which will examine each item. We should only see a log message from our Lambda function if we create an item with an Amount of 0. We can extract the DynamoDB table name from the stream ARN we’ve configured in our application, or we can query it directly from the CloudFormation stack outputs.

$ aws cloudformation describe-stacks --stack-name AppResources \
  --query "Stacks[].Outputs[?OutputKey=='TableName'][] | [0].OutputValue" \
  --output text

AppResources-MyApplicationTable-ABCD
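
The other option, extracting the table name from the stream ARN, comes down to splitting the ARN string. The sketch below assumes the ARN layout seen in the output above (arn:aws:dynamodb:region:account:table/NAME/stream/LABEL). table_name_from_stream_arn is a hypothetical helper, not part of Chalice or the AWS CLI.

```python
def table_name_from_stream_arn(stream_arn):
    """Return the table name embedded in a DynamoDB stream ARN."""
    # Split into at most six fields so the colons inside the
    # stream label (a timestamp) stay in the final field.
    fields = stream_arn.split(':', 5)
    if len(fields) != 6 or fields[2] != 'dynamodb':
        raise ValueError('not a DynamoDB ARN: %s' % stream_arn)
    # The resource part looks like table/<name>/stream/<label>.
    parts = fields[5].split('/')
    if len(parts) < 4 or parts[0] != 'table' or parts[2] != 'stream':
        raise ValueError('not a DynamoDB stream ARN: %s' % stream_arn)
    return parts[1]


arn = ('arn:aws:dynamodb:us-west-2:1234:table/'
       'AppResources-MyApplicationTable-ABCD/stream/2020-09-30T21:05:20.854')
table_name = table_name_from_stream_arn(arn)
print(table_name)
```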

We’ll now use the AWS CLI to create new items using the aws ddb put command, one of the high-level DynamoDB commands available in version 2 of the AWS CLI.

$ TABLE_NAME=AppResources-MyApplicationTable-ABCD
$ echo '{"InvoiceId": "12345", "TransactionId": "Client1_12345", "Amount": 50}' | aws ddb put $TABLE_NAME -
$ echo '{"InvoiceId": "12345", "TransactionId": "Client2_12345", "Amount": 100}' | aws ddb put $TABLE_NAME -
$ echo '{"InvoiceId": "12345", "TransactionId": "Client3_12345", "Amount": 150}' | aws ddb put $TABLE_NAME -
$ echo '{"InvoiceId": "12345", "TransactionId": "Client4_12345", "Amount": 0}' | aws ddb put $TABLE_NAME -

The last item we’ve created should trigger an error log message because it has an Amount of 0. We can confirm this by using the chalice logs command.

$ chalice logs -n on_table_update
2020-09-30 21:39:58.039000 5ff9ce ddb-stream-demo - ERROR - Amount of 0 found for record {'InvoiceId': {'S': '12345'}, 'TransactionId': {'S': 'Client4_12345'}}

We can see our expected log message indicating that one of our items had an Amount of 0.
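
The key values in that log line appear as {'S': '12345'} rather than as plain strings because stream records carry items in DynamoDB’s typed attribute-value format. That is also why the handler reads new_item['Amount']['N'] and converts the result with int(). As a rough illustration, the sketch below converts the plain JSON we passed on the command line into the typed form. to_attribute_values is a hypothetical helper that handles only strings, numbers, and booleans; it is not how the AWS CLI or Chalice performs the conversion.

```python
def to_attribute_values(item):
    """Convert a flat dict of simple values to DynamoDB's typed form."""
    typed = {}
    for name, value in item.items():
        # bool is checked first because bool is a subclass of int.
        if isinstance(value, bool):
            typed[name] = {'BOOL': value}
        elif isinstance(value, (int, float)):
            # Numbers are transmitted as strings.
            typed[name] = {'N': str(value)}
        elif isinstance(value, str):
            typed[name] = {'S': value}
        else:
            raise TypeError('unsupported type for %s: %r' % (name, value))
    return typed


item = {'InvoiceId': '12345', 'TransactionId': 'Client4_12345', 'Amount': 0}
typed = to_attribute_values(item)
print(typed)
```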

Next Steps

You can continue to experiment by modifying your app.py file and running chalice deploy. To delete your Lambda function, you can run the chalice delete command. You can also try using the new Kinesis event handler support in Chalice, which has a similar API to DynamoDB streams.
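
As a starting point for the Kinesis variant, here is a minimal sketch of the same zero-amount check applied to a Kinesis payload. It assumes producers write JSON objects with an Amount field, and it relies on Chalice exposing each record’s decoded payload as bytes in record.data. The stream name transactions and the is_zero_amount helper are hypothetical. The handler wiring is shown in comments so the check itself can run without Chalice installed.

```python
import json


def is_zero_amount(data):
    """Return True when a JSON payload (bytes) has an Amount of 0."""
    try:
        payload = json.loads(data.decode('utf-8'))
    except ValueError:
        # Covers both invalid UTF-8 and invalid JSON.
        return False
    if not isinstance(payload, dict):
        return False
    amount = payload.get('Amount')
    # Exclude booleans, since False == 0 in Python.
    return amount == 0 and not isinstance(amount, bool)


# Wiring inside app.py would look roughly like this:
#
# @app.on_kinesis_record(stream='transactions')
# def on_stream_record(event):
#     for record in event:
#         if is_zero_amount(record.data):
#             app.log.error("Amount of 0 found in record %s",
#                           record.sequence_number)

print(is_zero_amount(b'{"TransactionId": "Client4_12345", "Amount": 0}'))
```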

We encourage you to try out this new functionality and let us know what you think!