Materialize Documentation
s
Join the Community github/materialize

CREATE SOURCE: S3

BETA! This feature is in beta. It may have performance or stability issues and is not subject to our backwards compatibility guarantee.

CREATE SOURCE connects Materialize to an external system you want to read data from, and provides details about how to decode and interpret that data.

This page describes how to connect Materialize to an S3 bucket to ingest the objects stored in it and/or listen to new object created events.

Syntax

CREATE MATERIALIZED SOURCE IF NOT EXISTS src_name ( col_name , , key_constraint ) FROM S3 DISCOVER OBJECTS MATCHING pattern USING BUCKET SCAN bucket_name SQS NOTIFICATIONS queue_name , COMPRESSION NONE GZIP with_options FORMAT format_spec ENVELOPE NONE

format_spec

REGEX regex CSV WITH HEADER ( col_name , ) n COLUMNS DELIMITED BY char TEXT BYTES

with_options

WITH ( static_credentials profile_name role_arn region = val )

static_credentials

access_key_id = val , secret_access_key = val , token = val
Field Use
MATERIALIZED Materializes the source’s data, which retains all data in memory and makes sources directly selectable. For more information, see Key Concepts: Materialized sources.
src_name The name for the source, which is used as its table name within SQL.
col_name Override default column name with the provided identifier. If used, a col_name must be provided for each column in the created source.
DISCOVER OBJECTS Describes how to discover keys to download. See Listing bucket objects and Listening to SQS notifications.
BUCKET SCAN bucket_name Materialize will scan the bucket to find the set of objects to download. See Listing bucket objects.
SQS NOTIFICATIONS queue_name Materialize will subscribe to the specified queue and download new objects. See Listening to SQS notifications.
MATCHING pattern A glob-style pattern to filter objects to ingest. See Patterns. Default is to ingest all objects.
COMPRESSION NONE (Default) Decoding downloaded objects does not use a compression algorithm.
COMPRESSION algorithm The compression algorithm used to decode downloaded objects. Using GZIP compression requires the object is compressed using gzip or that it is a concatenation of multiple gzip member streams.
ENVELOPE NONE (Default) Use an append-only envelope. This means that records will only be appended and cannot be updated or deleted.
WITH ( option_list ) Options affecting source creation. For more detail, see WITH options.

WITH options

Field Value Description
timestamp_frequency_ms int Default: 1000. Sets the timestamping frequency in ms. Reflects how frequently the source advances its timestamp. This measure reflects how stale data in views will be. Lower values result in more-up-to-date views but may reduce throughput.

Supported formats

Format
Append-only envelope Upsert envelope Debezium envelope
JSON
Text/bytes
CSV

Features

Materialize supports two strategies for object discovery: listing bucket objects and listening to SQS notifications.

Both strategies follow the same basic pattern:

You may specify multiple strategies within a single CREATE SOURCE statement. For example, this is a valid DISCOVER OBJECTS clause:

DISCOVER OBJECTS USING
  BUCKET SCAN 'example-1',
  BUCKET SCAN 'example-2',
  SQS NOTIFICATIONS 'example-notifications'

Patterns

It’s possible to filter the list of object keys to download using Unix-style glob syntax as an argument in the MATCHING clause:

Pattern examples
Pattern Example matches Example excludes
** a , a/b/c.json none
2020/**/*.json 2020/11/uuid.json data/2020/uuid.json , 2020/11/uuid.csv
* a a/b
202{0,1}/*/*.csv 2020/11/data.csv 2022/11/data.csv , 2020/11/01/data.csv

Listing bucket objects

The BUCKET SCAN discovery strategy performs a single scan over the specified bucket at source creation time:

CREATE SOURCE csv_source
  FROM S3 DISCOVER OBJECTS MATCHING '**/*.csv' USING
    BUCKET SCAN 'analytics'
  WITH (region = 'us-east-2')
  FORMAT CSV WITH 1 COLUMNS;

For an S3 source to ingest objects that are added to the bucket after the source is created, you must additionally configure an SQS NOTIFICATIONS discovery strategy on the source.

Listening to SQS notifications

Materialize can listen to new object created events in an S3 bucket through the S3 Event Notifications API.

To get new object notifications, the specified bucket must be configured to publish s3:ObjectCreated:* event notifications to an SQS queue. Once this is set up, you can point Materialize at the SQS queue using the DISCOVER OBJECTS USING SQS NOTIFICATIONS syntax:

CREATE SOURCE csv_source
  FROM S3 DISCOVER OBJECTS MATCHING '**/*.csv' USING
    BUCKET SCAN 'analytics',
    SQS NOTIFICATIONS 'analytics'
  WITH (region = 'us-east-2')
  FORMAT CSV WITH 1 COLUMNS;

