Ingest data from Neon

💡 Tip: For help getting started with your own data, you can schedule a free guided trial.

Neon is a fully managed serverless PostgreSQL provider. It separates compute and storage to offer features like autoscaling, branching and bottomless storage.

This page shows you how to stream data from a Neon database to Materialize using the PostgreSQL source.

Before you begin

  • Make sure you have a Neon account.

  • Make sure you have access to your Neon instance via psql or the SQL editor in the Neon Console.

A. Configure Neon

The steps in this section are specific to Neon. You can run them by connecting to your Neon database using a psql client or the SQL editor in the Neon Console.

1. Enable logical replication

WARNING! Enabling logical replication applies globally to all databases in your Neon project, and cannot be reverted. It also restarts all computes, which means that any active connections are dropped and have to reconnect.

Materialize uses PostgreSQL’s logical replication protocol to track changes in your database and propagate them to Materialize.

As a first step, you need to make sure logical replication is enabled in Neon.

  1. Select your project in the Neon Console.

  2. On the Neon Dashboard, select Settings.

  3. Select Logical Replication.

  4. Click Enable to enable logical replication.

You can verify that logical replication is enabled by running:

SHOW wal_level;

The result should be:

 wal_level
-----------
 logical

2. Create a publication and a replication user

Once logical replication is enabled, the next step is to create a publication with the tables that you want to replicate to Materialize. You’ll also need a user for Materialize with sufficient privileges to manage replication.

  1. For each table that you want to replicate to Materialize, set the replica identity to FULL:

    ALTER TABLE <table1> REPLICA IDENTITY FULL;
    
    ALTER TABLE <table2> REPLICA IDENTITY FULL;
    

    REPLICA IDENTITY FULL ensures that the replication stream includes the previous data of changed rows, in the case of UPDATE and DELETE operations. This setting enables Materialize to ingest Neon data with minimal in-memory state. However, you should expect increased disk usage in your Neon database.

  2. Create a publication with the tables you want to replicate:

    For specific tables:

    CREATE PUBLICATION mz_source FOR TABLE <table1>, <table2>;
    

    For all tables in the database:

    CREATE PUBLICATION mz_source FOR ALL TABLES;
    

    The mz_source publication will contain the set of change events generated from the specified tables, and will later be used to ingest the replication stream.

    Be sure to include only the tables you need. If the publication includes additional tables, Materialize will waste resources on ingesting and then immediately discarding the data.

  3. Create a dedicated user for Materialize, if you don’t already have one. The default user created with your Neon project and users created using the Neon CLI, Console or API are granted membership in the neon_superuser role, which has the required REPLICATION privilege.

    While you can use the default user for replication, we recommend creating a dedicated user for security reasons.

    Use the roles create CLI command to create a new role.

    neon roles create --name materialize
    
    1. Navigate to the Neon Console.
    2. Select a project.
    3. Select Branches.
    4. Select the branch where you want to create the role.
    5. Select the Roles & Databases tab.
    6. Click Add Role.
    7. In the role creation dialog, specify the role name as “materialize”.
    8. Click Create. The role is created, and you are provided with the password for the role.

    Use the roles endpoint to create a new role.

    curl 'https://console.neon.tech/api/v2/projects/<project_id>/branches/<branch_id>/roles' \
    -H 'Accept: application/json' \
    -H "Authorization: Bearer $NEON_API_KEY" \
    -H 'Content-Type: application/json' \
    -d '{
    "role": {
        "name": "materialize"
    }
    }' | jq
    
  4. Grant the user the required permissions on the schema(s) you want to replicate:

    GRANT USAGE ON SCHEMA public TO materialize;
    
    GRANT SELECT ON ALL TABLES IN SCHEMA public TO materialize;
    
    ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT SELECT ON TABLES TO materialize;
    

    Granting SELECT ON ALL TABLES IN SCHEMA instead of on specific tables avoids having to add privileges later if you add tables to your publication.

B. (Optional) Configure network security

NOTE: If you are prototyping and your Neon instance is publicly accessible, you can skip this step. For production scenarios, we recommend using IP Allow to limit the IP addresses that can connect to your Neon instance.

