O blog da AWS

Apresentando Amazon Managed Workflows para Apache Airflow (MWAA)

Por Danilo Poccia, Chief Evangelist AWS Londres

 

À medida que o volume e a complexidade dos pipelines de processamento de dados aumentam, é possível simplificar o processo geral decompondo-o em uma série de tarefas menores e coordenar a execução dessas tarefas como parte de um workflow. Para isso, muitos desenvolvedores e engenheiros de dados usam o Apache Airflow, uma plataforma criada pela comunidade para criar, programar e monitorar workflows programaticamente. Com o Airflow, você pode gerenciar workflows como scripts, monitorá-los através da interface do usuário (UI) e estender sua funcionalidade através de um conjunto de plug-ins poderosos. No entanto, instalar, manter e dimensionar manualmente o Airflow e, ao mesmo tempo, lidar com segurança, autenticação e autorização para seus usuários toma muito tempo, que poderia ser aproveitado na solução de problemas reais de negócios.

Por esses motivos, tenho o prazer de anunciar a disponibilidade do Amazon Managed Workflows para Apache Airflow (MWAA), um serviço totalmente gerenciado que facilita a execução de versões de código aberto do Apache Airflow na AWS e a criação de workflows para executar trabalhos de extract-transform-load (ETL) e pipelines de dados.

Os workflows do Airflow recuperam a entrada de fontes como o Amazon Simple Storage Service (S3) usando consultas ao Amazon Athena, realizam transformações em clusters Amazon EMR e podem usar os dados resultantes para treinar modelos de machine learning no Amazon SageMaker. Os workflows do Airflow são criados como Directed Acyclic Graphs (DAGs) usando a linguagem de programação Python.

Um dos principais benefícios do Airflow é sua extensibilidade por meio de plug-ins, que permite criar tarefas que interagem com a AWS ou recursos locais necessários para seus workflows, incluindo o AWS Batch, Amazon CloudWatch, Amazon DynamoDB, AWS DataSync, Amazon ECS e AWS Fargate, Amazon Elastic Kubernetes Service (EKS), Amazon Kinesis Firehose, AWS Glue, AWS Lambda, Amazon Redshift, Amazon Simple Queue Service (SQS)e Amazon Simple Notification Service (SNS).

Para melhorar a observabilidade, as métricas do Airflow podem ser publicadas como métricas do CloudWatch e os logs podem ser enviados para o CloudWatch Logs. O Amazon MWAA fornece atualizações e patches automáticos de versão secundária por padrão, com a opção de definir uma janela de manutenção na qual essas atualizações serão executadas.

Você pode usar o Amazon MWAA com estas três etapas:

  1. Criar um ambiente — Cada ambiente contém seu cluster de Airflow, incluindo seu agendador, workers e servidor web.
  2. Faça upload de seus DAGs e plugins para o S3 — O Amazon MWAA carrega o código no Airflow automaticamente.
  3. Execute seus DAGs no Airflow — Execute seus DAGs a partir da interface do usuário do Airflow ou da interface de linha de comando (CLI) e monitore seu ambiente com o CloudWatch.

Vamos ver como isso funciona na prática!

 

Como criar um ambiente de Airflow usando o Amazon MWAA

Na console do Amazon MWAA, clique em Create environment. Dê um nome ao ambiente e selecione a versão do Airflow a ser usada.

 

 

Em seguida, selecione o bucket do S3 e a pasta para carregar o código DAG. O nome do bucket precisa começar com airflow-.

Opcionalmente, é possível especificar um arquivo de plug-ins e um arquivo de requisitos:

  • O arquivo plug-ins é um arquivo ZIP contendo os plug-ins usados pelos DAGs.
  • O arquivo de requisitos descreve as dependências do Python para executar os DAGs.

Para plug-ins e requisitos, selecione a versão dos objetos em S3 a serem utilizadas. Caso os plug-ins ou os requisitos que foram selecionados criem um erro não recuperável no ambiente, o Amazon MWAA reverterá automaticamente para a versão de trabalho anterior.

 

