Blog de Amazon Web Services (AWS)

CQRS en AWS: Sincronizando los Servicios de Command y Query con el Estándar Transactional Outbox, la Técnica Transaction Log Tailing y el Debezium Connector

Esta es la tercera parte de la serie sobre cómo implementar CQRS en AWS. La primera parte fue una introducción al tema y también analizamos un caso de uso en el que tenemos una edición compatible con Amazon Aurora for PostgreSQL como base de datos del servicio de comandos y una Amazon Elasticache for Redis como base de datos del servicio de consultas. Para intercambiar información, se coloca un evento en una cola de Amazon SQS, en la que se busca un componente informático (una función de AWS Lambda, en el ejemplo presentado) que actualice Redis con información precalculada que esté lista para recuperarse.

En la segunda parte, exploramos el mismo caso de uso, pero empezamos a usar el estándar Transactional Outbox (en español, algo así como “bandeja de salida transaccional”). Allí, lo exploramos con la técnica Polling Publisher (en español, algo así como “el publicador que consulta”), mediante la cual buscamos en la bandeja de salida de tiempos en tiempos y publicamos eventos que aún no se han publicado. Para que esta idea funcione, después de publicar los eventos, borramos la bandeja de salida para que el mismo evento no se publique dos veces.

En esta tercera parte de esta serie, exploraremos la técnica Transaction Log Tailing (en español, algo así como “leer la cola o el final del log de transacciones”), que es otra forma de utilizar el estándar Transactional Outbox. Esta técnica gestiona el log de transacciones de una tabla de una base de datos para publicar los cambios. El caso de uso es prácticamente el mismo, con las mismas bases de datos y el mismo dominio.

Introducción

La mayoría de las aplicaciones que creamos necesitan una base de datos para almacenar los datos. Almacenamos el estado de nuestros objetos de dominio y los utilizamos para diversos fines, como procesar y generar informes. En ocasiones, es posible que nuestra base de datos no sea ideal para la recuperación de datos, ya sea por su naturaleza o debido a un modelo de dominio complejo, por ejemplo. Para estos casos, podemos usar el estándar arquitectural CQRS, que sugiere que, para un determinado bounded context (en español, algo así como “contexto delimitado”) de nuestro dominio, podemos tener dos servicios, uno para recibir comandos (es decir, operaciones que cambian de estado) y otro para consultas (que solo recuperan datos). De este modo, cada servicio puede tener la base de datos que mejor se adapte. El desafío consiste en cómo mantener sincronizadas las dos bases de datos, lo que se puede hacer con los eventos publicados desde el servicio de comandos para que los consuma el servicio de consultas.

La publicación confiable de eventos relacionados con cosas que ya han sucedido en una aplicación puede ser un desafío. Tanto si utilizamos el estándar CQRS como si no lo hacemos, si no tenemos cuidado, podemos terminar publicando información que aún no existe o no publicarla en cualquier momento, como ya comentamos en la primera parte de esta serie, y las fuentes de datos pueden perder la sincronización de las fuentes de datos. De hecho, dado que publicamos eventos del servicio de comandos para que los consuma el servicio de consultas, ya existe cierta coherencia posible, pero si no tenemos cuidado, es posible que las fuentes de datos no estén sincronizadas durante más tiempo del esperado. En el contexto de una base de datos relacional, el estándar Transactional Outbox nos permite publicar eventos de forma fiable. Cuando se aplica, un evento se conserva en la bandeja de salida de la misma transacción en la que se conservan los datos aggregate principales y, en algún momento, se publican más adelante.

El estándar Transactional Outbox. En este ejemplo, un aggregate de pedidos se compone de la clase Pedido, que a su vez contiene una lista de ArticuloPedido. Al realizar un pedido, el aggregate se confirma junto con un registro que representa el evento de creación del pedido.

Figura 1. El estándar Transactional Outbox. En este ejemplo, un aggregate de pedidos se compone de la clase Pedido, que a su vez contiene una lista de ArticuloPedido. Al realizar un pedido, el aggregate se confirma junto con un registro que representa el evento de creación del pedido.

