流式计算(Stream Processing)是一种数据处理模式,与传统的批处理(Batch Processing)不同,流式计算实时处理连续不断的数据流。在这种模式下,数据在到达时即被处理,而不是在数据全部收集完毕后再进行处理。流式计算通常用于需要低延迟和实时响应的应用场景,比如实时分析、监控、金融风控、物联网数据处理等。
 
       背景
 
       在大数据领域,常见的流式处理框架包括Apache Spark Streaming、Apache Flink等。然而根据开源社区的统计,用户在使用流式系统时,可能只有20%的时间真正花在了开发应用、实现业务逻辑上,而剩下的80%的时间都用来处理各种由系统带来的障碍、运维、调优等等,极大降低了开发的效率。本文通过具体使用案例,介绍了一种新的分布式架构的 SQL 流式数据库 – RisingWave。其能够提供增量更新、一致性的物化视图——这是一种持久的数据结构,承载了流处理的结果。通过以上特性,RisingWave让开发人员能够通过级联物化视图来表达复杂的流处理逻辑,从而大大降低了构建流处理应用的难度。同时,RisingWave演进出了利用流来实现数据库间的实时复制功能,本文将会进行探索以及尝试,提供另外一种数据库之间复制的实现方案。
 
       Demo案例架构
 
       RisingWave作为一款开源和商用兼具的产品,其产品包含内容已经相当广泛,本文不能一一覆盖,因此测试只选取最具代表性的场景对其产品特性进行具像化的解释,感兴趣的读者可以参考RisingWave 官网或者Github官方社区在本文的基础上进一步挖掘感兴趣的内容。
 
       架构说明,本案例通过在AWS 原生EKS 上部署RisingWave 2.02 版本为例,介绍了如下场景内容:
 
        
        - AWS 原生 EKS 上部署安装 RisingWave集群。
- AWS 托管 MSK 接入 RisingWave 集群,并建立相应的表并导入数据。
- AWS 托管 Kinesis 接入到 RisingWave 集群,并建立相应的表并导入数据。
- 以AWS 托管 RDS MySQL 为例,建立源数据库以及目标数据库并实现数据库之间整体以及 CDC 复制。
以上架构图说明 RisingWave 部署在 AWS EKS 的大概逻辑,分两个部分,其一是支撑 Risingwave 运行的底层架构,包括 AWS原生的网络,存储,安全以及计算资源。其二是可以作为RisingWave 上下游的原生服务,比如上游的MSK( Amazon kafka),Kinesis,Amazon RDS 以及下游的 DynamoDB , Amazon RDS,Amazon Cache RDS等。同时,架构图列出了 Risingwave 集群上下游( Sources/Sinks )支持AWS原生服务,详细内容可以参考文中最后的附录。
 
       测试部署及测试步骤
 
       EKS集群搭建以及RisingWave 集群构建
 
       AWS EKS 搭建可以参考AWS文档:
 
       https://docs.aws.amazon.com/zh_cn/eks/latest/userguide/quickstart.html
 
       具体EC2 实例这里选择 m5.4xlarge,如下图:
 
        
       本案例需要配置插件请参考如下文档:
 
       https://docs.aws.amazon.com/zh_cn/eks/latest/userguide/eks-add-ons.html
 
       RisingWave 采用官方推荐Helm方式创建:
 
       https://docs.risingwave.com/deploy/risingwave-k8s-helm
 
       RisingWave和Amazon EKS 集成很好,在Amazon EKS准备到位的情况下一条命令就能安装完成,如下:
 
        
        <p>[ec2-user@ip-1
 
         
         [ec2-user@ip-172-31-62-218 eks]$ helm install -n risingwave --create-namespace --set wait=true -f values.yaml my-risingwave risingwavelabs/risingwave
NAME: my-risingwave
LAST DEPLOYED: Sun Nov 10 12:33:51 2024
NAMESPACE: risingwave
STATUS: deployed
REVISION: 1
TEST SUITE: None
NOTES:
Welcome to fast and modern stream processing with RisingWave!
Check the running status with the following command:
    kubectl -n risingwave get pods -l app.kubernetes.io/instance=my-risingwave
Try accessing the SQL console with the following command:
    kubectl -n risingwave port-forward svc/my-risingwave 4567:svc
Keep the above command running and open a new terminal window to run the following command:
    psql -h localhost -p 4567 -d dev -U root 
For more advanced applications, refer to our documentation at: https://www.risingwave.dev
 
          
        
72-31-62-218 eks]$ helm install -n risingwave --create-namespace --set wait=true -f values.yaml my-risingwave risingwavelabs/risingwave<br />NAME: my-risingwave<br />LAST DEPLOYED: Sun Nov 10 12:33:51 2024<br />NAMESPACE: risingwave<br />STATUS: deployed<br />REVISION: 1<br />TEST SUITE: None<br />NOTES:<br />Welcome to fast and modern stream processing with RisingWave!</p><p>Check the running status with the following command:</p><p>kubectl -n risingwave get pods -l app.kubernetes.io/instance=my-risingwave</p><p>Try accessing the SQL console with the following command:</p><p>kubectl -n risingwave port-forward svc/my-risingwave 4567:svc</p><p>Keep the above command running and open a new terminal window to run the following command:</p><p>psql -h localhost -p 4567 -d dev -U root</p><p>For more advanced applications, refer to our documentation at: https://www.risingwave.dev</p>
 
         
       注意,使用 helm部署的时候,其中用到 RisingWave 提供 values.yaml 文件进行部署,其中有两个部分,需要注意,如下:
 
        
        ## @section metaStore.mysql Values for MySQL backend.
