CREATE SOURCE: Kinesis Data Streams
CREATE SOURCE connects Materialize to an external data source and lets you interact
with its data as if the data were in a SQL table.
This page details how to connect Materialize to Kinesis Data Streams to read data from individual streams.
Sources represent connections to resources outside Materialize that it can read data from. For more information, see Key Concepts: Sources.
|Field|Use|
|---|---|
|MATERIALIZED|Materializes the source’s data, which retains all data in memory and makes sources directly selectable. For more information, see Key Concepts: Materialized sources.|
|src_name|The name for the source, which is used as its table name within SQL.|
|col_name|Override default column name with the provided identifier. If used, a col_name must be provided for each column in the created source.|
|KINESIS ARN arn|The AWS ARN of the Kinesis Data Stream.|
|ENVELOPE NONE|(Default) Use an append-only envelope. This means that records will only be appended and cannot be updated or deleted.|
|WITH ( option_list )|Options affecting source creation. For more detail, see|
|Append-only envelope||Upsert envelope||Debezium envelope|
Using enhanced fan-out (EFO)
Not supported yet. If you’re interested in this feature, please leave a comment in #2192.
Setting start sequence numbers
Not supported yet. If you’re interested in this feature, please leave a comment in #5972.
AWS credentials can be provided on a per-source basis via the standard AWS credentials provider chain:
- Statically provided as part of the WITH block in the source declaration. These credentials are written to disk in plain text and are easily exposed via introspection commands, so this technique is recommended only for experimentation and development.
- Environment variables: AWS_WEB_IDENTITY_TOKEN_FILE, and many more.
- The credential_process command in the AWS config file, by default located at
- AWS credentials file and profile files. By default located at
- The IAM instance profile provided by the Instance Metadata Service. This will only work if running on an EC2 instance with an instance profile.
Changed in v0.10.0: Materialize supports the standard AWS credentials provider chain as described above. Previously Materialize had support for only an unspecified subset of the standard credentials provider chain.
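When credentials come from the environment rather than from static options, the source declaration does not need to carry any secrets, which keeps keys out of the plain-text on-disk state mentioned above. A minimal sketch, assuming the WITH block can be omitted entirely and that the process running Materialize can resolve credentials through the provider chain; the source name env_credentials_source is hypothetical, and the ARN is a placeholder:

```sql
-- Hypothetical source name; placeholder ARN.
-- No static keys are given here: credentials are expected to be resolved
-- from the environment (assumption: the WITH block is optional).
CREATE SOURCE env_credentials_source
FROM KINESIS ARN 'arn:aws:kinesis:aws-region::stream/fake-stream'
FORMAT TEXT;
```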
Static credentials can be configured with the following WITH options:
||A valid access key ID for the AWS resource.|
||A valid secret access key for the AWS resource.|
||The session token associated with the credentials, if the credentials are temporary.|
Alternatively, you can specify a named config profile to assume. This named profile must exist within the AWS
||An AWS config profile to assume. New in v0.20.0.|
The following WITH options can be set alongside static credentials, alongside a profile, or on their own, depending on how the environment supplies credentials.
||An IAM role to assume. New in v0.20.0.|
||The region to use for all AWS requests.|
Credentials fetched from a container or instance profile expire on a fixed schedule. Materialize will attempt to refresh the credentials automatically before they expire, but the source will become inoperable if the refresh operation fails.
The IAM User or Role used by
kinesis-read permissions and access to
Creating a source
-- JSON: ingest raw bytes, then decode them as JSON in a materialized view.
CREATE SOURCE json_source
FROM KINESIS ARN 'arn:aws:kinesis:aws-region::stream/fake-stream'
WITH (
    access_key_id = 'access_key_id',
    secret_access_key = 'secret_access_key'
)
FORMAT BYTES;

CREATE MATERIALIZED VIEW jsonified_kinesis_source AS
    SELECT
        data->>'field1' AS field_1,
        data->>'field2' AS field_2,
        data->>'field3' AS field_3
    FROM (SELECT CONVERT_FROM(data, 'utf8')::jsonb AS data FROM json_source);

-- Protobuf: decode each record as a billing.Batch message.
CREATE SOURCE proto_source
FROM KINESIS ARN 'arn:aws:kinesis:aws-region::stream/fake-stream'
WITH (
    access_key_id = 'access_key_id',
    secret_access_key = 'secret_access_key'
)
FORMAT PROTOBUF MESSAGE 'billing.Batch'
    USING SCHEMA FILE '[path to schema]';

-- Text: treat each record as text.
CREATE SOURCE text_source
FROM KINESIS ARN 'arn:aws:kinesis:aws-region::stream/fake-stream'
WITH (
    access_key_id = 'access_key_id',
    secret_access_key = 'secret_access_key'
)
FORMAT TEXT;

-- CSV: three columns, renamed to col_foo, col_bar, and col_baz.
CREATE SOURCE csv_source (col_foo, col_bar, col_baz)
FROM KINESIS ARN 'arn:aws:kinesis:aws-region::stream/fake-stream'
WITH (
    access_key_id = 'access_key_id',
    secret_access_key = 'secret_access_key'
)
FORMAT CSV WITH 3 COLUMNS;
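The examples above create non-materialized sources, which are not directly selectable. As a rough sketch of the MATERIALIZED option described earlier, a text source could instead be declared as materialized and queried without an intermediate view. The source name materialized_text_source is hypothetical, and because a materialized source retains all data in memory, this may not suit a high-volume stream:

```sql
-- Hypothetical source name; same placeholder ARN and credentials as above.
CREATE MATERIALIZED SOURCE materialized_text_source
FROM KINESIS ARN 'arn:aws:kinesis:aws-region::stream/fake-stream'
WITH (
    access_key_id = 'access_key_id',
    secret_access_key = 'secret_access_key'
)
FORMAT TEXT;

-- A materialized source can be selected from directly.
SELECT * FROM materialized_text_source;
```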
- Resharding: adjusting the number of shards in the source stream is not supported (#8776). If you reshard the stream, you’ll need to drop and recreate the source.
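Recovering from a reshard might look like the following sketch, which reuses text_source from the examples above. It assumes nothing that depends on the source needs to survive the drop: CASCADE also removes dependent views, so those have to be recreated afterwards.

```sql
-- Drop the source together with any views that depend on it.
DROP SOURCE text_source CASCADE;

-- Recreate the source against the resharded stream (placeholder ARN and credentials).
CREATE SOURCE text_source
FROM KINESIS ARN 'arn:aws:kinesis:aws-region::stream/fake-stream'
WITH (
    access_key_id = 'access_key_id',
    secret_access_key = 'secret_access_key'
)
FORMAT TEXT;
```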