Ingest data from Self-hosted Kafka
This guide goes through the required steps to connect Materialize to a self-hosted Kafka cluster.
Before you begin
Before you begin, you must have:
- A Kafka cluster running.
- A client machine that can interact with your cluster.
Configure network security
There are various ways to configure your Kafka network to allow Materialize to connect:
-
Use an SSH tunnel: If your Kafka cluster is running in a private network, you can use an SSH tunnel to connect Materialize to the cluster.
-
Allow Materialize IPs: If your Kafka cluster is publicly accessible, you can configure your firewall to allow connections from a set of static Materialize IP addresses.
Select the option that works best for you.
Materialize can connect to data sources like Kafka, Confluent, and PostgreSQL with a secure SSH bastion host. In this guide, you will create an SSH tunnel connection, configure your Materialize authentication settings, and create a source connection.
Before you begin
Before you begin, make sure you have access to a bastion host. You will need:
- The bastion host IP address and port number
- The bastion host username
Create an SSH tunnel connection
In Materialize, create an SSH tunnel connection to the bastion host:
CREATE CONNECTION ssh_connection TO SSH TUNNEL (
HOST '<SSH_BASTION_HOST>',
USER '<SSH_BASTION_USER>',
PORT <SSH_BASTION_PORT>
);
Configure the SSH bastion host
The bastion host needs a public key to connect to the Materialize tunnel you created in the previous step.
-
Materialize stores public keys for SSH tunnels in the system catalog. Query
mz_ssh_tunnel_connections
to retrieve the public keys for the SSH tunnel connection you just created:SELECT mz_connections.name, mz_ssh_tunnel_connections.* FROM mz_connections JOIN mz_ssh_tunnel_connections USING(id) WHERE mz_connections.name = 'ssh_connection';
| id | public_key_1 | public_key_2 | |-------|---------------------------------------|---------------------------------------| | u75 | ssh-ed25519 AAAA...76RH materialize | ssh-ed25519 AAAA...hLYV materialize |
Materialize provides two public keys to allow you to rotate keys without connection downtime. Review the
ALTER CONNECTION
documentation for more information on how to rotate your keys. -
Log in to your SSH bastion server and add each key to the bastion
authorized_keys
file:# Command for Linux echo "ssh-ed25519 AAAA...76RH materialize" >> ~/.ssh/authorized_keys echo "ssh-ed25519 AAAA...hLYV materialize" >> ~/.ssh/authorized_keys
-
Configure your internal firewall to allow the SSH bastion host to connect to your Kafka cluster or PostgreSQL instance.
If you are using a cloud provider like AWS or GCP, update the security group or firewall rules for your PostgreSQL instance or Kafka brokers.
Allow incoming traffic from the SSH bastion host IP address on the necessary ports.
For example, use port
5432
for PostgreSQL and ports9092
,9094
, and9096
for Kafka.Test the connection from the bastion host to the Kafka cluster or PostgreSQL instance.
telnet <KAFKA_BROKER_HOST> <KAFKA_BROKER_PORT> telnet <POSTGRES_HOST> <POSTGRES_PORT>
If the command hangs, double-check your security group and firewall settings. If the connection is successful, you can proceed to the next step.
-
Verify the SSH tunnel connection from your source to your bastion host:
# Command for Linux ssh -L 9092:kafka-broker:9092 <SSH_BASTION_USER>@<SSH_BASTION_HOST>
Verify that you can connect to the Kafka broker or PostgreSQL instance via the SSH tunnel:
telnet localhost 9092
If you are unable to connect using the
telnet
command, enableAllowTcpForwarding
andPermitTunnel
on your bastion host SSH configuration file.On your SSH bastion host, open the SSH config file (usually located at
/etc/ssh/sshd_config
) using a text editor:sudo nano /etc/ssh/sshd_config
Add or uncomment the following lines:
AllowTcpForwarding yes PermitTunnel yes
Save the changes and restart the SSH service:
sudo systemctl restart sshd
-
Ensure materialize cluster pods have network access to your SSH bastion host.
Validate the SSH tunnel connection
To confirm that the SSH tunnel connection is correctly configured, use the VALIDATE CONNECTION
command:
VALIDATE CONNECTION ssh_connection;
If no validation errors are returned, the connection can be used to create a source connection.
Create a source connection
In Materialize, create a source connection that uses the SSH tunnel connection you configured in the previous section:
CREATE CONNECTION kafka_connection TO KAFKA (
BROKER 'broker1:9092',
SSH TUNNEL ssh_connection
);
-
Update your Kafka cluster firewall rules to allow traffic from Materialize.
-
Create a Kafka connection that references your Kafka cluster:
CREATE SECRET kafka_password AS '<your-password>'; CREATE CONNECTION kafka_connection TO KAFKA ( BROKER '<broker-url>', SASL MECHANISMS = 'SCRAM-SHA-512', SASL USERNAME = '<your-username>', SASL PASSWORD = SECRET kafka_password );
Creating a source
The Kafka connection created in the previous section can then be reused across
multiple CREATE SOURCE
statements:
CREATE SOURCE json_source
FROM KAFKA CONNECTION kafka_connection (TOPIC 'test_topic')
FORMAT JSON;
By default, the source will be created in the active cluster; to use a different
cluster, use the IN CLUSTER
clause.