AWS Database Blog

How Channel Corporation modernized their architecture with Amazon DynamoDB, Part 2: Streams

This post is co-written with TaeHun Yoon and Changsoon Kim from Channel Corporation.

Channel Corporation is a B2B software as a service (SaaS) startup that operates the all-in-one artificial intelligence (AI) messenger Channel Talk. In Part 1 of this series, we introduced our motivation for NoSQL adoption, technical problems with business growth, and considerations for migration from PostgreSQL to Amazon DynamoDB. In this post, we share our experience integrating with other services to solve areas that couldn’t be addressed with DynamoDB alone.

Background: Structured and unstructured data retrieval problems

There are some key differences between relational data design and NoSQL. DynamoDB excels at efficient data retrieval for well-defined access patterns, offering optimized performance for specific query types. Given this characteristic, the following Channel Corporation use cases were difficult to solve with DynamoDB alone:

  • Searching for structured data with various filtering conditions (message search) – Channel Talk allows users to quickly search for a conversation. Users should be able to search by adding various filters such as assignee, team, and follower, as illustrated in the following screenshot.

[Add Image 001]

  • Searching for unstructured data (customer data search) – Customer data has a different schema depending on what information the Channel Corporation’s customer enters. This schema is not fixed, and at the time of writing, it has the characteristic of being able to quickly search up to 1 million customer data records using multiple fields.

[Add Image 002]

Channel Corporation decided that it would be more effective and efficient to use a purpose-built search service such as Amazon OpenSearch Service with DynamoDB to solve these two problems. To do this, data synchronization between DynamoDB and other services is required. DynamoDB has zero-ETL integration with Amazon OpenSearch Service, but Channel Corporation currently uses self-managed extract, transform, and load(ETL) pipeline with the following reasons:

  • It was necessary to transfer only the minimum information necessary for searchable data. Plus, we needed to ignore changes in the value of a specific attribute, and our logic compared the changes between the existing and current value.
  • In order to ensure idempotence in search service, there were cases where records had to be changed and inserted using soft delete instead of directly deleting them.

Integrating streams with DynamoDB

A series of sequential events ordered in time is called a stream. DynamoDB provides two ways to run change data capture (CDC) as a stream:

  • Amazon DynamoDB Streams captures a time-ordered sequence of item-level modifications in a DynamoDB table and stores this information in a log for up to 24 hours
  • Amazon Kinesis Data Streams captures item-level modifications in a DynamoDB table and replicates them to a Kinesis data stream

Channel Corporation was able to use these services to solve the message and customer data search issues by passing the changed data of DynamoDB to a service that can search well through a stream.

The following diagram illustrates the workflow using DynamoDB Streams.

[Add Image 003]

This solution offers the following characteristics:

  • Reading from DynamoDB Streams is free for AWS Lambda based consumers
  • It offers a deduplicated, time-ordered sequence of item-level modifications

However, it has the following considerations:

  • You need to handle failures in Lambda, the stream processing layer
  • There is a possibility of missing events if the starting position is set to LATEST

The following diagram illustrates the workflow using Kinesis Data Streams.

[Add Image 004]

This solution offers the following characteristics:

However, consider the following:

  • It needs to handle failures in Lambda, the stream processing layer
  • There is a possibility of missing events if the starting position is set to LATEST
  • There is a possibility of reverse or duplicated events

To understand this in more detail, let’s look at how data is passed from DynamoDB to each stream and how Lambda performs operations on the streams.

DynamoDB Streams

DynamoDB sends changes that are performed on each item within a partition to the corresponding shard by maintaining the order of the changes made. This process makes sure a given key is present in at most one shard and its order is maintained.

[Add Image 005]

Kinesis Data Streams

Data records in Kinesis Data Streams may appear in a different order than when item changes occurred, and the same item notification may appear in the stream more than once. You can check the ApproximateCreationDateTime attribute to identify the order that the item modifications occurred in, and to identify duplicate records.

[Add Image 006]

How Lambda performs operations on streams

An event source mapping is a Lambda resource that reads items from stream- and queue-based services and invokes a function with batches of records. You can use an event source mapping to process events from streams or queues of services that don’t directly invoke Lambda functions. Let’s approach the characteristics of each solution and problems again with the understanding that Lambda is not invoked directly from DynamoDB Streams, but through that resource.

