O blog da AWS

CQRS na AWS: Sincronizando os Serviços de Command e Query com o Amazon SQS

No primeiro post da série que aborda as diferentes formas de se implementar o padrão Command Query Responsibility Segregation (CQRS) na AWS, abordarei a forma mais simples do padrão, que é sincronizar os serviços de comandos e consultas por meio do Amazon Simple Queue Service (Amazon SQS). Ao se trabalhar com aplicações que lidam com modelos de dados complexos, pode ser que este modelo não esteja no formato ideal para leitura, devido a joins complexos, por exemplo, que podem resultar em consultas que demoram mais do que se pode esperar, ou que o usuário esteja disposto a esperar. Nesses casos, o padrão arquitetural CQRS pode ser usado.

Assim, o padrão é relativamente simples: de um lado, temos um serviço que recebe escritas (comandos), e de outro lado, temos um serviço que recebe leituras (consultas), cada um com um banco de dados que facilite cada operação. O desafio deste padrão é: como podemos transportar os dados, do lado da escrita para a leitura de forma confiável? Quais formas podem ser utilizadas em cada cenário? Quais os prós e contras de cada solução? São estas as perguntas que serão respondidas nesta série sobre CQRS.

Nesta série de seis blog posts, explicarei a história desse padrão, a motivação por trás dele e como ele pode ser implementado com diferentes serviços da AWS. As quatro primeiras partes demonstrarão o Amazon Aurora PostgreSQL-Compatible Edition como banco de dados do serviço de comandos e o Amazon Elasticache for Redis como banco de dados do serviço de consultas. Nesta primeira parte, demonstrarei o padrão CQRS em sua forma mais simples. Na segunda parte, começaremos a usar o padrão Transactional Outbox (em português, algo como “caixa de saída transacional”) para publicar eventos do serviço de comandos para alimentar o serviço de consultas, usando a técnica Polling Publisher. Na terceira e quarta partes, também usaremos o padrão Transactional Outbox, mas usando a técnica Transaction Log Tailing (em português, algo como “ler a cauda/fim do log de transações).

Na quinta parte, demonstrarei como podemos implementar o CQRS com o Amazon DynamoDB sendo o banco de dados do serviço de comandos e o Aurora como banco do serviço de consultas. Para finalizar, apresentarei uma conclusão e uma comparação de todas as técnicas na sexta parte.

Introdução

Em 1988, Bertrand Meyer introduziu a ideia de Command Query Separation, ou CQS, em seu livro Object-Oriented Software Construction, a ser aplicado a softwares orientados a objetos. A ideia fundamental é que os métodos de um objeto devem ser divididos em duas categorias: métodos que apenas retornam valor e não alteram o estado e métodos que apenas alteram o estado e não retornam nenhum valor. Cada método deve pertencer a uma dessas categorias, mas não a ambas. A vantagem é que temos uma clara separação de métodos, e os métodos que realizam consultas podem ser invocados de forma idempotente. Na prática, essa regra se aplica à maioria dos casos, mas não a todos. Usando os métodos da estrutura de dados da pilha como exemplo, seu método pop() retorna o objeto no topo da pilha e também o exclui, o que o torna um método que recupera um valor e altera o estado.

Inspirado pelo CQS, Greg Young apresentou pela primeira vez a ideia do padrão Command Query Responsibility Segregation, ou CQRS, em uma palestra apresentada por ele na QCon San Francisco, em 2006 (infelizmente, esta é a única edição da QCon San Francisco que não possui registros on-line). Foi, no entanto, somente após uma segunda palestra apresentada por ele, na QCon San Francisco, em 8 de novembro de 2007, que a ideia começou a tomar tração. A ideia geral é que nossa solução possa ser estruturada de forma que haja dois serviços, sendo um responsável pelas operações de escrita e outro pelas operações de leitura.

Há casos em que o banco de dados sendo utilizado não favorece as operações de leitura, por exemplo, um banco de dados relacional que precisa realizar vários joins e outras agregações para retornar as informações necessárias, introduzindo latência indesejada no tempo de resposta da aplicação. Nesses casos, podemos criar um serviço que receba as operações de escrita, armazena dados em seu banco de dados, e publica eventos que são capturados pelo serviço responsável pelas operações de consulta/leitura, que, por sua vez, preenche seu banco de dados de forma assíncrona com as novas informações em um formato que facilite a recuperação desses dados.

