SUBSCRIBE

SUBSCRIBE streams updates from a source, table, view, or materialized view as they occur.

Conceptual framework

The SUBSCRIBE statement is a more general form of a SELECT statement. While a SELECT statement computes a relation at a moment in time, a subscribe operation computes how a relation changes over time.

Fundamentally, SUBSCRIBE produces a sequence of updates. An update describes either the insertion of a row into the relation or the deletion of a row from it at a specific time. Taken together, the updates describe the complete set of changes to a relation, in order, while SUBSCRIBE is active.

You can use SUBSCRIBE to:

  • Power event processors that react to every change to a relation or an arbitrary SELECT statement.
  • Replicate the complete history of a relation while SUBSCRIBE is active.
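
To make the update model concrete, the sketch below folds a sequence of updates into the current contents of a relation. It is a client-side illustration in Python, not part of Materialize. The function name apply_updates and the tuple layout (mz_timestamp, mz_diff, row) are assumptions chosen for this example.

```python
from collections import Counter

def apply_updates(state, updates):
    """Fold (mz_timestamp, mz_diff, row) updates into a multiset of rows."""
    for _ts, diff, row in updates:
        state[row] += diff
        if state[row] == 0:
            del state[row]  # drop rows whose multiplicity has returned to zero
    return state

# Hypothetical updates: two inserts, then a change to the row with id1,
# expressed as a deletion of the old row and an insertion of the new one.
updates = [
    (1, 1, ("id1", "value1")),
    (1, 1, ("id2", "value2")),
    (2, -1, ("id1", "value1")),
    (2, 1, ("id1", "value4")),
]
relation = apply_updates(Counter(), updates)
print(sorted(relation.items()))
```

Keeping a count per row rather than a set of rows mirrors the fact that a relation may contain duplicate rows.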

Syntax

SUBSCRIBE [TO] { object_name | ( select_stmt ) }
    [ WITH ( option_name [= option_value] [, ...] ) ]
    [ AS OF [AT LEAST] timestamp_expression ]
    [ UP TO timestamp_expression ]
    [ ENVELOPE UPSERT ( KEY ( col_ref [, ...] ) )
    | ENVELOPE DEBEZIUM ( KEY ( col_ref [, ...] ) )
    | WITHIN TIMESTAMP ORDER BY col_ref [ASC | DESC] [NULLS LAST | NULLS FIRST] [, ...] ]

Field | Use
------|-----
object_name | The name of the source, table, or view that you want to subscribe to.
select_stmt | The SELECT statement whose output you want to subscribe to.
ENVELOPE UPSERT | Use the upsert envelope, which takes a list of KEY columns and supports inserts, updates and deletes in the subscription output. For more information, see Modifying the output format.
ENVELOPE DEBEZIUM | Use a Debezium-style diff envelope, which takes a list of KEY columns and supports inserts, updates and deletes in the subscription output along with the previous state of the key. For more information, see Modifying the output format.
WITHIN TIMESTAMP ... ORDER BY | Use an ORDER BY clause to sort the subscription output within a timestamp. For more information, see Modifying the output format.

The generated schemas have a Debezium-style diff envelope to capture changes in the input view or source.

WITH options

The following options are valid within the WITH clause.

Option name | Value type | Default | Describes
------------|------------|---------|-----------
SNAPSHOT | boolean | true | Whether to emit a snapshot of the current state of the relation at the start of the operation. See SNAPSHOT.
PROGRESS | boolean | false | Whether to include detailed progress information. See PROGRESS.

Details

Output

SUBSCRIBE emits a sequence of updates as rows. Each row contains all of the columns of the subscribed relation or derived from the SELECT statement, prepended with several additional columns that describe the nature of the update:

Column | Type | Represents
-------|------|------------
mz_timestamp | numeric | Materialize's internal logical timestamp. This will never be less than any timestamp previously emitted by the same SUBSCRIBE operation.
mz_progressed | boolean | This column is only present if the PROGRESS option is specified. If true, indicates that the SUBSCRIBE will not emit additional records at times strictly less than mz_timestamp. See PROGRESS below.
mz_diff | bigint | The change in frequency of the row. A positive number indicates that mz_diff copies of the row were inserted, while a negative number indicates that |mz_diff| copies of the row were deleted.
Column 1 ... Column N | Varies | The columns from the subscribed relation, each as its own column, representing the data that was inserted into or deleted from the relation.

AS OF

The AS OF clause allows specifying a timestamp at which the SUBSCRIBE should begin returning results, in order to inspect the historical state of a relation. If AS OF is specified, no rows whose timestamp is less than the specified timestamp will be returned. If the timestamp specified is earlier than the earliest historical state retained by the source relations, an error will be signaled.

If AS OF is unspecified, the system automatically chooses an AS OF timestamp.

Currently, all user-defined sources and tables have a retention window of one second, so AS OF is of limited usefulness except when subscribing to queries over certain internal relations.

UP TO

The UP TO clause allows specifying a timestamp at which the SUBSCRIBE will cease running. If UP TO is specified, no rows whose timestamp is greater than or equal to the specified timestamp will be returned.

Interaction of AS OF and UP TO

The lower timestamp bound specified by AS OF is inclusive, whereas the upper bound specified by UP TO is exclusive. Thus, a SUBSCRIBE query whose AS OF is equal to its UP TO will terminate after returning zero rows.

A SUBSCRIBE whose UP TO is less than its AS OF timestamp (whether that timestamp was specified in an AS OF clause or chosen by the system) will signal an error.
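
The bound semantics above amount to a half-open interval [AS OF, UP TO). The following sketch models only those semantics on the client side, using a hypothetical helper named within_bounds. It does not reflect how Materialize evaluates the clauses internally.

```python
def within_bounds(updates, as_of, up_to=None):
    """Keep updates with as_of <= mz_timestamp < up_to (half-open interval).

    Each update is a tuple whose first element is mz_timestamp.
    """
    if up_to is not None and up_to < as_of:
        # Mirrors the documented error when UP TO is less than AS OF.
        raise ValueError("UP TO must not be less than AS OF")
    return [
        u for u in updates
        if u[0] >= as_of and (up_to is None or u[0] < up_to)
    ]

updates = [(1, 1, "a"), (2, 1, "b"), (3, 1, "c")]
print(within_bounds(updates, as_of=2, up_to=3))  # inclusive lower, exclusive upper
print(within_bounds(updates, as_of=2, up_to=2))  # equal bounds keep nothing
```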

Duration

SUBSCRIBE will continue to run until it is canceled, the session ends, the UP TO timestamp is reached, or all updates have been presented. The last case typically occurs when subscribing to constant views (e.g. CREATE VIEW v AS SELECT 1).

WARNING!

Many PostgreSQL drivers wait for a query to complete before returning its results. Since SUBSCRIBE can run forever, naively executing a SUBSCRIBE using your driver’s standard query API may never return.

To avoid this, use a driver API that does not buffer rows, use the FETCH statement, or set AS OF and UP TO bounds so that rows from SUBSCRIBE arrive in batches. See the examples for details.

SNAPSHOT

By default, SUBSCRIBE begins by emitting a snapshot of the subscribed relation, which consists of a series of updates at its AS OF timestamp describing the contents of the relation. After the snapshot, SUBSCRIBE emits further updates as they occur.

For updates in the snapshot, the mz_timestamp field will be fast-forwarded to the AS OF timestamp. For example, an insert that occurred before the SUBSCRIBE began would appear in the snapshot with mz_timestamp set to the AS OF timestamp.

To see only updates after the initial timestamp, specify WITH (SNAPSHOT = false).

PROGRESS

If the PROGRESS option is specified via WITH (PROGRESS):

  • An additional mz_progressed column appears in the output. When the column is false, the rest of the row is a valid update. When the column is true, everything in the row except for mz_timestamp is not a valid update and its content should be ignored; the row exists only to communicate that timestamps have advanced.

  • The first update emitted by the SUBSCRIBE is guaranteed to be a progress message indicating the subscribe’s AS OF timestamp.

Intuitively, progress messages communicate that no updates have occurred in a given time window. Without explicit progress messages, it is impossible to distinguish between a stall in Materialize and a legitimate period of no updates.

Not all timestamps that appear will have a corresponding row with mz_progressed set to true. For example, the following is a valid sequence of updates:

mz_timestamp | mz_progressed | mz_diff | column1
-------------|---------------|---------|--------------
1            | false         | 1       | data
2            | false         | 1       | more data
3            | false         | 1       | even more data
4            | true          | NULL    | NULL

Notice how Materialize did not emit explicit progress messages for timestamps 1 or 2. The receipt of the update at timestamp 2 implies that there are no more updates for timestamp 1, because timestamps are always presented in non-decreasing order. The receipt of the explicit progress message at timestamp 4 implies that there are no more updates for either timestamp 2 or 3—but that there may be more data arriving at timestamp 4.
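
One way a client might use progress messages is to hold updates back until their timestamp is known to be complete. The sketch below assumes rows arrive as (mz_timestamp, mz_progressed, mz_diff, data) tuples; the generator name closed_batches is hypothetical. For simplicity it relies only on explicit progress rows and ignores the implicit signal from a later data timestamp.

```python
def closed_batches(rows):
    """Yield (timestamp, updates) once a progress row shows the timestamp is complete.

    Each input row is (mz_timestamp, mz_progressed, mz_diff, data).
    """
    pending = {}  # mz_timestamp -> list of (mz_diff, data)
    for ts, progressed, diff, data in rows:
        if progressed:
            # No more updates will arrive at times strictly less than ts.
            for closed in sorted(t for t in pending if t < ts):
                yield closed, pending.pop(closed)
        else:
            pending.setdefault(ts, []).append((diff, data))

rows = [
    (1, False, 1, "data"),
    (2, False, 1, "more data"),
    (3, False, 1, "even more data"),
    (4, True, None, None),
]
for ts, updates in closed_batches(rows):
    print(ts, updates)
```

Updates at timestamp 4 itself would stay buffered until a progress row with a greater timestamp arrives, matching the "strictly less than" rule.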

Examples

SUBSCRIBE produces rows similar to a SELECT statement, except that SUBSCRIBE may never complete. Many drivers buffer all results until a query is complete, and so will never return. Below are the recommended ways to work around this.

Creating a counter load generator

As an example, we’ll create a counter load generator that emits a row every second:

CREATE SOURCE counter FROM LOAD GENERATOR COUNTER;

Subscribing with FETCH

The recommended way to use SUBSCRIBE is with DECLARE and FETCH. These must be used within a transaction, with a single DECLARE per transaction. This allows you to limit the number of rows and the time window of your requests. Next, let’s subscribe to the counter load generator source that we’ve created above.

First, declare a SUBSCRIBE cursor:

BEGIN;
DECLARE c CURSOR FOR SUBSCRIBE (SELECT * FROM counter);

Then, use FETCH in a loop to retrieve each batch of results as soon as it’s ready:

FETCH ALL c;

That will retrieve all of the rows that are currently available. If there are no rows available, it will wait until there are some ready and return those. A timeout can be used to specify a window in which to wait for rows. This will return up to the specified count (or ALL) of rows that are ready within the timeout. To retrieve up to 100 rows that are available in at most the next 1s:

FETCH 100 c WITH (timeout='1s');

To retrieve all rows that become available over the next 1s:

FETCH ALL c WITH (timeout='1s');

A 0s timeout can be used to return rows that are available now without waiting:

FETCH ALL c WITH (timeout='0s');
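
From application code, the same DECLARE and FETCH loop might look like the sketch below. It assumes a cursor object with DB-API style execute() and fetchall() methods, obtained from a connection in autocommit mode so that the explicit BEGIN is the only open transaction. The function name fetch_batches and the max_batches parameter are hypothetical, and the counter source from above is assumed to exist.

```python
def fetch_batches(cursor, max_batches):
    """Yield batches of SUBSCRIBE rows using DECLARE and FETCH.

    `cursor` is any object with DB-API style execute() and fetchall() methods.
    """
    cursor.execute("BEGIN")
    try:
        cursor.execute("DECLARE c CURSOR FOR SUBSCRIBE (SELECT * FROM counter)")
        for _ in range(max_batches):
            # Wait up to one second for rows, then return whatever is ready.
            cursor.execute("FETCH ALL c WITH (timeout='1s')")
            batch = cursor.fetchall()
            if batch:
                yield batch
    finally:
        # End the transaction even if the caller stops iterating early.
        cursor.execute("COMMIT")
```

Bounding the loop with max_batches keeps the example finite; a long-running consumer would typically loop until it decides to stop.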

Using clients

If you want to use SUBSCRIBE from an interactive SQL session (e.g., psql), wrap the query in COPY:

COPY (SUBSCRIBE (SELECT * FROM counter)) TO STDOUT;

Additional guides

  • Go
  • Java
  • Node.js
  • PHP
  • Python
  • Ruby
  • Rust

Mapping rows to their updates

After all the rows from the SNAPSHOT have been transmitted, the updates will be emitted as they occur. How can you map each row to its corresponding update?

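mz_timestamp | mz_progressed | mz_diff | Column 1 | ... | Column N
-------------|---------------|---------|----------|-----|---------
1            | false         | 1       | id1      |     | value1
1            | false         | 1       | id2      |     | value2
1            | false         | 1       | id3      |     | value3   <- Last row from SNAPSHOT
2            | false         | -1      | id1      |     | value1
2            | false         | 1       | id1      |     | value4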

If your rows have a unique key column, you can use it to map each update to its origin row; if no key is known, you can use the output of hash(columns_values) instead.

In the example above, Column 1 acts as the key column that uniquely identifies the origin row an update refers to; if no such key were known, hashing the values from Column 1 to Column N would identify the origin row.
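
A client-side version of this mapping could look like the sketch below. It assumes every mz_diff is +1 or -1 and that live rows are distinct. The helper names row_key and track, and the choice of SHA-256 as the hash, are illustrative assumptions rather than anything Materialize prescribes.

```python
import hashlib

def row_key(row, key_index=None):
    """Return the key column if known, else a hash of all column values."""
    if key_index is not None:
        return row[key_index]
    joined = "\x1f".join(repr(v) for v in row)
    return hashlib.sha256(joined.encode("utf-8")).hexdigest()

def track(updates, key_index=None):
    """Apply (mz_timestamp, mz_diff, row) updates, resolving deletes to origin rows."""
    live = {}    # key -> row currently in the relation
    events = []  # (mz_timestamp, kind, row)
    for ts, diff, row in updates:
        key = row_key(row, key_index)
        if diff > 0:
            live[key] = row
            events.append((ts, "insert", row))
        else:
            # The deleted row carries the old values, so its key (or hash)
            # matches the origin row stored earlier.
            events.append((ts, "delete", live.pop(key, None)))
    return live, events

updates = [
    (1, 1, ("id1", "value1")),
    (1, 1, ("id2", "value2")),
    (1, 1, ("id3", "value3")),
    (2, -1, ("id1", "value1")),
    (2, 1, ("id1", "value4")),
]
live, events = track(updates, key_index=0)
print(live["id1"])
```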

Modifying the output format

ENVELOPE UPSERT

To modify the output of SUBSCRIBE to support upserts, use ENVELOPE UPSERT. This clause allows you to specify a KEY that Materialize uses to interpret the rows as a series of inserts, updates and deletes within each distinct timestamp.

The output columns are reordered so that all the key columns come before the value columns.

  • Using this modifier, the output rows will have the following structure:

    SUBSCRIBE mview ENVELOPE UPSERT (KEY (key));
    
    mz_timestamp | mz_state | key  | value
    -------------|----------|------|--------
    100          | upsert   | 1    | 2
    100          | upsert   | 2    | 4
    
  • For inserts and updates, the value columns for each key are set to the resulting value of the series of operations, and mz_state is set to upsert.

    Insert

     -- at time 200, add a new row with key=3, value=6
     mz_timestamp | mz_state | key  | value
     -------------|----------|------|--------
     ...
     200          | upsert   | 3    | 6
     ...
    

    Update

     -- at time 300, update key=1's value to 10
     mz_timestamp | mz_state | key  | value
     -------------|----------|------|--------
     ...
     300          | upsert   | 1    | 10
     ...
    
  • If only deletes occur within a timestamp, the value columns for each key are set to NULL, and mz_state is set to delete.

    Delete

     -- at time 400, delete all rows
     mz_timestamp | mz_state | key  | value
     -------------|----------|------|--------
     ...
     400          | delete   | 1    | NULL
     400          | delete   | 2    | NULL
     400          | delete   | 3    | NULL
     ...
    
  • Only use ENVELOPE UPSERT when there is at most one live value per key. If Materialize detects that a given key has multiple values, it will generate an update with mz_state set to "key_violation", the problematic key, and all the values nulled out. Materialize is not guaranteed to detect this case, so do not rely on it.

    Key violation

     -- at time 500, introduce a key_violation
     mz_timestamp | mz_state        | key  | value
     -------------|-----------------|------|--------
     ...
     500          | key_violation   | 1    | NULL
     ...
    
  • If PROGRESS is set, Materialize also returns the mz_progressed column. Each progress row will have a NULL key and a NULL value.
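
A consumer of the upsert envelope can keep a key-to-value map up to date with very little logic. The sketch below assumes rows arrive as (mz_timestamp, mz_state, key, value) tuples without the PROGRESS option; the function name apply_upsert is hypothetical, and treating a key violation as a hard error is a design choice of this example.

```python
def apply_upsert(state, rows):
    """Apply (mz_timestamp, mz_state, key, value) rows to a key -> value dict."""
    for ts, mz_state, key, value in rows:
        if mz_state == "upsert":
            state[key] = value
        elif mz_state == "delete":
            state.pop(key, None)
        elif mz_state == "key_violation":
            raise ValueError(f"key violation for key {key!r} at timestamp {ts}")
        else:
            raise ValueError(f"unexpected mz_state {mz_state!r}")
    return state

# Rows taken from the examples above, up to the update at time 300.
rows = [
    (100, "upsert", 1, 2),
    (100, "upsert", 2, 4),
    (200, "upsert", 3, 6),
    (300, "upsert", 1, 10),
]
state = apply_upsert({}, rows)
print(state)
```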

ENVELOPE DEBEZIUM

PREVIEW This feature is in private preview. It may have performance or stability issues and is under active development. It isn't subject to our backwards compatibility guarantees.

You must contact us to enable this feature in your Materialize region.

To modify the output of SUBSCRIBE to support upserts using a Debezium-style diff envelope, use ENVELOPE DEBEZIUM. This clause allows you to specify a KEY that Materialize uses to interpret the rows as a series of inserts, updates and deletes within each distinct timestamp. Unlike ENVELOPE UPSERT, the output includes the state of the row before and after the upsert operation.

The output columns are reordered so that all the key columns come before the value columns. There are two copies of the value columns: one prefixed with before_, which represents the value of the columns before the upsert operation; and another prefixed with after_, which represents the current value of the columns.

  • Using this modifier, the output rows will have the following structure:

    SUBSCRIBE mview ENVELOPE DEBEZIUM (KEY (key));
    
    mz_timestamp | mz_state | key  | before_value | after_value
    -------------|----------|------|--------------|-------
    100          | insert   | 1    | NULL         | 2
    100          | insert   | 2    | NULL         | 4
    
  • For inserts: the before values are NULL, the current value is the newly inserted value and mz_state is set to insert.

    Insert

     -- at time 200, add a new row with key=3, value=6
     mz_timestamp | mz_state | key  | before_value | after_value
     -------------|----------|------|--------------|-------
     ...
     200          | insert   | 3    | NULL         | 6
     ...
    
  • For updates: the before values are the old values, the after values are the resulting values of the update, and mz_state is set to upsert.

    Update

     -- at time 300, update key=1's value to 10
     mz_timestamp | mz_state | key  | before_value | after_value
     -------------|----------|------|--------------|-------
     ...
     300          | upsert   | 1    | 2            | 10
     ...
    
  • If only deletes occur within a timestamp, the after values for each key are set to NULL, the before values are set to the old values, and mz_state is set to delete.

    Delete

     -- at time 400, delete all rows
     mz_timestamp | mz_state | key  | before_value | after_value
     -------------|----------|------|--------------|-------
     ...
     400          | delete   | 1    | 10           | NULL
     400          | delete   | 2    | 4            | NULL
     400          | delete   | 3    | 6            | NULL
     ...
    
  • Like ENVELOPE UPSERT, using ENVELOPE DEBEZIUM requires that there is at most one live value per key. If Materialize detects that a given key has multiple values, it will generate an update with mz_state set to "key_violation", the problematic key, and all the values nulled out. Materialize identifies key violations on a best-effort basis.

    Key violation

     -- at time 500, introduce a key_violation
     mz_timestamp | mz_state        | key  | before_value | after_value
     -------------|-----------------|------|--------------|-------
     ...
     500          | key_violation   | 1    | NULL         | NULL
     ...
    
  • If PROGRESS is set, Materialize also returns the mz_progressed column. Each progress row will have a NULL key and a NULL before and after value.
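
Because each row carries the previous value, a consumer can check its own state against before_value while applying changes. The sketch below assumes rows arrive as (mz_timestamp, mz_state, key, before_value, after_value) tuples without the PROGRESS option. The function name apply_debezium is hypothetical, and the consistency check is an optional extra of this example. It represents NULL as None, so it cannot tell a missing key from a key whose value is NULL.

```python
def apply_debezium(state, rows):
    """Apply (mz_timestamp, mz_state, key, before, after) rows to a key -> value dict."""
    for ts, mz_state, key, before, after in rows:
        if mz_state == "key_violation":
            raise ValueError(f"key violation for key {key!r} at timestamp {ts}")
        if mz_state not in ("insert", "upsert", "delete"):
            raise ValueError(f"unexpected mz_state {mz_state!r}")
        current = state.get(key)
        if current != before:
            # The local copy has drifted from what the subscription reports.
            raise ValueError(f"key {key!r} holds {current!r}, expected {before!r}")
        if mz_state == "delete":
            state.pop(key, None)
        else:
            state[key] = after
    return state

# Starting from keys 1 and 2, replay the insert, update, and deletes shown above.
rows = [
    (200, "insert", 3, None, 6),
    (300, "upsert", 1, 2, 10),
    (400, "delete", 1, 10, None),
    (400, "delete", 2, 4, None),
    (400, "delete", 3, 6, None),
]
state = apply_debezium({1: 2, 2: 4}, rows)
print(state)
```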

WITHIN TIMESTAMP ORDER BY

PREVIEW This feature is in private preview. It may have performance or stability issues and is under active development. It isn't subject to our backwards compatibility guarantees.

You must contact us to enable this feature in your Materialize region.

To modify the ordering of the output of SUBSCRIBE, use WITHIN TIMESTAMP ORDER BY. This clause allows you to specify an ORDER BY expression which is used to sort the rows within each distinct timestamp.

  • The ORDER BY expression can take any column in the underlying object or query, including mz_diff.

    SUBSCRIBE mview WITHIN TIMESTAMP ORDER BY c1, c2 DESC NULLS LAST, mz_diff;
    
    mz_timestamp | mz_diff | c1            | c2   | c3
    -------------|---------|---------------|------|-----
    100          | +1      | 1             | 20   | foo
    100          | -1      | 1             | 2    | bar
    100          | +1      | 1             | 2    | boo
    100          | +1      | 1             | 0    | data
    100          | -1      | 2             | 0    | old
    100          | +1      | 2             | 0    | new
    
  • If PROGRESS is set, progress messages are unaffected.
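
To illustrate the ordering in the example above, the sketch below reproduces c1, c2 DESC NULLS LAST, mz_diff within each timestamp on the client side. It assumes rows are (mz_timestamp, mz_diff, c1, c2, c3) tuples, that c1 is never NULL, and that c2 is numeric with NULL represented as None. The function name within_timestamp_order is hypothetical.

```python
def within_timestamp_order(rows):
    """Sort rows by mz_timestamp, then c1 ASC, c2 DESC NULLS LAST, mz_diff ASC."""
    def key(row):
        ts, diff, c1, c2, _c3 = row
        # Negating c2 sorts larger values first; the leading flag puts None last.
        c2_key = (1, 0) if c2 is None else (0, -c2)
        return (ts, c1, c2_key, diff)
    return sorted(rows, key=key)

unordered = [
    (100, 1, 2, 0, "new"),
    (100, 1, 1, 0, "data"),
    (100, -1, 2, 0, "old"),
    (100, 1, 1, 2, "boo"),
    (100, 1, 1, 20, "foo"),
    (100, -1, 1, 2, "bar"),
]
for row in within_timestamp_order(unordered):
    print(row)
```

Including mz_diff last in the sort places the retraction of a row ahead of its replacement when the other ordering columns are equal.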

Dropping the counter load generator source

When you’re done, you can drop the counter load generator source:

DROP SOURCE counter;

Privileges

The privileges required to execute this statement are:

  • USAGE privileges on the schemas that all relations and types in the query are contained in.
  • SELECT privileges on all relations in the query.
    • NOTE: if any item is a view, then the view owner must also have the necessary privileges to execute the view definition. Even if the view owner is a superuser, they still must explicitly be granted the necessary privileges.
  • USAGE privileges on all types used in the query.
  • USAGE privileges on the active cluster.