You can handle basic retry processing by setting the MaximumRecordAgeInSeconds and MaximumRetryAttempts values ​​of the event source mapping configuration. However, failures that can’t be resolved by only retry can occur due to various reasons, such as bugs in your Lambda code or issues faced during deployment.

When you look at the event source mapping resource, you can configure the On-failure destination setting to forward notifications about the records that could be processed to an Amazon Simple Queue Service (Amazon SQS) queue or Amazon Simple Notification Service (Amazon SNS) topic. The following example shows an invocation record for a DynamoDB stream:

{
    "requestContext": {
        "requestId": "316aa6d0-8154-xmpl-9af7-85d5f4a6bc81",
        "functionArn": "arn:aws:lambda:us-east-2:123456789012:function:myfunction",
        "condition": "RetryAttemptsExhausted",
        "approximateInvokeCount": 1
    },
    "responseContext": {
        "statusCode": 200,
        "executedVersion": "$LATEST",
        "functionError": "Unhandled"
    },
    "version": "1.0",
    "timestamp": "2019-11-14T00:13:49.717Z",
    "DDBStreamBatchInfo": {
        "shardId": "shardId-00000001573689847184-864758bb",
        "startSequenceNumber": "800000000003126276362",
        "endSequenceNumber": "800000000003126276362",
        "approximateArrivalOfFirstRecord": "2019-11-14T00:13:19Z",
        "approximateArrivalOfLastRecord": "2019-11-14T00:13:19Z",
        "batchSize": 1,
        "streamArn": "arn:aws:dynamodb:us-east-2:123456789012:table/mytable/stream/2019-11-14T00:04:06.388"
    }
}

Based on the preceding information, you should retrieve the record from DynamoDB Streams and try again. When retrieving the record from DynamoDB Streams and processing it, not all events are delivered in chronological order, so out of order delivery of events is possible.

When the BatchSize is more than 1 and some items fail to process due to transient failures, Lambda retries the entire batch, including previously processed records. Without appropriate handling, this can result in duplicate events.

In addition, if the starting position of the event source mapping is set to LATEST, some events may be missed.

Sometimes values set in the event source mapping, such as MaximumRetryAttempts and MaximumRecordAgeInSeconds, need to be changed depending on error handling and situations, unlike the initial settings. In this case, some records may be missed unintentionally.

If the starting position is changed to TRIM_HORIZON to solve this, all data in DynamoDB Streams will be delivered to the event consumer from the beginning, which may result in duplicate processing of events.

Problem-solving with DynamoDB Streams and Kinesis Data Streams

We believe that both DynamoDB Streams and Kinesis Data Streams have the ability to solve similar problems. For this post, we discuss the following use cases:

  • Is it possible to write all stream processing functions idempotently?
  • Can we retry when a problem occurs in a Lamdba function?

Idempotence

One of the most important things in stream processing is to design your logic idempotent. You should validate incoming events and determine if they’ve been processed before. If you write your event consumer idempotently, many problems can be solved.

For example, let’s look at a situation where events appear in the stream out of order.

[Add Image 007]

As shown in the preceding figure, data integrity can be broken due to processing of events out of order.

To solve this, if all events occurring in create, update, and delete operations are performed in chronological order, there will be no problem because each state is the same final state.

In other words, if the last current state is the result of the most recent event, the preceding problem can be easily solved.

To do this, let’s rewrite the implementation, assuming that updates are only performed if the timestamp representing the time is greater, so that only events after the current state are performed.

[Add Image 008]

In this case, out of order delivery occurred, but because it’s an event in the past rather than the current state, it is not performed, so the same result value can be obtained. DynamoDB allows optimistic locking with version number, and the version number automatically increases each time an item is updated. An update or delete request is possible only if the client-side object version matches the corresponding item version number in the DynamoDB table. If you use this characteristic, you can solve the problem.

If we perform the same logic as before, we have no issues with create and update operations, but there is a case where it becomes a problem for deletes.

[Add Image 009]

