CREATE SOURCE: Kafka

CREATE SOURCE connects Materialize to an external system you want to read data from, and provides details about how to decode and interpret that data.

To connect to a Kafka broker (and optionally a schema registry), you first need to create a connection that specifies access and authentication parameters. Once created, a connection is reusable across multiple CREATE SOURCE and CREATE SINK statements.

NOTE: The same syntax, supported formats and features can be used to connect to a Redpanda broker.

Syntax

CREATE SOURCE [IF NOT EXISTS] src_name
  [ (col_name [, ...]) ]
  [IN CLUSTER cluster_name]
  FROM KAFKA CONNECTION connection_name (TOPIC topic [, connection_option [, ...]])
  [ FORMAT format_spec
    | KEY FORMAT format_spec VALUE FORMAT format_spec ]
  [INCLUDE
      KEY [AS name]
    | PARTITION [AS name]
    | OFFSET [AS name]
    | TIMESTAMP [AS name]
    | HEADERS [AS name]
    | HEADER key AS name [BYTES]
    [, ...] ]
  [ENVELOPE
      NONE
    | DEBEZIUM
    | UPSERT [ (VALUE DECODING ERRORS = INLINE [AS name]) ] ]
  [EXPOSE PROGRESS AS progress_subsource_name]
  [with_options]

format_spec

  AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION connection_name [key_strat] [val_strat] [with_options]
| PROTOBUF USING CONFLUENT SCHEMA REGISTRY CONNECTION connection_name [with_options]
| PROTOBUF MESSAGE message_name USING SCHEMA encoded_schema
| REGEX regex
| CSV WITH ( HEADER (col_name [, ...]) | n COLUMNS ) [DELIMITED BY char]
| TEXT
| BYTES
| JSON

key_strat

KEY STRATEGY strat

val_strat

VALUE STRATEGY strat

strat

INLINE avro_reader_schema | ID schema_registry_id | LATEST

with_options

WITH ( RETAIN HISTORY = FOR retention_period )
Field Use
src_name The name for the source, which is used as its table name within SQL.
col_name Override default column name with the provided identifier. If used, a col_name must be provided for each column in the created source.
IN CLUSTER cluster_name The cluster to maintain this source.
CONNECTION connection_name The name of the Kafka connection to use in the source. For details on creating connections, check the CREATE CONNECTION documentation page.
INCLUDE KEY Include a column containing the Kafka message key. If the key is encoded using a format that includes schemas, the column will take its name from the schema. For unnamed formats (e.g. TEXT), the column will be named key. The column can be renamed with the optional AS name clause.
INCLUDE PARTITION Include a partition column containing the Kafka message partition. The column can be renamed with the optional AS name clause.
INCLUDE OFFSET Include an offset column containing the Kafka message offset. The column can be renamed with the optional AS name clause.
INCLUDE TIMESTAMP Include a timestamp column containing the Kafka message timestamp. The column can be renamed with the optional AS name clause.

Note that the timestamp of a Kafka message depends on how the topic and its producers are configured. See the Confluent documentation for details.
INCLUDE HEADERS Include a headers column containing the Kafka message headers as a list of records of type (key text, value bytea). The column can be renamed with the optional AS name clause.
INCLUDE HEADER key AS name [BYTES] Include a name column containing the value of the Kafka message header key, parsed as a UTF-8 string. To expose the header value as bytea, use the BYTES option.
ENVELOPE DEBEZIUM Use the Debezium envelope, which uses a diff envelope to handle CRUD operations. For more information, see Using Debezium.
ENVELOPE UPSERT [(VALUE DECODING ERRORS = INLINE)] Use the upsert envelope, which uses message keys to handle CRUD operations. To handle value decoding errors, use the (VALUE DECODING ERRORS = INLINE) option. For more information, see Handling upserts and Value decoding errors.
ENVELOPE NONE (Default) Use an append-only envelope. This means that records will only be appended and cannot be updated or deleted.
EXPOSE PROGRESS AS progress_subsource_name The name of the progress subsource for the source. If this is not specified, the subsource will be named <src_name>_progress. For more information, see Monitoring source progress.
WITH ( option_list ) Options affecting source creation. For all available options, see with_options.

CONNECTION options

Field Value Description
TOPIC text The Kafka topic you want to subscribe to.
GROUP ID PREFIX text The prefix of the consumer group ID to use. See Monitoring consumer lag. Default: materialize-{REGION-ID}-{CONNECTION-ID}-{SOURCE_ID}.
RETAIN HISTORY FOR retention_period Private preview. This option has known performance or stability issues and is under active development. Duration for which Materialize retains historical data, which is useful to implement durable subscriptions. Accepts positive interval values (e.g. '1hr'). Default: 1s.
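
As a minimal sketch of how these options fit together (assuming the kafka_connection connection used throughout this page, an events topic, and that the RETAIN HISTORY option is enabled for your region; the prefix value is illustrative):

CREATE SOURCE kafka_with_options
  FROM KAFKA CONNECTION kafka_connection (
    TOPIC 'events',
    -- Prefix for the consumer group IDs that Materialize commits offsets under.
    GROUP ID PREFIX 'materialize-analytics-'
  )
  FORMAT JSON
  -- Retain one hour of history (private preview option).
  WITH (RETAIN HISTORY = FOR '1hr');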

Supported formats

Format       Append-only envelope   Upsert envelope   Debezium envelope
Avro         ✓                      ✓                 ✓
JSON         ✓                      ✓
Protobuf     ✓                      ✓
Text/bytes   ✓                      ✓
CSV          ✓

Key-value encoding

By default, the message key is decoded using the same format as the message value. However, you can set the key and value encodings explicitly using the KEY FORMAT ... VALUE FORMAT syntax.
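
For example, a sketch of a source whose keys are plain text but whose values are Avro-encoded (reusing the kafka_connection and csr_connection connections from the examples below):

CREATE SOURCE kafka_mixed_formats
  FROM KAFKA CONNECTION kafka_connection (TOPIC 'events')
  -- Decode the message key and value with different formats.
  KEY FORMAT TEXT
  VALUE FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_connection
  ENVELOPE UPSERT;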

Features

Handling upserts

To create a source that uses the standard key-value convention to support inserts, updates, and deletes within Materialize, you can use ENVELOPE UPSERT:

CREATE SOURCE kafka_upsert
  FROM KAFKA CONNECTION kafka_connection (TOPIC 'events')
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_connection
  ENVELOPE UPSERT;

Note that:

  • Using this envelope is required to consume log compacted topics.

  • This envelope can lead to high memory and disk utilization in the cluster maintaining the source. We recommend using a standard-sized cluster, rather than a legacy-sized cluster, to automatically spill the workload to disk. See spilling to disk for details.

Null keys

If a message with a NULL key is detected, Materialize sets the source into an error state. To recover an errored source, you must produce a record with a NULL value and a NULL key to the topic, to force a retraction.

As an example, you can use kcat to produce an empty message:

echo ":" | kcat -b $BROKER -t $TOPIC -Z -K: \
  -X security.protocol=SASL_SSL \
  -X sasl.mechanisms=SCRAM-SHA-256 \
  -X sasl.username=$KAFKA_USERNAME \
  -X sasl.password=$KAFKA_PASSWORD

Value decoding errors

PREVIEW This feature is in private preview. It is under active development and may have stability or performance issues. It isn't subject to our backwards compatibility guarantees.

To enable this feature in your Materialize region, contact our team.

By default, if an error happens while decoding the value of a message for a specific key, Materialize sets the source into an error state. You can configure the source to continue ingesting data in the presence of value decoding errors using the VALUE DECODING ERRORS = INLINE option:

CREATE SOURCE kafka_upsert
  FROM KAFKA CONNECTION kafka_connection (TOPIC 'events')
  KEY FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_connection
  VALUE FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_connection
  ENVELOPE UPSERT (VALUE DECODING ERRORS = INLINE);

When this option is specified, the source will include an additional column named error with type record(description: text).

This column and all value columns will be nullable, such that if the most recent value for the given Kafka message key cannot be decoded, this error column will contain the error message. If the most recent value for a key has been successfully decoded, this column will be NULL.

To use an alternative name for the error column, use INLINE AS ... to specify the column name:

ENVELOPE UPSERT (VALUE DECODING ERRORS = (INLINE AS my_error_col))
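
In the context of a full statement, this might look like the following sketch, based on the previous example (my_error_col is an illustrative name):

CREATE SOURCE kafka_upsert
  FROM KAFKA CONNECTION kafka_connection (TOPIC 'events')
  KEY FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_connection
  VALUE FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_connection
  -- Surface value decoding errors in a column named my_error_col instead of error.
  ENVELOPE UPSERT (VALUE DECODING ERRORS = (INLINE AS my_error_col));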

It might be convenient to implement a parsing view on top of your Kafka upsert source that excludes keys with decoding errors:

CREATE VIEW kafka_upsert_parsed AS
SELECT *
FROM kafka_upsert
WHERE error IS NULL;

Using Debezium

NOTE: Currently, Materialize only supports Avro-encoded Debezium records. If you're interested in JSON support, please reach out in the community Slack or submit a feature request.

Materialize provides a dedicated envelope (ENVELOPE DEBEZIUM) to decode Kafka messages produced by Debezium. To create a source that interprets Debezium messages:

CREATE SOURCE kafka_repl
  FROM KAFKA CONNECTION kafka_connection (TOPIC 'pg_repl.public.table1')
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_connection
  ENVELOPE DEBEZIUM;

Any materialized view defined on top of this source will be incrementally updated as new change events stream in through Kafka, as a result of INSERT, UPDATE and DELETE operations in the original database.

For more details and a step-by-step guide on using Kafka+Debezium for Change Data Capture (CDC), check Using Debezium.

Note that:

  • This envelope can lead to high memory utilization in the cluster maintaining the source. Materialize can automatically offload processing to disk as needed. See spilling to disk for details.

Spilling to disk

Kafka sources that use ENVELOPE UPSERT or ENVELOPE DEBEZIUM require storing the current value for each key in the source to produce retractions when keys are updated. When using standard cluster sizes, Materialize will automatically offload this state to disk, seamlessly handling key spaces that are larger than memory.

Spilling to disk is not available with legacy cluster sizes.
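
As a sketch, you might run the source in a dedicated cluster with a standard size; the cluster name and the '50cc' size below are illustrative and depend on the sizes available in your region:

CREATE CLUSTER kafka_ingest (SIZE = '50cc');

CREATE SOURCE kafka_upsert_spill
  IN CLUSTER kafka_ingest
  FROM KAFKA CONNECTION kafka_connection (TOPIC 'events')
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_connection
  -- Upsert state for this source can spill to the cluster's disk as needed.
  ENVELOPE UPSERT;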

Exposing source metadata

In addition to the message value, Materialize can expose the message key, headers and other source metadata fields to SQL.

Key

The message key is exposed via the INCLUDE KEY option. Composite keys are also supported.

CREATE SOURCE kafka_metadata
  FROM KAFKA CONNECTION kafka_connection (TOPIC 'data')
  KEY FORMAT TEXT
  VALUE FORMAT TEXT
  INCLUDE KEY AS renamed_id;

Note that:

  • This option requires specifying the key and value encodings explicitly using the KEY FORMAT ... VALUE FORMAT syntax.

  • The UPSERT envelope always includes keys.

  • The DEBEZIUM envelope is incompatible with this option.

Headers

Message headers can be retained in Materialize and exposed as part of the source data.

Note that:

  • The DEBEZIUM envelope is incompatible with this option.

All headers

All of a message’s headers can be exposed using INCLUDE HEADERS, optionally followed by AS <header_col>.

This introduces a column with the specified name, or headers if no name was specified. The column has type record(key: text, value: bytea?) list, i.e. a list of records containing key-value pairs, where the keys are text and the values are nullable byteas.

CREATE SOURCE kafka_metadata
  FROM KAFKA CONNECTION kafka_connection (TOPIC 'data')
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_connection
  INCLUDE HEADERS
  ENVELOPE NONE;

To simplify turning the headers column into a map (so individual headers can be searched), you can use the map_build function:

SELECT
    id,
    seller,
    item,
    convert_from(map_build(headers)->'client_id', 'utf-8') AS client_id,
    map_build(headers)->'encryption_key' AS encryption_key
FROM kafka_metadata;

 id | seller |        item        | client_id |    encryption_key
----+--------+--------------------+-----------+----------------------
  2 |   1592 | Custom Art         |        23 | \x796f75207769736821
  3 |   1411 | City Bar Crawl     |        42 | \x796f75207769736821

Individual headers

Individual message headers can be exposed via the INCLUDE HEADER key AS name option.

The bytea value of the header is automatically parsed as a UTF-8 string. To expose the raw bytea instead, use the BYTES option.

CREATE SOURCE kafka_metadata
  FROM KAFKA CONNECTION kafka_connection (TOPIC 'data')
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_connection
  INCLUDE HEADER 'c_id' AS client_id, HEADER 'key' AS encryption_key BYTES
  ENVELOPE NONE;

Headers can be queried as any other column in the source:

SELECT
    id,
    seller,
    item,
    client_id::numeric,
    encryption_key
FROM kafka_metadata;

 id | seller |        item        | client_id |    encryption_key
----+--------+--------------------+-----------+----------------------
  2 |   1592 | Custom Art         |        23 | \x796f75207769736821
  3 |   1411 | City Bar Crawl     |        42 | \x796f75207769736821

Note that:

  • Messages that do not contain all of the header keys specified in the source DDL will cause an error that prevents the source from being queried.

  • Header values containing badly formed UTF-8 strings will cause an error in the source that prevents querying it, unless the BYTES option is specified.

Partition, offset, timestamp

These metadata fields are exposed via the INCLUDE PARTITION, INCLUDE OFFSET and INCLUDE TIMESTAMP options.

CREATE SOURCE kafka_metadata
  FROM KAFKA CONNECTION kafka_connection (TOPIC 'data')
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_connection
  INCLUDE PARTITION, OFFSET, TIMESTAMP AS ts
  ENVELOPE NONE;
SELECT "offset" FROM kafka_metadata WHERE ts > '2021-01-01';

offset
------
15
14
13

Setting start offsets

To start consuming a Kafka stream from a specific offset, you can use the START OFFSET option.

CREATE SOURCE kafka_offset
  FROM KAFKA CONNECTION kafka_connection (
    TOPIC 'data',
    -- Start reading from the earliest offset in the first partition,
    -- the second partition at 10, and the third partition at 100.
    START OFFSET (0, 10, 100)
  )
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_connection;

Note that:

  • If fewer offsets than partitions are provided, the remaining partitions will start at offset 0. For example, for a topic with three partitions, START OFFSET (1) starts the first partition at offset 1 and the other two partitions at offset 0.

  • Providing more offsets than partitions is not supported.

Time-based offsets

It’s also possible to set a start offset based on Kafka timestamps, using the START TIMESTAMP option. This approach sets the start offset for each available partition based on the Kafka timestamp and the source behaves as if START OFFSET was provided directly.

It’s important to note that START TIMESTAMP is a property of the source: it will be calculated once at the time the CREATE SOURCE statement is issued. This means that the computed start offsets will be the same for all views depending on the source and stable across restarts.

If you need to limit the amount of data maintained as state after source creation, consider using temporal filters instead.
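
For example, a sketch of a source that starts roughly one hour in the past:

CREATE SOURCE kafka_recent
  FROM KAFKA CONNECTION kafka_connection (
    TOPIC 'data',
    -- Negative values are relative to the current system time, in milliseconds,
    -- so this resolves (once, at creation time) to offsets from about an hour ago.
    START TIMESTAMP -3600000
  )
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_connection;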

CONNECTION options

Field Value Description
START OFFSET int Read partitions from the specified offset. You cannot update the offsets once a source has been created; you will need to recreate the source. Offset values must be zero or positive integers.
START TIMESTAMP int Use the specified value to set START OFFSET based on the Kafka timestamp. Negative values will be interpreted as relative to the current system time in milliseconds (e.g. -1000 means 1000 ms ago). The offset for each partition will be the earliest offset whose timestamp is greater than or equal to the given timestamp in the corresponding partition. If no such offset exists for a partition, the partition’s end offset will be used.

KEY STRATEGY and VALUE STRATEGY

It is possible to define how an Avro reader schema will be chosen for Avro sources by using the KEY STRATEGY and VALUE STRATEGY keywords, as shown in the syntax diagram.

A strategy of LATEST (the default) will choose the latest writer schema from the schema registry to use as a reader schema. ID or INLINE will allow specifying a schema from the registry by ID or inline in the CREATE SOURCE statement, respectively.
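
For example, a sketch that pins the value reader schema to a specific schema registry ID (the ID 123 is illustrative):

CREATE SOURCE avro_pinned_schema
  FROM KAFKA CONNECTION kafka_connection (TOPIC 'events')
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_connection
    -- Use the schema registered under ID 123 as the reader schema for values.
    VALUE STRATEGY ID 123
  ENVELOPE NONE;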

Monitoring source progress

By default, Kafka sources expose progress metadata as a subsource that you can use to monitor source ingestion progress. The name of the progress subsource can be specified when creating a source using the EXPOSE PROGRESS AS clause; otherwise, it will be named <src_name>_progress.

The following metadata is available for each source as a progress subsource:

Field Type Meaning
partition numrange The upstream Kafka partition.
offset uint8 The greatest offset consumed from each upstream Kafka partition.

The progress subsource can be queried using:

SELECT
  partition, "offset"
FROM
  (
    SELECT
      -- Take the upper of the range, which is null for non-partition rows
      -- Cast partition to u64, which is more ergonomic
      upper(partition)::uint8 AS partition, "offset"
    FROM
      <src_name>_progress
  )
WHERE
  -- Remove all non-partition rows
  partition IS NOT NULL;

As long as any offset continues increasing, Materialize is consuming data from the upstream Kafka broker. For more details on monitoring source ingestion progress and debugging related issues, see Troubleshooting.

Monitoring consumer lag

To support Kafka tools that monitor consumer lag, Kafka sources commit offsets once the messages up through that offset have been durably recorded in Materialize’s storage layer.

However, rather than relying on committed offsets, Materialize suggests using our native progress monitoring, which contains more up-to-date information.

NOTE:

Some Kafka monitoring tools may indicate that Materialize’s consumer groups have no active members. This is not a cause for concern.

Materialize does not participate in the consumer group protocol nor does it recover on restart by reading the committed offsets. The committed offsets are provided solely for the benefit of Kafka monitoring tools.

Committed offsets are associated with a consumer group specific to the source. The ID of the consumer group consists of the prefix configured with the GROUP ID PREFIX option followed by a Materialize-generated suffix.

You should not make assumptions about the number of consumer groups that Materialize will use to consume from a given source. The only guarantee is that the ID of each consumer group will begin with the configured prefix.

The consumer group ID prefix for each Kafka source in the system is available in the group_id_prefix column of the mz_kafka_sources table. To look up the group_id_prefix for a source by name, use:

SELECT group_id_prefix
FROM mz_internal.mz_kafka_sources ks
JOIN mz_sources s ON s.id = ks.id
WHERE s.name = '<src_name>';

Required permissions

The access control lists (ACLs) on the Kafka cluster must allow Materialize to perform the following operations on the following resources:

Operation type Resource type Resource name
Read Topic The specified TOPIC option
Read Group All group IDs starting with the specified GROUP ID PREFIX option

Examples

Creating a connection

A connection describes how to connect and authenticate to an external system you want Materialize to read data from.

Once created, a connection is reusable across multiple CREATE SOURCE statements. For more details on creating connections, check the CREATE CONNECTION documentation page.

Broker

Using SSL authentication:

CREATE SECRET kafka_ssl_key AS '<BROKER_SSL_KEY>';
CREATE SECRET kafka_ssl_crt AS '<BROKER_SSL_CRT>';

CREATE CONNECTION kafka_connection TO KAFKA (
    BROKER 'unique-jellyfish-0000.us-east-1.aws.confluent.cloud:9093',
    SSL KEY = SECRET kafka_ssl_key,
    SSL CERTIFICATE = SECRET kafka_ssl_crt
);

Using SASL authentication:

CREATE SECRET kafka_password AS '<BROKER_PASSWORD>';

CREATE CONNECTION kafka_connection TO KAFKA (
    BROKER 'unique-jellyfish-0000.us-east-1.aws.confluent.cloud:9092',
    SASL MECHANISMS = 'SCRAM-SHA-256',
    SASL USERNAME = 'foo',
    SASL PASSWORD = SECRET kafka_password
);

If your Kafka broker is not exposed to the public internet, you can tunnel the connection through an AWS PrivateLink service or an SSH bastion host:

Using an AWS PrivateLink service:

CREATE CONNECTION privatelink_svc TO AWS PRIVATELINK (
    SERVICE NAME 'com.amazonaws.vpce.us-east-1.vpce-svc-0e123abc123198abc',
    AVAILABILITY ZONES ('use1-az1', 'use1-az4')
);
CREATE CONNECTION kafka_connection TO KAFKA (
    BROKERS (
        'broker1:9092' USING AWS PRIVATELINK privatelink_svc,
        'broker2:9092' USING AWS PRIVATELINK privatelink_svc (PORT 9093)
    )
);

For step-by-step instructions on creating AWS PrivateLink connections and configuring an AWS PrivateLink service to accept connections from Materialize, check this guide.

Using an SSH bastion host:

CREATE CONNECTION ssh_connection TO SSH TUNNEL (
    HOST '<SSH_BASTION_HOST>',
    USER '<SSH_BASTION_USER>',
    PORT <SSH_BASTION_PORT>
);
CREATE CONNECTION kafka_connection TO KAFKA (
    BROKERS (
        'broker1:9092' USING SSH TUNNEL ssh_connection,
        'broker2:9092' USING SSH TUNNEL ssh_connection
    )
);

For step-by-step instructions on creating SSH tunnel connections and configuring an SSH bastion server to accept connections from Materialize, check this guide.

Confluent Schema Registry

Using SSL authentication:

CREATE SECRET csr_ssl_crt AS '<CSR_SSL_CRT>';
CREATE SECRET csr_ssl_key AS '<CSR_SSL_KEY>';
CREATE SECRET csr_password AS '<CSR_PASSWORD>';

CREATE CONNECTION csr_connection TO CONFLUENT SCHEMA REGISTRY (
    URL 'https://unique-jellyfish-0000.us-east-1.aws.confluent.cloud:9093',
    SSL KEY = SECRET csr_ssl_key,
    SSL CERTIFICATE = SECRET csr_ssl_crt,
    USERNAME = 'foo',
    PASSWORD = SECRET csr_password
);

Using username/password authentication:

CREATE SECRET IF NOT EXISTS csr_username AS '<CSR_USERNAME>';
CREATE SECRET IF NOT EXISTS csr_password AS '<CSR_PASSWORD>';

CREATE CONNECTION csr_connection TO CONFLUENT SCHEMA REGISTRY (
  URL '<CONFLUENT_REGISTRY_URL>',
  USERNAME = SECRET csr_username,
  PASSWORD = SECRET csr_password
);

If your Confluent Schema Registry server is not exposed to the public internet, you can tunnel the connection through an AWS PrivateLink service or an SSH bastion host:

Using an AWS PrivateLink service:

CREATE CONNECTION privatelink_svc TO AWS PRIVATELINK (
    SERVICE NAME 'com.amazonaws.vpce.us-east-1.vpce-svc-0e123abc123198abc',
    AVAILABILITY ZONES ('use1-az1', 'use1-az4')
);
CREATE CONNECTION csr_connection TO CONFLUENT SCHEMA REGISTRY (
    URL 'http://my-confluent-schema-registry:8081',
    AWS PRIVATELINK privatelink_svc
);

For step-by-step instructions on creating AWS PrivateLink connections and configuring an AWS PrivateLink service to accept connections from Materialize, check this guide.

Using an SSH bastion host:

CREATE CONNECTION ssh_connection TO SSH TUNNEL (
    HOST '<SSH_BASTION_HOST>',
    USER '<SSH_BASTION_USER>',
    PORT <SSH_BASTION_PORT>
);
CREATE CONNECTION csr_connection TO CONFLUENT SCHEMA REGISTRY (
    URL 'http://my-confluent-schema-registry:8081',
    SSH TUNNEL ssh_connection
);

For step-by-step instructions on creating SSH tunnel connections and configuring an SSH bastion server to accept connections from Materialize, check this guide.

Creating a source

Avro

Using Confluent Schema Registry

CREATE SOURCE avro_source
  FROM KAFKA CONNECTION kafka_connection (TOPIC 'test_topic')
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_connection;

JSON

CREATE SOURCE json_source
  FROM KAFKA CONNECTION kafka_connection (TOPIC 'test_topic')
  FORMAT JSON;
CREATE VIEW typed_kafka_source AS
  SELECT
    (data->>'field1')::boolean AS field_1,
    (data->>'field2')::int AS field_2,
    (data->>'field3')::float AS field_3
  FROM json_source;

JSON-formatted messages are ingested as a JSON blob. We recommend creating a parsing view on top of your Kafka source that maps the individual fields to columns with the required data types. To avoid doing this tedious task manually, you can use this JSON parsing widget!

Protobuf

Using Confluent Schema Registry

CREATE SOURCE proto_source
  FROM KAFKA CONNECTION kafka_connection (TOPIC 'test_topic')
  FORMAT PROTOBUF USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_connection;

Using an inline schema

If you’re not using a schema registry, you can use the MESSAGE...SCHEMA clause to specify a Protobuf schema descriptor inline. Protobuf does not serialize a schema with the message, so before creating a source you must:

  • Compile the Protobuf schema into a descriptor file using protoc:

    // example.proto
    syntax = "proto3";
    message Batch {
        int32 id = 1;
        // ...
    }
    
    protoc --include_imports --descriptor_set_out=example.pb example.proto
    
  • Encode the descriptor file into a SQL byte string:

    $ printf '\\x' && xxd -p example.pb | tr -d '\n'
    \x0a300a0d62696...
    
  • Create the source using the encoded descriptor bytes from the previous step (including the \x at the beginning):

    CREATE SOURCE proto_source
      FROM KAFKA CONNECTION kafka_connection (TOPIC 'test_topic')
      FORMAT PROTOBUF MESSAGE 'Batch' USING SCHEMA '\x0a300a0d62696...';
    

    For more details about Protobuf message names and descriptors, check the Protobuf format documentation.

Text/bytes

CREATE SOURCE text_source
  FROM KAFKA CONNECTION kafka_connection (TOPIC 'test_topic')
  FORMAT TEXT
  ENVELOPE UPSERT;

CSV

CREATE SOURCE csv_source (col_foo, col_bar, col_baz)
  FROM KAFKA CONNECTION kafka_connection (TOPIC 'test_topic')
  FORMAT CSV WITH 3 COLUMNS;