Amazon Managed Streaming for Apache Kafka (Amazon MSK)

This guide goes through the required steps to connect Materialize to an Amazon MSK cluster.

💡 Tip: For help getting started with your own data, you can schedule a free guided trial.

Before you begin

Before you begin, you must have:

  • An Amazon MSK cluster running on AWS.
  • A client machine that can interact with your cluster.

Creating a connection

There are various ways to configure your Kafka network to allow Materialize to connect:

  • Use an SSH tunnel: If your Kafka cluster is running in a private network, you can use an SSH tunnel to connect Materialize to the cluster.

  • Allow Materialize IPs: If your Kafka cluster is publicly accessible, you can configure your firewall to allow connections from a set of static Materialize IP addresses.

Materialize can connect to data sources like Kafka, Confluent, and PostgreSQL with a secure SSH bastion host. In this guide, you will create an SSH tunnel connection, configure your Materialize authentication settings, and create a source connection.

Before you begin

Before you begin, make sure you have access to a bastion host. You will need:

  • The bastion host IP address and port number
  • The bastion host username

Create an SSH tunnel connection

In Materialize, create an SSH tunnel connection to the bastion host:

CREATE CONNECTION ssh_connection TO SSH TUNNEL (
    HOST '<SSH_BASTION_HOST>',
    USER '<SSH_BASTION_USER>',
    PORT <SSH_BASTION_PORT>
);

Configure the SSH bastion host

The bastion host needs a public key to connect to the Materialize tunnel you created in the previous step.

  1. Materialize stores public keys for SSH tunnels in the system catalog. Query mz_ssh_tunnel_connections to retrieve the public keys for the SSH tunnel connection you just created:

    SELECT
        mz_connections.name,
        mz_ssh_tunnel_connections.*
    FROM
        mz_connections JOIN
        mz_ssh_tunnel_connections USING(id)
    WHERE
        mz_connections.name = 'ssh_connection';
    
    | id    | public_key_1                          | public_key_2                          |
    |-------|---------------------------------------|---------------------------------------|
    | u75   | ssh-ed25519 AAAA...76RH materialize   | ssh-ed25519 AAAA...hLYV materialize   |
    

    Materialize provides two public keys to allow you to rotate keys without connection downtime. Review the ALTER CONNECTION documentation for more information on how to rotate your keys.

  2. Log in to your SSH bastion server and add each key to the bastion authorized_keys file:

    # Command for Linux
    echo "ssh-ed25519 AAAA...76RH materialize" >> ~/.ssh/authorized_keys
    echo "ssh-ed25519 AAAA...hLYV materialize" >> ~/.ssh/authorized_keys
    
  3. Configure your internal firewall to allow the SSH bastion host to connect to your Kafka cluster or PostgreSQL instance.

    If you are using a cloud provider like AWS or GCP, update the security group or firewall rules for your PostgreSQL instance or Kafka brokers.

    Allow incoming traffic from the SSH bastion host IP address on the necessary ports.

    For example, use port 5432 for PostgreSQL and ports 9092, 9094, and 9096 for Kafka.

    Test the connection from the bastion host to the Kafka cluster or PostgreSQL instance.

    telnet <KAFKA_BROKER_HOST> <KAFKA_BROKER_PORT>
    telnet <POSTGRES_HOST> <POSTGRES_PORT>
    

    If the command hangs, double-check your security group and firewall settings. If the connection is successful, you can proceed to the next step.

  4. Verify the SSH tunnel connection from your source to your bastion host:

    # Command for Linux
    ssh -L 9092:kafka-broker:9092 <SSH_BASTION_USER>@<SSH_BASTION_HOST>
    

    Verify that you can connect to the Kafka broker or PostgreSQL instance via the SSH tunnel:

    telnet localhost 9092
    

    If you are unable to connect using the telnet command, enable AllowTcpForwarding and PermitTunnel on your bastion host SSH configuration file.

    On your SSH bastion host, open the SSH config file (usually located at /etc/ssh/sshd_config) using a text editor:

    sudo nano /etc/ssh/sshd_config
    

    Add or uncomment the following lines:

    AllowTcpForwarding yes
    PermitTunnel yes
    

    Save the changes and restart the SSH service:

    sudo systemctl restart sshd
    
  5. Ensure materialize cluster pods have network access to your SSH bastion host.

Validate the SSH tunnel connection

To confirm that the SSH tunnel connection is correctly configured, use the VALIDATE CONNECTION command:

VALIDATE CONNECTION ssh_connection;

If no validation errors are returned, the connection can be used to create a source connection.

Create a source connection

In Materialize, create a source connection that uses the SSH tunnel connection you configured in the previous section:

CREATE CONNECTION kafka_connection TO KAFKA (
  BROKER 'broker1:9092',
  SSH TUNNEL ssh_connection
);

This section goes through the required steps to connect Materialize to an Amazon MSK cluster, including some of the more complicated bits around configuring security settings in Amazon MSK.

If you already have an Amazon MSK cluster, you can skip step 1 and directly move on to Make the cluster public and enable SASL. You can also skip steps 3 and 4 if you already have Apache Kafka installed and running, and have created a topic that you want to create a source for.

