O blog da AWS

CQRS na AWS: Sincronizando os Serviços de Command e Query com o Padrão Transactional Outbox e a Técnica Polling Publisher

Esta é a segunda parte da série sobre diferentes formas de se implementar o padrão CQRS na AWS. Na primeira parte, discuti sobre o que é o CQRS e seus benefícios. Para começar, abordei um caso de uso em que temos um Amazon Aurora PostgreSQL-Compatible Edition como banco de dados do serviço de comandos e um Amazon Elasticache for Redis como banco de dados do serviço de consultas. Quando o estado do serviço de comandos muda, uma mensagem é colocada em uma fila do Amazon SQS, que é pesquisada por um componente computacional (uma função do AWS Lambda, no exemplo apresentado) que atualiza o Redis, com informações pré-computadas prontas para serem recuperadas.

Neste post, usarei um padrão chamado Transactional Outbox (em português, algo como “caixa de saída transacional”). Com ele, persistimos um registro que representa um evento junto com o aggregate principal, tudo na mesma transação. Para publicar eventos de forma confiável, existem duas abordagens: Polling Publisher (em português, algo como “publicador que efetua consultas”) e Transaction Log Tailing (em português, algo como ler a calda/fim do log de transações). Neste post, abordarei a primeira técnica.

Introdução

A maioria das aplicações que criamos precisa de um banco de dados para armazenar dados. Armazenamos o estado de nossos objetos de domínio e os usamos para várias finalidades, como processar e gerar relatórios. Às vezes, nosso banco de dados pode não ser ideal para recuperação de dados, seja por sua natureza ou devido a um modelo de domínio complexo, por exemplo. Para esses casos, podemos usar o padrão arquitetural CQRS, que sugere que, para um determinado bounded context (em português, algo como “contexto delimitado”) do nosso domínio, podemos ter dois serviços, sendo um para receber comandos (ou seja, operações que mudam de estado) e outro para consultas (que só recuperam dados). Dessa forma, cada serviço pode ter o banco de dados que melhor se encaixe. O desafio é como manter os dois bancos de dados sincronizados, o que pode ser feito com eventos publicados a partir do serviço de comandos, a serem consumidos pelo serviço de consultas.

A publicação confiável de eventos relacionados a coisas que já aconteceram em uma aplicação pode ser um desafio. Se estamos usando ou não o padrão CQRS, se não tomarmos cuidado, podemos acabar publicando informações que ainda não foram persistidas ou não as publicar em nenhum momento, conforme discutimos na primeira parte desta série, e as fontes de dados podem ficar fora de sincronia. De fato, por estarmos publicando eventos a partir do serviço de comandos para serem consumidos pelo serviço de consultas, já há uma consistência eventual, mas se não tomarmos cuidado, as fontes de dados podem ficar dessincronizadas por mais tempo do que se espera. No contexto de um banco de dados relacional, o padrão Transactional Outbox nos permite publicar eventos de forma confiável. Quando aplicado, um evento é persistido na tabela outbox na mesma transação em que os dados do aggregate principal são persistidos, e publicado posteriormente em algum momento.

O padrão Transactional Outbox. Neste exemplo, um aggregate Pedido é composto pela classe Pedido, que por sua vez possui uma lista de ItemPedido. Ao efetuar um pedido, o aggregate é persistido juntamente com um registro que representa o evento de criação do pedido.

Figura 1. O padrão Transactional Outbox. Neste exemplo, um aggregate Pedido é composto pela classe Pedido, que por sua vez possui uma lista de ItemPedido. Ao efetuar um pedido, o aggregate é persistido juntamente com um registro que representa o evento de criação do pedido.

Existem duas técnicas para se lidar com tabelas outbox. Uma delas é a Polling Publisher, que explorarei nesta parte desta série. Depois de persistir o evento na tabela outbox, em intervalos regulares, um componente que possa executar computação (como uma função Lambda) extrairá os eventos que ainda não foram publicados e os publicará. Depois disso, esses mesmos eventos precisarão ser excluídos antes da próxima vez que os dados forem lidos, ou será necessário de alguma forma controlar quais eventos já foram publicados, como com uma coluna que indica se o evento já foi ou não publicado, por exemplo.

