AWS Big Data Blog

Create a most-recent view of your data lake using Amazon Redshift Serverless

Building a robust data lake is very beneficial because it enables organizations to gain a holistic view of their business and empowers data-driven decisions. The curated layer of a data lake can hydrate multiple heterogeneous data products, unlocking limitless capabilities to address current and future requirements. However, some aspects of how data lakes work feel counter-intuitive to professionals with a traditional database background.

Data lakes are by design append-only, meaning that new records with an existing primary key don’t update the existing values out of the box; instead, the new values are appended, resulting in multiple occurrences of the same primary key in the data lake. Furthermore, special care needs to be taken to handle row deletions, and even if there is a way to identify deleted primary keys, it’s not straightforward to incorporate them into the data lake.

Traditionally, processing between the different layers of a data lake is performed using distributed processing engines such as Apache Spark, which can be deployed in a managed or serverless way using services such as Amazon EMR or AWS Glue. Frameworks such as Apache Hudi, Apache Iceberg, and Delta Lake have recently brought a flavor of ACID properties to data lakes. However, if you’re coming from a database background, there is a significant learning curve to adopt these technologies, which involves understanding the concepts, moving away from SQL to a more general-purpose language, and adopting a specific ACID framework and its complexities.

In this post, we discuss how to implement a data lake that supports updates and deletions of individual rows, using Amazon Redshift’s ANSI SQL-compliant syntax as our main processing language. We also take advantage of its serverless offering to address scenarios where consumption patterns don’t justify creating a managed Amazon Redshift cluster, which makes our solution cost-efficient. We use Python to interact with the AWS API, but you can also use any other AWS SDK. Finally, we use AWS Glue auto-generated code to ingest data to Amazon Redshift Serverless and AWS Glue crawlers to create metadata for our datasets.

Solution overview

Most of the services we use for this post can be treated as placeholders. For instance, we use Amazon DocumentDB (with MongoDB compatibility) as our data source system, because one of the key features of a data lake is that it can support structured and unstructured data. We use AWS Database Migration Service (AWS DMS) to ingest data from Amazon DocumentDB to the data lake on Amazon Simple Storage Service (Amazon S3). The change data capture (CDC) capabilities of AWS DMS enable us to identify both updated and deleted rows that we want to propagate to the data lake. We use an AWS Glue job to load raw data to Redshift Serverless so that we can take advantage of job bookmarks, which allow each run of the load job to ingest only newly arrived data. You could replace the AWS Glue job with Amazon Redshift Spectrum or the Amazon Redshift COPY command if you have a different way to identify newly arrived data.

After new data is ingested to Amazon Redshift, we use it as our extract, transform, and load (ETL) engine. We trigger a stored procedure to curate the new data, upsert it to our existing table, and unload it to the data lake. To handle data deletions, we have created a scalar UDF in AWS Lambda that we can call from Amazon Redshift to delete the S3 partitions that have been affected by the newly ingested dataset before rewriting them with the updated values. The following diagram showcases the architecture of our solution.

Data ingestion

The dataset we use for this post is available on GitHub. After we create our Amazon DocumentDB instance (for this post, we used engine version 4.0.0 and the db.t3.medium instance class), we ingest the sample dataset. Ingested records look like the following:

{
	"_id": ObjectId("61f998d9b7599a1e904ae84d"),
	"Case_Type": "Confirmed",
	"Cases": 1661,
	"Difference": 135,
	"Date": "5/12/2020",
	"Country_Region": "Sudan",
	"Province_State": "N/A",
	"Admin2": "",
	"Combined_Key": "Sudan",
	"Lat": 12.8628,
	"Long": 30.2176,
	"Prep_Flow_Runtime": "6/5/2022 11:15:39 PM"
}

We then create an AWS DMS dms.t3.medium instance using engine version 3.4.6, a source endpoint for Amazon DocumentDB, and a target endpoint for Amazon S3, adding dataFormat=parquet; as an extra connection attribute so that records are stored in Parquet format in the landing zone of the data lake. After we confirm the connectivity of the endpoints using the Test connection feature, we create a database migration task for full load and ongoing replication. We can confirm data is migrated successfully to Amazon S3 by browsing to the S3 location and choosing Query with S3 Select on the Parquet file that has been generated. The result looks like the following screenshot.
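If you prefer to script this setup, the same resources can be created with Boto3. The following sketch shows one way to define the S3 target endpoint with Parquet output and the full load and ongoing replication task; the endpoint identifiers, ARNs, bucket, and table mappings are placeholders you would adapt to your environment:

import boto3

dms = boto3.client('dms')

# Target endpoint that writes Parquet files to the landing zone of the data lake.
# Bucket, folder, and service access role ARN are placeholders.
s3_target = dms.create_endpoint(
    EndpointIdentifier='docdb-to-s3-target',
    EndpointType='target',
    EngineName='s3',
    S3Settings={
        'BucketName': '<DATA-LAKE-BUCKET>',
        'BucketFolder': 'landing/cases',
        'ServiceAccessRoleArn': 'arn:aws:iam::<AWS account-id>:role/<dms-s3-role>',
        'DataFormat': 'parquet',  # same effect as the dataFormat=parquet; extra configuration
    },
)

# Migration task for full load and ongoing replication (CDC) between the two endpoints.
# Source endpoint and replication instance ARNs are placeholders.
dms.create_replication_task(
    ReplicationTaskIdentifier='docdb-to-s3-full-load-and-cdc',
    SourceEndpointArn='<DOCUMENTDB-SOURCE-ENDPOINT-ARN>',
    TargetEndpointArn=s3_target['Endpoint']['EndpointArn'],
    ReplicationInstanceArn='<REPLICATION-INSTANCE-ARN>',
    MigrationType='full-load-and-cdc',
    TableMappings='{"rules": [{"rule-type": "selection", "rule-id": "1", "rule-name": "1", "object-locator": {"schema-name": "%", "table-name": "%"}, "rule-action": "include"}]}',
)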

We then catalog the ingested data using an AWS Glue crawler on the landing zone S3 bucket, so that we can query the data with Amazon Athena and process it with AWS Glue.
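The crawler can also be created and started with Boto3. The following sketch assumes a hypothetical crawler name, IAM role, and landing zone path; the Data Catalog database name document-landing matches the one our AWS Glue job reads later:

import boto3

glue = boto3.client('glue')

# Crawler over the landing zone prefix written by AWS DMS; name, role, and path are placeholders.
glue.create_crawler(
    Name='document-landing-crawler',
    Role='arn:aws:iam::<AWS account-id>:role/<glue-crawler-role>',
    DatabaseName='document-landing',
    Targets={'S3Targets': [{'Path': 's3://<DATA-LAKE-BUCKET>/landing/cases/'}]},
)

glue.start_crawler(Name='document-landing-crawler')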

Set up Redshift Serverless

Amazon Redshift is a fully managed, petabyte-scale data warehouse service in the cloud. You can start with just a few hundred gigabytes of data and scale to a petabyte or more. This enables you to use your data to acquire new insights for your business and customers. It’s not uncommon to want to take advantage of the rich features Amazon Redshift provides without having a workload demanding enough to justify purchasing a provisioned Amazon Redshift cluster. To address such scenarios, we have launched Redshift Serverless, which includes all the features of provisioned clusters but enables you to pay only for the workloads you run rather than for the entire time your Amazon Redshift cluster is up.

Our next step is to set up our Redshift Serverless instance. On the Amazon Redshift console, we choose Redshift Serverless, select the required settings (similar to creating a provisioned Amazon Redshift cluster), and choose Save configuration. If you intend to access your Redshift Serverless instance via a JDBC client, you might need to enable public access. After the setup is complete, we can start using the Redshift Serverless endpoint.
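If you prefer the AWS API over the console, a namespace and workgroup can be created with Boto3. The following sketch uses assumed names, base capacity, and networking values; only the database name dev is taken from this post:

import boto3

rs = boto3.client('redshift-serverless')

# The namespace holds databases, users, and associated IAM roles; credentials are placeholders.
rs.create_namespace(
    namespaceName='data-lake-namespace',
    dbName='dev',
    adminUsername='admin',
    adminUserPassword='<PASSWORD>',
)

# The workgroup provides compute and the endpoint; subnets and security groups are placeholders.
rs.create_workgroup(
    workgroupName='data-lake-workgroup',
    namespaceName='data-lake-namespace',
    baseCapacity=32,
    publiclyAccessible=False,
    subnetIds=['subnet-xxxxxxxx'],
    securityGroupIds=['sg-xxxxxxxx'],
)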

Load data to Redshift Serverless

To confirm our serverless endpoint is accessible, we can create an AWS Glue connection and test its connectivity. The type of the connection is JDBC, and we can find our serverless JDBC URL in the Workgroup configuration section on the Redshift Serverless console. For the connection to be successful, we need to configure the connectivity between the Amazon Redshift and AWS Glue security groups. For more details, refer to Connecting to a JDBC Data Store in a VPC. After connectivity is configured correctly, we can test the connection, and if the test is successful, we should see the following message.
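The connection itself can also be defined with Boto3 instead of the console. The following sketch uses placeholder credentials, subnet, security group, and Availability Zone values; the connection name redshift-serverless matches the one our AWS Glue job references:

import boto3

glue = boto3.client('glue')

# JDBC connection to the Redshift Serverless endpoint taken from the Workgroup configuration section.
glue.create_connection(
    ConnectionInput={
        'Name': 'redshift-serverless',
        'ConnectionType': 'JDBC',
        'ConnectionProperties': {
            'JDBC_CONNECTION_URL': 'jdbc:redshift://<WORKGROUP-ENDPOINT>:5439/dev',
            'USERNAME': '<USERNAME>',
            'PASSWORD': '<PASSWORD>',
        },
        'PhysicalConnectionRequirements': {
            'SubnetId': 'subnet-xxxxxxxx',
            'SecurityGroupIdList': ['sg-xxxxxxxx'],
            'AvailabilityZone': 'us-east-1a',
        },
    }
)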

We use an AWS Glue job to load the historical dataset to Amazon Redshift; however, we don’t apply any business logic at this step because we want to implement our transformations using ANSI SQL in Amazon Redshift. Our AWS Glue job is mostly auto-generated boilerplate code:

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

args = getResolvedOptions(sys.argv, ["TempDir", "JOB_NAME"])  # TempDir is needed because the Redshift write uses it as a staging location
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "document-landing", table_name = "cases", transformation_ctx = "datasource0")

glueContext.write_dynamic_frame.from_jdbc_conf(frame = datasource0, catalog_connection = "redshift-serverless", connection_options = {"dbtable": "cases_stage", "database": "dev"}, redshift_tmp_dir = args["TempDir"], transformation_ctx = "datasink4")

job.commit()

The job reads the table cases from the database document-landing in our AWS Glue Data Catalog, created by the crawler after data ingestion. It copies the underlying data to the table cases_stage in the database dev of the Redshift Serverless endpoint defined in the AWS Glue connection redshift-serverless. We can run this job and use the Amazon Redshift query editor v2 on the Amazon Redshift console to confirm its success by seeing the newly created table, as shown in the following screenshot.
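The job can also be started and monitored programmatically. A minimal sketch, assuming the job was saved under the hypothetical name load-cases-to-redshift:

import boto3

glue = boto3.client('glue')

# Start the load job; job bookmarks must be enabled so reruns only pick up new files.
run = glue.start_job_run(JobName='load-cases-to-redshift')

# Check the run state (SUCCEEDED, RUNNING, FAILED, and so on).
status = glue.get_job_run(JobName='load-cases-to-redshift', RunId=run['JobRunId'])
print(status['JobRun']['JobRunState'])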

We can now query and transform the historical data using Redshift Serverless.
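For example, the Redshift Data API lets us query the staging table without a JDBC client. The following sketch assumes the workgroup is named data-lake-workgroup; the database and table names are the ones used by the load job:

import time
import boto3

rsd = boto3.client('redshift-data')

# Submit a query against the Redshift Serverless workgroup; the workgroup name is an assumption.
resp = rsd.execute_statement(
    WorkgroupName='data-lake-workgroup',
    Database='dev',
    Sql='select count(*) from public.cases_stage;',
)

# Wait for the statement to finish, then print the row count.
while rsd.describe_statement(Id=resp['Id'])['Status'] not in ('FINISHED', 'FAILED', 'ABORTED'):
    time.sleep(1)
print(rsd.get_statement_result(Id=resp['Id'])['Records'])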

Transform stored procedure

We have created a stored procedure that performs some common transformations, such as converting text to dates, extracting the year, month, and day from them, and creating some Boolean flag fields. After the transformations are performed and the results are stored in the cases table, we want to unload the data to our data lake S3 bucket and finally empty the cases_stage table, preparing it to receive the next CDC load of our pipeline. See the following code:

CREATE OR REPLACE PROCEDURE public.historicalload() LANGUAGE plpgsql AS $$
DECLARE
  sql text;
  s3folder varchar(65535);
  iamrole varchar(1000);
  unload varchar(65535);
begin
  create table cases as (
  SELECT oid__id
  , case_type
  , cases
  , difference
  , TO_DATE(date, 'mm/dd/yyyy') date
  , country_region
  , province_state
  , admin2
  , combined_key
  , lat
  , long
  , TO_DATE(prep_flow_runtime, 'mm/dd/yyyy') prep_flow_runtime
  , fips
  , DATE_PART(year, TO_DATE(date, 'mm/dd/yyyy'))::smallint as year
  , DATE_PART(month, TO_DATE(date, 'mm/dd/yyyy'))::smallint as month
  , DATE_PART(day, TO_DATE(date, 'mm/dd/yyyy'))::smallint as day
  , CASE WHEN case_type = 'Deaths' then 1 else 0 end is_death
  , CASE WHEN case_type = 'Confirmed' then 1 else 0 end is_confirmed
  FROM "dev"."public"."cases_stage");
  sql:='select * from "dev"."public"."cases"';
  s3folder:='s3://object-path/name-prefix';
  iamrole:='arn:aws:iam::<AWS account-id>:role/<role-name>';
  unload := 'unload ('''||sql||''') to '''||s3folder||''' iam_role '''||iamrole||''' ALLOWOVERWRITE MAXFILESIZE 100 MB PARALLEL PARQUET PARTITION BY (year,month,day)';
  execute unload;
  truncate "dev"."public"."cases_stage";
END;
$$

Note that for unloading data to Amazon S3, we need to create the AWS Identity and Access Management (IAM) role arn:aws:iam::<AWS account-id>:role/<role-name>, give it the required Amazon S3 permissions, and associate it with our Redshift Serverless instance. Calling the stored procedure after ingesting the historical dataset to the cases_stage table loads the transformed and partitioned dataset, in Parquet format, to the specified S3 folder. After we crawl the folder with an AWS Glue crawler, we can verify the results using Athena.
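The Athena check can also be scripted. The following sketch assumes the crawler registered the unloaded data as the table cases in the database document-data-lake (the database our final AWS Glue job checks later) and uses a placeholder query results location:

import boto3

athena = boto3.client('athena')

# Count rows per partition to confirm the unloaded, partitioned Parquet data is queryable.
athena.start_query_execution(
    QueryString='select "year", "month", "day", count(*) from cases group by 1, 2, 3 order by 1, 2, 3',
    QueryExecutionContext={'Database': 'document-data-lake'},
    ResultConfiguration={'OutputLocation': 's3://<QUERY-RESULTS-BUCKET>/athena/'},
)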

We can revisit the loading AWS Glue job we described before to automatically invoke the steps we triggered manually. We can use an Amazon Redshift postactions configuration to trigger the stored procedure as soon as the loading of the staging table is complete. We can also use Boto3 to trigger the AWS Glue crawler when data is unloaded to Amazon S3. Therefore, the revisited AWS Glue job looks like the following code:

import sys
import boto3
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

args = getResolvedOptions(sys.argv, ["TempDir", "JOB_NAME"])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

glueClient = boto3.client('glue')

# Post action that calls the transform stored procedure once the staging table is loaded
post_query = "call historicalload();"

datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "document-landing", table_name = "cases", transformation_ctx = "datasource0")

glueContext.write_dynamic_frame.from_jdbc_conf(frame = datasource0, catalog_connection = "redshift-serverless", connection_options = {"dbtable": "cases_stage", "database": "dev", "postactions":post_query}, redshift_tmp_dir = args["TempDir"], transformation_ctx = "datasink4")
response = glueClient.start_crawler(Name='document-lake')
job.commit()

CDC load

Because the AWS DMS migration task we configured is for full load and ongoing replication, it can capture updates and deletions on a record level. Updates come with an OP column with value U, such as in the following example:

U,61f998d9b7599a1e843038cc,Confirmed,25,1,3/24/2020,Sudan,N/A,,Sudan,12.8628,30.2176,6/4/2020 11:15:39 PM,

Deletions have an OP column with value D, the primary key to be deleted, and all other fields empty:

D,61f998d9b7599a1e904ae941,,,,,,,,,,,,

Enabling job bookmarks for our AWS Glue job guarantees that subsequent runs of the job only process new data since the last checkpoint that was set in the previous run, without any changes to our code. After the CDC data is loaded in the staging table, our next step is to delete rows with OP column D, perform a merge operation to replace existing rows, and unload the result to our S3 bucket. However, because S3 objects are immutable, we need to delete the partitions that would be affected by the latest ingested dataset and rewrite them from Amazon Redshift. Amazon Redshift doesn’t have an out-of-the-box way to delete S3 objects; however, we can use a scalar Lambda UDF that takes as an argument the partition to be deleted and removes all the objects under it. The Python code of our Lambda function uses Boto3 and looks like the following example:

import json
import boto3

datalakeBucket = '<DATA-LAKE-BUCKET>'
folder = 'prod/cases/'
    
def lambda_handler(event, context):
    ret = {}
    res = []
    s3Resource = boto3.resource('s3')
    bucket = s3Resource.Bucket(datalakeBucket)
    for argument in event['arguments']:
        partition = argument[0]
        bucket.objects.filter(Prefix=folder + partition).delete()
        res.append(True)
    ret['success'] = True
    ret['results'] = res
    return json.dumps(ret)

We then need to register the Lambda UDF in Amazon Redshift by running the following code:

create or replace external function deletePartition(path VARCHAR)
returns boolean stable 
LAMBDA 'deletePartition'
IAM_ROLE 'arn:aws:iam::<AWS account-id>:role/<role-name>';
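For the UDF to work, the IAM role referenced in the IAM_ROLE clause also needs permission to invoke the Lambda function. A minimal Boto3 sketch, with placeholder role, policy, and Region values, assuming the function is named deletePartition as in the registration above:

import json
import boto3

iam = boto3.client('iam')

# Inline policy that lets the Redshift-associated role invoke the deletePartition Lambda function.
iam.put_role_policy(
    RoleName='<role-name>',
    PolicyName='invoke-delete-partition-lambda',
    PolicyDocument=json.dumps({
        'Version': '2012-10-17',
        'Statement': [{
            'Effect': 'Allow',
            'Action': 'lambda:InvokeFunction',
            'Resource': 'arn:aws:lambda:<region>:<AWS account-id>:function:deletePartition',
        }],
    }),
)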

The last edge case we might need to take care of is the scenario of a CDC dataset containing multiple records for the same primary key. The approach we take here is to use another date field that is available in the dataset, prep_flow_runtime, to keep the latest record. To implement this logic in SQL, we use a nested query with the row_number window function.

At a high level, the upsert stored procedure includes the following steps:

  1. Delete partitions that are being updated from the S3 data lake, using the scalar Lambda UDF.
  2. Delete rows that are being updated from Amazon Redshift.
  3. Implement the transformations and compute the latest values of the updated rows.
  4. Insert them into the target table in Amazon Redshift.
  5. Unload the affected partitions to the S3 data lake.
  6. Truncate the staging table.

See the following code:

CREATE OR REPLACE PROCEDURE upsert()
AS $$
DECLARE
  sql text;
  deletePartition text;
  s3folder varchar(65535);
  iamrole varchar(1000);
  unload varchar(65535);
begin
  
  drop table if exists affected_dates;
  create temp table affected_dates as select date from cases where oid__id in (select distinct oid__id from cases_stage cs);
  deletePartition:='select deletePartition(partitionPath) from 
  (select distinct ''year='' || year::VARCHAR || ''/month='' || month::VARCHAR || ''/day='' || day::VARCHAR partitionPath
  from cases
  where date in (select date from affected_dates));';
  execute deletePartition;
	
  delete from cases using cases_stage 
  where cases.oid__id = cases_stage.oid__id;
  insert into cases 
  select oid__id
  , case_type
  , cases
  , difference
  , date
  , country_region
  , province_state
  , admin2
  , combined_key
  , lat
  , long
  , prep_flow_runtime
  , fips
  , year
  , month
  , day
  , is_death
  , is_confirmed
from
(SELECT row_number() over (partition by oid__id order by TO_DATE(prep_flow_runtime, 'mm/dd/yyyy') desc) seq
  , oid__id
  , case_type
  , cases
  , difference
  , TO_DATE(date, 'mm/dd/yyyy') date
  , country_region
  , province_state
  , admin2
  , combined_key
  , lat
  , long
  , TO_DATE(prep_flow_runtime, 'mm/dd/yyyy') prep_flow_runtime
  , fips
  , DATE_PART(year, TO_DATE(date, 'mm/dd/yyyy'))::smallint as year
  , DATE_PART(month, TO_DATE(date, 'mm/dd/yyyy'))::smallint as month
  , DATE_PART(day, TO_DATE(date, 'mm/dd/yyyy'))::smallint as day
  , CASE WHEN case_type = 'Deaths' then 1 else 0 end is_death
  , CASE WHEN case_type = 'Confirmed' then 1 else 0 end is_confirmed
  from cases_stage where op = 'U') where seq = 1;
  
  sql:='select *
  from cases
  where date in (select date from affected_dates);';
  s3folder:='s3://<DATA-LAKE-BUCKET>/prod/cases/';
  iamrole:='arn:aws:iam::<AWS account-id>:role/<role-name>';
  unload := 'unload ('''||sql||''') to ''' ||s3folder||''' iam_role '''||iamrole||''' ALLOWOVERWRITE MAXFILESIZE 100 MB PARALLEL PARQUET PARTITION BY (year,month,day)';    
  execute unload;
  truncate cases_stage;
END;
$$ LANGUAGE plpgsql;

Finally, we can modify the aforementioned AWS Glue job to use Boto3 to decide whether this is a historical or a CDC load by checking if the table exists in the AWS Glue Data Catalog. The final version of the AWS Glue job is as follows:

import sys
import boto3
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

args = getResolvedOptions(sys.argv, ['TempDir','JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
spark.sql("set spark.sql.legacy.timeParserPolicy=LEGACY")
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
glueClient = boto3.client('glue')
tableExists = True
post_query="call upsert();"
try:
    response = glueClient.get_table(DatabaseName='document-data-lake', Name='cases')
except glueClient.exceptions.EntityNotFoundException:
    tableExists = False
    post_query="call historicalLoad();"

datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "document-landing", table_name = "cases", transformation_ctx = "datasource0")

glueContext.write_dynamic_frame.from_jdbc_conf(frame = datasource0, catalog_connection = "redshift-serverless", connection_options = {"dbtable": "cases_stage", "database": "dev", "postactions":post_query}, redshift_tmp_dir = args["TempDir"], transformation_ctx = "datasink4")
if not tableExists:
    response = glueClient.start_crawler(Name='document-lake')

job.commit()

Running this job for the first time with job bookmarks enabled loads the historical data to the Redshift Serverless staging table and triggers the historical load stored procedure, which in turn performs the transformations we implemented in SQL and unloads the result to the S3 data lake. Subsequent runs pick up newly arrived data from the landing zone, ingest it into the staging table, and perform the upsert process, deleting and repopulating the affected partitions in the data lake.
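To run the pipeline on a schedule instead of starting it manually, one option is a scheduled AWS Glue trigger. The following sketch assumes the hypothetical job name load-cases-to-redshift used earlier and an hourly cron expression:

import boto3

glue = boto3.client('glue')

# Scheduled trigger that starts the load job every hour; adjust the name and schedule as needed.
glue.create_trigger(
    Name='cases-cdc-hourly',
    Type='SCHEDULED',
    Schedule='cron(0 * * * ? *)',
    Actions=[{'JobName': 'load-cases-to-redshift'}],
    StartOnCreation=True,
)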

Conclusion

Building a modern data lake that maintains a most recent view involves some edge cases that aren’t intuitive if you come from an RDBMS background. In this post, we described an AWS-native approach that takes advantage of the rich features and SQL syntax of Amazon Redshift, paying only for the resources used thanks to Redshift Serverless and without using any external framework.

The introduction of Amazon Redshift Serverless unlocks the hundreds of Amazon Redshift features released every year for users who don’t need a cluster that’s always up and running. You can start experimenting with this approach of managing your data lake with Amazon Redshift, as well as addressing other use cases that are now easier to solve with Redshift Serverless.


About the author

George Komninos is a solutions architect for the AWS Data Lab. He helps customers convert their ideas to production-ready data products. Before AWS, he spent three years in the Alexa Information domain as a data engineer. Outside of work, George is a football fan and supports the greatest team in the world, Olympiacos Piraeus.