Ingest data from Self-hosted Kafka

This guide goes through the required steps to connect Materialize to a self-hosted Kafka cluster.

Before you begin

Before you begin, you must have:

  • A Kafka cluster running.
  • A client machine that can interact with your cluster.

Configure network security

There are various ways to configure your Kafka network to allow Materialize to connect:

  • Use AWS PrivateLink: If your Kafka cluster is running on AWS, you can use AWS PrivateLink to connect Materialize to the cluster.

  • Use an SSH tunnel: If your Kafka cluster is running in a private network, you can use an SSH tunnel to connect Materialize to the cluster.

  • Allow Materialize IPs: If your Kafka cluster is publicly accessible, you can configure your firewall to allow connections from a set of static Materialize IP addresses.

Select the option that works best for you.

NOTE: Materialize provides Terraform modules for both Amazon MSK clusters and self-managed Kafka clusters which can be used to create the target groups for each Kafka broker (step 1), the network load balancer (step 2), the TCP listeners (step 3) and the VPC endpoint service (step 5).

This section covers how to create AWS PrivateLink connections and retrieve the AWS principal needed to configure the AWS PrivateLink service.

  1. Create target groups

    Create a dedicated target group for each broker with the following details:

    a. Target type as IP address.

    b. Protocol as TCP.

    c. Port as 9092, or the port that you are using in case it is not 9092 (e.g. 9094 for TLS or 9096 for SASL).

    d. Make sure that the target group is in the same VPC as the Kafka cluster.

    e. Click next, and register the respective Kafka broker to each target group using its IP address.

  2. Create a Network Load Balancer (NLB)

    Create a Network Load Balancer that is enabled for the same subnets that the Kafka brokers are in.

  3. Create TCP listeners

    Create a TCP listener for every Kafka broker that forwards to the corresponding target group you created (e.g. b-1, b-2, b-3).

    The listener port needs to be unique, and will be used later on in the CREATE CONNECTION statement.

    For example, you can create a listener for:

    a. Port 9001 → broker b-1....

    b. Port 9002 → broker b-2....

    c. Port 9003 → broker b-3....

  4. Verify security groups and health checks

    Once the TCP listeners have been created, make sure that the health checks for each target group are passing and that the targets are reported as healthy.

    If you have set up a security group for your Kafka cluster, you must ensure that it allows traffic on both the listener port and the health check port.

    Remarks:

    a. Network Load Balancers do not have associated security groups. Therefore, the security groups for your targets must use IP addresses to allow traffic.

    b. You can’t use the security groups for the clients as a source in the security groups for the targets. Therefore, the security groups for your targets must use the IP addresses of the clients to allow traffic. For more details, check the AWS documentation.

  5. Create a VPC endpoint service

    Create a VPC endpoint service and associate it with the Network Load Balancer that you’ve just created.

    Note the service name that is generated for the endpoint service.

  6. In Materialize, create an AWS PrivateLink connection that references the endpoint service that you created in the previous step.

    CREATE CONNECTION privatelink_svc TO AWS PRIVATELINK (
       SERVICE NAME 'com.amazonaws.vpce.<region_id>.vpce-svc-<endpoint_service_id>',
       AVAILABILITY ZONES ('use1-az1', 'use1-az2', 'use1-az3')
    );
    

    Update the list of the availability zones to match the ones in your AWS account.

  1. Retrieve the AWS principal for the AWS PrivateLink connection you just created:

    SELECT principal
    FROM mz_aws_privatelink_connections plc
    JOIN mz_connections c ON plc.id = c.id
    WHERE c.name = 'privatelink_svc';
    
       id   |                                 principal
    --------+---------------------------------------------------------------------------
     u1     | arn:aws:iam::664411391173:role/mz_20273b7c-2bbe-42b8-8c36-8cc179e9bbc3_u1
    

    Follow the instructions in the AWS PrivateLink documentation to configure your VPC endpoint service to accept connections from the provided AWS principal.

  2. If your AWS PrivateLink service is configured to require acceptance of connection requests, you must manually approve the connection request from Materialize after executing CREATE CONNECTION. For more details, check the AWS PrivateLink documentation.

    Note: It might take some time for the endpoint service connection to show up, so you would need to wait for the endpoint service connection to be ready before you create a source.

Validate the AWS PrivateLink connection you created using the VALIDATE CONNECTION command:

VALIDATE CONNECTION privatelink_svc;

If no validation error is returned, move to the next step.

Create a source connection

In Materialize, create a source connection that uses the AWS PrivateLink connection you just configured:

CREATE CONNECTION kafka_connection TO KAFKA (
    BROKERS (
        'b-1.hostname-1:9096' USING AWS PRIVATELINK privatelink_svc (PORT 9001, AVAILABILITY ZONE 'use1-az2'),
        'b-2.hostname-2:9096' USING AWS PRIVATELINK privatelink_svc (PORT 9002, AVAILABILITY ZONE 'use1-az1'),
        'b-3.hostname-3:9096' USING AWS PRIVATELINK privatelink_svc (PORT 9003, AVAILABILITY ZONE 'use1-az3')
    ),
    -- Authentication details
    -- Depending on the authentication method the Kafka cluster is using
    SASL MECHANISMS = 'SCRAM-SHA-512',
    SASL USERNAME = 'foo',
    SASL PASSWORD = SECRET kafka_password
);

The (PORT <port_number>) value must match the port that you used when creating the TCP listener in the Network Load Balancer. Be sure to specify the correct availability zone for each broker.

