Materialize Documentation
Join the Community github/materialize

CREATE SOURCE: CSV over Kinesis

BETA! This feature is in beta. It may have performance or stability issues and is not subject to our backwards compatibility guarantee. Available only in unstable builds.

CREATE SOURCE connects Materialize to an external data source and lets you interact with its data as if the data were in a SQL table.

This document details how to connect Materialize to CSV-formatted Kinesis streams.

WARNING! Kinesis sources are volatile. Please make sure you understand the limitations of volatile sources.

Conceptual framework

Sources represent connections to resources outside Materialize that it can read data from. For more information, see API Components: Sources.

Syntax

CREATE MATERIALIZED SOURCE IF NOT EXISTS src_name ( col_name , , key_constraint ) FROM KINESIS ARN arn with_options FORMAT CSV WITH HEADER ( col_name , ) n COLUMNS DELIMITED BY char ENVELOPE NONE

key_constraint

PRIMARY KEY ( col_name , ) NOT ENFORCED

with_options

WITH ( field = val , )
Field Use
MATERIALIZED Materializes the source’s data, which retains all data in memory and makes sources directly selectable. For more information, see API Components: Materialized sources.
src_name The name for the source, which is used as its table name within SQL.
col_name Override default column name with the provided identifier. If used, a col_name must be provided for each column in the created source.
KINESIS ARN arn The AWS ARN of the Kinesis Data Stream.
WITH ( option_list ) Options affecting source creation. For more detail, see WITH options.
HEADER Treat the first line of the CSV file as a header. See CSV format details.
HEADER (name_list) Treat the first line of the CSV file as a header, validate that the columns in the file match the provided column list. See CSV format details.
n COLUMNS Format the source’s data as a CSV with n columns. See CSV format details.
DELIMITED BY char Delimit the CSV by char. ASCII comma by default (','). This must be an ASCII character; other Unicode code points are not supported.
ENVELOPE NONE (Default) Use an append-only envelope. This means that records will only be appended and cannot be updated or deleted.
PRIMARY KEY ( col_list ) NOT ENFORCED Declare a set of columns as a primary key. For more information, see Key constraint details.

WITH options

The following options are valid within the WITH clause.

Field Value type Description
access_key_id text A valid access key ID for the AWS resource.
secret_access_key text A valid secret access key for the AWS resource.
token text The session token associated with the credentials, if the credentials are temporary

If you do not provide credentials via with options then materialized will examine the standard AWS authorization chain:

  1. Environment variables: AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY
  2. credential_process command in the AWS config file, usually located at ~/.aws/config.
  3. AWS credentials file. Usually located at ~/.aws/credentials.
  4. IAM instance profile. Will only work if running on an EC2 instance with an instance profile/role.

Credentials fetched from a container or instance profile expire on a fixed schedule. Materialize will attempt to refresh the credentials automatically before they expire, but the source will become inoperable if the refresh operation fails. For details about the IAM account whose details you provide, see Kinesis source details.

Details

Kinesis source details

CSV format details

Materialize uses the format method you specify to determine the number of columns to create in the source, as well as the columns’ names.

Method Outcome
HEADER Materialize reads the first line of the file to determine:

• The number of columns in the file

• The name of each column

The first line of the file is not ingested as data.
HEADER (name_list) All of the same behaviors as bare HEADER with the additional features that:

• Header names from source objects will be validated to exactly match those specified in the name list.

• Specifying a column list allows using CSV format with sources that have headers but individual objects may not yet exist. Primarily this is intended for S3 sources.
n COLUMNS • Materialize treats the file as if it has n columns.

• Columns are named column1, column2columnN.

Note that:

Types

Materialize treats all columns in CSV sources as text. You can “type” this data using casts when creating views using this source, e.g.:

CREATE MATERIALIZED VIEW salaries AS
  SELECT (employee_id::int, salary::numeric(38, 2))
  FROM csv_employee_data;

Append-only envelope

Append-only envelope means that all records received by the source is treated as an insert. This is Materialize’s default envelope (i.e. if no envelope is specified), and can be specified with ENVELOPE NONE.

Key constraint details

Primary keys are automatically inferred for Kafka sources using Upsert or Debezium envelopes and Postgres sources.

For other source configurations, the key_constraint syntax allows to manually declare a set of columns as a primary key. This enables optimizations and constructs that rely on a key to be present when it cannot be inferred.

WARNING! Materialize will not enforce the constraint and will produce wrong results if it is not correct.

Example

Creating a source from a CSV-formatted Kinesis stream

CREATE SOURCE csv_kinesis (col_foo, col_bar, col_baz)
FROM KINESIS ARN ... WITH (
    access_key_id = ...,
    secret_access_key = ...
)
FORMAT CSV WITH 3 COLUMNS;

This creates a source that…

Did this info help?
Yes No