SQL Server CDC using Kafka and Debezium
Change Data Capture (CDC) allows you to track and propagate changes in a SQL Server database to downstream consumers. In this guide, we’ll cover how to use Materialize to create and efficiently maintain real-time materialized views on top of CDC data.
Kafka + Debezium
Use Debezium and the Kafka source to propagate CDC data from SQL Server to Materialize. Debezium captures row-level changes resulting from INSERT, UPDATE, and DELETE operations in the upstream database and publishes them as events to Kafka using Kafka Connect-compatible connectors.
Database setup
Before deploying a Debezium connector, ensure that the upstream database is configured to support CDC.
- Enable CDC on the SQL Server instance and database:

    -- Enable CDC on the database
    USE MyDB
    GO
    EXEC sys.sp_cdc_enable_db
    GO
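  Debezium only captures changes from tables that also have CDC enabled at the table level. A minimal sketch for a hypothetical dbo.tableName table (the schema, table, and role names are placeholders; adjust them for your database):

    -- Enable CDC on each table marked for replication
    USE MyDB
    GO
    EXEC sys.sp_cdc_enable_table
        @source_schema = N'dbo',
        @source_name = N'tableName',
        @role_name = NULL,
        @supports_net_changes = 0
    GO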
Deploy Debezium
Minimum requirements: Debezium 1.5+
Debezium is deployed as a set of Kafka Connect-compatible connectors. First, define a SQL Server connector configuration, then start the connector by adding it to Kafka Connect.
- If you're using Debezium 1.x, create a connector configuration file and save it as register-sqlserver.json:

    {
      "name": "inventory-connector",
      "config": {
        "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
        "tasks.max": "1",
        "database.server.name": "server1",
        "database.hostname": "sqlserver",
        "database.port": "1433",
        "database.user": "sa",
        "database.password": "Password!",
        "database.names": "testDB",
        "database.history.kafka.bootstrap.servers": "<broker>:9092",
        "database.history.kafka.topic": "schema-changes.inventory",
        "key.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "key.converter.schema.registry.url": "http://<schema-registry>:8081",
        "value.converter.schema.registry.url": "http://<schema-registry>:8081",
        "database.encrypt": "false"
      }
    }

  You can read more about each configuration property in the Debezium documentation.
- Beginning with Debezium 2.0.0, Confluent Schema Registry support is not included in the Debezium containers. To enable the Confluent Schema Registry for a Debezium container, install the following Confluent Avro converter JAR files into the Connect plugin directory:
kafka-connect-avro-converter
kafka-connect-avro-data
kafka-avro-serializer
kafka-schema-serializer
kafka-schema-registry-client
common-config
common-utils
You can read more about this in the Debezium documentation.
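  One way to do this is to copy the JARs into a directory on the Kafka Connect plugin path before starting the connector. The target path below is an assumption based on the Debezium Connect container image's default plugin directory (/kafka/connect); adjust it and the JAR versions for your installation:

    # Copy the Confluent Avro converter JARs into the Connect plugin directory
    cp kafka-connect-avro-converter-*.jar kafka-connect-avro-data-*.jar \
       kafka-avro-serializer-*.jar kafka-schema-serializer-*.jar \
       kafka-schema-registry-client-*.jar common-config-*.jar common-utils-*.jar \
       /kafka/connect/confluent-avro-converter/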
- If you're using Debezium 2.x, create a connector configuration file and save it as register-sqlserver.json:

    {
      "name": "inventory-connector",
      "config": {
        "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
        "tasks.max": "1",
        "topic.prefix": "server1",
        "database.hostname": "sqlserver",
        "database.port": "1433",
        "database.user": "sa",
        "database.password": "Password!",
        "database.names": "testDB",
        "schema.history.internal.kafka.bootstrap.servers": "<broker>:9092",
        "schema.history.internal.kafka.topic": "schema-changes.inventory",
        "key.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "key.converter.schema.registry.url": "http://<schema-registry>:8081",
        "value.converter.schema.registry.url": "http://<schema-registry>:8081",
        "database.encrypt": "false"
      }
    }

  You can read more about each configuration property in the Debezium documentation. By default, the connector writes events for each table to a Kafka topic named serverName.databaseName.tableName.
- Start the Debezium SQL Server connector using the configuration file:

    export CURRENT_HOST='<your-host>'

    curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" \
      http://$CURRENT_HOST:8083/connectors/ -d @register-sqlserver.json
- Check that the connector is running:

    curl http://$CURRENT_HOST:8083/connectors/inventory-connector/status
Now, Debezium will capture changes from the SQL Server database and publish them to Kafka.
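If you also want to confirm that change events are reaching Kafka, you can list the topics created by the connector. This is a sketch assuming the placeholder broker address and the server1 topic prefix used in the configuration above:

    # List the connector's topics; each captured table gets its own topic
    kafka-topics.sh --bootstrap-server <broker>:9092 --list | grep server1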
Create a source
Debezium emits change events using an envelope that contains detailed information about upstream database operations, like the before and after values for each record. To create a source that interprets the Debezium envelope in Materialize:
CREATE SOURCE kafka_repl
FROM KAFKA CONNECTION kafka_connection (TOPIC 'server1.testDB.tableName')
FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_connection
ENVELOPE DEBEZIUM;
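The CREATE SOURCE statement references existing kafka_connection and csr_connection objects. If you have not created them yet, a minimal sketch (reusing the placeholder broker and schema registry addresses from the connector configuration; authentication options are omitted) looks like:

    CREATE CONNECTION kafka_connection TO KAFKA (BROKER '<broker>:9092');

    CREATE CONNECTION csr_connection TO CONFLUENT SCHEMA REGISTRY (
        URL 'http://<schema-registry>:8081'
    );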
By default, the source will be created in the active cluster; to use a different cluster, use the IN CLUSTER clause.
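For example, to create the source in a dedicated ingestion cluster (the cluster name below is a placeholder):

    CREATE SOURCE kafka_repl
      IN CLUSTER ingest_cluster
      FROM KAFKA CONNECTION kafka_connection (TOPIC 'server1.testDB.tableName')
      FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_connection
      ENVELOPE DEBEZIUM;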
Create a view
A view saves a query under a name to provide a shorthand for referencing the query. During view creation, the underlying query is not executed.
CREATE VIEW cnt_table1 AS
SELECT field1,
COUNT(*) AS cnt
FROM kafka_repl
GROUP BY field1;
Create an index on the view
In Materialize, indexes on views compute view results and, as new data arrives, incrementally update them in memory within a cluster instead of recomputing the results from scratch.
Create an index on the cnt_table1 view. Then, as new change events stream in through Kafka (as the result of INSERT, UPDATE, and DELETE operations in the upstream database), the index incrementally updates the view results in memory, such that the in-memory up-to-date results are immediately available and computationally free to query.
CREATE INDEX idx_cnt_table1_field1 ON cnt_table1(field1);
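Once the index is hydrated, queries against the view are served directly from memory. For example (the field value below is a placeholder):

    SELECT field1, cnt
    FROM cnt_table1
    WHERE field1 = 'some_value';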
For best practices on when to index a view, see Indexes and Views.
Known limitations
The official Microsoft documentation for SQL Server CDC lists a number of known limitations that we recommend carefully reading through. In addition to those listed, please also consider:
Debezium delivery guarantees
Due to an upstream bug in Debezium affecting snapshot isolation (DBZ-3915), it is possible that rows inserted close to the initial snapshot time are reflected twice in the downstream Kafka topic. This will lead to a rows didn't match error in Materialize.
To work around this limitation, we recommend halting any updates to the tables marked for replication until the Debezium connector is fully configured and the initial snapshot for all tables has been completed.
Supported types
DATETIMEOFFSET columns are replicated as text, and DATETIME2 columns are replicated as bigint in Materialize.
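If you need these values as native timestamp types, you can convert them in a view downstream of the source. This is a sketch: the column names are placeholders, and the DATETIME2 conversion assumes the connector emits milliseconds since the Unix epoch (the actual unit depends on the connector's time.precision.mode):

    CREATE VIEW typed_repl AS
    SELECT
        -- DATETIMEOFFSET arrives as ISO 8601 text and can be cast directly
        dto_col::timestamptz AS dto_ts,
        -- DATETIME2 arrives as a bigint epoch value; assuming milliseconds here
        to_timestamp(dt2_col::float8 / 1000) AS dt2_ts
    FROM kafka_repl;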