Blog de Amazon Web Services (AWS)
Construyendo Pipelines de datos con Amazon Managed Workflows for Apache Airflow, Amazon EMR, AWS Glue, y Amazon Athena
Por: Gabriel Paredes, Sr. Solutions Architect, WWPS LCC
Apache Airflow es uno de los proyectos del ámbito de analítica con mayor crecimiento y adopción. Airflow permite la creación, orquestación y ejecución de Pipelines de datos como parte de un flujo de ETL (Extracción Transformación y Carga de Datos). Airflow simplifica la interacción con múltiples plataformas mediante la descomposición de las tareas en pequeñas unidades de trabajo y el manejo de dependencias entre éstas para generar un Grafo Acíclico Dirigido (Directed Acyclic Graph – DAG) con los ordenes de prioridad de ejecución.
La plataforma de Airflow cuenta con una Interfaz Grafica (GUI) y utilidades de líneas de comando (CLI) que permiten monitorear y gestionar los DAGs. Adicional a esto, Airflow maneja un componente de Scheduling y Executors que se encargan de procesar las tareas y operaciones según las instrucciones del DAG.
Recientemente hemos anunciado el lanzamiento de Amazon Managed Workflows for Apache Airflow (MWAA), el cual es un servicio administrado de Apache Airflow que facilita el despliegue y operación del ciclo de vida completo de los pipelines de datos a escala de cloud. Con MWAA podemos usar las librerías de Airflow y Python para crear flujos de trabajo de transformación de datos sin tener que preocuparnos de administrar o configurar la plataforma subyacente para soportar la escalabilidad, disponibilidad y seguridad. MWAA escala automáticamente a medida que los flujos de trabajo y las tareas contenidas en los DAGs los requiere.
Uno de los grandes beneficios de Airflow es su extensibilidad mediante el uso de conectores y operadores que le permiten interactuar directamente con recursos de analítica en AWS o en ambientes On-Premises. Entre los conectores disponibles podemos destacar Amazon Simple Storage Service (Amazon S3) como repositorio centralizado de datos o Data Lake, Amazon Elastic Map Reduce (Amazon EMR) y AWS Glue para la ejecución de tareas de ETL, y Amazon Athena parar la generación de consultas a los datos, Amazon SageMaker para el entrenamiento y despliegue de modelos de Machine Learning (ML). Adicionalmente a estos, Airflow cuenta con un listado extenso de integración con servicios de AWS, y de igual forma podemos hacer uso de la librería de Boto3 en los flujos de trabajo.
Airflow cuenta con seis (6) componentes principales, que conforman los elementos requeridos para construir y orquestar los Pipelines de datos:
- DAG (Directed Acyclic Graph): Describen las tareas, las dependencias y condiciones de ejecución. Contiene la lógica para manejar las fallas en las tareas. Dentro del DAG, encontramos un conjunto de variables de ejecución que facilitan la coordinación entre recursos (Fecha y tiempo de Inicio de Ejecución, Próxima Ejecución, Periodicidad de Ejecución, entre otros).
- Operator: Es el elemento que conoce y tiene las herramientas necesarias para realizar una tarea. Dentro del DAG, los Operators son instanciados como Objetos de clases como Python Operator, Postgres Operator, Bash Operator, EMR Operator, S3 Operator.
- Task: Es la instancia de un operator que describe una acción a ser realizada, los tasks son agendados posteriormente por el componente de Scheduler de Airfllow y ejecutado por los nodos de Executor. Ejemplos de tasks pueden ser, copiar datos entre dos Buckets de Amazon S3 mediante el Operador de S3.
- Xcom: En algunos casos es necesario intercambiar datos entre Taks Instances en un DAG, el mecanismo de Xcom nos permite realizar push/pull de pequeños mensajes. Un caso común de Xcom es obtener el ClusterID de un clúster de EMR que ha sido previamente desplegado por un Task.
- Connections: Abarca las instrucciones básicas, manejo de credenciales para acceder de forma segura a un sistema externo a Apache Airflow, así como el ciclo de vida de las comunicaciones (Inicio y finalización). Si quisiéramos conectarnos a una base de datos PostgreSQL, el componente de Connection es el encargado de enviar la cadenasaa de conexión a la DB.
- Hooks: Son interfaces que permiten interactuar enviando instrucciones a plataformas externas y bases de datos. Todos los Hooks implementan una interfaz similar y hacen uso de los Connections.
Procesando datos con Amazon MWAA
Imagen 1: Flujo de procesamiento de datos
En este blog post se plantea el despliegue de un ambiente de MWAA para orquestar un Pipeline de datos tomando como fuente el data set (DS) de Movie Lens. Como se observa en la imagen 1, el proceso inicia con la descarga del DS que será almacenado en un Bucket de Amazon S3 (PythonOperator) en formato crudo (CSV) y particionado por fecha (YYYY-MM-DD), adicionalmente se realizará la validacion para comprobar que la información ha sido transferida correctamente (S3KeySensor) mediante la existencia de un archivo de nombre “_SUCESS”.
Posteriormente, se invocará la creación de un clúster transitorio de Amazon EMR (EmrCreateJobFlowOperator) y se cargarán tres (3) tareas de Apache Spark para el procesamiento del conjunto de datos (EmrAddStepsOperator). El resultado del procesamiento se almacenará en un bucket de amazon S3 (Processed Data), y se monitoreará la finalización exitosa de las tareas antes de continuar con el resto del flujo de datos (EmrStepSensor).
Una vez, los datos han sido procesados y que las tareas de EMR han finalizado de forma exitosa, se iniciará un Crawler de AWS Glue para escanear el esquema de datos del resultado del procesamiento (Operador personalizado GlueTriggerCrawlerOperator). Finalmente, se iniciará una tarea de consulta mediante Amazon Athena para generar una tabla de resumen del top de reviews de películas por género (AWSAthenaOperator).
El caso de uso que se presenta en este blog post, es soportado por la arquitectura de servicios de AWS que observan en la Imagen 2.
Imagen 2: Diagrama de Arquitectura de la Solución
Despliegue
La solución se ha construido usando AWS Cloud Development Kit (AWS CDK) para ser desplegada en una cuenta de AWS. Para hacer uso del código, acceda a éste repositorio de GitHub y posteriormente aplique los siguientes pasos:
Prerrequisitos
Para desplegar esta solución, es necesario contar con los siguientes prerrequisitos:
- Una cuenta de AWS
- AWS CDK instalado
- Python3.6+
- AWS CLI configurado con credenciales de acceso
- Permisos para desplegar los componentes listados en esta solución
- Amazon MWAA
- Amazon S3 Buckets
- AWS IAM Roles and Policies
- Amazon EMR Clusters
- Amazon Athena
- AWS Glue Crawler and Databases
- Cloud9
- AWS CloudFormation
- AWS VPC
Puede existir un cobro por el uso de los servicios
Configuración del proyecto con AWS CDK
1 – Desplegar un ambiente de AWS Cloud9
1.1 Busque el servicio AWS Cloud9 en la consola de AWS (us-east-1).
1.2 Haga clic en «Crear entorno»
1.3 Ingrese el entorno de nombre de su preferencia
1.4 Configurar ajustes: Simplemente haga clic en el botón “Siguiente”.
1.5 Una vez desplegado el entorno. Haga clic en «Abrir IDE»
2 – Clonar el código del siguiente repositorio – https://github.com/aws-samples/amazon-mwaa-workflow-demo
3 – Ejecutar los siguientes comandos en la terminal de AWS Cloud9. Monitorear el terminal, en caso que se solicite una confirmación de despliegue, colocar “Y”.
npm install -g aws-cdk
cd ~/environment/amazon-mwaa-workflow-demo
python3 -m venv .env
source .env/bin/activate
pip install -r requirements.txt
cdk bootstrap
cdk deploy --all
4 – Espere hasta que los Stacks de AWS CDK finalicen satisfactoriamente.
5 – Ejecute el script posterior [CCC3] a la implementación en AWS Cloud9 Terminal para generar un archivo JSON que contenga las variables de Apache Airflow necesarias.
cd ~/environment/amazon-mwaa-workflow-demo/post_scripts
python3 CreateMWAAVariables.py[CCC4] [MOU5]
El resultado se guardará en el archivo mwaa_demo_variables.json y será similar al que observa en el siguiente bloque de código.
{
"datalake_processed_bucket": "cdk-mwaa-s3-datalakeprocessedXXXXXXX",
"datalake_raw_bucket": "cdk-mwaa-s3-datalakerawXXXXXXX",
"emr_logs_bucket": "cdk-mwaa-s3-emrlogsXXXXXXX",
"emr_scripts_bucket": "cdk-mwaa-s3-emrscriptsXXXXXXX",
"emr_jobflow_role": "EMR_EC2_DefaultRole_MWAA",
"emr_service_role": "EMR_DefaultRole_MWAA",
"emr_subnet_id": "subnet-XXXXXXX",
"glue_crawler_name": "mwaa_crawler_movie_lens",
"glue_database_name": "mwaa_movie_lens"
}
Paso a Paso del Pipeline de Datos
Una vez completado los pre-requisitos descritos en el punto anterior, se procede con la creación del entorno de Amazon MWAA, para esto seguimos las siguientes instrucciones:
1 – Ir a la consola de AWS, en la barra de búsqueda de servicios de la parte superior colocamos Apache Airflow, seleccionamos Managed Apache Airflow y damos clic en “Crear Entorno” (Imagen 3). Se desplegará el configurador del servicio.
Imagen 3: Portal del servicio MWAA en la consola de AWS
2 – Colocamos el nombre del entorno que será desplegado (Imagen 4).
Imagen 4: Configuración del entorno de MWAA
3 – Se procede con la configuración del Bucket de Amazon S3 donde se almacenarán los DAGs. Tomamos como referencia el nombre del bucket que ha sido creado con el script de AWS CDK (cdk-mwaa-s3-mwaaconfigXXXXXXX) y se configura como se observa en la imagen 5. Dentro del folder cdk-mwaa-s3-mwaaconfigXXXXXXX /dags/scripts/ se encuentra el script de Python mwaa_blogpost_data_pipeline.py que construye el dag para este blog. Puede entrar a revisar este script para mayor detalle de la implementación.
4- Dejamos el resto de los valores por defecto y se da clic en “Siguiente”.
Imagen 5: Configuración del entorno de MWAA – Amazon S3
5 – Continuamos con la configuración de Redes, en la casilla de VPC seleccionamos el VPC ID que se ha creado mediante el script de AWS CDK (Imagen 6), adicionalmente, seleccionamos las Subnets ID del despliegue de MWAA (Estos valores se pueden observar en el output de consola de CDK).
Imagen 6: Configuración del entorno de MWAA – Redes.
6 – Al alcance de la presente demostración, seleccionamos el acceso al servidor WEB “Red Publica” (Imagen 7) y mantenemos el resto de los valores por defecto que se presentan en la pantalla de configuración. Damos clic en “Siguiente”, validamos los datos del entorno que será creado y finalmente damos clic en “Crear Entorno”. El proceso de aprovisionamiento tomará unos 20 minutos aproximadamente.
Imagen 7: Configuración del entorno de MWAA – Redes cont
7 – Una vez el entorno se encuentre en proceso de creación, podremos ver en sus parámetros el Rol de IAM que ha sido creado con el configurador del servicio. (MWAA > Entornos > Clic en el entorno que hemos creado). Al darle clic al nombre del rol (Imagen 8), se abrirá en una pestaña del navegador en la consola de IAM. Procedemos a dar clic en el botón de “Asociar políticas”, buscamos el nombre de política “mwaa_airflow_policy”, selecciónela y damos clic en “Asociar Políticas” (Esta ha sido creada por el Script de AWS CDK y contiene los permisos necesarios para correr el Pipeline de datos en los servicios involucrados). El resultado final será similar al de la imagen 9.
Imagen 8: Configuración del entorno de MWAA – IAM 1
Imagen 9: Configuración del entorno de MWAA – IAM 2
8 – Una vez el entorno de Amazon MWAA ha sido creado satisfactoriamente veremos en la consola del servicio la URL de acceso al GUI de Apache Airflow. Damos clic en el enlace y se abrirá una pestana (Imagen 10).
Imagen 10: Configuración del entorno de MWAA – URL de Acceso
9 – Antes de iniciar la ejecución del Pipeline de datos, es necesario cargar las variables que usará el código del DAG para encontrar las ubicaciones de origen y destino en los buckets de Amazon S3, Roles de IAM, DB y Crawler de AWS Glue. Para esto procedemos a dar clic en “Admin > Variables” en la parte superior (Imagen 11). En la siguiente ventana seleccionamos “Choose File” y ubicamos el archivo “mwaa_blogpost_data_pipeline.json” que se ha creado en el paso 5 de la verificación del despliegue. Finalmente se da Clic en “Import Variables”, el resultado final será similar a lo que se observa en la Imagen 12.
Imagen 11: GUI de Apache Airflow – Vista detallada del DAG
Imagen 12: GUI de Apache Airflow – Vista detallada del DAG
10 – Una vez cargadas las variables al entorno de Amazon MWAA, se procede a dar Clic en “DAGs” en el panel superior del GUI de Apache Airflow, se observará el dag “mwaa_blogpost_data_pipeline” listado (Imagen 13).
Imagen 13: GUI de Apache Airflow – Vista general de DAGs
11 – Procedemos a dar Clic en DAG “mwaa_blogpost_data_pipeline”, tal como se observa en la imagen 14, esto desplegará los detalles del DAG, al dar Clic en “Graph View” podremos observar la dependencia de las tareas que será ejecutado por el Pipeline de datos. Estos corresponden a los pasos que se han explicado al inicio del blogpost.
Imagen 14: GUI de Apache Airflow – Vista detallada del DAG
12 – Se procede a habilitar el DAG (Clic en el Botón de “Off”; cambiará a “On” como se observa en la imagen 15), seguido a esto se iniciará de forma automática la ejecución de un Pipeline de datos. Podremos observar como progresivamente el motor de scheduling de Apache Airflow agendará la ejecución de las tareas iniciando por “download_movie_lens”, progresivamente hasta llegar a “query_athena_results”. Airflow utilizará las variables cargadas en el paso 9 para encontrar la ubicación y destino de los datos, asi como los parámetros creación y uso de los servicios de AWS (Roles de IAM, Subnets, VPC, etc).
A medida que las dependencias entre las tareas se completan satisfactoriamente, Airflow se encargará de manejar el Input/Output de las tareas mediante el mecanismo de XCOMs, de tal forma que se pueda pasar información resultante de la tarea previa. Así mismo, podrán observar como cada tarea es monitoreada en cuanto a tiempos de ejecución.
Durante este proceso, pueden acceder desde la Consola de AWS al portal de los servicios de Amazon EMR, AWS Glue, y Amazon Athena a medida que estos son invocados.
Imagen 15: GUI de Apache Airflow – Vista detallada del DAG en funcionamiento
13 – Una vez finalizada la ejecución del DAG, en el bucket de datos crudos (cdk-mwaa-s3-datalakerawXXXXXXX) se encontrarán cuatro (4) carpetas que contienen la información original del Data Set de Movie Lens en formato csv particionados por fecha de ejecución del DAG. De igual forma en el Bucket de data procesada (cdk-mwaa-s3-datalakeprocessedXXXXXXX) existirá el Data Set transformado en formato parquet. Para finalizar, dentro del bucket de data procesada también se observará una carpeta llamada “athena_results”, la cual contiene un archivo csv con el resultado de la consulta SQL invocada por Amazon Athena desde MWAA (Imagen 16).
Imagen 16: Consulta SQL en Amazon Athena
Adicionalmente al flujo de los datos, en la infraestructura se han desplegado un bucket para almacenar el script de Apache Spark que es ejecutado por el cluster de Amazon EMR (cdk-mwaa-s3-emrscriptsXXXXXXX) así como un bucket para almacenar los logs de ejecución del mismo (cdk-mwaa-s3-emrlogsXXXXXXX). Esta información es cargada en las variables de Apache Airflow y son tomadas como parámetros en los pasos de “créate_emr_cluster” y “add_emr_spark_step_*”
Limpieza
Para eliminar la solución desplegada:
1 – Paso inverso 7.8. Haga clic en el entorno de Amazon MWAA creado en este blog post desde la consola de AWS y busque el «Rol de ejecución». Haga clic en el nombre del rol y será redirigido a la configuración del rol de IAM. Seleccione la política mwaa_airflow_policy, selecciónela y haga clic en «Desasociar Política»
2 – Elimine el entorno Amazon MWAA desde la consola de AWS, seleccionado el entorno creado para este blog y haciendo clic en Eliminar. Espere hasta que el entorno se elimine correctamente.
3 – Ejecute el script posterior al despliegue en AWS Cloud9 Terminal con el fin de eliminar los grupos de seguridad creados para el entorno de Amazon MWAA y las instancias de clúster de Amazon EMR (maestro y núcleo). Supervise la salida del script para verificar que se haya completado correctamente.
cd ~/environment/amazon-mwaa-workflow-demo/post_scripts
python3 CleanUpScript.py
4 – Elimine los Stacks de AWS CDK. En terminal de AWS Cloud9, ejecute los siguientes comandos. Supervise el terminal e ingrese «Y» cuando se le solicite eliminar las stacks cdk-mwaa-s3, cdk-mwaa-vpc y cdk-mwaa-iam
cd ~/environment/amazon-mwaa-workflow-demo
cdk destroy --all
Conclusión
Con esta solución se observa la flexibilidad y capacidad de orquestación de Apache Airflow y Amazon MWAA para integrarse directamente y coordinar los servicios de almacenamiento, procesamiento de datos, y analítica de la plataforma de AWS que apalancan la estrategia de Lagos de datos (Data Lake) y Lake House.
Sobre el autor
Gabriel Paredes es Arquitecto de Soluciones en Amazon Web Services para Sector Público. Gabriel ayuda a múltiples instituciones de educación en Latinoamérica en la adopción tecnológica y mejora de sus servicios estudiantiles.
Revisores Técnicos
Cristian Castellanos es Arquitecto de Soluciones en Amazon Web Services para el Sector Público. Cristian ha ayudado a múltiples NPOs, instituciones educativas, y de gobierno en la adopción de nuevas tecnologías e implementación de soluciones de analítica.
Gabriel Gasca es Arquitecto de soluciones en Sector Público, en Amazon Web Services México donde colabora con organizaciones sin fines de lucro, gubernamentales, educativas y de salud a solucionar los desafios que enfrentan a través de la nube.