Os benefícios de se ter dois serviços separados para escrita e consultas são os seguintes:

  • Escalabilidade independente. Como os serviços existem de forma independente, é possível escalá-los conforme necessário. Se a aplicação receber mais solicitações de alteração de estado, poderemos ter mais servidores ou containers lidando com elas. Se o banco de dados sendo utilizado for um banco de dados relacional, como o Aurora, precisaremos encontrar o tamanho certo da instância de escrita e, para isso, poderemos usar métricas como IOPS, CPU Utilization e Throughput, além de analisar como as escritas são feitas no banco de dados. Dependendo do padrão de escrita, por exemplo, se houver picos durante um determinado período de tempo e períodos mais longos de escrita muito baixos, pode ser o caso de se fazer uso de serverless, que está disponível para o Aurora.
  • Banco de dados para propósito específico. Nem sempre o modelo de dados ou banco de dados sendo utilizado pode nos servir conforme precisamos. Pode ser que o modelo de dados reflita um caso de uso real que, por exemplo, não favoreça leituras ou que o banco de dados usado não permita responder facilmente a perguntas específicas. Nesses casos, podemos usar o banco de dados mais apropriado para operações de escrita e consulta separadamente.
  • Tecnologias e ciclo de vida independentes. Haverá dois serviços independentes lidando com comandos e consultas, e cada serviço poderá ser construído com a tecnologia que melhor atenda cada necessidade. Por exemplo, podemos ter o serviço de comandos implementado em Java e o serviço de consultas implementado em Python. Da mesma forma, cada serviço pode ter seu próprio ambiente de execução, como o serviço de comandos sendo executado em AWS Lambda e o serviço de consulta sendo executado no Amazon Elastic Kubernetes Service (Amazon EKS). Cada serviço também pode ter seu próprio ciclo de vida e pipeline de implantação.

Ter dois bancos de dados separados e dois serviços separados atendendo a diferentes solicitações introduz mais complexidade em um sistema, o que só compensa quando precisamos de escalabilidades diferentes ou bancos de dados diferentes para atender a comandos e consultas. Se escalabilidade for a única preocupação, podemos ter apenas um banco de dados atendendo a diferentes serviços, que podem ser escalados de forma independente. Nesse caso, poderíamos aliviar o banco de dados para que este pudesse lidar com escritas e leituras separadamente, como com réplicas de leitura.

Se o banco de dados for um Amazon Relational Database Service (Amazon RDS), o serviço que atende a consultas pode ler réplicas de leitura. Se o banco de dados for um Aurora, pode-se também fazer uso de endpoints de leitura, ou até endpoints customizados, que permitem agrupar réplicas de leitura para propósitos diferentes, como ter duas réplicas de leitura para um relatório A, e outras duas réplicas de leitura para um relatório B. Se um banco de dados diferente para fins de leitura for a única preocupação (como em um caso em que se deseja manter informações pré-computadas em um cache de segundo nível, como o Redis), podemos ter apenas um serviço acessando bancos de dados diferentes a cada necessidade, introduzindo, por exemplo, uma fila que recebe um evento após uma inserção e que é lida por um componente que atualiza o banco de dados que atende a consultas.

Se precisarmos de dois serviços separados e dois bancos de dados diferentes para atender a comandos e consultas, há algumas opções que podemos usar para implementar o CQRS na AWS. Vamos dar uma olhada na forma mais simples.

Caso de uso: Amazon Aurora PostgreSQL-Compatible Edition como banco de dados do serviço de comandos e Amazon Elasticache for Redis como banco de dados do serviço de consultas, usando o Amazon SQS para sincronização

No primeiro caso que abordarei nesta série de blog posts, temos o Aurora como banco de dados do serviço que recebe comandos e o Redis como banco de dados do serviço que recebe consultas. O banco de dados Aurora conterá informações relacionadas a clientes, produtos e pedidos, e o Redis conterá informações pré-computadas relacionadas a clientes. Para testar a arquitetura, usaremos dois endpoints, um para salvar pedidos e outro para recuperar informações relacionadas a clientes.

Visão Geral da Solução

