亚马逊AWS官方博客
使用 RisingWave 实现 MSK,Kinesis,RDS MySQL 实时数据同步
流式计算(Stream Processing)是一种数据处理模式,与传统的批处理(Batch Processing)不同,流式计算实时处理连续不断的数据流。在这种模式下,数据在到达时即被处理,而不是在数据全部收集完毕后再进行处理。流式计算通常用于需要低延迟和实时响应的应用场景,比如实时分析、监控、金融风控、物联网数据处理等。
背景
在大数据领域,常见的流式处理框架包括Apache Spark Streaming、Apache Flink等。然而根据开源社区的统计,用户在使用流式系统时,可能只有20%的时间真正花在了开发应用、实现业务逻辑上,而剩下的80%的时间都用来处理各种由系统带来的障碍、运维、调优等等,极大降低了开发的效率。本文通过具体使用案例,介绍了一种新的分布式架构的 SQL 流式数据库 – RisingWave。其能够提供增量更新、一致性的物化视图——这是一种持久的数据结构,承载了流处理的结果。通过以上特性,RisingWave让开发人员能够通过级联物化视图来表达复杂的流处理逻辑,从而大大降低了构建流处理应用的难度。同时,RisingWave演进出了利用流来实现数据库间的实时复制功能,本文将会进行探索以及尝试,提供另外一种数据库之间复制的实现方案。
Demo案例架构
RisingWave作为一款开源和商用兼具的产品,其产品包含内容已经相当广泛,本文不能一一覆盖,因此测试只选取最具代表性的场景对其产品特性进行具像化的解释,感兴趣的读者可以参考RisingWave 官网或者Github官方社区在本文的基础上进一步挖掘感兴趣的内容。
架构说明,本案例通过在AWS 原生EKS 上部署RisingWave 2.02 版本为例,介绍了如下场景内容:
- AWS 原生 EKS 上部署安装 RisingWave集群。
- AWS 托管 MSK 接入 RisingWave 集群,并建立相应的表并导入数据。
- AWS 托管 Kinesis 接入到 RisingWave 集群,并建立相应的表并导入数据。
- 以AWS 托管 RDS MySQL 为例,建立源数据库以及目标数据库并实现数据库之间整体以及 CDC 复制。
![]() |
以上架构图说明 RisingWave 部署在 AWS EKS 的大概逻辑,分两个部分,其一是支撑 Risingwave 运行的底层架构,包括 AWS原生的网络,存储,安全以及计算资源。其二是可以作为RisingWave 上下游的原生服务,比如上游的MSK( Amazon kafka),Kinesis,Amazon RDS 以及下游的 DynamoDB , Amazon RDS,Amazon Cache RDS等。同时,架构图列出了 Risingwave 集群上下游( Sources/Sinks )支持AWS原生服务,详细内容可以参考文中最后的附录。
测试部署及测试步骤
EKS集群搭建以及RisingWave 集群构建
AWS EKS 搭建可以参考AWS文档:
https://docs.aws.amazon.com/zh_cn/eks/latest/userguide/quickstart.html
具体EC2 实例这里选择 m5.4xlarge,如下图:
![]() |
本案例需要配置插件请参考如下文档:
https://docs.aws.amazon.com/zh_cn/eks/latest/userguide/eks-add-ons.html
RisingWave 采用官方推荐Helm方式创建:
https://docs.risingwave.com/deploy/risingwave-k8s-helm
RisingWave和Amazon EKS 集成很好,在Amazon EKS准备到位的情况下一条命令就能安装完成,如下:
注意,使用 helm部署的时候,其中用到 RisingWave 提供 values.yaml 文件进行部署,其中有两个部分,需要注意,如下:
这部分是指定 外在 的 MySQL数据库作为 metaStore,我这里选择和EKS在相同区域的 Amazon RDS作为存储,这样做的好处是,使用 AWS托管数据库本身的冗余能力提供外部存储,其状态不会随着EKS 集群的变化而变化,保持状态稳定。同时,需要保证EKS集群中的 node 和 RDS 之间的连通性可达。我这里将Amazon RDS 和 EKS 部署在相同的VPC中,减少延迟保证性能,实际生产环境中可以根据需要进行适当的调整。
这一部分是 statstore 存储参数的相关设定,我这里选择 AWS S3 作为状态存储的路径,这里为了测试方便,我直接选择使用配置Access Key访问,实际生产环境中最好使用ServiceAccount 进行访问,满足 Amazon EKS 使用的最佳实践。需要注意的是,如果多次安装使用相同的S3 bucket存储数据的话,每次需要清空原有S3的旧数据,避免旧有数据对新环境的影响。同时AK-SK 或者 serviceAccount 执行的Role 应该有对 S3 bucket的读写 访问权限。
部署完成之后可以使用如下命令检查 RisingWave 集群的状态,并连接到 RisingWave 集群
到这一步,说明RisingWave 集群在 EKS 上部署完成,我们进入下一阶段测试。
使用MSK作为RisingWave 数据源
建立托管 MSK 集群配置如下:
![]() |
![]() |
这里需要注意,保证 MSK 集群和RisingWave 集群连通性,除了考虑 VPC 之外,还需要保证安全组配置正确。参考如下AWS 官方配置文档:
https://docs.aws.amazon.com/zh_cn/msk/latest/developerguide/getting-started.html
参考如下文档配置 Amazon MSK 连接:
https://docs.aws.amazon.com/zh_cn/msk/latest/developerguide/msk-password-tutorial.html
使用psql 连接risingwave 集群,执行如下建表语句:
从客户端发起向 MSK 写入数据,参考如下形式:
在RisingWave 集群中验证jingamz 表中的数据,如下:
以上可见, 源端Kafka 集群实时写入5条数据,马上就在 Risingwave 集群中以关系数据库表的形式存储。 同时,RisingWave 的表支持 SQL 语句的聚合查询,可以实时统计数据流的统计数据。 整个过程支持标准的SQL,只要使用 psql 客户端联入运行即可,完全不需要用代码进行聚合计算统计,有使用SQL的经验即可,学习曲线极其平缓。
使用AWS Kinesis 作为数据源
AWS kinesis data stream 配置如下:
![]() |
RisingWave 建立读取 Kinesis的table 如下:
客户端模拟 kinesis 数据写入如下:
数据同步到 RisingWave 表中,可以根据表中数据进行聚合。
Amazon RDS 之间进行CDC数据同步
Amazon RDS 数据库设置如下:
![]() |
在RisingWave 集群中建立 RDS source,并建立基于Source 的Table 以及sink的目标,注意这里的RDS的源表是 Database jing 中的表 people, sink的目标是database jing中的表 people_rw 。这里为了测试方便 源表和目标表位于相同数据库中。 生产环境中可以根据实际配置进行调整。
在客户端中模拟数据写入到原始数据表中:
检测源表中的数据量,如下:
检测 RisingWave 集群中people_rw 中的数据量,如下:
同时检查 目标 Amazon RDS 数据库中的数据量,如下:
此时,源数据表,RisingWave 集群中的表,以及目标数据库表总数据一致,说明CDC 复制成功。
继续在RisingWave table中进行数据的更改,如下语句:
红体字部分是修改替代的邮箱信息,此时再次检查 目标数据库中对应数据的内容:
注意此时目标库中的数据已经随RisingWave 集群中的内容修改过来。 通过这种方式,能够很方便的在数据CDC同步过程中对内容进行修改。这是传统的CDC工具不能做到的。 同样的,类似的方式也可以进行流式处理的数据内容进行修改。
结论以及补充
通过以上测试步骤,可以清楚看到 RisingWave 作为云原生 分布式 SQL 流式数据库,除了能够简便部署在 Amazon EKS 服务上之外,还能很好的和 AWS 托管的RDS数据库,流式托管服务 MSK 以及 Kinesis 良好结合稳定运行。同时根据RisingWave 官网描述,其上下游还能集成包括Starrocks在内的多种流行分析数据库,能够为客户提供多种应用场景的适配,紧密结合业务。
另外RisingWave 除了提供开源版本之外,还提供 Premium Edition,给到客户更丰富的功能以及更强大的支持服务,具体参见如下:
https://docs.risingwave.com/docs/current/rw-premium-edition-intro/
附录
有关 RisingWave 流式数据库的具体介绍,可以参考官网链接:
https://docs.risingwave.com/get-started/intro
有关RisingWave 流式数据库支持的源/目标,可以参考官网链接:
https://docs.risingwave.com/integrations/overview
有关RisingWave 流式数据库和 Flink 功能以及性能的对比,可以参考官网链接:
https://zh-cn.risingwave.com/docs/current/risingwave-flink-comparison/