AWS Big Data Blog

Build and orchestrate ETL pipelines using Amazon Athena and AWS Step Functions

Extract, transform, and load (ETL) is the process of reading source data, applying transformation rules to this data, and loading it into the target structures. ETL is performed for various reasons. Sometimes ETL helps align source data to target data structures, whereas other times ETL is done to derive business value by cleansing, standardizing, combining, aggregating, and enriching datasets. You can perform ETL in multiple ways; the most popular choices are:

  • Programmatic ETL using Apache Spark. Amazon EMR and AWS Glue both support this model.
  • SQL ETL using Apache Hive or PrestoDB/Trino. Amazon EMR supports both these tools.
  • Third-party ETL products.

Many organizations prefer the SQL ETL option because they already have developers who understand and write SQL queries. However, these developers want to focus on writing queries and not worry about setting up and managing the underlying infrastructure.

Amazon Athena is an interactive query service that makes it easy to analyze data in Amazon Simple Storage Service (Amazon S3) using standard SQL. Athena is serverless, so there is no infrastructure to manage, and you pay only for the queries that you run.

This post explores how you can use Athena to create ETL pipelines and how you can orchestrate these pipelines using AWS Step Functions.

Architecture overview

The following diagram illustrates our architecture.

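The source data first gets ingested into an S3 bucket, which preserves the data as is. You can ingest this data into Amazon S3 in multiple ways, for example with AWS Database Migration Service (AWS DMS) in batch or change data capture (CDC) mode, or with Amazon Kinesis in streaming mode.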

After the source data is in Amazon S3 and assuming that it has a fixed structure, you can either run an AWS Glue crawler to automatically generate the schema or you can provide the DDL as part of your ETL pipeline. An AWS Glue crawler is the primary method used by most AWS Glue users. You can use a crawler to populate the AWS Glue Data Catalog with tables. A crawler can crawl multiple data stores in a single run. Upon completion, the crawler creates or updates one or more tables in your Data Catalog. Athena uses this catalog to run queries against the tables.

After the raw data is cataloged, the source-to-target transformation is done through a series of Athena Create Table as Select (CTAS) and INSERT INTO statements. The transformed data is loaded into another S3 bucket. The files are also partitioned and converted into Parquet format to optimize performance and cost.

Prepare your data

For this post, we use the NYC taxi public dataset. It contains data on trips taken by taxis and for-hire vehicles in New York City, organized in CSV files for each month starting from 2009. For our ETL pipeline, we use two files containing yellow taxi data: one to demonstrate the initial table creation and load using CTAS, and the other to demonstrate ongoing data inserts into this table using the INSERT INTO statement. We also use a lookup file to demonstrate join, transformation, and aggregation in this ETL pipeline.

  1. Create a new S3 bucket with a unique name in your account.

You use this bucket to copy the raw data from the NYC taxi public dataset and store the data processed by Athena ETL.

  2. Create the S3 prefixes athena, nyctaxidata/data, nyctaxidata/lookup, nyctaxidata/optimized-data, and nyctaxidata/optimized-data-lookup inside this newly created bucket.

These prefixes are used in the Step Functions code provided later in this post.

  3. Copy the yellow taxi data files from the nyc-tlc public bucket described in the NYC taxi public dataset registry for January and February 2020 into the nyctaxidata/data prefix of the S3 bucket you created in your account.
  4. Copy the lookup file into the nyctaxidata/lookup prefix you created.
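
The prefix layout from the steps above can be sketched in Python. This is a minimal sketch under stated assumptions: prefix_keys and create_prefixes are hypothetical helper names, and create_prefixes expects a boto3 S3 client to be passed in (boto3 itself is not imported, so the block runs without AWS access). Amazon S3 has no real folders; a zero-byte object whose key ends in a slash is how a prefix is shown as a folder.

```python
# Prefixes named in this post; the Step Functions definition refers to them.
PREFIXES = [
    "athena",
    "nyctaxidata/data",
    "nyctaxidata/lookup",
    "nyctaxidata/optimized-data",
    "nyctaxidata/optimized-data-lookup",
]


def prefix_keys(prefixes=PREFIXES):
    """Return the S3 object keys that represent each prefix as a 'folder'."""
    return [p.rstrip("/") + "/" for p in prefixes]


def create_prefixes(s3_client, bucket, prefixes=PREFIXES):
    """Create one zero-byte placeholder object per prefix.

    s3_client is assumed to be a boto3 S3 client, e.g. boto3.client("s3").
    """
    keys = prefix_keys(prefixes)
    for key in keys:
        s3_client.put_object(Bucket=bucket, Key=key)
    return keys
```

Passing the client in as an argument keeps the helper easy to try with a stub before pointing it at a real bucket.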

Create an ETL pipeline using Athena integration with Step Functions

Step Functions is a low-code visual workflow service used to orchestrate AWS services, automate business processes, and build serverless applications. Through its visual interface, you can create and run a series of checkpointed and event-driven workflows that maintain the application state. The output of one step acts as an input to the next. Each step in your application runs in order, as defined by your business logic.

The Step Functions service integration with Athena enables you to use Step Functions to start and stop query runs, and get query results.
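
To make the integration concrete, the following sketch builds an Amazon States Language Task state that starts one Athena query and waits for it to finish, mirroring the states used later in this post. The helper name athena_task and its parameters are illustrative assumptions; the resource ARN and the WorkGroup and ResultConfiguration fields are taken from the definition in this post.

```python
import json

# The ".sync" suffix makes Step Functions wait for the query to complete.
ATHENA_SYNC = "arn:aws:states:::athena:startQueryExecution.sync"


def athena_task(query, output_location, next_state=None, workgroup="primary"):
    """Build a Task state that starts an Athena query and waits for completion."""
    state = {
        "Type": "Task",
        "Resource": ATHENA_SYNC,
        "Parameters": {
            "QueryString": query,
            "WorkGroup": workgroup,
            "ResultConfiguration": {"OutputLocation": output_location},
        },
    }
    # A state either names its successor or ends the workflow.
    if next_state is None:
        state["End"] = True
    else:
        state["Next"] = next_state
    return state


if __name__ == "__main__":
    state = athena_task(
        "CREATE DATABASE if not exists nyctaxidb",
        "s3://MY-BUCKET/athena/",
        next_state="Run Table Lookup",
    )
    print(json.dumps(state, indent=2))
```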

For the ETL pipeline in this post, we keep the flow simple; however, you can build a complex flow using different features of Step Functions.

The flow of the pipeline is as follows:

  1. Create a database if it doesn’t already exist in the Data Catalog. Athena by default uses the Data Catalog as its metastore.
  2. If no tables exist in this database, take the following actions:
    1. Create the table for the raw yellow taxi data and the raw table for the lookup data.
    2. Use CTAS to create the target tables and use the raw tables created in the previous step as input in the SELECT statement. CTAS also partitions the target table by year and month, and creates optimized Parquet files in the target S3 bucket.
    3. Use a view to demonstrate the join and aggregation parts of ETL.
  3. If any table exists in this database, iterate through the list of all the remaining CSV files and process them by using the INSERT INTO statement.
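
The branching in the flow above can be summarized in a few lines of Python. This is only a sketch of the decision logic, not of the Step Functions implementation; plan_run and the step labels are hypothetical names.

```python
def plan_run(existing_tables):
    """Return the ordered steps the pipeline takes for a given database state."""
    steps = ["create database if not exists"]
    if not existing_tables:
        # First run: nothing is cataloged yet, so build everything.
        steps += [
            "create raw CSV tables",
            "CTAS into partitioned Parquet tables",
            "create aggregation view",
        ]
    else:
        # Later runs: the targets exist, so only append new data.
        steps.append("INSERT INTO Parquet table from remaining CSV files")
    return steps
```

An empty table list selects the CTAS path, and any non-empty list selects the INSERT INTO path.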

Different use cases may make the ETL pipeline quite complex. You may be getting continuous data from the source, either with AWS DMS in batch or CDC mode or with Kinesis in streaming mode. This requires mechanisms to process all such files during a particular window and mark them as complete, so that the next pipeline run processes only the newly arrived files. Instead of manually adding DDL in the pipeline, you can add AWS Glue crawler steps in the Step Functions pipeline to create a schema for the raw data; and instead of a view to aggregate data, you may have to create a separate table to keep the results ready for consumption. Also, many use cases get change data as part of the feed, which needs to be merged with the target datasets. Extra steps in the Step Functions pipeline are required to process such data on a case-by-case basis.
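
One possible way to process only newly arrived data is to track which year and month partitions have already been loaded and to generate an INSERT INTO statement for each remaining one. The following sketch assumes that bookkeeping is available as plain collections of (year, month) pairs; pending_partitions and insert_query are hypothetical helpers, and the SQL follows the INSERT INTO statement used in this post's pipeline with the year and month turned into parameters.

```python
# Columns in the order used by the INSERT INTO statement in this post.
COLUMNS = [
    "vendorid", "tpep_pickup_datetime", "tpep_dropoff_datetime",
    "passenger_count", "trip_distance", "ratecodeid", "store_and_fwd_flag",
    "pulocationid", "dolocationid", "fare_amount", "extra", "mta_tax",
    "tip_amount", "tolls_amount", "improvement_surcharge", "total_amount",
    "congestion_surcharge", "payment_type",
]


def pending_partitions(available, processed):
    """Return (year, month) pairs that are available but not yet loaded, oldest first."""
    return sorted(set(available) - set(processed))


def insert_query(year, month):
    """Build the INSERT INTO statement for one year/month of raw CSV data."""
    year_s, month_s = f"{year:04d}", f"{month:02d}"
    pickup = '"tpep_pickup_datetime"'
    return (
        "INSERT INTO nyctaxidb.yellowtaxi_data_parquet "
        f"SELECT {','.join(COLUMNS)},"
        f"substr({pickup},1,4) AS pickup_year, "
        f"substr({pickup},6,2) AS pickup_month "
        "FROM nyctaxidb.yellowtaxi_data_csv "
        f"WHERE substr({pickup},1,4) = '{year_s}' "
        f"AND substr({pickup},6,2) = '{month_s}'"
    )
```

Where the list of processed partitions is stored (a table, an S3 manifest, or the execution input) is left open here and depends on the use case.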

The following code for the Step Functions pipeline covers the flow we described. For more details on how to get started with Step Functions, refer to the tutorials. Replace the S3 bucket names with the unique bucket name you created in your account.

{
  "Comment": "An example of using Athena CTAS and INSERT INTO statements to build and orchestrate an ETL pipeline.",
  "StartAt": "Create Glue DB",
  "States": {
    "Create Glue DB": {
      "Resource": "arn:aws:states:::athena:startQueryExecution.sync",
      "Parameters": {
        "QueryString": "CREATE DATABASE if not exists nyctaxidb",
        "WorkGroup": "primary",
        "ResultConfiguration": {
          "OutputLocation": "s3://MY-BUCKET/athena/"
        }
      },
      "Type": "Task",
      "Next": "Run Table Lookup"
    },
    "Run Table Lookup": {
      "Resource": "arn:aws:states:::athena:startQueryExecution.sync",
      "Parameters": {
        "QueryString": "show tables in nyctaxidb",
        "WorkGroup": "primary",
        "ResultConfiguration": {
          "OutputLocation": "s3://MY-BUCKET/athena/"
        }
      },
      "Type": "Task",
      "Next": "Get lookup query results"
    },
    "Get lookup query results": {
      "Resource": "arn:aws:states:::athena:getQueryResults",
      "Parameters": {
        "QueryExecutionId.$": "$.QueryExecution.QueryExecutionId"
      },
      "Type": "Task",
      "Next": "ChoiceStateFirstRun"
    },
    "ChoiceStateFirstRun": {
      "Comment": "Based on the input table name, a choice is made for moving to the next step.",
      "Type": "Choice",
      "Choices": [
        {
          "Not": {
            "Variable": "$.ResultSet.Rows[0].Data[0].VarCharValue",
            "IsPresent": true
          },
          "Next": "Run Create data Table Query"
        },
        {
          "Variable": "$.ResultSet.Rows[0].Data[0].VarCharValue",
          "IsPresent": true,
          "Next": "Check All Tables"
        }
      ],
      "Default": "Check All Tables"
    },
    "Run Create data Table Query": {
      "Resource": "arn:aws:states:::athena:startQueryExecution.sync",
      "Parameters": {
        "QueryString": "CREATE EXTERNAL TABLE nyctaxidb.yellowtaxi_data_csv(  vendorid bigint,   tpep_pickup_datetime string,   tpep_dropoff_datetime string,   passenger_count bigint,   trip_distance double,   ratecodeid bigint,   store_and_fwd_flag string,   pulocationid bigint,   dolocationid bigint,   payment_type bigint,   fare_amount double,   extra double,   mta_tax double,   tip_amount double,   tolls_amount double,   improvement_surcharge double,   total_amount double,   congestion_surcharge double) ROW FORMAT DELIMITED   FIELDS TERMINATED BY ',' STORED AS INPUTFORMAT   'org.apache.hadoop.mapred.TextInputFormat' OUTPUTFORMAT   'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' LOCATION  's3://MY-BUCKET/nyctaxidata/data/' TBLPROPERTIES (  'skip.header.line.count'='1')",
        "WorkGroup": "primary",
        "ResultConfiguration": {
          "OutputLocation": "s3://MY-BUCKET/athena/"
        }
      },
      "Type": "Task",
      "Next": "Run Create lookup Table Query"
    },
    "Run Create lookup Table Query": {
      "Resource": "arn:aws:states:::athena:startQueryExecution.sync",
      "Parameters": {
        "QueryString": "CREATE EXTERNAL TABLE nyctaxidb.nyctaxi_lookup_csv(  locationid bigint,   borough string,   zone string,   service_zone string,   latitude double,   longitude double) ROW FORMAT DELIMITED   FIELDS TERMINATED BY ',' STORED AS INPUTFORMAT   'org.apache.hadoop.mapred.TextInputFormat' OUTPUTFORMAT   'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' LOCATION  's3://MY-BUCKET/nyctaxidata/lookup/' TBLPROPERTIES (  'skip.header.line.count'='1')",
        "WorkGroup": "primary",
        "ResultConfiguration": {
          "OutputLocation": "s3://MY-BUCKET/athena/"
        }
      },
      "Type": "Task",
      "Next": "Run Create Parquet data Table Query"
    },
    "Run Create Parquet data Table Query": {
      "Resource": "arn:aws:states:::athena:startQueryExecution.sync",
      "Parameters": {
        "QueryString": "CREATE  table if not exists nyctaxidb.yellowtaxi_data_parquet WITH (format='PARQUET',parquet_compression='SNAPPY',partitioned_by=array['pickup_year','pickup_month'],external_location = 's3://MY-BUCKET/nyctaxidata/optimized-data/') AS SELECT vendorid,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,ratecodeid,store_and_fwd_flag,pulocationid,dolocationid,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,payment_type,substr(\"tpep_pickup_datetime\",1,4) pickup_year, substr(\"tpep_pickup_datetime\",6,2) AS pickup_month FROM nyctaxidb.yellowtaxi_data_csv where substr(\"tpep_pickup_datetime\",1,4) = '2020' and substr(\"tpep_pickup_datetime\",6,2) = '01'",
        "WorkGroup": "primary",
        "ResultConfiguration": {
          "OutputLocation": "s3://MY-BUCKET/athena/"
        }
      },
      "Type": "Task",
      "Next": "Run Create Parquet lookup Table Query"
    },
    "Run Create Parquet lookup Table Query": {
      "Resource": "arn:aws:states:::athena:startQueryExecution.sync",
      "Parameters": {
        "QueryString": "CREATE table if not exists nyctaxidb.nyctaxi_lookup_parquet WITH (format='PARQUET',parquet_compression='SNAPPY', external_location = 's3://MY-BUCKET/nyctaxidata/optimized-data-lookup/') AS SELECT locationid, borough, zone , service_zone , latitude ,longitude  FROM nyctaxidb.nyctaxi_lookup_csv",
        "WorkGroup": "primary",
        "ResultConfiguration": {
          "OutputLocation": "s3://MY-BUCKET/athena/"
        }
      },
      "Type": "Task",
      "Next": "Run Create View"
    },
    "Run Create View": {
      "Resource": "arn:aws:states:::athena:startQueryExecution.sync",
      "Parameters": {
        "QueryString": "create or replace view nyctaxidb.yellowtaxi_data_vw as select a.*,lkup.* from (select  datatab.pulocationid pickup_location ,pickup_month, pickup_year, sum(cast(datatab.total_amount AS decimal(10, 2))) AS sum_fare , sum(cast(datatab.trip_distance AS decimal(10, 2))) AS sum_trip_distance , count(*) AS countrec   FROM nyctaxidb.yellowtaxi_data_parquet datatab WHERE datatab.pulocationid is NOT null  GROUP BY  datatab.pulocationid, pickup_month, pickup_year) a , nyctaxidb.nyctaxi_lookup_parquet lkup where lkup.locationid = a.pickup_location",
        "WorkGroup": "primary",
        "ResultConfiguration": {
          "OutputLocation": "s3://MY-BUCKET/athena/"
        }
      },
      "Type": "Task",
      "End": true
    },
    "Check All Tables": {
      "Type": "Map",
      "InputPath": "$.ResultSet",
      "ItemsPath": "$.Rows",
      "MaxConcurrency": 0,
      "Iterator": {
        "StartAt": "CheckTable",
        "States": {
          "CheckTable": {
            "Type": "Choice",
            "Choices": [
              {
                "Variable": "$.Data[0].VarCharValue",
                "StringMatches": "*data_csv",
                "Next": "passstep"
              },
              {
                "Variable": "$.Data[0].VarCharValue",
                "StringMatches": "*data_parquet",
                "Next": "Insert New Parquet Data"
              }
            ],
            "Default": "passstep"
          },
          "Insert New Parquet Data": {
            "Resource": "arn:aws:states:::athena:startQueryExecution.sync",
            "Parameters": {
              "QueryString": "INSERT INTO nyctaxidb.yellowtaxi_data_parquet select vendorid,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,ratecodeid,store_and_fwd_flag,pulocationid,dolocationid,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,payment_type,substr(\"tpep_pickup_datetime\",1,4) pickup_year, substr(\"tpep_pickup_datetime\",6,2) AS pickup_month FROM nyctaxidb.yellowtaxi_data_csv where substr(\"tpep_pickup_datetime\",1,4) = '2020' and substr(\"tpep_pickup_datetime\",6,2) = '02'",
              "WorkGroup": "primary",
              "ResultConfiguration": {
                "OutputLocation": "s3://MY-BUCKET/athena/"
              }
            },
            "Type": "Task",
            "End": true
          },
          "passstep": {
            "Type": "Pass",
            "Result": "NA",
            "End": true
          }
        }
      },
      "End": true
    }
  }
}
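
Before you create the state machine, the placeholder bucket name has to be replaced everywhere in the definition. The following sketch does this on the JSON text and then checks that every StartAt, Next, and Default target names an existing state, which is a cheap sanity check after hand edits. The function names and the bucket name my-unique-bucket are hypothetical, and the embedded definition is a trimmed stand-in, not the full pipeline.

```python
import json


def set_bucket(definition_text, bucket, placeholder="MY-BUCKET"):
    """Replace the placeholder bucket name and return the parsed definition."""
    return json.loads(definition_text.replace(placeholder, bucket))


def dangling_targets(definition):
    """Return transition targets that do not name a state at the same level."""
    states = definition["States"]
    missing = []
    if definition["StartAt"] not in states:
        missing.append(definition["StartAt"])
    for state in states.values():
        targets = [state.get("Next"), state.get("Default")]
        targets += [choice.get("Next") for choice in state.get("Choices", [])]
        missing += [t for t in targets if t is not None and t not in states]
        if "Iterator" in state:  # Map states carry a nested state machine
            missing += dangling_targets(state["Iterator"])
    return missing


# Trimmed stand-in for the pipeline definition (one state only).
SAMPLE = """
{
  "StartAt": "Create Glue DB",
  "States": {
    "Create Glue DB": {
      "Type": "Task",
      "Resource": "arn:aws:states:::athena:startQueryExecution.sync",
      "Parameters": {
        "QueryString": "CREATE DATABASE if not exists nyctaxidb",
        "WorkGroup": "primary",
        "ResultConfiguration": {"OutputLocation": "s3://MY-BUCKET/athena/"}
      },
      "End": true
    }
  }
}
"""

definition = set_bucket(SAMPLE, "my-unique-bucket")
```

In practice you would read the full definition from a file instead of SAMPLE and pass the result of json.dumps(definition) to whatever tool you use to create the state machine.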

The first time we run this pipeline, it follows the CTAS path and creates the aggregation view.

The second time we run it, it follows the INSERT INTO statement path to add new data into the existing tables.
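
On later runs, the Check All Tables Map state compares each table name with the StringMatches patterns in CheckTable. The following sketch approximates that matching with Python's fnmatch module, which treats * the same way for these two patterns (fnmatch also gives special meaning to ? and [], which is a difference from Step Functions). The table list is an assumed SHOW TABLES result after the first run, and route is a hypothetical name.

```python
from fnmatch import fnmatchcase

# (pattern, next state) pairs in the order CheckTable evaluates them.
RULES = [
    ("*data_csv", "passstep"),
    ("*data_parquet", "Insert New Parquet Data"),
]


def route(table_name, rules=RULES, default="passstep"):
    """Return the state CheckTable would transition to for a table name."""
    for pattern, next_state in rules:
        if fnmatchcase(table_name, pattern):
            return next_state
    return default


# Assumed listing after the first run; the exact result may differ.
tables = [
    "nyctaxi_lookup_csv",
    "nyctaxi_lookup_parquet",
    "yellowtaxi_data_csv",
    "yellowtaxi_data_parquet",
    "yellowtaxi_data_vw",
]
inserts = [t for t in tables if route(t) == "Insert New Parquet Data"]
```

With this listing, only yellowtaxi_data_parquet reaches Insert New Parquet Data, so the INSERT INTO statement runs once per execution even though the Map state visits every table.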

When to use this pattern

You should use this pattern when the raw data is structured and the metadata can easily be added to the catalog.

Because Athena charges are calculated by the amount of data scanned, this pattern is best suited to datasets that aren’t very large and need continuous processing.

The pattern is also well suited to converting raw data into columnar formats like Parquet or ORC, aggregating a large number of small files into larger files, and partitioning and bucketing your datasets.

Conclusion

In this post, we showed how to use Step Functions to orchestrate an ETL pipeline in Athena using CTAS and INSERT INTO statements.

As next steps to enhance this pipeline, consider the following:

  • Create an ingestion pipeline that continuously puts data in the raw S3 bucket at regular intervals
  • Add an AWS Glue crawler step in the pipeline to automatically create the raw schema
  • Add extra steps to identify change data and merge this data with the target
  • Add error handling and notification mechanisms in the pipeline
  • Schedule the pipeline using Amazon EventBridge to run at regular intervals
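
As a starting point for the error handling item above, the following sketch adds Retry and Catch blocks to every top-level Task state of a definition. Retry and Catch are standard Amazon States Language fields; the helper name, the retry settings, and the Notify Failure state (which you would have to define yourself, for example as a task that sends a notification) are assumptions for illustration.

```python
import copy


def add_error_handling(definition, failure_state="Notify Failure"):
    """Return a copy of the definition with Retry and Catch on each Task state."""
    result = copy.deepcopy(definition)
    for name, state in result["States"].items():
        if state.get("Type") != "Task" or name == failure_state:
            continue
        # Retry failed tasks with exponential backoff (values are illustrative).
        state.setdefault("Retry", [{
            "ErrorEquals": ["States.TaskFailed"],
            "IntervalSeconds": 5,
            "MaxAttempts": 2,
            "BackoffRate": 2.0,
        }])
        # After retries are exhausted, route any error to the failure state.
        state.setdefault("Catch", [{
            "ErrorEquals": ["States.ALL"],
            "ResultPath": "$.error",
            "Next": failure_state,
        }])
    return result
```

The helper leaves states inside a Map iterator untouched; those would need their own handling, with a failure state defined inside the iterator.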

About the Authors

Behram Irani, Sr Analytics Solutions Architect

Dipankar Kushari, Sr Analytics Solutions Architect

Rahul Sonawane, Principal Analytics Solutions Architect