Speeding up Timely Dataflow by 100x

A bit of a bait title, but by the end of the post we will have sped up a timely dataflow computation by 100x. We won't have changed the computation, just a flag that it uses. We will also see a great example of why timely dataflow's approach to progress tracking can be orders of magnitude more efficient than other stream processors.
The set-up: big dataflows
We're going to start with the problem, which is a real enough problem we see at Materialize.
Dramatically simplified, we are going to turn on 1,000 dataflows all with the same structure:
INPUT -> REGION { ARRANGE -> FILTER^999 } -> PROBE
Very schematic, but let's introduce the players:
- INPUT is an operator that allows you to provide more input, and advance the "time", indicating to others that you'll have no more data bearing the previous timestamps.
- REGION is an organizational scope: it wraps groups of operators but presents outwards as a single operator itself. We use these at Materialize to structure computation to make it easier to understand. It will also make it fast.
- ARRANGE is differential dataflow's "index build" operator. It collects updates that flow past and ensures that we have a multi-versioned, indexed, roll-up of all updates, so that other operators can share that resource.
- FILTER subsets data, and in this story it's just a placeholder for "more work". We've put 999 in a row, and they are just there to force each region to be a non-trivial amount of logic, which .. is real (courtesy: SQL).
- PROBE is an operator that watches to confirm the completion of timestamps. It is how we know, once we've advanced an input, whether the work has flowed through the dataflow fully.
So we'll do this 1,000 times, with each dataflow containing ~1,000 operators.
The experiment then is to repeatedly introduce an update into one input, and then advance the times on all inputs. In essence, one second has passed and only one of the inputs had any updates, but we do need to confirm that all outputs are correct. Let's run that harness, on my laptop with one worker:
(timing output for steps 1 through 15 elided)
So after a bit of loading, it seems like steps take about 350ms on average. Is that good?
In a conventional stream processor, we have advanced the times of 1,000 inputs from t to t+1, say, and then need to propagate that information along 1,000 dataflow edges for each input. Flink's approach to moving this information literally has the operators communicate it from one to the next. There are over 1,000,000 operators here, again not unrealistic, which means that we are spending about 350ns per operator. That's not zero, but it's not that bad for the control plane of a distributed system. It could also be a lot more, as each of these edges potentially involves coordination among workers (not in this single-worker case, though).
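As a sanity check on that arithmetic, here is the per-operator cost computed directly from the numbers above. This is a back-of-envelope model of the naive, operator-to-operator scheme, not a measurement:

```python
# Back-of-envelope: the experiment has 1,000 dataflows of ~1,000 operators.
dataflows = 1_000
operators_per_dataflow = 1_000
total_operators = dataflows * operators_per_dataflow  # ~1,000,000

step_seconds = 0.350  # observed: each step takes about 350ms
per_operator_ns = step_seconds / total_operators * 1e9
print(per_operator_ns)  # ~350ns of progress-tracking work per operator
```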
A reality check
Is the above realistic? Aren't things always in motion in stream processors? No. Stream processors are surprisingly idle when you connect them to business logic, rather than just event firehoses. Your fraud detector does fire now and again, but if it is producing thousands of alerts every second you may have a different problem. Business logic generally refines and reduces raw event firehoses, and of the 1,000 things you are trying to do, it is not uncommon that only a fraction of them are lighting up at any moment.
There's another point that is often overlooked, that stream processors are dynamical systems themselves operating "open loop", in that their input does not wait for their output to be complete. The real world changes at some rate that doesn't depend on your choice of stream processor, and it doesn't slow down just because your tools do.
There is a virtuous side to this cycle, though. If your stream processor goes a bit faster, say from 1Hz to 10Hz, the amount of work it has to do for each tick can drop by 10x. And .. that can now unlock going to 100Hz, at which point the per-tick work drops by another 10x. And so on, until you run into your system's fixed overheads.
In stream processing, you get the best results by chasing your fixed overheads, so that you can take zero time when nothing happens, and as close to "unit time" as you can when only one thing happens. These determine the equilibrium that your processor settles into, and set the floor for the latency to respond to a change. We want to do well processing only a few updates each tick, because this is the equilibrium we want to arrive at.
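That feedback loop can be sketched with a toy model. Assume a fixed per-tick overhead and a constant per-update cost (both numbers below are made up for illustration): each tick must process the updates that accumulated while the previous tick ran, so at equilibrium the tick duration satisfies tick = fixed + per_update × rate × tick.

```python
# Toy "open loop" model: the world produces `rate` updates per second,
# regardless of how fast the stream processor runs. Each tick costs a
# fixed overhead plus a per-update cost for the updates that piled up
# during the previous tick. Solving tick = fixed + per_update*rate*tick:
def equilibrium_tick(fixed: float, per_update: float, rate: float) -> float:
    assert per_update * rate < 1.0, "system cannot keep up at any tick rate"
    return fixed / (1.0 - per_update * rate)

# Cutting the fixed overhead cuts the equilibrium latency proportionally.
slow = equilibrium_tick(fixed=0.350, per_update=1e-6, rate=1_000.0)
fast = equilibrium_tick(fixed=0.004, per_update=1e-6, rate=1_000.0)
print(slow, fast)  # roughly 0.35s vs 0.004s per tick at equilibrium
```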
Flink's approach to progress tracking doesn't do zero work when nothing happens, it does ~1,000,000 units of work. It has to, as long as communication happens directly between operators. Timely dataflow also does ~1,000,000 units of work, but importantly it doesn't have to.
We are going to get our 100x by being smarter about tracking progress.
Smartness: tracking progress in timely dataflow
We spend most of our time above determining that little has happened. Many operators are involved, and you might think that is necessary because each one informs the next. This is not how timely dataflow tracks progress.
Timely dataflow uses "timestamp capabilities", which are system-minted tokens that give an operator permission to speak at some time. The system knows who holds these, and that anyone with one is allowed to send messages at that time or greater. Generally, the INPUT operators each hold one, with a time that they advance to signal to others that they can all move forward. Other operators transiently hold them; the ARRANGE operator accumulates updates until the time they reference passes, and it can finalize their net update. The FILTER operator holds none.
What is useful about these capabilities is that it allows the timely dataflow system itself to have a bird's eye view of the work in flight in the system. It can see at any moment that all of the INPUT operators have a capability at t+1 say, and there is one message in flight with time t, but only in one dataflow. Informally, this could be enough to tell all 999 other dataflows that they are cleared through t+1, without going operator-to-operator to discover this.
In fact, this is already how timely works. Each worker observes the passage of time through the capabilities that come and go, and passes this information on to all operators it hosts. The problem here is the "all". There are 1,000,000 operators to tell, even if we did little work to figure out what we should tell them.
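A minimal sketch of this bookkeeping, with invented names (this is a model of the idea, not timely's actual API): the system counts capabilities by timestamp, and the "frontier" is the least timestamp at which anyone could still send.

```python
from collections import Counter

class CapabilityTracker:
    """Toy model of capability accounting: count who holds permission
    to send at which times; the frontier is the least such time."""
    def __init__(self):
        self.held = Counter()  # time -> number of capabilities held

    def mint(self, time):
        self.held[time] += 1

    def drop(self, time):
        self.held[time] -= 1
        if self.held[time] == 0:
            del self.held[time]

    def downgrade(self, old, new):
        # an operator advances its capability: it may now only send at >= new
        assert new >= old
        self.drop(old)
        self.mint(new)

    def frontier(self):
        return min(self.held) if self.held else None

tracker = CapabilityTracker()
tracker.mint(0)            # one INPUT holds a capability at time 0
tracker.mint(0)            # a second INPUT does too
tracker.downgrade(0, 1)    # the first input advances to time 1
print(tracker.frontier())  # 0: the other input could still send at time 0
tracker.downgrade(0, 1)    # the second input advances too
print(tracker.frontier())  # 1: everyone is cleared through time 0
```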
As it happens, most of these operators do not care. The FILTER operators certainly don't care, and the ARRANGE operator .. may care. More on that later. The real problem is the REGION operator, which needs to know in order to pass the information on to its ARRANGE and FILTER operators. Once we pass the information to the REGION operator, it has no choice but to at least check whether its hosted operators are interested, and this is where the time goes.
Opting out of timestamp progress
Timely operators have always been allowed to express the information "I do not care about the passage of time". These operators will not be scheduled on the basis of timestamps changing. They may still be scheduled when data arrive, or if explicitly activated, but they will not be activated just because time moved forward for them.
In this binary framing, only INPUT and FILTER can say that they are oblivious to time passing.
- ARRANGE needs to see time pass to unlock its accumulated updates.
- REGION needs to see time pass to inform ARRANGE.
- PROBE needs to see time pass to report the passage of time (natch).
So the opting out isn't all that useful in this example. The ARRANGE operator "taints" the REGION, which always needs to be scheduled. This results in the region exploring all of its children, only to discover that all but one of them do not care.
The recent timely change adds a third option, between "never" and "always", for whether an operator cares to hear about time passing. That new option is "if I hold a capability".
Informally, if you have time-based work to do, then yes you care about time. But if you have no work to do, nor ability to produce any output at all, then no reason to check the clock.
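Schematically, the three options and the scheduling decision might look like the following. The names here are invented for illustration; they are not timely's actual API:

```python
from enum import Enum

class Interest(Enum):
    NEVER = "never"           # e.g. FILTER: oblivious to time passing
    ALWAYS = "always"         # e.g. PROBE: always wants to hear about time
    IF_CAPABILITY = "if-cap"  # e.g. ARRANGE: only while it holds pending work

def should_notify(interest: Interest, holds_capability: bool) -> bool:
    """Decide whether an operator is scheduled when the frontier moves."""
    if interest is Interest.NEVER:
        return False
    if interest is Interest.ALWAYS:
        return True
    return holds_capability  # IF_CAPABILITY: only if it could produce output

# A quiescent ARRANGE (no buffered updates, no capability) is skipped:
print(should_notify(Interest.IF_CAPABILITY, holds_capability=False))  # False
# The same operator with pending updates gets scheduled:
print(should_notify(Interest.IF_CAPABILITY, holds_capability=True))   # True
```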
In this framing,
- ARRANGE needs to see time pass when it holds a capability, for its updates.
- REGION needs to see time pass because it has a child that needs to.
The second is the unlock. Most of the time there is no pending work in a region, and we can avoid even investigating who among its operators might be interested. We do have to check with the region itself, but then we can bounce without doing the 1,000 operators' worth of work. The PROBE at the other end gets the message, because it was never waiting for the 1,000 in the first place (in timely; the equivalent operator would wait in other systems).
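We can model the saving directly. With 1,000 regions of 1,000 operators each, and only one region holding a capability, the flat scan touches every operator, while the short-circuiting scan touches each region once and descends only into the active one. (A toy model, not timely's implementation.)

```python
def notify_flat(regions):
    """Naive: every operator in every region is checked."""
    return sum(len(ops) for ops in regions)

def notify_with_skip(regions, has_capability):
    """Check each region once; descend only if it holds capabilities."""
    work = 0
    for ops, active in zip(regions, has_capability):
        work += 1             # the check on the region itself
        if active:
            work += len(ops)  # only now visit the children
    return work

regions = [list(range(1_000)) for _ in range(1_000)]
active = [i == 0 for i in range(1_000)]   # only one dataflow saw an update
print(notify_flat(regions))               # 1,000,000 operator checks
print(notify_with_skip(regions, active))  # 2,000: regions + one region's children
```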
How does it work when we make this change?
(timing output for steps 1 through 15 elided)
This is now roughly 4ms per iteration, compared to 350ms before. We are doing what would be ~1,000,000 operators' worth of work in other systems, as if at 4ns per operator. You can make the 999 arbitrarily large to make the gap arbitrarily awkward-looking.
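Repeating the earlier back-of-envelope arithmetic with the new per-step time:

```python
total_operators = 1_000 * 1_000            # 1,000 dataflows x 1,000 operators
before_ns = 0.350 / total_operators * 1e9  # ~350ns per operator before
after_ns = 0.004 / total_operators * 1e9   # ~4ns per operator after
print(before_ns / after_ns)                # ~87.5x speedup on this harness
```

The measured ratio here is closer to 90x than 100x, but as noted, growing the 999 widens the gap.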
As we discussed above, the faster you go the less there is to do with each tick. Then the faster you go the next time around. By burning away the fixed costs, the check-in with every operator, we've found a much faster equilibrium we can arrive at.
Conclusions
Timely dataflow's approach to progress tracking is fundamentally different from, and I would say better than, that of conventional stream processors. By having the system track the flow of time, rather than the operators, you unlock options that aren't available to the more myopic implementations.
Moreover these are real problems. A dataflow that goes A to B to C is a cute whiteboard picture, but it isn't real. Real dataflows are numerous and complex.
Even just the Materialize "catalog server", an internal cluster we spin up for each user, has ~12,000 dataflow operators, spread over ~100 dataflows. Moment-by-moment most of these are dormant, because no one has changed the number of clusters, the available RBAC roles, or live table names. And yet by their existence they cost in a conventional stream processor, slowing down the things that do matter moment-to-moment (e.g. the hydration status of that view you just deployed).
Timely dataflow's progress tracking unlocks orders of magnitude improvements in end-to-end latency when faced with complex dataflows.
Technical details and sneaky caveats
Nothing too sneaky here, but two things worth calling out.
First, the REGION operator itself needs to choose at least "if I hold a capability", because of deferred work it does in progress tracking. Essentially, it also holds on to capabilities, because of how it acknowledges the movement of messages through the system. There is an optimization that has the region delay acknowledgement of messages (not unlike TCP) until it is clear that failing to do so would block the system. It receives this signal by comparing the passing of time with the message timestamps it has deferred acknowledging.
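A toy model of that deferral (invented names; not timely's implementation): the region buffers acknowledgements for messages it has consumed, and releases only those that would otherwise hold the frontier back.

```python
class DeferredAcks:
    """Buffer acknowledgements of consumed messages, TCP-style, and
    flush only when withholding them would block the frontier."""
    def __init__(self):
        self.pending = []  # timestamps of messages consumed but unacked

    def consume(self, time):
        self.pending.append(time)

    def maybe_flush(self, proposed_frontier):
        """Release acks whose timestamps the frontier is trying to pass."""
        flush = [t for t in self.pending if t < proposed_frontier]
        self.pending = [t for t in self.pending if t >= proposed_frontier]
        return flush

acks = DeferredAcks()
acks.consume(3)
acks.consume(5)
print(acks.maybe_flush(4))  # [3]: withholding it would block the frontier
print(acks.pending)         # [5]: still safely deferred
```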
Second, the ARRANGE operator currently has a second function: re-importing itself into another dataflow. This second function requires that it mirror the passage of time into that dataflow, which means it needs "always" set to perform this function. In essence, it falls back to "Flink-style" progress tracking when sharing across dataflows. However, many if not most arrangements do not need to be shared across dataflows. In most Materialize dataflows, a complex dataflow results in a single ARRANGE operator that needs to be shared: the index that results from CREATE INDEX, and none of the supporting arrangements. Putting this operator outside the REGION solves the problem.
