Materialize CDC

Change data capture (CDC) tools provide feeds that record any changes to a database. Typically, the feeds are then saved to another platform, like Kafka, for storage or processing. However, sometimes the stream can have missing or duplicate records, or records can be received out of order. For example, if a CDC tool crashes while writing a record, it may retry and write the record again, resulting in a duplicate entry.

The Materialize CDC format has been designed to provide a downstream data consumer (like Materialize) with enough information to recognize when records are duplicated or out of order. For a technical deep dive on the subject, see our blog post on Change Data Capture.

Currently, the Materialize CDC format is only supported for Avro-formatted Kafka sources. If you’re interested in using it for another source type, let us know in our Slack workspace.

To use the Materialize CDC format, you must:

  1. Transform the changefeed produced by your CDC tool into the Materialize CDC format.
  2. Define the Materialize CDC format in an Avro schema when you create a source in Materialize.
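
As a rough illustration of step 1, the sketch below turns one change event into (data, time, diff) triples. The event shape is hypothetical: the "before", "after", and "ts" keys are assumptions made for this example, not the output of any particular CDC tool, so adapt the field names to whatever your tool emits.

```python
def event_to_updates(event):
    """Translate one hypothetical change event into (data, time, diff) triples.

    Assumed event shape (not from any specific CDC tool):
      {"before": <row or None>, "after": <row or None>, "ts": <logical time>}
    """
    updates = []
    if event.get("before") is not None:
        # The old version of the row is retracted with diff -1.
        updates.append((event["before"], event["ts"], -1))
    if event.get("after") is not None:
        # The new version of the row is added with diff 1.
        updates.append((event["after"], event["ts"], 1))
    return updates


# A price change from 10 to 12 becomes one deletion and one addition.
change = {
    "before": {"id": 5, "price": 10},
    "after": {"id": 5, "price": 12},
    "ts": 7,
}
print(event_to_updates(change))
```

An insert would carry only "after" and a delete only "before", so each yields a single triple.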

Materialize CDC schema components

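The Materialize CDC format has two components:

  1. Record updates, which describe the changes made to individual records.
  2. Progress updates, which describe how many record updates to expect for each timestamp in a range of timestamps.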

Materialize compares the record updates it has received to the expected changes as described by the applicable progress update, and discards duplicates if there are too many identical record updates for a timestamp.

Data requirements

Record updates take the form of (data, time, diff) triples, representing respectively the changed record, the logical timestamp of the update, and the type of update (-1 for deletion and 1 for addition or upsert).

There should be at most one diff value for each (data, time) pair. For example, if a stream would contain the updates (data1, time1, -1) and (data1, time1, 1), combine them into (data1, time1, 0) and then suppress the result, because a diff of 0 represents no change. In effect, the data must be pre-consolidated before it is transmitted to Materialize. A stream can contain multiple identical (data, time, diff) triples, but they are treated as duplicates, and only one update is recorded.
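
The pre-consolidation described above could be sketched as follows. This is a minimal illustration, not Materialize code: it assumes the input is a raw list of (data, time, diff) triples with hashable data values, sums the diffs for each (data, time) pair, and drops pairs whose net diff is 0. The function name and sample rows are made up for the example.

```python
from collections import defaultdict


def consolidate(updates):
    """Sum diffs per (data, time) pair and drop pairs whose net diff is zero."""
    net = defaultdict(int)
    for data, time, diff in updates:
        net[(data, time)] += diff
    # Keep only pairs that still represent a change, ordered by timestamp.
    return sorted(
        ((data, time, diff) for (data, time), diff in net.items() if diff != 0),
        key=lambda update: update[1],
    )


raw = [
    (("id=5", "price=10"), 1, 1),
    (("id=5", "price=10"), 1, -1),  # cancels the update on the previous line
    (("id=5", "price=12"), 2, 1),
]
print(consolidate(raw))
```

Here the first two updates cancel out, so only the update at time 2 would be transmitted.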

Example Materialize CDC Avro schema

[
  {
    "type": "array",
    "items": {
      "type": "record",
      "name": "update",
      "namespace": "com.materialize.cdc",
      "fields": [
        {
          "name": "data",
          "type": {
            "type": "record",
            "name": "data",
            "fields": [
              {
                "name": "COLUMN_NAME1",
                "type": "DATA_TYPE"
              },
              {
                "name": "COLUMN_NAME2",
                "type": [
                  "DATA_TYPE1",
                  "DATA_TYPE2"
                ]
              }
            ]
          }
        },
        {
          "name": "time",
          "type": "long"
        },
        {
          "name": "diff",
          "type": "long"
        }
      ]
    }
  },
  {
    "type": "record",
    "name": "progress",
    "namespace": "com.materialize.cdc",
    "fields": [
      {
        "name": "lower",
        "type": {
          "type": "array",
          "items": "long"
        }
      },
      {
        "name": "upper",
        "type": {
          "type": "array",
          "items": "long"
        }
      },
      {
        "name": "counts",
        "type": {
          "type": "array",
          "items": {
            "type": "record",
            "name": "counts",
            "fields": [
              {
                "name": "time",
                "type": "long"
              },
              {
                "name": "count",
                "type": "long"
              }
            ]
          }
        }
      }
    ]
  }
]

Field    Description
data     The record that represents the changed row. Its fields define the name and data type of each column to be included in the changefeed. A field's type can be a union of several data types (for example, both null and int may be permissible for some fields).
time     The logical timestamp of the update. Materialize uses this to determine the order of updates and, in conjunction with the update count, to determine whether there are duplicate updates to be discarded.
diff     The type of update (-1 for deletion, 1 for addition or upsert).
lower    The earliest logical timestamp for a set of updates.
upper    The latest logical timestamp for a set of updates.
counts   For each timestamp between lower and upper, the number of updates transmitted at that timestamp.
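
To show how the progress fields relate to a batch of record updates, here is a hedged sketch of how a producer might tally counts for one progress update. It assumes the interval is half-open (lower inclusive, upper exclusive), which is an assumption on our part; the function name and the sample updates are illustrative only.

```python
import json
from collections import Counter


def progress_message(updates, lower, upper):
    """Build a progress record for updates with lower <= time < upper.

    The half-open interval is an assumption made for this sketch.
    """
    per_time = Counter(
        time for _data, time, _diff in updates if lower <= time < upper
    )
    return {
        "com.materialize.cdc.progress": {
            "lower": [lower],
            "upper": [upper],
            # One entry per timestamp that has at least one update.
            "counts": [
                {"time": t, "count": c} for t, c in sorted(per_time.items())
            ],
        }
    }


updates = [
    ({"id": 5, "price": 10}, 5, 1),
    ({"id": 5, "price": 12}, 4, 1),
    ({"id": 5, "price": 12}, 5, -1),
    ({"id": 5, "price": 10}, 6, -1),
]
print(json.dumps(progress_message(updates, 3, 10)))
```

Timestamps with no updates are simply absent from counts, so an interval without any changes yields an empty array.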

Example Materialize CDC workflow

You specify the use of the Materialize CDC format in the Avro schema when a source is created.

  CREATE MATERIALIZED SOURCE name_of_source
  FROM KAFKA BROKER 'kafka_url:9092' TOPIC 'name_of_kafka_topic'
  FORMAT AVRO USING SCHEMA '<schema goes here>'
  ENVELOPE MATERIALIZE
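
If you generate this statement from a script, one possible approach is to serialize the schema to JSON and place it inside the SQL string literal. The helper below is a sketch under assumptions: the function name is hypothetical, single quotes are escaped by doubling them (standard SQL string-literal escaping), and the source name, broker, and topic are inserted as-is without validation.

```python
import json


def create_source_sql(source_name, broker, topic, schema):
    """Render the CREATE MATERIALIZED SOURCE statement with an inline schema."""
    # Double any single quotes so the JSON stays inside one SQL string literal.
    schema_literal = json.dumps(schema).replace("'", "''")
    return (
        f"CREATE MATERIALIZED SOURCE {source_name}\n"
        f"FROM KAFKA BROKER '{broker}' TOPIC '{topic}'\n"
        f"FORMAT AVRO USING SCHEMA '{schema_literal}'\n"
        "ENVELOPE MATERIALIZE"
    )


# Truncated stub for illustration only; it is not a complete Materialize CDC schema.
stub_schema = [
    {"type": "record", "name": "progress", "namespace": "com.materialize.cdc", "fields": []}
]
sql = create_source_sql(
    "name_of_source", "kafka_url:9092", "name_of_kafka_topic", stub_schema
)
print(sql)
```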

The following example schema specifies that records consist of id and price fields. Note that the price field supports multiple data types: it can be null or int.

[
  {
    "type": "array",
    "items": {
      "type": "record",
      "name": "update",
      "namespace": "com.materialize.cdc",
      "fields": [
        {
          "name": "data",
          "type": {
            "type": "record",
            "name": "data",
            "fields": [
              {
                "name": "id",
                "type": "long"
              },
              {
                "name": "price",
                "type": [
                  "null",
                  "int"
                ]
              }
            ]
          }
        },
        {
          "name": "time",
          "type": "long"
        },
        {
          "name": "diff",
          "type": "long"
        }
      ]
    }
  },
  {
    "type": "record",
    "name": "progress",
    "namespace": "com.materialize.cdc",
    "fields": [
      {
        "name": "lower",
        "type": {
          "type": "array",
          "items": "long"
        }
      },
      {
        "name": "upper",
        "type": {
          "type": "array",
          "items": "long"
        }
      },
      {
        "name": "counts",
        "type": {
          "type": "array",
          "items": {
            "type": "record",
            "name": "counts",
            "fields": [
              {
                "name": "time",
                "type": "long"
              },
              {
                "name": "count",
                "type": "long"
              }
            ]
          }
        }
      }
    ]
  }
]

As an example, a changefeed that conforms to the schema above might look like this:

{"array":[{"data":{"id":5,"price":{"int":10}},"time":5,"diff":1}]}
{"array":[{"data":{"id":5,"price":{"int":12}},"time":4,"diff":1}]}
{"array":[{"data":{"id":5,"price":{"int":12}},"time":5,"diff":-1}]}
{"array":[{"data":{"id":5,"price":{"int":10}},"time":6,"diff":-1}]}

{"com.materialize.cdc.progress":{"lower":[0],"upper":[3],"counts":[]}}
{"com.materialize.cdc.progress":{"lower":[3],"upper":[10],"counts":[{"time":4,"count":1},{"time":5,"count":2},{"time":6,"count":1}]}}

Even if Materialize receives the updates in an order different from the order in which they were transmitted, it can reorder the updates by time. Additionally, the progress updates tell Materialize to expect one updated record for timestamp 4, two updated records for timestamp 5, and one updated record for timestamp 6. If, for example, Materialize receives two identical updated records for timestamp 4, it determines that one of them is a duplicate entry and discards it.
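
The reordering and duplicate handling described above can be approximated with a short sketch. This is a simplified illustration of the idea, not Materialize's actual implementation: it assumes record updates arrive as hashable (data, time, diff) triples, always collapses identical triples to one copy, and raises an error if the number of distinct updates for a timestamp does not match the expected count. The function name and tuple encoding of rows are made up for the example.

```python
def reconcile(received, counts):
    """Order updates by time and drop identical copies, checking expected counts.

    received: list of (data, time, diff) triples, in arrival order.
    counts:   dict mapping timestamp -> expected number of updates.
    """
    result = []
    for time in sorted(counts):
        # dict.fromkeys keeps the first copy of each identical triple.
        distinct = list(dict.fromkeys(u for u in received if u[1] == time))
        if len(distinct) != counts[time]:
            raise ValueError(
                f"time {time}: expected {counts[time]} updates, got {len(distinct)}"
            )
        result.extend(distinct)
    return result


# Rows are encoded as (id, price) tuples; the update for time 4 arrives twice.
received = [
    ((5, 10), 5, 1),
    ((5, 12), 4, 1),
    ((5, 12), 4, 1),
    ((5, 12), 5, -1),
    ((5, 10), 6, -1),
]
expected = {4: 1, 5: 2, 6: 1}
for update in reconcile(received, expected):
    print(update)
```

With the sample input, the duplicate at timestamp 4 is dropped and the remaining four updates come out ordered by time.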
