CREATE SINK: Kafka

CREATE SINK connects Materialize to an external system you want to write data to, and provides details about how to encode that data.

To use a Kafka broker (and optionally a schema registry) as a sink, make sure that a connection that specifies access and authentication parameters to that broker already exists; otherwise, you first need to create a connection. Once created, a connection is reusable across multiple CREATE SINK and CREATE SOURCE statements.

NOTE: The same syntax, supported formats and features can be used to connect to a Redpanda broker.

| Sink source type  | Description |
| ----------------- | ----------- |
| Source            | Simply pass all data received from the source to the sink without modifying it. |
| Table             | Stream all changes to the specified table out to the sink. |
| Materialized view | Stream all changes to the view to the sink. This lets you use Materialize to process a stream, and then stream the processed values. Note that this feature only works with materialized views, and does not work with non-materialized views. |

Syntax

CREATE SINK [IF NOT EXISTS] sink_name
  [IN CLUSTER cluster_name]
  FROM item_name
  INTO kafka_sink_connection
  [KEY (key_column [, ...])]
  [FORMAT sink_format_spec]
  ENVELOPE { DEBEZIUM | UPSERT }
  [WITH (with_options)]

sink_format_spec

{ AVRO USING csr_connection | JSON }

kafka_sink_connection

KAFKA CONNECTION connection_name (TOPIC topic [, connection_option [, ...]])

csr_connection

CONFLUENT SCHEMA REGISTRY CONNECTION connection_name [( connection_option [, ...] )]
| Field | Use |
| ----- | --- |
| IF NOT EXISTS | If specified, do not generate an error if a sink of the same name already exists. If not specified (the default), throw an error if a sink of the same name already exists. |
| sink_name | A name for the sink. This name is only used within Materialize. |
| IN CLUSTER cluster_name | The cluster to maintain this sink. If not specified, the SIZE option must be specified. |
| item_name | The name of the source, table, or materialized view you want to send to the sink. |
| CONNECTION connection_name | The name of the connection to use in the sink. For details on creating connections, check the CREATE CONNECTION documentation page. |
| KEY (key_column) | An optional list of columns to use for the Kafka key. If unspecified, the Kafka key is left unset. |
| ENVELOPE DEBEZIUM | The generated schemas have a Debezium-style diff envelope to capture changes in the input view or source. |
| ENVELOPE UPSERT | The sink emits data with upsert semantics: updates and inserts for the given key are expressed as a value, and deletes are expressed as a null value payload in Kafka. For more detail, see Handling upserts. |
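For example, to key messages by a column, list it in the KEY clause. The sink, view, and column names below are illustrative:

CREATE SINK keyed_sink
  FROM my_view
  INTO KAFKA CONNECTION kafka_connection (TOPIC 'keyed_topic')
  KEY (id)
  FORMAT JSON
  ENVELOPE UPSERT
  WITH (SIZE = '3xsmall');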

CONNECTION options

| Field | Value | Description |
| ----- | ----- | ----------- |
| TOPIC | text | The prefix used to generate the Kafka topic name to create and write to. |

CSR CONNECTION options

| Field | Value | Description |
| ----- | ----- | ----------- |
| AVRO KEY FULLNAME | text | Default: row. Sets the Avro fullname on the generated key schema, if a KEY is specified. When used, a value must be specified for AVRO VALUE FULLNAME. |
| AVRO VALUE FULLNAME | text | Default: envelope. Sets the Avro fullname on the generated value schema. When KEY is specified, AVRO KEY FULLNAME must additionally be specified. |

WITH options

| Field | Value | Description |
| ----- | ----- | ----------- |
| SNAPSHOT | bool | Default: true. Whether to emit, at the start of the sink, the consolidated results that existed before the sink was created. To see only results produced after the sink is created, specify WITH (SNAPSHOT = false). |
| SIZE | text | The size for the sink. Accepts values: 3xsmall, 2xsmall, xsmall, small, medium, large, xlarge. Required if the IN CLUSTER option is not specified. |
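For instance, to skip the initial snapshot and emit only changes that occur after the sink is created (the sink and view names below are illustrative):

CREATE SINK changes_only_sink
  FROM my_view
  INTO KAFKA CONNECTION kafka_connection (TOPIC 'changes_only_topic')
  FORMAT JSON
  ENVELOPE DEBEZIUM
  WITH (SNAPSHOT = false, SIZE = '3xsmall');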

Supported formats

| Format | Upsert envelope | Debezium envelope |
| ------ | --------------- | ----------------- |
| Avro   | ✓ | ✓ |
| JSON   | ✓ | ✓ |

Avro namespaces

For Avro-formatted sinks, you can specify the fullnames for the Avro schemas Materialize generates using the AVRO KEY FULLNAME and AVRO VALUE FULLNAME syntax.
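As a sketch, these options are passed in the CSR connection clause of the sink; the fullnames and object names below are illustrative:

CREATE SINK avro_names_sink
  FROM my_view
  INTO KAFKA CONNECTION kafka_connection (TOPIC 'avro_names_topic')
  KEY (id)
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_connection
    (AVRO KEY FULLNAME 'com.example.Key', AVRO VALUE FULLNAME 'com.example.Envelope')
  ENVELOPE UPSERT
  WITH (SIZE = '3xsmall');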

Features

Handling upserts

To create a sink that uses the standard key-value convention to support inserts, updates, and deletes in the sink topic, you can use ENVELOPE UPSERT:

CREATE SINK avro_sink
  FROM <source, table or mview>
  INTO KAFKA CONNECTION kafka_connection (TOPIC 'test_avro_topic')
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_connection
  ENVELOPE UPSERT
  WITH (SIZE = '3xsmall');

Exactly-once processing

By default, Kafka sinks provide exactly-once processing guarantees, which ensures that messages are not duplicated or dropped in failure scenarios.

To achieve this, Materialize stores some internal metadata in an additional progress topic. This topic is shared among all sinks that use a particular Kafka connection. The name of the progress topic can be specified when creating a connection; otherwise, a default is chosen based on the Materialize environment id and the connection id. In either case, Materialize will attempt to create the topic if it does not exist. The contents of this topic are not user-specified.
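To control the progress topic name, set it when creating the Kafka connection. The sketch below assumes the PROGRESS TOPIC connection option described in the CREATE CONNECTION documentation; the broker and topic names are illustrative:

CREATE CONNECTION kafka_connection TO KAFKA (
    BROKER 'unique-jellyfish-0000-kafka.upstash.io:9092',
    PROGRESS TOPIC 'my_progress_topic',
    SASL MECHANISMS = 'SCRAM-SHA-256',
    SASL USERNAME = 'foo',
    SASL PASSWORD = SECRET kafka_password
);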

End-to-end exactly-once processing

Exactly-once semantics are an end-to-end property of a system, but Materialize only controls the initial produce step. To ensure end-to-end exactly-once message delivery, you should ensure that:

  • The broker is configured with a replication factor of at least 3, with unclean leader election disabled (unclean.leader.election.enable=false).
  • All downstream consumers are configured to only read committed data (isolation.level=read_committed).
  • The consumers' processing is idempotent, and offsets are only committed when processing is complete.

For more details, see the Kafka documentation.
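For example, a downstream consumer could set the following properties (a minimal sketch; any other settings the consumer needs are omitted):

isolation.level=read_committed
enable.auto.commit=false

with offsets committed manually only after each batch has been fully processed.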

Examples

Creating a connection

A connection describes how to connect and authenticate to an external system you want Materialize to write data to.

Once created, a connection is reusable across multiple CREATE SINK statements. For more details on creating connections, check the CREATE CONNECTION documentation page.

Broker

SSL authentication:

CREATE SECRET kafka_ssl_key AS '<BROKER_SSL_KEY>';
CREATE SECRET kafka_ssl_crt AS '<BROKER_SSL_CRT>';

CREATE CONNECTION kafka_connection TO KAFKA (
    BROKER 'rp-f00000bar.data.vectorized.cloud:30365',
    SSL KEY = SECRET kafka_ssl_key,
    SSL CERTIFICATE = SECRET kafka_ssl_crt
);

SASL authentication:

CREATE SECRET kafka_password AS '<BROKER_PASSWORD>';

CREATE CONNECTION kafka_connection TO KAFKA (
    BROKER 'unique-jellyfish-0000-kafka.upstash.io:9092',
    SASL MECHANISMS = 'SCRAM-SHA-256',
    SASL USERNAME = 'foo',
    SASL PASSWORD = SECRET kafka_password
);

Confluent Schema Registry

SSL authentication:

CREATE SECRET csr_ssl_crt AS '<CSR_SSL_CRT>';
CREATE SECRET csr_ssl_key AS '<CSR_SSL_KEY>';
CREATE SECRET csr_password AS '<CSR_PASSWORD>';

CREATE CONNECTION csr_ssl TO CONFLUENT SCHEMA REGISTRY (
    URL 'https://rp-f00000bar.data.vectorized.cloud:30993',
    SSL KEY = SECRET csr_ssl_key,
    SSL CERTIFICATE = SECRET csr_ssl_crt,
    USERNAME = 'foo',
    PASSWORD = SECRET csr_password
);

Basic HTTP authentication:

CREATE SECRET IF NOT EXISTS csr_username AS '<CSR_USERNAME>';
CREATE SECRET IF NOT EXISTS csr_password AS '<CSR_PASSWORD>';

CREATE CONNECTION csr_basic_http TO CONFLUENT SCHEMA REGISTRY (
    URL '<CONFLUENT_REGISTRY_URL>',
    USERNAME = SECRET csr_username,
    PASSWORD = SECRET csr_password
);

Creating a sink

When sinking into Kafka, Materialize will write all the changes from the specified source, table, or materialized view into the sink topic. If the topic does not exist, Materialize will attempt to create it.

NOTE: Dropping a Kafka sink doesn’t drop the corresponding topic. For more information, see the Kafka documentation.

Avro sink:

CREATE SINK avro_sink
  FROM <source, table or mview>
  INTO KAFKA CONNECTION kafka_connection (TOPIC 'test_avro_topic')
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_connection
  ENVELOPE DEBEZIUM
  WITH (SIZE = '3xsmall');

JSON sink:

CREATE SINK json_sink
  FROM <source, table or mview>
  INTO KAFKA CONNECTION kafka_connection (TOPIC 'test_json_topic')
  FORMAT JSON
  ENVELOPE DEBEZIUM
  WITH (SIZE = '3xsmall');

Sizing a sink

To provision a specific amount of CPU and memory to a sink on creation, use the SIZE option:

CREATE SINK avro_sink
  FROM <source, table or mview>
  INTO KAFKA CONNECTION kafka_connection (TOPIC 'test_avro_topic')
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_connection
  ENVELOPE DEBEZIUM
  WITH (SIZE = '3xsmall');

To resize the sink after creation:

ALTER SINK avro_sink SET (SIZE = 'large');

The smallest sink size (3xsmall) is a reasonable default to get started. For more details on sizing sinks, see the WITH options section above.
