Блог Amazon Web Services

Представляем Amazon Managed Workflows for Apache Airflow (MWAA)

Оригинал статьи: ссылка (Danilo Poccia, Chief Evangelist (EMEA))

По мере увеличения объема и сложности конвейеров обработки данных вы можете упростить весь процесс, разложив его на ряд более мелких задач и скоординировав выполнение этих задач в рамках рабочего процесса (workflow). Для этого многие разработчики и инженеры по работе с данными используют Apache Airflow, платформу, созданную сообществом для программного создания, планирования и мониторинга рабочих процессов. С помощью Airflow вы можете управлять рабочими процессами как скриптами, мониторить их через пользовательский интерфейс (UI), а также расширять их функциональность с помощью набора мощных плагинов. Однако, ручная установка, поддержка и масштабирование Airflow, а так же управление безопасностью, аутентификацией и авторизацией для его пользователей, занимает много времени, которое многие бы предпочли использовать, чтобы сконцентрироваться на решении актуальных бизнес-задач.

По этим причинам я рад сообщить о доступности Amazon Managed Workflows for Apache Airflow (MWAA), полностью управляемого сервиса, который упрощает запуск версий Apache Airflow с открытым исходным кодом на AWS, а также создание рабочих процессов для выполнения ваших задач извлечения-трансформации-загрузки (ETL) и конвейеров данных.

Рабочие процессы Airflow получают входные данные из таких источников, как Amazon Simple Storage Service (S3), используя запросы Amazon Athena, выполняют преобразования на кластерах Amazon EMR, а также могут использовать полученные данные для обучения моделей машинного обучения на Amazon SageMaker. Рабочие процессы в Airflow создаются автоматически как ориентированные ациклические графы (Directed Acyclic Graphs, DAG) с использованием языка программирования Python.

Ключевым преимуществом Airflow является его открытая расширяемость с помощью плагинов, которые позволяют создавать задачи, взаимодействующие с AWS или ресурсами локального дата центра, необходимыми для рабочих процессов, включая AWS Batch, Amazon CloudWatch, Amazon DynamoDB, AWS DataSync, Amazon ECS и AWS Fargate, Amazon Elastic Kubernetes Service (EKS), Amazon Kinesis Firehose, AWS Glue, AWS Lambda, Amazon Redshift, Amazon Simple Queue Service (SQS), а также Amazon Simple Notification Service (SNS).

Для улучшения мониторинга процессов метрики Airflow могут быть опубликованы как CloudWatch Metrics, а логи могут быть отправлены в CloudWatch Logs. Amazon MWAA по умолчанию предоставляет возможность автоматической установки минорных версий и патчей, с возможностью назначить временное окно для технического обслуживания, в рамках которого эта установка будет выполняться.

Вы можете использовать Amazon MWAA, выполнив следующие три шага:

  1. Создайте окружение – Каждое окружение содержит ваш кластер Airflow, включая планировщик, рабочие узлы и веб-сервер.
  2. Загрузите ваши DAG и плагины на S3 – Amazon MWAA автоматически загрузит код в Airflow.
  3. Запустите ваши DAG в Airflow – Запустите ваши DAG из пользовательского интерфейса Airflow или интерфейса командной строки (CLI) и контролируйте ваше окружение с помощью CloudWatch.

Давайте посмотрим, как это работает на практике!

Как создать окружение Airflow с помощью Amazon MWAA

Я нажимаю кнопку Create environment в консоли Amazon MWAA. Я даю окружению название и выбираю версию Airflow для дальнейшего использования.

Затем я выбираю бакет S3 и папку для загрузки кода DAG. Название бакета должно начинаться с airflow-.

Опционально, я могу указать файл плагинов и файл требований:

  • Файл плагинов представляет собой ZIP-файл, содержащий плагины, используемые моими DAG.
  • Файл требований описывает зависимости Python для запуска моих DAG.

Для файлов плагинов и требований я могу выбрать версию объекта S3. В случае, если плагины или требования, которые я использую, приведут к неустранимой ошибке в моем окружении, Amazon MWAA автоматически откатится к предыдущей рабочей версии.

