Segment
This guide walks through the steps to ingest data from Segment into Materialize using the Webhook source.
Before you begin
Ensure that you have:
- A Segment source set up and running.
Step 1. (Optional) Create a cluster
quickstart
), you can skip this step. For production
scenarios, we recommend separating your workloads into multiple clusters for
resource isolation.
To create a cluster in Materialize, use the CREATE CLUSTER
command:
CREATE CLUSTER webhooks_cluster (SIZE = '25cc');
SET CLUSTER = webhooks_cluster;
Step 2. Create a secret
To validate requests between Segment and Materialize, you must create a secret:
CREATE SECRET segment_webhook_secret AS '<secret_value>';
Change the <secret_value>
to a unique value that only you know and store it in a secure location.
Step 3. Set up a webhook source
Using the secret from the previous step, create a webhook source
in Materialize to ingest data from Segment. By default, the source will be
created in the active cluster; to use a different cluster, use the IN CLUSTER
clause.
CREATE SOURCE segment_source IN CLUSTER webhooks_cluster FROM WEBHOOK
BODY FORMAT JSON
INCLUDE HEADER 'event-type' AS event_type
INCLUDE HEADERS
CHECK (
WITH ( BODY BYTES, HEADERS, SECRET segment_webhook_secret BYTES AS validation_secret)
-- The constant_time_eq validation function **does not support** fully
-- qualified secret names. We recommend always aliasing the secret name
-- for ease of use.
constant_time_eq(decode(headers->'x-signature', 'hex'), hmac(body, validation_secret, 'sha1'))
);
After a successful run, the command returns a NOTICE
message containing the
unique webhook URL
that allows you to POST
events to the source. Copy and store it. You will need
it for the next step.
The URL will have the following format:
https://<HOST>/api/webhook/<database>/<schema>/<src_name>
If you missed the notice, you can find the URLs for all webhook sources in the
mz_internal.mz_webhook_sources
system table.
Access and authentication
CHECK
statement, all requests will be accepted. To prevent bad
actors from injecting data into your source, it is strongly encouraged that
you define a CHECK
statement with your webhook sources.
The CHECK
clause defines how to validate each request. At the time of writing,
Segment validates requests by signing them with an HMAC in the X-Signature
request header. The HMAC is a hex-encoded SHA1 hash using the secret
from Step 2. and the request body. Materialize decodes the signature using
the decode
function, getting the raw bytes, and
generate our own HMAC using the hmac
function. If the
two values are equal, then the request is legitimate!
Step 4. Create a webhook destination in Segment
-
In Segment, go to Connections > Catalog.
-
Search and click Webhooks (Actions).
-
Click Add destination.
-
Select a data source and click Next.
-
Jot a Destination Name and click Create Destination.
Step 5. Configure the mapping in Segment
A webhook destination in Segment requires a data mapping to send events from the source to the destination. For this guide, the destination is the Materialize source. Follow these steps to create the correct mapping:
-
Go to your webhook destination created in the previous step.
-
Click on Mappings > New Mapping.
-
Click Send and fill the configuration as follows:
- In Select events to map and send, fill the conditions as you wish.
- In Add test, select any of the events.
- In Select mappings, fill the fields as follows:
Field Value Url Use the URL from the Step 3. Method POST
Batch Size 0
Headers - Data $event
Enable Batching No
.
-
Click Test Mapping and validate the webhook is working.
-
After a succesful test, click Save.
If Test Mapping fails in and throws a “failed to validate the request” error, it means the shared secret is not right. To fix this, follow this steps:
- In Segment, go to your webhook destination created in the Step 4.
- Click Settings.
- In Shared Secret, enter the secret created in the Step 2.
- Click Save Changes.
Step 6. Validate incoming data
With the source set up in Materialize and the webhook destination configured in Segment, you can now query the incoming data:
-
In the Materialize console, navigate to the SQL Shell.
-
Use SQL queries to inspect and analyze the incoming data:
SELECT * FROM segment_source LIMIT 10;
Step 7. Transform incoming data
JSON parsing
Webhook data is ingested as a JSON blob. We recommend creating a parsing view on
top of your webhook source that uses jsonb
operators
to map the individual fields to columns with the required data types.
CREATE VIEW parse_segment AS SELECT
body->>'anonymousId' AS anonymousId,
body->>'channel' AS channel,
body->'context'->>'ip' AS context_ip,
body->'context'->>'userAgent' AS context_user_agent,
(body->'integrations'->>'All')::bool AS integrations_all,
(body->'integrations'->>'Mixpanel')::bool AS integrations_mixpanel,
(body->'integrations'->>'Salesforce')::bool AS integrations_salesforce,
body->>'messageId' AS messageId,
body->>'name' AS name,
body->'properties'->>'title' AS properties_title,
body->'properties'->>'url' AS properties_url,
try_parse_monotonic_iso8601_timestamp(body->>'receivedAt') AS received_at,
try_parse_monotonic_iso8601_timestamp(body->>'sentAt') AS sent_at,
try_parse_monotonic_iso8601_timestamp(body->>'timestamp') AS timestamp,
body->>'type' AS type,
body->>'userId' AS user_id,
body->>'version' AS version
FROM segment_source;
CREATE VIEW parse_segment AS SELECT
body->>'anonymousId' AS anonymous_id,
body->'context'->'library'->>'name' AS context_library_name,
(body->'context'->'library'->>'version') AS context_library_version,
body->'context'->'page'->>'path' AS context_page_path,
body->'context'->'page'->>'referrer' AS context_page_referrer,
body->'context'->'page'->>'search' AS context_page_search,
body->'context'->'page'->>'title' AS context_page_title,
body->'context'->'page'->>'url' AS context_page_url,
body->'context'->>'userAgent' AS context_userAgent,
body->'context'->>'ip' AS context_ip,
body->>'event' AS event,
body->>'messageId' AS message_id,
body->'properties'->>'title' AS properties_title,
try_parse_monotonic_iso8601_timestamp(body->>'receivedAt') AS received_at,
try_parse_monotonic_iso8601_timestamp(body->>'sentAt') AS sent_at,
try_parse_monotonic_iso8601_timestamp(body->>'timestamp') AS timestamp,
body->>'type' AS type,
body->>'userId' AS user_id,
try_parse_monotonic_iso8601_timestamp(body->>'originalTimestamp') AS original_timestamp
FROM segment_source;
CREATE VIEW parse_segment AS SELECT
body->>'anonymousId' AS anonymous_id,
body->>'channel' AS channel,
body->'context'->>'ip' AS context_ip,
body->'context'->>'userAgent' AS context_user_agent,
(body->'integrations'->>'All')::bool AS integrations_all,
(body->'integrations'->>'Mixpanel')::bool AS integrations_mixpanel,
(body->'integrations'->>'Salesforce')::bool AS integrations_salesforce,
body->>'messageId' AS messageId,
try_parse_monotonic_iso8601_timestamp(body->>'receivedAt') AS received_at,
try_parse_monotonic_iso8601_timestamp(body->>'sentAt') AS sent_at,
try_parse_monotonic_iso8601_timestamp(body->>'timestamp') AS timestamp,
body->'traits'->>'name' AS traits_name,
body->'traits'->>'email' AS traits_email,
body->'traits'->>'plan' AS traits_plan,
(body->'traits'->>'logins')::numeric AS traits_logins,
body->'traits'->'address'->>'street' AS traits_address_street,
body->'traits'->'address'->>'city' AS traits_address_city,
body->'traits'->'address'->>'state' AS traits_address_state,
(body->'traits'->'address'->>'postalCode') AS traits_address_postalCode,
body->'traits'->'address'->>'country' AS traits_address_country,
body->>'type' AS type,
body->>'userId' AS user_id,
body->>'version' AS version
FROM segment_source;
Timestamp handling
We highly recommend using the try_parse_monotonic_iso8601_timestamp
function when casting from text
to timestamp
, which enables temporal filter
pushdown.
Deduplication
With the vast amount of data processed and potential network issues, it’s not
uncommon to receive duplicate records. You can use the DISTINCT ON
clause to
efficiently remove duplicates. For more details, refer to the webhook source
reference documentation.
Next steps
With Materialize ingesting your Segment data, you can start exploring it, computing real-time results that stay up-to-date as new data arrives, and serving results efficiently. For more details, check out the Segment documentation and the webhook source reference documentation.