
CREATE SOURCE: Kafka

CREATE SOURCE connects Materialize to an external system you want to read data from, and provides details about how to decode and interpret that data.

This page describes how to connect Materialize to a Kafka broker to read data from individual topics.

NOTE: The same syntax, supported formats and features can be used to connect to a Redpanda broker.

Syntax

CREATE [MATERIALIZED] SOURCE [IF NOT EXISTS] src_name
  [ ( col_name [, ...] [, key_constraint] ) ]
  FROM KAFKA BROKER host TOPIC topic
  [ with_options ]
  { FORMAT format_spec | KEY FORMAT format_spec VALUE FORMAT format_spec }
  [ INCLUDE { KEY | PARTITION | OFFSET | TIMESTAMP | HEADERS } [AS name] [, ...] ]
  [ ENVELOPE { NONE | DEBEZIUM [UPSERT] | UPSERT } ]

format_spec

AVRO USING { CONFLUENT SCHEMA REGISTRY url [ with_options ] | SCHEMA FILE path }
PROTOBUF MESSAGE message_name USING { SCHEMA FILE path | CONFLUENT SCHEMA REGISTRY url [ with_options ] }
REGEX regex
CSV WITH { HEADER ( col_name [, ...] ) | n COLUMNS } [ DELIMITED BY char ]
TEXT
BYTES

key_constraint

PRIMARY KEY ( col_name [, ...] ) NOT ENFORCED

with_options

WITH ( field = val [, ...] )
Field Use
MATERIALIZED Materializes the source’s data, which retains all data in memory and makes sources directly selectable. For more information, see Key Concepts: Materialized sources.
src_name The name for the source, which is used as its table name within SQL.
col_name Override default column name with the provided identifier. If used, a col_name must be provided for each column in the created source.
KAFKA BROKER host The Kafka broker’s host name without the security protocol, which is specified by the WITH options. If you wish to specify multiple brokers (bootstrap servers) as an additional safeguard, use a comma-separated list. For example: localhost:9092, localhost:9093.
TOPIC topic The Kafka topic you want to subscribe to.
INCLUDE KEY Include a column containing the Kafka message key. If the key is encoded using a format that includes schemas the column will take its name from the schema. For unnamed formats (e.g. TEXT), the column will be named key. The column can be renamed with the optional AS name statement.
INCLUDE PARTITION Include a partition column containing the Kafka message partition. The column can be renamed with the optional AS name clause. New in v0.12.0.
INCLUDE OFFSET Include an offset column containing the Kafka message offset. The column can be renamed with the optional AS name clause. New in v0.12.0.
INCLUDE TIMESTAMP Include a timestamp column containing the Kafka message timestamp. The column can be renamed with the optional AS name clause. New in v0.12.0.

Note that the timestamp of a Kafka message depends on how the topic and its producers are configured. See the Confluent documentation for details.
PRIMARY KEY ( col_list ) NOT ENFORCED Declare a set of columns as a primary key. For more information, see Defining primary keys.
ENVELOPE DEBEZIUM Use the Debezium envelope, which uses a diff envelope to handle CRUD operations. For more information, see Using Debezium.
ENVELOPE DEBEZIUM UPSERT Use the Debezium envelope with UPSERT enabled. This is required for Kafka sources with log compaction, but increases memory consumption.
ENVELOPE UPSERT Use the upsert envelope, which uses message keys to handle CRUD operations. For more information, see Handling upserts.
ENVELOPE NONE (Default) Use an append-only envelope. This means that records will only be appended and cannot be updated or deleted.
WITH ( option_list ) Options affecting source creation. For more detail, see WITH options.

WITH options

Field Value Description
client_id text Use the supplied value as the Kafka client identifier.
group_id_prefix text Use the specified prefix in the consumer group ID. The resulting group.id looks like <group_id_prefix>materialize-X-Y, where X and Y are values that allow multiple concurrent Kafka consumers from the same topic.
ignore_source_keys boolean Default: false. If true, do not perform optimizations assuming uniqueness of primary keys in schemas.
isolation_level text Default: read_committed. Controls how to read messages that were transactionally written to Kafka. Supported options are read_committed to read only committed messages and read_uncommitted to read all messages, including those that are part of an open transaction or were aborted.
statistics_interval_ms int librdkafka statistics emit interval in ms. A value of 0 disables statistics. Statistics can be queried using the mz_kafka_source_statistics system table. Accepts values [0, 86400000].
timestamp_frequency_ms int Default: 1000. Sets the timestamping frequency in ms. Reflects how frequently the source advances its timestamp. This measure reflects how stale data in views will be. Lower values result in more-up-to-date views but may reduce throughput.
topic_metadata_refresh_interval_ms int Default: 300000. Sets the frequency in ms at which the system checks for new partitions. Accepts values [0,3600000].
enable_auto_commit boolean Default: false. Controls whether or not Materialize commits read offsets back into Kafka. This is purely for consumer progress monitoring and does not cause Materialize to resume reading from where it left off across restarts.
fetch_message_max_bytes int Default: 134217728. Controls the initial maximum number of bytes per topic+partition to request when fetching messages from the broker. If the client encounters a message larger than this value it will gradually try to increase it until the entire message can be fetched. Accepts values [1, 1000000000].
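
As an illustration, the following sketch combines several of the options above. The source name, broker address, topic name, and option values are placeholders rather than recommendations:

CREATE MATERIALIZED SOURCE kafka_tuned
  FROM KAFKA BROKER 'localhost:9092' TOPIC 'data'
  -- Placeholder values; tune per workload
  WITH (
      isolation_level = 'read_uncommitted',
      timestamp_frequency_ms = 500,
      topic_metadata_refresh_interval_ms = 60000
  )
  FORMAT TEXT;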

Supported formats

Format
Append-only envelope Upsert envelope Debezium envelope
Avro
JSON
Protobuf
Text/bytes
CSV

Key-value encoding

By default, the message key is decoded using the same format as the message value. However, you can set the key and value encodings explicitly using the KEY FORMAT ... VALUE FORMAT syntax.
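
For example, a minimal sketch that sets the two formats independently, assuming a hypothetical topic whose keys are plain text and whose values are Avro-encoded (broker and registry addresses are placeholders):

CREATE SOURCE explicit_formats
  FROM KAFKA BROKER 'localhost:9092' TOPIC 'data'
  -- The key and value are decoded with different formats
  KEY FORMAT TEXT
  VALUE FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY 'https://localhost:8081'
  INCLUDE KEY;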

Features

Handling upserts

To create a source that uses the standard key-value convention to support inserts, updates, and deletes within Materialize, you can use ENVELOPE UPSERT:

CREATE SOURCE current_predictions
  FROM KAFKA BROKER 'localhost:9092' TOPIC 'events'
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY 'https://localhost:8081'
  ENVELOPE UPSERT;

Defining primary keys

WARNING! Materialize will not enforce the primary key constraint and will produce incorrect results if the declared key columns are not unique.

Primary keys are automatically inferred for Kafka sources using the UPSERT or DEBEZIUM envelopes. For other source configurations, you can manually define a column (or set of columns) as a primary key using the PRIMARY KEY (...) NOT ENFORCED syntax. This enables optimizations and constructs that rely on a key to be present when it cannot be inferred.
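
For example, a minimal sketch of an append-only CSV source where the key cannot be inferred; the source, column, and topic names are illustrative only:

CREATE SOURCE keyed_source (id, val, PRIMARY KEY (id) NOT ENFORCED)
  FROM KAFKA BROKER 'localhost:9092' TOPIC 'data'
  -- id is declared as a key, but uniqueness is not enforced
  FORMAT CSV WITH 2 COLUMNS;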

Using Debezium

NOTE: Currently, Materialize only supports Avro-encoded Debezium records. If you're interested in JSON support, please reach out in the community Slack or leave a comment in this GitHub issue.

Materialize provides a dedicated envelope (ENVELOPE DEBEZIUM) to decode Kafka messages produced by Debezium. To create a source that interprets Debezium messages:

CREATE SOURCE kafka_repl
  FROM KAFKA BROKER 'kafka:9092' TOPIC 'pg_repl.public.table1'
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY 'http://schema-registry:8081'
  ENVELOPE DEBEZIUM;

Note that:

Any materialized view defined on top of this source will be incrementally updated as new change events stream in through Kafka, as a result of INSERT, UPDATE and DELETE operations in the original database.
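
For example, a minimal sketch of such a view over the kafka_repl source above; the id and status columns are assumptions about the upstream table’s schema:

CREATE MATERIALIZED VIEW table1_active AS
  SELECT id, status
  FROM kafka_repl
  -- status is a hypothetical column in the upstream table
  WHERE status = 'active';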

For more details and a step-by-step guide on using Kafka+Debezium for Change Data Capture (CDC), check out Using Debezium.

Exposing source metadata

In addition to the message value, Materialize can expose the message key, headers and other source metadata fields to SQL.

Key

The message key is exposed via the INCLUDE KEY option. Composite keys are also supported (#7645).

CREATE SOURCE kafka_metadata
  FROM KAFKA BROKER 'localhost:9092' TOPIC 'data'
  KEY FORMAT TEXT
  VALUE FORMAT TEXT
  INCLUDE KEY AS renamed_id;

Headers

Message headers are exposed via the INCLUDE HEADERS option, and are included as a column (named headers by default) containing a list of (text, bytea) pairs.

CREATE SOURCE kafka_metadata
  FROM KAFKA BROKER 'localhost:9092' TOPIC 'data'
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY 'https://localhost:8081'
  INCLUDE HEADERS
  ENVELOPE NONE;

To retrieve the headers in a message, you can unpack the value:

SELECT key,
       field1,
       field2,
       headers[1].value AS kafka_header
FROM mv_kafka_metadata;

  key  |  field1  |  field2  |  kafka_header
-------+----------+----------+----------------
  foo  |  fooval  |   1000   |     hvalue
  bar  |  barval  |   5000   |     <null>

Or, to look up a header by key:

SELECT key,
       field1,
       field2,
       thekey,
       value
FROM (SELECT key,
             field1,
             field2,
             unnest(headers).key AS thekey,
             unnest(headers).value AS value
      FROM mv_kafka_metadata) AS km
WHERE thekey = 'kvalue';

  key  |  field1  |  field2  |  thekey  |  value
-------+----------+----------+----------+--------
  foo  |  fooval  |   1000   |  kvalue  |  hvalue

Partition, offset, timestamp

These metadata fields are exposed via the INCLUDE PARTITION, INCLUDE OFFSET and INCLUDE TIMESTAMP options.

CREATE SOURCE kafka_metadata
  FROM KAFKA BROKER 'localhost:9092' TOPIC 'data'
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY 'https://localhost:8081'
  INCLUDE PARTITION, OFFSET, TIMESTAMP AS ts
  ENVELOPE NONE;
SELECT "offset" FROM kafka_metadata WHERE ts > '2021-01-01';

offset
------
15
14
13

Setting start offsets

To start consuming a Kafka stream from a specific offset, you can use the start_offset option.

CREATE MATERIALIZED SOURCE kafka_offset
  FROM KAFKA BROKER 'localhost:9092' TOPIC 'data'
  -- Start reading from the earliest offset in the first partition,
  -- the second partition at 10, and the third partition at 100
  WITH (start_offset=[0,10,100])
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY 'https://localhost:8081';

Time-based offsets

It’s also possible to set a start offset based on Kafka timestamps, using the kafka_time_offset option. This approach sets the start offset for each available partition based on the Kafka timestamp and the source behaves as if start_offset was provided directly.

It’s important to note that kafka_time_offset is a property of the source: it will be calculated once at the time the CREATE SOURCE statement is issued. This means that the computed start offsets will be the same for all views depending on the source and stable across restarts.

If you need to limit the amount of data maintained as state after source creation, consider using temporal filters instead.
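
For example, a sketch using a relative timestamp (broker, topic, and registry addresses are placeholders) to start each partition at the earliest offset that is at most one hour old:

CREATE MATERIALIZED SOURCE kafka_time_offset
  FROM KAFKA BROKER 'localhost:9092' TOPIC 'data'
  -- -3600000 ms is interpreted relative to the current system time (1 hour ago)
  WITH (kafka_time_offset=-3600000)
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY 'https://localhost:8081';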

WITH options

Field Value Description
start_offset int Read partitions from the specified offset. You cannot update the offsets once a source has been created; you will need to recreate the source. Offset values must be zero or positive integers, and the source must use either ENVELOPE NONE or (DEBEZIUM) UPSERT.
kafka_time_offset int Use the specified value to set start_offset based on the Kafka timestamp. Negative values will be interpreted as relative to the current system time in milliseconds (e.g. -1000 means 1000 ms ago). The offset for each partition will be the earliest offset whose timestamp is greater than or equal to the given timestamp in the corresponding partition. If no such offset exists for a partition, the partition’s end offset will be used.

Authentication

SSL

To connect to a Kafka broker that requires SSL authentication, use the provided WITH options.

CREATE MATERIALIZED SOURCE kafka_ssl
  FROM KAFKA BROKER 'localhost:9092' TOPIC 'top-secret' WITH (
      security_protocol = 'SSL',
      ssl_key_location = '/secrets/materialized.key',
      ssl_certificate_location = '/secrets/materialized.crt',
      ssl_ca_location = '/secrets/ca.crt',
      ssl_key_password = 'mzmzmz'
  )
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY 'https://localhost:8081' WITH (
      ssl_key_location = '/secrets/materialized.key',
      ssl_certificate_location = '/secrets/materialized.crt',
      ssl_ca_location = '/secrets/ca.crt'
  );

SSL WITH options

Field Value Description
security_protocol text Use ssl to connect to the Kafka cluster.
ssl_certificate_location text The absolute path to your SSL certificate. Required for SSL client authentication.
ssl_key_location text The absolute path to your SSL certificate’s key. Required for SSL client authentication.
ssl_key_password text Your SSL key’s password, if any.
ssl_ca_location text The absolute path to the certificate authority (CA) certificate. Used for both SSL client and server authentication. If unspecified, uses the system’s default CA certificates.

Confluent Schema Registry SSL WITH options

Field Value Description
ssl_certificate_location text The absolute path to your SSL certificate. Required for SSL client authentication. New in v0.17.0.
ssl_key_location text The absolute path to your SSL certificate’s key. Required for SSL client authentication. New in v0.17.0.
ssl_ca_location text The absolute path to the certificate authority (CA) certificate. Used for both SSL client and server authentication. New in v0.17.0.
username text The username used to connect to the schema registry with basic HTTP authentication. This is compatible with the ssl options, which control the transport between Materialize and the CSR.
password text The password used to connect to the schema registry with basic HTTP authentication. This is compatible with the ssl options, which control the transport between Materialize and the CSR.

SASL

To connect to a Kafka broker that requires SASL authentication, use the provided WITH options.

SASL/PLAIN

CREATE SOURCE kafka_sasl
  FROM KAFKA BROKER 'broker.tld:9092' TOPIC 'top-secret' WITH (
      security_protocol = 'SASL_SSL',
      sasl_mechanisms = 'PLAIN',
      sasl_username = '<BROKER_USERNAME>',
      sasl_password = '<BROKER_PASSWORD>'
  )
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY 'https://schema-registry.tld' WITH (
      username = '<SCHEMA_REGISTRY_USERNAME>',
      password = '<SCHEMA_REGISTRY_PASSWORD>'
  );

This is the configuration required to connect to Kafka brokers running on Confluent Cloud.

SASL/GSSAPI (Kerberos)

CREATE SOURCE kafka_kerberos
  FROM KAFKA BROKER 'broker.tld:9092' TOPIC 'tps-reports' WITH (
      security_protocol = 'sasl_plaintext',
      sasl_kerberos_keytab = '/secrets/materialized.keytab',
      sasl_kerberos_service_name = 'kafka',
      sasl_kerberos_principal = 'materialized@CI.MATERIALIZE.IO'
  )
  FORMAT AVRO USING SCHEMA FILE '/tps-reports-schema.json';

SASL WITH options

Field Value Description
security_protocol text Use plaintext, ssl, sasl_plaintext or sasl_ssl to connect to the Kafka cluster.
sasl_mechanisms text The SASL mechanism to use for authentication. Supported: GSSAPI (the default), PLAIN, SCRAM-SHA-256, SCRAM-SHA-512.
sasl_username text Your SASL username, if any. Required if sasl_mechanisms is PLAIN.
sasl_password text Your SASL password, if any. Required if sasl_mechanisms is PLAIN.

This option stores the password in Materialize’s on-disk catalog. For an alternative, use sasl_password_env.
sasl_password_env text Use the value stored in the named environment variable as the value for sasl_password.

This option does not store the password on-disk in Materialize’s catalog, but requires the environment variable to be present when Materialize boots. See the example after this table.
sasl_kerberos_keytab text The absolute path to your keytab. Required if sasl_mechanisms is GSSAPI.
sasl_kerberos_kinit_cmd text Shell command to refresh or acquire the client’s Kerberos ticket. Required if sasl_mechanisms is GSSAPI.
sasl_kerberos_min_time_before_relogin text Minimum time in milliseconds between key refresh attempts. Disable automatic key refresh by setting this property to 0. Required if sasl_mechanisms is GSSAPI.
sasl_kerberos_principal text Materialize Kerberos principal name. Required if sasl_mechanisms is GSSAPI.
sasl_kerberos_service_name text Kafka’s service name on its host, i.e. the service principal name not including /hostname@REALM. Required if sasl_mechanisms is GSSAPI.
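
For example, a variation of the SASL/PLAIN configuration above that reads the password from a hypothetical KAFKA_SASL_PASSWORD environment variable instead of storing it in the catalog:

CREATE SOURCE kafka_sasl_env
  FROM KAFKA BROKER 'broker.tld:9092' TOPIC 'top-secret' WITH (
      security_protocol = 'SASL_SSL',
      sasl_mechanisms = 'PLAIN',
      sasl_username = '<BROKER_USERNAME>',
      -- the environment variable name below is an assumption
      sasl_password_env = 'KAFKA_SASL_PASSWORD'
  )
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY 'https://schema-registry.tld' WITH (
      username = '<SCHEMA_REGISTRY_USERNAME>',
      password = '<SCHEMA_REGISTRY_PASSWORD>'
  );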

Examples

Creating a source

CREATE SOURCE avro_source
  FROM KAFKA BROKER 'localhost:9092' TOPIC 'data'
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY 'https://localhost:8081';

CREATE SOURCE json_source
  FROM KAFKA BROKER 'localhost:9092' TOPIC 'data'
  FORMAT BYTES;

CREATE MATERIALIZED VIEW jsonified_kafka_source AS
  SELECT
    data->>'field1' AS field_1,
    data->>'field2' AS field_2,
    data->>'field3' AS field_3
  FROM (SELECT CONVERT_FROM(data, 'utf8')::jsonb AS data FROM json_source);

CREATE SOURCE proto_source
  FROM KAFKA BROKER 'localhost:9092' TOPIC 'billing'
  WITH (cache = true)
  FORMAT PROTOBUF MESSAGE '.billing.Batch'
  USING SCHEMA FILE '[path to schema]';

CREATE SOURCE text_source
  FROM KAFKA BROKER 'localhost:9092' TOPIC 'data'
  FORMAT TEXT
  ENVELOPE UPSERT;

CREATE SOURCE csv_source (col_foo, col_bar, col_baz)
  FROM KAFKA BROKER 'localhost:9092' TOPIC 'data'
  FORMAT CSV WITH 3 COLUMNS;