CREATE SOURCE: MySQL
CREATE SOURCE connects Materialize to an external system you want to read data from, and provides details about how to decode and interpret that data.
Materialize supports MySQL (5.7+) as a real-time data source. To connect to a MySQL database, you first need to tweak its configuration to enable GTID-based binary log (binlog) replication, and then create a connection in Materialize that specifies access and authentication parameters.
Syntax
Although schema and database are synonyms in MySQL, the MySQL source documentation and syntax standardize on schema as the preferred keyword.
with_options
Field | Use |
---|---|
src_name | The name for the source. |
IF NOT EXISTS | Do nothing (except issuing a notice) if a source with the same name already exists. Default. |
IN CLUSTER cluster_name | The cluster to maintain this source. |
CONNECTION connection_name | The name of the MySQL connection to use in the source. For details on creating connections, check the CREATE CONNECTION documentation page. |
FOR ALL TABLES | Create subsources for all tables in all schemas upstream. The mysql system schema is ignored. |
FOR SCHEMAS ( schema_list ) | Create subsources for specific schemas upstream. |
FOR TABLES ( table_list ) | Create subsources for specific tables upstream. Requires fully-qualified table names (<schema>.<table>). |
EXPOSE PROGRESS AS progress_subsource_name | The name of the progress collection for the source. If this is not specified, the progress collection will be named <src_name>_progress. For more information, see Monitoring source progress. |
RETAIN HISTORY FOR retention_period | Private preview. This option has known performance or stability issues and is under active development. Duration for which Materialize retains historical data, which is useful to implement durable subscriptions. Accepts positive interval values (e.g. '1hr'). Default: 1s. |
CONNECTION options
Field | Value | Description |
---|---|---|
EXCLUDE COLUMNS | A list of fully-qualified names | Exclude specific columns that cannot be decoded or should not be included in the subsources created in Materialize. |
TEXT COLUMNS | A list of fully-qualified names | Decode data as text for specific columns that contain MySQL types that are unsupported in Materialize. |
Features
Change data capture
The source uses MySQL’s binlog replication protocol to continually ingest changes resulting from INSERT, UPDATE and DELETE operations in the upstream database. This process is known as change data capture.
The replication method used is based on global transaction identifiers (GTIDs), and guarantees transactional consistency — any operation inside a MySQL transaction is assigned the same timestamp in Materialize, which means that the source will never show partial results based on partially replicated transactions.
Before creating a source in Materialize, you must configure the upstream MySQL database for GTID-based binlog replication. This requires the following configuration changes:
Configuration parameter | Value | Details |
---|---|---|
log_bin | ON | |
binlog_format | ROW | This configuration is deprecated as of MySQL 8.0.34. Newer versions of MySQL default to row-based logging. |
binlog_row_image | FULL | |
gtid_mode | ON | |
enforce_gtid_consistency | ON | |
replica_preserve_commit_order | ON | Only required when connecting Materialize to a read-replica for replication, rather than the primary server. |
If you’re running MySQL using a managed service, further configuration changes might be required. For step-by-step instructions on enabling GTID-based binlog replication for your MySQL service, see the integration guides.
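To check the settings listed in the configuration table before creating a source, you can inspect the server variables from a MySQL client. The following is a minimal sketch, run against the upstream MySQL server rather than Materialize. It assumes a MySQL release that exposes replica_preserve_commit_order under that name; older releases use slave_preserve_commit_order instead.

```sql
-- Run on the upstream MySQL server, not in Materialize.
-- Lists the current value of each setting required for
-- GTID-based binlog replication.
SHOW GLOBAL VARIABLES
WHERE Variable_name IN (
    'log_bin',
    'binlog_format',
    'binlog_row_image',
    'gtid_mode',
    'enforce_gtid_consistency',
    'replica_preserve_commit_order'
);
```

Any row whose value differs from the table needs to be changed before the source is created. On managed services this is usually done through the service's own parameter settings rather than with SET statements.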
Binlog retention
By default, MySQL retains binlog files for 30 days (i.e., 2592000 seconds) before automatically removing them. This is configurable via the binlog_expire_logs_seconds system variable. We recommend using the default value for this configuration in order to not compromise Materialize’s ability to resume replication in case of failures or restarts.
In some MySQL managed services, binlog expiration can be overridden by a service-specific configuration parameter. Check whether such a configuration exists, and ensure it’s set to the maximum interval available.
As an example, Amazon RDS for MySQL has its own configuration parameter for binlog retention (binlog retention hours) that overrides binlog_expire_logs_seconds and is set to NULL by default.
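The retention settings described above can be inspected, and on Amazon RDS adjusted, from a MySQL client. This is a sketch under stated assumptions: the server exposes binlog_expire_logs_seconds, the two CALL statements apply only to Amazon RDS for MySQL, and the value 168 is illustrative rather than a recommendation from this page.

```sql
-- Run on the upstream MySQL server, not in Materialize.
-- Current retention window in seconds (2592000 = 30 days).
SHOW GLOBAL VARIABLES LIKE 'binlog_expire_logs_seconds';

-- Amazon RDS for MySQL only: show the service-level settings,
-- including 'binlog retention hours'.
CALL mysql.rds_show_configuration;

-- Amazon RDS for MySQL only: raise the retention window.
-- 168 is an illustrative value; use the maximum your service allows.
CALL mysql.rds_set_configuration('binlog retention hours', 168);
```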
Creating a source
Materialize ingests the raw replication stream data for all (or a specific set of) tables in your upstream MySQL database.
CREATE SOURCE mz_source
FROM MYSQL CONNECTION mysql_connection
FOR ALL TABLES;
When you define a source, Materialize will automatically:
- Create a subsource for each original table upstream, and perform an initial, snapshot-based sync of the tables before it starts ingesting change events.
  SHOW SOURCES;
           name         |   type    |  cluster
  ----------------------+-----------+------------
   mz_source            | mysql     |
   mz_source_progress   | progress  |
   table_1              | subsource |
   table_2              | subsource |
- Incrementally update any materialized or indexed views that depend on the source as change events stream in, as a result of INSERT, UPDATE and DELETE operations in the upstream MySQL database.
It’s important to note that the schema metadata is captured when the source is initially created, and is validated against the upstream schema upon restart. If you create new tables upstream after creating a MySQL source and want to replicate them to Materialize, the source must be dropped and recreated.
MySQL schemas
CREATE SOURCE will attempt to create each upstream table in the same schema as the source. This may lead to naming collisions if, for example, you are replicating schema1.table_1 and schema2.table_1. In such cases, use the FOR TABLES clause to provide aliases for each upstream table, or to specify an alternative destination schema in Materialize.
CREATE SOURCE mz_source
FROM MYSQL CONNECTION mysql_connection
FOR TABLES (schema1.table_1 AS s1_table_1, schema2.table_1 AS s2_table_1);
Monitoring source progress
By default, MySQL sources expose progress metadata as a subsource that you can use to monitor source ingestion progress. The name of the progress subsource can be specified when creating a source using the EXPOSE PROGRESS AS clause; otherwise, it will be named <src_name>_progress.
The following metadata is available for each source as a progress subsource:
Field | Type | Details |
---|---|---|
source_id_lower | uuid | The lower-bound GTID source_id of the GTIDs covered by this range. |
source_id_upper | uuid | The upper-bound GTID source_id of the GTIDs covered by this range. |
transaction_id | uint8 | The transaction_id of the next GTID possible from the GTID source_ids covered by this range. |
The progress subsource can be queried using:
SELECT transaction_id
FROM <src_name>_progress;
Progress metadata is represented as a GTID set of future possible GTIDs, which is similar to the gtid_executed system variable on a MySQL replica. The reported transaction_id should increase as Materialize consumes new binlog records from the upstream MySQL database. For more details on monitoring source ingestion progress and debugging related issues, see Troubleshooting.
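To see the full GTID ranges rather than only transaction_id, you can select every field of the progress subsource and compare the result by eye with the upstream GTID state. This sketch assumes a source named mz_source with the default progress subsource name; the second statement runs on the MySQL server, not in Materialize.

```sql
-- In Materialize: inspect the GTID ranges tracked by the source.
-- mz_source_progress is the default progress subsource name for a
-- source called mz_source (an assumed name).
SELECT source_id_lower, source_id_upper, transaction_id
FROM mz_source_progress;

-- On the upstream MySQL server: the set of GTIDs executed so far,
-- for a rough comparison with the output above.
SELECT @@GLOBAL.gtid_executed;
```

Because the progress subsource reports the next possible GTID rather than the last executed one, the two outputs are not expected to match exactly.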
Known limitations
Schema changes
Materialize supports schema changes in the upstream database as follows:
Compatible schema changes
- Adding columns to tables. Materialize will not ingest new columns added upstream unless you use DROP SOURCE to first drop the affected subsource, and then add the table back to the source using ALTER SOURCE...ADD SUBSOURCE.
- Dropping columns that were added after the source was created. These columns are never ingested, so you can drop them without issue.
- Adding or removing NOT NULL constraints to tables that were nullable when the source was created.
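As an illustration, the compatible changes listed above might look like the following upstream MySQL statements. This is only a sketch: the table mydb.table_1 and the columns notes and description are hypothetical, and the VARCHAR(255) type is an assumption standing in for whatever type the column already has.

```sql
-- Run on the upstream MySQL server. Table and column names are hypothetical.

-- Add a column. Materialize does not ingest it until the subsource
-- is dropped and added back to the source.
ALTER TABLE mydb.table_1 ADD COLUMN notes TEXT;

-- Drop a column that was added after the source was created.
ALTER TABLE mydb.table_1 DROP COLUMN notes;

-- Add a NOT NULL constraint to a column that was nullable when the
-- source was created. MODIFY restates the whole column definition, so
-- the type (VARCHAR(255) here, an assumption) must match the existing one.
ALTER TABLE mydb.table_1 MODIFY COLUMN description VARCHAR(255) NOT NULL;
```

Restating a different type in the MODIFY statement would change the column type, which falls outside the list of compatible changes.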
Incompatible schema changes
All other schema changes to upstream tables will set the corresponding subsource into an error state, which prevents you from reading from the source.
To handle incompatible schema changes, use DROP SOURCE and ALTER SOURCE...ADD SUBSOURCE to first drop the affected subsource, and then add the table back to the source. When you add the subsource, it will have the updated schema from the corresponding upstream table.
Supported types
Materialize natively supports the following MySQL types:
bigint
binary
blob
boolean
char
date
datetime
decimal
double
float
int
json
longblob
longtext
mediumblob
mediumint
mediumtext
numeric
real
smallint
text
time
timestamp
tinyblob
tinyint
tinytext
varbinary
varchar
Replicating tables that contain unsupported data types is possible via the TEXT COLUMNS option for the following types:
enum
year
The specified columns will be treated as text, and will thus not offer the expected MySQL type features. For any unsupported data types not listed above, use the EXCLUDE COLUMNS option.
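The two options can be sketched together for a single table. This example is hypothetical: it assumes an upstream table mydb.table_1 with an enum column named status and a column named location of an unsupported type, and it assumes both options can be listed in the same CONNECTION options clause, separated by a comma.

```sql
-- Hypothetical upstream table mydb.table_1 with columns:
--   status   (enum)             -> decoded as text
--   location (unsupported type) -> excluded from the subsource
CREATE SOURCE mz_source
  FROM MYSQL CONNECTION mysql_connection (
    TEXT COLUMNS (mydb.table_1.status),
    EXCLUDE COLUMNS (mydb.table_1.location)
  )
  FOR TABLES (mydb.table_1);
```

Both options take the upstream fully-qualified column names, so the schema and table are spelled out even though only one table is replicated.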
Truncation
Tables replicated into Materialize should not be truncated. If a table is truncated while replicated, the whole source becomes inaccessible and will not produce any data until it is recreated. Instead, remove all rows from a table using an unqualified DELETE.
DELETE FROM t;
Examples
Creating a connection
A connection describes how to connect and authenticate to an external system you want Materialize to read data from.
Once created, a connection is reusable across multiple CREATE SOURCE statements. For more details on creating connections, check the CREATE CONNECTION documentation page.
CREATE SECRET mysqlpass AS '<MYSQL_PASSWORD>';
CREATE CONNECTION mysql_connection TO MYSQL (
HOST 'instance.foo000.us-west-1.rds.amazonaws.com',
PORT 3306,
USER 'materialize',
PASSWORD SECRET mysqlpass
);
If your MySQL server is not exposed to the public internet, you can tunnel the connection through an AWS PrivateLink service or an SSH bastion host.
CREATE CONNECTION privatelink_svc TO AWS PRIVATELINK (
SERVICE NAME 'com.amazonaws.vpce.us-east-1.vpce-svc-0e123abc123198abc',
AVAILABILITY ZONES ('use1-az1', 'use1-az4')
);
CREATE CONNECTION mysql_connection TO MYSQL (
HOST 'instance.foo000.us-west-1.rds.amazonaws.com',
PORT 3306,
USER 'root',
PASSWORD SECRET mysqlpass,
AWS PRIVATELINK privatelink_svc
);
For step-by-step instructions on creating AWS PrivateLink connections and configuring an AWS PrivateLink service to accept connections from Materialize, check this guide.
CREATE CONNECTION ssh_connection TO SSH TUNNEL (
HOST 'bastion-host',
PORT 22,
USER 'materialize'
);
CREATE CONNECTION mysql_connection TO MYSQL (
HOST 'instance.foo000.us-west-1.rds.amazonaws.com',
SSH TUNNEL ssh_connection
);
For step-by-step instructions on creating SSH tunnel connections and configuring an SSH bastion server to accept connections from Materialize, check this guide.
Creating a source
Create subsources for all tables in MySQL
CREATE SOURCE mz_source
FROM MYSQL CONNECTION mysql_connection
FOR ALL TABLES;
Create subsources for all tables from specific schemas in MySQL
CREATE SOURCE mz_source
FROM MYSQL CONNECTION mysql_connection
FOR SCHEMAS (mydb, project);
Create subsources for specific tables in MySQL
CREATE SOURCE mz_source
FROM MYSQL CONNECTION mysql_connection
FOR TABLES (mydb.table_1, mydb.table_2 AS alias_table_2);
Handling unsupported types
If you’re replicating tables that use data types unsupported by Materialize, use the TEXT COLUMNS option to decode data as text for the affected columns. This option expects the upstream fully-qualified names of the replicated table and column (i.e. as defined in your MySQL database).
CREATE SOURCE mz_source
FROM MYSQL CONNECTION mysql_connection (
TEXT COLUMNS (mydb.table_1.column_of_unsupported_type)
)
FOR ALL TABLES;
Excluding columns
MySQL doesn’t provide a way to filter out columns from the replication stream. To exclude specific upstream columns from being ingested, use the EXCLUDE COLUMNS option.
CREATE SOURCE mz_source
FROM MYSQL CONNECTION mysql_connection (
EXCLUDE COLUMNS (mydb.table_1.column_to_ignore)
)
FOR ALL TABLES;
Handling errors and schema changes
To handle upstream schema changes or errored subsources, use the DROP SOURCE syntax to drop the affected subsource, and then ALTER SOURCE...ADD SUBSOURCE to add the subsource back to the source.
-- List all subsources in mz_source
SHOW SUBSOURCES ON mz_source;
-- Get rid of an outdated or errored subsource
DROP SOURCE table_1;
-- Start ingesting the table with the updated schema or fix
ALTER SOURCE mz_source ADD SUBSOURCE table_1;
Related pages
CREATE SECRET
CREATE CONNECTION
CREATE SOURCE
MySQL integration guides