CockroachDB CDC using Kafka and Changefeeds
Change Data Capture (CDC) allows you to track and propagate changes in a CockroachDB database to downstream consumers. In this guide, we’ll cover how to use Materialize to create and efficiently maintain real-time views with incrementally updated results on top of CockroachDB CDC data.
A. Configure CockroachDB
1. Enable rangefeeds
As a first step, you must ensure rangefeeds are enabled in your CockroachDB instance so you can create changefeeds for the tables you want to replicate to Materialize.
- As a user with the admin role, enable the kv.rangefeed.enabled cluster setting:

  SET CLUSTER SETTING kv.rangefeed.enabled = true;
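  To confirm the setting took effect, you can read it back. This is standard CockroachDB syntax, shown here only as an optional sanity check:

  -- Should return true once rangefeeds are enabled.
  SHOW CLUSTER SETTING kv.rangefeed.enabled;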
2. Configure per-table changefeeds
Changefeeds capture row-level changes resulting from INSERT, UPDATE, and DELETE operations against CockroachDB tables and publish them as events to Kafka (or another Kafka API-compatible broker). You can then use the Kafka source to consume these changefeed events into Materialize, making the data available for transformation.
- Create a changefeed for each table you want to replicate:

  CREATE CHANGEFEED FOR TABLE my_table
    INTO 'kafka://broker:9092'
    WITH format = avro,
         confluent_schema_registry = 'http://registry:8081',
         diff,
         envelope = wrapped;

  We recommend creating changefeeds using the Avro format (format = avro) and the default diff envelope (envelope = wrapped), which is compatible with the message format Materialize expects. Each table will produce data to a dedicated Kafka topic, which can then be consumed by Materialize.
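  Before moving on, you can optionally confirm that the changefeed is healthy on the CockroachDB side, using standard CockroachDB statements (the <job_id> placeholder is illustrative; use the job id reported when the changefeed was created):

  -- The changefeed job status should read 'running'.
  SHOW CHANGEFEED JOBS;

  -- Inspect a single changefeed job in more detail.
  SHOW CHANGEFEED JOB <job_id>;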
For detailed instructions on configuring your CockroachDB instance for CDC, refer to the CockroachDB documentation.
B. Ingest data in Materialize
1. (Optional) Create a cluster
If you already have a cluster you want to use for ingestion (e.g., quickstart), you can skip this step. For production scenarios, we recommend separating your workloads into multiple clusters for resource isolation.
In Materialize, a cluster is an isolated environment, similar to a virtual warehouse in Snowflake. When you create a cluster, you choose the size of its compute resource allocation based on the work you need the cluster to do, whether ingesting data from a source, computing always-up-to-date query results, serving results to external clients, or a combination.
In this step, you’ll create a dedicated cluster for ingesting source data from topics in your Kafka (or Kafka-API compatible) broker.
- In the SQL Shell, or your preferred SQL client connected to Materialize, use the CREATE CLUSTER command to create the new cluster:

  CREATE CLUSTER ingest_kafka (SIZE = '100cc');

  SET CLUSTER = ingest_kafka;

  A cluster of size 100cc should be enough to accommodate multiple Kafka sources, depending on the source characteristics (e.g., sources with ENVELOPE UPSERT or ENVELOPE DEBEZIUM will be more memory-intensive) and the upstream traffic patterns. You can readjust the size of the cluster at any time using the ALTER CLUSTER command:

  ALTER CLUSTER <cluster_name> SET (SIZE = <new_size>);
2. Start ingesting data
Now that you’ve created an ingestion cluster, you can connect Materialize to
your Kafka broker and start ingesting data. The exact steps depend on your
authentication and networking configurations, so refer to the
CREATE CONNECTION
documentation for further
guidance.
- In the SQL Shell, or your preferred SQL client connected to Materialize, use the CREATE SECRET command to securely store the credentials to connect to your Kafka broker and, optionally, schema registry:

  CREATE SECRET kafka_ssl_key AS '<BROKER_SSL_KEY>';
  CREATE SECRET kafka_ssl_crt AS '<BROKER_SSL_CRT>';
  CREATE SECRET csr_password AS '<CSR_PASSWORD>';
- Use the CREATE CONNECTION command to create a connection object with access and authentication details for Materialize to use:

  CREATE CONNECTION kafka_connection TO KAFKA (
      BROKER '<host>',
      SSL KEY = SECRET kafka_ssl_key,
      SSL CERTIFICATE = SECRET kafka_ssl_crt
  );

  If you're using a schema registry, create an additional connection object:

  CREATE CONNECTION csr_connection TO CONFLUENT SCHEMA REGISTRY (
      URL '<csr_url>',
      SSL KEY = SECRET csr_ssl_key,
      SSL CERTIFICATE = SECRET csr_ssl_crt,
      USERNAME = 'foo',
      PASSWORD = SECRET csr_password
  );
- Use the CREATE SOURCE command to connect Materialize to your Kafka broker and start ingesting data from the target topic:

  CREATE SOURCE kafka_repl
    IN CLUSTER ingest_kafka
    FROM KAFKA CONNECTION kafka_connection (TOPIC 'my_table')
    -- The changefeed above emits Avro, so decode the messages using the
    -- schema registry connection created earlier.
    FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_connection
    -- CockroachDB's default envelope structure for changefeed messages is
    -- compatible with the Debezium format, so you can use ENVELOPE DEBEZIUM
    -- to interpret the data.
    ENVELOPE DEBEZIUM;

  By default, the source will be created in the active cluster; to use a different cluster, use the IN CLUSTER clause.
3. Monitor the ingestion status
When a new source is created, Materialize performs a sync of all data available in the upstream Kafka topic before it starts ingesting new data, an operation known as snapshotting. Because the initial snapshot is persisted in the storage layer atomically (i.e., at the same ingestion timestamp), you will not be able to query the source until snapshotting is complete.
In this step, you’ll monitor the progress of the initial snapshot using the observability features in the Materialize Console.
- If not already logged in, log in to the Materialize Console.

- Navigate to Monitoring > Sources and click through to the source you created in the previous step. In the source overview page, you will see a progress bar with the status and progress of the snapshot.

- For the duration of the snapshotting operation, the source status will show as Snapshotting. Once the source status transitions from Snapshotting to Running, the source is ready for querying and you can move on to the next step. If the source fails to transition to this state, check the ingestion troubleshooting guide.
4. Create a view
A view saves a query under a name to provide a shorthand for referencing the query. During view creation, the underlying query is not executed.
CREATE VIEW cnt_table1 AS
SELECT field1,
COUNT(*) AS cnt
FROM kafka_repl
GROUP BY field1;
5. Create an index on the view
In Materialize, indexes on views compute and, as new data arrives, incrementally update view results in memory within a cluster instead of recomputing the results from scratch.
Create an index on the cnt_table1 view. Then, as new change events stream in through Kafka (as the result of INSERT, UPDATE, and DELETE operations in the upstream database), the index incrementally updates the view results in memory, such that the in-memory up-to-date results are immediately available and computationally free to query.
CREATE INDEX idx_cnt_table1_field1 ON cnt_table1(field1);
For best practices on when to index a view, see Indexes and Views.
Next steps
With Materialize ingesting your CockroachDB data into durable storage, you can start exploring the data, computing real-time results that stay up-to-date as new data arrives, and serving results efficiently.
- Explore your data with SHOW SOURCES and SELECT.

- Compute real-time results in memory with CREATE VIEW and CREATE INDEX, or in durable storage with CREATE MATERIALIZED VIEW.

- Serve results to a PostgreSQL-compatible SQL client or driver with SELECT or SUBSCRIBE, or to an external message broker with CREATE SINK.

- Check out the tools and integrations supported by Materialize.
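As a quick illustration of the serving options above, here is a minimal sketch against the cnt_table1 view created earlier (adjust column names to match your own tables):

  -- Served from the index, so results are immediately available and always up to date.
  SELECT field1, cnt
  FROM cnt_table1
  ORDER BY cnt DESC
  LIMIT 10;

  -- Stream changes to the view results as they happen.
  SUBSCRIBE (SELECT field1, cnt FROM cnt_table1);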