AWS Database Blog

Use Spring Cloud to capture Amazon DynamoDB changes through Amazon Kinesis Data Streams

Many organizations are interested in using Amazon DynamoDB in their applications to benefit from its scale, reliability, scalability, and powerful set of features. A particularly interesting feature is the capability of DynamoDB to stream table-level changes in near real-time, either through Amazon DynamoDB Streams or Amazon Kinesis Data Streams (for information about the differences between services, see Streaming options for change data capture). However, getting the most out of DynamoDB and using these streaming capabilities require knowledge of the AWS SDK, which can be a challenge for some organizations.

Thankfully for most developers, the open source community has been hard at work adding the right level of abstraction to the Java Spring framework, helping them use familiar libraries to interact with many AWS services.

In this post, we demonstrate how you can use Spring Cloud to interact with DynamoDB and capture table-level changes using Kinesis Data Streams through familiar Spring constructs (Kinesis Data Streams was selected because the latest version of Spring Cloud Streams does not support native DynamoDB Streams). We run you through a basic implementation and configuration that will help you get started.

The Spring framework and its components

Java is undisputedly the most popular language in enterprise application development. According to research firm Gartner, application leaders ranked Java as the number one language for custom application development in large enterprises in 2022. The main drivers for Java’s enterprise dominance are its stability, scalability, and strong community support. In a recent survey of tech decision-makers at enterprises, 57% said over 75% of their business-critical apps rely on Java. Java skills also transfer across tech generations, protecting enterprise Java investments. These factors cement Java’s position as the number one backend language for enterprise application development.

The Spring Framework is an open source Java platform that provides infrastructure support for developing robust Java applications. Spring handles common tasks like transaction management and dependency injection to simplify application development. Spring Boot builds on top of the Spring Framework to further ease building production-ready apps with opinionated auto-configuration and an embedded web server. Spring Cloud offers additional libraries for building distributed systems with patterns like service discovery, circuit breakers, and gateways. It handles complexities around distributed transactions, security, and deployment orchestration. Spring Cloud Stream is a framework for building event-driven microservices connected with scalable messaging systems. It provides an abstraction over platforms like Kafka to facilitate asynchronous communication of data streams between Spring Boot microservices, enabling robust event-driven architectures.

Together, these Spring projects help developers rapidly build, connect and scale Java applications from monoliths to microservices.

Solution overview

To illustrate this use case, we use a simplified example from the AWS prescriptive guidance: the transactional outbox pattern, which resolves the dual write operations issue that occurs in distributed systems when a single operation involves both a database write operation and a message or event notification. A dual write operation occurs when an application writes to two different systems; for example, when a microservice needs to persist data in the database and send a message to notify other systems. A failure in one of these operations might result in inconsistent data.

The solution as shown in the following diagram uses Amazon Elastic Container Service (Amazon ECS), DynamoDB, and Kinesis Data Streams.

Architecture of the solution

Prerequisites

To follow along with this post, you should have the following:

  • Familiarity with modern Java and Spring
  • Familiarity with the AWS Cloud Development Kit (AWS CDK) and the command line
  • Familiarity with IntelliJ or VSCode
  • Docker installed and daemon running

You can find the AWS CDK code to deploy the required infrastructure solution, as well as the sample application Java code, in the GitHub repo. However, we encourage you to go through the code step by step in this post, because we explain important concepts throughout.

Code walkthrough

We use Spring Initializr to help us get started. Complete the following steps:

  1. Create a new project using the parameters below. (The following screenshot illustrates using IntelliJ.)
    1. Name: flightApp
    2. Location: <FOLDER_OF_YOUR_CHOICE>
    3. Language: Java
    4. Type: Gradle – Groovy
    5. Group: com.amazonaws.samples
    6. Artifact: springCloudBlogPost
    7. JDK: <JDK_17_INSTALLED_ON_YOUR_MACHINE>
    8. Java: 17
    9. Packaging: Jar

Project creation

  1. Choose the latest stable version of Spring Boot (3.2.2 as of this post) and choose Create.

You should have the following folder structure.