A opção mais simples é publicar um evento em uma fila SQS depois de atualizar o banco de dados do serviço de comandos. A partir daí, os eventos serão recuperados por um componente que possa realizar computação (como uma função Lambda) e as informações pré-computadas relacionadas ao cliente serão atualizadas no Redis. Outra opção seria colocar um evento em um tópico do Amazon SNS em vez de em uma fila SQS, para que ele pudesse notificar vários destinatários sobre as novas informações inseridas no Aurora.

É importante observar que, embora esse exemplo use o Amazon API Gateway, encaminhando mensagens para uma função Lambda, os serviços de comandos e consultas podem ser implementados por qualquer componente computacional. Poderia ser, por exemplo, um container no Amazon EKS como serviço de comandos e um container no Amazon ECS como serviço de consultas. O aspecto mais importante dessa implementação é fazer com que o serviço de comandos publique eventos que alimentem o banco de dados do serviço de consultas.

Essa primeira solução também considera uma dead-letter queue. Quando uma mensagem é processada com sucesso por uma função Lambda, ela remove automaticamente a mensagem processada da fila. Se a mensagem não puder ser processada pela função Lambda devido, por exemplo, a uma exceção, ela se tornará visível novamente na fila e o número de vezes que essa mensagem foi recebida na fila aumentará em 1. Quando esse contador atinge o máximo de recebimentos, configurado quando a dead-letter queue foi configurada na fila principal, essa mensagem é movida para a dead-letter queue, para que possa ser processada por outros meios, como acionar um alarme, enviar um e-mail, inserir um registro em outro banco de dados e assim por diante. Para manter esse exemplo simples, esses outros mecanismos foram omitidos.

Outra solução possível seria fazer com que o API Gateway colocasse uma mensagem diretamente em uma fila SQS em vez de invocar uma função Lambda. Na fila, poderíamos fazer com que outro componente (como uma função Lambda) recuperasse a mensagem. Nesse caso, se uma exceção fosse lançada na função Lambda, a mensagem não seria removida da fila até que o número de recebimentos fosse alcançado. Teríamos que controlar o tratamento dessas mensagens com padrões de resiliência, como o retry, circuit breaker, exponential backoff e também uma dead-letter queue. Se o caso de uso sendo implementado exigisse um processamento síncrono, essa ideia não seria uma opção.

Imagem demonstrando a arquitetura proposta, tendo o Aurora como banco de dados do serviço de comandos, e o Redis como banco de dados do serviço de consultas. A sincronização de dados entre os dois bancos é feito por meio da publicação de um evento pelo serviço de comandos em uma fila do Amazon SQS, que por sua vez é lida por uma função Lambda que atualiza o Redis.

Figura 1. Arquitetura proposta, tendo o Amazon Aurora PostgreSQL-Compatible Edition como banco de dados do serviço de comandos, e o Amazon Elasticache for Redis como banco de dados do serviço de consultas.

O ponto forte dessa arquitetura é a simplicidade. Em termos simples, usamos uma fila para enviar dados do lado do serviço de comandos para o lado do serviço de consultas. Assim que recebemos um pedido na função OrderReceiverLambda e o salvamos no Aurora, publicamos um evento em uma fila SQS. Poderíamos até mesmo preparar as informações pré-computadas relacionadas ao cliente nessa função Lambda, mas ela estaria fazendo mais do que deveria, contrariando o Single Responsibility Principle (Princípio de Responsabilidade Única). Além disso, se não conseguíssemos atualizar o cache do Redis por algum motivo (pode ser que ele estivesse sendo atualizado para uma nova versão, por exemplo), perderíamos essas informações. Com uma fila, podemos processar esse evento posteriormente, mesmo que o cache do Redis esteja temporariamente indisponível.

Nosso objetivo aqui é levar as alterações do Aurora para o Redis de forma confiável. Assim como o DynamoDB, o Amazon DocumentDB ou o Amazon Neptune, o Aurora também tem uma forma de streams, que é o Database Activity Streams. O detalhe é que essa funcionalidade do Aurora está mais para uma questão de auditoria (tudo que acontece no banco de dados é registrado em formato de log), então, até seria possível utilizá-lo para a publicação de eventos, mas seria necessário adaptar o formato dos logs para um formato mais amigável. Além disso, essa funcionalidade exige a utilização do Amazon Kinesis Data Streams, o que ainda não queremos utilizar. Nesse cenário, o ponto de atenção é que a transação (por ser um banco de dados relacional que suporta transações ACID) é separada da publicação do evento.

