AWS Compute Blog
Indexing Amazon DynamoDB Content with Amazon Elasticsearch Service Using AWS Lambda
September 8, 2021: Amazon Elasticsearch Service has been renamed to Amazon OpenSearch Service.
NOTE: It was recently brought to our attention that this post contains instructions that reference a now deprecated Lambda blueprint. We are in the process of updating this post to correct this.
Many AWS customers have adopted Amazon DynamoDB for its predictable performance and seamless scalability. The main querying capabilities of DynamoDB are centered around lookups using a primary key. However, there are times when richer querying capabilities are required. Indexing the content of your DynamoDB tables with a search engine such as Elasticsearch allows for full-text search.
In this post, we show how you can send changes to the content of your DynamoDB tables to an Amazon Elasticsearch Service (Amazon ES) cluster for indexing, using the DynamoDB Streams feature combined with AWS Lambda.
Architectural overview
Here’s a high-level overview of the architecture: item-level changes to the DynamoDB table are captured by DynamoDB Streams, and a Lambda function reads from the stream and indexes each change in Amazon ES.
We’ll cover the main steps required to put this bridge in place:
- Choosing the DynamoDB tables to index and enabling DynamoDB Streams on them.
- Creating an IAM role for accessing the Amazon ES cluster.
- Configuring and enabling the Lambda blueprint.
Choosing the DynamoDB table to index
In this post, you look at indexing the content of a product catalog in order to provide full-text search capabilities. You’ll index the content of a DynamoDB table called all_products, which acts as the catalog of all products.
Here’s an example of an item stored in that table:
{
  "product_id": "B016JOMAEE",
  "name": "Serverless Single Page Apps: Fast, Scalable, and Available",
  "category": "ebook",
  "description": "AWS Lambda - A Guide to Serverless Microservices takes a comprehensive look at developing serverless workloads using the new Amazon Web Services Lambda service.",
  "author": "Matthew Fuller",
  "price": 15.0,
  "rating": 4.8
}
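If you want test data in place quickly, here’s a minimal boto3 sketch that writes the item above to the table. Note that the boto3 Table resource rejects Python floats, so numeric attributes must be passed as Decimal values:

import boto3
from decimal import Decimal

# A minimal sketch: insert the sample item above into all_products.
# The boto3 Table resource rejects Python floats, so numbers are
# passed as Decimal values.
dynamodb = boto3.resource('dynamodb', region_name='us-east-1')
table = dynamodb.Table('all_products')

table.put_item(Item={
    'product_id': 'B016JOMAEE',
    'name': 'Serverless Single Page Apps: Fast, Scalable, and Available',
    'category': 'ebook',
    'description': ('AWS Lambda - A Guide to Serverless Microservices '
                    'takes a comprehensive look at developing serverless '
                    'workloads using the new Amazon Web Services Lambda '
                    'service.'),
    'author': 'Matthew Fuller',
    'price': Decimal('15.0'),
    'rating': Decimal('4.8'),
})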
Enabling DynamoDB Streams
In the DynamoDB console, enable the DynamoDB Streams functionality on the all_products table by selecting the table and choosing Manage Stream.
Multiple options are available for the stream. For this use case, you need new items to appear in the stream; choose either New image or New and old images. For more information, see Capturing Table Activity with DynamoDB Streams.
After the stream is set up, make a note of the stream ARN. You’ll need that information later, when configuring the access permissions.
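If you prefer to script this step, here’s a minimal boto3 sketch that enables the stream and prints the stream ARN (the region, us-east-1, is an assumption):

import boto3

# A minimal sketch: enable DynamoDB Streams on all_products and print
# the stream ARN needed for the IAM policy later on.
dynamodb = boto3.client('dynamodb', region_name='us-east-1')

response = dynamodb.update_table(
    TableName='all_products',
    StreamSpecification={
        'StreamEnabled': True,
        'StreamViewType': 'NEW_AND_OLD_IMAGES',
    })

print(response['TableDescription']['LatestStreamArn'])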
Creating a new IAM role
The Lambda function needs read access to the DynamoDB stream just created. It also requires access to the Amazon ES cluster to submit new records for indexing.
In the AWS Identity and Access Management (IAM) console, create a new role for the Lambda function and call it ddb-elasticsearch-bridge.
As this role will be used by the Lambda function, choose AWS Lambda from the AWS Service Roles list.
On the following screens, choose the AWSLambdaBasicExecutionRole managed policy, which allows the Lambda function to send logs to Amazon CloudWatch Logs.
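The same role can be created from code. Here’s a minimal boto3 sketch, with a trust policy that lets Lambda assume the role:

import json
import boto3

# A minimal sketch: create the ddb-elasticsearch-bridge role with a
# trust policy allowing Lambda to assume it, then attach the
# AWSLambdaBasicExecutionRole managed policy for CloudWatch Logs access.
iam = boto3.client('iam')

trust_policy = {
    "Version": "2012-10-17",
    "Statement": [{
        "Effect": "Allow",
        "Principal": {"Service": "lambda.amazonaws.com"},
        "Action": "sts:AssumeRole",
    }],
}

iam.create_role(
    RoleName='ddb-elasticsearch-bridge',
    AssumeRolePolicyDocument=json.dumps(trust_policy))

iam.attach_role_policy(
    RoleName='ddb-elasticsearch-bridge',
    PolicyArn='arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole')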
Configuring access to the Amazon ES cluster
First, you need a running Amazon ES cluster. In this example, create a search domain called inventory. After the domain has been created, note its ARN.
In the IAM console, select the ddb-elasticsearch-bridge role created earlier and add two inline policies to that role.
Here’s the first policy, which allows the Lambda code to push new documents to Amazon ES (replace the resource ARN with the ARN of your Amazon ES cluster):
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Action": [
        "es:ESHttpPost"
      ],
      "Effect": "Allow",
      "Resource": "arn:aws:es:us-east-1:0123456789:domain/inventory/*"
    }
  ]
}
Important: you need to add /* to the resource ARN, as depicted above.
Next, add a second policy for read access to the DynamoDB stream (replace the resource ARN with the ARN of your DynamoDB stream):
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Action": [
        "dynamodb:DescribeStream",
        "dynamodb:GetRecords",
        "dynamodb:GetShardIterator",
        "dynamodb:ListStreams"
      ],
      "Effect": "Allow",
      "Resource": [
        "arn:aws:dynamodb:us-east-1:0123456789:table/all_products/stream/2016-02-16T23:13:07.600"
      ]
    }
  ]
}
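If you script these steps, each policy can be attached with put_role_policy. Here’s a minimal sketch for the Amazon ES policy (repeat with the DynamoDB stream policy; the policy name is arbitrary and the ARN is a placeholder for your own):

import json
import boto3

# A minimal sketch: attach the Amazon ES policy above to the role as an
# inline policy; repeat with the DynamoDB stream policy. The policy name
# is arbitrary and the ARN is a placeholder.
iam = boto3.client('iam')

es_policy = {
    "Version": "2012-10-17",
    "Statement": [{
        "Action": ["es:ESHttpPost"],
        "Effect": "Allow",
        "Resource": "arn:aws:es:us-east-1:0123456789:domain/inventory/*",
    }],
}

iam.put_role_policy(
    RoleName='ddb-elasticsearch-bridge',
    PolicyName='es-http-post',
    PolicyDocument=json.dumps(es_policy))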
Enabling the Lambda blueprint
When you log into the Lambda console and choose Create a Lambda Function, you are presented with a list of blueprints to use. Select the blueprint called dynamodb-to-elasticsearch.
Next, select the DynamoDB table all_products as the event source. Then, customize the Lambda code to specify the Elasticsearch endpoint. Finally, select the ddb-elasticsearch-bridge role created earlier to give the Lambda function the permissions required to interact with DynamoDB and the Amazon ES cluster.
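The console creates the event source mapping for you when you select the table; if you script it instead, the equivalent call is create_event_source_mapping. In this sketch, the function name and the stream ARN are placeholders for your own values:

import boto3

# A minimal sketch: map the DynamoDB stream to the Lambda function.
# 'dynamodb-to-elasticsearch' stands in for whatever you named your
# function, and the stream ARN is a placeholder.
lambda_client = boto3.client('lambda', region_name='us-east-1')

lambda_client.create_event_source_mapping(
    EventSourceArn='arn:aws:dynamodb:us-east-1:0123456789:table/all_products/stream/2016-02-16T23:13:07.600',
    FunctionName='dynamodb-to-elasticsearch',
    StartingPosition='TRIM_HORIZON',
    BatchSize=100,
    Enabled=True)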
Testing the result
You’re all set!
After a few records have been added to your DynamoDB table, you can go back to the Amazon ES console and validate that a new index for your items has been automatically created.
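You can also check from code. Here’s a minimal sketch using the requests library; the endpoint is a placeholder for your own domain endpoint, and it assumes that your domain’s access policy allows unsigned requests from your IP address (otherwise, requests must be signed with SigV4):

import requests

# A minimal sketch: list the indices on the Amazon ES domain to verify
# that the all_products index has been created. The endpoint below is a
# placeholder; this assumes the domain's access policy allows requests
# from your IP address.
endpoint = 'https://search-inventory-xxxxxxxxxxxx.us-east-1.es.amazonaws.com'

print(requests.get(endpoint + '/_cat/indices?v').text)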
Playing with Kibana (Optional)
Elasticsearch is commonly used with Kibana for visual exploration of data.
To start querying the indexed data, create an index pattern in Kibana. Use the name of the DynamoDB table as the index pattern.
Kibana automatically determines the best type for each field.
Use a simple query to search the product catalog for all items in the category book containing the word aws in any field.
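In the Kibana search bar, that query can be written as category:book AND aws. The same search can also be sent straight to the _search API; here’s a minimal sketch, with the same endpoint and access-policy assumptions as above:

import requests

# A minimal sketch: search the all_products index for items in category
# "book" that contain "aws" in any field. Same endpoint and access-policy
# assumptions as in the previous sketch.
endpoint = 'https://search-inventory-xxxxxxxxxxxx.us-east-1.es.amazonaws.com'

query = {"query": {"query_string": {"query": "category:book AND aws"}}}
response = requests.post(endpoint + '/all_products/_search', json=query)
print(response.json())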
Other considerations
Indexing pre-existing content
The solution presented earlier is ideal to ensure that new data is indexed as soon as it is added to the DynamoDB table. But what about pre-existing data stored in the table?
Luckily, the Lambda function used earlier can also be used to process data from an Amazon Kinesis stream, as long as the format of the data is similar to that of DynamoDB Streams records.
Provided that you have an Amazon Kinesis stream set up as an additional input source for the Lambda code above, you can use the (very naive) sample Python 3 code below to read the entire content of a DynamoDB table and push it to an Amazon Kinesis stream called ddb-all-products for indexing in Amazon ES.
import json

import boto3
import boto3.dynamodb.types

# Load the service resources in the desired region.
# Note: AWS credentials should be passed as environment variables
# or through IAM roles.
dynamodb = boto3.resource('dynamodb', region_name="us-east-1")
kinesis = boto3.client('kinesis', region_name="us-east-1")

# Load the DynamoDB table.
ddb_table_name = "all_products"
ks_stream_name = "ddb-all-products"
table = dynamodb.Table(ddb_table_name)

# Get the primary keys.
ddb_keys_name = [a['AttributeName'] for a in table.attribute_definitions]

# Scan operations are limited to 1 MB at a time.
# Iterate until all records have been scanned.
response = None
while True:
    if not response:
        # Scan from the start.
        response = table.scan()
    else:
        # Scan from where you stopped previously.
        response = table.scan(ExclusiveStartKey=response['LastEvaluatedKey'])

    for i in response["Items"]:
        # Get a dict of primary key(s).
        ddb_keys = {k: i[k] for k in i if k in ddb_keys_name}
        # Serialize Python dictionaries into DynamoDB notation.
        ddb_data = boto3.dynamodb.types.TypeSerializer().serialize(i)["M"]
        ddb_keys = boto3.dynamodb.types.TypeSerializer().serialize(ddb_keys)["M"]
        # The record must contain "Keys" and "NewImage" attributes to be
        # similar to a DynamoDB Streams record. Additionally, you inject
        # the name of the source DynamoDB table in the record so you can
        # use it as an index for Amazon ES.
        record = {"Keys": ddb_keys, "NewImage": ddb_data,
                  "SourceTable": ddb_table_name}
        # Convert the record to JSON.
        record = json.dumps(record)
        # Push the record to Amazon Kinesis.
        res = kinesis.put_record(
            StreamName=ks_stream_name,
            Data=record,
            PartitionKey=i["product_id"])
        print(res)

    # Stop the loop if no additional records are available.
    if 'LastEvaluatedKey' not in response:
        break
Note: In the code example above, you are passing the name of the source DynamoDB table as an extra record attribute, SourceTable. The Lambda function uses that attribute to build the Amazon ES index name. Another approach for passing that information is tagging the Amazon Kinesis stream.
Now, create the Amazon Kinesis stream ddb-all-products (a boto3 sketch for this step follows the policy below) and then add permissions to the ddb-elasticsearch-bridge role in IAM to allow the Lambda function to read from the stream:
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "kinesis:Get*",
        "kinesis:DescribeStream"
      ],
      "Resource": [
        "arn:aws:kinesis:us-east-1:0123456789:stream/ddb-all-products"
      ]
    }
  ]
}
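Here’s a minimal boto3 sketch for creating the stream; one shard is plenty for a one-off backfill. Attach the policy above with put_role_policy, as shown earlier:

import boto3

# A minimal sketch: create the ddb-all-products stream with a single
# shard, which is enough for a one-off backfill of the table.
kinesis = boto3.client('kinesis', region_name='us-east-1')
kinesis.create_stream(StreamName='ddb-all-products', ShardCount=1)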
Finally, set the Amazon Kinesis stream as an additional input source to the Lambda function:
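As with the DynamoDB stream, this can be done with create_event_source_mapping; here’s a minimal sketch (the function name and stream ARN are placeholders):

import boto3

# A minimal sketch: add the Kinesis stream as a second event source for
# the same Lambda function. 'dynamodb-to-elasticsearch' stands in for
# your function's name; the ARN is a placeholder.
lambda_client = boto3.client('lambda', region_name='us-east-1')

lambda_client.create_event_source_mapping(
    EventSourceArn='arn:aws:kinesis:us-east-1:0123456789:stream/ddb-all-products',
    FunctionName='dynamodb-to-elasticsearch',
    StartingPosition='TRIM_HORIZON')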
Neat tip: Doing a full re-index of the content this way will not create duplicate entries in Amazon ES. Because the document ID is derived from the item’s primary key, re-indexed items overwrite the existing documents rather than being added twice.
Paying attention to attribute types
With DynamoDB, you can use different types for the same attribute on different records, but Amazon ES expects a given attribute to be of only one type. Similarly, changing the type of an existing attribute after it has been indexed in Amazon ES causes problems and some searches won’t work as expected.
In these cases, you must rebuild the Amazon ES index. For more information, see Reindexing Your Data in the Elasticsearch documentation.
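To see the problem in isolation, here’s a minimal sketch with the same endpoint and access-policy assumptions as earlier: once the first document causes price to be mapped as a number, a document where price is a non-numeric string is rejected.

import requests

# A minimal sketch of the type-conflict problem. The first document maps
# "price" as a numeric field; the second, where "price" is a string that
# cannot be parsed as a number, is rejected with a mapping/parsing error.
endpoint = 'https://search-inventory-xxxxxxxxxxxx.us-east-1.es.amazonaws.com'

requests.post(endpoint + '/test_index/product/1', json={'price': 15.0})
r = requests.post(endpoint + '/test_index/product/2', json={'price': 'fifteen'})
print(r.status_code, r.json())  # 400, with a mapper parsing error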
Conclusion
In this post, you have seen how you can use AWS Lambda with DynamoDB to index your table content in Amazon ES as changes happen.
Because you are relying entirely on Lambda for the business logic, you don’t have to deal with servers at any point: everything is managed by the AWS platform in a highly available and scalable fashion. To learn more about Lambda and serverless infrastructures, see the Microservices without the Servers blog post.
Now that you have added full-text search to your DynamoDB table, you might be interested in exposing its content through a small REST API. For more information, see Using Amazon API Gateway as a proxy for DynamoDB.