Allow Materialize IPs

If you use Neon’s IP Allow feature to limit the IP addresses that can connect to your Neon instance, you will need to allow inbound traffic from Materialize IP addresses.

  1. In the Materialize console’s SQL Shell, or your preferred SQL client connected to Materialize, run the following query to find the static egress IP addresses, for the Materialize region you are running in:

    SELECT * FROM mz_egress_ips;
    
  2. In your Neon project, add the IPs to your IP Allow list:

    1. Select your project in the Neon Console.
    2. On the Neon Dashboard, select Settings.
    3. Select IP Allow.
    4. Add each Materialize IP address to the list.

C. Ingest data in Materialize

The steps in this section are specific to Materialize. You can run them in the Materialize console’s SQL Shell or your preferred SQL client connected to Materialize.

1. (Optional) Create a cluster

NOTE: If you are prototyping and already have a cluster to host your PostgreSQL source (e.g. quickstart), you can skip this step. For production scenarios, we recommend separating your workloads into multiple clusters for resource isolation.

In Materialize, a cluster is an isolated environment, similar to a virtual warehouse in Snowflake. When you create a cluster, you choose the size of its compute resource allocation based on the work you need the cluster to do, whether ingesting data from a source, computing always-up-to-date query results, serving results to clients, or a combination.

In this case, you’ll create a dedicated cluster for ingesting source data from your PostgreSQL database.

  1. In the SQL Shell, or your preferred SQL client connected to Materialize, use the CREATE CLUSTER command to create the new cluster:

    CREATE CLUSTER ingest_postgres (SIZE = '200cc');
    
    SET CLUSTER = ingest_postgres;
    

    A cluster of size 200cc should be enough to process the initial snapshot of the tables in your publication. For very large snapshots, consider using a larger size to speed up processing. Once the snapshot is finished, you can readjust the size of the cluster to fit the volume of changes being replicated from your upstream PostgeSQL database.

2. Start ingesting data

Now that you’ve configured your database network and created an ingestion cluster, you can connect Materialize to your Neon database and start ingesting data.

  1. Run the CREATE SECRET command to securely store the password for the materialize PostgreSQL user you created earlier:

    CREATE SECRET pgpass AS '<PASSWORD>';
    

    You can access the password for your Neon user from the Connection Details widget on the Neon Dashboard.

  2. Use the CREATE CONNECTION command to create a connection object with access and authentication details for Materialize to use:

    CREATE CONNECTION pg_connection TO POSTGRES (
      HOST '<host>',
      PORT 5432,
      USER '<user_name>',
      PASSWORD SECRET pgpass,
      SSL MODE 'require',
      DATABASE '<database>'
    );
    

    You can find the connection details for your replication user in the Connection Details widget on the Neon Dashboard. A Neon connection string looks like this:

    postgresql://materialize:AbC123dEf@ep-cool-darkness-123456.us-east-2.aws.neon.tech/dbname?sslmode=require
    
    • Replace <host> with your Neon hostname (e.g., ep-cool-darkness-123456.us-east-2.aws.neon.tech).
    • Replace <role_name> with the dedicated replication user (e.g., materialize).
    • Replace <database> with the name of the database containing the tables you want to replicate to Materialize (e.g., dbname).
  3. Use the CREATE SOURCE command to connect Materialize to your Neon database and start ingesting data from the publication you created earlier:

    CREATE SOURCE mz_source
      IN CLUSTER ingest_postgres
      FROM POSTGRES CONNECTION pg_connection (PUBLICATION 'mz_source')
      FOR ALL TABLES;
    

    By default, the source will be created in the active cluster; to use a different cluster, use the IN CLUSTER clause. To ingest data from specific schemas or tables in your publication, use FOR SCHEMAS (<schema1>,<schema2>) or FOR TABLES (<table1>, <table2>) instead of FOR ALL TABLES.

  4. After source creation, you can handle upstream schema changes for specific replicated tables using the ALTER SOURCE...{ADD | DROP} SUBSOURCE syntax.

3. Monitor the ingestion status

Before it starts consuming the replication stream, Materialize takes a snapshot of the relevant tables in your publication. Until this snapshot is complete, Materialize won’t have the same view of your data as your PostgreSQL database.

