SUBSCRIBE
SUBSCRIBE streams updates from a source, table, view, or materialized view as they occur.
Conceptual framework
The SUBSCRIBE statement is a more general form of a SELECT statement. While a SELECT statement computes a relation at a moment in time, a SUBSCRIBE operation computes how a relation changes over time.
Fundamentally, SUBSCRIBE produces a sequence of updates. An update describes either the insertion of a row into the relation or the deletion of a row from it, at a specific time. Taken together, the updates describe the complete set of changes to a relation, in order, while SUBSCRIBE is active.
You can use SUBSCRIBE to:
- Power event processors that react to every change to a relation or an arbitrary SELECT statement.
- Replicate the complete history of a relation while SUBSCRIBE is active.
Syntax
SUBSCRIBE [TO] <object_name | (SELECT ...)>
[ENVELOPE UPSERT (KEY (<key1>, ...)) | ENVELOPE DEBEZIUM (KEY (<key1>, ...))]
[WITHIN TIMESTAMP ORDER BY <column1> [ASC | DESC] [NULLS LAST | NULLS FIRST], ...]
[AS OF [AT LEAST] <timestamp_expression>]
[UP TO <timestamp_expression>]
[WITH (<option_name> [= <option_value>], ...)]
where:
- <object_name> is the name of the source, table, view, or materialized view that you want to subscribe to.
- <select_stmt> is the SELECT statement whose output you want to subscribe to.
The generated schemas have a Debezium-style diff envelope to capture changes in the input view or source.
Option | Description |
---|---|
ENVELOPE UPSERT (KEY (<key1>, …)) | If specified, use the upsert envelope, which takes a list of KEY columns. The upsert envelope supports inserts, updates and deletes in the subscription output. For more information, see Modifying the output format. |
ENVELOPE DEBEZIUM (KEY (<key1>, …)) | If specified, use a Debezium-style diff envelope, which takes a list of KEY columns. The Debezium envelope supports inserts, updates and deletes in the subscription output along with the previous state of the key. For more information, see Modifying the output format. |
WITHIN TIMESTAMP ORDER BY <column1>, … | If specified, use an ORDER BY clause to sort the subscription output within a timestamp. For each ORDER BY column, you can optionally specify ASC or DESC, and NULLS LAST or NULLS FIRST. |
AS OF <timestamp_expression> | If specified, no rows whose timestamp is earlier than the specified timestamp will be returned. For more information, see AS OF. |
UP TO <timestamp_expression> | If specified, no rows whose timestamp is greater than or equal to the specified timestamp will be returned. For more information, see UP TO. |
WITH <option_name> [= <option_value>] | If specified, use the specified option. For more information, see WITH options. |
WITH options
The following options are valid within the WITH clause.
Option name | Value type | Default | Describes |
---|---|---|---|
SNAPSHOT | boolean | true | Whether to emit a snapshot of the current state of the relation at the start of the operation. See SNAPSHOT. |
PROGRESS | boolean | false | Whether to include detailed progress information. See PROGRESS. |
Details
Output
SUBSCRIBE emits a sequence of updates as rows. Each row contains all of the columns of the subscribed relation, or the columns derived from the SELECT statement, prepended with several additional columns that describe the nature of the update:
Column | Type | Represents |
---|---|---|
mz_timestamp | numeric | Materialize's internal logical timestamp. This will never be less than any timestamp previously emitted by the same SUBSCRIBE operation. |
mz_progressed | boolean | This column is only present if the PROGRESS option is specified. If true, indicates that the SUBSCRIBE will not emit additional records at times strictly less than mz_timestamp. See PROGRESS below. |
mz_diff | bigint | The change in frequency of the row. A positive number indicates that mz_diff copies of the row were inserted, while a negative number indicates that a number of copies equal to the absolute value of mz_diff were deleted. |
Column 1 | Varies | The columns from the subscribed relation, each as its own column, representing the data that was inserted into or deleted from the relation. |
... | | |
Column N | Varies | |
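As a rough illustration of how a client might consume these columns, the sketch below folds a stream of updates into a multiset of rows. It is a minimal model, not Materialize client code: the tuple layout (mz_timestamp, mz_diff, row) and the function name apply_updates are assumptions made for this example.

```python
from collections import Counter

def apply_updates(state, updates):
    """Apply (mz_timestamp, mz_diff, row) updates to a multiset of rows."""
    last_ts = None
    for mz_timestamp, mz_diff, row in updates:
        # mz_timestamp never decreases within one SUBSCRIBE operation.
        if last_ts is not None and mz_timestamp < last_ts:
            raise ValueError("timestamps went backwards")
        last_ts = mz_timestamp
        state[row] += mz_diff          # positive: insert copies, negative: delete copies
        if state[row] == 0:
            del state[row]             # no copies of this row remain
    return state

relation = apply_updates(Counter(), [
    (1, 1, ("id1", "value1")),
    (1, 2, ("id2", "value2")),    # two copies inserted at once
    (2, -1, ("id1", "value1")),   # one copy deleted
    (2, 1, ("id1", "value4")),
])
print(dict(relation))
```

After the last update, the multiset holds two copies of the id2 row and one copy of the new id1 row.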
AS OF
When a history retention period is configured for the object(s) powering the subscription, the AS OF clause allows specifying a timestamp at which the SUBSCRIBE command should begin returning results. If AS OF is specified, no rows whose timestamp is earlier than the specified timestamp will be returned. If the timestamp specified is earlier than the earliest historical state retained by the underlying objects, an error is thrown.
To configure the history retention period for objects used in a subscription, see Durable subscriptions.
If AS OF is unspecified, the system automatically chooses an AS OF timestamp.
UP TO
The UP TO clause allows specifying a timestamp at which the SUBSCRIBE will cease running. If UP TO is specified, no rows whose timestamp is greater than or equal to the specified timestamp will be returned.
Interaction of AS OF and UP TO
The lower timestamp bound specified by AS OF is inclusive, whereas the upper bound specified by UP TO is exclusive. Thus, a SUBSCRIBE query whose AS OF is equal to its UP TO will terminate after returning zero rows.
A SUBSCRIBE whose UP TO is less than its AS OF timestamp (whether that timestamp was specified in an AS OF clause or chosen by the system) will signal an error.
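The half-open interval described above can be modelled in a few lines. This is only a sketch of the bound semantics (inclusive AS OF, exclusive UP TO, error when UP TO is less than AS OF). The function name within_bounds and the tuple layout are assumptions, and snapshot behavior is deliberately ignored.

```python
def within_bounds(updates, as_of, up_to=None):
    """Keep updates with as_of <= mz_timestamp < up_to."""
    if up_to is not None and up_to < as_of:
        raise ValueError("UP TO is less than AS OF")
    return [
        u for u in updates
        if u[0] >= as_of and (up_to is None or u[0] < up_to)
    ]

updates = [(5, 1, "a"), (6, 1, "b"), (7, -1, "a")]
window = within_bounds(updates, as_of=5, up_to=7)   # timestamps 5 and 6 only
empty = within_bounds(updates, as_of=6, up_to=6)    # equal bounds: zero rows
```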
Duration
SUBSCRIBE will continue to run until canceled, the session ends, the UP TO timestamp is reached, or all updates have been presented. The last case typically occurs when tailing constant views (e.g. CREATE VIEW v AS SELECT 1).
Many PostgreSQL drivers wait for a query to complete before returning its results. Since SUBSCRIBE can run forever, naively executing a SUBSCRIBE using your driver’s standard query API may never return.
Either use an API in your driver that does not buffer rows, or fetch rows from SUBSCRIBE in batches using the FETCH statement or AS OF and UP TO bounds.
See the examples for details.
SNAPSHOT
By default, SUBSCRIBE begins by emitting a snapshot of the subscribed relation, which consists of a series of updates at its AS OF timestamp describing the contents of the relation. After the snapshot, SUBSCRIBE emits further updates as they occur.
For updates in the snapshot, the mz_timestamp field will be fast-forwarded to the AS OF timestamp. For example, an insert that occurred before the SUBSCRIBE began would appear in the snapshot.
To see only updates after the initial timestamp, specify WITH (SNAPSHOT = false).
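To make the fast-forwarding concrete, the following sketch consolidates a hypothetical update history into a snapshot at the AS OF timestamp and then appends later updates. The names snapshot_at and subscribe, and the idea of holding the full history in a list, are assumptions for illustration; this is not how Materialize is implemented.

```python
from collections import Counter

def snapshot_at(history, as_of):
    """Consolidate all updates at or before as_of into rows stamped with as_of."""
    counts = Counter()
    for ts, diff, row in history:
        if ts <= as_of:
            counts[row] += diff
    # Rows whose net count is zero are not part of the relation's contents.
    return [(as_of, diff, row) for row, diff in counts.items() if diff != 0]

def subscribe(history, as_of, snapshot=True):
    """Model the output: optional snapshot, then updates after as_of."""
    later = [u for u in history if u[0] > as_of]
    return (snapshot_at(history, as_of) if snapshot else []) + later

history = [(1, 1, "a"), (2, 1, "b"), (3, -1, "a"), (6, 1, "c")]
with_snapshot = subscribe(history, as_of=4)
without_snapshot = subscribe(history, as_of=4, snapshot=False)
```

In this model the insert of "b" at time 2 appears in the snapshot with timestamp 4, while "a", inserted and deleted before the AS OF timestamp, does not appear at all.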
PROGRESS
If the PROGRESS option is specified via WITH (PROGRESS):
- An additional mz_progressed column appears in the output. When the column is false, the rest of the row is a valid update. When the column is true, everything in the row except for mz_timestamp is not a valid update and its content should be ignored; the row exists only to communicate that timestamps have advanced.
- The first update emitted by the SUBSCRIBE is guaranteed to be a progress message indicating the subscribe’s AS OF timestamp.
Intuitively, progress messages communicate that no updates have occurred in a given time window. Without explicit progress messages, it is impossible to distinguish between a stall in Materialize and a legitimate period of no updates.
Not all timestamps that appear will have a corresponding row with mz_progressed set to true.
For example, the following is a valid sequence of updates:
mz_timestamp | mz_progressed | mz_diff | column1
-------------|---------------|---------|--------------
1 | false | 1 | data
2 | false | 1 | more data
3 | false | 1 | even more data
4 | true | NULL | NULL
Notice how Materialize did not emit explicit progress messages for timestamps 1 or 2. The receipt of the update at timestamp 2 implies that there are no more updates for timestamp 1, because timestamps are always presented in non-decreasing order. The receipt of the explicit progress message at timestamp 4 implies that there are no more updates for either timestamp 2 or 3, but that there may be more data arriving at timestamp 4.
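One way a client could act on this reasoning is to separate data rows from progress rows and remember the most recent progress timestamp as a frontier. The sketch below does that for the sequence shown above. The function name split_progress and the tuple layout are assumptions for this example.

```python
def split_progress(rows):
    """Separate data updates from progress rows and track the frontier.

    Each row is (mz_timestamp, mz_progressed, mz_diff, data).
    """
    updates = []
    frontier = None  # no further updates at times strictly less than this
    for mz_timestamp, mz_progressed, mz_diff, data in rows:
        if mz_progressed:
            frontier = mz_timestamp  # mz_diff and data are ignored on progress rows
        else:
            updates.append((mz_timestamp, mz_diff, data))
    return updates, frontier

rows = [
    (1, False, 1, "data"),
    (2, False, 1, "more data"),
    (3, False, 1, "even more data"),
    (4, True, None, None),
]
updates, frontier = split_progress(rows)
# Updates at times strictly less than the frontier are known to be complete.
complete = [u for u in updates if u[0] < frontier]
```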
Examples
SUBSCRIBE produces rows similar to a SELECT statement, except that SUBSCRIBE may never complete.
Many drivers buffer all results until a query is complete, and so will never return.
Below are the recommended ways to work around this.
Creating a counter load generator
As an example, we’ll create a counter load generator that emits a row every second:
CREATE SOURCE counter FROM LOAD GENERATOR COUNTER;
Subscribing with FETCH
The recommended way to use SUBSCRIBE is with DECLARE and FETCH.
These must be used within a transaction, with a single DECLARE per transaction.
This allows you to limit the number of rows and the time window of your requests.
Next, let’s subscribe to the counter load generator source that we’ve created above.
First, declare a SUBSCRIBE cursor:
BEGIN;
DECLARE c CURSOR FOR SUBSCRIBE (SELECT * FROM counter);
Then, use FETCH in a loop to retrieve each batch of results as soon as it’s ready:
FETCH ALL c;
That will retrieve all of the rows that are currently available.
If there are no rows available, it will wait until there are some ready and return those.
A timeout can be used to specify a window in which to wait for rows. This will return up to the specified count (or ALL) of rows that are ready within the timeout. To retrieve up to 100 rows that are available in at most the next 1s:
FETCH 100 c WITH (timeout='1s');
To retrieve all rows available over the next 1s:
FETCH ALL c WITH (timeout='1s');
A 0s timeout can be used to return rows that are available now without waiting:
FETCH ALL c WITH (timeout='0s');
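From application code, the DECLARE and FETCH statements above could be driven in a loop along the following lines. This is a sketch under assumptions: cursor stands for any Python DB-API 2.0 cursor on an autocommit connection to Materialize (the driver itself is not shown), and fetch_batches and max_batches are hypothetical names chosen for this example.

```python
def fetch_batches(cursor, max_batches):
    """Yield batches of rows from a SUBSCRIBE cursor using DECLARE and FETCH.

    `cursor` is assumed to be a DB-API 2.0 cursor on an autocommit connection.
    """
    cursor.execute("BEGIN")
    cursor.execute("DECLARE c CURSOR FOR SUBSCRIBE (SELECT * FROM counter)")
    try:
        for _ in range(max_batches):
            # Wait up to one second for rows, then return whatever is ready.
            cursor.execute("FETCH ALL c WITH (timeout='1s')")
            yield cursor.fetchall()
    finally:
        cursor.execute("COMMIT")  # ends the transaction that owns the cursor
```

A caller would iterate over fetch_batches(cursor, max_batches=10) and process each batch as it arrives. Bounding the number of batches keeps the sketch finite; a long-running consumer would loop until it decides to stop.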
Using clients
If you want to use SUBSCRIBE from an interactive SQL session (e.g. psql), wrap the query in COPY:
COPY (SUBSCRIBE (SELECT * FROM counter)) TO STDOUT;
Additional guides |
---|
Go |
Java |
Node.js |
PHP |
Python |
Ruby |
Rust |
Mapping rows to their updates
After all the rows from the SNAPSHOT have been transmitted, the updates will be emitted as they occur. How can you map each row to its corresponding update?
mz_timestamp | mz_progressed | mz_diff | Column 1 | ... | Column N | |
---|---|---|---|---|---|---|
1 | false | 1 | id1 | | value1 | |
1 | false | 1 | id2 | | value2 | |
1 | false | 1 | id3 | | value3 | <- Last row from SNAPSHOT |
2 | false | -1 | id1 | | value1 | |
2 | false | 1 | id1 | | value4 | |
If your row has a unique key column, it is possible to map the update to its corresponding origin row; if the key is unknown, you can use the output of hash(columns_values) instead.
In the example above, Column 1 acts as the key column that uniquely identifies the origin row the update refers to; if this were unknown, hashing the values from Column 1 to Column N would identify the origin row.
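The mapping described above might be sketched as follows. The helper names row_identity and pair_changes are hypothetical, and SHA-256 over the column values is only a stand-in for whatever hash function you choose. With a known key column, a deletion and an insertion at the same timestamp pair up as the old and new versions of one row.

```python
import hashlib

def row_identity(columns, key_index=None):
    """Return the key column if known, else a hash of all column values."""
    if key_index is not None:
        return columns[key_index]
    joined = "\x1f".join(repr(c) for c in columns)
    return hashlib.sha256(joined.encode()).hexdigest()

def pair_changes(updates, key_index=0):
    """Group (mz_timestamp, mz_diff, columns) updates by timestamp and identity."""
    changes = {}
    for mz_timestamp, mz_diff, columns in updates:
        ident = row_identity(columns, key_index)
        entry = changes.setdefault((mz_timestamp, ident), {"old": None, "new": None})
        # A negative diff retracts the old row; a positive diff adds the new one.
        entry["new" if mz_diff > 0 else "old"] = columns
    return changes

updates = [
    (2, -1, ("id1", "value1")),
    (2, 1, ("id1", "value4")),
]
changes = pair_changes(updates)
```

With key_index=None the identity falls back to the hash, so the retraction of ("id1", "value1") carries the same identity as the row originally emitted in the snapshot.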
Modifying the output format
ENVELOPE UPSERT
To modify the output of SUBSCRIBE to support upserts, use ENVELOPE UPSERT.
This clause allows you to specify a KEY that Materialize uses to interpret the rows as a series of inserts, updates and deletes within each distinct timestamp.
The output columns are reordered so that all the key columns come before the value columns.
- Using this modifier, the output rows will have the following structure:
  SUBSCRIBE mview ENVELOPE UPSERT (KEY (key));
  mz_timestamp | mz_state | key  | value
  -------------|----------|------|--------
  100          | upsert   | 1    | 2
  100          | upsert   | 2    | 4
- For inserts and updates, the value columns for each key are set to the resulting value of the series of operations, and mz_state is set to upsert.
  Insert
  -- at time 200, add a new row with key=3, value=6
  mz_timestamp | mz_state | key  | value
  -------------|----------|------|--------
  ...
  200          | upsert   | 3    | 6
  ...
  Update
  -- at time 300, update key=1's value to 10
  mz_timestamp | mz_state | key  | value
  -------------|----------|------|--------
  ...
  300          | upsert   | 1    | 10
  ...
- If only deletes occur within a timestamp, the value columns for each key are set to NULL, and mz_state is set to delete.
  Delete
  -- at time 400, delete all rows
  mz_timestamp | mz_state | key  | value
  -------------|----------|------|--------
  ...
  400          | delete   | 1    | NULL
  400          | delete   | 2    | NULL
  400          | delete   | 3    | NULL
  ...
- Only use ENVELOPE UPSERT when there is at most one live value per key. If Materialize detects that a given key has multiple values, it will generate an update with mz_state set to "key_violation", the problematic key, and all the values nulled out. Materialize is not guaranteed to detect this case, so do not rely on it.
  Key violation
  -- at time 500, introduce a key_violation
  mz_timestamp | mz_state        | key  | value
  -------------|-----------------|------|--------
  ...
  500          | key_violation   | 1    | NULL
  ...
- If PROGRESS is set, Materialize also returns the mz_progressed column. Each progress row will have a NULL key and a NULL value.
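To illustrate how the upsert states relate to the underlying diffs, the sketch below derives (mz_timestamp, mz_state, key, value) rows from plain updates. It is a simplified model rather than Materialize's algorithm: upsert_envelope, key_of and value_of are hypothetical names, and a key violation is only flagged when several values for one key are inserted within the same timestamp.

```python
from collections import defaultdict

def upsert_envelope(updates, key_of, value_of):
    """Turn (mz_timestamp, mz_diff, row) diffs into upsert-style rows."""
    grouped = defaultdict(list)
    for ts, diff, row in updates:
        values = grouped[(ts, key_of(row))]  # creates the entry even for deletes
        if diff > 0:
            values.extend([value_of(row)] * diff)
    out = []
    for (ts, key), inserted in grouped.items():
        if len(inserted) == 0:
            out.append((ts, "delete", key, None))         # only deletes occurred
        elif len(inserted) == 1:
            out.append((ts, "upsert", key, inserted[0]))  # insert or update
        else:
            out.append((ts, "key_violation", key, None))  # several live values
    return out

updates = [
    (100, 1, (1, 2)), (100, 1, (2, 4)),
    (300, -1, (1, 2)), (300, 1, (1, 10)),
    (400, -1, (1, 10)), (400, -1, (2, 4)),
]
rows = upsert_envelope(updates, key_of=lambda r: r[0], value_of=lambda r: r[1])
```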
ENVELOPE DEBEZIUM
To enable this feature in your Materialize region, contact our team.
To modify the output of SUBSCRIBE to support upserts using a Debezium-style diff envelope, use ENVELOPE DEBEZIUM. This clause allows you to specify a KEY that Materialize uses to interpret the rows as a series of inserts, updates and deletes within each distinct timestamp. Unlike ENVELOPE UPSERT, the output includes the state of the row before and after the upsert operation.
The output columns are reordered so that all the key columns come before the value columns. There are two copies of the value columns: one prefixed with before_, which represents the value of the columns before the upsert operation; and another prefixed with after_, which represents the current value of the columns.
- Using this modifier, the output rows will have the following structure:
  SUBSCRIBE mview ENVELOPE DEBEZIUM (KEY (key));
  mz_timestamp | mz_state | key  | before_value | after_value
  -------------|----------|------|--------------|-------
  100          | insert   | 1    | NULL         | 2
  100          | insert   | 2    | NULL         | 4
- For inserts: the before values are NULL, the current value is the newly inserted value and mz_state is set to insert.
  Insert
  -- at time 200, add a new row with key=3, value=6
  mz_timestamp | mz_state | key  | before_value | after_value
  -------------|----------|------|--------------|-------
  ...
  200          | insert   | 3    | NULL         | 6
  ...
- For updates: the before values are the old values, the value columns are the resulting values of the update, and mz_state is set to upsert.
  Update
  -- at time 300, update key=1's value to 10
  mz_timestamp | mz_state | key  | before_value | after_value
  -------------|----------|------|--------------|-------
  ...
  300          | upsert   | 1    | 2            | 10
  ...
- If only deletes occur within a timestamp, the value columns for each key are set to NULL, the before values are set to the old value and mz_state is set to delete.
  Delete
  -- at time 400, delete all rows
  mz_timestamp | mz_state | key  | before_value | after_value
  -------------|----------|------|--------------|-------
  ...
  400          | delete   | 1    | 10           | NULL
  400          | delete   | 2    | 4            | NULL
  400          | delete   | 3    | 6            | NULL
  ...
- Like ENVELOPE UPSERT, using ENVELOPE DEBEZIUM requires that there is at most one live value per key. If Materialize detects that a given key has multiple values, it will generate an update with mz_state set to "key_violation", the problematic key, and all the values nulled out. Materialize identifies key violations on a best-effort basis.
  Key violation
  -- at time 500, introduce a key_violation
  mz_timestamp | mz_state        | key  | before_value | after_value
  -------------|-----------------|------|--------------|-------
  ...
  500          | key_violation   | 1    | NULL         | NULL
  ...
- If PROGRESS is set, Materialize also returns the mz_progressed column. Each progress row will have a NULL key and a NULL before and after value.
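The before and after columns can be modelled by remembering the last value seen for each key. The sketch below is a simplified illustration, not Materialize's implementation: debezium_envelope, key_of and value_of are hypothetical names, updates are assumed to arrive in timestamp order, Python None stands in for NULL, and key violations are not detected.

```python
from itertools import groupby

def debezium_envelope(updates, key_of, value_of):
    """Turn (mz_timestamp, mz_diff, row) diffs into before/after rows."""
    live, out = {}, []  # live maps each key to its current value
    for ts, batch in groupby(updates, key=lambda u: u[0]):
        after = {}
        for _, diff, row in batch:
            key = key_of(row)
            after.setdefault(key, None)   # a delete alone leaves the value as None
            if diff > 0:
                after[key] = value_of(row)
        for key, new in after.items():
            old = live.get(key)
            if new is None:
                state = "delete"
                live.pop(key, None)
            else:
                state = "insert" if old is None else "upsert"
                live[key] = new
            out.append((ts, state, key, old, new))
    return out

updates = [
    (100, 1, (1, 2)), (100, 1, (2, 4)),
    (200, 1, (3, 6)),
    (300, -1, (1, 2)), (300, 1, (1, 10)),
    (400, -1, (1, 10)), (400, -1, (2, 4)), (400, -1, (3, 6)),
]
rows = debezium_envelope(updates, key_of=lambda r: r[0], value_of=lambda r: r[1])
```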
WITHIN TIMESTAMP ORDER BY
To enable this feature in your Materialize region, contact our team.
To modify the ordering of the output of SUBSCRIBE, use WITHIN TIMESTAMP ORDER BY. This clause allows you to specify an ORDER BY expression which is used to sort the rows within each distinct timestamp.
- The ORDER BY expression can take any column in the underlying object or query, including mz_diff.
  SUBSCRIBE mview WITHIN TIMESTAMP ORDER BY c1, c2 DESC NULLS LAST, mz_diff;
  mz_timestamp | mz_diff | c1            | c2   | c3
  -------------|---------|---------------|------|-----
  100          | +1      | 1             | 20   | foo
  100          | -1      | 1             | 2    | bar
  100          | +1      | 1             | 2    | boo
  100          | +1      | 1             | 0    | data
  100          | -1      | 2             | 0    | old
  100          | +1      | 2             | 0    | new
- If PROGRESS is set, progress messages are unaffected.
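The ordering in the example above can be reproduced client-side with an ordinary sort key, which may help when checking expectations. This sketch assumes rows shaped as (mz_timestamp, mz_diff, c1, c2, c3) with numeric c1 and c2, and a non-NULL c1. The function name within_timestamp_order is hypothetical.

```python
def within_timestamp_order(rows):
    """Sort like WITHIN TIMESTAMP ORDER BY c1, c2 DESC NULLS LAST, mz_diff."""
    def sort_key(row):
        mz_timestamp, mz_diff, c1, c2, c3 = row
        return (
            mz_timestamp,                  # rows stay grouped by timestamp
            c1,                            # c1 ascending
            c2 is None,                    # NULLS LAST for c2
            -c2 if c2 is not None else 0,  # c2 descending
            mz_diff,                       # mz_diff ascending
        )
    return sorted(rows, key=sort_key)

rows = [
    (100, 1, 2, 0, "new"),
    (100, 1, 1, 0, "data"),
    (100, -1, 1, 2, "bar"),
    (100, 1, 1, 20, "foo"),
    (100, -1, 2, 0, "old"),
    (100, 1, 1, 2, "boo"),
]
ordered = within_timestamp_order(rows)
```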
Dropping the counter load generator source
When you’re done, you can drop the counter load generator source:
DROP SOURCE counter;
Durable subscriptions
Because SUBSCRIBE requests happen over the network, these connections might get disrupted for both expected and unexpected reasons. You can adjust the history retention period for the objects a subscription depends on, and then use AS OF to pick up where you left off when a connection drops. This ensures that no data is lost in the subscription process, and avoids the need to re-snapshot the data.
For more information, see durable subscriptions.
Privileges
The privileges required to execute this statement are:
- USAGE privileges on the schemas that all relations and types in the query are contained in.
- SELECT privileges on all relations in the query.
  - NOTE: if any item is a view, then the view owner must also have the necessary privileges to execute the view definition. Even if the view owner is a superuser, they still must explicitly be granted the necessary privileges.
- USAGE privileges on all types used in the query.
- USAGE privileges on the active cluster.