CREATE SOURCE: PostgreSQL
CREATE SOURCE
connects Materialize to an external system you want to read data from, and provides details about how to decode and interpret that data.
Materialize supports PostgreSQL (11+) as a data source. To connect to a PostgreSQL instance, you first need to create a connection that specifies access and authentication parameters. Once created, a connection is reusable across multiple CREATE SOURCE
statements.
Syntax
Field | Use |
---|---|
src_name | The name for the source. |
IF NOT EXISTS | Do nothing (except issuing a notice) if a source with the same name already exists. |
IN CLUSTER cluster_name | The cluster to maintain this source. If not specified, the SIZE option must be specified. |
CONNECTION connection_name | The name of the PostgreSQL connection to use in the source. For details on creating connections, check the CREATE CONNECTION documentation page. |
FOR ALL TABLES | Create subsources for all tables in the publication. |
FOR SCHEMAS ( schema_list ) | Create subsources for specific schemas in the publication. |
FOR TABLES ( table_list ) | Create subsources for specific tables in the publication. |
EXPOSE PROGRESS AS progress_subsource_name | The name of the progress collection for the source. If this is not specified, the progress collection will be named <src_name>_progress . For more information, see Monitoring source progress. |
CONNECTION options
Field | Value | Description |
---|---|---|
PUBLICATION | text | Required. The PostgreSQL publication (the replication data set containing the tables to be streamed to Materialize). |
TEXT COLUMNS | A list of names | Decode data as text for specific columns that contain PostgreSQL types that are unsupported in Materialize. |
WITH options
Field | Value | Description |
---|---|---|
SIZE | text | The size for the source. Accepts values: 3xsmall, 2xsmall, xsmall, small, medium, large, xlarge. Required if the IN CLUSTER option is not specified. |
Features
Change data capture
This source uses PostgreSQL’s native replication protocol to continually ingest changes resulting from INSERT, UPDATE and DELETE operations in the upstream database — a process also known as change data capture.
For this reason, you must configure the upstream PostgreSQL database to support logical replication before creating a source in Materialize. Follow the step-by-step instructions in the relevant integration guide to get logical replication set up: Amazon RDS, Amazon Aurora, Azure DB, Google Cloud SQL, Self-hosted.
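As a rough sketch of what those guides cover, the upstream database needs wal_level set to logical and a publication containing the tables to replicate. The publication and table names below are placeholders:

```sql
-- On the upstream PostgreSQL database. Changing wal_level
-- requires a server restart to take effect.
ALTER SYSTEM SET wal_level = 'logical';

-- Create a publication with the tables to stream to Materialize.
-- The publication and table names are placeholders.
CREATE PUBLICATION mz_source FOR TABLE table_1, table_2;
```

The provider-specific guides also cover network access and replication privileges for the connecting user.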
Creating a source
To avoid creating multiple replication slots in the upstream PostgreSQL database and minimize the required bandwidth, Materialize ingests the raw replication stream data for a specific set of tables in your publication.
CREATE SOURCE mz_source
FROM POSTGRES CONNECTION pg_connection (PUBLICATION 'mz_source')
FOR ALL TABLES
WITH (SIZE = '3xsmall');
When you define a source, Materialize will automatically:
- Create a replication slot in the upstream PostgreSQL database (see PostgreSQL replication slots).

  The name of the replication slot created by Materialize is prefixed with materialize_ for easy identification, and can be looked up in mz_internal.mz_postgres_sources.

  SELECT * FROM mz_internal.mz_postgres_sources;

   id | replication_slot
  ----+----------------------------------------------
   u8 | materialize_7f8a72d0bf2a4b6e9ebc4e61ba769b71

- Create a subsource for each original table in the publication.

  SHOW SOURCES;

   name               | type      | size
  --------------------+-----------+---------
   mz_source          | postgres  | 3xsmall
   mz_source_progress | progress  |
   table_1            | subsource |
   table_2            | subsource |

  And perform an initial, snapshot-based sync of the tables in the publication before it starts ingesting change events.

- Incrementally update any materialized or indexed views that depend on the source as change events stream in, as a result of INSERT, UPDATE and DELETE operations in the upstream PostgreSQL database.
It’s important to note that the schema metadata is captured when the source is initially created, and is validated against the upstream schema upon restart. If you wish to add additional tables to the original publication and use them in Materialize, use ALTER SOURCE...ADD SUBSOURCE to add them to the source (see Adding/dropping tables to/from a source).
PostgreSQL replication slots
Each source ingests the raw replication stream data for all tables in the specified publication using a single replication slot. This allows you to minimize the performance impact on the upstream database, as well as reuse the same source across multiple materializations. The name of the replication slot created for each source can be looked up in mz_internal.mz_postgres_sources.
If you delete all objects that depend on a source without also dropping the source, the upstream replication slot will linger and continue to accumulate data so that the source can resume in the future. To avoid unbounded disk space usage, make sure to use DROP SOURCE
or manually delete the replication slot.
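If you need to remove a lingering slot by hand, it can be found and dropped on the upstream database. The slot name below is illustrative:

```sql
-- On the upstream PostgreSQL database: find slots created by
-- Materialize (all are prefixed with materialize_).
SELECT slot_name FROM pg_replication_slots
WHERE slot_name LIKE 'materialize_%';

-- Drop a lingering slot; the name here is illustrative.
SELECT pg_drop_replication_slot('materialize_7f8a72d0bf2a4b6e9ebc4e61ba769b71');
```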
For PostgreSQL 13+, it is recommended that you set a reasonable value for max_slot_wal_keep_size
to limit the amount of storage used by replication slots.
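As a sketch, this setting can be changed without a restart; the 10GB value below is only an example and should be sized to your retention needs and disk budget:

```sql
-- On the upstream PostgreSQL database (13+). The limit value is
-- an example; tune it for your workload.
ALTER SYSTEM SET max_slot_wal_keep_size = '10GB';
SELECT pg_reload_conf();
```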
PostgreSQL schemas
CREATE SOURCE will attempt to create each upstream table in the current schema. This may lead to naming collisions if, for example, you are replicating schema1.table_1 and schema2.table_1. In such cases, use the FOR TABLES clause to provide aliases for each upstream table, or to specify an alternative destination schema in Materialize.
CREATE SOURCE mz_source
FROM POSTGRES CONNECTION pg_connection (PUBLICATION 'mz_source')
FOR TABLES (schema1.table_1 AS s1_table_1, schema2.table_1 AS s2_table_1)
WITH (SIZE = '3xsmall');
Monitoring source progress
By default, PostgreSQL sources expose progress metadata as a subsource that you
can use to monitor source ingestion progress. The name of the progress
subsource can be specified when creating a source using the EXPOSE PROGRESS AS
clause; otherwise, it will be named <src_name>_progress.
The following metadata is available for each source as a progress subsource:
Field | Type | Meaning |
---|---|---|
lsn | uint8 | The last Log Sequence Number (LSN) consumed from the upstream PostgreSQL replication stream. |
This metadata can be queried using:
SELECT lsn
FROM <src_name>_progress;
The reported LSN should increase as Materialize consumes new WAL records from the upstream PostgreSQL database. For more details on monitoring source ingestion progress and debugging related issues, see Troubleshooting.
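One way to approximate replication lag (a sketch, assuming the mz_source example above and sufficient privileges on the upstream database) is to compare the progress LSN with the upstream WAL position:

```sql
-- In Materialize: the last LSN consumed by the source.
SELECT lsn FROM mz_source_progress;

-- On the upstream PostgreSQL database: the current WAL write
-- position. The gap between the two approximates replication lag.
SELECT pg_current_wal_lsn();
```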
Known limitations
Schema changes
Materialize supports schema changes in the upstream PostgreSQL database as follows:
Compatible schema changes
- Adding columns to tables. Materialize will not ingest these columns unless you use ALTER SOURCE...{DROP | ADD} SUBSOURCE to first drop the affected subsource, and then add the table back to the source.
- Dropping columns that were added after the source was created. These columns are never ingested, so you can drop them without issue.
- Adding or removing NOT NULL constraints to tables that were nullable when the source was created.
Incompatible schema changes
All other schema changes to tables in the publication will set the corresponding subsource into an error state, which prevents you from reading from the subsource(s).
To handle incompatible schema changes, use ALTER SOURCE... {DROP | ADD} SUBSOURCE
to first drop the affected
subsource, and then add the table back to the source. When you add the
subsource, it will have the updated schema from the corresponding upstream
table.
Supported types
Replicating tables that contain data types unsupported in Materialize is possible via the TEXT COLUMNS option. The specified columns will be treated as text, and will thus not offer the expected PostgreSQL type features. For example:
- enum: the implicit ordering of the original PostgreSQL enum type is not preserved, as Materialize will sort values as text.
- money: the resulting text value cannot be cast back to e.g. numeric, since PostgreSQL adds typical currency formatting to the output.
Truncation
Tables replicated into Materialize should not be truncated. If a table is truncated while it is being replicated, the whole source becomes inaccessible and will not produce any data until it is recreated. Instead, remove all rows from a table using an unqualified DELETE.
DELETE FROM t;
Inherited tables
When using PostgreSQL table
inheritance,
PostgreSQL serves data from SELECTs as if the inheriting tables' data is also
present in the inherited table. However, both PostgreSQL’s logical replication
and COPY
only present data written to the tables themselves, i.e. the
inheriting data is not treated as part of the inherited table.
PostgreSQL sources use logical replication and COPY
to ingest table data, so
inheriting tables' data will only be ingested as part of the inheriting table,
i.e. in Materialize, the data will not be returned when serving SELECTs from the inherited table.
You can mimic PostgreSQL’s SELECT behavior with inherited tables by creating a materialized view that UNIONs data from the inherited and inheriting tables, though there are many caveats:
- Materialized views are maintained in arrangements, which moves the tables' data from being stored in relatively low-cost storage into being stored in memory.
- If new tables inherit from the table, data from the inheriting tables will
not be available in the view. You will need to add the inheriting tables via
ADD SUBSOURCE
and create a new view that unions the new table.
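As a sketch of this workaround, assuming a hypothetical parent table base_table with a single inheriting table child_table, both replicated as subsources:

```sql
-- Union the parent's own rows with the inheriting table's rows to
-- mimic PostgreSQL's SELECT behavior on the inherited table.
-- Table and view names are hypothetical.
CREATE MATERIALIZED VIEW base_table_all AS
    SELECT * FROM base_table
    UNION ALL
    SELECT * FROM child_table;
```

If further tables later inherit from base_table, the view must be recreated to include them.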
Examples
Creating a connection
A connection describes how to connect and authenticate to an external system you want Materialize to read data from.
Once created, a connection is reusable across multiple CREATE SOURCE
statements. For more details on creating connections, check the CREATE CONNECTION
documentation page.
CREATE SECRET pgpass AS '<POSTGRES_PASSWORD>';
CREATE CONNECTION pg_connection TO POSTGRES (
HOST 'instance.foo000.us-west-1.rds.amazonaws.com',
PORT 5432,
USER 'postgres',
PASSWORD SECRET pgpass,
SSL MODE 'require',
DATABASE 'postgres'
);
If your PostgreSQL server is not exposed to the public internet, you can tunnel the connection through an AWS PrivateLink service or an SSH bastion host.
CREATE CONNECTION privatelink_svc TO AWS PRIVATELINK (
SERVICE NAME 'com.amazonaws.vpce.us-east-1.vpce-svc-0e123abc123198abc',
AVAILABILITY ZONES ('use1-az1', 'use1-az4')
);
CREATE SECRET pgpass AS '<POSTGRES_PASSWORD>';
CREATE CONNECTION pg_connection TO POSTGRES (
HOST 'instance.foo000.us-west-1.rds.amazonaws.com',
PORT 5432,
USER 'postgres',
PASSWORD SECRET pgpass,
AWS PRIVATELINK privatelink_svc,
DATABASE 'postgres'
);
For step-by-step instructions on creating AWS PrivateLink connections and configuring an AWS PrivateLink service to accept connections from Materialize, check this guide.
CREATE CONNECTION ssh_connection TO SSH TUNNEL (
HOST 'bastion-host',
PORT 22,
    USER 'materialize'
);
CREATE CONNECTION pg_connection TO POSTGRES (
HOST 'instance.foo000.us-west-1.rds.amazonaws.com',
PORT 5432,
SSH TUNNEL ssh_connection,
DATABASE 'postgres'
);
For step-by-step instructions on creating SSH tunnel connections and configuring an SSH bastion server to accept connections from Materialize, check this guide.
Creating a source
Create subsources for all tables included in the PostgreSQL publication
CREATE SOURCE mz_source
FROM POSTGRES CONNECTION pg_connection (PUBLICATION 'mz_source')
FOR ALL TABLES
WITH (SIZE = '3xsmall');
Create subsources for all tables from specific schemas included in the PostgreSQL publication
CREATE SOURCE mz_source
FROM POSTGRES CONNECTION pg_connection (PUBLICATION 'mz_source')
FOR SCHEMAS (public, project)
WITH (SIZE = '3xsmall');
Create subsources for specific tables included in the PostgreSQL publication
CREATE SOURCE mz_source
FROM POSTGRES CONNECTION pg_connection (PUBLICATION 'mz_source')
FOR TABLES (table_1, table_2 AS alias_table_2)
WITH (SIZE = '3xsmall');
Handling unsupported types
If the publication contains tables that use data types unsupported by Materialize, use the TEXT COLUMNS
option to decode data as text
for the affected columns.
CREATE SOURCE mz_source
FROM POSTGRES CONNECTION pg_connection (
PUBLICATION 'mz_source',
TEXT COLUMNS (table.column_of_unsupported_type)
) FOR ALL TABLES
WITH (SIZE = '3xsmall');
Adding/dropping tables to/from a source
To handle upstream schema changes, use the ALTER SOURCE...DROP SUBSOURCE
syntax to drop the affected subsource, and then ALTER SOURCE...ADD SUBSOURCE
to add the subsource back to the source.
-- List all subsources in mz_source
SHOW SUBSOURCES ON mz_source;
-- Get rid of an outdated or errored subsource
ALTER SOURCE mz_source DROP SUBSOURCE table_1;
-- Start ingesting the table with the updated schema or fix
ALTER SOURCE mz_source ADD SUBSOURCE table_1;
Sizing a source
To provision a specific amount of CPU and memory to a source on creation, use the SIZE
option:
CREATE SOURCE mz_source
FROM POSTGRES CONNECTION pg_connection (PUBLICATION 'mz_source')
WITH (SIZE = '3xsmall');
To resize the source after creation:
ALTER SOURCE mz_source SET (SIZE = 'large');
The smallest source size (3xsmall
) is a reasonable default to get started. For more details on sizing sources, check the CREATE SOURCE
documentation page.
Related pages
CREATE SECRET
CREATE CONNECTION
CREATE SOURCE
- PostgreSQL integration guides: