Temporal filters (time windows)
A temporal filter is a WHERE or HAVING clause which uses the mz_now() function.
This function returns Materialize’s current virtual timestamp, which works to keep up with real time as data is processed.
Applying a temporal filter reduces the working dataset, saving memory resources and focusing results on the recent past.
Here is a typical temporal filter that considers records whose timestamps are within the last 5 minutes.
WHERE mz_now() <= event_ts + INTERVAL '5min'
[Diagram: record B falling out of the result set as time moves forward.]
It may feel more natural to write this filter as the equivalent WHERE event_ts >= mz_now() - INTERVAL '5min'. However, there are currently no valid operators for the mz_timestamp type that would allow this.
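The typical filter shown above adds the interval to the column rather than subtracting it from mz_now(). The following is a minimal sketch of that rewrite, assuming an events table with an event_ts TIMESTAMP column like the one created in the examples on this page.

```sql
-- Not accepted: subtracts an interval from mz_now().
-- SELECT content, event_ts FROM events
-- WHERE event_ts >= mz_now() - INTERVAL '5min';

-- Equivalent form: move the interval to the column side of the comparison,
-- so that mz_now() stands alone.
SELECT content, event_ts
FROM events
WHERE mz_now() <= event_ts + INTERVAL '5min';
```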
You can only use mz_now() to establish a temporal filter under the following conditions:
- mz_now() appears in a WHERE or HAVING clause.
- The clause must compare mz_now() to a numeric expression or timestamp expression not containing mz_now().
- The comparison must be one of =, <, <=, >, or >=, or operators that desugar to them or a conjunction of them (for example, BETWEEN...AND...). At the moment, you can’t use the != operator with mz_now().
You cannot use temporal filters in the WHERE clause of an aggregate FILTER expression.
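As a sketch of the conditions above, the two queries below express the same window, once as a conjunction of comparisons and once with BETWEEN...AND.... They assume an events table with content and event_ts columns like the one created in the examples on this page. The 1 minute and 5 minute bounds are illustrative only.

```sql
-- A conjunction of two comparisons against mz_now(): keep records that are
-- at least 1 minute old but no more than 5 minutes old.
SELECT content, event_ts
FROM events
WHERE mz_now() >= event_ts + INTERVAL '1min'
  AND mz_now() <= event_ts + INTERVAL '5min';

-- The same window written with BETWEEN...AND..., which desugars to the
-- conjunction above.
SELECT content, event_ts
FROM events
WHERE mz_now() BETWEEN event_ts + INTERVAL '1min' AND event_ts + INTERVAL '5min';
```

In both forms, mz_now() is compared to a timestamp expression that does not itself contain mz_now().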
These examples create real objects. After you have tried the examples, make sure to drop these objects and spin down any resources you may have created.
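For reference, a cleanup along the following lines should remove the objects created on this page once you are finished. It is a sketch that assumes you used the table names from the examples (events, tasks, and input) and relies on CASCADE to drop the views, materialized views, and indexes built on top of them.

```sql
-- Drop each example table together with everything that depends on it.
DROP TABLE IF EXISTS events CASCADE;
DROP TABLE IF EXISTS tasks CASCADE;
DROP TABLE IF EXISTS input CASCADE;
```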
It is common for real-time applications to be concerned with only a recent period of time. We call this a sliding window. Other systems use this term differently because they cannot achieve a continuously sliding window.
In this case, we will filter a table to include only records from the last 30 seconds.
First, create a table called events and a view of the most recent 30 seconds of events.
-- Create a table of timestamped events.
CREATE TABLE events (
    content TEXT,
    event_ts TIMESTAMP
);
-- Create a view of events from the last 30 seconds.
CREATE VIEW last_30_sec AS
SELECT event_ts, content
FROM events
WHERE mz_now() <= event_ts + INTERVAL '30s';
Next, subscribe to the results of the view.
COPY (SUBSCRIBE (SELECT event_ts, content FROM last_30_sec)) TO STDOUT;
In a separate session, insert a record.
INSERT INTO events VALUES ('hello', now());
Back in the first session, watch the record expire after 30 seconds.
1686868190714    1    2023-06-15 22:29:50.711    hello
1686868220712    -1   2023-06-15 22:29:50.711    hello
Press Ctrl+C to quit the SUBSCRIBE when you are ready.
You can materialize the last_30_sec view by creating an index on it (results stored in memory) or by recreating it as a MATERIALIZED VIEW (results persisted to storage). When you do so, Materialize will keep the results up to date with records expiring automatically according to the temporal filter.
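The two options could look roughly like this. The names last_30_sec_idx and last_30_sec_mv are hypothetical, chosen only for this sketch, and the choice of event_ts as the index key is an assumption.

```sql
-- Option 1: index the existing view (results kept in memory).
CREATE INDEX last_30_sec_idx ON last_30_sec (event_ts);

-- Option 2: define the same query as a materialized view instead
-- (results persisted to storage).
CREATE MATERIALIZED VIEW last_30_sec_mv AS
SELECT event_ts, content
FROM events
WHERE mz_now() <= event_ts + INTERVAL '30s';
```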
The time to live (TTL) pattern helps to filter rows with user-defined expiration times.
This example uses a tasks table with a time to live for each task.
Materialize then helps perform actions according to each task’s expiration time.
First, create a table:
CREATE TABLE tasks (name TEXT, created_ts TIMESTAMP, ttl INTERVAL);
Add some tasks to track:
INSERT INTO tasks VALUES ('send_email', now(), INTERVAL '5 minutes');
INSERT INTO tasks VALUES ('time_to_eat', now(), INTERVAL '1 hour');
INSERT INTO tasks VALUES ('security_block', now(), INTERVAL '1 day');
Create a view using a temporal filter over the expiration time. For our example, the expiration time is the sum of the task’s created_ts and its ttl.
CREATE MATERIALIZED VIEW tracking_tasks AS
SELECT name, created_ts + ttl AS expiration_time
FROM tasks
WHERE mz_now() < created_ts + ttl;
When mz_now() crosses the expiration time of a record, that record is retracted (removed) from the result set.
You can now:
Query the remaining time for a row:
SELECT expiration_time - now() AS remaining_ttl FROM tracking_tasks WHERE name = 'time_to_eat';
Check if a particular row is still available:
SELECT true FROM tracking_tasks WHERE name = 'security_block';
Trigger an external process when a row expires:
INSERT INTO tasks VALUES ('send_email', now(), INTERVAL '5 seconds');
COPY (SUBSCRIBE tracking_tasks WITH (SNAPSHOT = false)) TO STDOUT;
mz_timestamp | mz_diff | name       | expiration_time |
-------------|---------|------------|-----------------|
...          | -1      | send_email | ...             | <-- Time to send the email!
Periodically emit results
Suppose you want to count the number of records in each 1 minute time window, grouped by an id column.
You don’t care to receive every update as it happens; instead, you would prefer Materialize to emit a single result at the end of each window.
Materialize date functions are helpful for use cases like this where you want to bucket records into time windows.
The strategy for this example is to put an initial temporal filter on the input (say, 30 days) to bound it, use the date_bin function to bin records into 1 minute windows, use a second temporal filter to emit results at the end of the window, and finally apply a third temporal filter shorter than the first (say, 7 days) to set how long results should persist in Materialize.
- First, create a table for the input records.
CREATE TABLE input (id INT, event_ts TIMESTAMP);
- Create a view that filters the input for the most recent 30 days and buckets records into 1 minute windows.
CREATE VIEW input_recent_bucketed AS
SELECT
    id,
    date_bin(
        '1 minute',
        event_ts,
        '2000-01-01 00:00:00+00'
    ) + INTERVAL '1 minute' AS window_end
FROM input
WHERE mz_now() <= event_ts + INTERVAL '30 days';
- Create the final output view that does the aggregation and maintains 7 days worth of results.
CREATE MATERIALIZED VIEW output AS
SELECT
    id,
    count(id) AS count,
    window_end
FROM input_recent_bucketed
WHERE mz_now() >= window_end
  AND mz_now() < window_end + INTERVAL '7 days'
GROUP BY window_end, id;
The WHERE clause means “the result for a 1-minute window should come into effect when mz_now() reaches window_end and be removed 7 days later”. Without the latter constraint, records in the result set would receive strange updates as records expire from the initial 30 day filter on the input.
- Subscribe to the output view.
COPY (SUBSCRIBE (SELECT * FROM output)) TO STDOUT;
- In a different session, insert some records.
INSERT INTO input VALUES (1, now());
-- wait a moment
INSERT INTO input VALUES (1, now());
-- wait a moment
INSERT INTO input VALUES (1, now());
-- wait a moment
INSERT INTO input VALUES (2, now());
- Back at the SUBSCRIBE, wait about a minute for your final aggregation result to show up the moment the 1 minute window ends.
mz_timestamp  | mz_diff | id | count | window_end
--------------|---------|----|-------|--------------------
1686889140000   1         1    3       2023-06-16 04:19:00
1686889140000   1         2    1       2023-06-16 04:19:00
If you are very patient, you will see these results retracted in 7 days. Press Ctrl+C to exit the SUBSCRIBE when you are finished playing.
From here, you could create a Kafka sink and use Kafka Connect to archive the historical results to a data warehouse (ignoring Kafka tombstone records that represent retracted results).
Late arriving events
For various reasons, it’s possible for records to arrive out of order. For example, network connectivity issues may cause a mobile device to emit data with a timestamp from the relatively distant past. How can you account for late arriving data in Materialize?
Consider the temporal filter for the most recent hour’s worth of records.
WHERE mz_now() <= event_ts + INTERVAL '1hr'
Suppose a record with a timestamp 11:00:00 arrives “late” with a virtual timestamp of 11:59:59 and you query this collection at a virtual timestamp of 12:00:00. According to the temporal filter, the record is included for results as of virtual time 11:59:59 and retracted just after 12:00:00.
Let’s say another record comes in with a timestamp of 11:00:00, but mz_now() has marched forward to 12:00:01. Unfortunately, this record does not pass the filter and is excluded from processing altogether.
In conclusion: if you want to account for late arriving data up to some given time duration, you must adjust your temporal filter to allow for such records to make an appearance in the result set. This is often referred to as a grace period.
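One way to express a grace period is to widen the filter by the amount of lateness you are willing to tolerate. The sketch below makes several assumptions: the view name last_hour_with_grace and the 5 minute allowance are hypothetical, and events is the table from the sliding window example.

```sql
-- Keep the most recent hour of events, and also admit records that
-- arrive up to 5 minutes late (hypothetical grace period).
CREATE VIEW last_hour_with_grace AS
SELECT event_ts, content
FROM events
WHERE mz_now() <= event_ts + INTERVAL '1 hour' + INTERVAL '5 minutes';
```

With this filter, a record stamped 11:00:00 that arrives at 12:00:01 still passes, and is retracted just after 12:05:00. The trade-off is that every record, late or not, stays in the result set for the extra 5 minutes.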
Temporal filter pushdown
All of the queries in the previous examples only return results based on recently-added events. Materialize can “push down” filters that match this pattern all the way down to its storage layer, skipping over old data that’s not relevant to the query. Here are the key benefits of this optimization:
- For ad-hoc SELECT queries, temporal filter pushdown can substantially improve query latency.
- When a materialized view is created or the cluster maintaining it restarts, temporal filter pushdown can substantially reduce the time it takes to start serving results.
The columns filtered should correlate with the insertion or update time of the row.
In the examples above, the event_ts value in each event correlates with the time the event was inserted, so filters that reference this column should be pushed down to the storage layer.
However, the values in the content column are not correlated with insertion time in any way, so filters against content will probably not be pushed down to the storage layer.
Temporal filters that consist of arithmetic, date math, and comparisons are eligible for pushdown, including all the examples on this page.
However, more complex filters might not be. You can check whether the filters in your query can be pushed down by using the filter_pushdown option in an EXPLAIN statement. For example:
EXPLAIN WITH(filter_pushdown)
SELECT count(*)
FROM events
WHERE mz_now() <= event_ts + INTERVAL '30s';
----
Explained Query:
[...]
Source materialize.public.events
  filter=((mz_now() <= timestamp_to_mz_timestamp((#1 + 00:00:30))))
  pushdown=((mz_now() <= timestamp_to_mz_timestamp((#1 + 00:00:30))))
The filter in our query appears in the pushdown= list at the bottom of the output, so the filter pushdown optimization will be able to filter out irrelevant ranges of data in that source and make the overall query more efficient.
Some common functions, such as casting from a string to a timestamp, can prevent filter pushdown for a query. For similar functions that do allow pushdown, see the pushdown functions documentation.
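To see whether a cast affects one of your own queries, you can run the same EXPLAIN check against it. The sketch below is illustrative only: the raw_events table and its event_ts_text column are hypothetical, and no output is shown because the result depends on your Materialize version.

```sql
-- Hypothetical table whose event time is stored as text.
CREATE TABLE raw_events (content TEXT, event_ts_text TEXT);

-- Check whether a filter that casts text to a timestamp is pushed down.
-- If the filter is missing from the pushdown= list in the output,
-- it was not pushed down to the storage layer.
EXPLAIN WITH(filter_pushdown)
SELECT count(*)
FROM raw_events
WHERE mz_now() <= event_ts_text::timestamp + INTERVAL '30s';
```

If the filter does not appear under pushdown=, storing the event time in a TIMESTAMP column, as the events table does, avoids the cast altogether.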