AWS Compute Blog

Converting Apache Kafka events from Avro to JSON using EventBridge Pipes

This post is written by Pascal Vogel, Solutions Architect, and Philipp Klose, Global Solutions Architect.

Event streaming with Apache Kafka has become an important element of modern data-oriented and event-driven architectures (EDAs), unlocking use cases such as real-time analytics of user behavior, anomaly and fraud detection, and Internet of Things event processing. Stream producers and consumers in Kafka often use schema registries to ensure that all components follow agreed-upon event structures when sending (serializing) and processing (deserializing) events to avoid application bugs and crashes.

A common schema format in Kafka is Apache Avro, which supports rich data structures in a compact binary format. To integrate Kafka with other AWS and third-party services more easily, AWS offers Amazon EventBridge Pipes, a serverless point-to-point integration service. However, many downstream services expect JSON-encoded events, requiring custom, and repetitive schema validation and conversion logic from Avro to JSON in each downstream service.

This blog post shows how to reliably consume, validate, convert, and send Avro events from Kafka to AWS and third-party services using EventBridge Pipes, allowing you to reduce custom deserialization logic in downstream services. You can also use EventBridge event buses as targets in Pipes to filter and distribute events from Pipes to multiple targets, including cross-account and cross-Region delivery.

This blog describes two scenarios:

  1. Using Amazon Managed Streaming for Apache Kafka (Amazon MSK) and AWS Glue Schema Registry.
  2. Using Confluent Cloud and the Confluent Schema Registry.

See the associated GitHub repositories for Glue Schema Registry or Confluent Schema Registry for full source code and detailed deployment instructions.

Kafka event streaming and schema validation on AWS

To build event streaming applications with Kafka on AWS, you can use Amazon MSK, offerings such as Confluent Cloud, or self-hosted Kafka on Amazon Elastic Compute Cloud (Amazon EC2) instances.

To avoid common issues in event streaming and event-driven architectures, such as data inconsistencies and incompatibilities, it is a recommended practice to define and share event schemas between event producers and consumers. In Kafka, schema registries are used to manage, evolve, and enforce schemas for event producers and consumers. The AWS Glue Schema Registry provides a central location to discover, manage, and evolve schemas. In the case of Confluent Cloud, the Confluent Schema Registry serves the same role. Both the Glue Schema Registry and the Confluent Schema Registry support common schema formats such as Avro, Protobuf, and JSON.

To integrate Kafka with AWS services, third-party services, and your own applications, you can use EventBridge Pipes. EventBridge Pipes helps you create point-to-point integrations between event sources and targets with optional filtering, transformation, and enrichment. EventBridge Pipes reduces the amount of integration code that you have to write and maintain when building EDAs.

Many AWS and third-party services expect JSON-encoded payloads (events) as input, meaning they cannot directly consume Avro or Protobuf payloads. To replace repetitive Avro-to-JSON validation and conversion logic in each consumer, you can use the EventBridge Pipes enrichment step. This solution uses an AWS Lambda function in the enrichment step to deserialize and validate Kafka events with a schema registry, including error handling with dead-letter queues, and convert events to JSON before passing them to downstream services.

Solution overview

Architecture overview of the solution

The solution presented in this blog post consists of the following key elements:

  1. The source of the pipe is a Kafka cluster deployed using MSK or Confluent Cloud. EventBridge Pipes reads events from the Kafka stream in batches and sends them to the enrichment function (see here for an example event).
  2. The enrichment step (Lambda function) deserializes and validates the events against the configured schema registry (Glue or Confluent), converts events from Avro to JSON with integrated error handling, and returns them to the pipe.
  3. The target of this example solution is an EventBridge custom event bus that is invoked by EventBridge Pipes with JSON-encoded events returned by the enrichment Lambda function. EventBridge Pipes supports a variety of other targets, including Lambda, AWS Step Functions, Amazon API Gateway, API destinations, and more, enabling you to build EDAs without writing integration code.
  4. In this sample solution, the event bus sends all events to Amazon CloudWatch Logs via an EventBridge rule. You can extend the example to invoke additional EventBridge targets.

Optionally, you can add OpenAPI 3 or JSONSchema Draft 4 schemas for your events in the EventBridge schema registry by either manually generating it from the Avro schema or using EventBridge schema discovery. This allows you to download code bindings for the JSON-converted events for various programming languages, such as JavaScript, Python, and Java, to correctly use them in your EventBridge targets.

The remainder of this blog post describes this solution for the Glue and Confluent schema registries with code examples.

EventBridge Pipes with the Glue Schema Registry

This section describes how to implement event schema validation and conversion from Avro to JSON using EventBridge Pipes and the Glue Schema Registry. You can find the source code and detailed deployment instructions on GitHub.