Há pontos a serem considerados nessa solução. A primeira é que o commit da transação na OrderReceiverLambda é separado da publicação do evento. Vamos considerar um caso em que uma transação é confirmada (commited) e, em seguida, um evento é enviado para uma fila SQS. Se por algum motivo houver um problema ao colocar o evento na fila SQS, os dois bancos de dados ficarão fora de sincronia. Se a publicação do evento na fila SQS acontecer dentro da transação, o que pode acontecer é que o evento pode ser publicado na fila SQS, o banco de dados Redis será atualizado, mas a transação poderá, por algum motivo, não ser confirmada, deixando os dois bancos de dados também fora de sincronia. Nessa mesma situação, se houver um problema ao publicar o evento na fila SQS, a transação será revertida (rolledback) não por causa de um problema com os dados em si, mas porque houve um problema com um componente externo.

Essa solução considera uma fila standard SQS, o que significa que podemos receber quase que um número ilimitado de mensagens por segundo. Nesse caso, não precisamos que as mensagens sejam ordenadas, então podemos usar uma fila standard em vez de uma fila FIFO. Se usássemos uma fila FIFO, teríamos o limite de no máximo 300 mensagens por segundo. Considerando o exemplo acima, a mensagem a ser colocada na fila SQS pela OrderReceiverLambda pode se parecer com o seguinte:

select_statement = "select name, email from public.client where id = %s"
# Recupera os dados do cliente que está efetuando o pedido
cur.execute(select_statement, str(event["id_client"]))
client = cur.fetchone()

order_event = {
    "messageId": str(uuid.uuid4()),
    "id_client": event["id_client"],
    "name": client[0],
    "email": client[1],
    "order_total": order_total,
    "event_date": now
}

Enquanto as filas FIFO têm uma semântica de entrega exactly once, as filas standard têm uma semântica de entrega at-least once e, portanto, precisamos garantir que a mesma mensagem não seja processada duas vezes. Uma forma é adicionar um campo id à mensagem a ser colocada na fila (no objeto “order_event” acima, o campo “messageId”) e verificar esse id antes de processá-la. E como já estamos usando o Redis, podemos adicionar uma entrada ao nosso cache, cuja chave será o id da mensagem e o valor será o id de execução da função Lambda (context.aws_request_id), com um tempo de expiração de cinco minutos. E antes de adicionarmos ou atualizarmos o valor da chave do cliente, verificamos se o valor da chave correspondente ao id da mensagem é igual ao id de execução da função Lambda. Caso seja, processamos a mensagem; caso contrário, outro ambiente de execução já terá tratado a mensagem, então podemos simplesmente ignorá-la, pois será uma mensagem duplicada.

Há um pequeno detalhe aqui: dependendo de como fizermos esse controle, uma determinada mensagem pode ainda assim ser processada duas vezes. Imagine o seguinte cenário: ao recebermos uma mensagem da fila SQS na função Lambda, adicionamos como chave no Redis o id da mensagem, e em seguida verificamos se o valor da chave é igual ao id de execução da função Lambda naquele contexto. Após a verificação avaliar para verdadeiro, outro ambiente de execução Lambda recupera a mesma mensagem, sobrescreve a chave com o id da mensagem e verifica se o valor da chave é igual ao id de execução da função Lambda naquele outro contexto. Após a verificação avaliar para verdadeiro, a mensagem é processada de forma duplicada. Para evitar este cenário, a primeira coisa a fazer é inserir a chave correspondente ao id da mensagem no Redis utilizando o parametro “nx” (Not eXists, ou “não existe”), que garante que a chave seja inserida somente se ela ainda não existir. Dessa forma, conseguiremos processar mensagens duplicadas somente uma vez.

O código da função Lambda que atualiza o Redis com os dados do cliente que efetuou o pedido poderia se parecer com o seguinte:


from redis.commands.json.path import Path

import json
import os
import redis