É importante observar que lidar com uma tabela outbox significa capturar alterações de dados, ou CDC, que é a ideia de recuperar as alterações que aconteceram em dados de alguma forma. Há várias maneiras de se fazer isso. Uma delas é consultar uma tabela de tempos em tempos para obter alterações, e é nessa ideia que se baseia a técnica Polling Publisher.

Caso de uso: Amazon Aurora PostgreSQL-Compatible Edition como banco de dados do serviço de comandos e Amazon Elasticache for Redis como banco de dados do serviço de consultas, usando o padrão Transactional Outbox e a técnica Polling Publisher

Assim como no primeiro caso que explorei nesta série de posts, temos um Aurora como banco de dados do serviço de comandos e um Redis como banco de dados do serviço de consultas. O Aurora conterá informações relacionadas a clientes, produtos e pedidos, e o Redis conterá informações pré-computadas relacionadas a clientes. Para testar a arquitetura, usaremos dois endpoints, sendo um para salvar pedidos e outro para recuperar informações relacionadas ao cliente.

Visão Geral da Solução

Na AWS, os serviços de comandos e consultas podem ser implementados com qualquer serviço que possa executar computação. Neste exemplo, estamos usando o Amazon API Gateway para receber requisições, que são encaminhadas para funções Lambda, mas poderiam ser, por exemplo, containers sendo executados no Amazon Elastic Kubernetes Service ou no Amazon Elastic Container Service. O serviço que recebe comandos persistirá informações relacionadas aos pedidos nas tabelas referentes ao aggregate Pedido, mas desta vez não publicaremos uma massagem em uma fila do Amazon Simple Queue Service para representar o evento referente a criação do pedido; em vez disso, persistiremos informações relacionadas ao pedido e ao evento que acabou de ocorrer, tudo na mesma transação. E somente isso. Nesse momento, ainda não haverá a publicação do evento.

A quantidade de informações persistidas dependerá se o componente que lidará com o evento possui acesso às tabelas do banco de dados do serviço de comandos ou não. Se o componente tiver acesso e puder realizar consultas nelas, o evento poderá conter menos dados, como somente os ids dos registros. Com esses ids, consultas poderão ser realizadas posteriormente e mais informações poderão ser recuperadas. Caso contrário, o evento deverá conter todas as informações relevantes para que ele possa ser processado. O evento também poderá conter mais dados caso queiramos aliviar o banco de dados com menos consultas.

No caso do bancos de dados Aurora, o evento poderá ser persistido como um documento JSON, que é um dos tipos de dados suportados. Isso pode ser interessante, pois poderemos utilizar a tabela outbox para persistir eventos de diferentes aggregates e utilizar um metadado no documento que indique o tipo do evento, que será utilizado para tratá-lo e publicá-lo apropriadamente. Se outro banco de dados que não suporta esse formato estiver sendo usado, a tabela outbox poderá ter colunas que tenham tipos de dados mais “tradicionais”, como varchar, number, timestamp e assim por diante, e será necessário ter n tabelas, uma para cada tipo de evento que poderá acontecer na aplicação. O documento JSON a seguir é um exemplo de um evento a ser persistido em uma tabela outbox que possui somente dois metadados e o id do pedido, cujos detalhes serão recuperados no momento da publicação:


{
    "tipoAggregate": "Pedido",
	"tipoEvento": "PedidoCriado",
    "idPedido": 1
}

Um evento como o exemplificado acima gera menos tráfego de rede no momento da publicação, mas precisa que os detalhes sejam recuperados para o processamento. Já o evento exemplificado abaixo gera mais tráfego de rede na publicação por ser maior, mas os detalhes já estão todos contidos no evento, o que facilita o processamento.


{
	"tipoAggregate": "Pedido",
	"tipoEvento": "PedidoCriado",
	"idPedido": 1,
	"timestamp": 1700836837,
	"nome": "Bob",
	"email": "bob@anemailprovider.com",
	"produtos": [{
		"nome": "Computador",
		"preco": 1500,
		"quantidade": 1
	}, {
        "nome": "Phone",
		"preco": 500,
		"quantidade": 3
    }]	
}

