O blog da AWS

Construa e orquestre pipelines ETL usando Amazon Athena e AWS Step Functions

Por Behram Irani, Sr Analytics Solutions Architect,
Dipankar Kushari, Sr Analytics Solutions Architect e
Rahul Sonawane, Principal Analytics Solutions Architect

Extrair, transformar e carregar (do inglês Extract, transform and Load – ETL) é o processo de ler dados de origem, aplicar regras de transformação a esses dados e carregá-los nas estruturas de destino. O ETL é realizado por vários motivos. Às vezes, o ETL ajuda a alinhar os dados de origem às estruturas de dados de destino, enquanto outras vezes o ETL é feito para derivar valor ao conjunto de dados limpando, padronizando, combinando, agregando e enriquecendo. Você pode executar ETL de várias maneiras e as opções mais populares são:

  • ETL de forma programática usando o Apache Spark. O Amazon EMR e o AWS Glue oferecem suporte a esse modelo.
  • ETL com SQL usando Apache Hive ou PrestoDB/Trino. O Amazon EMR oferece suporte a essas duas ferramentas.
  • ETL com ferramentas de terceiros.

Muitas empresas preferem a opção ETL com SQL porque já têm desenvolvedores que entendem e escrevem consultas SQL. No entanto, esses desenvolvedores querem se concentrar em escrever consultas e não se preocupar em configurar e gerenciar a infraestrutura a ser utilizada.

O Amazon Athena é um serviço de consulta interativo que facilita a análise de dados no Amazon Simple Storage Service (Amazon S3) usando SQL padrão. O Athena é serverless, portanto, não há infraestrutura para gerenciar, e você paga apenas pelas consultas executadas.

Este artigo explora como você pode usar o Athena para criar pipelines de ETL e como orquestrar esses pipelines usando o AWS Step Functions.

Visão geral da arquitetura

O diagrama a seguir ilustra nossa arquitetura.

Os dados de origem são ingeridos primeiro em um bucket do S3, que preserva os dados como estão. Você pode ingerir esses dados no Amazon S3 de várias maneiras:

Depois que os dados de origem estiverem no Amazon S3 e supondo que eles tenham uma estrutura fixa, você poderá executar um crawler do AWS Glue para gerar automaticamente o schema ou fornecer o DDL como parte do pipeline de ETL. Um crawler do AWS Glue é o principal método usado pela maioria dos usuários do AWS Glue. Você pode usar um crawler para preencher o catálogo de dados do AWS Glue com tabelas. Um crawler pode rastrear vários armazenamentos de dados em uma única execução. Após a conclusão, o crawler cria ou atualiza uma ou mais tabelas no seu Catálogo de Dados. O Athena usa esse catálogo para executar consultas nas tabelas.
Depois que os dados brutos são catalogados, a transformação da origem para o destino é feita por meio de uma série de instruções Athena Create Table as Select (CTAS) e INSERT INTO. Os dados transformados são carregados em outro bucket do S3. Os arquivos também são particionados e convertidos no formato Parquet para otimizar o desempenho e o custo.

Preparando os dados

Para este artigo, usamos o conjunto de dados públicos de táxi de Nova York. Ele tem os dados de viagens feitas por táxis e veículos de aluguel na cidade de Nova York organizados em arquivos CSV por cada mês do ano a partir de 2009. Para nosso pipeline ETL, usamos dois arquivos contendo dados de táxi: um para demonstrar a criação inicial da tabela e carregar essa tabela usando CTAS, e o outro para demonstrar as inserções de dados em andamento nessa tabela usando a instrução INSERT INTO. Também usamos um arquivo de pesquisa para demonstrar união, transformação e agregação nesse pipeline de ETL.

  1. Crie um novo bucket do S3 com um nome exclusivo em sua conta.

Vamos utilizar esse bucket para copiar os dados brutos do conjunto de dados públicos de táxi de Nova York e armazenar os dados processados pelo Athena ETL.

  1. Crie os prefixos S3 athena, nyctaxidata/data, nyctaxidata/lookup, nyctaxidata/optimized-data e nyctaxidata/optimized-data-lookup dentro deste bucket recém-criado.

