Use dbt to manage Materialize
dbt-materialize
adapter can only be used with dbt Core. Making the
adapter available in dbt Cloud depends on prioritization by dbt Labs. If you
require dbt Cloud support, please reach out to the dbt Labs team.
dbt has become the standard for data transformation (“the T in ELT”). It combines the accessibility of SQL with software engineering best practices, allowing you to not only build reliable data pipelines, but also document, test and version-control them.
In this guide, weβll cover how to use dbt and Materialize to transform streaming data in real time β from model building to continuous testing.
Setup
Setting up a dbt project with Materialize is similar to setting it up with any other database that requires a non-native adapter. To get up and running, you need to:
-
Install the
dbt-materialize
plugin (optionally using a virtual environment):python3 -m venv dbt-venv # create the virtual environment source dbt-venv/bin/activate # activate the virtual environment pip install dbt-core dbt-materialize # install dbt-core and the adapter
The installation will include the
dbt-postgres
dependency. To check that the plugin was successfully installed, run:dbt --version
materialize
should be listed under “Plugins”. If this is not the case, double-check that the virtual environment is activated! -
To get started, make sure you have a Materialize account.
Create and configure a dbt project
A dbt project is
a directory that contains all dbt needs to run and keep track of your
transformations. At a minimum, it must have a project file
(dbt_project.yml
) and at least one model
(.sql
).
To create a new project, run:
dbt init <project_name>
This command will bootstrap a starter project with default configurations and
create a profiles.yml
file, if it doesn’t exist. To help you get started, the
dbt init
project includes sample models to run the Materialize quickstart.
Connect to Materialize
dbt manages all your connection configurations (or, profiles) in a file called
profiles.yml
. By
default, this file is located under ~/.dbt/
.
-
Locate the
profiles.yml
file in your machine:dbt debug --config-dir
Note: If you started from an existing project but it’s your first time setting up dbt, it’s possible that this file doesn’t exist yet. You can manually create it in the suggested location.
-
Open
profiles.yml
and adapt it to connect to Materialize using the reference profile configuration.As an example, the following profile would allow you to connect to Materialize in two different environments: a developer environment (
dev
) and a production environment (prod
).default: outputs: prod: type: materialize threads: 1 host: <host> port: 6875 # Materialize user or service account (recommended) # to connect as user: <user@domain.com> pass: <password> database: materialize schema: public # optionally use the cluster connection # parameter to specify the default cluster # for the connection cluster: <prod_cluster> sslmode: require dev: type: materialize threads: 1 host: <host> port: 6875 user: <user@domain.com> pass: <password> database: <dev_database> schema: <dev_schema> cluster: <dev_cluster> sslmode: require target: dev
The
target
parameter allows you to configure the target environment that dbt will use to run your models. -
To test the connection to Materialize, run:
dbt debug
If the output reads
All checks passed!
, you’re good to go! The dbt documentation has some helpful pointers in case you run into errors.
Build and run dbt models
For dbt to know how to persist (or not) a transformation, the model needs to be
associated with a materialization
strategy. Because Materialize is optimized for real-time transformations of
streaming data and the core of dbt is built around batch, the dbt-materialize
adapter implements a few custom materialization types:
Type | Details | Config options |
---|---|---|
source | Creates a source. | cluster, indexes |
view | Creates a view. | indexes |
materialized_view | Creates a materialized view. The materializedview legacy materialization name is supported for backwards compatibility. |
cluster, indexes |
table | Creates a materialized view (actual table support pending discussion#29633). | cluster, indexes |
sink | Creates a sink. | cluster |
ephemeral | Executes queries using CTEs. |
Create a materialization for each SQL statement you’re planning to deploy. Each
individual materialization should be stored as a .sql
file under the
directory defined by model-paths
in dbt_project.yml
.
Sources
In Materialize, a source describes an external system
you want to read data from, and provides details about how to decode and
interpret that data. You can instruct dbt to create a source using the custom
source
materialization. Once a source has been defined, it can be referenced
from another model using the dbt ref()
or source()
functions.
source
models.
Create a Kafka source.
Filename: sources/kafka_topic_a.sql
{{ config(materialized='source') }}
FROM KAFKA CONNECTION kafka_connection (TOPIC 'topic_a')
FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_connection
The source above would be compiled to:
database.schema.kafka_topic_a
Create a PostgreSQL source.
Filename: sources/pg.sql
{{ config(materialized='source') }}
FROM POSTGRES CONNECTION pg_connection (PUBLICATION 'mz_source')
FOR ALL TABLES
Materialize will automatically create a subsource for each table in the
mz_source
publication. Pulling subsources into the dbt context automatically
isn’t supported yet. Follow the discussion in dbt-core #6104
for updates!
A possible workaround is to define PostgreSQL sources as a dbt source
in a .yml
file, nested under a sources:
key, and list each subsource under
the tables:
key.
sources:
- name: pg
schema: "{{ target.schema }}"
tables:
- name: table_a
- name: table_b
Once a subsource has been defined this way, it can be referenced from another
model using the dbt source()
function. To ensure that dbt is able to determine the proper order to run the
models in, you should additionally force a dependency on the parent source
model (pg
), as described in the dbt documentation.
Filename: staging/dep_subsources.sql
-- depends_on: {{ ref('pg') }}
{{ config(materialized='view') }}
SELECT
table_a.foo AS foo,
table_b.bar AS bar
FROM {{ source('pg','table_a') }}
INNER JOIN
{{ source('pg','table_b') }}
ON table_a.id = table_b.foo_id
The source and subsources above would be compiled to:
database.schema.pg
database.schema.table_a
database.schema.table_b
Create a MySQL source.
Filename: sources/mysql.sql
{{ config(materialized='source') }}
FROM MYSQL CONNECTION mysql_connection
FOR ALL TABLES;
Materialize will automatically create a subsource for each table in the upstream database. Pulling subsources into the dbt context automatically isn’t supported yet. Follow the discussion in dbt-core #6104 for updates!
A possible workaround is to define MySQL sources as a dbt source
in a .yml
file, nested under a sources:
key, and list each subsource under
the tables:
key.
sources:
- name: mysql
schema: "{{ target.schema }}"
tables:
- name: table_a
- name: table_b
Once a subsource has been defined this way, it can be referenced from another
model using the dbt source()
function. To ensure that dbt is able to determine the proper order to run the
models in, you should additionally force a dependency on the parent source
model (mysql
), as described in the dbt documentation.
Filename: staging/dep_subsources.sql
-- depends_on: {{ ref('mysql') }}
{{ config(materialized='view') }}
SELECT
table_a.foo AS foo,
table_b.bar AS bar
FROM {{ source('mysql','table_a') }}
INNER JOIN
{{ source('mysql','table_b') }}
ON table_a.id = table_b.foo_id
The source and subsources above would be compiled to:
database.schema.mysql
database.schema.table_a
database.schema.table_b
Create a webhook source.
Filename: sources/webhook.sql
{{ config(materialized='source') }}
FROM WEBHOOK
BODY FORMAT JSON
CHECK (
WITH (
HEADERS,
BODY AS request_body,
-- Make sure to fully qualify the secret if it isn't in the same
-- namespace as the source!
SECRET basic_hook_auth
)
constant_time_eq(headers->'authorization', basic_hook_auth)
);
The source above would be compiled to:
database.schema.webhook
Views and materialized views
In dbt, a model
is a SELECT
statement that encapsulates a data transformation you want to run
on top of your database. When you use dbt with Materialize, your models stay
up-to-date without manual or configured refreshes. This allows you to
efficiently transform streaming data using the same thought process you’d use
for batch transformations against any other database.
Depending on your usage patterns, you can transform data using view
or materialized_view
models. For guidance and best
practices on when to use views and materialized views in Materialize, see
Indexed views vs. materialized views.
Views
dbt models are materialized as views by default. Although
this means you can skip the materialized
configuration in the model
definition to create views in Materialize, we recommend explicitly setting the
materialization type for maintainability.
Filename: models/view_a.sql
{{ config(materialized='view') }}
SELECT
col_a, ...
-- Reference model dependencies using the dbt ref() function
FROM {{ ref('kafka_topic_a') }}
The model above will be compiled to the following SQL statement:
CREATE VIEW database.schema.view_a AS
SELECT
col_a, ...
FROM database.schema.kafka_topic_a;
The resulting view will not keep results incrementally updated without an
index (see Creating an index on a view). Once a
view
model has been defined, it can be referenced from another model using
the dbt ref()
function.
Creating an index on a view
To keep results up-to-date in Materialize, you can create indexes
on view models using the index
configuration. This
allows you to bypass the need for maintaining complex incremental logic or
re-running dbt to refresh your models.
Filename: models/view_a.sql
{{ config(materialized='view',
indexes=[{'columns': ['col_a'], 'cluster': 'cluster_a'}]) }}
SELECT
col_a, ...
FROM {{ ref('kafka_topic_a') }}
The model above will be compiled to the following SQL statements:
CREATE VIEW database.schema.view_a AS
SELECT
col_a, ...
FROM database.schema.kafka_topic_a;
CREATE INDEX database.schema.view_a_idx IN CLUSTER cluster_a ON view_a (col_a);
As new data arrives, indexes keep view results incrementally updated in memory within a cluster. Indexes help optimize query performance and make queries against views fast and computationally free.
Materialized views
To materialize a model as a materialized view,
set the materialized
configuration to materialized_view
.
Filename: models/materialized_view_a.sql
{{ config(materialized='materialized_view') }}
SELECT
col_a, ...
-- Reference model dependencies using the dbt ref() function
FROM {{ ref('view_a') }}
The model above will be compiled to the following SQL statement:
CREATE MATERIALIZED VIEW database.schema.materialized_view_a AS
SELECT
col_a, ...
FROM database.schema.view_a;
The resulting materialized view will keep results incrementally updated in
durable storage as new data arrives. Once a materialized_view
model has been
defined, it can be referenced from another model using the dbt ref()
function.
Creating an index on a materialized view
With a materialized view, your models are kept up-to-date in Materialize as new data arrives. This allows you to bypass the need for maintaining complex incremental logic or re-run dbt to refresh your models.
These results are incrementally updated in durable storage β which makes
them available across clusters β but aren’t optimized for performance. To make
results also available in memory within a cluster, you
can create indexes on materialized view models using the
index
configuration.
Filename: models/materialized_view_a.sql
{{ config(materialized='materialized_view')
indexes=[{'columns': ['col_a'], 'cluster': 'cluster_b'}]) }}
SELECT
col_a, ...
FROM {{ ref('view_a') }}
The model above will be compiled to the following SQL statements:
CREATE MATERIALIZED VIEW database.schema.materialized_view_a AS
SELECT
col_a, ...
FROM database.schema.view_a;
CREATE INDEX database.schema.materialized_view_a_idx IN CLUSTER cluster_b ON materialized_view_a (col_a);
As new data arrives, results are incrementally updated in durable storage and also accessible in memory within the cluster the index is created in. Indexes help optimize query performance and make queries against materialized views faster.
Using refresh strategies
To enable this feature in your Materialize region, contact our team.
For data that doesn’t require up-to-the-second freshness, or that can be accessed using different patterns to optimize for performance and cost (e.g., hot vs. cold data), it might be appropriate to use a non-default refresh strategy.
To configure a refresh strategy in a materialized view model, use the
refresh_interval
configuration.
Materialized view models configured with a refresh strategy must be deployed in
a scheduled cluster for cost savings to be
significant β so you must also specify a valid scheduled cluster
using the
cluster
configuration.
Filename: models/materialized_view_refresh.sql
{{ config(materialized='materialized_view', cluster='my_scheduled_cluster', refresh_interval={'at_creation': True, 'every': '1 day', 'aligned_to': '2024-10-22T10:40:33+00:00'}) }}
SELECT
col_a, ...
FROM {{ ref('view_a') }}
The model above will be compiled to the following SQL statement:
CREATE MATERIALIZED VIEW database.schema.materialized_view_refresh
IN CLUSTER my_scheduled_cluster
WITH (
-- Refresh at creation, so the view is populated ahead of
-- the first user-specified refresh time
REFRESH AT CREATION,
-- Refresh every day at 10PM UTC
REFRESH EVERY '1 day' ALIGNED TO '2024-10-22T10:40:33+00:00'
) AS
SELECT ...;
Materialized views configured with a refresh strategy are not incrementally maintained and must recompute their results from scratch on every refresh.
Sinks
In Materialize, a sink describes an external system you
want to write data to, and provides details about how to encode that data. You
can instruct dbt to create a sink using the custom sink
materialization.
Create a Kafka sink.
Filename: sinks/kafka_topic_c.sql
{{ config(materialized='sink') }}
FROM {{ ref('materialized_view_a') }}
INTO KAFKA CONNECTION kafka_connection (TOPIC 'topic_c')
FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_connection
ENVELOPE DEBEZIUM
The sink above would be compiled to:
database.schema.kafka_topic_c
Configuration: clusters, databases and indexes
Clusters
Use the cluster
option to specify the cluster in which a
materialized_view
, source
, sink
model, or index
configuration is
created. If unspecified, the default cluster for the connection is used.
{{ config(materialized='materialized_view', cluster='cluster_a') }}
To dynamically generate the name of a cluster (e.g., based on the target
environment), you can override the generate_cluster_name
macro with your
custom logic under the directory defined by macro-paths
in dbt_project.yml
.
Filename: macros/generate_cluster_name.sql
{% macro generate_cluster_name(custom_cluster_name) -%}
{%- if target.name == 'prod' -%}
{{ custom_cluster_name }}
{%- else -%}
{{ target.name }}_{{ custom_cluster_name }}
{%- endif -%}
{%- endmacro %}
Databases
Use the database
option to specify the database
in which a source
, view
, materialized_view
or sink
is created. If
unspecified, the default database for the connection is used.
{{ config(materialized='materialized_view', database='database_a') }}
Indexes
Use the indexes
configuration to define a list of indexes on
source
, view
, table
or materialized view
materializations. In
Materialize, indexes on a view maintain view results in
memory within a cluster. As the underlying data changes, indexes
incrementally update the view results in memory.
Each index
configuration can have the following components:
Component | Value | Description |
---|---|---|
columns |
list |
One or more columns on which the index is defined. To create an index that uses all columns, use the default component instead. |
name |
string |
The name for the index. If unspecified, Materialize will use the materialization name and column names provided. |
cluster |
string |
The cluster to use to create the index. If unspecified, indexes will be created in the cluster used to create the materialization. |
default |
bool |
Default: False . If set to True , creates a default index. |
Creating a multi-column index
{{ config(materialized='view',
indexes=[{'columns': ['col_a','col_b'], 'cluster': 'cluster_a'}]) }}
Creating a default index
{{ config(materialized='view',
indexes=[{'default': True}]) }}
Configuration: refresh strategies
To enable this feature in your Materialize region, contact our team.
Minimum requirements: dbt-materialize
v1.7.3+
Use the refresh_interval
configuration to define refresh strategies
for materialized view models.
The refresh_interval
configuration can have the following components:
Component | Value | Description |
---|---|---|
at |
string |
The specific time to refresh the materialized view at, using the refresh at strategy. |
at_creation |
bool |
Default: false . Whether to trigger a first refresh when the materialized view is created. |
every |
string |
The regular interval to refresh the materialized view at, using the refresh every strategy. |
aligned_to |
string |
The phase of the regular interval to refresh the materialized view at, using the refresh every strategy. If unspecified, defaults to the time when the materialized view is created. |
on_commit |
bool |
Default: false . Whether to use the default refresh on commit strategy. Setting this component to true is equivalent to not specifying refresh_interval in the configuration block, so we recommend only using it for the special case of parametrizing the configuration option (e.g., in macros). |
Configuration: model contracts and constraints
Model contracts
Minimum requirements: dbt-materialize
v1.6.0+
You can enforce model contracts
for view
, materialized_view
and table
materializations to guarantee that
there are no surprise breakages to your pipelines when the shape of the data
changes.
- name: model_with_contract
config:
contract:
enforced: true
columns:
- name: col_with_constraints
data_type: string
- name: col_without_constraints
data_type: int
Setting the contract
configuration to enforced: true
requires you to specify
a name
and data_type
for every column in your models. If there is a
mismatch between the defined contract and the model youβre trying to run, dbt
will fail during compilation! Optionally, you can also configure column-level
constraints.
Constraints
Minimum requirements: dbt-materialize
v1.6.1+
Materialize supports enforcing column-level not_null
constraints
for materialized_view
materializations. No other constraint or materialization
types are supported.
- name: model_with_constraints
config:
contract:
enforced: true
columns:
- name: col_with_constraints
data_type: string
constraints:
- type: not_null
- name: col_without_constraints
data_type: int
A not_null
constraint will be compiled to an ASSERT NOT NULL
option for the specified columns of the materialize view.
CREATE MATERIALIZED VIEW model_with_constraints
WITH (
ASSERT NOT NULL col_with_constraints
)
AS
SELECT NULL AS col_with_constraints,
2 AS col_without_constraints;
Build and run dbt
-
Run the dbt models:
dbt run
This command generates executable SQL code from any model files under the specified directory and runs it in the target environment. You can find the compiled statements under
/target/run
andtarget/compiled
in the dbt project folder. -
Using the SQL Shell, or your preferred SQL client connected to Materialize, double-check that all objects have been created:
SHOW SOURCES [FROM database.schema];
name ------------------- mysql_table_a mysql_table_b postgres_table_a postgres_table_b kafka_topic_a
SHOW VIEWS;
name ------------------- view_a
SHOW MATERIALIZED VIEWS;
name ------------------- materialized_view_a
That’s it! From here on, Materialize makes sure that your models are incrementally updated as new data streams in, and that you get fresh and correct results with millisecond latency whenever you query your views.
Test and document a dbt project
Configure continuous testing
Using dbt in a streaming context means that you’re able to run data quality and integrity tests non-stop. This is useful to monitor failures as soon as they happen, and trigger real-time alerts downstream.
-
To configure your project for continuous testing, add a
data_tests
property todbt_project.yml
with thestore_failures
configuration:data_tests: dbt_project.name: models: +store_failures: true +schema: 'etl_failure'
This will instruct dbt to create a materialized view for each configured test that can keep track of failures over time. By default, test views are created in a schema suffixed with
dbt_test__audit
. To specify a custom suffix, use theschema
config.Note: As an alternative, you can specify the
--store-failures
flag when runningdbt test
. -
Add tests to your models using the
data_tests
property in the model configuration.yml
files:models: - name: materialized_view_a description: 'materialized view a description' columns: - name: col_a description: 'column a description' data_tests: - not_null - unique
The type of test and the columns being tested are used as a base for naming the test materialized views. For example, the configuration above would create views named
not_null_col_a
andunique_col_a
. -
Run the tests:
dbt test # use --select test_type:data to only run data tests!
When configured to
store_failures
, this command will create a materialized view for each test using the respectiveSELECT
statements, instead of doing a one-off check for failures as part of its execution.This guarantees that your tests keep running in the background as views that are automatically updated as soon as an assertion fails.
-
Using the SQL Shell, or your preferred SQL client connected to Materialize, that the schema storing the tests has been created, as well as the test materialized views:
SHOW SCHEMAS;
name ------------------- public public_etl_failure
SHOW MATERIALIZED VIEWS FROM public_etl_failure;
name ------------------- not_null_col_a unique_col_a
With continuous testing in place, you can then build alerts off of the test
materialized views using any common PostgreSQL-compatible client library
and SUBSCRIBE
(see the Python cheatsheet
for a reference implementation).
Generate documentation
dbt can automatically generate documentation for your project as a shareable website. This brings data governance to your streaming pipelines, speeding up life-saving processes like data discovery (where to find what data) and lineage (the path data takes from source (s) to sink(s), as well as the transformations that happen along the way).
If you’ve already created .yml
files with helpful properties
about your project resources (like model and column descriptions, or tests), you
are all set.
-
To generate documentation for your project, run:
dbt docs generate
dbt will grab any additional project information and Materialize catalog metadata, then compile it into
.json
files (manifest.json
andcatalog.json
, respectively) that can be used to feed the documentation website. You can find the compiled files under/target
, in the dbt project folder. -
Launch the documentation website. By default, this command starts a web server on port 8000:
dbt docs serve #--port <port>
-
In a browser, navigate to
localhost:8000
. There, you can find an overview of your dbt project, browse existing models and metadata, and in general keep track of what’s going on.If you click View Lineage Graph in the lower right corner, you can even inspect the lineage of your streaming pipelines!
Persist documentation
Minimum requirements: dbt-materialize
v1.6.1+
To persist model- and column-level descriptions as comments
in Materialize, use the persist_docs
configuration.
dbt run
command invocations.
For “use-at-your-own-risk” workarounds, see dbt-core
#4226. π»
-
To enable docs persistence, add a
models
property todbt_project.yml
with thepersist-docs
configuration:models: +persist_docs: relation: true columns: true
As an alternative, you can configure
persist-docs
in the config block of your models:{{ config( materialized=materialized_view, persist_docs={"relation": true, "columns": true} ) }}
-
Once
persist-docs
is configured, anydescription
defined in your.yml
files is persisted to Materialize in the mz_internal.mz_comments system catalog table on everydbt run
:SELECT * FROM mz_internal.mz_comments;
id | object_type | object_sub_id | comment ------+-------------------+---------------+---------------------------------- u622 | materialize-view | | materialized view a description u626 | materialized-view | 1 | column a description u626 | materialized-view | 2 | column b description