AWS Big Data Blog

Implement slowly changing dimensions in a data lake using AWS Glue and Delta

In a data warehouse, a dimension is a structure that categorizes facts and measures so that users can answer business questions. For example, in a typical sales domain, customer, time, and product are dimensions, and sales transactions are facts. Attributes within a dimension can change over time: a customer can change their address, an employee can move from a contractor position to a full-time position, or a product can have multiple revisions. A slowly changing dimension (SCD) is a data warehousing concept for dimensions that contain relatively static data that can change slowly over a period of time. There are three major types of SCDs maintained in data warehousing: Type 1 (no history), Type 2 (full history), and Type 3 (limited history). Change data capture (CDC) is a characteristic of a database that provides the ability to identify the data that changed between two database loads, so that an action can be performed on the changed data.

As organizations across the globe are modernizing their data platforms with data lakes on Amazon Simple Storage Service (Amazon S3), handling SCDs in data lakes can be challenging. It becomes even more challenging when source systems don’t provide a mechanism to identify the changed data for processing within the data lake, and more complex still when the data source is semi-structured rather than a database. The key objective while handling Type 2 SCDs is to assign accurate start and end dates to each record so that changes can be tracked within the data lake, because this provides the point-in-time reporting capability for the consuming applications.
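
For example, with a Type 2 approach, a customer address change is captured by closing out the old record and inserting a new one, so both versions remain queryable (illustrative values only):

  • Old record – customer_id=101, address=12 Oak St, start_date=’2023-01-10’, end_date=’2023-03-02’, isCurrent=false
  • New record – customer_id=101, address=98 Pine Ave, start_date=’2023-03-02’, end_date=NULL, isCurrent=true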

In this post, we focus on demonstrating how to identify the changed data for a semi-structured source (JSON) and capture the full historical data changes (SCD Type 2) and store them in an S3 data lake, using AWS Glue and open data lake format Delta.io. This implementation supports the following use cases:

  • Track Type 2 SCDs with start and end dates to identify the current and full historical records and a flag to identify the deleted records in the data lake (logical deletes)
  • Use consumption tools such as Amazon Athena to query historical records seamlessly

Solution overview

This post demonstrates the solution with an end-to-end use case using a sample employee dataset. The dataset represents employee details such as ID, name, address, phone number, contractor status, and more. To demonstrate the SCD implementation, consider the following assumptions:

  • The data engineering team receives daily files that are full snapshots of records and don’t contain any mechanism to identify source record changes
  • The team is tasked with implementing SCD Type 2 functionality for identifying new, updated, and deleted records from the source, and to preserve the historical changes in the data lake
  • Because the source systems don’t provide the CDC capability, a mechanism needs to be developed to identify the new, updated, and deleted records and persist them in the data lake layer

The architecture is implemented as follows:

  • Source systems ingest files into the S3 landing bucket (this step is mimicked by using the provided AWS Lambda function to generate sample records in the landing bucket)
  • An AWS Glue job (Delta job) picks the source data file and processes the changed data from the previous file load (new inserts, updates to the existing records, and deleted records from the source) into the S3 data lake (processed layer bucket)
  • The architecture uses the open data lake format (Delta), and builds the S3 data lake as a Delta Lake, which is mutable, because the new changes can be updated, new inserts can be appended, and source deletions can be identified accurately and marked with a delete_flag value
  • An AWS Glue crawler catalogs the data, which can be queried by Athena

The following diagram illustrates our architecture.

Prerequisites

Before you get started, make sure you have the following prerequisites:

Deploy the solution

For this solution, we provide a CloudFormation template that sets up the services included in the architecture, to enable repeatable deployments. This template creates the following resources:

  • Two S3 buckets: a landing bucket for storing sample employee data and a processed layer bucket for the mutable data lake (Delta Lake)
  • A Lambda function to generate sample records
  • An AWS Glue extract, transform, and load (ETL) job to process the source data from the landing bucket to the processed bucket

To deploy the solution, complete the following steps:

  1. Choose Launch Stack to launch the CloudFormation stack:
  2. Enter a stack name.
  3. Select I acknowledge that AWS CloudFormation might create IAM resources with custom names.
  4. Choose Create stack.

After the CloudFormation stack deployment is complete, navigate to the AWS CloudFormation console and note the following resources on the Outputs tab:

  • Data lake resources – The S3 buckets scd-blog-landing-xxxx and scd-blog-processed-xxxx (referred to as scd-blog-landing and scd-blog-processed in the subsequent sections in this post)
  • Sample records generator Lambda function – SampleDataGenaratorLambda-<CloudFormation Stack Name> (referred to as SampleDataGeneratorLambda)
  • AWS Glue Data Catalog database – deltalake_xxxxxx (referred to as deltalake)
  • AWS Glue Delta job – <CloudFormation-Stack-Name>-src-to-processed (referred to as src-to-processed)

Note that deploying the CloudFormation stack in your account incurs AWS usage charges.

Test SCD Type 2 implementation

With the infrastructure in place, you’re ready to test the overall solution design and query historical records from the employee dataset. The walkthrough mirrors a real customer use case, in which you receive full snapshot data on a daily basis. We test the following aspects of the SCD implementation:

  • Run an AWS Glue job for the initial load
  • Simulate a scenario where there are no changes to the source
  • Simulate insert, update, and delete scenarios by adding new records, and modifying and deleting existing records
  • Simulate a scenario where the deleted record comes back as a new insert

Generate a sample employee dataset

To test the solution, you first need a data source for the initial data ingestion. To simplify that step, the CloudFormation stack you just deployed includes a Lambda function that generates the sample data.

Open the function and configure a test event, with the default hello-world template event JSON as seen in the following screenshot. Provide an event name without any changes to the template and save the test event.

Choose Test to invoke a test event, which invokes the Lambda function to generate the sample records.

When the Lambda function completes its invocation, you will be able to see the following sample employee dataset in the landing bucket.
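
If you prefer to script this step instead of using the console, you can invoke the function with the AWS SDK. The following is a minimal sketch; the function name is the one shown in the CloudFormation stack Outputs tab, and any small event payload (such as the default hello-world event) should work:

import boto3

lambda_client = boto3.client("lambda")
response = lambda_client.invoke(
    FunctionName="SampleDataGenaratorLambda-<CloudFormation-Stack-Name>",  # name from the stack Outputs tab
    Payload=b"{}",  # per the post, the generator does not depend on the event contents
)
print(response["StatusCode"])  # 200 indicates a successful synchronous invocation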

Run the AWS Glue job

Confirm that you see the employee dataset in the path s3://scd-blog-landing/dataset/employee/. You can download the dataset and open it in a code editor such as VS Code. The following is an example of the dataset:

{"emp_id":1,"first_name":"Melissa","last_name":"Parks","Address":"19892 Williamson Causeway Suite 737\nKarenborough, IN 11372","phone_number":"001-372-612-0684","isContractor":false}
{"emp_id":2,"first_name":"Laura","last_name":"Delgado","Address":"93922 Rachel Parkways Suite 717\nKaylaville, GA 87563","phone_number":"001-759-461-3454x80784","isContractor":false}
{"emp_id":3,"first_name":"Luis","last_name":"Barnes","Address":"32386 Rojas Springs\nDicksonchester, DE 05474","phone_number":"127-420-4928","isContractor":false}
{"emp_id":4,"first_name":"Jonathan","last_name":"Wilson","Address":"682 Pace Springs Apt. 011\nNew Wendy, GA 34212","phone_number":"761.925.0827","isContractor":true}
{"emp_id":5,"first_name":"Kelly","last_name":"Gomez","Address":"4780 Johnson Tunnel\nMichaelland, WI 22423","phone_number":"+1-303-418-4571","isContractor":false}
{"emp_id":6,"first_name":"Robert","last_name":"Smith","Address":"04171 Mitchell Springs Suite 748\nNorth Juliaview, CT 87333","phone_number":"261-155-3071x3915","isContractor":true}
{"emp_id":7,"first_name":"Glenn","last_name":"Martinez","Address":"4913 Robert Views\nWest Lisa, ND 75950","phone_number":"001-638-239-7320x4801","isContractor":false}
{"emp_id":8,"first_name":"Teresa","last_name":"Estrada","Address":"339 Scott Valley\nGonzalesfort, PA 18212","phone_number":"435-600-3162","isContractor":false}
{"emp_id":9,"first_name":"Karen","last_name":"Spencer","Address":"7284 Coleman Club Apt. 813\nAndersonville, AS 86504","phone_number":"484-909-3127","isContractor":true}
{"emp_id":10,"first_name":"Daniel","last_name":"Foley","Address":"621 Sarah Lock Apt. 537\nJessicaton, NH 95446","phone_number":"457-716-2354x4945","isContractor":true}
{"emp_id":11,"first_name":"Amy","last_name":"Stevens","Address":"94661 Young Lodge Suite 189\nCynthiamouth, PR 01996","phone_number":"241.375.7901x6915","isContractor":true}
{"emp_id":12,"first_name":"Nicholas","last_name":"Aguirre","Address":"7474 Joyce Meadows\nLake Billy, WA 40750","phone_number":"495.259.9738","isContractor":true}
{"emp_id":13,"first_name":"John","last_name":"Valdez","Address":"686 Brian Forges Suite 229\nSullivanbury, MN 25872","phone_number":"+1-488-011-0464x95255","isContractor":false}
{"emp_id":14,"first_name":"Michael","last_name":"West","Address":"293 Jones Squares Apt. 997\nNorth Amandabury, TN 03955","phone_number":"146.133.9890","isContractor":true}
{"emp_id":15,"first_name":"Perry","last_name":"Mcguire","Address":"2126 Joshua Forks Apt. 050\nPort Angela, MD 25551","phone_number":"001-862-800-3814","isContractor":true}
{"emp_id":16,"first_name":"James","last_name":"Munoz","Address":"74019 Banks Estates\nEast Nicolefort, GU 45886","phone_number":"6532485982","isContractor":false}
{"emp_id":17,"first_name":"Todd","last_name":"Barton","Address":"2795 Kelly Shoal Apt. 500\nWest Lindsaytown, TN 55404","phone_number":"079-583-6386","isContractor":true}
{"emp_id":18,"first_name":"Christopher","last_name":"Noble","Address":"Unit 7816 Box 9004\nDPO AE 29282","phone_number":"215-060-7721","isContractor":true}
{"emp_id":19,"first_name":"Sandy","last_name":"Hunter","Address":"7251 Sarah Creek\nWest Jasmine, CO 54252","phone_number":"8759007374","isContractor":false}
{"emp_id":20,"first_name":"Jennifer","last_name":"Ballard","Address":"77628 Owens Key Apt. 659\nPort Victorstad, IN 02469","phone_number":"+1-137-420-7831x43286","isContractor":true}
{"emp_id":21,"first_name":"David","last_name":"Morris","Address":"192 Leslie Groves Apt. 930\nWest Dylan, NY 04000","phone_number":"990.804.0382x305","isContractor":false}
{"emp_id":22,"first_name":"Paula","last_name":"Jones","Address":"045 Johnson Viaduct Apt. 732\nNorrisstad, AL 12416","phone_number":"+1-193-919-7527x2207","isContractor":true}
{"emp_id":23,"first_name":"Lisa","last_name":"Thompson","Address":"1295 Judy Ports Suite 049\nHowardstad, PA 11905","phone_number":"(623)577-5982x33215","isContractor":true}
{"emp_id":24,"first_name":"Vickie","last_name":"Johnson","Address":"5247 Jennifer Run Suite 297\nGlenberg, NC 88615","phone_number":"708-367-4447x9366","isContractor":false}
{"emp_id":25,"first_name":"John","last_name":"Hamilton","Address":"5899 Barnes Plain\nHarrisville, NC 43970","phone_number":"341-467-5286x20961","isContractor":false}

Download the dataset and keep it ready, because you will modify it in later steps to simulate inserts, updates, and deletes. The sample dataset generated for you will be entirely different from what you see in the preceding example.
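
If you prefer to download (and later re-upload) the file programmatically, a boto3 sketch along these lines works; the bucket name is the scd-blog-landing-xxxx bucket from the CloudFormation outputs, and the object key shown here is an assumption you should confirm in your bucket:

import boto3

s3 = boto3.client("s3")
bucket = "scd-blog-landing-xxxx"             # actual name is in the CloudFormation outputs
key = "dataset/employee/fake_emp_data.json"  # confirm the exact key in your landing bucket

# Download the generated snapshot so you can edit it locally
s3.download_file(bucket, key, "fake_emp_data.json")

# Later in the walkthrough, upload the modified file back to the same prefix
# s3.upload_file("fake_emp_data.json", bucket, key)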

To run the job, complete the following steps:

  1. On the AWS Glue console, choose Jobs in the navigation pane.
  2. Choose the job src-to-processed.
  3. On the Runs tab, choose Run.

When the AWS Glue job is run for the first time, the job reads the employee dataset from the landing bucket path and ingests the data to the processed bucket as a Delta table.
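
The deployed job script is the definitive reference, but conceptually the first run is a full load of the snapshot into the Delta table. The following is only a minimal sketch of that initial load, assuming a Spark session with the Delta Lake connector enabled and an illustrative processed-layer path:

from pyspark.sql import SparkSession
from pyspark.sql.functions import sha2, concat_ws, col, current_date, lit

spark = (SparkSession.builder
         .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
         .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
         .getOrCreate())

# Read the full snapshot from the landing bucket
src_df = spark.read.json("s3://scd-blog-landing/dataset/employee/")

# Add the emp_key surrogate key and the SCD Type 2 bookkeeping columns
initial_df = (src_df
              .withColumn("emp_key", sha2(concat_ws("||", col("emp_id"), col("first_name"), col("last_name"),
                                                    col("Address"), col("phone_number"), col("isContractor")), 256))
              .withColumn("start_date", current_date())
              .withColumn("end_date", lit(None).cast("date"))
              .withColumn("isCurrent", lit(True))
              .withColumn("delete_flag", lit(False)))

# Write the processed layer as a Delta table (the path shown here is illustrative)
initial_df.write.format("delta").mode("overwrite").save("s3://scd-blog-processed/dataset/employee/")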

When the job is complete, you can create a crawler to see the initial data load. The following screenshot shows the database available on the Databases page.

  1. Choose Crawlers in the navigation pane.
  2. Choose Create crawler.
  3. Name your crawler delta-lake-crawler, then choose Next.
  4. Select Not yet for data already mapped to AWS Glue tables.
  5. Choose Add a data source.
  6. On the Data source drop-down menu, choose Delta Lake.
  7. Enter the path to the Delta table.
  8. Select Create Native tables.
  9. Choose Add a Delta Lake data source.
  10. Choose Next.
  11. Choose the role that was created by the CloudFormation template, then choose Next.
  12. Choose the database that was created by the CloudFormation template, then choose Next.
  13. Choose Create crawler.
  14. Select your crawler and choose Run.
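
If you’d rather script the crawler instead of using the console, a boto3 sketch like the following should work; the IAM role, database name, and Delta table path are placeholders to replace with the values created by your CloudFormation stack:

import boto3

glue = boto3.client("glue")

glue.create_crawler(
    Name="delta-lake-crawler",
    Role="<role-created-by-the-CloudFormation-template>",  # placeholder
    DatabaseName="deltalake_xxxxxx",                        # database from the stack outputs
    Targets={
        "DeltaTargets": [
            {
                "DeltaTables": ["s3://scd-blog-processed-xxxx/dataset/employee/"],  # path to the Delta table
                "WriteManifest": False,
                "CreateNativeDeltaTable": True,             # equivalent of selecting Create Native tables
            }
        ]
    },
)

glue.start_crawler(Name="delta-lake-crawler")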

Query the data

After the crawler run is complete, you can see the table it created.

To query the data, complete the following steps:

  1. Choose the employee table and on the Actions menu, choose View data.

You’re redirected to the Athena console. If you don’t have the latest Athena engine, create a new Athena workgroup with the latest Athena engine.

  2. Under Administration in the navigation pane, choose Workgroups.
  3. Choose Create workgroup.
  4. Provide a name for the workgroup, such as DeltaWorkgroup.
  5. Select Athena SQL as the engine, and choose Athena engine version 3 for Query engine version.
  6. Choose Create workgroup.
  7. After you create the workgroup, select the workgroup (DeltaWorkgroup) on the drop-down menu in the Athena query editor.
  8. Run the following query on the employee table:
SELECT * FROM "deltalake_2438fbd0"."employee";

Note: Replace the database name with the value from the CloudFormation stack outputs before running the above query.

You can observe that the employee table has 25 records. The following screenshot shows the total employee records with some sample records.

The Delta table is stored with an emp_key, which is unique to each and every change and is used to track the changes. The emp_key is created for every insert, update, and delete, and can be used to find all the changes pertaining to a single emp_id.

The emp_key is created using the SHA256 hashing algorithm, as shown in the following code:

df.withColumn("emp_key", sha2(concat_ws("||", col("emp_id"), col("first_name"), col("last_name"), col("Address"),
            col("phone_number"), col("isContractor")), 256))

Perform inserts, updates, and deletes

Before making changes to the dataset, let’s run the same job one more time. Assuming that the current load from the source is the same as the initial load with no changes, the AWS Glue job shouldn’t make any changes to the dataset. After the job is complete, run the previous Select query in the Athena query editor and confirm that there are still 25 active records with the following values:

  • All 25 records with the column isCurrent=true
  • All 25 records with the column end_date=Null
  • All 25 records with the column delete_flag=false
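
To double-check these conditions outside Athena, you can also read the Delta table directly with Spark. This is a minimal sketch, assuming a Spark session (spark) configured with the Delta Lake connector and that the path below is replaced with your actual Delta table location:

current_count = (spark.read.format("delta")
                 .load("s3://scd-blog-processed-xxxx/dataset/employee/")  # illustrative path
                 .where("isCurrent = true and delete_flag = false")
                 .count())
print(current_count)  # expected to stay at 25 after the unchanged re-run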

After you confirm the previous job run with these values, let’s modify our initial dataset with the following changes:

  1. Change the isContractor flag to false (change it to true if your dataset already shows false) for emp_id=12.
  2. Delete the entire row where emp_id=8 (make sure to save the record in a text editor, because we use this record in another use case).
  3. Copy the row for emp_id=25 and insert a new row. Change the emp_id to be 26, and make sure to change the values for other columns as well.

After we make these changes, the employee source dataset looks like the following code (for readability, we have only included the changed records as described in the preceding three steps):

{"emp_id":12,"first_name":"Nicholas","last_name":"Aguirre","Address":"7474 Joyce Meadows\nLake Billy, WA 40750","phone_number":"495.259.9738","isContractor":false}
{"emp_id":26,"first_name":"John-copied","last_name":"Hamilton-copied","Address":"6000 Barnes Plain\nHarrisville-city, NC 5000","phone_number":"444-467-5286x20961","isContractor":true}

  4. Now, upload the changed fake_emp_data.json file to the same source prefix.

  5. After you upload the changed employee dataset to Amazon S3, navigate to the AWS Glue console and run the job.
  6. When the job is complete, run the following query in the Athena query editor and confirm that there are 27 records in total:
SELECT * FROM "deltalake_2438fbd0"."employee";

Note: Replace the database name with the value from the CloudFormation stack outputs before running the above query.

  7. Run another query in the Athena query editor and confirm that there are 4 records returned with the following values:
SELECT * FROM "AwsDataCatalog"."deltalake_2438fbd0"."employee" where emp_id in (8, 12, 26)
order by emp_id;

Note: Replace the database name with the value from the CloudFormation stack outputs before running the above query.

You will see two records for emp_id=12:

  • One emp_id=12 record with the following values (for the record that was ingested as part of the initial load):
    • emp_key=44cebb094ef289670e2c9325d5f3e4ca18fdd53850b7ccd98d18c7a57cb6d4b4
    • isCurrent=false
    • delete_flag=false
    • end_date=’2023-03-02’
  • A second emp_id=12 record with the following values (for the record that was ingested as part of the change to the source):
    • emp_key=b60547d769e8757c3ebf9f5a1002d472dbebebc366bfbc119227220fb3a3b108
    • isCurrent=true
    • delete_flag=false
    • end_date=Null (or empty string)

The record for emp_id=8 that was deleted in the source as part of this run will still exist but with the following changes to the values:

  • isCurrent=false
  • end_date=’2023-03-02’
  • delete_flag=true

The new employee record will be inserted with the following values:

  • emp_id=26
  • isCurrent=true
  • end_date=NULL (or empty string)
  • delete_flag=false

Note that the emp_key values in your actual table may be different than what is provided here as an example.

  1. For the deletes, we join the base table and the new source file on emp_id and emp_key.
  2. If the join condition evaluates to true, we then check whether the base table record is the current, undeleted record (isCurrent=true and delete_flag=false) and whether the incoming record is flagged as a delete.
  3. We merge the delete changes from the new file into the base table for all rows that match the delete condition and update the following:
    1. isCurrent=false
    2. delete_flag=true
    3. end_date=current_date

See the following code:

delete_join_cond = "employee.emp_id=employeeUpdates.emp_id and employee.emp_key = employeeUpdates.emp_key"
delete_cond = "employee.emp_key == employeeUpdates.emp_key and employee.isCurrent = true and employeeUpdates.delete_flag = true"

base_tbl.alias("employee")\
        .merge(union_updates_dels.alias("employeeUpdates"), delete_join_cond)\
        .whenMatchedUpdate(condition=delete_cond, set={"isCurrent": "false",
                                                        "end_date": current_date(),
                                                        "delete_flag": "true"}).execute()

  1. For both the updates and the inserts, we check whether the base table employee.emp_id equals the new changes.emp_id and the employee.emp_key equals the new changes.emp_key, while only retrieving the current records.
  2. If this condition evaluates to true, we then get the current record (isCurrent=true and delete_flag=false).
  3. We merge the changes as follows:
    1. If the record matches and the second condition evaluates to true, we close out the existing record:
      1. isCurrent=false
      2. end_date=current_date
    2. If the record doesn’t match the merge condition, we insert the entire row as follows:
      1. emp_id=new record’s emp_id
      2. emp_key=new record’s emp_key
      3. first_name=new record’s first_name
      4. last_name=new record’s last_name
      5. Address=new record’s Address
      6. phone_number=new record’s phone_number
      7. isContractor=new record’s isContractor
      8. start_date=current_date
      9. end_date=NULL (or empty string)
      10. isCurrent=true
      11. delete_flag=false

See the following code:

upsert_cond = "employee.emp_id=employeeUpdates.emp_id and employee.emp_key = employeeUpdates.emp_key and employee.isCurrent = true"
upsert_update_cond = "employee.isCurrent = true and employeeUpdates.delete_flag = false"

base_tbl.alias("employee").merge(union_updates_dels.alias("employeeUpdates"), upsert_cond)\
    .whenMatchedUpdate(condition=upsert_update_cond, set={"isCurrent": "false",
                                                            "end_date": current_date()
                                                            }) \
    .whenNotMatchedInsert(
    values={
        "isCurrent": "true",
        "emp_id": "employeeUpdates.emp_id",
        "first_name": "employeeUpdates.first_name",
        "last_name": "employeeUpdates.last_name",
        "Address": "employeeUpdates.Address",
        "phone_number": "employeeUpdates.phone_number",
        "isContractor": "employeeUpdates.isContractor",
        "emp_key": "employeeUpdates.emp_key",
        "start_date": current_date(),
        "delete_flag":  "employeeUpdates.delete_flag",
        "end_date": "null"
    })\
    .execute()
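
The merge statements above reference union_updates_dels, a DataFrame that combines the incoming inserts and updates with the rows that must be flagged as deletes. Its exact construction isn’t shown in this post; the following is one possible sketch, assuming new_snapshot_df is the day’s file with emp_key already computed as shown earlier and base_tbl is the DeltaTable for the employee data:

from pyspark.sql.functions import lit

# Current, not-yet-deleted rows in the Delta table
current_df = base_tbl.toDF().where("isCurrent = true and delete_flag = false")

src_cols = ["emp_id", "first_name", "last_name", "Address", "phone_number", "isContractor", "emp_key"]

# New inserts and updated rows: their emp_key doesn't exist among the current rows
updates_df = (new_snapshot_df.select(*src_cols)
              .join(current_df.select("emp_key"), "emp_key", "left_anti")
              .withColumn("delete_flag", lit(False)))

# Source deletes: current rows whose emp_id is missing from the new snapshot
deletes_df = (current_df.select(*src_cols)
              .join(new_snapshot_df.select("emp_id"), "emp_id", "left_anti")
              .withColumn("delete_flag", lit(True)))

union_updates_dels = updates_df.unionByName(deletes_df)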

As a last step, let’s bring back the deleted record from the previous change to the source dataset and see how it is reinserted into the employee table in the data lake and observe how the complete history is maintained.

Let’s modify the changed dataset from the previous step as follows:

  1. Add the deleted emp_id=8 record back to the dataset.

After making this change, the employee source dataset looks like the following code (for readability, we have only included the added record as described in the preceding step):

{"emp_id":8,"first_name":"Teresa","last_name":"Estrada","Address":"339 Scott Valley\nGonzalesfort, PA 18212","phone_number":"435-600-3162","isContractor":false}

  2. Upload the changed employee dataset file to the same source prefix.
  3. After you upload the changed fake_emp_data.json dataset to Amazon S3, navigate to the AWS Glue console and run the job again.
  4. When the job is complete, run the following query in the Athena query editor and confirm that there are 28 records in total:
SELECT * FROM "deltalake_2438fbd0"."employee";

Note: Replace the database name with the value from the CloudFormation stack outputs before running the above query.

  5. Run the following query and confirm there are 5 records:
SELECT * FROM "AwsDataCatalog"."deltalake_2438fbd0"."employee" where emp_id in (8, 12, 26)
order by emp_id;

Note: Replace the database name with the value from the CloudFormation stack outputs before running the above query.

You will see two records for emp_id=8:

  • One emp_id=8 record with the following values (the old record that was deleted):
    • emp_key=536ba1ba5961da07863c6d19b7481310e64b58b4c02a89c30c0137a535dbf94d
    • isCurrent=false
    • delete_flag=true
    • end_date=’2023-03-02’
  • Another emp_id=8 record with the following values (the new record that was inserted in the last run):
    • emp_key=536ba1ba5961da07863c6d19b7481310e64b58b4c02a89c30c0137a535dbf94d
    • isCurrent=true
    • delete_flag=false
    • end_date=NULL (or empty string)

The emp_key values in your actual table may be different from the example values provided here. Also note that because this is the same deleted record, reinserted in the subsequent load without any changes, there is no change to the emp_key.

End-user sample queries

The following are some sample end-user queries to demonstrate how the employee change data history can be traversed for reporting:

  • Query 1 – Retrieve a list of all the employees who left the organization in the current month (for example, March 2023).
SELECT * FROM "deltalake_2438fbd0"."employee" where delete_flag=true and date_format(CAST(end_date AS date),'%Y/%m') ='2023/03'
Note: Replace the database name with the value from the CloudFormation stack outputs before running the above query.

The preceding query would return two employee records who left the organization.

  • Query 2 – Retrieve a list of new employees who joined the organization in the current month (for example, March 2023).
SELECT * FROM "deltalake_2438fbd0"."employee" where date_format(start_date,'%Y/%m') ='2023/03' and iscurrent=true

Note: Replace the database name with the value from the CloudFormation stack outputs before running the above query.

The preceding query would return 23 active employee records who joined the organization.

  • Query 3 – Find the history of any given employee in the organization (in this case employee 18).
SELECT * FROM "deltalake_2438fbd0"."employee" where emp_id=18

Note: Replace the database name with the value from the CloudFormation stack outputs before running the above query.

In the preceding query, we can observe that employee 18 had two changes to their employee records before they left the organization.

Note that the data results provided in this example will differ from what you see in your own records, based on the sample data generated by the Lambda function.

Clean up

When you have finished experimenting with this solution, clean up your resources to prevent further AWS charges:

  1. Empty the S3 buckets.
  2. Delete the stack from the AWS CloudFormation console.

Conclusion

In this post, we demonstrated how to use AWS Glue to identify the changed data for a semi-structured data source and preserve the historical changes (SCD Type 2) in an S3 Delta Lake when source systems can’t provide the change data capture capability. You can further extend this solution to enable downstream applications to build additional customizations from CDC data captured in the data lake.

Additionally, you can extend this solution as part of an orchestration using AWS Step Functions or another orchestrator your organization is familiar with. You can also extend the solution by adding partitions where appropriate and maintain the Delta table by compacting small files.


About the authors

Nith Govindasivan is a Data Lake Architect with AWS Professional Services, where he helps onboard customers on their modern data architecture journey through implementing big data and analytics solutions. Outside of work, Nith is an avid cricket fan, watching almost any cricket match in his spare time, and enjoys long drives and traveling internationally.

Vijay Velpula is a Data Architect with AWS Professional Services. He helps customers implement Big Data and Analytics Solutions. Outside of work, he enjoys spending time with family, traveling, hiking and biking.

Sriharsh Adari is a Senior Solutions Architect at Amazon Web Services (AWS), where he helps customers work backwards from business outcomes to develop innovative solutions on AWS. Over the years, he has helped multiple customers on data platform transformations across industry verticals. His core areas of expertise include Technology Strategy, Data Analytics, and Data Science. In his spare time, he enjoys playing sports, binge-watching TV shows, and playing Tabla.