AWS Database Blog
Use Spring Cloud to capture Amazon DynamoDB changes through Amazon Kinesis Data Streams
Many organizations are interested in using Amazon DynamoDB in their applications to benefit from its scalability, reliability, and powerful set of features. A particularly interesting feature is the capability of DynamoDB to stream table-level changes in near real time, either through Amazon DynamoDB Streams or Amazon Kinesis Data Streams (for information about the differences between the services, see Streaming options for change data capture). However, getting the most out of DynamoDB and using these streaming capabilities requires knowledge of the AWS SDK, which can be a challenge for some organizations.
Thankfully, the open source community has been hard at work adding the right level of abstraction to the Java Spring framework, helping developers use familiar libraries to interact with many AWS services.
In this post, we demonstrate how you can use Spring Cloud to interact with DynamoDB and capture table-level changes using Kinesis Data Streams through familiar Spring constructs (Kinesis Data Streams was selected because the latest version of Spring Cloud Stream does not natively support DynamoDB Streams). We run you through a basic implementation and configuration that will help you get started.
The Spring framework and its components
Java is undisputedly the most popular language in enterprise application development. According to research firm Gartner, application leaders ranked Java as the number one language for custom application development in large enterprises in 2022. The main drivers for Java’s enterprise dominance are its stability, scalability, and strong community support. In a recent survey of tech decision-makers at enterprises, 57% said over 75% of their business-critical apps rely on Java. Java skills also transfer across tech generations, protecting enterprise Java investments. These factors cement Java’s position as the number one backend language for enterprise application development.
The Spring Framework is an open source Java platform that provides infrastructure support for developing robust Java applications. Spring handles common tasks like transaction management and dependency injection to simplify application development. Spring Boot builds on top of the Spring Framework to further ease building production-ready apps with opinionated auto-configuration and an embedded web server. Spring Cloud offers additional libraries for building distributed systems with patterns like service discovery, circuit breakers, and gateways. It handles complexities around distributed transactions, security, and deployment orchestration. Spring Cloud Stream is a framework for building event-driven microservices connected with scalable messaging systems. It provides an abstraction over platforms like Kafka to facilitate asynchronous communication of data streams between Spring Boot microservices, enabling robust event-driven architectures.
Together, these Spring projects help developers rapidly build, connect and scale Java applications from monoliths to microservices.
Solution overview
To illustrate this use case, we use a simplified example from the AWS prescriptive guidance: the transactional outbox pattern, which resolves the dual write operations issue that occurs in distributed systems when a single operation involves both a database write operation and a message or event notification. A dual write operation occurs when an application writes to two different systems; for example, when a microservice needs to persist data in the database and send a message to notify other systems. A failure in one of these operations might result in inconsistent data.
The solution, as shown in the following diagram, uses Amazon Elastic Container Service (Amazon ECS), DynamoDB, and Kinesis Data Streams.
Prerequisites
To follow along with this post, you should have the following:
- Familiarity with modern Java and Spring
- Familiarity with the AWS Cloud Development Kit (AWS CDK) and the command line
- Familiarity with IntelliJ IDEA or VS Code
- Docker installed and daemon running
You can find the AWS CDK code to deploy the required infrastructure solution, as well as the sample application Java code, in the GitHub repo. However, we encourage you to go through the code step by step in this post, because we explain important concepts throughout.
Code walkthrough
We use Spring Initializr to help us get started. Complete the following steps:
- Create a new project using the parameters below (the following screenshot illustrates using IntelliJ):
  - Name: flightApp
  - Location: <FOLDER_OF_YOUR_CHOICE>
  - Language: Java
  - Type: Gradle – Groovy
  - Group: com.amazonaws.samples
  - Artifact: springCloudBlogPost
  - JDK: <JDK_17_INSTALLED_ON_YOUR_MACHINE>
  - Java: 17
  - Packaging: Jar
- Choose the latest stable version of Spring Boot (3.2.2 as of this post) and choose Create.
You should have the following folder structure.
- Open a terminal in the root folder in which you created your flightApp application and run the following commands:
You should have the following folder structure in your root folder:
Let’s develop a basic, working version of the application.
- Add the missing dependencies to your build.gradle file:
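The dependencies needed here are implied by the classes used later in the post (DynamoDbTemplate from Spring Cloud AWS, the Kinesis binder for Spring Cloud Stream, and IterableUtils from Apache Commons Collections). The following is a hedged sketch of what the additions might look like; the artifact versions are assumptions, so check for the latest releases compatible with your Spring Boot version.

```groovy
dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-web'

    // Spring Cloud AWS DynamoDB support (provides DynamoDbTemplate)
    implementation 'io.awspring.cloud:spring-cloud-aws-starter-dynamodb:3.1.0'

    // Spring Cloud Stream binder for Kinesis Data Streams
    implementation 'org.springframework.cloud:spring-cloud-stream-binder-kinesis:4.0.2'

    // Used later for IterableUtils.toList(...)
    implementation 'org.apache.commons:commons-collections4:4.4'

    testImplementation 'org.springframework.boot:spring-boot-starter-test'
}
```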
- Create a new model package under your com.amazonaws.samples.springcloudblogpost package and add a new Flight.java class to it:
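Here is a minimal sketch of what Flight.java could look like. Only the String id partition key is confirmed by the post (a UUID is printed later as a result of the POST request); the origin and destination attributes are illustrative assumptions.

```java
package com.amazonaws.samples.springcloudblogpost.model;

import software.amazon.awssdk.enhanced.dynamodb.mapper.annotations.DynamoDbBean;
import software.amazon.awssdk.enhanced.dynamodb.mapper.annotations.DynamoDbPartitionKey;

// Mapped by default to a DynamoDB table named after the class (here, flight)
@DynamoDbBean
public class Flight {

    private String id;
    private String origin;       // illustrative attribute, not specified in the post
    private String destination;  // illustrative attribute, not specified in the post

    @DynamoDbPartitionKey
    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getOrigin() {
        return origin;
    }

    public void setOrigin(String origin) {
        this.origin = origin;
    }

    public String getDestination() {
        return destination;
    }

    public void setDestination(String destination) {
        this.destination = destination;
    }
}
```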
The @DynamoDbBean class annotation tells Spring Cloud that this entity needs to be mapped to a DynamoDB table (by default, the name of the table will match the name of the class). The @DynamoDbPartitionKey annotation on the getter tells Spring Cloud that this will be the table partition key. This is the only required attribute for a basic implementation. So far, you have been using basic features of the AWS SDK.
- Next, create a new controller package in the same root package and add a new FlightController.java class to it, where you will use Spring Cloud:
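The following is a sketch of what FlightController.java might look like. The GET /flights and POST /flights endpoints and the DynamoDbTemplate calls are confirmed later in the post; generating the id with a UUID and the exact request/response shapes are assumptions based on the described behavior.

```java
package com.amazonaws.samples.springcloudblogpost.controller;

import com.amazonaws.samples.springcloudblogpost.model.Flight;
import io.awspring.cloud.dynamodb.DynamoDbTemplate;
import org.apache.commons.collections4.IterableUtils;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.List;
import java.util.UUID;

@RestController
@RequestMapping("/flights")
public class FlightController {

    private final DynamoDbTemplate dynamoDbTemplate;

    public FlightController(DynamoDbTemplate dynamoDbTemplate) {
        this.dynamoDbTemplate = dynamoDbTemplate;
    }

    // POST /flights: book a flight and persist it in DynamoDB
    @PostMapping
    public Flight bookFlight(@RequestBody Flight flight) {
        flight.setId(UUID.randomUUID().toString());
        return dynamoDbTemplate.save(flight);
    }

    // GET /flights: return all bookings with a broad scan (simple, not optimal)
    @GetMapping
    public List<Flight> getFlights() {
        return IterableUtils.toList(dynamoDbTemplate.scanAll(Flight.class).items());
    }
}
```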
The DynamoDbTemplate is a Spring Cloud construct (a default implementation of DynamoDbOperations) that is built on top of the AWS SDK-provided DynamoDbEnhancedClient. The goal of the DynamoDbTemplate is to provide you with a programming model similar to JPA, where a class is turned into an entity through annotations and simple methods are provided to perform typical CRUD operations.

In this use case, dynamoDbTemplate.save(flight) is sufficient to save a flight item to DynamoDB, and IterableUtils.toList(dynamoDbTemplate.scanAll(Flight.class).items()) is sufficient to return a list of flights from DynamoDB (using a simple but not particularly optimal broad scan operation).
- Add a simple Dockerfile to the root of the project (it will be used to containerize your application before deploying it to Amazon ECS):
- Deploy the basic infrastructure before getting started (run the following commands in the infra folder):
If your public IP changes and you lose access to the application, you can update it directly on the AWS Management Console by navigating to AWS WAF and AWS Shield, and then by choosing the IP sets in the navigation pane. Make sure you choose the AWS Region you deployed your stack in, and then update the IP set with your new public IP.
After a few minutes, the deployment should be done.
- Deploy the first version of your application using the following AWS CDK command:
After a few minutes, the deployment will complete and you will be presented with the Application Load Balancer (ALB) URL that you can use to make API calls.
- You can run a quick test (provide the ALB URL that your cdk deploy command output):
The GET endpoint is working and is returning an empty collection, which is expected because you haven’t booked any flights.
At this point, you have a functioning Spring Boot application that exposes two endpoints (GET /flights and POST /flights) that let customers book flights and retrieve their bookings (but not change or cancel them). Flights are saved in a DynamoDB table named flight. Because the table has been configured to send all updates to a Kinesis data stream that you defined in the AWS CDK template, the next step is to capture and process these changes. For that, you use Spring Cloud Stream.
Before we run you through the code, let's take a step back to understand a few important concepts. Spring Cloud Stream uses binders (messaging system-specific libraries), binding channels, and binding functions, as illustrated in the following figure.
In your case:
- The external messaging system is Kinesis Data Streams
- The external destination binder is the Kinesis binder (provided by the community)
- The consumer binding function is what you define in your code
- The input binding channel is what you define in your configuration
Let’s create our first function, and then see how to configure it. First, you print the raw message that DynamoDB sends to the Kinesis data stream.
- Create a new service package under the root package and add the following ChangeCaptureService.java class:
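A minimal sketch of ChangeCaptureService.java is shown below. The function name printDynamoDBmessage matches the one referenced later in the post; registering the Consumer as a bean is what allows Spring Cloud Stream to bind it to an input channel. Using @Service (rather than, say, @Configuration) is an assumption.

```java
package com.amazonaws.samples.springcloudblogpost.service;

import java.util.function.Consumer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Service;

@Service
public class ChangeCaptureService {

    private static final Logger logger = LoggerFactory.getLogger(ChangeCaptureService.class);

    // Consumer function bound by Spring Cloud Stream to the Kinesis input channel;
    // it simply prints the raw change record that DynamoDB published to the stream.
    @Bean
    public Consumer<String> printDynamoDBmessage() {
        return message -> logger.info("Raw DynamoDB change record: {}", message);
    }
}
```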
- Add the following lines to your application.properties file (in the resources folder):
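Based on the explanation that follows in the post, a working configuration might look like the sketch below. The stream name and consumer group are placeholders (the destination must match the Kinesis data stream created by your CDK template), and the property names should be verified against the Kinesis binder version you use.

```properties
# Register the consumer function declared in ChangeCaptureService
spring.cloud.function.definition=printDynamoDBmessage

# Use the Kinesis Client Library / Kinesis Producer Library for consumption and production
spring.cloud.stream.kinesis.binder.kpl-kcl-enabled=true

# Input channel for printDynamoDBmessage, following the <functionName>-in-<index> convention.
# Replace flight-data-stream with the stream name defined in your CDK template.
spring.cloud.stream.bindings.printDynamoDBmessage-in-0.destination=flight-data-stream
spring.cloud.stream.bindings.printDynamoDBmessage-in-0.content-type=application/json
spring.cloud.stream.bindings.printDynamoDBmessage-in-0.group=flight-raw-group
```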
- Deploy a new version of your application:
- Book a new flight (provide the ALB URL that your cdk deploy command output):
You should get the following response:
If you look at your Amazon ECS service logs, you should see the following message (you can search for the UUID that was printed as a result of your POST request):
Great! You inserted a new flight into the DynamoDB table, and DynamoDB sent all the relevant information to Kinesis Data Streams, which was picked up by your printDynamoDBmessage function.
Let’s unpack what happened and how:
- You added the Kinesis binder to your dependencies, and because it was the only one, there was no need to specify it anywhere in the configuration file. The application automatically bound itself to the Kinesis data stream you defined.
- You enabled the usage of the Kinesis Client Library and Kinesis Producer Library for all message consumption and production. Unless you require Spring Cloud Stream's default mechanisms, we recommend always having this parameter set to true, because the Kinesis Client Library has capabilities to simplify shard discovery and therefore avoid potential issues during re-sharding.
- You defined a consumer function that Spring Cloud will automatically bind to the input channel you defined in the configuration.
- Finally, and the most important part of the process, you defined the relevant channels with the proper naming convention (<functionName>-in-<index>), as shown in the application.properties sketch earlier.
- In your example, that binding configuration told Spring Cloud Stream that the function printDynamoDBmessage will subscribe to the Kinesis data stream you defined in the infrastructure as code template and that it can expect to receive messages in JSON format.
- Spring Cloud Stream uses consumer groups, a concept borrowed from Kafka. To prevent both tasks from picking up the same message, you defined them as belonging to the same consumer group.
And that’s it! With just a few lines of code and configuration, you have successfully created a resilient and scalable application that can subscribe to a Kinesis data stream and ingest its messages.
Admittedly, printing out the raw message is not very useful. What if you want to use that flight event and propagate it further into your application? Maybe notify a payment service that a new booking has been made? Let’s do that.
- Start by creating a FlightEvent.java record, along with a FlightStatus.java enum, in the model package:
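A minimal sketch of the two files follows; the record components and enum values are illustrative assumptions, since the post does not list the actual fields.

```java
package com.amazonaws.samples.springcloudblogpost.model;

// Event propagated inside the application whenever a flight record changes.
// The exact components are illustrative; adapt them to the attributes you store.
public record FlightEvent(String flightId, FlightStatus status) {
}
```

```java
package com.amazonaws.samples.springcloudblogpost.model;

// Illustrative statuses; the post does not specify the actual values.
public enum FlightStatus {
    BOOKED,
    CANCELLED
}
```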
But how do you parse that DynamoDB message into a FlightEvent type? You could do that in the consumer function, but Spring Cloud Stream offers a way to define your own type in each channel.
- Start by creating a new configuration package under the root package and adding a new DynamoDBMessageConverter.java class to it:
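Below is a sketch of what DynamoDBMessageConverter.java might look like, following the four points explained after it. The field-extraction logic is an assumption: DynamoDB publishes change records to Kinesis in DynamoDB JSON format, so the sketch reads the id from dynamodb.NewImage; the status mapping is purely illustrative.

```java
package com.amazonaws.samples.springcloudblogpost.configuration;

import java.nio.charset.StandardCharsets;

import com.amazonaws.samples.springcloudblogpost.model.FlightEvent;
import com.amazonaws.samples.springcloudblogpost.model.FlightStatus;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.messaging.Message;
import org.springframework.messaging.converter.AbstractMessageConverter;
import org.springframework.util.MimeType;

public class DynamoDBMessageConverter extends AbstractMessageConverter {

    private final ObjectMapper objectMapper = new ObjectMapper();

    public DynamoDBMessageConverter() {
        // The custom MIME type referenced as content-type in application.properties
        super(new MimeType("application", "ddb"));
    }

    @Override
    protected boolean supports(Class<?> clazz) {
        // This converter only produces FlightEvent instances
        return FlightEvent.class.equals(clazz);
    }

    @Override
    protected Object convertFromInternal(Message<?> message, Class<?> targetClass, Object conversionHint) {
        try {
            Object raw = message.getPayload();
            byte[] payload = raw instanceof byte[] bytes
                    ? bytes
                    : raw.toString().getBytes(StandardCharsets.UTF_8);

            // The new state of the item sits under dynamodb.NewImage in the change record
            JsonNode newImage = objectMapper.readTree(payload).path("dynamodb").path("NewImage");
            String flightId = newImage.path("id").path("S").asText();

            // Mapping every record to BOOKED is illustrative; derive the status from the
            // record's eventName or item attributes in a real implementation
            return new FlightEvent(flightId, FlightStatus.BOOKED);
        } catch (Exception e) {
            throw new IllegalStateException("Unable to convert DynamoDB change record", e);
        }
    }
}
```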
You tell Spring Cloud this can be used as a custom parser by:
- Inheriting from AbstractMessageConverter
- Defining a new MimeType (in our case, application/ddb)
- Overriding the supports method with the class type that you want the message to be parsed into
- Overriding the convertFromInternal method with your parsing logic
- Create a CustomConfiguration.java class in the configuration package that will tell Spring Boot to create a new instance of your converter at startup (using the @Configuration annotation):
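A minimal sketch of CustomConfiguration.java; exposing the converter as a MessageConverter bean is what makes it available to Spring Cloud Stream. The bean method name is an assumption.

```java
package com.amazonaws.samples.springcloudblogpost.configuration;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.converter.MessageConverter;

@Configuration
public class CustomConfiguration {

    // Registers the custom converter so it can be applied to any binding
    // whose content type is application/ddb
    @Bean
    public MessageConverter dynamoDBMessageConverter() {
        return new DynamoDBMessageConverter();
    }
}
```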
You’re almost there! Now you can tell Spring Cloud to use that custom message converter when processing messages coming from your Kinesis data stream.
- First, create a new Consumer function in the ChangeCaptureService class:
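A sketch of the addition to the ChangeCaptureService class shown earlier (plus an import for FlightEvent). The bean name consumeFlightEvent is an assumption; whatever name you choose must match the binding name used in application.properties.

```java
// Added inside ChangeCaptureService: receives the FlightEvent produced by DynamoDBMessageConverter
@Bean
public Consumer<FlightEvent> consumeFlightEvent() {
    return event -> logger.info("Received flight event: {}", event);
}
```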
- Add the relevant lines in your application.properties file:
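Assuming the binding and bean names used in the earlier sketches, the additions might look like the following; the application/ddb content type is what routes messages through the custom converter, and the stream and group names remain placeholders.

```properties
# Update the existing definition so both consumer functions are registered
spring.cloud.function.definition=printDynamoDBmessage;consumeFlightEvent

# Second input channel on the same Kinesis data stream, using the custom MIME type
# so that DynamoDBMessageConverter is applied
spring.cloud.stream.bindings.consumeFlightEvent-in-0.destination=flight-data-stream
spring.cloud.stream.bindings.consumeFlightEvent-in-0.content-type=application/ddb
spring.cloud.stream.bindings.consumeFlightEvent-in-0.group=flight-event-group
```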
- Deploy this new version:
- Let’s book another flight. If things work as intended, you should see two log lines: one with the raw DynamoDB message and one with a
String
representation of theFlightEvent
record:
You should get the following response:
Open the Amazon ECS logs, and you should find the following lines:
And you’re done! To sum up, you have accomplished the following throughout this post:
- Built a simple Spring Boot backend API serving two endpoints
- Set up DynamoDB as your data store
- Captured any changes made to the table in a customizable way
- All while writing very minimal boilerplate code
Clean up
To clean up the resources that were created as part of this post, run the following commands (in the infra folder):
If you followed the instructions and booked flights, data was inserted into the DynamoDB tables, so you have to manually delete them:
- Open the DynamoDB console in the Region in which you deployed your infrastructure.
- Choose Tables in the navigation pane.
- Select all relevant tables.
- Choose Delete and follow the instructions.
Similarly, you have to manually delete the Kinesis data stream:
- Open the Kinesis Data Streams console in the Region in which you deployed your infrastructure.
- Choose Data Streams in the navigation pane.
- Select your Kinesis data stream, and on the Action menu, choose Delete.
Conclusion
In this post, we provided a demonstration of what you can accomplish with DynamoDB, Kinesis Data Streams, Spring Boot, and Spring Cloud. To learn more about DynamoDB, see the Amazon DynamoDB Developer Guide, and to go further with Spring, we encourage you to explore the official Spring documentation, specifically Spring Cloud AWS and Spring Cloud Stream. Try out this implementation and let us know your thoughts in the comments.
About the Author
Camille Birbes is a Senior Solutions Architect with AWS and is based in Hong Kong. He works with major financial institutions to design and build secure, scalable, and highly available solutions in the cloud. Outside of work, Camille enjoys any form of gaming, from board games to the latest video game. You can find Camille on LinkedIn.