AWS Big Data Blog

Stream data with Amazon MSK Connect using an open-source JDBC connector

Customers are adopting Amazon Managed Streaming for Apache Kafka (Amazon MSK) as a fast and reliable streaming platform to build their enterprise data hub. In addition to streaming capabilities, setting up Amazon MSK enables organizations to use a pub/sub model for data distribution with loosely coupled and independent components.

To publish and distribute data between Apache Kafka clusters and other external systems, including search indexes, databases, and file systems, you’re required to set up Apache Kafka Connect, the open-source component of the Apache Kafka framework, to host and run connectors for moving data between various systems. As the number of upstream and downstream applications grows, so does the complexity of managing, scaling, and administering the Apache Kafka Connect clusters. To address these scalability and manageability concerns, Amazon MSK Connect provides the functionality to deploy fully managed connectors built for Apache Kafka Connect, with the capability to automatically scale to adjust to workload changes, while you pay only for the resources consumed.

In this post, we walk through a solution to stream data from Amazon Relational Database Service (Amazon RDS) for MySQL to an MSK cluster in real time by configuring and deploying a connector using Amazon MSK Connect.

Solution overview

For our use case, an enterprise wants to build a centralized data repository with multiple producer and consumer applications. To support streaming data from applications with different tools and technologies, Amazon MSK is selected as the streaming platform. One of the primary applications currently writes data to Amazon RDS for MySQL and would require major design changes to publish data to MSK topics and write to the database at the same time. Therefore, to minimize design changes, this application continues writing data to Amazon RDS for MySQL, with the additional requirement of synchronizing this data with Amazon MSK, the centralized streaming platform, to enable real-time analytics for multiple downstream consumers.

To solve this use case, we propose the following architecture that uses Amazon MSK Connect, a feature of Amazon MSK, to set up a fully managed Apache Kafka Connect connector for moving data from Amazon RDS for MySQL to an MSK cluster using the open-source JDBC connector from Confluent.

Set up the AWS environment

To set up this solution, you need to create a few AWS resources. The AWS CloudFormation template provided in this post creates all the AWS resources required as prerequisites.

The following table lists the parameters you must provide for the template.

| Parameter Name | Description | Keep Default Value |
| --- | --- | --- |
| Stack name | Name of CloudFormation stack. | No |
| DBInstanceID | Name of RDS for MySQL instance. | No |
| DBName | Database name to store sample data for streaming. | Yes |
| DBInstanceClass | Instance type for RDS for MySQL instance. | No |
| DBAllocatedStorage | Allocated size for DB instance (GiB). | No |
| DBUsername | Database user for MySQL database access. | No |
| DBPassword | Password for MySQL database access. | No |
| JDBCConnectorPluginBukcetName | Bucket for storing MSK Connect connector JAR files and plugin. | No |
| ClientIPCIDR | IP address of client machine to connect to EC2 instance. | No |
| EC2KeyPair | Key pair to be used in your EC2 instance. This EC2 instance will be used as a proxy to connect from your local machine to the EC2 client instance. | No |
| EC2ClientImageId | Latest AMI ID of Amazon Linux 2. You can keep the default value for this post. | Yes |
| VpcCIDR | IP range (CIDR notation) for this VPC. | No |
| PrivateSubnetOneCIDR | IP range (CIDR notation) for the private subnet in the first Availability Zone. | No |
| PrivateSubnetTwoCIDR | IP range (CIDR notation) for the private subnet in the second Availability Zone. | No |
| PrivateSubnetThreeCIDR | IP range (CIDR notation) for the private subnet in the third Availability Zone. | No |
| PublicSubnetCIDR | IP range (CIDR notation) for the public subnet. | No |

To launch the CloudFormation stack, choose Launch Stack.
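If you prefer the AWS CLI, the stack can also be launched from a local copy of the template. The following is a minimal sketch, assuming the template is saved locally as msk-connect-template.yaml (a hypothetical file name), the stack name msk-connect-rds-jdbc used elsewhere in this post, and default values for the remaining parameters listed in the preceding table:

    $ aws cloudformation create-stack \
        --stack-name msk-connect-rds-jdbc \
        --template-body file://msk-connect-template.yaml \
        --capabilities CAPABILITY_IAM CAPABILITY_NAMED_IAM \
        --parameters ParameterKey=DBUsername,ParameterValue=admin \
                     ParameterKey=DBPassword,ParameterValue=<password> \
                     ParameterKey=EC2KeyPair,ParameterValue=<key_pair_name> \
                     ParameterKey=ClientIPCIDR,ParameterValue=<client_ip>/32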

After the CloudFormation stack creation is complete and the resources are created, the stack’s Outputs tab shows the resource details.

Validate sample data in the RDS for MySQL instance

To prepare the sample data for this use case, complete the following steps:

  1. SSH into the EC2 client instance MSKEC2Client using the following command from your local terminal:
    ssh -i <keypair> <user>@<hostname>
  2. Run the following commands to validate the data has been loaded successfully:
    $ mysql -h <rds_instance_endpoint_name> -u <user_name> -p
    
    MySQL [(none)]> use dms_sample;
    
    MySQL [dms_sample]> select mlb_id, mlb_name, mlb_pos, mlb_team_long, bats, throws from mlb_data limit 5;
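Optionally, note the row counts of a few tables now so that you can cross-check them later against the records streamed into the MSK topics; for example:

    MySQL [dms_sample]> select count(*) from mlb_data;
    
    MySQL [dms_sample]> select count(*) from sport_team;
    
    MySQL [dms_sample]> select count(*) from player;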

Synchronize all tables’ data from Amazon RDS to Amazon MSK

To sync all tables from Amazon RDS to Amazon MSK, create an Amazon MSK Connect managed connector with the following steps:

  1. On the Amazon MSK console, choose Custom plugins in the navigation pane under MSK Connect.
  2. Choose Create custom plugin.
  3. For S3 URI – Custom plugin object, browse to the ZIP file named confluentinc-kafka-connect-jdbc-plugin.zip (created by the CloudFormation template) for the JDBC connector in the S3 bucket bkt-msk-connect-plugins-<aws_account_id>.
  4. For Custom plugin name, enter msk-confluent-jdbc-plugin-v1.
  5. Enter an optional description.
  6. Choose Create custom plugin.

After the custom plugin has been successfully created, it will be available in Active status.
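If you prefer to script this step, the plugin can also be registered with the AWS CLI. The following is a minimal sketch, assuming the ZIP object created by the CloudFormation template sits at the root of the plugin bucket:

    $ aws kafkaconnect create-custom-plugin \
        --name msk-confluent-jdbc-plugin-v1 \
        --content-type ZIP \
        --location 's3Location={bucketArn=arn:aws:s3:::bkt-msk-connect-plugins-<aws_account_id>,fileKey=confluentinc-kafka-connect-jdbc-plugin.zip}'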

  1. Choose Connectors in the navigation pane under MSK Connect.
  2. Choose Create connector.
  3. Select Use existing custom plugin and under Custom plugins, select the plugin msk-confluent-jdbc-plugin-v1 that you created earlier.
  4. Choose Next.
  5. For Connector name, enter msk-jdbc-connector-rds-to-msk.
  6. Enter an optional description.
  7. For Cluster type, select MSK cluster.
  8. For MSK clusters, select the cluster you created earlier.
  9. For Authentication, choose IAM.
  10. Under Connector configurations, enter the following settings:
    ### CONNECTOR SPECIFIC SETTINGS
    ### Provide the configuration properties to connect to source and destination endpoints including authentication
    ### mechanism, credentials and task details such as polling interval, source and destination object names, data
    ### transfer mode, parallelism
    ### Many of these properties are connector and end-point specific, so please review the connector documentation
    ### for more details
    connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
    connection.user=admin
    connection.url=jdbc:mysql://<rds_instance_endpoint_name>:3306/dms_sample
    connection.password=XXXXX
    tasks.max=1
    poll.interval.ms=300000
    topic.prefix=rds-to-msk-
    mode=bulk
    connection.attempts=1
    ### CONVERTING KAFKA MESSAGE BYTES TO JSON
    value.converter=org.apache.kafka.connect.json.JsonConverter
    key.converter=org.apache.kafka.connect.storage.StringConverter
    value.converter.schemas.enable=false
    key.converter.schemas.enable=false
    ### GENERIC AUTHENTICATION SETTINGS FOR KAFKA CONNECT
    security.protocol=SASL_SSL
    sasl.mechanism=AWS_MSK_IAM
    ssl.truststore.location=~/kafka.truststore.jks
    ssl.keystore.location=~/kafka.client.keystore.jks
    sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
    sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;

The following table provides a brief summary of all the preceding configuration options.

| Configuration Option | Description |
| --- | --- |
| connector.class | Java class for the connector |
| connection.user | User name to authenticate with the MySQL endpoint |
| connection.url | JDBC URL identifying the hostname and port number for the MySQL endpoint |
| connection.password | Password to authenticate with the MySQL endpoint |
| tasks.max | Maximum number of tasks to be launched for this connector |
| poll.interval.ms | Time interval in milliseconds between subsequent polls for each table to pull new data |
| topic.prefix | Custom prefix value to prepend to each table name when creating topics in the MSK cluster |
| mode | The operation mode for each poll, such as bulk, timestamp, incrementing, or timestamp+incrementing |
| connection.attempts | Maximum number of retries for the JDBC connection |
| security.protocol | Sets up TLS for encryption |
| sasl.mechanism | Identifies the SASL mechanism to use |
| ssl.truststore.location | Location for storing trusted certificates |
| ssl.keystore.location | Location for storing private keys |
| sasl.client.callback.handler.class | Encapsulates constructing a SigV4 signature based on extracted credentials |
| sasl.jaas.config | Binds the SASL client implementation |
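This walkthrough uses mode=bulk, which re-reads the entire table on every poll. If your source tables have an auto-incrementing key column and a last-modified timestamp column, the connector also supports incremental modes. The following is a minimal sketch with hypothetical column names (id and updated_at) that are not part of the sample schema:

    ### INCREMENTAL ALTERNATIVE (SKETCH)
    mode=timestamp+incrementing
    incrementing.column.name=id
    timestamp.column.name=updated_at
    ### Poll more frequently because only new or changed rows are fetched
    poll.interval.ms=60000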

  1. In the Connector capacity section, select Autoscaled for Capacity type and keep the default value of 1 for MCU count per worker.
  2. Set 4 for Maximum number of workers and keep all other default values for Workers and Autoscaling utilization thresholds.
  3. For Worker configuration, select Use the MSK default configuration.
  4. Under Access permissions, choose the custom IAM role msk-connect-rds-jdbc-MSKConnectServiceIAMRole-* created earlier.
  5. For Log delivery, select Deliver to Amazon CloudWatch Logs.
  6. For Log group, choose the log group msk-jdbc-source-connector created earlier.
  7. Choose Next.
  8. Under Review and Create, validate all the settings and choose Create connector.

After the connector has transitioned to RUNNING status, the data should start flowing from the RDS instance to the MSK cluster.
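You can watch this transition on the console or query it with the AWS CLI; a quick sketch, where the connector ARN is shown on the connector’s detail page:

    $ aws kafkaconnect list-connectors --connector-name-prefix msk-jdbc-connector-rds-to-msk
    
    $ aws kafkaconnect describe-connector --connector-arn <connector_arn>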

Validate the data

To validate and compare the data, complete the following steps:

  1. SSH into the EC2 client instance MSKEC2Client using the following command from your local terminal:
    ssh -i <keypair> <user>@<hostname>
  2. To connect to the MSK cluster with IAM authentication, add the latest version of the aws-msk-iam-auth JAR file to the class path:
    $ export CLASSPATH=/home/ec2-user/aws-msk-iam-auth-1.1.0-all.jar
  3. On the Amazon MSK console, choose Clusters in the navigation pane and choose the cluster MSKConnect-msk-connect-rds-jdbc.
  4. On the Cluster summary page, choose View client information.
  5. In the View client information section, under Bootstrap servers, copy the private endpoint for Authentication type IAM.

  1. Set up additional environment variables for working with the Apache Kafka installation and for connecting to the Amazon MSK bootstrap servers, where <bootstrap servers> is the list of bootstrap servers that allows connecting to the MSK cluster with IAM authentication:
    $ export PATH=~/kafka/bin:$PATH
    
    $ cp ~/aws-msk-iam-auth-1.1.0-all.jar ~/kafka/libs/.
    
    $ export BOOTSTRAP_SERVERS=<bootstrap servers>
  2. Set up a config file named client.properties to be used for authentication:
    $ cd /home/ec2-user/kafka/config/
    
    $ vi client.properties
    
    # Sets up TLS for encryption and SASL for authN.
    security.protocol = SASL_SSL
    # Identifies the SASL mechanism to use.
    sasl.mechanism = AWS_MSK_IAM
    # Binds SASL client implementation.
    sasl.jaas.config = software.amazon.msk.auth.iam.IAMLoginModule required;
    # Encapsulates constructing a SigV4 signature based on extracted credentials.
    # The SASL client bound by "sasl.jaas.config" invokes this class.
    sasl.client.callback.handler.class = software.amazon.msk.auth.iam.IAMClientCallbackHandler
  3. Validate the list of topics created in the MSK cluster:
    $ cd /home/ec2-user/kafka/
    
    $ bin/kafka-topics.sh --list --bootstrap-server $BOOTSTRAP_SERVERS --command-config /home/ec2-user/kafka/config/client.properties

  1. Validate that data has been loaded to the topics in the MSK cluster:
    $ bin/kafka-console-consumer.sh --topic rds-to-msk-seat --from-beginning --bootstrap-server $BOOTSTRAP_SERVERS --consumer.config /home/ec2-user/kafka/config/client.properties
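Because the connector runs in bulk mode, the topic can accumulate repeated snapshots of the table over time. To limit the output while validating, you can cap the number of records read, for example:

    $ bin/kafka-console-consumer.sh --topic rds-to-msk-seat --from-beginning --max-messages 10 --bootstrap-server $BOOTSTRAP_SERVERS --consumer.config /home/ec2-user/kafka/config/client.properties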

Synchronize data using a query to Amazon RDS and write to Amazon MSK

To synchronize the results of a query that flattens data by joining multiple tables in Amazon RDS for MySQL, create an Amazon MSK Connect managed connector with the following steps:

  1. On the Amazon MSK console, choose Connectors in the navigation pane under MSK Connect.
  2. Choose Create connector.
  3. Select Use existing custom plugin and under Custom plugins, select the plugin msk-confluent-jdbc-plugin-v1 that you created earlier.
  4. For Connector name, enter msk-jdbc-connector-rds-to-msk-query.
  5. Enter an optional description.
  6. For Cluster type, select MSK cluster.
  7. For MSK clusters, select the cluster you created earlier.
  8. For Authentication, choose IAM.
  9. Under Connector configurations, enter the following settings:
    ### CONNECTOR SPECIFIC SETTINGS
    connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
    connection.user=admin
    connection.url=jdbc:mysql://<rds_instance_endpoint_name>:3306/dms_sample
    connection.password=XXXXX
    tasks.max=1
    poll.interval.ms=300000
    topic.prefix=rds-to-msk-query-topic
    mode=bulk
    connection.attempts=1
    query=select last_name, name as team_name, sport_type_name, sport_league_short_name, sport_division_short_name from dms_sample.sport_team join dms_sample.player on player.sport_team_id = sport_team.id;
    ### CONVERTING KAFKA MESSAGE BYTES TO JSON
    value.converter=org.apache.kafka.connect.json.JsonConverter
    key.converter=org.apache.kafka.connect.storage.StringConverter
    value.converter.schemas.enable=false
    key.converter.schemas.enable=false
    ### GENERIC AUTHENTICATION SETTINGS FOR KAFKA CONNECT
    security.protocol=SASL_SSL
    sasl.mechanism=AWS_MSK_IAM
    ssl.truststore.location=~/kafka.truststore.jks
    ssl.keystore.location=~/kafka.client.keystore.jks
    sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
    sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
  10. In the Connector capacity section, select Autoscaled for Capacity type and keep the default value of 1 for MCU count per worker.
  11. Set 4 for Maximum number of workers and keep all other default values for Workers and Autoscaling utilization thresholds.
  12. For Worker configuration, select Use the MSK default configuration.
  13. Under Access permissions, choose the custom IAM role msk-connect-rds-jdbc-MSKConnectServiceIAMRole-* created earlier.
  14. For Log delivery, select Deliver to Amazon CloudWatch Logs.
  15. For Log group, choose the log group created earlier.
  16. Choose Next.
  17. Under Review and Create, validate all the settings and choose Create connector.

Once the connector has transitioned to RUNNING status, the data should start flowing from the RDS instance to the MSK cluster.

  1. For data validation, SSH into the EC2 client instance MSKEC2Client and run the following command to see the data in the topic:
    $ bin/kafka-console-consumer.sh --topic rds-to-msk-query-topic --from-beginning --bootstrap-server $BOOTSTRAP_SERVERS --consumer.config /home/ec2-user/kafka/config/client.properties
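To cross-check the records in the topic against the source database, you can run the same join directly in the MySQL client; for example:

    $ mysql -h <rds_instance_endpoint_name> -u <user_name> -p
    
    MySQL [(none)]> select last_name, name as team_name, sport_type_name, sport_league_short_name, sport_division_short_name from dms_sample.sport_team join dms_sample.player on player.sport_team_id = sport_team.id limit 5;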

Clean up

To clean up your resources and avoid ongoing charges, complete the following steps:

  1. On the Amazon MSK console, choose Connectors in the navigation pane under MSK Connect.
  2. Select the connectors you created and choose Delete.
  3. On the Amazon S3 console, choose Buckets in the navigation pane.
  4. Search for the bucket with the naming convention bkt-msk-connect-plugins-<aws_account_id>.
  5. Delete all the folders and objects in this bucket.
  6. Delete the bucket after all contents have been removed.
  7. To delete all other resources created using the CloudFormation stack, delete the stack via the AWS CloudFormation console.
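If you prefer the AWS CLI, the same cleanup can be scripted. The following is a minimal sketch, assuming the connector ARNs from the earlier steps and the stack name you used when launching the template:

    $ aws kafkaconnect delete-connector --connector-arn <connector_arn>   # repeat for each connector
    
    $ aws s3 rm s3://bkt-msk-connect-plugins-<aws_account_id> --recursive
    
    $ aws s3 rb s3://bkt-msk-connect-plugins-<aws_account_id>
    
    $ aws cloudformation delete-stack --stack-name <stack_name>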

Conclusion

Amazon MSK Connect is a fully managed service that provisions the required resources, monitors the health and delivery state of connectors, maintains the underlying hardware, and auto scales connectors to balance the workloads. In this post, we saw how to set up the open-source JDBC connector from Confluent to stream data between an RDS for MySQL instance and an MSK cluster. We also explored different options to synchronize all the tables as well as use the query-based approach to stream denormalized data into the MSK topics.

For more information about Amazon MSK Connect, see Getting started using MSK Connect.


About the Authors

Manish Virwani is a Sr. Solutions Architect at AWS. He has more than a decade of experience designing and implementing large-scale big data and analytics solutions. He provides technical guidance, design advice, and thought leadership to some of the key AWS customers and partners.

Indira Balakrishnan is a Principal Solutions Architect in the AWS Analytics Specialist SA Team. She is passionate about helping customers build cloud-based analytics solutions to solve their business problems using data-driven decisions. Outside of work, she volunteers at her kids’ activities and spends time with her family.