Amazon DynamoDB is a non-relational key/value database that provides single-digit millisecond response times for reads and writes at virtually any scale. But as in any key/value store, it can be tricky to store data in a way that allows you to retrieve it efficiently. The most efficient method is to fetch an item by its exact key. But if you need to search for data based on an attribute that isn’t part of the key, you might wonder how to get the exact search result in the most cost-effective and efficient way.
DynamoDB provides filter expressions as one potential solution that you can use to refine the results of a Query operation. Although filtering is done on the server side before results are sent back, the read costs are calculated on the Query operation before the filter is applied. In some cases, the cost may be too high.
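To make that cost concrete, the following sketch estimates the read capacity a Query consumes. It assumes the usual accounting rule: the charge is based on the total size of the items read, rounded up to the next 4 KB and halved for eventually consistent reads, no matter how many items the filter keeps. The class name, method name, and item counts are illustrative and not part of any AWS API.

```java
// Rough estimate of read capacity units (RCUs) consumed by a Query.
// Assumption: the charge is based on the cumulative size of the items read
// before any filter expression is applied, rounded up to the next 4 KB.
final class QueryCostEstimate {
    private static final long BYTES_PER_READ_UNIT = 4 * 1024;

    static double consumedReadUnits(long itemsRead, long avgItemBytes, boolean stronglyConsistent) {
        long totalBytes = itemsRead * avgItemBytes;
        // Integer ceiling division.
        long units = (totalBytes + BYTES_PER_READ_UNIT - 1) / BYTES_PER_READ_UNIT;
        // Eventually consistent reads cost half as much.
        return stronglyConsistent ? units : units / 2.0;
    }

    public static void main(String[] args) {
        // 10,000 items of 400 bytes are read, but the filter keeps only 50 of them.
        double withFilter = consumedReadUnits(10_000, 400, false);
        // A key that matches exactly the 50 wanted items reads only those items.
        double exactKey = consumedReadUnits(50, 400, false);
        System.out.println("Query + filter: " + withFilter + " RCUs");
        System.out.println("Exact key:      " + exactKey + " RCUs");
    }
}
```

With these made-up numbers, the filtered Query is charged for all 10,000 items it reads, while the exact-key Query is charged only for the 50 items it returns.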
In this post, we discuss an alternative approach to querying on multiple attributes in DynamoDB, which is to concatenate attributes on a key. This approach allows you to get exactly the item you want, and it works great when you have a read pattern that requires all attribute values that would exist in the key to be present when you fetch an item from your DynamoDB table.
To query an item on multiple attributes without filter expressions, you must create a new global secondary index (GSI) with a new attribute as the partition key that contains the concatenation of the attributes for your query. The issue with this method is that the partition key for the GSI is a new attribute, so it has no value for any item that already exists in the table. If an item has no value for a GSI’s partition key, the item doesn’t appear in the index, and so the new GSI that you create starts out empty.
The solution is to retrofit the table with the concatenated attribute values for the new attribute so that your GSI is populated.
Problem in depth
Suppose that you have the following item:
{
id: "1234567890abcdef",
resourceId: "1234567890abcdef",
resourceName: "thing1",
action: "viewed",
accessedBy: "joe1",
timestamp: "2017-04-01T00:00:00.000"
}
This item exists in the following table:
table-ResourceAccessAudit
id (PrimaryKey) | resourceId | resourceName | action | accessedBy | timestamp
111aaa | 123bbb | Thing1 | viewed | joe1 | 2017-05-01T00:00:00.000
111aab | 123bbb | Thing1 | viewed | jane | 2017-05-01T00:00:01.000
111aac | 123ccc | Thing2 | edited | jane | 2017-05-01T00:00:25.000
And the Java model is defined as follows:
@DynamoDBTable(tableName = "table-ResourceAccessAudit")
public class Audit {
private String id;
private String resourceId;
private String resourceName;
private String action;
private String accessedBy;
private String timestamp;
@DynamoDBHashKey
public String getId() {return id;}
public void setId(String id) {this.id = id;}
public String getResourceId() {return resourceId;}
public void setResourceId(String resourceId) {this.resourceId = resourceId;}
public String getResourceName() {return resourceName;}
public void setResourceName(String resourceName) {this.resourceName = resourceName;}
public String getAction() {return action;}
public void setAction(String action) {this.action = action;}
public String getAccessedBy() {return accessedBy;}
public void setAccessedBy(String accessedBy) {this.accessedBy = accessedBy;}
public String getTimestamp() {return timestamp;}
public void setTimestamp(String timestamp) {this.timestamp = timestamp;}
}
This table stores audit information about when users access a resource. It records the method of access and a timestamp of when the resource was accessed. From the table, you need to determine what times a user has accessed a resource in a certain way. For example, what times has joe1 viewed resource 123bbb?
As mentioned before, to perform the query without scanning or using filter expressions, you must create a GSI that has a partition key based on the attributes that you want to look up. But you can’t create a partition key for an index that spans multiple attributes.
In this example, the solution is to create a new field that consists of multiple attributes for your key. You can take the string values of the resourceId, action, and accessedBy as the partition key and select timestamp as the sort key.
You need to create a new attribute named resourceId-Action-AccessedBy to contain these values and reference it as your partition key. The long attribute name in this example is used for readability. In a real use case with large datasets, you should use shorter attribute names because attribute names count towards the total data size used.
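One way to build the concatenated value is sketched below. The CompositeKey class is hypothetical and not part of the AWS SDK. It also adds two checks that go beyond what this post describes: it rejects a missing part, because the read pattern needs every value to be present, and it rejects a part that contains the delimiter, because "a-b" joined with "c" would otherwise produce the same key as "a" joined with "b-c".

```java
// Hypothetical helper for building a concatenated partition key value.
final class CompositeKey {
    static final String DELIMITER = "-";

    // Joins the parts with the delimiter after validating each one.
    static String of(String... parts) {
        for (String part : parts) {
            if (part == null || part.isEmpty()) {
                throw new IllegalArgumentException("Every key part must be present");
            }
            if (part.contains(DELIMITER)) {
                throw new IllegalArgumentException("Key part contains the delimiter: " + part);
            }
        }
        return String.join(DELIMITER, parts);
    }

    public static void main(String[] args) {
        // resourceId, action, accessedBy
        System.out.println(CompositeKey.of("123bbb", "viewed", "joe1")); // prints 123bbb-viewed-joe1
    }
}
```

If your real attribute values can contain a hyphen, pick a delimiter that cannot appear in them rather than rejecting the value.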
The item mentioned earlier now becomes the following:
{
id: "1234567890abcdef",
resourceId-Action-AccessedBy: "1234567890abcdef-viewed-joe1",
resourceId: "1234567890abcdef",
resourceName: "thing1",
action: "viewed",
accessedBy: "joe1",
timestamp: "2017-04-01T00:00:00.000"
}
The table now becomes the following:
table-ResourceAccessAudit
id (PrimaryKey) | resourceId-Action-AccessedBy (GSI HashKey) | resourceId | resourceName | action | accessedBy | timestamp (GSI SortKey)
111aaa | | 123bbb | Thing1 | viewed | joe1 | 2017-05-01T00:00:00.000
111aab | | 123bbb | Thing1 | viewed | jane | 2017-05-01T00:00:01.000
111aac | | 123ccc | Thing2 | edited | jane | 2017-05-01T00:00:25.000
111aad | 123ccc-viewed-bill | 123ccc | Thing2 | viewed | bill | 2017-05-01T00:01:05.000
111aae | 123ccc-viewed-joe1 | 123ccc | Thing2 | viewed | joe1 | 2017-05-01T00:01:48.000
And the Java model is now as follows:
@DynamoDBTable(tableName = "table-ResourceAccessAudit")
public class Audit {
private String id;
private String resourceId;
private String resourceName;
private String action;
private String accessedBy;
private String timestamp;
@DynamoDBHashKey
public String getId() {return id;}
public void setId(String id) {this.id = id;}
@DynamoDBIndexHashKey(globalSecondaryIndexName = "GSI")
@DynamoDBAttribute(attributeName = "resourceId-Action-AccessedBy")
public String getResourceIdAndActionAndAccessedBy() {return StringUtils.joinWith("-", this.resourceId, this.action, this.accessedBy);}
public String getResourceId() {return resourceId;}
public void setResourceId(String resourceId) {this.resourceId = resourceId;}
public String getResourceName() {return resourceName;}
public void setResourceName(String resourceName) {this.resourceName = resourceName;}
public String getAction() {return action;}
public void setAction(String action) {this.action = action;}
public String getAccessedBy() {return accessedBy;}
public void setAccessedBy(String accessedBy) {this.accessedBy = accessedBy;}
@DynamoDBIndexRangeKey(globalSecondaryIndexName = "GSI")
public String getTimestamp() {return timestamp;}
public void setTimestamp(String timestamp) {this.timestamp = timestamp;}
}
Now you can get items where a resource ID, user, and action are given. You can also narrow the results on the range key by year, month, day, and so on. For the original use case (what times has joe1 viewed resource 123bbb?), the query in Java looks like the following example:
//Get all times joe1 viewed 123bbb.
Index newIndex = table.getIndex("GSI");
//The key value must match the stored concatenation exactly: resourceId-action-accessedBy.
String partitionKeyValue = String.join("-", "123bbb", "viewed", "joe1");
ItemCollection<QueryOutcome> queryResults = newIndex.query("resourceId-Action-AccessedBy", partitionKeyValue);
//Get all times joe1 viewed 123bbb for March 2017.
ItemCollection<QueryOutcome> queryResultsForMarch = newIndex.query(
"resourceId-Action-AccessedBy",
partitionKeyValue,
new RangeKeyCondition("timestamp").beginsWith("2017-03"));
But because this is a new attribute, it doesn’t exist for pre-existing items, which causes many items to not be included in the index. You must create a retrofitting script to generate the data for this new attribute on existing items so that they are included in the index. If this isn’t done, the query that you created the GSI for returns incomplete results. If a query is performed for all the times that jane has edited 123ccc, no results are returned. The previous table shows how the data in the table looks without retrofitting the existing items with data for the new attribute.
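The effect of the missing attribute can be imitated in memory. The following sketch is a stand-in for a sparse index and does not call DynamoDB. The SparseIndexDemo class and its methods are invented for illustration, and it simplifies by storing only one item per timestamp within a partition.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// In-memory stand-in for a sparse GSI; nothing here calls DynamoDB.
final class SparseIndexDemo {
    static final String GSI_KEY = "resourceId-Action-AccessedBy";

    // Partition key value -> (timestamp -> item id), ordered by timestamp like a sort key.
    private final Map<String, TreeMap<String, String>> index = new HashMap<>();

    // Hypothetical helper that builds an item; pass null to omit the GSI key attribute.
    static Map<String, String> item(String id, String gsiKey, String timestamp) {
        Map<String, String> item = new HashMap<>();
        item.put("id", id);
        item.put("timestamp", timestamp);
        if (gsiKey != null) {
            item.put(GSI_KEY, gsiKey);
        }
        return item;
    }

    void put(Map<String, String> item) {
        String key = item.get(GSI_KEY);
        if (key == null) {
            return; // No value for the index partition key, so the item is not indexed.
        }
        index.computeIfAbsent(key, k -> new TreeMap<>()).put(item.get("timestamp"), item.get("id"));
    }

    // Returns the ids in one partition whose timestamp starts with the given prefix.
    List<String> query(String key, String timestampPrefix) {
        List<String> ids = new ArrayList<>();
        TreeMap<String, String> partition = index.get(key);
        if (partition == null) {
            return ids;
        }
        for (Map.Entry<String, String> entry : partition.tailMap(timestampPrefix, true).entrySet()) {
            if (!entry.getKey().startsWith(timestampPrefix)) {
                break; // Keys are sorted, so everything after this is outside the prefix.
            }
            ids.add(entry.getValue());
        }
        return ids;
    }

    public static void main(String[] args) {
        SparseIndexDemo gsi = new SparseIndexDemo();
        gsi.put(item("111aac", null, "2017-05-01T00:00:25.000")); // written before the attribute existed
        gsi.put(item("111aae", "123ccc-viewed-joe1", "2017-05-01T00:01:48.000"));
        System.out.println(gsi.query("123ccc-edited-jane", "2017-05")); // prints []
        System.out.println(gsi.query("123ccc-viewed-joe1", "2017-05")); // prints [111aae]
    }
}
```

Item 111aac is in the table, but because it was never given a value for the index key, the query for jane's edits finds nothing.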
Retrofitting the table
There are two approaches for retrofitting the new attribute with data: an in-place update or updating to a new table.
The in-place approach is generally simpler to write. It just updates the items in place without regard to existing reads and writes to the table, and it requires no new services or products. This is a good option for a lightly accessed table and a small number of items. When you write the item back to the table with the DynamoDBMapper, the new attribute is written automatically, and the existence of the new attribute makes it available in the new GSI.
The other approach for retrofitting is to update to a new table. You do this by writing items to a new table and writing a replicator that listens to the DynamoDB stream for the table. It listens for writes and updates in the original table and performs the write and update if the items exist in the new table. After the new table is created and replicating and you verify that the new items are as expected, you can switch your code to use the new table.
This is a good approach if you have a large number of items that take too long to update in place, or if you have heavy read/write volume on the table and don’t want to increase your throughput for an in-place update. This approach is available as a package and is documented in the DynamoDB Developer Guide.
You can also create your own replicator by using AWS Lambda, and writing one Lambda function to process writes from the table’s stream and write them to a new table. Then, create a second Lambda function to scan and write items that don’t yet exist in the new table. The benefit with this approach is that there is no infrastructure to manage because Lambda functions are serverless.
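The division of work between the two functions can be sketched without any AWS dependencies. In the following illustration, a ConcurrentHashMap stands in for the new table, and the ReplicatorSketch class with its method names is hypothetical. The INSERT, MODIFY, and REMOVE names mirror the event names that DynamoDB Streams uses. The sketch ignores retries, ordering across shards, and the case where a backfill races with a delete.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// In-memory illustration of the stream function and the backfill function.
final class ReplicatorSketch {
    enum EventType { INSERT, MODIFY, REMOVE }

    // Stand-in for the new table, keyed by item id.
    private final Map<String, Map<String, String>> newTable = new ConcurrentHashMap<>();

    // Copies the item and adds the concatenated attribute that the GSI needs.
    static Map<String, String> withCompositeKey(Map<String, String> item) {
        Map<String, String> copy = new HashMap<>(item);
        copy.put("resourceId-Action-AccessedBy",
                String.join("-", item.get("resourceId"), item.get("action"), item.get("accessedBy")));
        return copy;
    }

    // First function: applies each change from the original table's stream.
    void onStreamRecord(EventType type, String id, Map<String, String> newImage) {
        if (type == EventType.REMOVE) {
            newTable.remove(id);
        } else {
            newTable.put(id, withCompositeKey(newImage)); // The stream carries the latest version.
        }
    }

    // Second function: copies scanned items only if they are not in the new table yet,
    // so a stale scan result never overwrites a newer version written by the stream.
    void backfill(String id, Map<String, String> scannedItem) {
        newTable.putIfAbsent(id, withCompositeKey(scannedItem));
    }

    Map<String, String> get(String id) {
        return newTable.get(id);
    }
}
```

The write-if-absent rule in backfill is what lets the two functions run at the same time: whichever one reaches an item first, the version from the stream ends up in the new table.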
Before deciding which option to use, you should consider the Read/Write Unit costs and your table’s configured throughput.
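A back-of-the-envelope calculation can help with that decision. The following sketch assumes that rewriting an item costs one write capacity unit per 1 KB of item size, rounded up, and it counts only writes to the base table. Writes that DynamoDB makes to the new GSI consume additional capacity on the index. The class name, the item counts, and the spare throughput figure are made up for illustration.

```java
// Rough estimate of the write cost and duration of an in-place retrofit.
// Assumption: each rewritten item costs ceil(itemBytes / 1 KB) write capacity units.
// Writes to the GSI itself are not included.
final class RetrofitCostEstimate {
    private static final long BYTES_PER_WRITE_UNIT = 1024;

    static long writeUnits(long itemCount, long itemBytes) {
        long unitsPerItem = (itemBytes + BYTES_PER_WRITE_UNIT - 1) / BYTES_PER_WRITE_UNIT;
        return itemCount * unitsPerItem;
    }

    // Seconds needed if the script may use spareUnitsPerSecond of the table's write throughput.
    static long secondsToComplete(long totalUnits, long spareUnitsPerSecond) {
        return (totalUnits + spareUnitsPerSecond - 1) / spareUnitsPerSecond;
    }

    public static void main(String[] args) {
        long units = writeUnits(5_000_000, 400);      // 5 million items of 400 bytes each
        long seconds = secondsToComplete(units, 500); // 500 spare write units per second
        System.out.println(units + " write units, about " + seconds + " seconds");
    }
}
```

If the estimated duration or the extra throughput is more than the table can tolerate, that points toward the new-table approach.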
Update-in-place approach
For this example, we use the simpler approach of updating in place with a script. The following shows the Java solution.
Java solution
package com.amazon.brass.scripts;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import com.amazonaws.auth.PropertiesFileCredentialsProvider;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
import com.amazonaws.services.dynamodbv2.datamodeling.DynamoDBHashKey;
import com.amazonaws.services.dynamodbv2.datamodeling.DynamoDBIndexHashKey;
import com.amazonaws.services.dynamodbv2.datamodeling.DynamoDBIndexRangeKey;
import com.amazonaws.services.dynamodbv2.datamodeling.DynamoDBMapper;
import com.amazonaws.services.dynamodbv2.datamodeling.DynamoDBMapperConfig;
import com.amazonaws.services.dynamodbv2.datamodeling.DynamoDBScanExpression;
import com.amazonaws.services.dynamodbv2.datamodeling.DynamoDBTable;
import com.amazonaws.services.dynamodbv2.datamodeling.PaginatedParallelScanList;
import org.apache.commons.lang3.StringUtils;
public class UpdateInPlace<T> {
public static final String CREDENTIALS_PROPERTIES = "credentials.properties";
public static final String REGION = "us-west-2";
private BlockingQueue<T> queue = new LinkedBlockingQueue<>(10000);
protected DynamoDBMapper mapper;
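// Initialized here so the scanner thread and getItemsScannedCount() never see a null counter.
protected AtomicLong itemsScanned = new AtomicLong(0);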
private SegmentScanTask scanner;
private UpdateWorker updater;
/**
* Scans a table associated with an annotated DynamoDB model with a variable number of scan threads. Makes use of
* parallel scanning. But the Mapper version of parallel scanning splits the scanning for you. The benefit is that
* by splitting the scan into multiple segments you can have control over the number of reads per partition.
* @see <a href="http://docs.aws.amazon.com/amazondynamodb/latest/developerguide/ScanJavaDocumentAPI.html">Parallel Scanning</a>
*/
public UpdateInPlace(AmazonDynamoDB ddb, int totalSegments, String filterExpression, Class<T> model) {
scanner = new SegmentScanTask(totalSegments, filterExpression, model);
updater = new UpdateWorker();
mapper = new DynamoDBMapper(ddb);
}
public void start() {
ExecutorService executor = Executors.newFixedThreadPool(2);
executor.submit(scanner);
executor.submit(updater);
executor.shutdown();
}
public int getItemsLeftToUpdateSize() {
return queue.size();
}
public long getItemsScannedCount() {
return itemsScanned.get();
}
private class SegmentScanTask implements Runnable {
private int totalSegments;
private String filterExpression;
private Class<T> modelClass;
protected SegmentScanTask(int totalSegments, String filterExpression, Class<T> modelClass) {
this.totalSegments = totalSegments;
this.filterExpression = filterExpression;
this.modelClass = modelClass;
}
public void run() {
DynamoDBScanExpression scanExpression = new DynamoDBScanExpression();
if(!StringUtils.isEmpty(filterExpression)) {
scanExpression.setFilterExpression(filterExpression);
}
/*
* ITERATION_ONLY PaginationStrategy will not keep previous results in the returned list
* allowing previously scanned items to be eligible for GC. The default LAZY_LOADING strategy
* retains all items in the list.
*/
PaginatedParallelScanList<T> items = mapper.parallelScan(modelClass,
scanExpression,
totalSegments,
DynamoDBMapperConfig.PaginationLoadingStrategy.ITERATION_ONLY.config());
Iterator<T> itr = items.iterator();
T currentItem;
while (itr.hasNext()) {
try {
currentItem = itr.next();
if (currentItem != null) {
queue.put(currentItem);
itemsScanned.addAndGet(1);
}
} catch (InterruptedException | RuntimeException e) {
System.err.println(e);
}
}
}
}
private class UpdateWorker implements Runnable {
public void run() {
while(true) {
T item = null;
List<T> items = new ArrayList<>(10);
try {
/*
* If we receive 0 items from the queue poll for an item with a timeout.
* If we do not receive any items after timing out we assume the scanner is done.
*/
if(queue.drainTo(items, 10) == 0) {
item = queue.poll(10, TimeUnit.SECONDS);
if(item == null) {
//Found null item. End of queue.
break;
}
items.add(item);
}
mapper.batchSave(items);
} catch(InterruptedException | RuntimeException e) {
System.err.println(e);
break;
}
}
}
}
@DynamoDBTable(tableName = "table-ResourceAccessAudit")
protected static class Audit {
private String id;
private String resourceId;
private String resourceName;
private String action;
private String accessedBy;
private String timestamp;
@DynamoDBHashKey
public String getId() {return id;}
public void setId(String id) {this.id = id;}
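@DynamoDBIndexHashKey(globalSecondaryIndexName = "GSI")
// Fully qualified because DynamoDBAttribute is not in the import list above.
// Without it, the attribute would be saved under the getter-derived name instead of the GSI key name.
@com.amazonaws.services.dynamodbv2.datamodeling.DynamoDBAttribute(attributeName = "resourceId-Action-AccessedBy")
public String getResourceIdAndActionAndAccessedBy() {return String.join("-", this.resourceId, this.action, this.accessedBy);}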
public String getResourceId() {return resourceId;}
public void setResourceId(String resourceId) {this.resourceId = resourceId;}
public String getResourceName() {return resourceName;}
public void setResourceName(String resourceName) {this.resourceName = resourceName;}
public String getAction() {return action;}
public void setAction(String action) {this.action = action;}
public String getAccessedBy() {return accessedBy;}
public void setAccessedBy(String accessedBy) {this.accessedBy = accessedBy;}
@DynamoDBIndexRangeKey(globalSecondaryIndexName = "GSI")
public String getTimestamp() {return timestamp;}
public void setTimestamp(String timestamp) {this.timestamp = timestamp;}
}
public static void main(String[] args) {
AmazonDynamoDB ddb = AmazonDynamoDBClientBuilder.standard()
.withCredentials(new PropertiesFileCredentialsProvider(CREDENTIALS_PROPERTIES))
.withRegion(REGION)
.build();
UpdateInPlace<Audit> updateInPlaceJob = new UpdateInPlace<>(ddb, 10, null, Audit.class);
updateInPlaceJob.start();
while(updateInPlaceJob.getItemsLeftToUpdateSize() > 0) {
System.out.print("Scanned: " + updateInPlaceJob.getItemsScannedCount() + "\t\t UpdateQueueSize: " + updateInPlaceJob.getItemsLeftToUpdateSize() + "\t\t\r");
}
System.out.println("Done.");
}
}
Summary
We started with a DynamoDB table that had the correct keys selected for our initial use case. But a new use case was added in which we needed to query data in a way that was inefficient for our current schema.
We considered using filter expressions, but we decided against it because of the read capacity units that would have been consumed for items that matched the key condition but were ultimately filtered out. Instead, we decided to introduce a new GSI that fit our use case better.
In designing the schema for the new GSI, we decided to make the key contain a concatenation of the required values in our new read pattern. This enabled us to get exact matches in our query without any filtering and provided a more efficient query. In doing so, we needed to rewrite the data for the GSI’s new keys that we introduced because an empty key excludes an item from a GSI.
After creating and running a script to retrofit the existing data into the new attribute, we can query for the exact item that we’re requesting, and we have some flexibility with the range key without needing to scan a table or filter results.
About the Author
Scott Todd is a software development engineer at Amazon.