MongoDB

Using Change Data Capture (CDC), you can track and propagate changes from MongoDB to downstream consumers. This guide outlines how to ingest data from a MongoDB replica set (rs0) into Materialize; specifically from the collections items and orders in the test database.

High-level architecture

    flowchart LR
    subgraph "Source"
        Mongo[("MongoDB (Replica Set/Sharded Cluster)")]
    end

    subgraph "Streaming Ingestion: Debezium + Kafka"
        Debezium["Debezium Connector"]
        Kafka{{"Kafka Broker"}}
        Schema["Schema Registry"]
    end

    subgraph "Materialize"
        MZ[("Materialize")]
    end

    Mongo -- "Change stream oplog data" --> Debezium
    Debezium -- "Writes Avro Events" --> Kafka
    Schema -. "Validates Schemas" .- Kafka
    Kafka -- "Consumes Topic" --> MZ
    Schema -. "Decodes Data" .- MZ

The architecture consists of the following components:

  • Source Component: MongoDB

    • MongoDB: MongoDB must be deployed as a replica set or a sharded cluster to generate an oplog (operations log), which records all data modifications. To deploy as a replica set or a sharded cluster, refer to the official MongoDB documentation on replica sets and sharded clusters.
  • Streaming Ingestion Component: Debezium + Kafka (with Schema Registry)

    • Debezium: A connector that obtains the oplog data via MongoDB change streams.
    • Kafka & Schema Registry: The streaming transport layer. Debezium pushes the events to a Kafka topic, while the Schema Registry ensures the data structure (schema) is consistent and readable.
  • Materialize: Materialize uses connects to Kafka to ingests data from MongoDB.

Prerequisites

  • MongoDB: Version 6.0 or later. Must be deployed as a replica set or a sharded cluster.

  • Kafka Cluster: A running Kafka broker and Schema Registry (e.g., Confluent Platform or Redpanda).

  • Debezium Connect: Version 2.5.4+ or later. A Kafka Connect cluster with the MongoDB connector plugin installed.

A. Create a Debezium user in MongoDB.

! Important: For Debezium to capture changes, your MongoDB deployment must be configured as a replica set or a sharded cluster.

Create a user (e.g., debezium_materialize_user) with the necessary permissions to read from the database. Depending on the connector’s capture.scope property, the user may need read on a specific database or all databases.

For this guide, the example will ingest items and orders collections in the test database. As such, the example uses capture.scope of database to read from the test database only:

db.getSiblingDB("test").createUser({
    user: "debezium_materialize_user",
    pwd: passwordPrompt(),         // Or  "<cleartext password>"
    roles: [
      { role: "read", db: "test" }
    ]
});

For simplicity, the user is created in the test database; however, the user can be created in a different database.

B. Configure the streaming layer

The streaming layer requires Kafka, Schema Registry, and Kafka Connect with the Debezium MongoDB connector. If you have not set up these services, see:

  • Confluent Platform Quickstart(Recommended for Testing): Docker-based setup for Kafka, Schema Registry, and Kafka Connect.

  • Debezium Tutorial: Example using Docker Compose.

    For this tutorial, ensure your Debezium container allows outbound traffic to your external MongoDB host.

1. Configure Kafka Connect worker

Configure your Kafka Connect worker to use Avro serialization for Schema Registry integration; specifically, set the following environment variables/properties are set on your Connect worker:

Property Value
KEY_CONVERTER io.confluent.connect.avro.AvroConverter
VALUE_CONVERTER io.confluent.connect.avro.AvroConverter
CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL http://schema-registry:8081
CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL http://schema-registry:8081

2. Configure the Debezium connector for MongoDB

Register the MongoDB connector with the following configuration:

Parameter Description
name A unique name for the connector instance.
connector.class The Debezium MongoDB connector class, "io.debezium.connector.mongodb.MongoDbConnector".
mongodb.connection.string Your MongoDB connection string with the debezium_materialize_user.
topic.prefix A unique prefix for Kafka topics. Topics are created as <prefix>.<db>.<collection>. Recommended Once set, do not change the value of this property. See the official documentation for details.
collection.include.list Comma-separated list of collections to capture in the format <db>.<collection>, e.g., "test.orders,test.items".
capture.mode Must be "change_streams_update_full" (the default). This captures full document state on updates, which is required for Materialize’s UPSERT envelope.
capture.scope The scope of the change stream: "database" or "deployment". This value affects the required MongoDB user permissions.
capture.target The database to monitor for changes. Required only when capture.scope is "database".
transforms Optional. Set to "unwrap" to extract the document state from Debezium’s change event envelope.
transforms.unwrap.type Optional. Set to "io.debezium.connector.mongodb.transforms.ExtractNewDocumentState" to use the MongoDB-specific unwrap transform.
! Important: The capture.mode must be change_streams_update_full, the default. This forces Debezium to send the entire document state for every change, which allows Materialize to use the UPSERT envelope.
  1. Create a dbz_mongodb_connector.json file with your connector configuration:

    {
       "name": "mongodb-connector",
       "config": {
         "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
         "mongodb.connection.string": "mongodb://debezium_materialize_user:<associated_pwd>@host1:27017,host2:27017,host3:27017/?replicaSet=rs0&authSource=test",
         "topic.prefix": "mdb-prod-rs0",
         "collection.include.list": "test.orders,test.items",
         "capture.mode": "change_streams_update_full",
         "capture.scope": "database",
         "capture.target": "test",
         "transforms": "unwrap",
         "transforms.unwrap.type": "io.debezium.connector.mongodb.transforms.ExtractNewDocumentState"
       }
    }
    

    Required updates:

    • Update mongodb.connection.string with your MongoDB connection string for the debezium_materialize_user. For example:

      • "mongodb+srv://debezium_materialize_user:<associated_pwd>@host[/[defaultauthdb][?options]]"
      • "mongodb://debezium_materialize_user:<associated_pwd>@host1[:port1][,...hostN[:portN]]/?replicaSet=<replica_set_name>&authSource=test"
    • Update collection.include.list with the collections you want to capture.

    • Update topic.prefix with unique prefix for your environment.

    Optional modifications:

    • capture.scope: To monitor changes across all databases, you can change to "deployment" to monitor all databases (requires broader MongoDB permissions for your debezium user).
    • capture.target: Only needed if capture.scope is "database".
    • transforms and transforms.unwrap.type: You can omit or customize.
    💡 Tip:

    When using the unwrap transform of type ExtractNewDocumentState with MongoDB and Avro serialization, the Avro schema is inferred from the first document processed. Because MongoDB allows the same field to have different BSON types across documents, ensure that same field uses a consistent BSON type across the collection to avoid schema inference errors.

    For example, if the first document has price: 40, the registered Avro schema infers the price field type to be int. If a subsequent document has price: 2.25 (a decimal value), it will cause a schema mismatch error. To avoid this, explicitly specify the BSON type in your documents using NumberDecimal(), e.g., price: NumberDecimal("40.00") and price: NumberDecimal("2.25").

    If you cannot enforce a consistent BSON type, you can omit the unwrap.

  2. Register the connector with Kafka Connect:

    curl -X POST -H "Content-Type:application/json" \
      http://<your-host>:8083/connectors \
      -d @dbz_mongodb_connector.json
    

    Replace <your-host> with your Kafka Connect hostname or IP address.

