Blog de Amazon Web Services (AWS)

Orquestando subidas de archivos dependientes con AWS Step Functions

Esta publicación está escrita por Nelson Assis, líder de soporte empresarial Serverless, y Jevon Liburd, gerente técnico de cuentas Serverless

Amazon S3 es un servicio de almacenamiento de objetos que muchos clientes utilizan para almacenar archivos. Con el uso de Amazon S3 Event Notifications o Amazon EventBridge, los clientes pueden crear cargas de trabajo con una arquitectura basada en eventos (EDA). Esta arquitectura responde a los eventos que se producen cuando se producen cambios en los objetos de los buckets de S3.

La EDA implica una comunicación asíncrona entre los componentes del sistema. Esto sirve para desacoplar los componentes, lo que permite que cada componente sea autónomo.

Algunos escenarios pueden introducir un acoplamiento en la arquitectura debido a la dependencia entre eventos. Este blog presenta un ejemplo común de este acoplamiento y de cómo se puede gestionar con AWS Step Functions.

Descripción general

En este ejemplo, una organización tiene dos equipos autónomos distribuidos, el equipo de ventas y el equipo de depósito. Cada equipo es responsable de cargar un archivo de datos mensualmente en un depósito de S3 para poder procesarlo.

Los archivos generan eventos cuando se cargan, iniciando procesos posteriores. El procesamiento de los archivo del equipo de depósito, limpia los datos y los une con los datos del equipo de envíos. El procesamiento de los archivo de ventas se correlaciona con los datos combinados del los equipos de depósito e envío. Esto permite a los analistas realizar previsiones y obtener otros conocimientos.

Para que se produzca esta correlación, el archivo del equipo de depósito debe procesarse antes que el archivo de ventas. Como los dos equipos son autónomos, no hay coordinación entre ellos. Esto significa que los archivos se pueden cargar en cualquier momento sin garantizar que el archivo de depósito se procese antes que el archivo de ventas.

Para escenarios como estos, se puede utilizar el patrón agregación. El patrón recopila y almacena los eventos y desencadena un nuevo evento en función de los eventos combinados. En el escenario descrito, los eventos combinados son de los archivos procesados del equipo de depósito y de los archivos de ventas cargado.

Los requisitos del patrón de agregación son:

  1. Correlación: una forma de agrupar los eventos relacionados. Esto se cumple mediante un identificador único en el nombre del archivo.
  2. Agregador de eventos: un almacén con estado para los eventos.
  3. Comprobación de finalización y activación: una condición para recibir los eventos combinados y una forma de publicar el evento resultante.

Descripción general de la arquitectura

La arquitectura utiliza los siguientes servicios de AWS:

  • Amazon DynamoDB como agregador de eventos.
  • Step Functions para organizar el flujo de trabajo.
  • AWS Lambda para analizar el nombre del archivo y extraer el identificador de correlación.

AWS Serverless Application Model (AWS SAM) para la infraestructura como código y despliegue.

  1. Carga de archivos: Los equipos de ventas y depósito suben sus archivos respectivos a S3.
  2. EventBridge: El evento ObjectCreated se envía a EventBridge, donde hay una regla con un objetivo del flujo de trabajo principal.
  3. Máquina de estados principal: Esta máquina de estados organiza las operaciones del agregador y el procesamiento de los archivos. Encapsula los flujos de trabajo de cada archivo para separar la lógica del agregador de la lógica del flujo de trabajo de los archivos.
  4. Analizador de archivos  y correlación: La lógica empresarial para identificar el archivo y su tipo, se ejecuta en esta función Lambda.
  5. Almacenamiento con estado: Una tabla de DynamoDB almacena información sobre el archivo, como el nombre, el tipo y el estado del procesamiento. La máquina de estados lee y escribe en la tabla de DynamoDB. Adicionalmente, los identificadores de tareas también se almacenan en esta tabla.
  6. Procesamiento de archivos: Según el tipo de archivo y las condiciones previas, se ejecutan las máquinas de estado correspondientes al tipo de archivo. Estas máquinas de estados contienen la lógica necesaria para procesar el archivo específico.
  7. Token de tarea y devolución de llamada: El token de tarea se genera cuando el archivo dependiente intenta procesarse antes que el archivo independiente. El patrón Espere una devolución de llamada (Wait for a callback) de Step Functions continúa con la ejecución del archivo dependiente una vez procesado el archivo independiente.