Os eventos serão recuperados em intervalos predefinidos e cada evento será preparado como um documento JSON para que possa ser publicado em um broker de mensagens. Na solução proposta neste blog post, a função Lambda OrderEventTablePollerLambda é invocada pelo Amazon EventBridge Scheduler, que permite executar tarefas agendadas de tempos em tempos. No EventBridge, também é possível definir regras agendadas. Em ambas as opções, tarefas serão executadas em tempos predeterminados, mas o scheduler oferece escalabilidade aprimorada em relação às regras agendadas do EventBridge, com um conjunto mais amplo de operações de API de destino e serviços da AWS.

Após recuperar os eventos, a função Lambda OrderEventTablePollerLambda os publica em um tópico do Amazon SNS, que por sua vez os publica em uma fila SQS, que é consultada pela função Lambda RedisUpdaterLambda, que atualiza ou insere chaves no Redis com os dados de cada cliente. Após as inserções ou atualizações, essa função publica mensagens na fila SQS OrderEventsCleanerQueue, para que cada registro que já foi processado pela função RedisUpdaterLambda possa ser excluído da tabela outbox. Então, a função OrderEventsCleanerLambda recupera em batches de 10 as mensagens da fila SQS e, dentro de uma transação, exclui todos os registros dos eventos que foram publicados (com um delete * from public.order_event where id in (<lista de ids de eventos publicados>)), para que eles não sejam publicados novamente na próxima vez que a tabela outbox for consultada.

Há pelo menos três formas de se lidar com uma tabela outbox. A primeira é simplesmente não atualizar ou excluir os registros da tabela e publicar todos os eventos a cada nova publicação, e controlar a idempotência do lado do serviço que os consome. Por um lado, teremos menos operações após a publicação dos eventos no banco. Por outro, teremos processamento, armazenamento e custos desnecessários, tanto para publicar os eventos quanto para controlá-los no serviço que os consome. Assim, essa é a forma menos aconselhável de se lidar com uma tabela outbox.

A segunda é remover os registros publicados a cada publicação. Após o evento ser processado pelo serviço de consultas, um componente deverá remover os registros da tabela outbox. A terceira é adicionar uma coluna que indica se um evento já foi processado ou não. A cada publicação, somente os registros que ainda não foram processados serão recuperados e publicados, e após isso, será necessário atualizar o valor da coluna de controle para verdadeiro. Essa opção pode ser interessante caso se queira reexecutar os eventos para a reprodução de um bug ou para o carregamento de um modelo de domínio, por exemplo.

Caso não se esteja excluindo os registros da tabela outbox após as publicações e se esteja somente os atualizando para indicar que eles já foram processados, será necessário pensar em alguma forma de expurgo de dados para que o banco não cresça inadvertidamente, como essa abordagem que utiliza a extensão pg_partman do Postgres, em conjunto com o Amazon S3. No caso do MySQL, pode-se utilizar arquivamento por partições. No exemplo a seguir, os registros são removidos após cada publicação, e dessa forma, o banco de dados estará sempre com poucos ou nenhum registro na tabela outbox.

Imagem demonstrando a arquitetura proposta, tendo o Aurora como banco de dados do serviço de comandos, e o Redis como banco de dados do serviço de consultas. De tempos em tempos, uma Lambda é disparada, que por sua vez recupera da tabela outbox todos os eventos que não foram publicados, prepara cada um como um documento JSON e os coloca em um tópico do Amazon SNS, que os entrega em diferentes destinos, como filas do Amazon SQS.

Figura 2. Arquitetura proposta, tendo o Amazon Aurora PostgreSQL-Compatible Edition como banco de dados do serviço de comandos, e o Amazon Elasticache for Redis como banco de dados do serviço de consultas.

O benefício desta arquitetura é que, embora seja um pouco mais complexa do que a apresentada na primeira parte desta série, estamos usando o padrão Transactional Outbox, que é uma forma confiável de publicar eventos. Com isso, os eventos são persistidos e publicados em algum momento.