mysql:
## @param metaStore.mysql.enabled Enable/disable the meta store backend.
##
enabled: true
## @param metaStore.mysql.host Host of the MySQL server.
##
host: "risingwave.cluster-XXXXXXX.us-east-1.rds.amazonaws.com"
## @param metaStore.mysql.port Port of the MySQL server. Defaults to 3306.
##
port: 3306
## @param metaStore.mysql.database Database of the MySQL server.
##
database: "risingwave"
## @param metaStore.mysql.options Options to connect.
##
options: {}
## @param metaStore.mysql.authentication Authentication information.
##
authentication:
## @param metaStore.mysql.authentication.username Username to connect the DB.
## If existingSecretName is specified, the field is ignored.
##
username: "XXXXXX"
## @param metaStore.mysql.authentication.password Password to connect the DB.
## If existingSecretName is specified, the field is ignored.
##
password: "XXXXXXXX"
## @param metaStore.mysql.authentication.existingSecretName Existing secret name.
## The Secret must contain `username` and `password` keys.
## If it is specified, username and password above are ignored.
##
existingSecretName: ""
 
         
       这部分是指定 外在 的 MySQL数据库作为 metaStore,我这里选择和EKS在相同区域的 Amazon RDS作为存储,这样做的好处是,使用 AWS托管数据库本身的冗余能力提供外部存储,其状态不会随着EKS 集群的变化而变化,保持状态稳定。同时,需要保证EKS集群中的 node 和 RDS 之间的连通性可达。我这里将Amazon RDS 和 EKS 部署在相同的VPC中,减少延迟保证性能,实际生产环境中可以根据需要进行适当的调整。
 
        
        stateStore:
## @param stateStore.dataDirectory RisingWave state store data directory.
## Must be a relative path. Max length is 800 characters.
## Default to "hummock" if set to empty.
##
dataDirectory: "hummock"
## @section stateStore.s3 S3 values.
##
s3:
## @param stateStore.s3.enabled Use S3 state store. Only one state store backend can be enabled.
##
enabled: true
## @param stateStore.s3.endpoint S3 endpoint URL for S3-compatible object storages.
##
endpoint: ""
## @param stateStore.s3.region S3 region.
##
region: "us-east-1"
## @param stateStore.s3.bucket S3 bucket.
##
bucket: "XXXXXXXXXXXX"
## @param stateStore.s3.forcePathStyle Enforce path style requests. If the value is false, the path could be one
## of path style and virtual hosted style, depending on the endpoint format. For more details of the two styles,
## please refer to the documentation: https://docs.aws.amazon.com/AmazonS3/latest/userguide/VirtualHosting.html
##
forcePathStyle: false
authentication:
## @param stateStore.s3.authentication.useServiceAccount Use S3 service account authentication.
## If enabled, accessKey and secretAccessKey will be ignored.
## Otherwise, accessKey and secretAccessKey are required and will be stored in a Secret.
##
useServiceAccount: false
## @param stateStore.s3.authentication.accessKey S3 access key.
##
accessKey: "AKXXXXXXXXXXXXX"
## @param stateStore.s3.authentication.secretAccessKey S3 secret access key.
##
secretAccessKey: "XXXXXXXXXXXXXXXXXXXX"
## @param stateStore.s3.authentication.existingSecretName Use existing Secret for S3 authentication.
## If set, use the existing Secret instead of creating a new one.
## Secret must contain AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY keys.
##
existingSecretName: ""
 
         
       这一部分是 statstore  存储参数的相关设定,我这里选择 AWS S3 作为状态存储的路径,这里为了测试方便,我直接选择使用配置Access Key访问,实际生产环境中最好使用ServiceAccount 进行访问,满足 Amazon EKS 使用的最佳实践。需要注意的是,如果多次安装使用相同的S3 bucket存储数据的话,每次需要清空原有S3的旧数据,避免旧有数据对新环境的影响。同时AK-SK 或者 serviceAccount 执行的Role 应该有对 S3 bucket的读写 访问权限。
 
       部署完成之后可以使用如下命令检查 RisingWave 集群的状态,并连接到 RisingWave 集群
 
        
        [ec2-user@ip-172-31-62-218 ~]$ kubectl -n risingwave get pods -l app.kubernetes.io/instance=my-risingwave