Paso a paso

Necesita los siguientes prerequisitos:

  • AWS CLI y AWS SAM CLI instaladas.
  • Una cuenta de AWS.
  • Permisos suficientes para administrar los recursos de AWS.
  • Git instalado.

Para implementar el ejemplo, sigue las instrucciones del repositorio de GitHub.

En este tutorial, se muestra lo que ocurre si el archivo dependiente (archivo de ventas) se carga antes que el archivo independiente (archivo de depósito).

  1. El flujo de trabajo comienza con la carga del archivo de ventas al depósito dedicado de S3 para el equipo de ventas. En el ejemplo se utilizan depósitos S3 independientes para los dos archivos, ya que asume que los equipos de ventas y depósito están distribuidos y son autónomos. Puede encontrar los  archivos de ejemplo en el repositorio de código.
  2. Al cargar el archivo en S3, se envía un evento a EventBridge, sobre el que actúa la máquina de estados del agregador. El patrón de eventos usado en la regla de EventBridge es:
    {
      "detail-type": ["Object Created"],
      "source": ["aws.s3"],
      "detail": {
        "bucket": {
          "name": ["sales-mfu-eda-09092023", "warehouse-mfu-eda-09092023"]
        },
        "reason": ["PutObject"]
      }
    }
  3. La máquina de estados del agregador comienza invocando la función Lambda del analizador de archivos. Esta función analiza el tipo de archivo y utiliza el identificador para correlacionar los archivos. En este ejemplo, el nombre del archivo contiene el tipo de archivo y el identificador de correlación (year_month). Para utilizar otras formas de representar el tipo de archivo y el identificador de correlación, puedes modificar esta función para analizar esa información.
  4. El siguiente paso de la máquina de estados inserta un registro del evento en la tabla de DynamoDB del agregador de eventos. La tabla tiene una clave primaria compuesta con el identificador de correlación como clave de partición y el tipo de archivo como clave de clasificación. Se realiza un seguimiento del estado de procesamiento del archivo para proporcionar información sobre el estado del flujo de trabajo.
  5. Según el tipo de archivo, la máquina de estados determina qué rama seguir. En el ejemplo, se ejecuta la rama de ventas. La máquina de estados intenta obtener el estado del archivo de depósito (dependiente) de DynamoDB mediante el identificador de correlación. Con el resultado de esta consulta, la máquina de estados determina si el archivo de depósito correspondiente ya se ha procesado.
  6. Como el archivo Warehouse aún no se ha procesado, se utiliza el patrón de integración WaitForTaskToken. La máquina de estados espera en este paso y crea un token de tarea, con el cual los servicios externos utilizan para activar la máquina de estados para que continúe su ejecución. El registro de ventas de la tabla de DynamoDB se actualiza con el token de tarea.
  7. Navegue hasta la consola S3 y cargue el archivo ejemplo del equipo de depósito en su respectivo S3. Esto invoca una nueva instancia del flujo de trabajo de Step Functions, que pasa por la otra rama después del paso de elección del tipo de archivo. En esta rama, se ejecuta la máquina de estado del depósito y el estado de procesamiento del archivo se actualiza en DynamoDB.
  8. Cuando el estado del archivo de depósito cambia a «Completado», la máquina de estado del depósito comprueba en DynamoDB si hay un archivo de ventas pendiente. Si lo hay, recupera el token de la tarea y llama al método sendTaskSuccess. Esto activa la máquina de estado de ventas, que está en estado de espera para continuar. Se inicia la máquina de estados de ventas y se actualiza el estado del procesamiento.

    Conclusión

    Este blog muestra cómo gestionar las dependencias de los archivos en arquitecturas basadas en eventos. Puede personalizar el ejemplo que se proporciona en el repositorio de código para su propio caso de uso.

    Esta solución es específica para las dependencias de archivos en arquitecturas basadas en eventos. Para obtener más información sobre cómo resolver las dependencias y los agregadores de eventos, lea la entrada del blog: Pasar a arquitecturas basadas en eventos con agregadores de eventos serverless.

    Para obtener más información sobre las arquitecturas basadas en eventos, visita la sección de arquitectura basadas en eventos en Serverless Land.

    Este contenido es una traducción del Blog Original en inglés traducido por Diego Casas y revisado por Diego Riveros.