Editor’s note: Natacha Crooks is currently a visiting researcher at Materialize. Starting in Fall 2020, Natacha will be an assistant professor of computer science at UC Berkeley.
The quality of a result in a streaming system is traditionally measured along three axes: latency (how long did it take the system to compute the query?), freshness (how up-to-date is the result?), and correctness (does the result make sense?). For a result to make sense, it must reflect the inputs that were ingested into the system, and must do so in a way that is coherent with the time at which these inputs were generated. Existing stream processors have (in our view, unduly) focused their efforts solely on the first of these two requirements. Specifically, some platforms have almost exclusively sought to achieve “exactly once delivery” (no message is lost or delivered more than once, even in the presence of failures). While reliable atomic message delivery is a useful building block for a message bus, streaming computation platforms require additional guarantees to ensure that messages are shown in a way that is coherent with time. This property is traditionally referred to as consistency in distributed systems, but remains poorly specified in a streaming context, where users often have to reason about the consistency guarantees of their streaming microservices themselves, or worse, settle for eventual consistency in their streaming data platform. In this blog post, we summarise and define what consistency means for a streaming computation platform, as a first step towards holding streaming systems accountable to these guarantees.
Many streaming systems consist of views (think joins, filters, or aggregates) that take as input one or more data streams, perform computation on these streams, and produce one or more output streams as a result. These views are organised into hierarchies, where the output of a view can be used as the input to a later view in the hierarchy. Consider the example below: there are two streams of data, stream 1 and stream 2. Stream 1 ingests tuples containing fields a,b,c. The first view creates a projection on this stream, displaying (a,b) only. The second view then filters records for which a>5. Finally, the last view takes this output and joins it against the d field of stream 2.
Streams are traditionally unbounded: new data can come in at any point. The output of views must be continuously recomputed to reflect this new data. This can be done naively, by throwing out the existing results and replaying all the data that ever existed, or incrementally, by doing the minimal amount of work necessary to generate an updated result.
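To make the difference between naive and incremental recomputation concrete, here is a minimal Python sketch of the project, filter, and join hierarchy described above. It is an illustration under assumptions, not how any particular system implements it: the join condition (a equal to d), the function and class names, and the sample records in the usage note are all hypothetical, and only insertions are handled.

```python
from collections import defaultdict

def project(record):
    """First view: keep only fields a and b."""
    return (record["a"], record["b"])

def selected(row):
    """Second view: keep rows for which a > 5."""
    return row[0] > 5

def recompute(stream1, stream2):
    """Naive approach: throw away old results and replay all data."""
    rows = [project(r) for r in stream1]
    rows = [row for row in rows if selected(row)]
    # Assumed join condition: a == d.
    return sorted((a, b, d) for (a, b) in rows for d in stream2 if a == d)

class IncrementalJoin:
    """Incremental approach: each new record only touches matching state."""

    def __init__(self):
        self.left = defaultdict(list)   # a -> b values that passed the filter
        self.right = defaultdict(int)   # d -> number of times d was seen
        self.output = []                # joined rows produced so far

    def on_stream1(self, record):
        a, b = project(record)
        if not selected((a, b)):
            return
        self.left[a].append(b)
        # Emit one output row per matching record already seen on stream 2.
        self.output.extend((a, b, a) for _ in range(self.right[a]))

    def on_stream2(self, d):
        self.right[d] += 1
        # Emit one output row per matching row already seen on stream 1.
        self.output.extend((d, b, d) for b in self.left[d])
```

Feeding the same records to `IncrementalJoin` in any interleaving should produce the same multiset of rows as `recompute`, while doing work proportional to the new record's matches rather than to the full history.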
Intuitively, a streaming system is consistent if at all times t the views V correctly reflect the inputs at time t. Otherwise said, the views V should represent a state of the world that could have existed at time t.
Time in computer science orders events: given two events e and e’ happening at time t and t’ respectively, an event e is said to happen before e’ if t < t’.
In its simplest form, time corresponds to real-time: an event e happening at 12:21pm happens before an event e' that happened at 12:55pm. Two events that both happen at 12:35pm are said to be concurrent. Network telemetry data and ad clicks often have real-time information associated with events.
Unfortunately, the distributed and heterogeneous nature of modern analytics pipelines often makes it challenging to define a single notion of time across all system components. There may not, for instance, exist a centralised, unified clock. Two separate IoT devices like a fridge and a toaster may have no way to determine which of two events happened first. Distributed systems instead often rely on the notion of logical time. Logical time captures dependencies that arise out of real-world systems. It captures two notions: causal dependencies and atomic dependencies.
Causal dependencies capture real-world effects: if a network packet arriving (event e) causes a router function to be triggered (event e’), then we can say that e’ happens after e. Causal dependencies are transitive: if the router function trigger then further causes an additional event e’’, then e’’ also happens after e.
Atomic dependencies instead capture events that should be thought of as happening simultaneously. Consider for instance a database transaction that writes to two objects x (event e) and y (event e’) in the database: ACID semantics state that the effects of a transaction should appear to take effect atomically. Both events e and e’ should thus be assigned the same timestamp.
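One simple way to picture logical time is as an assignment of integer timestamps that respects both kinds of dependency. The sketch below is a hypothetical illustration (the `LogicalClock` class and its method names are assumptions, not an API from any system): an event caused by earlier events receives a timestamp strictly larger than all of its causes, and an event that is atomic with another receives the same timestamp.

```python
class LogicalClock:
    """Assigns logical timestamps that respect causal and atomic dependencies."""

    def __init__(self):
        self.ts = {}  # event name -> logical timestamp

    def record(self, event, caused_by=(), atomic_with=None):
        if atomic_with is not None:
            # Atomic dependency: share the timestamp of the other event.
            self.ts[event] = self.ts[atomic_with]
        else:
            # Causal dependency: strictly after every cause.
            self.ts[event] = 1 + max((self.ts[c] for c in caused_by), default=0)
        return self.ts[event]

    def happens_before(self, e1, e2):
        return self.ts[e1] < self.ts[e2]
```

Note that in this sketch a smaller timestamp does not by itself prove a causal link; the assignment only guarantees that causes are ordered before their effects, transitively.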
Readers familiar with existing stream processing systems may have heard about the joint notions of event-time and system-time: event-time defines a notion of ordering that is application specific (it can capture causal dependencies or atomic dependencies), while system-time instead defines the time at which the system processed the data. It thus refers exclusively to real-time.
Given our notions of real-time and logical-time, let’s first look at what consistency looks like for a system with a single view. We’re going to refer to this notion as internal consistency.
A view is internally consistent at time t if it reflects all events that have a timestamp smaller than or equal to t, and no events that have a timestamp greater than t.
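The definition translates almost directly into a check. The following Python sketch assumes, purely for illustration, that a view simply mirrors its input stream and that each event is a `(timestamp, a, b)` tuple; the function name is hypothetical.

```python
def is_internally_consistent(view, stream, t):
    """True if `view` holds exactly the stream events with timestamp <= t.

    Both arguments are lists of (timestamp, a, b) tuples. Comparing sorted
    lists treats them as multisets, so duplicates are accounted for.
    """
    expected = [event for event in stream if event[0] <= t]
    return sorted(view) == sorted(expected)
```

A view that is missing an event with timestamp 2 fails the check at time 3, and so does a view that already contains an event with timestamp 4; the latter passes the check at time 4 if nothing else is missing.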
To illustrate, consider the following stream and let’s assume that our timestamps follow real-time.
The following view is not internally consistent at time 3 as it misses the insert [a:8,b:Bob,c:UK] despite the fact that this event happens at time 2.
This next view is also not consistent at time 3 as it contains the insert (a=7,b=Frank) which happens at time 4. It is however consistent at time 4 as it contains all events with timestamp <=4 but no event with timestamp >4.
What happens if you instead have multiple views organised in a hierarchy? Consider the following view structure and the two streams 1 and 2:
Let’s now assume that we capture the following state of the system (see below): the first three events of Stream 1 are processed by all three views, but only the first two events of stream 2 are processed by the join. The Project view is consistent at time 2 as it reflects all events with timestamp <=2. The Select view is also consistent at time 2. In contrast, the Join view cannot be consistent at this time as it does not reflect Stream 2’s event
In a real system, a user will issue sequences of queries to different views (this is sometimes referred to as a session). These queries should return a sequence of results that is consistent with how the system evolved: later queries should not return results that correspond to earlier system states. Simply put: streaming systems should not go back in time. Let’s define this a little bit more formally.
Let’s define a client that issues an ordered sequence of queries q1 -> q2 -> q3 as part of a client session. And let’s say that a query q has a timestamp t (we’ll write it q(t)) if it returns a view that is consistent at time t. We want to enforce the following guarantee:
if q1(t) -> q2(t’), then t’>=t. In other words: queries later in the session should never return results from earlier views.
To illustrate, consider the example below: let’s say that q1 returns the view consistent at time 7, which contains six records. But let’s now assume that the user reissues this same query, only to receive the results consistent at time ts=4. Three records would magically disappear! Consistency anomalies like this one make the task of programming complex systems significantly harder as they introduce behaviours which do not reflect what could have happened in a real-world execution.
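The guarantee can be enforced by remembering, per session, the timestamp of the last answer. The Python sketch below is a hypothetical illustration (the `Session` class and `admit` method are assumed names): a candidate answer is admitted only if its timestamp does not move the session back in time.

```python
class Session:
    """Tracks the last timestamp served so a session never goes back in time."""

    def __init__(self):
        self.last_ts = None  # no query answered yet

    def admit(self, ts):
        """Return True and record `ts` if answering at `ts` keeps t' >= t.

        Returning False means the system must wait (or find a fresher
        view) instead of serving the stale result.
        """
        if self.last_ts is not None and ts < self.last_ts:
            return False
        self.last_ts = ts
        return True
```

In the example above, the first answer at time 7 is admitted, while the second answer at time 4 is rejected rather than shown to the user.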
What about multiple users?
The definitions above applied to a single client. What happens if we have multiple clients accessing the system, each executing a different stream of queries? There are two options here: consistency guarantees could apply either per user (often called session guarantees) or globally (called sequential consistency). To illustrate the difference, let’s return to our earlier example, but let’s now assume that the two queries are issued by different users, Alice and Bob. Alice issues the first query, which is answered at time ts=7, while Bob issues the second query, which is answered at time ts=4. Should Bob’s query necessarily have returned a result that is consistent with a timestamp of at least 7? If we only enforce session guarantees, Bob may see a result that is earlier than Alice’s. If users never communicate, this is fine. If, however, there is some external channel through which they exchange information (a common scenario when a single user’s query spawns several internal queries across multiple microservices), then the results can appear incorrect. It is thus preferable to enforce the stronger guarantees of sequential consistency (or linearizability, which implies sequential consistency), and ensure that views can never go back in time. There is a performance tradeoff: new users may have to wait longer for the system to catch up to the current timestamp (we’ll discuss in a later blog post how Materialize minimises that cost by using something called shared arrangements).
What about failures?
For the same set of inputs received, querying a view at time t should always return the same result independently of how many times the system failed and independently of when it started processing data. We want the output of a view for a given time t to be deterministic. This is useful for a couple of reasons. First, it just intuitively makes sense. Given the same time and the same sequence of events, the output should remain stable. Second, it makes a bunch of system-related concerns significantly easier to implement (like for instance durability or active replication).
The first half of this post defined two notions: first, internal consistency, and second, query consistency. It conveniently remained silent on how to actually enforce these guarantees. We’ll limit this blog post to sketching out what the core challenges are and defer implementation details to a future post (shameless plug: Materialize uses some pretty cool tricks to guarantee consistent, up-to-date results with a low memory footprint).
Let’s look at internal consistency first: how can we determine whether a view is internally consistent at time t? Specifically, how do we determine whether a view has processed all events with timestamp ts <=t and no events with timestamp >t? There are two technical hurdles here:
- Identifying dependencies: first, we need a way to express logical or real-time dependencies between events across sources. This is challenging as different sources may rely on very different notions of time or may lack that information altogether. Moreover, data analytics pipelines may not always have the ability to modify their data streams to capture these dependencies. It may thus be necessary to instantiate separate metadata sources to summarise that information. How can we capture the fact that a router function activation in stream 2 was caused by a packet arriving (an event which we might have recorded in stream 1)?
- Closing timestamps: second, we need a way to determine when timestamps are “closed”, i.e., when a given data stream or view is guaranteed never to issue another event with that timestamp. This information is not always readily available. Debezium, for instance, one of the leading change data capture tools, does not allow users to determine when they have seen all the events of a particular transaction. Determining when a timestamp has closed is also problematic when sources issue no data: consider a source that is updated once at time t=1 and never again. It should still be possible to “close” timestamps t=2, t=3, t=4. Failing to close empty timestamps would artificially delay views from becoming internally consistent as the system would be stuck waiting for data that might never arrive.
Consider the following example: we have two data sources, one which ingests customer information (left stream), and one which maintains region information (right stream). The rightmost stream rarely changes (possibly never) as we expand the company’s sales regions slowly. In contrast, the leftmost stream is updated frequently. We need a mechanism to state that the system will not observe any further events associated with timestamps 5,6,7 on the right stream, otherwise the join will never return a view consistent at these timestamps.
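One way to picture closing timestamps is to have every source carry a marker saying "all timestamps up to here are closed", which can advance even when no data arrives. The Python sketch below is a hypothetical illustration of that idea; `Source`, `close_through`, and `consistent_through` are assumed names, not the API of Materialize or any other system. A join over several sources can only be consistent up to the smallest marker among its inputs.

```python
class Source:
    """A stream that can promise it will emit nothing more at old timestamps."""

    def __init__(self):
        self.events = []
        self.closed_through = -1  # every timestamp <= this value is closed

    def emit(self, ts, record):
        if ts <= self.closed_through:
            raise ValueError(f"timestamp {ts} is already closed")
        self.events.append((ts, record))

    def close_through(self, ts):
        """Close all timestamps <= ts, even if no event carried them."""
        self.closed_through = max(self.closed_through, ts)

def consistent_through(*sources):
    """Latest timestamp at which a view over all `sources` can be consistent."""
    return min(source.closed_through for source in sources)
```

With a busy customer stream closed through 7 and a quiet region stream closed only through 1, the join is stuck at 1; once the region stream closes its empty timestamps through 7, the join can become consistent at 7 without any new region data arriving.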
Looking at query consistency next: given the ability to determine whether a view is consistent for a timestamp t, how do we select this timestamp t? There are two approaches to doing this:
- User-driven: the user-driven approach asks the user to provide a timestamp t, and will return the result of the view that is consistent at t. This may cause the system to stall until the view becomes consistent (until all timestamps <=t have closed).
- System-driven: the system-driven approach places the onus on the system to identify a timestamp for which the view v is consistent. To guarantee fresh results, the system should pick the latest timestamp for which v is consistent (it would be trivial to always return the initial state, but not very useful).
Selecting the timestamp t requires making interesting trade-offs between the freshness of results, the latency with which the system can return those results, and the memory footprint associated with maintaining the views. Consider the following example:
Let’s assume that Stream 1 consists of updates to the same customer Joe, and that we want to join it against the slow-moving stream on the right (i.e., the three updates shown in the figure take a long time to arrive). Let’s further assume that all three updates to Joe arrive in quick succession. We have two options here. 1) We keep all the updates. This means that, as soon as event 1 arrives on the right-hand stream, we can immediately return a view that is consistent at time t=0 to the user (thus minimising delay). As soon as the second event arrives, we can refresh our view to be consistent at timestamp t=1, etc. We minimise latency and increase freshness, but we cause memory to increase. 2) Alternatively, we can choose to discard all intermediate updates. The flipside is that the system now has to wait until all three events in the right stream have arrived to return a consistent view, which increases latency.
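The memory side of this trade-off can be sketched with a small multi-version value. This is a hypothetical illustration only (`VersionedValue`, `read_at`, and `compact` are assumed names, and updates are assumed to arrive in timestamp order): keeping every update lets us answer at any timestamp, while discarding intermediate updates saves memory but makes earlier timestamps unanswerable.

```python
class VersionedValue:
    """Keeps the history of one record so it can be read at past timestamps."""

    def __init__(self):
        self.history = []  # (timestamp, value) pairs, in timestamp order
        self.since = 0     # earliest timestamp that can still be answered

    def update(self, ts, value):
        self.history.append((ts, value))

    def read_at(self, ts):
        if ts < self.since:
            raise ValueError(f"history before {self.since} was discarded")
        visible = [value for (t, value) in self.history if t <= ts]
        return visible[-1] if visible else None

    def compact(self, since):
        """Discard intermediate updates that are only needed before `since`."""
        older = [(t, v) for (t, v) in self.history if t <= since]
        newer = [(t, v) for (t, v) in self.history if t > since]
        self.history = older[-1:] + newer
        self.since = max(self.since, since)
```

With three quick updates to Joe, option 1 corresponds to never calling `compact`, so reads at t=0, t=1, and t=2 all succeed at the cost of storing three versions. Option 2 corresponds to calling `compact(2)`, which leaves one version in memory but only allows answers at t=2 or later.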
Tying it all together (consistently)
The database and batch data warehousing world has matured over several decades, to the point where users can reliably expect performant and consistent answers. However, stream processors today either require that users figure this out for themselves when building consistent streaming microservices, or settle for eventual consistency. We don’t believe that users need to make this tradeoff: it is possible to have a consistent, declarative, high performance streaming platform.
In later posts we will talk about how Materialize achieves this, using shared arrangements to minimise the cost of recomputing queries, and aggressively advancing timestamps to reduce latencies while maintaining consistency. In the meantime, you can register for a Materialize account here to get started, check out the source code on GitHub, or read on!