Use dbt to manage Materialize

NOTE: The dbt-materialize adapter can only be used with dbt Core. Making the adapter available in dbt Cloud depends on prioritization by dbt Labs. If you require dbt Cloud support, please reach out to the dbt Labs team.

dbt has become the standard for data transformation (“the T in ELT”). It combines the accessibility of SQL with software engineering best practices, allowing you to not only build reliable data pipelines, but also document, test, and version-control them.

In this guide, we'll cover how to use dbt and Materialize to transform streaming data in real time, from model building to continuous testing.

Setup

Setting up a dbt project with Materialize is similar to setting it up with any other database that requires a non-native adapter. To get up and running, you need to:

  1. Install the dbt-materialize plugin (optionally using a virtual environment):

    python3 -m venv dbt-venv                  # create the virtual environment
    source dbt-venv/bin/activate              # activate the virtual environment
    pip install dbt-core dbt-materialize      # install dbt-core and the adapter
    

    The installation will include the dbt-postgres dependency. To check that the plugin was successfully installed, run:

    dbt --version
    

    materialize should be listed under “Plugins”. If this is not the case, double-check that the virtual environment is activated!

  2. Make sure you have a Materialize account.

Create and configure a dbt project

A dbt project is a directory that contains everything dbt needs to run and keep track of your transformations. At a minimum, it must have a project file (dbt_project.yml) and at least one model (.sql).

To create a new project, run:

dbt init <project_name>

This command will bootstrap a starter project with default configurations and create a profiles.yml file, if it doesn't exist. To help you get started, the dbt init project includes sample models to run the Materialize quickstart.

Connect to Materialize

dbt manages all your connection configurations (or, profiles) in a file called profiles.yml. By default, this file is located under ~/.dbt/.

  1. Locate the profiles.yml file in your machine:

    dbt debug --config-dir
    

    Note: If you started from an existing project but it's your first time setting up dbt, it's possible that this file doesn't exist yet. You can manually create it in the suggested location.

  2. Open profiles.yml and adapt it to connect to Materialize using the reference profile configuration.

    As an example, the following profile would allow you to connect to Materialize in two different environments: a developer environment (dev) and a production environment (prod).

    default:
      outputs:
    
        prod:
          type: materialize
          threads: 1
          host: <host>
          port: 6875
          user: <user@domain.com>
          pass: <password>
          database: materialize
          schema: public
          # optionally use the cluster connection
          # parameter to specify the default cluster
          # for the connection
          cluster: <prod_cluster>
          sslmode: require
        dev:
          type: materialize
          threads: 1
          host: <host>
          port: 6875
          user: <user@domain.com>
          pass: <password>
          database: <dev_database>
          schema: <dev_schema>
          cluster: <dev_cluster>
          sslmode: require
    
      target: dev
    

    The target parameter sets the default environment dbt uses to run your models. You can override it for a single invocation using the --target flag (e.g., dbt run --target prod).

  3. To test the connection to Materialize, run:

    dbt debug
    

    If the output reads All checks passed!, you're good to go! The dbt documentation has some helpful pointers in case you run into errors.

Build and run dbt models

For dbt to know how to persist (or not) a transformation, the model needs to be associated with a materialization strategy. Because Materialize is optimized for real-time transformations of streaming data and the core of dbt is built around batch, the dbt-materialize adapter implements a few custom materialization types:

Type               Details                                                    Config options
source             Creates a source.                                          cluster, indexes
view               Creates a view.                                            indexes
materialized_view  Creates a materialized view. The materializedview legacy   cluster, indexes
                   materialization name is supported for backwards
                   compatibility.
table              Creates a materialized view (actual table support pending  cluster, indexes
                   discussion #29633).
sink               Creates a sink.                                            cluster
ephemeral          Executes queries using CTEs.

Create a materialization for each SQL statement you're planning to deploy. Each individual materialization should be stored as a .sql file under the directory defined by model-paths in dbt_project.yml.

Sources

In Materialize, a source describes an external system you want to read data from, and provides details about how to decode and interpret that data. You can instruct dbt to create a source using the custom source materialization. Once a source has been defined, it can be referenced from another model using the dbt ref() or source() functions.

NOTE: To create a source, you first need to create a connection that specifies access and authentication parameters. Connections are not exposed in dbt, and need to exist before you run any source models.

Create a Kafka source.

Filename: sources/kafka_topic_a.sql

{{ config(materialized='source') }}

FROM KAFKA CONNECTION kafka_connection (TOPIC 'topic_a')
FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_connection

The source above would be compiled to:

database.schema.kafka_topic_a

Create a PostgreSQL source.

Filename: sources/pg.sql

{{ config(materialized='source') }}

FROM POSTGRES CONNECTION pg_connection (PUBLICATION 'mz_source')
FOR ALL TABLES

Materialize will automatically create a subsource for each table in the mz_source publication. Pulling subsources into the dbt context automatically isn't supported yet. Follow the discussion in dbt-core #6104 for updates!

A possible workaround is to define PostgreSQL sources as a dbt source in a .yml file, nested under a sources: key, and list each subsource under the tables: key.

sources:
  - name: pg
    schema: "{{ target.schema }}"
    tables:
      - name: table_a
      - name: table_b

Once a subsource has been defined this way, it can be referenced from another model using the dbt source() function. To ensure that dbt is able to determine the proper order to run the models in, you should additionally force a dependency on the parent source model (pg), as described in the dbt documentation.

Filename: staging/dep_subsources.sql

-- depends_on: {{ ref('pg') }}
{{ config(materialized='view') }}

SELECT
    table_a.foo AS foo,
    table_b.bar AS bar
FROM {{ source('pg','table_a') }}
INNER JOIN
     {{ source('pg','table_b') }}
    ON table_a.id = table_b.foo_id

The source and subsources above would be compiled to:

database.schema.pg
database.schema.table_a
database.schema.table_b

Create a MySQL source.

Filename: sources/mysql.sql

{{ config(materialized='source') }}

FROM MYSQL CONNECTION mysql_connection
FOR ALL TABLES;

Materialize will automatically create a subsource for each table in the upstream database. Pulling subsources into the dbt context automatically isn't supported yet. Follow the discussion in dbt-core #6104 for updates!

A possible workaround is to define MySQL sources as a dbt source in a .yml file, nested under a sources: key, and list each subsource under the tables: key.

sources:
  - name: mysql
    schema: "{{ target.schema }}"
    tables:
      - name: table_a
      - name: table_b

Once a subsource has been defined this way, it can be referenced from another model using the dbt source() function. To ensure that dbt is able to determine the proper order to run the models in, you should additionally force a dependency on the parent source model (mysql), as described in the dbt documentation.

Filename: staging/dep_subsources.sql

-- depends_on: {{ ref('mysql') }}
{{ config(materialized='view') }}

SELECT
    table_a.foo AS foo,
    table_b.bar AS bar
FROM {{ source('mysql','table_a') }}
INNER JOIN
     {{ source('mysql','table_b') }}
    ON table_a.id = table_b.foo_id

The source and subsources above would be compiled to:

database.schema.mysql
database.schema.table_a
database.schema.table_b

Create a webhook source.

Filename: sources/webhook.sql

{{ config(materialized='source') }}

FROM WEBHOOK
    BODY FORMAT JSON
    CHECK (
      WITH (
        HEADERS,
        BODY AS request_body,
        -- Make sure to fully qualify the secret if it isn't in the same
        -- namespace as the source!
        SECRET basic_hook_auth
      )
      constant_time_eq(headers->'authorization', basic_hook_auth)
    );

The source above would be compiled to:

database.schema.webhook

Views and materialized views

In dbt, a model is a SELECT statement that encapsulates a data transformation you want to run on top of your database. When you use dbt with Materialize, your models stay up-to-date without manual or configured refreshes. This allows you to efficiently transform streaming data using the same thought process you'd use for batch transformations against any other database.

Depending on your usage patterns, you can transform data using view or materialized_view models. For guidance and best practices on when to use views and materialized views in Materialize, see Indexed views vs. materialized views.

Views

dbt models are materialized as views by default. Although this means you can skip the materialized configuration in the model definition to create views in Materialize, we recommend explicitly setting the materialization type for maintainability.

Filename: models/view_a.sql

{{ config(materialized='view') }}

SELECT
    col_a, ...
-- Reference model dependencies using the dbt ref() function
FROM {{ ref('kafka_topic_a') }}

The model above will be compiled to the following SQL statement:

CREATE VIEW database.schema.view_a AS
SELECT
    col_a, ...
FROM database.schema.kafka_topic_a;

The resulting view will not keep results incrementally updated without an index (see Creating an index on a view). Once a view model has been defined, it can be referenced from another model using the dbt ref() function.

Creating an index on a view
💡 Tip: For guidance and best practices on how to use indexes in Materialize, see Indexes on views.

To keep results up-to-date in Materialize, you can create indexes on view models using the index configuration. This allows you to bypass the need for maintaining complex incremental logic or re-running dbt to refresh your models.

Filename: models/view_a.sql

{{ config(materialized='view',
          indexes=[{'columns': ['col_a'], 'cluster': 'cluster_a'}]) }}

SELECT
    col_a, ...
FROM {{ ref('kafka_topic_a') }}

The model above will be compiled to the following SQL statements:

CREATE VIEW database.schema.view_a AS
SELECT
    col_a, ...
FROM database.schema.kafka_topic_a;

CREATE INDEX database.schema.view_a_idx IN CLUSTER cluster_a ON view_a (col_a);

As new data arrives, indexes keep view results incrementally updated in memory within a cluster. Indexes help optimize query performance and make queries against views fast and computationally free.

Materialized views

To materialize a model as a materialized view, set the materialized configuration to materialized_view.

Filename: models/materialized_view_a.sql

{{ config(materialized='materialized_view') }}

SELECT
    col_a, ...
-- Reference model dependencies using the dbt ref() function
FROM {{ ref('view_a') }}

The model above will be compiled to the following SQL statement:

CREATE MATERIALIZED VIEW database.schema.materialized_view_a AS
SELECT
    col_a, ...
FROM database.schema.view_a;

The resulting materialized view will keep results incrementally updated in durable storage as new data arrives. Once a materialized_view model has been defined, it can be referenced from another model using the dbt ref() function.

Creating an index on a materialized view
💡 Tip: For guidance and best practices on how to use indexes in Materialize, see Indexes on materialized views.

With a materialized view, your models are kept up-to-date in Materialize as new data arrives. This allows you to bypass the need for maintaining complex incremental logic or re-running dbt to refresh your models.

These results are incrementally updated in durable storage, which makes them available across clusters, but they aren't optimized for query performance. To make results also available in memory within a cluster, you can create indexes on materialized view models using the index configuration.

Filename: models/materialized_view_a.sql

{{ config(materialized='materialized_view',
          indexes=[{'columns': ['col_a'], 'cluster': 'cluster_b'}]) }}

SELECT
    col_a, ...
FROM {{ ref('view_a') }}

The model above will be compiled to the following SQL statements:

CREATE MATERIALIZED VIEW database.schema.materialized_view_a AS
SELECT
    col_a, ...
FROM database.schema.view_a;

CREATE INDEX database.schema.materialized_view_a_idx IN CLUSTER cluster_b ON materialized_view_a (col_a);

As new data arrives, results are incrementally updated in durable storage and also accessible in memory within the cluster the index is created in. Indexes help optimize query performance and make queries against materialized views faster.

Using refresh strategies
💡 Tip: For guidance and best practices on how to use refresh strategies in Materialize, see Refresh strategies.
PREVIEW This feature is in private preview. It is under active development and may have stability or performance issues. It isn't subject to our backwards compatibility guarantees.

To enable this feature in your Materialize region, contact our team.

For data that doesn't require up-to-the-second freshness, or that can be accessed using different patterns to optimize for performance and cost (e.g., hot vs. cold data), it might be appropriate to use a non-default refresh strategy.

To configure a refresh strategy in a materialized view model, use the refresh_interval configuration. Materialized view models configured with a refresh strategy must be deployed in a scheduled cluster for cost savings to be significant, so you must also specify a valid scheduled cluster using the cluster configuration.

Filename: models/materialized_view_refresh.sql

{{ config(materialized='materialized_view', cluster='my_scheduled_cluster', refresh_interval={'at_creation': True, 'every': '1 day', 'aligned_to': '2024-10-22T10:40:33+00:00'}) }}

SELECT
    col_a, ...
FROM {{ ref('view_a') }}

The model above will be compiled to the following SQL statement:

CREATE MATERIALIZED VIEW database.schema.materialized_view_refresh
IN CLUSTER my_scheduled_cluster
WITH (
  -- Refresh at creation, so the view is populated ahead of
  -- the first user-specified refresh time
  REFRESH AT CREATION,
  -- Refresh every day at 10:40:33 UTC
  REFRESH EVERY '1 day' ALIGNED TO '2024-10-22T10:40:33+00:00'
) AS
SELECT ...;

Materialized views configured with a refresh strategy are not incrementally maintained and must recompute their results from scratch on every refresh.

Sinks

In Materialize, a sink describes an external system you want to write data to, and provides details about how to encode that data. You can instruct dbt to create a sink using the custom sink materialization.

Create a Kafka sink.

Filename: sinks/kafka_topic_c.sql

{{ config(materialized='sink') }}

FROM {{ ref('materialized_view_a') }}
INTO KAFKA CONNECTION kafka_connection (TOPIC 'topic_c')
FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_connection
ENVELOPE DEBEZIUM

The sink above would be compiled to:

database.schema.kafka_topic_c

Configuration: clusters, databases and indexes

Clusters

Use the cluster option to specify the cluster in which a materialized_view, source, sink model, or index configuration is created. If unspecified, the default cluster for the connection is used.

{{ config(materialized='materialized_view', cluster='cluster_a') }}

To dynamically generate the name of a cluster (e.g., based on the target environment), you can override the generate_cluster_name macro with your custom logic under the directory defined by macro-paths in dbt_project.yml.

Filename: macros/generate_cluster_name.sql

{% macro generate_cluster_name(custom_cluster_name) -%}
    {%- if target.name == 'prod' -%}
        {{ custom_cluster_name }}
    {%- else -%}
        {{ target.name }}_{{ custom_cluster_name }}
    {%- endif -%}
{%- endmacro %}

Databases

Use the database option to specify the database in which a source, view, materialized_view or sink is created. If unspecified, the default database for the connection is used.

{{ config(materialized='materialized_view', database='database_a') }}

Indexes

Use the indexes configuration to define a list of indexes on source, view, table or materialized view materializations. In Materialize, indexes on a view maintain view results in memory within a cluster. As the underlying data changes, indexes incrementally update the view results in memory.

Each index configuration can have the following components:

Component  Value   Description
columns    list    One or more columns on which the index is defined. To create
                   an index that uses all columns, use the default component
                   instead.
name       string  The name for the index. If unspecified, Materialize will use
                   the materialization name and column names provided.
cluster    string  The cluster to use to create the index. If unspecified,
                   indexes will be created in the cluster used to create the
                   materialization.
default    bool    Default: False. If set to True, creates a default index.

Creating a multi-column index:

{{ config(materialized='view',
          indexes=[{'columns': ['col_a','col_b'], 'cluster': 'cluster_a'}]) }}

Creating a default index:

{{ config(materialized='view',
          indexes=[{'default': True}]) }}

Configuration: refresh strategies

PREVIEW This feature is in private preview. It is under active development and may have stability or performance issues. It isn't subject to our backwards compatibility guarantees.

To enable this feature in your Materialize region, contact our team.

Minimum requirements: dbt-materialize v1.7.3+

Use the refresh_interval configuration to define refresh strategies for materialized view models.

The refresh_interval configuration can have the following components:

Component    Value   Description
at           string  The specific time to refresh the materialized view at,
                     using the refresh at strategy.
at_creation  bool    Default: false. Whether to trigger a first refresh when the
                     materialized view is created.
every        string  The regular interval to refresh the materialized view at,
                     using the refresh every strategy.
aligned_to   string  The phase of the regular interval to refresh the
                     materialized view at, using the refresh every strategy. If
                     unspecified, defaults to the time when the materialized
                     view is created.
on_commit    bool    Default: false. Whether to use the default refresh on
                     commit strategy. Setting this component to true is
                     equivalent to not specifying refresh_interval in the
                     configuration block, so we recommend only using it for the
                     special case of parametrizing the configuration option
                     (e.g., in macros).
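To build intuition for how the every and aligned_to components interact, here is a minimal Python sketch of the scheduling semantics (an illustration only, not Materialize's implementation; the next_refresh helper is made up for this example):

```python
from datetime import datetime, timedelta, timezone

def next_refresh(now, every, aligned_to):
    """Illustrative only: refreshes fire at aligned_to + k * every, so the
    next refresh after `now` is the smallest such instant strictly after it
    (or the earliest upcoming one, if `now` is before aligned_to)."""
    if now < aligned_to:
        return aligned_to - ((aligned_to - now) // every) * every
    return aligned_to + ((now - aligned_to) // every + 1) * every

# Using the aligned_to value from the model above
aligned = datetime(2024, 10, 22, 10, 40, 33, tzinfo=timezone.utc)
now = datetime(2024, 10, 23, 9, 0, 0, tzinfo=timezone.utc)
print(next_refresh(now, timedelta(days=1), aligned))
# 2024-10-23 10:40:33+00:00
```

In other words, aligned_to fixes the phase of the schedule, and every fixes its period; refreshes always land on that grid regardless of when the view was created.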

Configuration: model contracts and constraints

Model contracts

Minimum requirements: dbt-materialize v1.6.0+

You can enforce model contracts for view, materialized_view and table materializations to guarantee that there are no surprise breakages to your pipelines when the shape of the data changes.

    - name: model_with_contract
      config:
        contract:
          enforced: true
      columns:
        - name: col_with_constraints
          data_type: string
        - name: col_without_constraints
          data_type: int

Setting the contract configuration to enforced: true requires you to specify a name and data_type for every column in your models. If there is a mismatch between the defined contract and the model you’re trying to run, dbt will fail during compilation! Optionally, you can also configure column-level constraints.
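Conceptually, enforcing a contract boils down to comparing the declared columns and types against the compiled model's actual schema. The following Python sketch illustrates that comparison (a simplification for illustration, not dbt's implementation; the check_contract helper and dict shapes are assumptions):

```python
def check_contract(declared, actual):
    """Compare a declared contract ({column: data_type}) against a model's
    actual schema; an empty result means the contract holds."""
    problems = []
    for col, dtype in declared.items():
        if col not in actual:
            problems.append(f"missing column: {col}")
        elif actual[col] != dtype:
            problems.append(f"type mismatch on {col}: got {actual[col]}, expected {dtype}")
    problems += [f"undeclared column: {col}" for col in actual if col not in declared]
    return problems

declared = {"col_with_constraints": "string", "col_without_constraints": "int"}
print(check_contract(declared, {"col_with_constraints": "string"}))
# ['missing column: col_without_constraints']
```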

Constraints

Minimum requirements: dbt-materialize v1.6.1+

Materialize supports enforcing column-level not_null constraints for materialized_view materializations. No other constraint or materialization types are supported.

    - name: model_with_constraints
      config:
        contract:
          enforced: true
      columns:
        - name: col_with_constraints
          data_type: string
          constraints:
            - type: not_null
        - name: col_without_constraints
          data_type: int

A not_null constraint will be compiled to an ASSERT NOT NULL option for the specified columns of the materialized view.

CREATE MATERIALIZED VIEW model_with_constraints
WITH (
        ASSERT NOT NULL col_with_constraints
     )
AS
SELECT NULL AS col_with_constraints,
       2 AS col_without_constraints;

Build and run dbt

  1. Run the dbt models:

    dbt run
    

    This command generates executable SQL code from your model files and runs it in the target environment. You can find the compiled statements under target/run and target/compiled in the dbt project folder.

  2. Using the SQL Shell, or your preferred SQL client connected to Materialize, double-check that all objects have been created:

    SHOW SOURCES [FROM database.schema];
    

           name
    -------------------
     mysql_table_a
     mysql_table_b
     postgres_table_a
     postgres_table_b
     kafka_topic_a
    

    SHOW VIEWS;
    

           name
    -------------------
     view_a
    

    SHOW MATERIALIZED VIEWS;
    

           name
    -------------------
     materialized_view_a
    

That's it! From here on, Materialize makes sure that your models are incrementally updated as new data streams in, and that you get fresh and correct results with millisecond latency whenever you query your views.

Test and document a dbt project

Configure continuous testing

Using dbt in a streaming context means that you're able to run data quality and integrity tests non-stop. This is useful to monitor failures as soon as they happen, and trigger real-time alerts downstream.

  1. To configure your project for continuous testing, add a data_tests property to dbt_project.yml with the store_failures configuration:

    data_tests:
      dbt_project.name:
        models:
          +store_failures: true
          +schema: 'etl_failure'
    

    This will instruct dbt to create a materialized view for each configured test that can keep track of failures over time. By default, test views are created in a schema suffixed with dbt_test__audit. To specify a custom suffix, use the schema config.

    Note: As an alternative, you can specify the --store-failures flag when running dbt test.

  2. Add tests to your models using the data_tests property in the model configuration .yml files:

    models:
      - name: materialized_view_a
        description: 'materialized view a description'
        columns:
          - name: col_a
            description: 'column a description'
            data_tests:
              - not_null
              - unique
    

    The type of test and the columns being tested are used as a base for naming the test materialized views. For example, the configuration above would create views named not_null_col_a and unique_col_a.

  3. Run the tests:

    dbt test # use --select test_type:data to only run data tests!
    

    When configured to store_failures, this command will create a materialized view for each test using the respective SELECT statements, instead of doing a one-off check for failures as part of its execution.

    This guarantees that your tests keep running in the background as views that are automatically updated as soon as an assertion fails.

  4. Using the SQL Shell, or your preferred SQL client connected to Materialize, double-check that the schema storing the tests has been created, as well as the test materialized views:

    SHOW SCHEMAS;
    

           name
    -------------------
     public
     public_etl_failure
    

    SHOW MATERIALIZED VIEWS FROM public_etl_failure;
    

           name
    -------------------
     not_null_col_a
     unique_col_a
    

With continuous testing in place, you can then build alerts off of the test materialized views using any common PostgreSQL-compatible client library and SUBSCRIBE (see the Python cheatsheet for a reference implementation).
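As a sketch of what such an alert could look like, the snippet below uses psycopg2 to SUBSCRIBE to a test materialized view and print a message whenever a new failing row appears. The connection string, view name, and alert action are placeholder assumptions:

```python
def subscribe_query(view):
    # Build the SUBSCRIBE statement for a failure-tracking test view
    return f"SUBSCRIBE (SELECT * FROM {view})"

def watch_failures(dsn, view="not_null_col_a"):
    import psycopg2  # imported here so the pure helper above has no dependency
    conn = psycopg2.connect(dsn)
    with conn.cursor() as cur:
        cur.execute("BEGIN")
        cur.execute(f"DECLARE c CURSOR FOR {subscribe_query(view)}")
        while True:
            cur.execute("FETCH ALL c")  # blocks until new rows arrive
            for row in cur:
                # Rows are (mz_timestamp, mz_diff, ...columns); a positive
                # mz_diff means a new failing record was added to the view
                if row[1] > 0:
                    print(f"ALERT: new failure in {view}: {row}")

# Example invocation (requires a live Materialize connection):
# watch_failures("postgres://user@host:6875/materialize?sslmode=require")
```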

Generate documentation

dbt can automatically generate documentation for your project as a shareable website. This brings data governance to your streaming pipelines, speeding up processes like data discovery (where to find what data) and lineage (the path data takes from source(s) to sink(s), as well as the transformations that happen along the way).

If you've already created .yml files with helpful properties about your project resources (like model and column descriptions, or tests), you are all set.

  1. To generate documentation for your project, run:

    dbt docs generate
    

    dbt will grab any additional project information and Materialize catalog metadata, then compile it into .json files (manifest.json and catalog.json, respectively) that can be used to feed the documentation website. You can find the compiled files under target/ in the dbt project folder.

  2. Launch the documentation website. By default, this command starts a web server on port 8000:

    dbt docs serve #--port <port>
    
  3. In a browser, navigate to localhost:8000. There, you can find an overview of your dbt project, browse existing models and metadata, and in general keep track of what's going on.

    If you click View Lineage Graph in the lower right corner, you can even inspect the lineage of your streaming pipelines!

    (Image: dbt lineage graph)
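The generated artifacts can also be inspected programmatically. As a minimal Python sketch (the list_models helper and file path are assumptions, but the nodes/resource_type/config layout matches dbt's manifest.json):

```python
import json

def list_models(manifest):
    """Return (name, materialization) pairs for every model node in a
    parsed dbt manifest dict."""
    return [
        (node["name"], node["config"].get("materialized", "view"))
        for node in manifest["nodes"].values()
        if node["resource_type"] == "model"
    ]

# Example (requires a project where `dbt docs generate` has been run):
# with open("target/manifest.json") as f:
#     for name, materialization in list_models(json.load(f)):
#         print(f"{name}: {materialization}")
```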

Persist documentation

Minimum requirements: dbt-materialize v1.6.1+

To persist model- and column-level descriptions as comments in Materialize, use the persist_docs configuration.

NOTE: Documentation persistence is tightly coupled with dbt run command invocations. For “use-at-your-own-risk” workarounds, see dbt-core #4226. 👻
  1. To enable docs persistence, add a models property to dbt_project.yml with the persist_docs configuration:

    models:
      +persist_docs:
        relation: true
        columns: true
    

    As an alternative, you can configure persist_docs in the config block of your models:

    {{ config(
        materialized='materialized_view',
        persist_docs={"relation": true, "columns": true}
    ) }}
    
  2. Once persist_docs is configured, any description defined in your .yml files is persisted to Materialize in the mz_internal.mz_comments system catalog table on every dbt run:

      SELECT * FROM mz_internal.mz_comments;
    

    
        id  |    object_type    | object_sub_id |              comment
      ------+-------------------+---------------+----------------------------------
       u622 | materialized-view |               | materialized view a description
       u626 | materialized-view |             1 | column a description
       u626 | materialized-view |             2 | column b description
    