AWS Database Blog

DynamoDB Streams Use Cases and Design Patterns

Gowri Balasubramanian is a solutions architect at Amazon Web Services.

This post describes some common use cases you might encounter, along with their design options and solutions, when migrating data from relational data stores to Amazon DynamoDB.

We will consider how to manage the following scenarios:

  • How do you set up a relationship across multiple tables in which, based on the value of an item from one table, you update the item in a second table?
  • How do you trigger an event based on a particular transaction?
  • How do you audit or archive transactions?
  • How do you replicate data across multiple tables (similar to that of materialized views/streams/replication in relational data stores)?

Relational databases provide native support for transactions, triggers, auditing, and replication. Typically, a transaction in a database refers to performing create, read, update, and delete (CRUD) operations against multiple tables in a block. A transaction can have only two states—success or failure. In other words, there is no partial completion.

As a NoSQL database, DynamoDB is not designed to support transactions. Although client-side libraries are available to mimic the transaction capabilities, they are not scalable and cost-effective. For example, the Java Transaction Library for DynamoDB creates 7N+4 additional writes for every write operation. This is partly because the library holds metadata to manage the transactions to ensure that it’s consistent and can be rolled back before commit.

You can use DynamoDB Streams to address all these use cases. DynamoDB Streams is a powerful service that you can combine with other AWS services to solve many similar problems. When enabled, DynamoDB Streams captures a time-ordered sequence of item-level modifications in a DynamoDB table and durably stores the information for up to 24 hours. Applications can access a series of stream records, which contain an item change, from a DynamoDB stream in near real time.

AWS maintains separate endpoints for DynamoDB and DynamoDB Streams. To work with database tables and indexes, your application must access a DynamoDB endpoint. To read and process DynamoDB Streams records, your application must access a DynamoDB Streams endpoint in the same Region.

Figure 1: Accessing DynamoDB and DynamoDB Streams

DynamoDB Streams supports the following stream record views:

  • KEYS_ONLY—Only the key attributes of the modified item
  • NEW_IMAGE—The entire item, as it appears after it was modified
  • OLD_IMAGE—The entire item, as it appears before it was modified
  • NEW_AND_OLD_IMAGES—Both the new and the old images of the item

You can process DynamoDB streams in multiple ways. The most common approaches use AWS Lambda or a standalone application that uses the Kinesis Client Library (KCL) with the DynamoDB Streams Kinesis Adapter. The KCL is a client-side library that provides an interface to process DynamoDB stream changes. It is modified by the DynamoDB Streams Kinesis Adapter to understand the unique record views returned by the DynamoDB Streams service. AWS Lambda executes your code based on a DynamoDB Streams event (insert/update/delete an item). In this approach, AWS Lambda polls the DynamoDB stream and, when it detects a new record, invokes your Lambda function and passes in one or more events.

DynamoDB Streams design patterns

The following figure shows a reference architecture for different use cases using DynamoDB Streams and other AWS services.

Figure 2: DynamoDB Streams design pattern reference architecture

Let’s consider a sample use case of storing and retrieving invoice transactions from a DynamoDB table named InvoiceTransactions. A single invoice can contain thousands of transactions per client. InvoiceNumber is the partition key, and TransactionIdentifier is the sort key to support uniqueness as well as provide query capabilities using InvoiceNumber. The following table shows the schema design.

Table Name: InvoiceTransactions

Partition Key Sort Key Attribute1 Attribute2 Attribute3 Attribute4
InvoiceNumber TransactionIdentifier Amount Trans_country Invoice_dt InvoiceDoc
1212121 Client1_trans1xxxx $100 USA 06062016 {JSON Doc1}
1212121 Client1_trans2xxxx $500 USA 06062016 {JSON Doc2}
1212122 Client2_trans1xxx $200 UK 06062016 {JSON Doc3}
1212121 Client2_trans1xxxx $500 China 06062016 {JSON Doc4}

Now, assume that you insert the following new item.

1212123 Client3_trans1xxx $1000 USA

After the item is inserted, the DynamoDB stream has the following entry.

View Type Values
New image TransactionIdentifier= Client3_trans1xxx,InvoiceNumber=1212123,Amount-$1000,Trans_country=USA
Keys only InvoiceNumber=1212123, TransactionIdentifier= Client3_trans1xxx,

Now, let’s assume that, due to the nature of this use case, the application requires auditing, searching, archiving, notifications, and aggregation capabilities whenever a change happens in the InvoiceTransactions table. Let’s examine how you can process the stream data to address different types of use cases.