Hay dos técnicas para tratar las bandejas de salida. Ya que analizamos la técnica de Polling Publisher en la última publicación, ahora veamos la técnica de Transaction Log Tailing. A diferencia de la técnica anterior, esta técnica utiliza el log de transacciones de una tabla para recuperar lo que ya existe y publicar los cambios. Tampoco es necesario necesariamente limpiar la bandeja de salida, aunque podría ser una buena idea ahorrar espacio de almacenamiento.

Es importante tener en cuenta que trabajar con una bandeja de salida significa capturar los cambios en los datos, o CDC, que es la idea de recuperar los cambios que se produjeron en los datos de alguna manera. Hay varias maneras de hacerlo. Una de ellas consiste en tener un componente que analice el log de transacciones de una tabla de la base de datos, que es el log de que contiene todo lo que ha quedado en las tablas, y en esta idea se basa la técnica de Transaction Log Tailing.

Caso de uso: Amazon Aurora PostgreSQL-Compatible Edition como base de datos del servicio de comandos y Amazon Elasticache para Redis como base de datos del servicio de consultas, mediante el estándar Transactional Outbox y la técnica Transaction Log Tailing con el Debezium Connector

Como en el primer y segundo caso que analicé en esta serie de publicaciones, tenemos Aurora como base de datos del servicio de comando y Redis como base de datos del servicio de consultas. Aurora contendrá información relacionada con los clientes, los productos y los pedidos, y Redis contendrá información precalculada relacionada con los clientes. Para probar la arquitectura, utilizaremos dos endpoints, uno para guardar los pedidos y otro para recuperar la información relacionada con el cliente.

Para implementar esta técnica, además de Aurora y Redis, también necesitaremos un cluster de Amazon Managed Streaming for Apache Kafka (Amazon MSK). Todos los clusters se implementarán en 3 subredes privadas en una Amazon VPC personalizada. También utilizaremos Amazon EventBridge Pipes para pasar datos del tema (topic) de Kafka a un tema de Amazon SNS, de modo que los cambios puedan enviarse a varios consumidores.

Resumen de la Solución

Al igual que hicimos con la técnica Polling Publisher, seguiremos conservando los datos del pedido junto con un registro que representa el evento que acaba de ocurrir en la bandeja de salida. La diferencia aquí es que, a diferencia de la técnica anterior, no necesitaremos componentes que se activen cada n minutos para recuperar los datos de las réplicas leídas de Aurora. En su lugar, tendremos un componente que recuperará los registros del log de transacciones de la base. Hay varias opciones, pero en la solución propuesta en esta entrada del blog, utilizaremos el Debezium Connector. El nombre de la técnica Transaction Log Tailing proviene del hecho de que los componentes que gestionan el log de transacciones lo observan como “la cola” de ese log y hacen que los registros estén disponibles para su uso en algún lugar. En el caso del Debezium Connector, un tema de Kafka.

En esta opción de implementación del estándar arquitectural CQRS, seguimos el log de transacciones de la bandeja de salida de la base de datos. Para ello, utilizaremos un componente denominado Debezium Connector for PostgreSQL, que publicará los cambios en la bandeja de salida en un tema de Kafka cada vez que se produzcan. El Debezium Connector captura los cambios a nivel de registro en las tablas indicadas por el usuario en las bases de datos relacionales tan pronto como persistan. Cuando se conecta a la base de datos por primera vez, toma un snapshot del estado actual de las tablas y la publica en un tema de Kafka, y luego continúa supervisando esas tablas y publicando los cambios a nivel de registro. Actualmente, en el caso de una base de datos de PostgreSQL, Debezium solo puede enviar cambios a nivel de registro sobre un tema de Kafka, lo cual tiene sentido, ya que se trata de un connector de Kafka.

Es importante tener en cuenta que, para que el Debezium Connector funcione, en el servicio Amazon RDS, necesitamos crear un parameter group de cluster, establecer el parámetro rds.logical_replication en 1 y asignar el parameter group a nuestra base de datos Aurora (si el clúster de base de datos se creó antes de asignar el parameter group al cluster, también necesitamos reiniciar las instancias del cluster). Para obtener instrucciones sobre cómo trabajar con la replicación lógica en Aurora PostgreSQL, consulte la documentación Uso de la replicación lógica de PostgreSQL con Aurora.

