AWS Developer Tools Blog

Building an Apache Kafka data processing Java application using the AWS CDK

Piotr Chotkowski, Cloud Application Development Consultant, AWS Professional Services

Using a Java application to process data queued in Apache Kafka is a common use case across many industries. Event-driven and microservices architectures, for example, often rely on Apache Kafka for data streaming and component decoupling. You can use it as a message queue or an event bus, and as a way to improve the resilience and reproducibility of events occurring inside the application.

In this post, I walk you through the process of creating a simple end-to-end data processing application using AWS tools and services as well as other industry-standard techniques. We start with a brief architecture overview and an infrastructure definition. Then you see how, with just a few lines of code, you can set up an Apache Kafka cluster using Amazon Managed Streaming for Apache Kafka (Amazon MSK) and the AWS Cloud Development Kit (AWS CDK). Next, I show you how to shape your project structure and package your application for deployment. We also look at the implementation details: how to create Kafka topics in the Amazon MSK cluster, and how to send and receive messages from Apache Kafka using services such as AWS Lambda and AWS Fargate.

I use the AWS CDK to automate infrastructure creation and application deployment. The AWS CDK is an open-source software development framework to define your cloud application resources using familiar programming languages. For more information, see the Developer Guide, AWS CDK Intro Workshop, and the AWS CDK Examples GitHub repo.

All the code presented in this post is open sourced and available on GitHub.

Overview of solution

The following diagram illustrates our overall architecture.

Architecture diagram of the solution

Triggering the TransactionHandler Lambda function publishes messages to an Apache Kafka topic. The consumer application, packaged in a container and deployed to Fargate, consumes messages from the Kafka topic, processes them, and stores the results in an Amazon DynamoDB table. The KafkaTopicHandler Lambda function is called once during deployment to create the Kafka topic. Both the Lambda functions and the consumer application publish logs to Amazon CloudWatch.

To follow along with this post, you need the prerequisites listed in the README file of the GitHub project.

Project structure and infrastructure definition

The project consists of three main parts: the infrastructure (including Kafka cluster and Amazon DynamoDB), a Spring Boot Java consumer application, and Lambda producer code.

Let’s start by exploring the infrastructure and deployment definition. It’s implemented using a set of AWS CDK stacks and constructs. I’ve chosen TypeScript as my language here, mainly out of personal preference; however, you can use the AWS CDK with other languages if you prefer. At the time of writing, the AWS CDK supports Python, TypeScript, Java, .NET, and Go. For more information, see Working with the AWS CDK.

Let’s look at the project directory structure. All AWS CDK stacks are located in the amazon-msk-java-app-cdk/lib directory. In amazon-msk-java-app-cdk/bin, you can find the main AWS CDK app where all of the stacks are instantiated. amazon-msk-java-app-cdk/lambda contains the code for TransactionHandler, which publishes messages to a Kafka topic, as well as the code for KafkaTopicHandler, which is responsible for creating the Kafka topic. The business logic for the Kafka consumer, which is a Java Maven project, is in the consumer directory. The Dockerfile necessary for Fargate container creation is located in consumer/docker/Dockerfile. Finally, the doc directory contains architecture diagrams and the scripts directory contains the deployment script.

Setting up your Kafka cluster

The central part of the architecture is the Kafka cluster created using Amazon MSK, which is relatively easy to define and deploy with the AWS CDK. In the following code, I use the CfnCluster construct to set up my cluster:

new msk.CfnCluster(this, "kafkaCluster", {
    brokerNodeGroupInfo: {
        securityGroups: [vpcStack.kafkaSecurityGroup.securityGroupId],
        clientSubnets: [...vpcStack.vpc.selectSubnets({
            subnetType: ec2.SubnetType.PRIVATE
        }).subnetIds],
        instanceType: "kafka.t3.small",
        storageInfo: {
            ebsStorageInfo: {
                volumeSize: 5
            }
        }
    },
    clusterName: "TransactionsKafkaCluster",
    kafkaVersion: "2.7.0",
    numberOfBrokerNodes: 2
});

vpcStack in the preceding code refers to the AWS CDK stack containing the VPC definition. Because we’re using this cluster for demonstration purposes only, I limit storage to 5 GB, the instance type to kafka.t3.small, and the number of broker nodes to two, which is the minimum allowed number. We don’t want to connect to this cluster from outside the VPC, so I place the cluster in a private subnet of my VPC. For more information about the allowed settings, see interface CfnClusterProps. To learn more about Amazon MSK, check out the Amazon MSK Labs workshop.
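The post doesn’t show the VPC stack itself. The following is a minimal sketch of what vpcStack could look like, assuming the CDK v1 module layout used elsewhere in the project; the construct IDs and the ingress rule are illustrative rather than taken from the repository, and the real stack also exposes a separate security group for the Lambda producer.

import * as cdk from '@aws-cdk/core';
import * as ec2 from '@aws-cdk/aws-ec2';

// Hypothetical VPC stack: a VPC with private subnets and a security group
// shared by the MSK brokers and their clients.
export class VpcStack extends cdk.Stack {
    public readonly vpc: ec2.Vpc;
    public readonly kafkaSecurityGroup: ec2.SecurityGroup;

    constructor(scope: cdk.Construct, id: string, props?: cdk.StackProps) {
        super(scope, id, props);

        this.vpc = new ec2.Vpc(this, "Vpc", {maxAzs: 2});

        this.kafkaSecurityGroup = new ec2.SecurityGroup(this, "KafkaSecurityGroup", {
            vpc: this.vpc,
            allowAllOutbound: true
        });

        // Let members of the group reach each other on any TCP port,
        // which covers the Kafka broker listeners.
        this.kafkaSecurityGroup.addIngressRule(this.kafkaSecurityGroup, ec2.Port.allTcp());
    }
}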

Topic creation

At the time of writing, Amazon MSK doesn’t allow you to create a Kafka topic inside the cluster using the AWS service API. You can only do this by connecting directly to the Kafka cluster, either using Kafka tools or using a library from within your application code. In this project, I use the AWS CDK’s custom resource provider, which allows you to use a custom Lambda function to handle AWS CloudFormation lifecycle events. You can find the definitions of the CustomResource, Provider, and Lambda function resources in the kafka-topic-stack.ts file, and the implementation of the handler Lambda function in the kafka-topic-handler.ts file. Let’s look at the code of the handler function:

export const handler = async (event: any, context: any = {}): Promise<any> => {
    try {
        if (event.RequestType === 'Create' || event.RequestType === 'Update') {
            // Create the topic; createTopic resolves to false if it already exists.
            let result = await createTopic(event.ResourceProperties.topicConfig);
            response.send(event, context, response.SUCCESS, {alreadyExists: !result});
        } else if (event.RequestType === 'Delete') {
            // Remove the topic when the stack is destroyed.
            await deleteTopic(event.ResourceProperties.topicConfig.topic);
            response.send(event, context, response.SUCCESS, {deleted: true});
        }
    } catch (e) {
        // Report failure back to AWS CloudFormation so the deployment doesn't hang.
        response.send(event, context, response.FAILED, {reason: e});
    }
}

The handler is called once when the KafkaTopicStack is deployed and once when it’s destroyed. I use the admin client from the KafkaJS open-source library to create the Kafka topic on the ‘Create’ AWS CloudFormation event and to delete it on the ‘Delete’ event. Calling KafkaJS’s createTopics method resolves to true if the topic was created successfully, or false if it already exists.
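The createTopic and deleteTopic helpers aren’t reproduced in the post. The following is a minimal sketch of what they can look like with the KafkaJS admin client; the client ID is illustrative, and the bootstrap address is assumed to come from the same BOOTSTRAP_ADDRESS environment variable used elsewhere in the project.

import { Kafka, ITopicConfig } from "kafkajs";

// Illustrative helpers; the real implementations live in kafka-topic-handler.ts.
const kafka = new Kafka({
    clientId: "kafka-topic-handler",
    brokers: (process.env.BOOTSTRAP_ADDRESS ?? "").split(","),
    ssl: true
});

export async function createTopic(topicConfig: ITopicConfig): Promise<boolean> {
    const admin = kafka.admin();
    await admin.connect();
    try {
        // Resolves to true if the topic was created, false if it already exists.
        return await admin.createTopics({topics: [topicConfig]});
    } finally {
        await admin.disconnect();
    }
}

export async function deleteTopic(topic: string): Promise<void> {
    const admin = kafka.admin();
    await admin.connect();
    try {
        await admin.deleteTopics({topics: [topic]});
    } finally {
        await admin.disconnect();
    }
}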

Consumer implementation details

The main purpose of the Kafka consumer part of this project is to process and validate incoming transaction messages and store the results in the DynamoDB table. The consumer application is written in Java using the Spring Boot framework. The core part of the functionality is implemented in the KafkaConsumer class. I use the @KafkaListener annotation to define the entry point for incoming messages. Spring takes care of most of the boilerplate code for us; in particular, we don’t need to write the logic to manually pull messages from the Kafka topic or worry about deserialization. All you need to do is provide the necessary beans in the configuration class. In the following code, the Spring Boot configuration is located in the ApplicationConfiguration class:

@Bean
public ConsumerFactory<String, byte[]> consumerFactory(KafkaConsumerProperties properties) {
    Map<String, Object> configs = new HashMap<>();
    configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapAddress());
    configs.put(ConsumerConfig.GROUP_ID_CONFIG, properties.getGroupId());
    configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
    configs.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
    configs.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, properties.getTrustStoreLocation());
    LOGGER.info(configs.toString());

    return new DefaultKafkaConsumerFactory<>(configs);
}

@Bean
public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(ConsumerFactory<String, byte[]> consumerFactory) {
    ConcurrentKafkaListenerContainerFactory<String, byte[]> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    factory.setMessageConverter(new ByteArrayJsonMessageConverter());
    return factory;
}

The preceding code sets up the Kafka consumer configuration. We get the bootstrap servers address string and Kafka consumer group ID from the environment variables that are set up during application deployment. By default, Amazon MSK uses TLS 1.2 for secure communication, so we need to set up SSL configuration in our application as well. For more information about encryption, see Amazon MSK Encryption.

For the deserialization of incoming Kafka messages, I use classes provided by the Apache Kafka library. To enable Spring to deserialize Kafka JSON messages into POJOs, I use the ByteArrayDeserializer class combined with ByteArrayJsonMessageConverter. That way, Spring simply passes bytes as is from the deserializer to the message converter, and the converter transforms bytes into Java objects using Jackson’s ObjectMapper underneath. I use this approach because it allows me to send plaintext JSON messages. We don’t need anything more sophisticated for the purpose of this post. Depending on your needs, you can use different combinations of deserializers and message converters or dedicated deserializers, such as KafkaAvroDeserializer, which uses the schema registry to figure out the target type.

For more information about how to use Apache Kafka with the Spring Framework, refer to the Spring documentation.

Consumer deployment

We complete three high-level steps to deploy the consumer application into Fargate.

First, we need to build and package our application into an executable JAR. I use the Apache Maven Shade plugin with Spring Boot Maven plugin dependency. It’s configured in the consumer application pom.xml. The JAR is created during the package phase of the Maven project build and placed in the consumer/docker directory next to the Dockerfile.

Next, we define the image used to create the ECS task container. To do that, we create a Dockerfile, which is a text file containing all the instructions and configuration necessary to assemble a Docker image. I use Amazon Linux 2 as the base image and additionally install the Java 11 Amazon Corretto distribution, awslogs, and a CloudWatch agent. For the SSL configuration, we also need to copy the truststore file. In line 9 of the Dockerfile, we copy the executable JAR built in the previous step from its local location into the image. The last line in the Dockerfile is the entry point starting the consumer application. It’s a standard Java command:

java -cp kafka-consumer-1.0-SNAPSHOT-shaded.jar amazon.aws.samples.kafka.ConsumerApplication

Finally, we reference the Dockerfile in the AWS CDK stack. We do this inside the fargate-stack.ts file. We define the infrastructure necessary to run our containerized application in the ECS task. To use the local Dockerfile image definition inside the AWS CDK stack, you need to create the asset DockerImageAsset:

const image = new assets.DockerImageAsset(this, "ConsumerImage", {
    directory: '../consumer/docker'
});

Next, we reference this image asset in the definition of the ECS task using the ContainerImage.fromDockerImageAsset method:

fargateTaskDefinition.addContainer("KafkaConsumer", {
    image: ecs.ContainerImage.fromDockerImageAsset(image),
    logging: ecs.LogDrivers.awsLogs({streamPrefix: 'KafkaConsumer'}),
    environment: {
        'TABLE_NAME': this.tableName,
        'GROUP_ID': this.groupId,
        'BOOTSTRAP_ADDRESS': bootstrapAddress.valueAsString,
        'REGION': this.region,
        'TOPIC_NAME': topicName.valueAsString
    }
});

During the AWS CDK stack deployment, the image defined in the Dockerfile is created and uploaded to an Amazon Elastic Container Registry (Amazon ECR) repository. That image is used to create and start the ECS task container, thereby starting our consumer application. For more information about other ways of obtaining images, see the Amazon ECS Construct Library.
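For context, the fargateTaskDefinition used above is a regular Fargate task definition attached to an ECS service running in the same VPC as the cluster. A minimal sketch, with illustrative construct IDs and sizing rather than the project’s actual values from fargate-stack.ts, could look like this inside the stack constructor:

import * as ecs from '@aws-cdk/aws-ecs';

// An ECS cluster in the same VPC as the Kafka brokers.
const cluster = new ecs.Cluster(this, "Cluster", {vpc: vpcStack.vpc});

// Illustrative sizing; the real values are defined in fargate-stack.ts.
const fargateTaskDefinition = new ecs.FargateTaskDefinition(this, "KafkaConsumerTask", {
    memoryLimitMiB: 1024,
    cpu: 512
});

// ... the addContainer call shown above goes here ...

// Run the task as a long-lived service with a single instance.
new ecs.FargateService(this, "KafkaConsumerService", {
    cluster: cluster,
    taskDefinition: fargateTaskDefinition,
    desiredCount: 1
});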

Producer implementation details

We now have our Kafka cluster and consumer application defined, so the last piece is publishing messages to Kafka. I use a Lambda function as the producer; all of its code is located in the transaction-handler.ts file. I use the KafkaJS open-source library to communicate with the Kafka cluster and send messages.
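The post doesn’t reproduce transaction-handler.ts in full. The following sketch shows the general shape of such a producer with KafkaJS; the client ID and message shape are assumptions, while the BOOTSTRAP_ADDRESS and TOPIC_NAME environment variables come from the Lambda stack shown in the next section.

import { Kafka } from "kafkajs";

// Illustrative producer; the real implementation lives in transaction-handler.ts.
const kafka = new Kafka({
    clientId: "transaction-handler",
    brokers: (process.env.BOOTSTRAP_ADDRESS ?? "").split(","),
    ssl: true
});
const producer = kafka.producer();

export const handler = async (event: any): Promise<void> => {
    await producer.connect();
    try {
        // Forward the incoming transaction payload as a JSON message.
        await producer.send({
            topic: process.env.TOPIC_NAME ?? "",
            messages: [{value: JSON.stringify(event)}]
        });
    } finally {
        await producer.disconnect();
    }
};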

Producer deployment

Now let’s deploy our Kafka producer code. The AWS CDK stack definition for that part is located in the lambda-stack.ts file.

let transactionHandler = new NodejsFunction(this, "TransactionHandler", {
    runtime: Runtime.NODEJS_14_X,
    entry: 'lambda/transaction-handler.ts',
    handler: 'handler',
    vpc: vpcStack.vpc,
    securityGroups: [vpcStack.lambdaSecurityGroup],
    functionName: 'TransactionHandler',
    timeout: Duration.minutes(5),
    environment: {
        'BOOTSTRAP_ADDRESS': bootstrapAddress.valueAsString,
        'TOPIC_NAME': topicName.valueAsString
    }
});

This is a relatively short piece of code. The AWS CDK NodejsFunction construct allows us to package our business logic code and deploy it as a Node.js Lambda function to the AWS Cloud. Due to internal AWS CDK packaging and deployment logic, it makes your life easier if you place the directory containing your Lambda code in the AWS CDK root directory next to the bin and lib directories. In the properties, the entry field has to point to the local file containing your code, as a relative path from the AWS CDK root directory. You can pass environment variables in the environment field. For this post, I pass Kafka’s bootstrap address string and the topic name, which I need in order to communicate with the Kafka cluster and send messages from within the Lambda function. If esbuild is available, it’s used to bundle your code in your environment. Otherwise, bundling occurs in a Docker container, which means that if you don’t want to use esbuild, you have to start a Docker daemon before deploying your AWS CDK stack. For more information about the NodejsFunction construct, see the Amazon Lambda Node.js Library.

Execution walkthrough

Once we deploy the application, it’s time to test it. To trigger the Lambda function and send a message to the Kafka topic, you can use the following AWS CLI command:

aws lambda invoke --cli-binary-format raw-in-base64-out --function-name TransactionHandler --log-type Tail --payload '{ "accountId": "account_123", "value": 456}' /dev/stdout --query 'LogResult' --output text | base64 -d

Here you are adding 456 to the balance of the account account_123. The Lambda function sends a JSON message to the Amazon MSK cluster. The consumer application pulls the message from the Kafka topic as bytes and transforms it into an instance of a POJO class. Next, the consumer business logic executes and the application stores the results in the Amazon DynamoDB table. You can run the following command to see the contents of the table:

aws dynamodb scan --table-name Accounts --query "Items[*].[id.S,Balance.N]" --output text

All the logs from the execution are stored in Amazon CloudWatch. To view them, you can go to the AWS Management Console or run the aws logs tail command with the relevant CloudWatch Logs group specified.

You can experiment with the application by sending multiple messages with different values for the accountId and value fields of the JSON payload.

Conclusion

In this post, we discussed different techniques to implement and deploy your application using AWS CDK constructs together with Java and TypeScript application code. High-level AWS CDK constructs enable you to quickly define the cloud infrastructure of your system and let you focus more on implementing your business logic. You can use a mix of programming languages that best fit your use case and keep all your code and infrastructure definitions in one place.

To run the code presented in this post, follow the prerequisites and usage steps described in the README file of the GitHub project.

Stay tuned for more content about cloud application development. If you have any questions or suggestions, please leave a comment. I hope you have enjoyed reading this post and learned something new. If you did, please share with your colleagues. Happy coding!
