CREATE SOURCE: Webhook

PREVIEW: This feature is in public preview. It may have performance or stability issues.

CREATE SOURCE connects Materialize to an external system you want to read data from, and provides details about how to decode and interpret that data.

Webhook sources expose a public URL that allows your applications to push webhook events into Materialize.

Syntax

CREATE SOURCE [IF NOT EXISTS] src_name
  [IN CLUSTER cluster_name]
  FROM WEBHOOK
  BODY FORMAT { TEXT | JSON [ARRAY] | BYTES }
  [INCLUDE HEADER header_name AS column_alias [BYTES]] [...]
  [INCLUDE HEADERS [( [NOT] header_name [, ...] )]]
  [CHECK ( [WITH ( webhook_check_option [, ...] )] check_expression )]

webhook_check_option

{ BODY | HEADERS | SECRET secret_name } [AS alias] [BYTES]
Field | Use
src_name | The name for the source.
IN CLUSTER cluster_name | The cluster to maintain this source.
INCLUDE HEADER | Map a header value from a request into a column.
INCLUDE HEADERS | Include a column named headers of type map[text => text] containing the headers of the request.
CHECK | Specify a boolean expression that is used to validate each request received by the source.

CHECK WITH options

Field | Type | Description
BODY | text or bytea | Provide a body column to the check expression. The column can be renamed with the optional AS alias clause, and the type can be changed to bytea with the optional BYTES keyword.
HEADERS | map[text => text] or map[text => bytea] | Provide a headers column to the check expression. The column can be renamed with the optional AS alias clause, and the type can be changed to map[text => bytea] with the optional BYTES keyword.
SECRET secret_name | text or bytea | Provide a secret_name column to the check expression, with the value of the SECRET. The column can be renamed with the optional AS alias clause, and the type can be changed to bytea with the optional BYTES keyword.

Supported formats

Body format | Type | Description
BYTES | bytea | Does no parsing of the request, and stores the body of a request as it was received.
JSON | jsonb | Parses the body of a request as JSON. If the body is not valid JSON, a response of 400 Bad Request will be returned.
JSON ARRAY | jsonb | Parses the body of a request as a list of JSON objects, automatically expanding the list of objects to individual rows. Also accepts a single JSON object. If the body is not valid JSON, a response of 400 Bad Request will be returned.
TEXT | text | Parses the body of a request as UTF-8 text. If the body is not valid UTF-8, a response of 400 Bad Request will be returned.

Output

If source creation is successful, you’ll have a new source object with name src_name and, based on what you defined with BODY FORMAT and INCLUDE HEADERS, the following columns:

Column | Type | Optional?
body | bytea, jsonb, or text | No
headers | map[text => text] | Yes. Present if INCLUDE HEADERS is specified.

Webhook URL

After source creation, the unique URL that allows you to POST events to the source can be looked up in the mz_internal.mz_webhook_sources system catalog table. The URL will have the following format:

https://<HOST>/api/webhook/<database>/<schema>/<src_name>

A breakdown of each component is as follows:

  • <HOST>: The Materialize instance URL, which can be found on the Materialize console.
  • <database>: The name of the database where the source is created (default is materialize).
  • <schema>: The schema name where the source gets created (default is public).
  • <src_name>: The name you provided for your source at the time of creation.
NOTE: This is a public URL that is open to the internet and has no security. To validate that requests are legitimate, see Validating requests. For limits imposed on this endpoint, see Request limits.
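
As a minimal sketch, the URL for a single source could be looked up with a query along these lines. It assumes that the catalog table exposes name and url columns and that the source is called my_webhook_source; check the column names against your Materialize version.

```sql
-- Look up the webhook URL for one source.
-- Assumption: mz_webhook_sources exposes "name" and "url" columns.
SELECT name, url
FROM mz_internal.mz_webhook_sources
WHERE name = 'my_webhook_source';
```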

Features

Exposing headers

In addition to the request body, Materialize can expose headers to SQL. If a request header exists, you can map its fields to columns using the INCLUDE HEADER syntax.

CREATE SOURCE my_webhook_source IN CLUSTER my_cluster FROM WEBHOOK
  BODY FORMAT JSON
  INCLUDE HEADER 'timestamp' as ts
  INCLUDE HEADER 'x-event-type' as event_type;

This example would have the following columns:

Column | Type | Nullable?
body | jsonb | No
ts | text | Yes
event_type | text | Yes

All of the header columns are nullable. If the header of a request does not contain a specified field, the NULL value will be used as a default.
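
To illustrate how the mapped header columns behave like any other column, here is a hedged sketch of a query against my_webhook_source. The value 'order.created' is hypothetical and only stands in for whatever your application sends in x-event-type.

```sql
-- Filter events by a mapped header column.
-- 'order.created' is an illustrative value, not something the source defines.
SELECT
  body,
  ts,
  event_type
FROM my_webhook_source
WHERE event_type = 'order.created'
  -- Requests that did not send a 'timestamp' header have ts = NULL.
  AND ts IS NOT NULL;
```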

Excluding header fields

To exclude specific header fields from the mapping, use the INCLUDE HEADERS syntax in combination with the NOT option. This can be useful if, for example, you need to accept a dynamic list of fields but want to exclude sensitive information like authorization.

CREATE SOURCE my_webhook_source IN CLUSTER my_cluster FROM WEBHOOK
  BODY FORMAT JSON
  INCLUDE HEADERS ( NOT 'authorization', NOT 'x-api-key' );

This example would have the following columns:

Column | Type | Nullable?
body | jsonb | No
headers | map[text => text] | No

All header fields but 'authorization' and 'x-api-key' will get included in the headers map column.
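
Individual entries can then be read from the headers map with the -> operator, the same operator used in the CHECK examples on this page. The sketch below assumes the sending application sets content-type and x-event-type headers; those names are illustrative.

```sql
-- Read single entries out of the headers map column.
-- The header names are assumptions about what the sender provides.
SELECT
  headers->'content-type' AS content_type,
  headers->'x-event-type' AS event_type,
  body
FROM my_webhook_source;
```

A lookup for an excluded key such as 'authorization' should yield NULL, since that entry is never stored in the map.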

Validating requests

WARNING! Without a CHECK statement, all requests will be accepted. To prevent bad actors from injecting data into your source, it is strongly encouraged that you define a CHECK statement with your webhook sources.

It’s common for applications using webhooks to provide a method for validating that a request is legitimate. You can specify an expression to do this validation for your webhook source using the CHECK clause.

For example, the following source HMACs the request body using the sha256 hashing algorithm, and asserts the result is equal to the value provided in the x-signature header, decoded with base64.

CREATE SOURCE my_webhook_source IN CLUSTER my_cluster FROM WEBHOOK
  BODY FORMAT JSON
  CHECK (
    WITH (
      HEADERS, BODY AS request_body,
      SECRET my_webhook_shared_secret
    )
    constant_time_eq(
        decode(headers->'x-signature', 'base64'),
        hmac(request_body, my_webhook_shared_secret, 'sha256')
    )
  );

The headers and body of the request are only subject to validation if WITH ( BODY, HEADERS, ... ) is specified as part of the CHECK statement. By default, the type of body used for validation is text, regardless of the BODY FORMAT you specified for the source. In the example above, the body column for my_webhook_source has a type of jsonb, but request_body as used in the validation expression has type text. Further, the request headers are not persisted as part of my_webhook_source, since INCLUDE HEADERS was not specified, but they are still provided to the validation expression.
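
The following sketch shows the secret that such a CHECK expression depends on, together with a variant that uses the BYTES keyword so the HMAC is computed over the raw request bytes rather than text. The source name my_webhook_source_bytes, the alias signing_key, and the placeholder secret value are hypothetical; the sketch also assumes that hmac accepts bytea arguments for both the data and the key.

```sql
-- The secret referenced by the CHECK clause must exist before the source is created.
-- '<shared_secret_value>' is a placeholder for the value agreed with the sender.
CREATE SECRET my_webhook_shared_secret AS '<shared_secret_value>';

-- Variant of the validation above that works on raw bytes.
-- Assumption: hmac() has an overload taking bytea data and a bytea key.
CREATE SOURCE my_webhook_source_bytes IN CLUSTER my_cluster FROM WEBHOOK
  BODY FORMAT JSON
  CHECK (
    WITH (
      HEADERS,
      BODY AS request_body BYTES,
      SECRET my_webhook_shared_secret AS signing_key BYTES
    )
    constant_time_eq(
        decode(headers->'x-signature', 'base64'),
        hmac(request_body, signing_key, 'sha256')
    )
  );
```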

Debugging validation

It can be difficult to get your CHECK statement correct, especially if your application does not have a way to send test events. If you’re having trouble with your CHECK statement, we recommend creating a temporary source without CHECK and using that to iterate more quickly.

CREATE SOURCE my_webhook_temporary_debug IN CLUSTER my_cluster FROM WEBHOOK
  -- Specify the BODY FORMAT as TEXT or BYTES,
  -- which is how it's provided to CHECK.
  BODY FORMAT TEXT
  INCLUDE HEADERS;

Once you have a few events in my_webhook_temporary_debug, you can query it with your would-be CHECK statement.

SELECT
  -- Your would-be CHECK statement.
  constant_time_eq(
    decode(headers->'signature', 'base64'),
    hmac(headers->'timestamp' || body, 'my key', 'sha512')
  )
FROM my_webhook_temporary_debug
LIMIT 10;
NOTE: It’s not possible to use secrets in a SELECT statement, so you’ll need to provide these values as raw text for debugging.
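
When the expression returns false and it is not obvious why, it can help to put the received and the computed signature side by side. This is a sketch that reuses the hypothetical header names and key from the query above; drop the temporary source once the CHECK expression behaves as expected.

```sql
-- Compare what the sender provided with what the expression computes.
SELECT
  headers->'signature' AS received_signature,
  encode(
    hmac(headers->'timestamp' || body, 'my key', 'sha512'),
    'base64'
  ) AS computed_signature
FROM my_webhook_temporary_debug
LIMIT 10;

-- Clean up once debugging is done.
DROP SOURCE my_webhook_temporary_debug;
```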

Handling duplicated and partial events

Under any number of conditions, e.g. a network hiccup, it’s possible for your application to send an event more than once. If your event contains a unique identifier, you can de-duplicate these events using a MATERIALIZED VIEW and the DISTINCT ON clause.

CREATE MATERIALIZED VIEW my_webhook_idempotent IN CLUSTER my_compute_cluster AS (
  SELECT DISTINCT ON (body->>'unique_id') *
  FROM my_webhook_source
  ORDER BY body->>'unique_id'
);

We can take this technique a bit further to handle partial events. Let’s pretend our application tracks the completion of build jobs, and it sends us JSON objects with the following structure:

Key | Value | Optional?
id | text | No
started_at | text | Yes
finished_at | text | Yes

When a build job starts, we receive an event containing id and the started_at timestamp. When a build finishes, we receive a second event with the same id but now a finished_at timestamp. To merge these events into a single row, we can again use the DISTINCT ON clause.

CREATE MATERIALIZED VIEW my_build_jobs_merged IN CLUSTER my_compute_cluster AS (
  SELECT DISTINCT ON (id) *
  FROM (
    SELECT
      body->>'id' as id,
      try_parse_monotonic_iso8601_timestamp(body->>'started_at') as started_at,
      try_parse_monotonic_iso8601_timestamp(body->>'finished_at') as finished_at
    FROM my_build_jobs_source
  )
  ORDER BY id, finished_at NULLS LAST, started_at NULLS LAST
);
NOTE: When casting from text to timestamp you should prefer to use the try_parse_monotonic_iso8601_timestamp function, which enables temporal filter pushdown.
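
If the second event carries only finished_at and omits started_at, one possible alternative is to aggregate per id instead of picking a single row. This is a sketch rather than the approach described above; the view name my_build_jobs_merged_agg is hypothetical, and it relies on max ignoring NULL values.

```sql
-- Alternative sketch: combine fields from several partial events per job.
-- max() skips NULLs, so each column keeps the timestamp from whichever event supplied it.
CREATE MATERIALIZED VIEW my_build_jobs_merged_agg IN CLUSTER my_compute_cluster AS (
  SELECT
    body->>'id' AS id,
    max(try_parse_monotonic_iso8601_timestamp(body->>'started_at')) AS started_at,
    max(try_parse_monotonic_iso8601_timestamp(body->>'finished_at')) AS finished_at
  FROM my_build_jobs_source
  GROUP BY body->>'id'
);
```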

Handling batch events

The application pushing events to your webhook source may batch multiple events into a single HTTP request. You can automatically expand such a batch of events into separate rows using BODY FORMAT JSON ARRAY.

-- Webhook source that parses request bodies as a JSON array.
CREATE SOURCE webhook_source_json_batch IN CLUSTER my_cluster FROM WEBHOOK
  BODY FORMAT JSON ARRAY
  INCLUDE HEADERS;

If you POST a JSON array of three elements to webhook_source_json_batch, three rows will get appended to the source.

POST webhook_source_json_batch
[
  { "event_type": "a" },
  { "event_type": "b" },
  { "event_type": "c" }
]
SELECT COUNT(body) FROM webhook_source_json_batch;
----
3

You can also post a single object to the source, which will get appended as one row.

POST webhook_source_json_batch
{ "event_type": "d" }
SELECT body FROM webhook_source_json_batch;
----
{ "event_type": "a" }
{ "event_type": "b" }
{ "event_type": "c" }
{ "event_type": "d" }
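
Because each array element becomes its own row, the expanded events can be queried like any other jsonb rows. As a small sketch, the query below counts events per event_type for the source defined above.

```sql
-- Count the expanded rows per event type.
SELECT
  body->>'event_type' AS event_type,
  count(*) AS events
FROM webhook_source_json_batch
GROUP BY body->>'event_type'
ORDER BY event_type;
```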

Request limits

Webhook sources apply the following limits to received requests:

  • The maximum size of the request body is 2MB. Requests larger than this will fail with 413 Payload Too Large.
  • The maximum number of concurrent connections is 500, across all webhook sources. Trying to connect when the server is at capacity will return 429 Too Many Requests.
  • Requests that contain a header name specified more than once will be rejected with 401 Unauthorized.

Examples

Using basic authentication

Basic authentication enables a simple and rudimentary way to grant authorization to your webhook source.

To store the sensitive credentials and make them reusable across multiple CREATE SOURCE statements, use secrets.

CREATE SECRET basic_hook_auth AS 'Basic <base64_auth>';

Creating a source

After a successful secret creation, you can use the same secret to create different webhooks with the same basic authentication to check if a request is valid.

CREATE SOURCE webhook_with_basic_auth IN CLUSTER my_cluster
FROM WEBHOOK
    BODY FORMAT JSON
    CHECK (
      WITH (
        HEADERS,
        BODY AS request_body,
        SECRET basic_hook_auth
      )
      constant_time_eq(headers->'authorization', basic_hook_auth)
    );

Your new webhook is now up and ready to accept requests using basic authentication.
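
As a sketch of the reuse mentioned above, a second source can reference the same secret. The source name webhook_with_basic_auth_text is hypothetical, and BODY is left out of the WITH clause here on the assumption that it is only needed when the expression actually reads the body.

```sql
-- A second webhook source validated with the same stored credentials.
CREATE SOURCE webhook_with_basic_auth_text IN CLUSTER my_cluster
FROM WEBHOOK
    BODY FORMAT TEXT
    CHECK (
      WITH (
        HEADERS,
        SECRET basic_hook_auth
      )
      constant_time_eq(headers->'authorization', basic_hook_auth)
    );
```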

JSON parsing

With BODY FORMAT JSON, webhook data is ingested as a single JSON blob in the body column. We recommend creating a parsing view on top of your webhook source that maps the individual fields to columns with the required data types. To avoid doing this tedious task manually, you can use the JSON parsing widget.
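
A parsing view of this kind might look like the following sketch. The view name and the JSON keys (id, amount, customer.email, created_at) are hypothetical and should be replaced with the fields your events actually contain.

```sql
-- Hypothetical parsing view: map JSON fields to typed columns.
CREATE VIEW my_webhook_parsed AS
  SELECT
    body->>'id' AS id,
    (body->>'amount')::numeric AS amount,
    body->'customer'->>'email' AS customer_email,
    try_parse_monotonic_iso8601_timestamp(body->>'created_at') AS created_at
  FROM my_webhook_source;
```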
