CREATE SOURCE: S3
CREATE SOURCE
connects Materialize to an external system you want to read data from, and provides details about how to decode and interpret that data.
This page describes how to connect Materialize to an S3 bucket to ingest the objects stored in it and/or listen for notifications of newly created objects.
Syntax
Syntax diagrams: main statement, format_spec, with_options, static_credentials.
Field | Use |
---|---|
MATERIALIZED | Materializes the source’s data, which retains all data in memory and makes sources directly selectable. For more information, see Key Concepts: Materialized sources. |
src_name | The name for the source, which is used as its table name within SQL. |
col_name | Override default column name with the provided identifier. If used, a col_name must be provided for each column in the created source. |
DISCOVER OBJECTS | Describes how to discover keys to download. See Listing bucket objects and Listening to SQS notifications. |
BUCKET SCAN bucket_name | Materialize will scan the bucket to find the set of objects to download. See Listing bucket objects. |
SQS NOTIFICATIONS queue_name | Materialize will subscribe to the specified queue and download new objects. See Listening to SQS notifications. |
MATCHING pattern | A glob-style pattern to filter objects to ingest. See Patterns. Default is to ingest all objects. |
COMPRESSION NONE | (Default) Decoding downloaded objects does not use a compression algorithm. |
COMPRESSION algorithm | The compression algorithm used to decode downloaded objects. Using GZIP compression requires that the object is compressed with gzip, or that it is a concatenation of multiple gzip member streams. |
ENVELOPE NONE | (Default) Use an append-only envelope. This means that records will only be appended and cannot be updated or deleted. |
WITH ( option_list ) | Options affecting source creation. For more detail, see WITH options. |
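For orientation, the following sketch combines several of these clauses. The bucket name and object layout are hypothetical, and the COMPRESSION clause is assumed to sit between the discovery strategy and the WITH block:

CREATE SOURCE gzipped_json
FROM S3 DISCOVER OBJECTS MATCHING '**/*.json.gz' USING
BUCKET SCAN 'example-logs'      -- hypothetical bucket
COMPRESSION GZIP                -- objects are gzip-compressed
WITH (region = 'us-east-2')
FORMAT BYTES;                   -- ingest each line as raw text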
WITH options
Field | Value | Description |
---|---|---|
timestamp_frequency_ms | int | Default: 1000. Sets the timestamping frequency in ms, i.e., how frequently the source advances its timestamp. This determines how stale data in views will be. Lower values result in more up-to-date views but may reduce throughput. |
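As an illustration, the option is set in the same WITH block as the AWS connection options; the source and bucket names below are hypothetical:

CREATE SOURCE slow_tick_source
FROM S3 DISCOVER OBJECTS MATCHING '**/*.json' USING
BUCKET SCAN 'example-analytics'
WITH (
  region = 'us-east-2',
  timestamp_frequency_ms = 5000  -- advance the source timestamp every 5 seconds
)
FORMAT BYTES;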
Supported formats
Format | Append-only envelope | Upsert envelope | Debezium envelope |
---|---|---|---|
JSON | ✓ | | |
Text/bytes | ✓ | | |
CSV | ✓ | | |
Features
Materialize supports two strategies for object discovery: listing bucket objects and listening to SQS notifications.
Both strategies follow the same basic pattern:
- Obtain a list of objects.
- Deduplicate objects so the same object is never downloaded twice.
- Filter the list of objects against any patterns provided in the MATCHING clause.
- Download the matching objects.
- Treat each object downloaded as a newline-delimited file for the purposes of record delineation.
You may specify multiple strategies within a single CREATE SOURCE statement. For example, this is a valid DISCOVER OBJECTS clause:
DISCOVER OBJECTS USING
BUCKET SCAN 'example-1',
BUCKET SCAN 'example-2',
SQS NOTIFICATIONS 'example-notifications'
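In a full statement, that clause might appear as in the following sketch (the bucket names, queue name, region, and format are hypothetical):

CREATE SOURCE multi_strategy_source
FROM S3 DISCOVER OBJECTS MATCHING '**/*.json' USING
BUCKET SCAN 'example-1',                   -- one-time scan of each bucket
BUCKET SCAN 'example-2',
SQS NOTIFICATIONS 'example-notifications'  -- plus ongoing notifications
WITH (region = 'us-east-2')
FORMAT BYTES;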
Patterns
It’s possible to filter the list of object keys to download using Unix-style glob syntax as an argument in the MATCHING clause:
- ? matches any single character except /.
- * matches zero or more characters except /.
- ** recursively matches directories, but some other pattern must be specified. For example, a/** matches anything inside of the a/ prefix, but not a/ itself; and **/a matches a in any prefix, but not a with no prefix.
- {a,b} matches a or b, where a and b are arbitrary glob patterns.
- [ab] matches a or b, where a and b are characters. Prepend ! to the matched characters to invert the match. For example, [!ab] matches any character besides a or b.
- You can escape metacharacters (such as * and ?) using character class notation. For example, [*] matches *.
Pattern examples
Pattern | Example matches | Example excludes |
---|---|---|
** | a, a/b/c.json | none |
2020/**/*.json | 2020/11/uuid.json | data/2020/uuid.json, 2020/11/uuid.csv |
* | a | a/b |
202{0,1}/*/*.csv | 2020/11/data.csv | 2022/11/data.csv, 2020/11/01/data.csv |
Listing bucket objects
The BUCKET SCAN discovery strategy performs a single scan over the specified bucket at source creation time:
CREATE SOURCE csv_source
FROM S3 DISCOVER OBJECTS MATCHING '**/*.csv' USING
BUCKET SCAN 'analytics'
WITH (region = 'us-east-2')
FORMAT CSV WITH 1 COLUMNS;
For an S3 source to ingest objects that are added to the bucket after the
source is created, you must additionally configure an SQS NOTIFICATIONS
discovery strategy on the source.
Listening to SQS notifications
Materialize can listen for notifications of newly created objects in an S3 bucket through the S3 Event Notifications API.
To get new object notifications, the specified bucket must be configured to publish s3:ObjectCreated:*
event notifications to an SQS queue. Once this is set up, you can point Materialize at the SQS queue using the DISCOVER OBJECTS USING SQS NOTIFICATIONS
syntax:
CREATE SOURCE csv_source
FROM S3 DISCOVER OBJECTS MATCHING '**/*.csv' USING
BUCKET SCAN 'analytics',
SQS NOTIFICATIONS 'analytics'
WITH (region = 'us-east-2')
FORMAT CSV WITH 1 COLUMNS;
For more details on configuring a bucket for notifications, follow the step-by-step instructions in the AWS documentation.
Configuring an SNS topic
Materialize deletes SQS messages as soon as they are ingested. This means that the same SQS queue cannot be used in multiple sources.
If you’d like to have multiple sources listening to notifications from the same bucket, you must configure an SNS topic as an intermediary, with multiple SQS queues subscribed to it (one per source). These queues must be configured to use raw message delivery.
Since Materialize treats unmaterialized sources with multiple downstream views as separate sources, SQS notifications cannot be shared across multiple materializations of the same source. You must create separate SQS queues for each S3 notification source.
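As a sketch, two sources fed by the same bucket would each point at their own SQS queue subscribed to the shared SNS topic; all bucket, queue, and source names below are hypothetical:

CREATE SOURCE bucket_events_a
FROM S3 DISCOVER OBJECTS MATCHING '**/*.json' USING
BUCKET SCAN 'example-bucket',
SQS NOTIFICATIONS 'example-queue-a'  -- queue subscribed to the SNS topic
WITH (region = 'us-east-2')
FORMAT BYTES;

CREATE SOURCE bucket_events_b
FROM S3 DISCOVER OBJECTS MATCHING '**/*.json' USING
BUCKET SCAN 'example-bucket',
SQS NOTIFICATIONS 'example-queue-b'  -- a separate queue for the second source
WITH (region = 'us-east-2')
FORMAT BYTES;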
Authentication
AWS credentials
AWS credentials can be provided on a per-source basis via the standard AWS credentials provider chain:
- Statically provided as part of the WITH block in the source declaration. These credentials will be written to disk in plain text and are easily exposed via introspection commands, so this technique is only recommended for experimentation and development.
- Environment variables: AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_WEB_IDENTITY_TOKEN_FILE, and many more.
- A credential_process command in the AWS config file, by default located at ~/.aws/config.
- AWS credentials and profile files, by default located at ~/.aws/config and ~/.aws/credentials, respectively.
- The IAM instance profile provided by the Instance Metadata Service. This will only work if running on an EC2 instance with an instance profile.
Changed in v0.10.0: Materialize supports the standard AWS credentials provider chain as described above. Previously Materialize had support for only an unspecified subset of the standard credentials provider chain.
AWS credentials WITH options
Static credentials can be configured by the following WITH options:
Field | Value type | Description |
---|---|---|
access_key_id | text | A valid access key ID for the AWS resource. |
secret_access_key | text | A valid secret access key for the AWS resource. |
token (optional) | text | The session token associated with the credentials, if the credentials are temporary. |
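For example, a development-only source might pass static credentials directly; the values below are placeholders, and note again that these options are stored in plain text:

CREATE SOURCE dev_s3_source
FROM S3 DISCOVER OBJECTS MATCHING '**/*.csv' USING
BUCKET SCAN 'example-dev-bucket'
WITH (
  region = 'us-east-2',
  access_key_id = 'EXAMPLE-ACCESS-KEY-ID',  -- placeholder
  secret_access_key = 'example-secret',     -- placeholder
  token = 'example-session-token'           -- only needed for temporary credentials
)
FORMAT CSV WITH 3 COLUMNS;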
Alternatively, you can specify a named config profile to assume. This named profile must exist within the AWS credentials or config file.
Field | Value type | Description |
---|---|---|
profile | text | An AWS config profile to assume. New in v0.20.0. |
The following WITH options can be set alongside static credentials, alongside a profile, or on their own, depending on how the environment supplies credentials.
Field | Value type | Description |
---|---|---|
role_arn | text | An IAM role to assume. New in v0.20.0. |
region | text | The region to use for all AWS requests. |
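For example, a source might authenticate through a named profile and assume a role for its AWS requests; the profile name, role ARN, and bucket below are hypothetical:

CREATE SOURCE profile_s3_source
FROM S3 DISCOVER OBJECTS MATCHING '**/*.csv' USING
BUCKET SCAN 'example-analytics'
WITH (
  profile = 'example-profile',                                   -- from ~/.aws/config or ~/.aws/credentials
  role_arn = 'arn:aws:iam::000000000000:role/example-s3-reader', -- hypothetical role to assume
  region = 'us-east-2'
)
FORMAT CSV WITH 3 COLUMNS;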
Credentials fetched from a container or instance profile expire on a fixed schedule. Materialize will attempt to refresh the credentials automatically before they expire, but the source will become inoperable if the refresh operation fails.
Permissions required
The permissions required for the IAM User or Role used by materialized
depend on the strategy specified for object discovery.
For example, since the BUCKET SCAN strategy must perform repeated ListObjects actions to create a list of key names to download, you must grant the Materialize IAM User or Role the ListObjects permission before you specify DISCOVER OBJECTS USING BUCKET SCAN.
Object discovery strategy | Permissions required |
---|---|
All | GetObject permission for the objects to be downloaded |
BUCKET SCAN | ListObject permission for the buckets to be scanned, unless the MATCHING pattern can only match a single object. In such cases, Materialize will perform only the necessary GetObject API call. |
SQS NOTIFICATIONS | ChangeMessageVisibility, DeleteMessage, GetQueueUrl, and ReceiveMessage SQS permissions for the queue Materialize will listen to |
Examples
Creating a source
Assuming there is an S3 bucket analytics
that contains the following keys and
associated content:
users/2021/usage.json
{"user_id": 9999, "disks_used": 2, "cpu_used_minutes": 3600}
{"user_id": 888, "disks_used": 0}
{"user_id": 777, "disks_used": 25, "cpu_used_minutes": 9200}
users/2020/usage.json
{"user_id": 9999, "disks_used": 150, "cpu_used_minutes": 400100}
{"user_id": 888, "disks_used": 0}
{"user_id": 777, "disks_used": 12, "cpu_used_minutes": 999900}
To load all the keys:
CREATE SOURCE json_source
FROM S3 DISCOVER OBJECTS MATCHING '**/*.json' USING
BUCKET SCAN 'analytics'
WITH (region = 'us-east-2')
FORMAT BYTES;
This creates a source that…
- Lazily scans the entire analytics bucket looking for objects whose keys end with *.json. This source is not MATERIALIZED, so nothing has been ingested yet and it cannot be queried.
- Has one text column, text, and one automatically generated integer column, mz_record, which reflects the order in which Materialize first encountered that row.
To access the data as JSON, you can then use standard JSON functions and operators:
CREATE MATERIALIZED VIEW jsonified_s3_source AS
SELECT
data->>'user_id' AS user_id,
data->>'disks_used' AS disks_used,
data->>'cpu_used_minutes' AS cpu_used_minutes
FROM (SELECT text::jsonb AS data FROM json_source);
This creates a view that…
- Has three string columns (user_id, disks_used, and cpu_used_minutes).
- Is MATERIALIZED, so its contents are cached in memory and it is immediately queryable.
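If numeric types are needed downstream, the string columns can be cast in a further query or view; a minimal sketch:

SELECT user_id::int4,
       disks_used::int4,
       cpu_used_minutes::int4
FROM jsonified_s3_source;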
Assuming there is an S3 bucket frontend
that contains the following keys and associated
content:
logs/2020/12/31/frontend.log
99.99.44.44 - - [12/31/2020:23:55:59 +0000] "GET /updates HTTP/1.1" 200 10020 "-" "Python/Requests 22"
80.91.33.133 - - [12/31/2020:23:55:02 +0000] "GET /downloads/materialized HTTP/1.1" 304 0 "-" "Rust/reqwest 0.3"
173.203.139.108 - - [12/31/2020:23:55:07 +0000] "GET /wpadmin HTTP/1.1" 404 332 "-" "Firefox 9000"
173.203.139.108 - - [12/31/2020:23:55:14 +0000] "GET /downloads/materialized HTTP/1.1" 404 334 "-" "Python/Requests 22"
99.99.44.44 - - [12/31/2020:23:55:01 +0000] "GET /downloads/materialized HTTP/1.1" 304 0 "-" "Python/Requests 22"
80.91.33.133 - - [12/31/2020:23:55:41 +0000] "GET /downloads/materialized HTTP/1.1" 304 0 "-" "Rust/reqwest 0.3"
37.26.93.214 - - [12/31/2020:23:55:52 +0000] "GET /updates HTTP/1.1" 200 3318 "-" "Go 1.1 package http"
logs/2021/01/01/frontend.log
99.99.44.44 - - [01/01/2021:00:00:41 +0000] "GET /updates HTTP/1.1" 200 10020 "-" "Python/Requests 22"
188.138.60.101 - - [01/01/2021:00:00:48 +0000] "GET /updates HTTP/1.1" 200 10020 "-" "Python/Requests 22"
46.50.21.83 - - [01/01/2021:00:00:02 +0000] "GET /downloads/materialized HTTP/1.1" 304 0 "-" "Python/Requests 22.01"
99.99.44.44 - - [01/01/2021:00:00:25 +0000] "GET /downloads/materialized HTTP/1.1" 304 0 "-" "Python/Requests 22"
91.239.186.133 - - [01/01/2021:00:00:04 +0000] "GET /updates HTTP/1.1" 200 10020 "-" "Python/Requests 22"
173.203.139.108 - - [01/01/2021:00:00:08 +0000] "GET /downloads/materialized HTTP/1.1" 304 0 "-" "Python/Requests 22"
80.91.33.133 - - [01/01/2021:00:00:04 +0000] "GET /downloads/materialized HTTP/1.1" 304 0 "-" "Rust/reqwest 0.3"
93.190.71.150 - - [01/01/2021:00:00:33 +0000] "GET /updates HTTP/1.1" 200 10020 "-" "Python/Requests 22"
91.234.194.89 - - [01/01/2021:00:00:57 +0000] "GET /updates HTTP/1.1" 200 10020 "-" "Python/Requests 22"
46.4.83.163 - - [01/01/2021:00:00:20 +0000] "GET /downloads/materialized HTTP/1.1" 304 0 "-" "Python/Requests 22"
173.203.139.108 - - [01/01/2021:00:00:39 +0000] "GET /downloads/materialized HTTP/1.1" 404 335 "-" "Python/Requests 22"
TEXT format
To create a source that ingests these logs and allows you to do some quick and dirty analysis:
CREATE MATERIALIZED SOURCE frontend_logs
FROM S3 DISCOVER OBJECTS MATCHING 'logs/202?/**/*.log' USING
BUCKET SCAN 'frontend'
WITH (region = 'us-east-2')
FORMAT TEXT;
From here, you can, for example, get all the lines that include the string updates, ordered by the original position of the line in the file (mz_record):
SELECT mz_record,
text
FROM frontend_logs
WHERE text LIKE '%updates%'
ORDER BY mz_record;
mz_record | text
1 | 99.99.44.44 - - [12/31/2020:23:55:59] "GET /updates HTTP/1.1" 200 10020 "-" "Python/Requests_22"
7 | 37.26.93.214 - - [12/31/2020:23:55:52] "GET /updates HTTP/1.1" 200 3318 "-" "Go_1.1_package_http"
8 | 99.99.44.44 - - [01/01/2021:00:00:41] "GET /updates HTTP/1.1" 200 10020 "-" "Python/Requests_22"
9 | 188.138.60.101 - - [01/01/2021:00:00:48] "GET /updates HTTP/1.1" 200 10020 "-" "Python/Requests_22"
12 | 91.239.186.133 - - [01/01/2021:00:00:04] "GET /updates HTTP/1.1" 200 10020 "-" "Python/Requests_22"
15 | 93.190.71.150 - - [01/01/2021:00:00:33] "GET /updates HTTP/1.1" 200 10020 "-" "Python/Requests_22"
16 | 91.234.194.89 - - [01/01/2021:00:00:57] "GET /updates HTTP/1.1" 200 10020 "-" "Python/Requests_22"
REGEX format
It’s generally more convenient to work with well-structured columnar data, so you
can use the REGEX
format specifier instead:
CREATE MATERIALIZED SOURCE frontend_logs
FROM S3 DISCOVER OBJECTS MATCHING 'logs/202?/**/*.log' USING
BUCKET SCAN 'frontend'
WITH (region = 'us-east-2')
FORMAT REGEX '(?P<ip>[^ ]+) - - \[(?P<dt>[^]]+)\] "(?P<method>\w+) (?P<path>[^ ]+)[^"]+" (?P<status>\d+) (?P<content_length>\d+) "-" "(?P<user_agent>[^"]+)"';
From here, you can, for example, get all the lines that have /updates as the exact path:
SELECT dt,
ip,
user_agent
FROM frontend_logs
WHERE path = '/updates';
dt | ip | user_agent
01/01/2021:00:00:04 | 91.239.186.133 | Python/Requests 22
01/01/2021:00:00:33 | 93.190.71.150 | Python/Requests 22
01/01/2021:00:00:41 | 99.99.44.44 | Python/Requests 22
01/01/2021:00:00:48 | 188.138.60.101 | Python/Requests 22
01/01/2021:00:00:57 | 91.234.194.89 | Python/Requests 22
12/31/2020:23:55:52 | 37.26.93.214 | Go 1.1 package http
12/31/2020:23:55:59 | 99.99.44.44 | Python/Requests 22
With a CSV header
Assuming there is an S3 bucket analytics
that contains the following keys and
associated content:
users/2021/engagement-with-header.csv
id,status,active time
9999,active,8 hours
888,inactive,
777,active,3 hours
users/2020/engagement-with-header.csv
id,status,active time
9999,active,750 hours
888,inactive,
777,active,1002 hours
To validate and remove header rows when reading from an S3 bucket, you can use the FORMAT CSV WITH HEADER (column, column2, ...) syntax. The column names are required for S3 sources, unlike for file sources.
To load all the keys:
CREATE MATERIALIZED SOURCE csv_example (user_id, status, usage) -- provide SQL names
FROM S3 DISCOVER OBJECTS MATCHING '**/*.csv' USING
BUCKET SCAN 'analytics'
WITH (region = 'us-east-2')
FORMAT CSV WITH HEADER (id, status, "active time"); -- expect a header for each file with these names
Without a CSV header
Assuming there is an S3 bucket analytics
that contains the following keys and
associated content:
users/2021/engagement.csv
9999,active,8 hours
888,inactive,
777,active,3 hours
users/2020/engagement.csv
9999,active,750 hours
888,inactive,
777,active,1002 hours
To load all the keys:
CREATE MATERIALIZED SOURCE csv_example (user_id, status, usage)
FROM S3 DISCOVER OBJECTS MATCHING '**/*.csv' USING
BUCKET SCAN 'analytics'
WITH (region = 'us-east-2')
FORMAT CSV WITH 3 COLUMNS;
This creates a source that…
- Scans the entire analytics bucket looking for objects whose keys end with *.csv.
- Has three text columns (user_id, status, and usage) and one automatically generated integer column (mz_record), which reflects the order in which Materialize first encountered that row. Lines in any object that do not have three columns are ignored, and an error-level message is written to the Materialize log.
- Materializes the contents in memory immediately upon issuing the command.
To handle well-typed data while stripping out some uninteresting columns, you can instead write an unmaterialized source and parse columns in a materialized view:
CREATE SOURCE csv_source (user_id, status, usage)
FROM S3 DISCOVER OBJECTS MATCHING '**/*.csv' USING
BUCKET SCAN 'analytics'
WITH (region = 'us-east-2')
FORMAT CSV WITH 3 COLUMNS;
CREATE MATERIALIZED VIEW csv_example AS
SELECT user_id::int4,
usage::interval
FROM csv_source;
This creates a view that has the same properties as above, except it…
- Has two columns (one integer, one interval)
- Does not store the string data in memory after it’s been parsed.
Known limitations
Supported envelopes
S3 sources are append-only, which means that Materialize silently ignores any deleted or updated objects.
Ordering guarantees
Object ingest order is not guaranteed, and Materialize may interleave the ingestion of multiple objects to improve throughput.