CREATE CONNECTION

A connection describes how to connect and authenticate to an external system you want Materialize to read data from. Once created, a connection is reusable across multiple CREATE SOURCE and CREATE SINK connections.

To use credentials that contain sensitive information (like passwords and SSL keys) in a connection, you must first create secrets to securely store each credential in Materialize’s secret management system. Credentials that are generally not sensitive (like usernames and SSL certificates) can be specified as plain text, or also stored as secrets.

Syntax

CREATE CONNECTION IF NOT EXISTS connection_name TO AWS PRIVATELINK CONFLUENT SCHEMA REGISTRY KAFKA POSTGRES SSH TUNNEL ( field = val , )
ALPHA! This feature is in alpha. It has known performance or stability issues and is under active development. It is not subject to our backwards compatibility guarantees.

You must contact us to enable this feature in your Materialize region.

An AWS PrivateLink connection establishes a link to an AWS PrivateLink service.

You can use AWS PrivateLink connections in Confluent Schema Registry connections, Kafka connections, and Postgres connections.

Field Value Required Description
SERVICE NAME text The name of the AWS PrivateLink service.
AVAILABILITY ZONES text[] The IDs of the AWS availability zones in which the service is accessible.

Permissions

After you create the connection, you must configure your AWS PrivateLink service to accept connections from the following AWS principal:

arn:aws:iam::664411391173:role/mz_<EXTERNAL-ID>_<CONNECTION-ID>

Fill in the values as follows:

  • EXTERNAL-ID: A unique ID associated with your Materialize region. You must contact support to determine this ID.
  • CONNECTION-ID: The ID of the AWS PrivateLink connection in your Materialize region. You can determine this ID by querying mz_connections.

For example:

arn:aws:iam::664411391173:role/mz_20273b7c-2bbe-42b8-8c36-8cc179e9bbc3_u23

See Manage permissions in the AWS PrivateLink documentation for details.

WARNING! Do not grant access to the root principal for the Materialize AWS account. Doing so will allow any Materialize customer to create a connection to your AWS PrivateLink service.

Accepting connection requests

If your AWS PrivateLink service is configured to require acceptance of connection requests, you must manually approve the connection request from Materialize after executing CREATE CONNECTION. See Accept or reject connection requests in the AWS PrivateLink documentation for more details.

Example

CREATE CONNECTION privatelink_svc TO AWS PRIVATELINK (
    SERVICE NAME 'com.amazonaws.vpce.us-east-1.vpce-svc-0e123abc123198abc',
    AVAILABILITY ZONES ('use1-az1', 'use1-az4')
);

Confluent Schema Registry

A Confluent Schema Registry connection establishes a link to a Confluent Schema Registry server.

You can use Confluent Schema Registry connections in the FORMAT clause of CREATE SOURCE and CREATE SINK.

Field Value Required Description
URL text The schema registry URL.
SSL CERTIFICATE AUTHORITY secret or text The absolute path to the certificate authority (CA) certificate in PEM format. Used for both SSL client and server authentication. If unspecified, uses the system’s default CA certificates.
SSL CERTIFICATE secret or text Your SSL certificate in PEM format. Required for SSL client authentication.
SSL KEY secret Your SSL certificate’s key in PEM format. Required for SSL client authentication.
PASSWORD secret The password used to connect to the schema registry with basic HTTP authentication. This is compatible with the ssl options, which control the transport between Materialize and the CSR.
USERNAME secret or text The username used to connect to the schema registry with basic HTTP authentication. This is compatible with the ssl options, which control the transport between Materialize and the CSR.
AWS PRIVATELINK object name The name of an AWS PrivateLink connection through which network traffic should be routed.

Examples

Connect directly to a Confluent Schema Registry server:

CREATE SECRET csr_ssl_crt AS '<CSR_SSL_CRT>';
CREATE SECRET csr_ssl_key AS '<CSR_SSL_KEY>';
CREATE SECRET csr_password AS '<CSR_PASSWORD>';

CREATE CONNECTION csr_ssl TO CONFLUENT SCHEMA REGISTRY (
    URL 'https://rp-f00000bar.data.vectorized.cloud:30993',
    SSL KEY = SECRET csr_ssl_key,
    SSL CERTIFICATE = SECRET csr_ssl_crt,
    USERNAME = 'foo',
    PASSWORD = SECRET csr_password
);

