O blog da AWS

Construindo um pipeline Serverless para fornecer mensagens confiáveis

Este post foi escrito por Jeff Harman, arquiteto sênior de prototipagem, Vaibhav Shah, arquiteto sênior de soluções, Erik Olsen, gerente técnico sênior de contas e adaptado ao Português por Daniel Abib, arquiteto sênior de soluções

Muitos setores precisam fornecer trilhas de auditoria para sistemas transacionais e de decisão. A tomada de decisão assistida por IA requer o monitoramento de todas as entradas do sistema de decisão quase em tempo real para evitar fraudes, detectar desvios de modelos e discriminação. Os sistemas modernos geralmente usam uma variedade muito maior de entradas para a tomada de decisões, incluindo imagens, texto não estruturado, valores históricos e outros grandes elementos de dados. Esses grandes elementos de dados representam um desafio para os sistemas de auditoria tradicionais que lidam com mensagens de texto relativamente pequenas em formatos estruturados. Este blog mostra o uso da tecnologia Serverless para criar um pipeline de streaming confiável, com desempenho, rastreável e durável para processamento de auditoria.

Visão geral

Considere os quatro requisitos a seguir para desenvolver uma arquitetura para ingestão de registros de auditoria:

  1. Tamanho do registro de auditoria: armazene e gerencie grandes cargas úteis (payload), de 256k a 6 MB de tamanho, que podem ser heterogêneas, incluindo texto, dados binários e referências a outros sistemas de armazenamento.
  2. Rastreabilidade de auditoria: os dados armazenados têm rastreabilidade total da carga útil (payload) e dos processos externos para monitorar o processo por meio de eventos baseados em assinatura.
  3. Alto desempenho: o tempo necessário para bloquear gravações no sistema é limitado ao tempo necessário para transmitir o registro de auditoria pela rede.
  4. Alta durabilidade de dados: quando o sistema envia um recibo da carga útil, a carga tem um risco muito baixo de perda devido a falhas do sistema.

O diagrama a seguir mostra uma arquitetura que atende a esses requisitos e modela o fluxo do registro de auditoria pelo sistema.

A principal fonte de latência é o tempo necessário para que um registro de auditoria seja transmitido pela rede. Os aplicativos que enviam registros de auditoria fazem uma chamada de API para um endpoint do Amazon API Gateway. Uma função do AWS Lambda recebe a mensagem e um cluster do Amazon ElastiCache para Redis fornece um mecanismo de armazenamento inicial de baixa latência para o registro de auditoria. Depois que os dados são armazenados no ElastiCache, o fluxo de trabalho do AWS Step Functions orquestra as funções de comunicação e persistência.

Os assinantes recebem quatro notificações do Amazon Simple Notification Service (Amazon SNS) referentes à chegada e ao armazenamento da carga útil (payload) do registro de auditoria, ao armazenamento dos metadados do registro de auditoria e à conclusão do arquivamento do registro de auditoria. Os usuários podem inscrever uma fila do Amazon Simple Queue Service (SQS) no tópico do SNS e usar mecanismos de fan out para obter alta confiabilidade.

  1. A função Lambda Ingest Message envia uma notificação inicial de recebimento
  2. A função Lambda Message Archive Handler notifica o armazenamento do registro de auditoria do ElastiCache para o Amazon Simple Storage Service (Amazon S3)
  3. A função Lambda Message Metadata Handler notifica sobre o armazenamento dos metadados da mensagem no Amazon DynamoDB.
  4. A função Lambda Final State Aggregation notifica que o registro de auditoria foi arquivado.

Qualquer falha nas três etapas fundamentais de processamento: ingestão, arquivamento de dados e arquivamento de metadados aciona uma mensagem em um SQS Dead Letter Queue (DLQ) que contém a solicitação original e uma explicação do motivo da falha. Qualquer falha na função Ingest Message invoca a função Ingest Message Failure, que armazena os parâmetros originais no bucket S3 Failed Message Storage para análise posterior.

O fluxo de trabalho (workflow) do Step Functions fornece orquestração e execução de caminhos paralelos para o sistema. O fluxo de trabalho abaixo mostra de forma detalhada o fluxo de execução e as ações de notificação. As etapas do Transformador (Transformer) convertem as estruturas de dados internas no formato necessário para os consumidores.

Estruturas de dados

Existem três tipos de eventos e mensagens gerenciados por esse sistema:

  1. Mensagem recebida: essa é a mensagem que o produtor envia para um endpoint do API Gateway.
  2. Mensagem interna: esse evento contém os metadados da mensagem, permitindo que sistemas subsequentes entendam o contexto do produtor da mensagem de origem.
  3. Mensagem de notificação: mensagens que permitem que os assinantes posteriores ajam com base na mensagem.

