
CREATE SOURCE: Kinesis Data Streams

BETA! This feature is in beta. It may have performance or stability issues and is not subject to our backwards compatibility guarantee.

CREATE SOURCE connects Materialize to an external system you want to read data from, and provides details about how to decode and interpret that data.

This page describes how to connect Materialize to Kinesis Data Streams to read data from individual streams.

Syntax

CREATE [MATERIALIZED] SOURCE [IF NOT EXISTS] src_name
  [ ( col_name [, ...] [, key_constraint] ) ]
  FROM KINESIS ARN arn
  [ with_options ]
  FORMAT format_spec
  [ ENVELOPE NONE ]

format_spec

  PROTOBUF MESSAGE message_name USING SCHEMA FILE path
| REGEX regex
| CSV WITH HEADER ( col_name [, ...] ) [ DELIMITED BY char ]
| CSV WITH n COLUMNS [ DELIMITED BY char ]
| TEXT
| BYTES

with_options

WITH ( [ static_credentials | profile = profile_name ] [ , role_arn = val ] [ , region = val ] )

static_credentials

access_key_id = val , secret_access_key = val [ , token = val ]
Field Use
MATERIALIZED Materializes the source’s data, which retains all data in memory and makes sources directly selectable. For more information, see Key Concepts: Materialized sources.
src_name The name for the source, which is used as its table name within SQL.
col_name Override default column name with the provided identifier. If used, a col_name must be provided for each column in the created source.
KINESIS ARN arn The AWS ARN of the Kinesis Data Stream.
ENVELOPE NONE (Default) Use an append-only envelope. This means that records will only be appended and cannot be updated or deleted.
WITH ( option_list ) Options affecting source creation. For more detail, see WITH options.
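
As a quick illustration of how these clauses fit together, the following sketch creates a materialized text source with the default envelope spelled out explicitly. The source name, ARN, and credentials are placeholders, as in the examples further down this page.

CREATE MATERIALIZED SOURCE IF NOT EXISTS kinesis_text_source
  FROM KINESIS ARN 'arn:aws:kinesis:aws-region::stream/fake-stream'
  WITH ( access_key_id = 'access_key_id',          -- placeholder credentials
         secret_access_key = 'secret_access_key' )
  FORMAT TEXT
  ENVELOPE NONE;                                   -- append-only; this is the default envelope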

WITH options

Field Value Description
timestamp_frequency_ms int Default: 1000. Sets the timestamping frequency in ms. Determines how frequently the source advances its timestamp, which in turn determines how stale data in views can be. Lower values result in more up-to-date views but may reduce throughput.
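
For instance, a source that advances its timestamp every 5 seconds rather than every second could be declared as in the sketch below (the source name, ARN, and credentials are placeholders):

CREATE SOURCE slow_ticking_source
  FROM KINESIS ARN 'arn:aws:kinesis:aws-region::stream/fake-stream'
  WITH ( access_key_id = 'access_key_id',
         secret_access_key = 'secret_access_key',
         timestamp_frequency_ms = 5000 )   -- advance the timestamp every 5000 ms
  FORMAT BYTES;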

Supported formats

Format       Append-only envelope   Upsert envelope   Debezium envelope
JSON         ✓
Protobuf     ✓
Text/bytes   ✓
CSV          ✓

Features

Using enhanced fan-out (EFO)

Not supported yet. If you’re interested in this feature, please leave a comment in #2192.

Setting start sequence numbers

Not supported yet. If you’re interested in this feature, please leave a comment in #5972.

Authentication

AWS credentials

AWS credentials can be provided on a per-source basis via the standard AWS credentials provider chain:

  1. Statically provided as part of the WITH block in the source declaration. These credentials are written to disk in plain text and are easily exposed via introspection commands, so this technique is recommended only for experimentation and development.
  2. Environment variables: AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_WEB_IDENTITY_TOKEN_FILE, and many more.
  3. credential_process command in the AWS config file, by default located at ~/.aws/config.
  4. AWS profile and credentials files. By default located at ~/.aws/config and ~/.aws/credentials, respectively.
  5. The IAM instance profile provided by the Instance Metadata Service. This will only work if running on an EC2 instance with an instance profile.

Changed in v0.10.0: Materialize supports the standard AWS credentials provider chain as described above. Previously, Materialize supported only an unspecified subset of that chain.

AWS credentials WITH options

Static credentials can be configured by the following WITH options:

Field Value type Description
access_key_id text A valid access key ID for the AWS resource.
secret_access_key text A valid secret access key for the AWS resource.
token (optional) text The session token associated with the credentials, if the credentials are temporary.
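
As an example, temporary credentials (such as those returned by AWS STS) could be supplied as in the sketch below; the key, secret, and session token values are placeholders:

CREATE SOURCE temporary_creds_source
  FROM KINESIS ARN 'arn:aws:kinesis:aws-region::stream/fake-stream'
  WITH ( access_key_id = 'access_key_id',
         secret_access_key = 'secret_access_key',
         token = 'session_token' )   -- session token for the temporary credentials
  FORMAT BYTES;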

Alternatively, you can specify a named config profile to assume. This named profile must exist within the AWS credentials or config file.

Field Value type Description
profile text An AWS config profile to assume. New in v0.20.0.
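
For example, a source that authenticates with a named profile might look like the following sketch; the profile name is a placeholder and must exist in the AWS config or credentials file:

CREATE SOURCE profile_source
  FROM KINESIS ARN 'arn:aws:kinesis:aws-region::stream/fake-stream'
  WITH ( profile = 'production' )   -- placeholder profile from ~/.aws/config or ~/.aws/credentials
  FORMAT BYTES;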

The following WITH options can be set alongside static credentials, alongside a profile, or on their own, depending on how the environment provides credentials.

Field Value type Description
role_arn text An IAM role to assume. New in v0.20.0.
region text The region to use for all AWS requests.
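
For instance, credentials resolved from the environment can be combined with an assumed role and an explicit region, as in the sketch below; the role ARN is a placeholder:

CREATE SOURCE assumed_role_source
  FROM KINESIS ARN 'arn:aws:kinesis:aws-region::stream/fake-stream'
  WITH ( role_arn = 'arn:aws:iam::000000000000:role/materialize-kinesis-read',   -- placeholder role
         region = 'us-east-1' )
  FORMAT BYTES;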

Credentials fetched from a container or instance profile expire on a fixed schedule. Materialize will attempt to refresh the credentials automatically before they expire, but the source will become inoperable if the refresh operation fails.

Required permissions

The IAM user or role used by materialized requires kinesis-read permissions: specifically, access to ListStreams and the Read-level Kinesis actions for the stream.

Examples

Creating a source

-- JSON: ingest the stream as raw bytes and decode it to jsonb in a view
CREATE SOURCE json_source
  FROM KINESIS ARN 'arn:aws:kinesis:aws-region::stream/fake-stream'
  WITH ( access_key_id = 'access_key_id',
         secret_access_key = 'secret_access_key' )
  FORMAT BYTES;
CREATE MATERIALIZED VIEW jsonified_kinesis_source AS
  SELECT
    data->>'field1' AS field_1,
    data->>'field2' AS field_2,
    data->>'field3' AS field_3
  FROM (SELECT CONVERT_FROM(data, 'utf8')::jsonb AS data FROM json_source);

-- Protobuf
CREATE SOURCE proto_source
  FROM KINESIS ARN 'arn:aws:kinesis:aws-region::stream/fake-stream'
  WITH ( access_key_id = 'access_key_id',
         secret_access_key = 'secret_access_key' )
  FORMAT PROTOBUF MESSAGE 'billing.Batch'
    USING SCHEMA FILE '[path to schema]';

-- Text
CREATE SOURCE text_source
  FROM KINESIS ARN 'arn:aws:kinesis:aws-region::stream/fake-stream'
  WITH ( access_key_id = 'access_key_id',
         secret_access_key = 'secret_access_key' )
  FORMAT TEXT;

-- CSV with three named columns
CREATE SOURCE csv_source (col_foo, col_bar, col_baz)
  FROM KINESIS ARN 'arn:aws:kinesis:aws-region::stream/fake-stream'
  WITH ( access_key_id = 'access_key_id',
         secret_access_key = 'secret_access_key' )
  FORMAT CSV WITH 3 COLUMNS;

Known limitations

Resharding

Adjusting the number of shards in the source stream is not supported (#8776). If you reshard the stream, you’ll need to drop and recreate the source.