AWS Big Data Blog

Use Amazon MSK Connect and Iceberg Kafka Connect to build a real-time data lake

As analytical workloads increasingly demand real-time insights, organizations need business data to enter the data lake immediately after generation. While various methods exist for real-time CDC data ingestion (such as AWS Glue and Amazon EMR Serverless), Amazon MSK Connect with Iceberg Kafka Connect provides a fully managed, streamlined approach that reduces operational complexity and enables continuous data synchronization.

In this post, we demonstrate how to use Iceberg Kafka Connect with Amazon Managed Streaming for Apache Kafka (Amazon MSK) Connect to accelerate real-time data ingestion into data lakes, simplifying the synchronization process from transactional databases to Apache Iceberg tables.

Solution overview

In this post, we show you how to capture transaction log data from Amazon Relational Database Service (Amazon RDS) for MySQL and write it to Amazon Simple Storage Service (Amazon S3) in Iceberg table format using append mode. We cover both single-table and multi-table synchronization, as shown in the following figure.

Downstream consumers then process these change records to reconstruct the data state before writing to Iceberg tables.

In this solution, you use the Iceberg Kafka Sink Connector to write data on the sink side. The Iceberg Kafka Sink Connector has the following features:

  • Supports exactly-once delivery
  • Supports multi-table synchronization
  • Supports schema changes
  • Supports field name mapping through Iceberg’s column mapping feature

Prerequisites

Before beginning the deployment, ensure you have the following components in place:

Amazon RDS for MySQL: This solution assumes you already have an Amazon RDS for MySQL database instance running with the data you want to synchronize to your Iceberg data lake. Ensure that binary logging is enabled on your RDS instance to support Change Data Capture (CDC) operations.

Amazon MSK Cluster: You need an Amazon MSK cluster provisioned in your target AWS Region. This cluster will serve as the streaming platform between your MySQL database and the Iceberg data lake. Ensure the cluster is properly configured with appropriate security groups and network access.

Amazon S3 Bucket: Ensure you have an Amazon S3 bucket ready to host the custom Kafka Connect plugins. This bucket serves as the storage location from which Amazon MSK Connect retrieves and installs your plugins. The bucket must exist in your target AWS Region, and you must have appropriate permissions to upload objects to it.

Custom Kafka Connect Plugins: To enable real-time data synchronization with MSK Connect, you need to create two custom plugins. The first plugin uses the Debezium MySQL Connector to read transactional logs and produce Change Data Capture (CDC) events. The second plugin uses Iceberg Kafka Connect to synchronize data from Amazon MSK to Apache Iceberg tables.

Build Environment: To build the Iceberg Kafka Connect plugin, you need a build environment with Java and Gradle installed. You can either launch an Amazon EC2 instance (recommended: Amazon Linux 2023 or Ubuntu) or use your local machine if it meets the requirements. Ensure you have sufficient disk space (at least 20GB) and network connectivity to clone the repository and download dependencies.

Build Iceberg Kafka Connect from open source

The connector ZIP archive is created as part of the Iceberg build. You can run the build using the following code:

git clone https://github.com/apache/iceberg.git
cd iceberg/
./gradlew -x test -x integrationTest clean build

The ZIP archive is saved in ./kafka-connect/kafka-connect-runtime/build/distributions.
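
If you script the upload step, it can help to locate the archive programmatically first. The following Python sketch lists any ZIP files under the distributions directory named above. The function name find_connector_zips is hypothetical, and the sketch makes no assumption about the exact archive file names the build produces.

```python
from pathlib import Path

# Relative path taken from the build step above.
DIST_DIR = Path("kafka-connect/kafka-connect-runtime/build/distributions")


def find_connector_zips(repo_root):
    """Return the ZIP archives under the distributions directory, sorted by name."""
    dist = Path(repo_root) / DIST_DIR
    if not dist.is_dir():
        # The build has not run yet, or repo_root points somewhere else.
        return []
    return sorted(p for p in dist.glob("*.zip") if p.is_file())
```

For example, find_connector_zips("iceberg") returns an empty list until the Gradle build has completed in the cloned repository.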

Create custom plugins

The next step is to create custom plugins to read and synchronize the data.

  1. Upload the custom plugin ZIP file you compiled in the previous step to your designated Amazon S3 bucket.
  2. In the AWS Management Console, navigate to Amazon MSK and choose Connect in the navigation pane.
  3. Choose Custom plugins, then select the plugin file you uploaded to S3 by browsing or entering its S3 URI.
  4. Specify a unique, descriptive name for your custom plugin (such as my-connector-v1).
  5. Choose Create custom plugin.
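
If you prefer to automate these console steps, the same plugin can likely be registered through the MSK Connect CreateCustomPlugin API. The following Python sketch only builds the request parameters. The helper name build_custom_plugin_request, the bucket name, and the object key are hypothetical, and the bucket ARN assumes the standard aws partition.

```python
def build_custom_plugin_request(name, bucket, key, description=""):
    """Build keyword arguments shaped like the MSK Connect CreateCustomPlugin request."""
    if not name or not bucket or not key:
        raise ValueError("name, bucket, and key are all required")
    request = {
        "name": name,
        "contentType": "ZIP",  # the connector archive built earlier is a ZIP file
        "location": {
            "s3Location": {
                # Assumes the standard "aws" partition for the bucket ARN.
                "bucketArn": f"arn:aws:s3:::{bucket}",
                "fileKey": key,
            }
        },
    }
    if description:
        request["description"] = description
    return request
```

With boto3 installed and credentials configured, the result could be passed as boto3.client("kafkaconnect").create_custom_plugin(**request). Keeping the request construction separate from the API call makes it easy to review the parameters before anything is created.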

Configure MSK Connect

With the plugins installed, you’re ready to configure MSK Connect.

Configure data source access

Start by configuring data source access.

  1. To create a worker configuration, choose Worker configurations in the MSK Connect console.
  2. Choose Create worker configuration and copy and paste the following configuration.
    key.converter.schemas.enable=false
    value.converter.schemas.enable=false
    key.converter=org.apache.kafka.connect.json.JsonConverter
    value.converter=org.apache.kafka.connect.json.JsonConverter
    # Enable topic creation by the worker
    topic.creation.enable=true
    # Default topic creation settings for debezium connector
    topic.creation.default.replication.factor=3
    topic.creation.default.partitions=1
    topic.creation.default.cleanup.policy=delete
  3. In the Amazon MSK console, choose Connectors under Amazon MSK Connect and choose Create connector.
  4. In the setup wizard, select the Debezium MySQL Connector plugin created in the previous step, enter the connector name and select the MSK cluster of the synchronization target. Copy and paste the following content in the configuration:
    
    connector.class=io.debezium.connector.mysql.MySqlConnector
    tasks.max=1
    include.schema.changes=false
    database.server.id=100000
    database.server.name=
    database.port=3306
    database.hostname=
    database.password=
    database.user=
    
    topic.creation.default.partitions=1
    topic.creation.default.replication.factor=3
    
    topic.prefix=mysqlserver
    database.include.list=
    
    ## route
    transforms=Reroute
    transforms.Reroute.type=io.debezium.transforms.ByLogicalTableRouter
    transforms.Reroute.topic.regex=(.*)(.*)
    transforms.Reroute.topic.replacement=$1all_records
    
    # schema.history
    schema.history.internal.kafka.topic=
    schema.history.internal.kafka.bootstrap.servers=
    # IAM/SASL
    schema.history.internal.consumer.sasl.mechanism=AWS_MSK_IAM
    schema.history.internal.consumer.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
    schema.history.internal.consumer.security.protocol=SASL_SSL
    schema.history.internal.consumer.sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
    schema.history.internal.producer.security.protocol=SASL_SSL
    schema.history.internal.producer.sasl.mechanism=AWS_MSK_IAM
    schema.history.internal.producer.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
    schema.history.internal.producer.sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;

    Note that in this configuration, the Reroute transform is used to write records from multiple tables to the same topic. In the parameter transforms.Reroute.topic.regex, the regular expression selects the table names whose records are written to the same topic. In the following example, data from tables whose names contain <tablename-prefix> is written to the same topic.

    ## route
    transforms=Reroute
    transforms.Reroute.type=io.debezium.transforms.ByLogicalTableRouter
    transforms.Reroute.topic.regex=(.*)(.*)
    transforms.Reroute.topic.replacement=$1all_records

    For example, after transforms.Reroute.topic.replacement is specified as $1all_records, the topic name created in Amazon MSK is <database.server.name>.all_records.

  5. After you choose Create, MSK Connect creates a synchronization task for you.
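
To see how a topic.regex and topic.replacement pair rewrites topic names, the following Python sketch emulates the substitution. It is not Debezium's implementation. It assumes a topic is rerouted only when the whole name matches the expression, it translates Java-style $1 group references into Python's syntax, and the table prefix tb_customer and the topic names used in the example are hypothetical.

```python
import re


def reroute_topic(topic, topic_regex, replacement):
    """Return the rerouted topic name, or the original name if the regex does not match."""
    match = re.fullmatch(topic_regex, topic)
    if match is None:
        return topic
    # Convert Java-style group references ($1) into Python's \g<1> form.
    python_replacement = re.sub(r"\$(\d+)", r"\\g<\1>", replacement)
    return match.expand(python_replacement)


# Hypothetical topics: two tables share the prefix, one does not.
for name in (
    "mysqlserver.salesdb.tb_customer_info",
    "mysqlserver.salesdb.tb_customer_address",
    "mysqlserver.salesdb.orders",
):
    print(name, "->", reroute_topic(name, r"(.*)tb_customer(.*)", "$1all_records"))
```

In this sketch, both tb_customer tables map to the same all_records topic, while the orders topic keeps its original name because the expression does not match it.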

Data synchronization (single table mode)

Now, you can create a real-time synchronization task for the Iceberg table. Start by creating a real-time synchronization job for a single table.

  1. In the Amazon MSK console, choose Connectors under MSK Connect.
  2. Choose Create connector.
  3. On the next page, select the previously created Iceberg Kafka Connect plugin.
  4. Enter the connector name and select the MSK cluster of the synchronization target.
  5. Paste the following code in the configuration.
    
    connector.class=org.apache.iceberg.connect.IcebergSinkConnector
    tasks.max=1
    topics=
    iceberg.tables=
    iceberg.catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog
    iceberg.catalog.warehouse=
    iceberg.catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO
    iceberg.catalog.client.region=
    iceberg.tables.auto-create-enabled=true
    iceberg.tables.evolve-schema-enabled=true
    iceberg.control.commit.interval-ms=120000
    transforms=debezium
    transforms.debezium.type=org.apache.iceberg.connect.transforms.DebeziumTransform
    key.converter=org.apache.kafka.connect.json.JsonConverter
    value.converter=org.apache.kafka.connect.json.JsonConverter
    value.converter.schemas.enable=false
    key.converter.schemas.enable=false
    iceberg.control.topic=control-iceberg

    By default, the Iceberg connector creates a topic named control-iceberg to record offsets. Select the previously created worker configuration that includes topic.creation.enable=true. If you use the default worker configuration and automatic topic creation isn’t enabled at the MSK broker level, the connector can’t create this topic automatically.

    You can also specify a different topic name by setting the parameter iceberg.control.topic=<offset-topic>. If you want to use a custom topic, you can create it with the following command.

    $KAFKA_HOME/bin/kafka-topics.sh --bootstrap-server $MYBROKERS --create --topic <my-iceberg-offset-topic> --partitions 3 --replication-factor 2 --config cleanup.policy=compact
  6. Query the synchronized data through Amazon Athena. In the synchronized table, you can see that, in addition to the source table fields, a _cdc field has been added to store the CDC metadata.
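
The connector configurations in this post leave several values empty for you to fill in (for example topics, iceberg.tables, and iceberg.catalog.warehouse). As a minimal sketch, the following Python helpers parse a properties-style configuration and report keys that still have no value before you paste it into the console. The function names are hypothetical, and the parser only handles simple key=value lines and # comments.

```python
def parse_properties(text):
    """Parse simple key=value lines, skipping blank lines and # comments."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        # Split on the first "=" only, so values may themselves contain "=".
        key, sep, value = line.partition("=")
        props[key.strip()] = value.strip() if sep else None
    return props


def unfilled_keys(props):
    """Return keys whose value is empty or that had no '=' at all."""
    return [key for key, value in props.items() if not value]


sample = """
connector.class=org.apache.iceberg.connect.IcebergSinkConnector
topics=
iceberg.tables=
iceberg.control.topic=control-iceberg
"""
print(unfilled_keys(parse_properties(sample)))
```

Running the check on the sample reports topics and iceberg.tables as the entries that still need values.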

Compaction

Compaction is an essential maintenance operation for Iceberg tables. Although frequent ingestion of small files can negatively impact query performance, regular compaction mitigates this issue by consolidating small files, minimizing metadata overhead, and substantially improving query efficiency. To maintain optimal table performance, you should implement dedicated compaction workflows. AWS Glue offers an excellent solution for this purpose, providing automated compaction capabilities that intelligently merge small files and restructure table layouts for enhanced query performance.
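
To illustrate why consolidating small files helps, the following Python sketch groups file sizes into batches that each stay at or below a target size, which is the basic idea behind bin-pack style compaction. It is a simplified greedy illustration, not the algorithm used by Iceberg or AWS Glue. The function name, the file sizes, and the target size are all hypothetical.

```python
def plan_bin_pack(file_sizes_mb, target_mb=512):
    """Greedily group file sizes so each group stays at or below target_mb where possible."""
    groups, current, current_size = [], [], 0
    for size in sorted(file_sizes_mb, reverse=True):
        # Close the current group when adding this file would exceed the target.
        if current and current_size + size > target_mb:
            groups.append(current)
            current, current_size = [], 0
        current.append(size)
        current_size += size
    if current:
        groups.append(current)
    return groups


# Hypothetical example: four data files rewritten as two larger files.
print(plan_bin_pack([100, 200, 300, 50], target_mb=400))
```

Each group in the result would become one rewritten file, so fewer files and less metadata need to be read at query time.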

Schema Evolution Demonstration

To demonstrate the schema evolution capabilities of this solution, we conducted a test to show how field changes at the source database are automatically synchronized to the Iceberg tables through MSK Connect and Iceberg Kafka Connect.

Initial Setup:

First, we created an RDS MySQL database with a customer information table (tb_customer_info) containing the following schema:

+----------------+--------------+------+-----+-------------------+-----------------------------------------------+
| Field          | Type         | Null | Key | Default           | Extra                                         |
+----------------+--------------+------+-----+-------------------+-----------------------------------------------+
| id             | int unsigned | NO   | PRI | NULL              | auto_increment                                |
| user_name      | varchar(64)  | YES  |     | NULL              |                                               |
| country        | varchar(64)  | YES  |     | NULL              |                                               |
| province       | mediumtext   | NO   |     | NULL              |                                               |
| city           | int          | NO   |     | NULL              |                                               |
| street_address | varchar(20)  | NO   |     | NULL              |                                               |
| street_name    | varchar(20)  | NO   |     | NULL              |                                               |
| created_at     | timestamp    | NO   |     | CURRENT_TIMESTAMP | DEFAULT_GENERATED                             |
| updated_at     | timestamp    | YES  |     | CURRENT_TIMESTAMP | DEFAULT_GENERATED on update CURRENT_TIMESTAMP |
+----------------+--------------+------+-----+-------------------+-----------------------------------------------+

We then configured MSK Connect using the Debezium MySQL Connector to capture changes from this table and stream them to Amazon MSK in real time. Following that, we set up Iceberg Kafka Connect to consume the data from MSK and write it to Iceberg tables.

Schema Modification Test:

To test the schema evolution capability, we added a new field named phone to the source table:

ALTER TABLE tb_customer_info ADD COLUMN phone VARCHAR(20) NULL;

We then inserted a new record with the phone field populated:

INSERT INTO tb_customer_info (user_name,country,province,city,street_address,street_name,phone) values ('user_demo','China','Guangdong',755,'Street1 No.369','Street1','13099990001');

Results:

When we queried the Iceberg table in Amazon Athena, we observed that the phone field had been automatically added as the last column, and the new record was successfully synchronized with all field values intact. This demonstrates that Iceberg Kafka Connect’s self-adaptive schema capability seamlessly handles DDL changes at the source, eliminating the need for manual schema updates in the data lake.
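
The behavior observed in this test can be pictured with a short Python sketch: when an incoming record carries a field that the table doesn't have yet, the field is appended after the existing columns. This is only an illustration of the observed result, not the connector's implementation, and the function name evolve_columns is hypothetical.

```python
def evolve_columns(columns, record):
    """Return the column list with any new record fields appended at the end."""
    new_fields = [field for field in record if field not in columns]
    return list(columns) + new_fields


# Columns before the ALTER TABLE statement, abbreviated from the table above.
columns = ["id", "user_name", "country", "created_at", "updated_at"]
# A record produced after the phone column was added at the source.
record = {"id": 2, "user_name": "user_demo", "country": "China", "phone": "13099990001"}
print(evolve_columns(columns, record))
```

Existing columns keep their positions, which matches the observation that phone appeared as the last column in Athena.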

Data synchronization (multi-table mode)

Data admins often want to use a single connector to move data from multiple tables. For example, you can use the CDC collection tool to write data from multiple tables to one topic and then, on the consumer side, write data from that topic to multiple Iceberg tables. In Configure data source access, you configured a MySQL synchronization connector that uses the Reroute transform to synchronize tables matching specified rules to one topic. Now let’s review how to distribute data from this topic to multiple Iceberg tables.

  1. When using Iceberg Kafka Connect to synchronize multiple tables to Iceberg tables using AWS Glue Data Catalog, you must pre-create a database in the Data Catalog before starting the synchronization process. The database name in AWS Glue must exactly match the source database name, because the Iceberg Kafka Connect connector automatically uses the source database name as the target database name during multi-table synchronization. This naming consistency is required because the connector doesn’t provide an option to map source database names to different target database names in multi-table scenarios.
  2. If you want to use a custom topic name, you can create a new topic to store the MSK Connect record offsets. For details, see Data synchronization (single table mode).
  3. In the Amazon MSK console, create another connector using the following configuration.
    connector.class=org.apache.iceberg.connect.IcebergSinkConnector
    tasks.max=2
    topics=
    iceberg.catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog
    iceberg.catalog.warehouse=
    iceberg.catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO
    iceberg.catalog.client.region=
    iceberg.tables.auto-create-enabled=true
    iceberg.tables.evolve-schema-enabled=true
    iceberg.control.commit.interval-ms=120000
    transforms=debezium
    transforms.debezium.type=org.apache.iceberg.connect.transforms.DebeziumTransform
    iceberg.tables.route-field=_cdc.source
    iceberg.tables.dynamic-enabled=true
    key.converter=org.apache.kafka.connect.json.JsonConverter
    value.converter=org.apache.kafka.connect.json.JsonConverter
    value.converter.schemas.enable=false
    key.converter.schemas.enable=false
    iceberg.control.topic=control-iceberg

    In this configuration, two parameters have been added:

    • iceberg.tables.route-field: Specifies the routing field that distinguishes between different tables. For CDC data parsed by Debezium, set it to _cdc.source
    • iceberg.tables.dynamic-enabled: If the iceberg.tables parameter isn’t set, it must be specified as true here
  4. After completion, MSK Connect creates a sink connector for you.
  5. After the process is complete, you can view the newly created table through Athena.
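
The dynamic routing described in this section can be pictured as reading a nested field from each record and using its value as the target table name. The following Python sketch shows that idea only. It is not the connector's code, the function names are hypothetical, and the sample values stored under _cdc.source are assumed for illustration.

```python
from collections import defaultdict


def get_nested(record, dotted_path):
    """Follow a dotted path such as '_cdc.source' through nested dictionaries."""
    value = record
    for part in dotted_path.split("."):
        if not isinstance(value, dict) or part not in value:
            return None
        value = value[part]
    return value


def route_records(records, route_field="_cdc.source"):
    """Group records by the route field value; collect records without one separately."""
    routed, skipped = defaultdict(list), []
    for record in records:
        target = get_nested(record, route_field)
        if target is None:
            skipped.append(record)
        else:
            routed[str(target)].append(record)
    return dict(routed), skipped
```

For example, records whose _cdc.source values are salesdb.tb_customer_info and salesdb.tb_orders would be grouped under two different table names, even though they arrived on the same topic.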

Other tips

In this section, we share some more things that you can use to customize your deployment to fit your use case.

  • Specified table synchronization
    In the Data synchronization (multi-table mode) section, you specified iceberg.tables.route-field=_cdc.source and iceberg.tables.dynamic-enabled=true. Together, these two settings write data from multiple source tables to their corresponding Iceberg tables. If you want to synchronize only specific tables, set iceberg.tables.dynamic-enabled=false and list the tables you want to synchronize in the iceberg.tables parameter. For example:
    iceberg.tables.dynamic-enabled = false
    iceberg.tables = default.tablename1,default.tablename2
     
    iceberg.table.default.tablename1.route-regex = tablename1
    iceberg.table.default.tablename2.route-regex = tablename2
  • Performance Testing Results
    We conducted a performance test using sysbench to evaluate the data synchronization capabilities of this solution. The test simulated a high-volume write scenario to demonstrate the system’s throughput and scalability.

    Test Configuration:

    1. Database setup: Created 25 tables in the MySQL database using sysbench
    2. Data loading: Wrote 20 million records to each table (500 million total records)
    3. Real-time streaming: Configured MSK Connect to stream data from MySQL to Amazon MSK in real time during the write process
    4. Kafka Connect configuration:
      • Started Iceberg Kafka Connect
      • Minimum workers: 1
      • Maximum workers: 8
      • Allocated two MCUs per worker

    Performance Results:

    In our test using the configuration above, each MCU achieved peak writing performance of approximately 10,000 records per second, as shown in the following figure. This demonstrates the solution’s ability to handle high-throughput data synchronization workloads effectively.
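
As a rough back-of-the-envelope check on these numbers, the following Python sketch estimates how long it would take to write a given number of records if every MCU sustained the observed peak rate. It assumes throughput scales linearly with the number of MCUs, which this test does not establish, so treat the result as an optimistic lower bound on the time rather than a measured value. The function name is hypothetical.

```python
def estimated_write_seconds(total_records, records_per_sec_per_mcu, workers, mcus_per_worker):
    """Estimate the write time in seconds, assuming linear scaling across all MCUs."""
    if min(total_records, records_per_sec_per_mcu, workers, mcus_per_worker) <= 0:
        raise ValueError("all inputs must be positive")
    aggregate_rate = records_per_sec_per_mcu * workers * mcus_per_worker
    return total_records / aggregate_rate


# Figures from the test above: 500 million records, about 10,000 records per
# second per MCU, and at most 8 workers with 2 MCUs each.
seconds = estimated_write_seconds(500_000_000, 10_000, 8, 2)
print(f"{seconds:.0f} seconds, about {seconds / 60:.0f} minutes")
```

Under these assumptions, the estimate is 3,125 seconds, or roughly 52 minutes, at the maximum worker count.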

Clean up

To clean up your resources, complete the following steps:

  1. Delete MSK Connect connectors: Remove both the Debezium MySQL Connector and Iceberg Kafka Connect connector created for this solution.
  2. Delete the Amazon MSK cluster: If you created a new MSK cluster specifically for this demonstration, delete it to stop incurring charges.
  3. Delete the S3 buckets: Remove the S3 buckets used to store the custom Kafka Connect plugins and Iceberg table data. Ensure you have backed up any data you need before deletion.
  4. Delete the EC2 instance: If you launched an EC2 instance to build the Iceberg Kafka Connect plugin, terminate it.
  5. Delete the RDS MySQL instance (optional): If you created a new RDS instance specifically for this demonstration, delete it. If you’re using an existing production database, skip this step.
  6. Remove IAM roles and policies (if created): Delete any IAM roles and policies that were created specifically for this solution to maintain security best practices.

Conclusion

In this post, we presented a solution to achieve real-time, efficient data synchronization from transactional databases to data lakes using Amazon MSK Connect and Iceberg Kafka Connect. This solution provides a low-cost and efficient data synchronization paradigm for enterprise-level big data analysis. Whether you’re working with ecommerce transactions, financial transactions, or IoT device logs, this solution can help you achieve quick access to a data lake, enabling analytical businesses to quickly obtain the latest business data. We encourage you to try this solution in your own environment and share your experiences in the comments section. For more information, visit Amazon MSK Connect.


About the author

Huang Xiao

Huang is a Senior Specialist Solution Architect with Analytics at AWS. He focuses on big data solution architecture design, with years of experience in development and architectural design within the big data field.