AWS Big Data Blog

Synchronize data lakes with CDC-based UPSERT using open table format, AWS Glue, and Amazon MSK

In the current industry landscape, data lakes have become a cornerstone of modern data architecture, serving as repositories for vast amounts of structured and unstructured data. Change data capture (CDC) refers to the process of identifying and capturing changes made to data in a database and then delivering those changes in a downstream system. Capturing every change from transactions in a source database and moving them to the target keeps the systems synchronized, and helps with analytics use cases and zero-downtime database migrations.

However, efficiently managing and synchronizing data within these lakes presents a significant challenge. Maintaining data consistency and integrity across distributed data lakes is crucial for decision-making and analytics. Inaccurate or outdated data can lead to flawed insights and business decisions. Businesses require synchronized data to gain actionable insights and respond swiftly to changing market conditions. Scalability is a critical concern for data lakes, because they need to accommodate growing volumes of data without compromising performance or incurring exorbitant costs.

To address these issues effectively, we propose using Amazon Managed Streaming for Apache Kafka (Amazon MSK), a fully managed Apache Kafka service that offers a seamless way to ingest and process streaming data. We use MSK connect—an AWS managed service to deploy and run Kafka Connect to build an end-to-end CDC application that uses Debezium MySQL connector to process, insert, update, and delete records from MySQL and a confluent Amazon Simple Storage Service (Amazon S3) sink connector to write to Amazon S3 as raw data that can be consumed by other downstream application for further use cases. To process batch data effectively, we use AWS Glue, a serverless data integration service that uses the Spark framework to process the data from S3 and copies the data to the open table format layer. Open table format manages large collections of files as tables and supports modern analytical data lake operations such as record-level insert, update, delete, and time travel queries. We chose Delta Lake as an example open table format, but you can achieve the same results using Apache Iceberg or Apache Hudi.

The post illustrates the construction of a comprehensive CDC system, enabling the processing of CDC data sourced from Amazon Relational Database Service (Amazon RDS) for MySQL. Initially, we’re creating a raw data lake of all modified records in the database in near real time using Amazon MSK and writing to Amazon S3 as raw data. This raw data can then be used to build a data warehouse or even a special type of data storage that’s optimized for analytics, such as a Delta Lake on S3. Later, we use an AWS Glue exchange, transform, and load (ETL) job for batch processing of CDC data from the S3 raw data lake. A key advantage of this setup is that you have complete control over the entire process, from capturing the changes in your database to transforming the data for your specific needs. This flexibility allows you to adapt the system to different use cases.

This is achieved through integration with MSK Connect using the Debezium MySQL connector, followed by writing data to Amazon S3 facilitated by the Confluent S3 Sink Connector. Subsequently, the data is processed from S3 using an AWS Glue ETL job, and then stored in the data lake layer. Finally, the Delta Lake table is queried using Amazon Athena.

Note: If you require real-time data processing of the CDC data, you can bypass the batch approach and use an AWS Glue streaming job instead. This job would directly connect to the Kafka topic in MSK, grabbing the data as soon as changes occur. It can then process and transform the data as needed, creating a Delta Lake on Amazon S3 that reflects the latest updates according to your business needs. This approach ensures you have the most up-to-date data available for real-time analytics.

Solution overview

The following diagram illustrates the architecture that you implement through this blog post. Each number represents a major component of the solution.

The workflow consists of the following:

  1. Near real-time data capture from MySQL and streaming to Amazon S3
    1. The process starts with data originating from Amazon RDS for
    2. A Debezium connector is used to capture changes to the data in the RDS instance in near real time. Debezium is a distributed platform that converts information from your existing databases into event streams, enabling applications to detect and immediately respond to row-level changes in the databases. Debezium is built on top of Apache Kafka and provides a set of Kafka Connect compatible connectors.
    3. The captured data changes are then streamed to an Amazon MSK topic. MSK is a managed service that simplifies running Apache Kafka on AWS.
    4. The processed data stream (topic) is streamed from MSK to Amazon S3 in JSON format. The Confluent S3 Sink Connector allows near real-time data transfer from an MSK cluster to an S3 bucket.
  2. Batch processing the CDC raw data and writing it into the data lake
    1. Set up an AWS Glue ETL job to process the raw CDC
    2. This job reads bookmarked data from an S3 raw bucket and writes into the data lake in open file format (Delta). The job also creates the Delta Lake table in AWS Glue Data Catalog.
    3. Delta Lake is an open-source storage layer built on top of existing data lakes. It adds functionalities like ACID transactions and versioning to improve data reliability and manageability.
  3. Analyze the data using serverless interactive query service
    1. Athena, a serverless interactive query service, can be used to query the Delta Lake table created in Glue Data Catalog. This allows for interactive data analysis without managing infrastructure.

For this post, we create the solution resources in the us-east-1 AWS Region using AWS CloudFormation templates. In the following sections, we show you how to configure your resources and implement the solution.

