Materialize computes and maintains SQL queries as your underlying data change. This makes it especially well-suited to tracking the current state of various SQL queries and aggregates!
But what if you want to root around in the past? Maybe you want to compare today’s numbers to yesterday’s numbers. Maybe you want to scrub through the past, moving windows around looking for the most interesting moments where exciting things happened!
Today, we’ll build up one way to use Materialize to explore historical temporal data. As is often the case, we’ll write things in vanilla SQL, but take advantage of Materialize’s unique performance to build surprisingly reactive applications. By the end, we’ll have a few queries that taken together allow you to interactively browse aggregates for arbitrary historical ranges.
Temporal data, and queries
You’ve probably got some data with timestamps in it.
I’m going to use a fairly common data set of NYC taxi rides. You are welcome to grab it too, but it is fairly large. Feel free to grab a small subset, or just follow along for now.
Each record in this data represents a ride, and has a pick up and drop off time. Let’s take the drop off time as the “event time” for now, as this is presumably when the data (including fare, distance, etc) was finalized.
The following SQL query is commonly run against this data, and is representative of some time-slice-y aggregations:

    SELECT
        passenger_count,   -- a key
        MIN(fare_amount),  -- some aggregate
        MAX(fare_amount)   -- some aggregate
    FROM
        tripdata           -- much data
    GROUP BY
        passenger_count    -- that key again

This query determines, for each passenger_count (number of folks in the taxi), the minimum and maximum fares paid. The query aggregates across all of the data, because it is meant to exercise analytic systems and that’s what’s going to do the most work. But, this isn’t always (or even usually) what folks want.
Let’s imagine instead that what you want is to subset the data by some time interval.
    SELECT
        passenger_count,
        MIN(fare_amount),
        MAX(fare_amount)
    FROM
        tripdata
    WHERE
        --               /--your arguments-\
        drop_off BETWEEN <TIME1> AND <TIME2>
    GROUP BY
        passenger_count
This query does the same aggregation, but over a restricted amount of data that may be more meaningful to you. Perhaps the data corresponds to some month you are investigating. Perhaps you are scrubbing around through time looking for the moments of greatest disparity. In any case, you aren’t interested in watching just the one aggregate across all of the data.
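To make the semantics of that windowed query concrete, here is a minimal Python sketch of the same aggregation over a handful of made-up rides. The `rides` rows and the `min_max_fares` helper are hypothetical and only for illustration; this is not how Materialize evaluates the query.

```python
from datetime import datetime

# Hypothetical rides: (passenger_count, drop_off, fare_amount).
rides = [
    (1, datetime(2019, 12, 29, 22, 15), 12.5),
    (1, datetime(2019, 12, 30, 9, 0), 40.0),
    (2, datetime(2019, 12, 30, 18, 30), 8.0),
    (2, datetime(2020, 1, 2, 7, 45), 99.0),  # outside the window used below
]

def min_max_fares(rides, time1, time2):
    """Per passenger_count, the (min, max) fare with drop_off in [time1, time2]."""
    out = {}
    for passenger_count, drop_off, fare in rides:
        if time1 <= drop_off <= time2:  # SQL BETWEEN is inclusive on both ends
            lo, hi = out.get(passenger_count, (fare, fare))
            out[passenger_count] = (min(lo, fare), max(hi, fare))
    return out

window = min_max_fares(
    rides,
    datetime(2019, 12, 29, 21, 53),
    datetime(2019, 12, 31, 11, 3),
)
print(window)
```

Changing either bound means calling the function again over all of `rides`, which is the from-scratch cost the rest of the post tries to avoid.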
A first approach: That Query
That query up there works fine. Materialize can compute and efficiently maintain it for you.
The problem is that you might want to change TIME1 or TIME2 (or both!). That would make it a brand new query, and Materialize would need to start from scratch. That’s probably not the experience you were hoping for.
Ideally, you could supply pairs (TIME1, TIME2) as data, and get results streamed out at you. You would get to keep the query the same, and just interactively change arguments to it. For bonus points, you (and others) could add arguments, and get results to multiple queries at the same time.
Let’s commit to a specific approach: a table with schema:
CREATE TABLE requests (key int8, time1 TIMESTAMP, time2 TIMESTAMP);
Here key is what identifies a request, and the time1 and time2 columns are the request’s parameters. You (and others) can change this relation, adding in new requests and removing those no longer of interest. We’d like to build up a view in Materialize that allows these live input changes, and provides interactive, always up to date outputs.
For example, we’ll want to set something up so that when you do
INSERT INTO requests VALUES (1234, '2019-12-29 21:53:00'::timestamp, '2019-12-31 11:03:00'::timestamp);
you get something out the other end that looks like
     key  | passenger_count | min  | max
    ------+-----------------+------+------
     1234 |                 |    0 |   92
     1234 |               0 |  -23 |  340
     1234 |               1 | -275 |  709
     1234 |               2 |  -87 |  260
     1234 |               3 |  -55 |  470
     1234 |               4 |  -52 |  220
     1234 |               5 |  -52 |  215
     1234 |               6 |   -4 |  143
     1234 |               7 |    7 |    7
     1234 |               8 |   85 |   85
The data are messed up, with absent passenger_count and negative fares, I agree. This is how you know it is real data.
A second approach: (Lateral) Joins
If you’ve been here on the blog before, you may have a hunch that LATERAL joins are a candidate solution. If not, check out this sweet blog post on lateral joins. It turns out that we can write a query that lateral joins requests with our SELECT up above to produce the results we want:

    -- Lateral join `requests` and the parameterized query.
    CREATE MATERIALIZED VIEW results AS
    SELECT *
    FROM
        requests,
        LATERAL (
            SELECT
                passenger_count,
                MIN(fare_amount),
                MAX(fare_amount)
            FROM
                tripdata
            WHERE
                --               /-from requests-\
                drop_off BETWEEN TIME1 AND TIME2
            GROUP BY
                passenger_count
        );

The LATERAL keyword exposes the columns of requests to the subquery that follows. In particular, it allows correlation with the TIME1 and TIME2 columns of requests. The result is a correlated subquery that produces independent results for each pair of bindings.
This is syntactically and semantically great! It does exactly what we want, and is very concise.
Unfortunately, it is also very inefficient.
The problem is that Materialize cannot discern any common structure between the subqueries. The implementation will cross-join requests and tripdata, and then perform the reduction. Each new row in requests will effectively prompt as much computation as a from-scratch query.
While it was easy to write, this view will not result in interactive responses to new queries.
Lateral joins work great when the parameter bindings narrow the computation, for example when they populate equality constraints. When the parameter bindings are used in other (e.g. inequality, for Materialize) constraints, it is much less obvious how to share the computation and state of the subqueries.
A third approach: Time Slicing
While naive lateral joins could not identify commonality between the subqueries for requests, that commonality does exist. We just need to figure out how to express it to Materialize.
To see it, imagine all TIME1 and TIME2 bindings were cleanly on a day boundary. We could first perform the aggregation above for each day.
    -- Reduce down to per-day aggregates
    CREATE VIEW daily_aggregates AS
    SELECT
        passenger_count,
        date_trunc('day', drop_off) as time,
        MIN(fare_amount) as min_fare_amount,
        MAX(fare_amount) as max_fare_amount
    FROM
        tripdata
    GROUP BY
        passenger_count,
        date_trunc('day', drop_off);
From this reduced data, each of our requests could then pick up and stitch together their days of interest. The reduction down to days is common across all requests, although each then has its own unique work to do assembling the aggregates. It’s not immediately clear how to do that assembly, but we’ll get there. It turns out it is much less work than re-reading the whole tripdata collection.
The restriction to day-aligned request intervals is a pretty big one. What if the request times aren’t aligned to days but to hours instead?
    -- Reduce down to per-hour aggregates
    CREATE VIEW hourly_aggregates AS
    SELECT
        passenger_count,
        date_trunc('hour', drop_off) as time,
        MIN(fare_amount) as min_fare_amount,
        MAX(fare_amount) as max_fare_amount
    FROM
        tripdata
    GROUP BY
        passenger_count,
        date_trunc('hour', drop_off);
Now we have aggregates at the granularity of hours. We can do minutes too!
You may think we’ve made life harder because there are so many more aggregates to put back together. There are 24 times as many hours as there are days, and 1,440 times as many minutes as days. That is a lot more work to do than when we had to point out the days of interest. Expressing a request for a week-long interval would require 10,080 minutes as input.
However, no one said you had to use only minutes. Or only hours. You can cover most of your hypothetical week with daily aggregates, six at least. You can then just grab a few hourly aggregates at each end, and a few minutely aggregates if you want those too.
Let’s spell this out with an example, as it will be important to be clear. Let’s say your request times are
| key   | time1               | time2               |
|------:|--------------------:|--------------------:|
| 12345 | 2019-12-29 21:53:00 | 2019-12-31 11:03:00 |

If we want to collect aggregates that cover the span from time1 to time2, we can do that with the following intervals. Notice that each of these intervals is either a minute, an hour, or a day.

| key   | time1                        | time2               |
|------:|-----------------------------:|--------------------:|
| 12345 | 2019-12-29 21:53:00          | 2019-12-29 21:54:00 |
| 12345 | 2019-12-29 21:54:00          | 2019-12-29 21:55:00 |
| 12345 | 2019-12-29 21:55:00          | 2019-12-29 21:56:00 |
| 12345 | 2019-12-29 21:56:00          | 2019-12-29 21:57:00 |
| 12345 | 2019-12-29 21:57:00          | 2019-12-29 21:58:00 |
| 12345 | 2019-12-29 21:58:00          | 2019-12-29 21:59:00 |
| 12345 | 2019-12-29 21:59:00          | 2019-12-29 22:00:00 |
| 12345 | 2019-12-29 22:00:00          | 2019-12-29 23:00:00 |
| 12345 | 2019-12-29 23:00:00          | 2019-12-30 00:00:00 |
| 12345 | 2019-12-30 00:00:00          | 2019-12-31 00:00:00 |
| 12345 | 2019-12-31 00:00:00          | 2019-12-31 01:00:00 |
|   ... | approximately 10 hours later |                 ... |
| 12345 | 2019-12-31 11:00:00          | 2019-12-31 11:01:00 |
| 12345 | 2019-12-31 11:01:00          | 2019-12-31 11:02:00 |
| 12345 | 2019-12-31 11:02:00          | 2019-12-31 11:03:00 |
It turns out that we chose a relatively concise request interval for this example. In general, you might need as many as ~23-ish hours and ~59-ish minutes on each end of the interval. But, this is not nearly as intractable a number of intervals as if done minute-by-minute.
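If you’d like to check the table above by machine, here is a small Python sketch that covers a request interval with aligned days, hours, and minutes. It uses a greedy walk from time1 to time2, which is my own choice for illustration and not the SQL approach from the appendix; the names `decompose` and `is_aligned` are hypothetical, and the sketch assumes both endpoints fall on whole minutes.

```python
from datetime import datetime, timedelta

MINUTE = timedelta(minutes=1)
HOUR = timedelta(hours=1)
DAY = timedelta(days=1)

def is_aligned(t, width):
    """True if t sits on a boundary of the given width (DAY, HOUR, or MINUTE)."""
    if width == DAY:
        return (t.hour, t.minute, t.second, t.microsecond) == (0, 0, 0, 0)
    if width == HOUR:
        return (t.minute, t.second, t.microsecond) == (0, 0, 0)
    return (t.second, t.microsecond) == (0, 0)

def decompose(time1, time2):
    """Cover [time1, time2) with aligned days, hours, and minutes."""
    intervals = []
    t = time1
    while t < time2:
        # Take the widest aligned interval that starts at t and still fits.
        for width in (DAY, HOUR, MINUTE):
            if is_aligned(t, width) and t + width <= time2:
                intervals.append((t, t + width))
                t += width
                break
        else:
            raise ValueError("endpoints must be aligned to whole minutes")
    return intervals

time1 = datetime(2019, 12, 29, 21, 53)
time2 = datetime(2019, 12, 31, 11, 3)
pieces = decompose(time1, time2)
total_minutes = int((time2 - time1) / MINUTE)
print(len(pieces), "intervals instead of", total_minutes, "minutes")
```

For the example request this produces seven minutes, two hours, one day, eleven hours, and three minutes, where a minute-by-minute cover would need one interval for each minute of the span.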
That sure is a lot of input to provide to requests, isn’t it? It seems like it would be tedious to do, and easy to get wrong. Absolutely! We’ll write some queries in the appendix that automatically produce these intervals for you!
For now, let’s imagine we have your request input in this more expansive representation. How might you get your aggregate results out? What SQL queries do we need to write to make that happen?
First, we need to take our daily, hourly, and minutely aggregates and turn them into intervals. This is no more complicated than (shown for days, but the same structure for hours and minutes):
    -- Reframe daily aggregates using an interval.
    CREATE VIEW daily_intervals AS
    SELECT
        passenger_count,
        time as time1,
        time + INTERVAL '1 day' as time2,
        min_fare_amount,
        max_fare_amount
    FROM
        daily_aggregates;
We now have the data written down as a key (passenger_count), an interval (time1, time2), and the aggregate values (min_fare_amount, max_fare_amount). We can repeat this for hours and minutes.
The reason we convert to intervals is so that we can put all records in the same collection.
    -- Homogenous collection of aggregates by intervals.
    CREATE VIEW all_intervals AS
    SELECT * FROM daily_intervals
    UNION ALL
    SELECT * FROM hourly_intervals
    UNION ALL
    SELECT * FROM minutely_intervals;
We can safely use UNION ALL instead of UNION because all the measurements are distinct: they are distinct in each input, and intervals from different inputs have different widths. It turns out UNION ALL is much more efficient than UNION, because we don’t need to do the work of deduplication.
Now, we can just join the expanded requests with all_intervals, and aggregate out the various time intervals to get accumulated results.
    -- Select out intervals of interest and aggregate.
    CREATE MATERIALIZED VIEW results AS
    SELECT
        key,
        passenger_count,
        MIN(min_fare_amount),
        MAX(max_fare_amount)
    FROM
        requests,  -- IMPORTANT: as days, hours, and minutes
        all_intervals
    WHERE
        requests.time1 = all_intervals.time1 AND
        requests.time2 = all_intervals.time2
    GROUP BY
        key,
        passenger_count;
This view will collect the relevant intervals, and apply the reduction functions to the aggregates from each of the intervals. However, each request starts from relatively few bits of pre-aggregated data, rather than reconsidering the entire collection.
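The join-then-reduce step can be modeled in a few lines of Python. This is only a sketch under assumptions: the rows of `all_intervals` and `requests` below are made up, `finish` is a hypothetical helper, and the dictionary lookup stands in for whatever join strategy Materialize actually picks.

```python
from collections import defaultdict
from datetime import datetime

# Hypothetical rows of all_intervals:
# (passenger_count, time1, time2, min_fare_amount, max_fare_amount).
all_intervals = [
    (1, datetime(2019, 12, 29, 23), datetime(2019, 12, 30, 0), 5.0, 30.0),
    (1, datetime(2019, 12, 30, 0), datetime(2019, 12, 31, 0), -2.0, 80.0),
    (2, datetime(2019, 12, 30, 0), datetime(2019, 12, 31, 0), 7.5, 12.0),
    (2, datetime(2019, 12, 31, 0), datetime(2019, 12, 31, 1), 3.0, 55.0),
    (1, datetime(2019, 12, 28, 0), datetime(2019, 12, 29, 0), -99.0, 999.0),  # never requested
]

# Hypothetical expanded requests: (key, time1, time2), already aligned.
requests = [
    (1234, datetime(2019, 12, 29, 23), datetime(2019, 12, 30, 0)),
    (1234, datetime(2019, 12, 30, 0), datetime(2019, 12, 31, 0)),
    (1234, datetime(2019, 12, 31, 0), datetime(2019, 12, 31, 1)),
]

def finish(requests, all_intervals):
    """Equi-join on (time1, time2), then take MIN of mins and MAX of maxes."""
    by_interval = defaultdict(list)
    for passenger_count, time1, time2, lo, hi in all_intervals:
        by_interval[(time1, time2)].append((passenger_count, lo, hi))

    out = {}
    for key, time1, time2 in requests:
        for passenger_count, lo, hi in by_interval.get((time1, time2), []):
            cur_lo, cur_hi = out.get((key, passenger_count), (lo, hi))
            out[(key, passenger_count)] = (min(cur_lo, lo), max(cur_hi, hi))
    return out

results = finish(requests, all_intervals)
print(results)
```

The request only ever touches the partial aggregates whose intervals it names. Combining partial results this way is valid for MIN and MAX because a minimum of minimums is the overall minimum (and likewise for maximums); other aggregates would need their own combining rule.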
Trying it out
We’ve just defined a materialized view, and Materialize will keep this up to date as the input data change. Let’s see just how interactive it is.
We’ll open up another Materialize session and use the handy TAIL command:

    materialize=> COPY (TAIL results) TO STDOUT;
    1609383407362  1  1234         0   92
    1609383407362  1  1234  0    -23  340
    1609383407362  1  1234  1   -275  709
    1609383407362  1  1234  2    -87  260
    1609383407362  1  1234  3    -55  470
    1609383407362  1  1234  4    -52  220
    1609383407362  1  1234  5    -52  215
    1609383407362  1  1234  6     -4  143
    1609383407362  1  1234  7      7    7
    1609383407362  1  1234  8     85   85
    NOTICE: TAIL waiting for more data
    NOTICE: TAIL waiting for more data
    NOTICE: TAIL waiting for more data
    ...
This gives us a consistent snapshot of the current results (our test request we INSERTed up above). The snapshot is then followed by a live stream of timestamped updates, each describing a new consistent state. As it happens, there are no updates because I am currently changing neither requests nor tripdata.
As soon as we type into our first shell something like (note: new times):
INSERT INTO requests VALUES (123, '2019-12-23 21:53:00'::timestamp, '2019-12-29 11:03:00'::timestamp);
we should see some new results appear in the TAIL session:

    ...
    NOTICE: TAIL waiting for more data
    1609383413088  1  123       -40  108
    1609383413088  1  123  0    -78  400
    1609383413088  1  123  1   -275  743
    1609383413088  1  123  2   -222  499
    1609383413088  1  123  3    -80  499
    1609383413088  1  123  4   -150  499
    1609383413088  1  123  5    -52  231
    1609383413088  1  123  6    -65  168
    1609383413088  1  123  7      4   74
    1609383413088  1  123  8     80   85
    1609383413088  1  123  9      9   70
    NOTICE: TAIL waiting for more data
    NOTICE: TAIL waiting for more data
    ...
I don’t have anything quantitative for you about how long this took, other than “apparently immediately”. There was no perceptible amount of time between insertion and reporting the new results. This makes sense, as we just had to track down tens of records and accumulate them. We didn’t even have to build a new dataflow, as the results materialized view is already up and running.
Should the underlying tripdata collection change, each of the registered queries will have updates pushed at them. In this case, the input data come pre-sliced by time, so loading the data is the main way to change it.
We’ve seen a fairly reproducible pattern for slicing out intervals of time from aggregations. Our example used relatively few keys, and only did minimum and maximum aggregations. However, it should be clear-ish that the approach generalizes to more keys and other aggregations.
What I like about this example is that we’ve implemented, using SQL, a reactive computation with interesting performance properties. We used our knowledge and understanding of computer science, and were able to do something better as a result. While it can be great to blast out the first SQL that you can think of, Materialize responds well to more precise direction.
If you’d like to take this example out for a spin, go and grab a copy of Materialize now!
Appendix: All the views you need to write
I wrote all the views down just to make sure that they worked. It took a fair bit of unhelpful-reading SQL to do. But, I wanted to make sure you had access to them to reproduce this, if you wanted!
There are three classes of things to do:
- Convert input request intervals to those of minutes, hours, and days,
- Group and aggregate data by minutes, hours, and days,
- Join request intervals with aggregated data, with a final aggregation.
Converting request intervals to minutes, hours, and days
We asked the user to provide pre-sliced intervals. That seems error-prone. Surely we can do that for them?
Indeed we can, but my version is pretty terrible to read. Perhaps it can be improved.
Let’s imagine that we have a relation requests of (key, time1, time2) triples, and no requirement that they be aligned to days, hours, or minutes. We need to peel out some minutes near time1, then some hours, then some days, then some hours, and then some minutes ending at time2.
The logic to do this isn’t impossible, or even that hard, just wordy.
Here’s what I wrote for the “minutes near time1”:

    -- Try out each of the 60 minutes near `time1`;
    -- accept those that lie entirely between `time1` and `time2`.
    CREATE VIEW time1_minutes AS
    SELECT
        key,
        date_trunc('hour', requests.time1) + x * INTERVAL '1 minute' as time1,
        date_trunc('hour', requests.time1) + (x + 1) * INTERVAL '1 minute' as time2
    FROM
        requests,
        generate_series(0, 59) x
    WHERE
        -- The entire interval must lie between `time1` and `time2`.
        requests.time1 <= date_trunc('hour', requests.time1) + x * INTERVAL '1 minute' AND
        requests.time2 >= date_trunc('hour', requests.time1) + (x + 1) * INTERVAL '1 minute';
In case you read SQL as well as I do, what’s going on here is that we pull out the hour of time1, and try out the 60 one-minute intervals after it. Each interval is kept only if it starts at or after time1, and ends at or before time2.
It’s actually not that complicated, computationally (it is a flat_map in Materialize, which maintains no state). The logic generalizes to hours, days, etc., and can be used on the way back down if you round from time2 instead of time1.
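If the SQL is hard to follow, the same generate-and-filter idea reads like this in Python. It is a sketch that mirrors the view for a single request row; the function name `time1_minutes` is borrowed from the view, and the `replace` call is my stand-in for `date_trunc('hour', ...)`.

```python
from datetime import datetime, timedelta

def time1_minutes(key, time1, time2):
    """One-minute intervals in time1's hour that lie within [time1, time2]."""
    # Stand-in for date_trunc('hour', time1).
    hour_start = time1.replace(minute=0, second=0, microsecond=0)
    rows = []
    for x in range(60):  # stand-in for generate_series(0, 59)
        start = hour_start + x * timedelta(minutes=1)
        end = hour_start + (x + 1) * timedelta(minutes=1)
        # Keep the candidate only if the whole minute sits inside the request.
        if time1 <= start and end <= time2:
            rows.append((key, start, end))
    return rows

rows = time1_minutes(
    12345,
    datetime(2019, 12, 29, 21, 53),
    datetime(2019, 12, 31, 11, 3),
)
for row in rows:
    print(row)
```

Each request row fans out into at most 60 candidates and nothing is remembered between rows, which is why this step needs no maintained state.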
You’ll then need to take all of these intervals and union them together:

    -- union together derived "aligned" intervals.
    CREATE VIEW request_intervals AS
    SELECT * FROM time1_minutes
    UNION
    SELECT * FROM time1_hours
    UNION
    SELECT * FROM days
    UNION
    SELECT * FROM time2_hours
    UNION
    SELECT * FROM time2_minutes;
You’ll notice I’ve invented time1_hours, time2_hours, time2_minutes, and days here. I’ll leave those as homework for you. It’s also worth stressing that I used UNION rather than UNION ALL. There can be repetition of these intervals if e.g. time1 and time2 are within the same day (or hour).
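Here is a small Python sketch of that repetition. It assumes time2_minutes is built with the same logic as time1_minutes but rounding from time2, as described above; the helper `minutes_in_hour_of` and the example times are hypothetical.

```python
from datetime import datetime, timedelta

MINUTE = timedelta(minutes=1)

def minutes_in_hour_of(anchor, time1, time2):
    """One-minute intervals in anchor's hour that lie within [time1, time2]."""
    hour_start = anchor.replace(minute=0, second=0, microsecond=0)
    candidates = [
        (hour_start + x * MINUTE, hour_start + (x + 1) * MINUTE)
        for x in range(60)
    ]
    return [(s, e) for (s, e) in candidates if time1 <= s and e <= time2]

# A request whose endpoints fall within the same hour.
time1 = datetime(2019, 12, 29, 21, 10)
time2 = datetime(2019, 12, 29, 21, 13)

near_time1 = minutes_in_hour_of(time1, time1, time2)  # like time1_minutes
near_time2 = minutes_in_hour_of(time2, time1, time2)  # like time2_minutes

union_all = near_time1 + near_time2  # keeps repeats, like UNION ALL
union = sorted(set(union_all))       # removes repeats, like UNION
print(len(union_all), len(union))
```

Repeats would not change a MIN or a MAX, but they would inflate anything like a SUM or a COUNT, so deduplicating the request intervals is the safer default.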
Grouping and aggregating data by minute, hour, and day
The input data need to be grouped into intervals by minute, hour, and day.
I did that by first dropping each record in each of the types of interval. For example, here are the daily intervals. Notice that there is no aggregation yet.
    -- Drop the data into multiple windows
    CREATE VIEW daily_intervals AS
    SELECT
        passenger_count,
        date_trunc('day', drop_off) as time1,
        date_trunc('day', drop_off) + INTERVAL '1 day' as time2,
        fare_amount
    FROM
        tripdata;

We can do the same thing for hours and minutes. Once we’ve produced those views too, we can aggregate them up.

    -- Union contributions to each interval, and aggregate.
    CREATE VIEW all_intervals AS
    SELECT
        passenger_count,
        time1,
        time2,
        MIN(fare_amount) as min_fare_amount,
        MAX(fare_amount) as max_fare_amount
    FROM (
        SELECT * FROM daily_intervals
        UNION ALL
        SELECT * FROM hourly_intervals
        UNION ALL
        SELECT * FROM minutely_intervals
    )
    GROUP BY
        passenger_count,
        time1,
        time2;

I used UNION ALL here because it is more efficient than UNION, and no records will be duplicated across the inputs because the intervals have different widths.
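As a cross-check on the two views above, here is a Python sketch that drops each ride into its day, hour, and minute windows and aggregates per window. The `rides` rows are made up, and `truncate` is a rough, hypothetical stand-in for `date_trunc` limited to these three units.

```python
from datetime import datetime, timedelta

WIDTHS = {
    "day": timedelta(days=1),
    "hour": timedelta(hours=1),
    "minute": timedelta(minutes=1),
}

def truncate(t, unit):
    """Rough equivalent of date_trunc(unit, t) for 'day', 'hour', 'minute'."""
    if unit == "day":
        return t.replace(hour=0, minute=0, second=0, microsecond=0)
    if unit == "hour":
        return t.replace(minute=0, second=0, microsecond=0)
    return t.replace(second=0, microsecond=0)

def all_intervals(rides):
    """(passenger_count, time1, time2) -> (min_fare_amount, max_fare_amount)."""
    out = {}
    for passenger_count, drop_off, fare in rides:
        # Each ride contributes to exactly one window of each width.
        for unit, width in WIDTHS.items():
            time1 = truncate(drop_off, unit)
            group = (passenger_count, time1, time1 + width)
            lo, hi = out.get(group, (fare, fare))
            out[group] = (min(lo, fare), max(hi, fare))
    return out

# Hypothetical rides: (passenger_count, drop_off, fare_amount).
rides = [
    (1, datetime(2019, 12, 30, 9, 15, 20), 12.5),
    (1, datetime(2019, 12, 30, 9, 15, 50), 40.0),
    (1, datetime(2019, 12, 30, 17, 0, 0), -3.0),
]
agg = all_intervals(rides)
for group, fares in sorted(agg.items()):
    print(group, fares)
```

Every ride is written three times, once per width, which is the up-front cost that later lets each request read a handful of windows instead of the raw rides.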
Finally, I knew that I wanted access to this information by (time1, time2). I intentionally left the input views unmaterialized until this point, and only now create an appropriate index. This is the point at which your computer will start chugging away, reading data and splitting it off into various intervals and aggregates.
-- Index `all_intervals` by `(time1, time2)`. CREATE INDEX all_by_intervals ON all_intervals (time1, time2);
Join queries and input data, and reduce
Finally, we need to join the request_intervals collection (of multiple aligned intervals) with the all_intervals collection (of input data contributions). This will select out those aggregations that will contribute to each of the queries. We then do a finishing aggregation to reduce the partial aggregates to one value.

    -- Join queries and input data, and finish the reduction.
    CREATE MATERIALIZED VIEW results AS
    SELECT
        key,
        passenger_count,
        MIN(min_fare_amount),
        MAX(max_fare_amount)
    FROM
        request_intervals,
        all_intervals
    WHERE
        request_intervals.time1 = all_intervals.time1 AND
        request_intervals.time2 = all_intervals.time2
    GROUP BY
        key,
        passenger_count;