如何使用逻辑复制在 PostgreSQL 的 RDS 中复制表?
上次更新日期:2022 年 7 月 12 日
我想在 PostgreSQL 的 Amazon Relational Database Service (Amazon RDS) 中的数据库之间复制表,而不使用任何扩展。
解决方法
逻辑复制的一个典型用例是在两个 Amazon RDS for PostgreSQL 数据库实例之间复制一组表。PostgreSQL 的 RDS 支持 PostgreSQL 10.4 及更高版本的逻辑复制。Amazon Aurora PostgreSQL 兼容版 2.2.0 及更高版本支持 PostgreSQL 10.6 及更高版本的逻辑复制。
在提供的解决方案中,使用 PostgreSQL 的 RDS 中的逻辑复制将两个源表复制到两个目标表。逻辑复制首先对源表中已经存在的数据执行初始加载,然后继续复制正在进行的更改。
开启逻辑复制
若要在 RDS 中为 PostgreSQL 打开逻辑复制,请修改自定义参数组,将 rds.logical_replication 设置为 1,并将 rds.logical_replication 附加到数据库实例。如果自定义参数组附加到数据库实例,请更新参数组,将 rds.logical_replication 设置为 1。rds.logical_replication 参数是一个静态参数,需要重启数据库实例才能生效。当数据库实例重启时,wal_level 参数设置为逻辑。
验证 wal_level 和 rds.logical_replication 的值:
postgres=> SELECT name,setting FROM pg_settings WHERE name IN ('wal_level','rds.logical_replication');
name | setting
-------------------------+---------
rds.logical_replication | on
wal_level | logical
(2 rows)
连接到源数据库实例中的源数据库
连接到 PostgreSQL 数据库实例的源 RDS 中的源数据库。创建源表:
source=> CREATE TABLE reptab1 (slno int primary key);
CREATE TABLE
source=> CREATE TABLE reptab2 (name varchar(20));
CREATE TABLE
将数据插入到源表中:
source=> INSERT INTO reptab1 VALUES (generate_series(1,1000));
INSERT 0 1000
source=> INSERT INTO reptab2 SELECT SUBSTR ('ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789',((random()*(36-1)+1)::integer),1) FROM generate_series(1,50);
INSERT 0 50
为源表创建出版物
为源表创建出版物。使用 SELECT 查询验证所创建的出版物的详细信息:
source=> CREATE PUBLICATION testpub FOR TABLE reptab1,reptab2;
CREATE PUBLICATION
source=> SELECT * FROM pg_publication;
oid | pubname | pubowner | puballtables | pubinsert | pubupdate | pubdelete | pubtruncate | pubviaroot
--------+---------+----------+--------------+-----------+-----------+-----------+-------------+------------
115069 | testpub | 16395 | f | t | t | t | t | f
(1 row)
验证源表是否已添加到出版物中:
source=> SELECT * FROM pg_publication_tables;
pubname | schemaname | tablename
---------+------------+-----------
testpub | public | reptab1
testpub | public | reptab2
(2 rows)
注意:要复制数据库中的所有表,请运行以下命令:
CREATE PUBLICATION testpub FOR ALL TABLES;
连接到目标数据库并创建目标表
连接到目标数据库实例中的目标数据库。创建与源表同名的目标表。通过对目标表运行 SELECT 查询,确保目标表中没有数据:
target=> CREATE TABLE reptab1 (slno int primary key);
CREATE TABLE
target=> CREATE TABLE reptab2 (name varchar(20));
CREATE TABLE
target=> SELECT count(*) FROM reptab1;
count
-------
0
(1 row)
target=> SELECT count(*) FROM reptab2;
count
-------
0
(1 row)
在目标数据库中创建并验证订阅
在目标数据库中创建订阅。使用 SELECT 查询验证订阅是否已启用:
target=> CREATE SUBSCRIPTION testsub CONNECTION 'host=<source RDS/host endpoint> port=5432 dbname=<source_db_name> user=<user> password=<password>' PUBLICATION testpub;
NOTICE: Created replication slot "testsub" on publisher
CREATE SUBSCRIPTION
target=> SELECT oid,subname,subenabled,subslotname,subpublications FROM pg_subscription;
oid | subname | subenabled | subslotname | subpublications
-------+---------+------------+-------------+-----------------
16434 | testsub | t | testsub | {testpub}
(1 row)
重要信息:为避免在数据库日志中存储用户名和密码的纯文本版本,请在创建订阅之前运行以下命令:
target=> SET log_min_messages to 'PANIC';
SET
target=> SET log_statement to NONE;
SET
创建订阅后,订阅会将源表中存在的所有数据加载到目标表中。对目标表运行 SELECT 查询,以验证初始数据是否已加载:
target=> SELECT count(*) FROM reptab1;
count
-------
1000
(1 row)
target=> SELECT count(*) FROM reptab2;
count
-------
50
(1 row)
验证源数据库中的复制槽
在目标数据库中创建订阅将在源数据库中创建复制槽。通过在源数据库上运行以下 SELECT 查询,验证复制槽的详细信息:
source=> SELECT * FROM pg_replication_slots;
slot_name | plugin | slot_type | datoid | database | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn | wal_status | safe_wal_size
----------+----------+-----------+--------+----------+-----------+--------+------------+------+--------------+-------------+---------------------+------------+---------------
testsub | pgoutput | logical | 115048 | source | f | t | 846 | | 6945 | 58/B4000568 | 58/B40005A0 | reserved |
(1 row)
测试从源表复制
通过在源表中插入行,测试是否正在将源表中的数据更改复制到目标表:
source=> INSERT INTO reptab1 VALUES(generate_series(1001,2000));
INSERT 0 1000
source=> INSERT INTO reptab2 SELECT SUBSTR('ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789',((random()*(36-1)+1)::integer),1) FROM generate_series(1,50);
INSERT 0 50
source=> SELECT count(*) FROM reptab1;
count
-------
2000
(1 row)
source=> SELECT count(*) FROM reptab2; count
-------
100
(1 row)
通过验证目标表中的行数来测试复制
验证目标表中的行数,以确认是否正在将新插入内容复制到目标表中:
target=> SELECT count(*) FROM reptab1;
count
-------
2000
(1 row)
target=> SELECT count(*) FROM reptab2;
count
-------
100
(1 row)
清理并关闭逻辑复制
当复制完成且不再需要时,请清理并关闭逻辑复制。不活动的复制槽会导致源数据库实例上的 WAL 文件累积。WAL 文件可能会占满存储空间并导致中断。
删除目标数据库上的订阅:
target=> DROP SUBSCRIPTION testsub;
NOTICE: Dropped replication slot "testsub" on publisher
DROP SUBSCRIPTION
target=> SELECT * FROM pg_subscription;
oid | subdbid | subname | subowner | subenabled | subconninfo | subslotname | subsynccommit | subpublications
----+---------+---------+----------+------------+-------------+-------------+---------------+-----------------
(0 rows)
注意:删除订阅也会删除订阅创建的复制槽。
通过在源数据库上运行以下 SELECT 查询语句,验证复制插槽是否已从源数据库中删除:
source=> SELECT * FROM pg_replication_slots;
slot_name | plugin | slot_type | datoid | database | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn | wal_status | safe_wal_size
----------+--------+-----------+--------+----------+-----------+--------+------------+------+--------------+-------------+---------------------+------------+--------------
(0 rows)
删除该出版物。验证出版物是否已成功删除:
source=> DROP PUBLICATION testpub;
DROP PUBLICATION
source=> SELECT * FROM pg_publication;
oid | pubname | pubowner | puballtables | pubinsert | pubupdate | pubdelete | pubtruncate | pubviaroot
-----+---------+----------+--------------+-----------+-----------+-----------+-------------+------------
(0 rows)
source=> SELECT * FROM pg_publication_tables;
pubname | schemaname | tablename
---------+------------+-----------
(0 rows)
在附加到数据库实例的自定义参数组中,将 rds.logical_replication 改为 0。如果数据库实例未使用逻辑复制,则根据需要重启该数据库实例。
根据您的使用情况查看 max_replication_slots、max_wal_senders、max_logical_replication_workers、max_worker_processes 和 max_sync_workers_per_subscription。
注意:以下命令检查是否存在任何非活动的复制槽,确定槽的各自大小,然后根据需要删除槽。
SELECT slot_name, pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(),restart_lsn)) AS replicationSlotLag, active FROM pg_replication_slots ;
SELECT pg_drop_replication_slot('Your_slotname_name');
相关信息
用于 Replication(复制)的 PostgreSQL 文档