如何使用逻辑复制在 PostgreSQL 的 RDS 中复制表?

上次更新日期:2022 年 7 月 12 日

我想在 PostgreSQL 的 Amazon Relational Database Service (Amazon RDS) 中的数据库之间复制表,而不使用任何扩展。

解决方法

逻辑复制的一个典型用例是在两个 Amazon RDS for PostgreSQL 数据库实例之间复制一组表。PostgreSQL 的 RDS 支持 PostgreSQL 10.4 及更高版本的逻辑复制。Amazon Aurora PostgreSQL 兼容版 2.2.0 及更高版本支持 PostgreSQL 10.6 及更高版本的逻辑复制。

在提供的解决方案中,使用 PostgreSQL 的 RDS 中的逻辑复制将两个源表复制到两个目标表。逻辑复制首先对源表中已经存在的数据执行初始加载,然后继续复制正在进行的更改。

开启逻辑复制

若要在 RDS 中为 PostgreSQL 打开逻辑复制,请修改自定义参数组,将 rds.logical_replication 设置为 1,并将 rds.logical_replication 附加到数据库实例。如果自定义参数组附加到数据库实例,请更新参数组,将 rds.logical_replication 设置为 1rds.logical_replication 参数是一个静态参数,需要重启数据库实例才能生效。当数据库实例重启时,wal_level 参数设置为逻辑

验证 wal_levelrds.logical_replication 的值:

postgres=> SELECT name,setting FROM pg_settings WHERE name IN ('wal_level','rds.logical_replication');
          name           | setting
-------------------------+---------
 rds.logical_replication | on
 wal_level               | logical
(2 rows)

连接到源数据库实例中的源数据库

连接到 PostgreSQL 数据库实例的源 RDS 中的源数据库。创建源表:

source=> CREATE TABLE reptab1 (slno int primary key);
CREATE TABLE
source=> CREATE TABLE reptab2 (name varchar(20));
CREATE TABLE

将数据插入到源表中:

source=> INSERT INTO reptab1 VALUES (generate_series(1,1000));
INSERT 0 1000
source=> INSERT INTO reptab2 SELECT SUBSTR ('ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789',((random()*(36-1)+1)::integer),1) FROM generate_series(1,50);
INSERT 0 50

为源表创建出版物

为源表创建出版物。使用 SELECT 查询验证所创建的出版物的详细信息:

source=> CREATE PUBLICATION testpub FOR TABLE reptab1,reptab2;
CREATE PUBLICATION
source=> SELECT * FROM pg_publication;
  oid   | pubname | pubowner | puballtables | pubinsert | pubupdate | pubdelete | pubtruncate | pubviaroot
--------+---------+----------+--------------+-----------+-----------+-----------+-------------+------------
 115069 | testpub |    16395 | f            | t         | t         | t         | t           | f
(1 row)

验证源表是否已添加到出版物中:

source=> SELECT * FROM pg_publication_tables;
 pubname | schemaname | tablename
---------+------------+-----------
 testpub | public     | reptab1
 testpub | public     | reptab2
(2 rows)

注意:要复制数据库中的所有表,请运行以下命令:

CREATE PUBLICATION testpub FOR ALL TABLES;

连接到目标数据库并创建目标表

连接到目标数据库实例中的目标数据库。创建与源表同名的目标表。通过对目标表运行 SELECT 查询,确保目标表中没有数据:

target=> CREATE TABLE reptab1 (slno int primary key);
CREATE TABLE
target=> CREATE TABLE reptab2 (name varchar(20));
CREATE TABLE
target=> SELECT count(*) FROM reptab1;
 count
-------
     0
(1 row)
target=> SELECT count(*) FROM reptab2;
 count
-------
     0
(1 row)

在目标数据库中创建并验证订阅

在目标数据库中创建订阅。使用 SELECT 查询验证订阅是否已启用:

target=> CREATE SUBSCRIPTION testsub CONNECTION 'host=<source RDS/host endpoint> port=5432 dbname=<source_db_name> user=<user> password=<password>' PUBLICATION testpub;
NOTICE:  Created replication slot "testsub" on publisher
CREATE SUBSCRIPTION
target=> SELECT oid,subname,subenabled,subslotname,subpublications FROM pg_subscription;
  oid  | subname | subenabled | subslotname | subpublications
-------+---------+------------+-------------+-----------------
 16434 | testsub | t          | testsub     | {testpub}
(1 row)

