AWS Database Blog

Implementing version control using Amazon DynamoDB

Some applications need to record changes to data over time and identify each change by a timestamp or number, so that specific versions can be retrieved later. You should also be able to retrieve the most recent version easily, and the application design should maintain data integrity as versions consistently increase.

This post covers how to design and implement time-based and number-based version control in Amazon DynamoDB. A composite primary key is used for all four examples to model historical versions of data and to enable easy retrieval of the most recent version of data. You can find a Python implementation of the following solutions in the GitHub repo.

Time-based versioning

When data changes occur in sequence over time, you can use a timestamp to version data whenever it accompanies the changed data. For example, perhaps you need to design a data model for a factory that has many parts, and each part sends state data every 2 minutes. You’re required to store the historical state of each part in the database. The following data model illustrates how you could model this data in DynamoDB.

Each state data item (1) is added to the equipment item collection, and its sort key holds the timestamp that accompanies the state data. The Metadata item (2) acts as metadata for an equipment entity, containing attributes specific to this entity, such as Name, FactoryID, and LineID. This item is rarely updated.

The following code illustrates the Python implementation of how to store and retrieve the latest state data for an equipment item:

import boto3
from boto3.dynamodb.conditions import Key

# Example state data
event = {
    'ID': 'Equipment#1',
    'State': 'WARNING1',
    'Time': '2020-11-12T20:04:00'
}

ddb = boto3.resource('dynamodb')
table = ddb.Table('VersionControl')

# Add the new state data item 
table.put_item(
    Item={
        'PK': event['ID'],
        'SK': event['Time'],
        'State': event['State']
    }
) 

# Retrieve the latest state data
response = table.query(
    KeyConditionExpression = 
        Key('PK').eq(event['ID']) & Key('SK').begins_with('2'), 
    # Strongly consistent read
    ConsistentRead = True,
    # Sort items in descending order
    ScanIndexForward = False,
    # Specifies the maximum number of items to evaluate 
    Limit = 1      
)

items = response['Items']

The data type of the sort key attribute is String, which means that items in an item collection are sorted in order of UTF-8 bytes. When running a query operation, the results are returned by default in ascending order. In the preceding code, to reverse the order, the ScanIndexForward parameter is set to False. With descending order alone, the item with Metadata as sort key would be the first returned item (the letter M sorts after any digit), followed by the items with timestamp as sort key, from the latest to the earliest. To retrieve only the item with the latest date in the sort key, you use the begins_with() function in the KeyConditionExpression, which excludes the Metadata item, as well as Limit, which specifies the maximum number of items to evaluate.
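The ordering described above can be reproduced locally. The following is a minimal sketch in plain Python rather than a DynamoDB call; the sample sort keys are made up for illustration, and each step only mimics the corresponding query parameter.

```python
# Hypothetical sort keys found in one equipment item collection
sort_keys = [
    '2020-11-12T20:04:00',
    'Metadata',
    '2020-11-12T20:00:00',
    '2020-11-12T20:02:00',
]


def utf8_order(key):
    # String sort keys are ordered by their UTF-8 bytes
    return key.encode('utf-8')


# Mimics ScanIndexForward=False: descending byte order
descending = sorted(sort_keys, key=utf8_order, reverse=True)

# Mimics Key('SK').begins_with('2'): only timestamp sort keys remain
timestamps_desc = [key for key in descending if key.startswith('2')]

# Mimics Limit=1: keep the first remaining item
latest = timestamps_desc[:1]

print(descending[0])  # the Metadata key sorts first in descending order
print(latest)         # the most recent timestamp
```

Because ISO 8601 timestamps of equal length sort chronologically when compared byte by byte, descending order puts the most recent state first once the Metadata key is excluded.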

Number-based versioning

In contrast, some applications require a number-based version that gets updated to the next higher version after each change to the data, even though a timestamp might be available. To model historical versions of data and easily retrieve the most recent version of data, you can use a composite primary key for your DynamoDB table and follow the version control design pattern using sort key prefixes.

Let’s assume you need to use a number-based version instead of a time-based version for the factory use case, where you are required to store the historical state of each piece of equipment in the database. The following data model illustrates how you can model this data in DynamoDB.

The item with the Metadata sort key (1) acts as metadata for an equipment entity, containing attributes specific to the entity, such as Name, FactoryID, and LineID. This item is rarely updated.

The item with v0 in the sort key (2) holds a copy of the latest equipment revision, including all of its attributes, and also the Latest attribute, which holds the latest version number. This item is always updated with the content of the most recent version.

Every time the equipment is updated, a new item with the next higher version in the sort key and the updated contents is added (3).

If your data model requires it, a sort key can include additional data after the version prefix.
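With this layout, reading the latest revision or a specific historical revision is a single-item read. The following is a minimal sketch, assuming the VersionControl table and the PK/SK attribute names used elsewhere in this post; the helper names get_latest and get_version are hypothetical, and table is expected to be a boto3 Table resource.

```python
def get_latest(table, equipment_id):
    """Read the v0 item, which mirrors the most recent revision."""
    response = table.get_item(
        Key={'PK': equipment_id, 'SK': 'v0'},
        # Strongly consistent read so a just-written revision is visible
        ConsistentRead=True
    )
    # 'Item' is absent from the response when no item matches the key
    return response.get('Item')


def get_version(table, equipment_id, version):
    """Read one historical revision by its version number."""
    response = table.get_item(
        Key={'PK': equipment_id, 'SK': 'v' + str(version)}
    )
    return response.get('Item')


# Usage (requires AWS credentials and an existing table):
# import boto3
# table = boto3.resource('dynamodb').Table('VersionControl')
# latest = get_latest(table, 'Equipment#1')
# second_revision = get_version(table, 'Equipment#1', 2)
```

Both helpers return None when the requested item does not exist.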

Implementing using atomic counters

Depending on how an application is designed, the version number of the data might or might not be supplied in the update request. When a version is not supplied, you can use atomic counters to increment the version number; in that case, there is no need to first retrieve the value of the Latest attribute from the table. To add the new revision item, set the ReturnValues parameter to UPDATED_NEW for the update operation, retrieve the value of the Latest attribute from the response, and pass it to the put operation as part of the sort key value. You can’t use a transactional write in this case because you have to retrieve the value of the Latest attribute from the response of the update operation and pass it to the put operation.

Although this implementation works well, it is not idempotent: every retry of the update operation increments the version again. If the update operation fails, retry both the update and put operations together. If the update operation succeeds and the put operation fails, retry only the put operation to avoid re-incrementing the version in the update operation.

Configure the retry mode and maximum attempts settings in the AWS SDK according to your application requirements; more retry attempts means that the function takes longer to respond in case of failure. For example, the AWS Lambda function running this code takes more time to complete and clients must wait longer for a response. Time-critical or latency-sensitive applications may prefer to fail without retrying, or to handle the error elsewhere. See the following code:

import boto3

# Example state data
event = {
    'ID': 'Equipment#1',
    'State': 'WARNING1',
    'Time': '2020-11-01T20:04:00'
}

ddb = boto3.resource('dynamodb')
table = ddb.Table('VersionControl')

# Update the item that contains the latest version and content
response = table.update_item(
    Key={
        'PK': event['ID'],
        'SK': 'v0'
    },
    # Atomic counter is used to increment the latest version
    UpdateExpression='SET Latest = if_not_exists(Latest, :defaultval) + :incrval, #time = :time, #state = :state',
    ExpressionAttributeNames={
        '#time': 'Time',
        '#state': 'State'
    },
    ExpressionAttributeValues={
        ':time': event['Time'],
        ':state': event['State'], 
        ':defaultval': 0,
        ':incrval': 1
    },
    # return the affected attribute after the update
    ReturnValues='UPDATED_NEW'  
)

# Get the updated version
latest_version = response['Attributes']['Latest']

# Add the new item with the latest version
table.put_item(
    Item={
        'PK': event['ID'],
        'SK': 'v' + str(latest_version),
        'Time': event['Time'],
        'State': event['State']
    }
)  
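The retry guidance for this implementation can be sketched as follows. This is an illustration under stated assumptions, not part of the original solution: write_new_version and with_retries are hypothetical helpers, table is a boto3 Table resource, and the broad except clause stands in for whichever errors your application treats as retryable.

```python
import time


def write_new_version(table, event, max_attempts=3, delay_seconds=0.1):
    """Increment Latest on the v0 item, then add the versioned item."""

    def with_retries(operation):
        # Hypothetical helper: call operation until it succeeds or
        # max_attempts is reached, then re-raise the last error.
        for attempt in range(1, max_attempts + 1):
            try:
                return operation()
            except Exception:  # assumption: narrow this to botocore's ClientError
                if attempt == max_attempts:
                    raise
                time.sleep(delay_seconds)

    # Step 1: the update is retried on its own; the put has not run yet
    response = with_retries(lambda: table.update_item(
        Key={'PK': event['ID'], 'SK': 'v0'},
        UpdateExpression='SET Latest = if_not_exists(Latest, :defaultval) + :incrval, '
                         '#time = :time, #state = :state',
        ExpressionAttributeNames={'#time': 'Time', '#state': 'State'},
        ExpressionAttributeValues={
            ':time': event['Time'],
            ':state': event['State'],
            ':defaultval': 0,
            ':incrval': 1
        },
        ReturnValues='UPDATED_NEW'
    ))
    latest_version = response['Attributes']['Latest']

    # Step 2: only the put is retried, so Latest is not incremented again
    with_retries(lambda: table.put_item(
        Item={
            'PK': event['ID'],
            'SK': 'v' + str(latest_version),
            'Time': event['Time'],
            'State': event['State']
        }
    ))
    return latest_version
```

Note that an update that times out on the client side may still have been applied, in which case retrying it skips a version number. If gaps in the sequence are unacceptable, the transactional implementation later in this post is a better fit.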

Implementing using DynamoDB Streams, Lambda functions, and atomic counters

You can use the change data capture feature in DynamoDB to simulate a transactional write, including the update and put operations, and at the same time decouple these two operations to make the error handling non-blocking. You can use either Amazon Kinesis Data Streams for DynamoDB or DynamoDB Streams in the implementation; the following diagram and code show how you can implement version control with DynamoDB Streams and Lambda.

Because each Lambda function only has one responsibility, the error handling is simpler and can be configured on the Lambda event source mapping, for example using BisectBatchOnFunctionError. When you configure DynamoDB Streams for your table in DynamoDB, choose NEW_IMAGE or NEW_AND_OLD_IMAGES for StreamViewType based on what your use case needs.
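As an illustration of the event source mapping configuration, the following sketch builds the arguments for the Lambda CreateEventSourceMapping API. The function name stream_mapping_settings and the ARNs in the usage comment are hypothetical; the on-failure destination is an assumption about where you want failed batches to go.

```python
def stream_mapping_settings(stream_arn, function_name, failure_destination_arn,
                            max_retries=3):
    """Build keyword arguments for create_event_source_mapping."""
    return {
        'EventSourceArn': stream_arn,
        'FunctionName': function_name,
        # Start from the oldest record still available in the stream
        'StartingPosition': 'TRIM_HORIZON',
        # Split a failing batch in two and retry each half separately
        'BisectBatchOnFunctionError': True,
        # Stop retrying a record after this many attempts
        'MaximumRetryAttempts': max_retries,
        # Send details of discarded batches to a queue or topic
        'DestinationConfig': {
            'OnFailure': {'Destination': failure_destination_arn}
        },
    }


# Usage (requires AWS credentials; all ARNs and names are placeholders):
# import boto3
# settings = stream_mapping_settings(
#     'arn:aws:dynamodb:REGION:ACCOUNT:table/VersionControl/stream/LABEL',
#     'ProcessVersionStream',
#     'arn:aws:sqs:REGION:ACCOUNT:version-control-dlq')
# boto3.client('lambda').create_event_source_mapping(**settings)
```

Keeping the settings in a plain dictionary makes them easy to review and reuse across environments before any call is made.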

The following code is the Lambda function that writes the latest version to the DynamoDB table:

import boto3

# Lambda function that writes the latest version to DynamoDB table
def handler(event, _):

    ddb = boto3.resource('dynamodb')
    table = ddb.Table('VersionControl')
    
    # Update the item that contains the latest version and content
    table.update_item(
        Key={
            'PK': event['ID'],
            'SK': 'v0'
        },
        # Atomic counter is used to increment the latest version
        UpdateExpression=
            'SET Latest = if_not_exists(Latest, :defaultval) + :incrval, #time = :time, #state = :state',
        ExpressionAttributeNames={
            '#time': 'Time',
            '#state': 'State'
        },
        ExpressionAttributeValues={
            ':time': event['Time'],
            ':state': event['State'], 
            ':defaultval': 0,
            ':incrval': 1
        }
    )

When the update operation on the item with v0 in the sort key is performed successfully, a record containing the updated item including the Latest attribute is available in the DynamoDB stream and can be read by the Lambda function. In case of failure after maximum retry attempts in the put operation or in the Lambda function, you can configure a dead-letter queue (DLQ) for the failed records to be manually processed later. This way, you ensure that the sequence of versions is saved and you can later add this specific version to the table. For more information, see Error handling. The following code is the Lambda function that processes items from DynamoDB Streams:

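import boto3

# Lambda function that processes items from DynamoDB Stream
def handler(event, _):
    ddb = boto3.resource('dynamodb')
    table = ddb.Table('VersionControl')

    for record in event['Records']:
        # Only updates to the v0 item carry a new version; records for
        # deleted items have no NewImage and are skipped
        if (record['dynamodb']['Keys']['SK']['S'] == 'v0'
                and 'NewImage' in record['dynamodb']):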
            new_image = record['dynamodb']['NewImage']
            latest_id = new_image['PK']['S']
            latest_version = new_image['Latest']['N']
            latest_time = new_image['Time']['S']
            latest_state = new_image['State']['S']

            # Add the new item with the latest version
            table.put_item(
                Item={
                    'PK': latest_id,
                    'SK': 'v' + str(latest_version),
                    'Time': latest_time,
                    'State': latest_state
                }
            )

Using DynamoDB Streams provides a flexible solution with independent error handling, but it comes with the additional cost of DynamoDB Streams and Lambda invocations. There is also a delay between the update and put operations towards the table.
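To make the shape of the stream records more concrete, the following sketch separates the record filtering from the write so that it can be run locally against a hand-written batch. The function versioned_items and the sample event are hypothetical, and the records are trimmed to the fields the handler above reads.

```python
def versioned_items(stream_event):
    """Yield one versioned item per v0 update found in a stream batch."""
    for record in stream_event['Records']:
        data = record['dynamodb']
        # Skip records for other sort keys and records without a new image
        if data['Keys']['SK']['S'] != 'v0' or 'NewImage' not in data:
            continue
        image = data['NewImage']
        yield {
            'PK': image['PK']['S'],
            # Number attributes arrive as strings in stream records
            'SK': 'v' + image['Latest']['N'],
            'Time': image['Time']['S'],
            'State': image['State']['S'],
        }


# Hand-written batch: one update of the v0 item and one insert of a v1 item
sample_event = {'Records': [
    {'eventName': 'MODIFY', 'dynamodb': {
        'Keys': {'PK': {'S': 'Equipment#1'}, 'SK': {'S': 'v0'}},
        'NewImage': {
            'PK': {'S': 'Equipment#1'}, 'SK': {'S': 'v0'},
            'Latest': {'N': '2'},
            'Time': {'S': '2020-11-01T20:04:00'},
            'State': {'S': 'WARNING1'}}}},
    {'eventName': 'INSERT', 'dynamodb': {
        'Keys': {'PK': {'S': 'Equipment#1'}, 'SK': {'S': 'v1'}},
        'NewImage': {
            'PK': {'S': 'Equipment#1'}, 'SK': {'S': 'v1'},
            'Time': {'S': '2020-11-01T20:02:00'},
            'State': {'S': 'OK'}}}},
]}

items = list(versioned_items(sample_event))
print(items)
```

The second record illustrates why the check on the v0 sort key matters: the versioned items written by the stream processor also appear in the stream, and ignoring them keeps the function from reprocessing its own writes.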

Implementing using transactional writes

To ensure the accuracy and consistency of the version control, this implementation uses the optimistic concurrency control method as well as transactional writes, where update and put actions are grouped in a single TransactWriteItems operation that either succeeds or fails as a unit. This implementation is suitable for applications that can’t tolerate inconsistent versioning, or delays between the update and put operations. However, transactional operations consume more write capacity units (WCUs) than the standard put and update operations.
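As a rough illustration of the capacity difference, the following sketch estimates WCUs from item size. It assumes the documented accounting of one WCU per 1 KB (rounded up) for a standard write and two WCUs per 1 KB for a transactional write; the function name and the item sizes are hypothetical.

```python
import math


def write_capacity_units(item_size_bytes, transactional=False):
    """Estimate the WCUs consumed by writing one item."""
    # Item size is rounded up to the next 1 KB, with a minimum of 1 KB
    units = max(math.ceil(item_size_bytes / 1024), 1)
    # Transactional writes are assumed to cost twice the standard amount
    return units * 2 if transactional else units


# One version change writes two items: the v0 item and the new revision
item_sizes = [200, 200]  # hypothetical sizes in bytes

standard_total = sum(write_capacity_units(size) for size in item_sizes)
transactional_total = sum(
    write_capacity_units(size, transactional=True) for size in item_sizes
)

print(standard_total, transactional_total)
```

For small items like the ones in this post, each version change costs about twice as much with a transaction as with separate update and put operations.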

The optimistic concurrency control method protects the data from being overwritten by the writes of others. Let’s assume that the state of the equipment is changing every second and the application is required to store these changes in the table. You either know what the latest version number for this specific equipment is, or you need to retrieve it from the item with v0 in the sort key.

By performing a conditional update, you can update the item but only if the version number of the item stored in the table hasn’t changed. If there is a version mismatch, it means that the item got modified after you retrieved it but before you performed the update operation. As a result, the update attempt fails. If this happens, you can try again by repeating the process. See the following code:

import boto3

# 'event' is the example state data defined in the earlier snippets
ddb = boto3.client('dynamodb')

# Retrieve the latest version
response_latest_version = ddb.get_item(
    TableName = 'VersionControl',
    Key = {
        'PK': {'S': event['ID']},
        'SK': {'S': 'v0'}
    }
)
latest_version = 0
higher_version = 1
# Extract 'Latest' from response
if 'Item' in response_latest_version:
    latest_version = response_latest_version['Item']['Latest']['N']
    higher_version = int(latest_version) + 1

# Transactional write where Update and Put are grouped together
ddb.transact_write_items(
    TransactItems = [
        {
            'Update': {
                'TableName': 'VersionControl',
                'Key': {
                    'PK': {
                        'S': event['ID']
                    },
                    'SK': {
                        'S': 'v0'
                    }
                },
                # Conditional write makes the update idempotent here 
                # since the conditional check is on the same attribute 
                # that is being updated.
                'ConditionExpression': 
                    'attribute_not_exists(#latest) OR #latest = :latest',
                'UpdateExpression': 'SET #latest = :higher_version, #time = :time, #state = :state',
                'ExpressionAttributeNames': {
                    '#latest': 'Latest',
                    '#time': 'Time',
                    '#state': 'State'
                },
                'ExpressionAttributeValues': {
                    ':latest': {
                        'N': str(latest_version)
                    },
                    ':higher_version': {
                        'N': str(higher_version)
                    },
                    ':time': {
                        'S': event['Time']
                    },
                    ':state': {
                        'S': event['State']
                    }
                }
            }
        },
        {
            'Put': {
                'TableName': 'VersionControl',
                'Item': {
                    'PK': {
                        'S': event['ID']
                    },
                    'SK': {
                        'S': 'v' + str(higher_version)
                    },
                    'Time': {
                        'S': event['Time']
                    },
                    'State': {
                        'S': event['State']
                    }
                }
            }
        }
    ]
)
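Retrying after a version mismatch can be sketched as a small loop around the read-then-write sequence. This is an assumption-laden illustration rather than part of the original solution: write_with_retry is a hypothetical helper, and attempt_write stands for a function of your own that re-reads the Latest attribute and then calls transact_write_items as shown above.

```python
import time


def write_with_retry(client, attempt_write, max_attempts=3, delay_seconds=0.05):
    """Repeat the read-then-write sequence when the transaction is canceled."""
    for attempt in range(1, max_attempts + 1):
        try:
            # attempt_write must re-read Latest each time it is called,
            # so every attempt works with a fresh version number
            return attempt_write()
        except client.exceptions.TransactionCanceledException:
            # Another writer may have changed the version in between
            if attempt == max_attempts:
                raise
            time.sleep(delay_seconds)


# Usage (requires AWS credentials; write_once is a hypothetical function
# wrapping the get_item and transact_write_items calls shown above):
# import boto3
# ddb = boto3.client('dynamodb')
# write_with_retry(ddb, lambda: write_once(ddb, event))
```

A transaction can be canceled for reasons other than a failed condition check, so a production version would likely inspect the cancellation reasons in the error response before deciding to retry.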

Conclusion

When you design an application, the requirements that you need to satisfy determine whether to choose a time-based or a number-based version. When you use a composite primary key for a table in DynamoDB, an entity item collection is modeled in such a way that you can easily retrieve the latest version while maintaining the historical revisions. When you design and implement for a use case that requires version control, consider how frequently new revisions are added and how tolerant your use case is of inconsistent versioning.


About the Author

Samaneh Utter is an Amazon DynamoDB Specialist Solutions Architect based in Göteborg, Sweden.