AWS Database Blog

Filter, transform, and load your DynamoDB table exports using AWS Glue

In this post, we show how you can load (import) an Amazon DynamoDB full or incremental table export into a second DynamoDB table with precise control over what gets loaded, at what write rate, and with the ability to observe the progress. This technique helps drive large-scale data migrations and synchronizations where you want maximum control. We provide the mechanism described here as a command in the open source Bulk Executor for DynamoDB, a set of utilities for running bulk commands against DynamoDB tables. Using the command, you can load DynamoDB exports with control: the optional transform parameter lets you transform items, filter data, and customize exactly what lands in your destination table.

Background

DynamoDB supports both full and incremental table exports. Both types of exports are service-driven, don’t consume read capacity, and can be initiated through the AWS Management Console, command line, or SDK. Performing an export requires you to enable Point in Time Recovery (PITR) on the table.

A full export contains all items in a table at a specified point in time. The export process sends the data as a set of objects to Amazon Simple Storage Service (Amazon S3). The files use either the DynamoDB JSON (DDB-JSON) format or the Amazon Ion format, with one item per line. The incremental export format is slightly different: it contains only the items modified during a specified time period, and for each item includes the primary key, the last modification time within the period, (optionally) an early image of the item at the start of the period, and a new image of the item at the end of the period. Having both the early and new images helps you see how each item changed during the exported period.
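To make the formats concrete, here is roughly what individual export lines look like in DynamoDB JSON. The field names follow the export documentation; the key names and values themselves are illustrative:

```python
import json

# A full-export line wraps each item, in DynamoDB JSON, under an "Item" key.
full_line = '{"Item": {"pk": {"S": "user#123"}, "status": {"S": "active"}}}'

# An incremental-export line carries the keys, the write timestamp, and the
# optional old and new images. A line with no NewImage represents a delete.
incremental_line = json.dumps({
    "Metadata": {"WriteTimestampMicros": "1716790307109000"},
    "Keys": {"pk": {"S": "user#123"}},
    "OldImage": {"pk": {"S": "user#123"}, "status": {"S": "active"}},
    "NewImage": {"pk": {"S": "user#123"}, "status": {"S": "inactive"}},
})

full = json.loads(full_line)
inc = json.loads(incremental_line)
print(full["Item"]["status"]["S"])  # -> active
print("NewImage" in inc)            # -> True; False would indicate a delete
```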

As a native feature, DynamoDB supports loading a full export into a new table which is created during the load process. AWS prices this feature per GB of uncompressed data processed during the load. The technique described in this post supplements this feature with load rate control, custom item manipulation, filtered item loading, and progress tracking for existing tables.

DynamoDB currently has no native feature to load data based on an incremental export. With the technique described here, you can load these files with the same flexibility as full exports.

Usage

The Bulk Executor supports loading full and incremental exports using the load-export command. The installation instructions explain how to set up the tool. After setup, usage is as follows:

./bulk load-export --table <target> --s3-path <s3://bucket/path/to/data> [--transform <transform_module>]

Replace <target> with the name of the destination table. This table must already exist; it can be empty or contain data. The load will overwrite existing items where keys match.

The s3-path parameter specifies the full path to the Amazon S3 location containing your exported data, for example s3://<bucket-name>/prod/AWSDynamoDB/01716790307109-5f9d6aaa. Exported data can be a full or incremental export. Data formats are detected and handled automatically.

The optional transform parameter accepts the name of a Python file containing user-defined logic for manipulating each item before it is loaded. As the load processes each item, it calls the Python file’s transform_full_record(FullExportRecord) or transform_incremental_record(IncrementalExportRecord) function, depending on whether it’s a full or incremental load. Within this function, you can choose to return:

  1. The record unchanged: load (or delete) the item without adjustment
  2. An empty list: suppress the load (or delete) of this item
  3. A modified record: load the item with adjustments
  4. A list of multiple records: fan out to write multiple items from one source record

The function is fully customizable, so you can manipulate any aspect of the upcoming action. Refer to the readme file in the repository for detailed information on bootstrapping the transform module code.

An AWS Glue job performs the load with parallel executors to handle arbitrarily large data sets and with rate-limiting available via standard Bulk Executor rate-limiting parameters.

Use cases

The ability to load full and incremental exports supports a variety of use cases. In the following sections, we outline two common ones.

Isolated table synchronization

A common use case is loading a series of incremental exports to keep two tables synchronized in a detached, unidirectional manner (without using global tables and their active-active propagation). This can be useful to keep a staging or development table in line with production, keep a third party’s copy of a table in sync with your own, or maintain a table copy in an isolated environment. It is usually more cost-effective and time-efficient to apply incremental exports than to recreate the second table from scratch each time.

A previous post discussed how to drive continuous data retention.

Transforming data

Another common use case is using the transformation feature to filter or modify data as you move it from one table to another.

For example, you can use a filtering function to copy a portion of data from one table to another by performing a full export and then loading it with attribute-driven filtering. You can evaluate each item’s attributes during the load to determine whether that item should propagate into the second table. You might use this if you’re a software as a service (SaaS) provider with a multi-tenant table and want to break one tenant out: drive a load of only that tenant’s data. Or, if you want a new copy of a table holding only recent data, you can run a load with a transform that only allows items with a recent timestamp.

Another use is as a modifier. You can load but suppress Personally Identifiable Information (PII) like SSN and DOB attributes as they’re loaded into the downstream table. You can also adjust the timestamp attribute from an ISO 8601 string to a numeric epoch format. With a fan-out function you can split a list of values from a single item into individual items for individual values. You can even adjust your key schema.
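The modifier and fan-out patterns might look like the following sketch. The Record class here is a hypothetical stand-in for the Bulk Executor's FullExportRecord, and the attribute names (ssn, dob, tags) are illustrative:

```python
from dataclasses import dataclass

# Hypothetical stand-in for the Bulk Executor's FullExportRecord, which
# exposes the deserialized item as a dict.
@dataclass
class Record:
    item: dict

def transform_full_record(record):
    """Strip PII attributes, then fan one source item out per tag value."""
    item = dict(record.item)
    item.pop("ssn", None)  # suppress PII before it reaches the destination
    item.pop("dob", None)
    tags = item.pop("tags", None)
    if not tags:
        return [Record(item=item)]
    out = []
    for tag in tags:  # fan out: one destination item per list value
        fanned = dict(item)
        fanned["tag"] = tag
        out.append(Record(item=fanned))
    return out

records = transform_full_record(
    Record(item={"pk": "user#1", "ssn": "123-45-6789", "tags": ["a", "b"]}))
print([r.item for r in records])
# -> [{'pk': 'user#1', 'tag': 'a'}, {'pk': 'user#1', 'tag': 'b'}]
```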

You can also use the transform function to migrate to a new key schema. For example, the exported data might have last_name and first_name as the partition key (PK) and sort key (SK), respectively, while the destination table uses a new field called user_id as the PK. With the transform function, you can inject a GUID as user_id into each item while it’s being loaded, maintaining the other existing attributes, effectively migrating to a new key schema.
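A minimal sketch of that key schema migration, again with a hypothetical stand-in for the record class:

```python
import uuid
from dataclasses import dataclass

# Hypothetical stand-in for FullExportRecord (deserialized item dict).
@dataclass
class Record:
    item: dict

def transform_full_record(record):
    """Migrate from a (last_name, first_name) key to a user_id partition key."""
    item = dict(record.item)
    # Inject a GUID as the new partition key; last_name and first_name
    # simply remain as plain attributes in the destination table.
    item["user_id"] = str(uuid.uuid4())
    return [Record(item=item)]

[migrated] = transform_full_record(
    Record(item={"last_name": "Hunter", "first_name": "Jason"}))
print(sorted(migrated.item))  # -> ['first_name', 'last_name', 'user_id']
```

Note that a random GUID makes repeated loads non-idempotent; if you might re-run the load, deriving user_id deterministically from the old key (for example, a UUID5 of the name fields) keeps the mapping stable.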

Technical internals

The load-export command uses the Bulk Executor framework to drive execution. The Bulk Executor supports pluggable commands, so we wrote the load-export command initially as a custom command, then contributed it to the official repository.

Validation

Before the load process starts, the code fully validates the export based on its manifest to verify all files exist and their signatures match the manifest. This protects against any accidental corruption someone might have introduced to the export data.
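Conceptually, that validation pass looks like the following sketch. The manifest field names (dataFileS3Key, md5Checksum) follow the DynamoDB export manifest format; the base64-MD5 encoding and the injected object reader are assumptions made for illustration and testability:

```python
import base64
import hashlib
import json

def validate_export(manifest_lines, get_object_bytes):
    """Verify every data file listed in manifest-files.json exists and
    matches its recorded MD5 checksum before any writes begin.

    manifest_lines: lines of manifest-files.json (one JSON object per line)
    get_object_bytes: callable mapping an S3 key to the object's bytes
                      (a boto3 wrapper in practice; injected here for testing)
    """
    for line in manifest_lines:
        entry = json.loads(line)
        data = get_object_bytes(entry["dataFileS3Key"])  # raises if missing
        digest = base64.b64encode(hashlib.md5(data).digest()).decode()
        if digest != entry["md5Checksum"]:
            raise ValueError(f"checksum mismatch for {entry['dataFileS3Key']}")
    return True

# Tiny self-contained check against in-memory "objects".
objects = {"data/file1.json.gz": b"hello"}
checksum = base64.b64encode(hashlib.md5(b"hello").digest()).decode()
manifest = [json.dumps({"dataFileS3Key": "data/file1.json.gz",
                        "md5Checksum": checksum})]
print(validate_export(manifest, objects.__getitem__))  # -> True
```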

Reading

The load process uses AWS Glue to load the Amazon S3 objects into a Spark Resilient Distributed Dataset (RDD) and detects if it’s a full or incremental export.
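Detection can key off the export's manifest-summary.json, which carries an exportType field for current exports (a simplified sketch; the field names are assumed from the export manifest documentation):

```python
import json

def detect_export_type(manifest_summary_text):
    """Classify an export by its manifest-summary.json.

    Assumes the summary carries an "exportType" field, as current exports
    do; older full exports that predate incremental export may omit it.
    """
    summary = json.loads(manifest_summary_text)
    return summary.get("exportType", "FULL_EXPORT")

print(detect_export_type('{"exportType": "INCREMENTAL_EXPORT"}'))
print(detect_export_type('{"version": "2020-06-30"}'))  # assumed to default
```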

Transformation

The transform capability exposes two key functions: one for transforming records during a full load and another for incremental loads. The source Amazon S3 record currently being processed is passed as a parameter into the function. The record exposes all its internal data through a dictionary, giving you full control over the load process. The following example demonstrates how to include only items that have a status attribute with the value active.

def transform_full_record(record: FullExportRecord) -> list[FullExportRecord]:
    """
    Example: Only load items where 'status' attribute is 'active'.

    Args:
        record: record.item is the deserialized Item dict,
                record.table_key_schema has key info.

    Returns:
        list[FullExportRecord]: Single-element list to keep, empty list to skip
    """
    if record.item.get("status") == "active":
        return [record]
    return []


def transform_incremental_record(record: IncrementalExportRecord) -> list[IncrementalExportRecord]:
    """
    Example: Only load items where 'status' attribute is 'active'.

    Behavior:
        - If new_image exists and status is 'active': load the item (PUT)
        - If new_image exists but status is not 'active': skip the item
        - If new_image is None (a delete): return the record, i.e. respect the delete

    Args:
        record: record.keys, record.new_image, record.old_image,
                record.table_key_schema, record.write_timestamp_micros

    Returns:
        list[IncrementalExportRecord]: Single-element list to keep, empty list to skip
    """
    if record.new_image:
        if record.new_image.get("status") == "active":
            return [record]
        else:
            return []
    return [record]

The transform code is entirely under your control. For efficiency, you will usually have it perform a straightforward algorithmic check, but you can also query external sources while making your decision (for example, a CRM or another database) if you can tolerate the latency.
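For example, a transform that consults an external system might cache its lookups so each worker pays the round trip at most once per key. This is a sketch with a stubbed lookup and a hypothetical stand-in record class, not part of the Bulk Executor itself:

```python
from dataclasses import dataclass
from functools import lru_cache

@dataclass
class Record:  # hypothetical stand-in for FullExportRecord
    item: dict

@lru_cache(maxsize=10_000)
def is_active_customer(customer_id):
    """Stand-in for an external lookup (a CRM, another database).
    Caching means repeated items for the same key skip the round trip."""
    return customer_id in {"c1", "c3"}  # replace with a real client call

def transform_full_record(record):
    if is_active_customer(record.item.get("customer_id")):
        return [record]
    return []

print(len(transform_full_record(Record(item={"customer_id": "c1"}))))  # -> 1
print(len(transform_full_record(Record(item={"customer_id": "c2"}))))  # -> 0
```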

Parallelization and rate-limiting

The command then spreads the data across AWS Glue workers to write to DynamoDB in parallel. Each AWS Glue slot operates as an independent writer. To prevent this mass parallelism from overwhelming the table’s throughput capacity, the command uses the Bulk Executor’s distributed rate-limiting feature, which caps aggregate consumption at a configurable write rate.
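Conceptually, each worker's share of the table-wide rate behaves like a token bucket. The following is a sketch of the idea, not the Bulk Executor's actual implementation:

```python
import time

class TokenBucket:
    """Minimal per-worker token bucket: each worker gets an equal share
    of the table-wide write rate and sleeps when it runs ahead of it."""
    def __init__(self, rate_per_sec, now=time.monotonic):
        self.rate = rate_per_sec
        self.tokens = rate_per_sec  # allow up to a one-second burst
        self.now = now
        self.last = now()

    def acquire(self, cost=1.0):
        """Return seconds to sleep before a write consuming `cost` units."""
        t = self.now()
        self.tokens = min(self.rate, self.tokens + (t - self.last) * self.rate)
        self.last = t
        self.tokens -= cost
        return max(0.0, -self.tokens / self.rate)

# 240,000 writes/sec spread across 220 workers ~ 1,090 writes/sec each.
bucket = TokenBucket(rate_per_sec=240_000 / 220)
print(bucket.acquire() == 0.0)  # -> True: first write proceeds immediately
```

In practice each writer would call time.sleep on the returned delay before issuing its next batch, keeping the fleet's combined throughput at the configured rate.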

Verification

After the load completes, you can verify the integrity of the data by comparing the two tables using the bulk diff tool capability. If both tables match, you should see output like the following:

./bulk diff --table source --table2 destination
…
No differences found

Cost considerations

The two main drivers of cost during a load are:

  1. DynamoDB table write consumption: based on the number and size of items written.
  2. AWS Glue Data Processing Units (DPUs): based on the number of workers, worker type, and duration of the AWS Glue job.

The following execution shows a test run of a full load against a full export holding 100 million records. We used the default worker type (G.1X) and the default AWS Glue worker count (auto-scaling up to 220). To increase load speed, we created an on-demand table pre-warmed at 240,000 write units per second and told the bulk tool to use all 240,000. Total execution time was 12 minutes 27 seconds.

>> ./bulk load-export \
	--XMaxWriteRate 240000 \
	--table ... \
	--s3-path s3://... \
	--XTimeout 10080 \
	--XWaitForDPU
…
Destination Table: …
S3 export: 100,000,000 items across 512 files (FULL_EXPORT, DYNAMODB_JSON)
DynamoDB write costs depend on how many items are being written and the size of the items.
Here we estimate the command will write 100,000,000 items
 with average size 256 bytes;
 each write incurs an average of 1 write units
Write units required (approx): 100,000,000
This does not include costs for secondary indexes!
Approx DynamoDB cost for on-demand writes consuming 100,000,000 WRUs (using us-east-1 prices): $69.50
Writing items to DynamoDB...
===============================================================
JOB COMPLETED SUCCESSFULLY
  - Total items in export: 100,000,000
  - Total items written: 100,000,000
  - Execution time: 707.3 seconds
…
Waiting 40 seconds for DPU metrics to gather...
Job completed successfully. Job duration: 0:12:27 (24.06 DPU hours)

The run estimates the DynamoDB costs early based on the size of the load, here $69.50 for approximately 100,000,000 writes of small items. The output shows the AWS Glue cost at the end: at $0.44 per DPU hour in us-east-1, the 24.06 DPU hours cost about $10.59.
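The arithmetic behind those numbers is straightforward (Glue at the $0.44 per DPU-hour us-east-1 rate stated above):

```python
# Worked cost and throughput arithmetic from the run above.
glue_cost = 24.06 * 0.44          # DPU-hours x price per DPU-hour
throughput = 100_000_000 / 707.3  # average items written per second
print(round(glue_cost, 2))        # -> 10.59
print(round(throughput))          # -> 141383
```

The average throughput sits below the 240,000 writes-per-second cap because the cap is a ceiling, not a target; job startup and file reading also count toward the elapsed time.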

Limitations

The load-export command supports exports in the DDB-JSON format only, and loads overwrite existing items with the same primary key. You can modify the code if you want different behavior.

Conclusion

In this post, we outlined how to bulk load DynamoDB full and incremental exports using the Bulk Executor. Using the optional transformation capabilities, you can choose exactly which data to load and in what shape, while built-in rate limiting prevents write capacity saturation. Get started by cloning the Bulk Executor repository and running your first load today.


About the authors

Ruskin Dantra

Ruskin is a Solutions Architect based out of California. He is originally from the Land of the Long White Cloud, New Zealand and is an 18-year veteran in application development with a love for networking. His passion in life is to make complex things simple using AWS.

Jason Hunter

Jason is a California-based Principal Solutions Architect specializing in Amazon DynamoDB. He’s been working with NoSQL databases since 2003. He’s known for his contributions to Java, open source, and XML. You can find more DynamoDB posts and other posts written by Jason Hunter in the AWS Database Blog.