O blog da AWS
Convertendo eventos do Apache Kafka de Avro para JSON usando EventBridge Pipes
Convertendo eventos do Apache Kafka de Avro para JSON usando EventBridge Pipes
Este post foi escrito por Pascal Vogel, arquiteto de soluções, e Philipp Klose, arquiteto de soluções globais.
O streaming de eventos com o Apache Kafka se tornou um elemento importante das arquiteturas modernas orientadas a dados e orientadas a eventos (EDAs), desbloqueando casos de uso como análise em tempo real do comportamento do usuário, detecção de anomalias e fraudes e processamento de eventos da Internet das Coisas. Produtores e consumidores de streams no Kafka costumam usar registros de esquema para garantir que todos os componentes sigam estruturas de eventos acordadas ao enviar (serializar) e processar (desserializar) eventos para evitar bugs e falhas no aplicativo.
Um formato de esquema comum no Kafka é o Apache Avro, que suporta estruturas de dados avançadas em um formato binário compacto. Para integrar o Kafka com outros serviços da AWS e de terceiros com mais facilidade, a AWS oferece o Amazon EventBridge Pipes, um serviço de integração ponto a ponto serverless. No entanto, muitos serviços downstream esperam eventos codificados em JSON, exigindo validação de esquema personalizada e repetitiva e lógica de conversão de Avro para JSON em cada serviço downstream.
Este blog mostra como consumir, validar, converter e enviar de forma confiável eventos Avro do Kafka para a AWS e serviços de terceiros usando o EventBridge Pipes, permitindo que você reduza a lógica de desserialização personalizada em serviços downstream. Você também pode usar os barramentos de eventos do EventBridge como destinos no Pipes para filtrar e distribuir eventos do Pipes para vários destinos, incluindo entrega entre contas e regiões.
Este blog descreve dois cenários:
- Usando o Amazon Managed Streaming para Apache Kafka (Amazon MSK) e o AWS Glue Schema Registry.
- Usando o Confluent Cloud e o Confluent Schema Registry.
Consulte os repositórios associados do GitHub para o Glue Schema Registry ou o Confluent Schema Registry para obter o código-fonte completo e as instruções detalhadas de implantação.
Streaming de eventos e validação de esquemas do Kafka na AWS
Para criar aplicativos de streaming de eventos com o Kafka na AWS, você pode usar o Amazon MSK, ofertas como o Confluent Cloud ou o Kafka auto-hospedado em instâncias do Amazon Elastic Compute Cloud (Amazon EC2).
Para evitar problemas comuns em streaming de eventos e arquiteturas orientadas por eventos, como inconsistências e incompatibilidades de dados, é uma prática recomendada definir e compartilhar esquemas de eventos entre produtores e consumidores de eventos. No Kafka, os registros de esquemas são usados para gerenciar, desenvolver e aplicar esquemas para produtores e consumidores de eventos. O AWS Glue Schema Registry fornece um local central para descobrir, gerenciar e desenvolver esquemas. No caso do Confluent Cloud, o Confluent Schema Registry tem a mesma função. Tanto o Glue Schema Registry quanto o Confluent Schema Registry oferecem suporte a formatos de esquema comuns, como Avro, Protobuf e JSON.
Para integrar o Kafka aos serviços da AWS, serviços de terceiros e seus próprios aplicativos, você pode usar o EventBridge Pipes. O EventBridge Pipes ajuda você a criar integrações ponto a ponto entre fontes e destinos de eventos com filtragem, transformação e enriquecimento opcionais. O EventBridge Pipes reduz a quantidade de código de integração que você precisa escrever e manter ao criar EDAs.
Muitos serviços da AWS e de terceiros esperam payloads (eventos) codificadas em JSON como entrada, o que significa que não podem consumir diretamente as payloads do Avro ou do Protobuf. Para substituir a lógica repetitiva de validação e conversão de AVRO para JSON em cada consumidor, você pode usar a etapa de enriquecimento do EventBridge Pipes. Essa solução usa uma função do AWS Lambda na etapa de enriquecimento para desserializar e validar eventos do Kafka com um registro de esquema, incluindo tratamento de erros com filas de mensagens mortas, e converter eventos em JSON antes de passá-los para serviços downstream.
Visão geral da solução
A solução apresentada neste blog consiste nos seguintes elementos-chave:
- A origem do pipe é um cluster Kafka implementado usando o MSK ou o Confluent Cloud. O EventBridge Pipes lê eventos do stream Kafka em lotes e os envia para a função de enriquecimento (veja aqui um exemplo de evento).
- A etapa de enriquecimento (função Lambda) desserializa e valida os eventos em relação ao registro do esquema configurado (Glue ou Confluent), converte eventos de Avro em JSON com tratamento de erros integrado e os retorna ao pipe.
- O alvo dessa solução de exemplo é um barramento de eventos personalizado do EventBridge que é invocado pelo EventBridge Pipes com eventos codificados em JSON retornados pela função Lambda de enriquecimento. O EventBridge Pipes oferece suporte a uma variedade de outros destinos, incluindo Lambda, AWS Step Functions, Amazon API Gateway, destinos de API e muito mais, permitindo que você crie EDAs sem escrever código de integração.
- Neste exemplo de solução, o barramento de eventos envia todos os eventos para o Amazon CloudWatch Logs por meio de uma regra do EventBridge. Você pode estender o exemplo para invocar destinos adicionais do EventBridge.
Opcionalmente, você pode adicionar esquemas OpenAPI 3 ou JSONSchema Draft 4 para seus eventos no registro do esquema do EventBridge, gerando-os manualmente a partir do esquema Avro ou usando a descoberta do esquema do EventBridge. Isso permite que você baixe vinculações de código para os eventos convertidos em JSON para várias linguagens de programação, como JavaScript, Python e Java, para usá-las corretamente em seus destinos do EventBridge.
O restante deste blog descreve essa solução para os registros do esquema Glue e Confluent com exemplos de código.
EventBridge Pipes com o Glue Schema Registry
Esta seção descreve como implementar a validação e a conversão do esquema de eventos de Avro para JSON usando o EventBridge Pipes e o Glue Schema Registry. Você pode encontrar o código-fonte e as instruções detalhadas de implantação no GitHub.
Pré-requisitos
Você precisa de um cluster serverless Amazon MSK em execução e do registro do Glue Schema configurado. Este exemplo inclui um esquema Avro e um registro do esquema Glue. Veja a seguinte postagem no blog da AWS para obter uma introdução à validação de esquemas com o Glue Schema Registry: valide, evolua e controle esquemas no Amazon MSK e no Amazon Kinesis Data Streams com o AWS Glue Schema Registry.
Configuração do EventBridge Pipes
Use o modelo (template) do AWS Cloud Development Kit (AWS CDK) fornecido no repositório do GitHub para implementar:
- Um pipe do EventBridge que se conecta ao seu tópico atual do Amazon MSK Serverless Kafka como fonte por meio da autenticação do AWS Identity and Access Management (IAM).
- O EventBridge Pipes lê eventos do seu tópico do Kafka usando o tipo de fonte Amazon MSK.
- Uma função Lambda de enriquecimento em Java para realizar a desserialização, validação e conversão de eventos de Avro para JSON.
- Uma “dead letter queue” do Amazon Simple Queue Service (Amazon SQS) para armazenar eventos nos quais a desserialização falhou.
- Um barramento de eventos personalizado do EventBridge como destino. Uma regra do EventBridge envia todos os eventos recebidos para um grupo de logs do CloudWatch Logs.
Para fontes baseadas em MSK, o EventBridge suporta parâmetros de configuração, como janela do lote, tamanho do lote e posição inicial, que você pode definir usando os parâmetros da classe CfnPipe na pilha CDK de exemplo.
O exemplo do pipe EventBridge consome eventos do Kafka em lotes de 10 porque tem como alvo um barramento de eventos do EventBridge, que tem um tamanho máximo de lote de 10. Consulte lotes e concorrência no Guia do usuário do EventBridge Pipes para escolher uma configuração ideal para outros destinos.
EventBridge Pipes com o Confluent Schema Registry
Esta seção descreve como implementar a validação e a conversão do esquema de eventos de Avro para JSON usando o EventBridge Pipes e o Confluent Schema Registry. Você pode encontrar o código-fonte e as instruções detalhadas de implantação no GitHub.
Pré-requisitos
Para configurar essa solução, você precisa de um stream do Kafka em execução no Confluent Cloud, bem como do Confluent Schema Registry configurado. Consulte o tutorial correspondente do Schema Registry para o Confluent Cloud para configurar um registro de esquema para seu stream do Confluent Kafka.
Para se conectar ao seu cluster do Confluent Cloud Kafka, você precisa de uma chave de API para o Confluent Cloud e o Confluent Schema Registry. O AWS Secrets Manager é usado para armazenar com segurança seus segredos do Confluent.
Configuração do EventBridge Pipes
Use o modelo de CDK da AWS fornecido no repositório do GitHub para implementar:
- Um pipe do EventBridge que se conecta ao seu tópico existente do Confluent Kafka como fonte por meio de um segredo de API armazenado no Secrets Manager.
- O EventBridge Pipes lê eventos do seu tópico do Confluent Kafka usando o tipo de fonte de stream autogerenciado do Apache Kafka, que inclui todos os clusters do Kafka que não são do MSK.
- Uma função Lambda de enriquecimento em Python para realizar a desserialização, validação e conversão de eventos de Avro para JSON.
- Uma “dead letter queue” do SQS para armazenar eventos nos quais a desserialização falhou.
- Um barramento de eventos personalizado do EventBridge como destino. Uma regra do EventBridge grava todos os eventos recebidos em um grupo de logs do CloudWatch Logs.
Para fontes auto-gerenciadas do Kafka, o EventBridge suporta parâmetros de configuração, como janela do lote, tamanho do lote e posição inicial, que você pode definir usando os parâmetros da classe CfnPipe
na pilha CDK de exemplo.
O exemplo do pipe EventBridge consome eventos do Kafka em lotes de 10 porque tem como alvo um barramento de eventos do EventBridge, que tem um tamanho máximo de lote de 10. Consulte lotes e concorrência no Guia do usuário do EventBridge Pipes para escolher uma configuração ideal para outros destinos.
Funções Lambda de enriquecimento
Ambas as soluções descritas anteriormente incluem uma função Lambda de enriquecimento para validação de esquema e conversão de Avro para JSON.
A função Java Lambda se integra ao Glue Schema Registry usando a AWS Glue Schema Registry Library. A função Python Lambda se integra ao Confluent Schema Registry usando a biblioteca confluent-kafka
e usa Powertools for AWS Lambda (Python) para implementar as melhores práticas serverless, como registro e rastreamento.
As funções de enriquecimento do Lambda realizam as seguintes tarefas:
- Nos eventos pesquisados do stream Kafka pelo pipe EventBridge, a chave e o valor do evento são codificados em
base64
. Portanto, para cada evento no lote passado para a função, a chave e o valor são decodificados. - Supõe-se que a chave do evento seja serializada pelo produtor como um tipo de string.
- O valor do evento é desserializado usando o registro do Glue Schema
Serde
(Java) ou oconfluent-kafka
AvroDeserializer
(Python). - Em seguida, a função retorna os eventos JSON convertidos com sucesso para o pipe EventBridge, que então invoca o destino para cada um deles.
- Eventos nos quais a desserialização do Avro falhou são enviados para a fila de letras mortas do SQS.
Conclusão
Esta postagem do blog mostra como implementar o consumo de eventos, a validação do esquema Avro e a conversão para JSON usando Amazon EventBridge Pipes, Glue Schema Registry e Confluent Schema Registry.
O código-fonte do exemplo apresentado está disponível no repositório GitHub AWS Samples para o Glue Schema Registry e o Confluent Schema Registry. Para ver mais padrões, visite a Coleção de padrões serverless.
Para obter mais recursos de aprendizado serverless, visite Serverless Land.
Este blog em português é uma tradução do blog original em inglês (link aqui).
Biografia do Autor
Pascal Vogel, Arquiteto de Soluções | |
Philipp Klose, Arquiteto de Soluções Globais |
Biografia do tradutor
Rodrigo Peres é Arquiteto de Soluções na AWS, com mais de 20 anos de experiência trabalhando com arquitetura de soluções, desenvolvimento de sistemas e modernização de sistemas legados. |