PostgreSQL CDC
Change Data Capture (CDC) allows you to track and propagate changes in a Postgres database to downstream consumers based on its Write-Ahead Log (WAL).
This guide shows you how to propagate CDC data directly from Postgres to Materialize using Postgres’ native replication protocol.
Database setup
Minimum requirements: PostgreSQL 10+
Before creating a source in Materialize, you need to ensure that the upstream database is configured to support logical replication.
As a superuser:
-
Check the
wal_level
configuration setting:SHOW wal_level;
The default value is
replica
. For CDC, you’ll need to set it tological
in the database configuration file (postgresql.conf
). Keep in mind that changing thewal_level
requires a restart of the Postgres instance and can affect database performance. -
Restart the database so all changes can take effect.
We recommend following the AWS RDS documentation for detailed information on logical replication configuration and best practices.
As a superuser (rds_superuser
):
-
Create a custom RDS parameter group and associate it with your instance. You will not be able to set custom parameters on the default RDS parameter groups.
-
In the custom RDS parameter group, set the
rds.logical_replication
static parameter to1
. -
Add the egress IP addresses associated with your Materialize region to the security group of the RDS instance. You can find these addresses by querying the
mz_egress_ips
table in Materialize. -
Restart the database so all changes can take effect.
We recommend following the AWS Aurora documentation for detailed information on logical replication configuration and best practices.
As a superuser:
-
Create a DB cluster parameter group for your instance using the following settings:
Set Parameter group family to your version of Aurora PostgreSQL.
Set Type to DB Cluster Parameter Group.
-
In the DB cluster parameter group, set the
rds.logical_replication
static parameter to1
. -
In the DB cluster parameter group, set reasonable values for
max_replication_slots
,max_wal_senders
,max_logical_replication_workers
, andmax_worker_processes parameters
based on your expected usage. -
Add the egress IP addresses associated with your Materialize region to the security group of the DB instance. You can find these addresses by querying the
mz_egress_ips
table in Materialize. -
Restart the database so all changes can take effect.
We recommend following the Azure DB for PostgreSQL documentation for detailed information on logical replication configuration and best practices.
-
In the Azure portal, or using the Azure CLI, enable logical replication for the PostgreSQL instance.
-
Add the egress IP addresses associated with your Materialize region to the list of allowed IP addresses under the “Connections security” menu. You can find these addresses by querying the
mz_egress_ips
table in Materialize. -
Restart the database so all changes can take effect.
We recommend following the Cloud SQL for PostgreSQL documentation for detailed information on logical replication configuration and best practices.
As a superuser (cloudsqlsuperuser
):
-
In the Google Cloud Console, enable logical replication by setting the
cloudsql.logical_decoding
configuration parameter toon
. -
Add the egress IP addresses associated with your Materialize region to the list of allowed IP addresses. You can find these addresses by querying the
mz_egress_ips
table in Materialize. -
Restart the database so all changes can take effect.
Once logical replication is enabled:
-
Grant the required privileges to the replication user:
ALTER ROLE "dbuser" WITH REPLICATION;
And ensure that this user has the right permissions on the tables you want to replicate:
GRANT CONNECT ON DATABASE dbname TO dbuser; GRANT USAGE ON SCHEMA schema TO dbuser; GRANT SELECT ON ALL TABLES IN SCHEMA schema TO dbuser;
Note:
SELECT
privileges on the tables you want to replicate are needed for the initial table sync. -
Set the replica identity to
FULL
for the tables you want to replicate:ALTER TABLE repl_table REPLICA IDENTITY FULL;
This setting determines the amount of information that is written to the WAL in
UPDATE
andDELETE
operations.As a heads-up, you should expect a performance hit in the database from increased CPU usage. For more information, see the PostgreSQL documentation.
-
Create a publication with the tables you want to replicate:
For specific tables:
CREATE PUBLICATION mz_source FOR TABLE table1, table2;
For all tables in Postgres:
CREATE PUBLICATION mz_source FOR ALL TABLES;
The
mz_source
publication will contain the set of change events generated from the specified tables, and will later be used to ingest the replication stream. We strongly recommend that you limit publications only to the tables you need.
Create a source
Postgres sources ingest the raw replication stream data for all tables included in a publication to avoid creating multiple replication slots and minimize the required bandwidth.
When you define a Postgres source, Materialize will automatically create a subsource for each original table in the publication (so you don’t have to!):
Create subsources for all tables included in the Postgres publication
CREATE SOURCE mz_source
FROM POSTGRES CONNECTION pg_connection (PUBLICATION 'mz_source')
FOR ALL TABLES
WITH (SIZE = '3xsmall');
Create subsources for all tables in specific schemas included in the Postgres publication
CREATE SOURCE mz_source
FROM POSTGRES CONNECTION pg_connection (PUBLICATION 'mz_source')
FOR SCHEMAS (public, project)
WITH (SIZE = '3xsmall');
Create subsources for specific tables included in the Postgres publication
CREATE SOURCE mz_source
FROM POSTGRES CONNECTION pg_connection (PUBLICATION 'mz_source')
FOR TABLES (table_1, table_2 AS alias_table_2)
WITH (SIZE = '3xsmall');
Create a materialized view
Any materialized view that depends on replication subsources will be incrementally updated as change events stream in, as a result of INSERT
, UPDATE
and DELETE
operations in the original Postgres database.
CREATE MATERIALIZED VIEW cnt_view1 AS
SELECT field1,
COUNT(*) AS cnt
FROM view1
GROUP BY field1;