MySQL CDC using Kafka and Debezium
Change Data Capture (CDC) allows you to track and propagate changes in a MySQL
database to downstream consumers based on its binary log (binlog). In this
guide, we’ll cover how to use Materialize to create and efficiently maintain
real-time materialized views on top of CDC data.
Kafka + Debezium
You can use Debezium and the Kafka source
to propagate CDC data from MySQL to Materialize. Debezium captures row-level
changes resulting from INSERT, UPDATE and DELETE operations in the
upstream database and publishes them as events to Kafka using Kafka
Connect-compatible connectors.
Database setup
Before deploying a Debezium connector, you need to ensure that the upstream database is configured to support row-based replication. As root:
-
Check the log_bin and binlog_format settings:
SHOW VARIABLES WHERE variable_name IN ('log_bin', 'binlog_format');
For CDC, binary logging must be enabled and use the row format. If your settings differ, you can adjust the database configuration file (/etc/mysql/my.cnf) to use log_bin=mysql-bin and binlog_format=row. Keep in mind that changing these settings requires a restart of the MySQL instance and can affect database performance.
Note: Additional steps may be required if you’re using MySQL on Amazon RDS.
-
Grant enough privileges to the replication user to ensure Debezium can operate in the database:
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO "user";
FLUSH PRIVILEGES;
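The two settings from the first step can also be checked programmatically. The following is a minimal Python sketch, assuming the rows returned by SHOW VARIABLES have already been fetched into a dictionary by whatever MySQL client you use. The function name check_binlog_settings is hypothetical and is not part of MySQL, Debezium or Materialize.

```python
def check_binlog_settings(variables):
    """Return a list of problems found in the binlog-related settings.

    `variables` maps variable names to values, for example the rows returned by
    SHOW VARIABLES WHERE variable_name IN ('log_bin', 'binlog_format').
    """
    problems = []
    # MySQL reports log_bin as ON or OFF.
    if str(variables.get("log_bin", "")).upper() != "ON":
        problems.append("binary logging is not enabled (log_bin must be ON)")
    # CDC needs row-based logging; STATEMENT and MIXED are not sufficient.
    if str(variables.get("binlog_format", "")).upper() != "ROW":
        problems.append("binlog_format must be ROW")
    return problems


if __name__ == "__main__":
    sample = {"log_bin": "ON", "binlog_format": "MIXED"}  # made-up sample values
    for problem in check_binlog_settings(sample):
        print(problem)
```

An empty list means both settings already match what Debezium needs, so no restart of the MySQL instance is required.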
Deploy Debezium
Minimum requirements: Debezium 1.5+
Debezium is deployed as a set of Kafka Connect-compatible connectors, so you first need to define a MySQL connector configuration and then start the connector by adding it to Kafka Connect.
After-state only to false.
-
Create a connector configuration file and save it as register-mysql.json. This example uses the property names from Debezium versions before 2.0 (for example, database.server.name):
{
  "name": "your-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "user",
    "database.password": "mysqlpwd",
    "database.server.id": "223344",
    "database.server.name": "dbserver1",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.history.kafka.topic": "dbserver1.history",
    "database.include.list": "db1",
    "table.include.list": "db1.table1",
    "include.schema.changes": false
  }
}
You can read more about each configuration property in the Debezium documentation.
-
From Debezium 2.0, Confluent Schema Registry (CSR) support is not bundled in Debezium containers. To enable CSR, you must install the following Confluent Avro converter JAR files into the Kafka Connect plugin directory (by default, /kafka/connect):
kafka-connect-avro-converter
kafka-connect-avro-data
kafka-avro-serializer
kafka-schema-serializer
kafka-schema-registry-client
common-config
common-utils
You can read more about this in the Debezium documentation.
-
Create a connector configuration file and save it as register-mysql.json. This example uses the Debezium 2.0+ property names (for example, topic.prefix) and the Confluent Avro converter:
{
  "name": "your-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "user",
    "database.password": "mysqlpwd",
    "database.server.id": "223344",
    "topic.prefix": "dbserver1",
    "database.include.list": "db1",
    "database.history.kafka.topic": "dbserver1.history",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
    "schema.history.internal.kafka.topic": "dbserver1.internal.history",
    "table.include.list": "db1.table1",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://<schema-registry>:8081",
    "value.converter.schema.registry.url": "http://<schema-registry>:8081",
    "include.schema.changes": false
  }
}
You can read more about each configuration property in the Debezium documentation. By default, the connector writes events for each table to a Kafka topic named serverName.databaseName.tableName.
-
Start the Debezium MySQL connector using the configuration file:
export CURRENT_HOST='<your-host>'
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" \
  http://$CURRENT_HOST:8083/connectors/ -d @register-mysql.json
-
Check that the connector is running:
curl http://$CURRENT_HOST:8083/connectors/your-connector/status
The first time it connects to a MySQL server, Debezium takes a consistent snapshot of the tables selected for replication, so you should see that the pre-existing records in the replicated table are initially pushed into your Kafka topic:
/usr/bin/kafka-avro-console-consumer \
  --bootstrap-server kafka:9092 \
  --from-beginning \
  --topic dbserver1.db1.table1
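The status request in the steps above returns a JSON document. As a rough Python sketch, assuming the usual Kafka Connect status response shape (a connector object and a tasks list, each carrying a state field), the helper below decides whether the connector is fully running. The function name and the sample payload are illustrative only.

```python
import json


def connector_is_running(status):
    """Return True if the connector and all of its tasks report RUNNING."""
    connector_state = status.get("connector", {}).get("state")
    task_states = [task.get("state") for task in status.get("tasks", [])]
    # A connector without tasks is not doing any work yet, so treat it as not running.
    return connector_state == "RUNNING" and bool(task_states) and all(
        state == "RUNNING" for state in task_states
    )


# Illustrative payload, shaped like the body returned by
# GET /connectors/your-connector/status (all values are made up).
sample_body = """
{
  "name": "your-connector",
  "connector": {"state": "RUNNING", "worker_id": "connect:8083"},
  "tasks": [{"id": 0, "state": "RUNNING", "worker_id": "connect:8083"}],
  "type": "source"
}
"""

if __name__ == "__main__":
    print(connector_is_running(json.loads(sample_body)))
```

Checking the task states as well as the connector state matters because a connector can report RUNNING while one of its tasks has failed.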
Create a source
Debezium emits change events using an envelope that contains detailed
information about upstream database operations, like the before and after
values for each record. To create a source that interprets the
Debezium envelope in Materialize:
CREATE SOURCE kafka_repl
FROM KAFKA CONNECTION kafka_connection (TOPIC 'dbserver1.db1.table1')
FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_connection
ENVELOPE DEBEZIUM;
By default, the source will be created in the active cluster; to use a different
cluster, use the IN CLUSTER clause.
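To make the envelope more concrete, here is a conceptual Python sketch of how the before and after values can be folded into the current contents of a table. It assumes the standard Debezium op codes (c for create, r for snapshot read, u for update, d for delete) and a hypothetical primary key column named id. It illustrates the idea only and does not describe how Materialize implements ENVELOPE DEBEZIUM.

```python
def apply_change_event(table, event, key="id"):
    """Apply one Debezium-style change event to `table` (a dict keyed by primary key)."""
    op = event["op"]
    if op in ("c", "r", "u"):
        # Inserts, snapshot reads and updates carry the new row in `after`.
        row = event["after"]
        table[row[key]] = row
    elif op == "d":
        # Deletes carry the removed row in `before`; `after` is null.
        table.pop(event["before"][key], None)
    else:
        raise ValueError(f"unsupported op: {op!r}")
    return table


if __name__ == "__main__":
    # Made-up events for a table with columns (id, field1).
    events = [
        {"op": "r", "before": None, "after": {"id": 1, "field1": "a"}},
        {"op": "c", "before": None, "after": {"id": 2, "field1": "b"}},
        {"op": "u", "before": {"id": 1, "field1": "a"}, "after": {"id": 1, "field1": "c"}},
        {"op": "d", "before": {"id": 2, "field1": "b"}, "after": None},
    ]
    state = {}
    for event in events:
        apply_change_event(state, event)
    print(state)
```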
Transaction support
Debezium provides transaction metadata that can be used to preserve transactional boundaries downstream. Materialize does not use this metadata yet; we are working on using the transaction metadata topic to support transaction-aware processing in Materialize (#7537).
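As an illustration of what preserving transactional boundaries could look like on the consumer side, the Python sketch below groups change events by transaction ID. It assumes the connector was started with provide.transaction.metadata set to true, in which case Debezium attaches a transaction block (including an id field) to each change event. The function name and the sample events are made up, and this is not a Materialize feature.

```python
def group_by_transaction(events):
    """Group change events by transaction id, keeping first-seen order."""
    groups = {}
    for event in events:
        tx = event.get("transaction") or {}
        # Events without transaction metadata end up under the key None.
        groups.setdefault(tx.get("id"), []).append(event)
    return groups


if __name__ == "__main__":
    # Made-up events; only the fields used by the sketch are shown.
    events = [
        {"op": "c", "after": {"id": 1}, "transaction": {"id": "tx-1", "total_order": 1}},
        {"op": "c", "after": {"id": 2}, "transaction": {"id": "tx-1", "total_order": 2}},
        {"op": "u", "after": {"id": 1}, "transaction": {"id": "tx-2", "total_order": 1}},
    ]
    for tx_id, grouped in group_by_transaction(events).items():
        print(tx_id, len(grouped))
```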
Create a materialized view
Any materialized view defined on top of this source will be incrementally
updated as new change events stream in through Kafka, as a result of INSERT,
UPDATE and DELETE operations in the original MySQL database.
CREATE MATERIALIZED VIEW cnt_table1 AS
SELECT field1,
COUNT(*) AS cnt
FROM kafka_repl
GROUP BY field1;
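To give an intuition for what incremental maintenance means for cnt_table1, the following Python sketch keeps the per-group counts up to date by applying each change as a (row, diff) pair instead of recounting the whole table. The class name and the +1/-1 change representation are assumptions made for illustration; Materialize's actual implementation is not shown here.

```python
from collections import Counter


class IncrementalCount:
    """Maintains SELECT field1, COUNT(*) ... GROUP BY field1 under row changes."""

    def __init__(self):
        self.counts = Counter()

    def apply(self, row, diff):
        """Apply one change: diff is +1 for an inserted row, -1 for a deleted row."""
        group = row["field1"]
        self.counts[group] += diff
        if self.counts[group] == 0:
            # A group with no remaining rows disappears from the result.
            del self.counts[group]

    def update(self, before, after):
        """An UPDATE is a retraction of the old row plus an insertion of the new one."""
        self.apply(before, -1)
        self.apply(after, +1)

    def result(self):
        """Return the current view contents as {field1: cnt}."""
        return dict(self.counts)


if __name__ == "__main__":
    view = IncrementalCount()
    view.apply({"field1": "a"}, +1)
    view.apply({"field1": "b"}, +1)
    view.update({"field1": "b"}, {"field1": "a"})
    print(view.result())
```

Each change touches only the groups it affects, so the cost of keeping the result current depends on the size of the change rather than the size of the table.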