Connect to a Confluent Schema Registry server via an AWS PrivateLink service:

CREATE CONNECTION privatelink_svc TO AWS PRIVATELINK (
    SERVICE NAME 'com.amazonaws.vpce.us-east-1.vpce-svc-0e123abc123198abc',
    AVAILABILITY ZONES ('use1-az1', 'use1-az4')
);

CREATE CONNECTION csr_privatelink TO CONFLUENT SCHEMA REGISTRY (
    URL 'http://my-confluent-schema-registry:8081',
    AWS PRIVATELINK privatelink_svc
);

Kafka

A Kafka connection establishes a link to a Kafka cluster.

You can use Kafka connections to create Kafka sources and Kafka sinks.

General options

Field Value Required Description
BROKER text The Kafka bootstrap server. Exclusive with BROKERS.
BROKERS text[] A comma-separated list of Kafka bootstrap servers. Exclusive with BROKER.
PROGRESS TOPIC text The name of a topic that Kafka sinks can use to track internal consistency metadata. If this is not specified, a default topic name will be selected.

SSL

To connect to a Kafka broker that requires SSL authentication, use the provided options.

SSL options

Field Value Required Description
SSL CERTIFICATE AUTHORITY secret or text The absolute path to the certificate authority (CA) certificate in PEM format. Used for both SSL client and server authentication. If unspecified, uses the system’s default CA certificates.
SSL CERTIFICATE secret or text Your SSL certificate in PEM format. Required for SSL client authentication.
SSL KEY secret Your SSL certificate’s key in PEM format. Required for SSL client authentication.

Examples

Create an SSL-authenticated Kafka connection:

CREATE SECRET kafka_ssl_crt AS '<BROKER_SSL_CRT>';
CREATE SECRET kafka_ssl_key AS '<BROKER_SSL_KEY>';

CREATE CONNECTION kafka_connection TO KAFKA (
    BROKER 'rp-f00000bar.data.vectorized.cloud:30365',
    SSL KEY = SECRET kafka_ssl_key,
    SSL CERTIFICATE = SECRET kafka_ssl_crt
);

Create a connection to a Kafka cluster with multiple bootstrap servers:

CREATE CONNECTION kafka_connection TO KAFKA (
    BROKERS ('broker1:9092', 'broker2:9092')
);

SASL

To create a connection to a Kafka broker that requires SASL authentication, use the provided options.

SASL options

Field Value Required Description
BROKER text The Kafka bootstrap server. Exclusive with BROKERS.
BROKERS text[] A comma-separated list of Kafka bootstrap servers. Exclusive with BROKER.
SASL MECHANISMS text The SASL mechanism to use for authentication. Supported: PLAIN, SCRAM-SHA-256, SCRAM-SHA-512.
SASL USERNAME secret or text Your SASL username, if any. Required if SASL MECHANISMS is PLAIN.
SASL PASSWORD secret Your SASL password, if any. Required if SASL MECHANISMS is PLAIN.
SSL CERTIFICATE AUTHORITY secret or text The absolute path to the certificate authority (CA) certificate. Used for both SSL client and server authentication. If unspecified, uses the system’s default CA certificates.

Examples

CREATE SECRET kafka_password AS '<BROKER_PASSWORD>';

CREATE CONNECTION kafka_connection TO KAFKA (
    BROKER 'unique-jellyfish-0000-kafka.upstash.io:9092',
    SASL MECHANISMS = 'SCRAM-SHA-256',
    SASL USERNAME = 'foo',
    SASL PASSWORD = SECRET kafka_password
);

Syntax

The full syntax for the BROKERS option is:

BROKERS ( kafka_broker , )
kafka_broker
'host:port' USING AWS PRIVATELINK connection ( PORT number )

Description

Field Value Required Description
connection object name The name of an AWS PrivateLink connection through which network traffic for this broker should be routed.
PORT integer The port of the AWS PrivateLink service to connect to. Defaults to the broker’s port.

The USING PRIVATELINK clause specifies that Materialize should connect to the designated broker in a Kafka cluster via an AWS PrivateLink service.

You must attach the clause individually to each broker that you want to connect to via AWS PrivateLink.

Examples

Suppose you have the following infrastructure:

  • A Kafka cluster consisting of two brokers named broker1 and broker2, both listening on port 9092.

  • A Network Load Balancer that forwards port 9092 to broker1:9092 and port 9093 to broker2:9092.

  • A PrivateLink endpoint service attached to the load balancer.

