Load ongoing data lake changes with AWS DMS and AWS Glue
July 2022: This blog post was reviewed and updated with an additional AWS CloudFormation stack to deploy MySQL database.
Building a data lake on Amazon S3 provides an organization with countless benefits. It allows you to access diverse data sources, determine unique relationships, build AI/ML models to provide customized customer experiences, and accelerate the curation of new datasets for consumption. However, capturing and loading continuously changing updates from operational data stores—whether on-premises or on AWS—into a data lake can be time-consuming and difficult to manage.
The following post demonstrates how to deploy a solution that loads ongoing changes from popular database sources—such as Oracle, SQL Server, PostgreSQL, and MySQL—into your data lake. The solution streams new and changed data into Amazon S3. It also creates and updates appropriate data lake objects, providing a source-similar view of the data based on a schedule you configure. The AWS Glue Data Catalog then exposes the newly updated and de-duplicated data for analytics services to use.
I divide this solution into two AWS CloudFormation stacks. You can download the AWS CloudFormation templates I reference in this post from a public S3 bucket, or you can launch them using the links featured later. You can likewise download the AWS Glue jobs referenced later in this post.
The first stack contains reusable components. You only have to deploy it one time. It launches the following AWS resources:
- AWS Glue jobs: Manage the workflow of the load process from the raw S3 files to the de-duplicated and optimized parquet files.
- Amazon DynamoDB table: Persists the state of data load for each data lake table.
- IAM role: Runs these services and accesses S3. This role contains policies with elevated privileges. Only attach this role to these services and not to IAM users or groups.
- AWS DMS replication instance: Runs replication tasks to migrate ongoing changes via AWS DMS.
The second stack contains objects that you should deploy for each source you bring in to your data lake. It launches the following AWS resources:
- AWS DMS replication task: Reads changes from the source database transaction logs for each table and streams that data into an S3 bucket.
- S3 buckets: Store raw AWS DMS initial load and update objects, as well as query-optimized data lake objects.
- AWS Glue trigger: Schedules the AWS Glue jobs.
- AWS Glue crawler: Builds and updates the AWS Glue Data Catalog on a schedule.
The AWS CloudFormation stack requires that you input parameters to configure the ingestion and transformation pipeline:
- DMS source database configuration: The database connection settings that the DMS connection object needs, such as the DB engine, server, port, user, and password.
- DMS task configuration: The settings the AWS DMS task needs, such as the replication instance ARN, table filter, schema filter, and the AWS DMS S3 bucket location. The table filter and schema filter allow you to choose which objects the replication task syncs.
- Data lake configuration: The settings your stack passes to the AWS Glue job and crawler, such as the S3 data lake location, data lake database name, and run schedule.
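The table filter and schema filter correspond to what AWS DMS calls selection rules in a task's table-mapping document. The following is a rough sketch of how those two parameters might translate into that JSON. The function name and the sample filter values are hypothetical, and the template may build its mapping differently.

```python
import json


def build_table_mappings(schema_filter: str, table_filter: str) -> str:
    """Return a DMS table-mapping document with one 'include' selection rule.

    '%' acts as a wildcard in DMS object locators.
    """
    mapping = {
        "rules": [
            {
                "rule-type": "selection",
                "rule-id": "1",
                "rule-name": "1",
                "object-locator": {
                    "schema-name": schema_filter,
                    "table-name": table_filter,
                },
                "rule-action": "include",
            }
        ]
    }
    return json.dumps(mapping, indent=2)


if __name__ == "__main__":
    # Hypothetical values: every table in a schema named "sampledb".
    print(build_table_mappings("sampledb", "%"))
```

Passing "%" for both filters would select every table in every schema, so narrowing the schema filter is usually the safer starting point.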
After you deploy the solution, the AWS CloudFormation template starts the DMS replication task and populates the DynamoDB controller table. Data does not propagate to your data lake until you review and update the DynamoDB controller table.
In the DynamoDB console, configure the fields described in the following table to control the data load process:
|Field|Description|
|---|---|
|ActiveFlag|Required. When set to true, it enables this table for loading.|
|PrimaryKey|A comma-separated list of column names. When set, the AWS Glue job uses these fields for processing update and delete transactions. When set to “null,” the AWS Glue job only processes inserts.|
|PartitionKey|A comma-separated list of column names. When set, the AWS Glue job uses these fields to partition the output files into multiple subfolders in S3. Partitions can be valuable when querying and processing larger tables but may overcomplicate smaller tables. When set to “null,” the AWS Glue job only loads data into one partition.|
|LastFullLoadDate|The date of the last full load. The AWS Glue job compares this to the date of the DMS-created full load file. Setting this field to an earlier value triggers AWS Glue to reprocess the full load file.|
|LastIncrementalFile|The file name of the last incremental file. The AWS Glue job compares this to any new DMS-created incremental files. Setting this field to an earlier value triggers AWS Glue to reprocess any files with a larger name.|
At this point, the setup is complete. At the next scheduled interval, the AWS Glue job processes any initial and incremental files and loads them into your data lake. At the next scheduled AWS Glue crawler run, AWS Glue loads the tables into the AWS Glue Data Catalog for use in your down-stream analytical applications.
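To make the role of the controller fields concrete, here is a minimal sketch of how a job could decide which incremental files to pick up. It is not the code of the AWS Glue job itself. It assumes the controller item is available as a plain dictionary with string values, that ActiveFlag is stored as the string "true", and that incremental file names sort chronologically. The function name and the sample file names are hypothetical.

```python
from typing import Dict, List


def files_to_process(controller_item: Dict[str, str],
                     incremental_files: List[str]) -> List[str]:
    """Return incremental file names that sort after LastIncrementalFile.

    Names are compared as strings, which matches the "larger name" rule
    only if the names sort in the order the files were written.
    """
    if controller_item.get("ActiveFlag", "false").lower() != "true":
        return []  # the table is not enabled for loading
    last = controller_item.get("LastIncrementalFile", "")
    return sorted(name for name in incremental_files if name > last)


if __name__ == "__main__":
    item = {"ActiveFlag": "true",
            "LastIncrementalFile": "20220701-101500000.csv"}
    found = ["20220701-101400000.csv",
             "20220701-101500000.csv",
             "20220701-101600000.csv"]
    print(files_to_process(item, found))
```

Under this logic, resetting LastIncrementalFile to an earlier name makes every later file eligible again on the next run.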
Amazon Athena and Amazon Redshift
Your pipeline now automatically creates and updates tables. If you use Amazon Athena, you can begin to query these tables right away. If you use Amazon Redshift, you can expose these tables as an external schema and begin to query.
You can analyze these tables directly or join them to tables already in your data warehouse, or use them as inputs to an extract, transform, and load (ETL) process. For more information, see Creating External Schemas for Amazon Redshift Spectrum.
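As an illustration of the Amazon Redshift step, the following sketch renders a CREATE EXTERNAL SCHEMA statement that points at an AWS Glue Data Catalog database. The helper function, schema name, database name, and role ARN are placeholders chosen for this example. Substitute the data lake database name you passed to the stack and an IAM role that your cluster is allowed to assume.

```python
def external_schema_sql(schema_name: str,
                        catalog_database: str,
                        iam_role_arn: str) -> str:
    """Build a Redshift statement that maps a Data Catalog database to a schema."""
    return (
        f"CREATE EXTERNAL SCHEMA IF NOT EXISTS {schema_name}\n"
        f"FROM DATA CATALOG\n"
        f"DATABASE '{catalog_database}'\n"
        f"IAM_ROLE '{iam_role_arn}';"
    )


if __name__ == "__main__":
    # All three values below are placeholders, not names from the templates.
    print(external_schema_sql(
        "datalake",
        "datalake_db",
        "arn:aws:iam::123456789012:role/ExampleSpectrumRole",
    ))
```

You would run the resulting statement once in your Amazon Redshift cluster. After that, tables the crawler adds to the catalog database appear under the external schema.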
AWS Lake Formation
At the time of writing this post, AWS Lake Formation has been announced but not released. AWS Lake Formation makes it easy to set up a secure data lake. To incorporate Lake Formation in this solution, add the S3 location specified during launch as a “data lake storage” location and use Lake Formation to vend credentials to your IAM users.
AWS Lake Formation eliminates the need to grant S3 access via user, group, or bucket policies and instead provides a centralized console for granting and auditing access to your data lake.
A few key configurations built into the AWS CloudFormation templates make this solution possible. Understanding these features helps you replicate this strategy for other purposes or customize the application for your needs.
- The first AWS CloudFormation template deploys an AWS DMS replication instance. Before launching the second AWS CloudFormation template, ensure that the replication instance connects to your on-premises data source.
- The AWS DMS endpoint for the S3 target has an extra connection attribute: addColumnName=true. This attribute tells DMS to add column headers to the output files. The process uses this header to build the metadata for the parquet files and the AWS Glue Data Catalog.
- When the AWS DMS replication task begins, the initial load process writes files to the following location: s3://<bucket>/<schema>/<table>/. It writes one file per table for the initial load, named LOAD00000001.csv. It writes up to one file per minute for any data changes, named <datetime>.csv. The load process uses these file names to process new data incrementally.
- The AWS DMS change data capture (CDC) process adds an additional field, “Op,” to the dataset. This field indicates the last operation for a given key. The change detection logic uses this field, along with the primary key stored in the DynamoDB table, to determine which operation to perform on the incoming data. The process passes this field along to your data lake, and you can see it when querying data.
- The AWS CloudFormation template deploys two roles specific to DMS (DMS-CloudWatch-logs-role, DMS-VPC-role) that may already be in place if you previously used DMS. If the stack fails to build because of these roles, you can safely remove these roles from the template.
- AWS Glue has two types of jobs: Python shell and Apache Spark. The Python shell job allows you to run small tasks using a fraction of the compute resources and at a fraction of the cost. The Apache Spark job allows you to run medium- to large-sized tasks that are more compute- and memory-intensive by using a distributed processing framework. This solution uses the Python shell jobs to determine which files to process and to maintain the state in the DynamoDB table. It also uses Spark jobs for data processing and loading.
- As changes stream in from your relational database, you may see new transactions appear as new files within a given folder. This load process behavior minimizes the impact on already loaded data. If this causes inconsistency in your file sizes or query performance, consider incorporating a compaction (file merging) process.
- When multiple transactions affect the same primary key between job runs (for example, an insert followed by an update), AWS Glue sequences them by file name and by order within the file. It determines the last transaction and uses it to rewrite the affected object in S3.
- Configuration settings allow the Spark-type AWS Glue jobs a maximum of two DPUs of processing power. If your load jobs underperform, consider increasing this value. Increasing the job DPUs is most effective for tables set up with a partition key or when the DMS process generates multiple files between executions.
- If your organization already has a long-running Amazon EMR cluster in place, consider replacing the AWS Glue jobs with Apache Spark jobs running within your EMR cluster to optimize your expenses.
- The solution deploys an IAM role named DMSCDC_Execution_Role. The role is attached to AWS services and is associated with AWS managed policies as well as an inline policy.
- The AssumeRolePolicyDocument trust document for the role includes the following policies, which attach to the AWS Glue and AWS DMS services to ensure that the jobs have the necessary permissions to execute. The Lambda-backed AWS CloudFormation custom resources also use this role to initialize the environment.
- The IAM role includes the following AWS managed policies. For more information, see Managed Policies and Inline Policies.
- The IAM role includes the following inline policy. This policy includes permissions to execute the Lambda-backed AWS CloudFormation custom resources, initialize and manage the DynamoDB table, and initialize the DMS replication task.
To be a candidate source for DMS, a database must meet certain requirements, whether it is managed through Amazon RDS or installed on Amazon EC2 or on premises. Launch the following AWS CloudFormation stack to deploy a MySQL database that is pre-configured to meet these requirements and pre-loaded with sample data. Be sure to choose the same VPC, subnet, and security group as your DMS replication instance so that the two have network connectivity. The following example illustrates what you see after deploying this solution using a sample database.
The sample database includes three tables: product, store, and productorder. After deploying the AWS CloudFormation templates, you should see a folder created for each table in your raw S3 bucket.
Each folder contains an initial load file.
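If you want to inspect the raw bucket programmatically, the folder and file naming convention is easy to parse. The following sketch assumes object keys of the form <schema>/<table>/<file>, treats names that look like LOAD00000001.csv as full load files, and treats any other .csv as an incremental file. The function name and the schema name sampledb are hypothetical.

```python
import re
from typing import Optional, Tuple

# Full load files are assumed to be named LOAD followed by digits.
FULL_LOAD_PATTERN = re.compile(r"^LOAD\d+\.csv$")


def classify_key(key: str) -> Optional[Tuple[str, str, str]]:
    """Split '<schema>/<table>/<file>' and label the file 'full' or 'incremental'.

    Returns None for keys that do not match the expected layout.
    """
    parts = key.split("/")
    if len(parts) != 3 or not parts[2].endswith(".csv"):
        return None
    schema, table, filename = parts
    kind = "full" if FULL_LOAD_PATTERN.match(filename) else "incremental"
    return schema, table, kind


if __name__ == "__main__":
    for key in ("sampledb/product/LOAD00000001.csv",
                "sampledb/product/20220701-101600000.csv"):
        print(key, "->", classify_key(key))
```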
The table list populates the DynamoDB table.
Set the active flag, primary key, and partition key values for these tables. In this example, I set the primary key for the product and store tables so that the AWS Glue job processes their updates. I leave the primary key for the productorder table alone, because I do not expect update transactions. However, I set its partition key so that the job partitions the data by date.
When the next scheduled AWS Glue job runs, it creates a folder for each table in your data lake S3 bucket.
When the next scheduled AWS Glue crawler runs, your AWS Glue Data Catalog lists these tables. You can now query them using Athena.
Similarly, you can query the data lake from within your Amazon Redshift cluster after first cataloging the external database.
On subsequent AWS Glue job runs, the process compares the timestamp of the initial file with the “LastFullLoadDate” field in the DynamoDB table to determine if it should process the initial file again. It also compares the new incremental file names with the “LastIncrementalFile” field in the DynamoDB table to determine if it should process any incremental files. In the following example, it created a new incremental file for the product table.
Examining the file shows two transactions: an update and a delete.
When the AWS Glue job runs again, the DynamoDB table updates to list a new value for the “LastIncrementalFile.”
Finally, the solution reprocesses the parquet file. You can query the data to see the new values for the updated record and ensure that it removes the deleted record.
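The effect of that reprocessing can be sketched in a few lines of plain Python. This is a simplified model, not the Spark code the AWS Glue job runs. It assumes the change rows arrive already ordered by file name and row order, and that the Op column holds I, U, or D for insert, update, and delete. The column names id and name are made up for the example.

```python
from typing import Dict, Iterable, List, Tuple

Row = Dict[str, str]


def apply_changes(current: List[Row],
                  changes: Iterable[Row],
                  primary_key: List[str]) -> List[Row]:
    """Apply ordered CDC rows to a table snapshot; the last operation per key wins."""

    def key_of(row: Row) -> Tuple[str, ...]:
        return tuple(row[col] for col in primary_key)

    table = {key_of(row): row for row in current}
    for change in changes:
        key = key_of(change)
        if change["Op"] == "D":
            table.pop(key, None)   # delete: drop the row if it exists
        else:
            table[key] = change    # insert or update: keep the newest version
    return list(table.values())


if __name__ == "__main__":
    snapshot = [{"id": "1", "name": "widget"},
                {"id": "2", "name": "gadget"}]
    incoming = [{"Op": "U", "id": "1", "name": "widget v2"},
                {"Op": "D", "id": "2", "name": "gadget"}]
    print(apply_changes(snapshot, incoming, ["id"]))
```

With one update and one delete, as in the example file above, the snapshot ends up with the updated row only, which is what the query against the data lake should show.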
In this post, I provided a set of AWS CloudFormation templates that allow you to quickly and easily sync transactional databases with your AWS data lake. With data in your AWS data lake, you can perform analysis on data from multiple data sources, build machine learning models, and produce rich analytics for your data consumers.
If you have questions or suggestions, please comment below.
About the Author
Rajiv Gupta is a data warehouse specialist solutions architect with Amazon Web Services.