NAME                                      READY   STATUS    RESTARTS   AGE
my-risingwave-compactor-d778ddb5d-wjhkf   1/1     Running   0          41h
my-risingwave-compute-0                   1/1     Running   0          41h
my-risingwave-frontend-658c47cff4-nf44h   1/1     Running   0          41h
my-risingwave-meta-0                      1/1     Running   0          41h
 
         
        
        [ec2-user@ip-172-31-62-218 ~]$ kubectl -n risingwave port-forward svc/my-risingwave 4567:svc
Forwarding from 127.0.0.1:4567 -> 4567
Forwarding from [::1]:4567 -> 4567
[ec2-user@ip-172-31-62-218 ~]$ psql --version
psql (PostgreSQL) 15.6
[ec2-user@ip-172-31-62-218 ~]$ psql -h localhost -p 4567 -d dev -U root
psql (15.6, server 13.14.0)
Type "help" for help.
dev=>
 
         
       到这一步,说明RisingWave 集群在 EKS 上部署完成,我们进入下一阶段测试。
 
       使用MSK作为RisingWave 数据源
 
       建立托管 MSK 集群配置如下:
 
        
        
       这里需要注意,保证 MSK 集群和RisingWave 集群连通性,除了考虑 VPC 之外,还需要保证安全组配置正确。参考如下AWS 官方配置文档:
 
       https://docs.aws.amazon.com/zh_cn/msk/latest/developerguide/getting-started.html
 
       参考如下文档配置 Amazon MSK 连接:
 
       https://docs.aws.amazon.com/zh_cn/msk/latest/developerguide/msk-password-tutorial.html
 
       使用psql 连接risingwave 集群,执行如下建表语句:
 
        
        CREATE TABLE IF NOT EXISTS jingamz (
cust_id VARCHAR,
month int,
expenses NUMERIC,
)
WITH (
connector = 'kafka', topic = 'risingwave',
properties.bootstrap.server = 'b-1.risingwave.XXXXXXX.c10.kafka.us-east-1.amazonaws.com:9096',
scan.startup.mode = 'earliest',
properties.sasl.mechanism = 'SCRAM-SHA-512',
properties.security.protocol = 'sasl_ssl',
properties.sasl.username = 'jingamz',
properties.sasl.password = 'XXXXXXX'
) FORMAT PLAIN ENCODE JSON;
 
         
       从客户端发起向 MSK 写入数据,参考如下形式:
 
        
        [ec2-user@ip-172-31-62-218 risingwave]$ /home/ec2-user/kafka_2.13-3.5.1/bin/kafka-console-producer.sh --broker-list \
b-1.risingwave.zsob86.c10.kafka.us-east-1.amazonaws.com:9096 \
--producer.config /home/ec2-user/risingwave/client_sasl.properties \
--topic risingwave
>{ "cust_id": 3535353, "month": 12, "expenses": 81.12 }
>{ "cust_id": 3535353, "month": 12, "expenses": 81.12 }
>{ "cust_id": 3535353, "month": 12, "expenses": 81.12 }
>{ "cust_id": 1313131, "month": 12, "expenses": 1313.13 }
>{ "cust_id": 1313131, "month": 10, "expenses": 492.83 }
>
 
         
       在RisingWave 集群中验证jingamz 表中的数据,如下:
 
        
        dev=> select * from jingamz;