For more details on configuring a bucket for notifications, follow the step-by-step instructions in the AWS documentation.

Configuring an SNS topic

Materialize deletes SQS messages as soon as they are ingested. This means that the same SQS queue cannot be used in multiple sources.

If you’d like to have multiple sources listening to notifications from the same bucket, you must configure an SNS topic as an intermediary, with multiple SQS queues subscribed to it (one per source). These queues must be configured to use raw message delivery.

Since Materialize treats unmaterialized sources with multiple downstream views as separate sources, SQS notifications can not be shared across multiple materializations of the same source. You must create separate SQS queues for each S3 notification source.

Authentication

AWS credentials

AWS credentials can be provided on a per-source basis via the standard AWS credentials provider chain:

  1. Statically provided as part of the WITH block in source declaration. These credentials will be written to disk in plain text as well as being easily exposed via introspection commands and so this technique is only recommended for experimentation and development.
  2. Environment variables: AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_WEB_IDENTITY_TOKEN_FILE, and many more.
  3. credential_process command in the AWS config file, by default located at ~/.aws/config.
  4. AWS credentials file and profile files. By default located at ~/.aws/config and ~/.aws/credentials, respectively.
  5. The IAM instance profile provided by the Instance Metadata Service. This will only work if running on an EC2 instance with an instance profile.

Changed in v0.10.0: Materialize supports the standard AWS credentials provider chain as described above. Previously Materialize had support for only an unspecified subset of the standard credentials provider chain.

AWS credentials WITH options

Static credentials can be configured by the following WITH options:

Field Value type Description
access_key_id text A valid access key ID for the AWS resource.
secret_access_key text A valid secret access key for the AWS resource.
token (optional) text The session token associated with the credentials, if the credentials are temporary

Alternatively, you can specify a named config profile to assume. This named profile must exist within the AWS credentials or config file.

Field Value type Description
profile text An AWS config profile to assume. New in v0.20.0.

The following WITH options can be set with either static credentials, a profile, or alone depending on the environment for credentials.

Field Value type Description
role_arn text An IAM role to assume. New in v0.20.0.
region text The region to use for all AWS requests.

Credentials fetched from a container or instance profile expire on a fixed schedule. Materialize will attempt to refresh the credentials automatically before they expire, but the source will become inoperable if the refresh operation fails.

Permissions required

The permissions required for the IAM User or Role used by materialized depend on the strategy specified for object discovery.

For example, since the BUCKET SCAN strategy must perform repeated ListObjects actions to create a list of key names to download, you must grant the Materialize IAM User or Role the ListObjects permission before you specify DISCOVER OBJECTS USING BUCKET SCAN.

Object discovery strategy Permissions required
All GetObject permission for the objects to be downloaded
BUCKET SCAN ListObject permission for the buckets to be scanned, unless the MATCHING pattern can only match a single object. In such cases, Materialize will perform only the necessary GetObject API call.
SQS NOTIFICATIONS ChangeMessageVisibility, DeleteMessage, GetQueueUrl, ReceiveMessage SQS Permissions for the queue Materialize will listen to

Examples

Creating a source

Assuming there is an S3 bucket analytics that contains the following keys and associated content:

users/2021/usage.json

{"user_id": 9999, "disks_used": 2, "cpu_used_minutes": 3600}
{"user_id": 888, "disks_used": 0}
{"user_id": 777, "disks_used": 25, "cpu_used_minutes": 9200}

users/2020/usage.json

{"user_id": 9999, "disks_used": 150, "cpu_used_minutes": 400100}
{"user_id": 888, "disks_used": 0}
{"user_id": 777, "disks_used": 12, "cpu_used_minutes": 999900}

To load all the keys:

CREATE SOURCE json_source
  FROM S3 DISCOVER OBJECTS MATCHING '**/*.json' USING
    BUCKET SCAN 'analytics'
  WITH (region = 'us-east-2')
  FORMAT BYTES;

This creates a source that…

  • Lazily scans the entire analytics bucket looking for objects that have keys that end with *.json – this source is not MATERIALIZED, so nothing has happened yet and it cannot be queried.
  • Has one text column text and one automatically-generated integer column mz_record which reflects the order that materialized first encountered that row in.

To access the data as JSON, you can then use standard JSON functions and operators:

CREATE MATERIALIZED VIEW jsonified_s3_source AS
  SELECT
    data->>'user_id' AS user_id,
    data->>'disks_used' AS disks_used,
    data->>'cpu_used_minutes' AS cpu_used_minutes
  FROM (SELECT text::jsonb AS data FROM json_source);