Clique em Next para definir as configurações avançadas, começando pela rede. Cada ambiente é executado em uma Amazon Virtual Private Cloud usando sub-redes privadas em duas zonas de disponibilidade. O acesso ao servidor Web da UI do Airflow é sempre protegido por um login seguro usando o AWS Identity and Access Management (IAM). Você pode optar por ter acesso ao servidor Web em uma rede pública para que você possa fazer login pela Internet ou em uma rede privada em sua VPC. Por simplicidade, foi selecionado uma rede pública. Deixe que o Amazon MWAA crie um novo security group com as regras de entrada e saída corretas. Opcionalmente, um ou mais security groups podem ser adicionados aos existentes para ajustar o controle do tráfego de entrada e saída para o seu ambiente.

 

 

Agora, configure a classe de ambiente. Cada ambiente inclui um agendador, um servidor Web e um worker. Os workers escalam automaticamente para cima e para baixo de acordo com a carga de trabalho. É fornecida uma sugestão sobre qual classe usar com base no número de DAGs, mas você pode monitorar a carga em seu ambiente e modificar sua classe a qualquer momento.

 

 

A criptografia é sempre ativada para dados em repouso e, embora seja possível selecionar uma chave personalizada gerenciada pelo AWS Key Management Service (KMS), você pode manter a chave padrão que a AWS possui e faz o gerenciamento por você.

 

 

Para monitoramento, é possível publicar o desempenho do ambiente no CloudWatch Metrics. Para os logs, especifique o nível de log e quais componentes do Airflow devem enviar seus logs para o CloudWatch Logs. Deixe o padrão para enviar apenas os logs de tarefas e usar o nível de log INFO.

 

 

É possível modificar a configuração padrão do Airflow em configuration options, como default_task_retries ou worker_concurrency. Por hora, não mude esses valores.

 

 

Finalmente, mas o mais importante, configure as permissões que serão utilizadas pelo ambiente para acessar meus DAGs, gravar logs e executar DAGs acessando outros recursos da AWS. Selecione Create a new role e clique em Create environment. Após alguns minutos, o novo ambiente de Airflow está pronto para ser usado.

 

 

 

Usando a interface do usuário do Airflow

Na console do Amazon MWAA, procure o novo ambiente criado e clique em Open Airflow UI. Uma nova janela do navegador é criada e você é autenticado com um login seguro via AWS IAM.

Os DAGs que foram enviados ao S3 na pasta “DAGs folder” aparecerão na interface do Airflow. Você pode utilizar o exemplo de DAG abaixo, que baixa um conjunto de dados do MovieLens, processa os arquivos no S3 usando o Amazon Athena e carrega o resultado em um cluster do Redshift, criando uma tabela se necessário.

Segue o código-fonte completo do DAG:

Python

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators import HttpSensor, S3KeySensor
from airflow.contrib.operators.aws_athena_operator import AWSAthenaOperator
from airflow.utils.dates import days_ago
from datetime import datetime, timedelta
from io import StringIO
from io import BytesIO
from time import sleep
import csv
import requests
import json
import boto3
import zipfile
import io
s3_bucket_name = 'my-bucket'
s3_key='files/'
redshift_cluster='my-redshift-cluster'
redshift_db='dev'
redshift_dbuser='awsuser'
redshift_table_name='movie_demo'
test_http='https://grouplens.org/datasets/movielens/latest/'
download_http='http://files.grouplens.org/datasets/movielens/ml-latest-small.zip'
athena_db='demo_athena_db'
athena_results='athena-results/'
create_athena_movie_table_query=""" CREATE EXTERNAL TABLE IF NOT EXISTS Demo_Athena_DB.ML_Latest_Small_Movies ( `movieId` int, `title` string, `genres` string ) ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe' WITH SERDEPROPERTIES ( 'serialization.format' = ',', 'field.delim' = ',' ) LOCATION 's3://my-bucket/files/ml-latest-small/movies.csv/ml-latest-small/' TBLPROPERTIES ( 'has_encrypted_data'='false', 'skip.header.line.count'='1' ); """
create_athena_ratings_table_query=""" CREATE EXTERNAL TABLE IF NOT EXISTS Demo_Athena_DB.ML_Latest_Small_Ratings ( `userId` int, `movieId` int, `rating` int, `timestamp` bigint ) ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe' WITH SERDEPROPERTIES ( 'serialization.format' = ',', 'field.delim' = ',' ) LOCATION 's3://my-bucket/files/ml-latest-small/ratings.csv/ml-latest-small/' TBLPROPERTIES ( 'has_encrypted_data'='false', 'skip.header.line.count'='1' ); """
create_athena_tags_table_query=""" CREATE EXTERNAL TABLE IF NOT EXISTS Demo_Athena_DB.ML_Latest_Small_Tags ( `userId` int, `movieId` int, `tag` int, `timestamp` bigint ) ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe' WITH SERDEPROPERTIES ( 'serialization.format' = ',', 'field.delim' = ',' ) LOCATION 's3://my-bucket/files/ml-latest-small/tags.csv/ml-latest-small/' TBLPROPERTIES ( 'has_encrypted_data'='false', 'skip.header.line.count'='1' ); """
join_tables_athena_query=""" SELECT REPLACE ( m.title , '"' , '' ) as title, r.rating FROM demo_athena_db.ML_Latest_Small_Movies m INNER JOIN (SELECT rating, movieId FROM demo_athena_db.ML_Latest_Small_Ratings WHERE rating > 4) r on m.movieId = r.movieId """
def download_zip():
    s3c = boto3.client('s3')
    indata = requests.get(download_http)
    n=0
    with zipfile.ZipFile(io.BytesIO(indata.content)) as z:       
        zList=z.namelist()
        print(zList)
        for i in zList: 
            print(i) 
            zfiledata = BytesIO(z.read(i))
            n += 1
            s3c.put_object(Bucket=s3_bucket_name, Key=s3_key+i+'/'+i, Body=zfiledata)