cust_id | month | expenses
---------+-------+----------
1313131 |    12 |  1313.13
1313131 |    10 |   492.83
3535353 |    12 |    81.12
3535353 |    12 |    81.12
3535353 |    12 |    81.12
(5 rows)
dev=> select cust_id,sum(expenses) from jingamz group by cust_id;
cust_id |   sum
---------+---------
3535353 |  243.36
1313131 | 1805.96
(2 rows)
dev=>
 
         
       以上可见, 源端Kafka 集群实时写入5条数据,马上就在 Risingwave 集群中以关系数据库表的形式存储。 同时,RisingWave 的表支持 SQL 语句的聚合查询,可以实时统计数据流的统计数据。 整个过程支持标准的SQL,只要使用 psql 客户端联入运行即可,完全不需要用代码进行聚合计算统计,有使用SQL的经验即可,学习曲线极其平缓。
 
       使用AWS Kinesis 作为数据源
 
       AWS kinesis data stream 配置如下:
 
        
       RisingWave 建立读取 Kinesis的table 如下:
 
        
        dev=> CREATE TABLE IF NOT EXISTS jingamz_kinesis (
vendorId VARCHAR,
pickupDate VARCHAR,
dropoffDate VARCHAR,
passengerCount VARCHAR,
pickupLongitude VARCHAR,
pickupLatitude VARCHAR,
dropoffLongitude VARCHAR,
dropoffLatitude VARCHAR,
storeAndFwdFlag VARCHAR,
gcDistance VARCHAR,
tripDuration VARCHAR,
googleDistance VARCHAR,
googleDuration VARCHAR
)
WITH (
connector='kinesis',
stream='input-stream',
aws.region='us-east-1',
aws.credentials.access_key_id = 'XXXXXXXXXXXX',
aws.credentials.secret_access_key = 'XXXXXXXXXXX'
) FORMAT PLAIN ENCODE JSON;
CREATE_TABLE
dev=>
 
         
       客户端模拟 kinesis 数据写入如下:
 
        
        [ec2-user@ip-172-31-62-218 kinesis]$ ls
kinesis-data.py
[ec2-user@ip-172-31-62-218 kinesis]$ python3 kinesis-data.py
Total ingested:1,ReqID:ce856857-13d9-887b-9128-3544cac21ba3,HTTPStatusCode:200
Total ingested:2,ReqID:e0c75913-b67f-4f9c-bf6a-04076f64dc44,HTTPStatusCode:200
Total ingested:3,ReqID:d3182e43-1e86-f87d-8cb5-7357c79d6ba5,HTTPStatusCode:200
Total ingested:4,ReqID:f2ac8522-014b-1238-ad01-d836d85081e0,HTTPStatusCode:200
Total ingested:5,ReqID:c17228df-606c-8880-9edf-75cbb9771b58,HTTPStatusCode:200
Total ingested:6,ReqID:f72245c7-77ab-abdd-a88f-18d3aeb03805,HTTPStatusCode:200
Total ingested:7,ReqID:c50dfee1-7a55-b293-9aa0-a3f5a34e214b,HTTPStatusCode:200
Total ingested:8,ReqID:d22b283d-a317-1f11-8d86-75297a0c8cc9,HTTPStatusCode:200
Total ingested:9,ReqID:c8f020c9-7fad-b86a-975d-7ddda6b62bb2,HTTPStatusCode:200
 
         
        
        dev=> select count(*) from jingamz_kinesis;
