Materialize Documentation

CREATE SOURCE: Avro from local file

CREATE SOURCE connects Materialize to an external data source and lets you interact with its data as if the data were in a SQL table.

This document details how to connect Materialize to local Avro Object Container Files (OCFs).

Conceptual framework

Sources represent connections to resources outside Materialize that it can read data from. For more information, see API Components: Sources.

Syntax

CREATE [MATERIALIZED] SOURCE [IF NOT EXISTS] src_name [ ( col_name [, ...] [, key_constraint] ) ] FROM AVRO OCF path [with_options] [ENVELOPE { NONE | DEBEZIUM }]

key_constraint

PRIMARY KEY ( col_name [, ...] ) NOT ENFORCED

with_options

WITH ( field = val [, ...] )

Field | Use
MATERIALIZED | Materializes the source’s data, which retains all data in memory and makes sources directly selectable. For more information, see API Components: Materialized sources.
src_name | The name for the source, which is used as its table name within SQL.
col_name | Override default column name with the provided identifier. If used, a col_name must be provided for each column in the created source.
AVRO OCF path | The absolute path to the Avro OCF you want to use as the source.
WITH ( option_list ) | Options affecting source creation. For more detail, see WITH options.
ENVELOPE NONE | (Default) Use an append-only envelope. This means that records will only be appended and cannot be updated or deleted.
ENVELOPE DEBEZIUM | Use the Debezium envelope, which uses a diff envelope to handle CRUD operations. This option requires payloads have the appropriate fields, is generally only supported by sources published to Kafka by Debezium, and is incompatible with Kafka compaction. For more information, see Debezium envelope details.
ENVELOPE DEBEZIUM UPSERT | Use the Debezium envelope with UPSERT enabled. This is required for Kafka sources with log compaction, but increases memory consumption.
PRIMARY KEY ( col_list ) NOT ENFORCED | Declare a set of columns as a primary key. For more information, see Key constraint details.
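
As a minimal sketch of how the optional clauses above combine, the following statement creates a materialized source only if one with that name does not already exist, then queries it directly. The source name events_mat is hypothetical, and the path placeholder stands in for the absolute path to a real OCF.

```sql
-- Hypothetical source name; replace the placeholder with an absolute path.
CREATE MATERIALIZED SOURCE IF NOT EXISTS events_mat
FROM AVRO OCF '[path to .ocf]';

-- Because the source is materialized, it can be selected from directly.
SELECT * FROM events_mat;
```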

WITH options

The following options are valid within the WITH clause.

Field | Value type | Description
tail | boolean | Continually check the file for new content; as new content arrives, process it using other WITH options.
timestamp_frequency_ms | int | Default: 1000. Sets the timestamping frequency in ms. Reflects how frequently the source advances its timestamp. This measure reflects how stale data in views will be. Lower values result in more-up-to-date views but may reduce throughput.
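
The two options can be set together in a single WITH clause. The sketch below is illustrative only: the source name events_tailed and the value 500 are assumptions chosen to show a timestamping frequency lower than the default of 1000.

```sql
-- Hypothetical source name and option values.
-- tail = true keeps checking the file for newly appended records.
-- timestamp_frequency_ms = 500 advances the source's timestamp twice per
-- second, trading some throughput for fresher views.
CREATE SOURCE events_tailed
FROM AVRO OCF '[path to .ocf]'
WITH (tail = true, timestamp_frequency_ms = 500);
```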

Details

Avro OCF source details

path values must be the absolute path to the Avro OCF, e.g.

CREATE SOURCE server_source FROM AVRO OCF '/Users/sean/server.ocf'...

Append-only envelope

Append-only envelope means that every record received by the source is treated as an insert. This is Materialize’s default envelope (i.e. the one used if no envelope is specified), and can be set explicitly with ENVELOPE NONE.
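
One way to see the effect of the append-only envelope is to count records in a view. In the sketch below, the names clicks and click_count are hypothetical; because every record is an insert, the reported total should only stay the same or grow as the file is read.

```sql
-- Hypothetical names; ENVELOPE NONE is the default and is spelled out
-- here only for clarity.
CREATE SOURCE clicks
FROM AVRO OCF '[path to .ocf]'
ENVELOPE NONE;

-- Each record in the file contributes one insert to the count.
CREATE MATERIALIZED VIEW click_count AS
SELECT count(*) AS total FROM clicks;

SELECT total FROM click_count;
```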

Debezium envelope details

The Debezium envelope describes the decoded records' old and new values; this is roughly equivalent to the notion of Change Data Capture (CDC).

To use the Debezium envelope with Materialize, you must configure Debezium with your database.

Note that Materialize currently does not support truncation for upstream sources. (#7277)

WARNING! Debezium can produce duplicate records if the connector is interrupted. Materialize makes a best-effort attempt to detect and filter out duplicates generated by the MySQL and PostgreSQL connectors. It does not yet attempt to detect duplicates generated by other Debezium connectors.

The Debezium envelope is most easily supported by sources published to Kafka by Debezium.
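
Following the syntax given earlier, a Debezium-enveloped source over an OCF could be sketched as below. This assumes the file holds Debezium-formatted records with the fields the envelope requires; the names customers_cdc and customers are hypothetical.

```sql
-- Hypothetical names. Assumes the OCF contains Debezium-formatted records.
CREATE SOURCE customers_cdc
FROM AVRO OCF '[path to .ocf]'
ENVELOPE DEBEZIUM;

-- The view reflects inserts, updates, and deletes decoded from the envelope.
CREATE MATERIALIZED VIEW customers AS
SELECT * FROM customers_cdc;
```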

Format implications

Using the Debezium envelope changes the schema of your Avro-encoded Kafka topics to include something akin to the following field:

{
    "type": "record",
    "name": "envelope",
    "fields": [
        {
            "name": "before",
            "type": [
                {
                    "name": "row",
                    "type": "record",
                    "fields": [
                        {"name": "a", "type": "long"},
                        {"name": "b", "type": "long"}
                    ]
                },
                "null"
            ]
        },
        {"name": "after", "type": ["row", "null"]}
    ]
}

Kafka topic requirements

ENVELOPE DEBEZIUM by itself is incompatible with Kafka’s log compaction. You must specify ENVELOPE DEBEZIUM UPSERT if you enable compaction of a topic carrying Debezium data. The DEBEZIUM UPSERT envelope uses memory proportional to the size of the upstream database table.

Key constraint details

Primary keys are automatically inferred for Postgres sources and for Kafka sources that use the Upsert or Debezium envelopes.

For other source configurations, the key_constraint syntax lets you manually declare a set of columns as a primary key. This enables optimizations and constructs that rely on a key being present in cases where one cannot be inferred.

WARNING! Materialize will not enforce the constraint and will produce wrong results if it is not correct.
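
A declared key might look like the following sketch. The source name orders and the columns order_id and amount are hypothetical, and the example assumes the OCF schema has exactly two fields, since a col_name must be given for every column once any are named. It also assumes order_id is genuinely unique in the data, because the constraint is not checked.

```sql
-- Hypothetical names. Assumes the file has exactly two fields and that
-- order_id is unique; Materialize does not verify the constraint.
CREATE SOURCE orders (order_id, amount, PRIMARY KEY (order_id) NOT ENFORCED)
FROM AVRO OCF '[path to .ocf]';
```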

Examples

CREATE SOURCE events
FROM AVRO OCF '[path to .ocf]'
WITH (tail = true)
ENVELOPE NONE;

This creates a source that…
