MySQL CDC using Kafka and Debezium
Change Data Capture (CDC) allows you to track and propagate changes in a MySQL
database to downstream consumers based on its binary log (binlog). In this
guide, we’ll cover how to use Materialize to create and efficiently maintain
real-time views with incrementally updated results on top of CDC data.
Kafka + Debezium
You can use Debezium and the Kafka source to propagate CDC data from MySQL to
Materialize in the unlikely event that using the native MySQL source is not an
option. Debezium captures row-level changes resulting from INSERT, UPDATE and
DELETE operations in the upstream database and publishes them as events to
Kafka using Kafka Connect-compatible connectors.
A. Configure database
Before deploying a Debezium connector, you need to ensure that the upstream database is configured to support row-based replication. As root:
-
Check the log_bin and binlog_format settings:
SHOW VARIABLES WHERE variable_name IN ('log_bin', 'binlog_format');
For CDC, binary logging must be enabled and use the row format. If your settings differ, you can adjust the database configuration file (/etc/mysql/my.cnf) to use log_bin=mysql-bin and binlog_format=row. Keep in mind that changing these settings requires a restart of the MySQL instance and can affect database performance.
Note: Additional steps may be required if you’re using MySQL on Amazon RDS.
-
Grant enough privileges to the replication user to ensure Debezium can operate in the database:
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO "user";
FLUSH PRIVILEGES;
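The binlog check in the first step can be scripted. The following is a minimal sketch rather than anything shipped with MySQL or Debezium: check_binlog_settings is a hypothetical helper name, and the sketch assumes that the mysql command-line client is installed and that the server reports the two variables as ON and ROW.

```shell
# Hypothetical helper: reads tab-separated "name<TAB>value" rows from stdin, as
# printed by `mysql --batch --skip-column-names`, and reports whether the
# binlog settings required for CDC are in place.
check_binlog_settings() {
  awk -F'\t' '
    $1 == "log_bin"       { log_bin = toupper($2) }
    $1 == "binlog_format" { fmt = toupper($2) }
    END {
      if (log_bin == "ON" && fmt == "ROW") {
        print "ok"
      } else {
        printf "needs changes: log_bin=%s binlog_format=%s\n", log_bin, fmt
        exit 1
      }
    }'
}

# Usage (assumes a reachable server and root credentials):
#   mysql -u root -p --batch --skip-column-names \
#     -e "SHOW VARIABLES WHERE variable_name IN ('log_bin', 'binlog_format');" \
#     | check_binlog_settings
```

The helper exits with a non-zero status when either setting needs to change, so it can gate a deployment script before the connector is registered.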
B. Deploy Debezium
Minimum requirements: Debezium 1.5+
Debezium is deployed as a set of Kafka Connect-compatible connectors, so you first need to define a MySQL connector configuration and then start the connector by adding it to Kafka Connect.
After-state only
to false
.
-
If you're running a Debezium version earlier than 2.0, create a connector configuration file and save it as register-mysql.json:
{
  "name": "your-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "user",
    "database.password": "mysqlpwd",
    "database.server.id": "223344",
    "database.server.name": "dbserver1",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.history.kafka.topic": "dbserver1.history",
    "database.include.list": "db1",
    "table.include.list": "table1",
    "include.schema.changes": false
  }
}
You can read more about each configuration property in the Debezium documentation.
-
From Debezium 2.0, Confluent Schema Registry (CSR) support is not bundled in Debezium containers. To enable CSR, you must install the following Confluent Avro converter JAR files into the Kafka Connect plugin directory (by default, /kafka/connect):
kafka-connect-avro-converter
kafka-connect-avro-data
kafka-avro-serializer
kafka-schema-serializer
kafka-schema-registry-client
common-config
common-utils
You can read more about this in the Debezium documentation.
-
Create a connector configuration file that uses the Avro converter and save it as register-mysql.json:
{
  "name": "your-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "user",
    "database.password": "mysqlpwd",
    "database.server.id": "223344",
    "topic.prefix": "dbserver1",
    "database.include.list": "db1",
    "database.history.kafka.topic": "dbserver1.history",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
    "schema.history.internal.kafka.topic": "dbserver1.internal.history",
    "table.include.list": "table1",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://<schema-registry>:8081",
    "value.converter.schema.registry.url": "http://<schema-registry>:8081",
    "include.schema.changes": false
  }
}
You can read more about each configuration property in the Debezium documentation. By default, the connector writes events for each table to a Kafka topic named serverName.databaseName.tableName.
-
Start the Debezium MySQL connector using the configuration file:
export CURRENT_HOST='<your-host>'
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" \
  http://$CURRENT_HOST:8083/connectors/ -d @register-mysql.json
-
Check that the connector is running:
curl http://$CURRENT_HOST:8083/connectors/your-connector/status
The first time it connects to a MySQL server, Debezium takes a consistent snapshot of the tables selected for replication, so you should see that the pre-existing records in the replicated table are initially pushed into your Kafka topic:
/usr/bin/kafka-avro-console-consumer \
  --bootstrap-server kafka:9092 \
  --from-beginning \
  --topic dbserver1.db1.table1
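The connector status check above returns JSON, which is awkward to read by eye in a deployment script. As a rough sketch, the helpers below reduce that response to a pass/fail result. The names connector_states and all_running are hypothetical, and the sketch assumes the response has the usual Kafka Connect shape, with a "state" field for the connector and for each task.

```shell
# Hypothetical helpers; they assume the status JSON carries a "state" field for
# the connector itself and for each of its tasks.

# Print every "state" value found in the JSON read from stdin, one per line.
connector_states() {
  tr -d ' \t\n' | grep -o '"state":"[A-Z_]*"' | cut -d'"' -f4
}

# Succeed only if at least one state was found and all of them are RUNNING.
all_running() {
  states=$(connector_states)
  [ -n "$states" ] || return 1
  ! printf '%s\n' "$states" | grep -qv '^RUNNING$'
}

# Usage (assumes CURRENT_HOST is set as in the earlier step):
#   curl -s "http://$CURRENT_HOST:8083/connectors/your-connector/status" \
#     | all_running && echo "connector and tasks are running"
```

Matching on text keeps the sketch free of extra dependencies; if a JSON processor is available in your environment, parsing the response properly would be more robust.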
C. Create a source
Debezium emits change events using an envelope that contains detailed
information about upstream database operations, like the before and after
values for each record. To create a source that interprets the Debezium
envelope in Materialize:
CREATE SOURCE kafka_repl
FROM KAFKA CONNECTION kafka_connection (TOPIC 'dbserver1.db1.table1')
FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_connection
ENVELOPE DEBEZIUM;
By default, the source will be created in the active cluster; to use a different
cluster, use the IN CLUSTER clause.
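To illustrate where the IN CLUSTER clause goes, here is a small sketch that renders the statement above for a given cluster and can pipe it to a SQL client. The helper name create_source_sql, the cluster name ingest_cluster, and the MATERIALIZE_URL variable are all assumptions made for the example, and the sketch presumes the cluster already exists.

```shell
# Hypothetical helper: prints the CREATE SOURCE statement from this guide with
# an IN CLUSTER clause for the cluster name passed as the first argument.
create_source_sql() {
  cat <<SQL
CREATE SOURCE kafka_repl
  IN CLUSTER $1
  FROM KAFKA CONNECTION kafka_connection (TOPIC 'dbserver1.db1.table1')
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_connection
  ENVELOPE DEBEZIUM;
SQL
}

# Usage (assumes psql is installed, MATERIALIZE_URL holds your connection
# string, and a cluster named ingest_cluster already exists):
#   create_source_sql ingest_cluster | psql "$MATERIALIZE_URL"
```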
D. Create a view on the source
A view saves a query under a name to provide a shorthand for referencing the query. During view creation, the underlying query is not executed.
CREATE VIEW cnt_table1 AS
SELECT field1,
COUNT(*) AS cnt
FROM kafka_repl
GROUP BY field1;
E. Create an index on the view
In Materialize, indexes on views compute and, as new data arrives, incrementally update view results in memory within a cluster instead of recomputing the results from scratch.
Create an index on the cnt_table1 view. Then, as new change events stream in
through Kafka (as the result of INSERT, UPDATE and DELETE operations in the
upstream database), the index incrementally updates the view results in memory,
so the up-to-date results are immediately available and computationally free
to query.
CREATE INDEX idx_cnt_table1_field1 ON cnt_table1(field1);
For best practices on when to index a view, see Indexes and Views.