This creates a view that…

  • Has three string columns (user_id, disks_used, and cpu_used_minutes).
  • Is MATERIALIZED, so will be cached in memory and is immediately queryable.

Assuming there is an S3 bucket frontend that contains the following keys and associated content:

logs/2020/12/31/frontend.log

99.99.44.44 - - [12/31/2020:23:55:59 +0000] "GET /updates HTTP/1.1" 200 10020 "-" "Python/Requests 22"
80.91.33.133 - - [12/31/2020:23:55:02 +0000] "GET /downloads/materialized HTTP/1.1" 304 0 "-" "Rust/reqwest 0.3"
173.203.139.108 - - [12/31/2020:23:55:07 +0000] "GET /wpadmin HTTP/1.1" 404 332 "-" "Firefox 9000"
173.203.139.108 - - [12/31/2020:23:55:14 +0000] "GET /downloads/materialized HTTP/1.1" 404 334 "-" "Python/Requests 22"
99.99.44.44 - - [12/31/2020:23:55:01 +0000] "GET /downloads/materialized HTTP/1.1" 304 0 "-" "Python/Requests 22"
80.91.33.133 - - [12/31/2020:23:55:41 +0000] "GET /downloads/materialized HTTP/1.1" 304 0 "-" "Rust/reqwest 0.3"
37.26.93.214 - - [12/31/2020:23:55:52 +0000] "GET /updates HTTP/1.1" 200 3318 "-" "Go 1.1 package http"

logs/2021/01/01/frontend.log

99.99.44.44 - - [01/01/2021:00:00:41 +0000] "GET /updates HTTP/1.1" 200 10020 "-" "Python/Requests 22"
188.138.60.101 - - [01/01/2021:00:00:48 +0000] "GET /updates HTTP/1.1" 200 10020 "-" "Python/Requests 22"
46.50.21.83 - - [01/01/2021:00:00:02 +0000] "GET /downloads/materialized HTTP/1.1" 304 0 "-" "Python/Requests 22.01"
99.99.44.44 - - [01/01/2021:00:00:25 +0000] "GET /downloads/materialized HTTP/1.1" 304 0 "-" "Python/Requests 22"
91.239.186.133 - - [01/01/2021:00:00:04 +0000] "GET /updates HTTP/1.1" 200 10020 "-" "Python/Requests 22"
173.203.139.108 - - [01/01/2021:00:00:08 +0000] "GET /downloads/materialized HTTP/1.1" 304 0 "-" "Python/Requests 22"
80.91.33.133 - - [01/01/2021:00:00:04 +0000] "GET /downloads/materialized HTTP/1.1" 304 0 "-" "Rust/reqwest 0.3"
93.190.71.150 - - [01/01/2021:00:00:33 +0000] "GET /updates HTTP/1.1" 200 10020 "-" "Python/Requests 22"
91.234.194.89 - - [01/01/2021:00:00:57 +0000] "GET /updates HTTP/1.1" 200 10020 "-" "Python/Requests 22"
46.4.83.163 - - [01/01/2021:00:00:20 +0000] "GET /downloads/materialized HTTP/1.1" 304 0 "-" "Python/Requests 22"
173.203.139.108 - - [01/01/2021:00:00:39 +0000] "GET /downloads/materialized HTTP/1.1" 404 335 "-" "Python/Requests 22"

TEXT format

To create a source that ingests these logs and allows you to do some quick and dirty analysis:

CREATE MATERIALIZED SOURCE frontend_logs
  FROM S3 DISCOVER OBJECTS MATCHING 'logs/202?/**/*.log' USING
    BUCKET SCAN 'frontend'
  WITH (region = 'us-east-2')
  FORMAT TEXT;

From here, you can e.g. get all the lines that include the string updates, ordered by the original position of the line in the file (mz_record):

SELECT mz_record,
       text
FROM frontend_logs
WHERE text LIKE '%updates%'
ORDER BY mz_record;

 mz_record |                                     text
 1         | 99.99.44.44 - - [12/31/2020:23:55:59] "GET /updates HTTP/1.1" 200 10020 "-" "Python/Requests_22"
 7         | 37.26.93.214 - - [12/31/2020:23:55:52] "GET /updates HTTP/1.1" 200 3318 "-" "Go_1.1_package_http"
 8         | 99.99.44.44 - - [01/01/2021:00:00:41] "GET /updates HTTP/1.1" 200 10020 "-" "Python/Requests_22"
 9         | 188.138.60.101 - - [01/01/2021:00:00:48] "GET /updates HTTP/1.1" 200 10020 "-" "Python/Requests_22"
 12        | 91.239.186.133 - - [01/01/2021:00:00:04] "GET /updates HTTP/1.1" 200 10020 "-" "Python/Requests_22"
 15        | 93.190.71.150 - - [01/01/2021:00:00:33] "GET /updates HTTP/1.1" 200 10020 "-" "Python/Requests_22"
 16        | 91.234.194.89 - - [01/01/2021:00:00:57] "GET /updates HTTP/1.1" 200 10020 "-" "Python/Requests_22"

