Striim Cloud

Striim is a real-time data integration platform that offers a variety of connectors for databases, messaging systems, and other data sources. This guide walks through the steps to ingest Striim Change Data Capture (CDC) data into Materialize using the Kafka source.

Before you begin

NOTE: We are in touch with the Striim team to build a direct CDC connector to Materialize. If you’re interested in this integration, please contact our team!

At present, integrating Striim Cloud with Materialize requires using a message broker (like Apache Kafka) as an intermediary, as well as a schema registry (like the Confluent Schema Registry) configured for Avro schema management.

Ensure that you have:

Step 1. Configure Striim CDC

Database configuration

Follow Striim’s guidance to enable and configure CDC from your source relational database:

Kafka writer configuration

  1. In Striim, create a KafkaWriter using the version that corresponds to your target Kafka broker.

  2. Select the output stream of the CDC reader you configured above as the Input stream.

  3. Configure the Kafka broker details and the target topic name.

    1. Under Advanced settings > Kafka Config, add the value.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer configuration to serialize records using the Confluent wire format.

    2. Add a MESSAGE KEY field to specify the primary key of the table. This will configure the KafkaWriter to use the table’s primary key as the Kafka message key.
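
     To see the writer-side serialization setting in one place: below is a sketch of the relevant Kafka Config entry. Striim typically accepts these as key=value pairs, but the exact field formatting varies by Striim version, so treat this as illustrative rather than copy-paste ready.

     ```properties
     # Serialize record values in the Confluent wire format so that
     # Materialize can decode them against the schema registry.
     value.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
     ```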

Schema registry configuration

  1. From the Formatter dropdown menu, select AvroFormatter to serialize records using the Avro format.

  2. Configure the Schema Registry URL, and its authentication credentials (if necessary).

  3. Set the format to Table.

  4. Leave the Schema Registry Subject Name and Schema Registry Subject Name Mapping fields empty.

Known limitations

Supported types

Some data types might not be correctly supported in Striim’s Avro handling (e.g. bigint). If you run into Avro serialization issues with the KafkaWriter, please reach out to Striim support or the Striim community.


Once your Striim Cloud service is configured for CDC, start the app to begin streaming changes from your source relational database into the specified Kafka cluster. Next, you’ll configure Materialize to consume this data.

Step 2. Start ingesting data

NOTE: If you are prototyping and already have a cluster to host your Kafka source (e.g. quickstart), you don’t need to create a new cluster. For production scenarios, we recommend separating your workloads into multiple clusters for resource isolation.
  1. In the SQL Shell, or your preferred SQL client connected to Materialize, use the CREATE CONNECTION command to create connection objects with access and authentication details to your Kafka cluster and schema registry:

     CREATE SECRET kafka_password AS '<your-password>';
     CREATE SECRET csr_password AS '<your-password>';
    
     CREATE CONNECTION kafka_connection TO KAFKA (
         BROKER '<broker-url>',
         SECURITY PROTOCOL = 'SASL_PLAINTEXT',
         SASL MECHANISMS = 'SCRAM-SHA-256', -- or `PLAIN` or `SCRAM-SHA-512`
         SASL USERNAME = '<your-username>',
         SASL PASSWORD = SECRET kafka_password,
         -- Omit the line below unless you connect through an SSH bastion host:
         SSH TUNNEL ssh_connection
     );
    
     CREATE CONNECTION csr_connection TO CONFLUENT SCHEMA REGISTRY (
         URL '<schema-registry-url>',
         USERNAME = '<your-username>',
         PASSWORD = SECRET csr_password
     );
    
  2. Use the CREATE SOURCE command to connect Materialize to your Kafka broker and schema registry using the connections you created in the previous step.

    CREATE SOURCE src
      FROM KAFKA CONNECTION kafka_connection (TOPIC '<topic-name>')
      KEY FORMAT TEXT
      VALUE FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_connection
      VALUE STRATEGY ID <id>
      ENVELOPE UPSERT;
    

    Fetching the VALUE STRATEGY ID

    Striim uses nonstandard subject names in the schema registry, which prevents Materialize from finding the schema using the default strategy. You must fetch the schema identifier manually and pass it in via VALUE STRATEGY ID. To get this value, open a terminal and run:

    curl -s <schema-registry-url>:8081/subjects/<schema-name>/versions/latest | jq .id
    
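Once the source is created, a few optional sanity checks can confirm the pipeline end to end. VALIDATE CONNECTION and the mz_internal.mz_source_statuses relation are standard Materialize features; the object names below match the examples above:

```sql
-- Confirm Materialize can reach the broker and the schema registry
-- with the stored credentials.
VALIDATE CONNECTION kafka_connection;
VALIDATE CONNECTION csr_connection;

-- Check that the source is running (status should be `running`).
SELECT name, status, error
FROM mz_internal.mz_source_statuses
WHERE name = 'src';

-- Peek at a few ingested rows.
SELECT * FROM src LIMIT 5;
```

If the source reports a schema-related error, re-check the schema identifier fetched for VALUE STRATEGY ID.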