redis_host = os.environ['REDIS_HOST']
redis_port = os.environ['REDIS_PORT']
r = redis.Redis(host=redis_host, port=redis_port)

def lambda_handler(event, context):
    body = json.loads(event['Records'][0]['body'])
    r.set(body['messageId'], context.aws_request_id, nx=True, ex=300)

    if r.get(body['messageId']).decode() == context.aws_request_id:
        if not r.exists(body['id_client']):
			json_value = {'name': body['name'],
                          'email': body['email'],
                          'total': body['order_total'],
                          'last_purchase': body['event_date']}
            r.json().set(body['id_client'], Path.root_path(), json_value)
        else:
            current_value = r.json().get(body['id_client'])
            json_value = {'name': body['name'],
                          'email': body['email'],
                          'total': body['order_total'] + current_value['total'],
                          'last_purchase': body['event_date']}
            r.json().set(body['id_client'], Path.root_path(), json_value)

Outra coisa a considerar é que não podemos reexecutar os eventos, o que poderia ser interessante se quiséssemos reproduzir um bug ou carregar outro banco de dados, por exemplo. Para resolver esses problemas, o padrão Transactional Outbox pode ajudar. Mas essa é a abordagem que vamos explorar nos próximos três posts desta série!

Executando o Exemplo

Para executar o exemplo, os leitores deverão ter uma conta na AWS e um usuário com permissões de admin. Depois, basta executar o passo-a-passo fornecido no repositório de códigos desta série de blog posts sobre CQRS, no AWS Samples, hospedado no Github. Ao executar o passo-a-passo, os leitores terão a infraestrutura aqui apresentada nas suas próprias contas.

O exemplo contém dois endpoints, sendo um para receber informações relacionadas a pedidos (representando o nosso serviço de comandos) e outro para recuperar informações relacionadas a clientes (representando nosso serviço de consultas). Para verificar se tudo funcionou corretamente, vá até o API Gateway e, na lista de APIs, entre na API “OrdersAPI”, e depois em Stages. Haverá somente uma stage chamada “prod”. Recupere o valor do campo Invoke URL e acrescente “/orders”. Esse é o endpoint que recebe informações relacionadas a pedidos.

Vamos realizar uma requisição POST a esse endpoint. Podemos usar qualquer ferramenta para efetuar as requisições, como cURL ou Postman. Como esse endpoint está protegido, também precisamos adicionar basic authentication. Se você estiver usando o Postman, será necessário recuperar o nome de usuário e senha gerados na construção da infraestrutura. No API Gateway, vá até “API Keys” e copie o valor da coluna “API key” de “admin_key”. Esse valor contém nome de usuário e senha separados pelo caracter “:”, porém está codificado em Base64. Decodifique o valor, utilizando alguma ferramenta online, ou o próprio comando “base64” do Linux. O nome de usuário está à esquerda do caracter “:”, e a senha está à direita. Adicione uma “Authorization” do tipo “Basic Auth” e preencha os campos “Username” e “Password” com os valores recuperados. Adicione também um header “Content-Type”, com o valor “application/json”.

Se você estiver usando, por exemplo, cURL, não será necessário decodificar o valor da API key. Basta adicionar um header “Authorization” com o valor “Basic <valor da api key copiado da coluna API key>”. Adicione também um header “Content-Type”, com o valor “application/json”.

O payload para efetuar requisições para esse endpoint é o seguinte:

{
    "id_client": 1,
    "products": [{
        "id_product": 1,
        "quantity": 1
    }, {
        "id_product": 2,
        "quantity": 3
    }]
}

Isso representa um pedido que o cliente com id 1 fez, contendo produtos com ids 1 e 2. O total desse pedido é de $3000. Todas essas informações serão armazenadas no Aurora. Ao efetuar essa requisição POST, se tudo funcionou conforme o esperado, você deverá ver o seguinte resultado:

{
    "statusCode": 200,
    "body": "Order created successfully!"
}

Agora, vamos verificar se as informações relacionadas ao cliente foram enviadas ao Redis. Ao endpoint do API Gateway, que foi recuperado anteriormente, acrescente “/clients/1”. Esse é o endpoint que recupera informações relacionadas ao cliente. Vamos efetuar uma solicitação GET para esse endpoint. Assim como fizemos com o endpoint “/orders”, precisamos adicionar basic authentication. Siga as etapas explicadas anteriormente e efetue a requisição GET. Se tudo funcionou conforme o esperado, você verá uma saída semelhante à seguinte:

