Striim Cloud
Striim is a real-time data integration platform that offers a variety of connectors for databases, messaging systems, and other data sources. This guide walks through the steps to ingest Striim Change Data Capture (CDC) data into Materialize using the Kafka source.
Before you begin
Integrating Striim Cloud with Materialize currently requires using a message broker (like Apache Kafka) as an intermediary, as well as a schema registry (like Confluent Schema Registry) configured for Avro schema management.
Ensure that you have:
- An active Striim account and Striim Cloud service.
- A Kafka cluster up and running with access to a Confluent Schema Registry service.
Step 1. Configure Striim CDC
Database configuration
Follow Striim’s guidance to enable and configure CDC from your source relational database.
Kafka writer configuration
- In Striim, create a `KafkaWriter` using the version that corresponds to your target Kafka broker.
- Select the output stream of the CDC reader you configured above as the Input stream.
- Configure the Kafka broker details and the target topic name.
- Under Advanced settings > Kafka Config, add the `value.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer` configuration to serialize records using the Confluent wire format.
- Add a `MESSAGE KEY` field to specify the primary key of the table. This configures the `KafkaWriter` to use the table’s primary key as the Kafka message key.
Schema registry configuration
- From the Formatter dropdown menu, select `AvroFormatter` to serialize records using the Avro format.
- Configure the Schema Registry URL and its authentication credentials (if necessary).
- Set the format to `Table`.
- Leave the Schema Registry Subject Name and Schema Registry Subject Name Mapping fields empty.
Known limitations
Supported types
Some data types might not be correctly supported in Striim’s Avro handling (e.g. `bigint`). If you run into Avro serialization issues with the `KafkaWriter`, reach out to Striim support or the Striim community.
Once your Striim Cloud service is configured for CDC, start the app to begin streaming changes from your source relational database into the specified Kafka cluster. Next, you’ll configure Materialize to consume this data.
Step 2. Start ingesting data
- Create a cluster in Materialize to host your Kafka source, as shown in the sketch below. If you're prototyping and already have a cluster to host it (e.g. `quickstart`), you don't need to create a new cluster. For production scenarios, we recommend separating your workloads into multiple clusters for resource isolation.
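  A minimal sketch of creating a dedicated cluster (the name `striim_ingest` and the size `100cc` are illustrative placeholders; pick values that fit your workload):

  ```sql
  -- Dedicated cluster to host the Kafka source (name and size are placeholders).
  CREATE CLUSTER striim_ingest (SIZE = '100cc');
  ```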
- In the SQL Shell, or your preferred SQL client connected to Materialize, use the `CREATE CONNECTION` command to create connection objects with access and authentication details to your Kafka cluster and schema registry:

  ```sql
  CREATE SECRET kafka_password AS '<your-password>';
  CREATE SECRET csr_password AS '<your-password>';

  CREATE CONNECTION kafka_connection TO KAFKA (
      BROKER '<broker-url>',
      SECURITY PROTOCOL = 'SASL_PLAINTEXT',
      SASL MECHANISMS = 'SCRAM-SHA-256', -- or `PLAIN` or `SCRAM-SHA-512`
      SASL USERNAME = '<your-username>',
      SASL PASSWORD = SECRET kafka_password,
      -- Optional: only if your brokers are reached via an SSH bastion; requires
      -- an SSH TUNNEL connection (`ssh_connection`) created beforehand.
      SSH TUNNEL ssh_connection
  );

  CREATE CONNECTION csr_connection TO CONFLUENT SCHEMA REGISTRY (
      URL '<schema-registry-url>',
      USERNAME = '<your-username>',
      PASSWORD = SECRET csr_password
  );
  ```
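  Optionally, verify the connection details before creating a source. Materialize's `VALIDATE CONNECTION` command tests connectivity without ingesting any data:

  ```sql
  -- Check that Materialize can reach the Kafka brokers and the schema registry.
  VALIDATE CONNECTION kafka_connection;
  VALIDATE CONNECTION csr_connection;
  ```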
- Use the `CREATE SOURCE` command to connect Materialize to your Kafka broker and schema registry using the connections you created in the previous step:

  ```sql
  CREATE SOURCE src
    FROM KAFKA CONNECTION kafka_connection (TOPIC '<topic-name>')
    KEY FORMAT TEXT
    VALUE FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_connection
      VALUE STRATEGY ID <id>
    ENVELOPE UPSERT;
  ```
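  Once the source is running, a quick sanity check is to query it directly (a sketch; `src` is the source name used above):

  ```sql
  -- Peek at records arriving from Striim via Kafka.
  SELECT * FROM src LIMIT 10;
  ```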
Fetching the VALUE STRATEGY ID

Striim uses nonstandard subject names in the schema registry, which prevents Materialize from finding the schema using the default configuration. You must fetch the schema identifier manually and pass it to the `VALUE STRATEGY ID` clause. To get this value, open a terminal and run:

```bash
curl <schema-registry-url>:8081/subjects/<schema-name>/versions/latest | jq .id
```

If your schema registry requires authentication, pass the credentials with curl's `-u <username>:<password>` flag.
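The command prints a numeric schema ID. As a worked example, if it returned a (hypothetical) ID of 42, the source definition above would read:

```sql
CREATE SOURCE src
  FROM KAFKA CONNECTION kafka_connection (TOPIC '<topic-name>')
  KEY FORMAT TEXT
  VALUE FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_connection
    VALUE STRATEGY ID 42
  ENVELOPE UPSERT;
```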