Recursive CTEs
Recursive CTEs operate on recursively defined structures, such as trees or graphs, that are implied by queries over your data.
Syntax
with_recursive_cte
recursive_cte_binding
Field | Use |
---|---|
RETURN AT RECURSION LIMIT $n | An optional clause indicating that the fixpoint computation should stop after $n iterations and use the current values computed for each recursive_cte_binding in the select_stmt. This can be useful when debugging and validating the correctness of recursive queries, or when you know exactly how many iterations you want to run, regardless of whether a fixpoint is reached. See the Examples section for an example. |
ERROR AT RECURSION LIMIT $n | An optional clause indicating that the fixpoint computation should stop after $n iterations and fail the query with an error. Adding this clause with a reasonably high limit is a good safeguard against accidentally running a non-terminating dataflow in your production clusters. |
recursive_cte_binding | A binding that gives the SQL fragment defined under select_stmt a cte_ident alias. This alias can be used in the same binding or in all other (preceding and subsequent) bindings in the enclosing recursive CTE block. In contrast to the cte_binding definition, a recursive_cte_binding needs to explicitly state its type as a comma-separated list of (col_ident col_type) pairs. |
Details
Within a recursive CTE block, any cte_ident alias can be referenced in all recursive_cte_binding definitions that live under the same block, as well as in the final select_stmt for that block.
A WITH MUTUALLY RECURSIVE block with the general form
WITH MUTUALLY RECURSIVE
-- A sequence of bindings, all in scope for all definitions.
$R_1(...) AS ( $sql_cte_1 ),
...
$R_n(...) AS ( $sql_cte_n )
-- Compute the result from the final values of all bindings.
$sql_body
is evaluated as if it were performing the following steps:
- Initially, bind $R_1, ..., $R_n to the empty collection.
- Repeat in a loop:
  - Update $R_1 using the current values bound to $R_1, ..., $R_n in $sql_cte_1.
  - Update $R_2 using the current values bound to $R_1, ..., $R_n in $sql_cte_2. Note that this includes the new value of $R_1 bound above.
  - …
  - Update $R_n using the current values bound to $R_1, ..., $R_n in $sql_cte_n. Note that this includes the new values of $R_1, ..., $R_{n-1} bound above.
- Exit the loop when one of the following conditions is met:
  - The values bound to all CTEs have stopped changing.
  - The optional RETURN AT RECURSION LIMIT $i or ERROR AT RECURSION LIMIT $i clause was set and we have reached iteration $i.
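The steps above can be modeled in a few lines of Python. This is only a sketch of the evaluation semantics, not of how Materialize executes the query: collections are modeled as sets rather than SQL bags, and the names `evaluate_mutually_recursive`, `limit`, and `error_at_limit` are hypothetical stand-ins for the block and its optional recursion limit clause.

```python
from typing import Callable, Dict, FrozenSet, List, Optional, Tuple

Env = Dict[str, FrozenSet]
Step = Callable[[Env], FrozenSet]


def evaluate_mutually_recursive(
    bindings: List[Tuple[str, Step]],
    limit: Optional[int] = None,
    error_at_limit: bool = False,
) -> Env:
    # Initially, bind every name to the empty collection.
    env: Env = {name: frozenset() for name, _ in bindings}
    iteration = 0
    while True:
        if limit is not None and iteration >= limit:
            if error_at_limit:
                raise RuntimeError(f"exceeded the recursion limit {limit}")
            return env  # models RETURN AT RECURSION LIMIT
        changed = False
        for name, step in bindings:
            # Each update sees the newest values of the bindings before it.
            new_value = frozenset(step(env))
            if new_value != env[name]:
                env[name] = new_value
                changed = True
        iteration += 1
        if not changed:
            return env  # all bindings have stopped changing


# Example: a transitive closure over two edges (assumed sample data).
edges = frozenset({("A", "B"), ("B", "C")})


def connected_step(env: Env) -> FrozenSet:
    c = env["connected"]
    joined = {(s1, d2) for (s1, d1) in c for (s2, d2) in c if d1 == s2}
    return edges | joined


result = evaluate_mutually_recursive([("connected", connected_step)])
print(sorted(result["connected"]))
```

In this model, the limit check happens before each iteration, so a computation that converges exactly at the limit returns normally. Whether the real system behaves the same way at that boundary is an assumption of the sketch.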
Note that Materialize’s ability to efficiently handle incremental changes to your inputs extends across loop iterations.
For each iteration, Materialize performs work resulting only from the input changes for this iteration and feeds back the resulting output changes to the next iteration.
When the set of changes for all bindings becomes empty, the recursive computation stops and the final select_stmt is evaluated.
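One way to picture this is semi-naive evaluation, a standard Datalog technique in which each round only joins against the rows that were new in the previous round. The sketch below applies it to a transitive closure. It is an analogy for feeding changes forward between iterations, not a description of Materialize's internals, and the function name `transitive_closure_delta` is made up for the example.

```python
from typing import List, Set, Tuple

Edge = Tuple[str, str]


def transitive_closure_delta(edges: Set[Edge]) -> Tuple[Set[Edge], List[int]]:
    total: Set[Edge] = set(edges)
    delta: Set[Edge] = set(edges)  # rows that are new in the current round
    delta_sizes: List[int] = []
    while delta:
        delta_sizes.append(len(delta))
        # Only joins that involve at least one new row can yield new output.
        candidates = {(s, d2) for (s, d) in delta for (s2, d2) in total if d == s2}
        candidates |= {(s, d2) for (s, d) in total for (s2, d2) in delta if d == s2}
        delta = candidates - total  # feed only the changes into the next round
        total |= delta
    return total, delta_sizes


closure, sizes = transitive_closure_delta({("A", "B"), ("B", "C"), ("C", "D")})
print(sorted(closure), sizes)
```

The loop ends once a round produces no new rows, which mirrors the stopping condition described above.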
In the absence of recursive CTEs, every SELECT query is guaranteed to compute its result or fail with an error within a finite amount of time.
However, introducing recursive CTEs complicates the situation as follows:
- The query might not converge (and may never terminate). Non-terminating queries never return a result and can consume a lot of your cluster resources. See an example below.
- A small update to a few (or even one) data points in your input might cascade into large updates in your recursive computation. This will most likely manifest as spikes in the cluster resources consumed by your recursive dataflows. See an example below.
Examples
Let’s consider a very simple schema consisting of users that belong to a hierarchy of geographical areas and exchange transfers between each other.
Use the SQL Shell to run the sequence of commands below.
Example schema
-- A hierarchy of geographical locations with various levels of granularity.
CREATE TABLE areas(id int not null, parent int, name text);
-- A collection of users.
CREATE TABLE users(id char(1) not null, area_id int not null, name text);
-- A collection of transfers between these users.
CREATE TABLE transfers(src_id char(1), tgt_id char(1), amount numeric, ts timestamp);
Example data
DELETE FROM areas;
DELETE FROM users;
DELETE FROM transfers;
INSERT INTO areas VALUES
(1, 2 , 'Brooklyn'),
(2, 3 , 'New York'),
(3, 7 , 'United States of America'),
(4, 5 , 'Kreuzberg'),
(5, 6 , 'Berlin'),
(6, 7 , 'Germany'),
(7, null , 'Earth');
INSERT INTO users VALUES
('A', 1, 'Alice'),
('B', 4, 'Bob'),
('C', 2, 'Carol'),
('D', 5, 'Dan');
INSERT INTO transfers VALUES
('B', 'C', 20.0 , now()),
('A', 'D', 15.0 , now() + '05 seconds'),
('C', 'D', 25.0 , now() + '10 seconds'),
('A', 'B', 10.0 , now() + '15 seconds'),
('C', 'A', 35.0 , now() + '20 seconds');
Transitive closure
The following view will compute connected as the transitive closure of a graph where:
- each user is a graph vertex, and
- a graph edge between users x and y exists only if a transfer from x to y was made recently (using the rather small 10 seconds period here for the sake of illustration):
CREATE MATERIALIZED VIEW connected AS
WITH MUTUALLY RECURSIVE
connected(src_id char(1), dst_id char(1)) AS (
SELECT DISTINCT src_id, tgt_id FROM transfers WHERE mz_now() <= ts + interval '10s'
UNION
SELECT c1.src_id, c2.dst_id FROM connected c1 JOIN connected c2 ON c1.dst_id = c2.src_id
)
SELECT src_id, dst_id FROM connected;
To see results change over time, you can SUBSCRIBE to the materialized view and then use a different SQL Shell session to insert some sample data into the base tables used in the view:
SUBSCRIBE(SELECT * FROM connected) WITH (SNAPSHOT = FALSE);
You’ll see results change as new data is inserted. When you’re done, cancel out of the SUBSCRIBE using Stop streaming.
Note that the size of the connected result might get close to the square of the number of users.
Strongly connected components
Another thing that you might be interested in is identifying maximal sub-graphs where every pair of users is connected (the so-called strongly connected components (SCCs)) of the graph defined above.
This information might be useful to identify clusters of closely knit users in your application.
Since you already have connected defined as a MATERIALIZED VIEW, you can piggy-back on that information to derive the SCCs of your graph.
Two users will be in the same SCC only if they are connected in both directions.
Consequently, given the connected contents, we can:
- Restrict connected to the subset of symmetric connections that go in both directions.
- Identify the scc of each users entry with the lowest dst_id of all symmetric neighbors and its own id.
CREATE MATERIALIZED VIEW strongly_connected_components AS
WITH
symmetric(src_id, dst_id) AS (
SELECT src_id, dst_id FROM connected
INTERSECT ALL
SELECT dst_id, src_id FROM connected
)
SELECT u.id, least(min(c.dst_id), u.id)
FROM users u
LEFT JOIN symmetric c ON(u.id = c.src_id)
GROUP BY u.id;
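The two steps above can be traced by hand with a small Python model. The `connected` pairs below are assumed sample contents (A and B reach each other, and both reach C), and `scc_labels` is a hypothetical helper name. The sketch treats `connected` as a set, which matches the UNION in its definition.

```python
from typing import Dict, List, Set, Tuple


def scc_labels(users: List[str], connected: Set[Tuple[str, str]]) -> Dict[str, str]:
    # Step 1: keep only connections that exist in both directions.
    symmetric = {(s, d) for (s, d) in connected if (d, s) in connected}
    labels: Dict[str, str] = {}
    for u in users:
        neighbors = [d for (s, d) in symmetric if s == u]
        # Step 2: the lowest id among the symmetric neighbors and the user
        # itself. A user without symmetric neighbors keeps its own id.
        labels[u] = min(neighbors + [u])
    return labels


# Assumed sample contents of `connected`.
connected = {
    ("A", "A"), ("A", "B"), ("B", "A"), ("B", "B"),  # A and B reach each other
    ("A", "C"), ("B", "C"),                          # C is reachable, one way only
}
labels = scc_labels(["A", "B", "C"], connected)
print(labels)
```

Users that share a label belong to the same component, so here A and B form one SCC and C forms its own.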
Again, you can insert some sample data into the base tables and observe how the materialized view contents change over time using SUBSCRIBE:
SUBSCRIBE(SELECT * FROM strongly_connected_components) WITH (SNAPSHOT = FALSE);
When you’re done, cancel out of the SUBSCRIBE using Stop streaming.
Note that the strongly_connected_components definition given above is not itself recursive, but relies on the recursive CTE from the connected definition.
If you don’t need to keep track of the connected contents for other reasons, you can use this alternative SCC definition, which computes SCCs directly using repeated forward and backward label propagation.
Aggregations over a hierarchy
You might want to keep track of the aggregated net balance per area for a recent period of time. This can be achieved in three steps:
- Sum up the aggregated net balance for the set period for each user in a user_balances CTE.
- Sum up the user_balances of users directly associated with an area in a direct_balances CTE.
- Recursively add to direct_balances the indirect_balances sum of all child areas.
A materialized view that does the above three steps in three CTEs (of which the last one is recursive) can be defined as follows:
CREATE MATERIALIZED VIEW area_balances AS
WITH MUTUALLY RECURSIVE
user_balances(id char(1), balance numeric) AS (
WITH
credits AS (
SELECT src_id as id, sum(amount) credit
FROM transfers
WHERE mz_now() <= ts + interval '10s'
GROUP BY src_id
),
debits AS (
SELECT tgt_id as id, sum(amount) debit
FROM transfers
WHERE mz_now() <= ts + interval '10s'
GROUP BY tgt_id
)
SELECT
id,
coalesce(debit, 0) - coalesce(credit, 0) as balance
FROM
users
LEFT JOIN credits USING(id)
LEFT JOIN debits USING(id)
),
direct_balances(id int, parent int, balance numeric) AS (
SELECT
a.id as id,
a.parent as parent,
coalesce(sum(ub.balance), 0) as balance
FROM
areas a
LEFT JOIN users u ON (a.id = u.area_id)
LEFT JOIN user_balances ub ON (u.id = ub.id)
GROUP BY
a.id, a.parent
),
indirect_balances(id int, parent int, balance numeric) AS (
SELECT
db.id as id,
db.parent as parent,
db.balance + coalesce(sum(ib.balance), 0) as balance
FROM
direct_balances db
LEFT JOIN indirect_balances ib ON (db.id = ib.parent)
GROUP BY
db.id, db.parent, db.balance
)
SELECT
id, balance
FROM
indirect_balances;
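To check what this view should produce, the three steps can be replayed in Python on the example data. The sketch assumes that all five example transfers fall inside the time window, and it iterates the last step from an empty collection until nothing changes, in the spirit of the evaluation steps described under Details.

```python
from typing import Dict, List, Optional, Tuple

# Example data from above: area id -> parent, user id -> area id.
areas: Dict[int, Optional[int]] = {1: 2, 2: 3, 3: 7, 4: 5, 5: 6, 6: 7, 7: None}
user_area: Dict[str, int] = {"A": 1, "B": 4, "C": 2, "D": 5}
transfers: List[Tuple[str, str, int]] = [
    ("B", "C", 20), ("A", "D", 15), ("C", "D", 25), ("A", "B", 10), ("C", "A", 35),
]

# Step 1: net balance per user (amount received minus amount sent).
user_balances = {u: 0 for u in user_area}
for src, tgt, amount in transfers:
    user_balances[src] -= amount
    user_balances[tgt] += amount

# Step 2: balance of the users directly associated with each area.
direct = {a: 0 for a in areas}
for user, area in user_area.items():
    direct[area] += user_balances[user]

# Step 3: add the balances of all child areas until a fixpoint is reached.
indirect: Dict[int, int] = {}
while True:
    updated = {
        a: direct[a] + sum(b for child, b in indirect.items() if areas[child] == a)
        for a in areas
    }
    if updated == indirect:
        break
    indirect = updated

print(indirect)
```

Because every transfer debits one user and credits another, the balance of the root area (7, Earth) ends up at zero in this model.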
As before, you can insert the example data and observe how the materialized view contents change over time from psql with the \watch command:
SELECT id, name, balance FROM area_balances JOIN areas USING(id) ORDER BY id;
\watch 1
Non-terminating queries
Let’s look at a slight variation of the transitive closure example from above with the following changes:
- All UNION operators are replaced with UNION ALL.
- The mz_now() <= ts + $period predicate from the first UNION branch is omitted.
- The WITH MUTUALLY RECURSIVE clause has an optional ERROR AT RECURSION LIMIT 100.
- The final result in this example is ordered by src_id, dst_id.
WITH MUTUALLY RECURSIVE (ERROR AT RECURSION LIMIT 100)
connected(src_id char(1), dst_id char(1)) AS (
SELECT DISTINCT src_id, tgt_id FROM transfers
UNION ALL
SELECT src_id, dst_id FROM connected
UNION ALL
SELECT c1.src_id, c2.dst_id FROM connected c1 JOIN connected c2 ON c1.dst_id = c2.src_id
)
SELECT src_id, dst_id FROM connected ORDER BY src_id, dst_id;
After inserting the example data you can observe that executing the above statement returns the following error:
ERROR: Evaluation error: Recursive query exceeded the recursion limit 100. (Use RETURN AT RECURSION LIMIT to not error, but return the current state as the final result when reaching the limit.)
The recursive CTE connected has not converged to a fixpoint within the first 100 iterations!
To see why, you can run variants of the same query where the ERROR AT RECURSION LIMIT 100 clause is replaced by RETURN AT RECURSION LIMIT $n (where $n = 1, 2, 3, ...) and observe how the result changes after $n iterations.
src_id | dst_id
--------+--------
A | B
...
src_id | dst_id
--------+--------
A | B
A | B
...
src_id | dst_id
--------+--------
A | B
A | B
A | B
...
Changing the UNION to UNION ALL in the connected definition caused a full copy of transfers to be added to the current value of connected in each iteration!
Consequently, connected never stops growing and the recursive CTE computation never reaches a fixpoint.
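The growth can be reproduced with a small bag (multiset) model in Python. The sketch below mirrors the three UNION ALL branches of the query using `collections.Counter`. The two-row input and the helper name `iterate_union_all` are assumptions made for the illustration.

```python
from collections import Counter
from typing import List, Tuple

Edge = Tuple[str, str]


def iterate_union_all(transfers: List[Edge], iterations: int) -> Counter:
    connected: Counter = Counter()  # a bag: row -> multiplicity
    base = Counter(set(transfers))  # SELECT DISTINCT src_id, tgt_id FROM transfers
    for _ in range(iterations):
        joined: Counter = Counter()
        for (s1, d1), m1 in connected.items():
            for (s2, d2), m2 in connected.items():
                if d1 == s2:
                    joined[(s1, d2)] += m1 * m2
        # UNION ALL keeps duplicates, so multiplicities add up.
        connected = base + connected + joined
    return connected


sample = [("A", "B"), ("B", "C")]
for n in (1, 2, 3):
    bag = iterate_union_all(sample, n)
    print(n, bag[("A", "B")], sum(bag.values()))
```

In this model the multiplicity of the (A, B) row equals the number of iterations, and the total row count keeps increasing, so no two consecutive iterations ever produce the same collection.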
Queries with “update locality”
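The examples presented so far have the following “update locality” property: a change to an input collection usually causes only a bounded number of changes to the contents of the recursive CTE bindings in each iteration.
For example: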
- Most of the time, inserting or removing transfers will not change the contents of the connected collection. This is true because:
  - An alternative path from x to y already existed before the insertion, or
  - An alternative path from x to y will exist after the removal.
- Inserting or removing transfers will not change most of the contents of the area_balances collection. This is true because:
  - Areas are organized in a hierarchy with a maximum height h.
  - A single transfer contributes directly only to the area_balances of the areas where the src_id and tgt_id belong.
  - A single transfer contributes indirectly only to the area_balances of the ancestor areas of the above two areas.
  - Consequently, each transfers change will affect at most 2 areas in each iteration and at most 2*h areas in total.
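The 2*h bound for area_balances can be checked against the example hierarchy with a short Python sketch. It measures height as the number of areas on the longest path from a leaf to the root, which is an assumption about how h is counted, and the helper names are hypothetical.

```python
from typing import Dict, Optional, Set

# Example data from above: area id -> parent, user id -> area id.
areas: Dict[int, Optional[int]] = {1: 2, 2: 3, 3: 7, 4: 5, 5: 6, 6: 7, 7: None}
user_area: Dict[str, int] = {"A": 1, "B": 4, "C": 2, "D": 5}


def ancestors_and_self(area: int) -> Set[int]:
    found: Set[int] = set()
    current: Optional[int] = area
    while current is not None:
        found.add(current)
        current = areas[current]
    return found


def affected_areas(src: str, tgt: str) -> Set[int]:
    # Only the areas of the two users and their ancestors can change.
    return ancestors_and_self(user_area[src]) | ancestors_and_self(user_area[tgt])


height = max(len(ancestors_and_self(a)) for a in areas)
touched = affected_areas("A", "D")
print(sorted(touched), height, len(touched) <= 2 * height)
```

The set computed here is an upper bound: an area whose incoming debit and credit cancel out, such as a common ancestor of both users, may not change at all.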
However, note that not all iterative algorithms have this property. For example:
- In a naive PageRank implementation, the introduction of a link between two pages x and y will change the PageRank of these two pages and every page z transitively connected to either x or y. Since most graphs exhibit small-world properties, this might represent most of the existing pages.
- In a naive k-means clustering algorithm, inserting or removing a data point will affect the positions of the k mean points after each iteration.
Depending on the size and update frequency of your input collections, expressing algorithms that violate the “update locality” property (such as the examples above) using recursive CTEs in Materialize might lead to excessive CPU and memory consumption in the clusters that compute these recursive queries.