Preface

Near-real-time consumption and computation of CDC (change data capture) data is an increasingly pressing need for enterprises in the big data space. For example, an online brokerage needs real-time computation in scenarios such as:

- analyzing account positions as stock prices change in real time
- computing and analyzing intraday trade prices in real time

Since most enterprises have already built a T+1 batch platform on their existing big data stack, we want to achieve the following without changing the existing data analysis paths and logic:

- use an RDS read replica to capture CDC data changes
- use an MSK managed connector to publish and subscribe to the CDC data
- use Flink on EMR to compute over the data subscribed from MSK
- integrate Secrets Manager, IAM, and MSK
Architecture

The overall architecture is shown below:

Design highlights:

- Enable the RDS binlog and extract data through an RDS/Aurora read replica, to avoid impacting the production system
- Managed MSK Connector, so there are no connector instances to maintain
- Managed EMR, running Flink with no operational overhead
- Integration with AWS Secrets Manager, so the database username and password are not exposed in plaintext in the managed MSK Connector configuration

The EMR release is 6.4 and the Flink version is 1.13.
 
Walkthrough

AWS RDS

1. Enable the RDS binary log and set its retention period:

call mysql.rds_set_configuration('binlog retention hours', 168);
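To confirm the retention setting took effect, the current value can be read back with the companion RDS stored procedure:

```sql
-- Lists RDS-level settings, including 'binlog retention hours'.
call mysql.rds_show_configuration;
```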
 
       
 
2. Edit the RDS parameter group to set the binary log format to ROW (binlog_format=ROW).
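Once the parameter group change is in effect (a reboot may be required for the instance to pick it up), the format can be verified from any MySQL client:

```sql
-- Should return ROW after the parameter group change is applied.
SHOW VARIABLES LIKE 'binlog_format';
```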
 
Creating the MSK cluster

Here we use an MSK cluster that supports both IAM and plaintext authentication.

The MSK configuration is as follows, where the replication factor is 3 and auto topic creation is set to true.
 
        
        auto.create.topics.enable=true
default.replication.factor=3
min.insync.replicas=2
num.io.threads=8
num.network.threads=5
num.partitions=24
num.replica.fetchers=4
replica.lag.time.max.ms=30000
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
socket.send.buffer.bytes=102400
unclean.leader.election.enable=true
zookeeper.session.timeout.ms=18000
log.retention.hours=6
replica.selector.class=org.apache.kafka.common.replica.RackAwareReplicaSelector
max.incremental.fetch.session.cache.slots=10000
 
         
The MSK broker layout uses 3 AZs, with 1 broker per AZ:

Select the MSK configuration created in the previous step:

For authentication, select both:

- Unauthenticated access (the Flink side uses a plaintext connection during testing)
- IAM role-based authentication

View the cluster connection string:

Testing MSK

Create a client configuration file, client-config.properties:
 
        
        # Sets up TLS for encryption and SASL for authN.
security.protocol = SASL_SSL
# Identifies the SASL mechanism to use.
sasl.mechanism = AWS_MSK_IAM
# Binds SASL client implementation.
sasl.jaas.config = software.amazon.msk.auth.iam.IAMLoginModule required;
# Encapsulates constructing a SigV4 signature based on extracted credentials.
# The SASL client bound by "sasl.jaas.config" invokes this class.
sasl.client.callback.handler.class = software.amazon.msk.auth.iam.IAMClientCallbackHandler
 
         
1. Produce data with the console producer:

kafka-console-producer.sh --bootstrap-server b-2.xxxxx-msk-flink.xxxxxx.c4.kafka.ap-southeast-1.amazonaws.com:9098,b-4.xxxxxx-msk-flink.xxxxxx.c4.kafka.ap-southeast-1.amazonaws.com:9098,b-3.xxxxx-msk-flink.xxxxx.c4.kafka.ap-southeast-1.amazonaws.com:9098 --topic junjie-xxxxxx-topic --producer.config client-config.properties
 
          
         
       
 
2. Using the same client configuration, consume the data in another terminal:
 
       
 
At this point, the MSK cluster is up and data can be produced and consumed successfully.

Creating the managed MSK Connector

1. Create an MSK Connect worker configuration

A worker configuration holds the settings that govern configuration storage, work distribution, and offset and task-status storage in the Kafka Connect cluster, for example how JSON data is converted. Here we skip the metadata conversion and focus only on converting the table data. The worker configuration is as follows:
 
        
        key.converter.schemas.enable=false
value.converter.schemas.enable=false
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
config.providers.secretManager.class=com.github.jcustenborder.kafka.config.aws.SecretsManagerConfigProvider
config.providers=secretManager
config.providers.secretManager.param.aws.region=ap-southeast-1
 
         
Schema conversion is disabled (schemas.enable=false); keys are converted with StringConverter and values with JsonConverter.
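With value.converter set to JsonConverter and schemas.enable=false, each Kafka record value is the bare Debezium change-event envelope without a schema wrapper. A minimal consumer-side sketch of picking out the row image (the record contents below are hypothetical):

```python
import json

# A simplified Debezium change-event envelope, as produced with
# value.converter.schemas.enable=false (payload only, no "schema" wrapper).
record_value = '''{
  "before": null,
  "after": {"id": 1, "name": "AAPL", "addr": "NASDAQ"},
  "source": {"db": "hst", "table": "flink"},
  "op": "c",
  "ts_ms": 1662000000000
}'''

event = json.loads(record_value)
if event["op"] in ("c", "r", "u"):   # create / snapshot read / update
    row = event["after"]
else:                                # "d" = delete: only the before-image exists
    row = event["before"]
print(row["name"])  # AAPL
```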
 
Because the MSK connector needs a username and password to connect to the database, in this post we use AWS Secrets Manager to store them.
 
        
        config.providers.secretManager.class=com.github.jcustenborder.kafka.config.aws.SecretsManagerConfigProvider
 
         
The completed worker configuration is shown below:
 
       
 
2. Create a secret in Secrets Manager to store the database username and password
 
       
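The connector configuration later references the secret's JSON keys, so the secret value must be a JSON object containing dbusername and dbpassword. A sketch of that naming contract (the secret name and credentials here are placeholders):

```python
import json

# Placeholder secret name and credentials; the name must match the one used
# in the connector configuration's ${secretManager:...} references.
secret_name = "xxxxxx-database-secret"
secret_value = json.dumps({"dbusername": "cdc_user", "dbpassword": "change-me"})

# How the connector configuration refers to the individual keys:
user_ref = f"${{secretManager:{secret_name}:dbusername}}"
pass_ref = f"${{secretManager:{secret_name}:dbpassword}}"
print(user_ref)  # ${secretManager:xxxxxx-database-secret:dbusername}
```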
 
3. Create the MSK Debezium connector plugin

Download the Debezium MySQL plugin from https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/1.6.1.Final/debezium-connector-mysql-1.6.1.Final-plugin.tar.gz

Extract it, repackage the plugin directory as a zip file, and upload the zip to S3.

Create a custom plugin from the uploaded zip:
 
       
 
4. Create the MSK Connector

During connector creation:

- Use the MSK cluster created earlier, and choose IAM as the authentication method
- Fill in the connector configuration

The complete configuration is as follows:
 
        
         
         connector.class=io.debezium.connector.mysql.MySqlConnector
database.history.producer.sasl.mechanism=AWS_MSK_IAM
database.history.producer.sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
database.user=${secretManager:xxxxxx-database-secret:dbusername}
database.server.id=2234
tasks.max=1
database.history.consumer.sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
database.history.producer.security.protocol=SASL_SSL
database.history.kafka.topic=xxxxxx.quotation
database.history.kafka.bootstrap.servers=b-2.xxxxx-msk-flink.xxxxxx.c4.kafka.ap-southeast-1.amazonaws.com:9098,b-1.xxxx-msk-flink.xxxxxx.c4.kafka.ap-southeast-1.amazonaws.com:9098
database.server.name=junjie-sgp-xxxxx
database.history.producer.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
database.history.consumer.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
database.history.consumer.security.protocol=SASL_SSL
database.port=3306
include.schema.changes=true
database.hostname=junjie-sgp-xxxx.xxxxxxx.ap-southeast-1.rds.amazonaws.com
database.password=${secretManager:xxxxx-database-secret:dbpassword}
database.history.consumer.sasl.mechanism=AWS_MSK_IAM
database.include.list=xxx
 
          
         
Note that secrets from AWS Secrets Manager are used to connect to the database:
 
        
        database.user=${secretManager:xxxxx-database-secret:dbusername}
database.password=${secretManager:xxxxx-database-secret:dbpassword}
 
         
Point the MSK connection parameters at the IAM bootstrap brokers:
 
        
        database.history.kafka.bootstrap.servers=b-2.xxxxx-msk-flink.xxxxx.c4.kafka.ap-southeast-1.amazonaws.com:9098,b-1.xxxx-msk-flink.xxxxx.c4.kafka.ap-southeast-1.amazonaws.com:9098
 
         
Select the worker configuration created above, and configure the service execution role that the MSK connector requires (see: https://docs.aws.amazon.com/msk/latest/developerguide/msk-connect-service-execution-role.html)
 
       
 
Choose a log group so that the connector logs are delivered to CloudWatch:
 
       
 
After creation, check the output in the corresponding log group:
 
       
 
Verify that the MSK Connector can successfully consume CDC data from RDS.

List the automatically created topics:
 
        
         
         kafka-topics.sh --bootstrap-server b-2.xxxx-msk-flink.xxxxxx.c4.kafka.ap-southeast-1.amazonaws.com:9098,b-4.xxxx-msk-flink.xxxxxx.c4.kafka.ap-southeast-1.amazonaws.com:9098,b-3.xxxx-msk-flink.xxxxxx.c4.kafka.ap-southeast-1.amazonaws.com:9098 --list --command-config client-config.properties
 
          
         
       
 
Consume the CDC stream of one of the tables. The topic name has the format <database.server.name from the connector>.<database name>.<table name>, e.g. junjie-sgp-hstong.hst.flink.
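Using the values from this post's connector configuration, the example topic name is assembled as:

```python
# Debezium publishes each table's changes to a topic named
# <database.server.name>.<database name>.<table name>.
server_name = "junjie-sgp-hstong"  # database.server.name in the connector config
database = "hst"
table = "flink"
topic = f"{server_name}.{database}.{table}"
print(topic)  # junjie-sgp-hstong.hst.flink
```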
 
        
         
         kafka-console-consumer.sh --bootstrap-server b-2.xxxxxx-msk-flink.xxxxxx.c4.kafka.ap-southeast-1.amazonaws.com:9098,b-4.xxxxxx-msk-flink.xxxxxx.c4.kafka.ap-southeast-1.amazonaws.com:9098,b-3.xxxxxx-msk-flink.xxxxxx.c4.kafka.ap-southeast-1.amazonaws.com:9098 --topic junjie-sgp-xxxxxx.xxx.flink --from-beginning --consumer.config client-config.properties
 
          
         
Insert test data into the corresponding database and watch the consumer output:
 
       
 
       
 
At this point, the MSK Connector can successfully consume data from the database.

Creating Flink on EMR

The Flink on EMR cluster configuration is as follows:

EMR release: 6.4.0

Flink version: 1.13.1

This post uses an EMR cluster with a single master node.
 
       
 
After creation, log in to the EMR cluster using the master node DNS:
 
       
 
Prepare the Flink Kafka table connector by downloading the connector jar into /lib/flink/lib:

cd /lib/flink/lib && curl -O https://repo1.maven.org/maven2/org/apache/flink/flink-connector-kafka_2.11/1.13.1/flink-connector-kafka_2.11-1.13.1.jar
 
         
       
 
Testing and verification

Use the Flink SQL client for verification.

Log in to the EMR master node and run the following commands to start a YARN session and enter the Flink SQL terminal (the SET statement is issued inside the SQL client):

cd /lib/flink && ./bin/yarn-session.sh --detached
./bin/sql-client.sh
SET execution.checkpointing.interval = 1min;
 
         
       
 
Enter the Flink SQL interactive client.

Create a Flink table corresponding to the database table flink:
 
        
         
         CREATE TABLE Flink (
      `event_time` TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,  -- from Debezium format
      `origin_table` STRING METADATA FROM 'value.source.table' VIRTUAL, -- from Debezium format
      `record_time` TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
      `id` BIGINT,
      `name` STRING,
      `addr` STRING,
       WATERMARK FOR event_time AS event_time
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'junjie-sgp-xxxx.xxx.flink', -- created by the Debezium connector; corresponds to the flink table in the source database
      'properties.bootstrap.servers' = 'b-5.xxxxxx-msk-flink.xxxxxx.c4.kafka.ap-southeast-1.amazonaws.com:9092,b-1.xxxxx-msk-flink.xxxxxx.c4.kafka.ap-southeast-1.amazonaws.com:9092,b-2.xxxxx-msk-flink.xxxxxx.c4.kafka.ap-southeast-1.amazonaws.com:9092',
      'properties.group.id' = 'xxxxx-table-group',
      'scan.startup.mode' = 'earliest-offset',
      'value.format' = 'debezium-json'
    );
 
          
         
       
 
Query the Flink table:
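For example, a sketch of a windowed aggregation over the CDC stream, assuming the table defined above: it counts change events per source table in one-minute tumbling windows, driven by the event_time watermark.

```sql
SELECT
  origin_table,
  TUMBLE_START(event_time, INTERVAL '1' MINUTE) AS window_start,
  COUNT(*) AS change_events
FROM Flink
GROUP BY origin_table, TUMBLE(event_time, INTERVAL '1' MINUTE);
```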
 
        
       
 
Insert and update data in the database

Inserting data:
 
       
 
       
 
Updating data:
 
       
 
       
 
Summary

- Using managed MSK and the managed MSK Connector to extract CDC data from an RDBMS in near real time reduces the operational burden of maintaining Kafka and connector instances.
- Using managed EMR with Flink, the CDC data published to MSK can be consumed in near real time.
- Customers can build CDC computation on this architecture and move toward a streaming architecture.

References:

- https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/mysql_rds_set_configuration.html
- https://catalog.us-east-1.prod.workshops.aws/workshops/c2b72b6f-666b-4596-b8bc-bafa5dcca741/en-US/mskconnect/source-connector-setup
- https://aws.amazon.com/blogs/aws/introducing-amazon-msk-connect-stream-data-to-and-from-your-apache-kafka-clusters-using-managed-connectors/