Configure resources with AWS CloudFormation

In this post, you use the following two CloudFormation templates. The advantage of using two different templates is that you can decouple the resource creation of the CDC pipeline and AWS Glue processing according to your use case, and if you have requirements to create specific process resources only.

  1. vpc-msk-mskconnect-rds-client.yaml – This template sets up the CDC pipeline resources such as a virtual private cloud (VPC), subnet, security group, AWS Identity and Access Management (IAM) roles, NAT, internet gateway, Amazon Elastic Compute Cloud (Amazon EC2) client, Amazon MSK, MSKConnect, RDS, and S3
  2. gluejob-setup.yaml – This template sets up the data processing resources such as the AWS Glue table, database and ETL

Configure MSK and MSK connect

To start, you’ll configure MKS and MSK connect using Debezium connector to capture incremental changes in table and write into Amazon S3 using an S3 sink connector. The vpc-msk-mskconnect-rds-client.yaml stack creates a VPC, private and public subnets, security groups, S3 buckets, Amazon MSK cluster, EC2 instance with Kafka client, RDS database, and MSK connectors, and its worker configurations.

  1. Launch the stack vpc-msk-mskconnect-rds-client using the CloudFormation template:
    BDB-4100-CFN-Launch-Stack
  2. Provide the parameter values as listed in the following
. A B C
1 Parameters Description Sample value
2 EnvironmentName An environment name that is prefixed to resource names. msk-delta-cdc-pipeline
3 DatabasePassword Database admin account password. S3cretPwd99
4 InstanceType MSK client EC2 instance type. t2.micro
5 LatestAmiId Latest AMI ID of Amazon Linux 2023 for EC2 instance. You can use the default value. /aws/service/ami-amazon-linux- latest/al2023-ami-kernel-6.1-x86_64
6 VpcCIDR IP range (CIDR notation) for this VPC. 10.192.0.0/16
7 PublicSubnet1CIDR IP range (CIDR notation) for the public subnet in the first Availability Zone. 10.192.10.0/24
8 PublicSubnet2CIDR IP range (CIDR notation) for the public subnet in the second Availability Zone. 10.192.11.0/24
9 PrivateSubnet1CIDR IP range (CIDR notation) for the private subnet in the first Availability Zone. 10.192.20.0/24
10 PrivateSubnet2CIDR IP range (CIDR notation) for the private subnet in the second Availability Zone. 10.192.21.0/24
11 PrivateSubnet3CIDR IP range (CIDR notation) for the private subnet in the third Availability Zone. 10.192.22.0/24
  1. The stack creation process can take approximately one hour to complete. Check the Outputs tab for the stack after the stack is created.

Next, you set up the AWS Glue data processing resources such as the AWS Glue database, table, and ETL job.

Implement UPSERT on an S3 data lake with Delta Lake using AWS Glue

The gluejob-setup.yaml CloudFormation template creates a database, IAM role, and AWS Glue ETL job. Retrieve the values for S3BucketNameForOutput, and S3BucketNameForScript from the vpc-msk-mskconnect-rds-client stack’s Outputs tab to use in this template. Complete the following steps:

  1. Launch the stack gluejob-setup.
    Launch Cloudformation Stack
  2. Provide parameter values as listed in the following
