
CREATE SOURCE: CSV over Kafka

CREATE SOURCE connects Materialize to an external data source and lets you interact with its data as if the data were in a SQL table.

This document details how to connect Materialize to a CSV-formatted Kafka topic.

Conceptual framework

Sources represent connections to resources outside Materialize that it can read data from. For more information, see API Components: Sources.

Syntax

CREATE [MATERIALIZED] SOURCE [IF NOT EXISTS] src_name
  [( col_name [, ...] [, key_constraint] )]
FROM KAFKA BROKER host TOPIC topic
  [with_options]
[KEY FORMAT format_spec VALUE] FORMAT CSV WITH ( HEADER [( col_name [, ...] )] | n COLUMNS )
  [DELIMITED BY char]
[INCLUDE KEY [AS key_name]]
[ENVELOPE ( NONE | UPSERT )]

key_constraint

PRIMARY KEY ( col_name [, ...] ) NOT ENFORCED

format_spec

AVRO USING ( CONFLUENT SCHEMA REGISTRY url [with_options] | SCHEMA FILE schema_file_path )
PROTOBUF MESSAGE message_name USING ( SCHEMA FILE schema_file_path | CONFLUENT SCHEMA REGISTRY url [with_options] )
REGEX regex
CSV WITH ( HEADER [( col_name [, ...] )] | n COLUMNS ) [DELIMITED BY char]
TEXT
BYTES

with_options

WITH ( field = val [, ...] )
Field Use
MATERIALIZED Materializes the source’s data, which retains all data in memory and makes sources directly selectable. For more information, see API Components: Materialized sources.
src_name The name for the source, which is used as its table name within SQL.
col_name Override default column name with the provided identifier. If used, a col_name must be provided for each column in the created source.
KAFKA BROKER host The Kafka broker’s host name without the security protocol, which is specified by the WITH options. If you wish to specify multiple brokers (bootstrap servers) as an additional safeguard, use a comma-separated list. For example: localhost:9092, localhost:9093.
TOPIC topic The Kafka topic you want to subscribe to.
INCLUDE KEY Make the key portion of Kafka events available to Materialize SQL. If the key is encoded using a format that includes schemas, it takes its name from the schema; for unnamed formats (e.g. TEXT), it is named key. The key can be renamed with the optional AS name clause (see the sketch after this table).
WITH ( option_list ) Options affecting source creation. For more detail, see WITH options.
HEADER Treat the first line of the CSV file as a header. See CSV format details.
HEADER (name_list) Treat the first line of the CSV file as a header, validate that the columns in the file match the provided column list. See CSV format details.
n COLUMNS Format the source’s data as a CSV with n columns. See CSV format details.
DELIMITED BY char Delimit the CSV by char. ASCII comma by default (','). This must be an ASCII character; other Unicode code points are not supported.
ENVELOPE NONE (Default) Use an append-only envelope. This means that records will only be appended and cannot be updated or deleted.
PRIMARY KEY ( col_list ) NOT ENFORCED Declare a set of columns as a primary key. For more information, see Key constraint details.
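
As a sketch of how INCLUDE KEY composes with a CSV-formatted value, the statement below assumes a hypothetical topic csv_keyed whose keys are plain text; the source name and key name are illustrative:

CREATE SOURCE csv_keyed_src
FROM KAFKA BROKER 'localhost:9092' TOPIC 'csv_keyed'
KEY FORMAT TEXT
VALUE FORMAT CSV WITH 3 COLUMNS
INCLUDE KEY AS event_key
ENVELOPE NONE;

Because no column list is given, the value columns take the default names column1 through column3, and the key is exposed as the event_key column.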

WITH options

The following options are valid within the WITH clause.

Field Value type Description
client_id text Use the supplied value as the Kafka client identifier.
group_id_prefix text Use the specified prefix in the consumer group ID. The resulting group.id looks like <group_id_prefix>materialize-X-Y, where X and Y are values that allow multiple concurrent Kafka consumers from the same topic.
ignore_source_keys boolean Default: false. If true, do not perform optimizations assuming uniqueness of primary keys in schemas.
isolation_level text Default: read_committed. Controls how to read messages that were transactionally written to Kafka. Supported options are read_committed to read only committed messages and read_uncommitted to read all messages, including those that are part of an open transaction or were aborted.
security_protocol text Use ssl or, for Kerberos, sasl_plaintext, sasl-scram-sha-256, or sasl-sha-512 to connect to the Kafka cluster.
kafka_time_offset int Use the specified value to set start_offset based on the Kafka timestamp. Negative values will be interpreted as relative to the current system time in milliseconds (e.g. -1000 means 1000 ms ago). The offset for each partition will be the earliest offset whose timestamp is greater than or equal to the given timestamp in the corresponding partition. If no such offset exists for a partition, the partition’s end offset will be used.
statistics_interval_ms int librdkafka statistics emit interval in ms. Accepts values [0, 86400000]. A value of 0 disables statistics. Statistics can be queried using the mz_kafka_consumer_partitions system table.
start_offset int Read partitions from the specified offset. You cannot update the offsets once a source has been created; you will need to recreate the source. Values must be zero or positive integers. See Kafka source details for important warnings for this feature.
timestamp_frequency_ms int Default: 1000. Sets the timestamping frequency in ms, i.e. how frequently the source advances its timestamp. This determines how stale data in views may be. Lower values result in more up-to-date views but may reduce throughput.
topic_metadata_refresh_interval_ms int Default: 30000. Sets the frequency in ms at which the system checks for new partitions. Accepts values [0,3600000].
enable_auto_commit boolean Default: false. Controls whether or not Materialize commits read offsets back into Kafka. This is purely for consumer progress monitoring and does not cause Materialize to resume reading from where it left off across restarts.
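
To illustrate how these options combine, the following sketch tunes a few of them; the broker, topic, and chosen values are placeholders, not recommendations:

CREATE SOURCE csv_tuned (col_foo, col_bar, col_baz)
FROM KAFKA BROKER 'localhost:9092' TOPIC 'csv'
WITH (
    client_id = 'materialize-csv-reader',
    isolation_level = 'read_committed',
    timestamp_frequency_ms = 500,
    topic_metadata_refresh_interval_ms = 60000
)
FORMAT CSV WITH 3 COLUMNS;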

SSL WITH options

Use the following options to connect Materialize to an SSL-encrypted Kafka cluster. For more detail, see SSL-encrypted Kafka details.

Field Value Description
ssl_certificate_location text The absolute path to your SSL certificate. Required for SSL client authentication.
ssl_key_location text The absolute path to your SSL certificate’s key. Required for SSL client authentication.
ssl_key_password text Your SSL key’s password, if any.
ssl_ca_location text The absolute path to the certificate authority (CA) certificate. Used for both SSL client and server authentication. If unspecified, uses the system’s default CA certificates.

Kerberos WITH options

Use the following options to connect Materialize to a Kerberized Kafka cluster. For more detail, see Kerberized Kafka details.

Field Value Description
sasl_mechanisms text The SASL mechanism to use for authentication. Currently, the only supported mechanisms are GSSAPI (the default) and PLAIN.
sasl_username text Your SASL username, if any. Required if sasl_mechanisms is PLAIN.
sasl_password text Your SASL password, if any. Required if sasl_mechanisms is PLAIN.

This option stores the password in Materialize’s on-disk catalog. For an alternative, use sasl_password_env.
sasl_password_env text Use the value stored in the named environment variable as the value for sasl_password.

This option does not store the password on-disk in Materialize’s catalog, but requires the environment variable’s presence to boot Materialize.
sasl_kerberos_keytab text The absolute path to your keytab. Required if sasl_mechanisms is GSSAPI.
sasl_kerberos_kinit_cmd text Shell command to refresh or acquire the client’s Kerberos ticket. Required if sasl_mechanisms is GSSAPI.
sasl_kerberos_min_time_before_relogin text Minimum time in milliseconds between key refresh attempts. Disable automatic key refresh by setting this property to 0. Required if sasl_mechanisms is GSSAPI.
sasl_kerberos_principal text Materialize Kerberos principal name. Required if sasl_mechanisms is GSSAPI.
sasl_kerberos_service_name text Kafka’s service name on its host, i.e. the service principal name not including /hostname@REALM. Required if sasl_mechanisms is GSSAPI.

Inline schema WITH options

Field Value Description
confluent_wire_format boolean Whether to look for the Confluent Schema Registry schema ID within Avro messages.

If you specify a source with an inline schema, you may still have records that were generated by a client that inlines a Confluent Schema Registry schema ID at the beginning of each record. If confluent_wire_format is false, then materialized will not validate that a well-formatted schema ID is present at the beginning of each record.

Details

Kafka source details

Partition offsets

The start_offset option comes with some quirks to be aware of; in particular:

• The kafka_time_offset option sets start_offset for each available partition based on the Kafka timestamp, and the source then behaves as if start_offset had been provided directly.

SSL-encrypted Kafka details

Enable connections to SSL-encrypted Kafka clusters using the appropriate WITH options.
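
For example, a connection to an SSL-encrypted cluster with client authentication might look like the sketch below; the broker address, topic, and certificate paths are placeholders:

CREATE SOURCE csv_ssl (col_foo, col_bar, col_baz)
FROM KAFKA BROKER 'kafka:9093' TOPIC 'csv'
WITH (
    security_protocol = 'ssl',
    ssl_certificate_location = '/secrets/materialized.crt',
    ssl_key_location = '/secrets/materialized.key',
    ssl_ca_location = '/secrets/ca.crt'
)
FORMAT CSV WITH 3 COLUMNS;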

Kerberized Kafka details

Enable connections to Kerberized Kafka clusters using the appropriate WITH options.
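
For example, a GSSAPI (Kerberos) connection might look like the sketch below; the broker address, topic, keytab path, principal, and realm are placeholders:

CREATE SOURCE csv_kerberos (col_foo, col_bar, col_baz)
FROM KAFKA BROKER 'kafka:9092' TOPIC 'csv'
WITH (
    security_protocol = 'sasl_plaintext',
    sasl_mechanisms = 'GSSAPI',
    sasl_kerberos_service_name = 'kafka',
    sasl_kerberos_principal = 'materialized@EXAMPLE.COM',
    sasl_kerberos_keytab = '/secrets/materialized.keytab',
    sasl_kerberos_kinit_cmd = 'kinit -kt /secrets/materialized.keytab materialized@EXAMPLE.COM'
)
FORMAT CSV WITH 3 COLUMNS;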

CSV format details

Materialize uses the format method you specify to determine the number of columns to create in the source, as well as the columns’ names.

Method Outcome
HEADER Materialize reads the first line of the file to determine:

• The number of columns in the file

• The name of each column

The first line of the file is not ingested as data.
HEADER (name_list) All of the same behaviors as bare HEADER with the additional features that:

• Header names from source objects will be validated to exactly match those specified in the name list.

• Specifying a column list allows using CSV format with sources that have headers but individual objects may not yet exist. Primarily this is intended for S3 sources.
n COLUMNS • Materialize treats the file as if it has n columns.

• Columns are named column1, column2 … columnN.

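To make the n COLUMNS and DELIMITED BY behavior concrete, the following sketch reads semicolon-delimited records with three columns; the broker and topic are placeholders, and the columns take the default names column1 through column3 because no column list is given:

CREATE SOURCE csv_semicolon
FROM KAFKA BROKER 'localhost:9092' TOPIC 'csv_semicolon'
FORMAT CSV WITH 3 COLUMNS DELIMITED BY ';';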

Types

Materialize treats all columns in CSV sources as text. You can “type” this data using casts when creating views using this source, e.g.:

CREATE MATERIALIZED VIEW salaries AS
  SELECT employee_id::int, salary::numeric(38, 2)
  FROM csv_employee_data;

Append-only envelope

Append-only envelope means that all records received by the source are treated as inserts. This is Materialize’s default envelope (i.e. the envelope used if none is specified), and it can be requested explicitly with ENVELOPE NONE.

Key constraint details

Primary keys are automatically inferred for Kafka sources using Upsert or Debezium envelopes and Postgres sources.

For other source configurations, the key_constraint syntax allows you to manually declare a set of columns as a primary key. This enables optimizations and constructs that rely on a key being present when it cannot be inferred.

WARNING! Materialize will not enforce the constraint and will produce wrong results if it is not correct.
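
As a sketch, a CSV source that manually declares its first column as a (not enforced) key might look like the following; the topic and column names are illustrative, and the declared column must genuinely be unique:

CREATE SOURCE csv_with_key (
    employee_id,
    name,
    salary,
    PRIMARY KEY (employee_id) NOT ENFORCED
)
FROM KAFKA BROKER 'localhost:9092' TOPIC 'employees'
FORMAT CSV WITH 3 COLUMNS;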

Example

Creating a source from a CSV-formatted Kafka topic

CREATE SOURCE csv_kafka (col_foo, col_bar, col_baz)
FROM KAFKA BROKER 'localhost:9092' TOPIC 'csv'
FORMAT CSV WITH 3 COLUMNS;

This creates a source that…

Setting partition offsets

CREATE SOURCE csv_kafka (col_foo, col_bar, col_baz)
FROM KAFKA BROKER 'localhost:9092' TOPIC 'csv'
WITH (start_offset=[0,10,100])
FORMAT CSV WITH 3 COLUMNS;

This creates a source that…

It is possible to set start_offset based on Kafka timestamps using the kafka_time_offset option.
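
For example, the following sketch starts reading roughly one hour in the past by using a negative (relative) timestamp; the broker and topic are placeholders:

CREATE SOURCE csv_kafka_recent (col_foo, col_bar, col_baz)
FROM KAFKA BROKER 'localhost:9092' TOPIC 'csv'
WITH (kafka_time_offset = -3600000)
FORMAT CSV WITH 3 COLUMNS;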