Passo a passo da solução

O produtor da mensagem chama o endpoint do API Gateway, que impõe os requisitos de segurança definidos pela empresa. Nessa implementação, o API Gateway usa uma chave de API para fornecer uma segurança mais robusta. O API Gateway também cria um cabeçalho de segurança para consumo pela função Lambda Ingest Message. O API Gateway pode ser configurado para impor padrões de formato de mensagem. Consulte Usar validação de solicitação no API Gateway para obter mais informações.

A função Lambda Ingest Message gera um ID de mensagem que rastreia a carga útil (payload) da mensagem em todo o seu ciclo de vida. Em seguida, ele armazena a mensagem completa no cache do ElastiCache for Redis. A função Lambda Ingest Message gera uma mensagem interna com todos os elementos necessários, conforme descrito acima. Por fim, o código do manipulador (handler) da função Lambda inicia o fluxo de trabalho do Step Functions com a carga interna da mensagem.

Se a função Ingest Message falhar por algum motivo, ela invoca a função Ingestion Failure Handler. Essa função do Lambda grava todos os dados recuperáveis de mensagens recebidas em um bucket do S3 e envia uma notificação na fila de mensagens perdidas (dead letter queue) do Ingest Message.

Em seguida, o fluxo de trabalho do Step Functions executa três processos em paralelo.

  • O fluxo de trabalho do Step Functions aciona a função Lambda Message Archive Data Handler para manter os dados das mensagens do cache do ElastiCache em um bucket do S3. Depois de armazenada, a função Lambda retorna as informações de referência e estado do bucket do S3. Há duas opções para remover a mensagem interna do cache. Remova a mensagem do cache imediatamente antes de enviar a mensagem interna e atualizar o sinalizador de cache do ElastiCache ou espere que o ciclo de vida do ElastiCache remova uma mensagem obsoleta. Essa solução aguarda o ciclo de vida do ElastiCache para remover a mensagem.
  • O fluxo de trabalho aciona a função Lambda Message Metadata Handler para gravar todos os metadados da mensagem e informações de segurança no DynamoDB. A função Lambda responde com as informações de referência do DynamoDB.
  • Por fim, o fluxo de trabalho do Step Functions envia uma mensagem ao tópico do SNS para informar aos assinantes que a mensagem chegou e que os processos de persistência de dados foram iniciados.

Depois que cada processo das funções Lambda é concluído, a função Lambda envia uma notificação ao tópico de notificação do SNS para alertar os assinantes de que cada ação foi concluída. Quando as funções Message Metadata e Message Archive são concluídas, a função Final Aggregation faz uma atualização final nos metadados no DynamoDB para incluir informações de referência do S3 e remover a referência do ElastiCache Redis.

Implantando a solução

Pré-requisitos:

  1. O AWS Serverless Application Model (AWS SAM) precisa instar instalado (consulte Introdução ao AWS SAM)
  2. Usuário/credenciais da AWS com permissões apropriadas para executar modelos do AWS CloudFormation na conta de destino da AWS
  3. Python 3.8 — 3.10
  4. O AWS SDK para Python (Boto3) precisa instar instalado
  5. A biblioteca python de solicitações precisa estar instalada

O código-fonte dessa implementação pode ser encontrado em https://github.com/aws-samples/blog-serverless-reliable-messaging

Instalando a solução:

  1. Clone o repositório git em um diretório local
  2. git clone https://github.com/aws-samples/blog-serverless-reliable-messaging.git
  3. Mude para o diretório que foi criado pela operação de clonagem, geralmente blog_serverless_reliable_messaging
  4. Execute o comando: sam build
  5. Execute o comando: sam deploy —-guided. Você deverá fornecer os seguintes parâmetros:
    1. Stack Name: nome dado a essa implantação (exemplo: serverless-streaming)
    2. AWS Region: região AWS onde implantar (exemplo: us-east-1)
    3. ElastiCacheInstanceClass: tipo de instância de cache do EC2 que será usada (exemplo: cache.t3.small)
    4. ElasticReplicaCount: quantas réplicas devem ser usadas com o ElastiCache (mínimo recomendado: 2)
    5. Nome do projeto: usado para nomear recursos na conta (exemplo: serverless-streaming)
    6. MultiAZ: Verdadeiro/Falso se várias zonas de disponibilidade precisarem ser usadas (recomendado: Verdadeiro)
    7. Os parâmetros padrão podem ser selecionados para o restante das perguntas

Testando:

Depois de implantar a pilha (stack), você pode testá-la por meio do endpoint do API Gateway com a chave de API referenciada na saída (output) da implantação. Há dois métodos para recuperar a chave de API por meio do console da AWS (do link fornecido na saída — ApiKeyConsole) ou por meio da AWS CLI (da referência da AWS CLI na saída — APIKeyCLI).