Esses prefixos são usados no código Step Functions fornecido posteriormente neste artigo.

  1. Copie os arquivos de dados de táxi do bucket público nyc-tlc descrito no registro do conjunto de dados público de táxi de Nova York para janeiro e fevereiro de 2020 no prefixo nyctaxidata/data do bucket S3 que você criou em sua conta.
  2. Copie o arquivo de pesquisa no prefixo nyctaxidata/lookup que você criou.

Crie um pipeline de ETL usando a integração do Athena com o Step Functions

O Step Functions é um serviço de fluxo de trabalho visual low-code usado para orquestrar serviços da AWS, automatizar processos de negócios e criar máquina de estado serveless. Por meio de sua interface visual, você pode criar e executar uma série de fluxos de trabalho com pontos de verificação e orientados a eventos que mantêm o estado do seu fluxo. A saída de uma etapa atua como uma entrada para a próxima. Cada etapa do seu fluxo é executada em ordem, conforme definido pela sua lógica de negócios.

A integração do serviço Step Functions com o Athena permite que você use o Step Functions para iniciar e interromper execuções de consultas e obter resultados de consultas.

Para o pipeline ETL neste post, mantemos o fluxo simples. No entanto, você pode criar um fluxo complexo usando diferentes recursos do Step Functions.

O fluxo do pipeline é o seguinte:

  1. Crie um banco de dados se ele ainda não existir no Catálogo de Dados. O Athena, por padrão, usa o Catálogo de Dados como seu metastore.
  2. Se não houver tabelas neste banco de dados, execute as seguintes ações:
    • a. Crie a tabela para os dados brutos do táxi amarelo e a tabela bruta para os dados de pesquisa.
    • b. Use o CTAS para criar as tabelas de destino e use as tabelas brutas criadas na etapa anterior como entrada na instrução SELECT. O CTAS também particiona a tabela de destino por ano e mês e cria arquivos Parquet otimizados no bucket do S3 de destino.
    • c. Use uma exibição para demonstrar as partes de junção e agregação do ETL.
  3. Se houver alguma tabela nesse banco de dados, itere a lista de todos os arquivos CSV restantes e processe usando a instrução INSERT INTO.

Casos de uso diferentes podem tornar o pipeline de ETL bastante complexo. Você pode estar obtendo dados contínuos da origem com o AWS DMS no modo batch ou CDC ou pelo Kinesis no modo de streaming. Isso requer mecanismos para processar todos esses arquivos durante uma janela específica e marcá-lo como concluído para que, na próxima vez que o pipeline for executado, ele processe apenas os arquivos recém-chegados. Em vez de adicionar DDL manualmente no pipeline, você pode adicionar etapas do crawler do AWS Glue no pipeline do Step Functions para criar um schema para os dados brutos. Em vez de uma visualização para agregar dados, talvez seja necessário criar uma tabela separada para manter os resultados prontos para consumo. Além disso, muitos casos de uso obtêm dados de alteração como parte do feed, que precisam ser mesclados com os conjuntos de dados de destino. Etapas extras no pipeline Step Functions são necessárias para processar esses dados caso a caso.

O código a seguir para o pipeline Step Functions cobre o fluxo anterior que descrevemos. Para obter mais detalhes sobre como começar a usar o Step Functions, consulte a documentação. Substitua os nomes do bucket do S3 pelo nome exclusivo do bucket que você criou na sua conta.

