Exploring PostgreSQL logical replication
PostgreSQL’s logical replication is a powerful feature that enables fine-grained replication of data from one PostgreSQL instance to another. It is useful for real-time data synchronization, migrations, and scaling out read workloads. In this post, we’ll explore logical replication, set it up with Docker Compose, troubleshoot common issues, and verify the setup with a Python script.
What is logical replication?
Logical replication in PostgreSQL replicates data changes at the logical level, as individual row inserts, updates, and deletes, rather than as physical, block-level WAL changes (which is what streaming replication ships). Because it works on rows, it can replicate specific tables or subsets of data. This makes it well suited to multi-tenant architectures and heterogeneous environments, such as replicating between different PostgreSQL major versions.
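The row-level idea can be made concrete with a toy Python model. It does not reflect PostgreSQL's actual wire protocol or internals. The publisher side emits logical change records (operation, table, primary key, new row values), a publication filters them by table, and the subscriber replays them against its own copy of the data. `Change`, `publish`, and `apply_change` are hypothetical names used only for illustration, and the change stream is made-up sample data.

```python
from dataclasses import dataclass
from typing import Dict, Iterable, Iterator, Optional, Set


@dataclass
class Change:
    """A hypothetical logical change record describing one row operation."""
    op: str                     # "INSERT", "UPDATE" or "DELETE"
    table: str
    key: int                    # primary key of the affected row
    row: Optional[dict] = None  # new column values; None for DELETE


def publish(changes: Iterable[Change], published_tables: Set[str]) -> Iterator[Change]:
    """Toy publication: pass on only changes to tables in the publication."""
    for change in changes:
        if change.table in published_tables:
            yield change


def apply_change(tables: Dict[str, Dict[int, dict]], change: Change) -> None:
    """Toy subscriber: replay one row-level change against local tables."""
    rows = tables.setdefault(change.table, {})
    if change.op in ("INSERT", "UPDATE"):
        rows[change.key] = dict(change.row)
    elif change.op == "DELETE":
        rows.pop(change.key, None)
    else:
        raise ValueError(f"unknown operation: {change.op}")


# A made-up change stream as seen on the publisher
stream = [
    Change("INSERT", "example_table", 1, {"id": 1, "data": "Hello"}),
    Change("INSERT", "example_table", 2, {"id": 2, "data": "World"}),
    Change("INSERT", "audit_log", 1, {"id": 1, "msg": "not published"}),
    Change("UPDATE", "example_table", 2, {"id": 2, "data": "PostgreSQL"}),
    Change("DELETE", "example_table", 1),
]

subscriber_tables: Dict[str, Dict[int, dict]] = {}
for change in publish(stream, {"example_table"}):
    apply_change(subscriber_tables, change)

print(subscriber_tables)
# {'example_table': {2: {'id': 2, 'data': 'PostgreSQL'}}}
```

Because each record names a table and a row rather than a disk block, the unpublished `audit_log` change never reaches the subscriber. Physical replication cannot offer that kind of selectivity.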
Logical replication is set up with two high-level operations:
Creating Publication
A PostgreSQL publication is a database object that defines a set of changes to be shared with subscribers in a logical replication setup. It is a configuration that specifies which tables, and optionally which types of operations (INSERT, UPDATE, DELETE, and, since PostgreSQL 11, TRUNCATE), are replicated from a publisher database to one or more subscribers.
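The table list and the operation filter map directly onto `CREATE PUBLICATION ... FOR TABLE ... WITH (publish = '...')`. The sketch below uses hypothetical helpers named `quote_ident` and `build_create_publication`. It only builds the SQL text and does not connect to a database. It double-quotes identifiers and rejects operation names that the `publish` option does not accept.

```python
# Operations accepted by the publish option (truncate since PostgreSQL 11)
VALID_OPERATIONS = ("insert", "update", "delete", "truncate")


def quote_ident(name: str) -> str:
    """Double-quote a SQL identifier, escaping embedded double quotes.

    Note: a schema-qualified name like public.t would need each part quoted
    separately; this sketch does not handle that.
    """
    return '"' + name.replace('"', '""') + '"'


def build_create_publication(name, tables, operations=VALID_OPERATIONS):
    """Return CREATE PUBLICATION SQL text for the given tables and operations."""
    if not tables:
        raise ValueError("this sketch requires at least one table")
    ops = [op.lower() for op in operations]
    unknown = [op for op in ops if op not in VALID_OPERATIONS]
    if unknown:
        raise ValueError(f"unsupported operations: {unknown}")
    table_list = ", ".join(quote_ident(t) for t in tables)
    publish = ", ".join(ops)
    return (
        f"CREATE PUBLICATION {quote_ident(name)} FOR TABLE {table_list} "
        f"WITH (publish = '{publish}');"
    )


# Replicate inserts and updates only; DELETEs on the publisher are not sent
print(build_create_publication("my_publication", ["example_table"], ["insert", "update"]))
# CREATE PUBLICATION "my_publication" FOR TABLE "example_table" WITH (publish = 'insert, update');
```

Publishing UPDATE and DELETE also requires the table to have a replica identity. By default that is its primary key, which `example_table` has through its `SERIAL PRIMARY KEY` column.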
Creating Subscription
A PostgreSQL subscription is a database object used in logical replication to define how a subscriber database receives changes from a publication on a publisher database. It is the counterpart to a publication, allowing the subscriber to consume data changes and apply them locally.
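On the subscriber side, the key input is the libpq connection string that tells the subscription how to reach the publisher. The minimal sketch below builds it and reuses the names from the subscriber setup in this post (`postgres-publisher`, `replicator`, `publisher_db`). `build_conninfo`, `conninfo_value`, `sql_literal`, and `build_create_subscription` are hypothetical helpers. They single-quote values and backslash-escape quotes and backslashes, as libpq's keyword/value syntax requires. They also double single quotes when the string is embedded in a SQL literal. The code only produces SQL text and does not execute it.

```python
def quote_ident(name: str) -> str:
    """Double-quote a SQL identifier, escaping embedded double quotes."""
    return '"' + name.replace('"', '""') + '"'


def conninfo_value(value) -> str:
    """Quote a libpq connection-string value if it is empty or contains
    spaces, single quotes or backslashes (escaping the latter two)."""
    text = str(value)
    if text and not any(ch in text for ch in " '\\"):
        return text
    escaped = text.replace("\\", "\\\\").replace("'", "\\'")
    return "'" + escaped + "'"


def build_conninfo(**params) -> str:
    """Join keyword=value pairs into a libpq connection string."""
    return " ".join(f"{key}={conninfo_value(val)}" for key, val in params.items())


def sql_literal(text: str) -> str:
    """Render text as a standard SQL string literal (single quotes doubled)."""
    return "'" + text.replace("'", "''") + "'"


def build_create_subscription(name: str, conninfo: str, publication: str) -> str:
    """Return CREATE SUBSCRIPTION SQL text (this sketch does not execute it)."""
    return (
        f"CREATE SUBSCRIPTION {quote_ident(name)} "
        f"CONNECTION {sql_literal(conninfo)} "
        f"PUBLICATION {quote_ident(publication)};"
    )


conninfo = build_conninfo(
    host="postgres-publisher",
    port=5432,
    user="replicator",
    password="replicator_pass",
    dbname="publisher_db",
)
print(build_create_subscription("my_subscription", conninfo, "my_publication"))
```

The two escaping layers are kept separate on purpose. libpq parses the connection string, and the SQL parser reads the literal that contains it.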
Replication in action
Create database nodes
Here, we use Docker Compose to run two PostgreSQL containers: the publisher, exposed on port 5432, and the subscriber, exposed on port 5433.
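The publisher is started with the command `postgres -c wal_level=logical -c max_replication_slots=5 -c max_wal_senders=5`, and these three settings are the ones that matter for logical replication. `wal_level=logical` makes PostgreSQL write the extra information into the WAL that logical decoding needs; the default level, `replica`, is not sufficient. `max_replication_slots` limits how many replication slots the server can hold, and each subscription uses one slot on the publisher. `max_wal_senders` limits how many WAL sender processes can run at once, and each connected subscription is served by one of them.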
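To sanity-check the three settings from the publisher's startup command (`wal_level`, `max_replication_slots`, `max_wal_senders`), you can compare what the server reports, for example via `SHOW wal_level;`, against the requirements. The sketch below is a hypothetical helper, `check_publisher_settings`, that assumes the values have already been read into a dictionary of strings. Its rules follow the PostgreSQL documentation's guidance. `wal_level` must be `logical`. `max_replication_slots` should cover every subscription plus some reserve for initial table synchronization. `max_wal_senders` should be at least `max_replication_slots` plus the number of physical standbys. The reserve of 2 is an arbitrary assumption.

```python
def check_publisher_settings(settings, subscriptions, physical_standbys=0, sync_reserve=2):
    """Return a list of problems with a publisher's logical replication settings.

    `settings` maps setting names to the string values SHOW returns.
    `sync_reserve` is an assumed number of spare slots for initial table sync.
    """
    problems = []
    if settings.get("wal_level") != "logical":
        problems.append(f"wal_level is {settings.get('wal_level')!r}, expected 'logical'")

    slots = int(settings.get("max_replication_slots", 0))
    needed_slots = subscriptions + sync_reserve
    if slots < needed_slots:
        problems.append(f"max_replication_slots={slots}, want at least {needed_slots}")

    senders = int(settings.get("max_wal_senders", 0))
    needed_senders = slots + physical_standbys
    if senders < needed_senders:
        problems.append(f"max_wal_senders={senders}, want at least {needed_senders}")
    return problems


# Values from the publisher's startup command, with one subscription
compose_settings = {"wal_level": "logical", "max_replication_slots": "5", "max_wal_senders": "5"}
print(check_publisher_settings(compose_settings, subscriptions=1))  # []
```

A server left at the default `wal_level` of `replica` would be reported immediately. Changing `wal_level` requires a server restart, which is why it is passed on the container's command line.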
Publisher Initialization
```sql
-- Create a replication role
CREATE ROLE replicator WITH REPLICATION PASSWORD 'replicator_pass' LOGIN;

-- Create the example table
CREATE TABLE example_table (
    id SERIAL PRIMARY KEY,
    data TEXT
);

-- Insert initial data
INSERT INTO example_table (data) VALUES ('Hello'), ('World');

-- Grant SELECT privileges to the replicator role (needed for the initial table copy)
GRANT SELECT ON example_table TO replicator;

-- Create a publication
CREATE PUBLICATION my_publication FOR TABLE example_table;
```
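Before moving to the subscriber, you can confirm what the publication actually contains. The `pg_publication_tables` system view lists one row per published table, with columns including `pubname`, `schemaname`, and `tablename`. `published_tables` below is a hypothetical helper that accepts any DB-API cursor, for example a psycopg2 cursor opened against `publisher_db`. The demo substitutes a stub cursor so the sketch runs without a database.

```python
PUBLICATION_TABLES_QUERY = (
    "SELECT schemaname, tablename FROM pg_publication_tables "
    "WHERE pubname = %s ORDER BY schemaname, tablename"
)


def published_tables(cursor, publication):
    """Return 'schema.table' names included in a publication."""
    # %s is psycopg2's parameter placeholder; the driver handles quoting
    cursor.execute(PUBLICATION_TABLES_QUERY, (publication,))
    return [f"{schema}.{table}" for schema, table in cursor.fetchall()]


class StubCursor:
    """Stand-in for a psycopg2 cursor that returns canned rows (demo only)."""

    def __init__(self, rows):
        self.rows = rows
        self.executed = []

    def execute(self, query, params=None):
        self.executed.append((query, params))

    def fetchall(self):
        return self.rows


stub = StubCursor([("public", "example_table")])
print(published_tables(stub, "my_publication"))  # ['public.example_table']
```

Against the real publisher, you would pass `conn.cursor()` from a `psycopg2.connect(...)` connection instead of the stub.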
Subscriber Initialization
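```sql
-- Create the same table as the publisher
-- (logical replication does not replicate schema changes, so the table must exist first)
CREATE TABLE example_table (
    id SERIAL PRIMARY KEY,
    data TEXT
);

-- Create a subscription to the publisher.
-- postgres-publisher resolves to the publisher container on the Compose network.
-- By default this also copies the rows already on the publisher (copy_data = true).
CREATE SUBSCRIPTION my_subscription
    CONNECTION 'host=postgres-publisher port=5432 user=replicator password=replicator_pass dbname=publisher_db'
    PUBLICATION my_publication;
```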
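`CREATE SUBSCRIPTION` first copies the existing rows and then streams new changes, so the subscriber may need a moment to catch up. On the subscriber, `pg_subscription_rel` records a per-table state in `srsubstate`. The states are `i` (initialize), `d` (data is being copied), `f` (finished table copy, in newer releases), `s` (synchronized), and `r` (ready, normal replication). The minimal sketch below polls until every table is ready and assumes a DB-API cursor on `subscriber_db`. `subscription_states` and `wait_until_ready` are hypothetical helpers, and the timeout and interval defaults are arbitrary. A stub cursor stands in for the database in the demo.

```python
import time

SYNC_STATES = {
    "i": "initialize",
    "d": "data is being copied",
    "f": "finished table copy",
    "s": "synchronized",
    "r": "ready",
}


def subscription_states(cursor):
    """Return {table_name: state_description} from pg_subscription_rel."""
    cursor.execute("SELECT srrelid::regclass::text, srsubstate FROM pg_subscription_rel")
    return {table: SYNC_STATES.get(state, state) for table, state in cursor.fetchall()}


def wait_until_ready(cursor, timeout=30.0, interval=1.0, clock=time.monotonic, sleep=time.sleep):
    """Poll until every subscribed table reports 'ready'; return the final states."""
    deadline = clock() + timeout
    while True:
        states = subscription_states(cursor)
        if states and all(s == "ready" for s in states.values()):
            return states
        if clock() >= deadline:
            raise TimeoutError(f"subscription not ready after {timeout}s: {states}")
        sleep(interval)


class StubCursor:
    """Stand-in cursor that replays canned results, one per execute() call."""

    def __init__(self, results):
        self.results = list(results)
        self.current = []

    def execute(self, query, params=None):
        self.current = self.results.pop(0)

    def fetchall(self):
        return self.current


stub = StubCursor([[("example_table", "d")], [("example_table", "r")]])
print(wait_until_ready(stub, interval=0))  # {'example_table': 'ready'}
```

The `clock` and `sleep` parameters exist so the loop can be exercised without real waiting. In normal use, the defaults apply.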
Verify replication
```python
from contextlib import closing

import psycopg2
from psycopg2 import sql
from psycopg2.extras import RealDictCursor

# Connection details for the publisher and subscriber (host ports from Docker Compose)
publisher_config = {
    "host": "localhost",
    "port": 5432,
    "dbname": "publisher_db",
    "user": "publisher_user",
    "password": "publisher_pass",
}

subscriber_config = {
    "host": "localhost",
    "port": 5433,
    "dbname": "subscriber_db",
    "user": "subscriber_user",
    "password": "subscriber_pass",
}


def fetch_data(connection_config, table_name):
    """Fetch all rows from a table, ordered by id; return None on error."""
    try:
        # closing() guarantees the connection is closed; psycopg2's own
        # "with conn:" block only commits or rolls back the transaction
        with closing(psycopg2.connect(**connection_config)) as conn:
            with conn.cursor(cursor_factory=RealDictCursor) as cursor:
                # Quote the table name safely and fix the row order for comparison
                query = sql.SQL("SELECT * FROM {} ORDER BY id;").format(
                    sql.Identifier(table_name)
                )
                cursor.execute(query)
                return cursor.fetchall()
    except psycopg2.Error as e:
        print(f"Error querying {connection_config['dbname']}: {e}")
        return None


def main():
    table_name = "example_table"

    print("Fetching data from the publisher...")
    publisher_data = fetch_data(publisher_config, table_name)
    print("Fetching data from the subscriber...")
    subscriber_data = fetch_data(subscriber_config, table_name)

    # Two failed queries would otherwise compare equal and look like success
    if publisher_data is None or subscriber_data is None:
        print("\n❌ Could not read from both databases; check the connection details.")
        return

    print(f"Publisher data ({len(publisher_data)} rows): {publisher_data}")
    print(f"Subscriber data ({len(subscriber_data)} rows): {subscriber_data}")

    # Verify data consistency (row order is fixed by ORDER BY id)
    if publisher_data == subscriber_data:
        print("\n✅ Data is replicating successfully!")
    else:
        print("\n❌ Data is not consistent between publisher and subscriber.")


if __name__ == "__main__":
    main()
```
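When the final check fails, a plain equality test does not say which rows differ. Logical replication is also asynchronous by default, so a freshly inserted row may simply not have arrived yet. The hypothetical `diff_rows` helper below indexes both result sets by primary key, assumed to be `id` as in `example_table`. It reports rows missing on the subscriber, unexpected extra rows, and rows whose values differ. It works on the lists of dictionaries that `fetch_data` returns. The sample rows are made up for illustration.

```python
def diff_rows(publisher_rows, subscriber_rows, key="id"):
    """Compare two lists of row dicts by primary key and report differences."""
    pub = {row[key]: dict(row) for row in publisher_rows}
    sub = {row[key]: dict(row) for row in subscriber_rows}
    return {
        "missing_on_subscriber": sorted(pub.keys() - sub.keys()),
        "extra_on_subscriber": sorted(sub.keys() - pub.keys()),
        "changed": sorted(k for k in pub.keys() & sub.keys() if pub[k] != sub[k]),
    }


# Made-up sample rows: id 3 has not replicated yet, id 2 differs
publisher_rows = [{"id": 1, "data": "Hello"}, {"id": 2, "data": "World"}, {"id": 3, "data": "New"}]
subscriber_rows = [{"id": 1, "data": "Hello"}, {"id": 2, "data": "world"}]
print(diff_rows(publisher_rows, subscriber_rows))
# {'missing_on_subscriber': [3], 'extra_on_subscriber': [], 'changed': [2]}
```

If only `missing_on_subscriber` is non-empty, re-running the check after a short wait is a reasonable first step before digging into the subscription's state.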