Materialize Documentation

Change Data Capture (MySQL)

Change Data Capture (CDC) lets you track the changes recorded in a MySQL database's binary log (binlog) and propagate them to downstream consumers. In this guide, we’ll cover how to use Materialize to create and efficiently maintain real-time materialized views on top of CDC data.

Kafka + Debezium

You can use Debezium and the Kafka source to propagate CDC data from MySQL to Materialize. Debezium captures row-level changes resulting from INSERT, UPDATE and DELETE operations in the upstream database and publishes them as events to Kafka using Kafka Connect-compatible connectors.

Database Setup

Before deploying a Debezium connector, you need to ensure that the upstream database is configured to support row-based replication. As root:

  1. Check the log_bin and binlog_format settings:

    SHOW VARIABLES
    WHERE variable_name IN ('log_bin', 'binlog_format');
    

    For CDC, binary logging must be enabled and use the row format. If your settings differ, you can adjust the database configuration file (/etc/mysql/my.cnf) to use log_bin=mysql-bin and binlog_format=row. Keep in mind that changing these settings requires a restart of the MySQL instance and can affect database performance.

    Note: Additional steps may be required if you’re using MySQL on Amazon RDS.

  2. Grant enough privileges to the replication user to ensure Debezium can operate in the database:

    GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO "user";
    
    FLUSH PRIVILEGES;
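
The check in step 1 can also be scripted. The following is a minimal sketch, not part of Debezium or Materialize: check_binlog_settings is a hypothetical helper that reads the tab-separated name/value rows the mysql client prints in batch mode, and succeeds only if log_bin is ON and binlog_format is ROW.

```shell
# check_binlog_settings (hypothetical helper): read "name<TAB>value" rows on
# stdin and exit 0 only if binary logging is enabled and uses the row format.
check_binlog_settings() {
  awk -F '\t' '
    BEGIN { bad = 0 }
    $1 == "log_bin"       { log_bin = toupper($2) }
    $1 == "binlog_format" { format = toupper($2) }
    END {
      if (log_bin != "ON") { print "log_bin must be ON"; bad = 1 }
      if (format != "ROW") { print "binlog_format must be ROW"; bad = 1 }
      exit bad
    }'
}
```

To run it against a live server, pipe the query from step 1 through the helper, for example `mysql --batch --skip-column-names -e "SHOW VARIABLES WHERE variable_name IN ('log_bin', 'binlog_format')" | check_binlog_settings`. Connection flags are omitted here and depend on your setup.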
    

Deploy Debezium

Debezium is deployed as a set of Kafka Connect-compatible connectors, so you first need to define a MySQL connector configuration and then start the connector by adding it to Kafka Connect.

NOTE: Currently, Materialize only supports Avro-encoded Debezium records. If you’re interested in JSON support, please reach out in the community Slack or leave a comment on this GitHub issue.

  1. Create a connector configuration file and save it as register-mysql.json:

    {
      "name": "your-connector",
      "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "tasks.max": "1",
        "database.hostname": "mysql",
        "database.port": "3306",
        "database.user": "user",
        "database.password": "mysqlpwd",
        "database.server.id": "223344",
        "database.server.name": "dbserver1",
        "database.history.kafka.bootstrap.servers": "kafka:9092",
        "database.history.kafka.topic": "dbserver1.history",
        "database.include.list": "db1",
        "table.include.list": "table1",
        "include.schema.changes": false
      }
    }
    

    You can read more about each configuration property in the Debezium documentation.

  2. Start the Debezium MySQL connector using the configuration file:

    export CURRENT_HOST='<your-host>'
    
    curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" \
      "http://$CURRENT_HOST:8083/connectors/" -d @register-mysql.json
    
  3. Check that the connector is running:

    curl http://$CURRENT_HOST:8083/connectors/your-connector/status
    

    The first time it connects to a MySQL server, Debezium takes a consistent snapshot of the tables selected for replication, so you should see that the pre-existing records in the replicated table are initially pushed into your Kafka topic:

    /usr/bin/kafka-avro-console-consumer \
      --bootstrap-server kafka:9092 \
      --from-beginning \
      --topic dbserver1.db1.table1
    

    Note: By default, the connector writes events for each table to a Kafka topic named serverName.databaseName.tableName.
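
The status request in step 3 returns a JSON document that reports a state for the connector and for each of its tasks. The sketch below assumes that response shape from the Kafka Connect REST API. connector_is_running is a hypothetical helper, and it matches text with grep and sed rather than parsing JSON, so treat it as a rough check only.

```shell
# connector_is_running (hypothetical helper): read a Kafka Connect status
# response on stdin and exit 0 only if every reported "state" is RUNNING.
connector_is_running() {
  # Pull out each "state":"VALUE" pair, then keep only VALUE.
  states=$(grep -o '"state"[[:space:]]*:[[:space:]]*"[A-Z_]*"' \
    | sed 's/.*"\([A-Z_]*\)"$/\1/')
  # No state found at all: not a usable status response.
  [ -n "$states" ] || return 1
  for s in $states; do
    [ "$s" = "RUNNING" ] || return 1
  done
  return 0
}
```

For example, `curl -s "http://$CURRENT_HOST:8083/connectors/your-connector/status" | connector_is_running && echo "connector is running"` prints the message only when the connector and all of its tasks report RUNNING.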

Create a source

Debezium emits change events using an envelope that contains detailed information about upstream database operations, like the before and after values for each record. To create a source that interprets the Debezium envelope in Materialize:

CREATE SOURCE kafka_repl
FROM KAFKA BROKER 'kafka:9092' TOPIC 'dbserver1.db1.table1'
KEY FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY 'http://schema-registry:8081'
VALUE FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY 'http://schema-registry:8081'
ENVELOPE DEBEZIUM;
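
The TOPIC in this statement has to match the topic Debezium writes to, which by default is built from the server, database and table names. If you replicate several tables, it may help to generate the statement. The following is a sketch only: debezium_topic and create_source_sql are hypothetical helpers, and they do no quoting or validation of their arguments.

```shell
# debezium_topic SERVER DATABASE TABLE (hypothetical helper)
# Print the default topic name: serverName.databaseName.tableName.
debezium_topic() {
  printf '%s.%s.%s\n' "$1" "$2" "$3"
}

# create_source_sql NAME BROKER REGISTRY_URL TOPIC (hypothetical helper)
# Print a CREATE SOURCE statement in the same form as the one above.
create_source_sql() {
  cat <<EOF
CREATE SOURCE $1
FROM KAFKA BROKER '$2' TOPIC '$4'
KEY FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY '$3'
VALUE FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY '$3'
ENVELOPE DEBEZIUM;
EOF
}
```

Running `create_source_sql kafka_repl kafka:9092 http://schema-registry:8081 "$(debezium_topic dbserver1 db1 table1)"` prints the statement for the table used in this guide.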

Transaction support

Debezium can publish transaction metadata to a dedicated Kafka topic, which can be used to preserve transactional boundaries downstream. We are working on using this topic to support transaction-aware processing in Materialize (#7537)!

Create a materialized view

Any materialized view defined on top of this source will be incrementally updated as new change events stream in through Kafka, as a result of INSERT, UPDATE and DELETE operations in the original MySQL database.

CREATE MATERIALIZED VIEW cnt_table1 AS
SELECT field1,
       COUNT(*) AS cnt
FROM kafka_repl
GROUP BY field1;
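
To make the effect of change events on cnt_table1 concrete, here is a conceptual sketch of how the before and after values of each event translate into updates to a per-field1 count. It is not how Materialize is implemented. apply_changes is a hypothetical helper, and the tab-separated input format is an assumption made for this example: each row holds the field1 value before and after the change, with - standing for an absent side (no before for an INSERT, no after for a DELETE).

```shell
# apply_changes (hypothetical helper): read "before<TAB>after" rows on stdin,
# one per change event, and print the resulting "field1<TAB>cnt" pairs.
apply_changes() {
  awk -F '\t' '
    $1 != "-" { cnt[$1]-- }   # the old version of the row no longer counts
    $2 != "-" { cnt[$2]++ }   # the new version of the row now counts
    END {
      # Groups whose count dropped to zero disappear from the result.
      for (k in cnt) if (cnt[k] != 0) printf "%s\t%d\n", k, cnt[k]
    }' | sort
}
```

For example, `printf '%s\t%s\n' - a - a - b b a a - | apply_changes` feeds in three inserts (a, a, b), an update that changes b to a, and a delete of one a row. The result is a single row with field1 a and cnt 2.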