Segment

This guide walks through the steps to ingest data from Segment into Materialize using the Webhook source.

💡 Tip: For help getting started with your own data, you can schedule a free guided trial.

Before you begin

Ensure that you have:

  • A Segment source set up and running.

Step 1. (Optional) Create a cluster

NOTE: If you are prototyping and already have a cluster to host your webhook source (e.g. quickstart), you can skip this step. For production scenarios, we recommend separating your workloads into multiple clusters for resource isolation.

To create a cluster in Materialize, use the CREATE CLUSTER command:

CREATE CLUSTER webhooks_cluster (SIZE = '25cc');

SET CLUSTER = webhooks_cluster;

Step 2. Create a secret

To validate requests between Segment and Materialize, you must create a secret:

CREATE SECRET segment_webhook_secret AS '<secret_value>';

Change the <secret_value> to a unique value that only you know and store it in a secure location.

Step 3. Set up a webhook source

Using the secret from the previous step, create a webhook source in Materialize to ingest data from Segment. By default, the source will be created in the active cluster; to use a different cluster, use the IN CLUSTER clause.

CREATE SOURCE segment_source IN CLUSTER webhooks_cluster FROM WEBHOOK
  BODY FORMAT JSON
  INCLUDE HEADER 'event-type' AS event_type
  INCLUDE HEADERS
  CHECK (
    WITH ( BODY BYTES, HEADERS, SECRET segment_webhook_secret BYTES AS validation_secret)
    -- The constant_time_eq validation function **does not support** fully
    -- qualified secret names. We recommend always aliasing the secret name
    -- for ease of use.
    constant_time_eq(decode(headers->'x-signature', 'hex'), hmac(body, validation_secret, 'sha1'))
  );

After a successful run, the command returns a NOTICE message containing the unique webhook URL that allows you to POST events to the source. Copy and store it. You will need it for the next step.

The URL will have the following format:

https://<HOST>/api/webhook/<database>/<schema>/<src_name>

If you missed the notice, you can find the URLs for all webhook sources in the mz_internal.mz_webhook_sources system table.

Access and authentication

WARNING! Without a CHECK statement, all requests will be accepted. To prevent bad actors from injecting data into your source, it is strongly encouraged that you define a CHECK statement with your webhook sources.

The CHECK clause defines how to validate each request. At the time of writing, Segment validates requests by signing them with an HMAC in the X-Signature request header. The HMAC is a hex-encoded SHA1 hash using the secret from Step 2. and the request body. Materialize decodes the signature using the decode function, getting the raw bytes, and generate our own HMAC using the hmac function. If the two values are equal, then the request is legitimate!

Step 4. Create a webhook destination in Segment

  1. In Segment, go to Connections > Catalog.

  2. Search and click Webhooks (Actions).

  3. Click Add destination.

  4. Select a data source and click Next.

  5. Jot a Destination Name and click Create Destination.

Step 5. Configure the mapping in Segment

A webhook destination in Segment requires a data mapping to send events from the source to the destination. For this guide, the destination is the Materialize source. Follow these steps to create the correct mapping:

  1. Go to your webhook destination created in the previous step.

  2. Click on Mappings > New Mapping.

  3. Click Send and fill the configuration as follows:

    1. In Select events to map and send, fill the conditions as you wish.
    2. In Add test, select any of the events.
    3. In Select mappings, fill the fields as follows:
      Field Value
      Url Use the URL from the Step 3.
      Method POST
      Batch Size 0
      Headers -
      Data $event
      Enable Batching No.
  4. Click Test Mapping and validate the webhook is working.

  5. After a succesful test, click Save.

NOTE:

If Test Mapping fails in and throws a “failed to validate the request” error, it means the shared secret is not right. To fix this, follow this steps:

  1. In Segment, go to your webhook destination created in the Step 4.
  2. Click Settings.
  3. In Shared Secret, enter the secret created in the Step 2.
  4. Click Save Changes.

Step 6. Validate incoming data

With the source set up in Materialize and the webhook destination configured in Segment, you can now query the incoming data:

  1. In the Materialize console, navigate to the SQL Shell.

  2. Use SQL queries to inspect and analyze the incoming data:

    SELECT * FROM segment_source LIMIT 10;
    

Step 7. Transform incoming data

JSON parsing

Webhook data is ingested as a JSON blob. We recommend creating a parsing view on top of your webhook source that uses jsonb operators to map the individual fields to columns with the required data types.