File tree structure

  1. Open a terminal in the root folder in which you created your flightApp application and run the following commands:
git clone https://github.com/aws-samples/dynamodb-kinesis-spring-blog.git
mv dynamodb-kinesis-spring-blog/infra/ .

You should have the following folder structure in your root folder:

infra/
flightApp/

Let’s develop a basic, working version of the application.

  1. Add the missing dependencies to your build.gradle file:
dependencies {
    implementation('org.springframework.boot:spring-boot-starter-actuator')
    implementation('org.springframework.boot:spring-boot-starter-validation')
    implementation('org.springframework.boot:spring-boot-starter-web')
    implementation('org.springframework.cloud:spring-cloud-stream:4.1.0')
    implementation('org.springframework.cloud:spring-cloud-stream-binder-kinesis:4.0.2')
    implementation('org.apache.commons:commons-collections4:4.4')
    implementation(platform("software.amazon.awssdk:bom:2.22.13"))
    implementation("software.amazon.awssdk:dynamodb-enhanced")
    implementation platform("io.awspring.cloud:spring-cloud-aws-dependencies:3.1.0")
    implementation("io.awspring.cloud:spring-cloud-aws-starter-dynamodb")
    compileOnly('org.projectlombok:lombok')
    annotationProcessor('org.projectlombok:lombok')
    testImplementation('org.springframework.boot:spring-boot-starter-test')
    testImplementation("org.springframework.cloud:spring-cloud-stream-test-binder:4.1.0")
}
  1. Create a new model package under your com.amazonaws.samples.springcloudblogpost package and add a new Flight.java class to it:
package com.amazonaws.samples.springcloudblogpost.model;

import lombok.Getter;
import lombok.Setter;
import software.amazon.awssdk.enhanced.dynamodb.mapper.annotations.DynamoDbBean;
import software.amazon.awssdk.enhanced.dynamodb.mapper.annotations.DynamoDbPartitionKey;

import java.time.LocalDateTime;
import java.util.UUID;

@DynamoDbBean
@Getter
@Setter
public class Flight {

    private UUID id;
    private String departureAirport;
    private String arrivalAirport;
    private LocalDateTime departureDateTime;
    private LocalDateTime arrivalDateTime;
    
    public Flight() {
        this.id = UUID.randomUUID();
    }

    @DynamoDbPartitionKey
    public UUID getId() {
        return id;
    }

}

The @DynamoDbBean class annotation tells Spring Cloud that this entity needs to be mapped to a DynamoDB table (by default, the name of the table will match the name of the class). The @DynamoDbPartitionKey annotation on the getter tells Spring Cloud that this will be the table partition key. This is the only required attribute for a basic implementation. So far, you have been using basic features of the AWS SDK.

  1. Next, create a new controller package in the same root package and add a new FlightController.java class to it, where you will use Spring Cloud:
package com.amazonaws.samples.springcloudblogpost.controller;

import com.amazonaws.samples.springcloudblogpost.model.Flight;
import io.awspring.cloud.dynamodb.DynamoDbTemplate;
import jakarta.validation.Valid;
import lombok.AllArgsConstructor;
import org.apache.commons.collections4.IterableUtils;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.bind.annotation.*;

import java.util.List;

@RestController
@RequestMapping("/api")
@AllArgsConstructor
public class FlightController {

    private final DynamoDbTemplate dynamoDbTemplate;

    @GetMapping("/flights")
    public List<Flight> getAllFlights() {
        return IterableUtils.toList(dynamoDbTemplate.scanAll(Flight.class).items());
    }

    @PostMapping("/flights")
    @Transactional
    public Flight createFlight(@Valid @RequestBody Flight flight) {
        return dynamoDbTemplate.save(flight);
    }
}

The DynamoDbTemplate is a Spring Cloud construct (a default implementation of DynamoDBOperations) that is built on top of the AWS SDK-provided DynamoDbEnhancedClient. The goal of the DynamoDbTemplate is to provide you with a programming model similar to JPA where a class is turned into an entity through annotations and which provides simple methods to perform typical CRUD operations.

