CREATE SOURCE: Kafka
CREATE SOURCE connects Materialize to an external system you want to read data from, and provides details about how to decode and interpret that data.
To connect to a Kafka broker (and optionally a schema registry), you first need to create a connection that specifies access and authentication parameters. Once created, a connection is reusable across multiple CREATE SOURCE and CREATE SINK statements.
Syntax
(Syntax diagrams for the CREATE SOURCE statement and its format_spec, key_strat, val_strat, strat, and with_options clauses are shown in the rendered documentation.)
Field | Use |
---|---|
src_name | The name for the source, which is used as its table name within SQL. |
col_name | Override default column name with the provided identifier. If used, a col_name must be provided for each column in the created source. |
IN CLUSTER cluster_name | The cluster to maintain this source. If not specified, the SIZE option must be specified. |
CONNECTION connection_name | The name of the Kafka connection to use in the source. For details on creating connections, check the CREATE CONNECTION documentation page. |
INCLUDE KEY | Include a column containing the Kafka message key. If the key is encoded using a format that includes schemas, the column will take its name from the schema. For unnamed formats (e.g. TEXT), the column will be named key. The column can be renamed with the optional AS name clause. |
INCLUDE PARTITION | Include a partition column containing the Kafka message partition. The column can be renamed with the optional AS name clause. |
INCLUDE OFFSET | Include an offset column containing the Kafka message offset. The column can be renamed with the optional AS name clause. |
INCLUDE TIMESTAMP | Include a timestamp column containing the Kafka message timestamp. The column can be renamed with the optional AS name clause. Note that the timestamp of a Kafka message depends on how the topic and its producers are configured. See the Confluent documentation for details. |
ENVELOPE DEBEZIUM | Use the Debezium envelope, which uses a diff envelope to handle CRUD operations. For more information, see Using Debezium. |
ENVELOPE UPSERT | Use the upsert envelope, which uses message keys to handle CRUD operations. For more information, see Handling upserts. |
ENVELOPE NONE | (Default) Use an append-only envelope. This means that records will only be appended and cannot be updated or deleted. |
EXPOSE PROGRESS AS progress_subsource_name | The name of the progress subsource for the source. If this is not specified, the subsource will be named <src_name>_progress. For more information, see Monitoring source progress. |
WITH ( option_list ) | Options affecting source creation. For more information, see WITH options. |
CONNECTION options
Field | Value | Description |
---|---|---|
TOPIC | text | The Kafka topic you want to subscribe to. |
WITH options
Field | Value | Description |
---|---|---|
SIZE | text | The size for the source. Accepts values: 3xsmall, 2xsmall, xsmall, small, medium, large, xlarge. Required if the IN CLUSTER option is not specified. |
Supported formats
Format | Append-only envelope | Upsert envelope | Debezium envelope |
---|---|---|---|
Avro | ✓ | ✓ | ✓ |
JSON | ✓ | ✓ | |
Protobuf | ✓ | ✓ | |
Text/bytes | ✓ | ✓ | |
CSV | ✓ | | |
Key-value encoding
By default, the message key is decoded using the same format as the message value. However, you can set the key and value encodings explicitly using the KEY FORMAT ... VALUE FORMAT syntax.
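For example, the following sketch (the source, connection, and topic names are illustrative) decodes a text-encoded key alongside an Avro-encoded value resolved via a schema registry:
CREATE SOURCE kafka_mixed_encoding
  FROM KAFKA CONNECTION kafka_connection (TOPIC 'events')
  KEY FORMAT TEXT
  VALUE FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_connection
  ENVELOPE UPSERT
  WITH (SIZE = '3xsmall');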
Features
Handling upserts
To create a source that uses the standard key-value convention to support inserts, updates, and deletes within Materialize, you can use ENVELOPE UPSERT:
CREATE SOURCE kafka_upsert
FROM KAFKA CONNECTION kafka_connection (TOPIC 'events')
FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_connection
ENVELOPE UPSERT
WITH (SIZE = '3xsmall');
Note that:
- Using this envelope is required to consume log compacted topics.
- This envelope can lead to high memory utilization in the cluster maintaining the source. To reduce memory utilization, consider enabling spill to disk.
Null keys
If a message with a NULL key is detected, Materialize sets the source into an error state. To recover an errored source, you must produce a record with a NULL value and a NULL key to the topic, to force a retraction.
As an example, you can use kcat to produce an empty message:
echo ":" | kcat -b $BROKER -t $TOPIC -Z -K: \
-X security.protocol=SASL_SSL \
-X sasl.mechanisms=SCRAM-SHA-256 \
-X sasl.username=$KAFKA_USERNAME \
-X sasl.password=$KAFKA_PASSWORD
Using Debezium
Materialize provides a dedicated envelope (ENVELOPE DEBEZIUM) to decode Kafka messages produced by Debezium. To create a source that interprets Debezium messages:
CREATE SOURCE kafka_repl
FROM KAFKA CONNECTION kafka_connection (TOPIC 'pg_repl.public.table1')
FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_connection
ENVELOPE DEBEZIUM
WITH (SIZE = '3xsmall');
Any materialized view defined on top of this source will be incrementally updated as new change events stream in through Kafka, as a result of INSERT, UPDATE and DELETE operations in the original database.
For more details and a step-by-step guide on using Kafka+Debezium for Change Data Capture (CDC), check Using Debezium.
Note that:
- This envelope can lead to high memory utilization in the cluster maintaining the source. To reduce memory utilization, consider enabling spill to disk.
Spilling to disk
Kafka sources that use ENVELOPE UPSERT or ENVELOPE DEBEZIUM require storing the current value for each key in the source to produce retractions when keys are updated. Depending on the size of the key space, this can lead to high memory utilization in the cluster maintaining the source.
To avoid sizing up, you can attach disk storage to the cluster maintaining these sources. This allows Materialize to process data sets larger than memory by automatically offloading state to disk (aka spilling to disk).
Enabling disk storage
To create a new cluster with spill to disk enabled, use the DISK option:
CREATE CLUSTER cluster_with_disk
SIZE = '<size>',
DISK = true;
Alternatively, you can attach disk storage to an existing cluster using the ALTER CLUSTER command:
ALTER CLUSTER cluster_with_disk SET (DISK = true);
Once a cluster is configured to spill to disk, any Kafka source being maintained in that cluster that uses ENVELOPE UPSERT or ENVELOPE DEBEZIUM will automatically benefit from this feature:
CREATE SOURCE kafka_repl
IN CLUSTER cluster_with_disk
FROM KAFKA CONNECTION kafka_connection (TOPIC 'pg_repl.public.table1')
FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_connection
ENVELOPE DEBEZIUM;
Exposing source metadata
In addition to the message value, Materialize can expose the message key, headers and other source metadata fields to SQL.
Key
The message key is exposed via the INCLUDE KEY option. Composite keys are also supported (#7645).
CREATE SOURCE kafka_metadata
FROM KAFKA CONNECTION kafka_connection (TOPIC 'data')
KEY FORMAT TEXT
VALUE FORMAT TEXT
INCLUDE KEY AS renamed_id
WITH (SIZE = '3xsmall');
Note that:
- This option requires specifying the key and value encodings explicitly using the KEY FORMAT ... VALUE FORMAT syntax.
- The UPSERT envelope always includes keys.
- The DEBEZIUM envelope is incompatible with this option.
Headers
Message headers can be exposed via the INCLUDE HEADERS option. They are included as a column (named headers by default) containing a list of records of type (key text, value bytea).
The following example demonstrates use of the INCLUDE HEADERS option.
CREATE SOURCE kafka_metadata
FROM KAFKA CONNECTION kafka_connection (TOPIC 'data')
FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_connection
INCLUDE HEADERS
ENVELOPE NONE
WITH (SIZE = '3xsmall');
To retrieve the value of an individual header in a message, you can use standard SQL techniques for working with list and bytea types. The following example parses the UTF-8 encoded client_id header of the messages from the Kafka topic. Messages without a client_id header result in null values ("\N") for the parsed attribute.
SELECT
id,
seller,
item,
(
SELECT convert_from((h).value, 'utf8') AS client_id
FROM unnest(headers) AS h
WHERE (h).key = 'client_id'
)
FROM kafka_metadata;
id | seller | item | client_id
----+--------+--------------------+-----------
2 | 1592 | Custom Art | 23
7 | 1509 | Custom Art | 42
3 | 1411 | City Bar Crawl | "\N"
Note that:
- The DEBEZIUM envelope is incompatible with this option.
Partition, offset, timestamp
These metadata fields are exposed via the INCLUDE PARTITION, INCLUDE OFFSET and INCLUDE TIMESTAMP options.
CREATE SOURCE kafka_metadata
FROM KAFKA CONNECTION kafka_connection (TOPIC 'data')
FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_connection
INCLUDE PARTITION, OFFSET, TIMESTAMP AS ts
ENVELOPE NONE
WITH (SIZE = '3xsmall');
SELECT "offset" FROM kafka_metadata WHERE ts > '2021-01-01';
offset
------
15
14
13
Setting start offsets
To start consuming a Kafka stream from a specific offset, you can use the START OFFSET option.
CREATE SOURCE kafka_offset
FROM KAFKA CONNECTION kafka_connection (
TOPIC 'data',
-- Start reading from the earliest offset in the first partition,
-- the second partition at 10, and the third partition at 100.
START OFFSET (0, 10, 100)
)
FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_connection
WITH (SIZE = '3xsmall');
Note that:
- If fewer offsets than partitions are provided, the remaining partitions will start at offset 0. This is true if you provide START OFFSET (1) or START OFFSET (1, ...).
- If more offsets than partitions are provided, then any partitions added later will incorrectly be read from that offset. So, if you have a single partition, but you provide START OFFSET (1, 2), when you add the second partition you will miss the first 2 records of data.
Time-based offsets
It’s also possible to set a start offset based on Kafka timestamps, using the START TIMESTAMP option. This approach sets the start offset for each available partition based on the Kafka timestamp and the source behaves as if START OFFSET was provided directly.
It’s important to note that START TIMESTAMP is a property of the source: it will be calculated once at the time the CREATE SOURCE statement is issued. This means that the computed start offsets will be the same for all views depending on the source and stable across restarts.
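As a sketch mirroring the START OFFSET example above (the source, connection, and topic names are illustrative), a source could start each partition roughly one hour in the past by passing a negative, relative timestamp in milliseconds:
CREATE SOURCE kafka_timestamp_offset
  FROM KAFKA CONNECTION kafka_connection (
    TOPIC 'data',
    -- Start each partition at the earliest offset whose timestamp falls
    -- within the last hour (negative values are relative to the current
    -- system time, in milliseconds).
    START TIMESTAMP -3600000
  )
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_connection
  WITH (SIZE = '3xsmall');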
If you need to limit the amount of data maintained as state after source creation, consider using temporal filters instead.
CONNECTION options
Field | Value | Description |
---|---|---|
START OFFSET | int | Read partitions from the specified offset. You cannot update the offsets once a source has been created; you will need to recreate the source. Offset values must be zero or positive integers. |
START TIMESTAMP | int | Use the specified value to set START OFFSET based on the Kafka timestamp. Negative values will be interpreted as relative to the current system time in milliseconds (e.g. -1000 means 1000 ms ago). The offset for each partition will be the earliest offset whose timestamp is greater than or equal to the given timestamp in the corresponding partition. If no such offset exists for a partition, the partition’s end offset will be used. |
KEY STRATEGY and VALUE STRATEGY
It is possible to define how an Avro reader schema will be chosen for Avro sources by using the KEY STRATEGY and VALUE STRATEGY keywords, as shown in the syntax diagram.
A strategy of LATEST (the default) will choose the latest writer schema from the schema registry to use as a reader schema. ID or INLINE will allow specifying a schema from the registry by ID or inline in the CREATE SOURCE statement, respectively.
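As a hedged sketch of this syntax (the source name and the schema registry ID 123 are illustrative, and the clause placement follows the syntax diagram), the value reader schema could be pinned to a specific registry entry while the key keeps the default:
CREATE SOURCE avro_pinned_schema
  FROM KAFKA CONNECTION kafka_connection (TOPIC 'data')
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_connection
  KEY STRATEGY LATEST
  VALUE STRATEGY ID 123
  WITH (SIZE = '3xsmall');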
Monitoring source progress
By default, Kafka sources expose progress metadata as a subsource that you can use to monitor source ingestion progress. The name of the progress subsource can be specified when creating a source using the EXPOSE PROGRESS AS clause; otherwise, it will be named <src_name>_progress.
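For example, a source could name its progress subsource explicitly (a sketch; the source and subsource names are illustrative):
CREATE SOURCE kafka_events
  FROM KAFKA CONNECTION kafka_connection (TOPIC 'events')
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_connection
  ENVELOPE NONE
  EXPOSE PROGRESS AS kafka_events_progress
  WITH (SIZE = '3xsmall');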
The following metadata is available for each source as a progress subsource:
Field | Type | Meaning |
---|---|---|
partition | numrange | The upstream Kafka partition. |
offset | uint8 | The greatest offset consumed from each upstream Kafka partition. |
The progress subsource can be queried using:
SELECT
partition, "offset"
FROM
(
SELECT
-- Take the upper of the range, which is null for non-partition rows
-- Cast partition to u64, which is more ergonomic
upper(partition)::uint8 AS partition, "offset"
FROM
<src_name>_progress
)
WHERE
-- Remove all non-partition rows
partition IS NOT NULL;
As long as any offset continues increasing, Materialize is consuming data from the upstream Kafka broker. For more details on monitoring source ingestion progress and debugging related issues, see Troubleshooting.
Monitoring consumer lag
To support Kafka tools that monitor consumer lag, Kafka sources commit offsets once the messages up through that offset have been durably recorded in Materialize’s storage layer.
However, rather than relying on committed offsets, Materialize suggests using our native progress monitoring, which contains more up-to-date information.
Some Kafka monitoring tools may indicate that Materialize’s consumer groups have no active members. This is not a cause for concern.
Materialize does not participate in the consumer group protocol nor does it recover on restart by reading the committed offsets. The committed offsets are provided solely for the benefit of Kafka monitoring tools.
Committed offsets are associated with a consumer group specific to the source. The ID of the consumer group has a prefix with the following format:
materialize-{REGION-ID}-{CONNECTION-ID}-{SOURCE_ID}
You should not make assumptions about the number of consumer groups that Materialize will use to consume from a given source. The only guarantee is that the ID of each consumer group will begin with the above prefix.
The rendered consumer group ID prefix for each Kafka source in the system is available in the group_id_base column of the mz_kafka_sources table. To look up the group_id_base for a source by name, use:
SELECT group_id_base
FROM mz_internal.mz_kafka_sources ks
JOIN mz_sources s ON s.id = ks.id
WHERE s.name = '<src_name>';
Required permissions
The access control lists (ACLs) on the Kafka cluster must allow Materialize to perform the following operations on the following resources:
Operation type | Resource type | Resource name |
---|---|---|
Read | Topic | The specified TOPIC option |
To allow Materialize to commit offsets to the Kafka broker, Materialize additionally requires access to the following operations:
Operation type | Resource type | Resource name |
---|---|---|
Read | Group | materialize-{REGION-ID}-{CONNECTION-ID}-{SOURCE_ID}* |
Examples
Creating a connection
A connection describes how to connect and authenticate to an external system you want Materialize to read data from.
Once created, a connection is reusable across multiple CREATE SOURCE statements. For more details on creating connections, check the CREATE CONNECTION documentation page.
Broker
SSL authentication:
CREATE SECRET kafka_ssl_key AS '<BROKER_SSL_KEY>';
CREATE SECRET kafka_ssl_crt AS '<BROKER_SSL_CRT>';
CREATE CONNECTION kafka_connection TO KAFKA (
BROKER 'rp-f00000bar.data.vectorized.cloud:30365',
SSL KEY = SECRET kafka_ssl_key,
SSL CERTIFICATE = SECRET kafka_ssl_crt
);
SASL authentication:
CREATE SECRET kafka_password AS '<BROKER_PASSWORD>';
CREATE CONNECTION kafka_connection TO KAFKA (
BROKER 'unique-jellyfish-0000-kafka.upstash.io:9092',
SASL MECHANISMS = 'SCRAM-SHA-256',
SASL USERNAME = 'foo',
SASL PASSWORD = SECRET kafka_password
);
If your Kafka broker is not exposed to the public internet, you can tunnel the connection through an AWS PrivateLink service or an SSH bastion host:
AWS PrivateLink:
CREATE CONNECTION privatelink_svc TO AWS PRIVATELINK (
SERVICE NAME 'com.amazonaws.vpce.us-east-1.vpce-svc-0e123abc123198abc',
AVAILABILITY ZONES ('use1-az1', 'use1-az4')
);
CREATE CONNECTION kafka_connection TO KAFKA (
BROKERS (
'broker1:9092' USING AWS PRIVATELINK privatelink_svc,
'broker2:9092' USING AWS PRIVATELINK privatelink_svc (PORT 9093)
)
);
For step-by-step instructions on creating AWS PrivateLink connections and configuring an AWS PrivateLink service to accept connections from Materialize, check this guide.
SSH tunnel:
CREATE CONNECTION ssh_connection TO SSH TUNNEL (
HOST '<SSH_BASTION_HOST>',
USER '<SSH_BASTION_USER>',
PORT <SSH_BASTION_PORT>
);
CREATE CONNECTION kafka_connection TO KAFKA (
BROKERS (
'broker1:9092' USING SSH TUNNEL ssh_connection,
'broker2:9092' USING SSH TUNNEL ssh_connection
)
);
For step-by-step instructions on creating SSH tunnel connections and configuring an SSH bastion server to accept connections from Materialize, check this guide.
Confluent Schema Registry
SSL authentication:
CREATE SECRET csr_ssl_crt AS '<CSR_SSL_CRT>';
CREATE SECRET csr_ssl_key AS '<CSR_SSL_KEY>';
CREATE SECRET csr_password AS '<CSR_PASSWORD>';
CREATE CONNECTION csr_connection TO CONFLUENT SCHEMA REGISTRY (
URL 'https://rp-f00000bar.data.vectorized.cloud:30993',
SSL KEY = SECRET csr_ssl_key,
SSL CERTIFICATE = SECRET csr_ssl_crt,
USERNAME = 'foo',
PASSWORD = SECRET csr_password
);
Basic HTTP authentication:
CREATE SECRET IF NOT EXISTS csr_username AS '<CSR_USERNAME>';
CREATE SECRET IF NOT EXISTS csr_password AS '<CSR_PASSWORD>';
CREATE CONNECTION csr_connection TO CONFLUENT SCHEMA REGISTRY (
URL '<CONFLUENT_REGISTRY_URL>',
USERNAME = SECRET csr_username,
PASSWORD = SECRET csr_password
);
If your Confluent Schema Registry server is not exposed to the public internet, you can tunnel the connection through an AWS PrivateLink service or an SSH bastion host:
AWS PrivateLink:
CREATE CONNECTION privatelink_svc TO AWS PRIVATELINK (
SERVICE NAME 'com.amazonaws.vpce.us-east-1.vpce-svc-0e123abc123198abc',
AVAILABILITY ZONES ('use1-az1', 'use1-az4')
);
CREATE CONNECTION csr_connection TO CONFLUENT SCHEMA REGISTRY (
URL 'http://my-confluent-schema-registry:8081',
AWS PRIVATELINK privatelink_svc
);
For step-by-step instructions on creating AWS PrivateLink connections and configuring an AWS PrivateLink service to accept connections from Materialize, check this guide.
SSH tunnel:
CREATE CONNECTION ssh_connection TO SSH TUNNEL (
HOST '<SSH_BASTION_HOST>',
USER '<SSH_BASTION_USER>',
PORT <SSH_BASTION_PORT>
);
CREATE CONNECTION csr_connection TO CONFLUENT SCHEMA REGISTRY (
URL 'http://my-confluent-schema-registry:8081',
SSH TUNNEL ssh_connection
);
For step-by-step instructions on creating SSH tunnel connections and configuring an SSH bastion server to accept connections from Materialize, check this guide.
Creating a source
Avro
Using Confluent Schema Registry
CREATE SOURCE avro_source
FROM KAFKA CONNECTION kafka_connection (TOPIC 'test_topic')
FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_connection
WITH (SIZE = '3xsmall');
JSON
CREATE SOURCE json_source
FROM KAFKA CONNECTION kafka_connection (TOPIC 'test_topic')
FORMAT JSON
WITH (SIZE = '3xsmall');
CREATE MATERIALIZED VIEW typed_kafka_source AS
SELECT
(data->>'field1')::boolean AS field_1,
(data->>'field2')::int AS field_2,
(data->>'field3')::float AS field_3
FROM json_source;
Protobuf
Using Confluent Schema Registry
CREATE SOURCE proto_source
FROM KAFKA CONNECTION kafka_connection (TOPIC 'test_topic')
FORMAT PROTOBUF USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_connection
WITH (SIZE = '3xsmall');
Using an inline schema
If you’re not using a schema registry, you can use the MESSAGE...SCHEMA clause to specify a Protobuf schema descriptor inline. Protobuf does not serialize a schema with the message, so before creating a source you must:
1. Compile the Protobuf schema into a descriptor file using protoc:
   // example.proto
   syntax = "proto3";
   message Batch {
       int32 id = 1;
       // ...
   }
   protoc --include_imports --descriptor_set_out=example.pb example.proto
2. Encode the descriptor file into a SQL byte string:
   $ printf '\\x' && xxd -p example.pb | tr -d '\n'
   \x0a300a0d62696...
3. Create the source using the encoded descriptor bytes from the previous step (including the \x at the beginning):
   CREATE SOURCE proto_source
     FROM KAFKA CONNECTION kafka_connection (TOPIC 'test_topic')
     FORMAT PROTOBUF MESSAGE 'Batch' USING SCHEMA '\x0a300a0d62696...'
     WITH (SIZE = '3xsmall');
For more details about Protobuf message names and descriptors, check the Protobuf format documentation.
Text/bytes
CREATE SOURCE text_source
FROM KAFKA CONNECTION kafka_connection (TOPIC 'test_topic')
FORMAT TEXT
ENVELOPE UPSERT
WITH (SIZE = '3xsmall');
CSV
CREATE SOURCE csv_source (col_foo, col_bar, col_baz)
FROM KAFKA CONNECTION kafka_connection (TOPIC 'test_topic')
FORMAT CSV WITH 3 COLUMNS
WITH (SIZE = '3xsmall');
Sizing a source
To provision a specific amount of CPU and memory to a source on creation, use the SIZE option:
CREATE SOURCE avro_source
FROM KAFKA CONNECTION kafka_connection (TOPIC 'test_topic')
FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_connection
WITH (SIZE = '3xsmall');
To resize the source after creation:
ALTER SOURCE avro_source SET (SIZE = 'large');
The smallest source size (3xsmall) is a reasonable default to get started. For more details on sizing sources, check the CREATE SOURCE documentation page.