AWS Compute Blog

Indexing Amazon DynamoDB Content with Amazon Elasticsearch Service Using AWS Lambda

September 8, 2021: Amazon Elasticsearch Service has been renamed to Amazon OpenSearch Service. See details.

Stephan Hadinger
Sr Mgr, Solutions Architecture

Mathieu Cadet Account Representative

NOTE: It was recently brought to our attention that this post contains instructions that reference a now deprecated Lambda blueprint. We are in the process of updating this post to correct this.

Many AWS customers have adopted Amazon DynamoDB for its predictable performance and seamless scalability. The main querying capabilities of DynamoDB are centered on lookups using a primary key. Some use cases, however, require richer querying. Indexing the content of your DynamoDB tables with a search engine such as Elasticsearch makes full-text search possible.

In this post, we show how you can send changes to the content of your DynamoDB tables to an Amazon Elasticsearch Service (Amazon ES) cluster for indexing, using the DynamoDB Streams feature combined with AWS Lambda.

 

Architectural overview

Here’s a high-level overview of the architecture:

DynamoDB Streams to Elasticsearch bridge

We’ll cover the main steps required to put this bridge in place:

  1. Choosing the DynamoDB tables to index and enabling DynamoDB Streams on them.
  2. Creating an IAM role for accessing the Amazon ES cluster.
  3. Configuring and enabling the Lambda blueprint.

 

Choosing the DynamoDB table to index

In this post, you look at indexing the content of a product catalog in order to provide full-text search capabilities. You’ll index the content of a DynamoDB table called all_products, which is acting as the catalog of all products.

Here’s an example of an item stored in that table:

{
  "product_id": "B016JOMAEE",
  "name": "Serverless Single Page Apps: Fast, Scalable, and Available",
  "category": "ebook",
  "description": "AWS Lambda - A Guide to Serverless Microservices takes a comprehensive look at developing serverless workloads using the new Amazon Web Services Lambda service.",
  "author": "Matthew Fuller",
  "price": 15.0,
  "rating": 4.8
}

Enabling DynamoDB Streams

In the DynamoDB console, enable the DynamoDB Streams functionality on the all_products table by selecting the table and choosing Manage Stream.

Enabling DynamoDB Streams

Multiple options are available for the stream. For this use case, you need new items to appear in the stream; choose either New image or New and old images. For more information, see Capturing Table Activity with DynamoDB Streams.

DynamoDB Streams Options

After the stream is set up, note the stream ARN. You’ll need it later, when configuring the access permissions.
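The stream can also be enabled from code rather than the console. The following is a minimal sketch, assuming boto3 is installed and AWS credentials are configured. The helper names stream_specification and enable_stream are hypothetical; update_table with a StreamSpecification argument is the DynamoDB API call that turns a stream on, and the table description in its response normally carries the stream ARN as LatestStreamArn.

```python
VALID_VIEW_TYPES = {"NEW_IMAGE", "NEW_AND_OLD_IMAGES"}


def stream_specification(view_type="NEW_IMAGE"):
    """Build the StreamSpecification argument for DynamoDB's UpdateTable call."""
    if view_type not in VALID_VIEW_TYPES:
        # Indexing needs the new version of each item in the stream.
        raise ValueError("view type must include the new image, got %r" % view_type)
    return {"StreamEnabled": True, "StreamViewType": view_type}


def enable_stream(table_name, view_type="NEW_IMAGE", region_name="us-east-1"):
    """Enable the stream on a table and return its ARN (requires AWS credentials)."""
    import boto3  # imported lazily so the helper above works without boto3

    client = boto3.client("dynamodb", region_name=region_name)
    response = client.update_table(
        TableName=table_name,
        StreamSpecification=stream_specification(view_type),
    )
    return response["TableDescription"].get("LatestStreamArn")
```

Calling enable_stream("all_products") would correspond to choosing New image in the console.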

Finding a DynamoDB Stream ARN

Creating a new IAM role

The Lambda function needs read access to the DynamoDB stream just created. In addition, the function also requires access to the Amazon ES cluster to submit new records for indexing.

In the AWS Identity and Access Management (IAM) console, create a new role for the Lambda function and call it ddb-elasticsearch-bridge.

Creating new IAM role

As this role will be used by the Lambda function, choose AWS Lambda from the AWS Service Roles list.

Attaching policy to the role

On the following screens, choose the AWSLambdaBasicExecutionRole managed policy, which allows the Lambda function to send logs to Amazon CloudWatch Logs.

Configuring access to the Amazon ES cluster

First, you need a running Amazon ES cluster. In this example, create a search domain called inventory. After the domain has been created, note its ARN:

Attaching policy to the role

In the IAM console, select the ddb-elasticsearch-bridge role created earlier and add two inline policies to that role:

Attaching policy to the role

Here’s the policy to add to allow the Lambda code to push new documents to Amazon ES (replace the resource ARN with the ARN of your Amazon ES cluster):

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Action": [
                "es:ESHttpPost"
            ],
            "Effect": "Allow",
            "Resource": "arn:aws:es:us-east-1:0123456789:domain/inventory/*"
        }
    ]
}

Important: you need to add /* to the resource ARN as depicted above.
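If you prefer to generate this policy rather than paste it, the sketch below builds the same document and appends the /* suffix when it is missing. The helper names es_post_policy and attach_inline_policy are hypothetical, and the policy name used in the usage note is only an example; put_role_policy is the IAM API call for adding an inline policy to a role.

```python
import json


def es_post_policy(domain_arn):
    """Return a policy document allowing HTTP POST on every path of the domain."""
    # The resource must cover the paths under the domain, hence the "/*" suffix.
    resource = domain_arn if domain_arn.endswith("/*") else domain_arn + "/*"
    return {
        "Version": "2012-10-17",
        "Statement": [
            {"Action": ["es:ESHttpPost"], "Effect": "Allow", "Resource": resource}
        ],
    }


def attach_inline_policy(role_name, policy_name, policy):
    """Attach the policy to the role as an inline policy (requires AWS credentials)."""
    import boto3  # imported lazily so es_post_policy works without boto3

    boto3.client("iam").put_role_policy(
        RoleName=role_name,
        PolicyName=policy_name,
        PolicyDocument=json.dumps(policy),
    )
```

For example, attach_inline_policy("ddb-elasticsearch-bridge", "es-post", es_post_policy("arn:aws:es:us-east-1:0123456789:domain/inventory")) would attach the policy shown above.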

Next, add a second policy for read access to the DynamoDB stream (replace the resource ARN with the ARN of your DynamoDB stream):

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Action": [
                "dynamodb:DescribeStream",
                "dynamodb:GetRecords",
                "dynamodb:GetShardIterator",
                "dynamodb:ListStreams"
            ],
            "Effect": "Allow",
            "Resource": [
                "arn:aws:dynamodb:us-east-1:0123456789:table/all_products/stream/2016-02-16T23:13:07.600"
            ]
        }
    ]
}

Enabling the Lambda blueprint

When you log into the Lambda console and choose Create a Lambda Function, you are presented with a list of blueprints to use. Select the blueprint called dynamodb-to-elasticsearch.

dynamodb-to-elasticsearch blueprint

Next, select the DynamoDB table all_products as the event source:

Lambda event source

Then, customize the Lambda code to specify the Elasticsearch endpoint:

Customizing the blueprint

Finally, select the ddb-elasticsearch-bridge role created earlier to give the Lambda function the permissions required to interact with DynamoDB and the Amazon ES cluster:

Choosing a role
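The blueprint's source code is not reproduced in this post. As a rough illustration only, and not the blueprint's actual code, the sketch below shows one way a function could turn a DynamoDB Streams event into the body of an Elasticsearch bulk request. The helper names unmarshal and bulk_body, the choice of the lowercased table name as the index name, and the key-derived document ID are all assumptions. Sending the body (a signed POST to the domain's _bulk endpoint) is left out, and older Elasticsearch versions may additionally expect a document type in each action line.

```python
import json


def unmarshal(value):
    """Convert one DynamoDB-typed value, such as {"S": "x"}, to plain Python."""
    type_tag, data = next(iter(value.items()))
    if type_tag in ("S", "BOOL"):
        return data
    if type_tag == "N":
        # DynamoDB sends numbers as strings; float() can lose precision.
        return float(data) if any(c in data for c in ".eE") else int(data)
    if type_tag == "NULL":
        return None
    if type_tag == "M":
        return {k: unmarshal(v) for k, v in data.items()}
    if type_tag == "L":
        return [unmarshal(v) for v in data]
    if type_tag == "SS":
        return list(data)
    raise ValueError("unsupported DynamoDB type: %s" % type_tag)


def bulk_body(event):
    """Build a newline-delimited bulk request body from a DynamoDB Streams event."""
    lines = []
    for record in event["Records"]:
        # Stream ARNs look like arn:...:table/<table name>/stream/<label>.
        index = record["eventSourceARN"].split("/")[1].lower()
        change = record["dynamodb"]
        doc_id = "|".join(
            str(unmarshal(v)) for _, v in sorted(change["Keys"].items())
        )
        meta = {"_index": index, "_id": doc_id}
        if record["eventName"] == "REMOVE":
            lines.append(json.dumps({"delete": meta}))
        else:  # INSERT or MODIFY: index the new version of the item
            document = {k: unmarshal(v) for k, v in change["NewImage"].items()}
            lines.append(json.dumps({"index": meta}))
            lines.append(json.dumps(document))
    return "\n".join(lines) + "\n"
```

Because the sketch reads NewImage, it relies on the stream having been configured with New image or New and old images.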

Testing the result

You’re all set!

After a few records have been added to your DynamoDB table, you can go back to the Amazon ES console and validate that a new index for your items has been automatically created:

Amazon ES indices

Playing with Kibana (Optional)

Elasticsearch is commonly used with Kibana for visual exploration of data.

To start querying the indexed data, create an index pattern in Kibana. Use the name of the DynamoDB table as an index pattern:

Kibana Index pattern

Kibana automatically determines the best type for each field:

Kibana Index pattern

Use a simple query to search the product catalog for all items in the category book containing the word aws in any field:

Kibana Index pattern
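Outside Kibana, a comparable search can be sent to the index's _search endpoint. The sketch below only builds the request body, using the Elasticsearch query_string query with Lucene-style syntax of the kind the Kibana search bar has traditionally accepted. The helper name catalog_search_body is hypothetical, and the inputs are not escaped, so treat it as an illustration rather than production code.

```python
import json


def catalog_search_body(category, text, size=10):
    """Build a search body: items in `category` containing `text` in any field."""
    return {
        "size": size,
        "query": {
            # A bare term such as "aws" is matched against the default fields.
            "query_string": {"query": "category:%s AND %s" % (category, text)}
        },
    }


if __name__ == "__main__":
    print(json.dumps(catalog_search_body("book", "aws"), indent=2))
```

The _search endpoint accepts POST as well as GET, which matters here because the IAM policy shown earlier only grants es:ESHttpPost.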

Other considerations

Indexing pre-existing content

The solution presented earlier is ideal to ensure that new data is indexed as soon as it is added to the DynamoDB table. But what about pre-existing data stored in the table?

Luckily, the Lambda function used earlier can also be used to process data from an Amazon Kinesis stream, as long as the format of the data is similar to the DynamoDB Streams records.

Provided that you have an Amazon Kinesis stream set up as an additional input source for the Lambda code above, you can use the (deliberately naive) sample Python 3 code below to read the entire content of a DynamoDB table and push it to an Amazon Kinesis stream called ddb-all-products for indexing in Amazon ES.

import json
import boto3
import boto3.dynamodb.types

# Load the service resources in the desired region.
# Note: AWS credentials should be passed as environment variables
# or through IAM roles.
dynamodb = boto3.resource('dynamodb', region_name="us-east-1")
kinesis = boto3.client('kinesis', region_name="us-east-1")

# Load the DynamoDB table.
ddb_table_name = "all_products"
ks_stream_name = "ddb-all-products"
table = dynamodb.Table(ddb_table_name)

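# Get the primary key attribute names (partition key and, if any, sort key).
# Note: attribute_definitions would also list attributes used only by indexes.
ddb_keys_name = [k['AttributeName'] for k in table.key_schema]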

# Scan operations are limited to 1 MB at a time.
# Iterate until all records have been scanned.
response = None
while True:
    if not response:
        # Scan from the start.
        response = table.scan()
    else:
        # Scan from where you stopped previously.
        response = table.scan(ExclusiveStartKey=response['LastEvaluatedKey'])

    for i in response["Items"]:
        # Get a dict of primary key(s).
        ddb_keys = {k: i[k] for k in i if k in ddb_keys_name}
        # Serialize Python Dictionaries into DynamoDB notation.
        ddb_data = boto3.dynamodb.types.TypeSerializer().serialize(i)["M"]
        ddb_keys = boto3.dynamodb.types.TypeSerializer().serialize(ddb_keys)["M"]
        # The record must contain "Keys" and "NewImage" attributes to be similar
        # to a DynamoDB Streams record. Additionally, you inject the name of
        # the source DynamoDB table in the record so you can use it as an index
        # for Amazon ES.
        record = {"Keys": ddb_keys, "NewImage": ddb_data, "SourceTable": ddb_table_name}
        # Convert the record to JSON.
        record = json.dumps(record)
        # Push the record to Amazon Kinesis.
        res = kinesis.put_record(
            StreamName=ks_stream_name,
            Data=record,
            PartitionKey=i["product_id"])
        print(res)

    # Stop the loop if no additional records are
    # available.
    if 'LastEvaluatedKey' not in response:
        break

Note: In the code example above, you are passing the name of the source DynamoDB table as an extra record attribute SourceTable. The Lambda function uses that attribute to build the Amazon ES index name. Another approach for passing that information is tagging the Amazon Kinesis stream.
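On the receiving side, Lambda delivers Amazon Kinesis records with the payload base64-encoded under record["kinesis"]["data"]. The sketch below shows how a function could recover the Keys, NewImage, and SourceTable attributes written by the script above. The helper name records_from_kinesis is hypothetical, and this is not the blueprint's actual code.

```python
import base64
import json


def records_from_kinesis(event):
    """Yield (source_table, keys, new_image) for each Kinesis record in the event."""
    for record in event["Records"]:
        # Kinesis payloads arrive base64-encoded inside the Lambda event.
        raw = base64.b64decode(record["kinesis"]["data"]).decode("utf-8")
        payload = json.loads(raw)
        yield payload["SourceTable"], payload["Keys"], payload["NewImage"]
```

From there, the keys and new image can be handled the same way as those from a DynamoDB Streams record.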

Now, create the Amazon Kinesis stream ddb-all-products and then add permissions to the ddb-elasticsearch-bridge role in IAM to allow the Lambda function to read from the stream:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "kinesis:Get*",
                "kinesis:DescribeStream"
            ],
            "Resource": [
                "arn:aws:kinesis:us-east-1:0123456789:stream/ddb-all-products"
            ]
        }
    ]
}

Finally, set the Amazon Kinesis stream as an additional input source to the Lambda function:

Amazon Kinesis input source

Neat tip: Doing a full re-index of the content this way will not create duplicate entries in Amazon ES.
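One plausible explanation, assuming the function derives each Amazon ES document ID from the item's primary key (an assumption about the function, not something this post states), is that indexing the same item twice overwrites the existing document instead of adding a second one. The toy sketch below uses a Python dict as a stand-in for an index to show that behavior; document_id and index_record are hypothetical names.

```python
def document_id(keys):
    """Join the primary key values (DynamoDB notation) into a stable ID."""
    return "|".join(
        str(next(iter(typed.values()))) for _, typed in sorted(keys.items())
    )


def index_record(index, record):
    """Store NewImage under its key-derived ID, replacing any earlier copy."""
    index[document_id(record["Keys"])] = record["NewImage"]


fake_index = {}  # stand-in for an Elasticsearch index: document ID -> document
record = {
    "Keys": {"product_id": {"S": "B016JOMAEE"}},
    "NewImage": {"product_id": {"S": "B016JOMAEE"}, "price": {"N": "15.0"}},
}
index_record(fake_index, record)  # first pass, e.g. from the DynamoDB stream
index_record(fake_index, record)  # second pass, e.g. from the full re-index
```

After both calls, fake_index still holds a single entry for the item.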

Paying attention to attribute types

With DynamoDB, you can use different types for the same attribute on different records, but Amazon ES expects a given attribute to be of only one type. Similarly, changing the type of an existing attribute after it has been indexed in Amazon ES causes problems and some searches won’t work as expected.

In these cases, you must rebuild the Amazon ES index. For more information, see Reindexing Your Data in the Elasticsearch documentation.
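To spot such conflicts before they reach Amazon ES, you could check which attributes appear with more than one DynamoDB type. The sketch below is one way to do that over items in DynamoDB notation (the same shape as NewImage). The helper name mixed_type_attributes is hypothetical, and only top-level attributes are inspected.

```python
from collections import defaultdict


def mixed_type_attributes(items):
    """Return {attribute name: set of type tags} for attributes seen with 2+ types."""
    seen = defaultdict(set)
    for item in items:
        for name, typed_value in item.items():
            # Each typed value looks like {"S": "text"} or {"N": "15.0"}.
            seen[name].update(typed_value.keys())
    return {name: tags for name, tags in seen.items() if len(tags) > 1}
```

For example, a table where price is stored as a number on one item and as a string on another would be reported with the type tags N and S.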

Conclusion

In this post, you have seen how you can use AWS Lambda with DynamoDB to index your table content in Amazon ES as changes happen.

Because you are relying entirely on Lambda for the business logic, you don’t have to deal with servers at any point: everything is managed by the AWS platform in a highly available and scalable fashion. To learn more about Lambda and serverless infrastructures, see the Microservices without the Servers blog post.

Now that you have added full-text search to your DynamoDB table, you might be interested in exposing its content through a small REST API. For more information, see Using Amazon API Gateway as a proxy for DynamoDB.