CREATE SINK

CREATE SINK sends data from Materialize to an external sink.

Conceptual framework

Sinks let you stream data out of Materialize, using either sources or views.

Sink source type Description
Source Simply pass all data received from the source to the sink without modifying it.
Materialized view Stream all changes to the view to the sink. This lets you use Materialize to process a stream, and then stream the processed values. Note that this feature only works with materialized views, and does not work with non-materialized views.
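
For example, a minimal sketch of each case, using a hypothetical quotes source and frank_quotes materialized view (the same names used in the Examples section below); the broker and topic names are also illustrative:

CREATE SINK quotes_raw_sink
FROM quotes
INTO KAFKA BROKER 'localhost:9092' TOPIC 'quotes-raw'
FORMAT JSON;

CREATE SINK frank_quotes_sink
FROM frank_quotes
INTO KAFKA BROKER 'localhost:9092' TOPIC 'frank-quotes'
FORMAT JSON;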

Syntax

CREATE SINK [IF NOT EXISTS] sink_name
FROM item_name
INTO { sink_kafka_connector | AVRO OCF path }
[WITH ( sink_with_options )]
[FORMAT sink_format_spec]
[ENVELOPE { DEBEZIUM | UPSERT }]
[WITH SNAPSHOT | WITHOUT SNAPSHOT]

sink_kafka_connector

KAFKA BROKER host TOPIC topic-prefix
    [KEY ( key_column [, ...] )]
    [CONSISTENCY ( TOPIC consistency_topic [FORMAT consistency_format_spec] )]

sink_format_spec

  AVRO USING CONFLUENT SCHEMA REGISTRY url [with_options]
| AVRO USING SCHEMA FILE schema_file_path
| JSON

consistency_format_spec

  AVRO USING CONFLUENT SCHEMA REGISTRY url [with_options]
| AVRO USING SCHEMA FILE schema_file_path

Field Use
IF NOT EXISTS If specified, do not generate an error if a sink of the same name already exists.

If not specified, throw an error if a sink of the same name already exists. (Default)
sink_name A name for the sink. This name is only used within Materialize.
item_name The name of the source or view you want to send to the sink.
KAFKA BROKER host The Kafka broker’s host name without the security protocol, which is specified by the WITH options. If you wish to specify multiple brokers (bootstrap servers) as an additional safeguard, use a comma-separated list. For example: localhost:9092, localhost:9093.
TOPIC topic_prefix The prefix used to generate the Kafka topic name to create and write to.
KEY ( key_column ) An optional list of columns to use for the Kafka key. If unspecified, the Kafka key is left unset. A brief sketch using this option appears after this table.

New in v0.5.1.

TOPIC consistency_topic Makes the sink emit additional consistency metadata to the named topic. Only valid for Kafka sinks. If reuse_topic is true, a default consistency_topic will be used when not explicitly set. The default consistency topic name is formed by appending -consistency to the output topic name.

Available only in unstable builds.

AVRO OCF path The absolute path and file name of the Avro Object Container file (OCF) to create and write to. The filename will be modified to let Materialize create a unique file each time Materialize starts, but the file extension will not be modified. You can find more details in Avro OCF sinks below.
sink_with_options Options affecting sink creation. For more detail, see WITH options.
with_options Options affecting Materialize’s connection to Kafka. For more detail, see Format WITH options.
ENVELOPE DEBEZIUM The generated schemas have a Debezium-style diff envelope to capture changes in the input view or source. This is the default.
ENVELOPE UPSERT The sink emits data with upsert semantics: updates and inserts for the given key are expressed as a value, and deletes are expressed as a null value payload in Kafka. For more detail, see Upsert source details.

Changed in v0.7.1: The AS OF option was removed.
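
For example, a minimal sketch using the KEY option described in the table above (the orders source and its order_id column are hypothetical):

CREATE SINK orders_sink
FROM orders
INTO KAFKA BROKER 'localhost:9092' TOPIC 'orders-sink'
    KEY (order_id)
FORMAT AVRO USING
    CONFLUENT SCHEMA REGISTRY 'http://localhost:8081';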

WITH options

The following options are valid within the WITH clause.

Field Value type Description
partition_count int Set the sink Kafka topic’s partition count. This defaults to -1 (use the broker default).
replication_factor int Set the sink Kafka topic’s replication factor. This defaults to -1 (use the broker default).
reuse_topic bool Use the existing Kafka topic after Materialize restarts, instead of creating a new one. The default is false. See Enabling topic reuse after restart for details.
consistency_topic text Makes the sink emit additional consistency metadata. Only valid for Kafka sinks. If reuse_topic is true, a default consistency_topic will be used when not explicitly set. The default consistency topic name is formed by appending -consistency to the output topic name.
security_protocol text Use ssl or, for Kerberos, sasl_plaintext, sasl-scram-sha-256, or sasl-sha-512 to connect to the Kafka cluster.
acks text Sets the number of Kafka replicas that must acknowledge Materialize writes. Accepts values [-1,1000]. -1 (the default) specifies all replicas.
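
For example, a sketch that overrides the topic defaults (the source, topic, and option values are illustrative):

CREATE SINK quotes_sink
FROM quotes
INTO KAFKA BROKER 'localhost:9092' TOPIC 'quotes-sink'
WITH (partition_count = 6, replication_factor = 3)
FORMAT JSON;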

SSL WITH options

Use the following options to connect Materialize to an SSL-encrypted Kafka cluster. For more detail, see SSL-encrypted Kafka details.

Field Value Description
ssl_certificate_location text The absolute path to your SSL certificate. Required for SSL client authentication.
ssl_key_location text The absolute path to your SSL certificate’s key. Required for SSL client authentication.
ssl_key_password text Your SSL key’s password, if any.

This option stores the password in Materialize’s on-disk catalog. For an alternative, use ssl_key_password_env.
ssl_key_password_env text Use the value stored in the named environment variable as the value for ssl_key_password.

This option does not store the password on-disk in Materialize’s catalog, but requires the environment variable’s presence to boot Materialize.
ssl_ca_location text The absolute path to the certificate authority (CA) certificate. Used for both SSL client and server authentication. If unspecified, uses the system’s default CA certificates.
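
For example, a sketch of an SSL-authenticated sink (the broker address, certificate paths, and environment variable name are hypothetical):

CREATE SINK secure_sink
FROM quotes
INTO KAFKA BROKER 'kafka:9093' TOPIC 'secure-sink'
WITH (
    security_protocol = 'ssl',
    ssl_certificate_location = '/secrets/materialized.crt',
    ssl_key_location = '/secrets/materialized.key',
    ssl_key_password_env = 'SSL_KEY_PASSWORD',
    ssl_ca_location = '/secrets/ca.crt'
)
FORMAT JSON;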

Kerberos WITH options

Use the following options to connect Materialize using SASL.

For more detail, see Kerberized Kafka details.

Field Value Description
sasl_mechanisms text The SASL mechanism to use for authentication. Currently, the only supported mechanisms are GSSAPI (the default) and PLAIN.
sasl_username text Required if sasl_mechanisms is PLAIN.
sasl_password text Your SASL password, if any. Required if sasl_mechanisms is PLAIN.

This option stores the password in Materialize’s on-disk catalog. For an alternative, use sasl_password_env.
sasl_password_env text Use the value stored in the named environment variable as the value for sasl_password.

This option does not store the password on-disk in Materialize’s catalog, but requires the environment variable’s presence to boot Materialize.
sasl_kerberos_keytab text The absolute path to your keytab. Required if sasl_mechanisms is GSSAPI.
sasl_kerberos_kinit_cmd text Shell command to refresh or acquire the client’s Kerberos ticket. Required if sasl_mechanisms is GSSAPI.
sasl_kerberos_min_time_before_relogin text Minimum time in milliseconds between key refresh attempts. Disable automatic key refresh by setting this property to 0. Required if sasl_mechanisms is GSSAPI.
sasl_kerberos_principal text Materialize Kerberos principal name. Required if sasl_mechanisms is GSSAPI.
sasl_kerberos_service_name text Kafka’s service name on its host, i.e. the service principal name not including /hostname@REALM. Required if sasl_mechanisms is GSSAPI.
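
For example, a sketch using the PLAIN mechanism (the broker address, username, and environment variable name are hypothetical):

CREATE SINK sasl_sink
FROM quotes
INTO KAFKA BROKER 'kafka:9092' TOPIC 'sasl-sink'
WITH (
    security_protocol = 'sasl_plaintext',
    sasl_mechanisms = 'PLAIN',
    sasl_username = 'materialize',
    sasl_password_env = 'SASL_PASSWORD'
)
FORMAT JSON;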

Format WITH options

The following options are valid within the Kafka connector’s WITH clause.

Field Value type Description
username text The Kafka username.
password text The Kafka password.

WITH SNAPSHOT or WITHOUT SNAPSHOT

By default, each SINK is created with a SNAPSHOT which contains the consolidated results of the query before the sink was created. Any further updates to these results are produced at the time when they occur. To only see results after the sink is created, specify WITHOUT SNAPSHOT.
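
For example, a sketch that emits only the changes occurring after the sink is created (the source and topic names are hypothetical):

CREATE SINK quotes_changes_sink
FROM quotes
INTO KAFKA BROKER 'localhost:9092' TOPIC 'quotes-changes'
FORMAT JSON
WITHOUT SNAPSHOT;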

Detail

Debezium envelope details

The Debezium envelope provides a “diff envelope”, which describes the decoded records’ old and new values; this is roughly equivalent to the notion of Change Data Capture, or CDC. Materialize can write the data in this diff envelope to represent inserts, updates, or deletes to the underlying source or view being written for the sink.

Format implications

Using the Debezium envelope changes the schema of your Avro-encoded data to include something akin to the following field:

{
    "type": "record",
    "name": "envelope",
    "fields": [
        {
            "name": "before",
            "type": [
                {
                    "name": "row",
                    "type": "record",
                    "fields": [
                        {"name": "a", "type": "long"},
                        {"name": "b", "type": "long"}
                    ]
                },
                "null"
            ]
        },
        { "name": "after", "type": ["row", "null"] }
    ]
}

Kafka sinks

When creating sinks, Materialize will either reuse the last sink topic (if reuse_topic is true) or it will generate a new topic name using the format below.

{topic_prefix}-{sink_global_id}-{materialize-startup-time}-{nonce}

If the topic does not exist, Materialize will use the Kafka Admin API to create the topic.

For Avro-encoded sinks, Materialize will publish the sink’s Avro schema to the Confluent Schema Registry. Materialize will not publish schemas for JSON-encoded sinks.

Note: With reuse_topic enabled, this topic naming scheme is ignored. Instead, the topic name specified in the sink definition is used as is.

You can find the topic name for each Kafka sink by querying mz_kafka_sinks.

Note: Dropping a Kafka sink doesn’t drop the corresponding topic. Recreating the sink will result in a new Kafka topic. For information about dropping Kafka topics, please see the Kafka documentation.

Enabling topic reuse after restart (exactly-once sinks)

BETA! This feature is in beta. It may have performance or stability issues and is not subject to our backwards compatibility guarantee. Available since v0.9.0.

By default, Materialize creates new, distinct topics for sinks after each restart. To enable the reuse of the existing topic instead and provide exactly-once processing guarantees, Materialize must be able to do two things:

This allows for exactly-once stream processing, meaning that each incoming event affects the final results only once, even if the stream is disrupted or Materialize is restarted.

Exactly-once stream processing is currently available only for Kafka sources and the views based on them.

When you create a sink, you must:

Additionally, the sink consistency topic cannot be written to by any other process, including another Materialize instance or another sink.

Because this feature is still in beta, we strongly suggest that you start with test data rather than production data. Please escalate any issues to us.

Consistency metadata

When requested, Materialize will send consistency metadata that describes timestamps (also called transaction IDs) and relates the change data stream to them.

Materialize sends two main pieces of information:

Materialize uses a simplified version of the Debezium transaction metadata protocol to send this information. The generated diff envelope schema used for data messages is decorated with a transaction field which has the following schema.

{
    "name": "transaction",
    "type": {
        "type": "record",
        "name": "transaction_metadata",
        "fields": [
            {
                "name": "id",
                "type": "string"
            }
        ]
    }
}

Each message sent to Kafka has a transaction field, along with a transaction_id, in addition to its regular before / after data fields. The transaction_id is equivalent to the Materialize timestamp for this record.

In addition to the inline information, Materialize creates a new “consistency topic” that stores counts of how many changes were generated per transaction_id. This consistency topic is named using the format below.

{consistency_topic_prefix}-{sink_global_id}-{materialize-startup-time}-{nonce}

Note: With reuse_topic enabled, this topic naming scheme is ignored. Instead, the topic name specified via the consistency_topic option is used as is.

Each message in the consistency topic has the schema below.

{
    "type": "record",
    "name": "envelope",
    "fields": [
        {
            "name": "id",
            "type": "string"
        },
        {
            "name": "status",
            "type": "string"
        },
        {
            "name": "event_count",
            "type": [
                "null",
                "long"
            ]
        },
        {
            "name": "data_collections",
            "type": [
                "null",
                {
                    "type": "array",
                    "items": {
                        "type": "record",
                        "name": "data_collection",
                        "fields": [
                            {
                                "name": "data_collection",
                                "type": "string"
                            },
                            {
                                "name": "event_count",
                                "type": "long"
                            }
                        ]
                    }
                }
            ],
            "default": null
        }
    ]
}
Field Use
id The transaction id this record refers to.
status Either BEGIN or END. Materialize sends a record with BEGIN the first time it writes a data message for id, and it sends an END record after it has written all data messages for id.
event_count This field is null for BEGIN records, and for END records it contains the number of messages Materialize wrote for that id.
data_collections This field is null for BEGIN records, and for END records it contains the number of messages Materialize wrote for that id and collection.

Avro OCF sinks

When creating Avro Object Container File (OCF) sinks, Materialize creates a new sink OCF and appends the Avro schema data in its header. Materialize names the new file using the format below.

{path.base_directory}/{path.file_stem}-{sink_global_id}-{materialize-startup-time}-{nonce}.{path.file_extension}

You can query mz_avro_ocf_sinks to get file name information for each Avro OCF sink. See the Avro OCF sinks examples below for a more concrete example.

Examples

Avro sinks

From sources

CREATE SOURCE quotes
FROM KAFKA BROKER 'localhost' TOPIC 'quotes'
FORMAT AVRO USING
    CONFLUENT SCHEMA REGISTRY 'http://localhost:8081';
CREATE SINK quotes_sink
FROM quotes
INTO KAFKA BROKER 'localhost' TOPIC 'quotes-sink'
FORMAT AVRO USING
    CONFLUENT SCHEMA REGISTRY 'http://localhost:8081';

With topic reuse enabled after restart

CREATE SINK quotes_sink
FROM quotes
INTO KAFKA BROKER 'localhost:9092' TOPIC 'quotes-eo-sink'
WITH (reuse_topic=true, consistency_topic='quotes-eo-sink-consistency')
FORMAT AVRO USING
    CONFLUENT SCHEMA REGISTRY 'http://localhost:8081';

From materialized views

CREATE SOURCE quotes
FROM KAFKA BROKER 'localhost' TOPIC 'quotes'
FORMAT AVRO USING
    CONFLUENT SCHEMA REGISTRY 'http://localhost:8081';
CREATE MATERIALIZED VIEW frank_quotes AS
    SELECT * FROM quotes
    WHERE attributed_to = 'Frank McSherry';
CREATE SINK frank_quotes_sink
FROM frank_quotes
INTO KAFKA BROKER 'localhost' TOPIC 'frank-quotes-sink'
FORMAT AVRO USING
    CONFLUENT SCHEMA REGISTRY 'http://localhost:8081';

Get actual Kafka topic names

SELECT sink_id, name, topic
FROM mz_sinks
JOIN mz_kafka_sinks ON mz_sinks.id = mz_kafka_sinks.sink_id;

  sink_id  |              name                    |                        topic
-----------+--------------------------------------+------------------------------------------------------
 u5        | materialize.public.quotes_sink       | quotes-sink-u5-1586024632-15401700525642547992
 u6        | materialize.public.frank_quotes_sink | frank-quotes-sink-u6-1586024632-15401700525642547992

Avro OCF sinks

From sources

CREATE SOURCE quotes
FROM KAFKA BROKER 'localhost' TOPIC 'quotes'
FORMAT AVRO USING
    CONFLUENT SCHEMA REGISTRY 'http://localhost:8081';
CREATE SINK quotes_sink
FROM quotes
INTO AVRO OCF '/path/to/sink-file.ocf';

From materialized views

CREATE SOURCE quotes
FROM KAFKA BROKER 'localhost' TOPIC 'quotes'
FORMAT AVRO USING
    CONFLUENT SCHEMA REGISTRY 'http://localhost:8081';
CREATE MATERIALIZED VIEW frank_quotes AS
    SELECT * FROM quotes
    WHERE attributed_to = 'Frank McSherry';
CREATE SINK frank_quotes_sink
FROM frank_quotes
INTO AVRO OCF '/path/to/frank-sink-file.ocf';

Get actual file names

Materialize stores the actual path as a byte array, so we need to use the convert_from function to convert it to a UTF-8 string.

SELECT sink_id, name, convert_from(path, 'utf8')
FROM mz_sinks
JOIN mz_avro_ocf_sinks ON mz_sinks.id = mz_avro_ocf_sinks.sink_id;

 sink_id   |        name       |                           path
-----------+-------------------+----------------------------------------------------------------
 u10       | quotes_sink       | /path/to/sink-file-u10-1586108399-8671224166353132585.ocf
 u11       | frank_quotes_sink | /path/to/frank-sink-file-u11-1586108399-8671224166353132585.ocf

JSON sinks

From sources

CREATE SOURCE quotes
FROM KAFKA BROKER 'localhost' TOPIC 'quotes'
FORMAT AVRO USING
    CONFLUENT SCHEMA REGISTRY 'http://localhost:8081';
CREATE SINK quotes_sink
FROM quotes
INTO KAFKA BROKER 'localhost' TOPIC 'quotes-sink'
FORMAT JSON;

With topic reuse enabled after restart

CREATE SINK quotes_sink
FROM quotes
INTO KAFKA BROKER 'localhost:9092' TOPIC 'quotes-eo-sink'
    CONSISTENCY TOPIC 'quotes-eo-sink-consistency'
    CONSISTENCY FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY 'http://localhost:8081'
WITH (reuse_topic=true)
FORMAT JSON;

From materialized views

CREATE SOURCE quotes
FROM KAFKA BROKER 'localhost' TOPIC 'quotes'
FORMAT AVRO USING
    CONFLUENT SCHEMA REGISTRY 'http://localhost:8081';
CREATE MATERIALIZED VIEW frank_quotes AS
    SELECT * FROM quotes
    WHERE attributed_to = 'Frank McSherry';
CREATE SINK frank_quotes_sink
FROM frank_quotes
INTO KAFKA BROKER 'localhost' TOPIC 'frank-quotes-sink'
FORMAT JSON;