Important: Materialize is a cloud-native database built on streaming internals. Our core feature, incrementally updated materialized views, is based on PostgreSQL materialized views––and aims to supplant them entirely, even for PostgreSQL users. With updates shipped in May 2023, we think we’ve just about done it, thanks to some big ergonomic wins. If you’d like to try Materialize for free, register here, or learn more in the docs.
How Postgres sources work in Materialize
PostgreSQL offers a replication stream of changes to your tables, and Materialize can act as a read replica of that stream. Once we get the data into Materialize, though, you can build complex, incrementally maintained materialized views over that data; this gives you the expressive power of PostgreSQL but with the computational model of Timely and Differential Dataflow.
This means that rather than continually recomputing the state of your views in miniature batch jobs and waiting for recomputation, Materialize proactively and incrementally computes the state of your views as new data comes into the system.
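To make that concrete, here’s a rough sketch of what wiring up a Postgres source and a view over it looks like; the connection details and table names below are placeholders, and the exact options are covered in the docs.

```sql
-- Tell Materialize how to reach the upstream PostgreSQL database.
CREATE SECRET pg_password AS '<password>';

CREATE CONNECTION pg_connection TO POSTGRES (
    HOST 'db.example.com',
    DATABASE 'postgres',
    USER 'materialize',
    PASSWORD SECRET pg_password
);

-- Ingest every table in the publication as a subsource.
CREATE SOURCE pg_source
    FROM POSTGRES CONNECTION pg_connection (PUBLICATION 'mz_source')
    FOR ALL TABLES;

-- The same view you'd write in PostgreSQL, now incrementally maintained.
CREATE MATERIALIZED VIEW order_totals AS
    SELECT user_id, sum(amount) AS total
    FROM orders
    GROUP BY user_id;
```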
First Implementation
Our initial implementation of PG sources proved that the concept above was the right one: you could set it up and build the same materialized views in Materialize that you would have in PostgreSQL, and they would instead be incrementally maintained. One MZ user, Alvin Khaled, put it well:
It’s like a magically better third type of view: Postgres has regular views and manually refreshed materialized views. Now I have a materialized view that’s always up-to-date.
The magic was real, but we had a few rough corners that didn’t align with the reality of running on top of a production database:
- Materialize didn’t support any changes to the upstream database’s schema, and would error if any occurred.
- If any table in the publication caused an error, we errored out the entire source. For example, if `a_table` had an incompatible schema change, you could no longer select from `b_table`, `c_table`, etc.
- Users could not add or remove tables from the Postgres source; you were limited to only using those tables you ingested at the outset.
In recent releases of Materialize, we’ve fixed the first two issues and have a design and some preliminary work done on the third.
Why use Postgres sources?
First, though, I want to touch briefly on why you’d want to use Postgres sources in Materialize, even if you’re not currently a heavy PostgreSQL user.
The biggest advantage Postgres sources have over Kafka sources is the power of `REPLICA IDENTITY FULL`. Experienced DBAs will be leery, as are the PostgreSQL docs:
`REPLICA IDENTITY FULL` is very inefficient and should only be used as a fallback if no other solution is possible. [31.1. Publication]
This requires a brief technical explanation. Logical replication sends along a stringified version of your relation’s data, which it also stores in the database’s write-ahead log, or WAL. When replication slots are in use (as they are with Materialize), PostgreSQL compacts the WAL periodically, taking care never to compact any data that a replication slot might still need.
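If you want to see what PostgreSQL is holding onto for a given slot, the standard `pg_replication_slots` catalog view exposes it; something like the following, run against the upstream database, shows each slot and the earliest WAL it still pins:

```sql
-- Each active replication slot pins WAL that PostgreSQL won't recycle yet.
SELECT slot_name, slot_type, active, restart_lsn, confirmed_flush_lsn
FROM pg_replication_slots;
```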
When using `REPLICA IDENTITY FULL`, PostgreSQL stores each row that it would send out; this contrasts with other `REPLICA IDENTITY` options, which store only the collection’s key and updated values for `UPDATE` and `DELETE` statements. This means that `REPLICA IDENTITY FULL` maintains more state in its WAL than other `REPLICA IDENTITY` options, and the WAL can therefore require much more disk space.
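Concretely, this is a per-table setting upstream; for a hypothetical `orders` table, opting in looks like:

```sql
-- By default only the replica identity (usually the primary key) is logged
-- for UPDATE and DELETE; FULL logs the entire old row instead.
ALTER TABLE orders REPLICA IDENTITY FULL;
```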
However, in the context of Materialize, if you want to identify a relation as having a key in the upstream source, and only propagate changes to that key, Materialize needs to persist both the key and its most recent value––and it needs to live somewhere we can access very quickly while ingesting data. That can be costly: you have to increase the size and power of the machine so that it can store an entire replica of your upstream data.
Using `REPLICA IDENTITY FULL`, though, means that we can rely on PostgreSQL to handle that very complex task––something it’s been tuned to do over the last 25+ years. This means that users get the benefit of powerful semantics (i.e. relations with keys) on smaller machines than Kafka sources require.
Ok––on to the more technical bits.
Supporting schema changes
From the PG docs on logical replication:
The database schema and DDL commands are not replicated. The initial schema can be copied by hand using `pg_dump --schema-only`. Subsequent schema changes would need to be kept in sync manually. (Note, however, that there is no need for the schemas to be absolutely the same on both sides.) [31.4. Restrictions]
Naturally, we aspire to something more graceful than “manually change your tables’ schemas in Materialize.” We wanted to build the right thing, though, so our initial buildout of PG sources was very conservative and said, “If we detect any schema changes, error because we haven’t yet decided what to do.”
One challenge we face is that it is not currently trivial to change the number of columns in “upstream” relations in Materialize––we are at a funny spot where we treat `*` (as in `SELECT *`) literally, and if we were to add columns to a relation, we would change the resultant relation’s arity and a cascade of failures could ensue. (We’ve made some forays into expanding `*` expressions into the columns they represent, but haven’t merged the changes yet.)
So with the knowledge that we cannot simply add columns, we determined we could do maybe the next best thing: just ignore them. (A corollary to this, though, is that we cannot necessarily just drop columns because of similar arity concerns––however, we could support it by just producing NULL values, though we haven’t yet had any users express interest in that.)
We were pleasantly surprised by how naturally and easily support for ignoring columns slotted in. Because PG doesn’t support reordering columns in relations, we only need to truncate the rows we ingest to the width of the relation we have on record (assuming the columns we have are a strict prefix of the relation’s current columns).
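As a rough illustration of what “just ignore them” means (using a hypothetical table `t` with columns `a` and `b`): if a column is added upstream, the subsource in Materialize keeps its original shape and the new column simply isn’t ingested.

```sql
-- Upstream, in PostgreSQL:
ALTER TABLE t ADD COLUMN c TEXT;

-- In Materialize, the subsource still has only the columns it had
-- when the source was created:
SELECT * FROM t;  -- returns a and b; c is ignored
```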
Not quite that simple, though…
However, this line from the description above…
Note, however, that there is no need for the schemas to be absolutely the same on both sides.
…intimates a problem with schema change detection, as well.
In the logical replication stream, you will receive `Relation` messages, which do describe changes to the underlying relations. What isn’t laid out so clearly is something outlined elsewhere in the logical replication docs:
Columns of a table are also matched by name. The order of columns in the subscriber table does not need to match that of the publisher.
What this points to is that the `Relation` messages’ schemas only identify columns by their names, meaning it is possible to make a schema change that only speciously looks like it’s equivalent, e.g.
```sql
CREATE TABLE t (a INT, b INT);
INSERT INTO t VALUES (1, 2), (2, 3);
ALTER TABLE t DROP COLUMN b;
ALTER TABLE t ADD COLUMN b INT;
INSERT INTO t VALUES (3, 4);
```
PostgreSQL itself will understand that the tuples in `t` are:
```
a|b
1|
2|
3|4
```
While if your read replica (e.g. Materialize) does not get updated, you’ll believe that the state of `t` is:
```
a|b
1|2
2|3
3|4
```
That’s clearly very bad!
What this meant for us is that we needed to discard the `Relation` message’s contents and instead re-examine the PG database’s catalog, where we could determine whether the relation had changed subtly and perniciously or not.
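We won’t reproduce the actual implementation here, but the kind of information we re-read from the catalog looks roughly like this: `pg_attribute` records each column’s position (`attnum`), and a column that has been dropped and re-added comes back with a new `attnum`––exactly the signal a name-only comparison misses.

```sql
-- Roughly the shape of the catalog lookup: column positions, names, and types.
SELECT a.attnum, a.attname, a.atttypid::regtype AS column_type
FROM pg_attribute a
JOIN pg_class c ON c.oid = a.attrelid
WHERE c.relname = 't'
  AND a.attnum > 0
  AND NOT a.attisdropped
ORDER BY a.attnum;
```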
Improved error handling
As any SRE will tell you, every system works differently in both coarse and subtle ways. Unsurprisingly, this is the case with Kafka and PostgreSQL and how they expect you to consume from them.
In Kafka, the topic is the smallest atomic logical unit of consumption––these are, in some way, akin to PostgreSQL’s tables. You might have topics for users, orders, etc. Materialize, as a Kafka consumer, consumes topics, and if there is an error, it is an error in consuming that topic––so we only need one error output per topic. And because each Kafka source in Materialize consumes only one topic, we also only need one error output per Kafka source.
However, the taxonomy of PostgreSQL’s replicated objects differs: instead of consuming individual tables (which would be akin to the strategy in consuming data from Kafka), you instead consume from PG publications, which represent a set of tables. It is an exercise left to the consumer to then demultiplex the replication stream into its constituent tables.
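For reference, a publication is just a named set of tables on the PostgreSQL side (standard PostgreSQL syntax; the table names here are placeholders), and every consumer of it sees one interleaved stream of their changes:

```sql
-- All changes to these tables flow through a single replication stream.
CREATE PUBLICATION mz_source FOR TABLE users, orders, order_items;
```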
Because we built our Kafka sources first, though, the internal machinery we had in place to surface errors supported only one error collection per source. For PG, this meant that if we encountered an error for any table in the publication, we had to emit an error that would wedge the entire source.
Given that we were initially very eager to produce errors in the face of schema changes, Postgres sources were a little too easy to put into an inoperable state.
To solve this, we needed to start generating error collections per table. Fortunately, because they are built to process streams of data, Timely and Differential provide the means to arbitrarily demultiplex data given some key; this lets us send `(table_id, error)` and ensure that only the proper table’s error collection receives the error.
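The practical upshot, reusing the hypothetical tables from earlier: an error in one table no longer poisons its neighbors.

```sql
-- a_table hit an incompatible schema change and is in an errored state:
SELECT count(*) FROM a_table;  -- surfaces that table's error
-- ...but the other tables in the same Postgres source keep working:
SELECT count(*) FROM b_table;
SELECT count(*) FROM c_table;
```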
Adding and removing tables
This feature has yet to merge, but we’re far enough along with the design and foundational refactoring that we’re confident we can give a sneak peek.
Tip: As of July 2023, adding and removing tables without impacting other tables in a Postgres source is possible with the `ALTER SOURCE ... ADD SUBSOURCE` and `ALTER SOURCE ... DROP SUBSOURCE` syntax. For more information, see the `ALTER SOURCE` documentation.
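A sketch of that syntax (table names are placeholders; see the `ALTER SOURCE` documentation for the full details):

```sql
-- Start ingesting a table that wasn't part of the original source...
ALTER SOURCE pg_source ADD SUBSOURCE new_table;

-- ...or stop ingesting one, without touching the rest of the source.
ALTER SOURCE pg_source DROP SUBSOURCE old_table;
```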
As we mentioned before, the thing you’re consuming when ingesting data from PG is a publication, but really a publication is just a mechanism to filter entries in the PG write-ahead log (or WAL). Because each Postgres source consumes from one publication, this means we only kept one record of where we were in the WAL.
This poses a problem because we don’t have a mechanism by which to add another table to those we want to ingest: we only understand our current position in the WAL.
This shortcoming is solved straightforwardly: track the WAL position for each table. If we encounter a table whose WAL position is 0, we know it needs to be snapshotted to be brought up to the same state as the other tables.
Not quite that simple, though…
The idea of taking a snapshot of a table is great, but we need to be able to correlate that snapshot with some state of the WAL (known as its Log Sequence Number or LSN). PostgreSQL includes a mechanism to accomplish this by opening a temporary replication slot inside of a transaction, which provides the LSN at which the snapshot “ends” and the replication slot will begin serving data.
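The shape of that mechanism, heavily abbreviated: on a replication connection, the replication protocol’s `CREATE_REPLICATION_SLOT` command can hand its snapshot to the surrounding transaction and reports the consistent LSN at which the slot will begin serving data. (The exact spelling of the snapshot option differs across PostgreSQL versions; this sketch uses the pre-15 form, and the slot and table names are placeholders.)

```sql
BEGIN ISOLATION LEVEL REPEATABLE READ;
-- Must be the first command in the transaction; it reports the slot's
-- consistent_point LSN, and USE_SNAPSHOT lets this transaction read
-- under the snapshot the slot was created with.
CREATE_REPLICATION_SLOT mz_snapshot_slot TEMPORARY LOGICAL pgoutput USE_SNAPSHOT;
-- Read the new table's current contents under that snapshot.
SELECT * FROM new_table;
COMMIT;
```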
However, the snapshot’s consistent LSN will almost certainly be further into the WAL than the primary source and the rest of the tables––and we need the new tables to be at exactly the same location in the WAL as everything else.
One of the other folks on the storage team, Petros, had the insight that we can:
- Begin reading the primary replication stream (not the temporary one we used for the snapshot)
- For any data we read from that stream that belongs to the table we snapshotted, emit a retraction against it
- Commit the snapshot to Materialize at the primary replication stream’s LSN
We call this process “rewinding,” and it gives us all of the flexibility we need to align any collection of tables to the primary replication slot’s position in the WAL.
Onward
This is a survey of the changes we’ve made, but there are others. For example, we’ve increased the throughput of PG sources generally and are doing work to increase it further still.
If you’d like to experience the power of materialized views that are continuously and efficiently updated as your data changes, you can get immediate access to our platform with a free 14-day trial here, or you can get in touch with our field engineering team to get a demo and talk through your use case here.