Materialize is an incremental view maintenance engine, one which takes your SQL queries expressed as views and continually maintains them as your data change. Surely there are a lot of ways one could do this, ranging from the very naïve (just recompute from scratch) to the more sophisticated end of the spectrum (what we do).

Today we’ll walk through what I think is a great example of where the “sophistication” is important: grouping and reduction operations.

You probably know several great ways to perform grouping and reduction over a static collection of data. This post is about how they may need to change if you want to quickly respond to additions to or deletions from the input data. In particular, we’ll need to consider some trade-offs to improve the latency and reduce the standing memory footprint.

By the end of it, we’ll have some techniques that allow us to maintain aggregations over massive datasets with a surprisingly (to me) small amount of memory.

Grouping and Reduction

Let’s take a relatively simple query from a few posts back, on aggregating up NYC taxi data from 2018.

sql
SELECT
    passenger_count,
    MIN(fare_amount),
    MAX(fare_amount)
FROM
    tripdata
GROUP BY
    passenger_count;

This query produces the minimum and maximum fare_amount for each distinct passenger_count.

This example has a relatively straightforward structure:

  • The FROM clause tells us where we get our data from (another collection called tripdata).
  • The GROUP BY clause contains the “key” we should use for each record coming from that source.
  • The SELECT clause tells us how we should distill down each of those groups. Each selected field needs to be either from the grouping key, or an aggregate over fields that need not be in the key.

The sorts of results you get from some of the taxi data look like this:

materialize=> SELECT passenger_count, MIN(fare_amount), MAX(fare_amount)
materialize-> FROM tripdata
materialize-> GROUP BY passenger_count;
 passenger_count | MIN  | MAX
-----------------+------+------
                 |      |
               0 |  -16 |  557
               1 | -450 | 8016
               2 | -198 |  700
               3 | -100 |  499
               4 |  -75 |  888
               5 |  -57 |  237
               6 |  -52 |  266
               7 |    0 |   78
               8 |    0 |   88
               9 |    5 |   98
(11 rows)

Time: 1741.500 ms (00:01.742)
materialize=>

There are some interesting questions about negative fares, and they get more exciting the more data you look at. We aren’t going to worry about the specific content of the data though.

How should we implement this query, and queries like it, in a way that allows us to efficiently maintain the results as the underlying data change? We’ll walk through three progressively more clever approaches, calling out the potential performance pitfalls you may not have known would exist. For each, we’ll want to maintain a small memory footprint, while providing high throughput and low latency updates, and any time we screw that up it’s a bug.

Approach 0: Implementation in analytic processors

Let’s start with a rough description of how you might perform this aggregation in a standard analytic processor. I am going to oversimplify substantially, and I apologize to the various people whose work has been to make such a thing smarter and more efficient. This being the first in the list doesn’t mean it is least sophisticated, just that it is least appropriate for maintaining computations.

To a first approximation, analytic processors are prepared to answer arbitrary aggregation queries by scanning across the full volume of data as quickly as possible. Although they can maintain some pre-computed aggregation (see OLAP cubes), they must still accommodate novel queries for which the answers have not yet been prepared. The simplest way to do this is to actually scan across all of the records, and maintain the aggregates for each key as you go.

As the data are scanned, in-memory aggregations are maintained for each key. For the query above, for each new record one would extract passenger_count and fare_amount and would consider updating the minimum and maximum associated with passenger_count. This in-memory state is likely proportional to the number of keys (unless the source data are already ordered by the key), and as the scan only adds records to the aggregates (no retractions) the state can be relatively simple: just the current values of the aggregates.
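
As a rough sketch of that scan (my own illustration, not any particular engine’s internals), here is a single pass that folds each record into a per-key accumulator; the Trip struct is a made-up stand-in for just the two columns we care about:

rust
use std::collections::HashMap;

// A simplified stand-in for a taxi record: just the columns we care about.
struct Trip {
    passenger_count: Option<i64>,
    fare_amount: f64,
}

// One pass over the data, folding each record into a per-key (min, max) pair.
fn min_max_by_passengers(trips: &[Trip]) -> HashMap<Option<i64>, (f64, f64)> {
    let mut aggs: HashMap<Option<i64>, (f64, f64)> = HashMap::new();
    for trip in trips {
        let entry = aggs
            .entry(trip.passenger_count)
            .or_insert((f64::INFINITY, f64::NEG_INFINITY));
        entry.0 = entry.0.min(trip.fare_amount);
        entry.1 = entry.1.max(trip.fare_amount);
    }
    aggs
}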

Unfortunately, if you want to maintain views over data that change arbitrarily, not just grow unboundedly, we’ll need to be more clever. At the very least, correctly maintaining the minimum or maximum requires keeping around all values we’ve seen, as all other values could plausibly be retracted at any point, forcing us to recall any specific value. It turns out this isn’t the only quirk (try out count(distinct expr) next).
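
To see why, here is a minimal sketch (again my own, not Materialize’s code) of what maintaining just one key’s minimum under retractions demands: a multiset of every fare currently present, kept here in cents so that ordering is exact.

rust
use std::collections::BTreeMap;

// All currently-present fares for one key, as value -> multiplicity.
struct MinState {
    counts: BTreeMap<i64, i64>,
}

impl MinState {
    // Apply an addition (diff = +1) or retraction (diff = -1) of one fare.
    fn update(&mut self, fare_cents: i64, diff: i64) {
        let count = self.counts.entry(fare_cents).or_insert(0);
        *count += diff;
        if *count == 0 {
            self.counts.remove(&fare_cents);
        }
    }
    // The minimum is the smallest value still present, if any.
    fn min(&self) -> Option<i64> {
        self.counts.keys().next().copied()
    }
}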

We’ll want to take a more systematic tour through the ways we might maintain SQL aggregations.

Approach 1: Index all records by key

The first approach we’ll look at is also our most naïve, so bear with me for the moment.

From a collection tripdata, and some key expressions specified in the GROUP BY clause, we could simply group the entire collection by the grouping keys, in this case the passenger_count column. Each of the rows of tripdata would be assigned to one of these groups, by its passenger_count column, and maintained in something like a list.

With records grouped by key, we can apply each of the aggregation functions to its group. For each passenger_count we can scan through the collected records and determine the right output. In our case, the minimum and maximum fare_amount values can be determined in one scan.

This has a pretty easy implementation in differential dataflow:

rust
tripdata
    // Extract a key from each record.
    .map(|record| (key_fn(record), record))
    // Group by key, and reduce records.
    .reduce(|_key, input, output| {
        // Form a record of the aggregates.
        let mut record = Vec::new();
        // Insert aggregates in order.
        for agg_fn in agg_fns {
            record.push(agg_fn(input));
        }
        // Produce the single record as output.
        output.push((record, 1));
    })

Downsides

There are a few downsides here:

  1. First, we are keeping all sorts of information about each record that we don’t end up using. There are seventeen columns in our taxi data, and clearly we only really care about two of them.
  2. Second, the amount of memory required by differential dataflow is proportional to the number of distinct records, even though we aren’t interested in most of the distinctions between these records. Because we keep around columns like the pick-up and drop-off times, we’ll probably have a footprint proportional to the number of input records.
  3. Third, the incremental update story is pretty terrible for large groups. Some of the groups are going to be pretty big (a lot of trips have passenger counts of one or two), and if we add or remove a record, it can take a while to re-do the computation to update the aggregate. This is especially true when we remove a record, where we can’t just look at the new value and the old aggregate.

Approach 2: Index relevant values by key

In our second approach, we’ll distill down the input records into the values required for the aggregation.

Differential dataflow maintains collections of distinct records, along with their multiplicities. The taxi records contain lots of distinguishing data, like tpep_pickup_datetime and tpep_dropoff_datetime, a pair that plausibly identifies records uniquely and so inflates the apparent number of distinct records. By reducing the records to the necessary columns we also reduce the number of distinct records, which results in a more compact memory footprint, lower recomputation time, and generally higher throughput.

To do this, we’ll need to break apart what was called agg_fn up above into two parts: 1. the aggregation itself (e.g. MIN and MAX), and 2. the value that should be fed into the aggregation (e.g. fare_amount).

This approach gives us a slightly modified version of the differential dataflow code up above, where we first transform the data to extract the relevant values, and then perform the reduction:

rust
tripdata
    // Extract relevant values from each record.
    .map(|record| (
        key_fn(record),
        agg_vals.iter().map(|val| val(record)).collect::<Vec<_>>(),
    ))
    // Group by key, and reduce values.
    .reduce(|_key, input, output| {
        // Form a record of the aggregates.
        let mut record = Vec::new();
        // Insert aggregates in order.
        for agg_fn in agg_fns {
            record.push(agg_fn(input));
        }
        // Produce the single record as output.
        output.push((record, 1));
    })

This can result in a substantial pre-reduction in the amount of data differential dataflow needs to maintain, without much of a complexity burden. In the example of our taxi data there are only 3,105 distinct (passenger_count, fare_amount) pairs, which means that across all keys we only have that many records to keep track of. That’s a substantial reduction from the 102 million input records.

This sounds pretty good! With so few distinct values, what could possibly go wrong?

Downsides

Well, there are still three problems with this strategy (though we used it for quite a while!). To draw them out, let’s use a more complicated query.

sql
SELECT
    passenger_count,
    MIN(fare_amount),
    MAX(fare_amount),
    COUNT(DISTINCT trip_distance)
FROM
    tripdata
GROUP BY
    passenger_count;

This isn’t a very different query, but it calls out some important distinctions!

First, we have a new field trip_distance, and this increases the number of distinct records we’ll have to maintain: each distinct (passenger_count, fare_amount, trip_distance) triple now needs to be recorded. On the full 102 million rows there are 31,833 distinct triples, up from 3,105 distinct pairs without trip_distance. We would like to avoid multiplicative growth as we add aggregates.

Second, keywords like DISTINCT should only result in recomputation when the distinct set of values changes. We haven’t presented a good way to do that other than to re-evaluate some “distinct” operation over the values whenever they change, and if there are thousands to millions of values that’s a bit of a problem. This is a missed opportunity, which also blocks some bonus optimizations we’ll throw in once we fix it.

Third, the incremental update costs for MIN and MAX are still higher than we would like. When passenger_count is one, there are 11,072 distinct pairs of fare_amount and trip_distance, and if each time we experience a change we’d have to re-maximize over 11k elements, we are going to have both high latency and low throughput. That pain increases as we reduce over larger groups; when Arjun wanted to track the most prolific Wikipedia editors (maximizing without a key over all editors) materialized effectively ground to a halt.

Approach 3: Deconstruct and Reconstitute Reductions

Our third attempt aims to maximize the robustness of our implementations, to make it increasingly unlikely that you will experience increased latencies or reduced throughputs by virtue of the structure of your data. This description represents where we are at Materialize at the moment.

Factoring reductions

As we have seen in the examples above, SQL makes it pretty easy to have multiple reductions for each GROUP BY statement. Also as we saw, maintaining the cross-product of values for each of the reductions can substantially increase the memory footprint of the reduction dataflow.

We’ll address this by factoring each reduction into independent differential dataflows for each aggregate, effectively turning the single query

sql
SELECT
    passenger_count,
    MIN(fare_amount),
    MAX(fare_amount),
    COUNT(DISTINCT trip_distance)
FROM
    tripdata
GROUP BY
    passenger_count;

into three query fragments:

sql
SELECT
    passenger_count,
    MIN(fare_amount)
..

SELECT
    passenger_count,
    MAX(fare_amount)
..

SELECT
    passenger_count,
    COUNT(DISTINCT trip_distance)
..

As you can see, there is some redundancy here, and we’ll almost certainly tidy that up in the future.

What this factoring does is ensure that each additional aggregate introduces an additive incremental cost, rather than a multiplicative incremental cost. Above, we only need to maintain distinct (passenger_count, fare_amount) and (passenger_count, trip_distance) pairs, rather than distinct triples of all three.

Each aggregate is rendered independently, roughly following the structure we’ve described above (though we will modify it below). The aggregations are then grouped in a final differential reduce operator, which collects and arranges the individual aggregates into one record for each key.
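
As a sketch of that final step, in the same loose pseudocode style as the fragments above (min_frag, max_frag, and count_frag are hypothetical per-aggregate collections, each keyed by passenger_count): tag each fragment with its position in the SELECT list, concatenate them, and reduce by key to assemble one output record.

rust
// Tag each per-aggregate fragment with its position in the SELECT list.
let tagged =
    min_frag.map(|(key, agg)| (key, (0usize, agg)))
        .concat(&max_frag.map(|(key, agg)| (key, (1usize, agg))))
        .concat(&count_frag.map(|(key, agg)| (key, (2usize, agg))));

// Collect the tagged aggregates into one record per key.
tagged
    .reduce(|_key, input, output| {
        // The (position, aggregate) pairs arrive sorted, so collecting them
        // yields the aggregates in SELECT-list order for this key.
        let record: Vec<_> = input
            .iter()
            .map(|((_pos, agg), _count)| agg.clone())
            .collect();
        output.push((record, 1));
    })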

When the number of aggregates is zero or one, we skip this multiple stage silliness.

Distinctness

Any SQL reduction can have a DISTINCT modifier, with the intended semantics that only the distinct extracted values should be aggregated for each key. The DISTINCT modifier is set on an aggregate-by-aggregate basis, so it is helpful that we have broken the aggregates apart; we can optionally insert a distinct operator for each aggregate:

rust
tripdata
    // Extract relevant values from each record.
    .map(|record| (key_fn(record), agg_val(record)))
    // Optional, if DISTINCT is specified
    .distinct()
    // Group by key, and reduce values.
    .reduce(|_key, input, output| {
        // ...
    })

The main benefit of this construction is that when a record is added or removed, but the set of distinct records does not change, that input update is suppressed early and cheaply. We do not reform any groups, or re-evaluate any aggregation functions. Plus, we no longer need to perform any distinctness logic in the reduce either; this will make aggregations like COUNT(DISTINCT ..) substantially easier.

Hierarchical aggregation

When we look at a query like

sql
SELECT
    passenger_count,
    MIN(fare_amount)
...

we might reasonably worry that there could be lots of distinct values of fare_amount, and that to correctly maintain this minimum under arbitrary modifications to the input collection, we’ll need to both write them all down and possibly consult them all when updates happen (at least, when retractions happen, to determine if the retraction was of the minimum value and if in its absence the minimum changes).

As it happens, there aren’t a lot of distinct fare_amount values here, but that shouldn’t stop us from worrying. We could just as easily ask for the minimum of tpep_pickup_datetime, and there are millions of those in the dataset. That will probably take a fair bit more memory, as we do need to record the values, but we shouldn’t have to rescan all of the data on each change.

One way to get around this is to perform hierarchical aggregation. Rather than perform just one reduce using the grouping key, we can create a sequence of reduce operators whose keys have the form

(key, record.hash() >> shift)

for a shift that increases from 0 up to 64.

This choice of modified key performs the reduction not by key, but by subgroups within key that decrease in number as shift increases. By starting with a small shift we have the opportunity to apply the reduction to groups that are unlikely to include many records, and as we increase shift we get closer and closer to the final grouping by key. However, at no point are the groups especially large; they are repeatedly reduced, and the reduced value fed in to the next reduction.

In materialized we currently increase shift by 4 each round, doing 16 rounds of reduction, none of which has groups of more than 16 elements (except perhaps the very first, with some very unlucky data).

The crucial observation here is that when an input change occurs, we only need to propagate it through 16 layers of updates that involve at most 16 records each. That is not the very fastest way to retire one single update, but it does ensure that all updates perform at most a logarithmically bounded amount of work. Even when that update is to one of 100 million taxi pick-up timestamps.
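
Here is a sketch of that staged reduction for a MIN aggregate, again in the loose pseudocode style used above (hash_of and min_of are hypothetical helpers, and Materialize’s actual rendering differs in its details):

rust
// Start from (key, (hash, value)) pairs, carrying the record hash along.
let mut stage = tripdata
    .map(|record| (key_fn(record), (hash_of(record), agg_val(record))));

// Fifteen intermediate rounds over ever-larger subgroups ...
for shift in (4..64).step_by(4) {
    stage = stage
        // Group by (key, a shrinking prefix of the hash) ...
        .map(move |(key, (hash, val))| ((key, hash >> shift), (hash, val)))
        // ... keep only the minimum within each small group ...
        .reduce(|_subkey, input, output| output.push((min_of(input), 1)))
        // ... and restore the original key for the next round.
        .map(|((key, _subgroup), pair)| (key, pair));
}

// ... then a final round grouped by the key alone (16 reduces in total).
let minimums = stage
    .reduce(|_key, input, output| output.push((min_of(input), 1)))
    .map(|(key, (_hash, min))| (key, min));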

Bonus: In-place aggregations

A fair number of SQL aggregations end up summing things. Even when they say COUNT or AVG or STDDEV, these are each really just flavors of SUM. Even ANY or ALL (which underlie EXISTS and NOT EXISTS) can be written as SUM. Fortunately, differential dataflow knows how to efficiently maintain the sum of things in-place, without maintaining the distinct values that lead to the sum. This results in a very lean memory profile of just a few counts for each key.

Why “just a few counts” rather than “one count”, you ask?

It turns out that SQL has some delightful quirks surrounding NULL, and we need to distinguish between three qualitatively different states:

  1. No values for this key (no output).
  2. No non-NULL values for this key (NULL output).
  3. Legit values forming a meaningful aggregate (legit output).

To deal with this, we implement each SUM by tracking three aggregate quantities for each key:

  1. The number of values,
  2. The number of non-NULL values,
  3. The aggregate value itself.

Not all of the aggregates need this complexity (COUNT(*) just needs the first number) but we use three just to keep the dataflow logic simple enough. The in-place aggregations don’t result in multiplicative increases in distinct elements, and in the future we should fuse all of these aggregations together, and just extend the list of aggregates from three to “as many as are needed”.
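
As an illustration of that per-key state (a sketch of my own, not Materialize’s actual code), here is a SUM-style accumulator that absorbs additions and retractions in place and maps the three states above onto SQL’s output rules:

rust
// Per-key state for a SUM-like aggregate: just three numbers.
#[derive(Default)]
struct SumState {
    values: i64,    // number of values seen for this key
    non_nulls: i64, // number of non-NULL values
    total: i64,     // running sum of the non-NULL values
}

impl SumState {
    // Fold in an addition (diff = +1) or retraction (diff = -1) of a value.
    fn update(&mut self, value: Option<i64>, diff: i64) {
        self.values += diff;
        if let Some(v) = value {
            self.non_nulls += diff;
            self.total += v * diff;
        }
    }

    // Outer None: emit no row; inner None: emit a NULL; otherwise the sum.
    fn output(&self) -> Option<Option<i64>> {
        match (self.values, self.non_nulls) {
            (0, _) => None,
            (_, 0) => Some(None),
            _ => Some(Some(self.total)),
        }
    }
}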

Concluding thoughts

Writing a robust incremental reduce dataflow is non-trivial. We aren’t done yet.

There are many pitfalls to watch out for, and many opportunities to be missed. Somewhat surprisingly, to me at least, when we do these things well there doesn’t need to be a massive memory footprint. Maintaining the fares-by-passengers dataset takes only 3,105 records. Maintaining the minimum of millions of timestamps involves interacting with only a few hundred records.

Incremental reduce dataflows don’t need to be expensive. But they do require a bit of careful thought. That’s what we’re here for!

If you’d like to check out Materialize, to see just how well it works on weird reductions, you can register for a Materialize account here or swing over to the Materialize repository and see how the magic works!