CREATE VIEW parse_segment AS SELECT
    body->>'anonymousId' AS anonymousId,
    body->>'channel' AS channel,
    body->'context'->>'ip' AS context_ip,
    body->'context'->>'userAgent' AS context_user_agent,
    (body->'integrations'->>'All')::bool AS integrations_all,
    (body->'integrations'->>'Mixpanel')::bool AS integrations_mixpanel,
    (body->'integrations'->>'Salesforce')::bool AS integrations_salesforce,
    body->>'messageId' AS messageId,
    body->>'name' AS name,
    body->'properties'->>'title' AS properties_title,
    body->'properties'->>'url' AS properties_url,
    try_parse_monotonic_iso8601_timestamp(body->>'receivedAt') AS received_at,
    try_parse_monotonic_iso8601_timestamp(body->>'sentAt') AS sent_at,
    try_parse_monotonic_iso8601_timestamp(body->>'timestamp') AS timestamp,
    body->>'type' AS type,
    body->>'userId' AS user_id,
    body->>'version' AS version
FROM segment_source;
CREATE VIEW parse_segment AS SELECT
    body->>'anonymousId' AS anonymous_id,
    body->'context'->'library'->>'name' AS context_library_name,
    (body->'context'->'library'->>'version') AS context_library_version,
    body->'context'->'page'->>'path' AS context_page_path,
    body->'context'->'page'->>'referrer' AS context_page_referrer,
    body->'context'->'page'->>'search' AS context_page_search,
    body->'context'->'page'->>'title' AS context_page_title,
    body->'context'->'page'->>'url' AS context_page_url,
    body->'context'->>'userAgent' AS context_userAgent,
    body->'context'->>'ip' AS context_ip,
    body->>'event' AS event,
    body->>'messageId' AS message_id,
    body->'properties'->>'title' AS properties_title,
    try_parse_monotonic_iso8601_timestamp(body->>'receivedAt') AS received_at,
    try_parse_monotonic_iso8601_timestamp(body->>'sentAt') AS sent_at,
    try_parse_monotonic_iso8601_timestamp(body->>'timestamp') AS timestamp,
    body->>'type' AS type,
    body->>'userId' AS user_id,
    try_parse_monotonic_iso8601_timestamp(body->>'originalTimestamp') AS original_timestamp
FROM segment_source;
CREATE VIEW parse_segment AS SELECT
    body->>'anonymousId' AS anonymous_id,
    body->>'channel' AS channel,
    body->'context'->>'ip' AS context_ip,
    body->'context'->>'userAgent' AS context_user_agent,
    (body->'integrations'->>'All')::bool AS integrations_all,
    (body->'integrations'->>'Mixpanel')::bool AS integrations_mixpanel,
    (body->'integrations'->>'Salesforce')::bool AS integrations_salesforce,
    body->>'messageId' AS messageId,
    try_parse_monotonic_iso8601_timestamp(body->>'receivedAt') AS received_at,
    try_parse_monotonic_iso8601_timestamp(body->>'sentAt') AS sent_at,
    try_parse_monotonic_iso8601_timestamp(body->>'timestamp') AS timestamp,
    body->'traits'->>'name' AS traits_name,
    body->'traits'->>'email' AS traits_email,
    body->'traits'->>'plan' AS traits_plan,
    (body->'traits'->>'logins')::numeric AS traits_logins,
    body->'traits'->'address'->>'street' AS traits_address_street,
    body->'traits'->'address'->>'city' AS traits_address_city,
    body->'traits'->'address'->>'state' AS traits_address_state,
    (body->'traits'->'address'->>'postalCode') AS traits_address_postalCode,
    body->'traits'->'address'->>'country' AS traits_address_country,
    body->>'type' AS type,
    body->>'userId' AS user_id,
    body->>'version' AS version
FROM segment_source;
Manually parsing JSON-formatted data in SQL can be tedious. 🫠 You can use the widget below to automatically turn a sample JSON payload into a parsing view with the individual fields mapped to columns.

Target object type

Timestamp handling

We highly recommend using the try_parse_monotonic_iso8601_timestamp function when casting from text to timestamp, which enables temporal filter pushdown.

Deduplication

With the vast amount of data processed and potential network issues, it’s not uncommon to receive duplicate records. You can use the DISTINCT ON clause to efficiently remove duplicates. For more details, refer to the webhook source reference documentation.

Next steps

With Materialize ingesting your Segment data, you can start exploring it, computing real-time results that stay up-to-date as new data arrives, and serving results efficiently. For more details, check out the Segment documentation and the webhook source reference documentation.

Back to top ↑