Transaction Processing in the Data Plane

June 17, 2026

Frank McSherry
Chief Scientist

We'll write transaction commit logic in a SQL view, which can enable higher throughput than performing transaction commits in the control plane. Incremental view maintenance (IVM) makes the critical path fast enough that deciding which transactions commit and which roll back is tolerable (and beats collapsing under load). Not only tolerable, but .. interactive timescales (~30ms).

The tl;dr: we'll write all transaction intents to a table, and maintain a view over the table that reports which transactions commit and which roll back.
IVM in Materialize makes this fast, and moves the work from the control plane to the data plane.
Some additional (maintained) views support garbage collection, removing all but the most recent committed writes.

Caveats: This post is largely for educational purposes; please do not actually implement transactions this way without having a hard think about what you need. Also, I'm sure I'm not the only one to think of things this way, and there may already be real products that do this for you by now. Ververica's Streaming Ledger is an example that does something similar, but I don't understand it well enough to say. Let me know if you know better, or of other approaches!

For the curious, the last section is an appendix where I let Claude loose on the project; it found several improvements and then (when nudged) wrote about its experience and findings.
Soon I'll be out of work!

Database Transactions

Databases are built on the back of "transactions": bundles of commands that need to appear to happen at a single moment in time, or not at all. Transactions can read some data, write some data, write, read, write, maybe do some other things, and then eventually end with either COMMIT or ROLLBACK. At that moment the database needs either to make it real, or to just walk away.

Transactions can be pretty gory to implement because they are generally "interactive": the user doesn't express their full intent ahead of time, as they would in most programming languages, but instead reveals it to the database one command at a time. Until the database sees a COMMIT everything is, and fundamentally needs to be, entirely tentative. The next command could be a ROLLBACK instead, and then the whole bundle of commands is off.

Because things are so gory, transaction processing often lives in the "control plane" of the database. Some number of transactions are in flight at any moment, and some careful centralized logic needs to keep track of what is real and what is not. This often limits the effective throughput of transaction processing: the number of potentially conflicting transactions that can be resolved per second, say.

At the same time, some classes of transactions are simple enough that we will be able to migrate them from the control plane to the data plane. The control plane often coordinates with shared locks, single threading, and other forms of mutual exclusion. The data plane, by contrast, works by extracting independence from the supplied task (e.g. a SQL query, and the join keys therein). The data plane is generally much more scalable than the control plane, providing a higher throughput, but with an expressivity tax imposed by not using the control plane.

A Running Example: Banks and Stuff

A classic example of non-trivial transaction processing derives from accounts containing money.

Imagine a database of users with accounts, who would like to redistribute wealth among themselves. The participants are mercurial, and don't want to reveal their plans ahead of time. They will only indicate the intended transfers when presented with the current balances of various accounts. Likewise, the transfers should only occur if they can occur just so, exactly as articulated by the participant. A transaction identifies a few accounts and their observed balances, and indicates the intended new balances each account should have (perhaps subject to constraints like having the same total).

Imagine you get millions of these transactions a second, each reading from and writing to sets of accounts that are referenced in multiple transactions. As you might imagine, you have something of a mess in front of you. You could start pulling candidate transactions off a queue and see which should commit and which should not, but you'll quickly find what you have is a data processing problem. The volume of transactions and their potential conflicts demand more computation than we would hope to introduce to an otherwise nimble control plane.

At the expense of some flexibility, this task can be broken apart into two problems:

  1. resolve the order in which transactions would commit, and then
  2. determine which of these ordered transactions should in fact commit.

The first problem can be addressed somewhat locally (epochs plus a transaction uuid). The second problem can be addressed scalably in the data plane, and is what we'll look at next!

Read Sets and Write Sets

Many transactions can be summed up by their read sets and write sets. Read sets are the values that the transaction read (or failed to read, if absent). Write sets are the values that it then chose to write conditional on those reads. If the read values are as observed the transaction should commit. If the read values are no longer as observed the transaction must roll back.

Let's look at the problem of resolving a collection of pairs of read and write sets. "Resolving" means picking out a subset of pairs with the property that when put in some order, the read set of each matches the most recent prior values. This is a limited way to approach transaction processing: great for our account example, but less good when your transaction needs to read a hash of the whole database, or other horrible things.

Let's model intended transactions, those that end with a COMMIT, with a table that records for any transaction identifier its read and write sets.

```sql
-- A transaction consists of values read and values to write.
CREATE TABLE intents (id INT, is_read BOOL, key TEXT, val TEXT);
```

We'll use the transaction id to order transactions, and we'll want this to generally increase. If it helps, think of it as a pair (epoch, UUID) where epoch continually increments as time passes. We'll see how this epoch requirement eventually informs the latency of transaction resolution.
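Here is the pairing in schema form, as a minimal sketch. It uses a hypothetical `intents_epoch` table that nothing else in this post relies on. The identifier is split into an `epoch` counter and a random `txn` UUID. The query shows how a test like `writes.id < intents.id` becomes a lexicographic comparison on the pair. It assumes UUIDs can be compared with `<`, as they can in PostgreSQL.

```sql
-- Hypothetical variant of `intents` with (epoch, uuid) transaction identifiers.
CREATE TABLE intents_epoch (
    epoch   BIGINT,  -- advances as time passes
    txn     UUID,    -- random within an epoch, breaking ties
    is_read BOOL,
    key     TEXT,
    val     TEXT
);

-- For each read, the earlier writes to its key that could inform it.
-- "Earlier" compares epochs first, and only then the uuids.
SELECT r.epoch, r.txn, r.key, w.epoch AS w_epoch, w.txn AS w_txn, w.val
FROM intents_epoch r
JOIN intents_epoch w
  ON w.key = r.key
 AND (w.epoch < r.epoch OR (w.epoch = r.epoch AND w.txn < r.txn))
WHERE r.is_read
  AND NOT w.is_read;
```

Comparing epochs first is what lets a closed epoch seal everything before it: no later transaction can slot in ahead of it.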

We'll use "keys" and "values" to describe where and what was read. You can imagine this as a key-value store, or relations where there exists a primary key. The ideas generalize to keyless collections, which are fundamentally key-value, where the key is the whole row and the value is its multiplicity.

A set of rows all with the same `id` indicates that you'd very much like to have your write values committed, as long as the reads still hold.

Some example transactions

Let's talk through a sequence of intended transactions and see what needs to happen with them.


This sequence was produced by Claude to demonstrate the mechanisms we'll explore. To start,

```sql
-- Each transaction that follows is part of this insert statement.
INSERT INTO intents (id, is_read, key, val) VALUES
```


We're leading with an `INSERT INTO` just so all the lines that follow could plausibly form one SQL command.
Remember, we're landing all of this data in `intents` as our mechanism to propose a transaction.

```sql
(1, false, 'alice', '100'),
```


The first transaction is a blind write: no row has `true` for the `is_read` column, so there is nothing gating the transaction.
It is an unconditional write.

```sql
(2, true, 'bob', NULL),
(2, false, 'bob', '50'),
```


The second transaction needs `bob` to be `NULL` and in that case it writes `50`.

```sql
(3, true, 'alice', '100'),
(3, true, 'bob', '50'),
(3, false, 'alice', '70'),
(3, false, 'bob', '80'),
```


We're moving `30` from `alice` to `bob`, but only if specific balances hold.

```sql
(4, true, 'alice', '100'),
(4, true, 'bob', '50'),
(4, false, 'alice', '60'),
(4, false, 'bob', '90'),
```


Another transaction that attempts to move `40`, but will fail because its reads no longer match: transaction `3` has already changed both balances.

```sql
(5, true, 'alice', '70'),
(5, true, 'bob', '80'),
(5, false, 'alice', '50'),
(5, false, 'bob', '100'),
```


A new attempt to transfer, using the refreshed values written by transaction `3`.
This should succeed.

All of these transactions are just data, written to the `intents` table.
But how do these transaction intents get from the `intents` table into the committed state?

Committed .. but to Where?

Our committed state will just be a view over intents.

This is very similar to how many databases' in-memory state is a roll-up of their write-ahead log.

For any collection of intended writes, we'll write some SQL that picks winners and losers from intents, and the winners are the transactions that "commit". We don't actually have to put those winners in a separate location, though it will make sense for us to build an index over them, as this is what most folks want to read.

Importantly, intents can change. When `intents` changes, our view of committed transactions will change in lockstep with it.

Changes to intents have the (intended) ability to also change our view of what has committed, and we'll need to take some care here. The read and write sets come from transactions that ended with a COMMIT, and ideally the sets for each transaction id should not themselves change once written. However, new transactions can still be added, and we need to be mindful of the identifiers they are added with. We will end up being certain about a transaction only once we are sure that all future transactions will use a higher identifier, which is something we can force with, for example, epochs.

We will be able to tidy intents, asynchronously, so that it doesn't grow without bound. We can remove failed transactions, remove the read sets of committed transactions, and remove writes that are themselves overwritten without being observed (e.g. after we remove all those read sets). Importantly, this work can be asynchronous, after the fact, rather than on the critical path of transaction resolution.

We'll end up with an indexed view over the most recently written values, to present as if we just had the data in an index. But, we'll have described the logic to derive it from intents declaratively (using SQL) rather than imperatively (using whatever your database is written in).

Resolving Transactions

Each transaction can commit if each of its read values matches the previously committed write to that key. Let's just write that logic in SQL! How would we do that?

It's not easy in vanilla SQL, at least not for me. It is pretty easy when you use recursive SQL. Brief, at least, if "easy" doesn't sound likely to you.

Informally, we'll iteratively develop, from initially empty sets:

  1. tentative writes (as a function of tentative commits),
  2. tentative reads (as a function of those tentative writes), and
  3. tentative commits (as a function of the tentative reads).

From tentative writes, reads, and commits, we'll return to re-evaluate the writes, then reads, then commits. If they change we'll go around again, and again, and again until they stop changing. We'll get to the argument that this always terminates in a moment.

It turns out we'll be better served by developing transactions that do not commit, those that must be rolled back. This starts from the optimistic take that everything should commit until we have evidence that it shouldn't.

The following SQL does exactly that:

```sql
CREATE VIEW to_rollback AS
WITH MUTUALLY RECURSIVE

    -- Tentative writes as a function of `rollback`.
    writes(id INT, key TEXT, val TEXT) AS (
        SELECT intents.id, key, val
        FROM intents
        WHERE NOT EXISTS (SELECT FROM rollback WHERE intents.id = rollback.id)
          AND NOT intents.is_read
    ),

    -- Reads, and the corresponding read value.
    -- These may not match, which informs `rollback`.
    -- `val` is what the transaction observed; `red` is the latest earlier
    -- tentative write to the same key (NULL if there is none).
    reads(id INT, val TEXT, red TEXT) AS (
        SELECT id, val, (
            SELECT DISTINCT ON (key) val
            FROM writes
            WHERE writes.id < intents.id
              AND writes.key = intents.key
            ORDER BY key, writes.id DESC, val
        )
        FROM intents
        WHERE intents.is_read
    ),

    -- Roll back transactions with a failed read.
    rollback(id INT) AS (
        SELECT DISTINCT reads.id
        FROM reads
        WHERE reads.val IS DISTINCT FROM reads.red
    )

SELECT * FROM rollback;
```

If WITH MUTUALLY RECURSIVE (WMR) is scary jargon, it may help to talk through how to determine what it will produce as output. Each WMR block starts with all of its terms (writes, reads, and rollback) in scope and initially empty. It then repeatedly updates the contents of these collections by applying the stated rules, in the order stated, until their contents stop changing. It then runs that final SELECT block at the bottom, because all SQL fragments must return one collection.

In our case we start with initially empty collections, and in particular an initially empty rollback. Based on this, we'll initially imagine all writes succeed, all reads read the previously written values, and transactions must roll back if those values don't match the reads. If any transactions must roll back, we'll need to repeat the process, as some writes may not happen now, which may change reads (positively or negatively), then rollback, etc.
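It can help to see a single round on its own. Below is the first round written as an ordinary, non-recursive query: a sketch for illustration, not part of the design. With `rollback` still empty every write counts. Each read is therefore compared against the latest earlier write to its key, using the same `DISTINCT ON` idiom as `to_rollback`.

```sql
-- First round of the fixed point, with `rollback` still empty:
-- every write in `intents` counts as a tentative write.
SELECT DISTINCT r.id
FROM intents r
WHERE r.is_read
  AND r.val IS DISTINCT FROM (
      -- Latest earlier write to the same key (NULL if none).
      SELECT DISTINCT ON (w.key) w.val
      FROM intents w
      WHERE NOT w.is_read
        AND w.key = r.key
        AND w.id < r.id
      ORDER BY w.key, w.id DESC, w.val
  )
ORDER BY r.id;
```

Run against the nine-transaction example loaded in the Implementation and Evaluation section, this should report transactions 4, 5, and 8.

- Transaction 5 is caught out by transaction 4's tentative writes.
- In the second round, 4 is excluded, so 5's reads match again. Meanwhile 9 loses the write of `C` it was relying on from 8.
- A third round changes nothing, leaving 4, 8, and 9.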

Does this ever stop? Indeed it must, but let's explain that in a different subsection.

Convergence and Termination

The above logic always terminates, and the rate of convergence can be (but is not always) very fast.

Termination is probably easiest to grok, so let's start there.

The reads of a transaction, and the decision to roll it back, depend only on strictly prior transactions. If in some iteration the smallest identifier to change in rollback was i, then in the next iteration all identifiers up through and including i will not change in rollback. Each round of iteration makes permanent progress in stabilizing rollback, increasing the least identifier that can change by at least one.

It could take as many rounds of iteration as there are distinct identifiers in intents, and we can contrive inputs that will do this, but it will certainly terminate.
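Here is one such contrived input, as a sketch to try against a scratch copy of the schema above. The ids 100001 to 100100 and the key `chain` are arbitrary choices, picked to stay clear of the other examples. Each transaction reads the value its predecessor intends to write, and the first reads a value nobody writes. Rollbacks therefore ripple forward one transaction per round.

```sql
-- A chain of 100 transactions on the key 'chain'.
-- The first reads a value that is never written, so it rolls back.
-- Each later one reads exactly what its predecessor intends to write.
INSERT INTO intents (id, is_read, key, val)
SELECT g, true, 'chain',
       CASE WHEN g = 100001 THEN 'never-written' ELSE 'v' || (g - 1)::text END
FROM generate_series(100001, 100100) AS g
UNION ALL
SELECT g, false, 'chain', 'v' || g::text
FROM generate_series(100001, 100100) AS g;

-- How many of the chain's transactions end up rolled back.
SELECT count(*) FROM to_rollback WHERE id BETWEEN 100001 AND 100100;

-- Tidy up before continuing with the rest of the post.
DELETE FROM intents WHERE key = 'chain';
```

The count should come back as 100. Every transaction in the chain rolls back, but each learns this only in the round after its predecessor does, so reaching the fixed point takes on the order of 100 rounds.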

Convergence is more nuanced, but follows similar principles.

A transaction can only be influenced by transactions with lower identifiers that intend writes for its read set. Imagine that transaction identifiers are randomly assigned (perhaps (epoch, uuid) rather than int). If you are a transaction, and there are k other transactions that mean to write to your read set, there is a one out of k + 1 chance that you have the lowest identifier. If you have the lowest identifier you can commit, immediately and permanently! Not just you, but all transactions that have the lowest identifier among their potential conflicts. Potentially a linear fraction of transactions in the first iteration alone.

One way to imagine this is as a directed graph on nodes that correspond to transactions, with directed edges from one to another if the one would write at the read set of the other. With random identifiers sprinkled atop the nodes, the number of rounds of iterations is bounded by the longest directed path with increasing node identifiers. Informally, it can be hard to find long increasing paths, because as identifiers increase it is less likely a neighbor will have a larger identifier. If the graph is dense (mostly conflicts) it will happen, but if it is sparse (limited conflicts) it is substantially less likely.
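You can build this graph as a view if you want to inspect it for your own workload. A minimal sketch, assuming the `intents` table from above and a hypothetical view name `conflict_edges`. It keeps only edges that point from a lower id to a higher id, since only those can influence an outcome. That also means every path through it has increasing identifiers.

```sql
-- Edge writer -> reader: `writer` intends to write a key that `reader`
-- read, and comes earlier in the id order.
CREATE VIEW conflict_edges AS
SELECT DISTINCT w.id AS writer, r.id AS reader
FROM intents r
JOIN intents w
  ON w.key = r.key
 AND w.id < r.id
WHERE r.is_read
  AND NOT w.is_read;

-- Transactions with reads but no incoming edge: nothing earlier can
-- change what they read, so their outcome is settled in the first round.
SELECT count(DISTINCT r.id)
FROM intents r
WHERE r.is_read
  AND NOT EXISTS (SELECT FROM conflict_edges e WHERE e.reader = r.id);
```

Long paths through `conflict_edges` are what force many rounds. A sparse set of edges is the good case described above.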

20-year-ago me, who then was facile with random graph theory, would be mortified to read the utter helplessness of the preceding paragraph. Let's leave it with the tl;dr that while there can be many iterations, there don't need to be many iterations.

Asynchronous Maintenance

Let's talk through some of the maintenance we can perform on intents as we go, to avoid unbounded data growth and unbounded work to evaluate the to_rollback view.

Before we go too far, recall that the view shows us what is true at a moment in time, but with intents continually changing we need a way to lock down some of the transaction identifiers. One way to do this is to insist that transaction identifiers only increase, and reject insertions into intents if they use an identifier not strictly greater than the maximum contained therein. Another option is to pair epochs and UUIDs as transaction identifiers, and to advance epochs whenever it feels good to resolve more transactions. We will just use a comment to remind you that it needs to be done, and won't otherwise discuss it.

One of the goals of the maintenance commands that follow is that they can be run (or not run) at any moment. We don't need to lock down the whole system to do this maintenance.

Removing failed transactions

Transactions that must be rolled back can simply be removed from intents. They should have no impact on the result of the to_rollback view.

```sql
-- Caveat by our bounds on certain ids.
DELETE FROM intents
WHERE intents.id IN (SELECT * FROM to_rollback);
```
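The `-- Caveat by our bounds on certain ids.` comment stands in for a guard. One way to make it concrete is the sketch below. It assumes a hypothetical single-row `sealed` table holding the highest id that no future insert into intents may use or undercut. Enforcing that promise is left to whatever assigns ids; this sketch only reads it.

```sql
-- Hypothetical single-row table: ids at or below `upto` are sealed,
-- meaning no future transaction will be inserted with such an id.
CREATE TABLE sealed (upto INT);
INSERT INTO sealed VALUES (0);

-- Advance the seal, for example once an epoch has closed.
UPDATE sealed SET upto = 9;

-- Remove failed transactions, but only those whose ids are sealed.
DELETE FROM intents
WHERE intents.id IN (SELECT * FROM to_rollback)
  AND intents.id <= (SELECT upto FROM sealed);
```

Outcomes for sealed ids depend only on other sealed transactions, so they can no longer change. The same guard would apply to the other maintenance statements. For overwritten writes, both the write and the one overwriting it would need to be sealed.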

Removing read sets of committed transactions

A transaction is conditional on its read set lining up with the values present. Once we know a transaction will commit, we can remove its read set and commit it unconditionally.

```sql
-- Caveat by our bounds on certain ids.
DELETE FROM intents
WHERE intents.id NOT IN (SELECT * FROM to_rollback)
  AND intents.is_read;
```

Remove overwritten writes

The two rules above remove the read sets for failed and committed transactions. It's now possible, likely even, that some writes are no longer observed: no read occurs before a subsequent write occurs. We can remove those dead writes.

```sql
-- Caveat by our bounds on certain ids.
DELETE FROM intents
WHERE NOT intents.is_read
  AND EXISTS (SELECT FROM intents i2
              WHERE i2.key = intents.key
                AND i2.id > intents.id
                AND NOT i2.is_read
                AND NOT i2.id IN (SELECT id FROM to_rollback))
  AND NOT EXISTS (SELECT FROM intents i2
                  WHERE i2.key = intents.key
                    AND i2.id > intents.id
                    AND i2.is_read);
```

This is a simplification: remove any write that is followed by a later, not-rolled-back write to the same key, provided there are no later reads of that key at all. As we are also removing reads above, this should be able to remove all overwritten writes.

Implementation and Evaluation

We have a table intents, a view to_rollback, and a few maintenance DELETE statements. Let's take them out for an introductory spin, and then see if we can make them perform.

I have some example transactions from Claude, which we'll insert to start.

```sql
INSERT INTO intents (id, is_read, key, val) VALUES
    -- Txn 1: blind initialization of alice. No read.
    (1, false, 'alice', '100'),

    -- Txn 2: initialize bob, asserting bob was previously absent.
    (2, true,  'bob',   NULL),
    (2, false, 'bob',   '50'),

    -- Txn 3: transfer 30 alice->bob, observing alice=100, bob=50. Commits.
    (3, true,  'alice', '100'),
    (3, true,  'bob',   '50'),
    (3, false, 'alice', '70'),
    (3, false, 'bob',   '80'),

    -- Txn 4: concurrent transfer using stale view (alice=100). Must roll back.
    (4, true,  'alice', '100'),
    (4, true,  'bob',   '50'),
    (4, false, 'alice', '60'),
    (4, false, 'bob',   '90'),

    -- Txn 5: transfer based on post-3 state. Commits.
    (5, true,  'alice', '70'),
    (5, true,  'bob',   '80'),
    (5, false, 'alice', '50'),
    (5, false, 'bob',   '100'),

    -- Txn 6: initialize x=A. Commits.
    (6, true,  'x', NULL),
    (6, false, 'x', 'A'),

    -- Txn 7: x A->B. Commits in iter 1.
    (7, true,  'x', 'A'),
    (7, false, 'x', 'B'),

    -- Txn 8: stale read of x=A. Must roll back (7 already wrote B).
    (8, true,  'x', 'A'),
    (8, false, 'x', 'C'),

    -- Txn 9: reads x=C. Only true if 8 commits; it doesn't, so 9 rolls back.
    -- Iter 1: 8's tentative write makes 9 look fine. Iter 2: 8 in rollback,
    -- 9's read no longer matches, 9 rolls back. Iter 3: stable.
    (9, true,  'x', 'C'),
    (9, false, 'x', 'D');
```

If we select from to_rollback we should see the following:

```sql
materialize=> SELECT * FROM to_rollback ORDER BY id;
 id
----
  4
  8
  9
(3 rows)
```
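The defining property of the fixed point can be checked directly, without recursion. Every transaction that is not rolled back should have each of its reads match the latest earlier write from a transaction that is also not rolled back. A sketch, assuming the `intents` table and `to_rollback` view above. It lists violations, so it should return no rows.

```sql
-- Surviving reads that disagree with the latest earlier surviving write.
SELECT r.id, r.key, r.val AS observed
FROM intents r
WHERE r.is_read
  AND NOT EXISTS (SELECT FROM to_rollback tr WHERE tr.id = r.id)
  AND r.val IS DISTINCT FROM (
      SELECT DISTINCT ON (w.key) w.val
      FROM intents w
      WHERE NOT w.is_read
        AND w.key = r.key
        AND w.id < r.id
        AND NOT EXISTS (SELECT FROM to_rollback tr WHERE tr.id = w.id)
      ORDER BY w.key, w.id DESC, w.val
  );
```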

It would be great to see the current values for each key, so let's create a view that does this.

```sql
-- The most recent (by id) write that is not rolled back.
CREATE VIEW store AS
SELECT DISTINCT ON (key) key, val
FROM intents
WHERE NOT is_read
  AND NOT EXISTS (SELECT FROM to_rollback WHERE intents.id = to_rollback.id)
ORDER BY key, id DESC;
```
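Clients also need a way to learn whether their own transaction committed. Here is a minimal sketch, using a hypothetical `outcomes` view with one row per transaction id present in `intents`:

```sql
-- One row per transaction id, with its current outcome.
CREATE VIEW outcomes AS
SELECT DISTINCT i.id,
       NOT EXISTS (SELECT FROM to_rollback tr WHERE tr.id = i.id) AS committed
FROM intents i;

-- A client checks on its own transaction.
SELECT committed FROM outcomes WHERE id = 5;
```

For the example data, `outcomes` should report `committed` as false for 4, 8, and 9 and true for the rest.

One caveat: once the maintenance below starts deleting rows, rolled-back transactions disappear from `intents`. So do committed transactions whose writes have all been overwritten. A missing id is therefore ambiguous, and a client would want to read its outcome before that happens, or have it recorded elsewhere.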

If we read from store we should see

```sql
materialize=> SELECT * FROM store;
  key  | val
-------+-----
 x     | B
 bob   | 100
 alice | 50
(3 rows)

materialize=>
```

Maintenance

Let's run each of the three async maintenance tasks. We'll select from intents before and after each, to see what has changed. To start, we have

```sql
materialize=> SELECT * FROM intents ORDER BY id, is_read DESC, key;
 id | is_read |  key  | val
----+---------+-------+-----
  1 | f       | alice | 100
  2 | t       | bob   |
  2 | f       | bob   | 50
  3 | t       | alice | 100
  3 | t       | bob   | 50
  3 | f       | alice | 70
  3 | f       | bob   | 80
  4 | t       | alice | 100
  4 | t       | bob   | 50
  4 | f       | alice | 60
  4 | f       | bob   | 90
  5 | t       | alice | 70
  5 | t       | bob   | 80
  5 | f       | alice | 50
  5 | f       | bob   | 100
  6 | t       | x     |
  6 | f       | x     | A
  7 | t       | x     | A
  7 | f       | x     | B
  8 | t       | x     | A
  8 | f       | x     | C
  9 | t       | x     | C
  9 | f       | x     | D
(23 rows)

materialize=>
```

If we remove failed transactions, we prune the eight entries corresponding to ids 4, 8, and 9.

```sql
materialize=> DELETE FROM intents WHERE id IN (SELECT * FROM to_rollback);
DELETE 8
materialize=> SELECT * FROM intents ORDER BY id, is_read DESC, key;
 id | is_read |  key  | val
----+---------+-------+-----
  1 | f       | alice | 100
  2 | t       | bob   |
  2 | f       | bob   | 50
  3 | t       | alice | 100
  3 | t       | bob   | 50
  3 | f       | alice | 70
  3 | f       | bob   | 80
  5 | t       | alice | 70
  5 | t       | bob   | 80
  5 | f       | alice | 50
  5 | f       | bob   | 100
  6 | t       | x     |
  6 | f       | x     | A
  7 | t       | x     | A
  7 | f       | x     | B
(15 rows)

materialize=>
```

Next we'll remove the read sets of committed transactions.

```sql
materialize=> DELETE FROM intents
WHERE intents.id NOT IN (SELECT * FROM to_rollback)
  AND intents.is_read;
DELETE 7
materialize=> SELECT * FROM intents ORDER BY id, is_read DESC, key;
 id | is_read |  key  | val
----+---------+-------+-----
  1 | f       | alice | 100
  2 | f       | bob   | 50
  3 | f       | alice | 70
  3 | f       | bob   | 80
  5 | f       | alice | 50
  5 | f       | bob   | 100
  6 | f       | x     | A
  7 | f       | x     | B
(8 rows)

materialize=>
```

Finally, we'll remove writes that are overwritten before anyone reads them.

```sql
materialize=> DELETE FROM intents
WHERE NOT intents.is_read
  AND EXISTS (SELECT FROM intents i2
              WHERE i2.key = intents.key
                AND i2.id > intents.id
                AND NOT i2.is_read
                AND NOT i2.id IN (SELECT id FROM to_rollback))
  AND NOT EXISTS (SELECT FROM intents i2
                  WHERE i2.key = intents.key
                    AND i2.id > intents.id
                    AND i2.is_read);
DELETE 5
materialize=> SELECT * FROM intents ORDER BY id, is_read DESC, key;
 id | is_read |  key  | val
----+---------+-------+-----
  5 | f       | alice | 50
  5 | f       | bob   | 100
  7 | f       | x     | B
(3 rows)

materialize=>
```

We can see that intents now contains the same rows as in store.

Scaling up

These views and DELETE statements are fine, but they are evaluated from scratch when invoked. There is nothing wrong with this, and WITH MUTUALLY RECURSIVE aside, the above is fairly bog-standard SQL. It may take some time to determine the current contents of store, for example, especially as intents grows in size.

Let's add quite a lot of data to make that point.

We'll load up 10,000 transactions, each with two reads and two writes, pseudo-randomly picked from 10,000 locations. This should give us a solid rate of conflict: 40,000 accesses spread over 10,000 keys means each key is touched about four times on average. With Claude's help, this was done via:

```sql
INSERT INTO intents (id, is_read, key, val)
  WITH ops AS (
    SELECT
      g AS id,
      'k' || (seahash((g::text || ':rk1')::bytea) % 10000) AS rk1,
      'k' || (seahash((g::text || ':rk2')::bytea) % 10000) AS rk2,
      'k' || (seahash((g::text || ':wk1')::bytea) % 10000) AS wk1,
      'k' || (seahash((g::text || ':wk2')::bytea) % 10000) AS wk2
    FROM generate_series(1, 10000) AS g
  )
  SELECT id, true,  rk1, (SELECT val FROM store WHERE key = ops.rk1) FROM ops
  UNION ALL
  SELECT id, true,  rk2, (SELECT val FROM store WHERE key = ops.rk2) FROM ops
  UNION ALL
  SELECT id, false, wk1, id::text || ':' || wk1 FROM ops
  UNION ALL
  SELECT id, false, wk2, id::text || ':' || wk2 FROM ops;
```

We can now check out the contents of intents, and also to_rollback and store.

```sql
materialize=> SELECT count(*) FROM intents;
 count
-------
 40000
(1 row)

Time: 60.834 ms
materialize=> SELECT count(*) FROM to_rollback;
 count
-------
  5988
(1 row)

Time: 27634.122 ms (00:27.634)
materialize=> SELECT count(*) FROM store;
 count
-------
  5591
(1 row)

Time: 42650.55 ms (00:42.650)
```

Sure takes a lot of time.

Selecting directly out of store with key or value filters doesn't go any faster. This will not be good enough.

Incremental View Maintenance

Materialize allows you to create indexes on arbitrary views, at which point it will compute and then continually maintain the results as the data change. We can build an index on store to provide continual interactive access to the currently present keys and their values.

```sql
materialize=> CREATE DEFAULT INDEX ON store;
CREATE INDEX
Time: 164.292 ms
materialize=> SELECT COUNT(*) FROM store;
 count
-------
  5591
(1 row)

Time: 39778.886 ms (00:39.779)
materialize=> SELECT COUNT(*) FROM store;
 count
-------
  5591
(1 row)

Time: 36.306 ms
materialize=> SELECT COUNT(*) FROM store;
 count
-------
  5591
(1 row)

Time: 32.297 ms
materialize=>
```

The index is "created" immediately, but only comes online after roughly the same time it takes to compute the result from scratch. Once computed, it stays up and running, and provides interactive access.

```sql
materialize=> SELECT val FROM store WHERE key = 'k1234';
    val
-----------
 943:k1234
(1 row)

Time: 41.390 ms
materialize=> SELECT val FROM store WHERE key = 'k5678';
    val
------------
 3841:k5678
(1 row)

Time: 20.515 ms
materialize=>
```

This is now much closer to "interactive" access than previously. The times drop to ~15ms with serializable isolation (Materialize defaults to strict serializability), which is about the time from NYC to us-east-1 and back again.
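In Materialize, switching a session to serializable isolation is a one-line setting. The sketch below assumes the `transaction_isolation` session variable accepts the values named in Materialize's documentation.

```sql
-- Relax this session from strict serializability to serializability.
SET TRANSACTION_ISOLATION TO 'SERIALIZABLE';

SELECT val FROM store WHERE key = 'k1234';

-- Return to the default.
SET TRANSACTION_ISOLATION TO 'STRICT SERIALIZABLE';
```

The trade is that a serializable read may observe a slightly older, though still consistent, snapshot.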

Optimization

We'll need both to_rollback and store, and the latter depends on the former. We'll actually want to drop the index on store, and instead build an index on to_rollback first, so that our index on store can simply work off of its contents. Although there are two dataflows independently maintaining these indexes, Materialize's serializability means their contents will always appear in sync.
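In DDL, that reshuffle might look like the sketch below. It assumes the index built earlier with `CREATE DEFAULT INDEX ON store` received the default name `store_primary_idx`. That is by analogy with the `to_rollback_primary_idx` name in the EXPLAIN output further down; check `SHOW INDEXES` if yours differs.

```sql
-- Drop the index built directly on `store` (default name assumed).
DROP INDEX store_primary_idx;

-- Maintain `to_rollback` first, then `store` on top of it.
CREATE DEFAULT INDEX ON to_rollback;
CREATE DEFAULT INDEX ON store;
```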

Having done that:

```sql
materialize=> SELECT COUNT(*) FROM intents;
 count
-------
 40000
(1 row)

Time: 98.058 ms
materialize=> SELECT COUNT(*) FROM to_rollback;
 count
-------
  5988
(1 row)

Time: 30.412 ms
materialize=> SELECT COUNT(*) FROM store;
 count
-------
  5591
(1 row)

Time: 29.841 ms
materialize=>
```

But we aren't done yet. We have these maintenance tasks as well, and they could use some help. Let's exercise each of them by counting the rows it would delete, rather than performing the DELETE yet.

```sql
materialize=>
    SELECT count(*)
    FROM intents
    WHERE id IN (SELECT * FROM to_rollback);
 count
-------
 23952
(1 row)

Time: 73.231 ms
materialize=>
```

That first one isn't so bad.

```sql
materialize=>
    SELECT count(*)
    FROM intents
    WHERE id NOT IN (SELECT * FROM to_rollback) AND is_read;
 count
-------
  8024
(1 row)

Time: 14183.270 ms (00:14.183)
materialize=>
```

The second one is much less interactive.

```sql
materialize=>
   SELECT count(*) FROM intents
   WHERE NOT is_read
     AND EXISTS (SELECT FROM intents i2
                 WHERE i2.key = intents.key AND i2.id > intents.id
                   AND NOT i2.is_read AND NOT i2.id IN (SELECT id FROM to_rollback))
     AND NOT EXISTS (SELECT FROM intents i2
                     WHERE i2.key = intents.key AND i2.id > intents.id
                       AND i2.is_read);
 count
-------
  1323
(1 row)

Time: 34746.069 ms (00:34.746)
materialize=>
```

That third one is pretty brutal.

Let's improve each of these.

Further Optimization; Task 1

The first query is already pretty fast, but if we look at the logic we run, we can see that it could be faster.

```sql
materialize=> explain SELECT count(*)
    FROM intents
    WHERE id IN (SELECT * FROM to_rollback);
Physical Plan
Explained Query:
  →With
    cte l0 =
      →Accumulable GroupAggregate
        Simple aggregates: count(*)
        →Differential Join %1 » %0
          Join stage 0 in %0 with lookup key #0
          →Arrange (#0)
Read materialize.transactions.intents
          →Arranged materialize.transactions.to_rollback
  →Return
    →Union
      →Unarranged Raw Stream
        →Arranged l0
      →Map/Filter/Project
        Project: #0
        Map: 0
          →Consolidating Union
            →Negate Diffs
              →Fused with Child Map/Filter/Project
                Project: ()
                  →Arranged l0
                    Key: ()
            →Constant (1 row)

Source materialize.transactions.intents
  project=(#0)
  filter=((#0{id}) IS NOT NULL)
  pushdown=((#0{id}) IS NOT NULL)

Used Indexes:
  - materialize.transactions.to_rollback_primary_idx (differential join)

Target cluster: default

(1 row)
Time: 36.076 ms
materialize=>
```

The tell here is

```sql
→Arrange (#0)
Read materialize.transactions.intents
```

which says that we are reading intents and building an index over it as part of the query. We could instead build an index on intents, by id, ahead of time.

```sql
materialize=> CREATE INDEX intents_idx_id ON intents (id);
CREATE INDEX
Time: 99.994 ms
materialize=>
```

Re-running the EXPLAIN command reveals that it uses indexes for both inputs. The count now comes back in half the time, which wasn't that long to begin with.

Further Optimization; Task 2

The second maintenance task is slow because of a Materialize planning defect. The antijoin we've written, selecting reads of transactions whose id is not in to_rollback, should be implementable with an inner join between intents and to_rollback. Because .. SQL .. NOT IN also has to account for NULL ids, and Materialize trips over the hidden OR that this adds to the inner join's predicate.

We can fix this with a different antijoin idiom: NOT EXISTS.

```sql
materialize=>
    SELECT count(*)
    FROM intents i
    WHERE NOT EXISTS (
      SELECT FROM to_rollback tr
      WHERE i.id = tr.id
    ) AND is_read;
 count
-------
  8024
(1 row)

Time: 58.552 ms
materialize=>
```

An EXPLAIN on this query confirms that it uses the pre-existing indexes on intents and to_rollback.

Further Optimization; Task 3

The third query is pretty complicated. I won't print the EXPLAIN output here, as it is two screenfuls long. Instead we'll do a clever trick that Materialize makes easy.

```sql
materialize=> CREATE VIEW dead_writes AS
   SELECT * FROM intents
   WHERE NOT is_read
     AND EXISTS (SELECT FROM intents i2
                 WHERE i2.key = intents.key AND i2.id > intents.id
                   AND NOT i2.is_read AND NOT i2.id IN (SELECT id FROM to_rollback))
     AND NOT EXISTS (SELECT FROM intents i2
                     WHERE i2.key = intents.key AND i2.id > intents.id
                       AND i2.is_read);
CREATE VIEW
Time: 143.153 ms
materialize=> CREATE DEFAULT INDEX ON dead_writes;
CREATE INDEX
Time: 106.631 ms
materialize=>
```

We just bind the logic to a named view, and create an index on it. Now the results are immediately available.

```sql
materialize=> SELECT COUNT(*) FROM dead_writes;
 count
-------
  1323
(1 row)
Time: 27.275 ms
materialize=>
```

In fact, we can do the same with the two other maintenance tasks, which is just good hygiene. Each maintenance task then has a maintained index that always contains the rows of intents we can discard.

One meaningful change is that our DELETE statements now need to refer to dead_writes and the other named views, like so:

```sql
DELETE FROM intents
WHERE intents IN (SELECT dead_writes FROM dead_writes);
```

This plans the same way as

```sql
SELECT * FROM intents
WHERE intents IN (SELECT dead_writes FROM dead_writes);
```

and EXPLAIN shows us that we do not have the right indexes yet.

```sql
materialize=> EXPLAIN SELECT * FROM intents WHERE intents IN (SELECT dead_writes FROM dead_writes);
Physical Plan
Explained Query:
  →Differential Join %0 » %1
    Join stage 0 in %1 with lookup key #0..=#3
    →Arrange (#0..=#3)
      →Fused with Child Map/Filter/Project
        Filter: (#0) IS NOT NULL AND (#1) IS NOT NULL AND (#2) IS NOT NULL AND (#3) IS NOT NULL
          →Arranged materialize.transactions.intents
            Key: (#0{id})
    →Distinct GroupAggregate
      →Fused with Child Map/Filter/Project
        Filter: (#3{val}) IS NOT NULL
          →Arranged materialize.transactions.dead_writes
            Key: (#0{id}..=#3{val})

Used Indexes:
  - materialize.transactions.intents_idx_id (*** full scan ***)
  - materialize.transactions.dead_writes_primary_idx (*** full scan ***)

Target cluster: default

(1 row)
Time: 35.272 ms
materialize=>
```

Although we are using indexes on intents and dead_writes, we are scanning their full contents. If intents is large and dead_writes is empty, we'll do a lot of work just to learn that there is nothing to delete. We have indexes, but they are not the right indexes.

To get the right indexes, we'll want to make two changes:

  1. Add an index on intents by all columns, and
  2. Modify dead_writes to contain distinct records.

With these two changes, we'll use an index for intents, and be able to remove the Distinct around dead_writes.

The first step reveals progress:

```sql
materialize=> create default index on intents;
CREATE INDEX
Time: 81.806 ms
materialize=> EXPLAIN SELECT * FROM intents WHERE intents IN (SELECT dead_writes FROM dead_writes);
                              Physical Plan
--------------------------------------------------------------------------
 Explained Query:                                                        +
   →Differential Join %1 » %0                                            +
     Join stage 0 in %0 with lookup key #0{id}..=#3{val}                 +
     →Arranged materialize.transactions.intents                          +
     →Distinct GroupAggregate                                            +
       →Fused with Child Map/Filter/Project                              +
         Filter: (#3{val}) IS NOT NULL                                   +
           →Arranged materialize.transactions.dead_writes                +
             Key: (#0{id}..=#3{val})                                     +
                                                                         +
 Used Indexes:                                                           +
   - materialize.transactions.dead_writes_primary_idx (*** full scan ***)+
   - materialize.transactions.intents_primary_idx (differential join)    +
                                                                         +
 Target cluster: default                                                 +

(1 row)

Time: 32.926 ms
materialize=>
```

This already makes the cost proportional to the size of dead_writes, independent of intents. The other improvement comes from redefining dead_writes to produce distinct records, and indexing it on all of its columns:

```sql
-- Replace the earlier definition; CASCADE also drops dead_writes_primary_idx.
DROP VIEW dead_writes CASCADE;

CREATE VIEW dead_writes AS
   SELECT DISTINCT * FROM intents
   WHERE NOT is_read
     AND EXISTS (SELECT FROM intents i2
                 WHERE i2.key = intents.key AND i2.id > intents.id
                   AND NOT i2.is_read AND NOT i2.id IN (SELECT id FROM to_rollback))
     AND NOT EXISTS (SELECT FROM intents i2
                     WHERE i2.key = intents.key AND i2.id > intents.id
                       AND i2.is_read);

-- Explicitly name all columns, as DEFAULT index skips constant is_read.
CREATE INDEX dead_writes_idx_all ON dead_writes (id, is_read, key, val);
```

Notice that we need to name the index columns explicitly. Materialize's CREATE DEFAULT INDEX keys the index on the narrowest unique key it can infer. Because is_read is the constant FALSE in dead_writes, that key leaves is_read out, while the join wants to look up all four columns.

The final plan ends up being:

```sql
materialize=> EXPLAIN SELECT * FROM intents WHERE intents IN (SELECT dead_writes FROM dead_writes);
                            Physical Plan
----------------------------------------------------------------------
 Explained Query:                                                    +
   →Differential Join %1 » %0                                        +
     Join stage 0 in %0 with lookup key #0{id}..=#3{val}             +
       filter=((#3) IS NOT NULL)                                     +
     →Arranged materialize.transactions.intents                      +
     →Arranged materialize.transactions.dead_writes                  +
                                                                     +
 Used Indexes:                                                       +
   - materialize.transactions.intents_primary_idx (differential join)+
   - materialize.transactions.dead_writes_idx_all (differential join)+
                                                                     +
 Target cluster: default                                             +

(1 row)

Time: 35.371 ms
materialize=>
```

Both inputs are now used in indexed form, and the query should immediately spill out the rows to delete from intents.

```sql
materialize=> SELECT COUNT(*) FROM intents WHERE intents IN (SELECT dead_writes FROM dead_writes);
 count
-------
  1323
(1 row)

Time: 38.828 ms
materialize=>
```

Conclusions

Appendix: Setup SQL

The full schema, views, and indexes from the post, in a single straight-line hunk you can paste into a Materialize session. Order matters: each view's dependencies (and their indexes) come first.

```sql
-- The one and only base table.
CREATE TABLE intents (id INT, is_read BOOL, key TEXT, val TEXT);

-- Transactions that must roll back.
CREATE VIEW to_rollback AS
WITH MUTUALLY RECURSIVE
    writes(id INT, key TEXT, val TEXT) AS (
        SELECT intents.id, key, val
        FROM intents
        WHERE NOT EXISTS (SELECT FROM rollback WHERE intents.id = rollback.id)
          AND NOT intents.is_read
    ),
    reads(id INT, val TEXT, red TEXT) AS (
        SELECT id, val, (
            SELECT DISTINCT ON (key) val
            FROM writes
            WHERE writes.id < intents.id
              AND writes.key = intents.key
            ORDER BY key, writes.id DESC, val
        )
        FROM intents
        WHERE intents.is_read
    ),
    rollback(id INT) AS (
        SELECT DISTINCT reads.id
        FROM reads
        WHERE reads.val IS DISTINCT FROM reads.red
    )
SELECT * FROM rollback;

CREATE DEFAULT INDEX ON to_rollback;

-- The most recent (by id) write that is not rolled back.
CREATE VIEW store AS
SELECT DISTINCT ON (key) key, val
FROM intents
WHERE NOT is_read
  AND NOT EXISTS (SELECT FROM to_rollback WHERE intents.id = to_rollback.id)
ORDER BY key, id DESC;

CREATE DEFAULT INDEX ON store;

-- Indexes on `intents`: one by id, and one across all columns.
CREATE INDEX intents_idx_id ON intents (id);
CREATE DEFAULT INDEX ON intents;

-- Maintenance task 1: rows belonging to rolled-back transactions.
CREATE VIEW failed_rows AS
SELECT DISTINCT intents.*
FROM intents
WHERE EXISTS (SELECT FROM to_rollback WHERE intents.id = to_rollback.id);

CREATE INDEX failed_rows_idx_all ON failed_rows (id, is_read, key, val);

-- Maintenance task 2: read rows from committed transactions.
CREATE VIEW committed_reads AS
SELECT DISTINCT intents.*
FROM intents
WHERE is_read
  AND NOT EXISTS (SELECT FROM to_rollback WHERE intents.id = to_rollback.id);

CREATE INDEX committed_reads_idx_all ON committed_reads (id, is_read, key, val);

-- Maintenance task 3: writes that are overwritten without intervening reads.
CREATE VIEW dead_writes AS
SELECT DISTINCT * FROM intents
WHERE NOT is_read
  AND EXISTS (SELECT FROM intents i2
              WHERE i2.key = intents.key AND i2.id > intents.id
                AND NOT i2.is_read AND NOT i2.id IN (SELECT id FROM to_rollback))
  AND NOT EXISTS (SELECT FROM intents i2
                  WHERE i2.key = intents.key AND i2.id > intents.id
                    AND i2.is_read);

CREATE INDEX dead_writes_idx_all ON dead_writes (id, is_read, key, val);
```

The three DELETE statements that drive the maintenance tasks:

```sql
DELETE FROM intents
WHERE intents IN (SELECT failed_rows FROM failed_rows);

DELETE FROM intents
WHERE intents IN (SELECT committed_reads FROM committed_reads);

DELETE FROM intents
WHERE intents IN (SELECT dead_writes FROM dead_writes);
```
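A tiny hypothetical workload can sanity-check the setup. The rows below are invented for illustration, and the expected results in the comments come from tracing the views by hand. Transaction 3 read 'x', but transaction 2, which precedes it in id order, wrote 'y', so transaction 3 should roll back.

```sql
-- Hypothetical smoke test: three transactions over the single key 'a'.
--   txn 1: write a = 'x'
--   txn 2: read a (saw 'x'), then write a = 'y'
--   txn 3: read a (saw 'x'), which is stale once txn 2's write is ordered first
INSERT INTO intents VALUES
  (1, false, 'a', 'x'),
  (2, true,  'a', 'x'),
  (2, false, 'a', 'y'),
  (3, true,  'a', 'x');

SELECT * FROM to_rollback;  -- expect one row: 3
SELECT * FROM store;        -- expect one row: a | y
SELECT * FROM dead_writes;  -- expect no rows: reads of 'a' follow txn 1's write

-- Now run the three DELETE statements above, in order, then:
SELECT * FROM intents;      -- expect one row: (2, false, 'a', 'y')
SELECT * FROM store;        -- still expect: a | y
```

The cascade is the point here. Transaction 1's write only becomes dead once the reads that followed it have been retired.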

Appendix: A session of improvements (2026-06-12)

This appendix is written by Claude (Anthropic's Claude Fable 5), reporting on a working session with Frank in which I implemented this post against Materialize v26.27.0, found some problems, fixed them, and measured the results. All numbers are from a 16-worker docker container on an M-series laptop; transactions are two reads and two writes over text keys.

Two corrections to the SQL above

The dead_writes view nests a NOT IN. The third line of its EXISTS subquery reads AND NOT i2.id IN (SELECT id FROM to_rollback), and over the nullable intents schema this plans as a cross join: NOT IN's NULL semantics hide an OR id IS NULL in the antijoin predicate, and the planner correctly refuses to key the disjunction. The cross join's appetite slows every other index on the cluster: absorbing a 10,000-transaction batch degrades from ~150ms to multiple seconds while it is installed. The fix is one expression: AND NOT EXISTS (SELECT FROM to_rollback WHERE to_rollback.id = i2.id). Two other escapes also work, and taught me something about the planner: declare the intents columns NOT NULL, or add explicit IS NOT NULL filters to both sides, and the original NOT IN plans as a keyed antijoin as written. The planner's nullability inference propagates through filters and even through the WITH MUTUALLY RECURSIVE binding; the cross join appears only when non-nullness genuinely cannot be proven, in which case it is required for correctness. Making Materialize plan the unprovable case well is possible — lower NOT IN to a keyed antijoin plus two maintained singleton guards ("does the subquery contain a NULL", "is it empty") — and would make a nice planner improvement.
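As a sketch, here is dead_writes from the setup above with that one-expression fix applied. It returns the same rows as the NOT IN version whenever id is never NULL. DROP ... CASCADE also removes anything built on the view, including its index, which is recreated at the end.

```sql
-- dead_writes with the nested NOT IN replaced by a keyed NOT EXISTS.
DROP VIEW IF EXISTS dead_writes CASCADE;

CREATE VIEW dead_writes AS
SELECT DISTINCT * FROM intents
WHERE NOT is_read
  -- A later write to the same key, from a transaction that is not rolled back.
  AND EXISTS (SELECT FROM intents i2
              WHERE i2.key = intents.key AND i2.id > intents.id
                AND NOT i2.is_read
                AND NOT EXISTS (SELECT FROM to_rollback WHERE to_rollback.id = i2.id))
  -- No later read of the same key.
  AND NOT EXISTS (SELECT FROM intents i2
                  WHERE i2.key = intents.key AND i2.id > intents.id
                    AND i2.is_read);

CREATE INDEX dead_writes_idx_all ON dead_writes (id, is_read, key, val);
```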

The whole-row delete idiom silently skips NULLs. DELETE FROM intents WHERE intents IN (SELECT v FROM v) never matches a row with a NULL column, because ROW(..) = ROW(..) with a NULL component is NULL, not TRUE. Reads that assert a key's absence carry NULL val — transactions 2 and 6 in the worked example — so the committed-reads delete strands them forever. At scale it is worse: a batch whose reads observe an empty store leaks every read row, which then blocks the "no later reads" condition of dead_writes, leaving ~25% of dead writes unremovable. The repair is to join on row identity, which is never NULL, and keep val out of the join key entirely:

```sql
DELETE FROM intents WHERE id IN (SELECT id FROM to_rollback);
DELETE FROM intents WHERE is_read AND id IN (SELECT id FROM committed_read_ids);
DELETE FROM intents WHERE NOT is_read AND (id, key) IN (SELECT id, key FROM dead_write_keys);
```

where committed_read_ids and dead_write_keys are maintained, indexed views of the distinct delete keys (SELECT DISTINCT id FROM committed_reads, SELECT DISTINCT id, key FROM dead_writes). With matching indexes on intents — by (id) and by (id, key), replacing the all-columns default index — each delete's read side plans as a join of exactly two existing arrangements: no transient Distinct, no Arrange, no full scan of intents. This matters more than it looks: the delete's read runs while holding the table's write lock, so its plan quality is lock-hold time.
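Those supporting objects might look like the following sketch. The two view names come from the text, and the index names are hypothetical. Dropping the all-columns default index assumes the id-keyed deletes have already replaced the whole-row ones, which relied on that index.

```sql
-- Distinct delete keys, maintained and indexed.
CREATE VIEW committed_read_ids AS
SELECT DISTINCT id FROM committed_reads;
CREATE INDEX committed_read_ids_idx ON committed_read_ids (id);

CREATE VIEW dead_write_keys AS
SELECT DISTINCT id, key FROM dead_writes;
CREATE INDEX dead_write_keys_idx ON dead_write_keys (id, key);

-- Matching indexes on intents: (id) already exists as intents_idx_id,
-- so add (id, key) and drop the all-columns default index it replaces.
CREATE INDEX intents_idx_id_key ON intents (id, key);
DROP INDEX intents_primary_idx;
```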

Performance work

Trim the TopK towers. The hierarchical TopK that SQL plans for DISTINCT ON is sized for unbounded groups, but maintenance keeps per-key groups at a handful of rows. OPTIONS (DISTINCT ON INPUT GROUP SIZE = 16) — placed after WHERE, before ORDER BY, in both the reads subquery and store — collapses the tower to one refinement layer. The boundary is inclusive at 16: hints of 15 and 16 plan identically, and 17 adds a layer. store drops from 22 arrangements to 8, the recursion from 38 to 24, and there is a pleasing alignment: if epochs are cut when any key's occupancy approaches 16, the hint is an invariant the epoch controller enforces rather than a hope about the workload.
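A sketch of store with the hint in place follows the placement described above: after WHERE, before ORDER BY. The reads subquery inside to_rollback would take the same OPTIONS clause in the same position. The view is dropped and recreated because its definition changes.

```sql
DROP VIEW IF EXISTS store CASCADE;

CREATE VIEW store AS
SELECT DISTINCT ON (key) key, val
FROM intents
WHERE NOT is_read
  AND NOT EXISTS (SELECT FROM to_rollback WHERE intents.id = to_rollback.id)
-- Per-key groups stay small under maintenance, so one refinement layer suffices.
OPTIONS (DISTINCT ON INPUT GROUP SIZE = 16)
ORDER BY key, id DESC;

CREATE DEFAULT INDEX ON store;
```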

Rewrite dead_writes with aggregates. Its two correlated EXISTS clauses are intents-by-key self-joins. Per-key MAX aggregates replace them: the largest committed-write id and the largest read id per key, each hinted down to a single reduce layer. A write is dead iff id < max_cw(key) and coalesce(max_r(key) <= id, true). Same results, a third less CPU, and 14 arrangements instead of 30.
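A sketch of that rewrite follows. The helper view names are hypothetical. The group-size value of 16 is an assumption borrowed from the DISTINCT ON hint, not a reported setting.

```sql
-- Hypothetical helper views: per-key maxima, each hinted to small groups.
CREATE VIEW max_committed_write AS
SELECT key, max(id) AS max_cw
FROM intents
WHERE NOT is_read
  AND NOT EXISTS (SELECT FROM to_rollback WHERE to_rollback.id = intents.id)
GROUP BY key
OPTIONS (AGGREGATE INPUT GROUP SIZE = 16);

CREATE VIEW max_read AS
SELECT key, max(id) AS max_r
FROM intents
WHERE is_read
GROUP BY key
OPTIONS (AGGREGATE INPUT GROUP SIZE = 16);

-- A write is dead iff a later committed write to its key exists (id < max_cw)
-- and no later read of its key exists (max_r <= id, or no reads at all).
DROP VIEW IF EXISTS dead_writes CASCADE;

CREATE VIEW dead_writes AS
SELECT DISTINCT intents.*
FROM intents
JOIN max_committed_write cw ON cw.key = intents.key
LEFT JOIN max_read r ON r.key = intents.key
WHERE NOT intents.is_read
  AND intents.id < cw.max_cw
  AND coalesce(r.max_r <= intents.id, true);

CREATE INDEX dead_writes_idx_all ON dead_writes (id, is_read, key, val);
```

Each helper view has at most one row per key, so the joins cannot duplicate intents rows. DISTINCT is kept so that the delete can still use the index directly.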

Retire in one statement. Per-statement timing showed each maintenance delete spending 100–200ms, almost entirely waiting for views to absorb the previous statement's writes; three deletes meant three lock-serialized absorb rounds per epoch. A maintained union view of the retirable rows collapses the committed-reads and dead-writes deletes into one statement — one lock, one frontier wait — and is conservative-correct at a single timestamp. (Folding failed transactions in too broke my failure accounting under concurrency: rows that fail between the accounting read and the delete vanish unlogged. Deletes must target exactly what was logged.)
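One possible shape for that union view is sketched below. The view and index names are hypothetical. Matching on (id, is_read, key) is our own choice and not necessarily what the session used.

It relies on two facts:

- committed_reads depends only on (id, is_read);
- dead_writes depends only on (id, is_read, key).

Matching on that triple therefore removes exactly the retirable rows, while keeping the nullable val out of the comparison. The sketch assumes id, is_read, and key are never NULL. Failed transactions keep their own separate delete, for the accounting reason given above.

```sql
-- Hypothetical: one maintained view of every retirable row's identity.
CREATE VIEW retirable_keys AS
SELECT id, is_read, key FROM committed_reads
UNION
SELECT id, is_read, key FROM dead_writes;

CREATE INDEX retirable_keys_idx ON retirable_keys (id, is_read, key);
CREATE INDEX intents_idx_id_read_key ON intents (id, is_read, key);

-- One statement per epoch: one write lock, one frontier wait.
DELETE FROM intents
WHERE (id, is_read, key) IN (SELECT id, is_read, key FROM retirable_keys);
```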

Sustained results. Closed-loop goodput (everything charged: submission, resolution, maintenance) went from 17.5k to 27.7k transactions/second at low contention, and 4.6k to 5.8k at moderate contention, where the commit fraction — not the machinery — is the bound. For calibration I ran Postgres 16 on the same laptop with independent transactions: 12–14k tps at READ COMMITTED, 6–8k at SERIALIZABLE with retries. Postgres wins under contention because it retries at row granularity in sub-millisecond loops, where this design retries at epoch granularity; the atomic counter gets 753 commits/second there and one winner per epoch here. Contention is not what optimistic concurrency control is for, and now there are numbers.

What I learned about where the time goes

The cluster is not CPU-bound; it is coordination-bound, in two ways. Between statements it is event-starved, and within a single epoch's absorb the duration is set by the recursion's iteration critical path rather than by work: the same 400k-row burst takes 940ms on 1 worker and 429ms on 16 — sixteen times the workers for 2.2x the latency, with per-worker efficiency about 7x better at one worker, and no skew. Batch size does not fill the gap: 400k rows and 1.2M rows absorb at the same ~930k rows/second, so a single timestamp has a fixed per-row cost on a given cluster. Capacity comes from pipelining timestamps, which differential dataflow does naturally — epoch n+1's first iteration runs while epoch n's fifth is in flight — but only if epochs arrive as a stream rather than as one synchronous DML statement at a time. Deferring maintenance to "use the idle" is strictly worse in every regime: retirement work is conserved, deferral just makes it lumpy and pushes it into an un-overlapped tail.
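The arithmetic behind those figures follows. It assumes the ~930k rows/second rate is the 16-worker one, since the text does not say which worker count it was measured on.

$$
\frac{T_1}{T_{16}} = \frac{940\ \text{ms}}{429\ \text{ms}} \approx 2.2,
\qquad
\frac{T_1 / T_{16}}{16} \approx \frac{2.2}{16} \approx 0.14 \approx \frac{1}{7}
$$

$$
\frac{400{,}000\ \text{rows}}{0.429\ \text{s}} \approx 9.3 \times 10^{5}\ \text{rows/s}
$$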

The conflict-density sweep is worth a look if you enjoy curves: goodput follows the pigeonhole bound (committed per epoch ≈ number of hot keys, measured 1 / 12 / 86 / 414 winners at K = 1 / 10 / 100 / 1000), while resolution cost at high density is a per-key quadratic — every read of a key pairs with every prior write of it before TopK keeps one, on a single worker. Epoch size is the lever, quadratically: the same 10,000 transactions against one key cost 206 seconds in one epoch and 12 seconds in ten. The worst spot is not the atomic counter everyone fears but the almost-atomic counter, around ten keys, where there is enough key diversity for rollback cascades to oscillate but not enough to spread the load.

The remaining wall is the DML path through the coordinator, which moves ~1M rows/second on this laptop and does not scale with the cluster. The design wants its submission as a source and its retirement as a watermark — a one-row table that every view filters against, advanced by a single-row update, with physical garbage collection demoted to an infrequent bulk chore. At ~1–2µs of CPU per row, the laptop's own capacity for this workload is several million transactions per second; everything between the measured numbers and that one is control plane.

My thanks to Frank for the direction throughout, and in particular for twice declining to believe a number until it was clear what question it answered. The 25k/s goodput I first reported was a stage rate that applies only to a client protocol nobody had built; asked point-blank whether the answer was ~5k or ~20k, the honest closed-loop measurement said 4.6k. The "8% utilization" that suggested idle headroom undercounted transient dataflows, and the headroom it implied was not where I said it was. The numbers that appear above are the ones that survived his skepticism.


Frank McSherry

Chief Scientist, Materialize

Frank was previously at Microsoft Research Silicon Valley where he co-invented Differential Privacy, and subsequently led the Naiad project. Frank holds a Ph.D. in Computer Science from the University of Washington.