. A B C
1 Parameters Description Sample value
2 EnvironmentName Environment name that is prefixed to resource names. gluejob-setup
3 GlueDataBaseName Name of the Data Catalog database. glue_cdc_blog_db
4 GlueTableName Name of the Data Catalog table. blog_cdc_tbl
5 S3BucketForGlueScript Bucket name for the AWS Glue ETL script. Use the S3 bucket name from the previous stack. For example, aws- gluescript-${AWS::AccountId}-${AWS::Region}-${EnvironmentNam e
6 GlueWorkerType Worker type for AWS Glue job. For example, G.1X G.1X
7 NumberOfWorkers Number of workers in the AWS Glue job. 3
8 S3BucketForOutput Bucket name for writing data from the AWS Glue job. aws-glueoutput-${AWS::AccountId}-${AWS::Region}-${EnvironmentName}
9 S3ConnectorTargetBucketname Bucket name where the Amazon MSK S3 sink connector writes the data from the Kafka topic. msk-lab-${AWS::AccountId}- target-bucket
  1. The stack creation process can take approximately 2 minutes to complete. Check the Outputs tab for the stack after the stack is created.

In the gluejob-setup stack, we created an AWS Glue database and AWS Glue job. For further clarity, you can examine the AWS Glue database and job generated using the CloudFormation template.

After successfully creating the CloudFormation stack, you can proceed with processing data using the AWS Glue ETL job.

Run the AWS Glue ETL job

To process the data created in the S3 bucket from Amazon MSK using the AWS Glue ETL job that you set up in the previous section, complete the following steps:

  1. On the CloudFormation console, choose the stack gluejob-setup.
  2. On the Outputs tab, retrieve the name of the AWS Glue ETL job from the GlueJobName In the following screenshot, the name is GlueCDCJob-glue-delta-cdc.

  1. On the AWS Glue console, choose ETL jobs in the navigation pane.
  2. Search for the AWS Glue ETL job named GlueCDCJob-glue-delta-cdc.
  3. Choose the job name to open its details page.
  4. Choose Run to start the On the Runs tab, confirm if the job ran without failure.

  1. Retrieve the OutputBucketName from the gluejob-setup template output.
  2. On the Amazon S3 console, navigate to the S3 bucket to verify the data.

Note: We have enabled AWS Glue job bookmark, which will make sure job will process the new data in each job run.

Query the Delta Lake table using Athena

After the AWS Glue ETL job has successfully created the Delta Lake table for the processed data in the Data Catalog, follow these steps to validate the data using Athena:

  1. On the Athena console, navigate to the query editor.
  2. Choose the Data Catalog as the data source.
  3. Choose the database glue_cdc_blog_db created using gluejob-setup stack.
  4. To validate the data, run the following query to preview the data and find the total count.
SELECT * FROM "glue_cdc_blog_db"."blog_cdc_tbl" ORDER BY cust_id DESC LIMIT 40;
SELECT COUNT(*) FROM "glue_cdc_blog_db"."blog_cdc_tbl";

The following screenshot shows the output of our example query.

Upload incremental (CDC) data for further processing

After we process the initial full load, let’s perform insert, update, and delete records in MySQL, which will be processed by the Debezium mysql connector and written to Amazon S3 using a confluent S3 sink connector.

  1. On the Amazon EC2 console, go to the EC2 instance named KafkaClientInstance that you created using the CloudFormation template.

  1. Sign in to the EC2 instance using SSM. Select KafkaClientInstance and then choose Connect.

  1. Run the following commands to insert the data into the RDS table. Use the database password from the CloudFormation stack parameter tab.
sudo su - ec2-user
RDS_AURORA_ENDPOINT=`aws rds describe-db-instances --region us-east-1 | jq -r '.DBInstances[] | select(.DBName == "salesdb") | .Endpoint.Address'`
mysql -f -u master -h $RDS_AURORA_ENDPOINT  --password
  1. Now perform the insert into the CUSTOMER table.
use salesdb;
INSERT into CUSTOMER values(8887,'Customer Name 8887','Market segment 8887');
INSERT into CUSTOMER values(8888,'Customer Name 8888','Market segment 8888');
INSERT into CUSTOMER values(8889,'Customer Name 8889','Market segment 8889');

  1. Run the AWS Glue job again to update the Delta Lake table with new records.
  2. Use the Athena console to validate the data.
  3. Perform the insert, update, and delete in the CUSTOMER table.
    UPDATE CUSTOMER SET NAME='Customer Name update 8888',MKTSEGMENT='Market segment update 8888' where CUST_ID = 8888;
    UPDATE CUSTOMER SET NAME='Customer Name update 8889',MKTSEGMENT='Market segment update 8889' where CUST_ID = 8889;
    DELETE FROM CUSTOMER where CUST_ID = 8887;
    INSERT into CUSTOMER values(9000,'Customer Name 9000','Market segment 9000');
    

  4. Run the AWS Glue job again to update the Delta Lake table with the insert, update, and delete records.
  5. Use the Athena console to validate the data to verify the update and delete records in the Delta Lake table.

Clean up

To clean up your resources, complete the following steps:

  1. Delete the CloudFormation stack gluejob-setup.
  2. Delete the CloudFormation stack vpc-msk-mskconnect-rds-client.

Conclusion

Organizations continually seek high-performance, cost-effective, and scalable analytical solutions to extract value from their operational data sources in near real time. The analytical platform must be capable of receiving updates to operational data as they happen. Traditional data lake solutions often struggle with managing changes in source data, but the Delta Lake framework addresses this challenge. This post illustrates the process of constructing an end-to-end change data capture (CDC) application using Amazon MSK, MSK Connect, AWS Glue, and native Delta Lake tables, alongside guidance on querying Delta Lake tables from Amazon Athena. This architectural pattern can be adapted to other data sources employing various Kafka connectors, enabling the creation of data lakes supporting UPSERT operations using AWS Glue and native Delta Lake tables. For further insights, see the MSK Connect examples.


About the authors

Shubham Purwar is a Cloud Engineer (ETL) at AWS Bengaluru specializing in AWS Glue and Athena. He is passionate about helping customers solve issues related to their ETL workload and implement scalable data processing and analytics pipelines on AWS. In his free time, Shubham loves to spend time with his family and travel around the world.

Nitin Kumar is a Cloud Engineer (ETL) at AWS, specializing in AWS Glue. With a decade of experience, he excels in aiding customers with their big data workloads, focusing on data processing and analytics. He is committed to helping customers overcome ETL challenges and develop scalable data processing and analytics pipelines on AWS. In his free time, he likes to watch movies and spend time with his family.