El Debezium Connector que utilizamos aquí es un source connector (en español, algo así como “conector fuente”), lo que significa que lee un log de transacciones y publica los cambios en un tema de Kafka. A partir de ahí, los datos pasan por el pipe de EventBridge hasta que los cambios se envían a otros componentes. Otra opción sería tener un sink connector (en español, algo así como “conector de destino o sumidero”): mientras que un source connector envía datos de algún lugar a un tema de Kafka, un sink connector envía datos de un tema de Kafka a algún otro lugar.

Hay algunos connectors en la comunidad que pueden enviar datos de un tema de Kafka a un tema de SNS. Si bien definitivamente funciona, lo único que hay que tener en cuenta es que, si fuera necesario algún tipo de tratamiento de datos, como lo que hacemos con la función OrderEventAdapterLambda de Lambda que los limpia, o si fuera necesario algún tipo de filtrado, esto no sería posible con un sink connector, como ocurre con EventBridge Pipes. También necesitamos instalar un connector nuevo y su plugin correspondiente para cada destino al que queramos enviar información. Un pipe de EventBridge es más flexible y fácil de usar, ya que no requiere la instalación de nuevos componentes.

En nuestro ejemplo, se realizará una llamada POST al endpoint /orders, con la información de un pedido que realizará un cliente. Esta llamada la validará un autorizador de Lambda (para simplificar, utilizaremos basic authentication) y la procesará más adelante mediante la función OrderReceiverLambda de Lambda, que realizará la función de nuestro servicio de comandos. Esta función Lambda, en la misma transacción, introducirá datos en las tablas que contienen información relacionada con el aggregate del pedido y también en la tabla que corresponde a nuestra bandeja de salida. El log de transacciones de esta tabla será supervisado por el Debezium Connector, que publicará los cambios que se le hayan realizado en un tema de Kafka.

El tema de Kafka que reciba los cambios realizados en la bandeja de salida será el origen de un pipe de EventBridge. La segunda etapa del pipe será la función Lambda OrderAdapterLambda, en la fase de enrichment, cuya finalidad es borrar el evento emitido por la source del pipe y también decodificar la clave y el valor del evento, que están codificados en Base64.

Esta función Lambda también puede desempeñar la función de un filtro. Cuando aplicamos la técnica Transaction Log Tailing con el Debezium Connector, podemos tener acceso no solo a los datos del evento en sí, sino también a los metadatos de la transacción en sí (esto sería posible añadiendo propiedades como transforms.unwrap.add.fields a la configuración de nuestro connector). También podríamos tener acceso a la eliminación de registros (para permitir que el Debezium Connector capture los eventos de eliminación de la bandeja de salida, tendríamos que establecer la propiedad transforms.unwrap.delete.handling.mode en rewrite o none).

Si hubiéramos habilitado Debezium para capturar los eventos de eliminación, este filtrado sería importante porque, dado que estamos observando el log de transacciones de la bandeja de salida, recibiríamos todos los eventos, incluidas las eliminaciones, y en el escenario que estamos explorando, solo nos interesan las inserciones. Estos eventos de eliminación se producen porque la función OrderEventCleanerLambda de Lambda borra la bandeja de salida, que genera los eventos.

Es cierto que es posible definir un filtro en un pipe de EventBridge. El problema es que los datos del evento están codificados en Base64 y, para filtrarlos, necesitamos decodificar los datos y comprobar los metadatos del evento para comprobar que el campo op es igual a “c” (como en “created”), y no podemos hacerlo en el filtro de un pipe de EventBridge, por lo que podríamos usar la función Lambda de la fase de enrichment para hacerlo. Para continuar procesando los eventos en la fase de enrichment, podríamos crear una lista, decodificar los eventos e incluir en la lista solo aquellos eventos cuyo campo de operación contenido en los metadatos de cada evento fuera igual a “c” y devolver la lista en la función Lambda.

Tras la fase de enriquecimiento de la función Lambda, el evento finalmente se envía a un tema de SNS, que lo envía a dos colas de SQS: una para que la lea una función Lambda que envía un correo electrónico de notificación y otra para que la lea una función Lambda que actualiza la clave del cliente en Redis.

