AWS Database Blog
Filter, transform, and load your DynamoDB table exports using AWS Glue
In this post, we show how you can load (import) an Amazon DynamoDB full or incremental table export into a second DynamoDB table with precise control over what gets loaded, at what write rate, and with the ability to observe the progress. This technique helps drive large-scale data migrations and synchronizations where you want maximum control. We provide the mechanism described here as a command in the open source Bulk Executor for DynamoDB, which is a set of utilities for running bulk commands against DynamoDB tables. Using the command's optional transform parameter, you can transform items, filter data, and customize what lands in your destination table as the export is loaded.
Background
DynamoDB supports both full and incremental table exports. Both types of exports are service-driven, don’t consume read capacity, and can be initiated through the AWS Management Console, command line, or SDK. Performing an export requires that point-in-time recovery (PITR) be enabled on the table.
A full export contains all items in a table at a specified point in time. The export process sends the data as a set of objects to Amazon Simple Storage Service (Amazon S3). The files follow either the DDB-JSON format or the Amazon Ion format, with one item per line. The incremental export format is slightly different. It contains only the items modified during a specified time period and includes each item’s primary key, the last modification time within the specified period, (optionally) an early image of what the item was at the start of the period, and a new image of what the item was at the end of the period. Having both the early and new images of an item helps you see how that item changed during the exported period of time.
As a native feature, DynamoDB supports loading a full export into a new table which is created during the load process. AWS prices this feature per GB of uncompressed data processed during the load. The technique described in this post supplements this feature with load rate control, custom item manipulation, filtered item loading, and progress tracking for existing tables.
DynamoDB currently has no native feature to load data based on an incremental export. With the technique described here, you can load these files with the same flexibility as full exports.
Usage
The Bulk Executor supports loading full and incremental exports using the “load-export” command. The installation instructions explain how to set up the tool. After setup, usage is as follows:
Replace <target> with the name of the destination table. This table must exist already. It can be empty or have data. The load will overwrite existing items where keys match.
The s3-path parameter specifies the full path to the Amazon S3 location containing your exported data, for example s3://<bucket-name>/prod/AWSDynamoDB/01716790307109-5f9d6aaa. Exported data can be a full or incremental export. Data formats are detected and handled automatically.
The optional transform parameter accepts the name of a Python file containing user-defined logic for manipulating each item before it is loaded. As the load processes each item, it calls the Python file’s transform_full_record(FullExportRecord) or transform_incremental_record(IncrementalExportRecord) function, depending on whether it’s a full or incremental load, respectively. Within this function, you can choose to return:
- The record: load (or delete) the item without adjustment
- An empty list: suppress the load (or delete) of this item
- A modified record: load the item with an adjustment
- A list of multiple items: fan out to write multiple items from one source record
The function is fully customizable, so you can manipulate any aspect of the upcoming action. Refer to the readme file in the repository for detailed information on bootstrapping the transform module code.
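To make this contract concrete, the following is a minimal sketch of a transform module. The accessor names it uses (such as record.item) are assumptions for illustration only, not the tool's documented API; consult the readme for the attribute names your version exposes.

```python
# Illustrative transform module skeleton. The record.item accessor is an assumption
# for this sketch; check the repository readme for the actual record interface.
import copy

def transform_full_record(record):
    item = record.item  # hypothetical accessor: the item's attributes as a dictionary

    if "obsolete" in item:
        return []                    # empty list: suppress this item

    if "notes" in item:
        adjusted = copy.deepcopy(record)
        adjusted.item.pop("notes")
        return adjusted              # modified record: load with an adjustment

    # Returning a list of records here would fan out multiple writes from one source record.
    return record                    # unchanged record: load (or delete) the item as-is

def transform_incremental_record(record):
    return record                    # pass incremental changes through unchanged
```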
An AWS Glue job performs the load with parallel executors to handle arbitrarily large data sets and with rate-limiting available via standard Bulk Executor rate-limiting parameters.
Use cases
The ability to load full and incremental exports supports a variety of use cases. In the following sections, we outline two common ones.
Isolated table synchronization
A common use case is loading a series of incremental exports to keep two tables synchronized in a detached and unidirectional manner (without using global tables and their active-active propagation). This can be useful to keep a staging or development table in line with production, keep a third party’s table copy in sync with your own, or keep a table copy in an isolated environment. It is usually more cost effective and time efficient to apply incremental exports than to create the second table entirely each time.
A previous post discussed how to drive continuous data retention.
Transforming data
Another common use case is using the transformation feature to filter or modify data as you move it from one table to another.
For example, you can use a filtering function to copy a portion of data from one table to another by performing a full export and then applying a transformation with attribute-driven filtering. You can evaluate each item’s attributes during the load to determine whether that item should propagate into the second table. For instance, if you’re a software as a service (SaaS) company with a multi-tenant table and want to break one tenant out, you can drive a load of only that tenant’s data. Or, if you want a new copy of a table holding only recent data, you can run a load with a transform that only allows items with a recent timestamp.
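As a sketch of the tenant scenario, assuming the items carry a tenant_id attribute and the hypothetical record.item accessor from the earlier sketch, the filter could be as simple as:

```python
# Hypothetical tenant filter: load only the items that belong to tenant "acme".
def transform_full_record(record):
    tenant = record.item.get("tenant_id", {}).get("S")  # assumes DynamoDB-JSON typed values
    return record if tenant == "acme" else []
```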
Another use is as a modifier. You can suppress personally identifiable information (PII), such as SSN and DOB attributes, as items are loaded into the downstream table. You can also convert a timestamp attribute from an ISO 8601 string to a numeric epoch format. With a fan-out function, you can split a list of values from a single item into individual items, one per value. You can even adjust your key schema.
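A modifier along these lines might look like the following sketch, again assuming the hypothetical record.item accessor and illustrative attribute names (ssn, dob, created_at):

```python
# Hypothetical modifier: strip PII attributes and convert an ISO 8601 timestamp
# to a numeric epoch value before the item lands in the downstream table.
from datetime import datetime

def transform_full_record(record):
    item = record.item
    for pii_attribute in ("ssn", "dob"):
        item.pop(pii_attribute, None)

    iso_value = item.get("created_at", {}).get("S")
    if iso_value:
        # Normalize a trailing "Z" so fromisoformat accepts it on older Python versions.
        parsed = datetime.fromisoformat(iso_value.replace("Z", "+00:00"))
        item["created_at"] = {"N": str(int(parsed.timestamp()))}

    return record
```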
You can also use the transform function to migrate to a new key schema. For example, the exported data might contain last_name and first_name as the partition key (PK) and sort key (SK), respectively, while the destination table has a new field called user_id as the PK. With the transform function, you can inject a GUID as user_id into each item while it’s being loaded, maintaining the other existing attributes, so you effectively migrate to a new key schema.
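A sketch of that key migration, under the same record.item assumption and with the destination table keyed on user_id, could look like this:

```python
# Hypothetical key schema migration: add a GUID user_id partition key while keeping
# last_name, first_name, and every other existing attribute intact.
import uuid

def transform_full_record(record):
    record.item["user_id"] = {"S": str(uuid.uuid4())}
    return record
```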
Technical internals
The load-export command uses the Bulk Executor framework to drive execution. The Bulk Executor supports pluggable commands, so we wrote the load-export command initially as a custom command, then contributed it to the official repository.
Validation
Before the load process starts, the code fully validates the export based on its manifest to verify that all files exist and their checksums match the manifest. This protects against accidental corruption that someone might have introduced to the export data.
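Conceptually, that validation resembles the following sketch. This is an illustration rather than the command's actual code, and it assumes the full export manifest layout where manifest-files.json lists each data file with a dataFileS3Key and a base64-encoded md5Checksum; verify those field names against your own export.

```python
# Illustrative manifest validation, not the load-export command's actual implementation.
import base64
import hashlib
import json

import boto3

s3 = boto3.client("s3")

def validate_export(bucket: str, export_prefix: str) -> None:
    # manifest-files.json contains one JSON object per line describing each data file.
    manifest_key = f"{export_prefix}/manifest-files.json"
    manifest = s3.get_object(Bucket=bucket, Key=manifest_key)["Body"].read().decode("utf-8")

    for line in filter(None, manifest.splitlines()):
        entry = json.loads(line)
        data = s3.get_object(Bucket=bucket, Key=entry["dataFileS3Key"])["Body"].read()
        # Assumes the manifest stores a base64-encoded MD5 of the (compressed) data file.
        digest = base64.b64encode(hashlib.md5(data).digest()).decode("ascii")
        if digest != entry["md5Checksum"]:
            raise ValueError(f"Checksum mismatch for {entry['dataFileS3Key']}")
```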
Reading
The load process uses AWS Glue to load the Amazon S3 objects into a Spark Resilient Distributed Dataset (RDD) and detects whether it’s a full or incremental export.
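The following is a simplified sketch of that read step, not the command's actual implementation. It assumes DDB-JSON data files under the export's data/ prefix, and it infers the export type from the record shape rather than from the manifest.

```python
# Conceptual read step: pull the export's data files into an RDD of parsed records.
import json
from pyspark import SparkContext

sc = SparkContext.getOrCreate()

# Spark decompresses the .json.gz data files transparently; each line is one DDB-JSON record.
lines = sc.textFile("s3://<bucket-name>/prod/AWSDynamoDB/01716790307109-5f9d6aaa/data/")
records = lines.map(json.loads)

# Full export records wrap the item in an "Item" field, while incremental export records
# carry "Keys" plus optional "OldImage"/"NewImage" fields instead.
is_incremental = "Keys" in records.first()
```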
Transformation
The transform capability exposes two key functions: one for transforming records during a full load and another for incremental loads. The source Amazon S3 record currently being processed is passed as a parameter into the function. This record exposes all its internal data through a dictionary, giving you full control over the load process. The following example demonstrates how to include only items that have a status attribute with the value active.
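Here is a minimal sketch of such a filter. As with the earlier sketches, the record.item accessor and the DynamoDB-JSON typed values are assumptions; the exact record interface is described in the repository readme.

```python
# Hypothetical filter: load only items whose status attribute equals "active".
def transform_full_record(record):
    if record.item.get("status", {}).get("S") == "active":
        return record
    return []  # suppress every other item
```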
The transform code is entirely under your control. For efficiency, you will usually have it perform a straightforward algorithmic check, but you can also query external sources while making your decision (for example, a CRM or another database) if you can handle the latency.
Parallelization and rate-limiting
The command then spreads the data across AWS Glue workers to write to DynamoDB in parallel. Each AWS Glue slot operates as an independent writer. To prevent this parallelism from overwhelming the table’s throughput capacity, we use the Bulk Executor’s distributed rate-limiting feature, so you can cap consumption at a configurable read or write rate.
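Conceptually, the rate limiting works like the following token-bucket sketch. This illustrates the idea rather than the Bulk Executor's actual implementation; it assumes the table-wide write rate is divided evenly across the workers.

```python
# Conceptual per-worker rate limiting: each writer throttles itself to its share of the
# table-wide limit, so the aggregate write rate stays at or below the configured value.
import time

class TokenBucket:
    def __init__(self, rate_per_second: float):
        self.rate = rate_per_second
        self.tokens = rate_per_second
        self.last_refill = time.monotonic()

    def consume(self, tokens: float) -> None:
        # Block until enough tokens have accumulated to pay for this write.
        while True:
            now = time.monotonic()
            self.tokens = min(self.rate, self.tokens + (now - self.last_refill) * self.rate)
            self.last_refill = now
            if self.tokens >= tokens:
                self.tokens -= tokens
                return
            time.sleep((tokens - self.tokens) / self.rate)

# Example: 240,000 writes per second shared evenly across 220 workers.
per_worker_limit = TokenBucket(240_000 / 220)
```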
Verification
After the load completes, you can verify the integrity of the data by comparing the two tables using the Bulk Executor’s diff capability. If both tables match, you should see output similar to the following:
Cost considerations
The two main drivers of cost during a load are:
- DynamoDB table write consumption: Based on the number and size of items written.
- AWS Glue Data Processing Units (DPUs): Based on the number of workers, type of workers, and duration of the AWS Glue job.
The following execution shows a test run of a full load against a full export holding 100 million records. We used the default worker type (G.1X) and the default AWS Glue worker count (auto-scaling up to 220). To increase load speed, we created an on-demand table pre-warmed at 240,000 write units per second. We told the bulk tool to use all 240,000. Total execution time was 12 minutes, 27 seconds.
The run estimates the DynamoDB costs early based on the size of the load, here at $69.50 for approximately 100,000,000 writes of small items. The output shows the AWS Glue cost at the end. With pricing at $0.44 per DPU hour in us-east-1, it’s about $10.
Limitations
The load-export command supports exports using the DDB-JSON format only, and loads will overwrite existing items with the same primary key. You can modify the code if you want alternate behavior.
Conclusion
In this post, we outlined how to bulk load DynamoDB full and incremental exports using the Bulk Executor. You can choose exactly which data to load and in what shape by using the optional transformation capability, while built-in rate-limiting prevents write capacity saturation. Get started by cloning the Bulk Executor repository and running your first load today.