AWS Database Blog

Data consolidation for analytical applications using logical replication for Amazon RDS Multi-AZ clusters

Amazon Relational Database Service (Amazon RDS) Multi-AZ deployments provide enhanced availability and durability for your RDS database instances. You can deploy highly available, durable PostgreSQL databases in three Availability Zones using Amazon RDS Multi-AZ DB cluster deployments with two readable standby DB instances. With a Multi-AZ DB cluster, applications gain automatic failover typically in under 35 seconds, up to two times faster transaction commit latency compared to Multi-AZ DB instance deployments without compromising durability, and additional read capacity. In this post, we walk through a use case of data consolidation for analytical applications using logical replication in the context of RDS Multi-AZ DB clusters. We enable and utilize logical replication in an RDS for PostgreSQL Multi-AZ DB cluster and also demonstrate logical replication consumption recovery after a DB instance failover.

AWS announced support for PostgreSQL logical replication with Amazon RDS Multi-AZ DB cluster deployments. Amazon RDS for PostgreSQL supports streaming data changes using PostgreSQL’s logical replication. You can now set up logical replication to send data from your Amazon RDS for PostgreSQL Multi-AZ DB cluster to other data systems, or receive changes from other systems into your Amazon RDS for PostgreSQL Multi-AZ DB cluster. Logical replication is supported for RDS Multi-AZ DB clusters running Amazon RDS for PostgreSQL version 14.8-R2 and higher and 15.3-R2 and higher.

Logical replication overview

PostgreSQL offers two types of replication – physical replication and logical replication. Both methods serve the purpose of creating redundant copies of data to ensure high availability and fault tolerance. Physical replication is also known as streaming replication. In this method, the actual data files (segments) and changes made to them are replicated from the primary database server to one or more standby servers. The replication process occurs at the binary level, meaning the entire data blocks are copied, which makes it efficient for handling large databases with high write loads. However, even small changes to a large block will result in replicating the whole block. In the replication topology, the primary and standby servers must run the same PostgreSQL major version for compatibility, and physical replication cannot be used for partial or selective replication of specific tables or data subsets.

Logical replication, on the other hand, works at a higher level of abstraction. It replicates changes by understanding the logical structure of the data, which allows for more flexibility in terms of what is replicated and how it is consumed on the standby server. Logical replication gives you fine-grained control over both data replication and security. You can use logical replication between different platforms (for example, Linux to Windows) and different major versions of PostgreSQL, providing more flexibility during upgrades. You can also handle schema changes, such as table and column modifications, without much difficulty. Additionally, you can replicate individual rows or even specific columns, making it more efficient when dealing with selective replication or partial replication scenarios.

Logical replication operates on a publish and subscribe model, utilizing change records from the PostgreSQL write-ahead log (WAL). The source, known as the publisher, transmits these changes for the specified tables to one or more recipients (subscribers) replicating the changes and ensuring synchronization between the publisher and the subscriber. The publisher’s set of changes is identified through a publication, and subscribers access these changes by establishing a subscription that defines the connection to the publisher’s database and its publications. PostgreSQL uses a mechanism called a replication slot to track the progress of a subscription, such as what changes still need to be sent to a subscriber.
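
To make the model concrete, the following is a minimal sketch of the native publish and subscribe commands. It is not part of the solution in this post (which uses a manually created slot and the wal2json plugin instead), and the endpoints, database, user, and the inventory table are hypothetical placeholders.

# On the publishing database: publish changes for a hypothetical table
psql -h <publisher-endpoint> -U <user> -d <database> \
-c "CREATE PUBLICATION inventory_pub FOR TABLE inventory;"

# On the subscribing database: create a subscription; by default this also creates a replication slot on the publisher
psql -h <subscriber-endpoint> -U <user> -d <database> \
-c "CREATE SUBSCRIPTION inventory_sub CONNECTION 'host=<publisher-endpoint> port=5432 dbname=<database> user=<replication-user> password=<password>' PUBLICATION inventory_pub;"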

In Amazon RDS for PostgreSQL, users can manually create a logical replication slot on the writer DB instance of a Multi-AZ DB cluster. This offers more control and flexibility, and is useful for different change data capture (CDC) systems or troubleshooting existing setups. It is worth noting that the CREATE PUBLICATION/SUBSCRIPTION commands automatically set up the replication slots by default, which covers most use cases and simplifies the process. Manually creating the replication slot requires selecting a decoding plugin, which determines the format used to stream changes. RDS Multi-AZ DB clusters support three plugins: pgoutput (the plugin PostgreSQL’s built-in logical replication uses), wal2json, and test_decoding (intended for troubleshooting and not recommended for production use), giving you the flexibility to capture changes in different formats for your CDC architectures and use cases.

Logical replication use cases

There are several common use cases for logical replication with PostgreSQL, including:

  • Real-time data streaming and event-driven architectures – Using logical replication allows each change in data to be captured as a discrete event. For instance, changes such as insert, update, or delete statements in your database can each generate a distinct event. These events contain valuable information such as the modified data, changed values, timestamps, and the type of operation performed. This makes logical replication useful for implementing event-driven architectures, building data pipelines, and synchronizing data with other applications or services. To process the received events, you can subscribe to the replication stream and use messaging systems like Amazon Kinesis Data Streams, Amazon Managed Streaming for Apache Kafka (Amazon MSK), or Amazon MQ. These messaging systems provide scalable and fault-tolerant event processing capabilities, allowing you to handle a large volume of events and distribute them to multiple consumers. Additionally, consider leveraging event-driven frameworks or systems that are optimized for efficient event processing. For instance, serverless computing services like AWS Lambda can automatically run your code in response to events, ensuring scalability and fault tolerance in your architecture.
  • Data distribution – Logical replication enables you to distribute data across multiple PostgreSQL instances or even different database systems. This is useful for scenarios where you need to share specific tables or subsets of data with other applications or databases. By using logical replication for data distribution, you can achieve better scalability, improved performance, data sharing, and integration across multiple environments and systems. Logical replication provides flexibility in choosing what data to replicate and where to replicate it, enabling you to design and implement a distributed architecture that meets your specific requirements.
  • Data integration and ETL – Logical replication can be used for data integration and extract, transform, load (ETL) processes. You can replicate selected tables or entire databases to downstream systems where data can be transformed, merged, or loaded into other systems. You could also use logical replication to consolidate multiple databases into a single one (for example, for data analytics).

Solution overview

In this example solution, we walk through a demonstration of enabling and utilizing logical replication in an RDS for PostgreSQL Multi-AZ DB cluster. We discuss the creation of a logical replication slot, the configuration of a Kinesis data stream, support for logical replication slots post-failover, and how to consume the replication slot data using a Python script. The following diagram illustrates our solution architecture.

Prerequisites

To implement logical replication for an RDS for PostgreSQL Multi-AZ DB cluster, you must have:

  1. An AWS account.
  2. The AWS Command Line Interface (AWS CLI).
  3. A client machine with the PostgreSQL psql client, the pgbench tool, and Python installed.
  4. An understanding of the costs involved: RDS for PostgreSQL Multi-AZ DB clusters are not covered by the AWS Free Tier, so review Amazon RDS pricing and Amazon Kinesis Data Streams pricing.

Create a custom parameter group and enable logical replication

First, we create a DB cluster parameter group, enable logical replication, and increase the number of logical replication workers.

# Create a DB cluster parameter group
aws rds create-db-cluster-parameter-group \
--db-cluster-parameter-group-name demo-pg-15-etl \
--db-parameter-group-family postgres15 \
--description "Demo RDS Multi-AZ Cluster ETL using logical replication" \ 
--region us-west-2

# Modify the demo DB cluster parameter group to enable logical replication and set the maximum logical replication workers to a value higher than your number of consumers
aws rds modify-db-cluster-parameter-group \
--db-cluster-parameter-group-name demo-pg-15-etl \
--parameters "ParameterName=max_logical_replication_workers,ParameterValue=10,ApplyMethod=pending-reboot" \
 "ParameterName=rds.logical_replication,ParameterValue=1,ApplyMethod=pending-reboot" \
--region us-west-2

For more information on how to perform this step using the console, see Modifying parameters in a DB cluster parameter group and Working with parameter groups for Multi-AZ DB clusters.
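
As an optional check (a minimal sketch using the parameter group name from this post), you can confirm the two parameters are set before creating the DB cluster:

# Verify the logical replication parameters in the new DB cluster parameter group
aws rds describe-db-cluster-parameters \
--db-cluster-parameter-group-name demo-pg-15-etl \
--query "Parameters[?ParameterName=='rds.logical_replication' || ParameterName=='max_logical_replication_workers'].[ParameterName,ParameterValue,ApplyMethod]" \
--output table \
--region us-west-2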

Create an RDS for PostgreSQL Multi-AZ DB cluster

Our next step is to create an RDS for PostgreSQL Multi-AZ DB cluster using the AWS CLI. We configure the new DB cluster to use the custom parameter group we created previously.

# Create a demo DB cluster
aws rds create-db-cluster \
--db-cluster-identifier demo-multi-az-cluster-etl-source \
--db-cluster-parameter-group-name demo-pg-15-etl \
--engine postgres \
--engine-version 15.3 \
--db-subnet-group-name default \
--master-user-password $PGPASSWORD \
--master-username $PGUSER \
--db-cluster-instance-class db.m6gd.large \
--allocated-storage 100 \
--storage-type io1 \
--iops 1000 \
--region us-west-2

For instructions on how to perform this step using the AWS Management Console, see Creating a Multi-AZ DB cluster.
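
Cluster creation can take several minutes. Once the cluster is available, you can optionally confirm its status and that logical decoding is enabled (wal_level should report logical); the endpoint shown is the dummy cluster endpoint used throughout this post:

# Check the DB cluster status and endpoint
aws rds describe-db-clusters \
--db-cluster-identifier demo-multi-az-cluster-etl-source \
--query 'DBClusters[0].[Status,Endpoint]' \
--output text \
--region us-west-2

# Once the cluster is available, confirm logical decoding is enabled
psql -h demo-multi-az-cluster-etl-source.cluster-abcd123dummy.us-west-2.rds.amazonaws.com \
-p 5432 \
-U $PGUSER \
-d $PGDATABASE \
-c "SHOW wal_level;"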

Create a Kinesis data stream

While the DB cluster is being created, we can create the Kinesis data stream, which will be the target for logical replication. Kinesis Data Streams allows you to ingest, process, and analyze real-time streaming data and can be used to stream data into other targets such as Amazon Redshift.

# Create Kinesis data stream
aws kinesis create-stream \
--stream-name etl_demo \
--shard-count 1 \
--region us-west-2

For instructions on how to perform this step using the console, see Creating a Stream via the AWS Management Console.
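
As an optional check, you can confirm the stream has finished creating and is ACTIVE before streaming data into it:

# Check the Kinesis data stream status
aws kinesis describe-stream-summary \
--stream-name etl_demo \
--query 'StreamDescriptionSummary.StreamStatus' \
--output text \
--region us-west-2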

Create test tables and data

After we have configured the DB cluster, we need to create test data. In this example, we use pgbench to create dummy tables and data:

# Create some dummy tables and data using pgbench
pgbench -is 10 \
-h demo-multi-az-cluster-etl-source.cluster-abcd123dummy.us-west-2.rds.amazonaws.com \
-p 5432 \
-U $PGUSER \
-d $PGDATABASE
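
As a quick sanity check (a minimal sketch using the dummy cluster endpoint from this post), you can list the pgbench tables and their approximate row counts:

# List the pgbench tables and their approximate row counts
psql -h demo-multi-az-cluster-etl-source.cluster-abcd123dummy.us-west-2.rds.amazonaws.com \
-p 5432 \
-U $PGUSER \
-d $PGDATABASE \
-c "SELECT relname, n_live_tup FROM pg_stat_user_tables WHERE relname LIKE 'pgbench%' ORDER BY relname;"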

Create a role and replication slot

To enable logical replication, we must create a database role for replication and grant the role the necessary permissions. After the role is created, we create a logical replication slot on our source DB cluster. For this example, we use the wal2json decoding plugin.

# Connect to the source DB instance and create a role for replication
psql -h demo-multi-az-cluster-etl-source.cluster-abcd123dummy.us-west-2.rds.amazonaws.com \
-p 5432 \
-U $PGUSER \
-d $PGDATABASE

postgres=> CREATE USER repluser;
CREATE ROLE
postgres=> GRANT rds_replication to repluser;
GRANT ROLE
postgres=> \password repluser
Enter new password for user "repluser":
Enter it again:

# Create logical replication slot on source DB cluster
SELECT * FROM pg_create_logical_replication_slot('etl_slot_wal2json','wal2json');

For more information, see Create user and Logical Decoding.
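
Before wiring up a consumer, you can optionally peek at the slot without consuming it. This is a minimal sketch using the slot and dummy endpoint from this post; an empty result simply means no new changes have been written since the slot was created:

# Peek at pending changes in the slot without advancing it
psql -h demo-multi-az-cluster-etl-source.cluster-abcd123dummy.us-west-2.rds.amazonaws.com \
-p 5432 \
-U $PGUSER \
-d $PGDATABASE \
-c "SELECT count(*) FROM pg_logical_slot_peek_changes('etl_slot_wal2json', NULL, NULL);"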

Consume the replication stream

Finally, we consume the logical replication stream using the following Python script. The script connects to the DB cluster endpoint, starts consuming the replication stream, and forwards each record to the Kinesis data stream. The script also prints the payload to the standard output. The script has a simple retry mechanism to support reconnecting to the DB cluster endpoint if there is an interruption.

To run this script, you need a client with network access to the RDS for PostgreSQL Multi-AZ DB cluster, with Python and the PostgreSQL client tools installed.

$ pip3 install boto3 psycopg2-binary
$ cat consume_slot.py
import boto3
import json
import time
import psycopg2
from psycopg2.extras import LogicalReplicationConnection
import os

my_slot_name = 'etl_slot_wal2json'
my_stream_name = 'etl_demo'
kinesis_client = boto3.client('kinesis', region_name='us-west-2')

host = os.getenv('PGHOST')
user = os.getenv('PGUSER')
port = os.getenv('PGPORT')
password = os.getenv('PGPASSWORD')
database = os.getenv('PGDATABASE')

def consume(msg):
    # Forward the decoded change to the Kinesis data stream, then echo it to standard output
    kinesis_client.put_record(StreamName=my_stream_name, Data=json.dumps(msg.payload), PartitionKey="default")
    print(msg.payload)
    # Confirm the message so the server can release the WAL retained by the slot
    msg.cursor.send_feedback(flush_lsn=msg.data_start)

def consume_replication_stream():
    # Connect to the DB cluster endpoint with a logical replication connection
    my_connection = psycopg2.connect(
            f"dbname={database} host={host} port={port} user={user} password={password}",
        connection_factory=LogicalReplicationConnection)
    cur = my_connection.cursor()
    # Drop and recreate the slot so consumption starts from the current WAL position
    cur.drop_replication_slot(my_slot_name)
    cur.create_replication_slot(my_slot_name, output_plugin='wal2json')
    cur.start_replication(slot_name=my_slot_name, options={'pretty-print': 1}, decode=True)
    print(f'Successfully connected to slot {my_slot_name}')
    cur.consume_stream(consume)

def main():
    retries = 10
    for i in range(retries):
        try:
            consume_replication_stream()
            break  # Success! So we break out the loop
        except Exception as e:
            print(f"Error occurred: {e}. Retrying...")
            time.sleep(30)  # Wait for 30 seconds before retrying
            if i == retries - 1:  # If we've retried up to the maximum retries allowed, raise the exception to be handled by the next level
                raise

if __name__ == '__main__':
    main()
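
The script reads its connection settings from the standard PostgreSQL environment variables. The following exports are a minimal example; the endpoint is the dummy cluster endpoint used throughout this post, and the remaining values are placeholders for your own settings and credentials.

# Connection settings read by consume_slot.py (placeholder values)
export PGHOST=demo-multi-az-cluster-etl-source.cluster-abcd123dummy.us-west-2.rds.amazonaws.com
export PGPORT=5432
export PGUSER=<master-username>
export PGPASSWORD=<master-user-password>
export PGDATABASE=postgres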

Run the script:

$ python3 consume_slot.py
Successfully connected to slot etl_slot_wal2json

Start a DML workload

To demonstrate that data is streaming, we need to run a DML workload. In this example, we use pgbench to run the built-in tpcb-like workload for 300 seconds (5 minutes):

pgbench -T 300 \
-h demo-multi-az-cluster-etl-source.cluster-abcd123dummy.us-west-2.rds.amazonaws.com \
-p 5432 \
-U $PGUSER \
-d $PGDATABASE

After the workload begins, you should see the stream payload being processed in the terminal standard output:

$ python3 consume_slot.py
Successfully connected to slot etl_slot_wal2json
{
  "change": [
  ]
}
{
  "change": [
    {
      "kind": "update",
      "schema": "public",
      "table": "pgbench_accounts",
      "columnnames": ["aid", "bid", "abalance", "filler"],
      "columntypes": ["integer", "integer", "integer", "character(84)"],
      "columnvalues": [674192, 7, 2261, " "],
      "oldkeys": {
        "keynames": ["aid"],
        "keytypes": ["integer"],
        "keyvalues": [674192]
      }
    }

You can also view the payload via the Kinesis Data Viewer.

For more information on how to perform this step using the console, see Using Data Viewer in the Kinesis Console.
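
Alternatively, you can spot-check the stream from the AWS CLI. The following is a minimal example that reads a small batch of records from the stream created earlier; the shard ID shown is the default for a one-shard stream, and the Data field in the response is base64-encoded:

# Get a shard iterator for the single shard and read a batch of records
SHARD_ITERATOR=$(aws kinesis get-shard-iterator \
--stream-name etl_demo \
--shard-id shardId-000000000000 \
--shard-iterator-type TRIM_HORIZON \
--query 'ShardIterator' \
--output text \
--region us-west-2)

aws kinesis get-records \
--shard-iterator "$SHARD_ITERATOR" \
--limit 5 \
--region us-west-2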

Demonstrate DB cluster failover

Now that we have validated that the logical replication stream is being consumed successfully, we can initiate a failover of the DB cluster to demonstrate that the replication slot is copied to the readable standby DB instances in the DB cluster and that consumption of the stream can continue after the failover.

Prior to failover, let’s review the slot configuration by querying pg_replication_slots on the cluster endpoint (the same query we repeat after the failover):
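
# Query pg_replication_slots on the cluster endpoint
psql -h demo-multi-az-cluster-etl-source.cluster-abcd123dummy.us-west-2.rds.amazonaws.com \
-p 5432 \
-U $PGUSER \
-d $PGDATABASE \
-c "SELECT * FROM pg_replication_slots;"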

slot_name | plugin | slot_type | datoid | database | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn | wal_status | safe_wal_size | two_phase
---------------------------------------------+----------+-----------+--------+----------+-----------+--------+------------+--------+--------------+-------------+---------------------+------------+---------------+-----------
rds_us_west_2_db_knjbk2urhjnkn636iixihmgksq | | physical | | | f | t | 25517 | 338176 | 7874 | 1/B4000000 | | reserved | | f
etl_slot_wal2json | wal2json | logical | 5 | postgres | f | t | | | 308164 | 1/A8210E68 | 1/A8210EA0 | reserved | | f
rds_us_west_2_db_by3onqaew4kwmrlfn3zjpuigbq | | physical | | | f | t | 28390 | 338172 | 7874 | 1/B4000000 | | reserved | | f
(3 rows)

While our sample workload is running and being consumed, we initiate a failover:

# Failover DB cluster
aws rds failover-db-cluster \
--db-cluster-identifier demo-multi-az-cluster-etl-source \
--region us-west-2

For further information on how to perform this step using the console, see Failover process for Multi-AZ DB Clusters.
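
After initiating the failover, you can optionally confirm which DB instance has become the new writer; this is a minimal check using the cluster identifier from this post:

# Identify the current writer instance of the DB cluster
aws rds describe-db-clusters \
--db-cluster-identifier demo-multi-az-cluster-etl-source \
--query 'DBClusters[0].DBClusterMembers[?IsClusterWriter==`true`].DBInstanceIdentifier' \
--output text \
--region us-west-2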

When the failover is complete, we can validate that the logical replication stream reconnects and continues to consume the data:

    {
      "kind": "insert",
      "schema": "public",
      "table": "pgbench_history",
      "columnnames": ["tid", "bid", "aid", "delta", "mtime", "filler"],
      "columntypes": ["integer", "integer", "integer", "integer", "timestamp without time zone", "character(22)"],
      "columnvalues": [71, 9, 175688, 3147, "2023-07-04 08:20:06.966371", null]
    }
  ]
}
Error occurred: terminating connection due to administrator command
. Retrying...
Error occurred: connection to server at "demo-multi-az-cluster-etl-source.cluster-abcd123dummy.us-west-2.rds.amazonaws.com" (35.163.140.202), port 5432 failed: Connection timed out
Is the server running on that host and accepting TCP/IP connections?
. Retrying...
Successfully connected to slot etl_slot_wal2json
{
  "change": [
    {
      "kind": "update",
      "schema": "public",
      "table": "pgbench_accounts",
      "columnnames": ["aid", "bid", "abalance", "filler"],
      "columntypes": ["integer", "integer", "integer", "character(84)"],
      "columnvalues": [17309, 1, 2376, " "],
      "oldkeys": {
        "keynames": ["aid"],
        "keytypes": ["integer"],
        "keyvalues": [17309]
      }
    }

We can also validate the replication slot is still available after the failover:

psql -h demo-multi-az-cluster-etl-source.cluster-abcd123dummy.us-west-2.rds.amazonaws.com \
-p 5432 \
-U $PGUSER \
-d $PGDATABASE \
-c "SELECT * FROM pg_replication_slots;"

slot_name | plugin | slot_type | datoid | database | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn | wal_status | safe_wal_size | two_phase
---------------------------------------------+----------+-----------+--------+----------+-----------+--------+------------+--------+--------------+-------------+---------------------+------------+---------------+-----------
rds_us_west_2_db_knjbk2urhjnkn636iixihmgksq | | physical | | | f | t | 904 | 271194 | 7874 | 1/94000000 | | reserved | | f
rds_us_west_2_db_7jhd2ofzzow66dzv2xpsxpanfq | | physical | | | f | t | 3468 | 271202 | 7874 | 1/94000000 | | reserved | | f
etl_slot_wal2json | wal2json | logical | 5 | postgres | f | f | | | 269331 | 1/900ABF88 | 1/900ABFC0 | reserved | | f
(3 rows)

Cleanup

Don’t forget to delete any unnecessary resources and drop unused replication slots. To drop unused replication slots, connect to the publisher and run the following SQL command:

SELECT pg_drop_replication_slot(slot_name) FROM pg_replication_slots
WHERE active = false;

For further information on the pg_drop_replication_slot function, see Replication Management Functions.

You can use the following AWS CLI command to delete the RDS for PostgreSQL Multi-AZ DB cluster with an optional DB snapshot.

aws rds delete-db-cluster \
--db-cluster-identifier demo-multi-az-cluster-etl-source \
--final-db-snapshot-identifier demo-multi-az-cluster-etl-source \
--region us-west-2

For further information on how to perform this step using the console, see Deleting a Multi-AZ DB cluster.

You can use the following AWS CLI command to delete the RDS for PostgreSQL Multi-AZ DB cluster parameter group.

aws rds delete-db-cluster-parameter-group \
--db-cluster-parameter-group-name demo-pg-15-etl \
--region us-west-2

You can use the following AWS CLI command to delete the Amazon Kinesis Data Stream.

aws kinesis delete-stream \
--stream-name etl_demo \
--region us-west-2

Best practices

When enabling logical replication, there are a number of best practices you should consider:

  • Be aware of unsupported SQL commands – Logical replication is based on the logical changes made to the data. As a result, it doesn’t support automatically replicating schema changes such as ALTER TABLE and some other DDL changes. For further information on the limitations of PostgreSQL logical replication, see the next section, Current limitations in PostgreSQL logical replication.
  • Clean up unused replication slots – If a replication slot is no longer needed, you should drop the replication slot as soon as possible to prevent accumulation of WAL files. An unused replication slot can prevent vacuum from cleaning up dead tuples and resetting the XID age horizon.
  • Monitor disk space usage – Logical replication slots retain WAL files until they are consumed by all subscribers. This can lead to significant disk space usage if a consumer falls behind or stops consuming changes. For information on monitoring Amazon RDS using Amazon CloudWatch, see Overview of Amazon RDS and Amazon CloudWatch.
  • Monitor replication lag – It’s important to monitor logical replication lag. High replication lag can lead to delayed updates on the consumer and increased disk space usage on the source. Monitoring logical replication lag is the responsibility of the user; for further information on gathering replication status information, see pg_stat_replication. A sample query for checking how much WAL each slot retains follows this list.
  • Monitor the number of logical replication consumers – As you increase the number of replication consumers, you need to ensure that the number of consumers doesn’t exceed the configured value of max_logical_replication_workers.
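
To help with the disk space and replication lag recommendations above, the following query is a minimal example (run against the cluster endpoint, using the dummy endpoint from this post) that reports how much WAL each replication slot is still retaining:

# Report retained WAL per replication slot on the writer DB instance
psql -h demo-multi-az-cluster-etl-source.cluster-abcd123dummy.us-west-2.rds.amazonaws.com \
-p 5432 \
-U $PGUSER \
-d $PGDATABASE \
-c "SELECT slot_name, active, pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS retained_wal FROM pg_replication_slots;"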

Current limitations in PostgreSQL logical replication

Bear in mind that logical replication has certain restrictions or limitations:

  • Schema changes – The database schema and DDL commands are not replicated. The initial schema can be copied by hand using pg_dump --schema-only. Subsequent schema changes would need to be kept in sync manually. As a best practice, schema changes should be committed first by the subscriber, then by the publisher.
  • Sequence data – Though logical replication replicates sequence data in serial or identity columns, in the event of switchover or failover to the subscriber database, you must update the sequences to the latest values.
  • Large objects – Large objects are not replicated. It’s also easier to store a reference to a large object on some external storage like Amazon Simple Storage Service (Amazon S3) and replicate that reference rather than storing and replicating the object itself.
  • Truncate – Replication of TRUNCATE commands is supported, but some care must be taken when truncating groups of tables connected by foreign keys. A workaround of TRUNCATE could be DELETE. To avoid accidental TRUNCATE operations, you can REVOKE TRUNCATE privileges from tables.
  • Partitioned tables – Logical replication treats partitioned tables as regular tables. Logical replication doesn’t work at the base table level, but at the child table level. If you’re using partitions, the partition hierarchy must be the same on both sides of a logical replication setup.
  • Foreign tables, views, and materialized views – Replication is only supported by tables, including partitioned tables. Attempts to replicate other types of relations, such as views, materialized views, or foreign tables, will result in an error.

There is ongoing work in the open-source PostgreSQL developer community to address these limitations.

Conclusion

In this post, we provided an overview of PostgreSQL logical replication, showed how to enable and use logical replication with RDS for PostgreSQL Multi-AZ DB clusters, demonstrated how to consume the logical replication stream into an Amazon Kinesis data stream for data consolidation (ETL) use cases, and reviewed best practices and limitations of PostgreSQL logical replication.

RDS Multi-AZ cluster deployments offer enhanced availability and durability for production database workloads. The introduction of PostgreSQL logical replication with RDS Multi-AZ clusters enables granular replication and synchronization of individual tables, providing flexibility and efficiency. As we discussed, this feature supports various use cases, such as real-time data streaming, event-driven architectures, smooth database upgrades, data distribution, data integration and ETL processes, and more. Incorporating logical replication with RDS Multi-AZ clusters can bring significant benefits to your PostgreSQL database workloads. To enhance availability, durability, and data synchronization capabilities for your applications, it’s time to explore logical replication with your RDS Multi-AZ cluster topology.

We encourage you to learn more by building a solution using the sample implementation provided in this post and a dataset relevant to your business. If you have questions or suggestions, leave a comment.


About the Authors

Sean Massey is a Database Engineer and a Subject Matter Expert in Amazon RDS, specializing in PostgreSQL. Sean’s relentless dedication to customer success is reflected in his track record of resolving critical issues in Amazon Aurora and Amazon RDS for PostgreSQL database systems.

Shayon Sanyal is a Principal Database Specialist Solutions Architect and specializes in the Amazon RDS for PostgreSQL and Amazon Aurora PostgreSQL open-source database engines. In his current role, he spends his time working with customers to help design scalable, secure, and robust cloud-native architectures.

Vijay Karumajji is a Principal Database Solutions Architect with Amazon Web Services. He works with our customers to provide guidance and technical assistance on database projects, helping them improve the value of their solutions when using AWS.