REGEX format

It’s generally more convenient to work with well-structured columnar data, so you can use the REGEX format specifier instead:

CREATE MATERIALIZED SOURCE frontend_logs
  FROM S3 DISCOVER OBJECTS MATCHING 'logs/202?/**/*.log' USING
    BUCKET SCAN 'frontend'
  WITH (region = 'us-east-2')
  FORMAT REGEX '(?P<ip>[^ ]+) - - \[?P<dt>([^]]_)\] "(?P<method>\w+) (?P<path>[^ ]+)[^"]+" (?P<status>\d+) (?P<content_length>\d+) "-" "(?P<user_agent>[^"]+)"';

From here, you can e.g. get all the lines that have /updates as the exact path:

SELECT dt,
       ip,
       user_agent
FROM frontend_logs
WHERE path = '/updates';

       dt           |      ip        |    user_agent
01/01/2021:00:00:04 | 91.239.186.133 | Python/Requests 22
01/01/2021:00:00:33 | 93.190.71.150  | Python/Requests 22
01/01/2021:00:00:41 | 99.99.44.44    | Python/Requests 22
01/01/2021:00:00:48 | 188.138.60.101 | Python/Requests 22
01/01/2021:00:00:57 | 91.234.194.89  | Python/Requests 22
12/31/2020:23:55:52 | 37.26.93.214   | Go 1.1 package http
12/31/2020:23:55:59 | 99.99.44.44    | Python/Requests 22

With a CSV header

Assuming there is an S3 bucket analytics that contains the following keys and associated content:

users/2021/engagement-with-header.csv

id,status,active time
9999,active,8 hours
888,inactive,
777,active,3 hours

users/2020/engagement-with-header.csv

id,status,active time
9999,active,750 hours
888,inactive,
777,active,1002 hours

To validate and remove header rows when reading from an S3 bucket, you can use the FORMAT CSV WITH HEADER (column, column2, ...) syntax. The column names are required for S3 sources, unlike file sources. To load all the keys:

CREATE MATERIALIZED SOURCE csv_example (user_id, status, usage) -- provide SQL names
  FROM S3 DISCOVER OBJECTS MATCHING '**/*.csv' USING
    BUCKET SCAN 'analytics'
  WITH (region = 'us-east-2')
  FORMAT CSV WITH HEADER (id, status, "active time"); -- expect a header for each file with these names

Without a CSV header

Assuming there is an S3 bucket analytics that contains the following keys and associated content:

users/2021/engagement.csv

9999,active,8 hours
888,inactive,
777,active,3 hours

users/2020/engagement.csv

9999,active,750 hours
888,inactive,
777,active,1002 hours

To load all the keys:

CREATE MATERIALIZED SOURCE csv_example (user_id, status, usage)
  FROM S3 DISCOVER OBJECTS MATCHING '**/*.csv' USING
    BUCKET SCAN 'analytics'
  WITH (region = 'us-east-2')
  FORMAT CSV WITH 3 COLUMNS;

This creates a source that…

  • Scans the entire analytics bucket looking for objects that have keys ending with *.csv

  • Has three text columns: user_id, status, and usage; and one automatically-generated integer column (mz_record) which reflects the order that materialized first encountered that row in.

    Lines in any object that do not have three columns will be ignored and an error-level message will be written to the Materialize log.

  • Materializes the contents in memory immediately upon issuing the command.

To handle well-typed data while stripping out some uninteresting columns, you can instead write an unmaterialized source and parse columns in a materialized view:

CREATE SOURCE csv_source (user_id, status, usage)
  FROM S3 DISCOVER OBJECTS MATCHING '**/*.csv' USING
    BUCKET SCAN 'analytics'
  WITH (region = 'us-east-2')
  FORMAT CSV WITH 3 COLUMNS;
CREATE MATERIALIZED VIEW csv_example AS
  SELECT user_id::int4,
         usage::interval
  FROM csv_source;

This creates a view that has the same properties as above, except it…

  • Has two columns (one integer, one interval)
  • Does not store the string data in memory after it’s been parsed.

Known limitations

Supported envelopes

S3 sources are append-only, which means that Materialize silently ignores any deleted or updated objects.

Ordering guarantees

Object ingest order is not guaranteed, and Materialize may interleave multiple object ingestion to speed things up.