MongoDB
Using Change Data Capture (CDC), you can track and propagate changes from
MongoDB to downstream consumers. This guide outlines how to ingest data from a
MongoDB replica set (rs0) into Materialize; specifically, from the items and
orders collections in the test database.
High-level architecture
flowchart LR
subgraph "Source"
Mongo[("MongoDB (Replica Set/Sharded Cluster)")]
end
subgraph "Streaming Ingestion: Debezium + Kafka"
Debezium["Debezium Connector"]
Kafka{{"Kafka Broker"}}
Schema["Schema Registry"]
end
subgraph "Materialize"
MZ[("Materialize")]
end
Mongo -- "Change stream oplog data" --> Debezium
Debezium -- "Writes Avro Events" --> Kafka
Schema -. "Validates Schemas" .- Kafka
Kafka -- "Consumes Topic" --> MZ
Schema -. "Decodes Data" .- MZ
The architecture consists of the following components:
- Source Component: MongoDB
  - MongoDB: MongoDB must be deployed as a replica set or a sharded cluster to generate an oplog (operations log), which records all data modifications. To deploy as a replica set or a sharded cluster, refer to the official MongoDB documentation on replica sets and sharded clusters.
- Streaming Ingestion Component: Debezium + Kafka (with Schema Registry)
  - Debezium: A connector that obtains the oplog data via MongoDB change streams.
  - Kafka & Schema Registry: The streaming transport layer. Debezium pushes the events to a Kafka topic, while the Schema Registry ensures the data structure (schema) is consistent and readable.
- Materialize: Materialize connects to Kafka to ingest the data originating from MongoDB.
Prerequisites
- MongoDB: Version 6.0 or later. Must be deployed as a replica set or a sharded cluster.
- Kafka Cluster: A running Kafka broker and Schema Registry (e.g., Confluent Platform or Redpanda).
- Debezium Connect: Version 2.5.4 or later. A Kafka Connect cluster with the MongoDB connector plugin installed.
A. Create a Debezium user in MongoDB
Create a user (e.g., debezium_materialize_user) with the necessary permissions
to read from the database. Depending on the connector's capture.scope property,
the user may need read access to a specific database or to all databases.
For this guide, the example ingests the items and orders collections in the
test database. As such, the example sets capture.scope to database so that the
connector reads from the test database only:
db.getSiblingDB("test").createUser({
user: "debezium_materialize_user",
pwd: passwordPrompt(), // Or "<cleartext password>"
roles: [
{ role: "read", db: "test" }
]
});
For simplicity, the user is created in the test database; however, the user
can be created in a different database.
B. Configure the streaming layer
The streaming layer requires Kafka, Schema Registry, and Kafka Connect with the Debezium MongoDB connector. If you have not set up these services, see:
- Confluent Platform Quickstart (Recommended for Testing): Docker-based setup for Kafka, Schema Registry, and Kafka Connect.
- Debezium Tutorial: Example using Docker Compose.
For this tutorial, ensure your Debezium container allows outbound traffic to your external MongoDB host.
1. Configure Kafka Connect worker
Configure your Kafka Connect worker to use Avro serialization for Schema Registry integration; specifically, ensure the following environment variables/properties are set on your Connect worker:
| Property | Value |
|---|---|
| KEY_CONVERTER | io.confluent.connect.avro.AvroConverter |
| VALUE_CONVERTER | io.confluent.connect.avro.AvroConverter |
| CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL | http://schema-registry:8081 |
| CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL | http://schema-registry:8081 |
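If your Connect worker is configured through a properties file rather than container environment variables, the equivalent worker settings typically look like the sketch below (the Schema Registry URL here reuses the example http://schema-registry:8081 address from the table above; adjust it for your environment):

# Equivalent Kafka Connect worker properties for Avro + Schema Registry
key.converter=io.confluent.connect.avro.AvroConverter
value.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://schema-registry:8081
value.converter.schema.registry.url=http://schema-registry:8081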
2. Configure the Debezium connector for MongoDB
Register the MongoDB connector with the following configuration:
| Parameter | Description |
|---|---|
| name | A unique name for the connector instance. |
| connector.class | The Debezium MongoDB connector class, "io.debezium.connector.mongodb.MongoDbConnector". |
| mongodb.connection.string | Your MongoDB connection string with the debezium_materialize_user. |
| topic.prefix | A unique prefix for Kafka topics. Topics are created as <prefix>.<db>.<collection>. Recommended: once set, do not change the value of this property. See the official documentation for details. |
| collection.include.list | Comma-separated list of collections to capture in the format <db>.<collection>, e.g., "test.orders,test.items". |
| capture.mode | "change_streams_update_full" (the default). This captures the full document state on updates, which is required for Materialize's UPSERT envelope. |
| capture.scope | The scope of the change stream: "database" or "deployment". This value affects the required MongoDB user permissions. |
| capture.target | The database to monitor for changes. Required only when capture.scope is "database". |
| transforms | Optional. Set to "unwrap" to extract the document state from Debezium's change event envelope. |
| transforms.unwrap.type | Optional. Set to "io.debezium.connector.mongodb.transforms.ExtractNewDocumentState" to use the MongoDB-specific unwrap transform. |
capture.mode must be change_streams_update_full, the default. This
forces Debezium to send the entire document state for every change, which allows
Materialize to use the UPSERT envelope.
- Create a dbz_mongodb_connector.json file with your connector configuration:

  {
    "name": "mongodb-connector",
    "config": {
      "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
      "mongodb.connection.string": "mongodb://debezium_materialize_user:<associated_pwd>@host1:27017,host2:27017,host3:27017/?replicaSet=rs0&authSource=test",
      "topic.prefix": "mdb-prod-rs0",
      "collection.include.list": "test.orders,test.items",
      "capture.mode": "change_streams_update_full",
      "capture.scope": "database",
      "capture.target": "test",
      "transforms": "unwrap",
      "transforms.unwrap.type": "io.debezium.connector.mongodb.transforms.ExtractNewDocumentState"
    }
  }

  Required updates:
  - Update mongodb.connection.string with your MongoDB connection string for the debezium_materialize_user. For example:

    "mongodb+srv://debezium_materialize_user:<associated_pwd>@host[/[defaultauthdb][?options]]"

    "mongodb://debezium_materialize_user:<associated_pwd>@host1[:port1][,...hostN[:portN]]/?replicaSet=<replica_set_name>&authSource=test"
  - Update collection.include.list with the collections you want to capture.

  - Update topic.prefix with a unique prefix for your environment.
  Optional modifications:

  - capture.scope: To monitor changes across all databases, change the value to "deployment" (this requires broader MongoDB permissions for your Debezium user).
  - capture.target: Only needed if capture.scope is "database".
  - transforms and transforms.unwrap.type: You can omit or customize these.
  💡 Tip: When using the unwrap transform of type ExtractNewDocumentState with MongoDB and Avro serialization, the Avro schema is inferred from the first document processed. Because MongoDB allows the same field to have different BSON types across documents, ensure that the same field uses a consistent BSON type across the collection to avoid schema inference errors.

  For example, if the first document has price: 40, the registered Avro schema infers the price field type to be int. If a subsequent document has price: 2.25 (a decimal value), it will cause a schema mismatch error. To avoid this, explicitly specify the BSON type in your documents using NumberDecimal(), e.g., price: NumberDecimal("40.00") and price: NumberDecimal("2.25").

  If you cannot enforce a consistent BSON type, you can omit the unwrap transform.
- Register the connector with Kafka Connect:

  curl -X POST -H "Content-Type:application/json" \
    http://<your-host>:8083/connectors \
    -d @dbz_mongodb_connector.json

  Replace <your-host> with your Kafka Connect hostname or IP address.
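  To confirm that the connector registered successfully and its task is running, you can query the Kafka Connect REST API. This check assumes the connector name mongodb-connector from the example configuration above:

  curl http://<your-host>:8083/connectors/mongodb-connector/status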
C. Ingest data in Materialize
1. Create the Kafka and CSR connections.
In Materialize, create the Kafka connection and the Confluent Schema Registry connection.
- Create the Kafka connection. For example, if using SASL_PLAINTEXT:

  CREATE SECRET IF NOT EXISTS kafka_secret AS '<kafka-password>';

  CREATE CONNECTION IF NOT EXISTS kafka_connection TO KAFKA (
      BROKER '<kafka-bootstrap-server>:9092',
      SECURITY PROTOCOL = 'SASL_PLAINTEXT',
      SASL MECHANISMS = 'SCRAM-SHA-256',
      SASL USERNAME = '<kafka-SASL-username>',
      SASL PASSWORD = SECRET kafka_secret
  );

- Create the Confluent Schema Registry connection:

  CREATE CONNECTION csr_connection TO CONFLUENT SCHEMA REGISTRY (
      URL 'http://schema-registry:8081'
  );
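Optionally, check that Materialize can reach the Kafka broker and the Schema Registry before creating sources. A minimal sketch, assuming your Materialize version supports the VALIDATE CONNECTION command:

-- Ask Materialize to test connectivity for each connection.
VALIDATE CONNECTION kafka_connection;
VALIDATE CONNECTION csr_connection;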
2. Start ingesting data
Create a source for each Kafka topic
(<topic.prefix>.<database>.<collection>):
CREATE SOURCE mdb_items
FROM KAFKA CONNECTION kafka_connection (TOPIC 'mdb-prod-rs0.test.items')
FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_connection
ENVELOPE UPSERT;
CREATE SOURCE mdb_orders
FROM KAFKA CONNECTION kafka_connection (TOPIC 'mdb-prod-rs0.test.orders')
FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_connection
ENVELOPE UPSERT;
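To confirm that the sources were created and are ingesting data, you can list them and watch the ingested offsets advance. This is a minimal check; the progress relation name below assumes Materialize's default <source>_progress naming:

-- List the sources created above.
SHOW SOURCES;

-- Offsets in the progress relation should advance as Debezium writes new events.
SELECT * FROM mdb_items_progress;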
3. Query the data
Query the data using standard SQL.
- To query mdb_items:

  SELECT * FROM mdb_items;

  NOTE: The query result includes the Kafka message id field as a column in the table.

             id             |           _id            | item_id |   item   | price | currency
   --------------------------+--------------------------+---------+----------+-------+----------
    693c608a8de2a41c2ba365f3 | 693c608a8de2a41c2ba365f3 |       8 | donut    |  1.25 | USD
    693c608a8de2a41c2ba365ec | 693c608a8de2a41c2ba365ec |       1 | brownie  |  2.25 | USD
    693c608a8de2a41c2ba365f2 | 693c608a8de2a41c2ba365f2 |       7 | cupcake  |  3.00 | USD
    693c608a8de2a41c2ba365f4 | 693c608a8de2a41c2ba365f4 |       9 | egg tart |  2.50 | USD

  💡 Tip: If you did not use the unwrap transform, the document is stored as a JSON string in the after field in mdb_items. You can create a parsing view to map the individual document fields to columns instead.
- To query mdb_orders:

  SELECT * FROM mdb_orders;

  NOTE: The query result includes the Kafka message id field as a column in the table.

             id             |           _id            | order_id |       order_date        |  item   | quantity | status
   --------------------------+--------------------------+----------+-------------------------+---------+----------+---------
    693c608a8de2a41c2ba365d4 | 693c608a8de2a41c2ba365d4 |        3 | 2025-12-12 18:05:54.648 | donut   |       36 | Pending
    693c608a8de2a41c2ba365e3 | 693c608a8de2a41c2ba365e3 |        8 | 2025-12-10 18:50:54.648 | donut   |       12 | Shipped
    693c608a8de2a41c2ba365cc | 693c608a8de2a41c2ba365cc |        1 | 2025-12-12 18:35:54.648 | brownie |       10 | Pending
    693c608a8de2a41c2ba365d2 | 693c608a8de2a41c2ba365d2 |        2 | 2025-12-12 18:20:54.648 | brownie |       20 | Pending

  💡 Tip: If you did not use the unwrap transform, the document is stored as a JSON string in the after field in mdb_orders. You can create a parsing view to map the individual document fields to columns instead; see the sketch after this list.
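If you did not use the unwrap transform, a parsing view can expose individual document fields as columns. The sketch below is illustrative: it assumes the document arrives as an extended-JSON string in the after column and uses the field names from the sample items documents above. Typed BSON values (e.g., NumberDecimal) are serialized in extended-JSON form such as {"$numberDecimal": "2.25"} and may need an extra extraction step before casting.

CREATE VIEW mdb_items_parsed AS
SELECT
    (after::jsonb)->>'item'     AS item,
    (after::jsonb)->>'currency' AS currency,
    -- Assumes item_id is serialized as a plain number in the JSON string.
    ((after::jsonb)->>'item_id')::int AS item_id
FROM mdb_items;

Once the data is in Materialize, you can also maintain query results incrementally as new changes arrive from MongoDB. A hypothetical materialized view joining the two sources on the item field (the view name and join key are illustrative; field names follow the sample documents above):

CREATE MATERIALIZED VIEW order_totals AS
SELECT
    o.order_id,
    o.item,
    o.quantity,
    i.price,
    -- Per-order total, kept up to date incrementally.
    o.quantity * i.price AS order_total
FROM mdb_orders o
JOIN mdb_items i ON o.item = i.item;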