CREATE SOURCE: CSV from an S3 bucket
CREATE SOURCE connects Materialize to an external data source and lets you interact
with its data as if the data were in a SQL table.
This document details how to connect Materialize to an S3 bucket that contains multiple objects, and to listen for new object creation. Each S3 object can contain multiple records serialized as CSV, separated by newlines.
Sources represent connections to resources outside Materialize that it can read data from. For more information, see API Components: Sources.
| Field | Use |
|-------|-----|
| `MATERIALIZED` | Materializes the source's data, which retains all data in memory and makes sources directly selectable. For more information, see API Components: Materialized sources. |
| `src_name` | The name for the source, which is used as its table name within SQL. |
| `col_name` | Override default column name with the provided identifier. If used, a `col_name` must be provided for each column in the created source. |
| `DISCOVER OBJECTS` | Describes how to discover keys to download. See Object discovery strategies. |
| `BUCKET SCAN bucket_name` | Materialize will scan the bucket to find the set of objects to download. |
| `SQS NOTIFICATIONS queue_name` | Materialize will subscribe to the specified queue and download new objects. See Listening to SQS notifications. |
| `MATCHING pattern` | A glob-style pattern to filter objects to ingest. See Patterns. Default is to ingest all objects. |
| `COMPRESSION algorithm` | The compression algorithm used to decode downloaded objects. See Compression. |
| `WITH ( option_list )` | Options affecting source creation. For more detail, see WITH options below. |
| `HEADER` | Treat the first line of the CSV file as a header. See CSV format details. |
| `HEADER (name_list)` | Treat the first line of the CSV file as a header, and validate that the columns in the file match the provided column list. See CSV format details. |
| `n COLUMNS` | Format the source's data as a CSV with `n` columns. See CSV format details. |
| `DELIMITED BY char` | Delimit the CSV by `char`. ASCII comma by default (`','`). |
| `ENVELOPE NONE` | (Default) Use an append-only envelope. This means that records will only be appended and cannot be updated or deleted. |
| `PRIMARY KEY ( col_list ) NOT ENFORCED` | Declare a set of columns as a primary key. For more information, see Key constraint details. |
WITH options

The following options are valid within the `WITH` clause:

| Field | Description |
|-------|-------------|
| `region` | **Required.** A valid AWS region. |
| `access_key_id` | A valid access key ID for the AWS resource. |
| `secret_access_key` | A valid secret access key for the AWS resource. |
| `token` | The session token associated with the credentials, if the credentials are temporary. |
If you do not provide credentials via `WITH` options, then `materialized` will examine the standard AWS authorization chain:
- Environment variables: `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY`.
- `credential_process` command in the AWS config file, usually located at `~/.aws/config`.
- AWS credentials file, usually located at `~/.aws/credentials`.
- IAM instance profile. Will only work if running on an EC2 instance with an instance profile/role.
Credentials fetched from a container or instance profile expire on a fixed schedule. Materialize will attempt to refresh the credentials automatically before they expire, but the source will become inoperable if the refresh operation fails.
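If you need to provide credentials explicitly, a minimal sketch (the source name and key values are hypothetical placeholders):

```sql
CREATE SOURCE s3_static_creds (user_id, status, usage)
FROM S3 DISCOVER OBJECTS MATCHING '**/*.csv' USING BUCKET SCAN 'analytics'
WITH (
    region = 'us-east-2',
    access_key_id = 'AKIAIOSFODNN7EXAMPLE',            -- hypothetical placeholder
    secret_access_key = 'wJalrXUtnFEMI/K7MDENGEXAMPLE' -- hypothetical placeholder
)
FORMAT CSV WITH 3 COLUMNS;
```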
The IAM User or Role used by `materialized` requires permission to perform different AWS actions depending on which actions are required to discover the list of objects to download.

The `DISCOVER OBJECTS USING` clause describes how Materialize will load objects, and so its parameters determine which permissions `materialized` requires. For example, since the `SCAN` key name source (as in `DISCOVER OBJECTS USING BUCKET SCAN`) must perform repeated `ListObjects` actions to create a list of key names to download, you must grant the Materialize IAM User or Role the `ListObjects` permission before you specify `DISCOVER OBJECTS USING BUCKET SCAN`.
| Key name source | Permissions required |
|-----------------|----------------------|
| All | `GetObject` permission for the objects to be downloaded. |
| `BUCKET SCAN` | `ListObjects` permission for the buckets to be scanned. |
| `SQS NOTIFICATIONS` | `ReceiveMessage`, `DeleteMessage`, `GetQueueUrl`, and `ChangeMessageVisibility` permissions for the queue Materialize will listen to. |
The root AWS documentation for S3 permissions is available here.
S3 source details
The S3 source is designed to ingest a large volume of static data from Amazon Web Services' Simple Storage Service (S3), AWS's cloud object store. It is important to note that an object store behaves differently than a file system does, and that those differences affect how Materialize interacts with objects in it.
Some key differences from file systems include:
- Latency on individual S3 operations is much higher than on even cloud-network filesystems.
- It is not possible to seek to a point in an object; Materialize must download the entire object from the beginning to read to a point.
- All object operations are atomic. It is not possible to modify just part of an S3 object; you can only entirely replace or delete objects.
The primary effect of this is that Materialize does not handle updates to S3 objects, and may interleave the ingestion of multiple objects to speed it up.
- Currently, S3 sources do not support Avro- or Protobuf-encoded objects. Implementation issues: Avro, Protobuf.
- Object ingest order is not guaranteed.
- All S3 sources are append-only. Deleted and updated S3 objects are silently ignored.
Object discovery strategies
Materialize has several techniques to discover the objects to download, with more planned for the future.
All strategies follow the same basic pattern. Materialize will:
- Obtain a list of objects using AWS APIs.
- Deduplicate objects so the same object is never downloaded twice.
- Filter the list of objects against the MATCHING clause patterns.
- Download the matching objects.
- Treat each object downloaded as a newline-delimited file for the purposes of record delineation.
No guarantees are made about the relative order of records sent from the object discovery process and the Materialize SQL engine. As usual with SQL, you must impose your own order.
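For example, to read rows back in a stable order from a materialized S3 source, sort on a column explicitly (here using the automatically generated `mz_record` column described in the examples below):

```sql
-- csv_example is a hypothetical materialized S3 source.
SELECT * FROM csv_example ORDER BY mz_record;
```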
You may specify strategies multiple times within one `CREATE SOURCE` statement. For example, this is a legal invocation:

```sql
DISCOVER OBJECTS USING
    BUCKET SCAN 'example',
    BUCKET SCAN 'other',
    SQS NOTIFICATIONS 'example-notifications'
```
Listing bucket objects

`BUCKET SCAN` discovery performs a single scan over the specified bucket at source creation time. If you would like an S3 source to ingest objects that are added to the bucket after the source is created, you must also configure an `SQS NOTIFICATIONS` discovery mechanism on the source, as in the sketch below.
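A minimal sketch combining both strategies (the source, bucket, and queue names are hypothetical):

```sql
-- Ingest objects that already exist via a one-time scan, and objects
-- created later via SQS notifications on the same bucket.
CREATE MATERIALIZED SOURCE csv_past_and_future (user_id, status, usage)
FROM S3 DISCOVER OBJECTS MATCHING '**/*.csv' USING
    BUCKET SCAN 'analytics',
    SQS NOTIFICATIONS 'analytics-notifications'
WITH (region = 'us-east-2')
FORMAT CSV WITH 3 COLUMNS;
```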
Listening to SQS notifications
AWS S3 has a built-in method for notifying downstream applications of bucket modification: the Event Notification API. For Materialize, the only interesting modifications are object creations, i.e. the `s3:ObjectCreated:*` event namespace. Follow the AWS Tutorial to configure a bucket for exactly this namespace.

Once you have configured S3 notifications to go to an SQS queue, you can point Materialize at that queue with `DISCOVER OBJECTS USING SQS NOTIFICATIONS`.
Materialize deletes SQS messages as soon as they are ingested. This means that the same SQS queue cannot be used for multiple sources. If you would like to have multiple sources listening to notifications from the same bucket you must configure an SNS topic as an intermediary, with multiple SQS queues subscribed to it, one SQS queue per source. Note that SQS queues subscribed to SNS topics intended for Materialize must be configured to use raw message delivery.
Since Materialize treats unmaterialized sources with multiple downstream views as separate sources, `SQS NOTIFICATIONS` should not be used with unmaterialized sources. This behavior of unmaterialized sources is considered a bug (#7423) and will be improved in a future release.
Patterns

It is possible to filter the list of object keys to download using unix-style glob syntax as an argument in the `MATCHING` clause:

- `?` matches any single character except `/`.
- `*` matches zero or more characters, except `/`.
- `**` recursively matches directories, but some other pattern must be specified. For example, `a/**` matches anything inside of the `a/` prefix (but not `a` itself), while `**/a` matches `a` in any prefix, but not `a` with no prefix.
- `{a,b}` matches `a` or `b`, where `a` and `b` are arbitrary glob patterns.
- `[ab]` matches `a` or `b`, where `a` and `b` are bare characters. Prepend `!` to the matched characters to invert the match, e.g. `[!ab]` matches any character besides `a` or `b`.
- You can escape metacharacters such as `*` and `?` with character class notation, e.g. `[*]` matches `*`.
| Pattern | Example Matches | Example Excludes |
|---------|-----------------|------------------|
| `**` | `a`, `a/b/c.csv` | none |
| `2020/**/*.csv` | `2020/11/uuid.csv` | `data/2020/uuid.csv`, `2020/11/uuid.json` |
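As a sketch, a source restricted to one prefix might combine these elements (the bucket name and prefix are hypothetical):

```sql
-- Ingest only CSV objects under the 2020/ prefix, at any depth.
CREATE SOURCE pattern_example (user_id, status, usage)
FROM S3 DISCOVER OBJECTS MATCHING '2020/**/*.csv' USING BUCKET SCAN 'analytics'
WITH (region = 'us-east-2')
FORMAT CSV WITH 3 COLUMNS;
```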
Compression

Omitting `COMPRESSION` is equivalent to specifying `COMPRESSION NONE`. `GZIP` compression requires the object be compressed using the `gzip` algorithm, or that it be a concatenation of multiple `gzip` member streams.
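A minimal sketch of a source over gzip-compressed objects, assuming `COMPRESSION` follows the discovery clause as laid out in the syntax summary above (the bucket name and key pattern are hypothetical):

```sql
-- Each matched object is decompressed with gzip before CSV parsing.
CREATE SOURCE gzipped_csv (user_id, status, usage)
FROM S3 DISCOVER OBJECTS MATCHING '**/*.csv.gz' USING BUCKET SCAN 'analytics'
COMPRESSION GZIP
WITH (region = 'us-east-2')
FORMAT CSV WITH 3 COLUMNS;
```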
CSV format details
Materialize uses the format method you specify to determine the number of columns to create in the source, as well as the columns’ names.
| Format | Behavior |
|--------|----------|
| `HEADER` | Materialize reads the first line of the file to determine the number of columns in the file and the name of each column. The first line of the file is not ingested as data. |
| `HEADER (name_list)` | All of the same behaviors as bare `HEADER`, with the additional features that header names from source objects are validated to exactly match those specified in the name list, and that specifying a column list allows using the CSV format with sources whose individual objects may not yet exist. Primarily this is intended for S3 sources. |
| `n COLUMNS` | Materialize treats the file as if it has `n` columns. Columns are named `column1`, `column2`...`columnN`. |
- You can override these naming conventions by explicitly naming columns using the `col_name` option in `CREATE SOURCE`.
- All rows without the number of columns determined by the format are dropped, and Materialize logs an error.
- To avoid case-sensitivity conflicts with Materialize identifiers, we recommend you double-quote all field names when working with CSV-formatted sources. For more information, see Identifiers: Case sensitivity.
All CSV columns are ingested as `text`; you can cast them to other types in a view. For example:

```sql
CREATE MATERIALIZED VIEW salaries AS
SELECT employee_id::int, salary::numeric(38, 2)
FROM csv_employee_data;
```
Envelope details

An append-only envelope means that all records received by the source are treated as inserts. This is Materialize's default envelope (i.e. if no envelope is specified), and can be specified explicitly with `ENVELOPE NONE`.
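As a sketch, the default can be spelled out explicitly (the source name is hypothetical):

```sql
-- Equivalent to omitting the ENVELOPE clause entirely.
CREATE SOURCE append_only_example (user_id, status, usage)
FROM S3 DISCOVER OBJECTS MATCHING '**/*.csv' USING BUCKET SCAN 'analytics'
WITH (region = 'us-east-2')
FORMAT CSV WITH 3 COLUMNS
ENVELOPE NONE;
```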
Key constraint details

For source configurations where a key cannot be inferred, the `PRIMARY KEY ( col_list ) NOT ENFORCED` clause allows you to manually declare a set of columns as a primary key. This enables optimizations and constructs that rely on a key to be present when it cannot be inferred.

WARNING! Materialize will not enforce the constraint and will produce wrong results if it is not correct.
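A hedged sketch, assuming the key constraint is declared alongside the column list (the source and column names are hypothetical):

```sql
-- user_id is promised, but never checked, to be unique across records.
CREATE MATERIALIZED SOURCE keyed_example
    (user_id, status, usage, PRIMARY KEY (user_id) NOT ENFORCED)
FROM S3 DISCOVER OBJECTS MATCHING '**/*.csv' USING BUCKET SCAN 'analytics'
WITH (region = 'us-east-2')
FORMAT CSV WITH 3 COLUMNS;
```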
Example without CSV header
Assuming there is an S3 bucket “analytics” that contains the following keys and associated content:
One object contains:

```
9999,active,8 hours
888,inactive,
777,active,3 hours
```

Another object contains:

```
9999,active,750 hours
888,inactive,
777,active,1002 hours
```
We can load all these keys with the following command:
```sql
CREATE MATERIALIZED SOURCE csv_example (user_id, status, usage)
FROM S3 DISCOVER OBJECTS MATCHING '**/*.csv' USING BUCKET SCAN 'analytics'
WITH (region = 'us-east-2')
FORMAT CSV WITH 3 COLUMNS;
```
This creates a source that:

- Scans the entire `analytics` bucket looking for objects that have keys ending with `.csv`.
- Has three `text` columns (`user_id`, `status`, and `usage`) and one automatically generated integer column (`mz_record`) which reflects the order in which `materialized` first encountered that row.
- Ignores lines in any object that do not have three columns, writing an error-level message to the Materialize log for each.
- Materializes the contents in memory immediately upon issuing the command.
If we want to handle well-typed data while stripping out some uninteresting columns, we can instead write an unmaterialized source and parse columns in a view materialization:
```sql
CREATE SOURCE csv_source (user_id, status, usage)
FROM S3 DISCOVER OBJECTS MATCHING '**/*.csv' USING BUCKET SCAN 'analytics'
WITH (region = 'us-east-2')
FORMAT CSV WITH 3 COLUMNS;

CREATE MATERIALIZED VIEW csv_example AS
SELECT user_id::int4, usage::interval
FROM csv_source;
```
This creates a view that has the same properties as above, except it:
- Has two columns (one integer, one interval)
- Does not store the string data in memory after it has been parsed
Example with CSV header
Use the `FORMAT CSV WITH HEADER (name_list)` syntax to validate and remove header rows when reading from an S3 bucket. The column names are required for S3 sources, unlike file sources.
Assuming the bucket's objects contain the following content:

```
id,status,active time
9999,active,8 hours
888,inactive,
777,active,3 hours
```

```
id,status,active time
9999,active,750 hours
888,inactive,
777,active,1002 hours
```
Load all these keys, renaming the columns from the headers provided in the CSV files, with the following command:
```sql
CREATE MATERIALIZED SOURCE csv_example (user_id, status, usage)  -- provide SQL names
FROM S3 DISCOVER OBJECTS MATCHING '**/*.csv' USING BUCKET SCAN 'analytics'
WITH (region = 'us-east-2')
FORMAT CSV WITH HEADER (id, status, "active time");  -- expect a header for each file with these names
```