In this use case, dynamoDbTemplate.save(flight) is sufficient to save a flight item to DynamoDB, and IterableUtils.toList(dynamoDbTemplate.scanAll(Flight.class).items()) is sufficient to return a list of flights from DynamoDB (using a simple but not particularly optimal broad scan operation).

  1. Add a simple Dockerfile to the root of the project (it will be used to containerize your application before deploying it to Amazon ECS):
FROM --platform=linux/arm64 amazoncorretto:17 AS build
WORKDIR /workspace/app

COPY . /workspace/app
RUN --mount=type=cache,target=/root/.gradle ./gradlew clean build -x test
RUN mkdir -p build/dependency && (cd build/dependency; jar -xf ../libs/*-SNAPSHOT.jar)

FROM --platform=linux/arm64 amazoncorretto:17
VOLUME /tmp
ARG DEPENDENCY=/workspace/app/build/dependency
COPY --from=build ${DEPENDENCY}/BOOT-INF/lib /app/lib
COPY --from=build ${DEPENDENCY}/META-INF /app/META-INF
COPY --from=build ${DEPENDENCY}/BOOT-INF/classes /app
ENTRYPOINT ["java","-cp","app:app/lib/*","com.amazonaws.samples.springcloudblogpost.FlightAppApplication"]
  1. Deploy the basic infrastructure before getting started (run the following commands in the infra folder):
npm install
cdk bootstrap
cdk deploy BaseStack –parameters myPublicIP=<REPLACE_WITH_YOUR_PUBLIC_IP>/32

If your public IP changes and you lose access to the application, you can update it directly on the AWS Management Console by navigating to AWS WAF and AWS Shield, and then by choosing the IP sets in the navigation pane. Make sure you choose the AWS Region you deployed your stack in, and then update the IP set with your new public IP.

After a few minutes, the deployment should be done.

  1. Deploy the first version of your application using the following AWS CDK command:
cdk deploy AppStack

After a few minutes, the deployment will complete and you will be presented with the Application Load Balancer (ALB) URL that you can use to make API calls.

  1. You can run a quick test (provide the ALB URL that your cdk deploy commands output):
curl http://<ALB_URL>/api/flights
> []

The GET endpoint is working and is returning an empty collection, which is expected because you haven’t booked any flights.

At this point, you have a functioning Spring Boot application that exposes two endpoints (GET /flights and POST /flights) that let customers book flights and retrieve their bookings (but not change or cancel them). Flights are saved in a DynamoDB table named flight. Because the table has been configured to send all updates to a Kinesis data stream that you defined in the AWS CDK template, the next step is to capture and process these changes. For that, you use Spring Cloud Streams.

Before we run you through the code, let’s take a step back to understand a few important concepts. Spring Cloud Streams use binders, which are messaging systems-specific libraries, binding channels, and binding functions, as illustrated in the following figure.

Spring Cloud Streams architecture

In your case:

  • The external messaging system is Kinesis Data Streams
  • The external destination binder is the Kinesis binder (provided by the community)
  • The consumer binding function is what you define in your code
  • The input binding channel is what you define in your configuration

Let’s create our first function, and then see how to configure it. First, you print the raw message that DynamoDB sends to the Kinesis data stream.

  1. Create a new service package under the root package and add the following ChangeCaptureService.java class:
package com.amazonaws.samples.springcloudblogpost.service;

import com.amazonaws.samples.springcloudblogpost.FlightAppApplication;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Service;

import java.util.function.Consumer;

@Service
public class ChangeCaptureService {

    static final Logger logger = LoggerFactory.getLogger(
            FlightAppApplication.class
    );

    @Bean
    public Consumer<String> printDynamoDBmessage() {
        return logger::info;
    }
}
  1. Add the following lines to your application.properties file (in the resources folder):
spring.cloud.stream.kinesis.binder.kplKclEnabled=true
spring.cloud.function.definition=printDynamoDBmessage;
spring.cloud.stream.bindings.printDynamoDBmessage-in-0.group=1
spring.cloud.stream.bindings.printDynamoDBmessage-in-0.destination=${kinesisstreamname}
spring.cloud.stream.bindings.printDynamoDBmessage-in-0.content-type=application/json
  1. Deploy a new version of your application:
cdk deploy AppStack
  1. Book a new flight (provide the ALB URL that your cdk deploy commands output):
curl -d '{"departureAirport":"Paris","arrivalAirport":"London","departureDateTime":"2024-04-01T10:15:30","arrivalDateTime":"2024-04-01T10:15:30"}' -H 'Content-Type: application/json' http://<ALB_URL>/api/flights

You should get the following response:

{"id":"<AUTO_GENERATED_UUID>","departureAirport":"Paris",
"arrivalAirport":"London","departureDateTime":"2024-04-01T10:15:30",
"arrivalDateTime":"2024-04-01T10:15:30"}

If you look at your Amazon ECS service logs, you should the see the following message (you can search for the UUID that was printed as a result of your POST request):

{"awsRegion":"<AWS_REGION>",
 "eventID":"<UUID>",
 "eventName":"INSERT",
 "userIdentity":null,
 "recordFormat":"application/json",
 "tableName":"flight",
 "dynamodb":{
   "ApproximateCreationDateTime":<TIMESTAMP>,
   "Keys":{
     "id":{
       "S":"<UUID>"
       }
     },
     "NewImage":{
       "arrivalAirport":{"S":"London"},
       "arrivalDateTime":{"S":"2024-04-01T10:15:30"},
       "id":{"S":"<UUID>"},
       "departureAirport":{"S":"Paris"},
       "departureDateTime":{"S":"2024-04-01T10:15:30"}
     },
     "SizeBytes":187
    },
  "eventSource":"aws:dynamodb"
}

Great! You inserted a new flight into the DynamoDB table, and DynamoDB sent all the relevant information to Kinesis Data Streams, which was picked up by your printDynamoDBmessage function.

Let’s unpack what happened and how:

  • You added the Kinesis binder to your dependencies, and because it was the only one, there was no need to specify it anywhere in the configuration file. The application automatically bound itself to the Kinesis data stream you defined.
  • You enabled the usage of the Kinesis Client Library and Kinesis Producer Library for all message consumption and production. Unless you require the use of Spring Cloud streaming default mechanisms, we recommend always having this parameter set to true because the Kinesis Client Library has capabilities to simplify shard discovery and therefore avoid potential issues during re-sharding.
  • You defined a consumer function that Spring Cloud will automatically bind to the input channel you defined in the configuration.
  • Finally, and the most important part of the process, you defined the relevant channels with the proper convention:
input: <functionName> + -in- + <function parameter index>
output: <functionName> + -out- + <function parameter index>
  • In your example, the following configuration parameters told Spring Cloud Stream that the function printDynamoDBmessage will subscribe to the Kinesis data stream you defined in the infrastructure as code template and that it can expect to receive a message in JSON format:
spring.cloud.stream.bindings.printDynamoDBmessage-in-0.destination=${kinesisstreamname}
spring.cloud.stream.bindings.printDynamoDBmessage-in-0.content-type=application/json
  • Spring Cloud Stream uses consumer groups, a concept borrowed from Kafka. To prevent both our tasks from picking up the same message, we defined them as belonging to the same consumer group:
spring.cloud.stream.bindings.printDynamoDBmessage-in-0.group=1

And that’s it! With just a few lines of code and configuration, you have successfully created a resilient and scalable application that can subscribe to a Kinesis data stream and ingest its messages.

Admittedly, printing out the raw message is not very useful. What if you want to use that flight event and propagate it further into your application? Maybe notify a payment service that a new booking has been made? Let’s do that.

  1. Start by creating a FlightEvent.java record, along with a FlightStatus.java enum in the model package:
package com.amazonaws.samples.springcloudblogpost.model;

import java.time.LocalDateTime;

public record FlightEvent(
    FlightStatus flightStatus,
    String departureAirport, 
    String arrivalAirport,
    LocalDateTime departureDateTime,
    LocalDateTime arrivalDateTime
) {}

package com.amazonaws.samples.springcloudblogpost.model;

public enum FlightStatus {
    NEW,
    MODIFIED,
    CANCELLED
}

But how do you parse that DynamoDB message into a FlightEvent type? You could do that in the consumer function, but Spring Cloud Stream offers a way to define your own type in each channel.

  1. Start by creating a new configuration package under the root package and add a new DynamoDBMessageConverter.java class to it:
package com.amazonaws.samples.springcloudblogpost.configuration;

import com.amazonaws.samples.springcloudblogpost.FlightAppApplication;
import com.amazonaws.samples.springcloudblogpost.model.FlightEvent;
import com.amazonaws.samples.springcloudblogpost.model.FlightStatus;
import com.fasterxml.jackson.core.TreeNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.converter.AbstractMessageConverter;
import org.springframework.stereotype.Component;
import org.springframework.util.MimeType;

import java.io.IOException;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

@Component
public class DynamoDBMessageConverter extends AbstractMessageConverter {

    static final Logger logger = LoggerFactory.getLogger(
            FlightAppApplication.class);
    private final ObjectMapper objectMapper;

    public DynamoDBMessageConverter(ObjectMapper objectMapper) {
        super(new MimeType("application", "ddb"));
        this.objectMapper = objectMapper;
    }

    @Override
    protected boolean supports(Class<?> cls) {
        return cls.equals(FlightEvent.class);
    }

    @Override
    protected Object convertFromInternal(Message<?> message,
                                         @NotNull Class<?> targetClass, @Nullable Object conversionHint) {
        try {
            String flightEventStatus = objectMapper.readTree(
                    (byte[]) message.getPayload()).get("eventName").toString();
            FlightStatus flightStatus = FlightStatus.NEW;
            switch (flightEventStatus) {
                case "MODIFY": flightStatus = FlightStatus.MODIFIED;
                case "REMOVE": flightStatus = FlightStatus.CANCELLED;
            }
            TreeNode flightJson = objectMapper.readTree(
                    (byte[]) message.getPayload()).get("dynamodb").get("NewImage");
            String departureAirport = flightJson.get("departureAirport").get("S")
                    .toString();
            String arrivalAirport = flightJson.get("arrivalAirport").get("S")
                    .toString();
            LocalDateTime departureDateTime = LocalDateTime.parse(
                    flightJson.get("departureDateTime").get("S").toString().
                            replaceAll("\"", ""), DateTimeFormatter.ISO_DATE_TIME);
            LocalDateTime arrivalDateTime = LocalDateTime.parse(
                    flightJson.get("arrivalDateTime").get("S").toString()
                            .replaceAll("\"", ""), DateTimeFormatter.ISO_DATE_TIME);
            return new FlightEvent(flightStatus, departureAirport,
                    arrivalAirport, departureDateTime, arrivalDateTime);
        } catch (IOException e) {
            logger.error("Error converting DynamoDB stream message", e);
            return null;
        }
    }
}

You tell Spring Cloud this can be used as a custom parser by:

  • Inheriting from AbstractMessageConverter
  • Defining a new MimeType (in our case, application/ddb)
  • Overriding the supports method with the class type that you want to be parsing the message to
  • Overriding the convertFromInternal method with your parsing logic
  1. Create a CustomConfiguration.java class in the configuration package that will tell Spring Boot to create a new instance of your converter at start up (using the @Configuration annotation):
package com.amazonaws.samples.springcloudblogpost.configuration;

import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.AllArgsConstructor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.converter.MessageConverter;

@Configuration
@AllArgsConstructor
public class CustomConfiguration {

    private final ObjectMapper objectMapper;

    @Bean
    public MessageConverter customMessageConverter() {
        return new DynamoDBMessageConverter(this.objectMapper);
    }

}

You’re almost there! Now you can tell Spring Cloud to use that custom message converter when processing messages coming from your Kinesis data stream.

  1. First, create a new Consumer function in the ChangeCaptureService class:
@Bean
public Consumer<FlightEvent> printFlightEvent() {
    return flightEvent -> logger.info(flightEvent.toString());
}
  1. Add the relevant lines in your application.properties file:
spring.cloud.stream.kinesis.binder.kplKclEnabled=true
spring.cloud.function.definition=printDynamoDBmessage;printFlightEvent
spring.cloud.stream.bindings.printDynamoDBmessage-in-0.group=1
spring.cloud.stream.bindings.printDynamoDBmessage-in-0.destination=${kinesisstreamname}
spring.cloud.stream.bindings.printDynamoDBmessage-in-0.content-type=application/json
spring.cloud.stream.bindings.printFlightEvent-in-0.group=2
spring.cloud.stream.bindings.printFlightEvent-in-0.destination=${kinesisstreamname}
spring.cloud.stream.bindings.printFlightEvent-in-0.content-type=application/ddb
  1. Deploy this new version:
cdk deploy AppStack
  1. Let’s book another flight. If things work as intended, you should see two log lines: one with the raw DynamoDB message and one with a String representation of the FlightEvent record:
curl -d '{"departureAirport":"Tokyo","arrivalAirport":"London","departureDateTime":"2024-04-01T10:15:30","arrivalDateTime":"2024-04-01T10:15:31"}' -H 'Content-Type: application/json' http://<ALB_URL>/api/flights

You should get the following response:

{"id":"<UUID>","departureAirport":"Tokyo",
  "arrivalAirport":"London","departureDateTime":"2024-04-01T10:15:30",
  "arrivalDateTime":"2024-04-01T10:15:31"}

Open the Amazon ECS logs, and you should find the following lines:

{
  "awsRegion": "<AWS_REGION>",
  "eventID": "<UUID>",
  "eventName": "INSERT",
  "userIdentity": null,
  "recordFormat": "application/json",
  "tableName": "flight",
  "dynamodb": {
    "ApproximateCreationDateTime": <TIMESTAMP>,
    "Keys": {
      "id": {
        "S": "<UUID>"
      }
    },
    "NewImage": {
      "arrivalAirport": {
        "S": "London"
      },
      "arrivalDateTime": {
        "S": "2024-04-01T10:15:31"
      },
      "id": {
        "S": "<UUID>"
      },
      "departureAirport": {
        "S": "Tokyo"
      },
      "departureDateTime": {
        "S": "2024-04-01T10:15:30"
      }
    },
    "SizeBytes": 187
  },
  "eventSource": "aws:dynamodb"
}
FlightEvent[flightStatus=NEW, departureAirport="Tokyo",
arrivalAirport="London", departureDateTime=2024-04-01T10:15:30,
arrivalDateTime=2024-04-01T10:15:31]

And you’re done! To sum up, you have accomplished the following throughout this post:

  1. Built a simple Spring Boot backend API serving two endpoints
  2. Set up DynamoDB as your data store
  3. Captured any changes made to the table in a customizable way
  4. And all the while writing very minimal boilerplate code

Clean up

To clean up the resources that were created as part of this post, run the following commands (in the infra folder):

cdk destroy --all

If you followed the instructions and booked flights, data was inserted into the DynamoDB tables, so you have to manually delete them:

  1. Open the DynamoDB console in the Region in which you deployed your infrastructure.
  2. Choose Tables in the navigation pane.
  3. Select all relevant tables.
  4. Choose Delete and follow the instructions.

Similarly, you have to manually delete the Kinesis data stream:

  1. Open the Kinesis Data Streams console in the Region in which you deployed your infrastructure.
  2. Choose Data Streams in the navigation pane.
  3. Select your Kinesis data stream, and on the Action menu, choose Delete.

Conclusion

In this post, we provided a demonstration of what you can accomplish with DynamoDB, Kinesis Data Stream, Spring Boot and Spring Cloud. To learn more about DynamoDB, please see the DynamoDB Developer Guide and to go further with Spring, we encourage you to explore the official Spring documentation, specifically Spring Cloud AWS and Spring Cloud Stream. Try out this implementation and let us know your thoughts in the comments.


About the Author

Camille Birbes is a Senior Solutions Architect with AWS and is based in Hong Kong. He works with major financial institutions to design and build secure, scalable, and highly available solutions in the cloud. Outside of work, Camille enjoys any form of gaming, from board games to the latest video game. You can find Camille on LinkedIn.