La función Lambda que actualiza Redis publica un evento en otra cola de SQS opcional, que es leída por una función Lambda que elimina los eventos de la bandeja de salida que ya se han procesado. El evento lo publica la función Lambda que actualiza Redis solo para garantizar que la función Lambda opcional que elimina un evento ya publicado de Aurora solo reciba un evento después de una actualización correcta de Redis. Eliminar los registros de la bandeja de salida después de procesar los eventos es una forma de mantenerla limpia. Otras formas incluyen usar la extensión pg_partman, disponible para PostgreSQL de Amazon RDS o Amazon Aurora para PostgreSQL en el caso de PostgreSQL, o trabajar con el archivado de particiones en el caso de MySQL.

Un segundo endpoint, /clients/{clientId}, recupera la información relacionada con el cliente. La función Lambda que proporciona esta información representa nuestro servicio de consultas y la recupera de la clave de Redis que se actualizó anteriormente. La información devuelta contiene el nombre del cliente, su correo electrónico y el importe total que el cliente ya ha comprado.

Imagen que muestra la arquitectura propuesta, con Aurora como base de datos del servicio de comandos y Redis como base de datos del servicio de consultas. De tiempos en tiempos, se activa un Lambda, que a su vez recupera todos los eventos no publicados de la bandeja de salida transaccional, prepara cada uno como un documento JSON y los coloca en un tema de Amazon SNS, que los entrega a diferentes destinos, como Amazon SQS. colas.

Figura 2. Arquitectura propuesta, con Amazon Aurora PostgreSQL-Compatible Edition como base de datos del servicio de comandos y Amazon Elasticache for Redis como base de datos del servicio de consultas.

Como analizamos en la segunda parte de esta serie, esta solución también utiliza el estándar Transactional Outbox. Por lo tanto, se garantizará que los eventos que se producen en los contextos limitados de nuestra aplicación se publicarán en algún momento, y es posible que también dispongamos de mecanismos que eliminen los eventos de la bandeja de salida solo después de que se hayan consumido.

Otra ventaja de esta solución es que los cambios se toman del log de transacciones de la bandeja de salida de un tema de Kafka. Esto significa que tenemos la garantía de que lo que consumimos ya ha ocurrido en la bandeja de salida, que puede limpiarse en algún momento con un componente opcional que pueda realizar cálculos (como una función Lambda) o combinando extensiones de PostgreSQL, como pg_partman, con otras soluciones de AWS, como Amazon S3. Y dado que el Debezium Connector está instalado en el cluster MSK, que es Multi-AZ, tenemos más tolerancia a los fallos, por lo que se trata de una arquitectura más resiliente.

Otra ventaja es que, a diferencia de lo que exploramos con la técnica Polling Publisher, al observar el log de transacciones de la bandeja de salida, podemos capturar más datos que los que están presentes en la propia bandeja de salida, como los metadatos sobre la transacción. Además, los eventos se publicarán en la bandeja de salida tan pronto como persistan, por lo que el retraso de replicación entre las bases de datos será mucho menor. Otro punto es que, a diferencia de las otras soluciones, los eventos que persistían en la bandeja de salida se pueden reproducir, ya que también tratan sobre el tema de Kafka. Si queremos alimentar otra base de datos o reproducir eventos para intentar simular un bug, todos los eventos se pueden volver a reproducir.

La desventaja de esta arquitectura es que, aunque es más resiliente, añade más componentes y, por lo tanto, tenemos más complejidad y también es un poco más cara que las que se exploraron anteriormente.

Ejecutando el Ejemplo

Para ejecutar el ejemplo, los lectores deben tener una cuenta de AWS y un usuario con permisos de administrador. A continuación, basta con ejecutar el paso a paso que se proporciona en el repositorio de código para esta serie de entradas de blog sobre CQRS, en AWS Samples, alojadas en Github. Al realizar el proceso paso a paso, los lectores dispondrán de la infraestructura que se presenta aquí en sus propias cuentas.

El ejemplo contiene dos endpoints, uno para recibir información relacionada con los pedidos (que representa nuestro servicio de comando) y el otro para recuperar información relacionada con los clientes (que representa nuestro servicio de consultas). Para comprobar que todo ha funcionado correctamente, ve a la Amazon API Gateway y, en la lista de API, introduce la API “OrdersAPI” y, a continuación, “Stages”. Solo habrá una stage llamada “prod”. Recupere el valor del campo “Invoke URL” y añada “/orders”. Este es el endpoint que recibe la información relacionada con los pedidos.