Implementing transactional capabilities with multiple tables
The best way to achieve transactional capabilities with DynamoDB is to use conditional update expressions with multiple tables and perform various actions based on the stream data. Note that the changes can be applied only in an eventually consistent manner. The following are a few examples.

  • Aggregation: Consider a scenario in which you need to maintain the total invoice amount in dollars for the given invoice for reporting purposes. This should be dynamically updated when new items are inserted into the InvoiceTransaction table and kept up to date. To capture the total, create a new table with the following layout.

Table Name: InvoiceTotal

Partition Key Number Attribute String Attribute
InvoiceNumber Total Update_date
1212121 $1100 06062016
1212122 $200 06072016

Whenever there is a change in the InvoiceTransactions table, you update the total. Let’s try to do that using an update expression like the following:

TableName": "InvoiceTotal",
"Key": {
"InvoiceNumber": 1212121 }
UpdateExpression = "ADD Total :Amount SET update_date = :date"

The :Amount value can be read from the DynamoDB update stream whenever a new item is added to the InvoiceTransaction table, and :date can be the current date. The ADD token is the command token. For a numeric attribute, it adds the specified value to the attribute. SET is another command token. It means that all the attributes that follow will have their values set.

  • Conditional aggregation: Consider a scenario in which you need to record the number of transactions per location for those invoices where an owner is assigned for verification. In this case, you create a new table named InvoiceAction with the following layout.
    Table Name: InvoiceAction

    InvoiceNumber

    (Partition Key)

    Trans_country

    (Sort Key)

    Verify_action Total_trans
    1212121 USA admin@com 2
    1212121 UK admin@co.uk 1
    1212121 China

    Whenever there is a new transaction in the InvoiceTransactions table, you update the total using an update expression with a conditional write operation like the following:

    TableName": "InvoiceAction",
    "Key": {
    "InvoiceNumber": 1212121, Trans_country=”USA” }
    UpdateExpression = "SET Total_Trans = Total_trans+1"
    ConditionExpression = “attribute_exists(verify_action)”

    This operation fails with ConditionalCheckFailedException for those countries where there is no owner assigned—for example, China in this scenario.

  • Replication: In relational databases such as Oracle, there is a concept of materialized views that is typically used to replicate data from a master table to child tables for query purposes (query offload/data replication). Likewise, using DynamoDB streams, you can replicate the InvoiceTransactions table to different Regions for low-latency reporting and disaster recovery. You can use the DynamoDB Cross-Region Replication Library to replicate the data to other AWS Regions in near real time. For more information, see Cross-Region Replication and this re:Invent video.

Archiving/auditing
Use case: Suppose that there is a business requirement to store all the invoice transactions for up to 7 years for compliance or audit requirements. Also, the users should be able to run ad hoc queries on this data.

Solution: DynamoDB is ideal for storing real-time (hot) data that is frequently accessed. After a while, depending on a use case, the data isn’t hot any more, and it’s typically archived in storage systems like Amazon S3. You can design a solution for this using Amazon Kinesis Firehose and S3. Kinesis Firehose is a managed service that you can use to load the stream data into Amazon S3, Amazon Redshift, or Amazon Elasticsearch Service through simple API calls. It can also batch, compress, and encrypt the data before loading it, which minimizes the amount of storage used at the destination and increases security.

The following describes the high-level solution.

  • Use Amazon Kinesis Firehose. Create a delivery stream, such as S3, for storing the stream data from DynamoDB. By default, Kinesis Firehose adds a UTC time prefix in the format YYYY/MM/DD/HH before putting objects to Amazon S3. You can modify this folder structure by adding your top-level folder with a forward slash (for example, Invoice/YYYY/MM/DD/HH to store the invoice transactions).
  • Use Lambda or a KCL application to read the DynamoDB stream, and write the data using Kinesis Firehose by calling the PutRecord or PutRecordBatch If you use Lambda, there is ready-to-use sample code from GitHub to copy stream records to Kinesis Firehose.
  • Amazon Kinesis Firehose batches the data and stores it in S3 based on either buffer size (1–128 MB) or buffer interval (60–900 seconds). The criterion that is met first triggers the data delivery to Amazon S3.
  • You can use Amazon Athena for ad hoc querying of the data for audit/compliance purposes.
  • Implement S3 lifecycle policies to move the older data to S3 – IA (for infrequent access) or Amazon Glacier to further reduce the cost.
  • You can also use DynamoDB Time to Live (TTL), which simplifies archiving by automatically deleting items based on the time stamp attribute. For example, you can designate Invoice_dt as a TTL attribute by storing the value in epoch format. For an implementation example, see the blog post Automatically Archive Items to S3 Using DynamoDB TTL with AWS Lambda and Amazon Kinesis Firehose.

Reporting
Use case:  How can you run real-time fast lookup against DynamoDB?

