Temporal filters (time windows)
A temporal filter is a WHERE or HAVING clause which uses the mz_now() function.
This function returns Materialize’s current virtual timestamp, which works to keep up with real time as data is processed.
Applying a temporal filter reduces the working dataset, saving memory resources and focusing results on the recent past.
Here is a typical temporal filter that considers records whose timestamps are within the last 5 minutes.
WHERE mz_now() <= event_ts + INTERVAL '5min'
Consider this diagram that shows a record B
falling out of the result set as time moves forward:
It may feel more natural to write this filter in the equivalent form WHERE event_ts >= mz_now() - INTERVAL '5min'. However, there are currently no valid operators for the mz_timestamp type that would allow this.
Requirements
You can only use mz_now() to establish a temporal filter under the following conditions:
- mz_now() appears in a WHERE or HAVING clause.
- The clause must compare mz_now() to a numeric or timestamp expression not containing mz_now().
- The comparison must be one of =, <, <=, >, or >=, or operators that desugar to them or a conjunction of them (for example, BETWEEN...AND...). At the moment, you can’t use the != operator with mz_now().
You cannot use temporal filters in the WHERE clause of an aggregate FILTER expression.
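As a rough illustration of these rules, the sketch below shows two filters that should satisfy the requirements, with rejected forms left as comments. The events table and its event_ts column are assumed here for illustration only; they match the table created in the sliding window example later on this page.

```sql
-- Accepted: mz_now() compared with <= against a timestamp expression.
SELECT * FROM events
WHERE mz_now() <= event_ts + INTERVAL '5min';

-- Accepted: BETWEEN desugars to a conjunction of >= and <=.
SELECT * FROM events
WHERE mz_now() BETWEEN event_ts AND event_ts + INTERVAL '5min';

-- Not accepted: != is not a supported comparison for mz_now().
-- SELECT * FROM events WHERE mz_now() != event_ts;

-- Not accepted: a temporal filter inside an aggregate FILTER expression.
-- SELECT count(*) FILTER (WHERE mz_now() <= event_ts + INTERVAL '5min') FROM events;
```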
Examples
These examples create real objects. After you have tried the examples, make sure to drop these objects and spin down any resources you may have created.
Sliding window
It is common for real-time applications to be concerned with only a recent period of time. We call this a sliding window. Other systems use this term differently because they cannot achieve a continuously sliding window.
In this case, we will filter a table to include only records from the last 30 seconds.
- First, create a table called events and a view of the most recent 30 seconds of events.
-- Create a table of timestamped events.
CREATE TABLE events (
    content TEXT,
    event_ts TIMESTAMP
);
-- Create a view of events from the last 30 seconds.
CREATE VIEW last_30_sec AS
SELECT event_ts, content
FROM events
WHERE mz_now() <= event_ts + INTERVAL '30s';
- Next, subscribe to the results of the view.
COPY (SUBSCRIBE (SELECT event_ts, content FROM last_30_sec)) TO STDOUT;
- In a separate session, insert a record.
INSERT INTO events VALUES ('hello', now());
- Back in the first session, watch the record expire after 30 seconds.
1686868190714  1 2023-06-15 22:29:50.711 hello
1686868220712 -1 2023-06-15 22:29:50.711 hello
Press Ctrl+C to quit the SUBSCRIBE when you are ready.
You can materialize the last_30_sec
view by creating an index on it (results stored in memory) or by recreating it as a MATERIALIZED VIEW
(results persisted to storage). When you do so, Materialize will keep the results up to date with records expiring automatically according to the temporal filter.
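A minimal sketch of both options follows. The object names last_30_sec_idx and last_30_sec_mv are hypothetical, chosen only for this illustration; the materialized view gets its own name so that it does not collide with the existing view.

```sql
-- Option 1: keep the view's results in memory by indexing it.
CREATE INDEX last_30_sec_idx ON last_30_sec (event_ts);

-- Option 2: persist the results by defining a materialized view instead.
CREATE MATERIALIZED VIEW last_30_sec_mv AS
SELECT event_ts, content
FROM events
WHERE mz_now() <= event_ts + INTERVAL '30s';
```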
Time-to-Live (TTL)
The time to live (TTL) pattern helps to filter rows with user-defined expiration times.
This example uses a tasks
table with a time to live for each task.
Materialize then helps perform actions according to each task’s expiration time.
- First, create a table:
CREATE TABLE tasks (name TEXT, created_ts TIMESTAMP, ttl INTERVAL);
- Add some tasks to track:
INSERT INTO tasks VALUES ('send_email', now(), INTERVAL '5 minutes');
INSERT INTO tasks VALUES ('time_to_eat', now(), INTERVAL '1 hour');
INSERT INTO tasks VALUES ('security_block', now(), INTERVAL '1 day');
- Create a view using a temporal filter over the expiration time. For our example, the expiration time is the sum of the task’s created_ts and its ttl.
CREATE MATERIALIZED VIEW tracking_tasks AS
SELECT
    name,
    created_ts + ttl AS expiration_time
FROM tasks
WHERE mz_now() < created_ts + ttl;
The moment mz_now() crosses the expiration time of a record, that record is retracted (removed) from the result set.
You can now:
- Query the remaining time for a row:
SELECT expiration_time - now() AS remaining_ttl
FROM tracking_tasks
WHERE name = 'time_to_eat';
- Check if a particular row is still available:
SELECT true
FROM tracking_tasks
WHERE name = 'security_block';
- Trigger an external process when a row expires:
INSERT INTO tasks VALUES ('send_email', now(), INTERVAL '5 seconds');
COPY (SUBSCRIBE tracking_tasks WITH (SNAPSHOT = false)) TO STDOUT;
mz_timestamp | mz_diff | name       | expiration_time |
-------------|---------|------------|-----------------|
...          | -1      | send_email | ...             | <-- Time to send the email!
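The same pattern can also be inverted. As a sketch only (the view name expired_tasks is hypothetical and not part of the original example), a filter using >= selects tasks whose expiration time has already passed, so a row should appear in this view at the moment it leaves tracking_tasks.

```sql
-- Hypothetical companion view: tasks whose TTL has elapsed.
-- The filter is the complement of the one used in tracking_tasks.
CREATE VIEW expired_tasks AS
SELECT
    name,
    created_ts + ttl AS expiration_time
FROM tasks
WHERE mz_now() >= created_ts + ttl;
```

A consumer that prefers to react to insertions rather than retractions could subscribe to a view like this one. Note that, without an upper bound, expired rows stay in the result indefinitely.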
Periodically emit results
Suppose you want to count the number of records in each 1 minute time window, grouped by an id
column.
You don’t care to receive every update as it happens; instead, you would prefer Materialize to emit a single result at the end of each window.
Materialize date functions are helpful for use cases like this where you want to bucket records into time windows.
The strategy for this example is to put an initial temporal filter on the input (say, 30 days) to bound it, use the date_bin
function to bin records into 1 minute windows, use a second temporal filter to emit results at the end of the window, and finally apply a third temporal filter shorter than the first (say, 7 days) to set how long results should persist in Materialize.
- First, create a table for the input records.
CREATE TABLE input (id INT, event_ts TIMESTAMP);
- Create a view that filters the input for the most recent 30 days and buckets records into 1 minute windows.
CREATE VIEW input_recent_bucketed AS
SELECT
    id,
    date_bin(
        '1 minute',
        event_ts,
        '2000-01-01 00:00:00+00'
    ) + INTERVAL '1 minute' AS window_end
FROM input
WHERE mz_now() <= event_ts + INTERVAL '30 days';
- Create the final output view that does the aggregation and maintains 7 days worth of results.
CREATE MATERIALIZED VIEW output AS
SELECT
    id,
    count(id) AS count,
    window_end
FROM input_recent_bucketed
WHERE mz_now() >= window_end
  AND mz_now() < window_end + INTERVAL '7 days'
GROUP BY window_end, id;
This WHERE clause means “the result for a 1-minute window should come into effect when mz_now() reaches window_end and be removed 7 days later”. Without the latter constraint, records in the result set would receive strange updates as records expire from the initial 30 day filter on the input.
- Subscribe to the output.
COPY (SUBSCRIBE (SELECT * FROM output)) TO STDOUT;
- In a different session, insert some records.
INSERT INTO input VALUES (1, now());
-- wait a moment
INSERT INTO input VALUES (1, now());
-- wait a moment
INSERT INTO input VALUES (1, now());
-- wait a moment
INSERT INTO input VALUES (2, now());
- Back at the SUBSCRIBE, wait about a minute for your final aggregation result to show up the moment the 1 minute window ends.
mz_timestamp  | mz_diff | id    | count | window_end
--------------|---------|-------|-------|----------------------
1686889140000   1         1       3       2023-06-16 04:19:00
1686889140000   1         2       1       2023-06-16 04:19:00
If you are very patient, you will see these results retracted in 7 days. Press Ctrl+C to exit the SUBSCRIBE when you are finished playing.
From here, you could create a Kafka sink and use Kafka Connect to archive the historical results to a data warehouse (ignoring Kafka tombstone records that represent retracted results).
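Once you have finished with the examples, the cleanup requested at the start of this section might look like the sketch below. It assumes you used the object names from the examples and created nothing else on top of them; dependent objects are dropped before the objects they depend on. If you plan to run the EXPLAIN example in the pushdown section, keep events until then.

```sql
-- Periodically emit results
DROP MATERIALIZED VIEW IF EXISTS output;
DROP VIEW IF EXISTS input_recent_bucketed;
DROP TABLE IF EXISTS input;

-- Time-to-Live (TTL)
DROP MATERIALIZED VIEW IF EXISTS tracking_tasks;
DROP TABLE IF EXISTS tasks;

-- Sliding window
DROP VIEW IF EXISTS last_30_sec;
DROP TABLE IF EXISTS events;
```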
Late arriving events
For various reasons, it’s possible for records to arrive out of order. For example, network connectivity issues may cause a mobile device to emit data with a timestamp from the relatively distant past. How can you account for late arriving data in Materialize?
Consider the temporal filter for the most recent hour’s worth of records.
WHERE mz_now() <= event_ts + INTERVAL '1hr'
Suppose a record with a timestamp 11:00:00 arrives “late” with a virtual timestamp of 11:59:59 and you query this collection at a virtual timestamp of 12:00:00.
According to the temporal filter, the record is included for results as of virtual time 11:59:59 and retracted just after 12:00:00.
Let’s say another record comes in with a timestamp of 11:00:00, but mz_now() has marched forward to 12:00:01.
Unfortunately, this record does not pass the filter and is excluded from processing altogether.
In conclusion: if you want to account for late arriving data up to some given time duration, you must adjust your temporal filter to allow for such records to make an appearance in the result set. This is often referred to as a grace period.
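One way to sketch such a grace period is to widen the interval in the filter. The 5-minute allowance below is an arbitrary value chosen for illustration, and the query assumes the events table from the sliding window example.

```sql
-- One-hour window plus a hypothetical 5-minute grace period.
-- A record stamped 11:00:00 passes the filter until mz_now() exceeds 12:05:00,
-- so the late record from the scenario above (arriving at 12:00:01) is included.
SELECT event_ts, content
FROM events
WHERE mz_now() <= event_ts + INTERVAL '1 hour' + INTERVAL '5 minutes';
```

The trade-off is that every record, including those that arrive on time, stays in the result set for the extra 5 minutes.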
Temporal filter pushdown
All of the queries in the previous examples only return results based on recently-added events. Materialize can “push down” filters that match this pattern all the way down to its storage layer, skipping over old data that’s not relevant to the query. Here are the key benefits of this optimization:
- For ad-hoc SELECT queries, temporal filter pushdown can substantially improve query latency.
- When a materialized view is created or the cluster maintaining it restarts, temporal filter pushdown can substantially reduce the time it takes to start serving results.
The columns filtered should correlate with the insertion or update time of the row.
In the examples above, the event_ts value in each event correlates with the time the event was inserted, so filters that reference this column should be pushed down to the storage layer.
However, the values in the content column are not correlated with insertion time in any way, so filters against content will probably not be pushed down to the storage layer.
Temporal filters that consist of arithmetic, date math, and comparisons are eligible for pushdown, including all the examples on this page.
However, more complex filters might not be. You can check whether the filters in your query can be pushed down by using the filter_pushdown
option in an EXPLAIN
statement. For example:
EXPLAIN WITH(filter_pushdown)
SELECT count(*)
FROM events
WHERE mz_now() <= event_ts + INTERVAL '30s';
----
Explained Query:
[...]
Source materialize.public.events
filter=((mz_now() <= timestamp_to_mz_timestamp((#1 + 00:00:30))))
pushdown=((mz_now() <= timestamp_to_mz_timestamp((#1 + 00:00:30))))
The filter in our query appears in the pushdown=
list at the bottom of the output, so the filter pushdown optimization will be able to filter out irrelevant ranges of data in that source and make the overall query more efficient.
Some common functions, such as casting from a string to a timestamp, can prevent filter pushdown for a query. For similar functions that do allow pushdown, see the pushdown functions documentation.