PostgreSQL CDC

Change Data Capture (CDC) allows you to track changes in a Postgres database and propagate them to downstream consumers, based on the database’s Write-Ahead Log (WAL). In this guide, we’ll cover how to use Materialize to create and efficiently maintain real-time materialized views on top of CDC data.

There are two ways to connect Materialize to a Postgres database for CDC: using the direct Postgres source, or using Kafka and Debezium.

Direct Postgres source

If Kafka is not part of your stack, you can use the Postgres source to connect Materialize directly to the upstream database. This source uses Postgres’ native replication protocol to continually ingest changes resulting from INSERT, UPDATE and DELETE operations in the upstream database.

Database setup

Minimum requirements: PostgreSQL 10+

Before creating a source in Materialize, you need to ensure that the upstream database is configured to support logical replication.

On a self-managed Postgres instance, as a superuser:

  1. Check the wal_level configuration setting:

    SHOW wal_level;
    

    The default value is replica. For CDC, you’ll need to set it to logical in the database configuration file (postgresql.conf). Keep in mind that changing the wal_level requires a restart of the Postgres instance and can affect database performance.

  2. Restart the database so all changes can take effect.
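
As a rough sketch of the steps above, and assuming you have superuser access and would rather not edit postgresql.conf by hand, the setting can also be changed with ALTER SYSTEM, which persists the value in postgresql.auto.conf. The restart itself still has to happen outside of SQL, and the two extra SHOW statements are only a suggested sanity check.

```sql
-- Run as a superuser on the upstream Postgres instance.
-- ALTER SYSTEM persists the value in postgresql.auto.conf; it only
-- takes effect after the instance is restarted.
ALTER SYSTEM SET wal_level = logical;

-- After the restart, confirm the new value (it should report logical).
SHOW wal_level;

-- Logical replication also needs free replication slots and WAL senders,
-- so it can help to check how many are configured.
SHOW max_replication_slots;
SHOW max_wal_senders;
```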

If you use AWS RDS, we recommend following the AWS RDS documentation for detailed information on logical replication configuration and best practices.

As a superuser (rds_superuser):

  1. Create a custom RDS parameter group and associate it with your instance. You will not be able to set custom parameters on the default RDS parameter groups.

  2. In the custom RDS parameter group, set the rds.logical_replication static parameter to 1.

  3. Add the egress IP addresses associated with your Materialize region to the security group of the RDS instance. You can find these addresses by querying the mz_egress_ips table in Materialize.

  4. Restart the database so all changes can take effect.
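
To make the last two steps more concrete, the sketch below shows where the egress IP addresses come from and one way to confirm that the parameter change took effect. The first query runs in Materialize and the second in the RDS instance; it assumes the instance has already been restarted.

```sql
-- In Materialize: list the egress IP addresses to allow in the
-- security group of the RDS instance.
SELECT * FROM mz_egress_ips;

-- In the RDS instance, after the restart: with rds.logical_replication
-- set to 1, wal_level is expected to report logical.
SHOW wal_level;
```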

If you use AWS Aurora, we recommend following the AWS Aurora documentation for detailed information on logical replication configuration and best practices.

NOTE: Aurora Serverless (v1) does not support logical replication, so it’s not possible to use this service with Materialize.

As a superuser:

  1. Create a DB cluster parameter group for your instance using the following settings:

    Set Parameter group family to your version of Aurora PostgreSQL.

    Set Type to DB Cluster Parameter Group.

  2. In the DB cluster parameter group, set the rds.logical_replication static parameter to 1.

  3. In the DB cluster parameter group, set reasonable values for max_replication_slots, max_wal_senders, max_logical_replication_workers, and max_worker_processes parameters based on your expected usage.

  4. Add the egress IP addresses associated with your Materialize region to the security group of the DB instance. You can find these addresses by querying the mz_egress_ips table in Materialize.

  5. Restart the database so all changes can take effect.
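
Once the instance is back up, a single query against pg_settings is one way to review the parameters mentioned in the steps above. This is only a verification sketch; suitable values depend on your workload and are not suggested here.

```sql
-- Run on the Aurora PostgreSQL instance after the restart.
-- wal_level should report logical; the other values should match what
-- was set in the DB cluster parameter group.
SELECT name, setting
FROM pg_settings
WHERE name IN (
    'wal_level',
    'max_replication_slots',
    'max_wal_senders',
    'max_logical_replication_workers',
    'max_worker_processes'
)
ORDER BY name;
```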

If you use Azure DB for PostgreSQL, we recommend following its documentation for detailed information on logical replication configuration and best practices.

  1. In the Azure portal, or using the Azure CLI, enable logical replication for the PostgreSQL instance.

  2. Add the egress IP addresses associated with your Materialize region to the list of allowed IP addresses under the “Connections security” menu. You can find these addresses by querying the mz_egress_ips table in Materialize.

  3. Restart the database so all changes can take effect.

If you use Cloud SQL for PostgreSQL, we recommend following its documentation for detailed information on logical replication configuration and best practices.

As a superuser (cloudsqlsuperuser):

  1. In the Google Cloud Console, enable logical replication by setting the cloudsql.logical_decoding configuration parameter to on.

  2. Add the egress IP addresses associated with your Materialize region to the list of allowed IP addresses. You can find these addresses by querying the mz_egress_ips table in Materialize.

  3. Restart the database so all changes can take effect.

Once logical replication is enabled:

  1. Grant the required privileges to the replication user:

    ALTER ROLE "dbuser" WITH REPLICATION;
    

    And ensure that this user has the right permissions on the tables you want to replicate:

    GRANT CONNECT ON DATABASE dbname TO dbuser;
    GRANT USAGE ON SCHEMA schema TO dbuser;
    GRANT SELECT ON ALL TABLES IN SCHEMA schema TO dbuser;
    

    Note: SELECT privileges on the tables you want to replicate are needed for the initial table sync.

  2. Set the replica identity to FULL for the tables you want to replicate:

    ALTER TABLE repl_table REPLICA IDENTITY FULL;
    

    This setting determines the amount of information that is written to the WAL in UPDATE and DELETE operations. Setting it to FULL will include the previous values of all the table’s columns in the change events.

    As a heads-up, you should expect a performance hit in the database from increased CPU usage. For more information, see the PostgreSQL documentation.

  3. Create a publication with the tables you want to replicate:

    For specific tables:

    CREATE PUBLICATION mz_source FOR TABLE table1, table2;
    

    For all tables in the database:

    CREATE PUBLICATION mz_source FOR ALL TABLES;
    

    The mz_source publication will contain the set of change events generated from the specified tables, and will later be used to ingest the replication stream. We strongly recommend that you limit publications only to the tables you need.
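
Before moving on, it can be useful to confirm that each of the steps above took effect. The queries below are a verification sketch that uses the Postgres system catalogs; the names dbuser, repl_table and mz_source come from the examples above, while the public schema and the table1 table in the last query are assumptions to adapt.

```sql
-- Run on the upstream Postgres database.

-- 1. The replication user should have the REPLICATION attribute.
SELECT rolname, rolreplication
FROM pg_roles
WHERE rolname = 'dbuser';

-- 2. relreplident is 'f' for FULL ('d' = default, 'n' = nothing, 'i' = index).
SELECT relname, relreplident
FROM pg_class
WHERE relname = 'repl_table';

-- 3. Tables currently included in the publication.
SELECT pubname, schemaname, tablename
FROM pg_publication_tables
WHERE pubname = 'mz_source';

-- 4. The replication user can read a table that will be replicated
--    (schema and table name are placeholders).
SELECT has_table_privilege('dbuser', 'public.table1', 'SELECT');
```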

Create a source

Postgres sources ingest the raw replication stream data for all tables included in a publication to avoid creating multiple replication slots and minimize the required bandwidth.

When you define a Postgres source, Materialize will automatically create a subsource for each original table in the publication (so you don’t have to!):

Create subsources for all tables included in the Postgres publication

CREATE SOURCE mz_source
    FROM POSTGRES CONNECTION pg_connection (PUBLICATION 'mz_source')
    FOR ALL TABLES
    WITH (SIZE = '3xsmall');

Create subsources for specific tables included in the Postgres publication

CREATE SOURCE mz_source
    FROM POSTGRES CONNECTION pg_connection (PUBLICATION 'mz_source')
    FOR TABLES (table1, table2 AS alias_table2)
    WITH (SIZE = '3xsmall');

NOTE: Materialize performs an initial sync of all tables in the publication before it starts ingesting change events.
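
Both statements above assume that a connection named pg_connection already exists in Materialize. A minimal sketch of how it might be defined follows; it has to run before CREATE SOURCE. The secret name pgpass, the host and password placeholders, and the SSL mode are assumptions to adapt, while dbuser and dbname are the names used in the database setup steps.

```sql
-- Store the password of the replication user as a secret.
CREATE SECRET pgpass AS '<POSTGRES_PASSWORD>';

-- Define the connection that CREATE SOURCE refers to as pg_connection.
CREATE CONNECTION pg_connection TO POSTGRES (
    HOST '<POSTGRES_HOST>',
    PORT 5432,
    USER 'dbuser',
    PASSWORD SECRET pgpass,
    SSL MODE 'require',
    DATABASE 'dbname'
);

-- After creating the source, list the sources in the current schema.
SHOW SOURCES;
```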

Create a materialized view

Any materialized view that depends on replication subsources will be incrementally updated as change events stream in, as a result of INSERT, UPDATE and DELETE operations in the original Postgres database.

CREATE MATERIALIZED VIEW cnt_view1 AS
    SELECT field1,
           COUNT(*) AS cnt
    FROM table1
    GROUP BY field1;
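
One way to watch the incremental updates is to change a row upstream and query the view again. The sketch below assumes that the upstream table is named table1, that field1 is a text column, and that any other columns are nullable or have defaults; none of this is stated above, so adjust it to your schema.

```sql
-- In Materialize: read the current contents of the view.
SELECT * FROM cnt_view1 ORDER BY cnt DESC;

-- In Postgres: insert a row into the replicated table
-- (table and column types are assumptions).
INSERT INTO table1 (field1) VALUES ('a');

-- In Materialize: shortly afterwards, the count for 'a' should
-- reflect the new row.
SELECT * FROM cnt_view1 WHERE field1 = 'a';
```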

Kafka + Debezium

If Kafka is part of your stack, you can use Debezium and the Kafka source to propagate CDC data from Postgres to Materialize. Debezium captures row-level changes resulting from INSERT, UPDATE and DELETE operations in the upstream database and publishes them as events to Kafka using Kafka Connect-compatible connectors.

Database setup

Minimum requirements: PostgreSQL 10+

Before deploying a Debezium connector, you need to ensure that the upstream database is configured to support logical replication.

On a self-managed Postgres instance, as a superuser:

  1. Check the wal_level configuration setting:

    SHOW wal_level;
    

    The default value is replica. For CDC, you’ll need to set it to logical in the database configuration file (postgresql.conf). Keep in mind that changing the wal_level requires a restart of the Postgres instance and can affect database performance.

  2. Restart the database so all changes can take effect.

If you use AWS RDS, we recommend following the AWS RDS documentation for detailed information on logical replication configuration and best practices.

As a superuser (rds_superuser):

  1. Create a custom RDS parameter group and associate it with your instance. You will not be able to set custom parameters on the default RDS parameter groups.

  2. In the custom RDS parameter group, set the rds.logical_replication static parameter to 1.

  3. Add the egress IP addresses associated with your Materialize region to the security group of the RDS instance. You can find these addresses by querying the mz_egress_ips table in Materialize.

  4. Restart the database so all changes can take effect.

If you use AWS Aurora, we recommend following the AWS Aurora documentation for detailed information on logical replication configuration and best practices.

NOTE: Aurora Serverless (v1) does not support logical replication, so it’s not possible to use this service with Materialize.

As a superuser:

  1. Create a DB cluster parameter group for your instance using the following settings:

    Set Parameter group family to your version of Aurora PostgreSQL.

    Set Type to DB Cluster Parameter Group.

  2. In the DB cluster parameter group, set the rds.logical_replication static parameter to 1.

  3. In the DB cluster parameter group, set reasonable values for max_replication_slots, max_wal_senders, max_logical_replication_workers, and max_worker_processes parameters based on your expected usage.

  4. Add the egress IP addresses associated with your Materialize region to the security group of the DB instance. You can find these addresses by querying the mz_egress_ips table in Materialize.

  5. Restart the database so all changes can take effect.

If you use Azure DB for PostgreSQL, we recommend following its documentation for detailed information on logical replication configuration and best practices.

  1. In the Azure portal, or using the Azure CLI, enable logical replication for the PostgreSQL instance.

  2. Add the egress IP addresses associated with your Materialize region to the list of allowed IP addresses under the “Connections security” menu. You can find these addresses by querying the mz_egress_ips table in Materialize.

  3. Restart the database so all changes can take effect.

If you use Cloud SQL for PostgreSQL, we recommend following its documentation for detailed information on logical replication configuration and best practices.

As a superuser (cloudsqlsuperuser):

  1. In the Google Cloud Console, enable logical replication by setting the cloudsql.logical_decoding configuration parameter to on.

  2. Add the egress IP addresses associated with your Materialize region to the list of allowed IP addresses. You can find these addresses by querying the mz_egress_ips table in Materialize.

  3. Restart the database so all changes can take effect.

Once logical replication is enabled:

  1. Grant enough privileges to ensure Debezium can operate in the database. The specific privileges will depend on how much control you want to give to the replication user, so we recommend following the Debezium documentation.

  2. If a table that you want to replicate has a primary key defined, you can use your default replica identity value. If a table you want to replicate has no primary key defined, you must set the replica identity value to FULL:

    ALTER TABLE repl_table REPLICA IDENTITY FULL;
    

    This setting determines the amount of information that is written to the WAL in UPDATE and DELETE operations. Setting it to FULL will include the previous values of all the table’s columns in the change events.

    As a heads-up, you should expect a performance hit in the database from increased CPU usage. For more information, see the PostgreSQL documentation.
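
To find out which tables need REPLICA IDENTITY FULL, you can look for tables without a primary key in the system catalogs. The query below is a sketch that assumes the tables of interest live in the public schema.

```sql
-- Run on the upstream Postgres database.
-- Lists ordinary tables in the public schema that have no primary key,
-- along with their current replica identity
-- ('d' = default, 'f' = full, 'n' = nothing, 'i' = index).
SELECT c.relname, c.relreplident
FROM pg_class AS c
JOIN pg_namespace AS n ON n.oid = c.relnamespace
WHERE c.relkind = 'r'
  AND n.nspname = 'public'
  AND NOT EXISTS (
      SELECT 1
      FROM pg_index AS i
      WHERE i.indrelid = c.oid
        AND i.indisprimary
  )
ORDER BY c.relname;
```

Any table returned with a relreplident other than 'f' is a candidate for the ALTER TABLE statement shown above.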

Deploy Debezium

Minimum requirements: Debezium 1.5+

Debezium is deployed as a set of Kafka Connect-compatible connectors, so you first need to define a Postgres connector configuration and then start the connector by adding it to Kafka Connect.

WARNING! If you deploy the PostgreSQL Debezium connector in Confluent Cloud, you must override the default value of the After-state only setting and set it to false.

  1. Create a connector configuration file and save it as register-postgres.json:

    {
        "name": "your-connector",
        "config": {
            "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
            "tasks.max": "1",
            "plugin.name":"pgoutput",
            "database.hostname": "postgres",
            "database.port": "5432",
            "database.user": "postgres",
            "database.password": "postgres",
            "database.dbname" : "postgres",
            "database.server.name": "pg_repl",
            "table.include.list": "public.table1",
            "publication.autocreate.mode":"filtered",
            "key.converter": "io.confluent.connect.avro.AvroConverter",
            "value.converter": "io.confluent.connect.avro.AvroConverter",
            "value.converter.schemas.enable": false
        }
    }
    

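    You can read more about each configuration property in the Debezium documentation. By default, the connector writes events for each table to a Kafka topic named serverName.schemaName.tableName. With the configuration above, changes to public.table1 are written to the pg_repl.public.table1 topic.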

  2. Start the Debezium Postgres connector using the configuration file:

    export CURRENT_HOST='<your-host>'
    
    curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" \
      http://$CURRENT_HOST:8083/connectors/ -d @register-postgres.json
    

  3. Check that the connector is running:

    curl http://$CURRENT_HOST:8083/connectors/your-connector/status
    

    The first time it connects to a Postgres server, Debezium takes a consistent snapshot of the tables selected for replication, so you should see that the pre-existing records in the replicated table are initially pushed into your Kafka topic:

    /usr/bin/kafka-avro-console-consumer \
      --bootstrap-server kafka:9092 \
      --from-beginning \
      --topic pg_repl.public.table1
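
Besides checking the connector status and the Kafka topic, you can also look at the Postgres side of the deployment. The sketch below lists the replication slot and publication that the connector relies on. Unless slot.name and publication.name are overridden in the connector configuration, Debezium is documented to default to a slot named debezium and a publication named dbz_publication, but treat those names as something to verify rather than a guarantee.

```sql
-- Run on the upstream Postgres database.

-- The connector holds a logical replication slot that uses the pgoutput
-- plugin; active should be true while the connector is running.
SELECT slot_name, plugin, active
FROM pg_replication_slots;

-- With publication.autocreate.mode set to filtered, the publication is
-- expected to contain only the tables from table.include.list.
SELECT pubname, schemaname, tablename
FROM pg_publication_tables;
```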
    

Create a source

NOTE: Currently, Materialize only supports Avro-encoded Debezium records. If you're interested in JSON support, please reach out in the community Slack or leave a comment in this GitHub issue.

Debezium emits change events using an envelope that contains detailed information about upstream database operations, like the before and after values for each record. To create a source that interprets the Debezium envelope in Materialize:

CREATE SOURCE kafka_repl
    FROM KAFKA CONNECTION kafka_connection (TOPIC 'pg_repl.public.table1')
    FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_connection
    ENVELOPE DEBEZIUM
    WITH (SIZE = '3xsmall');

This allows you to replicate tables with REPLICA IDENTITY DEFAULT, INDEX, or FULL.
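
The CREATE SOURCE statement above assumes that the connections kafka_connection and csr_connection already exist. A minimal sketch of how they might be defined follows; it has to run before CREATE SOURCE. The broker address reuses kafka:9092 from the consumer example, the schema registry URL is a placeholder, and a secured cluster would need additional authentication options that are not shown here.

```sql
-- Connection to the Kafka cluster that Debezium writes to.
CREATE CONNECTION kafka_connection TO KAFKA (
    BROKER 'kafka:9092'
);

-- Connection to the schema registry that holds the Avro schemas
-- (the URL is an assumption).
CREATE CONNECTION csr_connection TO CONFLUENT SCHEMA REGISTRY (
    URL 'http://schema-registry:8081'
);
```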

Transaction support

Debezium can publish transaction metadata to a dedicated Kafka topic, which can be used to preserve transactional boundaries downstream. We are working on using this topic to support transaction-aware processing in Materialize (#7537)!

Create a materialized view

Any materialized view defined on top of this source will be incrementally updated as new change events stream in through Kafka, as a result of INSERT, UPDATE and DELETE operations in the original Postgres database.

CREATE MATERIALIZED VIEW cnt_table1 AS
    SELECT field1,
           COUNT(*) AS cnt
    FROM kafka_repl
    GROUP BY field1;
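
To check the result, you can query the view once or follow its changes as they arrive. This is a sketch that assumes a Materialize version that provides SUBSCRIBE and a client such as psql.

```sql
-- Read the current state of the view.
SELECT * FROM cnt_table1;

-- Stream changes to the view as new change events arrive through Kafka.
-- Cancel the query to stop streaming.
COPY (SUBSCRIBE cnt_table1) TO STDOUT;
```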