In this step, you’ll first verify that the source is running and then check the status of the snapshotting process.

  1. Back in the SQL client connected to Materialize, use the mz_source_statuses table to check the overall status of your source:

    WITH
      source_ids AS
      (SELECT id FROM mz_sources WHERE name = 'mz_source')
    SELECT *
    FROM
      mz_internal.mz_source_statuses
        JOIN
          (
            SELECT referenced_object_id
            FROM mz_internal.mz_object_dependencies
            WHERE
              object_id IN (SELECT id FROM source_ids)
            UNION SELECT id FROM source_ids
          )
          AS sources
        ON mz_source_statuses.id = sources.referenced_object_id;
    

    For each subsource, make sure the status is running. If you see stalled or failed, there’s likely a configuration issue for you to fix. Check the error field for details and fix the issue before moving on. Also, if the status of any subsource is starting for more than a few minutes, contact our team.

  2. Once the source is running, use the mz_source_statistics table to check the status of the initial snapshot:

    WITH
      source_ids AS
      (SELECT id FROM mz_sources WHERE name = 'mz_source')
    SELECT sources.referenced_object_id AS id, mz_sources.name, snapshot_committed
    FROM
      mz_internal.mz_source_statistics
        JOIN
          (
            SELECT object_id, referenced_object_id
            FROM mz_internal.mz_object_dependencies
            WHERE
              object_id IN (SELECT id FROM source_ids)
            UNION SELECT id, id FROM source_ids
          )
          AS sources
        ON mz_source_statistics.id = sources.referenced_object_id
        JOIN mz_sources ON mz_sources.id = sources.referenced_object_id;
    

    object_id | snapshot_committed
    ----------|------------------
     u144     | t
    (1 row)
    

    Once snapshot_commited is t, move on to the next step. Snapshotting can take between a few minutes to several hours, depending on the size of your dataset and the size of the cluster the source is running in.

4. Right-size the cluster

After the snapshotting phase, Materialize starts ingesting change events from the PostgreSQL replication stream. For this work, Materialize generally performs well with an 100cc replica, so you can resize the cluster accordingly.

  1. Still in a SQL client connected to Materialize, use the ALTER CLUSTER command to downsize the cluster to 100cc:

    ALTER CLUSTER ingest_postgres SET (SIZE '100cc');
    

    Behind the scenes, this command adds a new 100cc replica and removes the 200cc replica.

  2. Use the SHOW CLUSTER REPLICAS command to check the status of the new replica:

    SHOW CLUSTER REPLICAS WHERE cluster = 'ingest_postgres';
    

         cluster     | replica |  size  | ready
    -----------------+---------+--------+-------
     ingest_postgres | r1      | 100cc  | t
    (1 row)
    
  3. Going forward, you can verify that your new cluster size is sufficient as follows:

    1. In Materialize, get the replication slot name associated with your PostgreSQL source from the mz_internal.mz_postgres_sources table:

      SELECT
          d.name AS database_name,
          n.name AS schema_name,
          s.name AS source_name,
          pgs.replication_slot
      FROM
          mz_sources AS s
          JOIN mz_internal.mz_postgres_sources AS pgs ON s.id = pgs.id
          JOIN mz_schemas AS n ON n.id = s.schema_id
          JOIN mz_databases AS d ON d.id = n.database_id;
      
    2. In PostgreSQL, check the replication slot lag, using the replication slot name from the previous step:

      SELECT
          pg_size_pretty(pg_current_wal_lsn() - confirmed_flush_lsn)
          AS replication_lag_bytes
      FROM pg_replication_slots
      WHERE slot_name = '<slot_name>';
      

      The result of this query is the amount of data your PostgreSQL cluster must retain in its replication log because of this replication slot. Typically, this means Materialize has not yet communicated back to PostgreSQL that it has committed this data. A high value can indicate that the source has fallen behind and that you might need to scale up your ingestion cluster.

Next steps

With Materialize ingesting your PostgreSQL data into durable storage, you can start exploring the data, computing real-time results that stay up-to-date as new data arrives, and serving results efficiently.

Back to top ↑