Há pontos a serem considerados nessa solução. A primeira é que, independente da quantidade de instâncias com a qual nossa aplicação esteja rodando ou do mecanismo utilizado para disparar a ação de recuperação de registros da tabela outbox, o melhor a se fazer é considerar a semântica at-least once na entrega dos eventos e, assim, considerar que poderá haver publicação duplicada de eventos e garantir que cada evento seja processado somente uma vez com algum mecanismo de idempotência. Segundo, ainda temos a publicação de eventos separada de uma transação, que é a transação que exclui os eventos na tabela outbox.

Remover os eventos que já foram publicados diminui as chances de um evento ser publicado de forma duplicada, além de manter a tabela o mais limpa possível, o que oferece menos custo de armazenamento e mais performance na recuperação dos eventos da tabela outbox. No entanto, ainda temos a publicação dos eventos separada da transação que exclui os eventos. Isso significa que os pontos de atenção que foram levantados na primeira parte desta série ainda podem acontecer.

Isso é menos problemático do que se não estivéssemos usando o padrão Transactional Outbox, pois podemos garantir que os eventos serão publicados em algum momento. Independente da quantidade de instâncias executando nossa aplicação, há um risco remoto de que um evento seja publicado mais de uma vez, em uma eventualidade de a publicação ser feita e os registros dos eventos não serem excluídos ou atualizados por alguma falha entre a publicação e a transação que atualiza ou exclui os eventos. Nesse caso, problemas poderão acontecer quando o componente que recebe o evento publicado executar operações que não sejam idempotentes, portanto, é importante controlar quais eventos já foram processados pelo serviço de consultas de alguma forma.

Uma forma de mitigar isso é introduzir um mecanismo de idempotência para lidar com mensagens duplicadas, semelhante à abordagem adotada na primeira parte desta série. Lá, temos chaves no Redis que representam mensagens que já foram processadas, com tempo de expiração igual a 5 minutos. Poderíamos apresentar a mesma ideia aqui, tendo como chaves os ids dos registros da tabela outbox e como valores os ids de execução das funções Lambda (context.aws_request_id). Antes de publicar um evento, verificaríamos se o cache tem uma entrada com o mesmo id do evento. Em caso negativo, podemos publicá-lo; caso contrário, podemos excluir ou atualizar o evento da tabela outbox pois trata-se de um evento que já foi processado.

Ao usar da técnica Polling Publisher, introduziremos mais operações em nosso banco de dados consultando-o em intervalos regulares. Uma opção é ter réplicas de leitura, que é um recurso disponível para todos os bancos de dados do Amazon RDS, apenas para que os eventos sejam consultados. Ainda assim, o componente que recupera informações de uma réplica de leitura estaria competindo pelos recursos do banco de dados com outros componentes.

Outro ponto é que, com a técnica Polling Publisher, estamos limitados a recuperar os dados que estão na tabela. Não é o caso que exploramos neste exemplo, mas não conseguiríamos capturar metadados sobre a transação em si. Além disso, para capturar exclusões precisaríamos, de alguma forma, manter os estados entre as pesquisas e compará-los para ver o que foi excluído. Se quiséssemos reutilizar os eventos da tabela outbox (o que seria pouco prático), também precisaríamos ter uma coluna do tipo timestamp, que seria atualizada por alguma lógica de domínio e indicaria quando um determinado registro foi alterado pela última vez e, dependendo desse valor, prosseguiríamos com o tratamento e a publicação do evento.

Para a publicação dos eventos, poderíamos recuperar todos os registros da tabela outbox (com select *) e publicá-los de uma só vez em um tópico SNS em um único documento JSON. Na sequência, poderíamos publicá-los todos juntos como uma única mensagem na fila SQS, de onde a Lambda que atualiza o Redis poderia recuperar essa mensagem e iterar os registros e inserir ou atualizar uma a uma as chaves que correspondem a cada cliente no Redis. Nessa abordagem, seria difícil controlar de forma individual as chaves que foram inseridas ou atualizadas. Além disso, no momento em que este blog post é escrito, o tamanho máximo do payload de mensagens no SNS é de 256 KiB, e no SQS é de 256 KiB. Então, teríamos um problema se tentássemos publicar um documento JSON representando todos os eventos a serem publicados se o tamanho desse documento JSON fosse maior do que 256 KiB. Por isso, a melhor abordagem é recuperar cada evento da tabela outbox e publicá-lo em um tópico SNS ou fila SQS individualmente.