{
 "Comment": "Um exemplo de como usar o Athena para consultar registros, obter resultados de consultas e enviar resultados por meio de notificação.", "StartAt": "Create Glue DB", "States": { "Create Glue DB": { "Resource": "arn:aws:states:::athena:startQueryExecution.sync", "Parameters": { "QueryString": "CREATE DATABASE if not exists nyctaxidb", "WorkGroup": "primary", "ResultConfiguration": { "OutputLocation": "s3://MY-BUCKET/athena/" } }, "Type": "Task", "Next": "Run Table Lookup" }, "Run Table Lookup": { "Resource": "arn:aws:states:::athena:startQueryExecution.sync", "Parameters": { "QueryString": "show tables in nyctaxidb", "WorkGroup": "primary", "ResultConfiguration": { "OutputLocation": "s3://MY-BUCKET/athena/" } }, "Type": "Task", "Next": "Get lookup query results" }, "Get lookup query results": { "Resource": "arn:aws:states:::athena:getQueryResults", "Parameters": { "QueryExecutionId.$": "$.QueryExecution.QueryExecutionId" }, "Type": "Task", "Next": "ChoiceStateFirstRun" }, "ChoiceStateFirstRun": { "Comment": "Based on the input table name, a choice is made for moving to the next step.", "Type": "Choice", "Choices": [ { "Not": { "Variable": "$.ResultSet.Rows[0].Data[0].VarCharValue", "IsPresent": true }, "Next": "Run Create data Table Query" }, { "Variable": "$.ResultSet.Rows[0].Data[0].VarCharValue", "IsPresent": true, "Next": "Check All Tables" } ], "Default": "Check All Tables" }, "Run Create data Table Query": { "Resource": "arn:aws:states:::athena:startQueryExecution.sync", "Parameters": { "QueryString": "CREATE EXTERNAL TABLE nyctaxidb.yellowtaxi_data_csv( vendorid bigint, tpep_pickup_datetime string, tpep_dropoff_datetime string, passenger_count bigint, trip_distance double, ratecodeid bigint, store_and_fwd_flag string, pulocationid bigint, dolocationid bigint, payment_type bigint, fare_amount double, extra double, mta_tax double, tip_amount double, tolls_amount double, improvement_surcharge double, total_amount double, congestion_surcharge double) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' LOCATION 's3://MY-BUCKET/nyctaxidata/data/' TBLPROPERTIES ( 'skip.header.line.count'='1')", "WorkGroup": "primary", "ResultConfiguration": { "OutputLocation": "s3://MY-BUCKET/athena/" } }, "Type": "Task", "Next": "Run Create lookup Table Query" }, "Run Create lookup Table Query": { "Resource": "arn:aws:states:::athena:startQueryExecution.sync", "Parameters": { "QueryString": "CREATE EXTERNAL TABLE nyctaxidb.nyctaxi_lookup_csv( locationid bigint, borough string, zone string, service_zone string, latitude double, longitude double)ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'LOCATION 's3://MY-BUCKET/nyctaxidata/lookup/' TBLPROPERTIES ( 'skip.header.line.count'='1')", "WorkGroup": "primary", "ResultConfiguration": { "OutputLocation": "s3://MY-BUCKET/athena/" } }, "Type": "Task", "Next": "Run Create Parquet data Table Query" }, "Run Create Parquet data Table Query": { "Resource": "arn:aws:states:::athena:startQueryExecution.sync", "Parameters": { "QueryString": "CREATE table if not exists nyctaxidb.yellowtaxi_data_parquet WITH (format='PARQUET',parquet_compression='SNAPPY',partitioned_by=array['pickup_year','pickup_month'],external_location = 's3://MY-BUCKET/nyctaxidata/optimized-data/') AS SELECT vendorid,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,ratecodeid,store_and_fwd_flag,pulocationid,dolocationid,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,payment_type,substr(\"tpep_pickup_datetime\",1,4) pickup_year, substr(\"tpep_pickup_datetime\",6,2) AS pickup_month FROM nyctaxidb.yellowtaxi_data_csv where substr(\"tpep_pickup_datetime\",1,4) = '2020' and substr(\"tpep_pickup_datetime\",6,2) = '01'", "WorkGroup": "primary", "ResultConfiguration": { "OutputLocation": "s3://MY-BUCKET/athena/" } }, "Type": "Task", "Next": "Run Create Parquet lookup Table Query" }, "Run Create Parquet lookup Table Query": { "Resource": "arn:aws:states:::athena:startQueryExecution.sync", "Parameters": { "QueryString": "CREATE table if not exists nyctaxidb.nyctaxi_lookup_parquet WITH (format='PARQUET',parquet_compression='SNAPPY', external_location = 's3://MY-BUCKET/nyctaxidata/optimized-data-lookup/') AS SELECT locationid, borough, zone , service_zone , latitude ,longitude FROM nyctaxidb.nyctaxi_lookup_csv", "WorkGroup": "primary", "ResultConfiguration": { "OutputLocation": "s3://MY-BUCKET/athena/" } }, "Type": "Task", "Next": "Run Create View" }, "Run Create View": { "Resource": "arn:aws:states:::athena:startQueryExecution.sync", "Parameters": { "QueryString": "create or replace view nyctaxidb.yellowtaxi_data_vw as select a.*,lkup.* from (select datatab.pulocationid pickup_location ,pickup_month, pickup_year, sum(cast(datatab.total_amount AS decimal(10, 2))) AS sum_fare , sum(cast(datatab.trip_distance AS decimal(10, 2))) AS sum_trip_distance , count(*) AS countrec FROM nyctaxidb.yellowtaxi_data_parquet datatab WHERE datatab.pulocationid is NOT null GROUP BY datatab.pulocationid, pickup_month, pickup_year) a , nyctaxidb.nyctaxi_lookup_parquet lkup where lkup.locationid = a.pickup_location", "WorkGroup": "primary", "ResultConfiguration": { "OutputLocation": "s3://MY-BUCKET/athena/" } }, "Type": "Task", "End": true }, "Check All Tables": { "Type": "Map", "InputPath": "$.ResultSet", "ItemsPath": "$.Rows", "MaxConcurrency": 0, "Iterator": { "StartAt": "CheckTable", "States": { "CheckTable": { "Type": "Choice", "Choices": [ { "Variable": "$.Data[0].VarCharValue", "StringMatches": "*data_csv", "Next": "passstep" }, { "Variable": "$.Data[0].VarCharValue", "StringMatches": "*data_parquet", "Next": "Insert New Parquet Data" } ], "Default": "passstep" }, "Insert New Parquet Data": { "Resource": "arn:aws:states:::athena:startQueryExecution.sync", "Parameters": { "QueryString": "INSERT INTO nyctaxidb.yellowtaxi_data_parquet select vendorid,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,ratecodeid,store_and_fwd_flag,pulocationid,dolocationid,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,payment_type,substr(\"tpep_pickup_datetime\",1,4) pickup_year, substr(\"tpep_pickup_datetime\",6,2) AS pickup_month FROM nyctaxidb.yellowtaxi_data_csv where substr(\"tpep_pickup_datetime\",1,4) = '2020' and substr(\"tpep_pickup_datetime\",6,2) = '02'", "WorkGroup": "primary", "ResultConfiguration": { "OutputLocation": "s3://MY-BUCKET/athena/" } }, "Type": "Task", "End": true }, "passstep": { "Type": "Pass", "Result": "NA", "End": true } } }, "End": true } } }