Я нажимаю Next, чтобы настроить расширенные параметры, начиная с сети. Каждое окружение работает в Amazon VPC с использованием приватных подсетей в двух зонах доступности. Доступ через веб-сервер к пользовательскому интерфейсу Airflow всегда защищен безопасным входом с использованием AWS Identity and Access Management (IAM). Однако вы можете выбрать доступ через веб-сервер в публичной сети, чтобы иметь возможность войти в систему через интернет или в приватной сети в вашем VPC. Для простоты я выбираю публичную сеть (Public network). Я разрешаю Amazon MWAA создать новую Security Group с правильными входящими и исходящими правилами. Дополнительно я могу добавить одну или несколько существующих Security Group для более точного управления входящим и исходящим трафиком для вашего окружения.

Теперь я настраиваю класс своего окружения (environment class). Каждое окружение включает в себя планировщик, веб-сервер и один или несколько рабочих узлов. Количество рабочих узлов автоматически масштабируется в зависимости от моей рабочей нагрузки. Мы предоставляем вам рекомендации по тому, какой класс использовать, исходя из количества групп DAG, но вы можете мониторить нагрузку на ваше окружение и изменять его класс в любое время.

Шифрование всегда включено для хранимых данных, и хотя я могу выбрать индивидуальный ключ шифрования, управляемый сервисом AWS Key Management Service (KMS), я предпочту ключ шифрования по умолчанию, который AWS создаёт и поддерживает специально для меня.

Я публикую метрики производительности окружения в CloudWatch Metrics для мониторинга. Это включено по умолчанию, но я могу отключить CloudWatch Metrics после запуска. Что касается логирования, то я могу указать его детализацию и то, какие компоненты Airflow должны отправлять свои логи в CloudWatch Logs. Я оставляю конфигурацию по умолчанию, в которой включены только логи задач и используется уровень детализации INFO.

Я также могу изменить настройки конфигурации Airflow по умолчанию, такие как default_task_retries или worker_concurrency. На данный момент, я не буду этого делать.

Наконец, что самое важное, я настраиваю права доступа, которые будут использоваться моим окружением для доступа к моим DAG, для сохранения логов и запуска DAG с доступом к другим ресурсам AWS. Я выбираю опцию Create a new role и нажимаю кнопку Create environment. Через несколько минут новое окружение Airflow будет готово к использованию.

Использование интерфейса Airflow

В консоли Amazon MWAA я нахожу новое окружение, которое только что создал, и нажимаю на Open Airflow UI. Открывается новое окно браузера, и я аутентифицируюсь с помощью безопасного входа в систему через AWS IAM.

Там я нахожу DAG, который я загрузил на S3 в файле movie_list_dag.py. DAG загружает набор данных MovieLens, обрабатывает файлы на S3, используя Amazon Athena, и загружает результат в кластер Redshift, создавая таблицу, если она отсутствует.

Вот полный исходный код DAG:

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators import HttpSensor, S3KeySensor
from airflow.contrib.operators.aws_athena_operator import AWSAthenaOperator
from airflow.utils.dates import days_ago
from datetime import datetime, timedelta
from io import StringIO
from io import BytesIO
from time import sleep
import csv
import requests
import json
import boto3
import zipfile
import io
s3_bucket_name = 'my-bucket'
s3_key='files/'
redshift_cluster='my-redshift-cluster'
redshift_db='dev'
redshift_dbuser='awsuser'
redshift_table_name='movie_demo'
test_http='https://grouplens.org/datasets/movielens/latest/'
download_http='http://files.grouplens.org/datasets/movielens/ml-latest-small.zip'
athena_db='demo_athena_db'
athena_results='athena-results/'
create_athena_movie_table_query="""
CREATE EXTERNAL TABLE IF NOT EXISTS Demo_Athena_DB.ML_Latest_Small_Movies (
  `movieId` int,
  `title` string,
  `genres` string 
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
WITH SERDEPROPERTIES (
  'serialization.format' = ',',
  'field.delim' = ','
) LOCATION 's3://my-bucket/files/ml-latest-small/movies.csv/ml-latest-small/'
TBLPROPERTIES (
  'has_encrypted_data'='false',
  'skip.header.line.count'='1'
); 
"""
create_athena_ratings_table_query="""
CREATE EXTERNAL TABLE IF NOT EXISTS Demo_Athena_DB.ML_Latest_Small_Ratings (
  `userId` int,
  `movieId` int,
  `rating` int,
  `timestamp` bigint 
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
WITH SERDEPROPERTIES (
  'serialization.format' = ',',
  'field.delim' = ','
) LOCATION 's3://my-bucket/files/ml-latest-small/ratings.csv/ml-latest-small/'
TBLPROPERTIES (
  'has_encrypted_data'='false',
  'skip.header.line.count'='1'
); 
"""
create_athena_tags_table_query="""
CREATE EXTERNAL TABLE IF NOT EXISTS Demo_Athena_DB.ML_Latest_Small_Tags (
  `userId` int,
  `movieId` int,
  `tag` int,
  `timestamp` bigint 
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
WITH SERDEPROPERTIES (
  'serialization.format' = ',',
  'field.delim' = ','
) LOCATION 's3://my-bucket/files/ml-latest-small/tags.csv/ml-latest-small/'
TBLPROPERTIES (
  'has_encrypted_data'='false',
  'skip.header.line.count'='1'
); 
"""
join_tables_athena_query="""
SELECT REPLACE ( m.title , '"' , '' ) as title, r.rating
FROM demo_athena_db.ML_Latest_Small_Movies m
INNER JOIN (SELECT rating, movieId FROM demo_athena_db.ML_Latest_Small_Ratings WHERE rating > 4) r on m.movieId = r.movieId
"""
def download_zip():
    s3c = boto3.client('s3')
    indata = requests.get(download_http)
    n=0
    with zipfile.ZipFile(io.BytesIO(indata.content)) as z:       
        zList=z.namelist()
        print(zList)
        for i in zList: 
            print(i) 
            zfiledata = BytesIO(z.read(i))
            n += 1
            s3c.put_object(Bucket=s3_bucket_name, Key=s3_key+i+'/'+i, Body=zfiledata)