Solution: Design the DynamoDB table schema based on the reporting requirements and access patterns. For example, if you need to do real-time reporting of invoice transactions, you can access invoice or transaction data from the DynamoDB table directly by using the Query or GetItem API calls. Design your schema with an appropriate hash key (or hash sort key) for query purposes. Additionally, you can create LSIs and GSIs to support queries using different attributes against the table.

Example:  The following queries are candidates for real-time dashboards. In this example, the table invoiceTotal contains the attributes total, update_date, etc., and is partitioned on invoice_number. The invoiceTransactions table contains InvoiceNumber and TransactionIdentifier. It is partitioned on both the attributes, using InvoiceNumber as the partition key and Transaction_Identifier as the sort key (composite primary key). For your real-time reports, you have the following requirements:

  1. Report all the transactions for a given InvoiceNumber.Solution: Invoke the Query API using InvoiceNumber as the key.
  2. Report all transactions for a given InvoiceNumber where TransactionIdentifier begins with client1_.xxx.
    Solution: Invoke the Query API using InvoiceNumber as the partition key and a key condition expression with TransactionIdentifier.
  3. Report the total by InvoiceNumber.
    Solution: Invoke the GetItem API with InvoiceNumber as the key.

Use case: How do you run analytical queries against data that is stored in DynamoDB?

Solution:  You don’t. DynamoDB is not suitable for running scan operations or fetching a large volume of data because it’s designed for fast lookup using partition keys. Additionally, there are a number of constraints (lack of support for powerful SQL functions such as group by, having, intersect, and joins) in running complex queries against DynamoDB. So, to run analytical queries against data that is stored in DynamoDB, you have to export the data from DynamoDB to a more suitable data store—such as Amazon Redshift.

The following summarizes the solution:

  • Amazon Redshift is a managed data warehouse solution that provides out-of-the-box support for running complex analytical queries.
  • Use Lambda or a KCL application to read the DynamoDB stream. Write the data using Kinesis Firehose by calling PutRecord or PutRecordBatch APIs to Amazon Redshift.
  • Kinesis Firehose uses an intermediate S3 bucket and the COPY command for uploading the data into Amazon Redshift.
  • Use Amazon QuickSight or standard BI tools to run queries against Amazon Redshift.
  • For information about implementing a data pipeline using Kinesis Firehose, Amazon Redshift, and Amazon QuickSight, see the blog post Amazon Kinesis – Setting up a Streaming Data Pipeline.

Alternate options

Example: Queries like the following can be best served from Amazon Redshift.

  • Get the daily run rate from the invoiceTransactions table for the last 3 years.
  • Select max(invoice_amount), sum(total_amount), and count(invoice_trans) from the Invoice Transactions group_by YYYYHHMMDD.

Notifications/messaging
Use case: Assume a scenario in which you have the InvoiceTransactions table, and if there is a zero value inserted or updated in the invoice amount attribute, the concerned team must be immediately notified to take action.

Solution: Build a solution using DynamoDB Streams, AWS Lambda, and Amazon SNS to handle such scenarios.

The following summarizes the solution:

  • Define SNS topic and subscribers (Email or SMS).
  • Use Lambda to read the DynamoDB stream and check whether the invoice amount is zero. Then, publish a message to the SNS topic, for example: “Take immediate action for Invoice number 1212121 as zero value is reported in the InvoiceTransactions table as on YYMMHH24MISS.”
  • Subscribers receive notifications in near real-time fashion and can take appropriate action.

For more information about this implementation, see the blog post Building NoSQL Database Triggers with Amazon DynamoDB and AWS Lambda.

Use case: Assume a scenario in which if there is a new entry for an invoice, the data must be sent to a downstream payment-processing system.

Solution: You can build a solution using DynamoDB Streams, AWS Lambda, Amazon SNS, and Amazon SQS to handle such scenarios. Let’s assume that the downstream payment system expects an SQS message to trigger a payment workflow.

The following summarizes the solution:

  • Define an Amazon SNS topic with Amazon SQS as a subscriber. For details, see the Amazon SNS documentation.
  • Use Lambda to read the DynamoDB stream and check whether there is a new invoice transaction, and send an Amazon SNS message.
  • The SNS message delivers the message to the SQS queue.
  • As soon as the message arrives, the downstream application can poll the SQS queue and trigger a processing action.

Search
Use case: How do you perform free text searches in DynamoDB? For example, assume that the InvoiceTransactions table contains an attribute InvoiceDoc as a Map data type to store the JSON document as described in the following table. How do you filter the particular client transaction or query the data (quantity for printers/desktops, vendor names like %1%, etc.) within the attribute stored as a document in DynamoDB?

