CREATE SOURCE: Kinesis Data Streams
CREATE SOURCE connects Materialize to an external system you want to read data from, and provides details about how to decode and interpret that data.
This page describes how to connect Materialize to Kinesis Data Streams to read data from individual streams.
Syntax
(Syntax diagrams: format_spec, with_options, static_credentials)
Field | Use |
---|---|
MATERIALIZED | Materializes the source’s data, which retains all data in memory and makes sources directly selectable. For more information, see Key Concepts: Materialized sources. |
src_name | The name for the source, which is used as its table name within SQL. |
col_name | Override default column name with the provided identifier. If used, a col_name must be provided for each column in the created source. |
KINESIS ARN arn | The AWS ARN of the Kinesis Data Stream. |
ENVELOPE NONE | (Default) Use an append-only envelope. This means that records will only be appended and cannot be updated or deleted. |
WITH ( option_list ) | Options affecting source creation. For more detail, see WITH options. |
WITH options
Field | Value | Description
---|---|---
`timestamp_frequency_ms` | `int` | Default: `1000`. Sets the timestamping frequency in ms, i.e., how frequently the source advances its timestamp; this determines how stale data in views can be. Lower values result in more up-to-date views but may reduce throughput.
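For example, a source that advances its timestamp every 100 ms might be declared as follows (a sketch; the source name and stream ARN are placeholders):

```sql
CREATE SOURCE low_latency_source
FROM KINESIS ARN 'arn:aws:kinesis:us-east-1:123456789012:stream/example-stream'
WITH ( timestamp_frequency_ms = 100 )
FORMAT BYTES;
```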
Supported formats
Format | Append-only envelope | Upsert envelope | Debezium envelope
---|---|---|---
JSON | ✓ | |
Protobuf | ✓ | |
Text/bytes | ✓ | |
CSV | ✓ | |
Features
Using enhanced fan-out (EFO)
Not supported yet. If you’re interested in this feature, please leave a comment in #2192.
Setting start sequence numbers
Not supported yet. If you’re interested in this feature, please leave a comment in #5972.
Authentication
AWS credentials
AWS credentials can be provided on a per-source basis via the standard AWS credentials provider chain:

- Statically, as part of the `WITH` block in the source declaration. These credentials are written to disk in plain text and are easily exposed via introspection commands, so this technique is recommended only for experimentation and development.
- Environment variables: `AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY`, `AWS_WEB_IDENTITY_TOKEN_FILE`, and many more.
- A `credential_process` command in the AWS config file, located at `~/.aws/config` by default.
- The AWS config and credentials files, located at `~/.aws/config` and `~/.aws/credentials` by default, respectively.
- The IAM instance profile provided by the Instance Metadata Service. This works only when running on an EC2 instance with an instance profile.
Changed in v0.10.0: Materialize supports the standard AWS credentials provider chain as described above. Previously Materialize had support for only an unspecified subset of the standard credentials provider chain.
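As one example from the chain above, credentials can be supplied via environment variables before starting `materialized` (the values shown are placeholders):

```shell
# Placeholder credentials; substitute real values, or prefer a safer
# option from the provider chain (e.g. an instance profile) in production.
export AWS_ACCESS_KEY_ID='AKIDEXAMPLE'
export AWS_SECRET_ACCESS_KEY='EXAMPLESECRETKEY'
# Start Materialize in the same environment so it inherits the variables:
# materialized
```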
AWS credentials WITH options

Static credentials can be configured with the following `WITH` options:
Field | Value type | Description
---|---|---
`access_key_id` | `text` | A valid access key ID for the AWS resource.
`secret_access_key` | `text` | A valid secret access key for the AWS resource.
`token` (optional) | `text` | The session token associated with the credentials, if the credentials are temporary.
Alternatively, you can specify a named config profile to assume. This named profile must exist within the AWS credentials or config file.
Field | Value type | Description
---|---|---
`profile` | `text` | An AWS config profile to assume. New in v0.20.0.
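As a sketch, assuming a profile named `materialize` exists in your AWS config or credentials file, a source could reference it like this (the source name and stream ARN are placeholders):

```sql
CREATE SOURCE profile_source
FROM KINESIS ARN 'arn:aws:kinesis:us-east-1:123456789012:stream/example-stream'
WITH ( profile = 'materialize' )
FORMAT BYTES;
```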
The following `WITH` options can be set alongside static credentials, a named profile, or on their own, depending on how the environment provides credentials.
Field | Value type | Description
---|---|---
`role_arn` | `text` | An IAM role to assume. New in v0.20.0.
`region` | `text` | The region to use for all AWS requests.
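Combining these, a source that assumes an IAM role in a specific region might be declared as follows (both ARNs are placeholders):

```sql
CREATE SOURCE assumed_role_source
FROM KINESIS ARN 'arn:aws:kinesis:us-east-1:123456789012:stream/example-stream'
WITH (
    role_arn = 'arn:aws:iam::123456789012:role/materialize-kinesis-reader',
    region = 'us-east-1'
)
FORMAT BYTES;
```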
Credentials fetched from a container or instance profile expire on a fixed schedule. Materialize will attempt to refresh the credentials automatically before they expire, but the source will become inoperable if the refresh operation fails.
Permissions required

The IAM user or role used by `materialized` requires `kinesis-read` permissions, with access to `ListStreams` and `Read`.
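A minimal IAM policy granting read access to a single stream might look like the following sketch; the exact action list your deployment needs may differ, and the resource ARN is a placeholder. Note that `kinesis:ListStreams` does not support resource-level restrictions, so it gets its own statement:

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": "kinesis:ListStreams",
      "Resource": "*"
    },
    {
      "Effect": "Allow",
      "Action": [
        "kinesis:DescribeStream",
        "kinesis:GetShardIterator",
        "kinesis:GetRecords"
      ],
      "Resource": "arn:aws:kinesis:us-east-1:123456789012:stream/example-stream"
    }
  ]
}
```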
Examples
Creating a source
CREATE SOURCE json_source
FROM KINESIS ARN 'arn:aws:kinesis:aws-region::stream/fake-stream'
WITH ( access_key_id = 'access_key_id',
secret_access_key = 'secret_access_key' )
FORMAT BYTES;
CREATE MATERIALIZED VIEW jsonified_kinesis_source AS
SELECT
data->>'field1' AS field_1,
data->>'field2' AS field_2,
data->>'field3' AS field_3
FROM (SELECT CONVERT_FROM(data, 'utf8')::jsonb AS data FROM json_source);
CREATE SOURCE proto_source
FROM KINESIS ARN 'arn:aws:kinesis:aws-region::stream/fake-stream'
WITH ( access_key_id = 'access_key_id',
secret_access_key = 'secret_access_key' )
FORMAT PROTOBUF MESSAGE 'billing.Batch'
USING SCHEMA FILE '[path to schema]';
CREATE SOURCE text_source
FROM KINESIS ARN 'arn:aws:kinesis:aws-region::stream/fake-stream'
WITH ( access_key_id = 'access_key_id',
secret_access_key = 'secret_access_key' )
FORMAT TEXT;
CREATE SOURCE csv_source (col_foo, col_bar, col_baz)
FROM KINESIS ARN 'arn:aws:kinesis:aws-region::stream/fake-stream'
WITH ( access_key_id = 'access_key_id',
secret_access_key = 'secret_access_key' )
FORMAT CSV WITH 3 COLUMNS;
Known limitations
Resharding
Adjusting the number of shards in the source stream is not supported (#8776). If you reshard the stream, you’ll need to drop and recreate the source.
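The recovery sketch after a reshard is to drop the source and recreate it; `CASCADE` is needed if views depend on the source:

```sql
-- CASCADE also drops any views built on the source,
-- so those must be recreated afterwards as well.
DROP SOURCE json_source CASCADE;
CREATE SOURCE json_source
FROM KINESIS ARN 'arn:aws:kinesis:aws-region::stream/fake-stream'
FORMAT BYTES;
```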