You can create a connection to this Kafka broker in Materialize like so:

CREATE CONNECTION privatelink_svc TO AWS PRIVATELINK (
    SERVICE NAME 'com.amazonaws.vpce.us-east-1.vpce-svc-0e123abc123198abc',
    AVAILABILITY ZONES ('use1-az1', 'use1-az4')
);

CREATE CONNECTION kafka_connection TO KAFKA (
    BROKERS (
        'broker1:9092' USING AWS PRIVATELINK privatelink_svc,
        'broker2:9092' USING AWS PRIVATELINK privatelink_svc (PORT 9093)
    )
);

Postgres

A Postgres connection establishes a link to a single database of a PostgreSQL server.

You can use Postgres connections to create Postgres sources.

Field Value Required Description
DATABASE text Target database.
HOST text Database hostname.
PORT integer Default: 5432. Port number to connect to at the server host.
PASSWORD secret Password for the connection
SSH TUNNEL object name The name of an SSH tunnel connection through which network traffic should be routed.
SSL CERTIFICATE AUTHORITY secret or text The absolute path to the certificate authority (CA) certificate in PEM format. Used for both SSL client and server authentication. If unspecified, uses the system’s default CA certificates.
SSL MODE text Default: disable. Enables SSL connections if set to require, verify_ca, or verify_full.
SSL CERTIFICATE secret or text Client SSL certificate in PEM format.
SSL KEY secret Client SSL key in PEM format.
USER text Database username.
AWS PRIVATELINK object name The name of an AWS PrivateLink connection through which network traffic should be routed.

Examples

Connect directly to a PostgreSQL server:

CREATE SECRET pgpass AS '<POSTGRES_PASSWORD>';

CREATE CONNECTION pg_connection TO POSTGRES (
    HOST 'instance.foo000.us-west-1.rds.amazonaws.com',
    PORT 5432,
    USER 'postgres',
    PASSWORD SECRET pgpass,
    SSL MODE 'require',
    DATABASE 'postgres'
);

If your PostgreSQL server is not exposed to the public internet, you can tunnel the connection through an SSH bastion host:

CREATE CONNECTION tunnel TO SSH TUNNEL (
    HOST 'bastion-host',
    PORT 22,
    USER 'materialize',
);

CREATE CONNECTION pg_connection TO POSTGRES (
    HOST 'instance.foo000.us-west-1.rds.amazonaws.com',
    PORT 5432,
    SSH TUNNEL tunnel,
    DATABASE 'postgres'
);

SSH tunnel

An SSH tunnel connection establishes a link to an SSH bastion server.

You can use SSH tunnel connections in Postgres connections.

Field Value Required Description
HOST text The hostname of the SSH bastion server.
PORT integer The port to connect to.
USER text The name of the user to connect as.

Key pairs

Materialize automatically manages the key pairs for an SSH tunnel connection. Each connection is associated with two key pairs. The private key for each key pair is stored securely within your region and cannot be retrieved. The public key for each key pair is announced in the mz_ssh_tunnel_connections system table.

When Materialize connects to the SSH bastion server, it will present both keys for authentication. You should configure your SSH bastion server to accept both key pairs to permit key pair rotation without downtime.

To rotate the key pairs associated with a connection, use ALTER CONNECTION.

Materialize currently generates SSH key pairs using the Ed25519 algorithm, which is fast, secure, and recommended by security professionals. Some legacy SSH servers do not support the Ed25519 algorithm. You will not be able to use these servers with Materialize’s SSH tunnel connections.

We routinely evaluate the security of the cryptographic algorithms in use in Materialize. Future versions of Materialize may use a different SSH key generation algorithm as security best practices evolve.

Examples

Create an SSH tunnel connection:

CREATE CONNECTION ssh_connection TO SSH TUNNEL (
    HOST 'bastion-host',
    PORT 22,
    USER 'materialize',
);

Retrieve the public keys for all SSH tunnel connections:

SELECT * FROM mz_ssh_tunnel_connections;
| id    | public_key_1                          | public_key_2                          |
|-------|---------------------------------------|---------------------------------------|
| ...   | ssh-ed25519 AAAA...76RH materialize   | ssh-ed25519 AAAA...hLYV materialize   |
Back to top ↑