
CREATE SOURCE: JSON over Kafka

CREATE SOURCE connects Materialize to an external data source and lets you interact with its data as if the data were in a SQL table.

This document details how to connect Materialize to JSON-formatted Kafka topics.

Conceptual framework

Sources represent connections to resources outside Materialize that it can read data from. For more information, see API Components: Sources.

Syntax

CREATE [MATERIALIZED] SOURCE [IF NOT EXISTS] src_name [( col_name [, ...] )]
FROM KAFKA BROKER host TOPIC topic
[with_options]
[KEY FORMAT format_spec VALUE] FORMAT { format_spec | TEXT | BYTES }
[INCLUDE KEY [AS key_name]]
[ENVELOPE { NONE | UPSERT }]

format_spec

AVRO USING { CONFLUENT SCHEMA REGISTRY url [with_options] | SCHEMA FILE schema_file_path }
PROTOBUF MESSAGE message_name USING { SCHEMA FILE schema_file_path | CONFLUENT SCHEMA REGISTRY url [with_options] }
REGEX regex
CSV WITH { HEADER [( col_name [, ...] )] | n COLUMNS } [DELIMITED BY char]
TEXT
BYTES

with_options

WITH ( field = val [, ...] )
Field Use
MATERIALIZED Materializes the source’s data, which retains all data in memory and makes sources directly selectable. For more information, see API Components: Materialized sources.
src_name The name for the source, which is used as its table name within SQL.
col_name Override default column name with the provided identifier. If used, a col_name must be provided for each column in the created source.
KAFKA BROKER host The Kafka broker’s host name, without the security protocol (which is specified by the WITH options). If you wish to specify multiple brokers (bootstrap servers) as an additional safeguard, use a comma-separated list. For example: localhost:9092, localhost:9093.
TOPIC topic The Kafka topic you want to subscribe to.
INCLUDE KEY Make the key portion of Kafka events available to Materialize SQL. If the key is encoded in a format that includes schemas, the key column takes its name from the schema; for unnamed formats (e.g. TEXT), it is named key. The key column can be renamed with the optional AS name clause; see the sketch after this table.
WITH ( option_list ) Options affecting source creation. For more detail, see WITH options.
FORMAT BYTES Leave data received from the source as unformatted bytes, and store them in a column named data. However, you can easily transform the data to JSON. For more details, see Byte format details (JSON).
ENVELOPE NONE (Default) Use an append-only envelope. This means that records will only be appended and cannot be updated or deleted.
ENVELOPE UPSERT Use the upsert envelope, which uses message keys to handle CRUD operations. For more information see Upsert envelope details.
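
As a sketch of INCLUDE KEY (the broker address, topic, and column name msg_key are placeholders, not defaults), a source can expose a text-formatted Kafka key alongside raw-byte values:

CREATE SOURCE keyed_bytes
FROM KAFKA BROKER 'localhost:9092' TOPIC 'json'
KEY FORMAT TEXT
VALUE FORMAT BYTES
INCLUDE KEY AS msg_key;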

WITH options

The following options are valid within the WITH clause.

Field Value type Description
client_id text Use the supplied value as the Kafka client identifier.
group_id_prefix text Use the specified prefix in the consumer group ID. The resulting group.id looks like <group_id_prefix>materialize-X-Y, where X and Y are values that allow multiple concurrent Kafka consumers from the same topic.
ignore_source_keys boolean Default: false. If true, do not perform optimizations assuming uniqueness of primary keys in schemas.
isolation_level text Default: read_committed. Controls how to read messages that were transactionally written to Kafka. Supported options are read_committed to read only committed messages and read_uncommitted to read all messages, including those that are part of an open transaction or were aborted.
security_protocol text Use ssl or, for Kerberos, sasl_plaintext, sasl-scram-sha-256, or sasl-sha-512 to connect to the Kafka cluster.
kafka_time_offset int Use the specified value to set start_offset based on the Kafka timestamp. Negative values will be interpreted as relative to the current system time in milliseconds (e.g. -1000 means 1000 ms ago). The offset for each partition will be the earliest offset whose timestamp is greater than or equal to the given timestamp in the corresponding partition. If no such offset exists for a partition, the partition’s end offset will be used.
statistics_interval_ms int librdkafka statistics emit interval in ms. Accepts values [0, 86400000]. A value of 0 disables statistics. Statistics can be queried using the mz_kafka_consumer_partitions system table.
start_offset int Read partitions from the specified offset. You cannot update the offsets once a source has been created; you will need to recreate the source. Values must be zero or positive integers. See Kafka source details for important warnings for this feature.
timestamp_frequency_ms int Default: 1000. Sets the timestamping frequency in ms. Reflects how frequently the source advances its timestamp. This measure reflects how stale data in views will be. Lower values result in more-up-to-date views but may reduce throughput.
topic_metadata_refresh_interval_ms int Default: 30000. Sets the frequency in ms at which the system checks for new partitions. Accepts values [0,3600000].
enable_auto_commit boolean Default: false. Controls whether or not Materialize commits read offsets back into Kafka. This is purely for consumer progress monitoring and does not cause Materialize to resume reading from where it left off across restarts.
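
For illustration (the broker, topic, and client_id value are hypothetical), several of these options can be combined in a single statement:

CREATE MATERIALIZED SOURCE tuned_kafka
FROM KAFKA BROKER 'localhost:9092' TOPIC 'data'
WITH (
    client_id = 'materialize-ingest',
    isolation_level = 'read_committed',
    timestamp_frequency_ms = 500,
    statistics_interval_ms = 10000
)
FORMAT BYTES;

With statistics_interval_ms set, the emitted statistics can then be inspected with SELECT * FROM mz_kafka_consumer_partitions;.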

SSL WITH options

Use the following options to connect Materialize to an SSL-encrypted Kafka cluster. For more detail, see SSL-encrypted Kafka details.

Field Value Description
ssl_certificate_location text The absolute path to your SSL certificate. Required for SSL client authentication.
ssl_key_location text The absolute path to your SSL certificate’s key. Required for SSL client authentication.
ssl_key_password text Your SSL key’s password, if any.
ssl_ca_location text The absolute path to the certificate authority (CA) certificate. Used for both SSL client and server authentication. If unspecified, uses the system’s default CA certificates.
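
A sketch of connecting to an SSL-encrypted cluster with client authentication; the broker address and certificate paths are placeholders:

CREATE SOURCE ssl_kafka
FROM KAFKA BROKER 'kafka.example.com:9093' TOPIC 'json'
WITH (
    security_protocol = 'ssl',
    ssl_certificate_location = '/secrets/materialize.crt',
    ssl_key_location = '/secrets/materialize.key',
    ssl_ca_location = '/secrets/ca.crt'
)
FORMAT BYTES;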

Kerberos WITH options

Use the following options to connect Materialize to a Kerberized Kafka cluster. For more detail, see Kerberized Kafka details.

Field Value Description
sasl_mechanisms text The SASL mechanism to use for authentication. Currently, the only supported mechanisms are GSSAPI (the default) and PLAIN.
sasl_username text Your SASL username, if any. Required if sasl_mechanisms is PLAIN.
sasl_password text Your SASL password, if any. Required if sasl_mechanisms is PLAIN.

This option stores the password in Materialize’s on-disk catalog. For an alternative, use sasl_password_env.
sasl_password_env text Use the value stored in the named environment variable as the value for sasl_password.

This option does not store the password on-disk in Materialize’s catalog, but requires the environment variable’s presence to boot Materialize.
sasl_kerberos_keytab text The absolute path to your keytab. Required if sasl_mechanisms is GSSAPI.
sasl_kerberos_kinit_cmd text Shell command to refresh or acquire the client’s Kerberos ticket. Required if sasl_mechanisms is GSSAPI.
sasl_kerberos_min_time_before_relogin text Minimum time in milliseconds between key refresh attempts. Disable automatic key refresh by setting this property to 0. Required if sasl_mechanisms is GSSAPI.
sasl_kerberos_principal text Materialize Kerberos principal name. Required if sasl_mechanisms is GSSAPI.
sasl_kerberos_service_name text Kafka’s service name on its host, i.e. the service principal name not including /hostname@REALM. Required if sasl_mechanisms is GSSAPI.
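
As a sketch of SASL PLAIN authentication (the broker, username, and environment variable name are hypothetical), using sasl_password_env to keep the password out of the on-disk catalog:

CREATE SOURCE sasl_kafka
FROM KAFKA BROKER 'broker.example.com:9092' TOPIC 'json'
WITH (
    security_protocol = 'sasl_plaintext',
    sasl_mechanisms = 'PLAIN',
    sasl_username = 'materialize',
    sasl_password_env = 'KAFKA_SASL_PASSWORD'
)
FORMAT BYTES;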

Inline schema WITH options

Field Value Description
confluent_wire_format boolean Whether to look for the Confluent Schema Registry schema ID within Avro messages.

If you specify a source with an inline schema, you may still have records generated by a client that inlines a Confluent Schema Registry ID at the beginning of each record. If confluent_wire_format is false, Materialize will not validate that a well-formatted schema ID is present at the beginning of each record.

Details

Kafka source details

Partition offsets

The start_offset option comes with a quirk to be aware of: the kafka_time_offset option sets start_offset for each available partition based on the Kafka timestamp, after which the source behaves as if start_offset had been provided directly.

SSL-encrypted Kafka details

Enable connections to SSL-encrypted Kafka clusters using the appropriate WITH options.

Kerberized Kafka details

Enable connections to Kerberized Kafka clusters using the appropriate WITH options.

Raw byte format details

Raw byte-formatted sources provide Materialize the raw bytes received from the source without applying any formatting or decoding.

Raw byte-formatted sources have one column, which, by default, is named data.

Extracting JSON data from bytes

Materialize cannot receive JSON data directly from a source. Instead, you must create a source that stores the data it receives as raw bytes (FORMAT BYTES), and then construct views that provide access to your JSON data by casting the source’s bytea column (named data) to text, and then to jsonb.

CREATE MATERIALIZED VIEW jsonified_bytes AS
SELECT CAST(data AS jsonb) AS data
FROM (
    -- Decode the raw bytes as UTF-8 text before casting to jsonb.
    SELECT CONVERT_FROM(data, 'utf8') AS data
    FROM bytea_source
);
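
Once the data is jsonb, fields can be extracted with the usual jsonb operators. Assuming, hypothetically, that each message looks like {"id": 1, "status": "ok"}:

SELECT data->>'id' AS id,
       data->>'status' AS status
FROM jsonified_bytes;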

Append-only envelope

With the append-only envelope, every record received by the source is treated as an insert. This is Materialize’s default envelope (i.e. the behavior if no envelope is specified), and it can be requested explicitly with ENVELOPE NONE, as in the sketch below.
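
A minimal sketch with the envelope spelled out (the broker and topic are placeholders):

CREATE SOURCE append_only_json
FROM KAFKA BROKER 'localhost:9092' TOPIC 'json'
FORMAT BYTES
ENVELOPE NONE;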

Upsert envelope details

Specifying ENVELOPE UPSERT creates a source that supports Kafka’s standard key-value convention, and supports inserts, updates, and deletes within Materialize. The source is also compatible with Kafka’s log-compaction feature, and can be useful for users who want to compact CDC sources.

The upsert envelope has slower data ingestion and is more memory-intensive than the other envelopes. To correctly handle data sources that do not publish their own retractions, it must maintain state proportional to the number of unique rows in the source, and it must perform extra work to generate the implied retractions based on that state.

Inserts, updates, deletes

When Materialize receives a message, it checks the message’s key and offset. A key that has not been seen before is treated as an insert; a key that has been seen before causes the message’s value to replace the previous value for that key (an update); and a message with a null value deletes the row for that key. Offsets determine which message is the most recent for a given key.

Key columns

With ENVELOPE UPSERT, the columns decoded from the message key are included in the source’s output and uniquely identify each row.
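
A sketch of an upsert source over a hypothetical topic of text keys and raw-byte JSON values:

CREATE SOURCE upsert_json
FROM KAFKA BROKER 'localhost:9092' TOPIC 'json-upserts'
KEY FORMAT TEXT
VALUE FORMAT BYTES
ENVELOPE UPSERT;

The most recent value for each key wins, and a message with a null value removes that key’s row.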

Examples

CREATE SOURCE json_kafka
FROM KAFKA BROKER 'localhost:9092' TOPIC 'json'
FORMAT BYTES;

This creates a source with a single column, data, containing the raw bytes of each message from the json topic; records are appended using the default append-only envelope.

To use this data in views, you can decode its bytes into jsonb. For example:

CREATE MATERIALIZED VIEW jsonified_kafka_source AS
  SELECT CAST(data AS jsonb) AS data
  FROM (
      SELECT convert_from(data, 'utf8') AS data
      FROM json_kafka
  );

Setting partition offsets

CREATE MATERIALIZED SOURCE data_offset
  FROM KAFKA BROKER 'localhost:9092' TOPIC 'data'
  WITH (start_offset=[0,10,100])
  FORMAT BYTES;

This creates a source that begins reading the topic’s first partition at offset 0, its second partition at offset 10, and its third partition at offset 100.

It is possible to set start_offset based on Kafka timestamps using the kafka_time_offset option.
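
As a sketch (the broker and topic are placeholders), a negative kafka_time_offset starts each partition at the earliest message whose timestamp is at most the given number of milliseconds old:

CREATE MATERIALIZED SOURCE data_time_offset
  FROM KAFKA BROKER 'localhost:9092' TOPIC 'data'
  WITH (kafka_time_offset = -3600000)
  FORMAT BYTES;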