C. Ingest data in Materialize

1. Create the Kafka and CSR connections.

In Materialize, Create the Kafka connection and the Confluent Schema Registry connection

  1. Create the Kafka connection. For example, if using SASL_PLAINTEXT:

     CREATE SECRET IF NOT EXISTS kafka_secret AS '<kafka-password>';
    
     CREATE CONNECTION IF NOT EXISTS kafka_connection TO KAFKA (
       BROKER '<kafka-bootstrap-server>:9092',
       SECURITY PROTOCOL = 'SASL_PLAINTEXT',
       SASL MECHANISMS = 'SCRAM-SHA-256',
       SASL USERNAME = '<kafka-SASL-username>',
       SASL PASSWORD = SECRET kafka_secret
     );
    
  2. Create the Confluent Schema Registry connection:

    CREATE CONNECTION csr_connection TO CONFLUENT SCHEMA REGISTRY (
       URL 'http://schema-registry:8081'
    );
    

2. Start ingesting data

Create the sources for the specific Kafka topic (<topic.prefix>.<database>.<collection>).

CREATE SOURCE mdb_items
FROM KAFKA CONNECTION kafka_connection (TOPIC 'mdb-prod-rs0.test.items')
FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_connection
ENVELOPE UPSERT;

CREATE SOURCE mdb_orders
FROM KAFKA CONNECTION kafka_connection (TOPIC 'mdb-prod-rs0.test.items')
FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_connection
ENVELOPE UPSERT;

3. Query the data

Query the data using standard SQL.

  • To query the mdb_items:

    SELECT * FROM mdb_items;
    
    NOTE: The query includes the Kafka message id field as a column in the table.
                id            |           _id            | item_id |         item          | price | currency
    --------------------------+--------------------------+---------+-----------------------+-------+----------
    693c608a8de2a41c2ba365f3 | 693c608a8de2a41c2ba365f3 |       8 | donut                 | 1.25  | USD
    693c608a8de2a41c2ba365ec | 693c608a8de2a41c2ba365ec |       1 | brownie               | 2.25  | USD
    693c608a8de2a41c2ba365f2 | 693c608a8de2a41c2ba365f2 |       7 | cupcake               | 3.00  | USD
    693c608a8de2a41c2ba365f4 | 693c608a8de2a41c2ba365f4 |       9 | egg tart              | 2.50  | USD
    
    💡 Tip: If you did not use the unwrap transform, the document is stored as a JSON string in the after field in mdb_items. You can create a parsing view to map the individual document fields to columns instead.
  • To query the mdb_orders:

    SELECT * FROM mdb_orders;
    
    NOTE: The query includes the Kafka message id field as a column in the table.
                id            |           _id            | order_id |       order_date        |         item          | quantity |   status
    --------------------------+--------------------------+----------+-------------------------+-----------------------+----------+------------
    693c608a8de2a41c2ba365d4 | 693c608a8de2a41c2ba365d4 |        3 | 2025-12-12 18:05:54.648 | donut                 |       36 | Pending
    693c608a8de2a41c2ba365e3 | 693c608a8de2a41c2ba365e3 |        8 | 2025-12-10 18:50:54.648 | donut                 |       12 | Shipped
    693c608a8de2a41c2ba365cc | 693c608a8de2a41c2ba365cc |        1 | 2025-12-12 18:35:54.648 | brownie               |       10 | Pending
    693c608a8de2a41c2ba365d2 | 693c608a8de2a41c2ba365d2 |        2 | 2025-12-12 18:20:54.648 | brownie               |       20 | Pending
    
    💡 Tip: If you did not use the unwrap transform, the document is stored as a JSON string in the after field in mdb_orders. You can create a parsing view to map the individual document fields to columns instead.

Troubleshooting

See also: Debezium documentation on the MongoDB Connector

Back to top ↑