Partition Key Sort Key Attribute4
InvoiceNumber TransactionIdentifier InvoiceDoc
1212121 Client1_trans1xxxx
Partition Key	Sort Key	Attribute4
InvoiceNumber	TransactionIdentifier 	InvoiceDoc
1212121	Client1_trans1xxxx	{ 
  "Vendor Name": "Vendor1 Inc",   "NumberofItems": "5",
  "ItemsDesc": [     
{  "laptops": {         "Quantity": 2 },
   "desktops": {         "Quantity": 1      },
   "printer": {         "Quantity": 2      }     }
  ] }

 

Solution: DynamoDB is not suitable for free text search against large volumes of data. We recommend using Amazon Elasticsearch Service (Amazon ES) to address such requirements.

The following describes the solution:

  • In this design, the DynamoDB InvoiceTransactions table is used as the primary data store. An Amazon ES cluster is used to serve all types of searches by indexing the InvoiceTransactions table.
  • Using DynamoDB streams, any update/delete or new item on the main table is captured and processed using AWS Lambda. Lambda makes appropriate calls to Amazon ES for indexing the data in near real time.
  • For more details about this architecture, see the blog post Indexing Amazon DynamoDB Content with Amazon Elasticsearch Service Using AWS Lambda.

Elasticsearch also supports all kinds of free-text queries, including ranking and aggregation of results. Another advantage of this approach is extensibility. Elasticsearch Query can be easily modified to add new filters, and Amazon ES does it out of the box. So, for example, if you add a new attribute in DynamoDB, it’s automatically available for querying in Amazon ES.

Best practices for working with DynamoDB Streams
Keep in mind the following best practices when you are designing solutions that use DynamoDB Streams:

  • DynamoDB Streams enables you to build solutions using near real-time synchronization of data. It doesn’t enforce consistency or transactional capability across many tables. This must be handled at the application level. Also, be aware of the latency involved (sub second) in the processing of stream data as data is propagated into the stream. This helps you define the SLA regarding data availability for your downstream applications and end users.
  • All item-level changes will be in the stream, including deletes. Your application should be able to handle deletes, updates, and creations.
  • Design your stream-processing layer to handle different types of failures. Make sure that you store the stream data in a dead letter queue such as SQS or S3, for later processing in the event of a failure.
  • Failures can occur in the application that reads the events from the stream. You can design the application to minimize the risk and blast radius. To that end, try not to update too many tables with the same code. In addition, you can design your tables so that you update multiple attributes of a single item (instead of five different items, for example). You can also define your processing to be idempotent, which can allow you to retry safely. You should also catch different exceptions in your code and decide if you want to retry or ignore these records and put them in a DLQ for further analysis.
  • Be aware of the following constraints while you are designing consumer applications:
  • Data retention in streams is 24 hours.
  • No more than two processes should be reading from a stream shard at the same time.
  • We recommend that you consider Lambda for stream processing whenever possible because it is serverless and therefore easier to manage. First, evaluate if Lambda can be used. If it can’t be, then use the Kinesis Client Library (KCL). The following comparison table can help you decide.
Parameters AWS Lambda Kinesis Client Library
Deployment Lambda polls the DynamoDB stream and invokes your function/code as soon as it detects the new record. You write your custom application using KCL with DynamoDB Streams Kinesis Adapter and host it in an EC2 instance.
Manageability Lambda automatically scales based on the throughput. You must manage the shards, monitoring, scaling, and checkpointing process in line with KCL best practices. (For details, see this design blog post and this monitoring blog post.)
How it works For every DynamoDB partition, there is a corresponding shard and a Lambda function poll for events in the stream (shard). Based on the batch size you specify, it fetches the records, processes it, and then fetches the next batch.
  • Enumerates the shards within the stream.
  • Coordinates shard associations with other workers (if any).
  • Instantiates a record processor for every shard it manages.
  • Pulls records from the stream.
  • Pushes the records to the corresponding record processor.
  • Checkpoints processed records.
Execution time Lambda Maximum execution duration per request is 300 seconds. There are no restrictions.
Availability Lambda is a managed service and is fully available. There are no maintenance windows or scheduled downtimes required. The application must be hosted in an EC2 Auto Scaling group for High Availability.
Other considerations Note the impact of the Lambda payload limit on using Lambda to consume stream messages. Any of the _IMAGE types can exceed it, especially for larger items. Therefore you should specify the batch size accordingly.

Summary
DynamoDB Streams is a powerful service that you can combine with other AWS services to create practical solutions for migrating from relational data stores to DynamoDB. This post outlined some common use cases and solutions, along with some best practices that you should follow when working with DynamoDB Streams.

If you have questions or suggestions, please comment below.