CREATE CONNECTION

A connection describes how to connect and authenticate to an external system you want Materialize to read from or write to. Once created, a connection is reusable across multiple CREATE SOURCE and CREATE SINK statements.
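For example, one Kafka connection can back several sources. This sketch assumes that a Kafka connection named kafka_connection already exists (see the Kafka section) and that the hypothetical topics orders and payments carry plain-text messages. Source options differ between Materialize versions, and some versions also require a cluster or size to be specified, so check the CREATE SOURCE reference before running it.

```sql
-- Both sources reuse the same connection object; topic names are hypothetical.
CREATE SOURCE orders_source
  FROM KAFKA CONNECTION kafka_connection (TOPIC 'orders')
  FORMAT TEXT;

CREATE SOURCE payments_source
  FROM KAFKA CONNECTION kafka_connection (TOPIC 'payments')
  FORMAT TEXT;
```

Connection details such as brokers and credentials are defined once. Each source then adds only its own topic and format.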

To use credentials that contain sensitive information (like passwords and SSL keys) in a connection, you must first create secrets to securely store each credential in Materialize’s secret management system. Credentials that are generally not sensitive (like usernames and SSL certificates) can be specified as plain text, or also stored as secrets.
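A minimal sketch of this pattern follows, with both SASL credentials stored as secrets and then referenced with the SECRET keyword. The secret names, placeholder values, and broker address are illustrative assumptions. Because usernames are not sensitive, SASL USERNAME could also be given as plain text, for example SASL USERNAME = 'foo'.

```sql
-- Store each credential in Materialize's secret management system first.
CREATE SECRET kafka_username AS '<KAFKA_USERNAME>';
CREATE SECRET kafka_password AS '<KAFKA_PASSWORD>';

CREATE CONNECTION kafka_connection TO KAFKA (
    BROKER 'broker1:9092',
    SASL MECHANISMS = 'PLAIN',
    -- Reference secrets by name; the username could also be plain text.
    SASL USERNAME = SECRET kafka_username,
    SASL PASSWORD = SECRET kafka_password
);
```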

Source and sink connections

Kafka

A Kafka connection establishes a link to a Kafka cluster. You can use Kafka connections to create sources and sinks.

Syntax

CREATE CONNECTION [IF NOT EXISTS] connection_name TO KAFKA ( field [=] val [, ...] ) [WITH ( with_options )]

Connection options

Field Value Description
BROKER text The Kafka bootstrap server.

Exactly one of BROKER or BROKERS must be specified.
BROKERS text[] A comma-separated list of Kafka bootstrap servers.

Exactly one of BROKER or BROKERS must be specified.
SECURITY PROTOCOL text The security protocol to use: PLAINTEXT, SSL, SASL_PLAINTEXT, or SASL_SSL.

Defaults to SASL_SSL if any SASL ... options are specified, otherwise defaults to SSL.
SASL MECHANISMS text The SASL mechanism to use for authentication: PLAIN, SCRAM-SHA-256, or SCRAM-SHA-512. Despite the name, this option only allows a single mechanism to be specified.

Required if the security protocol is SASL_PLAINTEXT or SASL_SSL.
SASL USERNAME secret or text Your SASL username.

Required and only valid when the security protocol is SASL_PLAINTEXT or SASL_SSL.
SASL PASSWORD secret Your SASL password.

Required and only valid when the security protocol is SASL_PLAINTEXT or SASL_SSL.
SSL CERTIFICATE AUTHORITY secret or text The certificate authority (CA) certificate in PEM format. Used to validate the brokers' TLS certificates. If unspecified, uses the system’s default CA certificates.

Only valid when the security protocol is SSL or SASL_SSL.
SSL CERTIFICATE secret or text Your TLS certificate in PEM format for SSL client authentication. If unspecified, no client authentication is performed.

Only valid when the security protocol is SSL or SASL_SSL.
SSL KEY secret Your TLS certificate’s key in PEM format.

Required and only valid when SSL CERTIFICATE is specified.
SSH TUNNEL object name The name of an SSH tunnel connection to route network traffic through by default.
PROGRESS TOPIC text The name of a topic that Kafka sinks can use to track internal consistency metadata. If this is not specified, a default topic name will be selected.
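To have Kafka sinks record their consistency metadata in a topic you name, rather than in the default topic, set PROGRESS TOPIC on the connection. The broker address and topic name below are hypothetical.

```sql
CREATE CONNECTION kafka_connection TO KAFKA (
    BROKER 'broker1:9092',
    -- Hypothetical topic used by Kafka sinks for consistency metadata.
    PROGRESS TOPIC 'materialize_progress'
);
```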

WITH options

Field Value Description
VALIDATE boolean Whether connection validation should be performed on connection creation.

Defaults to true.
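As a sketch, the statement below creates a Kafka connection and skips validation. This can help when the brokers are temporarily unreachable but you know the parameters are correct. The broker address is a placeholder.

```sql
-- Create the connection without contacting the brokers.
CREATE CONNECTION kafka_connection TO KAFKA (
    BROKER 'broker1:9092'
) WITH (VALIDATE = false);
```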

To connect to a Kafka cluster with multiple bootstrap servers, use the BROKERS option:

CREATE CONNECTION kafka_connection TO KAFKA (
    BROKERS ('broker1:9092', 'broker2:9092')
);

Security protocol examples

Using the PLAINTEXT security protocol:

WARNING! It is insecure to use the PLAINTEXT security protocol unless you are using a network security connection to tunnel into a private network, as shown below.

CREATE CONNECTION kafka_connection TO KAFKA (
    BROKER 'unique-jellyfish-0000-kafka.upstash.io:9092',
    SECURITY PROTOCOL = 'PLAINTEXT',
    SSH TUNNEL ssh_connection
);

Using the SSL security protocol with both TLS encryption and TLS client authentication:

CREATE SECRET kafka_ssl_cert AS '-----BEGIN CERTIFICATE----- ...';
CREATE SECRET kafka_ssl_key AS '-----BEGIN PRIVATE KEY----- ...';
CREATE SECRET ca_cert AS '-----BEGIN CERTIFICATE----- ...';

CREATE CONNECTION kafka_connection TO KAFKA (
    BROKER 'rp-f00000bar.data.vectorized.cloud:30365',
    SECURITY PROTOCOL = 'SSL',
    SSL CERTIFICATE = SECRET kafka_ssl_cert,
    SSL KEY = SECRET kafka_ssl_key,
    -- Specifying a certificate authority is only required if your cluster's
    -- certificates are not issued by a CA trusted by the Mozilla root store.
    SSL CERTIFICATE AUTHORITY = SECRET ca_cert
);

Using the SSL security protocol with only TLS encryption:

WARNING! It is insecure to use TLS encryption with no authentication unless you are using a network security connection to tunnel into a private network, as shown below.

CREATE SECRET ca_cert AS '-----BEGIN CERTIFICATE----- ...';

CREATE CONNECTION kafka_connection TO KAFKA (
    BROKER = 'rp-f00000bar.data.vectorized.cloud:30365',
    SECURITY PROTOCOL = 'SSL',
    SSH TUNNEL ssh_connection,
    -- Specifying a certificate authority is only required if your cluster's
    -- certificates are not issued by a CA trusted by the Mozilla root store.
    SSL CERTIFICATE AUTHORITY = SECRET ca_cert
);

Using the SASL_PLAINTEXT security protocol:

WARNING! It is insecure to use the SASL_PLAINTEXT security protocol unless you are using a network security connection to tunnel into a private network, as shown below.

CREATE SECRET kafka_password AS '...';

CREATE CONNECTION kafka_connection TO KAFKA (
    BROKER 'unique-jellyfish-0000-kafka.upstash.io:9092',
    SECURITY PROTOCOL = 'SASL_PLAINTEXT',
    SASL MECHANISMS = 'SCRAM-SHA-256', -- or `PLAIN` or `SCRAM-SHA-512`
    SASL USERNAME = 'foo',
    SASL PASSWORD = SECRET kafka_password,
    SSH TUNNEL ssh_connection
);

Using the SASL_SSL security protocol:

CREATE SECRET kafka_password AS '...';
CREATE SECRET ca_cert AS '-----BEGIN CERTIFICATE----- ...';

CREATE CONNECTION kafka_connection TO KAFKA (
    BROKER 'unique-jellyfish-0000-kafka.upstash.io:9092',
    SECURITY PROTOCOL = 'SASL_SSL',
    SASL MECHANISMS = 'SCRAM-SHA-256', -- or `PLAIN` or `SCRAM-SHA-512`
    SASL USERNAME = 'foo',
    SASL PASSWORD = SECRET kafka_password,
    -- Specifying a certificate authority is only required if your cluster's
    -- certificates are not issued by a CA trusted by the Mozilla root store.
    SSL CERTIFICATE AUTHORITY = SECRET ca_cert
);

Network security

If your Kafka broker is not exposed to the public internet, you can tunnel the connection through an AWS PrivateLink service or an SSH bastion host.

AWS PrivateLink

PREVIEW This feature is in public preview. It may have performance or stability issues.

Syntax

WARNING! If your Kafka cluster advertises brokers that are not specified in the BROKERS clause, Materialize will attempt to connect to those brokers without any tunneling.

BROKERS ( kafka_broker [, ...] )

kafka_broker:
'host:port' [USING AWS PRIVATELINK aws_connection [( broker_option [, ...] )]]

broker_option:
PORT number | AVAILABILITY ZONE az

The USING clause specifies that Materialize should connect to the designated broker via an AWS PrivateLink service. Brokers do not need to be configured the same way, but the clause must be individually attached to each broker that you want to connect to via the tunnel.

Field Value Description
AWS PRIVATELINK object name The name of an AWS PrivateLink connection through which network traffic for this broker should be routed.
AVAILABILITY ZONE text The ID of the availability zone of the AWS PrivateLink service in which the broker is accessible. If unspecified, traffic will be routed to each availability zone declared in the AWS PrivateLink connection in sequence until the correct availability zone for the broker is discovered. If specified, Materialize will always route connections via the specified availability zone.
PORT integer The port of the AWS PrivateLink service to connect to. Defaults to the broker’s port.

Suppose you have the following infrastructure:

  • A Kafka cluster consisting of two brokers named broker1 and broker2, both listening on port 9092.

  • A Network Load Balancer that forwards port 9092 to broker1:9092 and port 9093 to broker2:9092.

  • A PrivateLink endpoint service attached to the load balancer.

You can create a connection to this Kafka cluster in Materialize like so:

CREATE CONNECTION privatelink_svc TO AWS PRIVATELINK (
    SERVICE NAME 'com.amazonaws.vpce.us-east-1.vpce-svc-0e123abc123198abc',
    AVAILABILITY ZONES ('use1-az1', 'use1-az4')
);

CREATE CONNECTION kafka_connection TO KAFKA (
    BROKERS (
        'broker1:9092' USING AWS PRIVATELINK privatelink_svc,
        'broker2:9092' USING AWS PRIVATELINK privatelink_svc (PORT 9093)
    )
);
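Continuing the same infrastructure, you can pin a broker to one availability zone with the AVAILABILITY ZONE broker option, and you can combine it with PORT in a single option list. This sketch assumes, hypothetically, that broker1 is reachable only in use1-az1 and broker2 only in use1-az4. It uses a new connection name so that it does not clash with kafka_connection above.

```sql
CREATE CONNECTION kafka_az_connection TO KAFKA (
    BROKERS (
        -- Always route broker1 traffic via use1-az1 (assumed zone).
        'broker1:9092' USING AWS PRIVATELINK privatelink_svc (AVAILABILITY ZONE 'use1-az1'),
        -- Multiple broker options go in one parenthesized, comma-separated list.
        'broker2:9092' USING AWS PRIVATELINK privatelink_svc (PORT 9093, AVAILABILITY ZONE 'use1-az4')
    )
);
```

When a zone is pinned, Materialize skips trying each zone declared on privatelink_svc in turn. The pinned zone must therefore be one where the broker is actually reachable.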

For step-by-step instructions on creating AWS PrivateLink connections and configuring an AWS PrivateLink service to accept connections from Materialize, check this guide.

SSH tunnel

Syntax

WARNING! If you do not specify a default SSH TUNNEL and your Kafka cluster advertises brokers that are not listed in the BROKERS clause, Materialize will attempt to connect to those brokers without any tunneling.

BROKERS ( kafka_broker [, ...] )

kafka_broker:
'host:port' [USING SSH TUNNEL ssh_connection]

The USING clause specifies that Materialize should connect to the designated broker via an SSH bastion server. Brokers do not need to be configured the same way, but the clause must be individually attached to each broker that you want to connect to via the tunnel.

Connection options
Field Value Description
SSH TUNNEL object name The name of an SSH tunnel connection through which network traffic for this broker should be routed.

Examples

Using a default SSH tunnel:

CREATE CONNECTION ssh_connection TO SSH TUNNEL (
    HOST '<SSH_BASTION_HOST>',
    USER '<SSH_BASTION_USER>',
    PORT <SSH_BASTION_PORT>
);

CREATE CONNECTION kafka_connection TO KAFKA (
    BROKER 'broker1:9092',
    SSH TUNNEL ssh_connection
);

Using different SSH tunnels for each broker, with a default for brokers that are not listed:

CREATE CONNECTION ssh1 TO SSH TUNNEL (HOST 'ssh1', ...);
CREATE CONNECTION ssh2 TO SSH TUNNEL (HOST 'ssh2', ...);

CREATE CONNECTION kafka_connection TO KAFKA (
    BROKERS (
        'broker1:9092' USING SSH TUNNEL ssh1,
        'broker2:9092' USING SSH TUNNEL ssh2
    ),
    -- Default tunnel for any advertised brokers not listed above.
    SSH TUNNEL ssh1
);

For step-by-step instructions on creating SSH tunnel connections and configuring an SSH bastion server to accept connections from Materialize, check this guide.

Confluent Schema Registry

A Confluent Schema Registry connection establishes a link to a Confluent Schema Registry server. You can use Confluent Schema Registry connections in the FORMAT clause of CREATE SOURCE and CREATE SINK statements.
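For example, an Avro-formatted Kafka source can fetch its schemas through a schema registry connection. This sketch assumes a Kafka connection named kafka_connection and a schema registry connection named csr_basic, such as the one created in the examples below, already exist. The topic name is hypothetical.

```sql
-- Decode Avro messages using schemas fetched from the schema registry.
CREATE SOURCE avro_source
  FROM KAFKA CONNECTION kafka_connection (TOPIC 'orders')
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_basic;
```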

Syntax

CREATE CONNECTION [IF NOT EXISTS] connection_name TO CONFLUENT SCHEMA REGISTRY ( field [=] val [, ...] ) [WITH ( with_options )]

Connection options

Field Value Description
URL text The schema registry URL.

Required.
USERNAME secret or text The username to use for basic HTTP authentication.
PASSWORD secret The password to use for basic HTTP authentication.

Required and only valid if USERNAME is specified.
SSL CERTIFICATE secret or text Your TLS certificate in PEM format for TLS client authentication. If unspecified, no TLS client authentication is performed.

Only respected if the URL uses the https protocol.
SSL KEY secret Your TLS certificate’s key in PEM format.

Required and only valid if SSL CERTIFICATE is specified.
SSL CERTIFICATE AUTHORITY secret or text The certificate authority (CA) certificate in PEM format. Used to validate the server’s TLS certificate. If unspecified, uses the system’s default CA certificates.

Only respected if the URL uses the https protocol.

WITH options

Field Value Description
VALIDATE boolean Default: true. Whether connection validation should be performed on connection creation.

Examples

Using username and password authentication with TLS encryption:

CREATE SECRET csr_password AS '...';
CREATE SECRET ca_cert AS '-----BEGIN CERTIFICATE----- ...';

CREATE CONNECTION csr_basic TO CONFLUENT SCHEMA REGISTRY (
    URL 'https://rp-f00000bar.data.vectorized.cloud:30993',
    USERNAME = 'foo',
    PASSWORD = SECRET csr_password,
    -- Specifying a certificate authority is only required if your server's
    -- certificates are not issued by a CA trusted by the Mozilla root store.
    SSL CERTIFICATE AUTHORITY = SECRET ca_cert
);

Using TLS for encryption and authentication:

CREATE SECRET csr_ssl_cert AS '-----BEGIN CERTIFICATE----- ...';
CREATE SECRET csr_ssl_key AS '-----BEGIN PRIVATE KEY----- ...';
CREATE SECRET ca_cert AS '-----BEGIN CERTIFICATE----- ...';

CREATE CONNECTION csr_ssl TO CONFLUENT SCHEMA REGISTRY (
    URL 'https://rp-f00000bar.data.vectorized.cloud:30993',
    SSL CERTIFICATE = SECRET csr_ssl_cert,
    SSL KEY = SECRET csr_ssl_key,
    -- Specifying a certificate authority is only required if your server's
    -- certificates are not issued by a CA trusted by the Mozilla root store.
    SSL CERTIFICATE AUTHORITY = SECRET ca_cert
);

Network security

If your Confluent Schema Registry server is not exposed to the public internet, you can tunnel the connection through an AWS PrivateLink service or an SSH bastion host.

AWS PrivateLink

Connection options

Field Value Description
AWS PRIVATELINK object name The name of an AWS PrivateLink connection through which network traffic should be routed.

Example

CREATE CONNECTION privatelink_svc TO AWS PRIVATELINK (
    SERVICE NAME 'com.amazonaws.vpce.us-east-1.vpce-svc-0e123abc123198abc',
    AVAILABILITY ZONES ('use1-az1', 'use1-az4')
);

CREATE CONNECTION csr_privatelink TO CONFLUENT SCHEMA REGISTRY (
    URL 'http://my-confluent-schema-registry:8081',
    AWS PRIVATELINK privatelink_svc
);

SSH tunnel

Connection options

Field Value Description
SSH TUNNEL object name The name of an SSH tunnel connection through which network traffic should be routed.

Example

CREATE CONNECTION ssh_connection TO SSH TUNNEL (
    HOST '<SSH_BASTION_HOST>',
    USER '<SSH_BASTION_USER>',
    PORT <SSH_BASTION_PORT>
);

CREATE CONNECTION csr_ssh TO CONFLUENT SCHEMA REGISTRY (
    URL 'http://my-confluent-schema-registry:8081',
    SSH TUNNEL ssh_connection
);

PostgreSQL

A Postgres connection establishes a link to a single database of a PostgreSQL server. You can use Postgres connections to create sources.
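For example, a PostgreSQL source replicates tables through a publication in the upstream database. This sketch assumes a Postgres connection named pg_connection, such as the one in the example below, and an upstream publication named mz_source. Both names are illustrative, and the available source options vary by Materialize version.

```sql
-- Replicate every table included in the upstream publication.
CREATE SOURCE pg_source
  FROM POSTGRES CONNECTION pg_connection (PUBLICATION 'mz_source')
  FOR ALL TABLES;
```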

Syntax

CREATE CONNECTION [IF NOT EXISTS] connection_name TO POSTGRES ( field [=] val [, ...] ) [WITH ( with_options )]

Connection options

Field Value Description
HOST text Database hostname.
PORT integer Default: 5432. Port number to connect to at the server host.
DATABASE text Target database.
USER text Database username.
PASSWORD secret Password for the connection.
SSL CERTIFICATE AUTHORITY secret or text The certificate authority (CA) certificate in PEM format. Used for both SSL client and server authentication. If unspecified, uses the system’s default CA certificates.
SSL MODE text Default: disable. Enables SSL connections if set to require, verify_ca, or verify_full.
SSL CERTIFICATE secret or text Client SSL certificate in PEM format.
SSL KEY secret Client SSL key in PEM format.
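As a sketch of the TLS options together, the connection below verifies the server against a custom CA and authenticates with a client certificate. The host, the secret names, and the elided PEM contents are placeholders. It reuses the pgpass secret created in the example that follows. The comment on verify_full assumes Materialize follows the usual libpq meaning of that mode.

```sql
CREATE SECRET pg_ca_cert AS '-----BEGIN CERTIFICATE----- ...';
CREATE SECRET pg_client_key AS '-----BEGIN PRIVATE KEY----- ...';

CREATE CONNECTION pg_tls_connection TO POSTGRES (
    HOST 'instance.foo000.us-west-1.rds.amazonaws.com',
    PORT 5432,
    USER 'postgres',
    PASSWORD SECRET pgpass,
    DATABASE 'postgres',
    -- verify_full: check the server certificate chain and host name (libpq semantics assumed).
    SSL MODE 'verify_full',
    SSL CERTIFICATE AUTHORITY SECRET pg_ca_cert,
    -- Client certificates are not sensitive, so plain text is allowed; the key must be a secret.
    SSL CERTIFICATE '-----BEGIN CERTIFICATE----- ...',
    SSL KEY SECRET pg_client_key
);
```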

WITH options

Field Value Description
VALIDATE boolean Default: true. Whether connection validation should be performed on connection creation.

Example

CREATE SECRET pgpass AS '<POSTGRES_PASSWORD>';

CREATE CONNECTION pg_connection TO POSTGRES (
    HOST 'instance.foo000.us-west-1.rds.amazonaws.com',
    PORT 5432,
    USER 'postgres',
    PASSWORD SECRET pgpass,
    SSL MODE 'require',
    DATABASE 'postgres'
);

Network security

If your PostgreSQL server is not exposed to the public internet, you can tunnel the connection through an AWS PrivateLink service or an SSH bastion host.

AWS PrivateLink

Connection options

Field Value Description
AWS PRIVATELINK object name The name of an AWS PrivateLink connection through which network traffic should be routed.

Example

CREATE CONNECTION privatelink_svc TO AWS PRIVATELINK (
    SERVICE NAME 'com.amazonaws.vpce.us-east-1.vpce-svc-0e123abc123198abc',
    AVAILABILITY ZONES ('use1-az1', 'use1-az4')
);

CREATE CONNECTION pg_connection TO POSTGRES (
    HOST 'instance.foo000.us-west-1.rds.amazonaws.com',
    PORT 5432,
    DATABASE 'postgres',
    USER 'postgres',
    PASSWORD SECRET pgpass,
    AWS PRIVATELINK privatelink_svc
);

For step-by-step instructions on creating AWS PrivateLink connections and configuring an AWS PrivateLink service to accept connections from Materialize, check this guide.

SSH tunnel

Connection options

Field Value Description
SSH TUNNEL object name The name of an SSH tunnel connection through which network traffic should be routed.

Example

CREATE CONNECTION tunnel TO SSH TUNNEL (
    HOST 'bastion-host',
    PORT 22,
    USER 'materialize'
);

CREATE CONNECTION pg_connection TO POSTGRES (
    HOST 'instance.foo000.us-west-1.rds.amazonaws.com',
    PORT 5432,
    USER 'postgres',
    PASSWORD SECRET pgpass,
    SSH TUNNEL tunnel,
    DATABASE 'postgres'
);

For step-by-step instructions on creating SSH tunnel connections and configuring an SSH bastion server to accept connections from Materialize, check this guide.

Network security connections

AWS PrivateLink

PREVIEW This feature is in public preview. It may have performance or stability issues.

An AWS PrivateLink connection establishes a link to an AWS PrivateLink service. You can use AWS PrivateLink connections in Confluent Schema Registry connections, Kafka connections, and Postgres connections.

Syntax

CREATE CONNECTION [IF NOT EXISTS] connection_name TO AWS PRIVATELINK ( field [=] val [, ...] ) [WITH ( with_options )]

Connection options

Field Value Description
SERVICE NAME text The name of the AWS PrivateLink service.
AVAILABILITY ZONES text[] The IDs of the AWS availability zones in which the service is accessible.

Materialize assigns a unique principal to each AWS PrivateLink connection in your region using an Amazon Resource Name of the following form:

arn:aws:iam::664411391173:role/mz_<REGION-ID>_<CONNECTION-ID>

After creating the connection, you must configure the AWS PrivateLink service to accept connections from the AWS principal Materialize will connect as. The principals for AWS PrivateLink connections in your region are stored in the mz_aws_privatelink_connections system table.

SELECT * FROM mz_aws_privatelink_connections;
   id   |                                 principal
--------+---------------------------------------------------------------------------
 u1     | arn:aws:iam::664411391173:role/mz_20273b7c-2bbe-42b8-8c36-8cc179e9bbc3_u1
 u7     | arn:aws:iam::664411391173:role/mz_20273b7c-2bbe-42b8-8c36-8cc179e9bbc3_u7
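To look up the principal for a single connection by name instead of listing them all, you can join against the mz_connections catalog table. The connection name privatelink_svc matches the example below and is otherwise illustrative.

```sql
-- Find the AWS principal to allow in the PrivateLink service for one connection.
SELECT c.name, p.principal
FROM mz_aws_privatelink_connections AS p
JOIN mz_connections AS c ON p.id = c.id
WHERE c.name = 'privatelink_svc';
```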

For more details on configuring a trusted principal for your AWS PrivateLink service, see the AWS PrivateLink documentation.

WARNING! Do not grant access to the root principal for the Materialize AWS account. Doing so will allow any Materialize customer to create a connection to your AWS PrivateLink service.

If your AWS PrivateLink service is configured to require acceptance of connection requests, you must additionally approve the connection request from Materialize after creating the connection. For more details on manually accepting connection requests, see the AWS PrivateLink documentation.

Example

CREATE CONNECTION privatelink_svc TO AWS PRIVATELINK (
    SERVICE NAME 'com.amazonaws.vpce.us-east-1.vpce-svc-0e123abc123198abc',
    AVAILABILITY ZONES ('use1-az1', 'use1-az4')
);

SSH tunnel

An SSH tunnel connection establishes a link to an SSH bastion server. You can use SSH tunnel connections in Confluent Schema Registry connections, Kafka connections, and Postgres connections.

Syntax

CREATE CONNECTION [IF NOT EXISTS] connection_name TO SSH TUNNEL ( field [=] val [, ...] ) [WITH ( with_options )]

Connection options

Field Value Description
HOST text The hostname of the SSH bastion server.
PORT integer The port to connect to.
USER text The name of the user to connect as.

Key pairs

Materialize automatically manages the key pairs for an SSH tunnel connection. Each connection is associated with two key pairs. The private key for each key pair is stored securely within your region and cannot be retrieved. The public key for each key pair is stored in the mz_ssh_tunnel_connections system table.

When Materialize connects to the SSH bastion server, it presents both keys for authentication. To allow key pair rotation without downtime, you should configure your SSH bastion server to accept both key pairs. You can then rotate the key pairs using ALTER CONNECTION.
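A rotation workflow might look like the following sketch. It assumes an SSH tunnel connection named ssh_connection, and it assumes that you manage the bastion's authorized_keys file yourself, outside Materialize.

```sql
-- 1. Fetch the current public keys for this connection.
SELECT p.public_key_1, p.public_key_2
FROM mz_ssh_tunnel_connections AS p
JOIN mz_connections AS c ON p.id = c.id
WHERE c.name = 'ssh_connection';

-- 2. Add both keys to the bastion user's authorized_keys file (outside Materialize).

-- 3. Rotate the key pairs.
ALTER CONNECTION ssh_connection ROTATE KEYS;

-- 4. Re-run the query from step 1, add any newly generated key to
--    authorized_keys, and remove the key that is no longer listed.
```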

Materialize currently generates SSH key pairs using the Ed25519 algorithm, which is fast, secure, and recommended by security professionals. Some legacy SSH servers do not support the Ed25519 algorithm. You will not be able to use these servers with Materialize’s SSH tunnel connections.

We routinely evaluate the security of the cryptographic algorithms in use in Materialize. Future versions of Materialize may use a different SSH key generation algorithm as security best practices evolve.

Examples

Create an SSH tunnel connection:

CREATE CONNECTION ssh_connection TO SSH TUNNEL (
    HOST 'bastion-host',
    PORT 22,
    USER 'materialize'
);

Retrieve the public keys for all SSH tunnel connections:

SELECT * FROM mz_ssh_tunnel_connections;
 id    | public_key_1                          | public_key_2
-------+---------------------------------------+---------------------------------------
 ...   | ssh-ed25519 AAAA...76RH materialize   | ssh-ed25519 AAAA...hLYV materialize

Connection validation

Materialize automatically validates the connection and authentication parameters for most connection types on connection creation:

Connection type Validated by default
PostgreSQL Yes
Kafka Yes
Confluent Schema Registry Yes
SSH Tunnel No
AWS PrivateLink No

For connection types that are validated by default, if the validation step fails, the creation of the connection will also fail and a validation error is returned. You can disable connection validation by setting the VALIDATE option to false. This is useful, for example, when the parameters are known to be correct but the external system is unavailable at the time of creation.

Connection types that require additional setup steps after creation, like SSH tunnel and AWS PrivateLink connections, can be manually validated using the VALIDATE CONNECTION syntax once all setup steps are completed.
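For example, once the bastion server accepts the connection's public keys, or once the AWS PrivateLink service accepts the Materialize principal, you can check each connection explicitly. The connection names match earlier examples and are otherwise illustrative.

```sql
-- Validate after the external setup steps are complete.
VALIDATE CONNECTION ssh_connection;
VALIDATE CONNECTION privatelink_svc;
```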

Privileges

The privileges required to execute this statement are:

  • CREATE privileges on the containing schema.
  • USAGE privileges on all connections and secrets used in the connection definition.
  • USAGE privileges on the schemas that all connections and secrets in the statement are contained in.
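As a sketch, the grants below give a hypothetical role named data_engineer what it needs to create a connection in materialize.public that references the kafka_password secret and the ssh_connection connection. All role and object names are illustrative.

```sql
-- CREATE on the schema that will contain the new connection.
GRANT CREATE ON SCHEMA materialize.public TO data_engineer;
-- USAGE on the schema containing the referenced secrets and connections.
GRANT USAGE ON SCHEMA materialize.public TO data_engineer;
-- USAGE on each secret and connection used in the definition.
GRANT USAGE ON SECRET materialize.public.kafka_password TO data_engineer;
GRANT USAGE ON CONNECTION materialize.public.ssh_connection TO data_engineer;
```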