Materialize Documentation
Join the Community github/materialize

CREATE SOURCE: JSON over Kinesis

BETA! This feature is in beta. It may have performance or stability issues and is not subject to our backwards compatibility guarantee.

CREATE SOURCE connects Materialize to an external data source and lets you interact with its data as if the data were in a SQL table.

This document details how to connect Materialize to JSON-formatted Kinesis streams.

WARNING! Kinesis sources are volatile. Please make sure you understand the limitations of volatile sources.

Conceptual framework

Sources represent connections to resources outside Materialize that it can read data from. For more information, see API Components: Sources.

Syntax

CREATE MATERIALIZED SOURCE IF NOT EXISTS src_name ( col_name , ) FROM KINESIS ARN arn with_options FORMAT BYTES ENVELOPE NONE

with_options

WITH ( field = val , )
Field Use
MATERIALIZED Materializes the source’s data, which retains all data in memory and makes sources directly selectable. For more information, see API Components: Materialized sources.
src_name The name for the source, which is used as its table name within SQL.
col_name Override default column name with the provided identifier. If used, a col_name must be provided for each column in the created source.
KINESIS ARN arn The AWS ARN of the Kinesis Data Stream.
WITH ( option_list ) Options affecting source creation. For more detail, see WITH options.
FORMAT BYTES Leave data received from the source as unformatted bytes, and store them in a column named data. However, you can easily transform the data to JSON. For more details, see Byte format details (JSON).
ENVELOPE NONE (Default) Use an append-only envelope. This means that records will only be appended and cannot be updated or deleted.

WITH options

The following options are valid within the WITH clause.

Field Value type Description
access_key_id text A valid access key ID for the AWS resource.
secret_access_key text A valid secret access key for the AWS resource.
token text The session token associated with the credentials, if the credentials are temporary
timestamp_frequency_ms int Default: 1000. Sets the timestamping frequency in ms. Reflects how frequently the source advances its timestamp. This measure reflects how stale data in views will be. Lower values result in more-up-to-date views but may reduce throughput.

If you do not provide credentials via with options then materialized will examine the standard AWS authorization chain:

  1. Environment variables: AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY
  2. credential_process command in the AWS config file, usually located at ~/.aws/config.
  3. AWS credentials file. Usually located at ~/.aws/credentials.
  4. IAM instance profile. Will only work if running on an EC2 instance with an instance profile/role.

Credentials fetched from a container or instance profile expire on a fixed schedule. Materialize will attempt to refresh the credentials automatically before they expire, but the source will become inoperable if the refresh operation fails. For details about the IAM account whose details you provide, see Kinesis source details.

Details

Kinesis source details

Raw byte format details

Raw byte-formatted sources provide Materialize the raw bytes received from the source without applying any formatting or decoding.

Raw byte-formatted sources have one column, which, by default, is named data.

Extracting JSON data from bytes

Materialize cannot receive JSON data directly from a source. Instead, you must create a source that stores the data it receives as raw bytes (FORMAT BYTES), and then construct views that provides access to your JSON data by casting the source’s bytea column (named data) to text, and then to jsonb.

CREATE MATERIALIZED VIEW jsonified_bytes AS
SELECT CAST(data AS JSONB) AS data
FROM (
    SELECT CONVERT_FROM(data, 'utf8') AS data
    FROM bytea_source
)

jsonb data expresses a JSON object similar to PostgreSQL’s implementation. For more information, see jsonb.

Append-only envelope

Append-only envelope means that all records received by the source is treated as an insert. This is Materialize’s default envelope (i.e. if no envelope is specified), and can be specified with ENVELOPE NONE.

Examples

CREATE SOURCE kinesis_source
FROM KINESIS ARN ... WITH (
    access_key_id = ...,
    secret_access_key = ...
)
FORMAT BYTES;

This creates a source that…

To use this data in views, you can decode its bytes into jsonb. For example:

CREATE MATERIALIZED VIEW jsonified_kinesis_source AS
  SELECT CAST(data AS jsonb) AS data
  FROM (
      SELECT convert_from(data, 'utf8') AS data
      FROM kinesis_source
  )
Did this info help?
Yes No