MySQL CDC using Kafka and Debezium

Change Data Capture (CDC) lets you track changes in a MySQL database, based on its binary log (binlog), and propagate them to downstream consumers. In this guide, we’ll cover how to use Materialize to create and efficiently maintain real-time materialized views on top of CDC data.

Kafka + Debezium

You can use Debezium and the Kafka source to propagate CDC data from MySQL to Materialize. Debezium captures row-level changes resulting from INSERT, UPDATE and DELETE operations in the upstream database and publishes them as events to Kafka using Kafka Connect-compatible connectors.

Database setup

Before deploying a Debezium connector, you need to ensure that the upstream database is configured to support row-based replication. As root:

  1. Check the log_bin and binlog_format settings:

    SHOW VARIABLES
    WHERE variable_name IN ('log_bin', 'binlog_format');
    

    For CDC, binary logging must be enabled and use the row format. If your settings differ, adjust the database configuration file (/etc/mysql/my.cnf) to use log_bin=mysql-bin and binlog_format=row (see the example snippet after this list). Keep in mind that changing these settings requires a restart of the MySQL instance and can affect database performance.

    Note: Additional steps may be required if you’re using MySQL on Amazon RDS.

  2. Grant the replication user the privileges required for Debezium to operate in the database:

    GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO "user";
    
    FLUSH PRIVILEGES;
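
As referenced in step 1, here is a minimal sketch of the relevant settings in /etc/mysql/my.cnf, assuming you manage the configuration file directly. The binlog_row_image setting is an additional recommendation from the Debezium documentation, not something covered above:

[mysqld]
# Enable binary logging and use the row-based format so that Debezium can
# read row-level changes from the binlog.
log_bin           = mysql-bin
binlog_format     = row
# Recommended by Debezium so that change events carry complete row images.
binlog_row_image  = full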
    

Deploy Debezium

Minimum requirements: Debezium 1.5+

Debezium is deployed as a set of Kafka Connect-compatible connectors, so you first need to define a MySQL connector configuration and then start the connector by adding it to Kafka Connect.

WARNING! If you deploy the MySQL Debezium connector in Confluent Cloud, you must override the default value of the After-state only setting to false.

  1. Create a connector configuration file and save it as register-mysql.json. For Debezium 1.x, use the following configuration:

    {
      "name": "your-connector",
      "config": {
          "connector.class": "io.debezium.connector.mysql.MySqlConnector",
          "tasks.max": "1",
          "database.hostname": "mysql",
          "database.port": "3306",
          "database.user": "user",
          "database.password": "mysqlpwd",
          "database.server.id":"223344",
          "database.server.name": "dbserver1",
          "database.history.kafka.bootstrap.servers":"kafka:9092",
          "database.history.kafka.topic":"dbserver1.history",
          "database.include.list": "db1",
          "table.include.list": "table1",
          "include.schema.changes": false
        }
    }
    

    You can read more about each configuration property in the Debezium documentation.

  2. From Debezium 2.0, Confluent Schema Registry (CSR) support is no longer bundled in the Debezium container images. To enable CSR, you must install the following Confluent Avro converter JAR files into the Kafka Connect plugin directory (by default, /kafka/connect); a sketch of copying the JARs is shown after this list:

    • kafka-connect-avro-converter
    • kafka-connect-avro-data
    • kafka-avro-serializer
    • kafka-schema-serializer
    • kafka-schema-registry-client
    • common-config
    • common-utils

    You can read more about this in the Debezium documentation.

  3. For Debezium 2.0 and later, create the connector configuration file and save it as register-mysql.json:

    {
      "name": "your-connector",
      "config": {
          "connector.class": "io.debezium.connector.mysql.MySqlConnector",
          "tasks.max": "1",
          "database.hostname": "mysql",
          "database.port": "3306",
          "database.user": "user",
          "database.password": "mysqlpwd",
          "database.server.id":"223344",
          "topic.prefix": "dbserver1",
          "database.include.list": "db1",
          "database.history.kafka.topic":"dbserver1.history",
          "database.history.kafka.bootstrap.servers":"kafka:9092",
          "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
          "schema.history.internal.kafka.topic": "dbserver1.internal.history",
          "table.include.list": "table1",
          "key.converter": "io.confluent.connect.avro.AvroConverter",
          "value.converter": "io.confluent.connect.avro.AvroConverter",
          "key.converter.schema.registry.url": "http://<scheme-registry>:8081",
          "value.converter.schema.registry.url": "http://<scheme-registry>:8081",
          "include.schema.changes": false
        }
    }
    

    You can read more about each configuration property in the Debezium documentation. By default, the connector writes events for each table to a Kafka topic named <topic.prefix>.<databaseName>.<tableName> (in Debezium 1.x, the prefix is the database.server.name).

  4. Start the Debezium MySQL connector using the configuration file:

    export CURRENT_HOST='<your-host>'
    curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" \
    http://$CURRENT_HOST:8083/connectors/ -d @register-mysql.json
    
  5. Check that the connector is running:

    curl http://$CURRENT_HOST:8083/connectors/your-connector/status
    

    The first time it connects to a MySQL server, Debezium takes a consistent snapshot of the tables selected for replication, so you should see the pre-existing records from the replicated table pushed into your Kafka topic:

    /usr/bin/kafka-avro-console-consumer \
      --bootstrap-server kafka:9092 \
      --from-beginning \
      --topic dbserver1.db1.table1
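
As referenced in step 2 above, here is a rough sketch of installing the Confluent Avro converter JARs into the Kafka Connect plugin directory. The file names, versions, and download location are placeholders for your environment:

# Hypothetical sketch: copy the Confluent Avro converter JARs (downloaded
# separately from Confluent) into the Kafka Connect plugin directory.
export CONNECT_PLUGIN_DIR=/kafka/connect
cp kafka-connect-avro-converter-*.jar \
   kafka-connect-avro-data-*.jar \
   kafka-avro-serializer-*.jar \
   kafka-schema-serializer-*.jar \
   kafka-schema-registry-client-*.jar \
   common-config-*.jar \
   common-utils-*.jar \
   "$CONNECT_PLUGIN_DIR"/
# Restart the Kafka Connect worker so the new converter is picked up.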
    

Create a source

NOTE: Currently, Materialize only supports Avro-encoded Debezium records. If you're interested in JSON support, please reach out in the community Slack or leave a comment in this GitHub issue.

Debezium emits change events using an envelope that contains detailed information about upstream database operations, like the before and after values for each record. To create a source that interprets the Debezium envelope in Materialize:

CREATE SOURCE kafka_repl
    FROM KAFKA CONNECTION kafka_connection (TOPIC 'dbserver1.db1.table1')
    FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_connection
    ENVELOPE DEBEZIUM;
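
For reference, the value of each change event uses the Debezium envelope described above; a simplified UPDATE event looks roughly like this (field names and values are illustrative, and the real payload carries additional source metadata):

{
  "before": { "field1": "a", "field2": 1 },
  "after": { "field1": "a", "field2": 2 },
  "source": { "connector": "mysql", "name": "dbserver1", "db": "db1", "table": "table1" },
  "op": "u",
  "ts_ms": 1672531200000
}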

By default, the source will be created in the active cluster; to use a different cluster, use the IN CLUSTER clause.
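
The CREATE SOURCE statement assumes that the kafka_connection and csr_connection objects already exist. A minimal sketch of creating them, assuming an unauthenticated broker and the schema registry endpoint used in the connector configuration (adjust for your authentication setup):

-- Connection to the Kafka broker(s) that Debezium publishes to.
CREATE CONNECTION kafka_connection TO KAFKA (
    BROKER 'kafka:9092',
    SECURITY PROTOCOL = 'PLAINTEXT'
);

-- Connection to the Confluent Schema Registry that stores the Avro schemas.
CREATE CONNECTION csr_connection TO CONFLUENT SCHEMA REGISTRY (
    URL 'http://schema-registry:8081'
);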

Transaction support

Debezium provides transaction metadata that can be used to preserve transactional boundaries downstream. We are working on using this topic to support transaction-aware processing in Materialize (see issue #7537).

Create a materialized view

Any materialized view defined on top of this source will be incrementally updated as new change events stream in through Kafka, as a result of INSERT, UPDATE and DELETE operations in the original MySQL database.

CREATE MATERIALIZED VIEW cnt_table1 AS
    SELECT field1,
           COUNT(*) AS cnt
    FROM kafka_repl
    GROUP BY field1;
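
To consume the results, you can query the view directly or use SUBSCRIBE to stream changes as they happen; a minimal sketch, using the illustrative field1 column from above:

-- Ad-hoc query against the incrementally maintained results.
SELECT field1, cnt FROM cnt_table1 ORDER BY cnt DESC LIMIT 10;

-- Or stream every change to the results as it occurs.
SUBSCRIBE (SELECT field1, cnt FROM cnt_table1);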