Uma consideração final diz respeito ao intervalo de pesquisa no banco de dados. Se estivermos utilizando um banco de dados Aurora, uma possibilidade seria ter réplicas de leitura e criar um endpoint customizado, no qual poderíamos concentrar réplicas de leitura somente para servir consultas à tabela outbox. Se recuperarmos os registros a serem publicados, digamos, a cada 1 segundo, aumentaremos o uso de memória e CPU das nossas réplicas de leitura, o que implica em uma solução mais cara. Se fizermos isso, digamos, a cada minuto, muita coisa pode ter acontecido com o banco de dados do serviço de comandos, e o banco de dados do serviço de consultas pode estar muito defasado. De qualquer forma, podemos correr o risco de pesquisar a tabela outbox e não receber nada. Nesse caso, estaríamos literalmente pagando por nada. O intervalo das consultas deve ser definido cuidadosamente, caso a caso.

Executando o Exemplo

Para executar o exemplo, os leitores deverão ter uma conta na AWS e um usuário com permissões de admin. Depois, basta executar o passo-a-passo fornecido no repositório de códigos desta série de blog posts sobre CQRS, no AWS Samples, hospedado no Github. Ao executar o passo-a-passo, os leitores terão a infraestrutura aqui apresentada nas suas próprias contas.

O exemplo contém dois endpoints, sendo um para receber informações relacionadas a pedidos (representando o nosso serviço de comando) e outro para recuperar informações relacionadas a clientes (representando nosso serviço de consultas). Para verificar se tudo funcionou corretamente, vá até o API Gateway e, na lista de APIs, entre na API “OrdersAPI”, e depois em Stages. Haverá somente uma stage chamada “prod”. Recupere o valor do campo Invoke URL e acrescente “/orders”. Esse é o endpoint que recebe informações relacionadas a pedidos.

Vamos realizar uma requisição POST a esse endpoint. Podemos usar qualquer ferramenta para efetuar as requisições, como cURL ou Postman. Como esse endpoint está protegido, também precisamos adicionar basic authentication. Se você estiver usando o Postman, será necessário recuperar nome de usuário e senha gerados na construção da infraestrutura. No API Gateway, vá até “API Keys” e copie o valor da coluna “API key” de “admin_key”. Esse valor contém nome de usuário e senha separados pelo caracter “:”, porém está codificado em Base64. Decodifique o valor, utilizando alguma ferramenta online, ou o próprio comando “base64” do Linux. O nome de usuário está à esquerda do caracter “:”, e a senha está à direita. Adicione uma “Authorization” do tipo “Basic Auth” e preencha os campos “Username” e “Password” com os valores recuperados. Adicione também um header “Content-Type”, com o valor “application/json”.

Se você estiver usando, por exemplo, cURL, não será necessário decodificar o valor da API key. Basta adicionar um header “Authorization” com o valor “Basic <valor da api key copiado da coluna API key>”. Adicione também um header “Content-Type”, com o valor “application/json”.

O payload para efetuar requisições para esse endpoint é o seguinte:

{
    "id_client": 1,
    "products": [{
        "id_product": 1,
        "quantity": 1
    }, {
        "id_product": 2,
        "quantity": 3
    }]
}

Isso representa um pedido que o cliente com id 1 fez, contendo produtos com ids 1 e 2. O total desse pedido é de $3000. Todas essas informações serão armazenadas no Aurora. Ao efetuar essa requisição POST, se tudo funcionou conforme o esperado, você deverá ver o seguinte resultado:

{
    "statusCode": 200,
    "body": "Order created successfully!"
}

Agora, vamos verificar se as informações relacionadas ao cliente foram enviadas ao Redis. Ao endpoint do API Gateway, que foi recuperado anteriormente, acrescente “/clients/1”. Esse é o endpoint que recupera informações relacionadas ao cliente. Vamos efetuar uma solicitação GET para esse endpoint. Assim como fizemos com o endpoint “/orders”, precisamos adicionar basic authentication. Siga as etapas explicadas anteriormente e efetue a requisição GET. No caso da infraestrutura que disponibilizei no Git, a recuperação e publicação de eventos é feita a cada minuto, então será necessário esperar até o próximo minuto para ver as alterações no Redis. Se tudo funcionou conforme o esperado, você verá uma saída semelhante à seguinte:

