AWS Big Data Blog
Break data silos and stream your CDC data with Amazon Redshift streaming and Amazon MSK
Data loses value over time. We hear from our customers that they’d like to analyze their business transactions in real time. Traditionally, customers used batch-based approaches for data movement from operational systems to analytical systems. A batch load can run once or several times a day. A batch-based approach can introduce latency in data movement and reduce the value of data for analytics. A Change Data Capture (CDC)-based approach has emerged as an alternative to batch-based approaches. A CDC-based approach captures data changes and makes them available in data warehouses for further analytics in real time.
CDC tracks changes made in the source database, such as inserts, updates, and deletes, and continually applies those changes to the target database. When CDC is high-frequency, the source database changes rapidly, and the target database (usually a data warehouse) needs to reflect those changes in near real time.
With the explosion of data, the number of data systems in organizations has grown. Data silos cause data to live in different sources, which makes it difficult to perform analytics.
To gain deeper and richer insights, you can bring all the changes from different data silos into one place, such as a data warehouse. This post showcases how to use streaming ingestion to bring data to Amazon Redshift.
Redshift streaming ingestion provides low-latency, high-throughput data ingestion, which enables customers to derive insights in seconds instead of minutes. It’s simple to set up, and it directly ingests streaming data into your data warehouse from Amazon Kinesis Data Streams and Amazon Managed Streaming for Apache Kafka (Amazon MSK) without the need to stage in Amazon Simple Storage Service (Amazon S3). You can create materialized views using SQL statements. After that, using materialized view refresh, you can ingest hundreds of megabytes of data per second.
Solution overview
In this post, we create low-latency data replication between Amazon Aurora MySQL and an Amazon Redshift data warehouse, using Redshift streaming ingestion from Amazon MSK. With Amazon MSK, we securely stream data using a fully managed, highly available Apache Kafka service. Apache Kafka is an open-source distributed event streaming platform used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications. We store CDC events in Amazon MSK for a set duration of time, which makes it possible to deliver CDC events to additional destinations such as an Amazon S3 data lake.
We deploy the Debezium MySQL source Kafka connector on Amazon MSK Connect. Amazon MSK Connect makes it easy to deploy, monitor, and automatically scale connectors that move data between Apache Kafka clusters and external systems such as databases, file systems, and search indices. Amazon MSK Connect is fully compatible with Apache Kafka Connect, which enables you to lift and shift your Apache Kafka Connect applications with zero code changes.
This solution uses Amazon Aurora MySQL hosting the example database salesdb. Users of the database can perform row-level INSERT, UPDATE, and DELETE operations to produce change events in the example salesdb database. The Debezium MySQL source Kafka connector reads these change events and emits them to the Kafka topics in Amazon MSK. Amazon Redshift then reads the messages from the Kafka topics in Amazon MSK using the Amazon Redshift streaming ingestion feature. Amazon Redshift stores these messages using materialized views and processes them as they arrive.
You can see how CDC represents a create event by looking at the abridged example that follows the list below. In our solution, we use the op field for processing; it is a mandatory string that describes the type of operation that caused the connector to generate the event. In this example, c indicates that the operation created a row. Valid values for the op field are:
- c = create
- u = update
- d = delete
- r = read (applies only to snapshots)
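For reference, the following is an abridged sketch of what a Debezium create event for the CUSTOMER table can look like. The exact layout depends on your converter settings (this sketch assumes the JSON converter writes the event fields at the top level, without the schema envelope), and the MKTSEGMENT column and values are illustrative:

```json
{
  "before": null,
  "after": {
    "CUST_ID": 3001,
    "NAME": "Customer#000003001",
    "MKTSEGMENT": "BUILDING"
  },
  "source": {
    "connector": "mysql",
    "db": "salesdb",
    "table": "CUSTOMER"
  },
  "op": "c",
  "ts_ms": 1697046897000
}
```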
The following diagram illustrates the solution architecture:
The solution workflow consists of the following steps:
- Amazon Aurora MySQL has a binary log (binlog) that records all operations (INSERT, UPDATE, DELETE) in the order in which they are committed to the database.
- Amazon MSK Connect runs the source Kafka connector called Debezium connector for MySQL, which reads the binlog, produces change events for row-level INSERT, UPDATE, and DELETE operations, and emits the change events to Kafka topics in Amazon MSK.
- An Amazon Redshift-provisioned cluster is the stream consumer and can read messages from the Kafka topics in Amazon MSK.
- A materialized view in Amazon Redshift is the landing area for data read from the stream, which is processed as it arrives.
- When the materialized view is refreshed, Amazon Redshift compute nodes allocate a group of Kafka partitions to each compute slice.
- Each slice consumes data from the allocated partitions until the view reaches parity with the last offset for the Kafka topic.
- Subsequent materialized view refreshes read data from the last offset of the previous refresh until it reaches parity with the topic data.
- Inside Amazon Redshift, we create a stored procedure to process the CDC records and update the target table.
Prerequisites
This post assumes you have a running Amazon MSK Connect stack in your environment with the following components:
- Aurora MySQL hosting a database. In this post, you use the example database salesdb.
- The Debezium MySQL connector running on Amazon MSK Connect, which connects to Amazon MSK in your Amazon Virtual Private Cloud (Amazon VPC).
- An Amazon MSK cluster.
If you don’t have an Amazon MSK Connect stack, then follow the instructions in the MSK Connect lab setup and verify that your source connector replicates data changes to the Amazon MSK topics.
You should provision the Amazon Redshift cluster in the same VPC as the Amazon MSK cluster. If you haven’t deployed one, then follow the steps here in the AWS Documentation.
We use AWS Identity and Access Management (AWS IAM) authentication for communication between Amazon MSK and the Amazon Redshift cluster. Make sure you have created an AWS IAM role with a trust policy that allows your Amazon Redshift cluster to assume the role. For information about how to configure the trust policy for the AWS IAM role, see Authorizing Amazon Redshift to access other AWS services on your behalf. After it’s created, the role should have an AWS IAM policy that provides permission for communication with the Amazon MSK cluster, along the lines of the following example.
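The following is a sketch of such a policy, based on the permissions Amazon Redshift streaming ingestion needs to connect to and read from an MSK cluster over IAM authentication; the Sid values and the xxx placeholders are illustrative:

```json
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "MSKIAMpolicy",
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:ReadData",
                "kafka-cluster:DescribeTopic",
                "kafka-cluster:Connect"
            ],
            "Resource": [
                "arn:aws:kafka:*:xxxxxxxxxxxx:cluster/*/*",
                "arn:aws:kafka:*:xxxxxxxxxxxx:topic/*/*/*"
            ]
        },
        {
            "Sid": "MSKPolicy",
            "Effect": "Allow",
            "Action": [
                "kafka:GetBootstrapBrokers"
            ],
            "Resource": "*"
        }
    ]
}
```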
Replace the ARNs containing xxx in the preceding example policy with your Amazon MSK cluster’s ARN.
- Also, verify that the Amazon Redshift cluster has access to the Amazon MSK cluster. In the Amazon Redshift cluster’s security group, add an inbound rule for the MSK security group allowing port 9098. To see how to manage a Redshift cluster security group, refer to Managing VPC security groups for a cluster.
- And, in the Amazon MSK cluster’s security group, add an inbound rule allowing port 9098 for the leader node IP address of your Amazon Redshift cluster, as shown in the following diagram. You can find the IP address of your Amazon Redshift cluster’s leader node on the Properties tab of the Amazon Redshift cluster in the AWS Management Console.
Walkthrough
Navigate to the Amazon Redshift service in the AWS Management Console, then set up Amazon Redshift streaming ingestion for Amazon MSK by performing the following steps:
- Set enable_case_sensitive_identifier to true – If you are using the default parameter group for the Amazon Redshift cluster, you won’t be able to set enable_case_sensitive_identifier to true. You can create a new parameter group with enable_case_sensitive_identifier set to true and attach it to the Amazon Redshift cluster. After you modify parameter values, you must reboot any clusters that are associated with the modified parameter group. It may take a few minutes for the Amazon Redshift cluster to reboot.
This configuration value determines whether name identifiers of databases, tables, and columns are case sensitive. Once done, open a new Amazon Redshift Query Editor V2 session so that the configuration changes we made are reflected, then follow the next steps.
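As an optional sanity check, you can confirm the setting is in effect in the new Query Editor V2 session:

```sql
-- Should report the parameter as enabled once the new parameter group
-- is attached and the cluster has rebooted
SHOW enable_case_sensitive_identifier;
```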
- Create an external schema that maps to the streaming data source.
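A minimal sketch of the statement, assuming an external schema named mysql_debezium and placeholder IAM role and MSK cluster ARNs (replace them with your own):

```sql
CREATE EXTERNAL SCHEMA "mysql_debezium"
FROM MSK
IAM_ROLE 'arn:aws:iam::xxxxxxxxxxxx:role/redshift-streaming-msk-role'
AUTHENTICATION iam
CLUSTER_ARN 'arn:aws:kafka:us-east-1:xxxxxxxxxxxx:cluster/xxxxxxx/xxxxxxxx-xxxx-xxxx';
```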
Once done, verify that you can see the tables created from the MSK topics under the new external schema:
- Create a materialized view that references the external schema.
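A sketch of the materialized view definition, assuming the external schema above and the Debezium default topic naming of <server>.<database>.<table> with a topic prefix (server name) of salesdb; adjust the topic name to match your connector configuration:

```sql
CREATE MATERIALIZED VIEW customer_debezium AUTO REFRESH YES AS
SELECT
    kafka_partition,
    kafka_offset,
    refresh_time,
    JSON_PARSE(kafka_value) AS payload   -- Debezium change event parsed into a SUPER column
FROM "mysql_debezium"."salesdb.salesdb.CUSTOMER"
WHERE CAN_JSON_PARSE(kafka_value);
```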
Now, you can query the newly created materialized view customer_debezium using the command below.
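For example (the ordering and limit are illustrative):

```sql
SELECT * FROM customer_debezium ORDER BY refresh_time DESC LIMIT 10;
```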
Check that the materialized view is populated with the CDC records.
- REFRESH MATERIALIZED VIEW (optional). This step is optional, as we have already specified AUTO REFRESH YES while creating the materialized view.
NOTE: Because the materialized view above is auto-refreshed, if you don’t see the records immediately, wait a few seconds and rerun the SELECT statement. Amazon Redshift streaming ingestion views also come with the option of a manual refresh, which allows you to refresh the object on demand. You can use the following query to pull streaming data into the Redshift object immediately.
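For the materialized view created above, that is:

```sql
REFRESH MATERIALIZED VIEW customer_debezium;
```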
Process CDC records in Amazon Redshift
In the following steps, we create a staging table to hold the CDC data, a target table that holds the latest snapshot, and a stored procedure to process the CDC records and update the target table.
- Create staging table: The staging table is a temporary table that holds all of the data that will be used to make changes to the target table, including both updates and inserts.
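A sketch of the staging table DDL. The customer_name and market_segment columns are illustrative stand-ins for the CUSTOMER table’s columns (adjust them to your source schema), and the table is created here as a regular table that the stored procedure truncates on each run:

```sql
CREATE TABLE public.customer_stg (
    customer_id          VARCHAR(100) ENCODE lzo,
    customer_name        VARCHAR(100) ENCODE lzo,
    market_segment       VARCHAR(100) ENCODE lzo,
    op                   VARCHAR(5)   ENCODE lzo,  -- Debezium operation: c, u, d, or r
    kafka_offset         BIGINT,                   -- used to pick the latest event per customer
    record_refresh_time  TIMESTAMP
)
DISTSTYLE KEY
DISTKEY (customer_id)
SORTKEY (customer_id);
```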
- Create target table: We use the customer_target table to load the processed CDC events.
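A matching sketch for the target table, which keeps only the latest version of each customer row:

```sql
CREATE TABLE public.customer_target (
    customer_id     VARCHAR(100) ENCODE lzo,
    customer_name   VARCHAR(100) ENCODE lzo,
    market_segment  VARCHAR(100) ENCODE lzo,
    refresh_time    TIMESTAMP
)
DISTSTYLE KEY
DISTKEY (customer_id)
SORTKEY (customer_id);
```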
- Create the debezium_last_extract table and insert a dummy value: We need to store the timestamp of the last extracted CDC events. We use the debezium_last_extract table for this purpose. For the initial record, we insert a dummy value, which enables us to perform a comparison between the current and next CDC processing timestamps.
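A sketch of the watermark table and its dummy seed row (the process_name value and the seed timestamp are arbitrary placeholders):

```sql
CREATE TABLE public.debezium_last_extract (
    process_name         VARCHAR(100) ENCODE lzo,
    latest_refresh_time  TIMESTAMP
);

-- Seed with a dummy timestamp so the first comparison in the stored procedure succeeds
INSERT INTO public.debezium_last_extract VALUES ('customer_sync', '1983-01-01 00:00:00');

SELECT * FROM public.debezium_last_extract;
```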
- Create stored procedure: This stored procedure processes the CDC records and updates the target table with the latest changes.
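The following is a minimal sketch of what such a procedure could look like, built on the illustrative tables and materialized view defined above. It assumes the Debezium JSON converter writes the event fields (before, after, op) at the top level of the parsed kafka_value, and that enable_case_sensitive_identifier is true so the quoted upper-case column names resolve; the exact procedure for your workload may differ:

```sql
CREATE OR REPLACE PROCEDURE public.incremental_sync_customer()
AS $$
DECLARE
    last_extract_time TIMESTAMP;
    staged_rows       BIGINT;
BEGIN
    -- High-water mark of the previous run
    SELECT latest_refresh_time INTO last_extract_time
    FROM public.debezium_last_extract
    WHERE process_name = 'customer_sync';

    -- Stage only the CDC events that arrived after the previous run,
    -- flattening the Debezium payload stored in the SUPER column
    TRUNCATE public.customer_stg;

    INSERT INTO public.customer_stg
    SELECT
        COALESCE(payload.after."CUST_ID", payload.before."CUST_ID")::VARCHAR AS customer_id,
        payload.after."NAME"::VARCHAR       AS customer_name,
        payload.after."MKTSEGMENT"::VARCHAR AS market_segment,
        payload.op::VARCHAR                 AS op,
        kafka_offset,
        refresh_time                        AS record_refresh_time
    FROM public.customer_debezium
    WHERE refresh_time > last_extract_time;

    SELECT COUNT(*) INTO staged_rows FROM public.customer_stg;

    IF staged_rows > 0 THEN
        -- Remove rows that were changed or deleted, then re-insert the latest version
        DELETE FROM public.customer_target
        USING public.customer_stg
        WHERE customer_target.customer_id = customer_stg.customer_id;

        INSERT INTO public.customer_target
        SELECT customer_id, customer_name, market_segment, record_refresh_time
        FROM (
            SELECT *,
                   ROW_NUMBER() OVER (PARTITION BY customer_id
                                      ORDER BY kafka_offset DESC) AS rn
            FROM public.customer_stg
        ) AS latest_events
        WHERE rn = 1
          AND op IN ('c', 'u', 'r');   -- rows whose latest event is a delete stay removed

        -- Advance the high-water mark for the next run
        UPDATE public.debezium_last_extract
        SET latest_refresh_time = (SELECT MAX(record_refresh_time) FROM public.customer_stg)
        WHERE process_name = 'customer_sync';
    END IF;
END;
$$ LANGUAGE plpgsql;
```

The dedupe on kafka_offset keeps only the newest event per customer within a batch; because Debezium keys events by the table’s primary key, events for the same customer land in the same partition, so the maximum offset identifies the most recent change.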
Test the solution
Update the example salesdb database hosted on Amazon Aurora
- This will be your Amazon Aurora database, and we access it from the Amazon Elastic Compute Cloud (Amazon EC2) instance with Name=KafkaClientInstance.
- Replace the Amazon Aurora endpoint with the value of your Amazon Aurora endpoint in your MySQL client connection command, connect to the database, and then run use salesdb.
- Do an update, insert, and delete in any of the tables created, as shown in the example below. You can also update a record more than once to check the last updated record later in Amazon Redshift.
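For example, statements along these lines against the CUSTOMER table (the values, and the MKTSEGMENT column itself, are illustrative):

```sql
-- Run these from the MySQL client connected to salesdb
UPDATE CUSTOMER SET NAME = 'Customer#Updated003001' WHERE CUST_ID = 3001;
INSERT INTO CUSTOMER (CUST_ID, NAME, MKTSEGMENT) VALUES (8001, 'Customer#000008001', 'AUTOMOBILE');
DELETE FROM CUSTOMER WHERE CUST_ID = 8001;
```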
- Invoke the stored procedure incremental_sync_customer created in the above steps from Amazon Redshift Query Editor v2. You can manually run the procedure using the following command or schedule it.
call incremental_sync_customer();
- Check the target table for the latest changes, as shown in the query below. This step checks the latest values in the target table. You’ll see that all the updates and deletes that you did in the source table are shown at the top as a result of ordering by refresh_time.
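For example:

```sql
SELECT * FROM public.customer_target ORDER BY refresh_time DESC;
```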
Extending the solution
In this solution, we showed CDC processing for the customer table, and you can use the same approach to extend it to other tables in the example salesdb database or add more databases to the MSK Connect configuration property database.include.list.
Our proposed approach can work with any MySQL source supported by the Debezium MySQL source Kafka connector. Similarly, to extend this example to your workloads and use cases, you need to create the staging and target tables according to the schema of the source table. Then you need to update the coalesce(payload.after."CUST_ID",payload.before."CUST_ID")::varchar as customer_id statements with the column names and types in your source and target tables. As in the example stated in this post, we used LZO encoding, which works well for CHAR and VARCHAR columns that store very long character strings. You can use BYTEDICT as well if it matches your use case. Another consideration to keep in mind while creating target and staging tables is choosing a distribution style and key based on the data in the source database. Here we have chosen the distribution style as KEY with customer_id, based on the source data and schema, following the best practices mentioned here.
Cleaning up
- Delete all the Amazon Redshift clusters.
- Delete the Amazon MSK cluster and MSK Connect cluster.
- If you don’t want to delete the Amazon Redshift clusters, you can manually drop the materialized view and tables created during this post using the commands below:
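Matching the illustrative object names used earlier in this post, that would be:

```sql
DROP MATERIALIZED VIEW IF EXISTS customer_debezium;
DROP TABLE IF EXISTS public.customer_stg;
DROP TABLE IF EXISTS public.customer_target;
DROP TABLE IF EXISTS public.debezium_last_extract;
DROP PROCEDURE public.incremental_sync_customer();
DROP SCHEMA IF EXISTS "mysql_debezium";
```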
Also, remove the inbound security rules added to your Amazon Redshift and Amazon MSK clusters, along with the AWS IAM roles created in the Prerequisites section.
Conclusion
In this post, we showed you how Amazon Redshift streaming ingestion provides high-throughput, low-latency ingestion of streaming data from Amazon Kinesis Data Streams and Amazon MSK into an Amazon Redshift materialized view. We increased the speed and reduced the cost of streaming data into Amazon Redshift by eliminating the need for intermediary services.
Furthermore, we also showed how CDC data can be processed rapidly after generation, using a simple SQL interface that enables customers to perform near real-time analytics on a variety of data sources (e.g., Internet of Things [IoT] devices, system telemetry data, or clickstream data) from a busy website or application.
As you explore the options to simplify and enable near real-time analytics for your CDC data, we hope this post provides you with valuable guidance. We welcome any thoughts or questions in the comments section.
About the Authors
Umesh Chaudhari is a Streaming Solutions Architect at AWS. He works with AWS customers to design and build real time data processing systems. He has 13 years of working experience in software engineering including architecting, designing, and developing data analytics systems.
Vishal Khatri is a Sr. Technical Account Manager and Analytics specialist at AWS. Vishal works with State and Local Government helping educate and share best practices with customers by leading and owning the development and delivery of technical content while designing end-to-end customer solutions.