Hagamos una solicitud POST a ese endpoint. Podemos usar cualquier herramienta para realizar solicitudes, como cURL o Postman. Como este endpoint está protegido, también necesitamos añadir basic authentication. Si utilizas Postman, tendrás que recuperar el nombre de usuario y la contraseña generados al crear la infraestructura. En la puerta de enlace de API, vaya a “API Keys” y copie el valor de la columna “API Key” de “admin_key”. Este valor contiene el nombre de usuario y la contraseña separados por el carácter “:”, pero está codificado en Base64. Decodifique el valor con una herramienta en línea o con el comando “base64” de Linux. El nombre de usuario está a la izquierda del carácter “:” y la contraseña, a la derecha. Añada una “Authorization” del tipo “Basic Auth” y rellene los campos “Username” y “Password” con los valores recuperados. Añade también un header “Content-Type”, con el valor “application/json”.

El payload para realizar solicitudes a este endpoint es el siguiente:

{
    "id_client": 1,
    "products": [{
        "id_product": 1,
        "quantity": 1
    }, {
        "id_product": 2,
        "quantity": 3
    }]
}

Esto representa un pedido realizado por el cliente con el identificador 1 y que contiene productos con los identificadores 1 y 2. El total de ese pedido es de $3000. Toda esta información se almacenará en Aurora. Al realizar esta solicitud POST, si todo funcionó según lo esperado, debería ver el siguiente resultado:

{
    "statusCode": 200,
    "body": "Order created successfully!"
}

Ahora, verifiquemos que la información relacionada con el cliente se haya enviado a Redis. Al endpoint de API Gateway, que se recuperó anteriormente, añada “/clients/1”. Este es el endpoint que recupera la información relacionada con el cliente. Hagamos una solicitud GET para ese endpoint. Al igual que hicimos con el endpoint “/orders”, necesitamos añadir basic authentication. Siga los pasos explicados anteriormente y realice la solicitud GET. Si todo ha funcionado según lo esperado, verás un resultado similar al siguiente:

{
    "name": "Bob",
    "email": "bob@anemailprovider.com",
    "total": 3000.0,
    "last_purchase": 1700836837
}

Esto significa que pudimos alimentar correctamente a Redis con información lista para ser leída, mientras la misma información está en Aurora, en otro formato.

Limpiando los Recursos

Debido a que la infraestructura creada es relativamente compleja y algunos recursos deben crearse después de la creación de la infraestructura inicial, para limpiar los recursos, será necesario seguir algunos pasos. Para eliminar la infraestructura creada, vaya a Amazon MSK en la consola. Luego, en MSK Connect, vaya a “Connectors”, seleccione el connector “KafkaOrderEventConnector” y elimínelo. A continuación, ve a “Custom plugins”, seleccione el plugin “debezium-plugin” y elimínelo. A continuación, vaya a Amazon CloudFormation, “Stacks”, y haga clic en el botón de opción de la pila “cqrsOnAws”. Elimine el stack. Esta eliminación tardará aproximadamente 30 minutos. Probablemente no sea posible eliminar todo el stack. Si esto ocurre, vaya a Amazon EC2 y, en “Network & Security”, vaya a “Network Interfaces”. Elimine las dos interfaces restantes seleccionándolas y haciendo clic en “Delete” en el menú “Actions”. A continuación, vuelva a CloudFormation, seleccione la stack “cqrsOnAws” y haga clic en “Retry Delete”. A continuación, elija “Force delete this entire stack” y haga clic en “Delete”.

Conclusión

En esta entrada de blog, hablé de la técnica de Transaction Log Tailing, que es una forma de implementar el estándar Transactional Outbox. Por lo tanto, cada vez que estamos a punto de cambiar el estado de un aggregate en el servicio de comando, incluimos en una bandeja de salida un evento que representa la situación que le acaba de ocurrir al aggregate, junto con el aggregate en sí. Para publicar los cambios, utilizamos un componente que observa el log de transacciones de la bandeja de salida en la base de datos y publica los cambios a nivel de registro en otro componente. En nuestro ejemplo, utilizamos el Debezium Connector, que es el componente que observa el log de transacciones de una bandeja de salida y publica los cambios en un tema de Kafka.

