CREATE SOURCE: PostgreSQL

WARNING! Before creating a PostgreSQL source, you must set up logical replication in the upstream database. For step-by-step instructions, see the PostgreSQL CDC guide.

CREATE SOURCE connects Materialize to an external system you want to read data from, and provides details about how to decode and interpret that data.

To connect to a PostgreSQL instance, you first need to create a connection that specifies access and authentication parameters. Once created, a connection is reusable across multiple CREATE SOURCE statements.

Syntax

CREATE SOURCE [IF NOT EXISTS] src_name
  [IN CLUSTER cluster_name]
  FROM POSTGRES CONNECTION connection_name (
    PUBLICATION publication_name
    [, TEXT COLUMNS (column_name [, ...])]
  )
  { FOR ALL TABLES | FOR TABLES (table_name [AS subsrc_name] [, ...]) }
  [WITH (field = val [, ...])]
Field Use
src_name The name for the source.
IF NOT EXISTS Do nothing (except issuing a notice) if a source with the same name already exists. Default.
IN CLUSTER cluster_name The cluster to maintain this source. If not specified, the SIZE option must be specified.
CONNECTION connection_name The name of the PostgreSQL connection to use in the source. For details on creating connections, check the CREATE CONNECTION documentation page.
FOR ALL TABLES Create subsources for all tables in the publication.
FOR TABLES ( table_list ) Create subsources for specific tables in the publication.

CONNECTION options

Field Value Description
PUBLICATION text Required. The PostgreSQL publication (the replication data set containing the tables to be streamed to Materialize).
TEXT COLUMNS A list of names Decode data as text for specific columns that contain PostgreSQL types that are unsupported in Materialize.

WITH options

Field Value Description
SIZE text The size for the source. Accepts values: 3xsmall, 2xsmall, xsmall, small, medium, large, xlarge. Required if the IN CLUSTER option is not specified.

Features

Change data capture

This source uses PostgreSQL’s native replication protocol to continually ingest changes resulting from INSERT, UPDATE and DELETE operations in the upstream database — a process also known as change data capture.

For this reason, you must configure the upstream PostgreSQL database to support logical replication before creating a source in Materialize. Follow the step-by-step instructions in the PostgreSQL CDC guide to get logical replication set up.
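As a minimal sketch of the upstream setup (the CDC guide covers the full, provider-specific steps), enabling logical replication and creating a publication typically looks like this. The publication name mz_source is just an example, and changing wal_level requires a server restart:

-- In the upstream PostgreSQL database (requires superuser; managed services
-- such as RDS expose this via a parameter group instead):
ALTER SYSTEM SET wal_level = logical;

-- Create a publication containing the tables to replicate:
CREATE PUBLICATION mz_source FOR ALL TABLES;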

Creating a source

To avoid creating multiple replication slots in the upstream PostgreSQL database and minimize the required bandwidth, Materialize ingests the raw replication stream data for either all tables (FOR ALL TABLES) or a specified subset of tables (FOR TABLES) included in a specific publication.

CREATE SOURCE mz_source
  FROM POSTGRES CONNECTION pg_connection (PUBLICATION 'mz_source')
  FOR ALL TABLES
  WITH (SIZE = '3xsmall');

When you define a source, Materialize will automatically:

  1. Create a replication slot in the upstream PostgreSQL database (see PostgreSQL replication slots).

    The name of the replication slot created by Materialize is prefixed with materialize_ for easy identification, and can be looked up in mz_internal.mz_postgres_sources.

    SELECT * FROM mz_internal.mz_postgres_sources;
    
       id   |             replication_slot
    --------+----------------------------------------------
     u8     | materialize_7f8a72d0bf2a4b6e9ebc4e61ba769b71
    
  2. Create a subsource for each original table in the publication.

    SHOW SOURCES;
    
             name         |   type    |  size
    ----------------------+-----------+---------
     table_1              | subsource |
     table_2              | subsource |
     mz_source            | postgres  | 3xsmall
    

    Materialize will also perform an initial, snapshot-based sync of the tables in the publication before it starts ingesting change events.

  3. Incrementally update any materialized or indexed views that depend on the source as change events stream in, as a result of INSERT, UPDATE and DELETE operations in the upstream PostgreSQL database.
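As an illustration of the last step, a materialized view defined over one of the subsources stays up to date as changes stream in. The table_1 subsource and its status column here are hypothetical:

CREATE MATERIALIZED VIEW table_1_counts AS
  SELECT status, count(*) AS total
  FROM table_1
  GROUP BY status;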

Note that schema metadata is captured when the source is initially created, and is validated against the upstream schema upon restart. If you wish to add additional tables to the original publication and use them in Materialize, you must drop and recreate the source.
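For example, picking up a table newly added to the upstream publication requires recreating the source. The names below are illustrative, and ALTER PUBLICATION ... ADD TABLE only applies to publications that were not created FOR ALL TABLES:

-- In PostgreSQL: add the new table to the publication.
ALTER PUBLICATION mz_source ADD TABLE table_3;

-- In Materialize: drop and recreate the source
-- (CASCADE also drops the subsources and dependent objects).
DROP SOURCE mz_source CASCADE;
CREATE SOURCE mz_source
  FROM POSTGRES CONNECTION pg_connection (PUBLICATION 'mz_source')
  FOR ALL TABLES
  WITH (SIZE = '3xsmall');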

PostgreSQL replication slots

Each source ingests the raw replication stream data for all tables in the specified publication using a single replication slot. This allows you to minimize the performance impact on the upstream database, as well as reuse the same source across multiple materializations.

WARNING! Make sure to delete any replication slots if you stop using Materialize, or if either the Materialize or PostgreSQL instances crash. To look up the name of the replication slot created for each source, use mz_internal.mz_postgres_sources.

If you delete all objects that depend on a source without also dropping the source, the upstream replication slot will linger and continue to accumulate data so that the source can resume in the future. To avoid unbounded disk space usage, make sure to use DROP SOURCE or manually delete the replication slot.
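If a slot does linger, you can inspect and drop it from the upstream database. The slot name below is illustrative; look up the real one in mz_internal.mz_postgres_sources:

-- In PostgreSQL: list replication slots created by Materialize.
SELECT slot_name, active
FROM pg_replication_slots
WHERE slot_name LIKE 'materialize_%';

-- Drop a lingering slot that is no longer needed.
SELECT pg_drop_replication_slot('materialize_7f8a72d0bf2a4b6e9ebc4e61ba769b71');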

For PostgreSQL 13+, it is recommended that you set a reasonable value for max_slot_wal_keep_size to limit the amount of storage used by replication slots.
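For example, to cap the WAL retained by replication slots at 50 GB (the value is a placeholder; choose one that fits your workload and tolerance for slot invalidation):

-- In PostgreSQL 13+ (requires superuser; managed services expose
-- this setting through their configuration interface):
ALTER SYSTEM SET max_slot_wal_keep_size = '50GB';
SELECT pg_reload_conf();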

PostgreSQL schemas

CREATE SOURCE will attempt to create each upstream table in the current schema. This may lead to naming collisions if, for example, you are replicating schema1.table_1 and schema2.table_1. In such cases, use the FOR TABLES clause to provide aliases for each upstream table, or to specify an alternative destination schema in Materialize.

CREATE SOURCE mz_source
  FROM POSTGRES CONNECTION pg_connection (PUBLICATION 'mz_source')
  FOR TABLES (schema1.table_1 AS s1_table_1, schema2.table_1 AS s2_table_1)
  WITH (SIZE = '3xsmall');

Known limitations

Schema changes

Materialize does not support changes to schemas for existing publications, and will set the source into an error state if a breaking DDL change is detected upstream. To handle schema changes, you need to drop the existing sources and then recreate them after creating new publications for the updated schemas.

Supported types

Replicating tables that contain data types unsupported in Materialize is possible via the TEXT COLUMNS option. The specified columns will be treated as text, and will thus not offer the expected PostgreSQL type features. For example:

  • enum: the implicit ordering of the original PostgreSQL enum type is not preserved, as Materialize will sort values as text.
  • money: the resulting text value cannot be cast back to e.g. numeric, since PostgreSQL adds typical currency formatting to the output.
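Depending on the type, you may still be able to recover a usable value downstream with ordinary string functions. The items table and price column here are hypothetical, and the parsing assumes a $1,234.56-style format with no negative values:

-- Strip currency formatting from a money column decoded as text.
SELECT replace(replace(price, '$', ''), ',', '')::numeric AS price_numeric
FROM items;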

Truncation

Tables replicated into Materialize should not be truncated. If a table is truncated while being replicated, the whole source becomes inaccessible and will not produce any data until it is recreated. Instead, remove all rows from a table using an unqualified DELETE:

DELETE FROM t;

Examples

WARNING! Before creating a PostgreSQL source, you must set up logical replication in the upstream database. For step-by-step instructions, see the PostgreSQL CDC guide.

Creating a connection

A connection describes how to connect and authenticate to an external system you want Materialize to read data from.

Once created, a connection is reusable across multiple CREATE SOURCE statements. For more details on creating connections, check the CREATE CONNECTION documentation page.

CREATE SECRET pgpass AS '<POSTGRES_PASSWORD>';

CREATE CONNECTION pg_connection TO POSTGRES (
    HOST 'instance.foo000.us-west-1.rds.amazonaws.com',
    PORT 5432,
    USER 'postgres',
    PASSWORD SECRET pgpass,
    SSL MODE 'require',
    DATABASE 'postgres'
);

If your PostgreSQL server is not exposed to the public internet, you can tunnel the connection through an AWS PrivateLink service or an SSH bastion host:

CREATE CONNECTION privatelink_svc TO AWS PRIVATELINK (
    SERVICE NAME 'com.amazonaws.vpce.us-east-1.vpce-svc-0e123abc123198abc',
    AVAILABILITY ZONES ('use1-az1', 'use1-az4')
);
CREATE SECRET pgpass AS '<POSTGRES_PASSWORD>';

CREATE CONNECTION pg_connection TO POSTGRES (
    HOST 'instance.foo000.us-west-1.rds.amazonaws.com',
    PORT 5432,
    USER 'postgres',
    PASSWORD SECRET pgpass,
    AWS PRIVATELINK privatelink_svc,
    DATABASE 'postgres'
);
CREATE CONNECTION ssh_connection TO SSH TUNNEL (
    HOST 'bastion-host',
    PORT 22,
    USER 'materialize'
);
CREATE CONNECTION pg_connection TO POSTGRES (
    HOST 'instance.foo000.us-west-1.rds.amazonaws.com',
    PORT 5432,
    SSH TUNNEL ssh_connection,
    DATABASE 'postgres'
);

For step-by-step instructions on creating SSH tunnel connections and configuring an SSH bastion server to accept connections from Materialize, check out this guide.

Creating a source

Create subsources for all tables included in the PostgreSQL publication

CREATE SOURCE mz_source
    FROM POSTGRES CONNECTION pg_connection (PUBLICATION 'mz_source')
    FOR ALL TABLES
    WITH (SIZE = '3xsmall');

Create subsources for specific tables included in the PostgreSQL publication

CREATE SOURCE mz_source
  FROM POSTGRES CONNECTION pg_connection (PUBLICATION 'mz_source')
  FOR TABLES (table_1, table_2 AS alias_table_2)
  WITH (SIZE = '3xsmall');

Handling unsupported types

If the publication contains tables that use data types unsupported by Materialize, use the TEXT COLUMNS option to decode data as text for the affected columns.

CREATE SOURCE mz_source
  FROM POSTGRES CONNECTION pg_connection (
    PUBLICATION 'mz_source',
    TEXT COLUMNS (table.column_of_unsupported_type)
  ) FOR ALL TABLES
  WITH (SIZE = '3xsmall');

Sizing a source

To provision a specific amount of CPU and memory to a source on creation, use the SIZE option:

CREATE SOURCE mz_source
  FROM POSTGRES CONNECTION pg_connection (PUBLICATION 'mz_source')
  WITH (SIZE = '3xsmall');

To resize the source after creation:

ALTER SOURCE mz_source SET (SIZE = 'large');

The smallest source size (3xsmall) is a reasonable default to get started. For more details on sizing sources, check the CREATE SOURCE documentation page.