count
-------
134
(1 row)
dev=> describe jingamz_kinesis;
Name        |       Type        | Is Hidden | Description
-------------------+-------------------+-----------+-------------
vendorid          | character varying | false     |
pickupdate        | character varying | false     |
dropoffdate       | character varying | false     |
passengercount    | character varying | false     |
pickuplongitude   | character varying | false     |
pickuplatitude    | character varying | false     |
dropofflongitude  | character varying | false     |
dropofflatitude   | character varying | false     |
storeandfwdflag   | character varying | false     |
gcdistance        | character varying | false     |
tripduration      | character varying | false     |
googledistance    | character varying | false     |
googleduration    | character varying | false     |
_row_id           | serial            | true      |
primary key       | _row_id           |           |
distribution key  | _row_id           |           |
table description | jingamz_kinesis   |           |
(17 rows)
dev=> select * from jingamz_kinesis limit 10;
vendorid |         pickupdate         |        dropoffdate         | passengercount | pickuplongitude | pickuplatitude | dropofflongitude |   dropofflatitude   | storeandfwdflag | gcdistance | tripduration | googledistance | googleduration
----------+----------------------------+----------------------------+----------------+-----------------+----------------+------------------+---------------------+-----------------+------------+--------------+----------------+----------------
2        | 2024-11-12T13:40:36.307084 | 2024-11-12T14:30:36.307091 | 9              | -73.98092651    | 40.74196243    | -73.99310303     | 40.75263214         | 0               | 5          | 8055         | 5              | 8055
2        | 2024-11-12T13:40:36.375318 | 2024-11-12T15:04:36.375323 | 1              | -73.98174286    | 40.71915817    | -73.98442078     | 40.74978638         | 0               | 2          | 9343         | 2              | 9343
2        | 2024-11-12T13:40:36.507280 | 2024-11-12T14:45:36.507287 | 9              | -74.00737       | 40.7431221     | -73.87303925     |         40.77410507 | 1               | 4          | 9750         | 4              | 9750
2        | 2024-11-12T13:40:36.707454 | 2024-11-12T14:10:36.707462 | 6              | -73.97222137    | 40.67683029    | -73.99495697     | 40.745121           | 1               | 3          | 2203         | 3              | 2203
2        | 2024-11-12T13:40:36.773733 | 2024-11-12T14:38:36.773740 | 9              | -73.98532867    | 40.74198914    | -73.98092651     | 40.74196243         | 1               | 5          | 9840         | 5              | 9840
1        | 2024-11-12T13:40:36.843111 | 2024-11-12T14:34:36.843116 | 7              | -73.9697876     | 40.75758362    | -74.00737        | 40.7431221          | 0               | 6          | 5326         | 6              | 5326
(10 rows)
 
         
       数据同步到 RisingWave 表中,可以根据表中数据进行聚合。
 
        
       Amazon RDS 之间进行CDC数据同步
 
       Amazon RDS 数据库设置如下:
 
        
       在RisingWave 集群中建立 RDS source,并建立基于Source 的Table 以及sink的目标,注意这里的RDS的源表是 Database jing 中的表 people, sink的目标是database jing中的表 people_rw 。这里为了测试方便 源表和目标表位于相同数据库中。 生产环境中可以根据实际配置进行调整。
 
        
        dev=> CREATE SOURCE mysql_jing WITH (
connector = 'mysql-cdc',
hostname = 'risingwave.cluster-XXXXXXXXXX.us-east-1.rds.amazonaws.com',
port = '3306',
username = 'admin',
password = 'XXXXXXX',
database.name = 'jing',
server.id = 970344889
);
CREATE_SOURCE
dev=> CREATE TABLE people_rw (
id int,
first_name CHARACTER VARYING,
last_name CHARACTER VARYING,
email CHARACTER VARYING,
zipcode int,
city CHARACTER VARYING,
country CHARACTER VARYING,
birthdate date,
added TIMESTAMPTZ,
PRIMARY KEY (id)
) FROM mysql_jing TABLE 'jing.people';
CREATE_TABLE
dev=> CREATE SINK s_mysql FROM people_rw WITH (
connector='jdbc',
jdbc.url='jdbc:mysql://risingwave.cluster-XXXXXXXXX.us-east-1.rds.amazonaws.com:3306/jing?user=admin&password=XXXXXXXX',
table.name='people_rw',
type = 'upsert',
primary_key = 'id'
);
CREATE_SINK
 
         
       在客户端中模拟数据写入到原始数据表中:
 
        
        [ec2-user@ip-172-31-62-218 faker]$ ls
db_feeder_with_faker.py  db_feeder_with_faker_jingamz.py  db_feeder_with_faker_umu.py
[ec2-user@ip-172-31-62-218 faker]$ python3 db_feeder_with_faker_jingamz.py
Error creating table 1050 (42S01): Table 'people' already exists
iteration 10
iteration 20
iteration 30
iteration 40
iteration 50
^CTraceback (most recent call last):
File "/home/ec2-user/faker/db_feeder_with_faker_jingamz.py", line 63, in <module>
time.sleep(2)
KeyboardInterrupt
[ec2-user@ip-172-31-62-218 faker]$
 
         
       检测源表中的数据量,如下:
 
        
        Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.
