
CREATE SOURCE: Text or bytes over Kafka

CREATE SOURCE connects Materialize to an external data source and lets you interact with its data as if the data were in a SQL table.

This document details how to connect Materialize to a text- or byte-formatted Kafka topic.

Conceptual framework

Sources represent connections to resources outside Materialize that it can read data from. For more information, see API Components: Sources.


Field Use
MATERIALIZED Materializes the source’s data, which retains all data in memory and makes sources directly selectable. For more information, see Materialized source details.
src_name The name for the source, which is used as its table name within SQL.
col_name Override default column name with the provided identifier. If used, a col_name must be provided for each column in the created source.
KAFKA BROKER host The Kafka broker’s host name without the security protocol, which is specified by the WITH options. If you wish to specify multiple brokers (bootstrap servers) as an additional safeguard, use a comma-separated list. For example: localhost:9092, localhost:9093.
TOPIC topic The Kafka topic you want to subscribe to.
INCLUDE KEY Make the key portion of Kafka events available to Materialize SQL. If the key is encoded using a format that includes schemas, it will take its name from the schema; for unnamed formats (e.g. TEXT) it will be named key. The key can be renamed with the optional AS name clause.
WITH ( option_list ) Options affecting source creation. For more detail, see WITH options.
FORMAT TEXT Format the source’s data as ASCII-encoded text.
FORMAT BYTES Leave data received from the source as unformatted bytes stored in a column named data.
ENVELOPE UPSERT Use the upsert envelope, which uses message keys to handle CRUD operations. For more information see Upsert envelope details.
ENVELOPE NONE (Default) Use an append-only envelope. This means that records will only be appended and cannot be updated or deleted.
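
Putting these clauses together, a minimal sketch of a text-formatted Kafka source (the source name raw_events and topic events are hypothetical):

```sql
CREATE SOURCE raw_events
FROM KAFKA BROKER 'localhost:9092' TOPIC 'events'
FORMAT TEXT
ENVELOPE NONE;
```

ENVELOPE NONE is the default and could be omitted; it is spelled out here for clarity.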

WITH options

The following options are valid within the WITH clause.

Field Value type Description
client_id text Use the supplied value as the Kafka client identifier.
group_id_prefix text Use the specified prefix in the consumer group ID. The resulting group ID looks like <group_id_prefix>materialize-X-Y, where X and Y are values that allow multiple concurrent Kafka consumers from the same topic.
ignore_source_keys boolean Default: false. If true, do not perform optimizations assuming uniqueness of primary keys in schemas.
isolation_level text Default: read_committed. Controls how to read messages that were transactionally written to Kafka. Supported options are read_committed to read only committed messages and read_uncommitted to read all messages, including those that are part of an open transaction or were aborted.
security_protocol text Use ssl or, for Kerberos, sasl_plaintext, sasl-scram-sha-256, or sasl-sha-512 to connect to the Kafka cluster.
kafka_time_offset int Use the specified value to set start_offset based on the Kafka timestamp. Negative values will be interpreted as relative to the current system time in milliseconds (e.g. -1000 means 1000 ms ago). The offset for each partition will be the earliest offset whose timestamp is greater than or equal to the given timestamp in the corresponding partition. If no such offset exists for a partition, the partition’s end offset will be used.
statistics_interval_ms int librdkafka statistics emit interval in ms. Accepts values [0, 86400000]. A value of 0 disables statistics. Statistics can be queried using the mz_kafka_consumer_partitions system table.
start_offset int Read partitions from the specified offset. You cannot update the offsets once a source has been created; you will need to recreate the source. Values must be zero or positive integers. See Kafka source details for important warnings for this feature.
timestamp_frequency_ms int Default: 1000. Sets the timestamping frequency in ms. Reflects how frequently the source advances its timestamp. This measure reflects how stale data in views will be. Lower values result in more-up-to-date views but may reduce throughput.
topic_metadata_refresh_interval_ms int Default: 30000. Sets the frequency in ms at which the system checks for new partitions. Accepts values [0,3600000].
enable_auto_commit boolean Default: false. Controls whether or not Materialize commits read offsets back into Kafka. This is purely for consumer progress monitoring and does not cause Materialize to resume reading from where it left off across restarts.
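
As a sketch of how these options combine (the source name, topic, and option values are illustrative, not recommendations):

```sql
CREATE SOURCE events_tuned
FROM KAFKA BROKER 'localhost:9092' TOPIC 'events'
WITH (
    client_id = 'materialize-events',
    isolation_level = 'read_committed',
    timestamp_frequency_ms = 500,
    statistics_interval_ms = 10000
)
FORMAT TEXT;
```

With statistics_interval_ms set to a nonzero value, consumer statistics can then be inspected by querying the mz_kafka_consumer_partitions system table.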

SSL WITH options

Use the following options to connect Materialize to an SSL-encrypted Kafka cluster. For more detail, see SSL-encrypted Kafka details.

Field Value Description
ssl_certificate_location text The absolute path to your SSL certificate. Required for SSL client authentication.
ssl_key_location text The absolute path to your SSL certificate’s key. Required for SSL client authentication.
ssl_key_password text Your SSL key’s password, if any.
ssl_ca_location text The absolute path to the certificate authority (CA) certificate. Used for both SSL client and server authentication. If unspecified, uses the system’s default CA certificates.
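
A sketch of a connection to an SSL-encrypted cluster using the options above (the broker address and certificate paths are placeholders):

```sql
CREATE SOURCE ssl_events
FROM KAFKA BROKER 'broker.example.com:9093' TOPIC 'events'
WITH (
    security_protocol = 'ssl',
    ssl_certificate_location = '/secrets/materialize.crt',
    ssl_key_location = '/secrets/materialize.key',
    ssl_ca_location = '/secrets/ca.crt'
)
FORMAT TEXT;
```

The certificate and key options are only required if the cluster demands SSL client authentication.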

Kerberos WITH options

Use the following options to connect Materialize to a Kerberized Kafka cluster. For more detail, see Kerberized Kafka details.

Field Value Description
sasl_mechanisms text The SASL mechanism to use for authentication. Currently, the only supported mechanisms are GSSAPI (the default) and PLAIN.
sasl_username text Required if sasl_mechanisms is PLAIN.
sasl_password text Required if sasl_mechanisms is PLAIN.
sasl_kerberos_keytab text The absolute path to your keytab. Required if sasl_mechanisms is GSSAPI.
sasl_kerberos_kinit_cmd text Shell command to refresh or acquire the client’s Kerberos ticket. Required if sasl_mechanisms is GSSAPI.
sasl_kerberos_min_time_before_relogin text Minimum time in milliseconds between key refresh attempts. Disable automatic key refresh by setting this property to 0. Required if sasl_mechanisms is GSSAPI.
sasl_kerberos_principal text Materialize Kerberos principal name. Required if sasl_mechanisms is GSSAPI.
sasl_kerberos_service_name text Kafka’s service name on its host, i.e. the service principal name not including /hostname@REALM. Required if sasl_mechanisms is GSSAPI.
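
A sketch of a connection to a Kerberized cluster using the GSSAPI defaults (the broker address, principal, keytab path, and service name are placeholders):

```sql
CREATE SOURCE kerberized_events
FROM KAFKA BROKER 'broker.example.com:9092' TOPIC 'events'
WITH (
    security_protocol = 'sasl_plaintext',
    sasl_mechanisms = 'GSSAPI',
    sasl_kerberos_keytab = '/secrets/materialize.keytab',
    sasl_kerberos_principal = 'materialize@EXAMPLE.COM',
    sasl_kerberos_service_name = 'kafka'
)
FORMAT TEXT;
```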

Inline schema WITH options

Field Value Description
confluent_wire_format boolean Whether to look for the Confluent Schema Registry schema ID within Avro messages.

If you specify a source with an inline schema, you may still have records that were generated by a client that inlines a Confluent Schema Registry ID at the beginning of each record. If confluent_wire_format is false, materialized will not validate that a well-formatted schema ID is present at the beginning of each record.


Materialized source details

Materializing a source keeps data it receives in an in-memory index, the presence of which makes the source directly queryable. In contrast, non-materialized sources cannot process queries directly; to access the data the source receives, you need to create materialized views that SELECT from the source.

For a mental model, materializing the source is approximately equivalent to creating a non-materialized source and then creating a materialized view from all of the source’s columns.
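
That equivalence can be sketched as follows (src and src_view are hypothetical names):

```sql
-- A materialized source...
CREATE MATERIALIZED SOURCE src
FROM KAFKA BROKER 'localhost:9092' TOPIC 'events'
FORMAT TEXT;

-- ...behaves approximately like a non-materialized source
-- plus a materialized view over all of its columns:
CREATE SOURCE src
FROM KAFKA BROKER 'localhost:9092' TOPIC 'events'
FORMAT TEXT;

CREATE MATERIALIZED VIEW src_view AS SELECT * FROM src;
```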


The actual implementation of materialized sources differs, though, by letting you refer to the source’s name directly in queries.

For more details about the impact of materializing sources (and implicitly creating an index), see CREATE INDEX: Details — Memory footprint.

Kafka source details

Partition offsets

The start_offset option comes with some quirks to be aware of. In particular, offsets cannot be updated once a source has been created; to change them, you must recreate the source.

The kafka_time_offset option sets start_offset for each available partition based on the Kafka timestamp, and the source then behaves as if start_offset had been provided directly.

SSL-encrypted Kafka details

Enable connections to SSL-encrypted Kafka clusters using the appropriate WITH options.

Kerberized Kafka details

Enable connections to Kerberized Kafka clusters using the appropriate WITH options.

Text format details

Text-formatted sources decode each message they receive as ASCII-encoded text.

Raw byte format details

Raw byte-formatted sources provide Materialize the raw bytes received from the source without applying any formatting or decoding.

Raw byte-formatted sources have one column, which, by default, is named data.
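
A sketch of a byte-formatted source; the single data column can then be decoded in a view, here with convert_from, assuming the bytes are valid UTF-8 (the source and view names are hypothetical):

```sql
CREATE SOURCE raw_bytes
FROM KAFKA BROKER 'localhost:9092' TOPIC 'events'
FORMAT BYTES;

CREATE MATERIALIZED VIEW decoded AS
SELECT convert_from(data, 'utf8') AS text
FROM raw_bytes;
```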

Upsert envelope details

Specifying ENVELOPE UPSERT creates a source that supports Kafka’s standard key-value convention, and supports inserts, updates, and deletes within Materialize. The source is also compatible with Kafka’s log-compaction feature, and can be useful for users who want to compact CDC sources.

The upsert envelope format has slower data ingestion and is more memory intensive than other formats. To correctly handle data sources that do not publish their own retractions the upsert format must maintain state proportional to the number of unique rows in the source, and it must perform extra work to generate the implied retractions based on that state.

Inserts, updates, deletes

When Materialize receives a message, it checks the message’s key and offset. If the key has not been seen before, the message is treated as an insert; if the key has been seen, the new value replaces the previous value for that key; and a message whose value is null (a tombstone) deletes the row associated with that key.

Key columns

If the key is encoded using a format that includes schemas, the key columns take their names from the schema; for unnamed formats such as TEXT, the key is exposed as a single column named key, which can be renamed with INCLUDE KEY AS name.
Append-only envelope

With the append-only envelope, every record received by the source is treated as an insert. This is Materialize’s default envelope (i.e. used if no envelope is specified), and it can be specified explicitly with ENVELOPE NONE.


Upsert on a Kafka topic

CREATE SOURCE current_predictions
FROM KAFKA BROKER 'localhost:9092' TOPIC 'kv_feed'
FORMAT TEXT
ENVELOPE UPSERT;

This creates a source that…

Setting partition offsets

CREATE SOURCE data_offset
  FROM KAFKA BROKER 'localhost:9092' TOPIC 'data'
  WITH (start_offset = [0, 10, 100])
  FORMAT TEXT;

This creates a source that…

It is also possible to set start_offset based on Kafka timestamps using the kafka_time_offset option.
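
For example, a sketch that starts reading at messages no older than one hour (negative values are milliseconds relative to the current system time; the source name is hypothetical):

```sql
CREATE SOURCE recent_data
FROM KAFKA BROKER 'localhost:9092' TOPIC 'data'
WITH (kafka_time_offset = -3600000)
FORMAT TEXT;
```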
