CREATE SOURCE: Kafka
CREATE SOURCE connects Materialize to an external system you want to read data from, and provides details about how to decode and interpret that data.
This page describes how to connect Materialize to a Kafka broker to read data from individual topics.
Syntax
(Syntax diagrams for CREATE SOURCE and its format_spec, key_constraint, and with_options rules appear in the rendered documentation.)
Field | Use |
---|---|
MATERIALIZED | Materializes the source’s data, which retains all data in memory and makes sources directly selectable. For more information, see Key Concepts: Materialized sources. |
src_name | The name for the source, which is used as its table name within SQL. |
col_name | Override default column name with the provided identifier. If used, a col_name must be provided for each column in the created source. |
KAFKA BROKER host | The Kafka broker’s host name without the security protocol, which is specified by the WITH options. If you wish to specify multiple brokers (bootstrap servers) as an additional safeguard, use a comma-separated list. For example: localhost:9092, localhost:9093. |
TOPIC topic | The Kafka topic you want to subscribe to. |
INCLUDE KEY | Include a column containing the Kafka message key. If the key is encoded using a format that includes schemas, the column will take its name from the schema. For unnamed formats (e.g. TEXT ), the column will be named key . The column can be renamed with the optional AS name clause. |
INCLUDE PARTITION | Include a partition column containing the Kafka message partition. The column can be renamed with the optional AS name clause. New in v0.12.0. |
INCLUDE OFFSET | Include an offset column containing the Kafka message offset. The column can be renamed with the optional AS name clause. New in v0.12.0. |
INCLUDE TIMESTAMP | Include a timestamp column containing the Kafka message timestamp. The column can be renamed with the optional AS name clause. New in v0.12.0. Note that the timestamp of a Kafka message depends on how the topic and its producers are configured. See the Confluent documentation for details. |
PRIMARY KEY ( col_list ) NOT ENFORCED | Declare a set of columns as a primary key. For more information, see Defining primary keys . |
ENVELOPE DEBEZIUM | Use the Debezium envelope, which uses a diff envelope to handle CRUD operations. For more information, see Using Debezium. |
ENVELOPE DEBEZIUM UPSERT | Use the Debezium envelope with UPSERT enabled. This is required for Kafka sources with log compaction, but increases memory consumption. |
ENVELOPE UPSERT | Use the upsert envelope, which uses message keys to handle CRUD operations. For more information, see Handling upserts. |
ENVELOPE NONE | (Default) Use an append-only envelope. This means that records will only be appended and cannot be updated or deleted. |
WITH ( option_list ) | Options affecting source creation. For more detail, see WITH options. |
WITH options
Field | Value | Description |
---|---|---|
client_id | text | Use the supplied value as the Kafka client identifier. |
group_id_prefix | text | Use the specified prefix in the consumer group ID. The resulting group.id looks like <group_id_prefix>materialize-X-Y, where X and Y are values that allow multiple concurrent Kafka consumers from the same topic. |
ignore_source_keys | boolean | Default: false. If true, do not perform optimizations assuming uniqueness of primary keys in schemas. |
isolation_level | text | Default: read_committed. Controls how to read messages that were transactionally written to Kafka. Supported options are read_committed to read only committed messages and read_uncommitted to read all messages, including those that are part of an open transaction or were aborted. |
statistics_interval_ms | int | librdkafka statistics emit interval in ms. A value of 0 disables statistics. Statistics can be queried using the mz_kafka_source_statistics system table (see the example query below). Accepts values [0, 86400000]. |
timestamp_frequency_ms | int | Default: 1000. Sets the timestamping frequency in ms, i.e. how frequently the source advances its timestamp. This determines how stale data in views can be. Lower values result in more up-to-date views but may reduce throughput. |
topic_metadata_refresh_interval_ms | int | Default: 300000. Sets the frequency in ms at which the system checks for new partitions. Accepts values [0, 3600000]. |
enable_auto_commit | boolean | Default: false. Controls whether or not Materialize commits read offsets back into Kafka. This is purely for consumer progress monitoring and does not cause Materialize to resume reading from where it left off across restarts. |
fetch_message_max_bytes | int | Default: 134217728. Controls the initial maximum number of bytes per topic+partition to request when fetching messages from the broker. If the client encounters a message larger than this value it will gradually try to increase it until the entire message can be fetched. Accepts values [1, 1000000000]. |
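If statistics are enabled (i.e. statistics_interval_ms is set to a nonzero value), you can inspect the raw librdkafka statistics from SQL. A minimal sketch:
SELECT * FROM mz_kafka_source_statistics;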
Supported formats
Format | Append-only envelope | Upsert envelope | Debezium envelope |
---|---|---|---|
Avro | ✓ | ✓ | ✓ |
JSON | ✓ | ✓ | |
Protobuf | ✓ | ✓ | |
Text/bytes | ✓ | ✓ | |
CSV | ✓ | | |
Key-value encoding
By default, the message key is decoded using the same format as the message value. However, you can set the key and value encodings explicitly using the KEY FORMAT ... VALUE FORMAT syntax.
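For example, here is a sketch of a source whose messages carry plain-text keys and Avro-encoded values; the source name, broker, topic, and schema registry address are placeholders:
CREATE SOURCE text_keyed_avro
FROM KAFKA BROKER 'localhost:9092' TOPIC 'data'
KEY FORMAT TEXT
VALUE FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY 'https://localhost:8081'
ENVELOPE UPSERT;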
Features
Handling upserts
To create a source that uses the standard key-value convention to support inserts, updates, and deletes within Materialize, you can use ENVELOPE UPSERT:
CREATE SOURCE current_predictions
FROM KAFKA BROKER 'localhost:9092' TOPIC 'events'
FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY 'https://localhost:8081'
ENVELOPE UPSERT;
Note that:
- Using this envelope is required to consume log compacted topics.
Defining primary keys
Primary keys are automatically inferred for Kafka sources using the UPSERT or DEBEZIUM envelopes. For other source configurations, you can manually define a column (or set of columns) as a primary key using the PRIMARY KEY (...) NOT ENFORCED syntax. This enables optimizations and constructs that rely on a key to be present when it cannot be inferred.
Using Debezium
Materialize provides a dedicated envelope (ENVELOPE DEBEZIUM) to decode Kafka messages produced by Debezium. To create a source that interprets Debezium messages:
CREATE SOURCE kafka_repl
FROM KAFKA BROKER 'kafka:9092' TOPIC 'pg_repl.public.table1'
FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY 'http://schema-registry:8081'
ENVELOPE DEBEZIUM;
Note that:
- If log compaction is enabled for your Debezium topic, you must use ENVELOPE DEBEZIUM UPSERT.
Any materialized view defined on top of this source will be incrementally updated as new change events stream in through Kafka, as a result of INSERT, UPDATE and DELETE operations in the original database.
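For example, a minimal sketch of such a view over the source created above (the view name is illustrative):
CREATE MATERIALIZED VIEW table1_current AS
SELECT * FROM kafka_repl;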
For more details and a step-by-step guide on using Kafka+Debezium for Change Data Capture (CDC), check out Using Debezium.
Exposing source metadata
In addition to the message value, Materialize can expose the message key, headers and other source metadata fields to SQL.
Key
The message key is exposed via the INCLUDE KEY option. Composite keys are also supported (#7645).
CREATE SOURCE kafka_metadata
FROM KAFKA BROKER 'localhost:9092' TOPIC 'data'
KEY FORMAT TEXT
VALUE FORMAT TEXT
INCLUDE KEY AS renamed_id;
Note that:
- This option requires specifying the key and value encodings explicitly using the KEY FORMAT ... VALUE FORMAT syntax.
- The UPSERT envelope always includes keys.
- The DEBEZIUM envelope is incompatible with this option.
Headers
Message headers are exposed via the INCLUDE HEADERS option, and are included as a column (named headers by default) containing a list of (text, bytea) pairs.
CREATE SOURCE kafka_metadata
FROM KAFKA BROKER 'localhost:9092' TOPIC 'data'
FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY 'https://localhost:8081'
INCLUDE HEADERS
ENVELOPE NONE;
To retrieve the headers in a message, you can unpack the value:
SELECT key,
field1,
field2,
headers[1].value AS kafka_header
FROM mv_kafka_metadata;
key | field1 | field2 | kafka_header
-------+----------+----------+----------------
foo | fooval | 1000 | hvalue
bar | barval | 5000 | <null>
Alternatively, you can look up a header by key:
SELECT key,
field1,
field2,
thekey,
value
FROM (SELECT key,
field1,
field2,
unnest(headers).key AS thekey,
unnest(headers).value AS value
FROM mv_kafka_metadata) AS km
WHERE thekey = 'kvalue';
key | field1 | field2 | thekey | value
-------+----------+----------+----------+--------
foo | fooval | 1000 | kvalue | hvalue
Note that:
- The DEBEZIUM envelope is incompatible with this option.
Partition, offset, timestamp
These metadata fields are exposed via the INCLUDE PARTITION, INCLUDE OFFSET and INCLUDE TIMESTAMP options.
CREATE SOURCE kafka_metadata
FROM KAFKA BROKER 'localhost:9092' TOPIC 'data'
FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY 'https://localhost:8081'
INCLUDE PARTITION, OFFSET, TIMESTAMP AS ts
ENVELOPE NONE;
SELECT "offset" FROM kafka_metadata WHERE ts > '2021-01-01';
offset
------
15
14
13
Note that:
- Using the INCLUDE OFFSET option with Debezium requires UPSERT semantics.
Setting start offsets
To start consuming a Kafka stream from a specific offset, you can use the start_offset option.
CREATE MATERIALIZED SOURCE kafka_offset
FROM KAFKA BROKER 'localhost:9092' TOPIC 'data'
-- Start reading from the earliest offset in the first partition,
-- the second partition at 10, and the third partition at 100
WITH (start_offset=[0,10,100])
FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY 'https://localhost:8081';
Note that:
- If fewer offsets than partitions are provided, the remaining partitions will start at offset 0. This is true if you provide start_offset=1 or start_offset=[1, ...].
- If more offsets than partitions are provided, then any partitions added later will incorrectly be read from that offset. So, if you have a single partition, but you provide start_offset=[1,2], when you add the second partition you will miss the first 2 records of data.
- Using an offset with a source envelope that can supply updates or deletes requires that Materialize handle possibly nonsensical events (e.g. an update for a row that was never inserted). For that reason, starting at an offset requires either a NONE envelope or a (DEBEZIUM) UPSERT envelope.
Time-based offsets
It’s also possible to set a start offset based on Kafka timestamps, using the kafka_time_offset option. This approach sets the start offset for each available partition based on the Kafka timestamp, and the source behaves as if start_offset was provided directly.
It’s important to note that kafka_time_offset is a property of the source: it is calculated once, at the time the CREATE SOURCE statement is issued. This means that the computed start offsets will be the same for all views depending on the source and stable across restarts.
If you need to limit the amount of data maintained as state after source creation, consider using temporal filters instead.
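As a sketch, the following mirrors the start_offset example above but starts each partition at the earliest offset that is at most one hour old at creation time; the source name, broker, topic, and schema registry address are placeholders:
CREATE MATERIALIZED SOURCE kafka_time_offset_example
FROM KAFKA BROKER 'localhost:9092' TOPIC 'data'
-- Start each partition at the first offset whose timestamp is
-- no more than one hour (3,600,000 ms) before the current system time
WITH (kafka_time_offset = -3600000)
FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY 'https://localhost:8081';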
WITH options
Field | Value | Description |
---|---|---|
start_offset | int | Read partitions from the specified offset. You cannot update the offsets once a source has been created; you will need to recreate the source. Offset values must be zero or positive integers, and the source must use either ENVELOPE NONE or (DEBEZIUM) UPSERT. |
kafka_time_offset | int | Use the specified value to set start_offset based on the Kafka timestamp. Negative values will be interpreted as relative to the current system time in milliseconds (e.g. -1000 means 1000 ms ago). The offset for each partition will be the earliest offset whose timestamp is greater than or equal to the given timestamp in the corresponding partition. If no such offset exists for a partition, the partition’s end offset will be used. |
Authentication
SSL
To connect to a Kafka broker that requires SSL authentication, use the provided WITH options.
CREATE MATERIALIZED SOURCE kafka_ssl
FROM KAFKA BROKER 'localhost:9092' TOPIC 'top-secret' WITH (
security_protocol = 'SSL',
ssl_key_location = '/secrets/materialized.key',
ssl_certificate_location = '/secrets/materialized.crt',
ssl_ca_location = '/secrets/ca.crt',
ssl_key_password = 'mzmzmz'
)
FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY 'https://localhost:8081' WITH (
ssl_key_location = '/secrets/materialized.key',
ssl_certificate_location = '/secrets/materialized.crt',
ssl_ca_location = '/secrets/ca.crt'
);
SSL WITH options
Field | Value | Description |
---|---|---|
security_protocol | text | Use ssl to connect to the Kafka cluster. |
ssl_certificate_location | text | The absolute path to your SSL certificate. Required for SSL client authentication. |
ssl_key_location | text | The absolute path to your SSL certificate’s key. Required for SSL client authentication. |
ssl_key_password | text | Your SSL key’s password, if any. |
ssl_ca_location | text | The absolute path to the certificate authority (CA) certificate. Used for both SSL client and server authentication. If unspecified, uses the system’s default CA certificates. |
Confluent Schema Registry SSL WITH options
Field | Value | Description |
---|---|---|
ssl_certificate_location | text | The absolute path to your SSL certificate. Required for SSL client authentication. New in v0.17.0. |
ssl_key_location | text | The absolute path to your SSL certificate’s key. Required for SSL client authentication. New in v0.17.0. |
ssl_ca_location | text | The absolute path to the certificate authority (CA) certificate. Used for both SSL client and server authentication. New in v0.17.0. |
username | text | The username used to connect to the schema registry with basic HTTP authentication. This is compatible with the ssl options, which control the transport between Materialize and the CSR. |
password | text | The password used to connect to the schema registry with basic HTTP authentication. This is compatible with the ssl options, which control the transport between Materialize and the CSR. |
SASL
To connect to a Kafka broker that requires SASL authentication, use the provided WITH options.
SASL/PLAIN
CREATE SOURCE kafka_sasl
FROM KAFKA BROKER 'broker.tld:9092' TOPIC 'top-secret' WITH (
security_protocol = 'SASL_SSL',
sasl_mechanisms = 'PLAIN',
sasl_username = '<BROKER_USERNAME>',
sasl_password = '<BROKER_PASSWORD>'
)
FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY 'https://schema-registry.tld' WITH (
username = '<SCHEMA_REGISTRY_USERNAME>',
password = '<SCHEMA_REGISTRY_PASSWORD>'
);
This is the configuration required to connect to Kafka brokers running on Confluent Cloud.
SASL/GSSAPI (Kerberos)
CREATE SOURCE kafka_kerberos
FROM KAFKA BROKER 'broker.tld:9092' TOPIC 'tps-reports' WITH (
security_protocol = 'sasl_plaintext',
sasl_kerberos_keytab = '/secrets/materialized.keytab',
sasl_kerberos_service_name = 'kafka',
sasl_kerberos_principal = 'materialized@CI.MATERIALIZE.IO'
)
FORMAT AVRO USING SCHEMA FILE '/tps-reports-schema.json';
Note that:
- Materialize does not support Kerberos authentication for the Confluent Schema Registry.
SASL WITH options
Field | Value | Description |
---|---|---|
security_protocol | text | Use plaintext, ssl, sasl_plaintext or sasl_ssl to connect to the Kafka cluster. |
sasl_mechanisms | text | The SASL mechanism to use for authentication. Supported: GSSAPI (the default), PLAIN, SCRAM-SHA-256, SCRAM-SHA-512. |
sasl_username | text | Your SASL username, if any. Required if sasl_mechanisms is PLAIN. |
sasl_password | text | Your SASL password, if any. Required if sasl_mechanisms is PLAIN. This option stores the password in Materialize’s on-disk catalog. For an alternative, use sasl_password_env. |
sasl_password_env | text | Use the value stored in the named environment variable as the value for sasl_password. This option does not store the password on disk in Materialize’s catalog, but the environment variable must be present for Materialize to boot. |
sasl_kerberos_keytab | text | The absolute path to your keytab. Required if sasl_mechanisms is GSSAPI. |
sasl_kerberos_kinit_cmd | text | Shell command to refresh or acquire the client’s Kerberos ticket. Required if sasl_mechanisms is GSSAPI. |
sasl_kerberos_min_time_before_relogin | text | Minimum time in milliseconds between key refresh attempts. Disable automatic key refresh by setting this property to 0. Required if sasl_mechanisms is GSSAPI. |
sasl_kerberos_principal | text | Materialize Kerberos principal name. Required if sasl_mechanisms is GSSAPI. |
sasl_kerberos_service_name | text | Kafka’s service name on its host, i.e. the service principal name not including /hostname@REALM. Required if sasl_mechanisms is GSSAPI. |
Examples
Creating a source
CREATE SOURCE avro_source
FROM KAFKA BROKER 'localhost:9092' TOPIC 'data'
FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY 'https://localhost:8081';
CREATE SOURCE json_source
FROM KAFKA BROKER 'localhost:9092' TOPIC 'data'
FORMAT BYTES;
CREATE MATERIALIZED VIEW jsonified_kafka_source AS
SELECT
data->>'field1' AS field_1,
data->>'field2' AS field_2,
data->>'field3' AS field_3
FROM (SELECT CONVERT_FROM(data, 'utf8')::jsonb AS data FROM json_source);
CREATE SOURCE proto_source
FROM KAFKA BROKER 'localhost:9092' TOPIC 'billing'
WITH (cache = true)
FORMAT PROTOBUF MESSAGE '.billing.Batch'
USING SCHEMA FILE '[path to schema]';
CREATE SOURCE text_source
FROM KAFKA BROKER 'localhost:9092' TOPIC 'data'
FORMAT TEXT
ENVELOPE UPSERT;
CREATE SOURCE csv_source (col_foo, col_bar, col_baz)
FROM KAFKA BROKER 'localhost:9092' TOPIC 'data'
FORMAT CSV WITH 3 COLUMNS;