Generalizing linear operators in differential dataflow

Differential dataflow contains many operators, some of which are very complicated, but many of which are relatively simple.
The `map` operator applies a transformation to each record. The `filter` operator applies a predicate to each record, and drops records that do not pass it. The `flat_map` operator applies a function to each record that can result in any number of output records.
These three methods are all generalized by the `flat_map` method, which you may be able to see with a bit of head scratching. They each have pretty simple implementations, usually just a few lines of code.
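To make the generalization concrete, here is a minimal sketch using plain Rust iterators as a stand-in for differential collections (the closures and data are illustrative): `map` is `flat_map` producing exactly one output per input, and `filter` is `flat_map` producing zero or one.

```rust
fn main() {
    let names = vec!["frank".to_string(), "david".to_string()];

    // `map` as a special case of `flat_map`: exactly one output per input.
    let mapped: Vec<(String, usize)> = names
        .iter()
        .flat_map(|x| Some((x.clone(), x.len())))
        .collect();

    // `filter` as a special case of `flat_map`: zero or one outputs per input.
    let filtered: Vec<String> = names
        .iter()
        .flat_map(|x| if x.starts_with('f') { Some(x.clone()) } else { None })
        .collect();

    println!("{:?}", mapped);   // [("frank", 5), ("david", 5)]
    println!("{:?}", filtered); // ["frank"]
}
```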
There are a few more linear operators that are slightly more complicated and interesting. It turns out that these operators can be generalized as well, though to something more advanced than `flat_map`.
In this post we'll work through these more complicated, and very interesting, linear operators and generalize them. It turns out they generalize to an interesting restricted form of join, which is great news for fans of relational algebra! We'll wrap up with a discussion of the implications for Materialize, which, unlike differential dataflow, has the ability to fuse and optimize these general linear operators.
Differential dataflow background
Differential dataflow acts on streams of updates, where each individual update is a triple
```
(data, time, diff)
```
The data component describes where the update occurs: which record experiences the change. The time component describes when the update occurs: at which moment should the change take effect. The diff component describes what the update change is: most commonly, an integer describing the copies of the record to insert or delete.
The stream of these triples describes the history of changes to a collection of records. We can transform the update stream into the complete collection at each time. Likewise, we can convert any changing collection to an update stream, just by subtracting from each collection the prior collection.
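As a minimal sketch of that subtraction (the function name and types are mine, for illustration), treat each collection as a multiset of counts and emit one update per changed record:

```rust
use std::collections::HashMap;

// Derive the update triples at `time` by subtracting the previous
// collection's counts from the current collection's counts.
fn diff_collections(
    prev: &HashMap<String, i64>,
    curr: &HashMap<String, i64>,
    time: u64,
) -> Vec<(String, u64, i64)> {
    let mut updates = Vec::new();
    for (data, &count) in curr {
        let delta = count - prev.get(data).copied().unwrap_or(0);
        if delta != 0 {
            updates.push((data.clone(), time, delta));
        }
    }
    for (data, &count) in prev {
        if !curr.contains_key(data) {
            updates.push((data.clone(), time, -count));
        }
    }
    updates
}
```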
For example, we might imagine a collection of names evolving from an initially empty set to insert and remove various names:
```
("frank", 1, +1)
("frank", 2, +1)
("david", 2, +1)
("frank", 3, -1)
("frank", 4, -1)
```
This collection starts empty, adds "frank", adds another "frank" and a "david", and then removes "frank" twice. The update stream tells us enough to reconstruct the collection at any time, but it is much more concise.
Differential dataflow operators act on these streams of updates. Their job is to transform input update streams into new output update streams that describe some new changing collection. The `map` operator takes the update stream for one collection and produces the update stream for the collection in which each record has been subjected to the map's transformation. The `filter` operator takes the update stream for one collection and produces the update stream for the subset of records that satisfy the predicate. The `join` operator takes the update streams for two collections and produces the update stream for the collection that pairs up records with matching keys.
For example, the operator `map(|x| (x, x.len()))`, which appends the length of each name, should transform the above collection of names to:
```
(("frank", 5), 1, +1)
(("frank", 5), 2, +1)
(("david", 5), 2, +1)
(("frank", 5), 3, -1)
(("frank", 5), 4, -1)
```
You can determine this by thinking through what the output collection should look like at each time, and noticing that it changes at the same moments that the input collection changes.
In each case, differential dataflow operators should behave as if they were continually re-applying some simple logic to a static collection of data, but instead they act on update streams, changes over time, and produce the corresponding output update streams.
Linear operators
Some of our operators have the mathematical property of "linearity". Specifically,
```
operator(a + b) = operator(a) + operator(b)
operator(c * a) = c * operator(a)
```
Linearity means that the operator can be applied record by record if we want.
Let's recall the example of the `map(|x| (x, x.len()))` operator. This operator acts independently on each input record. Across a collection of data, it acts on each input record and accumulates the results. The `map` operator is linear, independent of the action it applies to each record. It might even be unnatural to think of applying the operator to a collection, as its logic is only defined on individual records.
The main exciting thing about a linear operator is that it gives us a pretty easy differential dataflow operator implementation. For any single input record `data`, our linear operator applied to the singleton collection `{ data }` produces some output collection `{ datum1, datum2, .., datumk }`. We can implement this operator on update streams by mapping any input update triple `(data, time, diff)` to the output update triples
```
(datum1, time, diff)
(datum2, time, diff)
...
(datumk, time, diff)
```
Notice that one input record may produce multiple output updates, and for a collection of many records we should accumulate all of the output updates.
It turns out this is a correct operator implementation! It's also pretty easy to implement, and keeps our `map`, `filter`, and `flat_map` operators simple and performant.
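As a minimal sketch of why this stays simple (illustrative names and signatures, not differential dataflow's actual API), `map` on update triples just transforms the `data` component and leaves `time` and `diff` untouched:

```rust
// A map-like linear operator over update triples: transform `data`,
// pass `time` and `diff` through unchanged.
fn map_updates<D1, D2, T, R>(
    updates: impl Iterator<Item = (D1, T, R)>,
    f: impl Fn(D1) -> D2,
) -> impl Iterator<Item = (D2, T, R)> {
    updates.map(move |(data, time, diff)| (f(data), time, diff))
}
```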
Each of those differential dataflow operators is also linear itself, on update streams and not just static collections, which you can double check if you like!
Even more linear operators
As it turns out, there are some other interesting operators out there. Linear operators!
Here are two of the interesting ones:
- Differential dataflow has an `explode` operator, which is a too-exciting name for an operator that is allowed to produce `diff` information in its output. The `explode` operator maps each `data` to an iterator over `(value, diff)` pairs, all of which it then produces for each input. The original intent might be that you'd have accumulations `(key, count)` that you might want to turn into `count` copies of `key`. The `explode` operator lets you do this efficiently, without actually producing `count` actual copies of `key` (perhaps `count` is enormous). But the operator is also really interesting because it can produce negative `diff` values, turning a positive record into a negative one (and vice versa). This all checks out mathematically, but it can seem a bit weird, and it is easy to get wrong.
- Materialize has a concept of a "temporal filter" (more on that here) which is able to transform inequality constraints between `data` and `time` into an operator that adjusts `time`. Concretely, if you say that `time` must live between `lower(data)` and `upper(data)`, then the operator can replace each `data` by the updates

  ```
  (data, lower(data), +1)
  (data, upper(data), -1)
  ```

  These updates defer the introduction of `data` until `lower(data)` and retract `data` at `upper(data)`.
The implementations of these two operators are a bit more subtle than the easier linear operators up above. The `explode` operator needs to be sure to multiply the input `diff` with the produced `diff`. The temporal filter operator needs to be sure to take the maximum of the input `time` with those produced by `lower` and `upper`. It also needs to multiply differences, so that the upper bound flips the sign of the input update.
Each of these operators requires care in its implementation, and things are certainly becoming more complicated. It would be great if there weren't as many special cases!
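To see the `explode` subtlety in isolation, here is a minimal sketch (illustrative names and types, not differential dataflow's actual implementation): each produced diff is multiplied by the incoming diff, so a retraction on the input correctly negates all of its outputs.

```rust
// An explode-like linear operator over update triples: `f` produces
// (value, diff) pairs, and each produced diff is multiplied by the
// incoming diff so that retractions propagate correctly.
fn explode_updates<D, V, T, I>(
    updates: impl Iterator<Item = (D, T, i64)>,
    f: impl Fn(&D) -> I,
) -> impl Iterator<Item = (V, T, i64)>
where
    T: Clone,
    I: IntoIterator<Item = (V, i64)>,
{
    updates.flat_map(move |(data, time, diff)| {
        f(&data)
            .into_iter()
            .map(move |(value, diff2)| (value, time.clone(), diff * diff2))
    })
}
```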
All of the linear operators
All of the operators above, and indeed all linear operators, are instances of one most general linear operator.
Let `logic` be any function from a single record `data` to an update stream (let's say "a set of update triples"). Let `LARGE` be the collection containing the sum over all `data` of the collection `data x logic(data)`, where `x` is the Cartesian product. This means `LARGE` contains many records of the form `(data, value)`, where `value` is among the things produced by `logic(data)`. The update stream for `LARGE` contains `((data, value), time, diff)` for each `(value, time, diff)` in `logic(data)`.
The operator that performs an equijoin (on `data`) between its input and `LARGE` is a linear operator. If you project away the `data` component, keeping only the `value` component, you can represent any linear operator through your choice of `logic` (which determines `LARGE`).
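A minimal sketch of `LARGE`'s update stream, following the definitions above (the function name and types are illustrative):

```rust
// The update stream for LARGE: pair each `data` with every update
// that `logic(data)` produces.
fn large_updates<D: Clone, V, T, R>(
    domain: impl IntoIterator<Item = D>,
    logic: impl Fn(&D) -> Vec<(V, T, R)>,
) -> Vec<((D, V), T, R)> {
    domain
        .into_iter()
        .flat_map(|data| {
            logic(&data)
                .into_iter()
                .map(move |(value, time, diff)| ((data.clone(), value), time, diff))
        })
        .collect()
}
```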
The equijoin operator in differential dataflow is not terrible, but it probably isn't obvious how it works. If you have two update streams, each with keys from some common type, say
```
((key, val1), time1, diff1)
((key, val2), time2, diff2)
```
then for any pair of updates that have a key that matches, we produce as output the update
```
((key, (val1, val2)), time1.join(time2), diff1 * diff2)
```
This produces a collection of keyed data with pairs of values, at the least time greater than or equal to both input times (their lattice join), and with a difference that is the product of the input differences. It turns out that these are exactly the updates that describe the key-based matches between the two varying collections.
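As a minimal sketch of that rule for a single pair of updates, with totally ordered `u64` times where the lattice join is just `max` (names and types are illustrative):

```rust
// The update-stream join rule: match on key, take the least upper
// bound of the times, and multiply the diffs.
fn join_rule<K: Eq, V1, V2>(
    (key1, val1, time1, diff1): (K, V1, u64, i64),
    (key2, val2, time2, diff2): (K, V2, u64, i64),
) -> Option<((K, (V1, V2)), u64, i64)> {
    if key1 == key2 {
        Some(((key1, (val1, val2)), time1.max(time2), diff1 * diff2))
    } else {
        None
    }
}
```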
Let's work through some examples. We'll need to assume some "minimal time", which I'll take to be 0.
- `map(f)`: let `logic(data)` produce `{ (f(data), 0, +1) }`. It describes the collection that always contains exactly `f(data)`. If we join a collection of `data` with `LARGE` and retain `value`, we'll get just `f(data)` for each present `data`.
- `filter(p)`: let `logic(data)` produce either `{ (data, 0, +1) }` if `p(data)` is true, or the empty collection otherwise. It describes the collection that always contains either exactly `data` or nothing, based on `p(data)`. If we join a collection of `data` with `LARGE` and retain `value`, we'll get just the present `data` satisfying the predicate.
- `flat_map(f)`: let `logic(data)` produce the set containing `(value, 0, +1)` for each `value` enumerated by `f(data)`. It describes the collection that always contains exactly the collection `f(data)`. If we join a collection of `data` with `LARGE` and retain `value`, we'll get just `f(data)` for each present `data`.
- `explode(f)`: let `logic(data)` produce the set containing `(value, 0, diff)` for each `(value, diff)` enumerated by `f(data)`. It describes the collection that is always defined by the updates `f(data)`. If we join a collection of `data` with `LARGE` and retain `value`, we'll accumulate the updates for the present `data`.
- temporal filters: let `logic(data)` produce `{ (data, lower(data), +1), (data, upper(data), -1) }`. It describes the collection that contains `data` exactly from time `lower(data)` until time `upper(data)`. If we join a collection of `data` with `LARGE` and retain `value`, we'll get just the present `data`, and only from `lower(data)` to `upper(data)`.
In each of these cases, we join our input collection with `LARGE` and then project away `data`. Although perhaps less obvious than we might like, the join implements the correct behavior for the linear operator.
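To make one case concrete, here is a minimal sketch of a temporal filter's `logic` (the bounds are hypothetical, chosen only for illustration):

```rust
// Temporal filter `logic`: introduce `data` at `lower(data)` and
// retract it again at `upper(data)`.
fn temporal_logic(data: u64) -> Vec<(u64, u64, i64)> {
    let lower = data;      // hypothetical: the record becomes valid at `data` ...
    let upper = data + 10; // ... and expires ten ticks later.
    vec![(data, lower, 1), (data, upper, -1)]
}
```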
An implementation
This "general linear operator" has a simple implementation, though one that I find hard to justify verbally without the join analogy. For a timely dataflow stream of (data, time, diff) update triples, we can use timely's flat_map operator to react to each of these triples.
This implementation just follows our statement above about what a differential dataflow join should do, and that the second half of the join is produced by logic.
```rust
stream.flat_map(move |(data, time, diff)|
    // Enumerate the updates `logic` produces for this record.
    logic(data)
        .into_iter()
        .map(move |(data2, time2, diff2)|
            // Merge times with the lattice join; multiply differences.
            (data2, time.join(&time2), diff * diff2)
        )
)
```
For each `data`, we enumerate `logic(data)` and produce new output updates. The updates carry the newly enumerated `data2`, each at the time formed by merging `time` and `time2` with the lattice join operator, and with `diff` and `diff2` merged by multiplication.
You can also check out the (new) operator `join_function` in the differential dataflow repository, where (abridging some of the gory Rust details) it looks something like:
```rust
// Abridged: some generic parameters and trait bounds are condensed.
// `logic` maps each record to an iterator of (data, time, diff) updates.
pub fn join_function<D2, R2, I, L>(
    &self,
    mut logic: L,
) -> Collection<G, D2, <R2 as Multiply<R>>::Output>
where
    G::Timestamp: Lattice,
    D2: Data,
    R2: Multiply<R>,
    I: IntoIterator<Item = (D2, G::Timestamp, R2)>,
    L: FnMut(D) -> I + 'static,
    // ... further bounds on the output difference type ...
{
    self.inner
        .flat_map(move |(data, time, diff)|
            logic(data)
                .into_iter()
                .map(move |(data2, time2, diff2)|
                    (data2, time.join(&time2), diff2.multiply(&diff))
                )
        )
        .as_collection()
}
```
Fusing logic
We've seen just above that these linear operators are defined by `logic`, which maps an individual `data` record to an iterator over update triples. We also know that if we want to, we could put a bunch of `join_function` calls in sequence.
```rust
collection
    .join_function(logic1)
    .join_function(logic2)
    .join_function(logic3)
```
Now, that's fine; it will work correctly and everything. However, it does mean that each operator will explicitly produce its results and hand them to the next operator (these are timely dataflow streams, not Rust iterators).
Wouldn't it be nice if we could just compose these things? Maybe write something like
```rust
collection.join_function(|data|
    logic1(data)
        .followed_by(logic2)
        .followed_by(logic3)
)
```
It turns out this `followed_by` function is just the logic we've seen up above. We can enumerate the argument iterator, and for each element apply `logic` and yield all of the results. It is even the same `flat_map` operator, just defined on a `self` that is an iterator rather than a timely dataflow stream.
```rust
use differential_dataflow::lattice::Lattice;

// One way to write it: a helper trait so `followed_by` is available on
// any iterator of update triples. The same flat_map pattern as the
// dataflow operator, but on iterators rather than streams.
trait FollowedBy<D1, T>: Iterator<Item = (D1, T, i64)> + Sized {
    fn followed_by<D2, I, L>(self, mut logic: L) -> impl Iterator<Item = (D2, T, i64)>
    where
        T: Lattice,
        I: IntoIterator<Item = (D2, T, i64)>,
        L: FnMut(D1) -> I,
    {
        self.flat_map(move |(data, time, diff)| {
            logic(data)
                .into_iter()
                .map(move |(data2, time2, diff2)| (data2, time.join(&time2), diff * diff2))
        })
    }
}

impl<D1, T, It: Iterator<Item = (D1, T, i64)>> FollowedBy<D1, T> for It {}
```
This may look like a relatively minor bit of optimization, and that isn't entirely wrong. What this does for us, though, is put front and center the ability to fuse these operations, which is the first step towards optimizing them. Differential dataflow uses Rust, which compiles through LLVM, which can then optimize the fused logic; that is all great news.
What I'm most interested in is how we can do even more optimization when the operators are expressed declaratively.
Linear operators in Materialize
Materialize is, among many other things, a declarative SQL layer on top of differential dataflow.
By being declarative, Materialize has the ability to restructure the queries it receives. In particular, it is delighted to take stacks of Map, Filter, and Project actions and fuse them together. This is exceedingly helpful because these linear operators can be fused into operators like Join, and even restructured for multiway joins, where they can substantially reduce the volume of data stored and moved around.
However, Materialize stalls out on anything more complicated than the three operations above. Until very recently, it also stalled out on temporal filters, though through some care these can now be fused as well. Unfortunately, they can't yet be fused into a join, but they do unblock fusing other operators.
In addition, Materialize has a great number of special-purpose "table-valued functions" which are used to implement `flat_map`-like behavior. For example, you might type something like
```sql
SELECT *
FROM
    my_data,
    generate_series(1, my_data.count);
```
which produces each row of `my_data` as many times as `my_data.count`, with counters that go up and everything. Here `generate_series` is the table-valued function, and it is even used as a join! It is basically what we are doing up above with `join_function`!
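Viewed through the framework above, `generate_series` is just another choice of `logic`. A minimal sketch (hypothetical names and types, for illustration only):

```rust
// `generate_series(1, count)` as linear-operator `logic`: each input
// row yields one (row, counter) value, introduced at the minimal time.
fn generate_series_logic(
    (row, count): (String, i64),
) -> impl Iterator<Item = ((String, i64), u64, i64)> {
    (1..=count).map(move |i| ((row.clone(), i), 0, 1))
}
```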
Materialize has a few other tricks that end up in similar situations. The `repeat_row` table-valued function can produce negative rows as output, which means it is more `explode` than `flat_map`. The temporal filters mentioned above are grammatically filter expressions, but are really more like table-valued functions. These cases all live outside the framework of Map, Filter, and Project.
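For instance, `repeat_row`'s behavior fits the `explode` shape exactly; a sketch (hypothetical signature, for illustration):

```rust
// `repeat_row(n)` as `logic`: emit the row once with diff `n`, which
// may be negative; this is `explode`, not `flat_map`.
fn repeat_row_logic(row: String, n: i64) -> impl Iterator<Item = (String, u64, i64)> {
    std::iter::once((row, 0, n))
}
```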
So I'm thrilled by the idea that all of these concepts might be unified into one framework. That unified representation could then be optimized, and fused into other operators. For those of you using temporal filters, this would allow them to be better pushed down into joins, and it can reduce their memory footprint substantially in some cases. Internally, some of our CDC format unpacking uses this logic, and jointly optimizing that logic with the SQL you have layered on top of it gives us the ability to unpack and manipulate less data.
All in all, I'm excited that we might end up reducing the number of concepts that we work with, simplifying things at the same time as we open up new doors for performance. Join us on Slack if you're interested in learning more about the inner workings of Materialize, and if this sounds like something you'd like to work on, we're hiring!