Even in the case of deletes, the problem can be solved by using soft delete instead of hard delete for records in the service to enforce the order of occurrence of events. For example, if A is deleted and there is information about when it was deleted, we can use the information and drop the event.

[Add Image 010]

Retry strategy for failure

Now, let’s assume that all logic is written idempotently and talk about whether you can retry when a problem occurs.

Both Kinesis Data Streams and DynamoDB Streams can use the On-failure destination option and redeliver past data to stream consumers. However, the strategies for the two streams may be different:

  • DynamoDB Streams – DynamoDB Streams provides LATEST and TRIM_HORIZON in the starting position of the event source mapping. This means that in order to get records from a specific point in time again, a separate application should exist to read and reprocess from a specific sequence number in a specific shard to the desired point in time.
  • Kinesis Data Streams – Kinesis Data Streams provides five options, including AT_TIMESTAMP, in the starting position of the event source mapping. This feature allows us to go back to the point just before the problem occurred, update only the event source mapping, and redeploy to resolve the problem.

Channel Corporation’s choice

We looked at the cases that can arise when synchronizing data with other services using the two streams provided by DynamoDB. It is difficult to say that it is unconditionally better to use a specific stream because the pros and cons of the two streams are different in terms of operational considerations and cost. Therefore, Channel Corporation uses both streams based on specific criteria.

In the following use cases, we use DynamoDB Streams:

  • When it’s important that events occur in chronological order
  • When it’s okay to have higher error recovery costs when a problem occurs

In the following use cases, we use Kinesis Data Streams:

  • When it’s important to recover quickly from a desired point in time when a problem occurs
  • When there is a case where more than two Lambda functions need to process the stream simultaneously

Online schema change strategy using DynamoDB Streams

As another example of using streams, Channel Corporation uses DynamoDB Streams to perform online schema change. The same approach can be used to migrate between different AWS accounts. The following diagram illustrates the workflow.

[Add Image 011]

This workflow includes the following steps:

  1. The first step consists of two parts:
    1. Create a new table with a new schema in DynamoDB.
    2. Deploy a Lambda function that consumes the DynamoDB Streams event of the old table and converts it to the new table schema.
  2. Read the historical data before the Lambda function was deployed and change it to the new schema.
  3. Deploy new API servers.

This process enables us to perform live schema change even when there are significant schema changes. In Step 2, there are various ways to input data to the new table, such as Amazon EMR, AWS Glue, or a custom application.

When you need to insert data from a specific point in time into a new DynamoDB table, there are also many things to care about due to idempotence. To simplify this, Channel Corporation creates a pipeline as illustrated in the preceding figure and increases the version of all existing items by 1. In this case, all changed items are moved to DynamoDB Streams and Lambda can process them to a new schema, so you can transfer data to the new table without much concern.

Conclusion

With DynamoDB, scaling is nearly infinite, and dependencies with various downstream services are eliminated. For Channel Corporation, the combination of DynamoDB and Kinesis Data Streams offers a robust solution for application deployment. This pairing enables quick recovery from a specific point in time if issues arise during deployment. As a result, developers can confidently perform deployments at any time, knowing they have a reliable fallback mechanism in place. Finally, we can implement an online schema change strategy leveraging one of the streaming options for DynamoDB to remove legacy tables and efficiently manage tables.

Consider implementing a similar solution for your own use case, and leave your questions in the comments section.


About the Authors

TaeHun (Clsan) Yoon, a seasoned professional in the realm of computer engineering, spearheads innovative solutions as the CTO (Chief Technology Officer) at Channel Corp. With a keen focus on enhancing the chatting experience, TaeHun and his team are dedicated to resolving diverse challenges encountered by customers.

Changsoon (CS) Kim is a DevOps Engineer at Channel Corp. He is interested in efficiently resolving issues between development and operations.

Sungbae Park is Account Manager in the AWS Startup Team and a regular member of AWS SaaS TFC (Technical Field Communities). He is currently focusing on helping B2B software startups grow and succeed with AWS. Sungbae previously worked as Partner Development Manager making mutual growth with MSP, SI, and ISV partners.

Hyuk Lee is a Sr. DynamoDB Specialist Solutions Architect based in South Korea. He loves helping customers modernize their architecture using the capabilities of Amazon DynamoDB.