AWS Big Data Blog

Orchestrate an ETL process using AWS Step Functions for Amazon Redshift

Modern data lakes depend on extract, transform, and load (ETL) operations to convert bulk information into usable data. This post walks through implementing a loosely coupled ETL orchestration process using AWS Step Functions, AWS Lambda, and AWS Batch that targets an Amazon Redshift cluster.

Because Amazon Redshift uses columnar storage, it is well suited for fast analytical insights using convenient ANSI SQL queries. You can rapidly scale your Amazon Redshift clusters up and down in minutes to meet demanding workloads, both for end-user reports and for timely data refreshes into the data warehouse.

AWS Step Functions makes it easy to develop and use repeatable workflows that scale well. Step Functions lets you build automation workflows from individual Lambda functions. Each function performs a discrete task and lets you develop, test, and modify the components of your workflow quickly and seamlessly.

An ETL process refreshes your data warehouse from source systems, organizing the raw data into a format you can more readily use. Most organizations run ETL as a batch or as part of a real-time ingest process to keep the data warehouse current and provide timely analytics. A fully automated and highly scalable ETL process helps minimize the operational effort that you must invest in managing the regular ETL pipelines. It also ensures the timely and accurate refresh of your data warehouse. You can tailor this process to refresh data into any data warehouse or the data lake.

This post also provides an AWS CloudFormation template that launches the entire sample ETL process in one click to refresh the TPC-DS dataset. Find the template link in the Set up the entire workflow using AWS CloudFormation section.

Architectural overview

The following diagram illustrates the architectural overview of the different components involved in the orchestration of the ETL workflow. This workflow uses Step Functions to fetch source data from Amazon S3 to refresh the Amazon Redshift data warehouse.

Here are the core components of the workflow:

  • Amazon CloudWatch triggers the ETL process based on a schedule, through the AWS CLI, or using the various AWS SDKs in a Lambda function.
  • The ETL workflow uses Step Functions for a multi-step ETL process and coordinates AWS services into serverless workflows that you can build and iterate quickly using JSON-based templates. For example, a typical ETL process may refresh the dimensions first and the fact tables later. You can declare this order of operations using a Step Functions state machine.
  • A Lambda function lets you build microservices to coordinate job submission and monitoring without needing to write code for workflow logic, parallel processes, error handling, timeouts, or retries.
  • AWS Batch runs several ETL jobs such as transforms and loads into Amazon Redshift. AWS Batch manages all the infrastructure for you, avoiding the complexities of provisioning, managing, monitoring, and scaling your batch computing jobs. It also lets you wait for the jobs to complete.
  • The source data in Amazon S3 refreshes the Amazon Redshift data warehouse through a container that runs psql. The ETL logic lives in .sql files, one per step. For example, the .sql file for a typical dimension table refresh loads the data from Amazon S3 into a temporary staging table and then INSERTs/UPDATEs the target table. Before beginning, review a sample dimensional table .sql file.

You can execute the workflow and monitor it using the state machine. You can trigger the ETL according to a schedule or an event (for example, as soon as all the data files arrive in S3).
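
For example, here is a minimal sketch, in Python with boto3, of a Lambda handler that starts the state machine whenever a new file lands in the source bucket. The environment variable name and the input payload are illustrative assumptions, not resources that this post creates.

import json
import os

import boto3

sfn = boto3.client("stepfunctions")

# Hypothetical environment variable; point it at your state machine's ARN.
STATE_MACHINE_ARN = os.environ["ETL_STATE_MACHINE_ARN"]


def lambda_handler(event, context):
    """Start the ETL state machine for each object that arrives in S3."""
    for record in event.get("Records", []):
        bucket = record["s3"]["bucket"]["name"]
        key = record["s3"]["object"]["key"]
        response = sfn.start_execution(
            stateMachineArn=STATE_MACHINE_ARN,
            # In the full solution, the input is the ExecutionInput JSON described later in this post.
            input=json.dumps({"trigger": {"bucket": bucket, "key": key}}),
        )
        print("Started execution:", response["executionArn"])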

Prerequisites

Before you get started, create a Docker image that can execute .sql files. AWS Batch uses this Docker image to create the resources that execute the ETL steps.

If this is your first time using AWS Batch, see Getting Started with AWS Batch. Create an environment where you can build and register the Docker image. For this post, register the image in an Amazon ECR repository, which is private by default and therefore well suited for AWS Batch jobs.

Building the fetch and run psql Docker image

To build the Docker image, follow the steps outlined in the post Creating a Simple “Fetch & Run” AWS Batch Job.

Use the following Docker configuration and fetch-and-run psql script to build the image:

  1. DockerFetchRunPsqlUbundu
  2. fetch_and_run_psql.sh

Follow the steps in the post to import the Docker image into the ECR container registry. After you complete the previous steps, your Docker image is ready to trigger a .sql execution for an Amazon Redshift cluster.
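
Conceptually, the container entry point downloads the .sql file named in its environment variables from Amazon S3 and runs it against the cluster with psql. The actual entry point is the fetch_and_run_psql.sh script linked above; the following Python sketch only illustrates the same idea, and the REDSHIFT_* and S3_DATA_READ_ROLE variable names are assumptions for the sketch.

import os
import subprocess
from urllib.parse import urlparse

import boto3


def fetch_and_run_sql():
    """Download a .sql file from S3 and execute it with psql (illustrative only)."""
    s3_url = os.environ["BATCH_FILE_SQL_S3_URL"]  # e.g. s3://bucket/prefix/store_sales.sql
    parsed = urlparse(s3_url)
    local_path = "/tmp/" + os.path.basename(parsed.path)
    boto3.client("s3").download_file(parsed.netloc, parsed.path.lstrip("/"), local_path)

    # Connection settings and psql variables; these environment variable names are assumptions.
    env = dict(os.environ, PGPASSWORD=os.environ["REDSHIFT_PASSWORD"])
    subprocess.run(
        [
            "psql",
            "-h", os.environ["REDSHIFT_HOST"],
            "-p", os.environ.get("REDSHIFT_PORT", "5439"),
            "-U", os.environ["REDSHIFT_USER"],
            "-d", os.environ.get("REDSHIFT_DB", "dev"),
            "-v", "dt=" + os.environ["DATASET_DATE"],
            "-v", "s3datareadrole=" + os.environ["S3_DATA_READ_ROLE"],
            "-f", local_path,
        ],
        check=True,
        env=env,
    )


if __name__ == "__main__":
    fetch_and_run_sql()

The -v flags map onto the :dt and :s3datareadrole variables used in the sample .sql scripts shown later in this post.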

Example: ETL process using TPC-DS dataset

This example uses a subset of the TPC-DS dataset to demonstrate a typical dimensional model refresh. Here is the Entity Relationship diagram of the TPC-DS data model that I use for this ETL application:

The ETL process refreshes table data for the Store_Sales fact table along with the Customer_Address and Item dimensions for a particular dataset date.

Setting up the ETL workflow using Step Functions

Step Functions makes complicated workflows more straightforward. You can set up dependency management and failure handling using a JSON-based template. Workflows are just a series of steps, with the output of one step acting as input to the next.

This example completes various dimensional table transforms and loads before triggering the Fact table load. Also, a workflow can branch out into multiple parallel steps whenever needed. You can monitor each step of execution as it happens, which means you can identify and fix problems quickly.

This illustration outlines the example ETL process set up through Step Functions:

For more information, see the detailed workflow diagram.

In the above workflow, the ETL process checks the DB connection in Step 1 and triggers the Customer_Address (Step 2.1) and Item_dimension (Step 2.2) steps, which execute in parallel. The Store_Sales fact table load (Step 3) waits for the dimensional table steps to complete. Each ETL step is autonomous, so you can monitor and respond to failures at any stage.

I now examine the Store_Sales step (Step 3) in detail. Other steps follow a similar pattern of implementation.

Here is the state implementation for Store_Sales step (Step 3):

{
   "Comment":"A simple ETL example that submits a Job to AWS Batch",
   "StartAt":"DBConnectionInit",
	...
      "Parallel":{
         "Type":"Parallel",
         "Next":"SalesFACTInit",
         "ResultPath":"$.status",
         "Branches":[
	...
      },
      "SalesFACTInit":{
         "Type":"Pass",
         "Next":"SubmitStoreSalesFACTJob",
         "Result":"SalesFACT",
         "ResultPath":"$.stepId"
      },
      "SubmitStoreSalesFACTJob":{
         "Type":"Task",
         "Resource":"arn:aws:lambda:us-west-2:1234567890:function:StepFunctionsSample-JobStatusPol-SubmitJobFunction-5M2HCJIG81R1",
         "Next":"GetStoreSalesFACTJobStatus"
      },
      "GetStoreSalesFACTJobStatus":{
         "Type":"Task",
         "Resource":"arn:aws:lambda:us-west-2:1234567890:function:StepFunctionsSample-JobStatusPoll-CheckJobFunction-1SKER18I6FU24",
         "Next":"CheckStoreSalesFACTJobStatus",
         "InputPath":"$",
         "ResultPath":"$.status"
      },
      "CheckStoreSalesFACTJobStatus":{
         "Type":"Choice",
         "Choices":[
            {
               "Variable":"$.status",
               "StringEquals":"FAILED",
               "Next":"FailState"
            },
            {
               "Variable":"$.status",
               "StringEquals":"SUCCEEDED",
               "Next":"GetFinalStoreSalesFACTJobStatus"
            }
         ],
         "Default":"StoreSalesFACTWait30Seconds"
      },
	...
   }
}

The Parallel state that loads all the dimension tables sets up a dependency on the later Store_Sales fact transformation/load (SalesFACTInit) through the Next attribute. The SalesFACTInit step triggers SubmitStoreSalesFACTJob, which submits the transformation to AWS Batch through the Lambda function JobStatusPol-SubmitJobFunction. GetStoreSalesFACTJobStatus then polls every 30 seconds through the Lambda function JobStatusPoll-CheckJobFunction to check for completion. CheckStoreSalesFACTJobStatus validates the returned status and decides whether to succeed or fail the process.
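
The Lambda functions themselves can be thin wrappers around the AWS Batch API. Here is a minimal Python sketch of a submit function and a poll function, assuming the state machine input follows the layout of the snippet below; the functions that the CloudFormation template deploys may differ in detail.

import boto3

batch = boto3.client("batch")


def submit_job_handler(event, context):
    """Submit one ETL step to AWS Batch.

    The event layout is an assumption based on the input snippet below: the
    parameters for the current step sit under event["DBConnection"][stepId].
    """
    step_id = event.get("stepId")
    params = event["DBConnection"][step_id] if step_id else event["DBConnection"]
    response = batch.submit_job(
        jobName=params["jobName"],
        jobQueue=params["jobQueue"],
        jobDefinition=params["jobDefinition"],
        containerOverrides=params.get("containerOverrides", {}),
    )
    event["jobId"] = response["jobId"]  # later states poll this job ID
    return event


def poll_job_handler(event, context):
    """Return the AWS Batch status string (RUNNING, SUCCEEDED, FAILED, ...) for the job."""
    described = batch.describe_jobs(jobs=[event["jobId"]])
    return described["jobs"][0]["status"]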

Here is a snippet of the input for executing the state machine job for Step 3:

{
   "DBConnection":{
      ..
      "SalesFACT":{
         "jobName":"my-job",
         "jobDefinition":"arn:aws:batch:us-west-2:1234567890:job-definition/JobDefinition-cd6aa175c07fb2a:1",
         "jobQueue":"arn:aws:batch:us-west-2:1234567890:job-queue/JobQueue-217beecdb0caa3f",
         "wait_time":60,
         "containerOverrides":{
            "environment":[
               {
                  "name":"BATCH_FILE_TYPE",
                  "value":"script_psql"
               },
               {
                  "name":"BATCH_FILE_S3_URL",
                  "value":"s3://salamander-us-east-1/reinvent2018/ant353/etlscript/psql_rs.sh"
               },
               {
                  "name":"BATCH_FILE_SQL_S3_URL",
                  "value":"s3://salamander-us-east-1/reinvent2018/ant353/etlscript/store_sales.sql"
               },
               {
                  "name":"DATASET_DATE",
                  "value":"2003-01-02"
               }
            ]
         }
      }
   }
}

The input defines which .sql file each step invokes, along with the refresh date. You can represent any complex ETL workflow as a JSON document, making it easy to manage, and it decouples each step from the inputs used to invoke it.

Executing the ETL workflow

AWS Batch executes each .sql script (for example, store_sales.sql) that the state machine invokes, performing an incremental data refresh of the sales data for a particular date.

Here is the load and transformation implementation for store_sales.sql:

\set s3datareadrolevar 'aws_iam_role=' :s3datareadrole
-- This transform ETL will refresh data for the store_sales table
-- Start a new transaction

begin transaction;

-- Create a stg_store_sales staging table and COPY data into it from the input S3 location

DROP TABLE if exists public.stg_store_sales;
CREATE TABLE public.stg_store_sales
(
	sold_date DATE   ENCODE lzo
	,sold_time INTEGER   ENCODE lzo
	,i_item_id CHAR(16)   ENCODE lzo
	,c_customer_id CHAR(16)   ENCODE lzo
	,cd_demo_sk INTEGER   ENCODE lzo
	,hd_income_band_sk INTEGER   ENCODE lzo
	,hd_buy_potential CHAR(15)   ENCODE lzo
	,hd_dep_count INTEGER   ENCODE lzo
	,hd_vehicle_count INTEGER   ENCODE lzo
	,ca_address_id CHAR(16)   ENCODE lzo
	,s_store_id CHAR(16)   ENCODE lzo
	,p_promo_id CHAR(16)   ENCODE lzo
	,ss_ticket_number INTEGER   ENCODE lzo
	,ss_quantity INTEGER   ENCODE lzo
	,ss_wholesale_cost NUMERIC(7,2)   ENCODE lzo
	,ss_list_price NUMERIC(7,2)   ENCODE lzo
	,ss_sales_price NUMERIC(7,2)   ENCODE lzo
	,ss_ext_discount_amt NUMERIC(7,2)   ENCODE lzo
	,ss_ext_sales_price NUMERIC(7,2)   ENCODE lzo
	,ss_ext_wholesale_cost NUMERIC(7,2)   ENCODE lzo
	,ss_ext_list_price NUMERIC(7,2)   ENCODE lzo
	,ss_ext_tax NUMERIC(7,2)   ENCODE lzo
	,ss_coupon_amt NUMERIC(7,2)   ENCODE lzo
	,ss_net_paid NUMERIC(7,2)   ENCODE lzo
	,ss_net_paid_inc_tax NUMERIC(7,2)   ENCODE lzo
	,ss_net_profit NUMERIC(7,2)   ENCODE lzo
)
BACKUP NO
DISTSTYLE EVEN
;


\set s3loc 's3://salamander-us-east-1/reinvent2018/ant353/store_sales/saledate=' :dt '/'
-- COPY input data to the staging table
copy public.stg_store_sales
from
:'s3loc'
CREDENTIALS :'s3datareadrolevar'
DELIMITER '~' gzip region 'us-east-1';

-- Delete any rows from target store_sales for the input date for idempotency
delete from store_sales where ss_sold_date_sk in (select d_date_sk from date_dim where d_date=:'dt');
--Insert data from staging table to the target TABLE
INSERT INTO store_sales
(
  ss_sold_date_sk,
  ss_sold_time_sk,
  ss_item_sk,
  ss_customer_sk,
  ss_cdemo_sk,
  ss_hdemo_sk,
  ss_addr_sk,
  ss_store_sk,
  ss_promo_sk,
  ss_ticket_number,
  ss_quantity,
  ss_wholesale_cost,
  ss_list_price,
  ss_sales_price,
  ss_ext_discount_amt,
  ss_ext_sales_price,
  ss_ext_wholesale_cost,
  ss_ext_list_price,
  ss_ext_tax,
  ss_coupon_amt,
  ss_net_paid,
  ss_net_paid_inc_tax,
  ss_net_profit
)
SELECT date_dim.d_date_sk ss_sold_date_sk,
       time_dim.t_time_sk ss_sold_time_sk,
       item.i_item_sk ss_item_sk,
       customer.c_customer_sk ss_customer_sk,
       customer_demographics.cd_demo_sk ss_cdemo_sk,
       household_demographics.hd_demo_sk ss_hdemo_sk,
       customer_address.ca_address_sk ss_addr_sk,
       store.s_store_sk ss_store_sk,
       promotion.p_promo_sk ss_promo_sk,
       ss_ticket_number,
       ss_quantity,
       ss_wholesale_cost,
       ss_list_price,
       ss_sales_price,
       ss_ext_discount_amt,
       ss_ext_sales_price,
       ss_ext_wholesale_cost,
       ss_ext_list_price,
       ss_ext_tax,
       ss_coupon_amt,
       ss_net_paid,
       ss_net_paid_inc_tax,
       ss_net_profit
FROM stg_store_sales AS store_sales
  JOIN date_dim ON store_sales.sold_date = date_dim.d_date
  LEFT JOIN time_dim ON store_sales.sold_time = time_dim.t_time
  LEFT JOIN item
         ON store_sales.i_item_id = item.i_item_id
        AND i_rec_end_date IS NULL
  LEFT JOIN customer ON store_sales.c_customer_id = customer.c_customer_id
  LEFT JOIN customer_demographics ON store_sales.cd_demo_sk = customer_demographics.cd_demo_sk
  LEFT JOIN household_demographics
         ON store_sales.hd_income_band_sk = household_demographics.hd_income_band_sk
        AND store_sales.hd_buy_potential = household_demographics.hd_buy_potential
        AND store_sales.hd_dep_count = household_demographics.hd_dep_count
        AND store_sales.hd_vehicle_count = household_demographics.hd_vehicle_count
  LEFT JOIN customer_address ON store_sales.ca_address_id = customer_address.ca_address_id
  LEFT JOIN store
         ON store_sales.s_store_id = store.s_store_id
        AND s_rec_end_date IS NULL
  LEFT JOIN promotion ON store_sales.p_promo_id = promotion.p_promo_id;

  --drop staging table

  DROP TABLE if exists public.stg_store_sales;

  -- End transaction and commit
  end transaction;

This ETL implementation runs through the following steps:

  1. A COPY command fast-loads the data in bulk from S3 into the staging table stg_store_sales.
  2. A begin…end transaction encapsulates the multiple steps of the transformation and load process. This results in fewer commit operations, making the process less expensive.
  3. The ETL implementation is idempotent. If it fails, you can retry the job without any cleanup. For example, each run recreates the stg_store_sales staging table and deletes the rows in the target store_sales table for the particular refresh date before inserting.

For best practices used in the preceding implementation, see the Top 8 Best Practices for High-Performance ETL Processing Using Amazon Redshift post.

Furthermore, Customer_Address demonstrates a Type 1 dimension implementation and Item follows a Type 2 implementation, as in a typical dimensional model.

Set up the entire workflow using AWS CloudFormation

The AWS CloudFormation template includes all the steps of this solution. This template creates all the required AWS resources and invokes initial data setup and the refresh of this data for a particular day. Here is a list of all the resources it creates inside the CloudFormation stack:

  • A VPC and associated subnets, security groups, and routes
  • IAM roles
  • An Amazon Redshift cluster
  • An AWS Batch job definition and compute environment
  • A Lambda function to submit and poll AWS Batch jobs
  • A Step Functions state machine to orchestrate the ETL workflow and refresh the data in the Amazon Redshift cluster

Here is the architecture of this setup that shows the Amazon Redshift setup in the VPC and the ETL process orchestrated using Step Functions:

Step 1: Create the stack with AWS CloudFormation

To deploy this application in your AWS account, start by launching this CloudFormation stack:

  1. This stack uses the password Password#123. Change it as soon as possible. Use a minimum of eight characters, including at least one uppercase letter, one lowercase letter, one number, and one special character.
  2. Use the default values for all other parameters.

The stack takes about ten minutes to launch. Wait until its status changes to CREATE_COMPLETE.

Make a note of the value of ExecutionInput in the Output section of the stack. The JSON looks like the following code example:

{ "DBConnection":{ "jobName":"
…
alue":"s3://salamander-us-east-1/reinvent2018/ant353/etlscript/store_sales.sql" }, { "name":"DATASET_DATE", "value":"2003-01-02" } ] } } }

Note the Physical ID of JobDefinition and JobQueue in the Resources section of the stack.

Step 2: Set up TPC-DS 1-GB initial data in Amazon Redshift

The following steps load an initial 1 GB of TPC-DS data into the Amazon Redshift cluster:

  1. In the AWS Batch console, choose Jobs, select the job queue noted earlier, and choose Submit job.
  2. Set a new job name, for example, TPCDSdataload, and select the JobDefinition value that you noted earlier. Choose Submit job. You can also submit the job programmatically, as shown in the sketch after this list.
  3. In the AWS Batch dashboard, monitor the job until it finishes loading the initial 1 GB of TPC-DS data into the Amazon Redshift cluster. This takes about ten minutes.
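
If you prefer the AWS SDK over the console, a minimal Python sketch such as the following submits the same load job. Substitute the JobDefinition and JobQueue physical IDs that you noted in Step 1; the values below are placeholders.

import boto3

batch = boto3.client("batch")

# Replace these with the physical IDs noted from the CloudFormation stack resources.
JOB_DEFINITION = "JobDefinition-cd6aa175c07fb2a"
JOB_QUEUE = "JobQueue-217beecdb0caa3f"

response = batch.submit_job(
    jobName="TPCDSdataload",
    jobQueue=JOB_QUEUE,
    jobDefinition=JOB_DEFINITION,
)
print("Submitted job:", response["jobId"])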

Step 3: Execute the ETL workflow in the setup function

The ETL process is a multi-step workflow that refreshes the TPC-DS dimensional model with data for 2003-01-02.

  1. In the Step Functions console, choose JobStatusPollerStateMachine-*.
  2. Choose Start execution and provide an optional execution name, for example, ETLWorkflowDataRefreshfor2003-01-02. In the execution input, enter the ExecutionInput value that you noted earlier. This starts the ETL process. The state machine uses the Lambda poller to submit and monitor each step of the ETL job, and you can monitor its progress by refreshing your browser. You can also start the execution programmatically, as shown in the sketch after this list.
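
Here is a minimal Python sketch that reads the ExecutionInput output from the CloudFormation stack and starts the execution with it. The stack name and state machine ARN are placeholders for your own values.

import boto3

cfn = boto3.client("cloudformation")
sfn = boto3.client("stepfunctions")

STACK_NAME = "etl-orchestration-stack"  # placeholder: use your stack's name

# Read the ExecutionInput JSON from the stack outputs.
outputs = cfn.describe_stacks(StackName=STACK_NAME)["Stacks"][0]["Outputs"]
execution_input = next(o["OutputValue"] for o in outputs if o["OutputKey"] == "ExecutionInput")

# Placeholder ARN: copy it from the JobStatusPollerStateMachine-* resource.
state_machine_arn = "arn:aws:states:us-west-2:123456789012:stateMachine:JobStatusPollerStateMachine"

response = sfn.start_execution(
    stateMachineArn=state_machine_arn,
    name="ETLWorkflowDataRefreshfor2003-01-02",
    input=execution_input,
)
print("Execution ARN:", response["executionArn"])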

Step 4: Verify the ETL data refresh in the Amazon Redshift cluster

In the Amazon Redshift console, choose Query Editor. Enter the following credentials:

  • Database: dev.
  • Database user: awsuser.
  • Password: The password that you set in Step 1 (default Password#123).

After you are logged in to the public schema, execute the following query to check the data load for 2003-01-02:

SELECT c_last_name, 
               c_first_name, 
               ca_city, 
               bought_city, 
               ss_ticket_number, 
               amt, 
               profit 
FROM   (SELECT ss_ticket_number, 
               ss_customer_sk, 
               ca_city            bought_city, 
               Sum(ss_coupon_amt) amt, 
               Sum(ss_net_profit) profit 
        FROM   store_sales, 
               date_dim, 
               store, 
               household_demographics, 
               customer_address 
        WHERE  store_sales.ss_sold_date_sk = date_dim.d_date_sk 
               AND store_sales.ss_store_sk = store.s_store_sk 
               AND store_sales.ss_hdemo_sk = household_demographics.hd_demo_sk 
               AND store_sales.ss_addr_sk = customer_address.ca_address_sk 
               AND ( household_demographics.hd_dep_count = 6 
                      OR household_demographics.hd_vehicle_count = 0 ) 
               AND d_date =  '2003-01-02'
        GROUP  BY ss_ticket_number, 
                  ss_customer_sk, 
                  ss_addr_sk, 
                  ca_city) dn, 
       customer, 
       customer_address current_addr 
WHERE  ss_customer_sk = c_customer_sk 
       AND customer.c_current_addr_sk = current_addr.ca_address_sk 
       AND current_addr.ca_city <> bought_city 
ORDER  BY c_last_name, 
          c_first_name, 
          ca_city, 
          bought_city, 
          ss_ticket_number
LIMIT 100;

The query should display the TPC-DS data for 2003-01-02 that the ETL process loaded.

Step 5: Cleaning up

When you finish testing this solution, remember to clean up all the AWS resources that you created using AWS CloudFormation. Use the AWS CloudFormation console or AWS CLI to delete the stack that you specified previously.

Conclusion

In this post, I described how to implement an ETL workflow using decoupled services in AWS and set up a highly scalable orchestration that lets you refresh data into an Amazon Redshift cluster.

You can easily expand on what you learned here. Here are some options that let you extend this solution to accommodate other analytical services or make it robust enough for production:

  • This example invokes the state machine manually using Step Functions. You can instead trigger the state machine automatically using a CloudWatch event or an S3 event, such as whenever new files arrive in the source bucket. You can also drive the ETL invocation using a schedule. For useful information about automating your ETL workflow, see Schedule a Serverless Workflow.
  • You can add an alert mechanism in case of failures. To do this, create a Lambda function that sends you an email based on the status of each step in the Step Functions workflow; a minimal sketch follows this list.
  • Each step of the state machine is autonomous and can invoke any service with a Lambda function. You can integrate any analytical service into your workflow. For example, you can create a separate Lambda function to invoke AWS Glue and clean some of your data before transforming the data using Amazon Redshift. In this case, you add the AWS Glue job as a dependency in the step before the dimension load.
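
For the alert mechanism described in the second bullet, a minimal sketch might publish each failed step's status to an Amazon SNS topic that is subscribed to your email; the topic ARN environment variable and the event fields used here are assumptions.

import json
import os

import boto3

sns = boto3.client("sns")

# Hypothetical environment variable pointing at an SNS topic subscribed to your email.
TOPIC_ARN = os.environ["ETL_ALERT_TOPIC_ARN"]


def notify_handler(event, context):
    """Publish the status of a failed ETL step so subscribers receive an email."""
    status = event.get("status", "UNKNOWN")
    step = event.get("stepId", "unknown step")
    if status == "FAILED":
        sns.publish(
            TopicArn=TOPIC_ARN,
            Subject=f"ETL step {step} failed",
            Message=json.dumps(event, indent=2),
        )
    return event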

With this Step Functions-based workflow, you can decouple the different steps of ETL orchestration using any analytical service. Because of this, the solution is adaptable and interchangeable to a wide variety of applications.

If you have questions or suggestions, please leave a comment below.

About the Author

Thiyagarajan Arumugam is a Big Data Solutions Architect at Amazon Web Services and designs customer architectures to process data at scale. Prior to AWS, he built data warehouse solutions at Amazon.com. In his free time, he enjoys all outdoor sports and practices the Indian classical drum mridangam.