{
    "name": "Bob",
    "email": "bob@anemailprovider.com",
    "total": 3000.0,
    "last_purchase": 1700836837
}

Isso significa que conseguimos alimentar com sucesso o Redis com informações prontas para serem lidas, enquanto as mesmas informações estão no Aurora, em outro formato.

Limpando os Recursos

Para excluir a infraestrutura criada, em um terminal, no mesmo diretório em que a infraestrutura foi criada, basta executar o comando “cdk destroy” e confirmar. A deleção leva aproximadamente 10 minutos.

Conclusão

Neste blog post, abordei como podemos implementar o padrão CQRS com serviços AWS, em sua forma mais simples. A vantagem desse padrão é que podemos ter serviços diferentes servindo comandos e consultas, além de bancos de dados diferentes, cada um com finalidades diferentes.

Os serviços de comandos e consultas são independentes e, portanto, podem ser escalados separadamente, ter bancos de dados diferentes e também regras diferentes, como segurança. Embora eu tenha apresentado neste blog post o Amazon Aurora como o banco de dados para o serviço de comandos e o Amazon Elasticache for Redis como o banco de dados para o serviço de consultas, a ideia é que possamos ter o banco de dados que melhor atenda a cada propósito.

Essa é a forma mais simples de CQRS e tem seus prós e contras. A vantagem desta solução é a simplicidade, e sua desvantagem é que eventos podem ser perdidos e também não podem ser reprocessados. No próximo post desta série, explorarei uma técnica que faz uso do padrão Transactional Outbox para enviar de forma confiável informações do serviço de comandos para o serviço de consultas, mais especificamente, com a técnica Polling Publisher.

Sobre o Autor

Roberto Perillo é arquiteto de soluções enterprise da AWS Brasil especialista em serverless, atendendo a clientes da indústria financeira, e atua na indústria de software desde 2001. Atuou por quase 20 anos como arquiteto de software e desenvolvedor Java antes de ingressar na AWS, em 2022. Possui graduação em Ciência da Computação, especialização em Engenharia de Software e mestrado também em Ciência da Computação. Um eterno aprendiz. Nas horas vagas, gosta de estudar, tocar guitarra, e também jogar boliche e futebol de botão com seu filho, Lorenzo!

Sobre os Colaboradores

Luiz Santos trabalha atualmente como Technical Account Manager (TAM) na AWS e é um entusiasta de tecnologia, sempre busca novos conhecimentos e possui maior profundidade sobre desenvolvimento de Software, Data Analytics, Segurança, Serverless e DevOps. Anteriormente, teve experiência como Arquiteto de Soluções AWS e SDE.
Maria Mendes é Arquiteta de Soluções desde Agosto de 2022. Anteriormente, teve experiência na área de Engenharia de Dados, trabalhando com Python, SQL e arquitetura orientada a eventos. Na AWS, Maria faz parte da comunidade técnica de foco em serviços de DevOps da AWS.

Sobre os Revisores

Gerson Itiro Hidaka atualmente trabalha como Enterprise Solutions Architect na AWS e atua no atendimento a clientes da área Financeira no Brasil. Entusiasta de tecnologias como Internet das Coisas (IoT), Drones, DevOps e especialista em tecnologias como virtualização, serverless, containers e Kubernetes. Trabalha com soluções de TI há mais de 26 anos, tendo experiência em inúmeros projetos de otimização de infraestrutura, redes, migração, disaster recovery e DevOps em seu portfólio.
Peterson Larentis é arquiteto especialista sênior em soluções serverless na AWS, professor e coordenador de cursos de MBA, empreendedor e palestrante internacional. Na AWS, ele ajuda os maiores clientes da América Latina a criar arquiteturas seguras, confiáveis, elásticas e escaláveis com excelência operacional usando as melhores práticas ágeis de engenharia de software. Foi responsável pelas trilhas de DevOps no AWS Summit Brasil por duas edições e é o arquiteto líder da comunidade serverless da AWS na América Latina. Possui MBA em Marketing Digital pela Fundação Getúlio Vargas e seis certificações da AWS.