
CREATE SINK

CREATE SINK sends data from Materialize to an external sink.

Conceptual framework

Sinks let you stream data out of Materialize, using either sources or views.

Sink source type Description
Source Simply pass all data received from the source to the sink without modifying it.
Materialized view Stream all changes to the view to the sink. This lets you use Materialize to process a stream, and then stream the processed values. Note that this feature only works with materialized views, and does not work with non-materialized views.

Syntax

CREATE SINK [ IF NOT EXISTS ] sink_name
    FROM item_name
    INTO { sink_kafka_connector | AVRO OCF path }
    [ WITH ( sink_with_options ) ]
    [ FORMAT sink_format_spec ]
    [ ENVELOPE { DEBEZIUM | UPSERT } ]
    [ WITH SNAPSHOT | WITHOUT SNAPSHOT ]

sink_kafka_connector

KAFKA BROKER host TOPIC topic_prefix
    [ KEY ( key_column [, ...] ) ]
    [ CONSISTENCY ( TOPIC consistency_topic [ FORMAT consistency_format_spec ] ) ]

sink_format_spec

{ AVRO USING CONFLUENT SCHEMA REGISTRY url [ with_options ]
  | AVRO USING SCHEMA FILE schema_file_path
  | JSON }

consistency_format_spec

AVRO USING CONFLUENT SCHEMA REGISTRY url [ with_options ]
Field Use
IF NOT EXISTS If specified, do not generate an error if a sink of the same name already exists.

If not specified, throw an error if a sink of the same name already exists. (Default)
sink_name A name for the sink. This name is only used within Materialize.
item_name The name of the source or view you want to send to the sink.
KAFKA BROKER host The Kafka broker’s host name, without the security protocol (which is specified in the WITH options). If you wish to specify multiple brokers (bootstrap servers) as an additional safeguard, use a comma-separated list. For example: localhost:9092, localhost:9093.
TOPIC topic_prefix The prefix used to generate the Kafka topic name to create and write to.
KEY ( key_column ) An optional list of columns to use for the Kafka key. If unspecified, the Kafka key is left unset.

New in v0.5.1.

TOPIC consistency_topic Makes the sink emit additional consistency metadata to the named topic. Only valid for Kafka sinks. If reuse_topic is true, a default naming convention will be used when the topic name is not explicitly set. This is formed by appending -consistency to the output topic name.

Available only in unstable builds.

AVRO OCF path The absolute path and file name of the Avro Object Container file (OCF) to create and write to. The filename will be modified to let Materialize create a unique file each time Materialize starts, but the file extension will not be modified. For more detail, see Avro OCF sinks below.
sink_with_options Options affecting sink creation. For more detail, see WITH options.
with_options Options affecting Materialize’s connection to Kafka. For more detail, see Authentication.
ENVELOPE DEBEZIUM The generated schemas have a Debezium-style diff envelope to capture changes in the input view or source. This is the default.
ENVELOPE UPSERT The sink emits data with upsert semantics: updates and inserts for the given key are expressed as a value, and deletes are expressed as a null value payload in Kafka. For more detail, see Handling upserts; a sketch combining KEY and ENVELOPE UPSERT follows this table.

Changed in v0.7.1: The AS OF option was removed.
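
As a sketch of how KEY and ENVELOPE UPSERT combine, the example below assumes an existing quotes source with a quote_id column (both hypothetical) and a local broker and schema registry:

CREATE SINK quotes_upsert_sink
FROM quotes
INTO KAFKA BROKER 'localhost:9092' TOPIC 'quotes-upsert-sink'
KEY (quote_id)  -- hypothetical column; becomes the Kafka message key
FORMAT AVRO USING
    CONFLUENT SCHEMA REGISTRY 'http://localhost:8081'
ENVELOPE UPSERT;  -- deletes for a key arrive as null value payloads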

WITH options

The following options are valid within the WITH clause; a combined example follows the table.

Field Value type Description
partition_count int Set the sink Kafka topic’s partition count. This defaults to -1 (use the broker default).
replication_factor int Set the sink Kafka topic’s replication factor. This defaults to -1 (use the broker default).
reuse_topic bool Use the existing Kafka topic after Materialize restarts, instead of creating a new one. The default is false. See Enabling topic reuse after restart for details.
consistency_topic text This option is only available to support backwards compatibility. Please use the new CONSISTENCY syntax to define a consistency topic for the sink.
security_protocol text Use ssl or, for Kerberos, sasl_plaintext, sasl-scram-sha-256, or sasl-sha-512 to connect to the Kafka cluster.
acks text Sets the number of Kafka replicas that must acknowledge Materialize writes. Accepts values in [-1, 1000]. -1 (the default) specifies all replicas.
retention_ms long Sets the maximum time Kafka will retain a log. Accepts values in [-1, ...]. -1 specifies no time limit. If not set, uses the broker default.

New in v0.9.7.

retention_bytes long Sets the maximum size a Kafka partition can grow to before old logs are removed. Accepts values in [-1, ...]. -1 specifies no size limit. If not set, uses the broker default.

New in v0.9.7.

avro_key_fullname text Sets the Avro fullname on the generated key schema, if a KEY is specified. When used, a value must be specified for avro_value_fullname. The default fullname is row.

New in v0.18.0.

avro_value_fullname text Sets the Avro fullname on the generated value schema. When KEY is specified, avro_key_fullname must additionally be specified. The default fullname is envelope.

New in v0.18.0.
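
To show how several of these options combine, here is a minimal sketch assuming a local broker and an existing quotes source; the topic settings are illustrative, not recommendations:

CREATE SINK quotes_sink
FROM quotes
INTO KAFKA BROKER 'localhost:9092' TOPIC 'quotes-sink'
WITH (partition_count=4,      -- override the broker's default partition count
      replication_factor=3,   -- override the broker's default replication factor
      retention_ms=86400000)  -- keep roughly one day of logs
FORMAT JSON;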

Authentication

Kafka sinks support the same authentication scheme and options as Kafka sources (SSL, SASL). Check the Kafka source documentation for more details and examples.

WITH SNAPSHOT or WITHOUT SNAPSHOT

By default, each sink is created with a SNAPSHOT, which contains the consolidated results of the query at the time the sink was created. Any further updates to these results are produced as they occur. To see only the results that arrive after the sink is created, specify WITHOUT SNAPSHOT.
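
For instance, a minimal sketch (assuming the quotes source used in the examples below) that emits only the updates that occur after the sink is created:

CREATE SINK quotes_sink
FROM quotes
INTO KAFKA BROKER 'localhost:9092' TOPIC 'quotes-sink'
FORMAT JSON
WITHOUT SNAPSHOT;  -- skip the initial consolidated snapshot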

Detail

Debezium envelope details

The Debezium envelope provides a “diff envelope”, which describes the decoded records’ old and new values; this is roughly equivalent to the notion of Change Data Capture (CDC). Materialize can write the data in this diff envelope to represent inserts, updates, or deletes to the underlying source or view that the sink reads from.

Format implications

Using the Debezium envelope changes the schema of your Avro-encoded data to include something akin to the following field:

{
    "type": "record",
    "name": "envelope",
    "fields": [
        {
            "name": "before",
            "type": [
                {
                    "name": "row",
                    "type": "record",
                    "fields": [
                        {"name": "a", "type": "long"},
                        {"name": "b", "type": "long"}
                    ]
                },
                "null"
            ]
        },
        { "name": "after", "type": ["row", "null"] }
    ]
}
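
Assuming the schema above, an insert and a delete might be encoded as follows in Avro's JSON form, where union values are keyed by the name of their branch type (the values are illustrative):

{"before": null, "after": {"row": {"a": 1, "b": 1}}}
{"before": {"row": {"a": 1, "b": 1}}, "after": null}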


Kafka sinks

When creating sinks, Materialize will either reuse the last sink topic (if reuse_topic is true) or it will generate a new topic name using the format below.

{topic_prefix}-{sink_global_id}-{materialize-startup-time}-{nonce}

If the topic does not exist, Materialize will use the Kafka Admin API to create the topic.

For Avro-encoded sinks, Materialize will publish the sink’s Avro schema to the Confluent Schema Registry. Materialize will not publish schemas for JSON-encoded sinks.

You can find the topic name for each Kafka sink by querying mz_kafka_sinks.

NOTE: Dropping a Kafka sink doesn’t drop the corresponding topic. Recreating the sink will result in a new Kafka topic. For more information, see the Kafka documentation.

Exactly-once sinks (with topic reuse after restart)

BETA! This feature is in beta. It may have performance or stability issues and is not subject to our backwards compatibility guarantee.

New in v0.9.0.

By default, Materialize creates new, distinct topics for sinks on restart. To enable the reuse of an existing topic instead and achieve exactly-once processing guarantees, Materialize must:

- be able to reconstruct the history of the object the sink depends on, which requires that the underlying source events have replayable timestamps, and
- be able to determine what has already been written to the output topic, which it tracks using the consistency topic.

In practice, each incoming event affects the final results exactly once, even if the stream is disrupted or Materialize is restarted. Because the implementation effectively relies on source events having replayable timestamps, only Kafka sources and materialized views defined on top of Kafka sources can be used in this context.

Syntax

CREATE SINK quotes_sink
FROM quotes
INTO KAFKA BROKER 'localhost:9092' TOPIC 'quotes-eo-sink'
CONSISTENCY (TOPIC 'quotes-eo-sink-consistency'
             FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY 'http://localhost:8081')
WITH (reuse_topic=true)
FORMAT JSON;

To create a sink with exactly-once processing guarantees, you need to:

- enable the reuse_topic option, and
- optionally specify the name of the consistency topic via the CONSISTENCY clause.

Note that this feature is still in beta, so we strongly recommend that you start with test data, rather than with production. Please let us know if you run into any issues!

Consistency metadata

When requested, Materialize will produce consistency metadata that describes timestamps and relates the change data stream to them.

Materialize sends two main pieces of information:

- a timestamp for each change, which is sent inline with the change itself, and
- a count of how many changes occurred for each timestamp, which is sent to a separate consistency topic.

Materialize uses a simplified version of the Debezium transaction metadata protocol to send this information. The generated diff envelope schema used for data messages is decorated with a transaction field which has the following schema:

{
    "name": "transaction",
    "type": {
        "type": "record",
        "name": "transaction_metadata",
        "fields": [
            {
                "name": "id",
                "type": "string"
            }
        ]
    }
}

Each message sent to Kafka has a transaction field and a transaction id, in addition to its regular before / after data fields. The transaction id is equivalent to the Materialize timestamp for this record.
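
Continuing the envelope example from above, a decorated data message might look like this in Avro's JSON form (the timestamp value is illustrative):

{
    "before": null,
    "after": {"row": {"a": 1, "b": 1}},
    "transaction": {"id": "1586024632"}
}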

In addition to the inline information, Materialize creates an additional consistency topic that stores counts of how many changes were generated per transaction id. The name of the consistency topic uses the following convention:

{consistency_topic_prefix}-{sink_global_id}-{materialize-startup-time}-{nonce}
NOTE: If reuse_topic is enabled, the naming convention is ignored. Instead, the topic name specified via the CONSISTENCY option is used.

Each message in the consistency topic has the schema below; illustrative BEGIN and END records follow the field table.

{
    "type": "record",
    "name": "envelope",
    "fields": [
        {
            "name": "id",
            "type": "string"
        },
        {
            "name": "status",
            "type": "string"
        },
        {
            "name": "event_count",
            "type": [
                "null",
                "long"
            ]
        },
        {
            "name": "data_collections",
            "type": [
                "null",
                {
                    "type": "array",
                    "items": {
                        "type": "record",
                        "name": "data_collection",
                        "fields": [
                            {
                                "name": "data_collection",
                                "type": "string"
                            },
                            {
                                "name": "event_count",
                                "type": "long"
                            }
                        ]
                    }
                }
            ],
            "default": null
        }
    ]
}
Field Use
id The transaction id this record refers to.
status Either BEGIN or END. Materialize sends a record with BEGIN the first time it writes a data message for id, and an END record after it has written all data messages for id.
event_count This field is null for BEGIN records, and for END records it contains the number of messages Materialize wrote for that id.
data_collections This field is null for BEGIN records, and for END records it contains the number of messages Materialize wrote for that id and collection.
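
For illustration, a transaction that wrote two messages might produce the following pair of consistency records in Avro's JSON form, where union values are keyed by their branch type (the id and topic name are illustrative):

{"id": "1586024632", "status": "BEGIN", "event_count": null, "data_collections": null}
{"id": "1586024632", "status": "END", "event_count": {"long": 2},
 "data_collections": {"array": [{"data_collection": "quotes-eo-sink", "event_count": 2}]}}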

Avro OCF sinks

When creating Avro Object Container File (OCF) sinks, Materialize creates a new sink OCF and appends the Avro schema data in its header. Materialize names the new file using the format below.

{path.base_directory}/{path.file_stem}-{sink_global_id}-{materialize-startup-time}-{nonce}.{path.file_extension}

You can query mz_avro_ocf_sinks to get file name information for each Avro OCF sink. See Get actual file names below for a more concrete example.

Examples

Avro sinks

From sources

CREATE SOURCE quotes
FROM KAFKA BROKER 'localhost' TOPIC 'quotes'
FORMAT AVRO USING
    CONFLUENT SCHEMA REGISTRY 'http://localhost:8081';
CREATE SINK quotes_sink
FROM quotes
INTO KAFKA BROKER 'localhost' TOPIC 'quotes-sink'
FORMAT AVRO USING
    CONFLUENT SCHEMA REGISTRY 'http://localhost:8081';

From materialized views

CREATE SOURCE quotes
FROM KAFKA BROKER 'localhost' TOPIC 'quotes'
FORMAT AVRO USING
    CONFLUENT SCHEMA REGISTRY 'http://localhost:8081';
CREATE MATERIALIZED VIEW frank_quotes AS
    SELECT * FROM quotes
    WHERE attributed_to = 'Frank McSherry';
CREATE SINK frank_quotes_sink
FROM frank_quotes
INTO KAFKA BROKER 'localhost' TOPIC 'frank-quotes-sink'
FORMAT AVRO USING
    CONFLUENT SCHEMA REGISTRY 'http://localhost:8081';

Get actual Kafka topic names

SELECT sink_id, name, topic
FROM mz_sinks
JOIN mz_kafka_sinks ON mz_sinks.id = mz_kafka_sinks.sink_id;

 sink_id  |              name                    |                        topic
----------+--------------------------------------+------------------------------------------------------
 u5       | materialize.public.quotes_sink       | quotes-sink-u5-1586024632-15401700525642547992
 u6       | materialize.public.frank_quotes_sink | frank-quotes-sink-u6-1586024632-15401700525642547992

Avro OCF sinks

From sources

CREATE SOURCE quotes
FROM KAFKA BROKER 'localhost' TOPIC 'quotes'
FORMAT AVRO USING
    CONFLUENT SCHEMA REGISTRY 'http://localhost:8081';
CREATE SINK quotes_sink
FROM quotes
INTO AVRO OCF '/path/to/sink-file.ocf';

From materialized views

CREATE SOURCE quotes
FROM KAFKA BROKER 'localhost' TOPIC 'quotes'
FORMAT AVRO USING
    CONFLUENT SCHEMA REGISTRY 'http://localhost:8081';
CREATE MATERIALIZED VIEW frank_quotes AS
    SELECT * FROM quotes
    WHERE attributed_to = 'Frank McSherry';
CREATE SINK frank_quotes_sink
FROM frank_quotes
INTO AVRO OCF '/path/to/frank-sink-file.ocf';

Get actual file names

Materialize stores the actual path as a byte array so we need to use the convert_from function to convert it to a UTF-8 string.

SELECT sink_id, name, convert_from(path, 'utf8')
FROM mz_sinks
JOIN mz_avro_ocf_sinks ON mz_sinks.id = mz_avro_ocf_sinks.sink_id;

 sink_id   |        name       |                           path
-----------+-------------------+----------------------------------------------------------------
 u10       | quotes_sink       | /path/to/sink-file-u10-1586108399-8671224166353132585.ocf
 u11       | frank_quotes_sink | /path/to/frank-sink-file-u11-1586108399-8671224166353132585.ocf

JSON sinks

From sources

CREATE SOURCE quotes
FROM KAFKA BROKER 'localhost' TOPIC 'quotes'
FORMAT AVRO USING
    CONFLUENT SCHEMA REGISTRY 'http://localhost:8081';
CREATE SINK quotes_sink
FROM quotes
INTO KAFKA BROKER 'localhost' TOPIC 'quotes-sink'
FORMAT JSON;

From materialized views

CREATE SOURCE quotes
FROM KAFKA BROKER 'localhost' TOPIC 'quotes'
FORMAT AVRO USING
    CONFLUENT SCHEMA REGISTRY 'http://localhost:8081';
CREATE MATERIALIZED VIEW frank_quotes AS
    SELECT * FROM quotes
    WHERE attributed_to = 'Frank McSherry';
CREATE SINK frank_quotes_sink
FROM frank_quotes
INTO KAFKA BROKER 'localhost' TOPIC 'frank-quotes-sink'
FORMAT JSON;