CREATE SOURCE: CSV from an S3 bucket

CREATE SOURCE connects Materialize to an external data source and lets you interact with its data as if the data were in a SQL table.

This document details how to connect Materialize to an S3 bucket that contains multiple objects, and how to listen for new object creation. Each S3 object can contain multiple records serialized as CSV, separated by newlines.

Conceptual framework

Sources represent connections to resources outside Materialize that it can read data from. For more information, see API Components: Sources.

Syntax

CREATE [MATERIALIZED] SOURCE [IF NOT EXISTS] src_name
  [( col_name [, ...] [, key_constraint] )]
FROM S3 DISCOVER OBJECTS
  [MATCHING pattern]
  USING { BUCKET SCAN bucket_name | SQS NOTIFICATIONS queue_name } [, ...]
  [COMPRESSION { NONE | GZIP }]
WITH ( region = region [, aws_credentials] )
FORMAT CSV WITH { HEADER | column_count COLUMNS }
  [DELIMITED BY char]
[ENVELOPE NONE]

key_constraint

PRIMARY KEY ( col_name [, ...] ) NOT ENFORCED
Field Use
MATERIALIZED Materializes the source’s data, which retains all data in memory and makes sources directly selectable. For more information, see Materialized source details.
src_name The name for the source, which is used as its table name within SQL.
col_name Override default column name with the provided identifier. If used, a col_name must be provided for each column in the created source.
DISCOVER OBJECTS Describes how to discover keys to download. See Object discovery strategies.
BUCKET SCAN bucket_name Materialize will scan the bucket to find the set of objects to download.
SQS NOTIFICATIONS queue_name Materialize will subscribe to the specified queue and download new objects. See Listening to SQS notifications.
MATCHING pattern A glob-style pattern to filter objects to ingest. See Patterns. Default is to ingest all objects.
COMPRESSION algorithm The compression algorithm used to decode downloaded objects. See Compression.
WITH ( option_list ) Options affecting source creation. For more detail, see WITH options.
HEADER Treat the first line of the CSV file as a header. See CSV format details.
n COLUMNS Format the source’s data as a CSV with n columns. See CSV format details.
DELIMITED BY char Delimit the CSV by char. ASCII comma by default (','). This must be an ASCII character; other Unicode code points are not supported.
ENVELOPE NONE (Default) Use an append-only envelope. This means that records will only be appended and cannot be updated or deleted.
PRIMARY KEY ( col_list ) NOT ENFORCED Declare a set of columns as a primary key. For more information, see Key constraint details.

WITH options

The following options are valid within the WITH clause.

Field Value type Description
region text Required. A valid AWS region.

AWS Credentials WITH options

Field Value type Description
access_key_id text A valid access key ID for the AWS resource.
secret_access_key text A valid secret access key for the AWS resource.
token text The session token associated with the credentials, if the credentials are temporary.
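
For example, a source that supplies static credentials directly in the WITH clause might look like the following sketch (the bucket name and credential values here are placeholders):

CREATE MATERIALIZED SOURCE static_creds_example (user_id, status, usage)
FROM S3 DISCOVER OBJECTS MATCHING '**/*.csv' USING BUCKET SCAN 'my-bucket'
WITH (
  region = 'us-east-2',
  access_key_id = 'AKIAIOSFODNN7EXAMPLE',
  secret_access_key = 'wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY'
)
FORMAT CSV WITH 3 COLUMNS;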

If you do not provide credentials via WITH options, then materialized will examine the standard AWS credential chain:

  1. Environment variables: AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY.
  2. The credential_process command in the AWS config file, usually located at ~/.aws/config.
  3. The AWS credentials file, usually located at ~/.aws/credentials.
  4. The IAM instance profile, which only works when running on an EC2 instance with an instance profile/role attached.
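
For instance, assuming AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY are set in materialized's environment, a source definition can omit credentials entirely (the bucket name here is a placeholder):

CREATE MATERIALIZED SOURCE env_creds_example (user_id, status, usage)
FROM S3 DISCOVER OBJECTS MATCHING '**/*.csv' USING BUCKET SCAN 'my-bucket'
WITH (region = 'us-east-2')
FORMAT CSV WITH 3 COLUMNS;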

Credentials fetched from a container or instance profile expire on a fixed schedule. Materialize will attempt to refresh the credentials automatically before they expire, but the source will become inoperable if the refresh operation fails.

Permissions Required

The IAM User or Role used by materialized requires permission to perform different AWS actions, depending on which actions are needed to discover the list of objects to download.

The DISCOVER OBJECTS USING clause describes how Materialize will load objects, so its parameters determine which permissions materialized requires. For example, since the SCAN key name source (as in DISCOVER OBJECTS USING BUCKET SCAN) must perform repeated ListObjects actions to build the list of key names to download, you must grant the Materialize IAM User or Role the ListObjects permission before you specify DISCOVER OBJECTS USING BUCKET SCAN.

Key name source Permissions required
All GetObject permission for the objects to be downloaded
BUCKET SCAN ListObject permission for the buckets to be scanned, unless the MATCHING pattern can only match a single object. In such cases, Materialize will perform only the necessary GetObject API call.
SQS NOTIFICATIONS ChangeMessageVisibility, DeleteMessage, GetQueueUrl, ReceiveMessage SQS permissions for the queue Materialize will listen to

See the AWS documentation on Amazon S3 actions for the authoritative list of S3 permissions.

Details

Materialized source details

Materializing a source keeps data it receives in an in-memory index, the presence of which makes the source directly queryable. In contrast, non-materialized sources cannot process queries directly; to access the data the source receives, you need to create materialized views that SELECT from the source.

For a mental model, materializing the source is approximately equivalent to creating a non-materialized source, and then creating a materialized view from all of the source’s columns:

CREATE SOURCE src ...;
CREATE MATERIALIZED VIEW src_view AS SELECT * FROM src;

The actual implementation of materialized sources differs, though, in that you can refer to the source's name directly in queries.

For more details about the impact of materializing sources (and implicitly creating an index), see CREATE INDEX: Details — Memory footprint.

S3 source details

The S3 source is designed to ingest a large volume of static data from Amazon Simple Storage Service (S3), AWS's cloud object store. Note that an object store behaves differently than a file system does, and those differences affect how Materialize interacts with objects in it.

Two key differences from file systems are that objects are replaced or deleted atomically rather than modified in place, and that there is no inherent ordering among them. The primary effects of this are that Materialize does not handle updates to S3 objects, and that it may interleave the ingestion of multiple objects to speed things up.

S3 limitations

Because object updates are not handled, the S3 source is effectively append-only: modifying or deleting an object in the bucket does not update or remove records that Materialize has already ingested.

Object discovery strategies

Materialize has several techniques to discover the objects to download, with more planned for the future.

All strategies follow the same basic pattern: Materialize obtains a set of object keys to download, deduplicates the discovered keys so that no object is downloaded twice, and then downloads each object and ingests its records.

No guarantees are made about the relative order of records as they move from the object discovery process through the Materialize SQL engine. As usual with SQL, you must impose your own order.

You may specify strategies multiple times within one CREATE SOURCE statement. For example, this is a legal invocation:

DISCOVER OBJECTS USING
  BUCKET SCAN 'example',
  BUCKET SCAN 'other',
  SQS NOTIFICATIONS 'example-notifications'

Listing bucket objects

The BUCKET SCAN discovery performs a single scan over the specified bucket at source creation time. If you would like an S3 source to ingest objects that are added to the bucket after the source is created, you must also configure an SQS NOTIFICATIONS discovery mechanism on the source, as in the sketch below.
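
A sketch combining both strategies, with placeholder bucket and queue names: the initial scan ingests the objects that already exist, and the notifications feed picks up objects created afterwards.

CREATE MATERIALIZED SOURCE csv_scan_and_notify (user_id, status, usage)
FROM S3 DISCOVER OBJECTS MATCHING '**/*.csv' USING
  BUCKET SCAN 'analytics',
  SQS NOTIFICATIONS 'analytics-notifications'
WITH (region = 'us-east-2')
FORMAT CSV WITH 3 COLUMNS;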

Listening to SQS notifications

AWS S3 has a built-in method for notifying downstream applications of bucket modification: the Event Notification API. For Materialize, the only interesting modifications are object creations, i.e. the s3:ObjectCreated:* event namespace. Follow the AWS Tutorial to configure a bucket for exactly this namespace.

Once you have configured S3 notifications to go to an SQS queue, you can point Materialize at that queue with DISCOVER OBJECTS USING SQS NOTIFICATIONS 'queue-name'.

Materialize deletes SQS messages as soon as they are ingested, so the same SQS queue cannot be used by multiple sources. Since Materialize treats unmaterialized sources with multiple downstream views as separate sources, SQS NOTIFICATIONS should not be used with unmaterialized sources.

If you would like multiple sources listening to notifications from the same bucket, you must configure an SNS topic as an intermediary, with multiple SQS queues subscribed to it. Note that SQS queues subscribed to SNS topics intended for Materialize must be configured to use raw message delivery.

Patterns

It is possible to filter the list of object keys to download by supplying a Unix-style glob as the argument to the MATCHING clause:

Pattern examples
Pattern            Example Matches      Example Excludes
**                 a, a/b/c.json        none
2020/**/*.json     2020/11/uuid.json    data/2020/uuid.json, 2020/11/uuid.csv
*                  a                    a/b
202{0,1}/*/*.csv   2020/11/data.csv     2022/11/data.csv, 2020/11/01/data.csv
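
For example, to ingest only the CSV objects under a single year's prefix (bucket and column names here are placeholders):

CREATE MATERIALIZED SOURCE csv_2020_only (user_id, status, usage)
FROM S3 DISCOVER OBJECTS MATCHING '2020/**/*.csv' USING BUCKET SCAN 'analytics'
WITH (region = 'us-east-2')
FORMAT CSV WITH 3 COLUMNS;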

Compression
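
As the syntax above shows, downloaded objects can be ingested as-is (COMPRESSION NONE, the default) or decompressed first (COMPRESSION GZIP). A sketch ingesting gzip-compressed CSV objects, with a placeholder bucket name:

CREATE MATERIALIZED SOURCE gzipped_csv (user_id, status, usage)
FROM S3 DISCOVER OBJECTS MATCHING '**/*.csv.gz' USING BUCKET SCAN 'analytics'
COMPRESSION GZIP
WITH (region = 'us-east-2')
FORMAT CSV WITH 3 COLUMNS;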

CSV format details

Materialize uses the format method you specify to determine the number of columns to create in the source, as well as the columns’ names.

Method Outcome
HEADER Materialize reads the first line of the file to determine:

• the number of columns in the file

• the name of each column

The first line of the file is not ingested as data.
n COLUMNS Materialize treats the file as if it has n columns. Columns are named column1, column2, …, columnN.
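
For example, if every object's first line names its columns, HEADER can stand in for an explicit column list and count (the bucket name is a placeholder):

CREATE MATERIALIZED SOURCE csv_with_headers
FROM S3 DISCOVER OBJECTS MATCHING '**/*.csv' USING BUCKET SCAN 'analytics'
WITH (region = 'us-east-2')
FORMAT CSV WITH HEADER;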

Types

Materialize treats all columns in CSV sources as text. You can “type” this data using casts in views that read from the source, e.g.:

CREATE MATERIALIZED VIEW salaries AS
  SELECT employee_id::int, salary::numeric(38, 2)
  FROM csv_employee_data;

Append-only envelope

The append-only envelope means that every record received by the source is treated as an insert. This is Materialize’s default envelope (used if no envelope is specified), and it can be requested explicitly with ENVELOPE NONE, as in the sketch below.
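
A sketch spelling the default out explicitly, with a placeholder bucket name:

CREATE MATERIALIZED SOURCE csv_append_only (user_id, status, usage)
FROM S3 DISCOVER OBJECTS MATCHING '**/*.csv' USING BUCKET SCAN 'analytics'
WITH (region = 'us-east-2')
FORMAT CSV WITH 3 COLUMNS
ENVELOPE NONE;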

Key constraint details

Primary keys are automatically inferred for Kafka sources that use the Upsert or Debezium envelopes, and for Postgres sources.

For other source configurations, the key_constraint syntax lets you manually declare a set of columns as a primary key. This enables optimizations and constructs that rely on the presence of a key when one cannot be inferred.

WARNING! Materialize will not enforce the constraint and will produce wrong results if the declared key is not actually unique.
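
A sketch declaring user_id as a key, assuming every user ID appears in at most one record across all matched objects (bucket and column names are placeholders):

CREATE MATERIALIZED SOURCE keyed_csv (user_id, status, usage, PRIMARY KEY (user_id) NOT ENFORCED)
FROM S3 DISCOVER OBJECTS MATCHING '**/*.csv' USING BUCKET SCAN 'my-bucket'
WITH (region = 'us-east-2')
FORMAT CSV WITH 3 COLUMNS;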

Example

Assuming there is an S3 bucket “analytics” that contains the following keys and associated content:

users/2021/engagement.csv

9999,active,8 hours
888,inactive,
777,active,3 hours

users/2020/engagement.csv

9999,active,750 hours
888,inactive,
777,active,1002 hours

We can load all these keys with the following command:

CREATE MATERIALIZED SOURCE csv_example (user_id, status, usage)
FROM S3 DISCOVER OBJECTS MATCHING '**/*.csv' USING BUCKET SCAN 'analytics'
WITH (region = 'us-east-2')
FORMAT CSV WITH 3 COLUMNS;

This creates a source that:

• scans the entire analytics bucket and downloads every object whose key ends in .csv

• treats each record as having three text columns: user_id, status, and usage

• materializes its contents in memory, making the source directly queryable

If we want to handle well-typed data while stripping out some uninteresting columns, we can instead create an unmaterialized source and parse the columns in a materialized view:

CREATE SOURCE csv_source (user_id, status, usage)
FROM S3 DISCOVER OBJECTS MATCHING '**/*.csv' USING BUCKET SCAN 'analytics'
WITH (region = 'us-east-2')
FORMAT CSV WITH 3 COLUMNS;

CREATE MATERIALIZED VIEW csv_example AS
  SELECT user_id::int4, usage::interval FROM csv_source;

This creates a view that has the same properties as above, except it:

• discards the status column

• converts user_id to an integer and usage to an interval