The process to connect Materialize to Amazon MSK consists of the following steps:

  1. Create an Amazon MSK cluster

    If you already have an Amazon MSK cluster set up, then you can skip this step.

    a. Sign in to the AWS Management Console and open the Amazon MSK console

    b. Choose Create cluster

    c. Enter a cluster name, and leave all other settings unchanged

    d. From the table under All cluster settings, copy the values of the following settings and save them because you need them later in this tutorial: VPC, Subnets, Security groups associated with VPC

    e. Choose Create cluster

    Note: This creation can take about 15 minutes.

  2. Make the cluster public and enable SASL

    Turn on SASL

    a. Navigate to the Amazon MSK console

    b. Choose the MSK cluster you just created in Step 1

    c. Click on the Properties tab

    d. In the Security settings section, choose Edit

    e. Check the checkbox next to SASL/SCRAM authentication

    f. Click Save changes

    You can find more details about updating a cluster’s security configurations here.

    Create a symmetric key

    a. Now go to the AWS Key Management Service (AWS KMS) console

    b. Click Create Key

    c. Choose Symmetric and click Next

    d. Give the key and Alias and click Next

    e. Under Administrative permissions, check the checkbox next to the AWSServiceRoleForKafka and click Next

    f. Under Key usage permissions, again check the checkbox next to the AWSServiceRoleForKafka and click Next

    g. Click on Create secret

    h. Review the details and click Finish

    You can find more details about creating a symmetric key here.

    Store a new Secret

    a. Go to the AWS Secrets Manager console

    b. Click Store a new secret

    c. Choose Other type of secret (e.g. API key) for the secret type

    d. Under Key/value pairs click on Plaintext

    e. Paste the following in the space below it and replace <your-username> and <your-password> with the username and password you want to set for the cluster

      {
        "username": "<your-username>",
        "password": "<your-password>"
      }
    

    f. On the next page, give a Secret name that starts with AmazonMSK_

    g. Under Encryption Key, select the symmetric key you just created in the previous sub-section from the dropdown

    h. Go forward to the next steps and finish creating the secret. Once created, record the ARN (Amazon Resource Name) value for your secret

    You can find more details about creating a secret using AWS Secrets Manager here.

    Associate secret with MSK cluster

    a. Navigate back to the Amazon MSK console and click on the cluster you created in Step 1

    b. Click on the Properties tab

    c. In the Security settings section, under SASL/SCRAM authentication, click on Associate secrets

    d. Paste the ARN you recorded in the previous subsection and click Associate secrets

    Create the cluster’s configuration

    a. Go to the Amazon CloudShell console

    b. Create a file (eg. msk-config.txt) with the following line

      allow.everyone.if.no.acl.found = false
    

    c. Run the following AWS CLI command, replacing <config-file-path> with the path to the file where you saved your configuration in the previous step

      aws kafka create-configuration --name "MakePublic" \
      --description "Set allow.everyone.if.no.acl.found = false" \
      --kafka-versions "2.6.2" \
      --server-properties fileb://<config-file-path>/msk-config.txt
    

    You can find more information about making your cluster public here.

  3. Create a client machine

    If you already have a client machine set up that can interact with your cluster, then you can skip this step.

    If not, you can create an EC2 client machine and then add the security group of the client to the inbound rules of the cluster’s security group from the VPC console. You can find more details about how to do that here.

  4. Install Apache Kafka and create a topic

    To start using Materialize with Apache Kafka, you need to create a Materialize source over an Apache Kafka topic. If you already have Apache Kafka installed and a topic created, you can skip this step.

    Otherwise, you can install Apache Kafka on your client machine from the previous step and create a topic. You can find more information about how to do that here.

  5. Create ACLs

    As allow.everyone.if.no.acl.found is set to false, you must create ACLs for the cluster and topics configured in the previous step to set appropriate access permissions. For more information, see the Amazon MSK documentation.

  6. Create a source in Materialize

    a. Open the Amazon MSK console and select your cluster

    b. Click on View client information

    c. Copy the url under Private endpoint and against SASL/SCRAM. This will be your <broker-url> going forward.

    d. Connect to Materialize using the SQL Shell, or your preferred SQL client.

    e. Create a connection using the command below. The broker URL is what you copied in step c of this subsection. The <topic-name> is the name of the topic you created in Step 4. The <your-username> and <your-password> is from Store a new secret under Step 2.

    CREATE SECRET msk_password AS '<your-password>';
    
    CREATE CONNECTION kafka_connection TO KAFKA (
        BROKER '<broker-url>',
        SASL MECHANISMS = 'SCRAM-SHA-512',
        SASL USERNAME = '<your-username>',
        SASL PASSWORD = SECRET msk_password
      );
    

    f. If the command executes without an error and outputs CREATE SOURCE, it means that you have successfully connected Materialize to your cluster.

    Note: The example above walked through creating a source which is a way of connecting Materialize to an external data source. We created a connection to Amazon MSK using SASL authentication, using credentials securely stored as secrets in Materialize’s secret management system. For input formats, we used text, however, Materialize supports various other options as well. For example, you can ingest messages formatted in JSON, Avro and Protobuf. You can find more details about the various different supported formats and possible configurations here.

Creating a source

The Kafka connection created in the previous section can then be reused across multiple CREATE SOURCE statements. By default, the source will be created in the active cluster; to use a different cluster, use the IN CLUSTER clause.

CREATE SOURCE json_source
  FROM KAFKA CONNECTION kafka_connection (TOPIC 'test_topic')
  FORMAT JSON;
Back to top ↑