Materialize can connect to data sources like Kafka, Confluent, and PostgreSQL with a secure SSH bastion host. In this guide, you will create an SSH tunnel connection, configure your Materialize authentication settings, and create a source connection.

Before you begin

Before you begin, make sure you have access to a bastion host. You will need:

  • The bastion host IP address and port number
  • The bastion host username

Create an SSH tunnel connection

In Materialize, create an SSH tunnel connection to the bastion host:

CREATE CONNECTION ssh_connection TO SSH TUNNEL (
    HOST '<SSH_BASTION_HOST>',
    USER '<SSH_BASTION_USER>',
    PORT <SSH_BASTION_PORT>
);

Configure the SSH bastion host

The bastion host needs a public key to connect to the Materialize tunnel you created in the previous step.

  1. Materialize stores public keys for SSH tunnels in the system catalog. Query mz_ssh_tunnel_connections to retrieve the public keys for the SSH tunnel connection you just created:

    SELECT
        mz_connections.name,
        mz_ssh_tunnel_connections.*
    FROM
        mz_connections JOIN
        mz_ssh_tunnel_connections USING(id)
    WHERE
        mz_connections.name = 'ssh_connection';
    
    | id    | public_key_1                          | public_key_2                          |
    |-------|---------------------------------------|---------------------------------------|
    | u75   | ssh-ed25519 AAAA...76RH materialize   | ssh-ed25519 AAAA...hLYV materialize   |
    

    Materialize provides two public keys to allow you to rotate keys without connection downtime. Review the ALTER CONNECTION documentation for more information on how to rotate your keys.

  2. Log in to your SSH bastion server and add each key to the bastion authorized_keys file:

    # Command for Linux
    echo "ssh-ed25519 AAAA...76RH materialize" >> ~/.ssh/authorized_keys
    echo "ssh-ed25519 AAAA...hLYV materialize" >> ~/.ssh/authorized_keys
    
  3. Configure your internal firewall to allow the SSH bastion host to connect to your Kafka cluster or PostgreSQL instance.

    If you are using a cloud provider like AWS or GCP, update the security group or firewall rules for your PostgreSQL instance or Kafka brokers.

    Allow incoming traffic from the SSH bastion host IP address on the necessary ports.

    For example, use port 5432 for PostgreSQL and ports 9092, 9094, and 9096 for Kafka.

    Test the connection from the bastion host to the Kafka cluster or PostgreSQL instance.

    telnet <KAFKA_BROKER_HOST> <KAFKA_BROKER_PORT>
    telnet <POSTGRES_HOST> <POSTGRES_PORT>
    

    If the command hangs, double-check your security group and firewall settings. If the connection is successful, you can proceed to the next step.

  4. Verify the SSH tunnel connection from your source to your bastion host:

    # Command for Linux
    ssh -L 9092:kafka-broker:9092 <SSH_BASTION_USER>@<SSH_BASTION_HOST>
    

    Verify that you can connect to the Kafka broker or PostgreSQL instance via the SSH tunnel:

    telnet localhost 9092
    

    If you are unable to connect using the telnet command, enable AllowTcpForwarding and PermitTunnel on your bastion host SSH configuration file.

    On your SSH bastion host, open the SSH config file (usually located at /etc/ssh/sshd_config) using a text editor:

    sudo nano /etc/ssh/sshd_config
    

    Add or uncomment the following lines:

    AllowTcpForwarding yes
    PermitTunnel yes
    

    Save the changes and restart the SSH service:

    sudo systemctl restart sshd
    
  5. Retrieve the static egress IPs from Materialize and configure the firewall rules (e.g. AWS Security Groups) for your bastion host to allow SSH traffic for those IP addresses only.

    SELECT * FROM mz_catalog.mz_egress_ips;
    
    XXX.140.90.33
    XXX.198.159.213
    XXX.100.27.23
    

Validate the SSH tunnel connection

To confirm that the SSH tunnel connection is correctly configured, use the VALIDATE CONNECTION command:

VALIDATE CONNECTION ssh_connection;

If no validation errors are returned, the connection can be used to create a source connection.

Create a source connection

In Materialize, create a source connection that uses the SSH tunnel connection you configured in the previous section:

CREATE CONNECTION kafka_connection TO KAFKA (
  BROKER 'broker1:9092',
  SSH TUNNEL ssh_connection
);
  1. In the SQL Shell, or your preferred SQL client connected to Materialize, find the static egress IP addresses for the Materialize region you are running in:

    SELECT * FROM mz_egress_ips;
    
  2. Update your Kafka cluster firewall rules to allow traffic from each IP address from the previous step.

  3. Create a Kafka connection that references your Kafka cluster:

    CREATE SECRET kafka_password AS '<your-password>';
    
    CREATE CONNECTION kafka_connection TO KAFKA (
        BROKER '<broker-url>',
        SASL MECHANISMS = 'SCRAM-SHA-512',
        SASL USERNAME = '<your-username>',
        SASL PASSWORD = SECRET kafka_password
    );
    

Creating a source

The Kafka connection created in the previous section can then be reused across multiple CREATE SOURCE statements:

CREATE SOURCE json_source
  FROM KAFKA CONNECTION kafka_connection (TOPIC 'test_topic')
  FORMAT JSON;

By default, the source will be created in the active cluster; to use a different cluster, use the IN CLUSTER clause.

Back to top ↑