SQL Server CDC using Kafka and Debezium

Change Data Capture (CDC) allows you to track and propagate changes in a SQL Server database to downstream consumers. In this guide, we’ll cover how to use Materialize to create and efficiently maintain real-time views with incrementally updated results on top of CDC data.

💡 Tip: For help getting started with your own data, you can schedule a free guided trial.

Kafka + Debezium

Use Debezium and the Kafka source to propagate CDC data from SQL Server to Materialize. Debezium captures row-level changes resulting from INSERT, UPDATE, and DELETE operations in the upstream database and publishes them as events to Kafka using Kafka Connect-compatible connectors.

Database setup

Before deploying a Debezium connector, ensure that the upstream database is configured to support CDC.

  1. Enable CDC on the SQL Server instance and database:

    -- Enable CDC on the SQL Server instance
    USE MyDB
    GO
    EXEC sys.sp_cdc_enable_db
    GO
    

Deploy Debezium

Minimum requirements: Debezium 1.5+

Debezium is deployed as a set of Kafka Connect-compatible connectors. First, define a SQL Server connector configuration, then start the connector by adding it to Kafka Connect.

  1. Create a connector configuration file and save it as register-sqlserver.json:

    {
        "name": "inventory-connector",
        "config": {
            "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
            "tasks.max": "1",
            "database.server.name": "server1",
            "database.hostname": "sqlserver",
            "database.port": "1433",
            "database.user": "sa",
            "database.password": "Password!",
            "database.names": "testDB",
            "database.history.kafka.bootstrap.servers": "<broker>:9092",
            "database.history.kafka.topic": "schema-changes.inventory",
            "key.converter": "io.confluent.connect.avro.AvroConverter",
            "value.converter": "io.confluent.connect.avro.AvroConverter",
            "key.converter.schema.registry.url": "http://<schema-registry>:8081",
            "value.converter.schema.registry.url": "http://<schema-registry>:8081",
            "database.encrypt": "false"
        }
    }
    

    You can read more about each configuration property in the Debezium documentation.

  1. Beginning with Debezium 2.0.0, Confluent Schema Registry support is not included in the Debezium containers. To enable the Confluent Schema Registry for a Debezium container, install the following Confluent Avro converter JAR files into the Connect plugin directory:

    • kafka-connect-avro-converter
    • kafka-connect-avro-data
    • kafka-avro-serializer
    • kafka-schema-serializer
    • kafka-schema-registry-client
    • common-config
    • common-utils

    You can read more about this in the Debezium documentation.

  2. Create a connector configuration file and save it as register-sqlserver.json:

    {
        "name": "inventory-connector",
        "config": {
            "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
            "tasks.max": "1",
            "topic.prefix": "server1",
            "database.hostname": "sqlserver",
            "database.port": "1433",
            "database.user": "sa",
            "database.password": "Password!",
            "database.names": "testDB",
            "schema.history.internal.kafka.bootstrap.servers": "<broker>:9092",
            "schema.history.internal.kafka.topic": "schema-changes.inventory",
            "key.converter": "io.confluent.connect.avro.AvroConverter",
            "value.converter": "io.confluent.connect.avro.AvroConverter",
            "key.converter.schema.registry.url": "http://<schema-registry>:8081",
            "value.converter.schema.registry.url": "http://<schema-registry>:8081",
            "database.encrypt": "false"
        }
    }
    

    You can read more about each configuration property in the Debezium documentation. By default, the connector writes events for each table to a Kafka topic named serverName.databaseName.tableName.

  1. Start the Debezium SQL Server connector using the configuration file:

    export CURRENT_HOST='<your-host>'
    curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" \
    http://$CURRENT_HOST:8083/connectors/ -d @register-sqlserver.json
    
  2. Check that the connector is running:

    curl http://$CURRENT_HOST:8083/connectors/inventory-connector/status
    

Now, Debezium will capture changes from the SQL Server database and publish them to Kafka.

Create a source

Debezium emits change events using an envelope that contains detailed information about upstream database operations, like the before and after values for each record. To create a source that interprets the Debezium envelope in Materialize:

CREATE SOURCE kafka_repl
    FROM KAFKA CONNECTION kafka_connection (TOPIC 'server1.testDB.tableName')
    FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_connection
    ENVELOPE DEBEZIUM;

By default, the source will be created in the active cluster; to use a different cluster, use the IN CLUSTER clause.

Create a view

A view saves a query under a name to provide a shorthand for referencing the query. During view creation, the underlying query is not executed.

CREATE VIEW cnt_table1 AS
    SELECT field1,
           COUNT(*) AS cnt
    FROM kafka_repl
    GROUP BY field1;

Create an index on the view

In Materialize, indexes on views compute and, as new data arrives, incrementally update view results in memory within a cluster instead of recomputing the results from scratch.

Create an index on cnt_table1 view. Then, as new change events stream in through Kafka (as the result of INSERT, UPDATE and DELETE operations in the upstream database), the index incrementally updates the view results in memory, such that the in-memory up-to-date results are immediately available and computationally free to query.

CREATE INDEX idx_cnt_table1_field1 ON cnt_table1(field1);

For best practices on when to index a view, see Indexes and Views.

Known limitations

The official Microsoft documentation for SQL Server CDC lists a number of known limitations that we recommend carefully reading through. In addition to those listed, please also consider:

Debezium delivery guarantees

Due to an upstream bug in Debezium affecting snapshot isolation (DBZ-3915), it is possible that rows inserted close to the initial snapshot time are reflected twice in the downstream Kafka topic. This will lead to a rows didn't match error in Materialize.

To work around this limitation, we recommend halting any updates to the tables marked for replication until the Debezium connector is fully configured and the initial snapshot for all tables has been completed.

Supported types

DATETIMEOFFSET columns are replicated as text, and DATETIME2 columns are replicated as bigint in Materialize.

Back to top ↑