def clean_up_csv_fn(**kwargs):    
    ti = kwargs['task_instance']
    queryId = ti.xcom_pull(key='return_value', task_ids='join_athena_tables' )
    print(queryId)
    athenaKey=athena_results+"join_athena_tables/"+queryId+".csv"
    print(athenaKey)
    cleanKey=athena_results+"join_athena_tables/"+queryId+"_clean.csv"
    s3c = boto3.client('s3')
    obj = s3c.get_object(Bucket=s3_bucket_name, Key=athenaKey)
    infileStr=obj['Body'].read().decode('utf-8')
    outfileStr=infileStr.replace('"e"', '') 
    outfile = StringIO(outfileStr)
    s3c.put_object(Bucket=s3_bucket_name, Key=cleanKey, Body=outfile.getvalue())
def s3_to_redshift(**kwargs):    
    ti = kwargs['task_instance']
    queryId = ti.xcom_pull(key='return_value', task_ids='join_athena_tables' )
    print(queryId)
    athenaKey='s3://'+s3_bucket_name+"/"+athena_results+"join_athena_tables/"+queryId+"_clean.csv"
    print(athenaKey)
    sqlQuery="copy "+redshift_table_name+" from '"+athenaKey+"' iam_role 'arn:aws:iam::163919838948:role/myRedshiftRole' CSV IGNOREHEADER 1;"
    print(sqlQuery)
    rsd = boto3.client('redshift-data')
    resp = rsd.execute_statement(
        ClusterIdentifier=redshift_cluster,
        Database=redshift_db,
        DbUser=redshift_dbuser,
        Sql=sqlQuery
    )
    print(resp)
    return "OK"
def create_redshift_table():
    rsd = boto3.client('redshift-data')
    resp = rsd.execute_statement(
        ClusterIdentifier=redshift_cluster,
        Database=redshift_db,
        DbUser=redshift_dbuser,
        Sql="CREATE TABLE IF NOT EXISTS "+redshift_table_name+" (title	character varying, rating	int);"
    )
    print(resp)
    return "OK"