Prerequisites

You need an Amazon MSK serverless cluster running and the Glue Schema registry configured. This example includes a Avro schema and a Glue Schema Registry. See the following AWS blog post for an introduction to schema validation with the Glue Schema Registry: Validate, evolve, and control schemas in Amazon MSK and Amazon Kinesis Data Streams with AWS Glue Schema Registry.

EventBridge Pipes configuration

Use the AWS Cloud Development Kit (AWS CDK) template provided in the GitHub repository to deploy:

  1. An EventBridge pipe that connects to your existing Amazon MSK Serverless Kafka topic as the source via AWS Identity and Access Management (IAM) authentication.
  2. EventBridge Pipes reads events from your Kafka topic using the Amazon MSK source type.
  3. An enrichment Lambda function in Java to perform event deserialization, validation, and conversion from Avro to JSON.
  4. An Amazon Simple Queue Service (Amazon SQS) dead letter queue to hold events for which deserialization failed.
  5. An EventBridge custom event bus as the pipe target. An EventBridge rule sends all incoming events into a CloudWatch Logs log group.

For MSK-based sources, EventBridge supports configuration parameters, such as batch window, batch size, and starting position, which you can set using the parameters of the CfnPipe class in the example CDK stack.

The example EventBridge pipe consumes events from Kafka in batches of 10 because it is targeting an EventBridge event bus, which has a max batch size of 10. See batching and concurrency in the EventBridge Pipes User Guide to choose an optimal configuration for other targets.

EventBridge Pipes with the Confluent Schema Registry

This section describes how to implement event schema validation and conversion from Avro to JSON using EventBridge Pipes and the Confluent Schema Registry. You can find the source code and detailed deployment instructions on GitHub.

Prerequisites

To set up this solution, you need a Kafka stream running on Confluent Cloud as well as the Confluent Schema Registry set up. See the corresponding Schema Registry tutorial for Confluent Cloud to set up a schema registry for your Confluent Kafka stream.

To connect to your Confluent Cloud Kafka cluster, you need an API key for Confluent Cloud and Confluent Schema Registry. AWS Secrets Manager is used to securely store your Confluent secrets.

EventBridge Pipes configuration

Use the AWS CDK template provided in the GitHub repository to deploy:

  1. An EventBridge pipe that connects to your existing Confluent Kafka topic as the source via an API secret stored in Secrets Manager.
  2. EventBridge Pipes reads events from your Confluent Kafka topic using the self-managed Apache Kafka stream source type, which includes all non-MSK Kafka clusters.
  3. An enrichment Lambda function in Python to perform event deserialization, validation, and conversion from Avro to JSON.
  4. An SQS dead letter queue to hold events for which deserialization failed.
  5. An EventBridge custom event bus as the pipe target. An EventBridge rule writes all incoming events into a CloudWatch Logs log group.

For self-managed Kafka sources, EventBridge supports configuration parameters, such as batch window, batch size, and starting position, which you can set using the parameters of the CfnPipe class in the example CDK stack.

The example EventBridge pipe consumes events from Kafka in batches of 10 because it is targeting an EventBridge event bus, which has a max batch size of 10. See batching and concurrency in the EventBridge Pipes User Guide to choose an optimal configuration for other targets.

Enrichment Lambda functions

Both of the solutions described previously include an enrichment Lambda function for schema validation and conversion from Avro to JSON.

The Java Lambda function integrates with the Glue Schema Registry using the AWS Glue Schema Registry Library. The Python Lambda function integrates with the Confluent Schema Registry using the confluent-kafka library and uses Powertools for AWS Lambda (Python) to implement Serverless best practices such as logging and tracing.

The enrichment Lambda functions perform the following tasks:

  1. In the events polled from the Kafka stream by the EventBridge pipe, the key and value of the event are base64 encoded. Therefore, for each event in the batch passed to the function, the key and the value are decoded.
  2. The event key is assumed to be serialized by the producer as a string type.
  3. The event value is deserialized using the Glue Schema registry Serde (Java) or the confluent-kafka AvroDeserializer (Python).
  4. The function then returns the successfully converted JSON events to the EventBridge pipe, which then invokes the target for each of them.
  5. Events for which Avro deserialization failed are sent to the SQS dead letter queue.

Conclusion

This blog post shows how to implement event consumption, Avro schema validation, and conversion to JSON using Amazon EventBridge Pipes, Glue Schema Registry, and Confluent Schema Registry.

The source code for the presented example is available in the AWS Samples GitHub repository for Glue Schema Registry and Confluent Schema Registry. For more patterns, visit the Serverless Patterns Collection.

For more serverless learning resources, visit Serverless Land.