Na primeira vez que executamos esse pipeline, ele segue o caminho do CTAS e cria a visualização de agregação.

Na segunda vez que o executamos, ele segue o caminho da instrução INSERT INTO para adicionar novos dados às tabelas existentes.

Quando usar esse padrão

Você deve usar esse padrão quando os dados brutos estiverem estruturados e os metadados puderem ser facilmente adicionados ao catálogo.
Como as cobranças do Athena são calculadas pela quantidade de dados varridos, esse padrão é mais adequado para conjuntos de dados que não são muito grandes e precisam de processamento contínuo.
Esse padrão é mais adequado para converter dados brutos em formatos colunares, como Parquet ou ORC, e agregar um grande número de arquivos pequenos em arquivos maiores ou particionar e agrupar seus conjuntos de dados.

Conclusão

Neste post, mostramos como usar o Step Functions para orquestrar um pipeline ETL no Athena usando instruções CTAS e INSERT INTO.
Como próximas etapas para aprimorar esse pipeline, considere o seguinte:

  • Crie um pipeline de ingestão que coloque dados continuamente no bucket bruto do S3 em intervalos regulares
  • Adicione uma etapa do crawler do AWS Glue no pipeline para criar automaticamente o schema bruto
  • Adicione etapas extras para identificar dados de alteração e mesclar esses dados com o destino
  • Adicionar mecanismos de tratamento e notificação de erros no pipeline
  • Agende o pipeline usando o Amazon EventBridge para ser executado em intervalos regulares

Este artigo foi traduzido do Blog da AWS em Inglês.


Sobre os autores

Behram Irani, Sr Analytics Solutions Architect

 

 

 

 

Dipankar Kushari, Sr Analytics Solutions Architect

 

 

 

 

Rahul Sonawane, Principal Analytics Solutions Architect

 

 

 

Tradutora

Erika Nagamine é Technical Trainer na AWS