Materialize Documentation

CREATE SOURCE: Protobuf over Kinesis

BETA! This feature is in beta. It may have performance or stability issues and is not subject to our backwards compatibility guarantee. Available only in unstable builds.

CREATE SOURCE connects Materialize to an external data source and lets you interact with its data as if the data were in a SQL table.

This document details how to connect Materialize to a Protobuf-formatted Kinesis stream.

WARNING! Kinesis sources are volatile. Please make sure you understand the limitations of volatile sources.

Conceptual framework

Sources represent connections to resources outside Materialize that it can read data from. For more information, see API Components: Sources.

Syntax

CREATE [MATERIALIZED] SOURCE [IF NOT EXISTS] src_name
  [( col_name [, ...] [, key_constraint] )]
FROM KINESIS ARN arn
  [with_options]
FORMAT PROTOBUF MESSAGE message_name
  USING SCHEMA { FILE schema_file_path | inline_schema }
  [ENVELOPE NONE]

key_constraint

PRIMARY KEY ( col_name [, ...] ) NOT ENFORCED

with_options

WITH ( field = val [, ...] )

Field Use
MATERIALIZED Materializes the source’s data, which retains all data in memory and makes sources directly selectable. For more information, see API Components: Materialized sources.
src_name The name for the source, which is used as its table name within SQL.
col_name Override default column name with the provided identifier. If used, a col_name must be provided for each column in the created source.
KINESIS ARN arn The AWS ARN of the Kinesis Data Stream.
WITH ( option_list ) Options affecting source creation. For more detail, see WITH options.
message_name The top-level Protobuf message name, in the format <package>.<message name>. For example, billing.Batch. For more detail, see Top-level message.
schema_file_path The absolute path to a file containing the FileDescriptorSet.
inline_schema A string representing the FileDescriptorSet.
ENVELOPE NONE (Default) Use an append-only envelope. This means that records will only be appended and cannot be updated or deleted.
PRIMARY KEY ( col_list ) NOT ENFORCED Declare a set of columns as a primary key. For more information, see Key constraint details.
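
As a rough illustration of the MATERIALIZED option in the table above, the following sketch creates a materialized source and then queries it directly. The source name batches, the message name (written with a leading dot, as in the example at the end of this page), and both angle-bracket placeholders are assumptions for illustration only; substitute your own stream ARN and schema path.

```sql
-- Sketch only: the ARN and the schema path are placeholders, not real values.
-- No WITH clause is given, so credentials come from the AWS chain described
-- under "WITH options".
CREATE MATERIALIZED SOURCE IF NOT EXISTS batches
FROM KINESIS ARN '<your stream ARN>'
FORMAT PROTOBUF MESSAGE '.billing.Batch'
  USING SCHEMA FILE '<absolute path to FileDescriptorSet>';

-- Because the source is materialized, it can be selected from directly.
SELECT * FROM batches;
```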

WITH options

The following options are valid within the WITH clause.

Field Value type Description
access_key_id text A valid access key ID for the AWS resource.
secret_access_key text A valid secret access key for the AWS resource.
token text The session token associated with the credentials, if the credentials are temporary.

If you do not provide credentials via WITH options, materialized will examine the standard AWS authorization chain, in this order:

  1. Environment variables: AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY.
  2. The credential_process command in the AWS config file, usually located at ~/.aws/config.
  3. The AWS credentials file, usually located at ~/.aws/credentials.
  4. The IAM instance profile. This works only when running on an EC2 instance with an instance profile or role.

Credentials fetched from a container or instance profile expire on a fixed schedule. Materialize will attempt to refresh the credentials automatically before they expire, but the source will become inoperable if the refresh operation fails. For more about the IAM account whose credentials you provide, see Kinesis source details.
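
To make the WITH options concrete, here is a minimal sketch that passes temporary credentials explicitly, including the session token. The option names come from the table above; every quoted value is a placeholder, and the source name and message name simply mirror the example at the end of this page.

```sql
-- Sketch only: every quoted value below is a placeholder.
CREATE SOURCE batches
FROM KINESIS ARN '<your stream ARN>' WITH (
    access_key_id = '<access key ID>',
    secret_access_key = '<secret access key>',
    token = '<session token>'  -- needed only when the credentials are temporary
)
FORMAT PROTOBUF MESSAGE '.billing.Batch'
  USING SCHEMA FILE '<absolute path to FileDescriptorSet>';
```

Omitting the WITH clause entirely makes materialized fall back to the authorization chain listed above.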

Details

Kinesis source details

Protobuf format details

Protobuf-formatted external sources require a FileDescriptorSet and a top-level message, both described below.

FileDescriptorSet

The FileDescriptorSet encodes the Protobuf messages’ schema, which Materialize needs to decode incoming Protobuf data.

You can generate the FileDescriptorSet with protoc. For example, the following command writes it to a file named SCHEMA:

protoc --include_imports --descriptor_set_out=SCHEMA billing.proto

Top-level message

Materialize needs to know which message from your FileDescriptorSet is the top-level message to decode, along with its package name, in the following format:

<package name>.<top-level message>

For example, if our FileDescriptorSet were from a .proto file in the billing package, and our top-level message was called Batch, our message_name value would be:

billing.Batch

Append-only envelope

Append-only envelope means that every record received by the source is treated as an insert. This is Materialize’s default envelope (i.e. the one used if no envelope is specified), and can be specified explicitly with ENVELOPE NONE.
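
For illustration, the sketch below spells out the default envelope explicitly. Leaving off the final ENVELOPE NONE clause should behave the same way, since it is the default. The quoted values are placeholders, and the source and message names are borrowed from the example at the end of this page.

```sql
-- Sketch only: the ARN and the schema path are placeholders.
CREATE SOURCE batches
FROM KINESIS ARN '<your stream ARN>'
FORMAT PROTOBUF MESSAGE '.billing.Batch'
  USING SCHEMA FILE '<absolute path to FileDescriptorSet>'
ENVELOPE NONE;  -- append-only: each incoming record is treated as an insert
```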

Key constraint details

Primary keys are automatically inferred for Postgres sources and for Kafka sources using Upsert or Debezium envelopes.

For other source configurations, the key_constraint syntax lets you manually declare a set of columns as a primary key. This enables optimizations and constructs that rely on a key being present when it cannot be inferred.

WARNING! Materialize will not enforce the constraint and will produce wrong results if it is not correct.
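
A minimal sketch of declaring a key on a Kinesis source follows. It assumes a hypothetical Batch message with exactly two fields, renamed here to batch_id and created_at; those column names are invented for illustration, and a col_name must be given for every column when any are overridden. The quoted values are placeholders.

```sql
-- Sketch only: column names are hypothetical and the quoted values are
-- placeholders. Declare the key only if batch_id really is unique in the
-- stream, because Materialize does not enforce it.
CREATE SOURCE batches (
    batch_id,
    created_at,
    PRIMARY KEY (batch_id) NOT ENFORCED
)
FROM KINESIS ARN '<your stream ARN>'
FORMAT PROTOBUF MESSAGE '.billing.Batch'
  USING SCHEMA FILE '<absolute path to FileDescriptorSet>';
```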

Examples

Receiving Protobuf messages

Assuming you’ve already generated a FileDescriptorSet named SCHEMA:

CREATE SOURCE batches
FROM KINESIS ARN ... WITH (
    access_key_id = ...,
    secret_access_key = ...
)
FORMAT PROTOBUF MESSAGE '.billing.Batch'
  USING SCHEMA FILE '[path to schema]';

This creates a source that…
