Generalizing linear operators in differential dataflow
Differential dataflow has simple linear operators (`map`, `filter`, `flat_map`) and more complex ones (`explode` and temporal filters). With some thought, we can generalize them all to a restricted form of join.
Differential dataflows contain many operators, some of which are very complicated, but many of which are relatively simple.
The `map` operator applies a transformation to each record. The `filter` operator applies a predicate to each record, and drops records that do not pass it. The `flat_map` operator applies a function to each record that can result in any number of output records.
These three methods are all generalized by the `flat_map` method, which you may be able to see with a bit of head scratching. They each have pretty simple implementations; usually just a few lines of code.
There are a few more linear operators that are slightly more complicated and more interesting. It turns out that these operators can be generalized as well, though to something more advanced than `flat_map`.
In this post we'll work through these more complicated, and very interesting, linear operators and generalize them. It turns out they generalize to an interesting restricted form of join, which is great news for fans of relational algebra! We'll wrap up with a discussion of the implications for Materialize, which unlike differential dataflow has the ability to fuse and optimize these general linear operators.
Differential dataflow background
Differential dataflow acts on streams of updates, where each individual update is a triple `(data, time, diff)`.
The `data` component describes where the update occurs: which record experiences the change. The `time` component describes when the update occurs: at which moment the change should take effect. The `diff` component describes what the change is: most commonly, an integer describing the number of copies of the record to insert or delete.
The stream of these triples describes the history of changes to a collection of records. We can transform the update stream into the complete collection at each time. Likewise, we can convert any changing collection to an update stream, just by subtracting from each collection the prior collection.
For example, we might imagine a collection of names evolving from an initially empty set to insert and remove various names:
("frank", 6, +1)
("frank", 8, +1)
("david", 8, +1)
("frank", 9, -2)
This collection starts empty, adds "frank", adds another "frank" and a "david", and then removes "frank" twice. The update stream tells us enough to reconstruct the collection at any time, but it is much more concise.
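As a minimal sketch of that reconstruction, the function below accumulates the diffs of all updates at or before a given time. It assumes integer times (`u64`) and integer diffs (`i64`); the names `Update`, `collection_at`, and `names` are hypothetical and not part of differential dataflow.

```rust
use std::collections::BTreeMap;

/// An update triple `(data, time, diff)`, assuming integer times and diffs.
type Update<D> = (D, u64, i64);

/// Accumulates all updates at times `<= time` into per-record counts,
/// dropping records whose count nets out to zero.
fn collection_at<D: Ord + Clone>(updates: &[Update<D>], time: u64) -> BTreeMap<D, i64> {
    let mut counts = BTreeMap::new();
    for (data, t, diff) in updates {
        if *t <= time {
            *counts.entry(data.clone()).or_insert(0) += *diff;
        }
    }
    counts.retain(|_, count| *count != 0);
    counts
}

/// The example update stream of names from the text.
fn names() -> Vec<Update<&'static str>> {
    vec![("frank", 6, 1), ("frank", 8, 1), ("david", 8, 1), ("frank", 9, -2)]
}
```

Calling `collection_at(&names(), 8)` accumulates two copies of "frank" and one of "david"; at time 9 only "david" remains.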
Differential dataflow operators act on these streams of updates. Their jobs are to transform their input streams of updates into new output streams of updates that describe some new changing collection. The `map` operator takes the update stream for one collection and produces the update stream for a collection in which each record was subjected to the map's transformation. The `filter` operator takes the update stream for one collection and produces the update stream for the subset of records that satisfy the predicate. The `join` operator takes the update streams for two collections and produces the update stream for the collection that pairs up records with matching keys.
For example, the operator `map(|x| (x, x.len()))`, which appends the length of each name, should transform the above collection of names to:
(("frank", 5), 6, +1)
(("frank", 5), 8, +1)
(("david", 5), 8, +1)
(("frank", 5), 9, -2)
You can determine this by thinking through what the output collection should look like at each time, and noticing that it changes at the same moments that the input collection changes.
In each case, differential dataflow operators should behave as if they were continually reapplying some simple logic to a static collection of data, but instead they act on update streams, changes over time, and produce the corresponding output update streams.
Linear operators
Some of our operators have the mathematical property of "linearity". Specifically,
OP(x + y) = OP(x) + OP(y)
Linearity means that the operator can be applied record by record if we want.
Let's recall the example of the `map(|x| (x, x.len()))` operator. This operator acts independently on each input record. Across a collection of data, it acts on each input record, and accumulates the results. The `map` operator is linear, independent of the action it should apply to each record. It might even be unnatural to think of applying the operator to a collection, as its logic is only defined on individual `data`.
The main exciting thing about a linear operator is that it gives us a pretty easy differential dataflow operator implementation. For any single input record `data`, our linear operator applied to the singleton collection `{ data }` produces some output collection `{ datum1, datum2, .., datumk }`. We can implement this operator on update streams by mapping any input update triple `(data, time, diff)` to the output update triples
(datum1, time, diff)
(datum2, time, diff)
..
(datumk, time, diff)
Notice that one `data` record may produce multiple output updates, and for a collection of many records we should accumulate up all of the output updates.
It turns out this is a correct operator implementation! It's also pretty easy to implement, and keeps our `map`, `filter`, and `flat_map` operators simple and performant.
Each of those differential dataflow operators is also linear itself, on update streams and not just static collections, which you can double check if you like!
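The recipe above can be sketched on plain vectors of update triples. This is an illustration under assumptions, not differential dataflow's actual code: times are `u64`, diffs are `i64`, and the helper names `linear`, `map`, and `filter` are hypothetical free functions.

```rust
/// An update triple `(data, time, diff)`, assuming integer times and diffs.
type Update<D> = (D, u64, i64);

/// Applies a per-record function to an update stream: every output record
/// inherits the time and diff of the input update that produced it.
fn linear<D, D2, I, F>(updates: Vec<Update<D>>, mut per_record: F) -> Vec<Update<D2>>
where
    I: IntoIterator<Item = D2>,
    F: FnMut(D) -> I,
{
    let mut output = Vec::new();
    for (data, time, diff) in updates {
        for datum in per_record(data) {
            output.push((datum, time, diff));
        }
    }
    output
}

/// `map` produces exactly one output record per input record.
fn map<D, D2>(updates: Vec<Update<D>>, mut f: impl FnMut(D) -> D2) -> Vec<Update<D2>> {
    linear(updates, move |data| Some(f(data)))
}

/// `filter` produces either the record itself or nothing.
fn filter<D>(updates: Vec<Update<D>>, mut p: impl FnMut(&D) -> bool) -> Vec<Update<D>> {
    linear(updates, move |data| if p(&data) { Some(data) } else { None })
}
```

Both `map` and `filter` are thin wrappers around the same per-record helper, which is the sense in which `flat_map` generalizes them.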
Even more linear operators
As it turns out, there are some other interesting operators out there. Linear operators!
Here are two of the interesting ones:

Differential dataflow has an `explode` operator, which is a too-exciting name for an operator that is allowed to produce `diff` information in its output. The `explode` operator maps each `data` to an iterator over `(value, diff)` pairs, all of which it then produces for each input. The original intent might be that you'd have accumulations `(key, count)` that you might want to turn into `count` copies of `key`. The `explode` operator would let you do this efficiently, without actually producing `count` actual copies of `key` (perhaps `count` is enormous). But the operator is also really interesting because it can produce negative `diff` values, turning a positive record into a negative (and vice versa). This all checks out mathematically, but it can seem a bit weird, and it is easy to get wrong.
Materialize has a concept of "temporal filter", which is able to transform inequality constraints between `data` and `time` into an operator that adjusts `time`. Concretely, if you say that `time` must live between `lower(data)` and `upper(data)`, then the operator can replace each `data` by the updates
(data, lower(data), +1)
(data, upper(data), -1)
These updates defer the introduction of `data` until `lower(data)` and retract `data` at `upper(data)`.
The implementations of these two operators are a bit more subtle than the easier linear operators up above. The `explode` operator needs to be sure to multiply the input `diff` with the produced `diff`. The temporal filter operator needs to be sure to take the maximum of the input `time` with those produced by `lower` and `upper`. It also needs to multiply differences, so that the upper bounds flip the sign of the input update.
Each of these operations requires care in its implementation, and things are certainly becoming more complicated. It would be great if there weren't as many special cases!
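To make those two subtleties concrete, here is a rough sketch of both operators on plain vectors of update triples. It assumes `u64` times (so "maximum" is ordinary integer `max`) and `i64` diffs; the function names and signatures are hypothetical and are not the real differential dataflow or Materialize implementations.

```rust
/// An update triple `(data, time, diff)`, assuming integer times and diffs.
type Update<D> = (D, u64, i64);

/// `explode`: each record yields `(value, diff)` pairs. The produced diff is
/// multiplied with the input diff, and the time is left unchanged.
fn explode<D, D2, I, F>(updates: Vec<Update<D>>, mut f: F) -> Vec<Update<D2>>
where
    I: IntoIterator<Item = (D2, i64)>,
    F: FnMut(D) -> I,
{
    let mut output = Vec::new();
    for (data, time, diff) in updates {
        for (value, diff2) in f(data) {
            output.push((value, time, diff * diff2));
        }
    }
    output
}

/// Temporal filter: introduce each record at `lower(data)` and retract it at
/// `upper(data)`, but never earlier than the input update's own time.
fn temporal_filter<D: Clone>(
    updates: Vec<Update<D>>,
    lower: impl Fn(&D) -> u64,
    upper: impl Fn(&D) -> u64,
) -> Vec<Update<D>> {
    let mut output = Vec::new();
    for (data, time, diff) in updates {
        let (lo, hi) = (lower(&data), upper(&data));
        output.push((data.clone(), time.max(lo), diff));
        // The upper bound multiplies the input diff by -1.
        output.push((data, time.max(hi), -diff));
    }
    output
}
```

Note that a record arriving after its own upper bound produces an insertion and a retraction at the same time, which cancel when accumulated.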
All of the linear operators
All of the operators above, and indeed all linear operators, are instances of one most general linear operator.
Let `logic` be any function from a single record `data` to an update stream (let's say "a set of update triples"). Let `LARGE` be the collection containing the sum over all `data` of the collection `data x logic(data)`, where `x` is Cartesian product. This means `LARGE` contains many records of the form `(data, value)`, where `value` is among the things produced by `logic(data)`. The update stream for `LARGE` contains `((data, value), time, diff)` for each `(value, time, diff)` in `logic(data)`.
The operator that performs an equijoin (on `data`) between its input and `LARGE` is a linear operator. If you project away the `data` component, keeping only the `value` components, you can represent any linear operator through your choice of `logic` (which determines `LARGE`).
The equijoin operator in differential dataflow is not terrible, but it probably isn't obvious how it works. If you have two update streams, each with keys from some common type, say
input1 = { ((key, value1), time1, diff1) }
input2 = { ((key, value2), time2, diff2) }
then for any pair of updates that have a `key` that matches, we produce as output the update
((key, (value1, value2)), lattice_join(time1, time2), diff1 * diff2)
This produces a collection of keyed data with pairs of values, at the least time greater than or equal to each input time, and with a difference that is the product of input differences. It turns out that these are the updates that produce the key-based matches between the varying collections.
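The join rule above can be sketched as a nested loop over two update streams. This is only an illustration: it assumes totally ordered `u64` times, for which the lattice join is simply `max`, and it ignores the indexing a real implementation would use to avoid comparing every pair. The names `Update` and `join` are hypothetical.

```rust
/// An update triple `(data, time, diff)`, assuming integer times and diffs.
type Update<D> = (D, u64, i64);

/// Equijoin on update streams: for each pair of updates with matching keys,
/// emit the paired values at the joined time with the product of the diffs.
fn join<K: Eq + Clone, V1: Clone, V2: Clone>(
    input1: &[Update<(K, V1)>],
    input2: &[Update<(K, V2)>],
) -> Vec<Update<(K, (V1, V2))>> {
    let mut output = Vec::new();
    for ((key1, value1), time1, diff1) in input1 {
        for ((key2, value2), time2, diff2) in input2 {
            if key1 == key2 {
                output.push((
                    (key1.clone(), (value1.clone(), value2.clone())),
                    (*time1).max(*time2), // lattice join for totally ordered times
                    diff1 * diff2,        // product of the input differences
                ));
            }
        }
    }
    output
}
```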
Let's work through some examples. We'll need to assume some "minimal time", which I'll take to be `0`.

`map(f)`: let `logic(data)` produce `{ (f(data), 0, +1) }`. It describes the collection that always contains exactly `f(data)`. If we join a collection of `data` with `LARGE` and retain `value` we'll get just `f(data)` for present `data`.
`filter(p)`: let `logic(data)` produce either `{ (data, 0, +1) }` if `p(data)` is true, or the empty collection otherwise. It describes the collection that always contains either exactly `data` or is empty, based on `p(data)`. If we join a collection of `data` with `LARGE` and retain `value` we'll get just the present `data` satisfying the predicate.
`flat_map(f)`: let `logic(data)` produce the set containing `(value, 0, +1)` for each `value` enumerated by `f(data)`. It describes the collection that always contains exactly the collection `f(data)`. If we join a collection of `data` with `LARGE` and retain `value` we'll get just `f(data)` for present `data`.
`explode(f)`: let `logic(data)` produce the set containing `(value, 0, diff)` for each `(value, diff)` enumerated by `f(data)`. It describes the collection that is always defined by the updates `f(data)`. If we join a collection of `data` with `LARGE` and retain `value` we'll accumulate the updates for the present `data`.
temporal filters: let `logic(data)` produce `{ (data, lower(data), +1), (data, upper(data), -1) }`. It describes the collection that contains `data` exactly from time `lower(data)` until time `upper(data)`. If we join a collection of `data` with `LARGE` and retain `value` we'll get just the present `data` and only from `lower(data)` to `upper(data)`.
In each of these cases, we join our input collection with `LARGE` and then project away `data`. Although perhaps less obvious than we might like, the join implements the correct behavior for the linear operator.
An implementation
This "general linear operator" has a simple implementation, though one that I find hard to justify verbally without the join analogy. For a timely dataflow stream of `(data, time, diff)` update triples, we can use timely's `flat_map` operator to react to each of these triples.
This implementation just follows our statement above about what a differential dataflow join should do, with the second half of the join produced by `logic`.
// Linear operator on a stream of update triples.
// Parameterized by the function `logic`.
self.flat_map(move |(data, time, diff)|
    logic(data)
        .into_iter()
        .map(move |(data2, time2, diff2)|
            (
                data2,                 // new `data2`
                time.join(&time2),     // joined times
                diff.multiply(&diff2), // multiplied diffs
            )
        )
)
For each `data`, we enumerate `logic(data)`, and produce new output updates. The updates have the newly enumerated data, each at the time that is `time` and `time2` merged by the lattice join operator, and with `diff` and `diff2` merged by multiplication.
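For readers who want to experiment without timely dataflow, the same idea can be sketched on plain vectors. This is a simplified stand-in under stated assumptions: times are `u64` (so the lattice join is `max`), diffs are `i64`, and the example functions `map_logic` and `temporal_logic` are hypothetical instances of the `logic` choices listed in the previous section.

```rust
/// An update triple `(data, time, diff)`, assuming integer times and diffs.
type Update<D> = (D, u64, i64);

/// Std-only sketch of the general linear operator: join each input update
/// against the updates produced by `logic` for its record.
fn join_function<D, D2, I, L>(updates: Vec<Update<D>>, mut logic: L) -> Vec<Update<D2>>
where
    I: IntoIterator<Item = Update<D2>>,
    L: FnMut(D) -> I,
{
    updates
        .into_iter()
        .flat_map(move |(data, time, diff)| {
            logic(data)
                .into_iter()
                .map(move |(data2, time2, diff2)| (data2, time.max(time2), diff * diff2))
        })
        .collect()
}

/// `map(|x| (x, x.len()))` as a `logic`: one output at minimal time 0, diff +1.
fn map_logic(name: &'static str) -> Vec<Update<(&'static str, usize)>> {
    vec![((name, name.len()), 0, 1)]
}

/// A temporal filter as a `logic`, for records that are `(lower, upper)` pairs:
/// present from `lower` until `upper`.
fn temporal_logic(interval: (u64, u64)) -> Vec<Update<(u64, u64)>> {
    vec![(interval, interval.0, 1), (interval, interval.1, -1)]
}
```

An `explode`-style `logic` fits the same shape, for example `|(key, count)| vec![(key, 0, count)]`, which turns a `(key, count)` record into `key` with `count` as its diff.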
You can also check out the (new) operator `join_function` in the differential dataflow repository, where it looks like (with all of the gory Rust details):
/// Joins each record against a collection defined by the function `logic`.
///
/// This method performs what is essentially a join with the collection of records `(x, logic(x))`.
/// Rather than materialize this second relation, `logic` is applied to each record and the appropriate
/// modifications made to the results, namely joining timestamps and multiplying differences.
///
/// # Examples
///
/// ```
/// extern crate timely;
/// extern crate differential_dataflow;
///
/// use differential_dataflow::input::Input;
///
/// fn main() {
///     ::timely::example(|scope| {
///         // creates `x` copies of `2*x` from time `3*x` until `4*x`,
///         // for x from 0 through 9.
///         scope.new_collection_from(0 .. 10isize).1
///              .join_function(|x|
///                  //   data      time      diff
///                  vec![(2*x, (3*x) as u64,  x),
///                       (2*x, (4*x) as u64, -x)]
///               );
///     });
/// }
/// ```
pub fn join_function<D2, R2, I, L>(&self, mut logic: L) -> Collection<G, D2, <R2 as Multiply<R>>::Output>
where
    G::Timestamp: Lattice,
    D2: Data,
    R2: Semigroup+Multiply<R>,
    <R2 as Multiply<R>>::Output: Data+Semigroup,
    I: IntoIterator<Item=(D2,G::Timestamp,R2)>,
    L: FnMut(D)->I+'static,
{
    self.inner
        .flat_map(move |(x, t, d)| logic(x).into_iter().map(move |(x,t2,d2)| (x, t.join(&t2), d2.multiply(&d))))
        .as_collection()
}
Fusing logic
We've seen just above that these linear operators are defined by `logic`. Its type is a function from individual `data` records to an iterator over update triples. We also know that if we want to, we could put a bunch of `join_function` calls in sequence.
// apply a sequence of linear operators.
my_collection
.join_function(logic1)
.join_function(logic2)
.join_function(logic3)
Now, that's fine; it will work correctly and everything. However, it does mean that each operator will explicitly produce its results and hand them to the next operator (these are timely dataflow streams, not Rust iterators).
Wouldn't it be nice if we could just compose these things? Maybe write something like
// apply a sequence of linear operators.
let logic = logic1.followed_by(logic2).followed_by(logic3);
my_collection.join_function(logic)
It turns out this `followed_by` function is just the logic we've seen up above. We can enumerate the argument iterator, and for each element apply `logic` and yield all of the results. It is even the same `flat_map` operator, just defined on a `self` that is an iterator rather than a timely dataflow stream.
// Linear operator on a stream of update triples.
// Parameterized by the function `logic`.
self.flat_map(move |(data, time, diff)|
    logic(data)
        .into_iter()
        .map(move |(data2, time2, diff2)|
            (
                data2,                 // new `data2`
                time.join(&time2),     // joined times
                diff.multiply(&diff2), // multiplied diffs
            )
        )
)
This may look like a relatively minor bit of optimization, and that isn't entirely wrong. What this does for us, though, is put front and center the ability to fuse these operations, which is the first step towards optimizing them. Differential dataflow is written in Rust, which compiles through LLVM, and LLVM can then optimize the fused code, which is all great news.
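One way such a composition could look is sketched below as a free function rather than a method. It is a hypothetical illustration, not an existing differential dataflow API: it assumes `u64` times (lattice join is `max`), `i64` diffs, and `logic` functions that return a `Vec` of update triples.

```rust
/// An update triple `(data, time, diff)`, assuming integer times and diffs.
type Update<D> = (D, u64, i64);

/// Fuses two `logic` functions into one: apply `logic1`, then apply `logic2`
/// to each result, joining times and multiplying diffs along the way.
fn followed_by<D, D2, D3, L1, L2>(
    mut logic1: L1,
    mut logic2: L2,
) -> impl FnMut(D) -> Vec<Update<D3>>
where
    L1: FnMut(D) -> Vec<Update<D2>>,
    L2: FnMut(D2) -> Vec<Update<D3>>,
{
    move |data| {
        let mut output = Vec::new();
        for (data2, time2, diff2) in logic1(data) {
            for (data3, time3, diff3) in logic2(data2) {
                output.push((data3, time2.max(time3), diff2 * diff3));
            }
        }
        output
    }
}
```

The fused function has the same shape as its inputs (one record in, update triples out), so it can itself be passed to `followed_by` again, and no intermediate update stream needs to be handed between dataflow operators.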
What I'm most interested in is how we can do even more optimization when the operators are expressed declaratively.
Linear operators in Materialize
Materialize is, among many other things, a declarative SQL layer on top of differential dataflow.
By being declarative, Materialize has the ability to restructure the queries it receives. In particular, it is delighted to take stacks of `Map`, `Filter`, and `Project` actions and fuse them together. This is exceedingly helpful because these linear operators can be fused into operators like `Join`, and even restructured for multiway joins, where they can substantially reduce the volume of data stored and moved around.
However, Materialize stalls out on anything more complicated than the three operations above. Until very recently, it also stalled out on temporal filters, though through some care these can now be fused as well. Unfortunately, they can't yet be fused into a join, but they do unblock fusing other operators.
In addition, Materialize has a great number of special-purpose "table valued functions" which are used to implement `flat_map`-like behavior. For example, you might type something like
SELECT *
FROM
my_data,
generate_series(1, my_data.count);
which produces each row of `my_data` as many times as `my_data.count`, with counters that go up and everything. Here `generate_series` is the table valued function, and it is even used as a join! It is basically what we are doing up above with `join_function`!
Materialize has a few other tricks that end up with similar situations. The `repeat_row` table valued function can produce negative rows as output, which means it is more `explode` than `flat_map`. The temporal filters mentioned above are grammatically `filter` expressions, but are really more like table valued functions. These cases all live outside the framework of `Map`, `Filter`, and `Project`.
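To illustrate how table valued functions fit the same mold, here is a loose sketch of two of them written as `logic` functions that map a row to update triples. These are hypothetical stand-ins and not Materialize's implementations: rows are modeled as `(name, count)` pairs, times are `u64` with minimal time `0`, diffs are `i64`, and the exact semantics of `repeat_row` are assumed from the description above.

```rust
/// An update triple `(data, time, diff)`, assuming integer times and diffs.
type Update<D> = (D, u64, i64);

/// A row of the hypothetical `my_data` table: `(name, count)`.
type Row = (&'static str, i64);

/// `generate_series(1, count)` joined against a row, in the style of
/// `flat_map`: one `(row, counter)` output per counter value, each with diff +1.
fn generate_series_logic(row: Row) -> Vec<Update<(Row, i64)>> {
    (1..=row.1).map(|i| ((row, i), 0, 1)).collect()
}

/// A `repeat_row`-like function, in the style of `explode`: a single output
/// whose diff is the count, which may be negative.
fn repeat_row_logic(row: Row) -> Vec<Update<&'static str>> {
    vec![(row.0, 0, row.1)]
}
```

The first produces as many records as the count, while the second produces one record carrying the count in its diff, which is why it can express negative rows.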
So I'm thrilled by the idea that all of these concepts might be unified into one framework. That unified representation could then be optimized, and fused into other operators. For those of you using temporal filters, this would allow them to be better pushed down into joins, and it can reduce their memory footprint substantially in some cases. Internally, some of our CDC format unpacking uses this logic, and jointly optimizing that logic with the SQL you have layered on top of it gives us the ability to unpack and manipulate less.
All in all, I'm excited that we might end up reducing the number of concepts that we work with, simplifying things at the same time as we open up new doors for performance. Join us on Slack if you're interested in learning more about the inner workings of Materialize, and if this sounds like something you'd like to work on, we're hiring!