Você pode testar diretamente no console de serviço do Lambda invocando a função Ingest Message.

Uma mensagem de teste está disponível na raiz do projeto test_message.json para o teste direto na função Lambda da função Ingest.

  1. No console, navegue até o serviço Lambda
  2. Na lista de funções disponíveis, selecione a função “<project name>-IngestMessageFunction-xxxxx”
  3. Na “Visão geral da função”, selecione a guia “Teste”
  4. Insira um nome de evento de sua escolha
  5. Copie e cole o conteúdo de test_message.json na caixa “Event JSON”
  6. Clique em “Salvar” e, depois de salvar, clique em “Teste”
  7. Se for bem-sucedido, você deverá ver algo semelhante ao abaixo nos detalhes:
{
"isBase64Encoded": false,
"statusCode": 200,
"headers": {
"Access-Control-Allow-Headers": "Content-Type",
"Access-Control-Allow-Origin": "*",
"Access-Control-Allow-Methods": "OPTIONS,POST"
},
"body": "{\"messageID\": \"XXXXXXXXXXXXXX\"}"
}
  1. No bucket do S3 “<project name>-s3messagearchive-xxxxxx”, encontre a carga útil (paylod) do json original com uma chave baseada na data e hora da execução do script, por exemplo: ANO/MÊS/DIA/HORA/MINUTO (YEAR/MONTH/DAY/HOUR/MINUTE) com um nome de arquivo do MessageID
  2. Em uma tabela do DynamoDB chamada metaDataTable, você deve encontrar um registro com um messageID igual ao messageID  acima, que contém todos os metadados relacionados à carga

Um script python está incluído com o código na pasta test_client

  1. Substitua os valores <Your API key key here> e <Your API Gateway URL here (IngestMessageApi) > pelos valores corretos para seu ambiente no arquivo test_client.py
  2. Execute o script de teste com o Python 3.8 ou superior com o pacote de solicitações instalado.  Exemplo de execução (do diretório principal do git clone):
    python3 -m pip install -r ./test_client/requirements.txt
    python3 ./test_client/test_client.py
  3. A saída bem-sucedida mostra o MessageID e o payload JSON do cabeçalho:
{
"messageID": " XXXXXXXXXXXXXX"
}
  1. No bucket do S3 “<project name>-s3messagearchive-xxxxxx“, você deve conseguir encontrar a carga útil do json original com uma chave baseada na data e hora da execução do script, por exemplo:
  2. ANO/MÊS/DIA/HORA/MINUTO (YEAR/MONTH/DAY/HOUR/MINUTE)
  3. com um nome de arquivo de messageID
  4. Em uma tabela do DynamoDB chamada metaDataTable, você deve encontrar um registro com um messageID igual ao messageID acima que contém todos os metadados relacionados à carga de trabalho (payload)

Conclusão

Este blog descreve padrões de arquitetura, padrões de mensagens e estruturas de dados que oferecem suporte a um sistema de mensagens altamente confiável para mensagens grandes. O uso de serviços Serverless, incluindo funções Lambda, Step Functions, ElastiCache, DynamoDB e S3, atende aos requisitos dos sistemas de auditoria modernos para serem escaláveis e confiáveis. A arquitetura compartilhada nesta postagem do blog é adequada para um ambiente altamente regulamentado para armazenar e rastrear mensagens maiores do que os sistemas de registro típicos, com registros com tamanhos entre 256k e 6MB. A arquitetura serve como um modelo que pode ser estendido e adaptado para se adequar a outros casos de uso Serverless.

Para obter recursos de aprendizado Serverless, visite Serverless Land.

Este blog é uma tradução do blog original em inglês (link aqui).

Biografia dos Autores

Jeff Harman é arquiteto sênior de prototipagem
Vaibhav Shah é arquiteto sênior de soluções
Erik Olsen é gerente técnico sênior de contas e adaptado

Biografia do tradutor

Daniel Abib é arquiteto de soluções sênior na AWS, com mais de 25 anos trabalhando com gerenciamento de projetos, arquiteturas de soluções escaláveis, desenvolvimento de sistemas e CI/CD, microsserviços, arquitetura Serverless & Containers e segurança. Ele trabalha apoiando clientes corporativos, ajudando-os em sua jornada para a nuvem.

https://www.linkedin.com/in/danielabib/

Biografia da Revisora

Bianca Mota é arquiteta de soluções para o segmento de startups e iniciou sua jornada na AWS em 2019 ajudando clientes em suas jornadas na nuvem. Além disso, Bianca é parte da comunidade Serverless na AWS e já apresentou sobre o assunto em diversos eventos, como o AWS Summit São Paulo e o AWS re:Invent.