def clean_up_csv_fn(**kwargs):    
    ti = kwargs['task_instance']
    queryId = ti.xcom_pull(key='return_value', task_ids='join_athena_tables' )
    print(queryId)
    athenaKey=athena_results+"join_athena_tables/"+queryId+".csv"
    print(athenaKey)
    cleanKey=athena_results+"join_athena_tables/"+queryId+"_clean.csv"
    s3c = boto3.client('s3')
    obj = s3c.get_object(Bucket=s3_bucket_name, Key=athenaKey)
    infileStr=obj['Body'].read().decode('utf-8')
    outfileStr=infileStr.replace('"e"', '') 
    outfile = StringIO(outfileStr)
    s3c.put_object(Bucket=s3_bucket_name, Key=cleanKey, Body=outfile.getvalue())
def s3_to_redshift(**kwargs):    
    ti = kwargs['task_instance']
    queryId = ti.xcom_pull(key='return_value', task_ids='join_athena_tables' )
    print(queryId)
    athenaKey='s3://'+s3_bucket_name+"/"+athena_results+"join_athena_tables/"+queryId+"_clean.csv"
    print(athenaKey)
    sqlQuery="copy "+redshift_table_name+" from '"+athenaKey+"' iam_role 'arn:aws:iam::163919838948:role/myRedshiftRole' CSV IGNOREHEADER 1;"
    print(sqlQuery)
    rsd = boto3.client('redshift-data')
    resp = rsd.execute_statement(
        ClusterIdentifier=redshift_cluster,
        Database=redshift_db,
        DbUser=redshift_dbuser,
        Sql=sqlQuery
    )
    print(resp)
    return "OK"
def create_redshift_table():
    rsd = boto3.client('redshift-data')
    resp = rsd.execute_statement(
        ClusterIdentifier=redshift_cluster,
        Database=redshift_db,
        DbUser=redshift_dbuser,
        Sql="CREATE TABLE IF NOT EXISTS "+redshift_table_name+" (title character varying, rating int);"
    )
    print(resp)
    return "OK"