{
    "name": "Bob",
    "email": "bob@anemailprovider.com",
    "total": 3000.0,
    "last_purchase": 1700836837
}

Isso significa que conseguimos alimentar com sucesso o Redis com informações prontas para serem lidas, enquanto as mesmas informações estão no Aurora, em outro formato.

Limpando os Recursos

Para excluir a infraestrutura criada, em um terminal, no mesmo diretório em que a infraestrutura foi criada, basta executar o comando “cdk destroy” e confirmar. A deleção leva aproximadamente 10 minutos.

Conclusão

Neste blog post, apresentei a técnica Polling Publisher, que é uma forma de implementar o padrão Transactional Outbox. Nessa abordagem, toda vez que alterarmos o estado de um aggregate no serviço de comandos, persistimos em uma tabela outbox um registro que representa o evento que acabou de acontecer, junto com o próprio aggregate. Em intervalos regulares, recuperamos os eventos da tabela outbox, os publicamos e os excluímos da tabela outbox ou os atualizamos para indicar que já foram processados.

Com o padrão Transactional Outbox, em algum momento todos os eventos serão publicados, então não há risco de não publicá-los. É uma abordagem relativamente simples, mas quando usamos a técnica Polling Publisher, deve-se tomar cuidado para que os eventos sejam publicados e tratados adequadamente. Deve-se sempre considerar que pode ser que os eventos sejam publicados de forma duplicada, e dessa forma, fazer uso de algum mecanismo de idempotência que permita que cada evento seja processado somente uma vez.

No próximo post desta série, explorarei outra técnica para publicar eventos utilizando o padrão Transactional Outbox, que é o Transaction Log Tailing. Assim como qualquer outra solução, ela tem seus prós e contras, que exploraremos a seguir!

Sobre o Autor

Roberto Perillo é arquiteto de soluções enterprise da AWS Brasil especialista em serverless, atendendo a clientes da indústria financeira, e atua na indústria de software desde 2001. Atuou por quase 20 anos como arquiteto de software e desenvolvedor Java antes de ingressar na AWS, em 2022. Possui graduação em Ciência da Computação, especialização em Engenharia de Software e mestrado também em Ciência da Computação. Um eterno aprendiz. Nas horas vagas, gosta de estudar, tocar guitarra, e também jogar boliche e futebol de botão com seu filho, Lorenzo!

Sobre os Colaboradores

Luiz Santos Luiz Santos trabalha atualmente como Technical Account Manager (TAM) na AWS e é um entusiasta de tecnologia, sempre busca novos conhecimentos e possui maior profundidade sobre desenvolvimento de Software, Data Analytics, Segurança, Serverless e DevOps. Anteriormente, teve experiência como Arquiteto de Soluções AWS e SDE.
Maria Mendes Maria Mendes é Arquiteta de Soluções desde Agosto de 2022. Anteriormente, teve experiência na área de Engenharia de Dados, trabalhando com Python, SQL e arquitetura orientada a eventos. Na AWS, Maria faz parte da comunidade técnica de foco em serviços de DevOps da AWS.

Sobre os Revisores

Ricardo Tasso Ricardo Tasso é arquiteto de soluções para parceiros na AWS, atua ajudando na jornada de parceria com a AWS e a entregar a melhor solução aos clientes. Trabalha com administração de sistemas e soluções de TI há mais de 15 anos, com experiência em redes, storage, sistemas operacionais, filas e messaging, com foco nos últimos 7 anos em DevOps, serverless, containers, infrastructure as code, CI/CD, IA para DevOps, além de arquitetura e modernização de aplicações.
Gerson Itiro Hidaka Gerson Itiro Hidaka atualmente trabalha como Enterprise Solutions Architect na AWS e atua no atendimento a clientes da área Financeira no Brasil. Entusiasta de tecnologias como Internet das Coisas (IoT), Drones, DevOps e especialista em tecnologias como virtualização, serverless, containers e Kubernetes. Trabalha com soluções de TI há mais de 26 anos, tendo experiência em inúmeros projetos de otimização de infraestrutura, redes, migração, disaster recovery e DevOps em seu portfólio.