CREATE SOURCE: MySQL

PREVIEW This feature is in private preview. It is under active development and may have stability or performance issues. It isn't subject to our backwards compatibility guarantees.

You must contact us to enable this feature in your Materialize region.

CREATE SOURCE connects Materialize to an external system you want to read data from, and provides details about how to decode and interpret that data.

Materialize supports MySQL (5.7+) as a real-time data source. To connect to a MySQL database, you first need to tweak its configuration to enable GTID-based binary log (binlog) replication, and then create a connection in Materialize that specifies access and authentication parameters.

Syntax

CREATE SOURCE IF NOT EXISTS src_name IN CLUSTER cluster_name FROM MYSQL CONNECTION connection_name ( TEXT COLUMNS ( column_name , ) , IGNORE COLUMNS ( column_name , ) ) FOR ALL TABLES FOR TABLES ( table_name AS subsrc_name , FOR SCHEMAS ( schema_name , ) EXPOSE PROGRESS AS progress_subsource_name
Field Use
src_name The name for the source.
IF NOT EXISTS Do nothing (except issuing a notice) if a source with the same name already exists. Default.
IN CLUSTER cluster_name The cluster to maintain this source.
CONNECTION connection_name The name of the MySQL connection to use in the source. For details on creating connections, check the CREATE CONNECTION documentation page.
FOR ALL TABLES Create subsources for all tables in all schemas upstream. The mysql system schema is ignored.
FOR SCHEMAS ( schema_list ) Create subsources for specific schemas upstream.
FOR TABLES ( table_list ) Create subsources for specific tables upstream. Requires fully-qualified table names (<schema>.<table>).
EXPOSE PROGRESS AS progress_subsource_name The name of the progress collection for the source. If this is not specified, the progress collection will be named <src_name>_progress. For more information, see Monitoring source progress.

CONNECTION options

Field Value Description
IGNORE COLUMNS A list of fully-qualified names Ignore specific columns that cannot be decoded or should not be included in the subsources created in Materialize.
TEXT COLUMNS A list of fully-qualified names Decode data as text for specific columns that contain MySQL types that are unsupported in Materialize.

Features

Change data capture

NOTE: For step-by-step instructions on enabling GTID-based binlog replication for your MySQL service, see the integration guides: Amazon RDS, Amazon Aurora, Azure DB, Google Cloud SQL, Self-hosted.

The source uses MySQL’s binlog replication protocol to continually ingest changes resulting from INSERT, UPDATE and DELETE operations in the upstream database. This process is known as change data capture.

The replication method used is based on global transaction identifiers (GTIDs), and guarantees transactional consistency — any operation inside a MySQL transaction is assigned the same timestamp in Materialize, which means that the source will never show partial results based on partially replicated transactions.

Before creating a source in Materialize, you must configure the upstream MySQL database for GTID-based binlog replication. This requires the following configuration changes:


Configuration parameter Value Details
log_bin ON
binlog_format ROW This configuration is deprecated as of MySQL 8.0.34. Newer versions of MySQL default to row-based logging.
binlog_row_image FULL
gtid_mode ON
enforce_gtid_consistency ON
replica_preserve_commit_order ON Only required when connecting Materialize to a read-replica for replication, rather than the primary server.

If you’re running MySQL using a managed service, further configuration changes might be required. For step-by-step instructions on enabling GTID-based binlog replication for your MySQL service, see the integration guides.

Binlog retention

WARNING! If Materialize tries to resume replication and finds GTID gaps due to missing binlog files, the source enters an errored state and you have to drop and recreate it.

By default, MySQL retains binlog files for 30 days (i.e., 2592000 seconds) before automatically removing them. This is configurable via the binlog_expire_logs_seconds system variable. We recommend using the default value for this configuration in order to not compromise Materialize’s ability to resume replication in case of failures or restarts.

In some MySQL managed services, binlog expiration can be overriden by a service-specific configuration parameter. It’s important that you double-check if such a configuration exists, and ensure it’s set to the maximum interval available.

As an example, Amazon RDS for MySQL has its own configuration parameter for binlog retention (binlog retention hours) that overrides binlog_expire_logs_seconds and is set to NULL by default.

Creating a source

Materialize ingests the raw replication stream data for all (or a specific set of) tables in your upstream MySQL database.

CREATE SOURCE mz_source
  FROM MYSQL CONNECTION mysql_connection
  FOR ALL TABLES;

When you define a source, Materialize will automatically:

  1. Create a subsource for each original table upstream, and perform an initial, snapshot-based sync of the tables before it starts ingesting change events.

    SHOW SOURCES;
    
             name         |   type    |  size   |  cluster  |
    ----------------------+-----------+---------+------------
     mz_source            | mysql     |         |
     mz_source_progress   | progress  |         |
     table_1              | subsource |         |
     table_2              | subsource |         |
    
  2. Incrementally update any materialized or indexed views that depend on the source as change events stream in, as a result of INSERT, UPDATE and DELETE operations in the upstream MySQL database.

It’s important to note that the schema metadata is captured when the source is initially created, and is validated against the upstream schema upon restart. If you create new tables upstream after creating a MySQL source and want to replicate them to Materialize, the source must be dropped and recreated.

NOTE: Support for dropping and recreating individual subsources is planned for a future release (#24975).
MySQL schemas

CREATE SOURCE will attempt to create each upstream table in the same schema as the source. This may lead to naming collisions if, for example, you are replicating schema1.table_1 and schema2.table_1. Use the FOR TABLES clause to provide aliases for each upstream table, in such cases, or to specify an alternative destination schema in Materialize.

CREATE SOURCE mz_source
  FROM MYSQL CONNECTION mysql_connection
  FOR TABLES (schema1.table_1 AS s1_table_1, schema2.table_1 AS s2_table_1);

Monitoring source progress

By default, MySQL sources expose progress metadata as a subsource that you can use to monitor source ingestion progress. The name of the progress subsource can be specified when creating a source using the EXPOSE PROGRESS AS clause; otherwise, it will be named <src_name>_progress.

The following metadata is available for each source as a progress subsource:

Field Type Details
source_id_lower uuid The lower-bound GTID source_id of the GTIDs covered by this range.
source_id_upper uuid The upper-bound GTID source_id of the GTIDs covered by this range.
transaction_id uint8 The transaction_id of the next GTID possible from the GTID source_ids covered by this range.

And can be queried using:

SELECT transaction_id
FROM <src_name>_progress;

Progress metadata is represented as a GTID set of future possible GTIDs, which is similar to the gtid_executed system variable on a MySQL replica. The reported transaction_id should increase as Materialize consumes new binlog records from the upstream MySQL database. For more details on monitoring source ingestion progress and debugging related issues, see Troubleshooting.

Known limitations

Schema changes

Materialize supports schema changes in the upstream MySQL database as follows:

Compatible schema changes

  • Adding columns to tables. Materialize will not ingest these columns unless you drop and recreate the source.

  • Dropping columns that were added after the source was created. These columns are never ingested, so you can drop them without issue.

  • Adding or removing NOT NULL constraints to tables that were nullable when the source was created.

Incompatible schema changes

All other schema changes to tables in MySQL will set the corresponding subsource into an error state, which prevents you from reading from the source.

To handle incompatible schema changes, use DROP SOURCE to drop and recreate the source. Support for dropping and recreating individual subsources is planned for a future release.

Supported types

Materialize natively supports the following MySQL types:

  • bigint
  • binary
  • blob
  • boolean
  • char
  • date
  • datetime
  • decimal
  • double
  • float
  • int
  • json
  • longblob
  • longtext
  • mediumblob
  • mediumint
  • mediumtext
  • numeric
  • real
  • smallint
  • text
  • time
  • timestamp
  • tinyblob
  • tinyint
  • tinytext
  • varbinary
  • varchar

Replicating tables that contain unsupported data types is possible via the TEXT COLUMNS option for the following types:

  • enum
  • year

The specified columns will be treated as text, and will thus not offer the expected MySQL type features. For any unsupported data types not listed above, use the IGNORE COLUMNS option.

Truncation

Tables replicated into Materialize should not be truncated. If a table is truncated while replicated, the whole source becomes inaccessible and will not produce any data until it is recreated. Instead, remove all rows from a table using an unqualified DELETE.

DELETE FROM t;

Examples

WARNING! Before creating a MySQL source, you must enable GTID-based binlog replication in the upstream database. For step-by-step instructions, see the integration guide for your MySQL service: Amazon RDS, Amazon Aurora, Azure DB, Google Cloud SQL, Self-hosted.

Creating a connection

A connection describes how to connect and authenticate to an external system you want Materialize to read data from.

Once created, a connection is reusable across multiple CREATE SOURCE statements. For more details on creating connections, check the CREATE CONNECTION documentation page.

CREATE SECRET mysqlpass AS '<MYSQL_PASSWORD>';

CREATE CONNECTION mysql_connection TO MYSQL (
    HOST 'instance.foo000.us-west-1.rds.amazonaws.com',
    PORT 3306,
    USER 'materialize',
    PASSWORD SECRET mysqlpass
);

If your MySQL server is not exposed to the public internet, you can tunnel the connection through an SSH bastion host.

CREATE CONNECTION ssh_connection TO SSH TUNNEL (
    HOST 'bastion-host',
    PORT 22,
    USER 'materialize'
);
CREATE CONNECTION mysql_connection TO MYSQL (
    HOST 'instance.foo000.us-west-1.rds.amazonaws.com',
    SSH TUNNEL ssh_connection
);

For step-by-step instructions on creating SSH tunnel connections and configuring an SSH bastion server to accept connections from Materialize, check this guide.

Creating a source

Create subsources for all tables in MySQL

CREATE SOURCE mz_source
    FROM MYSQL CONNECTION mysql_connection
    FOR ALL TABLES;

Create subsources for all tables from specific schemas in MySQL

CREATE SOURCE mz_source
  FROM MYSQL CONNECTION mysql_connection
  FOR SCHEMAS (mydb, project);

Create subsources for specific tables in MySQL

CREATE SOURCE mz_source
  FROM MYSQL CONNECTION mysql_connection
  FOR TABLES (mydb.table_1, mydb.table_2 AS alias_table_2);

Handling unsupported types

If you’re replicating tables that use data types unsupported by Materialize, use the TEXT COLUMNS option to decode data as text for the affected columns. This option expects the upstream fully-qualified names of the replicated table and column (i.e. as defined in your MySQL database).

CREATE SOURCE mz_source
  FROM MYSQL CONNECTION mysql_connection (
    TEXT COLUMNS (mydb.table_1.column_of_unsupported_type)
  )
  FOR ALL TABLES;

Ignoring columns

MySQL doesn’t provide a way to filter out columns from the replication stream. To exclude specific upstream columns from being ingested, use the IGNORE COLUMNS option.

CREATE SOURCE mz_source
  FROM MYSQL CONNECTION mysql_connection (
    IGNORE COLUMNS (mydb.table_1.column_to_ignore)
  )
  FOR ALL TABLES;

Handling schema changes

To handle upstream schema changes, drop and recreate the source.

NOTE: Support for dropping and recreating individual subsources is planned for a future release (#24975).
Back to top ↑