DEFAULT_ARGS = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False 
}
with DAG(
    dag_id='movie-list-dag',
    default_args=DEFAULT_ARGS,
    dagrun_timeout=timedelta(hours=2),
    start_date=days_ago(2),
    schedule_interval='*/10 * * * *',
    tags=['athena','redshift'],
) as dag:
    check_s3_for_key = S3KeySensor(
        task_id='check_s3_for_key',
        bucket_key=s3_key,
        wildcard_match=True,
        bucket_name=s3_bucket_name,
        s3_conn_id='aws_default',
        timeout=20,
        poke_interval=5,
        dag=dag
    )
    files_to_s3 = PythonOperator(
        task_id="files_to_s3",
        python_callable=download_zip
    )
    create_athena_movie_table = AWSAthenaOperator(task_id="create_athena_movie_table",query=create_athena_movie_table_query, database=athena_db, output_location='s3://'+s3_bucket_name+"/"+athena_results+'create_athena_movie_table')
    create_athena_ratings_table = AWSAthenaOperator(task_id="create_athena_ratings_table",query=create_athena_ratings_table_query, database=athena_db, output_location='s3://'+s3_bucket_name+"/"+athena_results+'create_athena_ratings_table')
    create_athena_tags_table = AWSAthenaOperator(task_id="create_athena_tags_table",query=create_athena_tags_table_query, database=athena_db, output_location='s3://'+s3_bucket_name+"/"+athena_results+'create_athena_tags_table')
    join_athena_tables = AWSAthenaOperator(task_id="join_athena_tables",query=join_tables_athena_query, database=athena_db, output_location='s3://'+s3_bucket_name+"/"+athena_results+'join_athena_tables')
    create_redshift_table_if_not_exists = PythonOperator(
        task_id="create_redshift_table_if_not_exists",
        python_callable=create_redshift_table
    )
    clean_up_csv = PythonOperator(
        task_id="clean_up_csv",
        python_callable=clean_up_csv_fn,
        provide_context=True     
    )
    transfer_to_redshift = PythonOperator(
        task_id="transfer_to_redshift",
        python_callable=s3_to_redshift,
        provide_context=True     
    )
    check_s3_for_key >> files_to_s3 >> create_athena_movie_table >> join_athena_tables >> clean_up_csv >> transfer_to_redshift
    files_to_s3 >> create_athena_ratings_table >> join_athena_tables
    files_to_s3 >> create_athena_tags_table >> join_athena_tables
    files_to_s3 >> create_redshift_table_if_not_exists >> transfer_to_redshift

 

No código, diferentes tarefas são criadas usando operadores como PythonOperator, para código Python genérico, ou AWSAthenaOperator, para usar a integração com o Amazon Athena. Para ver como essas tarefas estão conectadas no workflow, você pode ver as últimas linhas, que repito aqui (sem indentação) para simplificar:

 

Python
check_s3_for_key >> files_to_s3 >> create_athena_movie_table >> join_athena_tables >> clean_up_csv >> transfer_to_redshift
files_to_s3 >> create_athena_ratings_table >> join_athena_tables
files_to_s3 >> create_athena_tags_table >> join_athena_tables
files_to_s3 >> create_redshift_table_if_not_exists >> transfer_to_redshift

 

O código do Airflow está fazendo overloading do operador >> no Python para criar uma dependência, o que significa que a tarefa à esquerda deve ser executada primeiro, e a saída é passada para a tarefa à direita. O código é de fácil leitura. Cada uma das quatro linhas acima está adicionando dependências, e todas elas são avaliadas juntas para executar as tarefas na ordem correta.

No console do Airflow, é possível ter uma visualização gráfica do DAG para ter uma representação clara de como as tarefas serão executadas:

 

 

Disponível agora

Amazon Managed Workflows para Apache Airflow (MWAA) está disponível hoje em Leste dos EUA (Norte da Virgínia), Oeste dos EUA (Oregon), Leste dos EUA (Ohio), Ásia-Pacífico (Cingapura), Ásia-Pacífico (Tóquio), Ásia-Pacífico (Sydney), Europa (Irlanda), Europa (Frankfurt) e Europa (Estocolmo). Você pode criar um novo ambiente do Amazon MWAA a partir da console, da AWS Command Line Interface (CLI) ou AWS SDKs. Em seguida, você pode desenvolver workflows em Python usando o ecossistema de integrações do Airflow.

Com o Amazon MWAA, você paga com base na classe de ambiente e nos workers que você utiliza. Para obter mais informações, consulte a página de preços.

A compatibilidade upstream é um princípio fundamental do Amazon MWAA. Nossas alterações de código na plataforma AirFlow são liberadas de volta ao open source.

Com o Amazon MWAA, você pode focar seu tempo criando workflows para suas tarefas de engenharia e ciência de dados e menos tempo gerenciando e dimensionando a infraestrutura de sua plataforma Airflow.

Saiba mais sobre o Amazon MWAA e comece hoje mesmo!

 

Este artigo foi traduzido do Blog da AWS em Inglês.


Sobre o autor

Danilo Poccia é Chief Evangelist na AWS Londres.

Sobre o tradutor

Daniel Bento é Analytics Solutions Architect na AWS.

 

 

 

 

Use seus dados para impulsionar o crescimento do negócio. Inove continuamente usando o data flywheel.