CREATE SOURCE: Avro over Kafka

CREATE SOURCE connects Materialize to an external data source and lets you interact with its data as if the data were in a SQL table.

This document details how to connect Materialize to an Avro-formatted Kafka topic.

Conceptual framework

Sources represent connections to resources outside Materialize that it can read data from. For more information, see API Components: Sources.

Syntax

CREATE [MATERIALIZED] SOURCE [IF NOT EXISTS] src_name [( col_name [, ...] )]
FROM KAFKA BROKER host TOPIC topic
[WITH ( field = val [, ...] )]
FORMAT format_spec
[ENVELOPE { NONE | DEBEZIUM [UPSERT] | UPSERT [FORMAT format_spec] }]

format_spec

AVRO USING {
    CONFLUENT SCHEMA REGISTRY url [WITH ( field = val [, ...] )]
  | SCHEMA { FILE schema_file_path | inline_schema } [WITH ( inline_schema_with_options )]
}
Field Use
MATERIALIZED Materializes the source’s data, which retains all data in memory and makes sources directly selectable. For more information, see Materialized source details.
src_name The name for the source, which is used as its table name within SQL.
col_name Override default column name with the provided identifier. If used, a col_name must be provided for each column in the created source.
KAFKA BROKER host The Kafka broker’s host name, without the security protocol (which is specified by the WITH options). If you wish to specify multiple brokers (bootstrap servers) as an additional safeguard, use a comma-separated list. For example: localhost:9092, localhost:9093.
TOPIC topic The Kafka topic you want to subscribe to.
WITH ( option_list ) Options affecting source creation. For more detail, see WITH options.
CONFLUENT SCHEMA REGISTRY url The URL of the Confluent schema registry to get schema information from.
SCHEMA FILE schema_file_path The absolute path to a file containing the schema.
SCHEMA inline_schema A string representing the schema.
ENVELOPE DEBEZIUM Use the Debezium envelope, which uses a diff envelope to handle CRUD operations. This option requires payloads have the appropriate fields, is generally only supported by sources published to Kafka by Debezium, and is incompatible with Kafka compaction. For more information, see Debezium envelope details.
ENVELOPE DEBEZIUM UPSERT Use the Debezium envelope with UPSERT enabled. This is required for Kafka sources with log compaction, but increases memory consumption.
ENVELOPE UPSERT Use the upsert envelope, which uses message keys to handle CRUD operations. For more information see Upsert envelope details.
ENVELOPE NONE (Default) Use an append-only envelope. This means that records will only be appended and cannot be updated or deleted.

WITH options

The following options are valid within the WITH clause; a combined example appears after the table.

Field Value type Description
client_id text Use the supplied value as the Kafka client identifier.
cache boolean Cache data from this source to local files. Requires experimental mode.
group_id_prefix text Use the specified prefix in the consumer group ID. The resulting group.id looks like <group_id_prefix>materialize-X-Y, where X and Y are values that allow multiple concurrent Kafka consumers from the same topic.
ignore_source_keys boolean Default: false. If true, do not perform optimizations assuming uniqueness of primary keys in schemas.
isolation_level text Default: read_committed. Controls how to read messages that were transactionally written to Kafka. Supported options are read_committed to read only committed messages and read_uncommitted to read all messages, including those that are part of an open transaction or were aborted.
security_protocol text Use ssl or, for Kerberos, sasl_plaintext, sasl-scram-sha-256, or sasl-sha-512 to connect to the Kafka cluster.
kafka_time_offset int Use the specified value to set start_offset based on the Kafka timestamp. Negative values will be interpreted as relative to the current system time in milliseconds (e.g. -1000 means 1000 ms ago). The offset for each partition will be the earliest offset whose timestamp is greater than or equal to the given timestamp in the corresponding partition. If no such offset exists for a partition, the partition’s end offset will be used.
statistics_interval_ms int librdkafka statistics emit interval in ms. Accepts values [0, 86400000]. The granularity is 1000ms. A value of 0 disables statistics. Statistics can be queried using the mz_kafka_consumer_partitions system table.
start_offset int Read partitions from the specified offset. You cannot update the offsets once a source has been created; you will need to recreate the source. Values must be zero or positive integers. See Kafka source details for important warnings for this feature.
timestamp_frequency_ms int Default: 1000. Sets the timestamping frequency in ms. Reflects how frequently the source advances its timestamp. This measure reflects how stale data in views will be. Lower values result in more-up-to-date views but may reduce throughput.
topic_metadata_refresh_interval_ms int Default: 30000. Sets the frequency in ms at which the system checks for new partitions. Accepts values [0,3600000].
enable_auto_commit boolean Default: false. Controls whether or not Materialize commits read offsets back into Kafka. This is purely for consumer progress monitoring and does not cause Materialize to resume reading from where it left off across restarts.
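
As an illustration, here is a sketch combining several of these options; the broker, topic, registry URL, and option values are placeholders to adapt to your deployment:

CREATE SOURCE data
FROM KAFKA BROKER 'localhost:9092' TOPIC 'data' WITH (
    client_id = 'materialize',
    isolation_level = 'read_committed',
    timestamp_frequency_ms = 1000,
    statistics_interval_ms = 10000
)
FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY 'http://localhost:8081';

With statistics_interval_ms set, you can then inspect consumer statistics by querying the mz_kafka_consumer_partitions system table (SELECT * FROM mz_kafka_consumer_partitions).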

SSL WITH options

Use the following options to connect Materialize to an SSL-encrypted Kafka cluster. For more detail, see SSL-encrypted Kafka details.

Field Value Description
ssl_certificate_location text The absolute path to your SSL certificate. Required for SSL client authentication.
ssl_key_location text The absolute path to your SSL certificate’s key. Required for SSL client authentication.
ssl_key_password text Your SSL key’s password, if any.
ssl_ca_location text The absolute path to the certificate authority (CA) certificate. Used for both SSL client and server authentication. If unspecified, uses the system’s default CA certificates.

Kerberos WITH options

Use the following options to connect Materialize to a Kerberized Kafka cluster. For more detail, see Kerberized Kafka details.

Field Value Description
sasl_mechanisms text The SASL mechanism to use for authentication. Currently, the only supported mechanisms are GSSAPI (the default) and PLAIN.
sasl_username text Required if sasl_mechanisms is PLAIN.
sasl_password text Required if sasl_mechanisms is PLAIN.
sasl_kerberos_keytab text The absolute path to your keytab. Required if sasl_mechanisms is GSSAPI.
sasl_kerberos_kinit_cmd text Shell command to refresh or acquire the client’s Kerberos ticket. Required if sasl_mechanisms is GSSAPI.
sasl_kerberos_min_time_before_relogin text Minimum time in milliseconds between key refresh attempts. Disable automatic key refresh by setting this property to 0. Required if sasl_mechanisms is GSSAPI.
sasl_kerberos_principal text Materialize Kerberos principal name. Required if sasl_mechanisms is GSSAPI.
sasl_kerberos_service_name text Kafka’s service name on its host, i.e. the service principal name not including /hostname@REALM. Required if sasl_mechanisms is GSSAPI.

Inline schema WITH options

Field Value Description
confluent_wire_format boolean Whether to look for the Confluent Schema Registry schema ID within Avro messages.

If you specify a source with an inline schema, you may still have records that were generated by a client that inlines a Confluent Schema Registry schema ID at the beginning of each record. If confluent_wire_format is false, then materialized will not validate that a well-formatted schema ID is present at the beginning of each record.
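
As a sketch, a source that reads plain Avro records with no leading schema ID might look like this (the broker, topic, and schema here are placeholders):

CREATE SOURCE plain_avro
FROM KAFKA BROKER 'localhost:9092' TOPIC 'plain-avro'
FORMAT AVRO USING SCHEMA '{
  "type": "record",
  "name": "row",
  "fields": [
    {"name": "a", "type": "long"}
  ]
}' WITH (confluent_wire_format = false);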

Details

Materialized source details

Materializing a source keeps data it receives in an in-memory index, the presence of which makes the source directly queryable. In contrast, non-materialized sources cannot process queries directly; to access the data the source receives, you need to create materialized views that SELECT from the source.

For a mental model, materializing the source is approximately equivalent to creating a non-materialized source, and then creating a materialized view from all of the source’s columns:

CREATE SOURCE src ...;
CREATE MATERIALIZED VIEW src_view AS SELECT * FROM src;

The actual implementation of materialized sources differs, though, by letting you refer to the source’s name directly in queries.
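For example, continuing the sketch above, the materialized form can be queried directly, with no intervening view:

CREATE MATERIALIZED SOURCE src ...;
SELECT * FROM src;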

For more details about the impact of materializing sources (and implicitly creating an index), see CREATE INDEX: Details — Memory footprint.

Kafka source details

Partition offsets

The start_offset option comes with some quirks to be aware of: offsets apply only when a source is first created and cannot be updated afterward (you must recreate the source), and values must be zero or positive integers.

The kafka_time_offset option sets start_offset for each available partition based on the Kafka timestamp; the source then behaves as if start_offset had been provided directly.

Cached Kafka sources

To avoid re-reading data from Kafka on restart, Materialize lets you create cached sources, which cache input messages from Kafka topics to files on the Materialize instance’s local hard drive. For more details on source caching, see Deployment: Source Caching.

For an example, see Caching records to local disk below.

SSL-encrypted Kafka details

Enable connections to SSL-encrypted Kafka clusters using the appropriate WITH options.

Kerberized Kafka details

Enable connections to Kerberized Kafka clusters using the appropriate WITH options.

Avro format details

Avro-formatted external sources require you to provide the schema in one of three ways: using a Confluent schema registry (CONFLUENT SCHEMA REGISTRY url), providing the absolute path to a schema file (SCHEMA FILE schema_file_path), or providing the schema inline as a string (SCHEMA inline_schema).

Debezium envelope details

The Debezium envelope describes the decoded records’ old and new values; this is roughly equivalent to the notion of Change Data Capture (CDC).

To use the Debezium envelope with Materialize, you must configure Debezium with your database.

WARNING! Debezium can produce duplicate records if the connector is interrupted. Materialize makes a best-effort attempt to detect and filter out duplicates generated by the MySQL and PostgreSQL connectors. It does not yet attempt to detect duplicates generated by other Debezium connectors.

The Debezium envelope is most easily supported by sources published to Kafka by Debezium.

Format implications

Using the Debezium envelope changes the schema of your Avro-encoded Kafka topics to include something akin to the following:

{
    "type": "record",
    "name": "envelope",
    "fields": [
        {
            "name": "before",
            "type": [
                {
                    "name": "row",
                    "type": "record",
                    "fields": [
                        {"name": "a", "type": "long"},
                        {"name": "b", "type": "long"}
                    ]
                },
                "null"
            ]
        },
        { "name": "after", "type": ["row", "null"] }
    ]
}

Kafka topic requirements

ENVELOPE DEBEZIUM by itself is incompatible with Kafka’s log compaction. You must specify ENVELOPE DEBEZIUM UPSERT if you enable compaction of a topic carrying Debezium data. The DEBEZIUM UPSERT envelope uses memory proportional to the size of the upstream database table.
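
For instance, here is a sketch of a source over a log-compacted Debezium topic (the broker, topic, and registry URL are placeholders):

CREATE SOURCE compacted_dbz
FROM KAFKA BROKER 'localhost:9092' TOPIC 'dbz.public.widgets'
FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY 'http://localhost:8081'
ENVELOPE DEBEZIUM UPSERT;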

Upsert envelope details

Specifying ENVELOPE UPSERT creates a source that supports Kafka’s standard key-value convention, and supports inserts, updates, and deletes within Materialize. The source is also compatible with Kafka’s log-compaction feature, and can be useful for users who want to compact CDC sources.

The upsert envelope ingests data more slowly and is more memory intensive than the other envelopes. To correctly handle data sources that do not publish their own retractions, the upsert envelope must maintain state proportional to the number of unique rows in the source, and it must perform extra work to generate the implied retractions based on that state.

Inserts, updates, deletes

When Materialize receives a message, it checks the message’s key and offset: the first message with a given key inserts a row; a subsequent message with the same key updates (replaces) that row; and a message with an empty (null) value deletes the row for that key.
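
As a hypothetical illustration, consider the following sequence of Kafka messages (key => value) and their effect on the source:

-- key1 => {"a": 1}    insert: first time key1 is seen
-- key1 => {"a": 2}    update: replaces the previous value for key1
-- key1 => <null>      delete: removes the row for key1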

Append-only envelope

With the append-only envelope, every record received by the source is treated as an insert. This is Materialize’s default envelope (i.e. used if no envelope is specified), and can be specified explicitly with ENVELOPE NONE.
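
For example, appending ENVELOPE NONE to a CREATE SOURCE statement is equivalent to omitting the envelope clause entirely:

CREATE SOURCE events
FROM KAFKA BROKER 'localhost:9092' TOPIC 'events'
FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY 'http://localhost:8081'
ENVELOPE NONE;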

Examples

Using a Confluent schema registry

CREATE SOURCE events
FROM KAFKA BROKER 'localhost:9092' TOPIC 'events'
FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY 'http://localhost:8081';

This creates a source that…

Inlining the Avro schema

CREATE SOURCE user
FROM KAFKA BROKER 'localhost:9092' TOPIC 'user'
FORMAT AVRO USING SCHEMA '{
  "type": "record",
  "name": "envelope",
  "fields": [
    ...
  ]
}'
ENVELOPE DEBEZIUM;

This creates a source that…

Upsert on a Kafka topic with string keys and Avro values

CREATE SOURCE current_predictions
FROM KAFKA BROKER 'localhost:9092' TOPIC 'current_predictions'
FORMAT AVRO USING SCHEMA FILE '/scratch/current_predictions.json'
ENVELOPE UPSERT;

This creates a source that…

Connecting to a Kafka broker using SSL authentication

CREATE MATERIALIZED SOURCE data_v1
FROM KAFKA BROKER 'localhost:9092' TOPIC 'top-secret' WITH (
    security_protocol = 'SSL',
    ssl_key_location = '/secrets/materialized.key',
    ssl_certificate_location = '/secrets/materialized.crt',
    ssl_ca_location = '/secrets/ca.crt',
    ssl_key_password = 'mzmzmz'
)
FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY 'https://localhost:8081';

This creates a source that…

Connecting to a Kafka broker using SASL authentication

CREATE MATERIALIZED SOURCE data_v1
FROM KAFKA BROKER 'broker.tld:9092' TOPIC 'top-secret' WITH (
    security_protocol = 'SASL_SSL',
    sasl_mechanisms = 'PLAIN',
    sasl_username = '<BROKER_USERNAME>',
    sasl_password = '<BROKER_PASSWORD>'
)
FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY 'https://schema-registry.tld' WITH (
    username = '<SCHEMA_REGISTRY_USERNAME>',
    password = '<SCHEMA_REGISTRY_PASSWORD>'
);

This creates a source that…

If you are connecting to a Kafka cluster on Confluent Cloud, this is the example to follow.

Connecting to a Kafka broker using Kerberos

CREATE MATERIALIZED SOURCE data_v1
FROM KAFKA BROKER 'broker.tld:9092' TOPIC 'tps-reports' WITH (
    security_protocol = 'sasl_plaintext',
    sasl_kerberos_keytab = '/secrets/materialized.keytab',
    sasl_kerberos_service_name = 'kafka',
    sasl_kerberos_principal = 'materialized@CI.MATERIALIZE.IO'
)
FORMAT AVRO USING SCHEMA FILE '/tps-reports-schema.json';

This creates a source that…

Caching records to local disk

CREATE MATERIALIZED SOURCE cached_source
FROM KAFKA BROKER 'broker.tld:9092' TOPIC 'data' WITH (
    cache = true
)
FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY 'https://schema-registry.tld';

This creates a source that…

Setting partition offsets

CREATE MATERIALIZED SOURCE data_offset
  FROM KAFKA BROKER 'localhost:9092' TOPIC 'data'
  WITH (start_offset=[0,10,100])
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY 'https://schema-registry.tld';

This creates a source that…

It is possible to set start_offset based on Kafka timestamps using the kafka_time_offset option.
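
As a sketch, this uses a negative kafka_time_offset to start reading at the earliest offsets whose timestamps are at most one hour old (the value is a placeholder):

CREATE MATERIALIZED SOURCE data_time_offset
  FROM KAFKA BROKER 'localhost:9092' TOPIC 'data'
  WITH (kafka_time_offset = -3600000)
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY 'https://schema-registry.tld';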
