MySQL CDC using Kafka and Debezium
Change Data Capture (CDC) allows you to track and propagate changes in a MySQL database to downstream consumers based on its binary log (binlog). In this guide, we’ll cover how to use Materialize to create and efficiently maintain real-time materialized views on top of CDC data.
Kafka + Debezium
You can use Debezium and the Kafka source to propagate CDC data from MySQL to Materialize. Debezium captures row-level changes resulting from `INSERT`, `UPDATE`, and `DELETE` operations in the upstream database and publishes them as events to Kafka using Kafka Connect-compatible connectors.
Database setup
Before deploying a Debezium connector, you need to ensure that the upstream database is configured to support row-based replication. As root:
- Check the `log_bin` and `binlog_format` settings:

  ```sql
  SHOW VARIABLES
  WHERE variable_name IN ('log_bin', 'binlog_format');
  ```

  For CDC, binary logging must be enabled and use the `row` format. If your settings differ, you can adjust the database configuration file (`/etc/mysql/my.cnf`) to use `log_bin=mysql-bin` and `binlog_format=row` (see the sketch after this list). Keep in mind that changing these settings requires a restart of the MySQL instance and can affect database performance.

  Note: Additional steps may be required if you’re using MySQL on Amazon RDS.
- Grant the replication user enough privileges for Debezium to operate in the database:

  ```sql
  GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO "user";
  FLUSH PRIVILEGES;
  ```
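If you do need to change the binlog settings from the first step, a minimal sketch of the relevant `my.cnf` section might look like the following. The `server-id` and log file name are example values you should adapt; `binlog_row_image = full` is the setting Debezium expects so that change events carry complete row images.

```ini
# Example /etc/mysql/my.cnf fragment (values are illustrative)
[mysqld]
server-id        = 223344        # any ID unique within your replication topology
log_bin          = mysql-bin     # enable binary logging
binlog_format    = row           # required for CDC
binlog_row_image = full          # emit complete row images in change events
```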
Deploy Debezium
Minimum requirements: Debezium 1.5+
Debezium is deployed as a set of Kafka Connect-compatible connectors, so you first need to define a MySQL connector configuration and then start the connector by adding it to Kafka Connect.
Note: If you deploy the connector in Confluent Cloud, you must override the default value of the After-state only property to false, so that change events include both the before and after state of each record.
- Create a connector configuration file and save it as `register-mysql.json`:

  ```json
  {
      "name": "your-connector",
      "config": {
          "connector.class": "io.debezium.connector.mysql.MySqlConnector",
          "tasks.max": "1",
          "database.hostname": "mysql",
          "database.port": "3306",
          "database.user": "user",
          "database.password": "mysqlpwd",
          "database.server.id": "223344",
          "database.server.name": "dbserver1",
          "database.history.kafka.bootstrap.servers": "kafka:9092",
          "database.history.kafka.topic": "dbserver1.history",
          "database.include.list": "db1",
          "table.include.list": "db1.table1",
          "include.schema.changes": false
      }
  }
  ```

  You can read more about each configuration property in the Debezium documentation. Note that `table.include.list` expects fully-qualified `database.table` identifiers.
- Start the Debezium MySQL connector using the configuration file:

  ```bash
  export CURRENT_HOST='<your-host>'

  curl -i -X POST \
       -H "Accept:application/json" -H "Content-Type:application/json" \
       http://$CURRENT_HOST:8083/connectors/ -d @register-mysql.json
  ```
- Check that the connector is running:

  ```bash
  curl http://$CURRENT_HOST:8083/connectors/your-connector/status
  ```

  The first time it connects to a MySQL server, Debezium takes a consistent snapshot of the tables selected for replication, so you should see the pre-existing records in the replicated table initially pushed into your Kafka topic:

  ```bash
  /usr/bin/kafka-avro-console-consumer \
      --bootstrap-server kafka:9092 \
      --from-beginning \
      --topic dbserver1.db1.table1
  ```
Note: By default, the connector writes events for each table to a Kafka topic named `serverName.databaseName.tableName`; with the configuration above, that is `dbserver1.db1.table1`.
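For reference, each message on this topic decodes to a Debezium change event. A trimmed sketch of what a snapshot event for a hypothetical row might look like (the column names under `after` and the `source` contents depend on your table and connector version):

```json
{
    "before": null,
    "after": {
        "field1": "some-value",
        "field2": 42
    },
    "source": {
        "connector": "mysql",
        "db": "db1",
        "table": "table1"
    },
    "op": "r",
    "ts_ms": 1624980000000
}
```

Here `"op": "r"` denotes a snapshot read; subsequent inserts, updates, and deletes arrive as `c`, `u`, and `d` events, with `before` and `after` populated accordingly.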
Create a source
Debezium emits change events using an envelope that contains detailed information about upstream database operations, like the `before` and `after` values for each record. To create a source that interprets the Debezium envelope in Materialize:
```sql
CREATE SOURCE kafka_repl
    FROM KAFKA CONNECTION kafka_connection (TOPIC 'dbserver1.db1.table1')
    FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_connection
    ENVELOPE DEBEZIUM;
```
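The statement above assumes the `kafka_connection` and `csr_connection` connection objects already exist. If you haven’t created them yet, a minimal sketch, assuming an unauthenticated broker at `kafka:9092` and a schema registry at `schema-registry:8081` (adjust the hosts and add authentication options for your environment):

```sql
-- Hypothetical endpoints; substitute your own broker and registry addresses.
CREATE CONNECTION kafka_connection TO KAFKA (
    BROKER 'kafka:9092'
);

CREATE CONNECTION csr_connection TO CONFLUENT SCHEMA REGISTRY (
    URL 'http://schema-registry:8081'
);
```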
Transaction support
Debezium provides transaction metadata that can be used to preserve transactional boundaries downstream. We are working on using this topic to support transaction-aware processing in Materialize (#7537)!
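If you want the connector to emit that metadata in the meantime, Debezium exposes the `provide.transaction.metadata` property, which you could add to the `config` block of `register-mysql.json` (a sketch; note that Materialize does not yet consume these events):

```json
"provide.transaction.metadata": "true"
```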
Create a materialized view
Any materialized view defined on top of this source will be incrementally updated as new change events stream in through Kafka, as a result of `INSERT`, `UPDATE`, and `DELETE` operations in the original MySQL database.
```sql
CREATE MATERIALIZED VIEW cnt_table1 AS
    SELECT field1,
           COUNT(*) AS cnt
    FROM kafka_repl
    GROUP BY field1;
```
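You can then query the view like any other relation; results stay up to date as changes flow in from MySQL. For example (using the `field1` column from the view definition above):

```sql
-- Ad-hoc query; results reflect the latest replicated data.
SELECT field1, cnt
FROM cnt_table1
ORDER BY cnt DESC
LIMIT 10;
```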