重要信息:为避免在数据库日志中存储用户名和密码的纯文本版本,请在创建订阅之前运行以下命令:

target=> SET log_min_messages to 'PANIC';
SET
target=> SET log_statement to NONE;
SET

创建订阅后,订阅会将源表中存在的所有数据加载到目标表中。对目标表运行 SELECT 查询,以验证初始数据是否已加载:

target=> SELECT count(*) FROM reptab1;
 count
-------
  1000
(1 row)
target=> SELECT count(*) FROM reptab2;
 count
-------
    50
(1 row)

验证源数据库中的复制槽

在目标数据库中创建订阅将在源数据库中创建复制槽。通过在源数据库上运行以下 SELECT 查询,验证复制槽的详细信息:

source=> SELECT * FROM pg_replication_slots;
 slot_name |  plugin  | slot_type | datoid | database | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn | wal_status | safe_wal_size
 ----------+----------+-----------+--------+----------+-----------+--------+------------+------+--------------+-------------+---------------------+------------+---------------
 testsub   | pgoutput | logical   | 115048 | source   | f         | t      |        846 |      |         6945 | 58/B4000568 | 58/B40005A0         | reserved   |
(1 row)

测试从源表复制

通过在源表中插入行,测试是否正在将源表中的数据更改复制到目标表:

source=> INSERT INTO reptab1 VALUES(generate_series(1001,2000));
INSERT 0 1000
source=> INSERT INTO reptab2 SELECT SUBSTR('ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789',((random()*(36-1)+1)::integer),1) FROM generate_series(1,50);
INSERT 0 50
source=> SELECT count(*) FROM reptab1;
 count
-------
  2000
(1 row)
source=> SELECT count(*) FROM reptab2; count
-------
   100
(1 row)

通过验证目标表中的行数来测试复制

验证目标表中的行数,以确认是否正在将新插入内容复制到目标表中:

target=> SELECT count(*) FROM reptab1;
 count
-------
  2000
(1 row)
target=> SELECT count(*) FROM reptab2;
 count
-------
   100
(1 row)

清理并关闭逻辑复制

当复制完成且不再需要时,请清理并关闭逻辑复制。不活动的复制槽会导致源数据库实例上的 WAL 文件累积。WAL 文件可能会占满存储空间并导致中断。

删除目标数据库上的订阅:

target=> DROP SUBSCRIPTION testsub;
NOTICE:  Dropped replication slot "testsub" on publisher
DROP SUBSCRIPTION
target=> SELECT * FROM pg_subscription;
oid | subdbid | subname | subowner | subenabled | subconninfo | subslotname | subsynccommit | subpublications
----+---------+---------+----------+------------+-------------+-------------+---------------+-----------------
(0 rows)

注意:删除订阅也会删除订阅创建的复制槽。

通过在源数据库上运行以下 SELECT 查询语句,验证复制插槽是否已从源数据库中删除:

source=> SELECT * FROM pg_replication_slots;
slot_name | plugin | slot_type | datoid | database | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn | wal_status | safe_wal_size
----------+--------+-----------+--------+----------+-----------+--------+------------+------+--------------+-------------+---------------------+------------+--------------
(0 rows)

删除该出版物。验证出版物是否已成功删除:

source=> DROP PUBLICATION testpub;
DROP PUBLICATION
source=> SELECT * FROM pg_publication;
 oid | pubname | pubowner | puballtables | pubinsert | pubupdate | pubdelete | pubtruncate | pubviaroot
-----+---------+----------+--------------+-----------+-----------+-----------+-------------+------------
(0 rows)
source=> SELECT * FROM pg_publication_tables;
 pubname | schemaname | tablename
---------+------------+-----------
(0 rows)

在附加到数据库实例的自定义参数组中,将 rds.logical_replication 改为 0。如果数据库实例未使用逻辑复制,则根据需要重启该数据库实例。

根据您的使用情况查看 max_replication_slotsmax_wal_sendersmax_logical_replication_workersmax_worker_processesmax_sync_workers_per_subscription

注意:以下命令检查是否存在任何非活动的复制槽,确定槽的各自大小,然后根据需要删除槽。

SELECT slot_name, pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(),restart_lsn)) AS replicationSlotLag, active FROM pg_replication_slots ;
SELECT pg_drop_replication_slot('Your_slotname_name');

用于 Replication(复制)的 PostgreSQL 文档

这篇文章对您有帮助吗?


您是否需要账单或技术支持?