CREATE SINK
CREATE SINK sends data from Materialize to an external sink.
Conceptual framework
Sinks let you stream data out of Materialize, using either sources or materialized views.
Sink source type | Description |
---|---|
Source | Simply pass all data received from the source to the sink without modifying it. |
Materialized view | Stream all changes to the view to the sink. This lets you use Materialize to process a stream, and then stream the processed values. Note that this feature only works with materialized views, and does not work with non-materialized views. |
Syntax
Syntax diagrams: sink_kafka_connector, sink_format_spec, consistency_format_spec.
Field | Use |
---|---|
IF NOT EXISTS | If specified, do not generate an error if a sink of the same name already exists. If not specified, throw an error if a sink of the same name already exists. (Default) |
sink_name | A name for the sink. This name is only used within Materialize. |
item_name | The name of the source or view you want to send to the sink. |
KAFKA BROKER host | The Kafka broker's host name without the security protocol, which is specified by the WITH options. If you wish to specify multiple brokers (bootstrap servers) as an additional safeguard, use a comma-separated list. For example: localhost:9092, localhost:9093. |
TOPIC topic_prefix | The prefix used to generate the Kafka topic name to create and write to. |
KEY ( key_column ) | An optional list of columns to use for the Kafka key. If unspecified, the Kafka key is left unset. New in v0.5.1. |
TOPIC consistency_topic | Makes the sink emit additional consistency metadata to the named topic. Only valid for Kafka sinks. If reuse_topic is true, a default naming convention will be used when the topic name is not explicitly set. This is formed by appending -consistency to the output topic name. Available only in unstable builds. |
AVRO OCF path | The absolute path and file name of the Avro Object Container file (OCF) to create and write to. The filename will be modified to let Materialize create a unique file each time Materialize starts, but the file extension will not be modified. You can find more details here. |
sink_with_options | Options affecting sink creation. For more detail, see WITH options. |
with_options | Options affecting Materialize’s connection to Kafka. For more detail, see Authentication. |
ENVELOPE DEBEZIUM | The generated schemas have a Debezium-style diff envelope to capture changes in the input view or source. This is the default. |
ENVELOPE UPSERT | The sink emits data with upsert semantics: updates and inserts for the given key are expressed as a value, and deletes are expressed as a null value payload in Kafka. For more detail, see Handling upserts. |
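For instance, a minimal sketch of a keyed upsert sink, assuming a quotes source that has an id column (both names are illustrative):

CREATE SINK quotes_upsert_sink
FROM quotes
INTO KAFKA BROKER 'localhost:9092' TOPIC 'quotes-upsert-sink'
KEY (id)
FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY 'http://localhost:8081'
ENVELOPE UPSERT;

With this envelope, a later change to the row with id = 1 is emitted as a new value for Kafka key 1, and a deletion is emitted as a null payload for that key.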
Changed in v0.7.1: The AS OF option was removed.

WITH options

The following options are valid within the WITH clause.
Field | Value type | Description |
---|---|---|
partition_count | int | Set the sink Kafka topic's partition count. This defaults to -1 (use the broker default). |
replication_factor | int | Set the sink Kafka topic's replication factor. This defaults to -1 (use the broker default). |
reuse_topic | bool | Use the existing Kafka topic after Materialize restarts, instead of creating a new one. The default is false. See Enabling topic reuse after restart for details. |
consistency_topic | text | This option is only available to support backwards compatibility. Please use the new CONSISTENCY syntax to define a consistency topic for the sink. |
security_protocol | text | Use ssl or, for Kerberos, sasl_plaintext, sasl-scram-sha-256, or sasl-sha-512 to connect to the Kafka cluster. |
acks | text | Sets the number of Kafka replicas that must acknowledge Materialize writes. Accepts values [-1,1000]. -1 (the default) specifies all replicas. |
retention_ms | long | Sets the maximum time Kafka will retain a log. Accepts values [-1, ...]. -1 specifies no time limit. If not set, uses the broker default. New in v0.9.7. |
retention_bytes | long | Sets the maximum size a Kafka partition can grow before removing old logs. Accepts values [-1, ...]. -1 specifies no size limit. If not set, uses the broker default. New in v0.9.7. |
avro_key_fullname | text | Sets the Avro fullname on the generated key schema, if a KEY is specified. When used, a value must be specified for avro_value_fullname. The default fullname is row. New in v0.18.0. |
avro_value_fullname | text | Sets the Avro fullname on the generated value schema. When KEY is specified, avro_key_fullname must additionally be specified. The default fullname is envelope. New in v0.18.0. |
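As a sketch of how these options combine, the following sink (reusing the quotes source from the examples below; the option values are arbitrary) creates its topic with explicit partitioning, replication, and retention settings:

CREATE SINK quotes_sink
FROM quotes
INTO KAFKA BROKER 'localhost:9092' TOPIC 'quotes-sink'
WITH (partition_count = 3, replication_factor = 3, retention_ms = 604800000)
FORMAT JSON;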
Authentication
Kafka sinks support the same authentication scheme and options as Kafka sources (SSL, SASL). Check the Kafka source documentation for more details and examples.
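As a hedged sketch, an SSL-secured sink passes the same WITH options as an SSL-secured Kafka source; the broker address, file paths, and password below are placeholders:

CREATE SINK secure_quotes_sink
FROM quotes
INTO KAFKA BROKER 'kafka:9093' TOPIC 'secure-quotes-sink'
WITH (
    security_protocol = 'SSL',
    ssl_key_location = '/secrets/materialized.key',
    ssl_certificate_location = '/secrets/materialized.crt',
    ssl_ca_location = '/secrets/ca.crt',
    ssl_key_password = 'mzmzmz'
)
FORMAT JSON;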
WITH SNAPSHOT or WITHOUT SNAPSHOT

By default, each SINK is created with a SNAPSHOT which contains the consolidated results of the query before the sink was created. Any further updates to these results are produced at the time when they occur. To only see results after the sink is created, specify WITHOUT SNAPSHOT.
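For example, a minimal sketch of a sink that skips the initial snapshot (again reusing the quotes source from the examples below):

CREATE SINK quotes_updates_sink
FROM quotes
INTO KAFKA BROKER 'localhost:9092' TOPIC 'quotes-updates'
FORMAT JSON
WITHOUT SNAPSHOT;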
Details

- Materialize currently only supports the following sink formats:
  - Avro-formatted sinks that write to either a topic or an Avro object container file.
  - JSON-formatted sinks that write to a topic.
- For most sinks, Materialize creates new, distinct topics and files for each sink on restart. A beta feature enables the use of the same topic after restart. For details, see Exactly-once sinks.
- Materialize stores information about actual topic names and actual file names in the mz_kafka_sinks and mz_avro_ocf_sinks log sources. See the examples below for more details.
- For Avro-formatted sinks, Materialize generates Avro schemas for views and sources that are stored in the sink. If needed, the fullnames for these schemas can be specified with the avro_key_fullname and avro_value_fullname options.
- Materialize can also optionally emit transaction information for changes. This is only supported for Kafka sinks; it adds transaction information inline with the data and creates a separate transaction metadata topic.
Debezium envelope details
The Debezium envelope provides a “diff envelope”, which describes the decoded records' old and new values; this is roughly equivalent to the notion of Change Data Capture, or CDC. Materialize can write the data in this diff envelope to represent inserts, updates, or deletes to the underlying source or view being written for the sink.
Format implications
Using the Debezium envelope changes the schema of your Avro-encoded data to include something akin to the following field:
{
"type": "record",
"name": "envelope",
"fields": [
{
"name": "before",
"type": [
{
"name": "row",
"type": "record",
"fields": [
{"name": "a", "type": "long"},
{"name": "b", "type": "long"}
]
},
"null"
]
},
{ "name": "after", "type": ["row", "null"] }
]
}
Note that:

- You don't need to manually create this schema. Materialize generates it for you.
- The following section depends on the columns' names and types, and is unlikely to match our example:

  ... "fields": [ {"name": "a", "type": "long"}, {"name": "b", "type": "long"} ] ...
Kafka sinks
When creating sinks, Materialize will either reuse the last sink topic (if reuse_topic is true) or it will generate a new topic name using the format below.
{topic_prefix}-{sink_global_id}-{materialize-startup-time}-{nonce}
If the topic does not exist, Materialize will use the Kafka Admin API to create the topic.
For Avro-encoded sinks, Materialize will publish the sink’s Avro schema to the Confluent Schema Registry. Materialize will not publish schemas for JSON-encoded sinks.
You can find the topic name for each Kafka sink by querying mz_kafka_sinks.
Exactly-once sinks (with topic reuse after restart)
New in v0.9.0.
By default, Materialize creates new, distinct topics for sinks on restart. To enable the reuse of an existing topic instead and achieve exactly-once processing guarantees, Materialize must:
1. Determine the point in time where it left off processing. Each sink writes an additional consistency topic with useful metadata that allows Materialize to keep track of the timestamps it has published data for. Following a crash, this consistency topic is used to determine the latest complete timestamp for a sink and resume processing.

2. Reconstruct the history of the sink and all the objects it depends on. For each source, timestamp/offset bindings (i.e. which timestamps were assigned to which offsets) are persisted to the on-disk system catalog. This ensures that Materialize can preserve correctness on restart and avoid publishing duplicate data to the sink.
NOTE: For some deployment setups, you may need to persist the system catalog to stable storage across restarts. Without that metadata, new timestamps will be reassigned to existing offsets and data will be republished to the sink.
In practice, each incoming event affects the final results exactly once, even if the stream is disrupted or Materialize is restarted. Because the implementation effectively relies on source events having replayable timestamps, only Kafka sources and materialized views defined on top of Kafka sources can be used in this context.
Syntax
CREATE SINK quotes_sink
FROM quotes
INTO KAFKA BROKER 'localhost:9092' TOPIC 'quotes-eo-sink'
CONSISTENCY (TOPIC 'quotes-eo-sink-consistency'
FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY 'http://localhost:8081')
WITH (reuse_topic=true)
FORMAT JSON;
To create a sink with exactly-once processing guarantees, you need to:
- Set the reuse_topic option to true;
- Optionally name the consistency topic. This name must be unique across all sinks in the Materialize instance. If not specified, a default name will be created by appending -consistency to the sink topic name.
- Specify the format of the consistency topic. Only Avro is supported. If you are using a JSON-formatted sink, you must use Avro as well (at least for now!).
Note that:
- The sink consistency topic shouldn’t be written to by any other process, including other sinks or Materialize instances;
- Key-based compaction is supported for the consistency topic and can be useful for controlling the growth of the topic.
This feature is still in beta, so we strongly recommend that you start with test data rather than production data. Please let us know if you run into any issues!
Consistency metadata
When requested, Materialize will produce consistency metadata that describes timestamps and relates the change data stream to them.
Materialize sends the following information:

- A timestamp for each change. This is sent inline with the change itself.
- A count of how many changes occurred for each timestamp. This is sent as part of a separate consistency topic.
- A count of how many changes occurred for each collection covered by the topic. In Materialize, this will always correspond to the above count, as each topic uniquely maps to a single collection.
Materialize uses a simplified version of the Debezium transaction metadata protocol to send this information.
The generated diff envelope schema used for data messages is decorated with a transaction
field which has the following schema:
{
"name": "transaction",
"type": {
"type": "record",
"name": "transaction_metadata",
"fields": [
{
"name": "id",
"type": "string"
}
]
}
}
Each message sent to Kafka has a transaction field and a transaction id, in addition to its regular before/after data fields. The transaction id is equivalent to the Materialize timestamp for this record.
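As an illustration (hand-constructed, not captured output), a data message for the two-column example schema shown earlier might look like this in Avro's JSON encoding:

{
  "before": null,
  "after": { "row": { "a": 1, "b": 2 } },
  "transaction": { "id": "1616433904000" }
}

The id value here is invented; in practice it is the Materialize timestamp at which the change occurred.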
In addition to the inline information, Materialize creates an additional consistency topic that stores counts of how many changes were generated per transaction id. The name of the consistency topic uses the following convention:
{consistency_topic_prefix}-{sink_global_id}-{materialize-startup-time}-{nonce}
If reuse_topic is enabled, the naming convention is ignored. Instead, the topic name specified via the CONSISTENCY option is used.
Each message in the consistency topic has the schema below.
{
  "type": "record",
  "name": "envelope",
  "fields": [
    {
      "name": "id",
      "type": "string"
    },
    {
      "name": "status",
      "type": "string"
    },
    {
      "name": "event_count",
      "type": [
        "null",
        "long"
      ]
    },
    {
      "name": "data_collections",
      "type": [
        "null",
        {
          "type": "array",
          "items": {
            "type": "record",
            "name": "data_collection",
            "fields": [
              {
                "name": "data_collection",
                "type": "string"
              },
              {
                "name": "event_count",
                "type": "long"
              }
            ]
          }
        }
      ],
      "default": null
    }
  ]
}
Field | Use |
---|---|
id | The transaction id this record refers to. |
status | Either BEGIN or END . Materialize sends a record with BEGIN the first time it writes a data message for id , and an END record after it has written all data messages for id . |
event_count | This field is null for BEGIN records, and for END records it contains the number of messages Materialize wrote for that id . |
data_collections | This field is null for BEGIN records, and for END records it contains the number of messages Materialize wrote for that id and collection. |
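To make this concrete, here is a hand-constructed sketch of the BEGIN/END record pair for a transaction id covering two data messages; the topic name is illustrative, and union values are shown in Avro's JSON encoding:

{ "id": "1616433904000", "status": "BEGIN", "event_count": null, "data_collections": null }
{ "id": "1616433904000", "status": "END", "event_count": { "long": 2 }, "data_collections": { "array": [ { "data_collection": "quotes-eo-sink", "event_count": 2 } ] } }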
Consistency information details
- Materialize writes consistency output to a different topic per sink.
- There are no ordering guarantees on transaction id in the consistency topic.
- Multiple transactions can be interleaved in the consistency topic, so it's possible that some ids don't have a corresponding BEGIN or END record.
Avro OCF sinks
When creating Avro Object Container File (OCF) sinks, Materialize creates a new sink OCF and appends the Avro schema data in its header. Materialize names the new file using the format below.
{path.base_directory}-{path.file_stem}-{sink_global_id}-{materialize-startup-time}-{nonce}-{path.file_extension}
You can query mz_avro_ocf_sinks to get file name information for each Avro OCF sink. See the examples below for more details.
Examples
Avro sinks
From sources
CREATE SOURCE quotes
FROM KAFKA BROKER 'localhost' TOPIC 'quotes'
FORMAT AVRO USING
CONFLUENT SCHEMA REGISTRY 'http://localhost:8081';
CREATE SINK quotes_sink
FROM quotes
INTO KAFKA BROKER 'localhost' TOPIC 'quotes-sink'
FORMAT AVRO USING
CONFLUENT SCHEMA REGISTRY 'http://localhost:8081';
From materialized views
CREATE SOURCE quotes
FROM KAFKA BROKER 'localhost' TOPIC 'quotes'
FORMAT AVRO USING
CONFLUENT SCHEMA REGISTRY 'http://localhost:8081';
CREATE MATERIALIZED VIEW frank_quotes AS
SELECT * FROM quotes
WHERE attributed_to = 'Frank McSherry';
CREATE SINK frank_quotes_sink
FROM frank_quotes
INTO KAFKA BROKER 'localhost' TOPIC 'frank-quotes-sink'
FORMAT AVRO USING
CONFLUENT SCHEMA REGISTRY 'http://localhost:8081';
Get actual Kafka topic names
SELECT sink_id, name, topic
FROM mz_sinks
JOIN mz_kafka_sinks ON mz_sinks.id = mz_kafka_sinks.sink_id;
sink_id | name | topic
-----------+--------------------------------------+------------------------------------------------------
u5 | materialize.public.quotes_sink | quotes-sink-u5-1586024632-15401700525642547992
u6 | materialize.public.frank_quotes_sink | frank-quotes-sink-u6-1586024632-15401700525642547992
Avro OCF sinks
From sources
CREATE SOURCE quotes
FROM KAFKA BROKER 'localhost' TOPIC 'quotes'
FORMAT AVRO USING
CONFLUENT SCHEMA REGISTRY 'http://localhost:8081';
CREATE SINK quotes_sink
FROM quotes
INTO AVRO OCF '/path/to/sink-file.ocf';
From materialized views
CREATE SOURCE quotes
FROM KAFKA BROKER 'localhost' TOPIC 'quotes'
FORMAT AVRO USING
CONFLUENT SCHEMA REGISTRY 'http://localhost:8081';
CREATE MATERIALIZED VIEW frank_quotes AS
SELECT * FROM quotes
WHERE attributed_to = 'Frank McSherry';
CREATE SINK frank_quotes_sink
FROM frank_quotes
INTO AVRO OCF '/path/to/frank-sink-file.ocf';
Get actual file names
Materialize stores the actual path as a byte array, so we need to use the convert_from function to convert it to a UTF-8 string.
SELECT sink_id, name, convert_from(path, 'utf8')
FROM mz_sinks
JOIN mz_avro_ocf_sinks ON mz_sinks.id = mz_avro_ocf_sinks.sink_id;
sink_id | name | path
-----------+-------------------+----------------------------------------------------------------
u10 | quotes_sink | /path/to/sink-file-u10-1586108399-8671224166353132585.ocf
u11 | frank_quotes_sink | /path/to/frank-sink-file-u11-1586108399-8671224166353132585.ocf
JSON sinks
From sources
CREATE SOURCE quotes
FROM KAFKA BROKER 'localhost' TOPIC 'quotes'
FORMAT AVRO USING
CONFLUENT SCHEMA REGISTRY 'http://localhost:8081';
CREATE SINK quotes_sink
FROM quotes
INTO KAFKA BROKER 'localhost' TOPIC 'quotes-sink'
FORMAT JSON;
From materialized views
CREATE SOURCE quotes
FROM KAFKA BROKER 'localhost' TOPIC 'quotes'
FORMAT AVRO USING
CONFLUENT SCHEMA REGISTRY 'http://localhost:8081';
CREATE MATERIALIZED VIEW frank_quotes AS
SELECT * FROM quotes
WHERE attributed_to = 'Frank McSherry';
CREATE SINK frank_quotes_sink
FROM frank_quotes
INTO KAFKA BROKER 'localhost' TOPIC 'frank-quotes-sink'
FORMAT JSON;