Ingest data from Amazon RDS
This page shows you how to stream data from Amazon RDS for PostgreSQL to Materialize using the PostgreSQL source.
Before you begin
- Make sure you are running PostgreSQL 11 or higher.
- Make sure you have access to your PostgreSQL instance via psql, or your preferred SQL client.
A. Configure Amazon RDS
1. Enable logical replication
Materialize uses PostgreSQL’s logical replication protocol to track changes in your database and propagate them to Materialize.
As a first step, you need to make sure logical replication is enabled.
- As a user with the rds_superuser role, use psql (or your preferred SQL client) to connect to your database.
- Check if logical replication is enabled:

    SELECT name, setting
    FROM pg_settings
    WHERE name = 'rds.logical_replication';

              name           | setting
    -------------------------+---------
     rds.logical_replication | off
    (1 row)

  - If logical replication is off, continue to the next step.
  - If logical replication is already on, skip to the Create a publication and a replication user section.
- Using the AWS Management Console, create a DB parameter group in RDS.
  - Set Parameter group family to your PostgreSQL version.
  - Set Type to DB Parameter Group.
  - Set Engine type to PostgreSQL.
- Edit the new parameter group and set the rds.logical_replication parameter to 1.
- Associate the DB parameter group with your database. Use the Apply Immediately option to immediately reboot your database and apply the change. Keep in mind that rebooting the RDS instance can affect database performance. Do not move on to the next step until the database Status is Available in the RDS Console.
- Back in the SQL client connected to PostgreSQL, verify that replication is now enabled:

    SELECT name, setting
    FROM pg_settings
    WHERE name = 'rds.logical_replication';

              name           | setting
    -------------------------+---------
     rds.logical_replication | on
    (1 row)

  If replication is still not enabled, reboot the database.
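As an additional sanity check, the queries below sketch how you might confirm the effect of the parameter change from PostgreSQL itself. They assume a standard RDS for PostgreSQL setup, where enabling rds.logical_replication raises wal_level to logical; the capacity settings are shown for reference only, since every logical replication consumer occupies a replication slot.

```sql
-- Run in PostgreSQL after the instance is Available again.
-- With rds.logical_replication enabled, this is expected to report 'logical'.
SHOW wal_level;

-- Replication capacity settings, for reference.
-- Each logical replication consumer uses one replication slot.
SELECT name, setting
FROM pg_settings
WHERE name IN ('max_replication_slots', 'max_wal_senders');
```

If wal_level still reports replica, the parameter group change has most likely not been applied yet.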
2. Create a publication and a replication user
Once logical replication is enabled, create a publication with the tables that you want to replicate to Materialize. You’ll also need a user for Materialize with sufficient privileges to manage replication.
- As a superuser, use psql (or your preferred SQL client) to connect to your database.
- For each table that you want to replicate to Materialize, set the replica identity to FULL:

    ALTER TABLE <table1> REPLICA IDENTITY FULL;
    ALTER TABLE <table2> REPLICA IDENTITY FULL;

  REPLICA IDENTITY FULL ensures that the replication stream includes the previous data of changed rows, in the case of UPDATE and DELETE operations. This setting enables Materialize to ingest PostgreSQL data with minimal in-memory state. However, you should expect increased disk usage in your PostgreSQL database.
- Create a publication with the tables you want to replicate:

  For specific tables:

    CREATE PUBLICATION mz_source FOR TABLE <table1>, <table2>;

  For all tables in the database:

    CREATE PUBLICATION mz_source FOR ALL TABLES;

  The mz_source publication will contain the set of change events generated from the specified tables, and will later be used to ingest the replication stream.

  Be sure to include only the tables you need. If the publication includes additional tables, Materialize will waste resources on ingesting and then immediately discarding the data.
- Create a user for Materialize, if you don’t already have one:

    CREATE USER materialize PASSWORD '<password>';

- Grant the user permission to manage replication:

    GRANT rds_replication TO materialize;

- Grant the user the required permissions on the tables you want to replicate:

    GRANT CONNECT ON DATABASE <dbname> TO materialize;
    GRANT USAGE ON SCHEMA <schema> TO materialize;
    GRANT SELECT ON <table1> TO materialize;
    GRANT SELECT ON <table2> TO materialize;

  Once connected to your database, Materialize will take an initial snapshot of the tables in your publication. SELECT privileges are required for this initial snapshot.

  If you expect to add tables to your publication, you can grant SELECT on all tables in the schema instead of naming the specific tables:

    GRANT SELECT ON ALL TABLES IN SCHEMA <schema> TO materialize;
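Before moving on, it can help to confirm that the steps above took effect. The following is a minimal sketch using standard PostgreSQL catalogs and functions; it assumes the publication name mz_source and the user name materialize from this guide, and <table1> and <table2> remain placeholders for your own tables.

```sql
-- Replica identity per table: 'f' means FULL, 'd' means DEFAULT.
SELECT oid::regclass AS table_name, relreplident
FROM pg_class
WHERE oid IN ('<table1>'::regclass, '<table2>'::regclass);

-- Tables currently included in the publication.
SELECT schemaname, tablename
FROM pg_publication_tables
WHERE pubname = 'mz_source';

-- Whether the materialize user can read each published table.
SELECT schemaname,
       tablename,
       has_table_privilege(
           'materialize',
           format('%I.%I', schemaname, tablename),
           'SELECT'
       ) AS can_select
FROM pg_publication_tables
WHERE pubname = 'mz_source';
```

Any row with can_select set to f points at a table that the initial snapshot would not be able to read.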
B. Configure network security
Configure your network to allow Materialize to connect to your database. For example, you can:
- Allow Materialize IPs: Configure your database’s security group to allow connections from Materialize.
- Use an SSH tunnel: Use an SSH tunnel to connect Materialize to the database.
The steps to allow Materialize to connect to your database depend on your deployment setup. Refer to your company’s network/security policies and procedures.
- In the AWS Management Console, add an inbound rule to your RDS security group to allow traffic from Materialize IPs. In each rule:
  - Set Type to PostgreSQL.
  - Set Source to the IP address in CIDR notation.
To create an SSH tunnel from Materialize to your database, you launch an instance to serve as an SSH bastion host, configure the bastion host to allow traffic only from Materialize, and then configure your database’s private network to allow traffic from the bastion host.
- Launch an EC2 instance to serve as your SSH bastion host.
  - Make sure the instance is publicly accessible and in the same VPC as your RDS instance.
  - Add a key pair and note the username. You’ll use this username when connecting Materialize to your bastion host.
  Warning: Auto-assigned public IP addresses can change in certain cases. For this reason, it’s best to associate an elastic IP address to your bastion host.
- Configure the SSH bastion host to allow traffic only from Materialize.
- In the security group of your RDS instance, add an inbound rule to allow traffic from the SSH bastion host.
  - Set Type to All TCP.
  - Set Source to Custom and select the bastion host’s security group.
C. Ingest data in Materialize
1. (Optional) Create a cluster
If you already have a cluster that can host this source (for example, quickstart), you can skip this step. For production
scenarios, we recommend separating your workloads into multiple clusters for
resource isolation.
In Materialize, a cluster is an isolated environment, similar to a virtual warehouse in Snowflake. When you create a cluster, you choose the size of its compute resource allocation based on the work you need the cluster to do, whether ingesting data from a source, computing always-up-to-date query results, serving results to clients, or a combination.
In this case, you’ll create a dedicated cluster for ingesting source data from your PostgreSQL database.
- In the SQL Shell, or your preferred SQL client connected to Materialize, use the CREATE CLUSTER command to create the new cluster:

    CREATE CLUSTER ingest_postgres (SIZE = '200cc');
    SET CLUSTER = ingest_postgres;

  A cluster of size 200cc should be enough to process the initial snapshot of the tables in your publication. For very large snapshots, consider using a larger size to speed up processing. Once the snapshot is finished, you can readjust the size of the cluster to fit the volume of changes being replicated from your upstream PostgreSQL database.
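To double-check the result, a short sketch like the following can be run in the same Materialize session. It assumes the cluster name ingest_postgres from the step above and relies on the cluster session variable set by SET CLUSTER.

```sql
-- Show the cluster that this session will use for new objects and queries.
SHOW cluster;

-- List all clusters, which should now include ingest_postgres.
SHOW CLUSTERS;
```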
2. Start ingesting data
Now that you’ve configured your database network and created an ingestion cluster, you can connect Materialize to your PostgreSQL database and start ingesting data. The exact steps depend on your networking configuration, so start by selecting the relevant option.
If you allowed Materialize IPs:
- In the Materialize console’s SQL Shell, or your preferred SQL client connected to Materialize, use the CREATE SECRET command to securely store the password for the materialize PostgreSQL user you created earlier:

    CREATE SECRET pgpass AS '<PASSWORD>';
- Use the CREATE CONNECTION command to create a connection object with access and authentication details for Materialize to use:

    CREATE CONNECTION pg_connection TO POSTGRES (
      HOST '<host>',
      PORT 5432,
      USER 'materialize',
      PASSWORD SECRET pgpass,
      SSL MODE 'require',
      DATABASE '<database>'
    );

  - Replace <host> with your RDS endpoint. To find your RDS endpoint, select your database in the RDS Console, and look under Connectivity & security.
  - Replace <database> with the name of the database containing the tables you want to replicate to Materialize.
- Use the CREATE SOURCE command to connect Materialize to your RDS instance and start ingesting data from the publication you created earlier:

    CREATE SOURCE mz_source
      IN CLUSTER ingest_postgres
      FROM POSTGRES CONNECTION pg_connection (PUBLICATION 'mz_source')
      FOR ALL TABLES;

  By default, the source will be created in the active cluster; to use a different cluster, use the IN CLUSTER clause. To ingest data from specific schemas or tables in your publication, use FOR SCHEMAS (<schema1>,<schema2>) or FOR TABLES (<table1>, <table2>) instead of FOR ALL TABLES.
- After source creation, you can handle upstream schema changes for specific replicated tables using the ALTER SOURCE...ADD SUBSOURCE and DROP SOURCE syntax.
If you use an SSH tunnel:
- In the Materialize console’s SQL Shell, or your preferred SQL client connected to Materialize, use the CREATE CONNECTION command to create an SSH tunnel connection:

    CREATE CONNECTION ssh_connection TO SSH TUNNEL (
      HOST '<SSH_BASTION_HOST>',
      PORT <SSH_BASTION_PORT>,
      USER '<SSH_BASTION_USER>'
    );

  - Replace <SSH_BASTION_HOST> and <SSH_BASTION_PORT> with the public IP address and port of the SSH bastion host you created earlier.
  - Replace <SSH_BASTION_USER> with the username for the key pair you created for your SSH bastion host.
- Get Materialize’s public keys for the SSH tunnel connection you just created:

    SELECT mz_connections.name, mz_ssh_tunnel_connections.*
    FROM mz_connections
    JOIN mz_ssh_tunnel_connections USING(id)
    WHERE mz_connections.name = 'ssh_connection';

- Log in to your SSH bastion host and add Materialize’s public keys to the authorized_keys file, for example:

    # Command for Linux
    echo "ssh-ed25519 AAAA...76RH materialize" >> ~/.ssh/authorized_keys
    echo "ssh-ed25519 AAAA...hLYV materialize" >> ~/.ssh/authorized_keys

- Back in the SQL client connected to Materialize, validate the SSH tunnel connection you created using the VALIDATE CONNECTION command:

    VALIDATE CONNECTION ssh_connection;

  If no validation error is returned, move to the next step.
- Use the CREATE SECRET command to securely store the password for the materialize PostgreSQL user you created earlier:

    CREATE SECRET pgpass AS '<PASSWORD>';
- Use the CREATE CONNECTION command to create another connection object, this time with database access and authentication details for Materialize to use:

    CREATE CONNECTION pg_connection TO POSTGRES (
      HOST '<host>',
      PORT 5432,
      USER 'materialize',
      PASSWORD SECRET pgpass,
      DATABASE '<database>',
      SSH TUNNEL ssh_connection
    );

  - Replace <host> with your RDS endpoint. To find your RDS endpoint, select your database in the RDS Console, and look under Connectivity & security.
  - Replace <database> with the name of the database containing the tables you want to replicate to Materialize.
- Use the CREATE SOURCE command to connect Materialize to your RDS instance and start ingesting data from the publication you created earlier:

    CREATE SOURCE mz_source
      IN CLUSTER ingest_postgres
      FROM POSTGRES CONNECTION pg_connection (PUBLICATION 'mz_source')
      FOR ALL TABLES;

  By default, the source will be created in the active cluster; to use a different cluster, use the IN CLUSTER clause. To ingest data from specific schemas or tables in your publication, use FOR SCHEMAS (<schema1>,<schema2>) or FOR TABLES (<table1>, <table2>) instead of FOR ALL TABLES.
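Whichever networking option you chose, a quick way to confirm the result is sketched below. It assumes the object names used in this guide (pg_connection and mz_source) and that your Materialize version exposes replicated tables as subsources, as the steps above do.

```sql
-- Confirm that Materialize can reach PostgreSQL through the connection.
VALIDATE CONNECTION pg_connection;

-- List the sources in the current schema; mz_source should appear here.
SHOW SOURCES;

-- List the subsources of mz_source, one per replicated table.
SHOW SUBSOURCES ON mz_source;
```

If VALIDATE CONNECTION returns an error, revisit the network security configuration before troubleshooting the source itself.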
3. Monitor the ingestion status
Before it starts consuming the replication stream, Materialize takes a snapshot of the relevant tables in your publication. Until this snapshot is complete, Materialize won’t have the same view of your data as your PostgreSQL database.
In this step, you’ll first verify that the source is running and then check the status of the snapshotting process.
- Back in the SQL client connected to Materialize, use the mz_source_statuses table to check the overall status of your source:

    WITH source_ids AS (
        SELECT id FROM mz_sources WHERE name = 'mz_source'
    )
    SELECT *
    FROM mz_internal.mz_source_statuses
    JOIN (
        SELECT referenced_object_id
        FROM mz_internal.mz_object_dependencies
        WHERE object_id IN (SELECT id FROM source_ids)
        UNION
        SELECT id FROM source_ids
    ) AS sources
    ON mz_source_statuses.id = sources.referenced_object_id;

  For each subsource, make sure the status is running. If you see stalled or failed, there’s likely a configuration issue for you to fix. Check the error field for details and fix the issue before moving on. Also, if the status of any subsource is starting for more than a few minutes, contact our team.
- Once the source is running, use the mz_source_statistics table to check the status of the initial snapshot:

    WITH source_ids AS (
        SELECT id FROM mz_sources WHERE name = 'mz_source'
    )
    SELECT sources.referenced_object_id AS id, mz_sources.name, snapshot_committed
    FROM mz_internal.mz_source_statistics
    JOIN (
        SELECT object_id, referenced_object_id
        FROM mz_internal.mz_object_dependencies
        WHERE object_id IN (SELECT id FROM source_ids)
        UNION
        SELECT id, id FROM source_ids
    ) AS sources
    ON mz_source_statistics.id = sources.referenced_object_id
    JOIN mz_sources ON mz_sources.id = sources.referenced_object_id;

    object_id | snapshot_committed
    ----------|------------------
    u144      | t
    (1 row)

  Once snapshot_committed is t, move on to the next step. Snapshotting can take from a few minutes to several hours, depending on the size of your dataset and the size of the cluster the source is running in.
4. Right-size the cluster
After the snapshotting phase, Materialize starts ingesting change events from
the PostgreSQL replication stream. For this work, Materialize generally
performs well with a 100cc replica, so you can resize the cluster
accordingly.
- Still in a SQL client connected to Materialize, use the ALTER CLUSTER command to downsize the cluster to 100cc:

    ALTER CLUSTER ingest_postgres SET (SIZE '100cc');

  Behind the scenes, this command adds a new 100cc replica and removes the 200cc replica.
- Use the SHOW CLUSTER REPLICAS command to check the status of the new replica:

    SHOW CLUSTER REPLICAS WHERE cluster = 'ingest_postgres';

         cluster     | replica |  size  | ready
    -----------------+---------+--------+-------
     ingest_postgres | r1      | 100cc  | t
    (1 row)
- Going forward, you can verify that your new cluster size is sufficient as follows:
  - In Materialize, get the replication slot name associated with your PostgreSQL source from the mz_internal.mz_postgres_sources table:

    SELECT
        d.name AS database_name,
        n.name AS schema_name,
        s.name AS source_name,
        pgs.replication_slot
    FROM mz_sources AS s
    JOIN mz_internal.mz_postgres_sources AS pgs ON s.id = pgs.id
    JOIN mz_schemas AS n ON n.id = s.schema_id
    JOIN mz_databases AS d ON d.id = n.database_id;

  - In PostgreSQL, check the replication slot lag, using the replication slot name from the previous step:

    SELECT
        pg_size_pretty(pg_current_wal_lsn() - confirmed_flush_lsn) AS replication_lag_bytes
    FROM pg_replication_slots
    WHERE slot_name = '<slot_name>';

    The result of this query is the amount of data your PostgreSQL cluster must retain in its replication log because of this replication slot. Typically, this means Materialize has not yet communicated back to PostgreSQL that it has committed this data. A high value can indicate that the source has fallen behind and that you might need to scale up your ingestion cluster.
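If you run more than one source, or are not sure which slot belongs to which consumer, a broader variant of the lag check can be useful. The query below is a sketch that uses only standard PostgreSQL catalog columns and functions (available in PostgreSQL 10 and later); it lists every logical replication slot, not only the one created by Materialize.

```sql
-- Run in PostgreSQL.
-- Lists all logical replication slots, whether a consumer is currently
-- connected, and how much WAL each slot forces PostgreSQL to retain.
SELECT slot_name,
       active,
       pg_size_pretty(
           pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn)
       ) AS retained_wal
FROM pg_replication_slots
WHERE slot_type = 'logical'
ORDER BY pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn) DESC;
```

A slot that shows active as f while retained_wal keeps growing usually means its consumer is disconnected, which is worth investigating because the retained WAL consumes storage on the RDS instance.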
Next steps
With Materialize ingesting your PostgreSQL data into durable storage, you can start exploring the data, computing real-time results that stay up-to-date as new data arrives, and serving results efficiently.
- Explore your data with SHOW SOURCES and SELECT.
- Compute real-time results in memory with CREATE VIEW and CREATE INDEX or in durable storage with CREATE MATERIALIZED VIEW.
- Serve results to a PostgreSQL-compatible SQL client or driver with SELECT or SUBSCRIBE or to an external message broker with CREATE SINK.
- Check out the tools and integrations supported by Materialize.
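As a starting point for the first two items, here is a minimal sketch in Materialize SQL. The view name table1_row_count is hypothetical and chosen only for illustration; <table1> is the same placeholder used earlier for one of your replicated tables.

```sql
-- Define a view over a replicated table (hypothetical view name).
CREATE VIEW table1_row_count AS
SELECT count(*) AS row_count
FROM <table1>;

-- Index the view so its result is kept up to date in memory
-- in the active cluster.
CREATE DEFAULT INDEX ON table1_row_count;

-- Read the current result.
SELECT * FROM table1_row_count;
```

For workloads that should not share resources with ingestion, consider creating the index in a separate cluster, in line with the resource isolation recommendation above.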