AWS Big Data Blog
Implement slowly changing dimensions in a data lake using AWS Glue and Delta
In a data warehouse, a dimension is a structure that categorizes facts and measures so that users can answer business questions. To illustrate, in a typical sales domain, customer, time, and product are dimensions, and sales transactions is a fact. Attributes within a dimension can change over time; for example, a customer can change their address, an employee can move from a contractor position to a full-time position, or a product can have multiple revisions to it. A slowly changing dimension (SCD) is a data warehousing concept that contains relatively static data that can change slowly over a period of time. There are three major types of SCDs maintained in data warehousing: Type 1 (no history), Type 2 (full history), and Type 3 (limited history). Change data capture (CDC) is a characteristic of a database that provides the ability to identify the data that changed between two database loads, so that an action can be performed on the changed data.
As organizations across the globe modernize their data platforms with data lakes on Amazon Simple Storage Service (Amazon S3), handling SCDs in data lakes can be challenging. It becomes even more challenging when source systems don't provide a mechanism to identify the changed data for processing within the data lake, and even more complex when the data source is semi-structured rather than a database. The key objective when handling Type 2 SCDs is to define the start and end dates of each record accurately to track the changes within the data lake, because this provides the point-in-time reporting capability for the consuming applications.
In this post, we demonstrate how to identify the changed data for a semi-structured source (JSON), capture the full historical data changes (SCD Type 2), and store them in an S3 data lake, using AWS Glue and the open data lake format Delta.io. This implementation supports the following use cases:
- Track Type 2 SCDs with start and end dates to identify the current and full historical records and a flag to identify the deleted records in the data lake (logical deletes)
- Use consumption tools such as Amazon Athena to query historical records seamlessly
Solution overview
This post demonstrates the solution with an end-to-end use case using a sample employee dataset. The dataset represents employee details such as ID, name, address, phone number, contractor status, and more. To demonstrate the SCD implementation, consider the following assumptions:
- The data engineering team receives daily files that are full snapshots of records and don’t contain any mechanism to identify source record changes
- The team is tasked with implementing SCD Type 2 functionality for identifying new, updated, and deleted records from the source, and to preserve the historical changes in the data lake
- Because the source systems don’t provide the CDC capability, a mechanism needs to be developed to identify the new, updated, and deleted records and persist them in the data lake layer
The architecture is implemented as follows:
- Source systems ingest files in the S3 landing bucket (this step is mimicked by generating the sample records using the provided AWS Lambda function into the landing bucket)
- An AWS Glue job (Delta job) picks the source data file and processes the changed data from the previous file load (new inserts, updates to the existing records, and deleted records from the source) into the S3 data lake (processed layer bucket)
- The architecture uses the open data lake format (Delta), and builds the S3 data lake as a Delta Lake, which is mutable, because the new changes can be updated, new inserts can be appended, and source deletions can be identified accurately and marked with a delete_flag value
- An AWS Glue crawler catalogs the data, which can be queried by Athena
The following diagram illustrates our architecture.
Prerequisites
Before you get started, make sure you have the following prerequisites:
- An AWS account
- Appropriate AWS Identity and Access Management (IAM) permissions to deploy AWS CloudFormation stack resources
Deploy the solution
For this solution, we provide a CloudFormation template that sets up the services included in the architecture, to enable repeatable deployments. This template creates the following resources:
- Two S3 buckets: a landing bucket for storing sample employee data and a processed layer bucket for the mutable data lake (Delta Lake)
- A Lambda function to generate sample records
- An AWS Glue extract, transform, and load (ETL) job to process the source data from the landing bucket to the processed bucket
To deploy the solution, complete the following steps:
- Choose Launch Stack to launch the CloudFormation stack:
- Enter a stack name.
- Select I acknowledge that AWS CloudFormation might create IAM resources with custom names.
- Choose Create stack.
After the CloudFormation stack deployment is complete, navigate to the AWS CloudFormation console and note the following resources on the Outputs tab:
- Data lake resources – The S3 buckets scd-blog-landing-xxxx and scd-blog-processed-xxxx (referred to as scd-blog-landing and scd-blog-processed in the subsequent sections in this post)
- Sample records generator Lambda function – SampleDataGenaratorLambda-<CloudFormation Stack Name> (referred to as SampleDataGeneratorLambda)
- AWS Glue Data Catalog database – deltalake_xxxxxx (referred to as deltalake)
- AWS Glue Delta job – <CloudFormation-Stack-Name>-src-to-processed (referred to as src-to-processed)
Note that deploying the CloudFormation stack in your account incurs AWS usage charges.
Test SCD Type 2 implementation
With the infrastructure in place, you're ready to test out the overall solution design and query historical records from the employee dataset. This post is modeled on a real customer use case in which you receive full snapshot data on a daily basis. We test the following aspects of SCD implementation:
- Run an AWS Glue job for the initial load
- Simulate a scenario where there are no changes to the source
- Simulate insert, update, and delete scenarios by adding new records, and modifying and deleting existing records
- Simulate a scenario where the deleted record comes back as a new insert
Generate a sample employee dataset
To test the solution, you first need a source dataset for the initial data ingestion. To simplify that step, the CloudFormation stack you just deployed includes a Lambda function that generates sample records.
Open the function and configure a test event, with the default hello-world template event JSON as seen in the following screenshot. Provide an event name without any changes to the template and save the test event.
Choose Test to run the test event, which invokes the Lambda function to generate the sample records.
When the Lambda function completes its invocation, you will be able to see the following sample employee dataset in the landing bucket.
Run the AWS Glue job
Confirm that you see the employee dataset in the path s3://scd-blog-landing/dataset/employee/. You can download the dataset and open it in a code editor such as VS Code. The following is an example of the dataset:
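The records are newline-delimited JSON objects. Because the values are randomly generated, the snippet below only sketches the shape of a record (field values elided; your generated data will differ):

```json
{"emp_id": 1, "first_name": "...", "last_name": "...", "Address": "...", "phone_number": "...", "isContractor": false}
{"emp_id": 2, "first_name": "...", "last_name": "...", "Address": "...", "phone_number": "...", "isContractor": true}
```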
Download the dataset and keep it ready, because you will modify the dataset for future use cases to simulate the inserts, updates, and deletes. The sample dataset generated for you will be entirely different than what you see in the preceding example.
To run the job, complete the following steps:
- On the AWS Glue console, choose Jobs in the navigation pane.
- Choose the job src-to-processed.
- On the Runs tab, choose Run.
When the AWS Glue job is run for the first time, the job reads the employee dataset from the landing bucket path and ingests the data to the processed bucket as a Delta table.
When the job is complete, you can create a crawler to see the initial data load. The following screenshot shows the database available on the Databases page.
- Choose Crawlers in the navigation pane.
- Choose Create crawler.
- Name your crawler delta-lake-crawler, then choose Next.
- Select Not yet for data already mapped to AWS Glue tables.
- Choose Add a data source.
- On the Data source drop-down menu, choose Delta Lake.
- Enter the path to the Delta table.
- Select Create Native tables.
- Choose Add a Delta Lake data source.
- Choose Next.
- Choose the role that was created by the CloudFormation template, then choose Next.
- Choose the database that was created by the CloudFormation template, then choose Next.
- Choose Create crawler.
- Select your crawler and choose Run.
Query the data
After the crawler is complete, you can see the table it created.
To query the data, complete the following steps:
- Choose the employee table and on the Actions menu, choose View data.
You’re redirected to the Athena console. If you don’t have the latest Athena engine, create a new Athena workgroup with the latest Athena engine.
- Under Administration in the navigation pane, choose Workgroups.
- Choose Create workgroup.
- Provide a name for the workgroup, such as DeltaWorkgroup.
- Select Athena SQL as the engine, and choose Athena engine version 3 for Query engine version.
- Choose Create workgroup.
- After you create the workgroup, select the workgroup (DeltaWorkgroup) on the drop-down menu in the Athena query editor.
- Run the following query on the employee table:
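A simple query of the following form works for this check; deltalake_xxxxxx is a placeholder for the database created in your account:

```sql
SELECT *
FROM "deltalake_xxxxxx"."employee";
```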
Note: Update the preceding query with the correct database name from the CloudFormation stack outputs before running it.
You can observe that the employee table has 25 records. The following screenshot shows the total employee records with some sample records.
The Delta table is stored with an emp_key, which is unique to each and every change and is used to track the changes. The emp_key is created for every insert, update, and delete, and can be used to find all the changes pertaining to a single emp_id.
The emp_key is created using the SHA256 hashing algorithm, as shown in the following code:
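The deployed Glue job contains the actual implementation; the following PySpark snippet is a minimal sketch of the idea, assuming the sample schema shown earlier (the exact column list and separator used by the job may differ):

```python
from pyspark.sql.functions import col, concat_ws, sha2

# Hash the concatenation of the business key and all attribute columns, so that
# any change to a record's attributes produces a new emp_key value.
emp_df = emp_df.withColumn(
    "emp_key",
    sha2(
        concat_ws(
            "||",
            col("emp_id").cast("string"),
            col("first_name"),
            col("last_name"),
            col("Address"),
            col("phone_number"),
            col("isContractor").cast("string"),
        ),
        256,
    ),
)
```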
Perform inserts, updates, and deletes
Before making changes to the dataset, let's run the same job one more time. Assuming that the current load from the source is the same as the initial load with no changes, the AWS Glue job shouldn't make any changes to the dataset. After the job is complete, run the previous Select query in the Athena query editor and confirm that there are still 25 active records with the following values:
- All 25 records with the column isCurrent=true
- All 25 records with the column end_date=Null
- All 25 records with the column delete_flag=false
After you confirm the previous job run with these values, let’s modify our initial dataset with the following changes:
- Change the isContractor flag to false (change it to true if your dataset already shows false) for emp_id=12.
- Delete the entire row where emp_id=8 (make sure to save the record in a text editor, because we use this record in another use case).
- Copy the row for emp_id=25 and insert a new row. Change the emp_id to be 26, and make sure to change the values for other columns as well.
After we make these changes, the employee source dataset looks like the following code (for readability, we have only included the changed records as described in the preceding three steps):
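With placeholder values for the untouched fields, the changed records might look like the following (your actual values will differ, and the deleted emp_id=8 row simply no longer appears in the file):

```json
{"emp_id": 12, "first_name": "...", "last_name": "...", "Address": "...", "phone_number": "...", "isContractor": false}
{"emp_id": 26, "first_name": "...", "last_name": "...", "Address": "...", "phone_number": "...", "isContractor": true}
```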
- Now, upload the changed fake_emp_data.json file to the same source prefix.
- After you upload the changed employee dataset to Amazon S3, navigate to the AWS Glue console and run the job.
- When the job is complete, run the following query in the Athena query editor and confirm that there are 27 records in total with the following values:
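A count query along the following lines confirms the total and shows how the records break down by flag values (a sketch; substitute your database name):

```sql
SELECT isCurrent, delete_flag, COUNT(*) AS record_count
FROM "deltalake_xxxxxx"."employee"
GROUP BY isCurrent, delete_flag;
```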
Note: Update the preceding query with the correct database name from the CloudFormation stack outputs before running it.
- Run another query in the Athena query editor and confirm that there are 4 records returned with the following values:
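One way to pull back just the records affected by this change is to filter on the rows we inserted, updated, and deleted above (a sketch; substitute your database name):

```sql
SELECT emp_id, emp_key, isCurrent, delete_flag, start_date, end_date
FROM "deltalake_xxxxxx"."employee"
WHERE emp_id IN (8, 12, 26)
ORDER BY emp_id, start_date;
```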
Note: Update the preceding query with the correct database name from the CloudFormation stack outputs before running it.
You will see two records for emp_id=12:
- One emp_id=12 record with the following values (for the record that was ingested as part of the initial load):
  - emp_key=44cebb094ef289670e2c9325d5f3e4ca18fdd53850b7ccd98d18c7a57cb6d4b4
  - isCurrent=false
  - delete_flag=false
  - end_date=’2023-03-02’
- A second emp_id=12 record with the following values (for the record that was ingested as part of the change to the source):
  - emp_key=b60547d769e8757c3ebf9f5a1002d472dbebebc366bfbc119227220fb3a3b108
  - isCurrent=true
  - delete_flag=false
  - end_date=Null (or empty string)
The record for emp_id=8 that was deleted in the source as part of this run will still exist, but with the following changes to the values:
- isCurrent=false
- end_date=’2023-03-02’
- delete_flag=true
The new employee record will be inserted with the following values:
- emp_id=26
- isCurrent=true
- end_date=NULL (or empty string)
- delete_flag=false
Note that the emp_key values in your actual table may be different than what is provided here as an example.
The AWS Glue job handles the source deletes as follows:
- For the deletes, we check the emp_id values from the base table against the new source file and inner join on the emp_key.
- If the condition evaluates to true, we then check whether the employee base table emp_key equals the new update's emp_key, and get the current, undeleted record (isCurrent=true and delete_flag=false).
- We merge the delete changes from the new file with the base table for all rows matching the delete condition, and update the following:
  - isCurrent=false
  - delete_flag=true
  - end_date=current_date
See the following code:
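The deployed Glue job contains the full implementation. The following PySpark sketch shows one way to express this logic with the Delta Lake merge API; the variable names (spark, new_df for the newly arrived snapshot, processed_path for the Delta table location) are assumptions, and the job created by the CloudFormation template may structure this differently:

```python
from delta.tables import DeltaTable
from pyspark.sql.functions import col, current_date, lit

employee_table = DeltaTable.forPath(spark, processed_path)  # existing Delta table
base_df = employee_table.toDF()

# Current, undeleted records in the base table whose emp_id is missing from the
# new snapshot are treated as source deletes.
deletes_df = (
    base_df
    .filter((col("isCurrent") == lit(True)) & (col("delete_flag") == lit(False)))
    .join(new_df, on="emp_id", how="left_anti")
)

# Close them out: no longer current, logically deleted, end-dated today.
(
    employee_table.alias("employee")
    .merge(deletes_df.alias("deletes"), "employee.emp_key = deletes.emp_key")
    .whenMatchedUpdate(
        condition="employee.isCurrent = true AND employee.delete_flag = false",
        set={
            "isCurrent": lit(False),
            "delete_flag": lit(True),
            "end_date": current_date(),
        },
    )
    .execute()
)
```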
- For both the updates and the inserts, we check for the condition that the base table employee.emp_id is equal to the new changes.emp_id and the employee.emp_key is equal to the new changes.emp_key, while only retrieving the current records.
- If this condition evaluates to true, we then get the current record (isCurrent=true and delete_flag=false).
- We merge the changes by updating the following:
  - If the second condition evaluates to true:
    - isCurrent=false
    - end_date=current_date
  - Or we insert the entire row as follows if the second condition evaluates to false:
    - emp_id=new record’s emp_id
    - emp_key=new record’s emp_key
    - first_name=new record’s first_name
    - last_name=new record’s last_name
    - address=new record’s address
    - phone_number=new record’s phone_number
    - isContractor=new record’s isContractor
    - start_date=current_date
    - end_date=NULL (or empty string)
    - isCurrent=true
    - delete_flag=false
See the following code:
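As before, the following PySpark sketch illustrates the idea rather than reproducing the deployed job; it splits the work into two merges (close out changed records, then insert new versions and brand-new records), and the variable names are the same assumptions as in the previous snippet:

```python
from delta.tables import DeltaTable
from pyspark.sql.functions import current_date, lit

# new_df is the new snapshot with emp_key already computed (see the hashing snippet earlier).
employee_table = DeltaTable.forPath(spark, processed_path)

# 1) Close out current records whose attributes changed: the emp_id matches a row
#    in the new snapshot, but the emp_key (attribute hash) no longer matches.
(
    employee_table.alias("employee")
    .merge(new_df.alias("changes"), "employee.emp_id = changes.emp_id")
    .whenMatchedUpdate(
        condition="""employee.isCurrent = true
                     AND employee.delete_flag = false
                     AND employee.emp_key <> changes.emp_key""",
        set={"isCurrent": lit(False), "end_date": current_date()},
    )
    .execute()
)

# 2) Insert brand-new employees and the new versions of changed records: any row in
#    the new snapshot whose emp_key does not exist as a current record is appended.
(
    employee_table.alias("employee")
    .merge(
        new_df.alias("changes"),
        "employee.emp_key = changes.emp_key AND employee.isCurrent = true",
    )
    .whenNotMatchedInsert(
        values={
            "emp_id": "changes.emp_id",
            "emp_key": "changes.emp_key",
            "first_name": "changes.first_name",
            "last_name": "changes.last_name",
            "address": "changes.Address",        # raw source field is "Address"; target column is lowercase
            "phone_number": "changes.phone_number",
            "isContractor": "changes.isContractor",
            "start_date": current_date(),
            "end_date": lit(None).cast("date"),  # or an empty string, depending on the schema
            "isCurrent": lit(True),
            "delete_flag": lit(False),
        }
    )
    .execute()
)
```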
As a last step, let’s bring back the deleted record from the previous change to the source dataset and see how it is reinserted into the employee table in the data lake, and observe how the complete history is maintained.
Let’s modify the changed dataset from the previous step and make the following change:
- Add the deleted emp_id=8 record back to the dataset.
After making this change, the employee source dataset looks like the following code (for readability, we have only included the added record as described in the preceding step):
{"emp_id":8,"first_name":"Teresa","last_name":"Estrada","Address":"339 Scott Valley\nGonzalesfort, PA 18212","phone_number":"435-600-3162","isContractor":false}
- Upload the changed employee dataset file to the same source prefix.
- After you upload the changed fake_emp_data.json dataset to Amazon S3, navigate to the AWS Glue console and run the job again.
- When the job is complete, run the following query in the Athena query editor and confirm that there are 28 records in total with the following values:
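The same kind of count query as before works here (a sketch; substitute your database name):

```sql
SELECT isCurrent, delete_flag, COUNT(*) AS record_count
FROM "deltalake_xxxxxx"."employee"
GROUP BY isCurrent, delete_flag;
```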
Note: Update the preceding query with the correct database name from the CloudFormation stack outputs before running it.
- Run the following query and confirm there are 5 records:
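For example, restricting the query to the rows we have been changing throughout this walkthrough (a sketch; substitute your database name):

```sql
SELECT emp_id, emp_key, isCurrent, delete_flag, start_date, end_date
FROM "deltalake_xxxxxx"."employee"
WHERE emp_id IN (8, 12, 26)
ORDER BY emp_id, start_date;
```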
You will see two records for emp_id=8:
- One emp_id=8 record with the following values (the old record that was deleted):
  - emp_key=536ba1ba5961da07863c6d19b7481310e64b58b4c02a89c30c0137a535dbf94d
  - isCurrent=false
  - delete_flag=true
  - end_date=’2023-03-02’
- Another emp_id=8 record with the following values (the new record that was inserted in the last run):
  - emp_key=536ba1ba5961da07863c6d19b7481310e64b58b4c02a89c30c0137a535dbf94d
  - isCurrent=true
  - delete_flag=false
  - end_date=NULL (or empty string)
The emp_key values in your actual table may be different than what is provided here as an example. Also note that because this is the same deleted record that was reinserted in the subsequent load without any changes, there will be no change to the emp_key.
End-user sample queries
The following are some sample end-user queries to demonstrate how the employee change data history can be traversed for reporting:
- Query 1 – Retrieve a list of all the employees who left the organization in the current month (for example, March 2023).
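A sketch of such a query, assuming end_date is stored as a DATE column; logically deleted records carry delete_flag = true and an end_date in the month of interest (substitute your database name):

```sql
SELECT emp_id, first_name, last_name, end_date
FROM "deltalake_xxxxxx"."employee"
WHERE delete_flag = true
  AND isCurrent = false
  AND end_date BETWEEN DATE '2023-03-01' AND DATE '2023-03-31';
```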
The preceding query would return two employee records who left the organization.
- Query 2 – Retrieve a list of new employees who joined the organization in the current month (for example, March 2023).
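Assuming start_date is stored as a DATE column, a query along these lines returns the active records that started in the month of interest; strictly speaking, a new version of an existing employee also gets a new start_date, so you may prefer to check the earliest start_date per emp_id instead:

```sql
SELECT emp_id, first_name, last_name, start_date
FROM "deltalake_xxxxxx"."employee"
WHERE isCurrent = true
  AND delete_flag = false
  AND start_date BETWEEN DATE '2023-03-01' AND DATE '2023-03-31';
```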
Note: Update the preceding query with the correct database name from the CloudFormation stack outputs before running it.
The preceding query would return 23 active employee records who joined the organization.
- Query 3 – Find the history of any given employee in the organization (in this case employee 18).
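A sketch of this query (substitute your database name):

```sql
SELECT emp_id, first_name, last_name, isCurrent, delete_flag, start_date, end_date
FROM "deltalake_xxxxxx"."employee"
WHERE emp_id = 18
ORDER BY start_date;
```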
Note: Update the preceding query with the correct database name from the CloudFormation stack outputs before running it.
In the preceding query, we can observe that employee 18 had two changes to their employee records before they left the organization.
Note that the data results provided in this example are different than what you will see in your specific records based on the sample data generated by the Lambda function.
Clean up
When you have finished experimenting with this solution, clean up your resources to prevent further AWS charges:
- Empty the S3 buckets.
- Delete the stack from the AWS CloudFormation console.
Conclusion
In this post, we demonstrated how to use AWS Glue to identify the changed data for a semi-structured data source and preserve the historical changes (SCD Type 2) in an S3-based Delta Lake when source systems are unable to provide change data capture capability. You can further extend this solution to enable downstream applications to build additional customizations from the CDC data captured in the data lake.
Additionally, you can extend this solution as part of an orchestration using AWS Step Functions or another commonly used orchestrator your organization is familiar with. You can also extend this solution by adding partitions where appropriate, and maintain the Delta table by compacting small files.
About the authors
Nith Govindasivan is a Data Lake Architect with AWS Professional Services, where he helps onboard customers on their modern data architecture journey by implementing big data and analytics solutions. Outside of work, Nith is an avid cricket fan, watching almost any cricket match in his spare time, and enjoys long drives and traveling internationally.
Vijay Velpula is a Data Architect with AWS Professional Services. He helps customers implement Big Data and Analytics Solutions. Outside of work, he enjoys spending time with family, traveling, hiking and biking.
Sriharsh Adari is a Senior Solutions Architect at Amazon Web Services (AWS), where he helps customers work backwards from business outcomes to develop innovative solutions on AWS. Over the years, he has helped multiple customers on data platform transformations across industry verticals. His core areas of expertise include Technology Strategy, Data Analytics, and Data Science. In his spare time, he enjoys playing sports, binge-watching TV shows, and playing Tabla.