DEFAULT_ARGS = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False 
}
with DAG(
    dag_id='movie-list-dag',
    default_args=DEFAULT_ARGS,
    dagrun_timeout=timedelta(hours=2),
    start_date=days_ago(2),
    schedule_interval='*/10 * * * *',
    tags=['athena','redshift'],
) as dag:
    check_s3_for_key = S3KeySensor(
        task_id='check_s3_for_key',
        bucket_key=s3_key,
        wildcard_match=True,
        bucket_name=s3_bucket_name,
        s3_conn_id='aws_default',
        timeout=20,
        poke_interval=5,
        dag=dag
    )
    files_to_s3 = PythonOperator(
        task_id="files_to_s3",
        python_callable=download_zip
    )
    create_athena_movie_table = AWSAthenaOperator(task_id="create_athena_movie_table",query=create_athena_movie_table_query, database=athena_db, output_location='s3://'+s3_bucket_name+"/"+athena_results+'create_athena_movie_table')
    create_athena_ratings_table = AWSAthenaOperator(task_id="create_athena_ratings_table",query=create_athena_ratings_table_query, database=athena_db, output_location='s3://'+s3_bucket_name+"/"+athena_results+'create_athena_ratings_table')
    create_athena_tags_table = AWSAthenaOperator(task_id="create_athena_tags_table",query=create_athena_tags_table_query, database=athena_db, output_location='s3://'+s3_bucket_name+"/"+athena_results+'create_athena_tags_table')
    join_athena_tables = AWSAthenaOperator(task_id="join_athena_tables",query=join_tables_athena_query, database=athena_db, output_location='s3://'+s3_bucket_name+"/"+athena_results+'join_athena_tables')
    create_redshift_table_if_not_exists = PythonOperator(
        task_id="create_redshift_table_if_not_exists",
        python_callable=create_redshift_table
    )
    clean_up_csv = PythonOperator(
        task_id="clean_up_csv",
        python_callable=clean_up_csv_fn,
        provide_context=True     
    )
    transfer_to_redshift = PythonOperator(
        task_id="transfer_to_redshift",
        python_callable=s3_to_redshift,
        provide_context=True     
    )
    check_s3_for_key >> files_to_s3 >> create_athena_movie_table >> join_athena_tables >> clean_up_csv >> transfer_to_redshift
    files_to_s3 >> create_athena_ratings_table >> join_athena_tables
    files_to_s3 >> create_athena_tags_table >> join_athena_tables
    files_to_s3 >> create_redshift_table_if_not_exists >> transfer_to_redshift

В коде создаются различные задачи с использованием таких операторов, как PythonOperator для общего кода Python, или AWSAthenaOperator для использования интеграции с Amazon Athena. Чтобы увидеть, как эти задачи связаны в рабочем процессе, вы можете посмотреть последние несколько строк, которые я повторю здесь (без отступов) для простоты:

check_s3_for_key >> files_to_s3 >> create_athena_movie_table >> join_athena_tables >> clean_up_csv >> transfer_to_redshift
files_to_s3 >> create_athena_ratings_table >> join_athena_tables
files_to_s3 >> create_athena_tags_table >> join_athena_tables
files_to_s3 >> create_redshift_table_if_not_exists >> transfer_to_redshift

Код Airflow перегружает оператор правого сдвига >> в Python, чтобы создать зависимость между задачами, то есть сначала должна быть выполнена задача слева и её результат должен быть передан в задачу справа. Такой код достаточно легко читается. Каждая из четырех вышеперечисленных строк добавляет зависимости между задачами, поэтому все они выполняются в правильном порядке.

В консоли Airflow я вижу визуализацию графа задач DAG, чтобы иметь четкое представление о том, как выполняются задачи:

Функциональность уже доступна для использования 

Amazon Managed Workflows for Apache Airflow (MWAA) доступен для использования в регионах US East (Northern Virginia), US West (Oregon), US East (Ohio), Asia Pacific (Singapore), Asia Pacific (Tokyo), Asia Pacific (Sydney), Europe (Ireland), Europe (Frankfurt) и Europe (Stockholm). Вы можете запустить новую среду Amazon MWAA с помощью консоли управления AWS, интерфейса командной строки AWS (CLI) или AWS SDK. После этого, вы можете разрабатывать рабочие процессы на Python, используя экосистему интеграции Airflow.

Используя Amazon MWAA, вы платите в зависимости от класса окружения и используемых вами рабочих узлов. Для получения дополнительной информации см. страницу с ценами.

Совместимость с основной веткой разработки является одним из основных принципов Amazon MWAA. Наши изменения в коде платформы AirFlow отправляются в основную ветку разработки с открытым исходным кодом.

Используя Amazon MWAA, вы можете тратить больше времени на построение рабочих процессов для ваших задач по обработке данных, а также меньше времени на управление и масштабирование инфраструктуры вашей платформы Airflow.

Узнайте больше об Amazon MWAA и начните работать с ним уже сегодня!