MySQL CDC using Kafka and Debezium

WARNING! You can use Debezium to propagate Change Data Capture (CDC) data to Materialize from a MySQL database, but we strongly recommend using the native MySQL source instead.
For help getting started with your own data, you can schedule a [free guided trial](https://materialize.com/demo/?utm_campaign=General&utm_source=documentation).

Change Data Capture (CDC) allows you to track and propagate changes in a MySQL database to downstream consumers based on its binary log (binlog). In this guide, we’ll cover how to use Materialize to create and efficiently maintain real-time views with incrementally updated results on top of CDC data.

Kafka + Debezium

You can use Debezium and the Kafka source to propagate CDC data from MySQL to Materialize in the unlikely event that using the native MySQL source is not an option. Debezium captures row-level changes resulting from INSERT, UPDATE and DELETE operations in the upstream database and publishes them as events to Kafka using Kafka Connect-compatible connectors.

A. Configure database

Before deploying a Debezium connector, you need to ensure that the upstream database is configured to support row-based replication. As root:

  1. Check the log_bin and binlog_format settings:

    SHOW VARIABLES
    WHERE variable_name IN ('log_bin', 'binlog_format');
    

    For CDC, binary logging must be enabled and use the row format. If your settings differ, you can adjust the database configuration file (/etc/mysql/my.cnf) to use log_bin=mysql-bin and binlog_format=row. Keep in mind that changing these settings requires a restart of the MySQL instance and can affect database performance.

    Note: Additional steps may be required if you’re using MySQL on Amazon RDS.

  2. Grant enough privileges to the replication user to ensure Debezium can operate in the database:

    GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO "user";
    
    FLUSH PRIVILEGES;
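The check in step 1 can also be scripted. The following is a minimal Python sketch, assuming you have already fetched the output of the SHOW VARIABLES query into a dictionary; the function name check_binlog_settings and the hard-coded sample values are illustrative and not part of MySQL or Debezium.

```python
def check_binlog_settings(variables):
    """Return a list of problems; an empty list means the settings look CDC-ready.

    `variables` maps variable names to values, in the shape returned by the
    SHOW VARIABLES query above, e.g. {"log_bin": "ON", "binlog_format": "ROW"}.
    """
    problems = []
    # Compare case-insensitively so "on"/"row" are accepted as well.
    if str(variables.get("log_bin", "")).upper() != "ON":
        problems.append("log_bin must be ON (binary logging is disabled)")
    if str(variables.get("binlog_format", "")).upper() != "ROW":
        problems.append("binlog_format must be ROW")
    return problems


# Hypothetical query result, hard-coded so the sketch runs without a database.
sample_rows = [("log_bin", "ON"), ("binlog_format", "MIXED")]
for problem in check_binlog_settings(dict(sample_rows)):
    print(problem)
```

With the sample values, only the binlog_format problem is reported, because binary logging is enabled but does not use the row format.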
    

B. Deploy Debezium

Minimum requirements: Debezium 1.5+

Debezium is deployed as a set of Kafka Connect-compatible connectors, so you first need to define a MySQL connector configuration and then start the connector by adding it to Kafka Connect.

WARNING! If you deploy the MySQL Debezium connector in Confluent Cloud, you must override the default value of After-state only to false.

  1. For Debezium versions earlier than 2.0, create a connector configuration file and save it as register-mysql.json:

    {
      "name": "your-connector",
      "config": {
          "connector.class": "io.debezium.connector.mysql.MySqlConnector",
          "tasks.max": "1",
          "database.hostname": "mysql",
          "database.port": "3306",
          "database.user": "user",
          "database.password": "mysqlpwd",
          "database.server.id":"223344",
          "database.server.name": "dbserver1",
          "database.history.kafka.bootstrap.servers":"kafka:9092",
          "database.history.kafka.topic":"dbserver1.history",
          "database.include.list": "db1",
          "table.include.list": "db1.table1",
          "include.schema.changes": false
        }
    }
    

    You can read more about each configuration property in the Debezium documentation.

  1. From Debezium 2.0, Confluent Schema Registry (CSR) support is not bundled in Debezium containers. To enable CSR, you must install the following Confluent Avro converter JAR files into the Kafka Connect plugin directory (by default, /kafka/connect):

    • kafka-connect-avro-converter
    • kafka-connect-avro-data
    • kafka-avro-serializer
    • kafka-schema-serializer
    • kafka-schema-registry-client
    • common-config
    • common-utils

    You can read more about this in the Debezium documentation.

  2. For Debezium 2.0 and later, create a connector configuration file and save it as register-mysql.json:

    {
      "name": "your-connector",
      "config": {
          "connector.class": "io.debezium.connector.mysql.MySqlConnector",
          "tasks.max": "1",
          "database.hostname": "mysql",
          "database.port": "3306",
          "database.user": "user",
          "database.password": "mysqlpwd",
          "database.server.id":"223344",
          "topic.prefix": "dbserver1",
          "database.include.list": "db1",
          "database.history.kafka.topic":"dbserver1.history",
          "database.history.kafka.bootstrap.servers":"kafka:9092",
          "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
          "schema.history.internal.kafka.topic": "dbserver1.internal.history",
          "table.include.list": "db1.table1",
          "key.converter": "io.confluent.connect.avro.AvroConverter",
          "value.converter": "io.confluent.connect.avro.AvroConverter",
          "key.converter.schema.registry.url": "http://<schema-registry>:8081",
          "value.converter.schema.registry.url": "http://<schema-registry>:8081",
          "include.schema.changes": false
        }
    }
    

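    You can read more about each configuration property in the Debezium documentation. By default, the connector writes events for each table to a Kafka topic named serverName.databaseName.tableName, where serverName is the value of topic.prefix (database.server.name before Debezium 2.0). With the configuration above, changes to table1 are written to the topic dbserver1.db1.table1.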
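To make the topic naming rule concrete, here is a small Python sketch that derives the expected topic name from a connector configuration. It assumes the configuration is available as a dictionary shaped like the "config" object in register-mysql.json; the helper names topic_prefix and expected_topic are hypothetical, and the sketch does not account for any topic routing or name sanitization you may have configured.

```python
def topic_prefix(config):
    """Return the logical server name used as the topic prefix.

    Debezium 2.0+ reads it from topic.prefix; earlier versions use
    database.server.name.
    """
    prefix = config.get("topic.prefix") or config.get("database.server.name")
    if not prefix:
        raise ValueError("config sets neither topic.prefix nor database.server.name")
    return prefix


def expected_topic(config, database, table):
    """Build the default topic name: <prefix>.<database>.<table>."""
    return f"{topic_prefix(config)}.{database}.{table}"


# Values taken from the example configurations in this guide.
config_v2 = {"topic.prefix": "dbserver1", "database.include.list": "db1"}
config_v1 = {"database.server.name": "dbserver1", "database.include.list": "db1"}

print(expected_topic(config_v2, "db1", "table1"))
print(expected_topic(config_v1, "db1", "table1"))
```

Both calls produce the same topic name, which is the one the Kafka source needs to reference later in this guide.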

  1. Start the Debezium MySQL connector using the configuration file:

    export CURRENT_HOST='<your-host>'
    curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" \
    http://$CURRENT_HOST:8083/connectors/ -d @register-mysql.json
    
  2. Check that the connector is running:

    curl http://$CURRENT_HOST:8083/connectors/your-connector/status
    

    The first time it connects to a MySQL server, Debezium takes a consistent snapshot of the tables selected for replication, so you should see that the pre-existing records in the replicated table are initially pushed into your Kafka topic:

    /usr/bin/kafka-avro-console-consumer \
      --bootstrap-server kafka:9092 \
      --from-beginning \
      --topic dbserver1.db1.table1
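If you want to script the status check, the response from the status endpoint can be inspected programmatically. The sketch below is a minimal Python example that works on an already-parsed response; the function name failed_parts is hypothetical, and the sample payload is hand-written in the shape Kafka Connect returns (a connector state plus a list of task states) rather than captured from a real cluster.

```python
import json


def failed_parts(status):
    """Return descriptions of the connector or tasks that are not RUNNING."""
    not_running = []
    connector_state = status.get("connector", {}).get("state")
    if connector_state != "RUNNING":
        not_running.append(f"connector: {connector_state}")
    tasks = status.get("tasks", [])
    if not tasks:
        # A connector without tasks is not capturing any changes.
        not_running.append("no tasks reported")
    for task in tasks:
        if task.get("state") != "RUNNING":
            not_running.append(f"task {task.get('id')}: {task.get('state')}")
    return not_running


# Hand-written sample response; worker_id values are placeholders.
sample = json.loads("""
{
  "name": "your-connector",
  "connector": {"state": "RUNNING", "worker_id": "connect:8083"},
  "tasks": [{"id": 0, "state": "FAILED", "worker_id": "connect:8083"}],
  "type": "source"
}
""")

print(failed_parts(sample))
```

An empty list means both the connector and all of its tasks report RUNNING. In the sample, the connector itself is running but its only task has failed, which is a common situation when the database credentials or privileges are wrong.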
    

C. Create a source

NOTE: Currently, Materialize only supports Avro-encoded Debezium records. If you're interested in JSON support, please reach out in the community Slack or submit a feature request.

Debezium emits change events using an envelope that contains detailed information about upstream database operations, like the before and after values for each record. To create a source that interprets the Debezium envelope in Materialize:

CREATE SOURCE kafka_repl
    FROM KAFKA CONNECTION kafka_connection (TOPIC 'dbserver1.db1.table1')
    FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_connection
    ENVELOPE DEBEZIUM;

By default, the source will be created in the active cluster; to use a different cluster, use the IN CLUSTER clause.
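To illustrate what interpreting the Debezium envelope means, here is a conceptual Python sketch that applies a few simplified change events to an in-memory copy of the table. This is only a mental model and not how Materialize implements the envelope: the events are reduced to their before, after and op fields, the id column is a hypothetical primary key, and the function name apply_change is made up for this example.

```python
def apply_change(table, key, envelope):
    """Apply one simplified Debezium change event to `table`, a dict keyed by primary key.

    Only the `after` field decides the outcome:
      - after is not None -> the row now exists with these values (insert or update)
      - after is None     -> the row was deleted
    """
    if envelope.get("after") is not None:
        table[key] = envelope["after"]
    else:
        table.pop(key, None)
    return table


# (primary key, envelope) pairs; op is "c" for create, "u" for update, "d" for delete.
events = [
    (1, {"before": None, "after": {"id": 1, "field1": "a"}, "op": "c"}),
    (2, {"before": None, "after": {"id": 2, "field1": "a"}, "op": "c"}),
    (1, {"before": {"id": 1, "field1": "a"}, "after": {"id": 1, "field1": "b"}, "op": "u"}),
    (2, {"before": {"id": 2, "field1": "a"}, "after": None, "op": "d"}),
]

table = {}
for key, envelope in events:
    apply_change(table, key, envelope)

print(table)  # {1: {'id': 1, 'field1': 'b'}}
```

After the four events, only row 1 remains and carries its updated value, which mirrors the current state of the upstream table rather than the raw stream of change events.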

D. Create a view on the source

A view saves a query under a name to provide a shorthand for referencing the query. During view creation, the underlying query is not executed.

CREATE VIEW cnt_table1 AS
    SELECT field1,
           COUNT(*) AS cnt
    FROM kafka_repl
    GROUP BY field1;

E. Create an index on the view

In Materialize, indexes on views compute and, as new data arrives, incrementally update view results in memory within a cluster instead of recomputing the results from scratch.

Create an index on the cnt_table1 view. As new change events stream in through Kafka (as a result of INSERT, UPDATE and DELETE operations in the upstream database), the index incrementally updates the view results in memory, so up-to-date results are immediately available and computationally free to query.

CREATE INDEX idx_cnt_table1_field1 ON cnt_table1(field1);

For best practices on when to index a view, see Indexes and Views.
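As a rough intuition for what incremental maintenance of cnt_table1 involves, the following Python sketch adjusts per-group counts for each change instead of recounting every row. It is a simplified illustration under stated assumptions, not Materialize's actual implementation: changes are modeled as (before, after) pairs, and the function name update_counts is hypothetical.

```python
from collections import Counter


def update_counts(counts, before, after):
    """Adjust the per-field1 counts for one change, without rescanning the table."""
    if before is not None:
        group = before["field1"]
        counts[group] -= 1
        if counts[group] == 0:
            del counts[group]  # a group with no rows disappears from the result
    if after is not None:
        counts[after["field1"]] += 1
    return counts


counts = Counter()
changes = [
    (None, {"field1": "a"}),             # INSERT
    (None, {"field1": "a"}),             # INSERT
    ({"field1": "a"}, {"field1": "b"}),  # UPDATE moves a row from group a to group b
    ({"field1": "b"}, None),             # DELETE
]
for before, after in changes:
    update_counts(counts, before, after)

print(dict(counts))  # {'a': 1}
```

Each change touches at most two groups, so the work per event stays small regardless of how many rows the upstream table holds.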