Con el estándar Transactional Outbox, en algún momento se publicarán todos los eventos, por lo que no hay riesgo de no publicarlos. Es un enfoque relativamente simple, pero el Transaction Log Tailing es un poco más complejo que los demás enfoques y también es un poco más caro. Sin embargo, es más resistente y ofrece la posibilidad de reproducir eventos, ya sea para alimentar otra base de datos o para intentar reproducir un bug.

En la siguiente publicación de esta serie, analizaré otra técnica para publicar eventos desde una bandeja de salida, en la que analizaremos otra forma de la técnica de seguimiento de log de transacciones que utiliza el Amazon Database Migration Service para publicar de forma continua los cambios a nivel de registro que se produjeron en la bandeja de salida en otros componentes de nuestra arquitectura.

Este contenido és una traduccíon del blog original en Portugués (enlace acá).

Acerca del Autor

Roberto Perillo Roberto Perillo es un arquitecto de soluciones empresariales en AWS Brasil, especializado en sistemas serverless, que presta servicios a clientes del sector financiero y ha estado en la industria del software desde 2001. Trabajó durante casi 20 años como arquitecto de software y desarrollador Java antes de unirse a AWS en 2022. Es licenciado en Ciencia de la Computación, tiene una especialización en Ingeniería de Software y un máster también en Ciencia de la Computación. Un aprendiz eterno. En su tiempo libre, le gusta estudiar, tocar la guitarra y también ¡jugar a los bolos y al fútbol de mesa con su hijo, Lorenzo!

Acerca del Colaborador

Luiz Santos Luiz Santos trabaja actualmente como Technical Account Manager (TAM) en AWS y es un entusiasta de la tecnología, siempre busca nuevos conocimientos y tiene una mayor experiencia en el desarrollo de software, el análisis de datos, la seguridad, la tecnología serverless y DevOps. Anteriormente, tenía experiencia como arquitecto de soluciones de AWS y SDE.

Acerca de las Traductoras

Gabriela Guimarães Gabriela Guimarães es actualmente una de las 10 jóvenes que participan en el primer Tech Apprentice Program – Black Women Edition en AWS. Licenciado en Análisis y Desarrollo de Sistemas, estudia el mundo de la nube con enfoque en backend.
Maria Lucio Maria Lucio es Aprendiz de Tecnología, formando parte del primer Tech Apprentice Program – Black Women Edition en AWS. Licenciada en Técnico en Computación y Licenciada en Sistemas de Información, le apasiona la programación y orienta su trabajo como aprendiz hacia proyectos y estudios que le permitan evolucionar en el área del desarrollo de software.

Acerca de los Revisores

Erika Nagamine Erika Nagamine es arquitecta de soluciones de datos en AWS. Tiene una sólida formación académica, con un título en Sistemas de Información, un posgrado en Administración de Bases de Datos, Ingeniería de Datos y una especialización en Minería de Datos Complejos de la Unicamp. Trabaja con clientes de varios segmentos y ha participado en más de 200 proyectos en Brasil y en todo el mundo. Actualmente cuenta con múltiples certificaciones en datos y computación en la nube, todas las certificaciones de AWS, y le encanta compartir conocimientos en las comunidades y dar conferencias en eventos técnicos destacados en Brasil y el mundo.
Karine Ferrari Karine Ferrari es arquitecta de soluciones en AWS con experiencia con clientes de pequeñas y medianas empresas y de servicios financieros. Cuenta con 15 años de experiencia en el área de tecnología de la información trabajando en grandes instituciones y en los últimos 4 años ha estado trabajando en arquitectura para proyectos de nube y modernización de aplicaciones. Tiene experiencia en la implementación y el suministro de documentación, guías y experimentos con el fin de promover y ayudar a los equipos empresariales a utilizar microservicios, API, mensajería, eventos y bases de datos en proyectos en la nube.
Gonzalo Vásquez Gonzalo Vásquez es Senior Solutions Architect de AWS Chile para clientes de los segmentos Independent software vendor (ISV) y Digital Native Business (DNB) de Argentina, Paraguay y Uruguay. Antes de sumarse a AWS, se desempeñó como desarrollador de software, arquitecto de sistemas, gerente de investigación y desarrollo y CTO en compañías basadas en Chile.