AWS Big Data Blog

Processing Amazon DynamoDB Streams Using the Amazon Kinesis Client Library

Asmita Barve-Karandikar is an SDE with DynamoDB

Customers often want to process streams on an Amazon DynamoDB table with a significant number of partitions or with a high throughput. AWS Lambda and the DynamoDB Streams Kinesis Adapter are two ways to consume DynamoDB streams in a scalable way.

While Lambda lets you run your application without having to manage infrastructure, using the DynamoDB Streams Kinesis Adapter gives you more control over the behavior of your application, mainly the state of stream processing. If your application requires more sophisticated record processing, such as buffering or aggregating records based on some criterion, using the adapter might be preferable.

The Amazon Kinesis Client Library (KCL) provides useful abstractions over the low-level Amazon Kinesis Streams API. The adapter implements the Amazon Kinesis Streams interface so that KCL can consume DynamoDB streams. By managing tasks like load balancing and checkpointing for you, the KCL lets you focus on writing code for processing your stream records. However, it is important to understand the configurable properties that you can tune to best fit your use case.

In this post, I demystify the KCL by explaining some of its important configurable properties and estimating its resource consumption. The KCL version used is amazon-kinesis-client 1.6.2.

KCL overview

To begin, your application needs to create one or more KCL workers that pull data from DynamoDB streams for your table. Apart from reading stream data, the KCL tracks worker state and progress in a DynamoDB table that I will call the “leases table” (the name and provisioned throughput of the table are configurable).

The following diagram shows a single KCL worker in action. The DynamoDB Streams Kinesis Adapter, which sits between DynamoDB Streams and the KCL, is not shown for the sake of simplicity.

In the figure above, the KCL worker consumes shards in a DynamoDB stream and stores the state of processing in the leases table. The properties configured on the worker control the various coordination tasks shown above, such as syncing leases and checkpointing.

Designing your application

When you develop applications for processing DynamoDB streams, first estimate the amount of data that you need to handle. The number of partitions in your DynamoDB table and the write throughput per partition determine the workload on the KCL, while the configurations on KCL workers determine their ability to deal with this workload efficiently.

Calculate the number of partitions in your DynamoDB table

You can estimate the number of partitions in your DynamoDB table based on your provisioned throughput and the storage size of your data, as detailed in the Understand Partition Behavior topic. This number is roughly equal to the number of active shards (shards that are receiving data) in your stream, and determines the number of “leases” that the KCL worker is actively processing.

Calculate throughput per partition

The following formula is the approximate rate of incoming data per shard that your KCL application needs to handle, assuming even access patterns across hash keys on your DynamoDB table:

Total Provisioned Throughput / Partitions = Throughput Per Partition
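
As a rough illustration of the formula above, the following sketch divides a total provisioned throughput by a partition count. The function name and the sample numbers are hypothetical, and the partition count is assumed to come from your own estimate in the previous step.

```python
def throughput_per_partition(total_provisioned_throughput: float, partitions: int) -> float:
    """Approximate per-shard rate, assuming even access across hash keys."""
    if partitions <= 0:
        raise ValueError("partitions must be a positive integer")
    return total_provisioned_throughput / partitions


# Hypothetical table: 10,000 provisioned writes per second over 80 partitions.
rate = throughput_per_partition(10_000, 80)
print(rate)  # 125.0
```

If access is skewed toward a few hash keys, the busiest shards can see a rate well above this average, so treat the result as a lower bound for those shards.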

Configure KCL worker properties

The configuration of a KCL worker determines whether it can process records at the pace of the DynamoDB stream. In general, be aware of the impact of the properties listed in this section to avoid losing data (DynamoDB stream records persist for only 24 hours). You may also need to tune (change the defaults of) a number of these properties if one or both of the following apply to you:

  • Your table has more than 50 partitions
  • The throughput per partition is higher than 100 tps

FailoverTimeMillis: Duration of a lease for a worker in milliseconds

A KCL worker periodically renews its leases to indicate that it is healthy and progressing as expected. If a worker hasn’t renewed its leases within this time interval, it is considered to have problems (“ZOMBIE”) and its leases are reassigned to other workers. This parameter also determines how frequently a worker looks for new leases to work on.

Let’s call these two tasks “Lease Renewer” and “Lease Taker”, respectively. The following chart shows the impact of the FailoverTimeMillis property by approximating the number of runs of these tasks in a 5-minute period for different values of FailoverTimeMillis.

As you can see in the graph above:

  • The lower the value, the more “Lease Renewer” and “Lease Taker” tasks are completed per unit of time. As shown in the first figure in this post, each “Lease Renewer” operation makes a write to the DynamoDB leases table, while each “Lease Taker” operation makes a scan of the table. Thus, the lower the value of FailoverTimeMillis, the higher the throughput consumed on the DynamoDB leases table.
  • A low value of FailoverTimeMillis also leaves less time for the “Lease Renewer” operation to complete, which may be insufficient for streams with a large number of shards.

Getting throttled on the DynamoDB leases table or workers constantly losing leases are indicators that you need to increase the value of FailoverTimeMillis. More details about monitoring the state will be discussed in an upcoming post.
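
To get a feel for how FailoverTimeMillis drives the number of task runs, here is a minimal sketch. The ratios are assumptions, not values stated in this post: it assumes that “Lease Renewer” runs about every third of FailoverTimeMillis and that “Lease Taker” runs about every two FailoverTimeMillis. Check the KCL source for your version before relying on them. The function and parameter names are hypothetical.

```python
def lease_task_runs(failover_time_millis: int,
                    window_millis: int = 5 * 60 * 1000,
                    renewer_divisor: int = 3,
                    taker_multiplier: int = 2) -> dict:
    """Approximate how often each lease task runs in a time window.

    renewer_divisor and taker_multiplier are ASSUMED ratios between
    FailoverTimeMillis and the task intervals; adjust them to match
    the KCL version you use.
    """
    renewer_interval = failover_time_millis // renewer_divisor
    taker_interval = failover_time_millis * taker_multiplier
    if renewer_interval <= 0 or taker_interval <= 0:
        raise ValueError("failover_time_millis is too small for these ratios")
    return {
        "lease_renewer_runs": window_millis // renewer_interval,
        "lease_taker_runs": window_millis // taker_interval,
    }


# Compare a few hypothetical settings over a 5-minute window.
for failover in (10_000, 30_000, 60_000):
    print(failover, lease_task_runs(failover))
```

Under these assumptions, raising FailoverTimeMillis from 10 seconds to 60 seconds cuts both the writes and the scans against the leases table by roughly a factor of six, at the cost of slower failover.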

ShardSyncIntervalMillis: Interval between shard sync tasks

A worker periodically syncs up shards and leases by creating leases for new shards and deleting leases for shards that have completed processing. The frequency of syncing is determined by this parameter. Shards in a DynamoDB stream that are closed for writes can be considered as completed after all the records from these shards are read, and may be removed from the leases table.

A “ShardSync” operation may make both reads and writes on the DynamoDB leases table. Lowering this value increases the consumption on the leases table. Keeping the default value is recommended for this parameter.

InitialLeaseTableReadCapacity: Provisioned read capacity for the DynamoDB leases table

A scan request is made to the DynamoDB leases table each time the worker does one of the following:

  • Syncs leases with shards
  • Looks for new leases to work on

The read capacity required on the DynamoDB leases table is proportional to the number of leases (or number of shards) and how frequently “Lease Taker” runs, as well as how frequently the shard syncing happens. In most cases, you may need to increase this value as the default of 10 may be too low and can result in throttling.
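
The proportionality described above can be sketched as a back-of-the-envelope estimate. This is not a formula from the KCL: it assumes that each scan reads every lease item, that a strongly consistent read unit covers up to 4 KB of scanned data (half a unit for eventually consistent reads), and that the average lease item size is something you measure yourself. All names and sample values are hypothetical.

```python
import math

READ_UNIT_BYTES = 4096  # one strongly consistent read unit covers up to 4 KB


def read_units_per_scan(num_leases: int, avg_lease_item_bytes: int,
                        strongly_consistent: bool = True) -> float:
    """Read units consumed by one full scan of the leases table (approximate)."""
    scanned_bytes = num_leases * avg_lease_item_bytes
    units = math.ceil(scanned_bytes / READ_UNIT_BYTES)
    return units if strongly_consistent else units / 2


def average_read_units_per_second(units_per_scan: float,
                                  lease_taker_interval_s: float,
                                  shard_sync_interval_s: float) -> float:
    """Average read rate from one worker's Lease Taker and ShardSync scans."""
    scans_per_second = 1 / lease_taker_interval_s + 1 / shard_sync_interval_s
    return units_per_scan * scans_per_second


# Hypothetical: 500 leases of about 300 bytes each,
# Lease Taker every 20 s, shard sync every 60 s.
per_scan = read_units_per_scan(num_leases=500, avg_lease_item_bytes=300)
print(per_scan)  # 37
print(round(average_read_units_per_second(per_scan, 20, 60), 2))  # 2.47
```

Note that each scan consumes its read units in a short burst, so the per-scan figure is usually the more useful one to compare against the provisioned read capacity.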

InitialLeaseTableWriteCapacity: Provisioned write capacity for the DynamoDB leases table

A write request goes to the DynamoDB leases table every time the worker does one of the following:

  • Checkpoints its shards
  • Renews its leases (to indicate its aliveness)
  • Syncs leases with shards and finds new leases to be added or completed leases to be deleted

The write capacity required is proportional to the number of leases (number of shards), how frequently “Lease Renewer” runs, and how frequently your application checkpoints. In most cases, you may need to increase this value from the default of 10 to avoid throttling.
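
As a rough sketch of that proportionality, the following estimate adds up lease renewals and checkpoints. It assumes that every renewal and every checkpoint is a single write of a lease item smaller than 1 KB (one write unit each), and it ignores the occasional writes made by shard syncing. The function name, parameters, and sample values are hypothetical.

```python
def estimated_write_units_per_second(num_leases: int,
                                     renew_interval_s: float,
                                     checkpoint_interval_s: float) -> float:
    """Approximate write units per second on the leases table.

    Assumes one write unit per renewal and per checkpoint, which holds
    only while lease items stay under 1 KB.
    """
    renewals_per_second = num_leases / renew_interval_s
    checkpoints_per_second = num_leases / checkpoint_interval_s
    return renewals_per_second + checkpoints_per_second


# Hypothetical: 100 leases, each renewed every 4 s and checkpointed every 10 s.
print(estimated_write_units_per_second(100, 4, 10))  # 35.0
```

In this hypothetical case the estimate is already well above a provisioned write capacity of 10, which illustrates why checkpointing less often or raising the capacity may be necessary.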

MaxRecords: Number of records to fetch from a DynamoDB stream in a single getRecords call

This property determines how many records you have to process per shard in memory at a time. The DynamoDB Streams Kinesis Adapter has an internal limit of 1000 for the maximum number of records you can get at a time from a shard. Setting this value too low might prevent the application from keeping up with the stream’s throughput. I recommend keeping this value at 1000.

Property summary

Estimating resource consumption

Estimating the CPU and memory footprint of your application helps you determine the types of machines to provision and how many.

CPU

A worker spins up one thread per shard in its list of assignments. The thread may be initializing the shard, processing it, shutting down on shard completion, or blocking on the parent shard to complete.

Number of threads spun up = Number of shards assigned + T

In the formula above, T is the number of additional threads required for tasks such as shard syncing, lease taking, and renewing.

If your application is keeping pace with the stream, the number of shards assigned is equal to the number of active shards in the stream and the number of partitions in your table. If the application is running behind, the list of assigned shards includes active shards and the parent shards still being processed. The children of these shards are in the “blocking on parent” state.
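
The thread formula above can be sketched as follows. The value of T is not given in this post, so the sketch takes it as an input; the function name, parameter names, and the sample value of 4 are hypothetical.

```python
def estimated_threads(active_shards: int,
                      parent_shards_in_progress: int,
                      coordination_threads: int) -> int:
    """Threads spun up = shards assigned + T.

    Shards assigned are the active shards plus any parent shards still
    being processed when the application is running behind.
    coordination_threads plays the role of T and must be supplied by you.
    """
    shards_assigned = active_shards + parent_shards_in_progress
    return shards_assigned + coordination_threads


# Keeping pace: 80 active shards, no parents pending, T assumed to be 4.
print(estimated_threads(80, 0, 4))  # 84

# Running behind: 5 parent shards are still being processed.
print(estimated_threads(80, 5, 4))  # 89
```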

By default, the KCL uses a cached thread pool for all these threads, meaning that there is no limit on the number of threads it can spin up. You can initialize the worker by supplying a bounded thread pool as a parameter, but you might impact the lease coordination tasks if the KCL were to run out of threads. A better way of bounding threads per worker is to use the parameter “MaxLeasesForWorker”, which I’ll discuss in detail in an upcoming post.

Memory

  • A major portion of memory is consumed by the stream records being processed:

Number of in-memory records per shard * Size of each stream record * Number of shards being processed

In the above formula, “Number of shards being processed” is roughly equal to the number of partitions in your table. If your application is up to speed with streams, it is likely processing all the active shards; if it is running behind, it may be processing a combination of parent shards and active shards.

  • If you are using the “getData” method of the “RecordAdapter” class of the DynamoDB Streams Kinesis Adapter to get the corresponding byte array for a record, multiply the above by 2, because every record in memory is also represented as a byte buffer.
  • Other sources of consumption include shard metadata and leases information.
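
The memory estimate above can be sketched like this. It covers only the stream records themselves, not shard metadata or lease information, and the function name, parameters, and sample values are hypothetical.

```python
def estimated_record_memory_bytes(records_per_shard: int,
                                  record_size_bytes: int,
                                  shards_being_processed: int,
                                  uses_get_data: bool = False) -> int:
    """Approximate memory held by in-flight stream records.

    Doubles the estimate when the application calls getData, because each
    record is then also held in memory as a byte buffer.
    """
    base = records_per_shard * record_size_bytes * shards_being_processed
    return base * 2 if uses_get_data else base


# Hypothetical: 1000 records per shard, 1 KB per record, 80 shards.
plain = estimated_record_memory_bytes(1000, 1024, 80)
doubled = estimated_record_memory_bytes(1000, 1024, 80, uses_get_data=True)
print(plain / (1024 * 1024))    # 78.125 (MiB)
print(doubled / (1024 * 1024))  # 156.25 (MiB)
```

Because the estimate scales linearly with the number of in-memory records per shard, lowering MaxRecords is one way to trade throughput for a smaller footprint.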

Conclusion

In this post, I described the workings of the KCL and the performance impact of some important configurations on your application processing DynamoDB streams. In upcoming posts, I plan to cover the configurations relevant to the multi-worker scenario and go into details of monitoring the state of your KCL application with the help of application logs and CloudWatch metrics.

If you have any questions or suggestions, please leave a comment below.

Updated: To learn more about DynamoDB Streams, see AWS SDE Daniela Miao discuss DynamoDB Streams and Cross-region Replication in this recent AWS Seattle Big Data Meetup recording.
