AWS Big Data Blog

Build Slowly Changing Dimensions Type 2 (SCD2) with Apache Spark and Apache Hudi on Amazon EMR

Organizations across the globe are striving to improve the scalability and cost efficiency of their data warehouses. Offloading data and data processing from a data warehouse to a data lake empowers companies to introduce new use cases like ad hoc data analysis and AI and machine learning (ML), reusing the same data stored on Amazon Simple Storage Service (Amazon S3). This approach avoids data silos and allows you to process data at very large scale while keeping data access cost-effective.

Starting off with this new approach can bring several challenges:

  • Choosing the most performant data format
  • Using the Spark API instead of plain SQL
  • Handling historical data change on Amazon S3

In this post, I focus on demonstrating how to handle historical data change for a star schema by implementing Slowly Changing Dimension Type 2 (SCD2) with Apache Hudi using Apache Spark on Amazon EMR, and storing the data on Amazon S3.

Star schema and SCD2 concept overview

In data warehousing, a star schema is the simplest type of dimensional model, in which the center of the star can have one fact table and a number of associated dimension tables. A fact is an event that is counted, such as a single sale. A dimension contains reference information about the fact, such as product details or customer information.

SCD2 is a dimension that stores and manages current and historical data over time in a data warehouse. The purpose of an SCD2 is to preserve the history of changes. If a customer changes their address, for example, or any other attribute, an SCD2 allows analysts to link facts back to the customer and their attributes in the state they were at the time of the fact event.

The following diagram illustrates a star schema with a Sale fact table and Customer dimension table, which is managed as an SCD2 table.

Let’s take a deeper look at the Customer dimension table schema. You can categorize its columns into the following groups (an illustrative set of rows follows the list):

  • Key – customer_dim_key, also called a surrogate key, has a unique, automatically generated value. It’s used as a foreign key in the Sale fact table.
  • Attributes – customer_id, first_name, last_name, city, and country carry the business values used in business intelligence (BI) reports.
  • SCD2 metadata – eff_start_date, eff_end_date, and is_current are designed to manage the state of the record. eff_start_date and eff_end_date contain the time interval during which the record is effective.
  • Metadata – timestamp is the actual time when the customer record was generated.
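
For illustration, the following rows (the surrogate key values are hypothetical, and the dates follow the example used later in this post) show how an address change is represented: the same customer_id appears twice, with different surrogate keys and adjacent effective-date intervals, and only the latest row is flagged as current.

customer_dim_key    customer_id  first_name  city     country  eff_start_date  eff_end_date  is_current
1607418932000001    2            Susan       Seattle  US       2020-10-14      2020-12-09    false
1607418932000002    2            Susan       Paris    FR       2020-12-09      2999-12-31    true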

SCD2 implementation challenge

Implementing SCD2 in a data lake without an additional framework like Apache Hudi means updating data stored on immutable Amazon S3 storage, which requires creating multiple copies of intermediate results. This can lead to significant maintenance effort and potential data loss or data inconsistency.

Apache Hudi is an open-source data management framework used to simplify incremental data processing and data pipeline development. Hudi enables you to manage data at the record level in Amazon S3 and helps to handle data privacy use cases requiring record-level updates and deletes. Hudi is supported by Amazon EMR starting from version 5.28 and is automatically installed when you choose Spark, Hive, or Presto when deploying your EMR cluster.

Using the Apache Hudi upsert operation allows Spark clients to update dimension records without any additional overhead, and also guarantees data consistency.

With copy-on-write mode, Hudi rewrites the files on Amazon S3 by performing a synchronous merge during the write operation. In addition, to enable fast file lookup during queries, Hudi maintains an index that consistently maps a given record key to a file ID.
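
Neither behavior requires explicit configuration for the examples in this post, but the following sketch shows the Hudi options that control them (the dictionary name is my own, and copy-on-write is already the default table type):

# Optional: make the storage type and index type explicit
# (copy-on-write is the Hudi default; this post relies on the defaults)
hudi_storage_options = {
    'hoodie.datasource.write.table.type': 'COPY_ON_WRITE',  # synchronous merge on write
    'hoodie.index.type': 'BLOOM'  # Bloom filter index mapping record keys to file IDs
}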

For more information about Hudi concepts, see Concepts.

Solution overview

In a real-world use case, sale and customer records are ingested into a data lake continuously, replicated from operational databases. For this post, we generate the customer and sale records inline.

To demonstrate the solution, I walk through the following steps:

  1. Generate customer records using a Spark DataFrame.
  2. Store customer records on Amazon S3 using Apache Hudi.
  3. Generate sale records.
  4. Look up a customer surrogate key used as a dimension key.
  5. Store sale records on Amazon S3.
  6. Run a BI query to calculate the amount of sales per country.
  7. Generate customer records to reflect the scenario when one customer updates the address and one new customer is created.
  8. Apply SCD2 logic for the Customer table.
  9. Generate new sale records.
  10. Run a BI query to calculate the amount of sales per country to validate the final result.

Prerequisites

Before you get started, make sure you meet the following prerequisites:

  • An EMR cluster running Amazon EMR release 5.28.0 or later, with Spark installed (Hudi is installed automatically when you choose Spark, Hive, or Presto)
  • A Jupyter environment attached to the cluster, such as EMR Notebooks
  • An S3 bucket to store the Customer and Sale tables

When your environment is ready, you can download the Jupyter notebook, add it to your Jupyter environment, and run it step by step.

Create customer schema and records

To create your customer schema and records, use the following code:

from datetime import datetime

from pyspark.sql.types import (StructType, StructField, StringType, IntegerType,
                               DoubleType, DateType, TimestampType, BooleanType)

# Customer dimension schema, including the SCD2 metadata columns
dim_customer_schema = StructType([
        StructField('customer_id', StringType(), False),
        StructField('first_name', StringType(), True),
        StructField('last_name', StringType(), True),
        StructField('city', StringType(), True),
        StructField('country', StringType(), True),
        StructField('eff_start_date', DateType(), True),
        StructField('eff_end_date', DateType(), True),
        StructField('timestamp', TimestampType(), True),
        StructField('is_current', BooleanType(), True),
    ])

customer_dim_df = spark.createDataFrame([('1', 'John', 'Smith',
                    'London', 'UK',
                    datetime.strptime('2020-09-27', '%Y-%m-%d'),
                    datetime.strptime('2999-12-31', '%Y-%m-%d'),
                    datetime.strptime('2020-12-08 09:15:32', '%Y-%m-%d %H:%M:%S'),
                    True),
                    ('2', 'Susan', 'Chas',
                    'Seattle', 'US',
                    datetime.strptime('2020-10-14', '%Y-%m-%d'),
                    datetime.strptime('2999-12-31', '%Y-%m-%d'),
                    datetime.strptime('2020-12-08 09:15:32', '%Y-%m-%d %H:%M:%S'),
                    True)], dim_customer_schema)

# Add the surrogate key column; random_udf is defined in the next code block
customer_hudi_df = customer_dim_df.withColumn("customer_dim_key", random_udf())

To generate unique customer surrogate keys, we take the current time at microsecond resolution, using the Python time library wrapped in a Spark UDF:

from pyspark.sql.functions import udf
import time

random_udf = udf(lambda: str(int(time.time() * 1000000)), StringType()) 
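
Note that two records generated in the same microsecond could receive the same key. If that is a concern, a UUID-based key is a simple alternative (a hypothetical variation, not used in the rest of this post):

import uuid
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Hypothetical alternative: UUIDs avoid key collisions between records
# generated in the same microsecond on different executors
uuid_udf = udf(lambda: str(uuid.uuid4()), StringType())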

A best practice is to set eff_end_date to a specific far-future value (2999-12-31 in this example) so you can run range queries in the following steps.

Store customer records using Apache Hudi

For the initial load only, you use the Hudi insert operation with the Spark Overwrite save mode:

customers_table_path = 's3://MY-BUCKET/customers/'
customers_table_name = 'customers_table'

partition_key = "country:SIMPLE"

hudi_options = {
    # Shuffle parallelism kept low because this example uses a small dataset
    'hoodie.insert.shuffle.parallelism': '2',
    'hoodie.upsert.shuffle.parallelism': '2',
    'hoodie.delete.shuffle.parallelism': '2',
    'hoodie.bulkinsert.shuffle.parallelism': '2',
    # Hive sync is disabled in this example
    'hoodie.datasource.hive_sync.enable': 'false',
    'hoodie.datasource.hive_sync.assume_date_partitioning': 'true'
}

customer_hudi_df.write.format('org.apache.hudi')\
                .options(**hudi_options)\
                .option('hoodie.table.name',customers_table_name)\
                .option('hoodie.datasource.write.recordkey.field','customer_dim_key')\
                .option('hoodie.datasource.write.partitionpath.field',partition_key)\
                .option('hoodie.datasource.write.precombine.field','timestamp')\
                .option('hoodie.datasource.write.operation', 'insert')\
                .option('hoodie.datasource.hive_sync.table',customers_table_name)\
                .mode('Overwrite')\
                .save(customers_table_path)

For more information about Hudi configuration, see Configurations.

To validate the write operation, read the Customer Hudi table.
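
For example, a minimal read might look like the following sketch (depending on your Hudi version, you may need to append partition globs such as /*/* to the load path):

# Read the Customer Hudi table back from Amazon S3 to verify the insert
customers_check_df = spark.read.format('org.apache.hudi')\
                          .load(customers_table_path)

customers_check_df.select('customer_dim_key', 'customer_id', 'first_name',
                          'city', 'country', 'eff_start_date',
                          'eff_end_date', 'is_current').show()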

Create sale records

Next, we generate sale records for the existing customers. Each sale record contains a customer_id, the customer business key used to find the corresponding record in the Customer table. See the following code:

fact_sales_schema = StructType([
        StructField('item_id', StringType(), True),
        StructField('quantity', IntegerType(), True),
        StructField('price', DoubleType(), True),
        StructField('timestamp', TimestampType(), True),
        StructField('customer_id', StringType(), True)
    ])

sales_fact_df = spark.createDataFrame([('100', 25, 123.46,
                    datetime.strptime('2020-11-17 09:15:32', '%Y-%m-%d %H:%M:%S'), '1'),
                                       ('101', 300, 123.46,
                    datetime.strptime('2020-10-28 09:15:32', '%Y-%m-%d %H:%M:%S'), '1'),
                                      ('102', 5, 1038.0,
                    datetime.strptime('2020-12-08 09:15:32', '%Y-%m-%d %H:%M:%S'), '2')], 
                    fact_sales_schema)

The following screenshot shows our results.

Customer dimension key lookup

To find the proper customer dimension surrogate key, we use a LEFT OUTER JOIN. If there is no match, customer_dim_key defaults to -1. A customer dimension record can be missing because of replication lag, which happens when the components of the replication system can’t keep up with the pace of changes coming from the operational database. See the following code:

from pyspark.sql.functions import when

join_cond = [sales_fact_df.customer_id == customer_hudi_df.customer_id,
             sales_fact_df.timestamp >= customer_hudi_df.eff_start_date,
             sales_fact_df.timestamp < customer_hudi_df.eff_end_date]

customers_dim_key_df = (sales_fact_df
                          .join(customer_hudi_df, join_cond, 'leftouter')
                          .select(sales_fact_df['*'],
                            when(customer_hudi_df.customer_dim_key.isNull(), '-1')
                                  .otherwise(customer_hudi_df.customer_dim_key)
                                  .alias("customer_dim_key") )
                       )

The join condition expresses that a sale record belongs to the specific customer record whose effective date interval contains the sale timestamp.

The following screenshot shows our output.

Save sale records

A sale represents an event that happened at a point in time in the past and never changes. We use the Parquet format with append mode to store sale records:

sales_table_path = 's3://MY-BUCKET/sales/'
sales_table_name = 'sales_table'

customers_dim_key_df.write.format('parquet')\
                             .mode('Append')\
                             .save(sales_table_path)

To demonstrate a dimensional query, we join the Sale table with the Customer table and count the number of sales per country. SQL is commonly used for analytics queries, so we use Spark SQL to query the SCD2 data.
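
The query assumes that both tables are registered as temporary views. A minimal sketch of that registration, reusing the paths and table names defined earlier (the same caveat about partition globs applies to the Hudi load path):

# Register the Customer Hudi table and the Sale Parquet table as temporary views
spark.read.format('org.apache.hudi')\
     .load(customers_table_path)\
     .createOrReplaceTempView(customers_table_name)

spark.read.parquet(sales_table_path)\
     .createOrReplaceTempView(sales_table_name)

With the views in place, the query looks like the following: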

spark.sql(
    'SELECT ct.country, '
    'SUM(st.quantity) as sales_quantity,'
    'COUNT(*) as count_sales '
    'FROM sales_table st '
    'INNER JOIN customers_table ct on st.customer_dim_key = ct.customer_dim_key '
    'group by ct.country').show()

The following screenshot shows our results.

Handle customer address change

Let’s assume that the customer Susan updates their address from US to France. Also, a new customer, Bastian, is created. See the following code:

new_customer_dim_df = spark.createDataFrame([('3', 'Bastian', 'Back', 'Berlin', 'GE',
                    datetime.strptime(datetime.today().strftime('%Y-%m-%d'), '%Y-%m-%d'),
                    datetime.strptime('2999-12-31', '%Y-%m-%d'),
                    datetime.strptime('2020-12-09 09:15:32', '%Y-%m-%d %H:%M:%S'), True),
                    ('2', 'Susan', 'Chas','Paris', 'FR',
                    datetime.strptime(datetime.today().strftime('%Y-%m-%d'), '%Y-%m-%d'),
                    datetime.strptime('2999-12-31', '%Y-%m-%d'),
                    datetime.strptime('2020-12-09 10:15:32', '%Y-%m-%d %H:%M:%S'), True)],
                dim_customer_schema)

new_customer_dim_df = new_customer_dim_df.withColumn("customer_dim_key", random_udf())

According to the SCD2 concept, when a new record arrives for an existing customer, the historical record needs to expire. To implement the expiration, we find Susan’s current record in the existing dataset, set its eff_end_date to the eff_start_date of the newest record, and set is_current to false:

from pyspark.sql.functions import lit

join_cond = [customer_hudi_df.customer_id == new_customer_dim_df.customer_id,
             customer_hudi_df.is_current == True]

## Find the current customer records to expire
customers_to_update_df = (customer_hudi_df
                          .join(new_customer_dim_df, join_cond)
                          .select(customer_hudi_df.customer_id,
                                  customer_hudi_df.first_name,
                                  customer_hudi_df.last_name,
                                  customer_hudi_df.city,
                                  customer_hudi_df.country,
                                  customer_hudi_df.eff_start_date,
                                  new_customer_dim_df.eff_start_date.alias('eff_end_date'),
                                  customer_hudi_df.customer_dim_key,
                                  customer_hudi_df.timestamp)
                          .withColumn('is_current', lit(False))
                         )

We union the expired records with the new customer records and store the result using the Hudi upsert operation with Spark append mode:

## Union with new customer records
merged_customers_df = new_customer_dim_df.unionByName(customers_to_update_df)

partition_key = "country:SIMPLE"

# Upsert
merged_customers_df.write.format('org.apache.hudi')\
                    .options(**hudi_options)\
                    .option('hoodie.table.name',customers_table_name)\
                    .option('hoodie.datasource.write.recordkey.field','customer_dim_key')\
                    .option('hoodie.datasource.write.partitionpath.field', partition_key)\
                    .option('hoodie.datasource.write.precombine.field', 'timestamp')\
                    .option('hoodie.datasource.write.operation', 'upsert')\
                    .option('hoodie.datasource.hive_sync.table',customers_table_name)\
                    .mode('append')\
                    .save(customers_table_path)

The Customer Hudi table now shows two records for the customer Susan: the historical record with the US location has its eff_end_date updated and is_current set to false, and the current record reflects the new address in France. The new customer Bastian was also added.
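
To verify, you can read the table back and filter on Susan’s customer_id (a sketch; the earlier caveat about the Hudi load path applies):

# Re-read the Customer Hudi table after the upsert
customer_hudi_df = spark.read.format('org.apache.hudi')\
                        .load(customers_table_path)

customer_hudi_df.filter("customer_id = '2'")\
                .select('customer_dim_key', 'city', 'country',
                        'eff_start_date', 'eff_end_date', 'is_current')\
                .show()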

We can generate new sales made by Susan and look up the Customer dimension key using a LEFT OUTER JOIN, exactly as we did earlier.
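
As a sketch (item IDs, quantities, and prices are illustrative), the following generates two new sales for Susan and repeats the surrogate key lookup against the Customer table re-read in the previous step:

from datetime import datetime

# Two hypothetical sales made by Susan after the address change; the current
# timestamp places them inside the new record's effective date interval
new_sales_df = spark.createDataFrame(
    [('103', 2, 59.99, datetime.now(), '2'),
     ('104', 1, 220.0, datetime.now(), '2')],
    fact_sales_schema)

join_cond = [new_sales_df.customer_id == customer_hudi_df.customer_id,
             new_sales_df.timestamp >= customer_hudi_df.eff_start_date,
             new_sales_df.timestamp < customer_hudi_df.eff_end_date]

new_sales_dim_key_df = (new_sales_df
                        .join(customer_hudi_df, join_cond, 'leftouter')
                        .select(new_sales_df['*'],
                                when(customer_hudi_df.customer_dim_key.isNull(), '-1')
                                .otherwise(customer_hudi_df.customer_dim_key)
                                .alias('customer_dim_key')))

new_sales_dim_key_df.write.format('parquet').mode('Append').save(sales_table_path)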

Running the query that counts sales per country now reflects the address change, showing two sales for Susan attributed to the up-to-date address in France.

Conclusion

In this post, I demonstrated how to continuously build SCD2 using Apache Hudi, while maintaining low operational overhead and fully eliminating the need to handle intermediate results on Amazon S3.

By combining the low-cost storage of Amazon S3, the ability to separate storage and compute, and Hudi’s native integration with Amazon EMR, we now have an effective way to store SCD2 data in our data lake.

You should now have a good understanding of SCD2 and be able to experiment and reuse my example notebook to implement a solution with your own data.


About the author

David Greenshtein is a Specialist Solutions Architect for Analytics at AWS with a passion for ETL and automation. He works with AWS customers to design and build analytics solutions enabling business to make data-driven decisions. In his free time, he likes jogging and riding bikes with his son.