Making Iceberg Work for Operational Data

Apache Iceberg has become the de facto open table format for analytics — it's what Snowflake, Databricks, and AWS S3 Tables all converged on. Write Parquet files to object storage, track them with some JSON metadata, and every analytics engine can read the table.
But Iceberg was designed for batch ETL jobs that run periodically and write big, consolidated files: it wants big, infrequent commits. Operational data, by contrast, changes continuously.
Materialize is a live context engine. Think Postgres, but the views update themselves as the underlying data changes. Our users build on it for operational workloads that demand freshness, but the same data also needs to reach analytics tools like Snowflake or Spark. Iceberg gives users a way to compute once and serve both — but only if you can get operational semantics into a format that wasn't designed for them, without paying the memory and latency costs that usually come with batching.
To that end, we’ve added a new Iceberg sink that allows you to deliver data from Materialize to your data warehouse or data lake, providing a true Kappa architecture.
If you’d like to get started right away, visit our docs here. But if you’re curious, we’ll walk you through how we get operational data into Iceberg without the memory and latency costs of batching.
How Materialize Thinks About Consistency
Operational data is highly mutable. Rows get inserted, updated, and deleted constantly. Materialize assigns every change a virtual timestamp. Not wall-clock time, but a counter that advances as the system processes data. The timestamp establishes a total ordering over all mutations and defines transaction boundaries. Changes sharing a timestamp are part of the same transaction, even across tables.
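As a mental model (our names, not Materialize's actual API), you can picture the change stream as (row, timestamp, diff) triples, where changes sharing a timestamp form one transaction:

```python
# Illustrative model of the update stream, not the real API.
# Each change is (row, timestamp, diff): diff +1 is an insert, -1 a retraction.
changes = [
    (("orders", 1001), 42, +1),        # insert at virtual time 42
    (("inventory", "sku-7"), 42, -1),  # retraction in the same transaction
    (("orders", 1002), 43, +1),        # a later transaction
]

def transactions(changes):
    """Group changes by virtual timestamp: one group per transaction,
    even when the changes span multiple tables."""
    txns = {}
    for row, ts, diff in changes:
        txns.setdefault(ts, []).append((row, diff))
    return txns
```

Because the timestamp is a total order, any downstream consumer that processes updates timestamp-by-timestamp sees whole transactions, never fragments of them.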
The system tracks progress with a frontier: everything before it is complete and immutable; everything after it might still change. Frontiers advance continuously, roughly once per second, even when no data changes. “Nothing happened at this moment” is itself information downstream operators need. If the frontier stops advancing, they can’t distinguish idle from slow.
Changes at the same timestamp consolidate. If a row is inserted and deleted at timestamp T, they cancel out. Downstream never sees the row. Any range of timestamps is therefore a consistent snapshot. Every transaction within the range is fully included, none are partial.
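Consolidation is then just summing diffs per row and timestamp and dropping anything that nets to zero; a minimal sketch:

```python
from collections import Counter

def consolidate(updates):
    """Sum diffs per (row, timestamp); anything that nets to zero vanishes,
    so downstream never sees a row inserted and deleted at the same time."""
    totals = Counter()
    for row, ts, diff in updates:
        totals[(row, ts)] += diff
    return [(row, ts, d) for (row, ts), d in totals.items() if d != 0]
```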
The Naive Approach
The obvious way to write live data to Iceberg is:
- Buffer rows in memory
- Wait for your commit interval (say, 10 seconds)
- Consolidate everything (cancel out insert/delete pairs)
- Write Parquet files
- Commit to Iceberg
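Sketched in Python (all three callables here are hypothetical stand-ins, not real APIs), one cycle of that naive loop looks like this:

```python
def naive_batch_cycle(poll, write_parquet, commit_to_iceberg):
    """One cycle of the naive sink. poll yields (row, diff) updates
    for the whole commit window; the other two are write/commit stand-ins."""
    buffer = []
    for update in poll():      # memory grows for the entire window...
        buffer.append(update)
    # Consolidate: cancel out (row, +1) / (row, -1) pairs before writing.
    net = {}
    for row, diff in buffer:
        net[row] = net.get(row, 0) + diff
    live = [row for row, d in net.items() if d > 0]
    # ...then all Parquet encoding and the commit happen in a burst at the end.
    commit_to_iceberg(write_parquet(live))
    return live
```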
Why batch at all? Iceberg commits aren’t free. Each commit writes files to object storage, and too many small commits mean bloated metadata and slow queries. While your Iceberg service may compact the data for you, all those small writes can make your bill expensive.
The consolidation step is important. If a row was inserted at T1 and deleted at T2 within the batch, buffering lets you cancel them out before writing anything. The downstream system never sees the row existed.
This works, but it has problems. If you’re ingesting high-volume data, you’re holding potentially gigabytes in memory waiting for that window to close. You’re also not doing any useful work during that window—all your Parquet encoding and S3 uploads happen in a burst at the end.
With a 10-second window, maybe this is fine. But what about 30 minutes? What about workloads where you want larger commits to reduce Iceberg metadata overhead? The memory cost starts to hurt. A lot.
Minting Batch Descriptions Ahead of Time
A batch is just a time range. A lower bound (inclusive) and an upper bound (exclusive). Any change with a timestamp in that range belongs to that batch. Instead of waiting for a batch window to close before we know its boundaries, we mint batch descriptions ahead of time.
The sink maintains multiple concurrent batch descriptions, each covering the next interval of logical time. These are broadcast to all workers so that when a row arrives, every worker independently knows which batch it belongs to. No coordination needed.
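A minimal sketch of the routing, with names of our own choosing: every worker holds the same pre-minted list of half-open time ranges and can place any timestamp without talking to its peers.

```python
class BatchDescription:
    """A pre-minted batch: a half-open range [lower, upper) of virtual time."""
    def __init__(self, lower, upper):
        self.lower, self.upper = lower, upper

    def contains(self, ts):
        return self.lower <= ts < self.upper

def route(batches, ts):
    """Every worker runs this same routing logic independently."""
    for batch in batches:
        if batch.contains(ts):
            return batch
    raise ValueError(f"no pre-minted batch covers timestamp {ts}")

# Three concurrent batches covering the next 30s of logical time:
batches = [BatchDescription(0, 10), BatchDescription(10, 20), BatchDescription(20, 30)]
```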
When a row shows up at time T0+5s, we don’t need to wait for the batch to “close.” Workers start streaming to S3 immediately. In practice, there are small fixed-size buffers to avoid drip-feeding individual rows, but memory usage stays bounded no matter how large the dataset. We’re never holding the entire batch window’s worth of data.
Because batch boundaries align with timestamp boundaries, each Iceberg commit is a transactionally consistent snapshot. The sink preserves single-table transaction semantics out of the box.
Why might rows arrive out of timestamp order? Different workers process different partitions of the data, and the system doesn’t enforce global ordering. A worker might receive a change at T0+15s before another worker finishes processing T0+5s. The batch descriptions let each worker independently route rows to the right batch without coordination.
As time progresses and the oldest batch’s upper boundary passes - meaning the frontier has advanced past it, so no more data can arrive for that batch - we retire it and mint a new one at the end.
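The retire-and-mint step can be sketched as a sliding window over (lower, upper) pairs (an illustrative model, not the real code):

```python
from collections import deque

def advance(batches, frontier, width=10):
    """Once the frontier passes a batch's upper bound, no more data can
    arrive for it: retire it and mint a fresh batch at the end of the window."""
    while batches and batches[0][1] <= frontier:
        batches.popleft()                                  # retire the oldest
        last_upper = batches[-1][1]
        batches.append((last_upper, last_upper + width))   # mint a new one
    return batches
```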
The system maintains a sliding window of batches, always looking ahead. Batch boundaries are metadata you can compute ahead of time. You don’t need to wait for data to arrive to know where it belongs.
The Delete Problem
So far so good for inserts. But Materialize handles full CDC semantics. When a row is updated in a source database, Materialize sees that as a retraction of the old value and an insertion of the new value. Deletes are retractions with no corresponding insert.
This is where Iceberg’s batch-oriented design creates friction.
Consider this sequence within a single 10-second batch:
- Insert row with key=A at timestamp T1
- We write it to Parquet, upload to S3
- Delete row with key=A at timestamp T2
- ???
The row is already in S3. We can’t un-write it. Iceberg’s answer is delete files. Separate Parquet files that say “ignore these rows when reading.” There are two types:
Equality deletes: “If you see a row matching this primary key, ignore it.” Simple to write, expensive to read. Every row gets compared against the delete list.
Position deletes: “In file X at position Y, ignore that row.” Extremely cheap to read—it’s just a bitmask. But you need to know exactly which file and position the row is in.
For rows written within the current batch, we do know their position. We keep a hash map of the primary key to file position for everything we’ve written in the current batch.
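A sketch of that bookkeeping, with names of our own invention: the tracker remembers file positions for the current batch only, and falls back to an equality delete for anything older.

```python
class BatchDeleteTracker:
    """Maps primary key -> (data file, row position) for rows written in the
    current batch, so in-batch deletes can use cheap position deletes.
    Illustrative names, not the real implementation."""
    def __init__(self):
        self.positions = {}

    def record_insert(self, key, data_file, position):
        self.positions[key] = (data_file, position)

    def record_delete(self, key):
        if key in self.positions:
            # Row was written in this batch: we know exactly where it lives.
            data_file, pos = self.positions.pop(key)
            return ("position_delete", data_file, pos)
        # Row came from an earlier batch: expensive equality delete.
        return ("equality_delete", key)
```

At the end of each batch the map is dropped, so the state held in memory is proportional to the keys written in the current batch, not the full dataset.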
If a delete arrives for a key in the map, we emit a position delete (cheap) and remove it from the map. If the key isn’t in the map, it was written in a previous batch - we fall back to an equality delete (expensive).
The only state we keep in memory during a batch is primary keys and their positions. Not the full row data. For workloads where deletes hit recently-inserted data, we almost never pay the equality delete cost.
Could we avoid within-batch deletes entirely by buffering and consolidating before writing? Yes, but we’d be back to holding everything in memory. We chose to let the data flow and trust the downstream system to handle it. Data warehouses are built for this; churning through large data volumes is what they do.
RisingWave wrote a great post on this problem and arrived at the same trick we use—worth reading for a deeper dive on the tradeoffs.
Recovery Without External State
When the sink crashes and restarts, or Materialize upgrades to a new version, it needs to know where to resume. The typical answer is “check some external database” or “look at Kafka offsets.” We store progress directly in Iceberg snapshot properties instead.
Every commit includes three pieces of metadata stored in the snapshot’s summary properties—a key-value map Iceberg preserves with each snapshot.
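For illustration, a commit’s summary map might look like this (the three keys are the ones described below; the values here are made up):

```python
# Illustrative snapshot summary properties; values are invented examples.
summary = {
    "mz-frontier": "1759930000000",  # virtual-timestamp frontier at commit time
    "mz-sink-version": "4",          # monotonically increasing, used for fencing
    "mz-sink-id": "u42",             # identifies which sink wrote the snapshot
}
```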
mz-frontier records the timestamp frontier at commit time. Everything before it is in Iceberg. mz-sink-version is a version number for fencing. mz-sink-id identifies which sink wrote the snapshot.
On startup, we scan the table’s snapshots from newest to oldest, looking for our metadata. External engines like S3 Tables or Spark might have compacted the table, creating snapshots with operation="replace" that don’t have our metadata. We skip those and keep looking.
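The recovery scan can be sketched like so (an illustrative model in which each snapshot is a dict carrying its summary property map):

```python
def find_resume_frontier(snapshots, our_version):
    """Scan snapshots newest-to-oldest for our metadata. Snapshots created by
    external compaction carry no mz-* keys and are skipped. Illustrative
    sketch, not the real implementation."""
    for snap in snapshots:
        props = snap["summary"]
        if "mz-frontier" not in props:
            continue  # e.g. a compaction snapshot from S3 Tables or Spark
        if int(props["mz-sink-version"]) > our_version:
            raise RuntimeError("fenced out: a newer sink version has taken over")
        return int(props["mz-frontier"])
    return None  # fresh table: no prior progress to resume from
```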
Once we find our last committed frontier, we resume from there.
If we find a higher mz-sink-version than ours, it means a newer version of the sink has already taken over. We’ve been fenced out—time to shut down and let the new one run.
Iceberg itself is the coordination primitive. No ZooKeeper, no separate metadata store. The destination system’s own metadata becomes the coordination layer.
The Empty Snapshot Problem
Frontiers advance continuously — roughly once per second — even when no data is changing. To track progress, we commit empty snapshots just to update the mz-frontier metadata.
That’s roughly 3,600 empty snapshots per hour. Each one is tiny — just a JSON pointer and our three metadata fields. S3 Tables and other engines will eventually compact them away via their maintenance jobs. But it still feels wrong.
Iceberg’s spec has a table-level metadata field (properties in the table metadata JSON) that seems designed for exactly this use case. But the documentation strongly implies you shouldn’t update it frequently. There are warnings about concurrent modification and no clear guidance on what “frequently” means.
So we're appending empty snapshots. It works. It's not elegant.
I want to benchmark the actual overhead and potentially push back on the spec authors. If you've run into this same problem, or if you know the history behind the metadata field restrictions, I'd love to hear about it.
Multi-Table Transactions
Everything above gives us single-table consistency. Each Iceberg commit is a consistent snapshot of one table. But operational data doesn’t live in one table.
Consider an e-commerce system. An order is placed, inventory decreases, a shipment record is created. In the source database, these happen in the same transaction. Materialize preserves that. Multiple materialized views that derive from the same source see a consistent snapshot of the world, because they share the same logical timestamp. If views A and B both see changes at timestamp T, they are guaranteed to reflect the same transaction.
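That guarantee can be modeled in a few lines (illustrative data and names, not Materialize’s API): reading any two views at the same virtual timestamp always lands on the same transaction boundary.

```python
def read_at(view, ts):
    """A consistent read of a view: every insertion at or before virtual
    time ts. Rows are (payload, timestamp, diff); illustrative model only."""
    return [payload for payload, t, diff in view if t <= ts and diff == +1]

# One source transaction at virtual time 42 touches both views:
orders = [({"order_id": 1001, "status": "placed"}, 42, +1)]
shipments = [({"order_id": 1001, "carrier": "example"}, 42, +1)]
```

Read both views at time 42 and you see the whole transaction; read both at 41 and you see none of it. There is no timestamp at which you can observe the order without its shipment.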
The Iceberg spec doesn’t support multi-table transactional commits. Each table gets its own independent commit. A reader querying orders and shipments at the same moment might see the order but not the corresponding shipment, even though Materialize had both at the same timestamp. The consistency we maintained all the way through the pipeline breaks at the last mile.
This isn’t a hypothetical problem. Any system exporting CDC data to multiple Iceberg tables faces it. The standard workarounds are to query with enough delay that everything has probably landed, or to collapse everything into one big table. Both work until they don’t.
We’ve started conversations on the Iceberg mailing list about adding this capability. This is the kind of thing that becomes tractable once you have an operational system that tracks logical time across its entire pipeline.

