The Problem
We need to provide (internal or end-user) access to a view of data that combines a fast-changing stream of events from Kafka with a table from a database (which is also changing).
Here are a few real-world examples where this problem comes up:
- Calculate API usage by joining API logs in Kafka with a user table
- Join IoT sensor data in Kafka with a sensor config table
- Generate affiliate program stats by joining pageviews with an affiliate user table
Solution: Stream the database to Kafka, materialize a view
The guide below walks through joining Kafka with a database by first streaming the database into Kafka using Debezium to do change data capture (CDC), and then using Materialize to maintain a SQL view that joins the two Kafka topics and outputs the exact data needed. (More context on Debezium and Materialize is provided below.)
Our Solution
Why stream the database into Kafka?
The extra step of getting everything into Kafka is necessary because:
- Solutions that query the database on every Kafka event take away the scale and throughput benefits of a message broker by reintroducing limitations of a database.
- Solutions that munge the Kafka data back into a traditional database where a join can be done eliminate the “real-time” benefits of a stream by falling back to “batch” intervals.
NOTE: For PostgreSQL users, Materialize will soon be beta testing a direct Postgres connection that removes the need for Kafka by reading directly from Postgres and the Postgres WAL. Get in touch if interested in testing this out.
Important considerations for this approach
The Debezium + Materialize approach to joining Kafka with a database doesn’t fit every use case. Consider the following factors:
- This is not a traditional stream-table join where Kafka events are enriched (e.g. new fields added) and sent on to another Kafka broker, because here we are aggregating the data into a materialized view. If you set out to build a stream-table join, this may still be useful to you: one-in-one-out enrichment often ends up in an aggregated view downstream. In those cases, the solution below is an opportunity to remove complexity.
- It’s necessary to use Debezium when the data in the database is changing. If the data needed from the database is static (e.g. country codes and names), the simplest solution is to remove the database dependency entirely and load the data into Materialize using dbt seeds or the COPY TO command.
The remainder of this guide is split into a conceptual overview followed by a hands-on walkthrough with code examples.
Learn about the components
Debezium
Debezium is an open-source Kafka Connect component that listens for changes to a database (INSERTs, UPDATEs, DELETEs), translates them into change data capture (CDC) events, and pushes them to a message broker.
Here’s a more tangible example of how Debezium works.
Upon running this update query:
UPDATE my_table SET column_2 = 43 WHERE id = 123;
Debezium produces an event like this to a Kafka topic matching the name of the table:
{
  "op": "u",
  "source": {
    "table": "my_table"
    ...
  },
  "ts_ms": 1616428166123,
  "before": {
    "id": 123,
    "column_1": "abc",
    "column_2": 42,
    "created_at": "Mon, 15 Mar 2021 12:34:56 GMT",
    "updated_at": "Mon, 15 Mar 2021 12:34:56 GMT"
  },
  "after": {
    "id": 123,
    "column_1": "abc",
    "column_2": 43,
    "created_at": "Mon, 15 Mar 2021 12:34:56 GMT",
    "updated_at": "Mon, 22 Mar 2021 15:43:21 GMT"
  }
}
The change data capture event contains metadata about the table and the state of the entire row before and after the update.
Further reading on Debezium
Materialize
Once all the data is in Kafka, the next step is to join the Kafka-native data and the CDC data in a materialized view that outputs the exact structure we need. For that, we use Materialize, an engine for maintaining views on fast-changing streams of data.
What is a materialized view?
Imagine all your data was in a spreadsheet instead of Kafka. The source data would be in massive “Raw Data” worksheets/tabs where rows are continually modified and added. The materialized views are the tabs you create with formulas and pivot tables that summarize or aggregate the raw data. As you add and update raw data, the materialized views are automatically updated.
Why use Materialize?
Materialize works well for this problem for a few reasons:
- Capable of complex joins - Materialize has much broader support for JOINs than most streaming platforms: it supports all types of SQL joins under all of the conditions you would expect from a database.
- Strongly consistent - Eventual consistency in a streaming solution can cause unexpected results. Read Eventual Consistency isn’t for Streaming for more.
- Simple to configure and maintain - Views are defined in standard SQL, and Materialize presents as PostgreSQL, making it easy to connect and query the results from existing PostgreSQL libraries.
Materialize is source-available and free to run forever in a single-node configuration. There’s also a private beta of Materialize Cloud open for registration.
Further reading on Materialize
Build the solution
We’ll be using this ecommerce-demo repo because it has convenient examples of Kafka-native and database data:
- pageviews - a Kafka-native stream of simulated JSON-encoded web analytics pageview events. Sample pageview event:
  { "user_id": 1234, "url": "/products/56", "channel": "social", "received_at": 1619461059 }
- users - a table in a MySQL database with simulated e-commerce shop users with the following attributes:
  mysql> DESCRIBE users;
  +------------+---------------------+
  | Field      | Type                |
  +------------+---------------------+
  | id         | bigint(20) unsigned |
  | email      | varchar(255)        |
  | is_vip     | tinyint(1)          |
  | created_at | timestamp           |
  | updated_at | datetime            |
  +------------+---------------------+
The steps below create a real-time join of the pageviews in Kafka and the users table in the database. The resulting materialized view can be read via a query or streamed out to a new Kafka topic.
Initialize the starting infrastructure
Start by creating the following infrastructure as Docker containers:
Service | Description |
---|---|
Kafka + Zookeeper | The message broker where our pageviews and CDC events are stored. |
Schema Registry | The Kafka service used by Debezium for CDC message serializing/deserializing using Avro schema. |
MySQL | The database containing a user table which we’ll stream to Kafka. Debezium supports other databases as well. |
Loadgen | A Python script in a Docker container that produces JSON-formatted pageview events directly to Kafka and updates the users table in the database. |
Before continuing, make sure you have Docker and Docker Compose installed. Clone the repo and use the included docker-compose.yml file to spin up the above containers.
git clone https://github.com/MaterializeInc/ecommerce-demo.git
cd ecommerce-demo
docker-compose up -d kafka zookeeper schema-registry mysql loadgen
The last line above tells Docker to spin up five specific containers (kafka, zookeeper, schema-registry, mysql, and loadgen) from the docker-compose.yml file.
All components need network access to each other. In the demo code this is done via a Docker network, enabling services in one container to address services in other containers by name (e.g. kafka:9092).
Start Debezium
Start the Debezium container with docker-compose:
docker-compose up -d debezium
This uses the config specified in docker-compose.yml to start a container named debezium, with port 8083 accessible to the host, from the debezium/connect:1.4 image with the environment variables listed below:
Config | Description |
---|---|
BOOTSTRAP_SERVERS=kafka:9092 | The URLs of the Kafka brokers Debezium will be writing events to. |
GROUP_ID=1 | Required by Kafka Connect; should be unique to Debezium and is used to identify the cluster that our service belongs to. |
CONFIG_STORAGE_TOPIC=debezium_configs | Required by Kafka Connect; the Kafka topic where Debezium stores config data. |
OFFSET_STORAGE_TOPIC=debezium_offsets | Required by Kafka Connect; the Kafka topic where Debezium stores connector offsets. |
KEY_CONVERTER=io.confluent.connect.avro.AvroConverter | Set this optional param in order to use Avro schema instead of the default JSON. |
VALUE_CONVERTER=io.confluent.connect.avro.AvroConverter | Same as above. |
CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL=http://schema-registry:8081 | Set this optional param to the internal URL and port of our schema registry so Debezium can write/update Avro schema encoding. |
CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL=http://schema-registry:8081 | Same as above. |
Point Debezium to MySQL
Debezium is running, but it needs to connect to the database to start streaming data into Kafka. Send the config to Debezium with a curl command:
curl -H 'Content-Type: application/json' localhost:8083/connectors --data '{
"name": "mysql-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "mysql",
"database.port": "3306",
"database.user": "root",
"database.password": "debezium",
"database.server.name": "mysql",
"database.server.id": "1234",
"database.history.kafka.bootstrap.servers": "kafka:9092",
"database.history.kafka.topic": "mysql-history",
"time.precision.mode": "connect"
}
}'
The code above sends JSON-formatted config data to the Debezium container, which has its internal port 8083 open externally (mapped to host port 8083).
Here is more detail on the above configuration variables:
Config | Description |
---|---|
connector.class=io.debezium.connector.mysql.MySqlConnector | This tells Debezium we’re using a MySQL DB. |
database.hostname=mysql | The hostname of the MySQL DB. In this case, Docker has mapped the mysql container name to the container. |
database.port=3306 | The port used by the MySQL database. |
database.user=root | The MySQL user that Debezium connects as; read about the privileges required by Debezium here. |
database.password=debezium | The password for the MySQL user above. |
database.server.name=mysql | The name and id are used to identify the connector in Kafka; the name is used as a prefix on Kafka topics. |
database.server.id=1234 | See above. |
database.history.kafka.bootstrap.servers=kafka:9092 | Debezium will store the history of your database schema in a topic on this broker. |
database.history.kafka.topic=mysql-history | The topic name for the history log of your DB schema. |
time.precision.mode=connect | This tells Debezium to use Kafka Connect logical types for timestamps; read more here. |
At this point, debezium is connected to the mysql database, streaming changes into kafka, and registering schema in schema-registry!
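As an optional check (not part of the demo repo), you can ask the Kafka Connect REST API on the host-mapped port 8083 whether the mysql-connector registered above is healthy. A minimal sketch in Python:
# Query Kafka Connect's status endpoint for the connector created above.
# Assumes port 8083 is mapped to the host as described earlier.
import json
import urllib.request

url = "http://localhost:8083/connectors/mysql-connector/status"
with urllib.request.urlopen(url) as resp:
    status = json.load(resp)

# The connector and its task(s) should report state "RUNNING".
print(status["connector"]["state"])
for task in status.get("tasks", []):
    print(task["id"], task["state"])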
Start Materialize
Spin up Materialize in Docker:
docker-compose up -d materialized
Materialize is now running in a container named materialized with port 6875 accessible to the host.
Specify data sources in Materialize
Connect to Materialize via the psql command-line interface and specify where to find Kafka data using CREATE SOURCE statements. For convenience, psql is packaged in a Docker container; run:
docker-compose run mzcli
This is equivalent to running psql -U materialize -h localhost -p 6875 materialize.
In the psql CLI, create sources for pageviews and users.
CREATE SOURCE raw_pageviews
FROM KAFKA BROKER 'kafka:9092' TOPIC 'pageviews'
FORMAT BYTES;
CREATE SOURCE users
FROM KAFKA BROKER 'kafka:9092' TOPIC 'mysql.shop.users'
FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY 'http://schema-registry:8081' ENVELOPE DEBEZIUM;
The code above creates two sources: raw_pageviews, which is currently just raw, append-only BYTES, and users, which comes from the database via Debezium, is Avro-encoded, and uses a special Debezium envelope that takes advantage of the fact that Debezium provides the old and new data in each message.
Create the SQL that converts raw_pageviews into typed columns using CREATE VIEW syntax:
CREATE VIEW pageviews AS
SELECT
(pageview_data->'user_id')::INT as user_id,
(pageview_data->'url')::STRING as url,
(pageview_data->'channel')::STRING as channel,
to_timestamp((pageview_data->'received_at')::INT) as ts
FROM (
SELECT convert_from(data, 'utf8')::jsonb AS pageview_data
FROM raw_pageviews
);
This is a two-step query that:
- Converts the raw bytes to UTF8 text and casts to the Materialize jsonb type: convert_from(data, 'utf8')::jsonb
- Uses PostgreSQL JSON syntax (pageview_data->'user_id') and type casting (::<TYPE>) to extract four fields into typed columns.
At this point, Materialize still hasn’t ingested any data because none of the sources or views have been materialized.
Create a materialized view
Time to join the streams. Create a materialized view of pageview counts by channel, segmented by VIP and non-VIP users:
CREATE MATERIALIZED VIEW pageviews_by_user_segment AS
SELECT
users.is_vip,
pageviews.channel,
date_trunc('hour', pageviews.ts) as ts_hour,
count(*) as pageview_count
FROM users
JOIN pageviews ON pageviews.user_id = users.id
GROUP BY 1,2,3;
This looks almost identical to traditional SQL. The only special syntax is CREATE MATERIALIZED VIEW, which tells Materialize to:
- Create a dataflow and arrangements (indexes) to compute and maintain the view.
- Consume all applicable events from Kafka and process them through the dataflow.
- Once caught up with real time, continue to process new events and maintain the view.
Materialize will maintain the view until it is removed with DROP VIEW. No specific time window is necessary; Materialize is joining across all the Kafka events it can ingest.
Test the view by running:
SELECT * FROM pageviews_by_user_segment;
Running it multiple times should show the pageview_count updating.
Read output from Materialize
There are two primary ways to access the output of the view. These can be thought of as “poll” (a PostgreSQL query) and “push” (Materialize streams the output via TAIL or sinks it out to a new Kafka topic, and a downstream service consumes it).
Poll Materialize with a PostgreSQL query
If the joined data is only needed “upon request”, for example, in a business intelligence dashboard, admin view, or generated report, a simple PostgreSQL query to the results may be sufficient.
In this approach, the downstream application is given credentials to query Materialize as if it were a PostgreSQL database. This also means that many existing PostgreSQL drivers will work out-of-the-box.
Here is a very simple Python example that uses the psycopg2 module to connect to Materialize and fetch data.
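A minimal sketch along these lines, assuming Materialize is exposed on the host at localhost:6875 with the default materialize user and database:
# Poll the materialized view created above with psycopg2.
# Assumes: pip install psycopg2-binary, and Materialize running on localhost:6875.
import psycopg2

conn = psycopg2.connect("host=localhost port=6875 user=materialize dbname=materialize")

with conn.cursor() as cur:
    cur.execute("SELECT * FROM pageviews_by_user_segment;")
    for row in cur.fetchall():
        print(row)

conn.close()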
One key difference between querying Materialize and querying a traditional database is that Materialize is doing almost no compute work to respond to each query (the work is done when new data appears in Kafka) so it is perfectly fine to write polling queries that run every second.
Stream output via TAIL
Materialize can stream changes to views out via the TAIL command. For a practical example of how a downstream application can subscribe to the TAIL command, see A Real Time Application Powered by Materialize’s TAIL Command.
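For a rough idea of what this looks like from Python, here is a minimal sketch that declares a cursor over TAIL, assuming the same psycopg2 connection details as the polling example above; see the Materialize TAIL/FETCH docs for the exact output columns and timeout options:
# Subscribe to changes on the materialized view via a cursor over TAIL.
# Assumes the same connection details as the polling example above.
import psycopg2

conn = psycopg2.connect("host=localhost port=6875 user=materialize dbname=materialize")

with conn.cursor() as cur:
    # DECLARE needs an open transaction; psycopg2 starts one automatically.
    cur.execute("DECLARE c CURSOR FOR TAIL pageviews_by_user_segment")
    while True:
        cur.execute("FETCH ALL c")  # returns batches of changes as they arrive
        for row in cur:
            print(row)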
Stream output into a new Kafka topic
If the end goal is better served by streaming data out into another Kafka topic, use a sink. (See CREATE SINK syntax.) The format of events produced to sinks is similar to the CDC events described above, where each event consists of a before and after. When a sink is first created, by default Materialize pushes an initial snapshot of the table to Kafka, followed by streaming events for each change to the materialized view specified in the sink.
Connect to Materialize via psql again and add a sink for the view created earlier:
CREATE SINK pageviews_by_user_segment_sink
FROM pageviews_by_user_segment
INTO KAFKA BROKER 'kafka' TOPIC 'pageviews-user-segment-sink'
FORMAT AVRO USING
CONFLUENT SCHEMA REGISTRY 'http://schema-registry:8081';
The code above takes the materialized view pageviews_by_user_segment and creates a sink named pageviews_by_user_segment_sink going to a Kafka topic named pageviews-user-segment-sink in Avro format.
Conclusion + Where to go from here
Hopefully, the explanation and code examples above have helped to demonstrate at a conceptual level how Debezium and Materialize can be used as powerful tools for joining, reducing, and aggregating high-volume streams of data from Kafka and databases into whatever output format your use case demands.
Moving beyond the conceptual phase, there are several next steps to think about like scaling and load, handling schema evolution, and deployment and maintenance of Materialize. If you have questions or are interested in connecting with others using Materialize, join the community in Slack.