Orchestrate an ETL process using AWS Step Functions for Amazon Redshift
June 1, 2021 – This blog post has been updated to use a one-click launch of the ETL data pipeline directly from the AWS Step function sample projects. Navigate to the AWS Step Functions console and select “ETL job in Amazon Redshift” to have the sample pipeline setup in your account. Please see https://docs.aws.amazon.com/step-functions/latest/dg/sample-etl-orchestration.html”
Modern data lakes depend on extract, transform, and load (ETL) operations to convert bulk information into usable data. This post walks through implementing an ETL orchestration process that is loosely coupled using AWS Step Functions, AWS Lambda, and AWS Batch to target an Amazon Redshift cluster.
Because Amazon Redshift uses columnar storage, it is well suited for fast analytical insights using the convenient ANSI SQL queries. You can rapidly scale your Amazon Redshift clusters up and down in minutes to meet the demanding workloads for both your end-user reports and timely data refresh into the data warehouse.
AWS Step Functions makes it easy to develop and use repeatable workflows that scale well. Step Functions lets you build automation workflows from individual Lambda functions. Each function performs a discrete task and lets you develop, test, and modify the components of your workflow quickly and seamlessly.
An ETL process refreshes your data warehouse from source systems, organizing the raw data into a format you can more readily use. Most organizations run ETL as a batch or as part of a real-time ingest process to keep the data warehouse current and provide timely analytics. A fully automated and highly scalable ETL process helps minimize the operational effort that you must invest in managing the regular ETL pipelines. It also ensures the timely and accurate refresh of your data warehouse. You can tailor this process to refresh data into any data warehouse or the data lake.
This post also provides an AWS CloudFormation template that launches the entire sample ETL process in one click to refresh the TPC-DS dataset. Find the template link in the Set up the entire workflow using AWS CloudFormation section.
The following diagram illustrates the architectural overview of the different components involved in the orchestration of the ETL workflow. This workflow uses Step Functions to fetch source data from Amazon S3 to refresh the Amazon Redshift data warehouse.
Here are the core components of the workflow:
- Amazon CloudWatch triggers the ETL process based on a schedule, through the AWS CLI, or using the various AWS SDKs in a Lambda function.
- The ETL workflow uses Step Functions for a multi-step ETL process and manages AWS services into serverless workflows. You can build and easily iterate these using JSON-based templates. For example, a typical ETL process may involve refreshing dimensions first and later refreshing the fact tables. You can declare your order of operations using a Step Functions state machine.
- A Lambda function lets you build microservices to coordinate job submission and monitoring without needing to write code for workflow logic, parallel processes, error handling, timeouts, or retries.
- AWS Batch runs several ETL jobs such as transforms and loads into Amazon Redshift. AWS Batch manages all the infrastructure for you, avoiding the complexities of provisioning, managing, monitoring, and scaling your batch computing jobs. It also lets you wait for the jobs to complete.
- The source data in Amazon S3 refreshes an Amazon Redshift data warehouse through a PL/SQL container. To specify the ETL logic, I use.sql files that contain the SQL code for a particular step. For example, a .sql file for a typical dimension table refresh contains steps to load the data from Amazon S3 to a temporary staging table and INSERT/UPDATE the target table. Before beginning, review a sample dimensional table .sql file.
You can execute the workflow and monitor it using the state machine. You can trigger the ETL according to a schedule or an event (for example, as soon as all the data files arrive in S3).
Before you get started, create a Docker image that can execute .sql files. AWS Batch creates resources for executing the ETL steps using this Docker image. To create the Docker image, you need:
If this is your first time using AWS Batch, see Getting Started with AWS Batch. Create an environment to build and register the Docker image. For this post, register this image in an Amazon ECR repository. This is a private repository by default, making it useful for AWS Batch jobs.
Building the fetch and running psql Docker image
To build the Docker image, follow the steps outlined in the post Creating a Simple “Fetch & Run” AWS Batch Job.
Use the following Docker configuration and fetch and run psql scripts to build the images.
Follow the steps in the post to import the Docker image into the ECR container registry. After you complete the previous steps, your Docker image is ready to trigger a .sql execution for an Amazon Redshift cluster.
Example: ETL process using TPC-DS dataset
This example uses a subset of the TPC-DS dataset to demonstrate a typical dimensional model refresh. Here is the Entity Relationship diagram of the TPC-DS data model that I use for this ETL application:
The ETL process refreshes table data for the
Store_Sales fact table along with the
Customer_Address and Item dimensions for a particular dataset date.
Setting up the ETL workflow using Step Functions
Step Functions make complicated workflows more straightforward. You can set up dependency management and failure handling using a JSON-based template. Workflows are just a series of steps, with the output of one step acting as input into the next.
This example completes various dimensional table transforms and loads before triggering the Fact table load. Also, a workflow can branch out into multiple parallel steps whenever needed. You can monitor each step of execution as it happens, which means you can identify and fix problems quickly.
This illustration outlines the example ETL process set up through Step Functions:
For more information, see the detailed workflow diagram.
In the above workflow, the ETL process checks the DB connection in Step 1 and triggers the
Customer_Address (Step 2.1) and
Item_dimension (Step 2.2) steps, which execute in parallel. The
Store_Sales (Step 3) FACT table waits for the process to complete the dimensional tables. Each ETL step is autonomous, allowing you to monitor and respond to failures at any stage.
I now examine the
Store_Sales step (Step 3) in detail. Other steps follow a similar pattern of implementation.
Here is the state implementation for
Store_Sales step (Step 3):
The Parallel process that loads all the dimension tables sets up a dependency on later Store Sales Fact transformation/load SalesFACTInit through the Next attribute. The SalesFACTInit step triggers the transformation using the SubmitStoreSalesFACTJob to AWS Batch triggered through AWS Lambda job JobStatusPol-SubmitJobFunction. GetStoreSalesFACTJobStatus polls through the AWS Lambda JobStatusPoll-CheckJobFunction every 30 seconds to check for completion. CheckStoreSalesFACTJobStatus validates the status and decides to succeed or fail the process depending on the returned status.
Here is snippet of input for executing the state machine job for Step 3:
The input defines which .sql file each step invokes, along with the refresh date. You can represent any complex ETL workflow as a JSON workflow, making it easy to manage. This also decouples the inputs to invoke for each step.
Executing the ETL workflow
AWS Batch executes each .sql script (
store_sales.sql) that the state machine invokes by using an incremental data refresh for the sales data on a particular date.
Here is the load and transformation implementation for the
This ETL implementation runs through the following steps:
- A COPY command fast loads the data from S3 in bulk into the staging table
- Begin…end transactions encapsulate multiple steps in the transformation and load process. This leads to fewer commit operations in the end, making the process less expensive.
- ETL implementation is idempotent. If it fails, you can retry the job without any cleanup. For example, it recreates the
stg_store_salesis each time, then deletes target table
store_saleswith the data for the particular refresh date each time.
For best practices used in the preceding implementation, see the Top 8 Best Practices for High-Performance ETL Processing Using Amazon Redshift post.
Set up the entire workflow using AWS CloudFormation
The AWS CloudFormation template includes all the steps of this solution. This template creates all the required AWS resources and invokes initial data setup and the refresh of this data for a particular day. Here is a list of all the resources it creates inside the CloudFormation stack:
- A VPC and associated subnets, security groups, and routes
- IAM roles
- An Amazon Redshift cluster
- An AWS Batch job definition and compute environment
- A Lambda function to submit and poll AWS Batch jobs
- A Step Functions state machine to orchestrate the ETL workflow and refresh the data in the Amazon Redshift cluster
Here is the architecture of this setup that shows the Amazon Redshift setup in the VPC and the ETL process orchestrated using Step Functions:
Step 1: Create the stack with AWS CloudFormation
- This stack uses the password Password#123. Change it as soon as possible. Use a minimum of eight characters, at least one uppercase letter, one lowercase letter, one number, and one special character.
- Use the default values for all other parameters.
The stack takes about ten minutes to launch. Wait for it to complete when the status changes to CREATE_COMPLETE.
Make a note of the value of ExecutionInput in the Output section of the stack. The JSON looks like the following code example:
Note the Physical ID of JobDefinition and JobQueue in the Resources section of the stack.
Step 2: Set up TPC-DS 1-GB initial data in Amazon Redshift
The following steps load an initial 1 GB of TPCDS data into the Amazon Redshift cluster:
- In the AWS Batch console, choose Job, select the job queue noted earlier, and choose Submit Job.
- Set a new job name, for example, TPCDSdataload, and select the JobDefinition value that you noted earlier. Choose Submit Job. Wait for the job to completely load the initial 1 GB of TPCDS data into the Amazon Redshift cluster.
- In the AWS Batch dashboard and monitor for the completion of TPCDS data load. This takes about ten minutes to complete.
Step 3: Execute the ETL workflow in the setup function
The ETL process is a multi-step workflow to refresh the TPCDS dimensional model with data from 2010-10-10.
- In the Step Functions console, choose JobStatusPollerStateMachine-*.
- Choose Start execution and provide an optional execution name, for example, ETLWorkflowDataRefreshfor2003-01-02. In the execution input, enter the ExecutionInput value that you noted earlier. This kickstarts the ETL process. The state machine uses the Lambda poller to submit and monitor each step of the ETL job. Each input invokes the ETL workflow. You can monitor the process of the ETL by refreshing your browser.
Step 4: Verify the ETL data refresh in the Amazon Redshift cluster
In the Amazon Redshift console, choose Query Editor. Enter the following credentials:
- Database: dev.
- Database Use: awsuser.
- Password: This requires the password that you created in Step 1 (default Password#123).
After you are logged in to the public schema, execute the following query to check the data load for 2010-10-10:
The query should display the TPC-DS dataset for 2010-10-10 that the ETL process loaded.
Step 5: Cleaning up
When you finish testing this solution, remember to clean up all the AWS resources that you created using AWS CloudFormation. Use the AWS CloudFormation console or AWS CLI to delete the stack that you specified previously.
In this post, I described how to implement an ETL workflow using decoupled services in AWS and set up a highly scalable orchestration that lets you refresh data into an Amazon Redshift cluster.
You can easily expand on what you learned here. Here are some options that you let you extend this solution to accommodate other analytical services or make it robust enough to be production ready:
- This example invokes the state machine manually using Step Functions. You can instead trigger the state machine automatically using a CloudWatch event or S3 event, such as whenever new files arrive in the source bucket. You also drive the ETL invocation using a schedule. For useful information for automating your ETL workflow, see Schedule a Serverless Workflow.
- You can add an alert mechanism in case of failures. To do this, create a Lambda function that sends you an email based on the status of each step in the Step Functions workflow.
- Each step of the state machine is autonomous and can invoke any service with a Lambda function. You can integrate any analytical service into your workflow. For example, you can create a separate Lambda function to invoke AWS Glue and clean some of your data before transforming the data using Amazon Redshift. In this case, you add the AWS Glue job as a dependency in the step before the dimension load.
With this Step Functions-based workflow, you can decouple the different steps of ETL orchestration using any analytical service. Because of this, the solution is adaptable and interchangeable to a wide variety of applications.
If you have questions or suggestions, please leave a comment below.
About the Author
Thiyagarajan Arumugam is a Big Data Solutions Architect at Amazon Web Services and designs customer architectures to process data at scale. Prior to AWS, he built data warehouse solutions at Amazon.com. In his free time, he enjoys all outdoor sports and practices the Indian classical drum mridangam.