mysql> use jing;
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A
Database changed
mysql> select count(*) from people;
+----------+
| count(*) |
+----------+
|      240 |
+----------+
1 row in set (0.06 sec)
mysql>
 
         
       检测 RisingWave 集群中people_rw 中的数据量,如下:
 
        
        dev=> select count(*) from people_rw;
count
-------
240
(1 row)
dev=>
 
         
       同时检查 目标 Amazon RDS 数据库中的数据量,如下:
 
        
        mysql> use jing;
Database changed
mysql> select count(*) from people_rw;
+----------+
| count(*) |
+----------+
|      240 |
+----------+
1 row in set (0.07 sec)
mysql>
 
         
       此时,源数据表,RisingWave 集群中的表,以及目标数据库表总数据一致,说明CDC 复制成功。
 
       继续在RisingWave table中进行数据的更改,如下语句:
 
        
        dev=> select * from people_rw where id=331;
id  | first_name | last_name |               email               | zipcode |   city    | country | birthdate  |           added
-----+------------+-----------+-----------------------------------+---------+-----------+---------+------------+---------------------------
331 | Edward     | Wood      | bonniepowell@armstrong-waters.net |    4522 | New Scott | Ecuador | 1960-09-24 | 2024-11-10 14:18:06+00:00
(1 row)
dev=> update people_rw set email='jingamz@amazon.com' where id = 331;
UPDATE 1
dev=> select * from people_rw where id=331;
id  | first_name | last_name |       email        | zipcode |   city    | country | birthdate  |           added
-----+------------+-----------+--------------------+---------+-----------+---------+------------+---------------------------
331 | Edward     | Wood      | jingamz@amazon.com |    4522 | New Scott | Ecuador | 1960-09-24 | 2024-11-10 14:18:06+00:00
(1 row)
dev=>
 
         
       红体字部分是修改替代的邮箱信息,此时再次检查 目标数据库中对应数据的内容:
 
        
        mysql> select * from people_rw where id=331;
+-----+------------+-----------+--------------------+---------+-----------+---------+------------+---------------------+
| id  | first_name | last_name | email              | zipcode | city      | country | birthdate  | added               |
+-----+------------+-----------+--------------------+---------+-----------+---------+------------+---------------------+
| 331 | Edward     | Wood      | jingamz@amazon.com |    4522 | New Scott | Ecuador | 1960-09-24 | 2024-11-10 14:18:06 |
+-----+------------+-----------+--------------------+---------+-----------+---------+------------+---------------------+
1 row in set (0.06 sec)
mysql>
 
         
       注意此时目标库中的数据已经随RisingWave 集群中的内容修改过来。 通过这种方式,能够很方便的在数据CDC同步过程中对内容进行修改。这是传统的CDC工具不能做到的。 同样的,类似的方式也可以进行流式处理的数据内容进行修改。
 
       结论以及补充
 
       通过以上测试步骤,可以清楚看到 RisingWave 作为云原生 分布式 SQL 流式数据库,除了能够简便部署在 Amazon EKS 服务上之外,还能很好的和 AWS 托管的RDS数据库,流式托管服务 MSK 以及 Kinesis 良好结合稳定运行。同时根据RisingWave 官网描述,其上下游还能集成包括Starrocks在内的多种流行分析数据库,能够为客户提供多种应用场景的适配,紧密结合业务。
 
       另外RisingWave 除了提供开源版本之外,还提供 Premium Edition,给到客户更丰富的功能以及更强大的支持服务,具体参见如下:
 
       https://docs.risingwave.com/docs/current/rw-premium-edition-intro/
 
       附录
 
       有关 RisingWave 流式数据库的具体介绍,可以参考官网链接:
 
       https://docs.risingwave.com/get-started/intro
 
       有关RisingWave 流式数据库支持的源/目标,可以参考官网链接:
 
       https://docs.risingwave.com/integrations/overview
 
       有关RisingWave 流式数据库和 Flink 功能以及性能的对比,可以参考官网链接:
 
       https://zh-cn.risingwave.com/docs/current/risingwave-flink-comparison/
 
        
       本篇作者